# redisaq

A lightweight, async, Redis-based queue for small applications; an alternative to Kafka.
redisaq is a Python library for distributed job queuing and processing using Redis Streams. It supports consumer groups, partition rebalancing, reconsumption, heartbeats, crash detection, and efficient batch job production.
## Installation

Install redisaq from PyPI:

```bash
pip install redisaq
```
## Features

- Producer:
  - Enqueue individual jobs with `enqueue(payload)` or batches with `batch_enqueue(payloads)` to a specified topic.
  - Configurable stream length (`maxlen`) and approximate trimming behavior via `XADD`.
  - Dynamic partition scaling for load distribution.
  - Uses the `redisaq` prefix by default for streams and keys (e.g., `redisaq:send_email:0`).
- Consumer:
  - Process jobs in consumer groups using `XREADGROUP` and `XACK`.
  - Self-assign partitions with a round-robin strategy.
  - Pause, rebalance, and resume when a consumer joins or crashes.
  - Heartbeats (TTL 10s, interval 5s) for crash detection.
  - Dead-letter queue for failed jobs (`redisaq:dead_letter`).
  - Asynchronous `process_job` function for non-blocking job handling.
- Reconsumption: Create a new consumer group to reprocess all jobs in streams.
- Async: Built with `asyncio` for non-blocking I/O.
- Redis-Compatible: Uses Redis Streams for persistence and coordination.
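The round-robin self-assignment of partitions can be pictured as dealing partitions out to consumers in order. A minimal sketch of that strategy (an illustration only, not redisaq's actual implementation; the `assign_partitions` helper is hypothetical):

```python
def assign_partitions(partitions, consumers):
    """Deal partitions out to consumers in order (round-robin)."""
    assignment = {consumer: [] for consumer in consumers}
    for i, partition in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment


# Five partitions shared by two consumers:
print(assign_partitions([0, 1, 2, 3, 4], ["consumer_1", "consumer_2"]))
# → {'consumer_1': [0, 2, 4], 'consumer_2': [1, 3]}
```

When a consumer joins or crashes, re-running an assignment like this over the surviving consumers yields the rebalanced layout.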
**Warning**: Unbounded streams (`maxlen=None`) can consume significant Redis memory. Set `maxlen` (e.g., `1000`) to limit stream size in production.
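The dead-letter behavior listed under Features amounts to: if the handler raises, divert the job rather than retrying it forever. A hedged sketch of that flow (the `handle` helper and `dead_letter` list are illustrative, not redisaq API):

```python
import asyncio


async def handle(job, process_job, dead_letter):
    """Run the handler; on failure, record the job in the dead-letter list."""
    try:
        await process_job(job)
        return True
    except Exception as exc:
        # In redisaq the failed job would land in the redisaq:dead_letter stream;
        # here a plain list stands in for it.
        dead_letter.append({"job": job, "error": str(exc)})
        return False


async def flaky(job):
    raise ValueError("boom")


dead_letter = []
print(asyncio.run(handle({"id": "1-0"}, flaky, dead_letter)))  # → False
print(dead_letter)  # → [{'job': {'id': '1-0'}, 'error': 'boom'}]
```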
## Usage

```python
import asyncio

from redisaq import Consumer, Producer


async def main():
    # Producer: enqueue a batch of jobs to a capped stream
    producer = Producer(topic="my_topic", maxlen=1000)
    await producer.batch_enqueue([
        {"data": "job1"},
        {"data": "job2"},
    ])

    # Consumer: process jobs asynchronously
    async def process_job(job):
        print(f"Processing message {job.id}: {job.payload}")
        await asyncio.sleep(1)

    consumer = Consumer(
        topic="my_topic",
        group="my_group",
        consumer_id="consumer_1",
        process_job=process_job,
    )
    await consumer.consume()


asyncio.run(main())
```
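Under the hood, each consumer group keeps its own read position and a pending (delivered-but-unacked) entry list, which is also why reconsumption works: a brand-new group starts reading from the beginning of the stream. A toy in-memory model of this `XREADGROUP`/`XACK` cycle (an illustration of Redis Streams semantics, not redisaq internals):

```python
class MiniStream:
    """In-memory toy model of a Redis Stream with consumer groups."""

    def __init__(self):
        self.entries = []   # (entry_id, payload), append-only log
        self.cursor = {}    # group -> index of next undelivered entry
        self.pending = {}   # group -> {entry_id: payload} awaiting ack

    def add(self, payload):
        entry_id = f"{len(self.entries)}-0"
        self.entries.append((entry_id, payload))
        return entry_id

    def readgroup(self, group, count=10):
        """Deliver up to `count` new entries to `group` (like XREADGROUP)."""
        start = self.cursor.get(group, 0)
        batch = self.entries[start:start + count]
        self.cursor[group] = start + len(batch)
        self.pending.setdefault(group, {}).update(dict(batch))
        return batch

    def ack(self, group, entry_id):
        """Mark an entry as processed for `group` (like XACK)."""
        self.pending.get(group, {}).pop(entry_id, None)


s = MiniStream()
s.add({"data": "job1"})
s.add({"data": "job2"})
s.readgroup("group_a")              # delivers both entries
s.ack("group_a", "0-0")             # one entry remains pending
print(list(s.pending["group_a"]))   # → ['1-0']
print(len(s.readgroup("group_b")))  # → 2 (a fresh group re-reads everything)
```

The last line mirrors redisaq's reconsumption feature: reading under a new group name replays the stream from the start.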
## Examples

- Basic Example: Demonstrates batch job production, consumption, rebalancing, and reconsumption. See examples/basic/README.md.
- FastAPI Integration: Shows how to integrate `redisaq` with a FastAPI application for job submission and processing. See examples/fastapi/README.md.
## Running Tests

```bash
poetry run pytest
```
## Contributing

- Report issues or suggest features via GitHub Issues.
- Submit pull requests with clear descriptions.
## License

MIT