Streamix Queue

Python 3.10+ · MIT License

Lightweight Redis Streams event messenger for service-to-service communication.

Features

  • Simple API: Just publish() and consume() functions
  • Redis Streams: Built on battle-tested Redis infrastructure
  • Consumer Groups: Multiple services can consume the same events with group-based tracking
  • Automatic Retries: Configurable retry limit with exponential backoff support
  • Dead-Letter Queue: Failed messages sent to <stream>:failed for inspection
  • Stale Message Recovery: Automatic reclaim of messages from crashed consumers
  • Structured Schema: Messages include id, event, data, retries, and timestamps
  • Type Hints: Full Python type annotations for IDE support
  • Minimal Dependencies: Only Redis client required

Installation

pip install streamix-queue

Quick Start

Consumer Service A

from streamix_queue import consume

def on_user_created(data):
    print(f"User created: {data['user_id']}")
    # If handler raises an exception, message is automatically retried

consume(
    "user.created",
    on_user_created,
    redis_url="redis://localhost:6379/0",
    stream="app.events",
    group="service-a",
)

Publisher Service B

from streamix_queue import publish

publish(
    "user.created",
    {"user_id": "123", "email": "alice@example.com"},
    redis_url="redis://localhost:6379/0",
    stream="app.events",
    group="service-a",
)

How It Works

  1. Publish: Event sent to Redis Stream with structured schema
  2. Consumer Group: Maintains message delivery state and ownership
  3. Processing: Consumer reads and processes events
  4. Success: Message acknowledged (removed from pending list)
  5. Failure: On exception, the message is retried; once the retry limit is exceeded, it is moved to the DLQ
  6. Dead-Letter: Permanently failed messages stored in <stream>:failed for debugging
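Steps 4-6 boil down to a three-way decision per delivery attempt. A minimal sketch of that decision in plain Python (the function name is illustrative, not part of streamix-queue's API; the default limit of 3 and the <stream>:failed convention come from the docs above):

```python
def route_message(retries: int, handler_failed: bool, retry_limit: int = 3) -> str:
    """Decide what happens to a message after one delivery attempt."""
    if not handler_failed:
        return "ack"            # success: acknowledged, removed from the pending list
    if retries + 1 > retry_limit:
        return "dead-letter"    # limit exceeded: moved to <stream>:failed
    return "retry"              # redelivered with an incremented retry counter
```

With the default limit, a message that fails on every attempt is retried three times and dead-lettered on the fourth failure.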

API Reference

publish(event, data, **kwargs)

Publish an event to the stream.

Parameters:

  • event (str): Event name (e.g., "user.created")
  • data (dict): Event payload
  • redis_url (str, default="redis://localhost:6379/0"): Redis connection URL
  • stream (str, default="app.events"): Stream name
  • group (str, default="app.workers"): Consumer group name

Returns: StreamMessage object with id, event, data, retries, timestamps

consume(event, handler, **kwargs)

Start a consumer that listens for events.

Parameters:

  • event (str): Event name to listen for
  • handler (callable): Function called with the message data; if it accepts two or more arguments, it also receives the full message object
  • redis_url (str, default="redis://localhost:6379/0"): Redis connection URL
  • stream (str, default="app.events"): Stream name
  • group (str, default="app.workers"): Consumer group name
  • consumer (str, optional): Consumer instance name (auto-generated if None)
  • retry_limit (int, default=3): Max retries before sending to DLQ
  • batch_size (int, default=10): Messages per batch
  • block_ms (int, default=5000): Blocking timeout for XREADGROUP
  • claim_idle_ms (int, default=60000): Idle time threshold for stale message reclaim

Handler signature:

# Simple - receives data only
def handler(data):
    pass

# Advanced - receives data and full message
def handler(data, message):
    print(message.id)       # Message UUID
    print(message.retries)  # Retry count
    print(message.event)    # Original event name

Configuration Examples

Multiple consumers for same event

# Service A
consume("order.placed", on_order_placed, group="service-a", consumer="worker-1")

# Service B - same event, different group
consume("order.placed", on_order_placed_b, group="service-b", consumer="worker-1")

Different streams per environment

# Dev
publish("user.updated", {...}, stream="dev.events", group="dev-workers")

# Prod
publish("user.updated", {...}, stream="prod.events", group="prod-workers")

Adjust retry behavior

consume(
    "payment.processed",
    handle_payment,
    retry_limit=5,  # More retries
    block_ms=10000,  # Longer blocking timeouts
    claim_idle_ms=120000,  # Reclaim after 2 minutes
)

Message Schema

Every message follows this structure:

{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "event": "user.created",
  "data": {
    "user_id": "123",
    "email": "alice@example.com"
  },
  "retries": 0,
  "timestamps": {
    "created_at": "2026-04-24T17:45:00+00:00",
    "updated_at": "2026-04-24T17:45:00+00:00"
  }
}
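As a sketch, the required shape can be checked with plain Python. The field names come from the schema above; the validator itself is illustrative, not part of the library:

```python
REQUIRED_KEYS = {"id", "event", "data", "retries", "timestamps"}

def is_valid_message(msg: dict) -> bool:
    """Return True if `msg` has the documented top-level structure."""
    return (
        REQUIRED_KEYS <= msg.keys()
        and isinstance(msg["data"], dict)
        and isinstance(msg["retries"], int)
        and {"created_at", "updated_at"} <= msg["timestamps"].keys()
    )
```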

Error Handling & Dead-Letter Queue

By default, messages are retried up to 3 times. After exceeding the retry limit, they're sent to the dead-letter stream:

DLQ Stream: <stream>:failed (e.g., app.events:failed)

DLQ Message Example:

{
  "id": "...",
  "event": "user.created",
  "data": {
    "original_id": "550e8400-...",
    "original_event": "user.created",
    "original_data": {"user_id": "123"},
    "source_stream_id": "1713982500001-0",
    "retries": 3,
    "error": "Traceback: Connection timeout..."
  },
  "retries": 3,
  "timestamps": {...}
}

Running in Production

Docker Example

FROM python:3.12-slim

WORKDIR /app
RUN pip install streamix-queue

COPY handlers.py .

CMD ["python", "handlers.py"]

Kubernetes Example

apiVersion: v1
kind: Pod
metadata:
  name: streamix-consumer
spec:
  containers:
  - name: consumer
    image: myapp:latest
    env:
    - name: REDIS_URL
      value: "redis://redis:6379/0"
    - name: STREAM
      value: "app.events"
    - name: GROUP
      value: "service-a"

Performance Tips

  1. Batch Size: Increase batch_size for high throughput (10-50)
  2. Block Timeout: Increase block_ms to reduce CPU usage (5000-30000)
  3. Consumer Instances: Run multiple consumers in the same group for parallel processing
  4. Redis Persistence: Enable AOF/RDB for durability

Troubleshooting

Messages stuck in pending

Check the consumer group pending entries:

from redis import Redis

r = Redis.from_url("redis://localhost:6379/0", decode_responses=True)
# XPENDING summary: total pending count, min/max stream IDs, per-consumer counts
pending = r.xpending("app.events", "service-a")
print(pending)

Inspect dead-letter stream

from redis import Redis

r = Redis.from_url("redis://localhost:6379/0", decode_responses=True)
# Read up to 10 entries from the start of the dead-letter stream ("0" = beginning)
failed = r.xread({"app.events:failed": "0"}, count=10)
for stream, messages in failed:
    for msg_id, data in messages:
        print(msg_id, data)

License

MIT License - see LICENSE file for details

Contributing

Contributions welcome! Please feel free to submit a Pull Request.

Support

For issues, questions, or feature requests, please open an issue on GitHub.
