
A lightweight, async, Redis-based job queue for small applications; a simpler alternative to Kafka.


redisaq

redisaq is a Python library for distributed job queuing and processing using Redis Streams. It provides a robust, scalable solution for handling distributed workloads with features like consumer groups, automatic partition rebalancing, and fault tolerance.

Installation

Install redisaq from PyPI:

pip install redisaq

Features

Producer

  • Message Handling:
    • Single message enqueuing: enqueue(payload)
    • Batch operations: batch_enqueue(payloads)
    • Custom partition key support for message routing
    • Configurable message timeouts
  • Stream Management:
    • Configurable stream length (maxlen) and trimming behavior (approximate)
    • Dynamic partition scaling with request_partition_increase()
    • Automatic partition key hashing for load distribution
    • Custom serialization support (default: orjson)
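The "same key, same partition" routing above falls out of any stable hash taken modulo the partition count. A minimal sketch in plain Python (the function name `partition_for` and the choice of CRC32 are illustrative, not redisaq's actual internals):

```python
import zlib

def partition_for(partition_key: str, num_partitions: int) -> int:
    """Map a partition key to a partition index deterministically.

    Illustrative only: redisaq may use a different hash function, but any
    stable hash gives the same key -> same partition property.
    """
    return zlib.crc32(partition_key.encode("utf-8")) % num_partitions

# Messages sharing a key always land on the same partition:
assert partition_for("user-123", 3) == partition_for("user-123", 3)
```

Because the mapping depends only on the key and the partition count, all messages for one user (for example) stay ordered relative to each other within a single partition.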

Consumer

  • Message Processing:
    • Support for both single and batch message processing
    • Configurable batch size
    • Asynchronous message handling with custom callbacks
    • Automatic message acknowledgment
  • Fault Tolerance:
    • Heartbeat mechanism (configurable interval and TTL)
    • Automatic crash detection
    • Graceful consumer registration and deregistration
    • Partition rebalancing on consumer changes
  • Group Management:
    • Consumer group support with XREADGROUP
    • Dynamic partition assignment
    • Automatic consumer group creation
    • Message tracking and acknowledgment
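Under the hood, consumer groups give at-least-once delivery: XREADGROUP records each delivered message in a pending-entries list until XACK removes it, so unacknowledged messages survive a consumer crash and can be claimed by another consumer. A toy in-memory model of that lifecycle (illustrative only; real Redis tracks all of this server-side):

```python
class MiniGroup:
    """Toy model of a Redis Streams consumer group's pending-entries list."""

    def __init__(self):
        self.stream = []   # list of (msg_id, payload)
        self.next_idx = 0  # the group's last-delivered position
        self.pending = {}  # msg_id -> consumer that holds it, until acked

    def xadd(self, payload):
        msg_id = f"{len(self.stream)}-0"
        self.stream.append((msg_id, payload))
        return msg_id

    def xreadgroup(self, consumer):
        if self.next_idx >= len(self.stream):
            return None
        msg_id, payload = self.stream[self.next_idx]
        self.next_idx += 1
        self.pending[msg_id] = consumer  # delivered but not yet acked
        return msg_id, payload

    def xack(self, msg_id):
        self.pending.pop(msg_id, None)

g = MiniGroup()
msg_id = g.xadd({"type": "email"})
delivered = g.xreadgroup("worker-1")
# The message stays pending (crash-safe) until the consumer acknowledges it:
g.xack(delivered[0])
```

In real Redis, a rebalancing consumer would use XAUTOCLAIM (or XCLAIM) to take over entries left pending by a crashed peer.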

Advanced Features

  • Scalability:
    • Multi-partition support for horizontal scaling
    • Dynamic partition count adjustment
    • Efficient round-robin partition assignment
  • Reliability:
    • Built-in error handling and retries
    • Dead-letter queue support
    • Message persistence via Redis Streams
  • Monitoring:
    • Detailed logging with configurable levels
    • Consumer and producer status tracking
    • Partition assignment monitoring
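The retry-then-dead-letter pattern behind these reliability features can be sketched generically. The names below (`process_with_retries`, `dead_letters`) are illustrative and not part of redisaq's API:

```python
import asyncio

async def process_with_retries(message, handler, dead_letters, max_retries=3):
    """Run handler(message); after max_retries failures, dead-letter it.

    Sketch of the retry/dead-letter pattern only; redisaq's own retry and
    dead-letter queue configuration may differ.
    """
    for attempt in range(1, max_retries + 1):
        try:
            await handler(message)
            return True
        except Exception:
            await asyncio.sleep(0)  # placeholder for a real backoff delay

    # All retries exhausted: park the message for later inspection
    # instead of blocking the rest of the stream.
    dead_letters.append(message)
    return False
```

Dead-lettering keeps one poison message from stalling an entire partition, at the cost of needing a separate process to inspect and replay the parked messages.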

Technical Details

  • Async Support: Built with asyncio for non-blocking operations
  • Redis Integration: Uses Redis Streams with aioredis
  • Type Safety: Full type hints support
  • Customization: Configurable serialization/deserialization
  • Namespace Management: Automatic key prefixing and organization
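Key prefixing keeps all of a topic's streams under one predictable namespace, so per-partition streams are easy to enumerate and clean up. A hypothetical layout (the actual prefix and format redisaq uses may differ):

```python
def stream_key(topic: str, partition: int, prefix: str = "redisaq") -> str:
    """Build a namespaced Redis key for one partition of a topic.

    Illustrative layout only; check redisaq's source for its real scheme.
    """
    return f"{prefix}:{topic}:{partition}"

# Each partition of a topic gets its own stream key:
keys = [stream_key("notifications", p) for p in range(3)]
```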

Warning: Unbounded streams (maxlen=None) can consume significant Redis memory. Set maxlen (e.g., 1000) to limit stream size in production.
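A quick back-of-the-envelope for choosing maxlen: per-stream memory is roughly the entry count times (payload size plus per-entry overhead). The overhead figure below is a rough assumption; use Redis's MEMORY USAGE command on a real stream for accurate numbers:

```python
def stream_memory_estimate(maxlen: int, avg_payload_bytes: int,
                           overhead_bytes: int = 100) -> int:
    """Rough upper bound on one stream's memory when trimmed to maxlen.

    overhead_bytes is a ballpark for Redis per-entry bookkeeping; measure
    with MEMORY USAGE against a real stream before relying on this.
    """
    return maxlen * (avg_payload_bytes + overhead_bytes)

# ~1000 entries of ~500-byte payloads stays well under a megabyte:
estimate = stream_memory_estimate(1000, 500)  # 600_000 bytes
```

Note that approximate trimming (the default behavior mentioned above) lets the stream briefly exceed maxlen, since Redis only trims at radix-tree node boundaries for efficiency.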

Usage

Basic Producer-Consumer Example

from redisaq import Producer, Consumer
import asyncio

async def process_message(message):
    print(f"Processing message {message.msg_id}: {message.payload}")
    await asyncio.sleep(1)  # Simulate work

async def main():
    # Initialize producer with topic and max stream length
    producer = Producer(
        topic="notifications",
        maxlen=1000,
        redis_url="redis://localhost:6379/0"
    )
    await producer.connect()

    # Send some messages
    await producer.batch_enqueue([
        {"type": "email", "to": "user1@example.com", "subject": "Hello"},
        {"type": "sms", "to": "+1234567890", "text": "Hi there"}
    ])

    # Initialize consumer
    consumer = Consumer(
        topic="notifications",
        group_name="notification_processors",
        batch_size=10,
        heartbeat_interval=3.0,
        redis_url="redis://localhost:6379/0"
    )
    
    # Connect and start processing
    await consumer.connect()
    await consumer.process(process_message)

    # Cleanup
    await producer.close()
    await consumer.close()

if __name__ == "__main__":
    asyncio.run(main())

Advanced Usage

Partition Key Routing

from redisaq import Producer, Message

async def send_notifications():
    producer = Producer(topic="notifications", init_partitions=3)
    await producer.connect()

    # Messages with same user_id will go to same partition
    message = Message(
        topic="notifications",
        payload={"user_id": "123", "content": "Hello"},
        partition_key="user_id"
    )
    await producer.enqueue(message)

Batch Processing

from redisaq import Consumer, Message
from typing import List

async def process_batch(messages: List[Message]):
    print(f"Processing batch of {len(messages)} messages")
    for msg in messages:
        # Process each message in the batch
        print(f"Message {msg.msg_id}: {msg.payload}")

async def consume():
    consumer = Consumer(
        topic="notifications",
        batch_size=10,  # Process up to 10 messages at once
        heartbeat_interval=3.0
    )
    await consumer.connect()
    await consumer.process_batch(process_batch)

Custom Serialization

import base64

import msgpack
from redisaq import Producer, Consumer

# Custom serializer/deserializer. msgpack output is binary and generally
# not valid UTF-8, so it is base64-encoded here to round-trip safely
# as a string.
def msgpack_serializer(data):
    return base64.b64encode(msgpack.packb(data)).decode('ascii')

def msgpack_deserializer(data):
    return msgpack.unpackb(base64.b64decode(data))

# Use custom serialization
producer = Producer(
    topic="data",
    serializer=msgpack_serializer
)

consumer = Consumer(
    topic="data",
    deserializer=msgpack_deserializer
)

FastAPI Example

See examples/fastapi for a full-featured FastAPI integration.

Examples

  • Basic Example: Demonstrates batch job production, consumption, rebalancing, and reconsumption. See examples/basic/README.md.
  • FastAPI Integration: Shows how to integrate redisaq with a FastAPI application for job submission and processing. See examples/fastapi/README.md.

Running Tests

poetry run pytest

Contributing

  • Report issues or suggest features via GitHub Issues.
  • Submit pull requests with clear descriptions.

License

MIT
