Skip to main content

Redis-streams transport for clamator (pre-1.0).

Project description

clamator-over-redis

Redis-streams transport for clamator. Implements the Transport interface from clamator-protocol so JSON-RPC traffic flows over Redis streams between processes — typically a Py service and a TS service, or two Py services on different hosts. Requires Pydantic v2 and redis>=5.

Install

pip install clamator-over-redis clamator-protocol redis

Quickstart

Contracts are authored in TypeScript and the Python sibling is produced by @clamator/codegen:

npx @clamator/codegen --src contracts --out-py generated

The emitted generated/arith.py exports Pydantic models, a typed ArithClient, an ArithService ABC, and the arith_contract Contract object. Wire server and client through Redis, talk via ArithClient.

Server-side — register handlers and start:

from clamator_over_redis import RedisRpcServer
from redis.asyncio import Redis

from .generated.arith import AddParams, AddResult, ArithService, PingParams, arith_contract


class Arith(ArithService):
    async def add(self, params: AddParams) -> AddResult:
        return AddResult(sum=params.a + params.b)

    async def ping(self, params: PingParams) -> None:
        return None


async def build_arith_server(*, redis: Redis, key_prefix: str) -> RedisRpcServer:
    server = RedisRpcServer(redis=redis, key_prefix=key_prefix)  # injected redis= not closed by stop() — caller owns lifecycle; omit to let transport own it  # noqa: E501
    server.register_service(arith_contract, Arith())  # must precede start() — post-start registrations are silently ignored, no consumer group or read loop is created  # noqa: E501
    await server.start()
    return server

(Verbatim from py/packages/over-redis/tests/server.py:1-19.)

Client-side — call the typed proxy:

from clamator_over_redis import RedisRpcClient
from redis.asyncio import Redis

from .generated.arith import AddParams, AddResult, ArithClient


async def call_arith(*, redis: Redis, key_prefix: str) -> AddResult:
    client = RedisRpcClient(redis=redis, key_prefix=key_prefix, default_timeout_ms=3000)  # default timeout 30 s on the full round-trip (xadd → handler → reply); no auto-retry on disconnect; timeouts not propagated to server (server completes the handler and writes a reply the client ignores)  # noqa: E501
    await client.start()
    arith = ArithClient(client)
    r = await arith.add(AddParams(a=2, b=3))
    await client.stop()
    return r

(Verbatim from py/packages/over-redis/tests/client.py:1-13.)

server.start() returns once each registered service has its consumer group created and its read loop spawned; it does not block. Your application controls the server's lifetime. Call await server.stop() to shut down — drains in-flight handlers up to grace_ms (default 5 s) before disconnecting. start() and stop() are both idempotent (calling either twice is a no-op); once stop() has been called, calling start() again raises — create a new instance to restart. Client-side cancellation (e.g., asyncio.CancelledError raised in the awaiter) is not propagated to the server; the server completes the handler and writes a reply that the canceled caller never reads.

A single server can host multiple services. Call register_service(contract, handler_obj) once per contract before start(); each service gets its own consumer group keyed by the service name. Registrations after start() are silently ignored — no consumer group or read loop is created for them.

