
A lightweight, async, Redis-based queue for small applications and a simple alternative to Kafka.


redisaq

redisaq is a Python library for distributed job queuing and processing using Redis Streams. It provides a robust, scalable solution for handling distributed workloads with features like consumer groups, automatic partition rebalancing, and fault tolerance.

Installation

Install redisaq from PyPI:

pip install redisaq

Features

Producer

  • Message Handling:
    • Single message enqueuing: enqueue(payload)
    • Batch operations: batch_enqueue(payloads)
    • Custom partition key support for message routing
    • Configurable message timeouts
  • Stream Management:
    • Configurable stream length (maxlen) and trimming behavior (approximate)
    • Dynamic partition scaling with request_partition_increase()
    • Automatic partition key hashing for load distribution
    • Custom serialization support (default: orjson)
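Hash-based routing means a payload's partition key deterministically selects a partition, so messages sharing a key always land together. redisaq's exact hash function is not documented here; the sketch below uses CRC32 as a stand-in to illustrate the idea (the function name is illustrative, not redisaq's API):

```python
import zlib

def partition_for(key: str, num_partitions: int) -> int:
    # A stable hash of the key, modulo the partition count, so messages
    # with the same key always map to the same partition.
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# Messages for the same user always route to the same partition:
assert partition_for("user:123", 3) == partition_for("user:123", 3)
```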

Consumer

  • Message Processing:
    • Support for both single and batch message processing
    • Configurable batch size
    • Asynchronous message handling with custom callbacks
    • Automatic message acknowledgment
  • Fault Tolerance:
    • Heartbeat mechanism (configurable interval and TTL)
    • Automatic crash detection
    • Graceful consumer registration and deregistration
    • Partition rebalancing on consumer changes
  • Group Management:
    • Consumer group support with XREADGROUP
    • Dynamic partition assignment
    • Automatic consumer group creation
    • Message tracking and acknowledgment
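The heartbeat mechanism above works like this: each consumer periodically refreshes a timestamp, and any consumer whose timestamp is older than the TTL is treated as crashed, triggering a rebalance. A toy in-memory sketch of that liveness check (not redisaq's implementation, which keeps heartbeats in Redis keys with a TTL):

```python
import time

class HeartbeatTracker:
    """Toy sketch of heartbeat-based crash detection."""

    def __init__(self, ttl: float):
        self.ttl = ttl
        self.last_seen = {}

    def beat(self, consumer_id: str) -> None:
        # Called periodically by each live consumer.
        self.last_seen[consumer_id] = time.monotonic()

    def alive(self, consumer_id: str) -> bool:
        # A consumer is alive if it has beaten within the TTL window.
        ts = self.last_seen.get(consumer_id)
        return ts is not None and (time.monotonic() - ts) < self.ttl

tracker = HeartbeatTracker(ttl=10.0)
tracker.beat("consumer-1")
assert tracker.alive("consumer-1")       # recently seen
assert not tracker.alive("consumer-2")   # never seen: treated as dead
```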

Advanced Features

  • Scalability:
    • Multi-partition support for horizontal scaling
    • Dynamic partition count adjustment
    • Efficient round-robin partition assignment
  • Reliability:
    • Built-in error handling and retries
    • Dead-letter queue support
    • Message persistence via Redis Streams
  • Monitoring:
    • Detailed logging with configurable levels
    • Consumer and producer status tracking
    • Partition assignment monitoring
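Round-robin assignment deals partitions out to consumers in turn, so each consumer ends up with roughly the same number. A minimal sketch of the idea (names and shapes are illustrative, not redisaq's internal API):

```python
def assign_partitions(partitions, consumers):
    # Partition i goes to consumer i % len(consumers).
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# Four partitions dealt across two consumers:
print(assign_partitions([0, 1, 2, 3], ["c1", "c2"]))
# {'c1': [0, 2], 'c2': [1, 3]}
```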

Technical Details

  • Async Support: Built with asyncio for non-blocking operations
  • Redis Integration: Uses Redis Streams with aioredis
  • Type Safety: Full type hints support
  • Customization: Configurable serialization/deserialization
  • Namespace Management: Automatic key prefixing and organization

Warning: Unbounded streams (maxlen=None) can consume significant Redis memory. Set maxlen (e.g., 1000) to limit stream size in production.
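The effect of `maxlen` is that of a bounded buffer: once the limit is reached, the oldest entries are dropped to make room (Redis trims approximately for efficiency; the toy below trims exactly). Illustrated with `collections.deque`:

```python
from collections import deque

# Like a stream with maxlen=3: capacity-bounded, oldest entries evicted.
stream = deque(maxlen=3)
for i in range(5):
    stream.append(i)

# Entries 0 and 1 were trimmed once the limit was hit.
assert list(stream) == [2, 3, 4]
```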

Usage

Basic Producer-Consumer Example

from redisaq import Producer, Consumer
import asyncio

async def process_message(message):
    print(f"Processing message {message.msg_id}: {message.payload}")
    await asyncio.sleep(1)  # Simulate work

async def main():
    # Initialize producer with topic and max stream length
    producer = Producer(
        topic="notifications",
        maxlen=1000,
        redis_url="redis://localhost:6379/0"
    )
    await producer.connect()

    # Send some messages
    await producer.batch_enqueue([
        {"type": "email", "to": "user1@example.com", "subject": "Hello"},
        {"type": "sms", "to": "+1234567890", "text": "Hi there"}
    ])

    # Initialize consumer
    consumer = Consumer(
        topic="notifications",
        group_name="notification_processors",
        batch_size=10,
        heartbeat_interval=3.0,
        redis_url="redis://localhost:6379/0"
    )
    
    # Connect and start processing (consume blocks until stopped)
    await consumer.connect()
    try:
        await consumer.consume(process_message)
    finally:
        # Cleanup
        await producer.close()
        await consumer.close()

if __name__ == "__main__":
    asyncio.run(main())

Advanced Usage

Partition Key Routing

from redisaq import Producer

async def send_notifications():
    producer = Producer(topic="notifications", init_partitions=3)
    await producer.connect()

    # With a partition key configured (see "Custom partition key support"
    # above), payloads sharing that key are routed to the same partition
    await producer.enqueue({"user_id": "123", "content": "Hello"})
    await producer.close()

Batch Processing

import asyncio
from redisaq import Consumer, Message
from typing import List

async def process_batch(messages: List[Message]):
    print(f"Processing batch of {len(messages)} messages")
    for msg in messages:
        # Process each message in the batch
        print(f"Message {msg.msg_id}: {msg.payload}")

async def main():
    consumer = Consumer(
        topic="notifications",
        group_name="batch_processors",
        batch_size=10,  # Process up to 10 messages at once
        heartbeat_interval=3.0
    )
    await consumer.connect()
    await consumer.consume_batch(process_batch)

asyncio.run(main())

Custom Serialization

import base64

import msgpack
from redisaq import Producer, Consumer

# Custom serializer/deserializer.
# msgpack output is raw binary, not valid UTF-8, so it is base64-encoded
# here to make the serialized payload a plain string.
def msgpack_serializer(data):
    return base64.b64encode(msgpack.packb(data)).decode('ascii')

def msgpack_deserializer(data):
    return msgpack.unpackb(base64.b64decode(data))

# Use custom serialization
producer = Producer(
    topic="data",
    serializer=msgpack_serializer
)

consumer = Consumer(
    topic="data",
    deserializer=msgpack_deserializer
)
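Whatever pair you plug in, the one contract is that deserialization exactly inverts serialization. A quick round-trip self-check, written with stdlib `json` so it runs without extra dependencies (the function names are illustrative):

```python
import json

def json_serializer(data):
    # Payload -> string
    return json.dumps(data)

def json_deserializer(raw):
    # String -> payload; must invert the serializer exactly
    return json.loads(raw)

payload = {"type": "email", "to": "user1@example.com"}
assert json_deserializer(json_serializer(payload)) == payload
```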

FastAPI Example

See examples/fastapi for a full-featured FastAPI integration.

Examples

  • Basic Example: Demonstrates batch job production, consumption, rebalancing, and reconsumption. See examples/basic/README.md.
  • FastAPI Integration: Shows how to integrate redisaq with a FastAPI application for job submission and processing. See examples/fastapi/README.md.

Running Tests

poetry run pytest

Contributing

  • Report issues or suggest features via GitHub Issues.
  • Submit pull requests with clear descriptions.

License

MIT
