A simple and efficient message queue based on Redis Streams.

LeanMQ

LeanMQ is a lightweight message queue for microservice communication, built on Redis Streams. It supports dead-letter queues, message TTL, atomic transactions, consumer groups, and message retry tracking.

Features

  • Dead Letter Queues: Automatic handling of failed messages
  • Message TTL: Set expiration times for messages
  • Atomic Transactions: Send multiple messages in a single transaction
  • Consumer Groups: Support for multiple consumers
  • Message Tracking: Track delivery attempts and failures
  • Lightweight: Simple API with minimal dependencies

Installation

pip install leanmq

Quick Start

from leanmq import LeanMQ

# Initialize message queue
mq = LeanMQ(redis_host="localhost", redis_port=6379)

# Create a queue pair (main queue and dead letter queue)
main_queue, dlq = mq.create_queue_pair("notifications")

# Send a message
message_id = main_queue.send_message(
    {"type": "email", "recipient": "user@example.com"}
)

# Send messages in a transaction
with mq.transaction() as tx:
    tx.send_message(main_queue, {"type": "email", "recipient": "user1@example.com"})
    tx.send_message(main_queue, {"type": "sms", "recipient": "+1234567890"})

# Receive messages
messages = main_queue.get_messages(count=5, block_for_seconds=1)

# Process messages
for msg in messages:
    try:
        print(f"Processing message: {msg.id} - {msg.data}")
        # Your processing logic here...

        # Acknowledge successful processing (keeps message in stream for history)
        main_queue.acknowledge_messages([msg.id])
    except Exception as e:
        # Move to DLQ if processing fails
        main_queue.move_to_dlq([msg.id], f"Processing error: {e}", dlq)

# Clean up
mq.close()
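
The receive, process, and acknowledge steps above can be folded into one reusable helper. The following is a minimal sketch, not part of LeanMQ: `process_batch`, `FakeQueue`, and `handler` are hypothetical names, and `FakeQueue` is an in-memory stand-in that only mimics the three queue methods used in the Quick Start so the example runs without Redis.

```python
from types import SimpleNamespace


def process_batch(queue, dlq, handler, count=5):
    """Fetch up to `count` messages, run `handler` on each, then ack or dead-letter it.

    Returns a (succeeded, failed) tuple.
    """
    succeeded = failed = 0
    for msg in queue.get_messages(count=count, block_for_seconds=1):
        try:
            handler(msg.data)
        except Exception as exc:
            # Processing failed: hand the message to the dead letter queue.
            queue.move_to_dlq([msg.id], f"Processing error: {exc}", dlq)
            failed += 1
        else:
            # Acknowledge outside the try body so an ack error is not
            # mistaken for a processing error.
            queue.acknowledge_messages([msg.id])
            succeeded += 1
    return succeeded, failed


class FakeQueue:
    """Hypothetical in-memory stand-in exposing only the methods used above."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.acked = []
        self.dead = []

    def get_messages(self, count=1, block_for_seconds=None):
        batch, self.messages = self.messages[:count], self.messages[count:]
        return batch

    def acknowledge_messages(self, ids):
        self.acked.extend(ids)

    def move_to_dlq(self, ids, reason, dlq):
        self.dead.extend((message_id, reason) for message_id in ids)


def handler(data):
    # Example handler: only "email" notifications are supported.
    if data.get("type") != "email":
        raise ValueError(f"unsupported type: {data.get('type')}")


demo_queue = FakeQueue([
    SimpleNamespace(id="1-0", data={"type": "email"}),
    SimpleNamespace(id="2-0", data={"type": "fax"}),
])
result = process_batch(demo_queue, dlq=None, handler=handler)
```

With a real queue pair, the call would look like `process_batch(main_queue, dlq, handler)`.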

Usage Guide

Initializing LeanMQ

from leanmq import LeanMQ

# Basic initialization
mq = LeanMQ()

# With custom Redis connection
mq = LeanMQ(
    redis_host="redis.example.com",
    redis_port=6379,
    redis_db=0,
    redis_password="password",
    prefix="myapp:",
    max_retries=3
)

# Using a with statement for automatic cleanup
with LeanMQ() as mq:
    # Your code here
    pass
# The connection is closed automatically when the block exits

Working with Queues

# Create a new queue with its dead letter queue
main_queue, dlq = mq.create_queue_pair("orders")

# Get an existing queue
queue = mq.get_queue("orders")

# Get the associated dead letter queue
dlq = mq.get_dead_letter_queue("orders")

# List all queues
queues = mq.list_queues()
for q in queues:
    print(f"Queue: {q.name}, Messages: {q.message_count}, DLQ: {q.is_dlq}")

# Delete a queue
mq.delete_queue("orders", delete_dlq=True)
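
The fields printed by the listing loop (`name`, `message_count`, `is_dlq`) are enough for a simple health check. Below is a small sketch that reports dead letter queues which currently hold messages. `nonempty_dlqs` is a hypothetical helper, and the sample objects and queue names are made up for illustration; they only carry the three attributes shown above.

```python
from types import SimpleNamespace


def nonempty_dlqs(queue_infos):
    """Return (name, message_count) for every dead letter queue that holds messages."""
    return [
        (q.name, q.message_count)
        for q in queue_infos
        if q.is_dlq and q.message_count > 0
    ]


# Stand-ins for the objects returned by mq.list_queues(); names are illustrative.
sample = [
    SimpleNamespace(name="orders", message_count=12, is_dlq=False),
    SimpleNamespace(name="orders:dlq", message_count=3, is_dlq=True),
    SimpleNamespace(name="emails:dlq", message_count=0, is_dlq=True),
]
backlog = nonempty_dlqs(sample)
```

Against a live instance, the call would be `nonempty_dlqs(mq.list_queues())`.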

Sending Messages

# Basic message sending
message_id = queue.send_message({"order_id": "12345", "status": "new"})

# With custom message ID
message_id = queue.send_message(
    {"order_id": "12345", "status": "new"},
    message_id="custom-id-123"
)

# With time-to-live (TTL) in seconds
message_id = queue.send_message(
    {"order_id": "12345", "status": "new"},
    ttl=3600  # Message will expire after 1 hour
)
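
The TTL is given in seconds relative to the time the message is sent. This README does not say how LeanMQ stores or checks the deadline internally, so the following is only a sketch of the arithmetic; `expires_at` and `is_expired` are hypothetical helpers, and treating a missing TTL as "never expires" is an assumption.

```python
import time
from typing import Optional


def expires_at(sent_at: float, ttl: Optional[int]) -> Optional[float]:
    """Return the absolute expiry timestamp, or None when no TTL was set."""
    return None if ttl is None else sent_at + ttl


def is_expired(sent_at: float, ttl: Optional[int], now: Optional[float] = None) -> bool:
    """Return True once `now` has reached the message's expiry timestamp."""
    deadline = expires_at(sent_at, ttl)
    if deadline is None:
        return False  # Assumption: a message without a TTL never expires.
    current = time.time() if now is None else now
    return current >= deadline


# A message sent at t=1000 with ttl=3600 expires at t=4600.
deadline = expires_at(1000.0, 3600)
```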

Receiving and Processing Messages

# Get up to 10 messages
messages = queue.get_messages(count=10)

# Block for messages if none are immediately available
messages = queue.get_messages(count=5, block_for_seconds=5)

# Specify consumer ID (useful for load balancing)
messages = queue.get_messages(count=10, consumer_id="worker-1")

# Process and acknowledge messages
for msg in messages:
    try:
        # Process the message (process_order stands for your own handler function)
        process_order(msg.data)
        
        # Acknowledge successful processing (keeps message in stream for history/auditing)
        queue.acknowledge_messages([msg.id])
        
        # Or completely remove the message from the stream
        # queue.delete_messages([msg.id])
    except Exception as e:
        # If processing fails, move to dead letter queue
        queue.move_to_dlq([msg.id], f"Error: {e}", dlq)
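
For transient failures it can be useful to retry a handler a few times in-process before dead-lettering the message. The sketch below shows one way to do that; `run_with_retries` is a hypothetical helper, and it is independent of the `max_retries` option shown earlier, whose exact semantics this README does not describe.

```python
def run_with_retries(handler, data, max_attempts=3):
    """Call handler(data) up to `max_attempts` times.

    Returns (True, None) on the first success, or (False, last_exception)
    once every attempt has failed.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    last_error = None
    for _ in range(max_attempts):
        try:
            handler(data)
            return True, None
        except Exception as exc:
            last_error = exc  # Remember the failure and try again.
    return False, last_error


calls = {"n": 0}


def flaky(data):
    # Fails on the first two calls, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("temporary failure")


def broken(data):
    # Always fails.
    raise ValueError("bad payload")


ok, error = run_with_retries(flaky, {"order_id": "12345"})
ok2, error2 = run_with_retries(broken, {}, max_attempts=2)
```

In the processing loop, a successful result would be followed by `queue.acknowledge_messages([msg.id])`, and a failed one by `queue.move_to_dlq(...)` with the returned exception as the reason.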

Managing Dead Letter Queue (DLQ) Messages

# Get messages from DLQ
dlq_messages = dlq.get_messages(count=10)

# Permanently delete a message from DLQ
if dlq_messages:
    dlq.delete_messages([dlq_messages[0].id])

# Requeue a message back to the main queue for retry
if len(dlq_messages) > 1:
    dlq.requeue_messages([dlq_messages[1].id], main_queue)
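
In practice the choice between deleting and requeueing usually depends on the message itself. Here is a rough sketch of a triage helper built only on the three DLQ calls shown above; `triage_dlq`, `should_retry`, and `FakeDLQ` are hypothetical, and `FakeDLQ` is an in-memory stand-in so the example runs without Redis.

```python
from types import SimpleNamespace


def triage_dlq(dlq, main_queue, should_retry, count=10):
    """Requeue messages accepted by `should_retry` and delete the rest.

    Returns (requeued_ids, deleted_ids). An empty DLQ results in no calls.
    """
    retry_ids, drop_ids = [], []
    for msg in dlq.get_messages(count=count):
        (retry_ids if should_retry(msg) else drop_ids).append(msg.id)
    if retry_ids:
        dlq.requeue_messages(retry_ids, main_queue)
    if drop_ids:
        dlq.delete_messages(drop_ids)
    return retry_ids, drop_ids


class FakeDLQ:
    """Hypothetical in-memory stand-in exposing only the methods used above."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.requeued = []
        self.deleted = []

    def get_messages(self, count=1):
        return self.messages[:count]

    def requeue_messages(self, ids, target_queue):
        self.requeued.extend(ids)

    def delete_messages(self, ids):
        self.deleted.extend(ids)


demo_dlq = FakeDLQ([
    SimpleNamespace(id="1-0", data={"order_id": "12345", "status": "new"}),
    SimpleNamespace(id="2-0", data={"status": "new"}),  # malformed: no order_id
])
# Retry only messages that carry an order_id; discard malformed ones.
requeued, deleted = triage_dlq(demo_dlq, None, lambda m: "order_id" in m.data)
```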

Using Transactions

# Start a transaction to send multiple messages atomically
# (queue1 and queue2 are queues obtained earlier, e.g. via mq.get_queue())
with mq.transaction() as tx:
    # Add messages to the transaction
    tx.send_message(queue1, {"key": "value1"})
    tx.send_message(queue2, {"key": "value2"})
    # Transaction will be committed at the end of the block
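
To illustrate the commit-at-end-of-block behavior, here is an in-memory sketch of a buffering context manager. It is not LeanMQ's implementation: `BufferedTransaction` is a hypothetical class, and discarding the buffered messages when the block raises is an assumption, since this README only states that the transaction is committed at the end of the block.

```python
class BufferedTransaction:
    """Collect messages and hand them to `sink` in one call when the block exits cleanly."""

    def __init__(self, sink):
        self._sink = sink      # Callable that receives the whole batch on commit.
        self._pending = []

    def send_message(self, queue_name, data):
        # Nothing is delivered yet; the message is only buffered.
        self._pending.append((queue_name, data))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self._sink(list(self._pending))  # Commit: deliver everything at once.
        self._pending.clear()                # On error the batch is dropped.
        return False                         # Never swallow the exception.


delivered = []

# Clean exit: both messages are delivered together.
with BufferedTransaction(delivered.extend) as tx:
    tx.send_message("orders", {"key": "value1"})
    tx.send_message("billing", {"key": "value2"})

# Failure inside the block: nothing from this batch is delivered.
try:
    with BufferedTransaction(delivered.extend) as tx:
        tx.send_message("orders", {"key": "value3"})
        raise RuntimeError("abort")
except RuntimeError:
    pass
```

On Redis, all-or-nothing delivery of this kind is commonly achieved with a MULTI/EXEC block; whether LeanMQ does so is not stated here.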

Queue Management

# Get information about a queue
info = queue.get_info()
print(f"Queue name: {info.name}")
print(f"Message count: {info.message_count}")
print(f"Pending messages: {info.pending_messages}")

# Purge all messages from a queue
purged_count = queue.purge()
print(f"Purged {purged_count} messages")

# Process expired messages across all queues
expired_count = mq.process_expired_messages()
print(f"Processed {expired_count} expired messages")
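
The example above calls `process_expired_messages()` once. If expiry should run on a schedule in your deployment (an assumption; this README does not say whether it must), a small loop driven by a `threading.Event` is one option. `sweep_expired` and `FakeMQ` are hypothetical; `FakeMQ` stands in for the real client so the sketch runs without Redis.

```python
import threading


def sweep_expired(mq, interval_seconds, stop_event, on_result=None):
    """Call mq.process_expired_messages() repeatedly until stop_event is set."""
    while not stop_event.is_set():
        expired_count = mq.process_expired_messages()
        if on_result is not None:
            on_result(expired_count)
        # wait() returns early once stop_event is set, so shutdown is prompt.
        stop_event.wait(interval_seconds)


class FakeMQ:
    """Stand-in that reports one expired message per sweep and stops after `limit` sweeps."""

    def __init__(self, stop_event, limit):
        self.stop_event = stop_event
        self.limit = limit
        self.calls = 0

    def process_expired_messages(self):
        self.calls += 1
        if self.calls >= self.limit:
            self.stop_event.set()
        return 1


stop = threading.Event()
counts = []
sweep_expired(FakeMQ(stop, limit=3), interval_seconds=0, stop_event=stop, on_result=counts.append)
```

In a service, the same function could run in a background thread, for example `threading.Thread(target=sweep_expired, args=(mq, 60, stop), daemon=True).start()`, with `stop.set()` called on shutdown. Whether a single LeanMQ instance can safely be shared across threads is not covered in this README, so a dedicated instance for the sweeper may be the safer choice.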

License

Apache 2.0 - see LICENSE.md for details.

Copyright

Copyright (c) 2025 Augustus D'Souza
