A simple and efficient message queue based on Redis Streams.
Project description
LeanMQ
LeanMQ is a lightweight, Redis-based message queue for microservice communication. It provides a simple but powerful implementation using Redis Streams with support for dead-letter queues, message TTL, atomic transactions, consumer groups, and message retry tracking.
Features
- Dead Letter Queues: Automatic handling of failed messages
- Message TTL: Set expiration times for messages
- Atomic Transactions: Send multiple messages in a single transaction
- Consumer Groups: Support for multiple consumers
- Message Tracking: Track delivery attempts and failures
- Lightweight: Simple API with minimal dependencies
Installation
pip install leanmq
Quick Start
from leanmq import LeanMQ
# Initialize message queue
mq = LeanMQ(redis_host="localhost", redis_port=6379)
# Create a queue pair (main queue and dead letter queue)
main_queue, dlq = mq.create_queue_pair("notifications")
# Send a message
message_id = main_queue.send_message(
{"type": "email", "recipient": "user@example.com"}
)
# Send messages in a transaction
with mq.transaction() as tx:
tx.send_message(main_queue, {"type": "email", "recipient": "user1@example.com"})
tx.send_message(main_queue, {"type": "sms", "recipient": "+1234567890"})
# Receive messages
messages = main_queue.get_messages(count=5, block_for_seconds=1)
# Process messages
for msg in messages:
try:
print(f"Processing message: {msg.id} - {msg.data}")
# Your processing logic here...
# Acknowledge successful processing (keeps message in stream for history)
main_queue.acknowledge_messages([msg.id])
except Exception as e:
# Move to DLQ if processing fails
main_queue.move_to_dlq([msg.id], f"Processing error: {e}", dlq)
# Clean up
mq.close()
Usage Guide
Initializing LeanMQ
from leanmq import LeanMQ
# Basic initialization
mq = LeanMQ()
# With custom Redis connection
mq = LeanMQ(
redis_host="redis.example.com",
redis_port=6379,
redis_db=0,
redis_password="password",
prefix="myapp:",
max_retries=3
)
# Using with statement for automatic cleanup
with LeanMQ() as mq:
# your code here
pass # Connection will be closed automatically
Working with Queues
# Create a new queue with its dead letter queue
main_queue, dlq = mq.create_queue_pair("orders")
# Get an existing queue
queue = mq.get_queue("orders")
# Get the associated dead letter queue
dlq = mq.get_dead_letter_queue("orders")
# List all queues
queues = mq.list_queues()
for q in queues:
print(f"Queue: {q.name}, Messages: {q.message_count}, DLQ: {q.is_dlq}")
# Delete a queue
mq.delete_queue("orders", delete_dlq=True)
Sending Messages
# Basic message sending
message_id = queue.send_message({"order_id": "12345", "status": "new"})
# With custom message ID
message_id = queue.send_message(
{"order_id": "12345", "status": "new"},
message_id="custom-id-123"
)
# With time-to-live (TTL) in seconds
message_id = queue.send_message(
{"order_id": "12345", "status": "new"},
ttl=3600 # Message will expire after 1 hour
)
Receiving and Processing Messages
# Get up to 10 messages
messages = queue.get_messages(count=10)
# Block for messages if none are immediately available
messages = queue.get_messages(count=5, block_for_seconds=5)
# Specify consumer ID (useful for load balancing)
messages = queue.get_messages(count=10, consumer_id="worker-1")
# Process and acknowledge messages
for msg in messages:
try:
# Process the message
process_order(msg.data)
# Acknowledge successful processing (keeps message in stream for history/auditing)
queue.acknowledge_messages([msg.id])
# Or completely remove the message from the stream
# queue.delete_messages([msg.id])
except Exception as e:
# If processing fails, move to dead letter queue
queue.move_to_dlq([msg.id], f"Error: {str(e)}", dlq)
Managing Dead Letter Queue (DLQ) Messages
# Get messages from DLQ
dlq_messages = dlq.get_messages(count=10)
# Permanently delete a message from DLQ
dlq.delete_messages([dlq_messages[0].id])
# Requeue a message back to the main queue for retry
dlq.requeue_messages([dlq_messages[1].id], main_queue)
Using Transactions
# Start a transaction to send multiple messages atomically
with mq.transaction() as tx:
# Add messages to the transaction
tx.send_message(queue1, {"key": "value1"})
tx.send_message(queue2, {"key": "value2"})
# Transaction will be committed at the end of the block
Queue Management
# Get information about a queue
info = queue.get_info()
print(f"Queue name: {info.name}")
print(f"Message count: {info.message_count}")
print(f"Pending messages: {info.pending_messages}")
# Purge all messages from a queue
purged_count = queue.purge()
print(f"Purged {purged_count} messages")
# Process expired messages across all queues
expired_count = mq.process_expired_messages()
print(f"Processed {expired_count} expired messages")
License
Apache 2.0 - see LICENSE.md for details.
Copyright
Copyright (c) 2025 Augustus D'Souza
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file leanmq-0.1.0.tar.gz.
File metadata
- Download URL: leanmq-0.1.0.tar.gz
- Upload date:
- Size: 16.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6fc5a187c4e6ee53611465f1f35942e8629961dfaf6ad9489253b5a205e48648
|
|
| MD5 |
044f57bfdd7eb1c517d8d6e4684f4b0a
|
|
| BLAKE2b-256 |
f4097fe29e12b5818df945468ee6de95a59e3163311e04d2ba4b8d0dee7826fe
|
Provenance
The following attestation bundles were made for leanmq-0.1.0.tar.gz:
Publisher:
publish.yml on augiwan/LeanMQ
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
leanmq-0.1.0.tar.gz -
Subject digest:
6fc5a187c4e6ee53611465f1f35942e8629961dfaf6ad9489253b5a205e48648 - Sigstore transparency entry: 182584697
- Sigstore integration time:
-
Permalink:
augiwan/LeanMQ@d1668e070f3c3d6b54a850f9d81bed4b67a96020 -
Branch / Tag:
refs/tags/v0.1.0 - Owner: https://github.com/augiwan
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@d1668e070f3c3d6b54a850f9d81bed4b67a96020 -
Trigger Event:
release
-
Statement type:
File details
Details for the file leanmq-0.1.0-py3-none-any.whl.
File metadata
- Download URL: leanmq-0.1.0-py3-none-any.whl
- Upload date:
- Size: 17.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0e4349f2cdb2d1875517c8f81b7e97625f74cb681af5e4d103b7caa0806b7c11
|
|
| MD5 |
eeca97102a8fc176f5ca0bbe7cb1a0b1
|
|
| BLAKE2b-256 |
54f35a4a10d13eb20b4fd8bd1045be265defab8e8f55f81a9ac44e4099b1a2a4
|
Provenance
The following attestation bundles were made for leanmq-0.1.0-py3-none-any.whl:
Publisher:
publish.yml on augiwan/LeanMQ
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
leanmq-0.1.0-py3-none-any.whl -
Subject digest:
0e4349f2cdb2d1875517c8f81b7e97625f74cb681af5e4d103b7caa0806b7c11 - Sigstore transparency entry: 182584698
- Sigstore integration time:
-
Permalink:
augiwan/LeanMQ@d1668e070f3c3d6b54a850f9d81bed4b67a96020 -
Branch / Tag:
refs/tags/v0.1.0 - Owner: https://github.com/augiwan
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@d1668e070f3c3d6b54a850f9d81bed4b67a96020 -
Trigger Event:
release
-
Statement type: