CAP (Cordum Agent Protocol) Python SDK

Asyncio-first SDK with NATS helpers for CAP workers and clients.

Quick Start

  1. Generate protobuf stubs into this SDK (rerun whenever the protos change):

    python -m grpc_tools.protoc \
      -I../../proto \
      --python_out=./cap/pb \
      --grpc_python_out=./cap/pb \
      ../../proto/cordum/agent/v1/*.proto
    

    (Or run ./tools/make_protos.sh from repo root with CAP_RUN_PY=1 and copy /python into sdk/python/cap/pb if you want vendored stubs.)

  2. Install:

    pip install -e .
    
  3. Run a worker:

    import asyncio
    from cap import worker
    from cap.pb.cordum.agent.v1 import job_pb2
    
    async def handle(req: job_pb2.JobRequest):
        return job_pb2.JobResult(
            job_id=req.job_id,
            status=job_pb2.JOB_STATUS_SUCCEEDED,
            result_ptr=f"redis://res/{req.job_id}",
            worker_id="worker-echo-1",
        )
    
    asyncio.run(worker.run_worker("nats://127.0.0.1:4222", "job.echo", handle))
    
  4. Submit a job (client):

    import asyncio
    from cryptography.hazmat.primitives.asymmetric import ec
    from cap import client
    from cap.pb.cordum.agent.v1 import job_pb2
    import nats
    
    async def main():
        nc = await nats.connect("nats://127.0.0.1:4222")
        priv = ec.generate_private_key(ec.SECP256R1())
        req = job_pb2.JobRequest(
            job_id="job-echo-1",
            topic="job.echo",
            context_ptr="redis://ctx/job-echo-1",
        )
        await client.submit_job(nc, req, "trace-1", "client-py", priv)
        await nc.drain()
    
    asyncio.run(main())
    

Files

  • cap/bus.py — NATS connector.
  • cap/worker.py — worker skeleton with handler hook.
  • cap/client.py — publish JobRequest to sys.job.submit.
  • cap/pb/ — protobuf stubs (generated).

Defaults

  • Subjects: sys.job.submit, sys.job.result, sys.heartbeat.
  • Protocol version: 1.
  • Signing: submit_job and run_worker sign envelopes when given an ec.EllipticCurvePrivateKey. Signatures use deterministic protobuf serialization (map entries ordered by key) for cross-SDK verification. Generate a keypair with cryptography:
    from cryptography.hazmat.primitives.asymmetric import ec
    priv = ec.generate_private_key(ec.SECP256R1())
    pub = priv.public_key()
    
  • Set public_keys on run_worker to verify incoming packets.
  • Omit public_keys to accept unsigned packets.
  • Pass private_key=None to submit_job if you want to send unsigned envelopes.
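Why deterministic serialization matters can be illustrated with stdlib `json` (an analogy only, not the SDK's actual protobuf wire format): two logically equal maps built in different insertion orders must serialize to identical bytes, or a signature produced by one SDK would fail to verify in another.

```python
import json

# Analogy for the map-ordering rule: sort keys so equal maps
# serialize identically regardless of insertion order.
a = json.dumps({"b": 2, "a": 1}, sort_keys=True, separators=(",", ":"))
b = json.dumps({"a": 1, "b": 2}, sort_keys=True, separators=(",", ":"))
assert a == b  # identical bytes -> identical signatures
```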

Swap out cap.bus if you need a different transport.
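A replacement transport only needs to cover the small surface the SDK uses. The interface below is a hypothetical sketch: the method names (`publish`, `subscribe`, `drain`) are assumptions for illustration, not the actual `cap.bus` API.

```python
from typing import Awaitable, Callable, Protocol, runtime_checkable

@runtime_checkable
class Bus(Protocol):
    """Hypothetical transport interface; method names are assumptions."""
    async def publish(self, subject: str, payload: bytes) -> None: ...
    async def subscribe(self, subject: str,
                        cb: Callable[[bytes], Awaitable[None]]) -> None: ...
    async def drain(self) -> None: ...

class InMemoryBus:
    """Toy implementation satisfying the hypothetical interface."""
    async def publish(self, subject: str, payload: bytes) -> None:
        pass
    async def subscribe(self, subject, cb) -> None:
        pass
    async def drain(self) -> None:
        pass

# runtime_checkable allows a structural isinstance check.
assert isinstance(InMemoryBus(), Bus)
```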

Testing

The cap.testing module lets you test handlers without running NATS or Redis.

from cap.testing import run_handler
from cap.pb.cordum.agent.v1 import job_pb2

async def test_echo():
    result = await run_handler(
        lambda ctx, data: {"echo": data["prompt"]},
        {"prompt": "hello"},
        topic="job.echo",
    )
    assert result.status == job_pb2.JOB_STATUS_SUCCEEDED

  • run_handler(handler, input, **options) — runs a single handler invocation and returns the JobResult.
  • create_test_agent(**options) — returns (agent, mock_nats, store) pre-wired with MockNATS + InMemoryBlobStore.
  • MockNATS — in-memory NATS mock for custom test setups.
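At its core, a NATS mock is an in-memory map from subjects to subscriber callbacks. A minimal self-contained sketch of the idea (not the actual cap.testing implementation):

```python
import asyncio

class TinyMockNats:
    """Minimal in-memory pub/sub, sketching what a NATS mock does."""
    def __init__(self):
        self._subs = {}  # subject -> list of async callbacks

    async def subscribe(self, subject, cb):
        self._subs.setdefault(subject, []).append(cb)

    async def publish(self, subject, payload: bytes):
        # Deliver to every subscriber on this subject, in order.
        for cb in self._subs.get(subject, []):
            await cb(payload)

async def demo():
    nc = TinyMockNats()
    seen = []
    async def on_msg(data):
        seen.append(data)
    await nc.subscribe("job.echo", on_msg)
    await nc.publish("job.echo", b"hello")
    return seen

result = asyncio.run(demo())  # -> [b"hello"]
```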

Runtime (High-Level SDK)

The runtime hides NATS/Redis plumbing and gives you typed handlers.

import asyncio
from pydantic import BaseModel
from cap.runtime import Agent, Context

class Input(BaseModel):
    prompt: str

class Output(BaseModel):
    summary: str

agent = Agent(retries=2)

@agent.job("job.summarize", input_model=Input, output_model=Output)
async def summarize(ctx: Context, data: Input) -> Output:
    return Output(summary=data.prompt[:140])

asyncio.run(agent.run())

Middleware

Add cross-cutting concerns (logging, auth, metrics) without modifying handlers:

from cap.middleware import logging_middleware

# Built-in logging middleware
agent.use(logging_middleware())

# Custom middleware
async def timing(ctx, data, next_fn):
    import time
    start = time.monotonic()
    result = await next_fn(ctx, data)
    elapsed = time.monotonic() - start
    print(f"job {ctx.job_id} took {elapsed:.3f}s")
    return result

agent.use(timing)

Middleware executes in registration order (FIFO). Each can inspect context, measure timing, or short-circuit by returning without calling next_fn.
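The FIFO chain can be pictured as nested calls: the first-registered middleware wraps everything else, and the handler sits at the center. A self-contained sketch of that composition (not how cap.runtime is actually implemented):

```python
import asyncio

def compose(middlewares, handler):
    """Wrap handler so middlewares run in registration (FIFO) order."""
    async def call(ctx, data, index=0):
        if index == len(middlewares):
            return await handler(ctx, data)
        # Each middleware receives a next_fn that advances the chain.
        return await middlewares[index](ctx, data,
                                        lambda c, d: call(c, d, index + 1))
    return call

order = []

async def first(ctx, data, next_fn):
    order.append("first:before")
    result = await next_fn(ctx, data)
    order.append("first:after")
    return result

async def second(ctx, data, next_fn):
    order.append("second")
    return await next_fn(ctx, data)

async def handler(ctx, data):
    order.append("handler")
    return data

result = asyncio.run(compose([first, second], handler)(None, 42))
# order == ["first:before", "second", "handler", "first:after"]
```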

Environment

  • NATS_URL (default nats://127.0.0.1:4222)
  • REDIS_URL (default redis://127.0.0.1:6379/0)
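Your own entry point can pick these up with stdlib `os`, falling back to the documented defaults when the variables are unset:

```python
import os

# Defaults mirror the values documented above.
NATS_URL = os.environ.get("NATS_URL", "nats://127.0.0.1:4222")
REDIS_URL = os.environ.get("REDIS_URL", "redis://127.0.0.1:6379/0")
```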

Generating API Docs

Generate HTML API reference locally using pdoc:

pip install 'cap-sdk-python[dev]'
pdoc ./cap --output-dir docs

Output is written to docs/ (gitignored). Open docs/index.html to browse.

Observability

Structured Logging

The runtime Agent and Worker use logging.Logger (stdlib) for structured logging. All log calls include contextual fields (job_id, trace_id, topic, sender_id). Pass a custom logger or leave as default:

import logging
from cap.runtime import Agent

logger = logging.getLogger("my-agent")
logger.setLevel(logging.DEBUG)
agent = Agent(logger=logger)

MetricsHook

Implement the MetricsHook protocol to integrate with Prometheus, OpenTelemetry, or any metrics system:

from typing import Protocol

class MetricsHook(Protocol):  # shape of cap.metrics.MetricsHook
    def on_job_received(self, job_id: str, topic: str) -> None: ...
    def on_job_completed(self, job_id: str, duration_ms: int, status: str) -> None: ...
    def on_job_failed(self, job_id: str, error_msg: str) -> None: ...
    def on_heartbeat_sent(self, worker_id: str) -> None: ...

The default is NoopMetrics (zero overhead). Example Prometheus integration:

from cap.runtime import Agent

class PromMetrics:
    def on_job_received(self, job_id, topic):
        jobs_received.labels(topic=topic).inc()

    def on_job_completed(self, job_id, duration_ms, status):
        job_duration.labels(status=status).observe(duration_ms)

    def on_job_failed(self, job_id, error_msg):
        jobs_failed.inc()

    def on_heartbeat_sent(self, worker_id):
        pass

agent = Agent(metrics=PromMetrics())

The trace_id is propagated through all log and metrics calls for distributed tracing correlation.
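If you want the same contextual fields on your own application logs, one stdlib option is a LoggerAdapter (an illustration of the pattern; the runtime attaches its fields internally):

```python
import logging

class ContextAdapter(logging.LoggerAdapter):
    """Prefix every message with the contextual fields it was created with."""
    def process(self, msg, kwargs):
        prefix = " ".join(f"{k}={v}" for k, v in sorted(self.extra.items()))
        return f"[{prefix}] {msg}", kwargs

log = ContextAdapter(logging.getLogger("my-agent"),
                     {"job_id": "job-echo-1", "trace_id": "trace-1"})
# Message becomes "[job_id=job-echo-1 trace_id=trace-1] job finished"
log.info("job finished")
```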
