Polyglot RPC protocol layer (pre-1.0; API may break in minor versions).

clamator-protocol

Pure JSON-RPC 2.0 protocol primitives plus Pydantic-derived envelope types for clamator. No I/O, ever — anything that touches a network, filesystem, or process belongs in a transport adapter. Requires Pydantic v2 (pinned >=2.5); v1 is not supported.

Install

pip install clamator-protocol

When you reach for this

  • Defining a Contract (in tests, in custom tooling).
  • Building a custom transport adapter that needs the wire-envelope models, the Transport and Dispatcher interfaces, or the reserved JSON-RPC error codes.

If you only consume generated clients and servers, you don't import this package directly — your transport package (clamator-over-memory, clamator-over-redis) re-exports the few symbols you need.

Defining a contract

The Python counterpart of a Zod contract is a Contract with MethodEntry rows that bind Pydantic models to handler attribute names:

arith = Contract(
    service="arith",
    methods={
        "add": MethodEntry(params_model=AddP, result_model=AddR, handler_attr="add"),
        "ping": MethodEntry(params_model=PingP, result_model=None, handler_attr="ping"),
    },
)

(Verbatim from py/packages/over-memory/tests/test_loopback.py:22-28.)

When clamator-protocol is consumed alongside generated wrappers from @clamator/codegen, the Contract and MethodEntry values are produced by codegen — the snippet above is what direct authors of test contracts or custom tooling write.

The single methods dict holds both methods and notifications. A MethodEntry with result_model=None declares a notification (the snippet's ping is one); there is no separate notifications= kwarg. handler_attr is the attribute the dispatcher resolves on the registered handler instance — it is independent of the wire-side method name (the dict key) and is conventionally snake_case.
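
As a rough illustration of the rule above, the sketch below uses a stand-in dataclass to show how one methods dict splits into methods and notifications based on result_model. StandInEntry and split_entries are hypothetical names invented for this example; they are not the clamator_protocol classes, and plain dict types stand in for the Pydantic models.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class StandInEntry:
    """Hypothetical stand-in for MethodEntry; not the real clamator_protocol class."""
    params_model: type
    result_model: Optional[type]
    handler_attr: str


def split_entries(methods: dict[str, StandInEntry]) -> tuple[list[str], list[str]]:
    """Return (method names, notification names) by checking result_model."""
    calls = [name for name, entry in methods.items() if entry.result_model is not None]
    notes = [name for name, entry in methods.items() if entry.result_model is None]
    return calls, notes


# The dict key is the wire-side name; handler_attr is looked up on the handler.
methods = {
    "add": StandInEntry(params_model=dict, result_model=dict, handler_attr="add"),
    "ping": StandInEntry(params_model=dict, result_model=None, handler_attr="ping"),
}
calls, notes = split_entries(methods)
```

Here "add" lands in calls and "ping" in notes, which matches how the snippet's two entries are described.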

Key exports

  • Contract, MethodEntry — declare a service's methods and notifications with Pydantic models for params and results.
  • RpcError — the error type you raise from a handler to surface a structured JSON-RPC error to the caller.
  • ClamatorProtocolError, ClamatorTransportError — distinguishable error classes for protocol-level vs. transport-level failures.
  • Transport, Dispatcher — interfaces a custom transport adapter implements.
  • RpcServerCore, RpcClientCore — base classes the transport packages' *RpcServer / *RpcClient extend. Useful for building custom transport adapters or for type annotations across transport boundaries.

Base-class interface guarantees

Both transport packages' *RpcServer classes (MemoryRpcServer, RedisRpcServer) inherit from RpcServerCore; both *RpcClient classes inherit from RpcClientCore. The base classes fix the common surface that every transport must expose, and the methods listed below are defined on the base, not on the subclasses. Type your own code against RpcServerCore (or RpcClientCore) when writing wrappers that should accept either transport.

Import. from clamator_protocol import RpcServerCore, RpcClientCore.

Server interface (RpcServerCore).

  • register_service(contract: Contract, instance: Any) -> None — register a service. Calling it twice with the same contract.service raises ValueError. Must be called before start(); new services registered after start() are silently ignored (the consumer-group / read loop is created only inside start()). Re-registering an already-registered service after start() raises the same ValueError as before-start.
  • async start() -> None — idempotent. Calling start() after stop() raises RuntimeError("server has been stopped").
  • async stop(*, grace_ms: int = 5000) -> None — idempotent. Drains in-flight handlers up to grace_ms before disconnecting from the transport.
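
The lifecycle rules above can be sketched with a small stand-in class. LifecycleSketch is hypothetical and is not RpcServerCore: it takes a service name where the real register_service takes a Contract, it ignores grace_ms, and it does no transport work. It only mirrors the documented outcomes (duplicate registration raises ValueError, start() and stop() are idempotent, start() after stop() raises RuntimeError).

```python
import asyncio
from typing import Any


class LifecycleSketch:
    """Hypothetical stand-in mirroring the documented rules; not RpcServerCore."""

    def __init__(self) -> None:
        self._services: dict[str, Any] = {}
        self._started = False
        self._stopped = False

    def register_service(self, service: str, instance: Any) -> None:
        # The real method takes a Contract and keys on contract.service.
        if service in self._services:
            raise ValueError(f"service already registered: {service}")
        self._services[service] = instance

    async def start(self) -> None:
        if self._stopped:
            raise RuntimeError("server has been stopped")
        self._started = True  # calling start() again changes nothing

    async def stop(self) -> None:
        self._started = False
        self._stopped = True  # calling stop() again changes nothing


async def demo() -> list[str]:
    events: list[str] = []
    server = LifecycleSketch()
    server.register_service("arith", object())
    try:
        server.register_service("arith", object())
    except ValueError:
        events.append("duplicate rejected")
    await server.start()
    await server.start()  # idempotent
    await server.stop()
    await server.stop()   # idempotent
    try:
        await server.start()
    except RuntimeError as exc:
        events.append(str(exc))
    return events
```

Running asyncio.run(demo()) collects the two failure events in order, which is a convenient shape for a unit test of a wrapper that manages a server's lifetime.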

Client interface (RpcClientCore).

  • async call(service, method, params, *, timeout_ms=None) -> Any and async notify(service, method, params) -> None — the ClamatorClient Protocol. Codegen-emitted proxy classes accept any object satisfying this Protocol.
  • async start() -> None / async stop() -> None — same idempotency rules as the server.

These guarantees apply uniformly across clamator-over-memory and clamator-over-redis. Transport-specific subclasses add construction kwargs (e.g., redis, key_prefix, consumer_claim_idle_ms for RedisRpcServer; bus for MemoryRpcServer) but do not override the methods above.
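
One way to write code that accepts either transport is to type it against a structural Protocol with the call and notify signatures listed above. The sketch below is an assumption-laden illustration: ClientLike is a hypothetical Protocol written for this example (it is not imported from clamator_protocol), and RecordingClient is a test double that records traffic instead of using a transport.

```python
import asyncio
from typing import Any, Optional, Protocol


class ClientLike(Protocol):
    """Hypothetical Protocol mirroring the documented call/notify signatures."""

    async def call(self, service: str, method: str, params: Any, *,
                   timeout_ms: Optional[int] = None) -> Any: ...

    async def notify(self, service: str, method: str, params: Any) -> None: ...


class RecordingClient:
    """Test double: satisfies ClientLike structurally, with no transport behind it."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, str, str]] = []

    async def call(self, service, method, params, *, timeout_ms=None):
        self.sent.append(("call", service, method))
        return {"echo": params}

    async def notify(self, service, method, params):
        self.sent.append(("notify", service, method))


async def add_then_ping(client: ClientLike) -> Any:
    """Works with any object exposing call and notify, whatever the transport."""
    result = await client.call("arith", "add", {"a": 1, "b": 2}, timeout_ms=500)
    await client.notify("arith", "ping", {})
    return result
```

Because the function depends only on the two method signatures, the same code path can be exercised in tests with the double and in production with a real client.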

Hand-built contracts

The Contract and MethodEntry classes are first-class — you do not need to run codegen to use them. The "Defining a contract" snippet above is itself hand-built. Codegen exists to keep TS and Py contracts in lockstep when both languages consume the same wire-side service; if you only have a Py-side service, or if you need to build the contract dynamically at runtime (e.g., from a registry of handler functions keyed by command type), build the Contract by hand.

register_service(contract, handler_instance) accepts any Contract regardless of how it was built. The dispatcher calls getattr(handler_instance, method_entry.handler_attr)(params) for each request — the handler instance doesn't need to subclass any particular ABC, only to expose the right async attributes. The same instance can also be invoked directly by other in-process code (your test suite, a non-RPC caller in the same process), sharing state — clamator is one access path, direct method calls remain a valid second. Codegen-emitted contracts and hand-built contracts are interchangeable at the dispatch layer; the choice is purely about authoring ergonomics. You can mix both on a single server: register codegen-emitted services and hand-built services in the same startup sequence; each call is independent and the service-name uniqueness check is the only constraint.

Four behaviors worth knowing:

  • setattr works. The dispatcher resolves handlers via getattr(handler_instance, handler_attr), so a plain object with attributes set at runtime via setattr(obj, name, async_fn) is a valid handler. You don't need a class.
  • Duplicate register_service raises. Calling register_service(c1, h1) followed by register_service(c2, h2) with the same contract.service value raises ValueError. There is no replace-or-merge semantic — pick one path or build the union contract before registering.
  • register_service after start() is silently ineffective. The protocol-level state is updated, but the transport's consumer-loop machinery is initialized once at start() and never revisited. New entries don't get a consumer group / read loop spawned, so requests for them are never dispatched. Register all services before calling start().
  • Handler-attribute resolution is lazy. register_service(contract, handler_instance) does not validate that handler_instance exposes every handler_attr named in the contract. The dispatcher does getattr(handler_instance, attr, None) per request — a missing attribute surfaces as RpcError(-32601, "Method not found") at call time, not at registration. Subclassing the codegen-emitted <Service>Service ABC pushes that check to class-instantiation time (Python complains about missing abstract methods); plain duck-typed handlers don't get that benefit.
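
The setattr and lazy-resolution behaviors can be sketched together. resolve_and_call below is a hypothetical stand-in for the dispatcher's lookup step, not the library's code; it returns plain dicts where the real dispatcher produces envelopes, and it skips validation entirely.

```python
import asyncio
from types import SimpleNamespace

METHOD_NOT_FOUND = -32601  # JSON-RPC 2.0 reserved code


async def resolve_and_call(handler, handler_attr: str, params):
    """Look the attribute up only when a request arrives, as described above."""
    fn = getattr(handler, handler_attr, None)
    if fn is None:
        return {"error": {"code": METHOD_NOT_FOUND, "message": "Method not found"}}
    return {"result": await fn(params)}


async def add(params):
    # A function attached with setattr is not bound, so it takes no self.
    return params["a"] + params["b"]


handler = SimpleNamespace()    # a plain object, no class definition needed
setattr(handler, "add", add)   # attach the async handler at runtime
```

A request for "add" reaches the function, while a request for an attribute that was never set only fails when it is dispatched, which is why a registration-time check of your own can be worth adding for duck-typed handlers.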

Be aware that runtime contract construction defeats the point of having a contract. clamator's value comes from mechanically-guaranteed compatibility between RPC client and server: the same Zod source produces both sides, codegen ensures they stay in lockstep, and the manifest-diff workflow catches drift. A contract built at runtime — where one side's available methods aren't known until the program runs — gives up all of that. If you find yourself reaching for runtime contract construction, consider whether clamator is the right primitive for what you're doing; a thinner JSON-RPC stack, or a queue with hand-rolled envelopes, may serve you better.

Codegen workflow

clamator's codegen is an npm package (@clamator/codegen) regardless of which language consumes the output. For a Py-only project, run the CLI against your Zod contract source and emit the Python wrappers into your package's source tree:

npx @clamator/codegen --src contracts --out-py src/myapp/_generated

Commit the emitted files alongside your code — they are vendored generated artifacts. Re-run codegen on contract changes; for drift detection, also pass --manifest and diff the manifest in CI (see @clamator/codegen for the full pattern).

The Python package then imports AddParams, AddResult, ArithClient, ArithService, and arith_contract from myapp._generated.arith.

Version compatibility

All seven clamator packages (TS + Py protocol, both transports on both languages, codegen) are released in lockstep — same X.Y.Z version, every time. The release-verification workflow refuses to publish a tag unless every package's manifest reports the matching version, and the same workflow runs the cross-language interop test suite. Pin all your clamator packages to the same X.Y.Z on both client and server sides — clamator-protocol==X.Y.Z + clamator-over-redis==X.Y.Z on the Py side, @clamator/protocol@X.Y.Z + @clamator/over-redis@X.Y.Z on the TS side.

The drift you do need to worry about is your contract source diverging from your committed generated wrappers. The "Drift detection via the manifest" pattern in @clamator/codegen is the right tool: regenerate the manifest in CI and diff against the committed copy. At runtime, a contract mismatch surfaces as RpcError(-32602, "Invalid params") from server-side Pydantic validation — useful but generic; the manifest-diff pre-deploy check gives a more actionable error.

Method or notification?

Both methods and notifications send a request envelope; only methods produce a response envelope. Pick by the caller's needs, not the handler's.

  • Use a method when the caller needs to know whether the operation succeeded, get a value back, surface a structured RpcError, or sequence subsequent calls on completion. Methods carry a request id and the caller waits for the matching response or a timeout.
  • Use a notification when the caller is doing fire-and-forget work where neither success/failure nor a return value matters in the moment — telemetry, cache-busting, status pings. Notifications have no request id and produce no response; the caller cannot tell whether the handler ran, succeeded, or threw.

If all you need is confirmation of delivery, prefer a method that returns an empty Pydantic model over a notification: the response envelope is the confirmation. Pick a notification only when "did this run?" is genuinely not a question the caller will ever ask.
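
At the JSON-RPC 2.0 level, the difference between the two is the presence of the id member. The sketch below builds both shapes as plain dicts; the helper names are hypothetical, and clamator's actual wire envelopes may carry additional fields (for example, service routing) that are not shown here.

```python
import json


def request_envelope(method: str, params: dict, request_id: int) -> dict:
    """A JSON-RPC 2.0 request: carries an id, so a response is expected."""
    return {"jsonrpc": "2.0", "method": method, "params": params, "id": request_id}


def notification_envelope(method: str, params: dict) -> dict:
    """A JSON-RPC 2.0 notification: no id member, so no response is produced."""
    return {"jsonrpc": "2.0", "method": method, "params": params}


req = request_envelope("add", {"a": 1, "b": 2}, request_id=7)
note = notification_envelope("ping", {})
wire = json.dumps(req)  # what would travel over the transport
```

With no id to correlate against, there is nothing for the caller to wait on, which is why a notification cannot report success or failure.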

Validation pipeline

Server-side handlers receive already-validated Pydantic instances, not raw dicts. The dispatcher does the work in this order on every incoming envelope:

  1. Params validation. The wire dict goes through method_entry.params_model.model_validate(...). Failures produce RpcError(-32602, "Invalid params", data={"errors": <ValidationError details>}) and the request is rejected before the handler runs. Notifications with bad params are silently dropped.
  2. Handler dispatch. The dispatcher calls getattr(handler_instance, handler_attr)(params) — passing the validated params_model instance. Handlers declare their type as the model class (e.g., async def add(self, params: AddParams) -> AddResult) and will never see a dict at runtime.
  3. Handler exceptions. A handler that raises RpcError(code, message, data) produces a response with that exact code/message/data. Any other exception is wrapped as RpcError(-32603, "Internal error", data={...exception details}).
  4. Result validation. If the method has a result_model, the return value is run through result_model.model_validate(...). A handler returning the wrong shape is reported to the client as RpcError(-32603, "Result validation failed", data={"errors": ...}) — there is no automatic coercion. Notifications skip result validation.

Handlers are insulated from wire-format details: if the dispatch reaches your code, the params are valid; if your return value fails validation, the client sees a structured error rather than a corrupted reply. The result is serialized to the wire via validated.model_dump(mode='json', by_alias=True) — any field aliases declared on the result_model (e.g., Field(alias="processId")) are applied on the way out, matching the camelCase the TS side expects.
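
The four steps can be sketched in a few lines. This is a simplified stand-in under stated assumptions, not the dispatcher's implementation: SketchRpcError is a hypothetical substitute for RpcError, plain callables that raise ValueError stand in for Pydantic's model_validate, and the shape of data on the internal-error path is invented for illustration.

```python
import asyncio

INVALID_PARAMS, INTERNAL_ERROR = -32602, -32603


class SketchRpcError(Exception):
    """Hypothetical stand-in for RpcError (code, message, data)."""

    def __init__(self, code, message, data=None):
        super().__init__(message)
        self.code, self.message, self.data = code, message, data


def err(code, message, data=None):
    return {"error": {"code": code, "message": message, "data": data}}


async def dispatch(handler, validate_params, validate_result, raw_params):
    # 1. Params validation happens before the handler runs.
    try:
        params = validate_params(raw_params)
    except ValueError as exc:
        return err(INVALID_PARAMS, "Invalid params", {"errors": str(exc)})
    # 2 and 3. Call the handler; RpcError passes through, anything else is wrapped.
    try:
        value = await handler(params)
    except SketchRpcError as exc:
        return err(exc.code, exc.message, exc.data)
    except Exception as exc:
        return err(INTERNAL_ERROR, "Internal error", {"type": type(exc).__name__})
    # 4. Result validation: a wrong shape becomes an error, never a coerced reply.
    try:
        return {"result": validate_result(value)}
    except ValueError as exc:
        return err(INTERNAL_ERROR, "Result validation failed", {"errors": str(exc)})


def two_ints(raw):
    if not (isinstance(raw, dict) and isinstance(raw.get("a"), int)
            and isinstance(raw.get("b"), int)):
        raise ValueError("expected integer fields a and b")
    return raw


def int_result(value):
    if not isinstance(value, int):
        raise ValueError("expected int")
    return value


async def add(params):
    return params["a"] + params["b"]


async def broken(params):
    return "not-an-int"  # wrong result shape on purpose
```

Feeding bad params to add stops at step 1, while broken passes step 1 and fails at step 4, so the two failures are distinguishable by code and message on the client side.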

Notification handler exceptions are silently swallowed. The dispatcher catches RpcError and any other exception in the same try/except block, but returns None on the notification path (no response envelope to write). There is no built-in logging hook — if your notification handlers can fail in interesting ways, wrap the handler body with your own try/except + observability so the failure isn't invisible.

Wire-format and serialization

The wire format is JSON. Pydantic v2 default serializers apply when params and results cross the wire: datetime → ISO 8601 string, Decimal → string, bytes → base64-encoded string, UUID → canonical hex. Native JSON types (string, number, boolean, null, list, dict) round-trip without configuration. For non-primitive custom types (e.g., MongoDB ObjectId), define matching serializers on both sides — Pydantic field serializers / model_serializer on Py, Zod .transform(...) on TS. The cross-language interop suite verifies primitive-type round-trip on every release; custom types are the integrator's responsibility.
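
To make the string forms listed above concrete for a consumer on the other side of the wire, the sketch below reproduces them with the standard library only. It is a stand-in, not Pydantic's serializer: to_wire is a hypothetical helper, and details such as the exact timezone suffix on datetimes may differ from what Pydantic emits.

```python
import base64
import json
from datetime import datetime, timezone
from decimal import Decimal
from uuid import UUID


def to_wire(value):
    """json.dumps fallback producing the string forms listed above (stdlib stand-in)."""
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, Decimal):
        return str(value)
    if isinstance(value, bytes):
        return base64.b64encode(value).decode("ascii")
    if isinstance(value, UUID):
        return str(value)
    raise TypeError(f"no wire form for {type(value).__name__}")


payload = {
    "at": datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
    "amount": Decimal("12.50"),
    "blob": b"hi",
    "id": UUID("12345678-1234-5678-1234-567812345678"),
}
# Round-trip through JSON text to see what the receiver actually gets.
wire = json.loads(json.dumps(payload, default=to_wire))
```

Every non-native value arrives as a string, so the receiving side needs a matching parser for each one; that is the same obligation the text describes for custom types.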

Observability

clamator provides no built-in logging, metrics, or tracing hooks. The dispatcher does not log handler invocations, errors, or timings — your handler body is the right place for instrumentation. Wrap each handler with your own structured logging or OpenTelemetry spans; the typed params and the handler's return value are the natural span attributes.
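
One possible shape for that instrumentation is a decorator applied to each handler. The sketch below uses only the standard library; the observed decorator, the logger name, and ArithHandlers are hypothetical names for this example, and a real setup might emit OpenTelemetry spans instead of log lines.

```python
import asyncio
import functools
import logging
import time

log = logging.getLogger("handlers")  # hypothetical logger name


def observed(fn):
    """Log duration and failures of an async handler, then re-raise."""

    @functools.wraps(fn)
    async def wrapper(*args, **kwargs):
        started = time.perf_counter()
        try:
            result = await fn(*args, **kwargs)
        except Exception:
            # Recorded here because a notification failure is otherwise invisible.
            log.exception("handler %s failed", fn.__name__)
            raise
        elapsed_ms = (time.perf_counter() - started) * 1000
        log.info("handler %s ok in %.1f ms", fn.__name__, elapsed_ms)
        return result

    return wrapper


class ArithHandlers:
    @observed
    async def add(self, params):
        return params["a"] + params["b"]

    @observed
    async def ping(self, params):
        raise RuntimeError("boom")  # simulated failure in a notification handler
```

The wrapper re-raises after logging, so method handlers still surface errors to the caller as before; only the visibility changes.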

Errors

Raise RpcError from a handler to surface a structured JSON-RPC error to the caller. The constructor takes a code, a message, and an optional data payload:

from clamator_protocol import RpcError

RPC_FORBIDDEN = -32001  # application-defined; outside the reserved -32600..-32099 range


def test_rpc_error_construction():
    err = RpcError(RPC_FORBIDDEN, "forbidden", {"reason": "no-token"})
    assert err.code == RPC_FORBIDDEN
    assert err.message == "forbidden"
    assert err.data == {"reason": "no-token"}

(Verbatim from py/packages/protocol/tests/test_rpc_error.py:1-10.)

Reserved JSON-RPC error codes (-32700 and -32600 to -32603 for protocol-level errors, -32000 to -32099 reserved for transport implementations) are owned by the protocol layer; pick application-specific codes outside those ranges. A workable convention is to pick a contiguous private band per error category (e.g., -32100..-32199 for state-machine refusals, -32200..-32299 for resource-not-found shapes) and document the band in your contract's documentation. Codegen does not reserve any band; application codes are entirely your namespace.
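
The band convention can be kept in one small module so that handlers and clients agree on it. The sketch below follows the two example bands from the text; the individual code names and values (ALREADY_RUNNING and so on) and the band_of helper are hypothetical.

```python
from enum import IntEnum
from typing import Optional

# Bands follow the example convention in the text.
STATE_MACHINE_BAND = range(-32199, -32099)  # covers -32199 .. -32100
NOT_FOUND_BAND = range(-32299, -32199)      # covers -32299 .. -32200


class AppError(IntEnum):
    """Hypothetical application codes, one band per category."""
    ALREADY_RUNNING = -32100
    NOT_LAUNCHABLE = -32101
    PROCESS_NOT_FOUND = -32200


def band_of(code: int) -> Optional[str]:
    """Name the private band a code falls in, or None if it is in neither."""
    code = int(code)
    if code in STATE_MACHINE_BAND:
        return "state-machine"
    if code in NOT_FOUND_BAND:
        return "not-found"
    return None
```

A client can then branch on band_of(err.code) rather than on individual codes, which keeps the handling stable as new codes are added inside a band.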

What the client sees:

  • A handler that raises RpcError(code, message, data) produces an error response carrying that exact code/message/data on the client side; the proxy method re-raises an RpcError with the same fields.
  • A handler that raises any other exception is caught by the protocol layer and wrapped: clients receive RpcError(code=-32603, message="Internal error", data={...}) with exception details in data.
  • A client-side call that exceeds default_timeout_ms raises clamator_protocol.ClamatorTransportError("call timeout") from the transport layer, NOT asyncio.TimeoutError. The same exception class surfaces when no server is consuming the request stream — there is no distinct "no consumer" error.
  • Envelope-level parse and validation failures use the JSON-RPC reserved codes: -32700 (parse error), -32600 (invalid request), -32601 (method not found), -32602 (invalid params), -32603 (internal error).

Failure as data vs. RpcError

Two patterns work for handlers that need to refuse a request:

  1. Raise RpcError. Surfaces as a JSON-RPC error envelope on the client side; the proxy method re-raises RpcError carrying the code/message/data. Right for exceptional refusals — protocol violations, missing-resource cases, and anything the client should treat as a raised exception.

  2. Return a result-shape union. Declare the method's result_model as a Pydantic discriminated union over success and refusal cases — e.g., RootModel[Annotated[Union[Success, Refusal], Field(discriminator='ok')]] with Success(ok=True, value=...) vs Refusal(ok=False, reason=Literal['not-found', 'conflict', ...]). The handler returns the appropriate variant. The client sees a normal success envelope and matches on result.ok. Right for expected refusals — state-machine guards ("process already running"), capability checks, validation outcomes the application treats as data rather than as an error.

The two patterns compose. Use unions for state-machine refusals the application is expected to handle; reserve RpcError for genuine errors that should propagate as raised exceptions. Codegen-emitted proxy methods return the full union type, so type checkers (mypy / pyright) enforce exhaustive matching at the call site.
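
The result-union pattern can be illustrated without any RPC machinery. The sketch below swaps in standard-library dataclasses for the Pydantic discriminated union described above, so it shows the shape of the idea rather than the validated model; Success, Refusal, start_process, and describe are hypothetical names.

```python
from dataclasses import dataclass
from typing import Literal, Union


@dataclass(frozen=True)
class Success:
    value: int
    ok: Literal[True] = True


@dataclass(frozen=True)
class Refusal:
    reason: Literal["not-found", "conflict"]
    ok: Literal[False] = False


StartResult = Union[Success, Refusal]


def start_process(running: set, name: str) -> StartResult:
    """Expected refusals come back as data; nothing is raised."""
    if name in running:
        return Refusal(reason="conflict")
    running.add(name)
    return Success(value=len(running))


def describe(result: StartResult) -> str:
    """Caller side: branch on the ok discriminator instead of catching an exception."""
    if isinstance(result, Success):
        return f"started ({result.value})"
    return f"refused: {result.reason}"
```

Starting the same name twice yields a Success followed by a Refusal, and the caller handles both through ordinary control flow.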

Common gateway integration

When clamator sits behind an HTTP gateway (typically a TS API in front of a Py engine, or vice versa), the gateway translates the typed RPC reply into an HTTP response. Two recommendations:

Map RpcError codes to HTTP status by class. The framework reserves -32700 / -32600 / -32601 / -32602 / -32603 (parse / invalid request / method not found / invalid params / internal error). -32601 and -32602 are caller bugs and naturally map to 400. -32603 is 500. Application-defined codes (those outside the reserved ranges) are gateway-specific; map them per the meaning your handlers assign them.

Map result-union refusal reason strings to HTTP status by convention. A typical mapping the gateway can implement once and reuse across endpoints:

result.reason                                HTTP status
not-found                                    404
conflict, already-running, already-exists    409
forbidden, not-authorized                    403
validation-failed, invalid-input             422
not-launchable, precondition-failed          412
(default)                                    409 (request was understood but cannot be satisfied)

Successful results (result.ok == True) map to 200. The exhaustive reason union is part of the contract, so the gateway's match is type-checked against it — adding a new refusal reason without updating the gateway is a static-analysis error.
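
Both recommendations fit in one small gateway-side module. The sketch below is a minimal illustration using plain dicts for results; the function names are hypothetical, the app_codes parameter is an invented way to pass in your own code mapping, and the 500 fallback for unmapped codes is an assumption rather than something the text prescribes.

```python
from typing import Optional

# Reason-to-status table, as listed above.
REASON_TO_STATUS = {
    "not-found": 404,
    "conflict": 409, "already-running": 409, "already-exists": 409,
    "forbidden": 403, "not-authorized": 403,
    "validation-failed": 422, "invalid-input": 422,
    "not-launchable": 412, "precondition-failed": 412,
}
DEFAULT_REFUSAL_STATUS = 409  # understood but cannot be satisfied


def status_for_result(result: dict) -> int:
    """Map a result-union value (as a dict) to an HTTP status."""
    if result.get("ok") is True:
        return 200
    return REASON_TO_STATUS.get(result.get("reason"), DEFAULT_REFUSAL_STATUS)


def status_for_rpc_error(code: int, app_codes: Optional[dict] = None) -> int:
    """Map an RpcError code to an HTTP status."""
    if code in (-32601, -32602):  # caller bugs
        return 400
    if code == -32603:            # internal error
        return 500
    # Application codes: caller-supplied mapping; 500 fallback is an assumption.
    return (app_codes or {}).get(code, 500)
```

Keeping the table in one place means each endpoint only calls the two functions, and a new refusal reason needs a single new row.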

Authorization

clamator has no authorization at the protocol or transport layer. Any process that can reach the underlying transport — a Redis instance for over-redis, the parent process for over-memory — can call any registered method or send any notification on any registered service.

Apply caller-identity checks at the boundary: a gateway (typically an HTTP server in front of the typed proxy) enforces who-can-call-what before invoking the proxy method. For network-substrate transports, deploy the substrate behind a network you trust (TLS, AUTH, ACLs, private VPC).
