Python message queuing with Redis and message deduplication
Project description
redis-message-queue
Lightweight Python message queuing with Redis and built-in publish-side deduplication. Deduplicate publishes within a TTL window, with crash recovery (at-least-once) on by default — across any number of producers and consumers.
pip install "redis-message-queue>=8.3.1,<9.0.0"
Requires Python >= 3.12 and Redis server >= 6.2.
Mental model: redis-message-queue is a payload queue, not a task framework. Producers publish a str or dict; consumers decide what it means. There is no task registry, result backend, scheduler, or handler-level retry policy — and an ordinary exception raised inside a handler is terminal, not an automatic retry. Coming from Celery, RQ, Dramatiq, or taskiq? Read Migrating from task frameworks before porting code.
Quickstart
Redis must be running locally first: use redis-server or
docker run -it --rm -p 6379:6379 redis:7.
Local Redis data: The sync and async quickstarts below connect to
redis://localhost:6379/0and use the fixed queue namespacequickstart. Each snippet publishes a message, then claims and removes one message under that namespace. If local DB 0 already containsquickstartdata that matters, use a disposable Redis instance, a separate DB/port, or change the URL/queue name before running them.
import json
from uuid import uuid4
from redis import Redis
from redis_message_queue import RedisMessageQueue
client = Redis.from_url("redis://localhost:6379/0", decode_responses=True)
queue = RedisMessageQueue(
"quickstart",
client=client,
deduplication=True,
get_deduplication_key=lambda msg: msg["id"],
)
message = {"id": f"msg-{uuid4().hex}", "text": "hello"}
queue.publish(message)
with queue.process_message() as message:
if message is not None:
payload = json.loads(message)
print(f"got {payload['text']}")
# Expected output: got hello
RedisMessageQueue itself is not a context manager. Use
with queue.process_message() as message: for each message.
Important: In the sync API, work inside
process_message()must be synchronous. If your handler isasync def, returns a coroutine, or returns any other awaitable, useredis_message_queue.asyncio.RedisMessageQueue. The sync context manager does not inspect the handler's return value; an unawaited coroutine can be dropped while the message is acked. For sync callback-style handlers, useprocess_message_callback(handler): it checks for awaitable returns before acking and raisesTypeErrorif one is returned.
Async quickstart
import asyncio
import json
from uuid import uuid4
from redis.asyncio import Redis
from redis_message_queue.asyncio import RedisMessageQueue
async def main():
client = Redis.from_url("redis://localhost:6379/0", decode_responses=True)
queue = RedisMessageQueue(
"quickstart",
client=client,
deduplication=True,
get_deduplication_key=lambda msg: msg["id"],
)
message = {"id": f"msg-{uuid4().hex}", "text": "hello"}
await queue.publish(message)
async with queue.process_message() as message:
if message is not None:
payload = json.loads(message)
print(f"got {payload['text']}")
await client.aclose()
asyncio.run(main()) # Expected output: got hello
Why redis-message-queue
The problem: You're sending messages between services or workers and need guarantees. Simple Redis LPUSH/BRPOP loses messages on crashes, doesn't deduplicate, and gives you no visibility into what succeeded or failed.
The solution: Atomic Lua scripts for publish + dedup, a processing queue for in-flight tracking (with optional crash recovery via visibility timeouts), and optional success/failure logs for observability.
| Feature | Details |
|---|---|
| Deduplicated publish | Lua-scripted atomic SET NX + LPUSH prevents duplicate enqueues within a configurable TTL window (default: 1 hour), even with producer retries. Requires an explicit get_deduplication_key callable so your application defines what counts as a duplicate. Note: deduplication is publish-side only and does not prevent duplicate delivery under at-least-once visibility-timeout reclaim |
| Visibility-timeout redelivery | Crashed or stalled consumers' messages are reclaimed and redelivered when a visibility timeout is configured |
| Completed & failed queues | Optional completed/failed queues for auditing, inspection, and application-owned manual reprocessing, with configurable max length to prevent unbounded growth |
| Dead-letter queue | Poison messages that exceed a configurable delivery count are automatically routed to a dead-letter queue instead of being redelivered indefinitely |
| Graceful shutdown | Built-in interrupt handler lets consumers finish current work before stopping |
| Lease heartbeats | Optional background lease renewal keeps long-running handlers from being redelivered prematurely |
| Connection retries | Exponential backoff with jitter for Redis ops; idempotent paths (deduplicated publish, ack, lease renewal, claim recovery) replay safely under retries, while non-deduplicated publish is intentionally not retried so the caller decides whether to retry (accepting potential duplicates). See Custom gateway |
| Async support | Mirrored async variant — same method and parameter names, but callbacks are not interchangeable: the sync queue rejects async callables, and on the async queue on_event must be async (get_deduplication_key and on_heartbeat_failure may be sync or async) |
All features are optional and can be enabled or disabled as needed.
Delivery semantics
| Configuration | Delivery guarantee |
|---|---|
Default (visibility_timeout_seconds=300) |
At-least-once — expired messages are reclaimed and redelivered |
With visibility_timeout_seconds=None, max_delivery_count=None |
At-most-once — a consumer crash loses the in-flight message |
See Crash recovery with visibility timeout for details and tradeoffs.
Because delivery-count limits depend on visibility-timeout reclaim, disabling
lease-based crash recovery requires setting both visibility_timeout_seconds=None
and max_delivery_count=None.
Important: Ordinary
Exceptionsubclasses raised by handler code are terminal. This library is a payload queue, not a task framework: raising an ordinaryExceptioninsideprocess_message()does not requeue the message. Withenable_failed_queue=False, the message is removed fromprocessing; withenable_failed_queue=True, it is moved to the failed list.Fatal
BaseExceptionpaths such asKeyboardInterrupt,SystemExit, and externally cancelled async tasks (asyncio.CancelledError) are shutdown/cancellation paths, not failed handler work. They can leave the message inprocessingfor visibility-timeout reclaim, or orphan it whenvisibility_timeout_seconds=None, max_delivery_count=None; see Graceful shutdown and Abandoned in-flight messages.
Configuration
Every feature is optional and set through constructor arguments. The complete reference — with runnable snippets for each option — lives in docs/configuration.md:
- Deduplication — publish-side dedup keys, TTL windows, and cardinality guidance
- Success and failure tracking — optional completed/failed audit lists and their caps
- Publish backpressure —
max_pending_lengthand theraise/drop_oldest/blockoverload policies - Crash recovery with visibility timeout — leases, heartbeats, and redelivery
- Ordering and multi-consumer fairness — the claim-order guarantee and its limits
- Dead-letter queue — routing poison messages off the redelivery path
- Graceful shutdown —
drain()/aclose()and the three shutdown shapes - Custom gateway — tuning retries and dedup TTL, or subclassing for new semantics
- Connection pool sizing — sizing Redis pools for heartbeat concurrency
Async API
Replace the import to use the async variant — it mirrors the sync API with the
same method and parameter names (call the awaitable methods with await):
from redis_message_queue.asyncio import RedisMessageQueue
The sync and async classes intentionally share names. In modules that use both,
alias the imports explicitly, for example
from redis_message_queue import RedisMessageQueue as SyncRedisMessageQueue and
from redis_message_queue.asyncio import RedisMessageQueue as AsyncRedisMessageQueue.
Callbacks are not interchangeable between the two classes: the sync queue rejects
async callables, and on the async queue on_event must be async, while
get_deduplication_key and on_heartbeat_failure may be sync or async.
The examples otherwise work the same way. Remember to close the connection when done:
import redis.asyncio as redis
client = redis.Redis()
# ... your code
await client.aclose()
For the sync Redis client, call client.close() during application shutdown when
you own the client lifecycle.
Migrating from RQ / Celery / Dramatiq / taskiq
redis-message-queue is a payload queue, not a task framework. It has no task
registry, job object, result backend, scheduler, workflow canvas, callback
graph, or handler-level retry policy. Producers publish a str or dict
payload, and consumers decide what that payload means.
The most important semantic differences from sibling task libraries are:
- Ordinary
Exceptionsubclasses raised by handler code are terminal. Raising an ordinaryExceptioninsideprocess_message()removes the message fromprocessing, or moves it to the failed list whenenable_failed_queue=True; it does not requeue or retry the message. FatalBaseExceptionshutdown or cancellation paths are covered by Graceful shutdown and Abandoned in-flight messages. visibility_timeout_secondsis a crash/stall recovery lease, not a runtime limit. Slow handlers are not interrupted; after the lease expires another consumer can process the same payload concurrently.on_eventis telemetry only. Callback exceptions are logged and emitted asRuntimeWarning, but they do not affect ack/nack, failed-queue movement, or any other message outcome. Do not useon_eventfor sagas, follow-up writes, billing callbacks, or other correctness-critical work.- Dict payloads are JSON data, not Python call arguments. JSON does not preserve every Python type: tuples become lists, and sets or custom objects raise unless you encode them into JSON-native values first.
- Process-global signal ownership cannot be safely chained with Celery, RQ, or
Dramatiq CLI workers. Prefer one top-level owner that calls
queue.drain()or sets an application stop event, and run sibling workers in separate processes.
When migrating on the same Redis deployment, prefer separate Redis DBs or hard
namespaces. Do not point a Celery, RQ, Dramatiq, or taskiq worker at an rmq
pending key. A sibling worker can pop the rmq stored message, fail its own
decoder, and leave the rmq queue without that message. Also avoid custom
key_separator values that synthesize another library's key namespace, such as
using ":queue:" with a queue name that overlaps RQ keys. rmq has no fixed
library prefix; generated keys share the Redis DB namespace with every other
Redis user.
Set strict_envelope_decoding=True if this Redis is shared with sibling task
libraries (Celery, RQ, Dramatiq) to fail-fast on foreign payloads. With the
default False, non-rmq values that do not start with the rmq envelope prefix
remain backward-compatible raw messages and are yielded to the handler.
Production notes
Deploying to production? See docs/operations.md for fork safety and pre-fork servers (gunicorn --preload, multiprocessing, ProcessPoolExecutor) and Redis memory sizing for deduplication and replay metadata.
Observability
redis-message-queue emits lifecycle events through an optional on_event callback — publish, dedup hits, claims, reclaims, ack/nack, DLQ moves, heartbeats, drain, and retries — and exposes a typed exception hierarchy rooted at RedisMessageQueueError. The full guide (event catalog, dispatch context, timing versus Redis commit, intentionally silent paths, secret-safety for event.error, and the exception tree) is in docs/observability.md.
Known limitations
Known limitations and edge cases — timed-wait polling, Lua atomicity, batch-reclaim bounds, Redis Cluster hash-tag requirements, non-ASCII payload sizing, and client-side Retry interactions — are catalogued in docs/operations.md#known-limitations. For the full residual-risk register, see docs/production-readiness.md.
Upgrading
Version migration guides — v7→v8, v6→v7, v5→v6, v2→v3, and the destructive-on-live-queues configuration changes — are in UPGRADING.md. Per-release detail lives in CHANGELOG.md.
Running locally
These examples ship in the GitHub repo, not the PyPI package — clone the repo and
run uv sync first.
Start a local Redis server with redis-server, or with Docker:
docker run -it --rm -p 6379:6379 redis:7
Try the examples with multiple terminals:
These examples connect to REDIS_URL when it is set; otherwise they use
redis://localhost:6379/0 (database 0). The send and receive examples use the
fixed queue name my_message_queue: publishers write queue data under that
namespace, including pending and deduplication keys, and consumers can claim and
remove messages from it. If existing local Redis data in localhost:6379/0
matters, run a disposable Redis instance or select a separate Redis database
before running the commands.
# Two publishers
uv run python -m examples.send_messages
uv run python -m examples.send_messages
# Three consumers
uv run python -m examples.receive_messages
uv run python -m examples.receive_messages
uv run python -m examples.receive_messages
These publisher and consumer examples are long-running; stop them with Ctrl+C or
another interrupt when you are done. Publishers print Success: Sent message ...
or Duplicate: Message .... Consumers print Received Message: ... before
simulated time.sleep(...) work and Finished processing message ...
afterward. On a clean single handled shutdown, the consumer prints Exiting...
and the signal handler prints Received signal: ... on stderr.
When examples run through wrappers such as uv run, terminal interrupts may
reach the process group more than once. If an interrupt lands during the
consumer's simulated time.sleep(...) work, you can see a KeyboardInterrupt
traceback instead of the clean Exiting... line.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file redis_message_queue-8.3.1.tar.gz.
File metadata
- Download URL: redis_message_queue-8.3.1.tar.gz
- Upload date:
- Size: 81.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
60bb1c3a0283bb305be15864f1b74f420a923c7705c7c48c9a9b7df810f1424f
|
|
| MD5 |
4d8817dc9dbfad32edd90c2a89b4a04f
|
|
| BLAKE2b-256 |
b64558166e8211a0071dcb2eb03e59b9da03e81a7ed0547706d51908f580cf0f
|
Provenance
The following attestation bundles were made for redis_message_queue-8.3.1.tar.gz:
Publisher:
release.yml on Elijas/redis-message-queue
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
redis_message_queue-8.3.1.tar.gz -
Subject digest:
60bb1c3a0283bb305be15864f1b74f420a923c7705c7c48c9a9b7df810f1424f - Sigstore transparency entry: 1725561810
- Sigstore integration time:
-
Permalink:
Elijas/redis-message-queue@fbbc3fcaa891be4d38f726c986732b84a565a18d -
Branch / Tag:
refs/tags/v8.3.1 - Owner: https://github.com/Elijas
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@fbbc3fcaa891be4d38f726c986732b84a565a18d -
Trigger Event:
workflow_dispatch
-
Statement type:
File details
Details for the file redis_message_queue-8.3.1-py3-none-any.whl.
File metadata
- Download URL: redis_message_queue-8.3.1-py3-none-any.whl
- Upload date:
- Size: 90.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
db95498c6c9446afe25159b46d845873aba490be6c187051188dbbc958ab6614
|
|
| MD5 |
2e074bd25edd1947b18259b089001f29
|
|
| BLAKE2b-256 |
2ce7f94ac4054e24c3d24c2399dcf4cc3c0bc3001dd6c51513ea170d12d6c4d1
|
Provenance
The following attestation bundles were made for redis_message_queue-8.3.1-py3-none-any.whl:
Publisher:
release.yml on Elijas/redis-message-queue
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
redis_message_queue-8.3.1-py3-none-any.whl -
Subject digest:
db95498c6c9446afe25159b46d845873aba490be6c187051188dbbc958ab6614 - Sigstore transparency entry: 1725561913
- Sigstore integration time:
-
Permalink:
Elijas/redis-message-queue@fbbc3fcaa891be4d38f726c986732b84a565a18d -
Branch / Tag:
refs/tags/v8.3.1 - Owner: https://github.com/Elijas
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@fbbc3fcaa891be4d38f726c986732b84a565a18d -
Trigger Event:
workflow_dispatch
-
Statement type: