A lightweight, async, Redis-based queue for small applications, offered as an alternative to Kafka.
redisaq
redisaq is a Python library for distributed job queuing and processing using Redis Streams. It provides a robust, scalable solution for handling distributed workloads with features like consumer groups, automatic partition rebalancing, and fault tolerance.
Installation
Install redisaq from PyPI:
pip install redisaq
Features
Producer
- Message Handling:
  - Single message enqueuing: enqueue(payload)
  - Batch operations: batch_enqueue(payloads)
  - Custom partition key support for message routing
  - Configurable message timeouts
- Stream Management:
  - Configurable stream length (maxlen) and trimming behavior (approximate)
  - Dynamic partition scaling with request_partition_increase()
  - Automatic partition key hashing for load distribution
  - Custom serialization support (default: orjson)
Consumer
- Message Processing:
  - Support for both single and batch message processing
  - Configurable batch size
  - Asynchronous message handling with custom callbacks
  - Automatic message acknowledgment
- Fault Tolerance:
  - Heartbeat mechanism (configurable interval and TTL)
  - Automatic crash detection
  - Graceful consumer registration and deregistration
  - Partition rebalancing on consumer changes
- Group Management:
  - Consumer group support with XREADGROUP
  - Dynamic partition assignment
  - Automatic consumer group creation
  - Message tracking and acknowledgment
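The heartbeat and crash-detection items above can be pictured with a small in-memory model. This is only a sketch, not redisaq's implementation: the `HeartbeatRegistry` class and its method names are hypothetical, and a real deployment would presumably store heartbeats in Redis rather than in a Python dict.

```python
import time
from typing import Callable, Dict, List


class HeartbeatRegistry:
    """Hypothetical in-memory model of heartbeat-based crash detection."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl = ttl                      # seconds a heartbeat stays valid
        self._clock = clock                 # injectable clock, handy for tests
        self._last_beat: Dict[str, float] = {}

    def beat(self, consumer_id: str) -> None:
        """Register a consumer or refresh its heartbeat."""
        self._last_beat[consumer_id] = self._clock()

    def deregister(self, consumer_id: str) -> None:
        """Graceful shutdown: forget the consumer immediately."""
        self._last_beat.pop(consumer_id, None)

    def alive(self) -> List[str]:
        """Consumers whose last heartbeat is within the TTL."""
        now = self._clock()
        return sorted(c for c, t in self._last_beat.items() if now - t <= self.ttl)

    def crashed(self) -> List[str]:
        """Consumers whose heartbeat expired without a deregistration."""
        now = self._clock()
        return sorted(c for c, t in self._last_beat.items() if now - t > self.ttl)
```

A consumer that stops calling `beat()` shows up in `crashed()` once the TTL has elapsed, which is the point at which its partitions would need to be reassigned.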
Advanced Features
- Scalability:
  - Multi-partition support for horizontal scaling
  - Dynamic partition count adjustment
  - Efficient round-robin partition assignment
- Reliability:
  - Built-in error handling and retries
  - Dead-letter queue support
  - Message persistence via Redis Streams
- Monitoring:
  - Detailed logging with configurable levels
  - Consumer and producer status tracking
  - Partition assignment monitoring
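Round-robin partition assignment, mentioned above, can be sketched in a few lines. The function name `assign_partitions` is hypothetical and the sorting of consumer IDs is an assumption made here so that every node computes the same result; redisaq's actual rebalancing logic may differ.

```python
from typing import Dict, List


def assign_partitions(num_partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Distribute partition indexes over consumers in round-robin order."""
    ordered = sorted(consumers)  # assumption: a stable order shared by all nodes
    assignment: Dict[str, List[int]] = {c: [] for c in ordered}
    if not ordered:
        return assignment
    for partition in range(num_partitions):
        owner = ordered[partition % len(ordered)]
        assignment[owner].append(partition)
    return assignment


# Rebalancing is simply re-running the assignment with the new consumer list.
before = assign_partitions(4, ["c1", "c2"])
after = assign_partitions(4, ["c1", "c2", "c3"])
print(before)
print(after)
```

Every partition always has exactly one owner, and the per-consumer load differs by at most one partition.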
Technical Details
- Async Support: Built with asyncio for non-blocking operations
- Redis Integration: Uses Redis Streams with aioredis
- Type Safety: Full type hints support
- Customization: Configurable serialization/deserialization
- Namespace Management: Automatic key prefixing and organization
Warning: Unbounded streams (maxlen=None) can consume significant Redis memory. Set maxlen (e.g., 1000) to limit stream size in production.
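The effect of maxlen can be illustrated without a Redis server. The sketch below models a stream as a Python deque; `make_stream` and `append_entry` are hypothetical helpers, not part of redisaq. Note that this model trims exactly, whereas approximate trimming in Redis may keep somewhat more than maxlen entries.

```python
from collections import deque
from typing import Any, Deque, Dict, Optional, Tuple

Entry = Tuple[int, Dict[str, Any]]


def make_stream(maxlen: Optional[int]) -> Deque[Entry]:
    """Model a stream; maxlen=None means unbounded growth."""
    return deque(maxlen=maxlen)


def append_entry(stream: Deque[Entry], payload: Dict[str, Any]) -> int:
    """Append a payload with an increasing ID; the oldest entry is dropped when full."""
    next_id = stream[-1][0] + 1 if stream else 1
    stream.append((next_id, payload))
    return next_id


bounded = make_stream(maxlen=3)
unbounded = make_stream(maxlen=None)
for n in range(5):
    append_entry(bounded, {"n": n})
    append_entry(unbounded, {"n": n})

print(len(bounded), len(unbounded))
```

The bounded stream never holds more than three entries, while the unbounded one grows with every message until something else removes them.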
Usage
Basic Producer-Consumer Example
import asyncio

from redisaq import Producer, Consumer


async def process_message(message):
    print(f"Processing message {message.msg_id}: {message.payload}")
    await asyncio.sleep(1)  # Simulate work


async def main():
    # Initialize producer with topic and max stream length
    producer = Producer(
        topic="notifications",
        maxlen=1000,
        redis_url="redis://localhost:6379/0"
    )
    await producer.connect()

    # Send some messages
    await producer.batch_enqueue([
        {"type": "email", "to": "user1@example.com", "subject": "Hello"},
        {"type": "sms", "to": "+1234567890", "text": "Hi there"}
    ])

    # Initialize consumer
    consumer = Consumer(
        topic="notifications",
        group_name="notification_processors",
        batch_size=10,
        heartbeat_interval=3.0,
        redis_url="redis://localhost:6379/0"
    )

    # Connect and start processing
    await consumer.connect()
    try:
        await consumer.consume(process_message)
    finally:
        # Clean up even if consume() is interrupted
        await producer.close()
        await consumer.close()


if __name__ == "__main__":
    asyncio.run(main())
Advanced Usage
Partition Key Routing
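from redisaq import Producer


async def send_notifications():
    # Create the topic with three partitions
    producer = Producer(topic="notifications", init_partitions=3)
    await producer.connect()
    # Each message is routed to one of the partitions
    await producer.enqueue({"user_id": "123", "content": "Hello"})
    await producer.close()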
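To make the idea of partition key hashing concrete, here is a minimal sketch of mapping a key to a partition index. The function `partition_for` and the choice of CRC32 are assumptions for illustration; this README does not state which hash redisaq uses. Python's built-in `hash()` is avoided because string hashes are randomized per process, which would route the same key differently on different producers.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Map a partition key to an index in [0, num_partitions)."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    # CRC32 is deterministic across processes and machines
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# The same key always lands on the same partition
first = partition_for("123", 3)
second = partition_for("123", 3)
print(first == second)
```

Because the mapping depends on the partition count, increasing the number of partitions changes where some keys land, so per-key ordering is only preserved while the count stays fixed.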
Batch Processing
import asyncio
from typing import List

from redisaq import Consumer, Message


async def process_batch(messages: List[Message]):
    print(f"Processing batch of {len(messages)} messages")
    for msg in messages:
        # Process each message in the batch
        print(f"Message {msg.msg_id}: {msg.payload}")


async def main():
    consumer = Consumer(
        topic="notifications",
        batch_size=10,  # Process up to 10 messages at once
        heartbeat_interval=3.0
    )
    # Connect before consuming, as in the basic example
    await consumer.connect()
    await consumer.consume_batch(process_batch)


asyncio.run(main())
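The phrase "up to 10 messages at once" means a batch can be smaller than batch_size, for example when fewer messages are pending. The grouping can be sketched independently of Redis; `in_batches` is a hypothetical helper, not a redisaq function.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def in_batches(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield lists of at most batch_size items, preserving order."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    batch: List[T] = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        # The final batch may be shorter than batch_size
        yield batch


sizes = [len(b) for b in in_batches(range(25), 10)]
print(sizes)
```

A batch callback should therefore not assume a fixed length and should iterate over whatever it receives.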
Custom Serialization
import base64

import msgpack
from redisaq import Producer, Consumer


# Custom serializer/deserializer
# msgpack output is binary and usually not valid UTF-8, so it is
# base64-encoded here (assuming the serializer must return a str)
def msgpack_serializer(data):
    return base64.b64encode(msgpack.packb(data)).decode("ascii")


def msgpack_deserializer(data):
    return msgpack.unpackb(base64.b64decode(data))


# Use custom serialization
producer = Producer(
    topic="data",
    serializer=msgpack_serializer
)
consumer = Consumer(
    topic="data",
    deserializer=msgpack_deserializer
)
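A custom serializer/deserializer pair is only safe if it round-trips every payload. The standard-library sketch below shows one way to check that before wiring a pair into a Producer and Consumer. It assumes, as the example above suggests, that the serializer returns a str and the deserializer receives one; the function names are hypothetical.

```python
import base64
import json
import zlib
from typing import Any


def compressed_serializer(data: Any) -> str:
    """JSON-encode, compress, then base64-encode so the result is a plain str."""
    raw = zlib.compress(json.dumps(data).encode("utf-8"))
    return base64.b64encode(raw).decode("ascii")


def compressed_deserializer(data: str) -> Any:
    """Reverse compressed_serializer step by step."""
    raw = base64.b64decode(data)
    return json.loads(zlib.decompress(raw).decode("utf-8"))


def round_trips(sample: Any) -> bool:
    """Check that a payload survives serialization unchanged."""
    return compressed_deserializer(compressed_serializer(sample)) == sample


sample = {"user_id": "123", "tags": ["a", "b"], "count": 3}
print(round_trips(sample))
```

Payloads containing types that JSON cannot represent faithfully, such as tuples or bytes, will not round-trip with this pair, which is exactly the kind of issue such a check surfaces early.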
FastAPI Example
See examples/fastapi for a full-featured FastAPI integration.
Examples
- Basic Example: Demonstrates batch job production, consumption, rebalancing, and reconsumption. See examples/basic/README.md.
- FastAPI Integration: Shows how to integrate redisaq with a FastAPI application for job submission and processing. See examples/fastapi/README.md.
Running Tests
poetry run pytest
Contributing
- Report issues or suggest features via GitHub Issues.
- Submit pull requests with clear descriptions.
License
MIT
File details
Details for the file redisaq-0.1.3.tar.gz.
File metadata
- Download URL: redisaq-0.1.3.tar.gz
- Upload date:
- Size: 13.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.5 CPython/3.8.18 Linux/6.11.0-1012-azure
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 834b6c9e65b8cfe93e253ae4ca755b5b36e444db90904e8fe144a6f03dbdc256 |
| MD5 | 35fe0202ccb5aa002f3406b9a5f38d6b |
| BLAKE2b-256 | 2e3fb34823f3d05d89d5a4818637e73a8a4afb626c9cab2b75364ff5034e90c4 |
File details
Details for the file redisaq-0.1.3-py3-none-any.whl.
File metadata
- Download URL: redisaq-0.1.3-py3-none-any.whl
- Upload date:
- Size: 13.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.5 CPython/3.8.18 Linux/6.11.0-1012-azure
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 2481bb4d97d9629460ef8800b988f3840ffedd15b5857c0b1cd1bb32c48cbc38 |
| MD5 | 64e882a7e8f383f574a1910e86df33e1 |
| BLAKE2b-256 | 05159bf89800f309300ec97b7368328a8129f3444521e5ccd9814761fa4c2f4d |