CAP (Cordum Agent Protocol) Python SDK

CAP Python SDK

Asyncio-first SDK with NATS helpers for CAP workers and clients.

Quick Start

  1. Generate protobuf stubs into this SDK (one-time per proto change):

    python -m grpc_tools.protoc \
      -I../../proto \
      --python_out=./cap/pb \
      --grpc_python_out=./cap/pb \
      ../../proto/cordum/agent/v1/*.proto
    

    (Or run ./tools/make_protos.sh from repo root with CAP_RUN_PY=1 and copy /python into sdk/python/cap/pb if you want vendored stubs.)

  2. Install:

    pip install -e .
    
  3. Run a worker:

    import asyncio
    from cap import worker
    from cap.pb.cordum.agent.v1 import job_pb2
    
    async def handle(req: job_pb2.JobRequest):
        return job_pb2.JobResult(
            job_id=req.job_id,
            status=job_pb2.JOB_STATUS_SUCCEEDED,
            result_ptr=f"redis://res/{req.job_id}",
            worker_id="worker-echo-1",
        )
    
    asyncio.run(worker.run_worker("nats://127.0.0.1:4222", "job.echo", handle))
    
  4. Submit a job (client):

    import asyncio
    from cryptography.hazmat.primitives.asymmetric import ec
    from cap import client
    from cap.pb.cordum.agent.v1 import job_pb2
    import nats
    
    async def main():
        nc = await nats.connect("nats://127.0.0.1:4222")
        priv = ec.generate_private_key(ec.SECP256R1())
        req = job_pb2.JobRequest(
            job_id="job-echo-1",
            topic="job.echo",
            context_ptr="redis://ctx/job-echo-1",
        )
        await client.submit_job(nc, req, "trace-1", "client-py", priv)
        await nc.drain()
    
    asyncio.run(main())
    

Files

  • cap/bus.py — NATS connector.
  • cap/worker.py — worker skeleton with handler hook.
  • cap/client.py — publish JobRequest to sys.job.submit.
  • cap/pb/ — protobuf stubs (generated).

Defaults

  • Subjects: sys.job.submit, sys.job.result, sys.heartbeat.
  • Protocol version: 1.
  • Signing: submit_job and run_worker sign envelopes when given an ec.EllipticCurvePrivateKey. Signatures use deterministic protobuf serialization (map entries ordered by key) for cross-SDK verification. Generate a keypair with cryptography:
    from cryptography.hazmat.primitives.asymmetric import ec
    priv = ec.generate_private_key(ec.SECP256R1())
    pub = priv.public_key()
    
  • Set public_keys on run_worker to verify incoming packets.
  • Omit public_keys to accept unsigned packets.
  • Pass private_key=None to submit_job if you want to send unsigned envelopes.
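
Why key ordering matters for signatures can be illustrated without protobuf. The sketch below is an analogy only: it uses canonical JSON from the standard library rather than the SDK's serializer, and `canonical_bytes` and `digest` are hypothetical helpers, not part of `cap`. It shows that two maps with the same entries but different insertion order produce the same digest once keys are sorted, which is the property a signer and a verifier in different SDKs need to share.

```python
import hashlib
import json


def canonical_bytes(fields: dict) -> bytes:
    """Serialize a mapping with keys sorted so the byte output is stable."""
    return json.dumps(fields, sort_keys=True, separators=(",", ":")).encode("utf-8")


def digest(fields: dict) -> str:
    """SHA-256 over the canonical bytes; a signature would cover these bytes."""
    return hashlib.sha256(canonical_bytes(fields)).hexdigest()


# Same entries, different insertion order.
a = {"job_id": "job-echo-1", "topic": "job.echo"}
b = {"topic": "job.echo", "job_id": "job-echo-1"}

same_when_sorted = digest(a) == digest(b)          # ordering no longer matters
differs_unsorted = json.dumps(a) != json.dumps(b)  # naive output depends on order
print(same_when_sorted, differs_unsorted)
```

Without a fixed ordering, a verifier that re-serializes the same message could produce different bytes and reject a valid signature.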

Swap out cap.bus if you need a different transport.

Testing

The cap.testing module lets you test handlers without running NATS or Redis.

from cap.testing import run_handler
from cap.pb.cordum.agent.v1 import job_pb2

async def test_echo():
    result = await run_handler(
        lambda ctx, data: {"echo": data["prompt"]},
        {"prompt": "hello"},
        topic="job.echo",
    )
    assert result.status == job_pb2.JOB_STATUS_SUCCEEDED
  • run_handler(handler, input, **options) — runs a single handler invocation and returns the JobResult.
  • create_test_agent(**options) — returns (agent, mock_nats, store) pre-wired with MockNATS + InMemoryBlobStore.
  • MockNATS — in-memory NATS mock for custom test setups.

Runtime (High-Level SDK)

The runtime hides NATS/Redis plumbing and gives you typed handlers.

import asyncio
from pydantic import BaseModel
from cap.runtime import Agent, Context

class Input(BaseModel):
    prompt: str

class Output(BaseModel):
    summary: str

agent = Agent(retries=2)

@agent.job("job.summarize", input_model=Input, output_model=Output)
async def summarize(ctx: Context, data: Input) -> Output:
    return Output(summary=data.prompt[:140])

asyncio.run(agent.run())

Middleware

Add cross-cutting concerns (logging, auth, metrics) without modifying handlers:

import time

from cap.middleware import logging_middleware

# Built-in logging middleware
agent.use(logging_middleware())

# Custom middleware: measure how long the rest of the chain takes
async def timing(ctx, data, next_fn):
    start = time.monotonic()
    result = await next_fn(ctx, data)
    elapsed = time.monotonic() - start
    print(f"job {ctx.job_id} took {elapsed:.3f}s")
    return result

agent.use(timing)

Middleware executes in registration order (FIFO). Each middleware can inspect the context, measure timing, or short-circuit by returning without calling next_fn.
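
The ordering and short-circuit behavior described above can be sketched with a small stand-alone chain. This is not the SDK's implementation: `compose` and the two sample middlewares are hypothetical and only mimic the `(ctx, data, next_fn)` signature shown earlier, assuming the first registered middleware forms the outermost layer.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[Any, Any], Awaitable[Any]]
Middleware = Callable[[Any, Any, Handler], Awaitable[Any]]


def compose(middlewares: list[Middleware], handler: Handler) -> Handler:
    """Wrap handler so the first registered middleware runs outermost."""
    wrapped = handler
    # Build from the inside out so index 0 ends up as the outer layer.
    for mw in reversed(middlewares):
        def layer(ctx, data, _mw=mw, _next=wrapped):
            return _mw(ctx, data, _next)
        wrapped = layer
    return wrapped


calls: list[str] = []


async def first(ctx, data, next_fn):
    calls.append("first:before")
    result = await next_fn(ctx, data)
    calls.append("first:after")
    return result


async def reject_empty(ctx, data, next_fn):
    calls.append("reject_empty")
    if not data:
        # Short-circuit: return without calling next_fn.
        return {"error": "empty input"}
    return await next_fn(ctx, data)


async def handler(ctx, data):
    calls.append("handler")
    return {"echo": data}


chain = compose([first, reject_empty], handler)

ok = asyncio.run(chain(None, "hi"))
calls_ok = list(calls)

calls.clear()
blocked = asyncio.run(chain(None, ""))
calls_blocked = list(calls)
```

In the second run the handler never executes, yet `first` still sees the returned value on the way back out, which is why a timing or logging middleware registered early also covers short-circuited jobs.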

Environment

  • NATS_URL (default nats://127.0.0.1:4222)
  • REDIS_URL (default redis://127.0.0.1:6379/0)
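
If your own code needs the same connection settings (for example, to open a separate NATS connection for a client), one way to mirror these defaults is sketched below. `Settings` and `load_settings` are hypothetical names for illustration; the runtime resolves these variables itself and does not require this helper.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    nats_url: str
    redis_url: str


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read NATS_URL and REDIS_URL, falling back to the documented defaults."""
    return Settings(
        nats_url=env.get("NATS_URL", "nats://127.0.0.1:4222"),
        redis_url=env.get("REDIS_URL", "redis://127.0.0.1:6379/0"),
    )


# Passing an explicit mapping keeps the example independent of the real environment.
defaults = load_settings({})
overridden = load_settings({"NATS_URL": "nats://nats.internal:4222"})
```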

Generating API Docs

Generate HTML API reference locally using pdoc:

pip install "cap-sdk-python[dev]"
pdoc ./cap --output-dir docs

Output is written to docs/ (gitignored). Open docs/index.html to browse.

Observability

Structured Logging

The runtime Agent and Worker use logging.Logger (stdlib) for structured logging. All log calls include contextual fields (job_id, trace_id, topic, sender_id). Pass a custom logger, or omit it to use the default:

import logging
from cap.runtime import Agent

logger = logging.getLogger("my-agent")
logger.setLevel(logging.DEBUG)
agent = Agent(logger=logger)
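
To emit those contextual fields as JSON, a custom formatter can be attached to the logger you pass in. The sketch below assumes the fields arrive as attributes on the `LogRecord` (the way the stdlib `extra=` argument attaches them); how the SDK actually attaches them is not confirmed here, and `JsonFormatter` is a hypothetical class, not part of `cap`.

```python
import json
import logging

# Field names taken from the list above; their presence on the record is an assumption.
CONTEXT_FIELDS = ("job_id", "trace_id", "topic", "sender_id")


class JsonFormatter(logging.Formatter):
    """Render a log record as one JSON object, including known context fields."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {"level": record.levelname, "msg": record.getMessage()}
        for name in CONTEXT_FIELDS:
            value = getattr(record, name, None)
            if value is not None:
                payload[name] = value
        return json.dumps(payload, sort_keys=True)


logger = logging.getLogger("my-agent")
stream = logging.StreamHandler()
stream.setFormatter(JsonFormatter())
logger.addHandler(stream)
logger.setLevel(logging.DEBUG)

# Simulated log call carrying context, standing in for what the runtime would log.
logger.info("job received", extra={"job_id": "job-echo-1", "topic": "job.echo"})
```

The configured `logger` can then be handed to `Agent(logger=logger)` as shown above.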

MetricsHook

Implement the MetricsHook protocol to integrate with Prometheus, OpenTelemetry, or any metrics system:

from typing import Protocol

# Shape of the protocol, importable as: from cap.metrics import MetricsHook
class MetricsHook(Protocol):
    def on_job_received(self, job_id: str, topic: str) -> None: ...
    def on_job_completed(self, job_id: str, duration_ms: int, status: str) -> None: ...
    def on_job_failed(self, job_id: str, error_msg: str) -> None: ...
    def on_heartbeat_sent(self, worker_id: str) -> None: ...

The default is NoopMetrics (zero overhead). Example Prometheus integration:

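from prometheus_client import Counter, Histogram
from cap.runtime import Agent

# Metric names and labels are illustrative; the SDK does not define them.
jobs_received = Counter("cap_jobs_received_total", "Jobs received", ["topic"])
jobs_failed = Counter("cap_jobs_failed_total", "Jobs failed")
job_duration = Histogram("cap_job_duration_ms", "Job duration in milliseconds", ["status"])

class PromMetrics:
    def on_job_received(self, job_id, topic):
        jobs_received.labels(topic=topic).inc()

    def on_job_completed(self, job_id, duration_ms, status):
        job_duration.labels(status=status).observe(duration_ms)

    def on_job_failed(self, job_id, error_msg):
        jobs_failed.inc()

    def on_heartbeat_sent(self, worker_id):
        pass  # heartbeats are not tracked in this example

agent = Agent(metrics=PromMetrics())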

The trace_id is propagated through all log and metrics calls for distributed tracing correlation.
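
For unit tests or local debugging, a hook that records calls in memory can stand in for a real metrics backend. The sketch below implements the four methods listed in the protocol using only the standard library; `InMemoryMetrics` is a hypothetical class, and the `"succeeded"` status string is a placeholder, since the values the runtime passes are not specified here.

```python
from collections import Counter


class InMemoryMetrics:
    """Collects hook calls in plain Python structures for later inspection."""

    def __init__(self) -> None:
        self.counts: Counter = Counter()
        self.durations_ms: list[int] = []

    def on_job_received(self, job_id: str, topic: str) -> None:
        self.counts[f"received:{topic}"] += 1

    def on_job_completed(self, job_id: str, duration_ms: int, status: str) -> None:
        self.counts[f"completed:{status}"] += 1
        self.durations_ms.append(duration_ms)

    def on_job_failed(self, job_id: str, error_msg: str) -> None:
        self.counts["failed"] += 1

    def on_heartbeat_sent(self, worker_id: str) -> None:
        self.counts["heartbeats"] += 1


# Drive the hook by hand, standing in for calls the runtime would make.
metrics = InMemoryMetrics()
metrics.on_job_received("job-1", "job.echo")
metrics.on_job_completed("job-1", 12, "succeeded")
metrics.on_job_failed("job-2", "boom")
metrics.on_heartbeat_sent("worker-echo-1")
```

Because `MetricsHook` is a protocol, a class like this needs no base class; an instance would be passed as `Agent(metrics=metrics)` in the same way as the Prometheus example.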
