Skip to main content

FastStream broker integration for Redis-backed distributed timer scheduling

Project description

faststream-redis-timers

Supported versions downloads

faststream-redis-timers is a FastStream broker integration for Redis-backed distributed timer scheduling.

Schedule messages to be delivered to subscribers at a future point in time, with at-least-once delivery across multiple workers.

from datetime import timedelta
from faststream import FastStream
from faststream_redis_timers import TimersBroker
from redis.asyncio import Redis

client = Redis.from_url("redis://localhost:6379")
broker = TimersBroker(client)
app = FastStream(broker)

@broker.subscriber("invoices")
async def handle_invoice(invoice_id: str) -> None:
    print(f"Invoice {invoice_id} is due!")

@app.after_startup
async def schedule() -> None:
    await broker.publish(
        "INV-001",
        topic="invoices",
        activate_in=timedelta(days=30),
    )

Schedule at an absolute time with activate_at instead. publish() returns the resolved timer_id so you can keep it for cancellation:

from datetime import UTC, datetime

timer_id = await broker.publish(
    "INV-001",
    topic="invoices",
    activate_at=datetime(2026, 6, 1, 9, tzinfo=UTC),
)

How it works

Timers are stored in Redis as two keys per topic:

  • A sorted set (timers_timeline:{topic}) with the activation timestamp as score
  • A hash (timers_payloads:{topic}) with the serialized message body

So a timer on the invoices topic lives under timers_timeline:invoices and timers_payloads:invoices — handy when poking around in redis-cli. A TimersRouter(prefix="my-service:") extends the suffix to my-service:invoices.

A polling loop checks for due timers and atomically claims each one via a Lua script that pushes its score forward by lease_ttl seconds — granting the worker a lease. The timer is removed from Redis only after the handler completes successfully. If the worker crashes mid-handler or the handler raises, the lease eventually expires and another worker re-claims the timer.

This is the standard SQS-style visibility-timeout pattern: at-least-once delivery with no data loss on crash, at the cost of requiring idempotent handlers.

Cancellation

timer_id = await broker.publish("INV-001", topic="invoices", activate_in=timedelta(days=30))
await broker.cancel_timer("invoices", timer_id)

Tracing & headers

publish() accepts correlation_id and headers — both round-trip to the handler via the standard FastStream StreamMessage:

await broker.publish(
    {"order_id": 42},
    topic="orders",
    correlation_id="trace-abc-123",
    headers={"x-tenant": "acme", "x-priority": "high"},
)

Inside the handler:

from faststream import Context

@broker.subscriber("orders")
async def handle(
    body: dict,
    correlation_id: str = Context("message.correlation_id"),
    tenant: str = Context("message.headers.x-tenant"),
) -> None:
    ...

Connection ownership

TimersBroker does not close the Redis client you pass in — the caller owns its lifecycle. The same client can be shared across multiple brokers, so closing it from one broker would surprise the others. Manage the client with async with or a try/finally:

async with Redis.from_url("redis://localhost:6379") as client:
    broker = TimersBroker(client)
    app = FastStream(broker)
    await app.run()

Tuning

Per-subscriber knobs (passed to @broker.subscriber("topic", ...)):

  • lease_ttl (default 30 seconds) — how long a worker holds the lease while processing. Handlers must complete within this window or another worker may re-deliver the timer (duplicate). Increase if your handlers are slow.
  • polling_interval (default 0.05 s) — base poll interval used when the queue has work or just transitioned from idle. Doubles on each consecutive empty cycle, capped at max_polling_interval, with ±50% jitter applied each sleep.
  • max_polling_interval (default 5.0 s) — ceiling for the adaptive idle backoff. Lower it for tighter delivery latency on idle queues; raise it to reduce Redis load on workloads with long idle stretches.
  • max_concurrent (default 5) — maximum number of handlers running in parallel per subscriber. Also caps the fetch batch size per poll cycle. Handlers must be safe under concurrency in addition to being idempotent.

Failure modes

  • Handlers must be idempotent. A handler that ran successfully but whose ack failed to land in Redis (network blip) will be retried; a handler that takes longer than lease_ttl may be re-delivered to another worker.
  • Buggy handler retries forever. If a handler always raises, the timer is retried every lease_ttl seconds indefinitely — there is no built-in attempt counter. Raise faststream.exceptions.RejectMessage from your handler to drop a poison-pill timer permanently, or track attempts in your own state if you need a hard cap.

High availability

Run multiple TimersBroker processes against the same Redis keys. The Lua claim script ensures each due timer is leased by exactly one worker at a time; failover is automatic via lease expiry.

broker = TimersBroker(
    Redis.from_url("redis://..."),
    timeline_key="my_timeline",
    payloads_key="my_payloads",
)

📚 Documentation

📦 PyPi

📝 License

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

faststream_redis_timers-0.2.1.tar.gz (15.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

faststream_redis_timers-0.2.1-py3-none-any.whl (25.8 kB view details)

Uploaded Python 3

File details

Details for the file faststream_redis_timers-0.2.1.tar.gz.

File metadata

  • Download URL: faststream_redis_timers-0.2.1.tar.gz
  • Upload date:
  • Size: 15.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.8 {"installer":{"name":"uv","version":"0.11.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for faststream_redis_timers-0.2.1.tar.gz
Algorithm Hash digest
SHA256 14c5e6ec7e03d27577aa74b522ced705c500e7cb48940c42e69ac09c7f56504e
MD5 d3bbedc6e33f9a274de5767e9d537070
BLAKE2b-256 ea35a7039c38fae39a9a43d3b26a1601b86a27c3fb6492d0f6b0f6fd3f7bf9c6

See more details on using hashes here.

File details

Details for the file faststream_redis_timers-0.2.1-py3-none-any.whl.

File metadata

  • Download URL: faststream_redis_timers-0.2.1-py3-none-any.whl
  • Upload date:
  • Size: 25.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.8 {"installer":{"name":"uv","version":"0.11.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for faststream_redis_timers-0.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 741b29820eed9c3b57efe53dfde3110b971b9e354f0c33e0e469167ffcd2ec61
MD5 894061e2e02ac5dbedbb1353e318d20b
BLAKE2b-256 0954925a12ecf11b9b1519a62ef0ecee6e1b074d1fbe638b9d12037cddf3fee0

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page