Skip to main content

Python message queuing with Redis and message deduplication

Project description

redis-message-queue

Robust Python queuing with message deduplication.

Features

  • Exactly-once delivery and publish guarantees: Our system ensures that messages are both delivered and published no more than once. This is achieved through Redis' atomic transactions, message deduplication, and idempotent processing, which together prevent race conditions, duplicate processing, and multiple publications.
  • Message deduplication and idempotent processing: By default, messages are deduplicated to prevent multiple sends of the same message. This ensures that each message is processed only once, maintaining idempotency even with producer retries.
  • Automatic message acknowledgement and resilient processing: Messages are automatically acknowledged post-processing, with a robust mechanism in place to handle consumer crashes. Failed messages are moved to a dedicated log within Redis, preventing loss and allowing for recovery and reprocessing.
  • Efficient and visible message handling: Success and failure logs provide insight into message processing outcomes. Additionally, Redis' blocking queue commands optimize resource usage by eliminating the need for constant polling, thus conserving CPU resources.
  • Graceful shutdown for idle consumers: The system includes a mechanism to handle graceful shutdowns, allowing consumers to complete processing of the current message before shutting down. This is particularly useful for handling interrupt signals (e.g., Ctrl+C) without disrupting ongoing tasks.
  • Threadless heartbeats for idle consumers: The system employs a heartbeat mechanism for consumers awaiting messages, which operates without additional threads or processes, ensuring minimal resource consumption and a simplified consumer architecture.

Please note that these features are optional and can be disabled as needed.

Preparation

pip install redis-message-queue

You will also need a running Redis server. You can run one locally with Docker:

docker run -it --rm -p 6379:6379 redis

Usage

Send messages to a queue:

import time
from random import randint as random_number

from redis import Redis

from redis_message_queue import RedisMessageQueue

client = Redis.from_url("redis://localhost:6379/0")
queue = RedisMessageQueue(
    name="my_message_queue",
    client=client,
    deduplication=True,
)

while True:
    # Sending unique messages
    queue.publish(f"Hello (id={random_number(0, 1_000_000)})")
    time.sleep(1)

Receive messages from a queue:

from redis import Redis

from redis_message_queue import RedisMessageQueue

client = Redis.from_url(
    "redis://localhost:6379/0",
    decode_responses=True,
)
queue = RedisMessageQueue("my_message_queue", client=client)

while True:
    with queue.process_message() as message:
        if message:
            print(f"Received Message: {message}")

To see how the message queue operates, you can look at the examples in the examples folder.

Run two publishers and three workers by using the commands below. Each command should be run in its own terminal window:

poetry run python -m examples.send_messages
poetry run python -m examples.send_messages
poetry run python -m examples.receive_messages
poetry run python -m examples.receive_messages
poetry run python -m examples.receive_messages

Asyncio

To use asyncio, just replace

from redis_message_queue import RedisMessageQueue

with

from redis_message_queue.asyncio import RedisMessageQueue

All examples are the same for both versions, except that you'll need to manually close the connection as described in the documentation:

import redis.asyncio as redis

client = redis.Redis()
# ...all of your other code
await client.aclose()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

redis_message_queue-0.10.1.tar.gz (8.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

redis_message_queue-0.10.1-py3-none-any.whl (12.1 kB view details)

Uploaded Python 3

File details

Details for the file redis_message_queue-0.10.1.tar.gz.

File metadata

  • Download URL: redis_message_queue-0.10.1.tar.gz
  • Upload date:
  • Size: 8.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.6.1 CPython/3.11.4 Darwin/24.3.0

File hashes

Hashes for redis_message_queue-0.10.1.tar.gz
Algorithm Hash digest
SHA256 4eba48bd4355852c60b3afff9a677dd3d86653fc36dd9b27a09be17e2b54e104
MD5 5d224223a770678d443e32bf4e2ee231
BLAKE2b-256 07d85c15919402f9107467d99e81ddf9c5e3359f83f01873dc2e5ff92edc4af2

See more details on using hashes here.

File details

Details for the file redis_message_queue-0.10.1-py3-none-any.whl.

File metadata

File hashes

Hashes for redis_message_queue-0.10.1-py3-none-any.whl
Algorithm Hash digest
SHA256 9251d44ea8bea86aee9428bb0a6430d1afa5a881aedf3fa833618a05e4219cdf
MD5 939e5cf3158c2ee2a3b63fe6776c5de6
BLAKE2b-256 641b10627c0e7e4fe27872e9c9487c4424e88ab4c56a4977f1238c8c9ebe141a

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page