By default the connection is built from $REDIS_URL (or redis://localhost:6379). Pass redis_url= for a different URL, or redis= for a pre-built redis.asyncio.Redis instance.

key_prefix is used as a literal Redis key prefix — clamator does not parse it. Any string Redis accepts as a key works, including slashes, colons, and embedded path-like separators (e.g., my-app/tenant-42).

Sharing one injected redis instance across multiple RedisRpcServer and RedisRpcClient instances — and across your application's other Redis usage on the same instance — is safe. Each server/client manages its own subscription internally; XREADGROUP and reply-stream XREAD calls use short polling blocks, so non-blocking ops (XADD, XACK, XAUTOCLAIM) on the same connection interleave without deadlock.

Per-client reply streams are bounded: the server XADDs replies with maxlen=reply_stream_maxlen (default 1024, approximate) and the client deletes its reply stream on stop(). If a client process crashes without calling stop(), the reply-stream key persists with up to ~1024 entries until manually deleted; there is no Redis-side TTL.

Key surface

  • RedisRpcServer(*, key_prefix, redis=None, redis_url=None, ...)register_service(contract, handler_obj), start(), stop().
  • RedisRpcClient(*, key_prefix, redis=None, redis_url=None, default_timeout_ms=30_000)start(), stop(). The instance is a ClamatorClient, so it can be wrapped by a generated *Client proxy.

Client lifetime and fan-out

RedisRpcClient and RedisRpcServer are stateful: each spawns a background reply/consumer loop and (RedisRpcClient) maintains a per-instance reply-stream key in Redis. Construct once and keep alive for the application's lifetime — do not construct/destroy per call.

A key_prefix identifies a backend, not a service. One RedisRpcClient can back many service proxies — wrap it with each generated *Client:

from clamator_over_redis import RedisRpcClient

from .generated.arith import AddParams, AddResult, ArithClient
from .generated.logger import LoggerClient, LogParams


# One key_prefix-pinned RedisRpcClient backs many service proxies.
async def call_multiple_services(key_prefix: str) -> AddResult:
    client = RedisRpcClient(key_prefix=key_prefix)
    await client.start()
    arith = ArithClient(client)
    logger = LoggerClient(client)
    r = await arith.add(AddParams(a=2, b=3))
    await logger.log(LogParams(msg=f"sum={r.sum}"))
    await client.stop()
    return r

(Verbatim from py/packages/over-redis/tests/multi_service_example.py:1-16.)

For multiple backends, construct one RedisRpcClient per key_prefix and hold them in named variables. The same injected redis instance can back every client, so the marginal cost of an additional key_prefix is one background task + one reply-stream key in Redis.

Call await client.stop() on each client during application shutdown to drain the reply loop and delete the reply-stream key.

Worker-pool semantics

Multiple RedisRpcServer instances sharing the same key_prefix form a competing-consumers pool: each call is processed by exactly one instance. They share a single Redis consumer group per service (named <service>); each server is a unique consumer (named <service>:<instance_id>). XREADGROUP delivers each request to exactly one server. A reclaim loop (XAUTOCLAIM) re-delivers messages unacknowledged for consumer_claim_idle_ms (default 60,000 ms). Delivery semantics are at-least-once. To run a single-consumer scenario, run one server.

Handlers must be idempotent. A handler whose execution exceeds consumer_claim_idle_ms is reclaimed and re-dispatched to another consumer (or itself), so the same request may run more than once. A client timeout does not propagate to the server (see the client comment above), so a request the client gave up on may still complete server-side.

On start. The server's consumer loop reads new entries via XREADGROUP with id >. Pending entries from a prior session — entries XREADGROUPed but not XACKed before a crash — are reclaimed via XAUTOCLAIM after consumer_claim_idle_ms (default 60s) elapses; new entries arriving in the meantime are processed normally.

Per-service dispatch is serialized within a single server. Each registered service has its own consumer loop that reads up to 16 messages per XREADGROUP poll and processes them one at a time (await per message; no detached tasks). Multiple services registered on the same server run their own consumer loops concurrently, but two requests for the same service on the same server are not parallelized.

Single-server in-order invariant. Within one RedisRpcServer instance, this serialization is a documented invariant: a request arriving after another on the same service observes the full effect of the prior request's handler before its own handler runs. Handlers can rely on previous-call mutations being visible (state machines, per-aggregate updates) without explicit locking. The invariant survives as long as handler latency stays well under consumer_claim_idle_ms (default 60s) — a handler exceeding the reclaim threshold can be redelivered while still running, which violates the order.

Multi-server / worker-pool ordering is not yet finalized — clamator is pre-1.0. The current behavior under worker-pool fan-out (multiple RedisRpcServer instances sharing the same key_prefix) is competing-consumers via XREADGROUP, with no in-order guarantee across servers. That falls out of the XREADGROUP design rather than being a stable contract. If you need ordered processing today, the supported pattern is partition by key_prefix: assign a unique key_prefix per ordering domain (e.g., per tenant, per aggregate root, per database) and run exactly one server per key_prefix. Richer multi-server ordering primitives (e.g., sticky transactions, contract-level partition keys) are candidates for a future minor release.

To process one service's requests in parallel within a single server (when ordering is not required), have your handler spawn the work as asyncio.create_task(...) and return immediately; the consumer-loop dispatch becomes effectively non-blocking and the reply confirms acceptance, not completion.

Single-consumer case. For single-server deployments (one server per backend), worker-pool semantics degenerate trivially: the consumer group has one consumer, every request goes to that consumer, and no fan-out concerns apply.

Fire-and-forget operations

Operations the caller doesn't need a reply for — telemetry, cache invalidations, status pings — should be modeled as notifications in the contract (defineNotification on the TS side; MethodEntry(result_model=None, ...) on the Py side). The generated proxy emits a typed notification method that returns once the request envelope is XADDed to Redis; it does not wait for the server to process.

from clamator_over_redis import RedisRpcClient

from .generated.arith import ArithClient, PingParams


# Fire-and-forget: notification proxies return once the request is queued in Redis;
# they do not wait for the server to process. Handlers must be idempotent — see
# "Worker-pool semantics" for the at-least-once delivery details.
async def fire_notification(key_prefix: str) -> None:
    client = RedisRpcClient(key_prefix=key_prefix)
    await client.start()
    arith = ArithClient(client)
    await arith.ping(PingParams())
    await client.stop()

(Verbatim from py/packages/over-redis/tests/fire_and_forget_example.py:1-14.)

The await resolves once the message is on the stream. It does not confirm the server received, processed, or finished the call. Notification handlers run under the same at-least-once delivery semantics as method handlers — design them to be idempotent.

Long-running background processes

clamator's RPC surface is request/reply (and fire-and-forget for notifications). It does not provide a server-to-client streaming or progress channel — the typed proxy is a single round-trip. If you need actual monitoring and control of long-running background processes (start, stop, query state, report progress, cancel, sequential and parallel children, persistence across restarts), which is a different concern from RPC, look at Optio: a Python process-management framework that handles exactly that.

Authorization

clamator has no authorization at the RPC layer. Any process that can read/write this Redis instance can call any registered method or send any notification — there is no caller identity in the wire envelope.

Apply caller-identity checks at the boundary: a gateway (HTTP server, message-bus filter, etc.) enforces who-can-call-what before invoking the typed proxy. Deploy Redis behind a network you trust (TLS, AUTH, ACLs, private VPC); the transport assumes the substrate is already restricted to authenticated participants.

Keys owned under key_prefix

Pattern Type Purpose
<key_prefix>:cmds:<service> stream inbound command stream per service; servers consume via XREADGROUP, clients write via XADD
<key_prefix>:replies:<instance_id> stream per-client reply stream; servers write replies via XADD, the client reads via XREAD; deleted by client stop()
<service> consumer group competing-consumers pool name (lives inside the cmds stream's metadata; not a top-level key)

When to reach for this vs. clamator-over-memory

  • clamator-over-memory — tests, embedded scenarios, anything single-process.
  • clamator-over-redis — cross-process, cross-host, durable streams, production.

Links

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

clamator_over_redis-0.1.4.tar.gz (13.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

clamator_over_redis-0.1.4-py3-none-any.whl (16.1 kB view details)

Uploaded Python 3

File details

Details for the file clamator_over_redis-0.1.4.tar.gz.

File metadata

  • Download URL: clamator_over_redis-0.1.4.tar.gz
  • Upload date:
  • Size: 13.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.13

File hashes

Hashes for clamator_over_redis-0.1.4.tar.gz
Algorithm Hash digest
SHA256 aac1286215bd2e13046164af548574cd40f822d5aa858198c1aeb57b5c4b5678
MD5 9e55ab808ee7c4d57a6d6a0b3711b443
BLAKE2b-256 00c16c242bff3514d3fdeab03c5802bd697a2fd7f0ad9a59115bd6feb059f58d

See more details on using hashes here.

File details

Details for the file clamator_over_redis-0.1.4-py3-none-any.whl.

File metadata

File hashes

Hashes for clamator_over_redis-0.1.4-py3-none-any.whl
Algorithm Hash digest
SHA256 6b8af4b7681706790c13c66ab934b1d0b2bd8bfc6da66b0f34c426fa87f0869a
MD5 6d163fd47312e24b1144fd951cce5b23
BLAKE2b-256 306cfe9a360db0c282b4f80d0b3f0b0c6760c0e111a94a55b13d93922f416698

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page