A simple RabbitMQ/Pika wrapper for publishing and consuming events
Project description
tchu
tchu is a lightweight Python wrapper around Pika/RabbitMQ that simplifies event publishing and consuming in distributed systems. It provides intuitive abstractions for common messaging patterns while handling the underlying RabbitMQ connection management.
Features
- Simple API for publishing events and consuming them
- ThreadedConsumer for concurrent message processing
- RPC-style messaging with request-response pattern support
- Automatic retries with configurable backoff
- Message deduplication support with optional cache integration
- Idle handlers for periodic maintenance tasks
- Comprehensive logging of all messaging operations
Installation
pip install tchu
Usage
Producer: Publishing Events
from tchu import Producer
# Initialize a producer
producer = Producer(
amqp_url="amqp://guest:guest@localhost:5672/",
exchange="my-exchange",
exchange_type="topic"
)
# Publish a message
producer.publish(
routing_key="user.created",
body={"user_id": "123", "name": "John Doe", "email": "john@example.com"}
)
# Publish a message and wait for a response (RPC-style)
try:
response = producer.call(
routing_key="user.validate",
body={"user_id": "123", "email": "john@example.com"},
timeout=5 # seconds
)
print(f"Response received: {response}")
except TimeoutError:
print("No response received within timeout period")
Consumer: Processing Events
Basic Consumer
from tchu import Consumer
def message_handler(ch, method, properties, body, is_rpc):
print(f"Received message: {body}")
if is_rpc:
# For RPC calls, return a response
return json.dumps({"status": "success", "message": "Validation completed"})
# Initialize a consumer
consumer = Consumer(
amqp_url="amqp://guest:guest@localhost:5672/",
exchange="my-exchange",
exchange_type="topic",
routing_keys=["user.*"], # Listen to all user events
callback=message_handler,
prefetch_count=10 # Process up to 10 messages at once
)
# Start consuming messages
consumer.run()
Threaded Consumer with Django Management Command
# management/commands/listen_for_events.py
from tchu import ThreadedConsumer
from django.core.management.base import BaseCommand
from django.conf import settings
import json
def event_callback(ch, method, properties, body, is_rpc):
try:
print(f"Received event: {method.routing_key}")
data = json.loads(body)
# Process the event data
# ...
print("Event processed successfully")
except Exception as e:
print(f"Error processing event: {e}")
class Command(BaseCommand):
help = "Starts a listener for RabbitMQ events"
def handle(self, *args, **options):
consumer = ThreadedConsumer(
amqp_url=settings.RABBITMQ_BROKER_URL,
exchange="app-events",
exchange_type="topic",
threads=5, # Use 5 worker threads
routing_keys=["user.*", "order.created"],
callback=event_callback,
)
# Start consuming messages in a separate thread
consumer.start()
# Keep the main thread running
self.stdout.write("Event listener started. Press Ctrl+C to stop.")
try:
consumer.join()
except KeyboardInterrupt:
self.stdout.write("Stopping event listener...")
Advanced Features
Using with Cache for Message Deduplication
from django.core.cache import cache
from tchu import ThreadedConsumer
# Cache adapter that implements the required interface
class DjangoCache:
def add(self, key, value, timeout=300):
return cache.add(key, value, timeout)
# Initialize consumer with cache
consumer = ThreadedConsumer(
amqp_url="amqp://guest:guest@localhost:5672/",
exchange="my-exchange",
exchange_type="topic",
routing_keys=["user.*"],
callback=message_handler,
cache=DjangoCache(),
cache_key_prefix="myapp" # Prefix for cache keys
)
Idle Handler for Periodic Tasks
def maintenance_task():
print("Performing periodic maintenance...")
# Clean up resources, update statistics, etc.
consumer = Consumer(
# ... other parameters
idle_handler=maintenance_task,
idle_interval=3600 # Run maintenance every hour
)
API Reference
AMQPClient
Base class for both Producer and Consumer.
__init__(amqp_url="amqp://guest:guest@localhost:5672/")setup_exchange(exchange, exchange_type)close()
Producer
__init__(amqp_url, exchange, exchange_type)publish(routing_key, body, content_type, delivery_mode)call(routing_key, body, content_type, delivery_mode, timeout)
Consumer
__init__(amqp_url, exchange, exchange_type, threads, routing_keys, callback, idle_handler, idle_interval, prefetch_count, cache, cache_key_prefix)run()
ThreadedConsumer
Extends Consumer to run in a separate thread.
Development
- Clone the repository
- Install dependencies:
poetry install - Run tests:
poetry run pytest
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
License
This project is licensed under the MIT License - see the LICENSE file for details.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file tchu-0.1.2.tar.gz.
File metadata
- Download URL: tchu-0.1.2.tar.gz
- Upload date:
- Size: 10.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.1.4 CPython/3.11.3 Darwin/24.5.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
afbd60b1e3de9445ad2da7f3381a2fc29d4a73585296c0cffd48c9b3e06c56fc
|
|
| MD5 |
e83ba50a93c6c1a219061f6d27e57fdb
|
|
| BLAKE2b-256 |
f4f8f2975508bf75b83496454dac2a9751be3b3e97f521d51fa759e64a76a543
|
File details
Details for the file tchu-0.1.2-py3-none-any.whl.
File metadata
- Download URL: tchu-0.1.2-py3-none-any.whl
- Upload date:
- Size: 12.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.1.4 CPython/3.11.3 Darwin/24.5.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5c4ef01228e0b12eb38e26a1295d58f65633b14412c1fa97d000bdf017733308
|
|
| MD5 |
438e068e16f0d555c7ee40bcca36264d
|
|
| BLAKE2b-256 |
e2a156508dab4d5fdc19e0d9d29e8935339b84eb8cde0cbab6ce2cbb7aed20e0
|