CAP (Cordum Agent Protocol) Python SDK
Project description
CAP Python SDK
Asyncio-first SDK with NATS helpers for CAP workers and clients.
Quick Start
-
Generate protobuf stubs into this SDK (one-time per proto change):
python -m grpc_tools.protoc \ -I../../proto \ --python_out=./cap/pb \ --grpc_python_out=./cap/pb \ ../../proto/cordum/agent/v1/*.proto
(Or run
./tools/make_protos.shfrom repo root withCAP_RUN_PY=1and copy/pythonintosdk/python/cap/pbif you want vendored stubs.) -
Install:
pip install -e .
-
Run a worker:
import asyncio from cap import worker from cap.pb.cordum.agent.v1 import job_pb2 async def handle(req: job_pb2.JobRequest): return job_pb2.JobResult( job_id=req.job_id, status=job_pb2.JOB_STATUS_SUCCEEDED, result_ptr=f"redis://res/{req.job_id}", worker_id="worker-echo-1", ) asyncio.run(worker.run_worker("nats://127.0.0.1:4222", "job.echo", handle))
-
Submit a job (client):
import asyncio from cryptography.hazmat.primitives.asymmetric import ec from cap import client from cap.pb.cordum.agent.v1 import job_pb2 import nats async def main(): nc = await nats.connect("nats://127.0.0.1:4222") priv = ec.generate_private_key(ec.SECP256R1()) req = job_pb2.JobRequest( job_id="job-echo-1", topic="job.echo", context_ptr="redis://ctx/job-echo-1", ) await client.submit_job(nc, req, "trace-1", "client-py", priv) await nc.drain() asyncio.run(main())
Files
cap/bus.py— NATS connector.cap/worker.py— worker skeleton with handler hook.cap/client.py— publish JobRequest tosys.job.submit.cap/pb/— protobuf stubs (generated).
Defaults
- Subjects:
sys.job.submit,sys.job.result,sys.heartbeat. - Protocol version:
1. - Signing:
submit_jobandrun_workersign envelopes when given anec.EllipticCurvePrivateKey. Signatures use deterministic protobuf serialization (map entries ordered by key) for cross-SDK verification. Generate a keypair withcryptography:from cryptography.hazmat.primitives.asymmetric import ec priv = ec.generate_private_key(ec.SECP256R1()) pub = priv.public_key()
- Set
public_keysonrun_workerto verify incoming packets. - Omit
public_keysto accept unsigned packets. - Pass
private_key=Nonetosubmit_jobif you want to send unsigned envelopes.
Swap out cap.bus if you need a different transport.
Testing
The cap.testing module lets you test handlers without running NATS or Redis.
from cap.testing import run_handler
from cap.pb.cordum.agent.v1 import job_pb2
async def test_echo():
result = await run_handler(
lambda ctx, data: {"echo": data["prompt"]},
{"prompt": "hello"},
topic="job.echo",
)
assert result.status == job_pb2.JOB_STATUS_SUCCEEDED
run_handler(handler, input, **options)— runs a single handler invocation and returns theJobResult.create_test_agent(**options)— returns(agent, mock_nats, store)pre-wired withMockNATS+InMemoryBlobStore.MockNATS— in-memory NATS mock for custom test setups.
Runtime (High-Level SDK)
The runtime hides NATS/Redis plumbing and gives you typed handlers.
import asyncio
from pydantic import BaseModel
from cap.runtime import Agent, Context
class Input(BaseModel):
prompt: str
class Output(BaseModel):
summary: str
agent = Agent(retries=2)
@agent.job("job.summarize", input_model=Input, output_model=Output)
async def summarize(ctx: Context, data: Input) -> Output:
return Output(summary=data.prompt[:140])
asyncio.run(agent.run())
Middleware
Add cross-cutting concerns (logging, auth, metrics) without modifying handlers:
from cap.middleware import logging_middleware
# Built-in logging middleware
agent.use(logging_middleware())
# Custom middleware
async def timing(ctx, data, next_fn):
import time
start = time.monotonic()
result = await next_fn(ctx, data)
elapsed = time.monotonic() - start
print(f"job {ctx.job_id} took {elapsed:.3f}s")
return result
agent.use(timing)
Middleware executes in registration order (FIFO). Each can inspect context,
measure timing, or short-circuit by returning without calling next_fn.
Redis TLS
The Python SDK provides redis_ssl_context_from_env() to build an SSLContext for secure Redis connections. It reads:
REDIS_TLS_CA(orSSL_CERT_FILEfallback): Path to CA certificate.REDIS_TLS_CERT/REDIS_TLS_KEY: Path to client certificate/key pair.REDIS_TLS_SERVER_NAME: SNI server name override.REDIS_TLS_INSECURE: Set to1ortrueto skip certificate verification (dev only).
Environment
NATS_URL(defaultnats://127.0.0.1:4222)REDIS_URL(defaultredis://127.0.0.1:6379/0)
Generating API Docs
Generate HTML API reference locally using pdoc:
pip install cap-sdk-python[dev]
pdoc ./cap --output-dir docs
Output is written to docs/ (gitignored). Open docs/index.html to browse.
Observability
Structured Logging
The runtime Agent and Worker use logging.Logger (stdlib) for structured logging. All log calls include contextual fields (job_id, trace_id, topic, sender_id). Pass a custom logger or leave as default:
import logging
from cap.runtime import Agent
logger = logging.getLogger("my-agent")
logger.setLevel(logging.DEBUG)
agent = Agent(logger=logger)
MetricsHook
Implement the MetricsHook protocol to integrate with Prometheus, OpenTelemetry, or any metrics system:
from cap.metrics import MetricsHook
class MetricsHook(Protocol):
def on_job_received(self, job_id: str, topic: str) -> None: ...
def on_job_completed(self, job_id: str, duration_ms: int, status: str) -> None: ...
def on_job_failed(self, job_id: str, error_msg: str) -> None: ...
def on_heartbeat_sent(self, worker_id: str) -> None: ...
The default is NoopMetrics (zero overhead). Example Prometheus integration:
from cap.runtime import Agent
class PromMetrics:
def on_job_received(self, job_id, topic):
jobs_received.labels(topic=topic).inc()
def on_job_completed(self, job_id, duration_ms, status):
job_duration.labels(status=status).observe(duration_ms)
def on_job_failed(self, job_id, error_msg):
jobs_failed.inc()
def on_heartbeat_sent(self, worker_id):
pass
agent = Agent(metrics=PromMetrics())
The trace_id is propagated through all log and metrics calls for distributed tracing correlation.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file cap_sdk_python-2.10.0.tar.gz.
File metadata
- Download URL: cap_sdk_python-2.10.0.tar.gz
- Upload date:
- Size: 40.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c08733bcce35f011064ebcf734e44dc3dcc02f161ec1110fe8c0bf1f1b6ee446
|
|
| MD5 |
be0f8c37fcd8f9246790ab88b0a17f8f
|
|
| BLAKE2b-256 |
6ab2246ec4f0eaf42db60ba53093fecd4fc4dadfd4477d83cca624af6de8fb4a
|
Provenance
The following attestation bundles were made for cap_sdk_python-2.10.0.tar.gz:
Publisher:
publish-python.yml on cordum-io/cap
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
cap_sdk_python-2.10.0.tar.gz -
Subject digest:
c08733bcce35f011064ebcf734e44dc3dcc02f161ec1110fe8c0bf1f1b6ee446 - Sigstore transparency entry: 1357018112
- Sigstore integration time:
-
Permalink:
cordum-io/cap@5d4b5d48006157db7980a232d2ffecb1a2af09e1 -
Branch / Tag:
refs/tags/v2.10.0 - Owner: https://github.com/cordum-io
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish-python.yml@5d4b5d48006157db7980a232d2ffecb1a2af09e1 -
Trigger Event:
release
-
Statement type:
File details
Details for the file cap_sdk_python-2.10.0-py3-none-any.whl.
File metadata
- Download URL: cap_sdk_python-2.10.0-py3-none-any.whl
- Upload date:
- Size: 40.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
bc5acc87f927b3f7ed0484d77a91f3ac462bb0ca9a773fca591497cafb1af7fe
|
|
| MD5 |
9e3d60381743ea2889a3b007214597f8
|
|
| BLAKE2b-256 |
88c18d5785e1b0546f76a06ac057aa40b8453535cd764d6b3fb96101a5c849c1
|
Provenance
The following attestation bundles were made for cap_sdk_python-2.10.0-py3-none-any.whl:
Publisher:
publish-python.yml on cordum-io/cap
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
cap_sdk_python-2.10.0-py3-none-any.whl -
Subject digest:
bc5acc87f927b3f7ed0484d77a91f3ac462bb0ca9a773fca591497cafb1af7fe - Sigstore transparency entry: 1357018117
- Sigstore integration time:
-
Permalink:
cordum-io/cap@5d4b5d48006157db7980a232d2ffecb1a2af09e1 -
Branch / Tag:
refs/tags/v2.10.0 - Owner: https://github.com/cordum-io
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish-python.yml@5d4b5d48006157db7980a232d2ffecb1a2af09e1 -
Trigger Event:
release
-
Statement type: