Skip to main content

Redis-streams transport for clamator (pre-1.0).

Project description

clamator-over-redis

Redis-streams transport for clamator. Implements the Transport interface from clamator-protocol so JSON-RPC traffic flows over Redis streams between processes — typically a Py service and a TS service, or two Py services on different hosts.

Install

pip install clamator-over-redis clamator-protocol redis

Quickstart

Contracts are authored in TypeScript and the Python sibling is produced by @clamator/codegen:

npx @clamator/codegen --src contracts --out-py generated

The emitted generated/arith.py exports Pydantic models, a typed ArithClient, an ArithService ABC, and the arith_contract Contract object. Wire server and client through Redis, talk via ArithClient.

Server-side — register handlers and start:

from clamator_over_redis import RedisRpcServer
from redis.asyncio import Redis

from .generated.arith import AddParams, AddResult, ArithService, PingParams, arith_contract


class Arith(ArithService):
    async def add(self, params: AddParams) -> AddResult:
        return AddResult(sum=params.a + params.b)

    async def ping(self, params: PingParams) -> None:
        return None


async def build_arith_server(*, redis: Redis, key_prefix: str) -> RedisRpcServer:
    server = RedisRpcServer(redis=redis, key_prefix=key_prefix)  # injected redis= not closed by stop() — caller owns lifecycle; omit to let transport own it  # noqa: E501
    server.register_service(arith_contract, Arith())  # must precede start() — post-start registrations are silently ignored, no consumer group or read loop is created  # noqa: E501
    await server.start()
    return server

(Verbatim from py/packages/over-redis/tests/server.py:1-19.)

Client-side — call the typed proxy:

from clamator_over_redis import RedisRpcClient
from redis.asyncio import Redis

from .generated.arith import AddParams, AddResult, ArithClient


async def call_arith(*, redis: Redis, key_prefix: str) -> AddResult:
    client = RedisRpcClient(redis=redis, key_prefix=key_prefix, default_timeout_ms=3000)  # default timeout 30 s; no auto-retry on disconnect; timeouts not propagated to server  # noqa: E501
    await client.start()
    arith = ArithClient(client)
    r = await arith.add(AddParams(a=2, b=3))
    await client.stop()
    return r

(Verbatim from py/packages/over-redis/tests/client.py:1-13.)

Call await server.stop() to shut down — drains in-flight handlers up to grace_ms (default 5 s) before disconnecting.

By default the connection is built from $REDIS_URL (or redis://localhost:6379). Pass redis_url= for a different URL, or redis= for a pre-built redis.asyncio.Redis instance.

Key surface

  • RedisRpcServer(*, key_prefix, redis=None, redis_url=None, ...)register_service(contract, handler_obj), start(), stop().
  • RedisRpcClient(*, key_prefix, redis=None, redis_url=None, default_timeout_ms=30_000)start(), stop(). The instance is a ClamatorClient, so it can be wrapped by a generated *Client proxy.

Worker-pool semantics

Multiple RedisRpcServer instances sharing the same key_prefix form a competing-consumers pool: each call is processed by exactly one instance. They share a single Redis consumer group per service (named <service>); each server is a unique consumer (named <service>:<instance_id>). XREADGROUP delivers each request to exactly one server. A reclaim loop (XAUTOCLAIM) re-delivers messages unacknowledged for consumer_claim_idle_ms (default 60,000 ms). Delivery semantics are at-least-once. To run a single-consumer scenario, run one server.

Keys owned under key_prefix

Pattern Type Purpose
<key_prefix>:cmds:<service> stream inbound command stream per service; servers consume via XREADGROUP, clients write via XADD
<key_prefix>:replies:<instance_id> stream per-client reply stream; servers write replies via XADD, the client reads via XREAD; deleted by client stop()
<service> consumer group competing-consumers pool name (lives inside the cmds stream's metadata; not a top-level key)

When to reach for this vs. clamator-over-memory

  • clamator-over-memory — tests, embedded scenarios, anything single-process.
  • clamator-over-redis — cross-process, cross-host, durable streams, production.

Links

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

clamator_over_redis-0.1.2.tar.gz (9.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

clamator_over_redis-0.1.2-py3-none-any.whl (12.8 kB view details)

Uploaded Python 3

File details

Details for the file clamator_over_redis-0.1.2.tar.gz.

File metadata

  • Download URL: clamator_over_redis-0.1.2.tar.gz
  • Upload date:
  • Size: 9.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.13

File hashes

Hashes for clamator_over_redis-0.1.2.tar.gz
Algorithm Hash digest
SHA256 63ba53c9cb14e5d91f673c8e1934728eee0624bc788531bf5dc94ca1a78580b7
MD5 2f7be68743dbe3114097aeefb8d7b4a9
BLAKE2b-256 9345a299dad20953a08413cc4852600db330d9771e0a7afda24e4a5574cd112e

See more details on using hashes here.

File details

Details for the file clamator_over_redis-0.1.2-py3-none-any.whl.

File metadata

File hashes

Hashes for clamator_over_redis-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 36cf6fd077f26cdcc63567b3c18812f28a0b86b43789958f0661e16feb1f6459
MD5 d93421ac959fbcd92e4b7d3467e5edf7
BLAKE2b-256 738d5d4b11a5113d3c2771cb4b52e4306d9e7a779127d91babadae227cae9bd4

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page