
A lightweight, async, Redis-based queue for small applications; a simple alternative to Kafka.

Project description

redisaq

redisaq is a Python library for distributed job queuing and processing using Redis Streams. It provides a robust, scalable solution for handling distributed workloads with features like consumer groups, automatic partition rebalancing, and fault tolerance.

Installation

Install redisaq from PyPI:

pip install redisaq

Features

Producer

  • Message Handling:
    • Single message enqueuing: enqueue(payload)
    • Batch operations: batch_enqueue(payloads)
    • Custom partition key support for message routing
    • Configurable message timeouts
  • Stream Management:
    • Configurable stream length (maxlen) and trimming behavior (approximate)
    • Dynamic partition scaling with request_partition_increase()
    • Automatic partition key hashing for load distribution
    • Custom serialization support (default: orjson)
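
The partition-key hashing mentioned above can be sketched in a few lines (a minimal illustration of the idea, not redisaq's actual implementation; the hash function and modulo scheme are assumptions):

```python
import hashlib

def partition_for(key, num_partitions):
    """Map a partition key to a stable partition index.

    Uses a stable digest (md5) so the mapping survives process
    restarts, then reduces it modulo the partition count.
    """
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big") % num_partitions

# Messages sharing a key always land on the same partition,
# which preserves per-key ordering
p1 = partition_for("user:123", 4)
p2 = partition_for("user:123", 4)
print(p1 == p2)  # True
```

Python's built-in hash() is deliberately avoided here because it is randomized per process, which would break cross-process routing.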

Consumer

  • Message Processing:
    • Support for both single and batch message processing
    • Configurable batch size
    • Asynchronous message handling with custom callbacks
    • Automatic message acknowledgment
  • Fault Tolerance:
    • Heartbeat mechanism (configurable interval and TTL)
    • Automatic crash detection
    • Graceful consumer registration and deregistration
    • Partition rebalancing on consumer changes
  • Group Management:
    • Consumer group support with XREADGROUP
    • Dynamic partition assignment
    • Automatic consumer group creation
    • Message tracking and acknowledgment
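
The heartbeat-based crash detection described above can be sketched in plain Python (redisaq itself stores heartbeats in Redis with a TTL; the in-memory registry and names below are illustrative assumptions):

```python
import time

class HeartbeatRegistry:
    """Track consumer liveness via timestamped heartbeats.

    A consumer counts as crashed once its last heartbeat is older
    than the TTL, which is what triggers partition rebalancing.
    """

    def __init__(self, ttl=9.0):
        self.ttl = ttl
        self.last_seen = {}

    def beat(self, consumer_id, now=None):
        # Record the consumer's latest heartbeat timestamp
        self.last_seen[consumer_id] = time.monotonic() if now is None else now

    def alive(self, now=None):
        # Consumers whose last heartbeat falls within the TTL window
        now = time.monotonic() if now is None else now
        return [c for c, t in self.last_seen.items() if now - t <= self.ttl]

reg = HeartbeatRegistry(ttl=9.0)
reg.beat("consumer-a", now=0.0)
reg.beat("consumer-b", now=5.0)
print(reg.alive(now=10.0))  # ['consumer-b']: consumer-a's heartbeat expired
```

A TTL of roughly three heartbeat intervals (here 9.0 s for a 3.0 s interval) is a common rule of thumb so one missed beat does not trigger a spurious rebalance.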

Advanced Features

  • Scalability:
    • Multi-partition support for horizontal scaling
    • Dynamic partition count adjustment
    • Efficient round-robin partition assignment
  • Reliability:
    • Built-in error handling and retries
    • Dead-letter queue support
    • Message persistence via Redis Streams
  • Monitoring:
    • Detailed logging with configurable levels
    • Consumer and producer status tracking
    • Partition assignment monitoring
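
Round-robin partition assignment, as listed above, can be sketched like this (an illustrative sketch; redisaq's actual assignment logic may differ):

```python
def assign_partitions(partitions, consumers):
    """Distribute partitions across consumers round-robin.

    Consumers are sorted so every node computes the same assignment
    independently, and partition counts differ by at most one.
    """
    assignment = {c: [] for c in sorted(consumers)}
    ordered = sorted(assignment)
    for i, p in enumerate(sorted(partitions)):
        assignment[ordered[i % len(ordered)]].append(p)
    return assignment

print(assign_partitions(range(5), ["c1", "c2"]))
# {'c1': [0, 2, 4], 'c2': [1, 3]}
```

When a consumer joins or leaves, recomputing this assignment over the new consumer set is the rebalancing step.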

Technical Details

  • Async Support: Built with asyncio for non-blocking operations
  • Redis Integration: Uses Redis Streams with aioredis
  • Type Safety: Full type hints support
  • Customization: Configurable serialization/deserialization
  • Namespace Management: Automatic key prefixing and organization

Warning: Unbounded streams (maxlen=None) can consume significant Redis memory. Set maxlen (e.g., 1000) to limit stream size in production.
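
In Python terms, a stream capped with maxlen behaves like a bounded buffer that evicts its oldest entries, which collections.deque illustrates (note that Redis's approximate trimming, XADD with MAXLEN ~, may retain slightly more than the limit for efficiency):

```python
from collections import deque

# A capped stream: once maxlen is reached, the oldest entries are evicted
stream = deque(maxlen=1000)
for i in range(1500):
    stream.append({"msg_id": i})

print(len(stream))          # 1000
print(stream[0]["msg_id"])  # 500: the 500 oldest entries were dropped
```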

Usage

Basic Producer-Consumer Example

from redisaq import Producer, Consumer
import asyncio

async def process_message(message):
    print(f"Processing message {message.msg_id}: {message.payload}")
    await asyncio.sleep(1)  # Simulate work

async def main():
    # Initialize producer with topic and max stream length
    producer = Producer(
        topic="notifications",
        maxlen=1000,
        redis_url="redis://localhost:6379/0"
    )
    await producer.connect()

    # Send some messages
    await producer.batch_enqueue([
        {"type": "email", "to": "user1@example.com", "subject": "Hello"},
        {"type": "sms", "to": "+1234567890", "text": "Hi there"}
    ])

    # Initialize consumer
    consumer = Consumer(
        topic="notifications",
        group_name="notification_processors",
        batch_size=10,
        heartbeat_interval=3.0,
        redis_url="redis://localhost:6379/0"
    )
    
    # Connect and start processing; consume() runs until cancelled,
    # so release connections in a finally block
    await consumer.connect()
    try:
        await consumer.consume(process_message)
    finally:
        # Cleanup
        await producer.close()
        await consumer.close()

if __name__ == "__main__":
    asyncio.run(main())

Advanced Usage

Partition Key Routing

from redisaq import Producer

async def send_notifications():
    producer = Producer(topic="notifications", init_partitions=3)
    await producer.connect()

    await producer.enqueue({"user_id": "123", "content": "Hello"})
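
With three partitions as above, a partition key such as user_id decides which partition stream a message lands on. The routing idea can be illustrated as follows (route() is a hypothetical helper for illustration, not the library's API):

```python
import hashlib

def route(partition_key, num_partitions=3):
    """Hash the key and reduce it modulo the partition count."""
    digest = hashlib.sha1(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big") % num_partitions

# All messages for user 123 route to the same partition,
# so events for one user are processed in order
print(route("123") == route("123"))  # True
```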

Batch Processing

import asyncio
from redisaq import Consumer, Message
from typing import List

async def process_batch(messages: List[Message]):
    print(f"Processing batch of {len(messages)} messages")
    for msg in messages:
        # Process each message in the batch
        print(f"Message {msg.msg_id}: {msg.payload}")

consumer = Consumer(
    topic="notifications",
    batch_size=10,  # Process up to 10 messages at once
    heartbeat_interval=3.0
)

async def main():
    await consumer.connect()
    await consumer.consume_batch(process_batch)

asyncio.run(main())

Custom Serialization

import msgpack
from redisaq import Producer, Consumer

# Custom serializer/deserializer. msgpack output is binary and in
# general not valid UTF-8, so pass the raw bytes through rather than
# round-tripping through str (base64-encode if str is required).
def msgpack_serializer(data):
    return msgpack.packb(data)

def msgpack_deserializer(data):
    return msgpack.unpackb(data)

# Use custom serialization
producer = Producer(
    topic="data",
    serializer=msgpack_serializer
)

consumer = Consumer(
    topic="data",
    deserializer=msgpack_deserializer
)

FastAPI Example

See examples/fastapi for a full-featured FastAPI integration.

Examples

  • Basic Example: Demonstrates batch job production, consumption, rebalancing, and reconsumption. See examples/basic/README.md.
  • FastAPI Integration: Shows how to integrate redisaq with a FastAPI application for job submission and processing. See examples/fastapi/README.md.

Running Tests

poetry run pytest

Contributing

  • Report issues or suggest features via GitHub Issues.
  • Submit pull requests with clear descriptions.

License

MIT

Project details


Download files

Download the file for your platform.

Source Distribution

redisaq-0.1.3.tar.gz (13.2 kB)

Uploaded Source

Built Distribution


redisaq-0.1.3-py3-none-any.whl (13.9 kB)

Uploaded Python 3

File details

Details for the file redisaq-0.1.3.tar.gz.

File metadata

  • Download URL: redisaq-0.1.3.tar.gz
  • Upload date:
  • Size: 13.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.5 CPython/3.8.18 Linux/6.11.0-1012-azure

File hashes

Hashes for redisaq-0.1.3.tar.gz
Algorithm Hash digest
SHA256 834b6c9e65b8cfe93e253ae4ca755b5b36e444db90904e8fe144a6f03dbdc256
MD5 35fe0202ccb5aa002f3406b9a5f38d6b
BLAKE2b-256 2e3fb34823f3d05d89d5a4818637e73a8a4afb626c9cab2b75364ff5034e90c4


File details

Details for the file redisaq-0.1.3-py3-none-any.whl.

File metadata

  • Download URL: redisaq-0.1.3-py3-none-any.whl
  • Upload date:
  • Size: 13.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.5 CPython/3.8.18 Linux/6.11.0-1012-azure

File hashes

Hashes for redisaq-0.1.3-py3-none-any.whl
Algorithm Hash digest
SHA256 2481bb4d97d9629460ef8800b988f3840ffedd15b5857c0b1cd1bb32c48cbc38
MD5 64e882a7e8f383f574a1910e86df33e1
BLAKE2b-256 05159bf89800f309300ec97b7368328a8129f3444521e5ccd9814761fa4c2f4d

