# Streamix Queue
Lightweight Redis Streams event messenger for service-to-service communication.
## Features

- **Simple API**: Just `publish()` and `consume()` functions
- **Redis Streams**: Built on battle-tested Redis infrastructure
- **Consumer Groups**: Multiple services can consume the same events with group-based tracking
- **Automatic Retries**: Configurable retry limit with exponential backoff support
- **Dead-Letter Queue**: Failed messages are sent to `<stream>:failed` for inspection
- **Stale Message Recovery**: Automatic reclaim of messages from crashed consumers
- **Structured Schema**: Messages include `id`, `event`, `data`, `retries`, and `timestamps`
- **Type Hints**: Full Python type annotations for IDE support
- **Minimal Dependencies**: Only the Redis client is required
## Installation

```bash
pip install streamix-queue
```
## Quick Start
### Consumer (Service A)

```python
from streamix_queue import consume

def on_user_created(data):
    print(f"User created: {data['user_id']}")

# If the handler raises an exception, the message is automatically retried
consume(
    "user.created",
    on_user_created,
    redis_url="redis://localhost:6379/0",
    stream="app.events",
    group="service-a",
)
```
### Publisher (Service B)

```python
from streamix_queue import publish

publish(
    "user.created",
    {"user_id": "123", "email": "alice@example.com"},
    redis_url="redis://localhost:6379/0",
    stream="app.events",
    group="service-a",
)
```
## How It Works

1. **Publish**: Event is sent to the Redis Stream with a structured schema
2. **Consumer Group**: Maintains message delivery state and ownership
3. **Processing**: Consumer reads and processes events
4. **Success**: Message is acknowledged (removed from the pending list)
5. **Failure**: On exception, the message is retried; once the limit is exceeded, it is moved to the DLQ
6. **Dead-Letter**: Permanently failed messages are stored in `<stream>:failed` for debugging
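The retry-versus-dead-letter decision in the last three steps can be sketched as a small pure function. This is a hypothetical illustration of the flow described above, not the library's actual internal code:

```python
def next_action(retries: int, retry_limit: int = 3) -> str:
    """Decide what a consumer does with a message whose handler raised.

    Hypothetical sketch of the failure flow above, not streamix-queue's code.
    """
    if retries < retry_limit:
        return "retry"        # re-deliver with an incremented retry count
    return "dead-letter"      # move to <stream>:failed

print(next_action(0))  # retry
print(next_action(3))  # dead-letter
```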
## API Reference

### `publish(event, data, **kwargs)`

Publish an event to the stream.
**Parameters:**

- `event` (str): Event name (e.g., `"user.created"`)
- `data` (dict): Event payload
- `redis_url` (str, default `"redis://localhost:6379/0"`): Redis connection URL
- `stream` (str, default `"app.events"`): Stream name
- `group` (str, default `"app.workers"`): Consumer group name

**Returns:** a `StreamMessage` object with `id`, `event`, `data`, `retries`, and `timestamps`
### `consume(event, handler, **kwargs)`

Start a consumer that listens for events.

**Parameters:**

- `event` (str): Event name to listen for
- `handler` (callable): Function called with the message data (or the full message object if it accepts 2+ args)
- `redis_url` (str, default `"redis://localhost:6379/0"`): Redis connection URL
- `stream` (str, default `"app.events"`): Stream name
- `group` (str, default `"app.workers"`): Consumer group name
- `consumer` (str, optional): Consumer instance name (auto-generated if `None`)
- `retry_limit` (int, default `3`): Max retries before sending to the DLQ
- `batch_size` (int, default `10`): Messages per batch
- `block_ms` (int, default `5000`): Blocking timeout for `XREADGROUP`
- `claim_idle_ms` (int, default `60000`): Idle time threshold for stale message reclaim
**Handler signature:**

```python
# Simple - receives data only
def handler(data):
    pass

# Advanced - receives data and the full message
def handler(data, message):
    print(message.id)       # Message UUID
    print(message.retries)  # Retry count
    print(message.event)    # Original event name
```
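The two handler shapes suggest the library inspects a handler's parameter count before calling it. A minimal sketch of how such dispatch can work with the standard `inspect` module (an assumption for illustration, not necessarily how streamix-queue implements it):

```python
import inspect

def call_handler(handler, data, message):
    """Call handler with data only, or with (data, message) if it accepts 2+ args."""
    params = inspect.signature(handler).parameters
    if len(params) >= 2:
        return handler(data, message)
    return handler(data)

# A one-argument handler only ever sees the payload dict.
result = call_handler(lambda d: d["user_id"], {"user_id": "123"}, object())
print(result)  # 123
```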
## Configuration Examples

### Multiple consumers for the same event

```python
# Service A
consume("order.placed", on_order_placed, group="service-a", consumer="worker-1")

# Service B - same event, different group
consume("order.placed", on_order_placed_b, group="service-b", consumer="worker-1")
```
### Different streams per environment

```python
# Dev
publish("user.updated", {...}, stream="dev.events", group="dev-workers")

# Prod
publish("user.updated", {...}, stream="prod.events", group="prod-workers")
```
### Adjust retry behavior

```python
consume(
    "payment.processed",
    handle_payment,
    retry_limit=5,         # More retries
    block_ms=10000,        # Longer blocking timeout
    claim_idle_ms=120000,  # Reclaim after 2 minutes
)
```
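The "exponential backoff support" listed in the features typically means the delay before each re-delivery doubles with the retry count. A sketch of such a schedule, with an assumed base delay and cap (the library's actual formula and parameter names may differ):

```python
def backoff_delay(retries: int, base_ms: int = 1000, cap_ms: int = 60_000) -> int:
    """Delay before the next retry: base * 2**retries, capped at cap_ms."""
    return min(base_ms * (2 ** retries), cap_ms)

# Delays for retries 0..4 with a 1s base: doubles each attempt.
print([backoff_delay(r) for r in range(5)])  # [1000, 2000, 4000, 8000, 16000]
```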
## Message Schema

Every message follows this structure:

```json
{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "event": "user.created",
  "data": {
    "user_id": "123",
    "email": "alice@example.com"
  },
  "retries": 0,
  "timestamps": {
    "created_at": "2026-04-24T17:45:00+00:00",
    "updated_at": "2026-04-24T17:45:00+00:00"
  }
}
```
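For code outside this library, the schema above maps naturally onto a small dataclass. The sketch below is a hypothetical mirror of the documented fields, not the `StreamMessage` class shipped with streamix-queue:

```python
from dataclasses import dataclass, field

@dataclass
class MessageRecord:
    """Mirror of the documented message schema (hypothetical helper)."""
    id: str
    event: str
    data: dict
    retries: int = 0
    timestamps: dict = field(default_factory=dict)

    @classmethod
    def from_dict(cls, raw: dict) -> "MessageRecord":
        return cls(
            id=raw["id"],
            event=raw["event"],
            data=raw.get("data", {}),
            retries=raw.get("retries", 0),
            timestamps=raw.get("timestamps", {}),
        )

msg = MessageRecord.from_dict({
    "id": "550e8400-e29b-41d4-a716-446655440000",
    "event": "user.created",
    "data": {"user_id": "123"},
    "retries": 0,
    "timestamps": {"created_at": "2026-04-24T17:45:00+00:00"},
})
print(msg.event)  # user.created
```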
## Error Handling & Dead-Letter Queue

By default, messages are retried up to 3 times. After exceeding the retry limit, they're sent to the dead-letter stream:

**DLQ Stream:** `<stream>:failed` (e.g., `app.events:failed`)
**DLQ Message Example:**

```json
{
  "id": "...",
  "event": "user.created",
  "data": {
    "original_id": "550e8400-...",
    "original_event": "user.created",
    "original_data": {"user_id": "123"},
    "source_stream_id": "1713982500001-0",
    "retries": 3,
    "error": "Traceback: Connection timeout..."
  },
  "retries": 3,
  "timestamps": {...}
}
```
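As the example shows, a dead-letter entry wraps the original message under `data` and records the failing stream entry and error. A sketch of how such a payload could be assembled, assuming the field layout above (not the library's actual code):

```python
def build_dlq_payload(original: dict, source_stream_id: str, error: str) -> dict:
    """Wrap a permanently failed message for the <stream>:failed stream."""
    return {
        "original_id": original["id"],
        "original_event": original["event"],
        "original_data": original.get("data", {}),
        "source_stream_id": source_stream_id,
        "retries": original.get("retries", 0),
        "error": error,
    }

payload = build_dlq_payload(
    {"id": "abc", "event": "user.created", "data": {"user_id": "123"}, "retries": 3},
    "1713982500001-0",
    "Traceback: Connection timeout...",
)
print(payload["source_stream_id"])  # 1713982500001-0
```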
## Running in Production

### Docker Example

```dockerfile
FROM python:3.12-slim

WORKDIR /app
RUN pip install streamix-queue
COPY handlers.py .

CMD ["python", "handlers.py"]
```
### Kubernetes Example

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: streamix-consumer
spec:
  containers:
    - name: consumer
      image: myapp:latest
      env:
        - name: REDIS_URL
          value: "redis://redis:6379/0"
        - name: STREAM
          value: "app.events"
        - name: GROUP
          value: "service-a"
```
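The pod above assumes `handlers.py` reads its connection settings from the environment. One way to wire that up (a sketch; the env-var names match the pod spec, and the fallbacks are the library's documented defaults):

```python
import os

def load_config() -> dict:
    # Fall back to the documented defaults when a variable is unset.
    return {
        "redis_url": os.environ.get("REDIS_URL", "redis://localhost:6379/0"),
        "stream": os.environ.get("STREAM", "app.events"),
        "group": os.environ.get("GROUP", "app.workers"),
    }

def main() -> None:
    from streamix_queue import consume  # imported lazily; only needed at runtime

    def on_user_created(data):
        print(f"User created: {data['user_id']}")

    consume("user.created", on_user_created, **load_config())
```

`handlers.py` would end with a standard `if __name__ == "__main__": main()` guard so the container command starts the consumer loop.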
## Performance Tips

- **Batch Size**: Increase `batch_size` for high throughput (10-50)
- **Block Timeout**: Increase `block_ms` to reduce CPU usage (5000-30000)
- **Consumer Instances**: Run multiple consumers in the same group for parallel processing
- **Redis Persistence**: Enable AOF/RDB for durability
## Troubleshooting

### Messages stuck in pending

Check the consumer group's pending entries:

```python
from redis import Redis

r = Redis.from_url("redis://localhost:6379/0")
pending = r.xpending("app.events", "service-a")
print(pending)
```
### Inspect the dead-letter stream

```python
from redis import Redis

r = Redis.from_url("redis://localhost:6379/0")
failed = r.xread({"app.events:failed": "0"}, count=10)
for stream, messages in failed:
    for msg_id, data in messages:
        print(msg_id, data)
```
## License

MIT License - see the LICENSE file for details.
## Contributing

Contributions are welcome! Please feel free to submit a pull request.
## Support

For issues, questions, or feature requests, please open an issue on GitHub.