Redis-streams transport for clamator (pre-1.0).
Project description
clamator-over-redis
Redis-streams transport for clamator. Implements the Transport interface from clamator-protocol so JSON-RPC traffic flows over Redis streams between processes — typically a Py service and a TS service, or two Py services on different hosts.
Install
pip install clamator-over-redis clamator-protocol redis
Quickstart
Contracts are authored in TypeScript and the Python sibling is produced by @clamator/codegen:
npx @clamator/codegen --src contracts --out-py generated
The emitted generated/arith.py exports Pydantic models, a typed ArithClient, an ArithService ABC, and the arith_contract Contract object. Wire server and client through Redis, talk via ArithClient.
Server-side — register handlers and start:
from clamator_over_redis import RedisRpcServer
from redis.asyncio import Redis
from .generated.arith import AddParams, AddResult, ArithService, PingParams, arith_contract
class Arith(ArithService):
async def add(self, params: AddParams) -> AddResult:
return AddResult(sum=params.a + params.b)
async def ping(self, params: PingParams) -> None:
return None
async def build_arith_server(*, redis: Redis, key_prefix: str) -> RedisRpcServer:
server = RedisRpcServer(redis=redis, key_prefix=key_prefix) # injected redis= not closed by stop() — caller owns lifecycle; omit to let transport own it # noqa: E501
server.register_service(arith_contract, Arith()) # must precede start() — post-start registrations are silently ignored, no consumer group or read loop is created # noqa: E501
await server.start()
return server
(Verbatim from py/packages/over-redis/tests/server.py:1-19.)
Client-side — call the typed proxy:
from clamator_over_redis import RedisRpcClient
from redis.asyncio import Redis
from .generated.arith import AddParams, AddResult, ArithClient
async def call_arith(*, redis: Redis, key_prefix: str) -> AddResult:
client = RedisRpcClient(redis=redis, key_prefix=key_prefix, default_timeout_ms=3000) # default timeout 30 s; no auto-retry on disconnect; timeouts not propagated to server # noqa: E501
await client.start()
arith = ArithClient(client)
r = await arith.add(AddParams(a=2, b=3))
await client.stop()
return r
(Verbatim from py/packages/over-redis/tests/client.py:1-13.)
Call await server.stop() to shut down — drains in-flight handlers up to grace_ms (default 5 s) before disconnecting.
By default the connection is built from $REDIS_URL (or redis://localhost:6379). Pass redis_url= for a different URL, or redis= for a pre-built redis.asyncio.Redis instance.
Key surface
RedisRpcServer(*, key_prefix, redis=None, redis_url=None, ...)—register_service(contract, handler_obj),start(),stop().RedisRpcClient(*, key_prefix, redis=None, redis_url=None, default_timeout_ms=30_000)—start(),stop(). The instance is aClamatorClient, so it can be wrapped by a generated*Clientproxy.
Worker-pool semantics
Multiple RedisRpcServer instances sharing the same key_prefix form a competing-consumers pool: each call is processed by exactly one instance. They share a single Redis consumer group per service (named <service>); each server is a unique consumer (named <service>:<instance_id>). XREADGROUP delivers each request to exactly one server. A reclaim loop (XAUTOCLAIM) re-delivers messages unacknowledged for consumer_claim_idle_ms (default 60,000 ms). Delivery semantics are at-least-once. To run a single-consumer scenario, run one server.
Keys owned under key_prefix
| Pattern | Type | Purpose |
|---|---|---|
<key_prefix>:cmds:<service> |
stream | inbound command stream per service; servers consume via XREADGROUP, clients write via XADD |
<key_prefix>:replies:<instance_id> |
stream | per-client reply stream; servers write replies via XADD, the client reads via XREAD; deleted by client stop() |
<service> |
consumer group | competing-consumers pool name (lives inside the cmds stream's metadata; not a top-level key) |
When to reach for this vs. clamator-over-memory
clamator-over-memory— tests, embedded scenarios, anything single-process.clamator-over-redis— cross-process, cross-host, durable streams, production.
Links
- Sibling (TypeScript):
@clamator/over-redis - Codegen:
@clamator/codegen(run from TS side; consume the generated Python output) - Design spec:
docs/2026-05-07-clamator-design.md - Agent rules:
AGENTS.md
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file clamator_over_redis-0.1.2.tar.gz.
File metadata
- Download URL: clamator_over_redis-0.1.2.tar.gz
- Upload date:
- Size: 9.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
63ba53c9cb14e5d91f673c8e1934728eee0624bc788531bf5dc94ca1a78580b7
|
|
| MD5 |
2f7be68743dbe3114097aeefb8d7b4a9
|
|
| BLAKE2b-256 |
9345a299dad20953a08413cc4852600db330d9771e0a7afda24e4a5574cd112e
|
File details
Details for the file clamator_over_redis-0.1.2-py3-none-any.whl.
File metadata
- Download URL: clamator_over_redis-0.1.2-py3-none-any.whl
- Upload date:
- Size: 12.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
36cf6fd077f26cdcc63567b3c18812f28a0b86b43789958f0661e16feb1f6459
|
|
| MD5 |
d93421ac959fbcd92e4b7d3467e5edf7
|
|
| BLAKE2b-256 |
738d5d4b11a5113d3c2771cb4b52e4306d9e7a779127d91babadae227cae9bd4
|