Skip to main content

A simple RabbitMQ/Pika wrapper for publishing and consuming events

Project description

tchu

tchu is a lightweight Python wrapper around Pika/RabbitMQ that simplifies event publishing and consuming in distributed systems. It provides intuitive abstractions for common messaging patterns while handling the underlying RabbitMQ connection management.

License: MIT PyPI version

Features

  • Simple API for publishing events and consuming them
  • ThreadedConsumer for concurrent message processing
  • RPC-style messaging with request-response pattern support
  • Automatic retries with configurable backoff
  • Message deduplication support with optional cache integration
  • Idle handlers for periodic maintenance tasks
  • Comprehensive logging of all messaging operations

Installation

pip install tchu

Usage

Producer: Publishing Events

from tchu import Producer

# Initialize a producer
producer = Producer(
    amqp_url="amqp://guest:guest@localhost:5672/",
    exchange="my-exchange",
    exchange_type="topic"
)

# Publish a message
producer.publish(
    routing_key="user.created",
    body={"user_id": "123", "name": "John Doe", "email": "john@example.com"}
)

# Publish a message and wait for a response (RPC-style)
try:
    response = producer.call(
        routing_key="user.validate",
        body={"user_id": "123", "email": "john@example.com"},
        timeout=5  # seconds
    )
    print(f"Response received: {response}")
except TimeoutError:
    print("No response received within timeout period")

Consumer: Processing Events

Basic Consumer

from tchu import Consumer

def message_handler(ch, method, properties, body, is_rpc):
    print(f"Received message: {body}")
    if is_rpc:
        # For RPC calls, return a response
        return json.dumps({"status": "success", "message": "Validation completed"})

# Initialize a consumer
consumer = Consumer(
    amqp_url="amqp://guest:guest@localhost:5672/",
    exchange="my-exchange",
    exchange_type="topic",
    routing_keys=["user.*"],  # Listen to all user events
    callback=message_handler,
    prefetch_count=10  # Process up to 10 messages at once
)

# Start consuming messages
consumer.run()

Threaded Consumer with Django Management Command

# management/commands/listen_for_events.py
from tchu import ThreadedConsumer
from django.core.management.base import BaseCommand
from django.conf import settings
import json

def event_callback(ch, method, properties, body, is_rpc):
    try:
        print(f"Received event: {method.routing_key}")
        data = json.loads(body)
        
        # Process the event data
        # ...
        
        print("Event processed successfully")
    except Exception as e:
        print(f"Error processing event: {e}")


class Command(BaseCommand):
    help = "Starts a listener for RabbitMQ events"

    def handle(self, *args, **options):
        consumer = ThreadedConsumer(
            amqp_url=settings.RABBITMQ_BROKER_URL,
            exchange="app-events",
            exchange_type="topic",
            threads=5,  # Use 5 worker threads
            routing_keys=["user.*", "order.created"],
            callback=event_callback,
        )
        
        # Start consuming messages in a separate thread
        consumer.start()
        
        # Keep the main thread running
        self.stdout.write("Event listener started. Press Ctrl+C to stop.")
        try:
            consumer.join()
        except KeyboardInterrupt:
            self.stdout.write("Stopping event listener...")

Advanced Features

Using with Cache for Message Deduplication

from django.core.cache import cache
from tchu import ThreadedConsumer

# Cache adapter that implements the required interface
class DjangoCache:
    def add(self, key, value, timeout=300):
        return cache.add(key, value, timeout)

# Initialize consumer with cache
consumer = ThreadedConsumer(
    amqp_url="amqp://guest:guest@localhost:5672/",
    exchange="my-exchange",
    exchange_type="topic",
    routing_keys=["user.*"],
    callback=message_handler,
    cache=DjangoCache(),
    cache_key_prefix="myapp"  # Prefix for cache keys
)

Idle Handler for Periodic Tasks

def maintenance_task():
    print("Performing periodic maintenance...")
    # Clean up resources, update statistics, etc.

consumer = Consumer(
    # ... other parameters
    idle_handler=maintenance_task,
    idle_interval=3600  # Run maintenance every hour
)

API Reference

AMQPClient

Base class for both Producer and Consumer.

  • __init__(amqp_url="amqp://guest:guest@localhost:5672/")
  • setup_exchange(exchange, exchange_type)
  • close()

Producer

  • __init__(amqp_url, exchange, exchange_type)
  • publish(routing_key, body, content_type, delivery_mode)
  • call(routing_key, body, content_type, delivery_mode, timeout)

Consumer

  • __init__(amqp_url, exchange, exchange_type, threads, routing_keys, callback, idle_handler, idle_interval, prefetch_count, cache, cache_key_prefix)
  • run()

ThreadedConsumer

Extends Consumer to run in a separate thread.

Development

  1. Clone the repository
  2. Install dependencies: poetry install
  3. Run tests: poetry run pytest

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

tchu-0.1.2.tar.gz (10.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

tchu-0.1.2-py3-none-any.whl (12.0 kB view details)

Uploaded Python 3

File details

Details for the file tchu-0.1.2.tar.gz.

File metadata

  • Download URL: tchu-0.1.2.tar.gz
  • Upload date:
  • Size: 10.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.4 CPython/3.11.3 Darwin/24.5.0

File hashes

Hashes for tchu-0.1.2.tar.gz
Algorithm Hash digest
SHA256 afbd60b1e3de9445ad2da7f3381a2fc29d4a73585296c0cffd48c9b3e06c56fc
MD5 e83ba50a93c6c1a219061f6d27e57fdb
BLAKE2b-256 f4f8f2975508bf75b83496454dac2a9751be3b3e97f521d51fa759e64a76a543

See more details on using hashes here.

File details

Details for the file tchu-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: tchu-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 12.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.4 CPython/3.11.3 Darwin/24.5.0

File hashes

Hashes for tchu-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 5c4ef01228e0b12eb38e26a1295d58f65633b14412c1fa97d000bdf017733308
MD5 438e068e16f0d555c7ee40bcca36264d
BLAKE2b-256 e2a156508dab4d5fdc19e0d9d29e8935339b84eb8cde0cbab6ce2cbb7aed20e0

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page