A simple RabbitMQ/Pika wrapper for publishing and consuming events

tchu

tchu is a lightweight Python wrapper around Pika/RabbitMQ that simplifies event publishing and consuming in distributed systems. It provides intuitive abstractions for common messaging patterns while handling the underlying RabbitMQ connection management.

Features

  • Simple API for publishing events and consuming them
  • ThreadedConsumer for concurrent message processing
  • RPC-style messaging with request-response pattern support
  • Automatic retries with configurable backoff
  • Message deduplication support with optional cache integration
  • Idle handlers for periodic maintenance tasks
  • Comprehensive logging of all messaging operations

Installation

pip install tchu

Usage

Producer: Publishing Events

from tchu import Producer

# Initialize a producer
producer = Producer(
    amqp_url="amqp://guest:guest@localhost:5672/",
    exchange="my-exchange",
    exchange_type="topic"
)

# Publish a message
producer.publish(
    routing_key="user.created",
    body={"user_id": "123", "name": "John Doe", "email": "john@example.com"}
)

# Publish a message and wait for a response (RPC-style)
try:
    response = producer.call(
        routing_key="user.validate",
        body={"user_id": "123", "email": "john@example.com"},
        timeout=5  # seconds
    )
    print(f"Response received: {response}")
except TimeoutError:
    print("No response received within timeout period")

Consumer: Processing Events

Basic Consumer

import json

from tchu import Consumer

def message_handler(ch, method, properties, body, is_rpc):
    print(f"Received message: {body}")
    if is_rpc:
        # For RPC calls, return a response
        return json.dumps({"status": "success", "message": "Validation completed"})

# Initialize a consumer
consumer = Consumer(
    amqp_url="amqp://guest:guest@localhost:5672/",
    exchange="my-exchange",
    exchange_type="topic",
    routing_keys=["user.*"],  # Match keys such as "user.created" ("*" matches exactly one word)
    callback=message_handler,
    prefetch_count=10  # Allow up to 10 unacknowledged messages in flight
)

# Start consuming messages
consumer.run()
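
The binding pattern "user.*" above follows RabbitMQ topic-exchange rules: "*" stands for exactly one dot-separated word and "#" for zero or more words. The sketch below mirrors that matching in plain Python so you can check which routing keys a pattern would receive. It is illustrative only; topic_matches is a hypothetical helper, not part of tchu or Pika, and the broker performs the real matching.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if routing_key matches an AMQP topic binding pattern.

    Hypothetical helper for illustration; RabbitMQ does this on the broker.
    """

    def match(pattern_words, key_words):
        if not pattern_words:
            # The pattern is exhausted: match only if the key is too.
            return not key_words
        head, rest = pattern_words[0], pattern_words[1:]
        if head == "#":
            # "#" may absorb zero or more words of the key.
            return any(
                match(rest, key_words[i:]) for i in range(len(key_words) + 1)
            )
        if not key_words:
            return False
        if head == "*" or head == key_words[0]:
            # "*" absorbs exactly one word; a literal must be equal.
            return match(rest, key_words[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))


if __name__ == "__main__":
    for key in ("user.created", "user.profile.updated", "order.created"):
        print(key, topic_matches("user.*", key), topic_matches("user.#", key))
```

In particular, "user.*" does not match a three-word key such as "user.profile.updated"; bind "user.#" if you want every key that starts with "user".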

Threaded Consumer with Django Management Command

# management/commands/listen_for_events.py
from tchu import ThreadedConsumer
from django.core.management.base import BaseCommand
from django.conf import settings
import json

def event_callback(ch, method, properties, body, is_rpc):
    try:
        print(f"Received event: {method.routing_key}")
        data = json.loads(body)
        
        # Process the event data
        # ...
        
        print("Event processed successfully")
    except Exception as e:
        print(f"Error processing event: {e}")


class Command(BaseCommand):
    help = "Starts a listener for RabbitMQ events"

    def handle(self, *args, **options):
        consumer = ThreadedConsumer(
            amqp_url=settings.RABBITMQ_BROKER_URL,
            exchange="app-events",
            exchange_type="topic",
            threads=5,  # Use 5 worker threads
            routing_keys=["user.*", "order.created"],
            callback=event_callback,
        )
        
        # Start consuming messages in a separate thread
        consumer.start()
        
        # Keep the main thread running
        self.stdout.write("Event listener started. Press Ctrl+C to stop.")
        try:
            consumer.join()
        except KeyboardInterrupt:
            self.stdout.write("Stopping event listener...")

Advanced Features

Using with Cache for Message Deduplication

from django.core.cache import cache
from tchu import ThreadedConsumer

# Cache adapter that implements the required interface
class DjangoCache:
    def add(self, key, value, timeout=300):
        return cache.add(key, value, timeout)

# Initialize consumer with cache
consumer = ThreadedConsumer(
    amqp_url="amqp://guest:guest@localhost:5672/",
    exchange="my-exchange",
    exchange_type="topic",
    routing_keys=["user.*"],
    callback=message_handler,  # handler defined as in the Basic Consumer example
    cache=DjangoCache(),
    cache_key_prefix="myapp"  # Prefix for cache keys
)
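
Outside Django, any object with a compatible add method should be able to stand in for the adapter. The sketch below is a minimal in-memory version. InMemoryCache and its clock parameter are hypothetical names, and it assumes tchu relies on the same contract as Django's cache.add: return True when the key was stored, False when an unexpired entry already exists.

```python
import threading
import time


class InMemoryCache:
    """Hypothetical per-process cache exposing add(key, value, timeout)."""

    def __init__(self, clock=time.monotonic):
        self._entries = {}  # key -> (value, expiry time)
        self._lock = threading.Lock()  # worker threads may call add() concurrently
        self._clock = clock  # injectable so expiry can be tested

    def add(self, key, value, timeout=300):
        now = self._clock()
        with self._lock:
            entry = self._entries.get(key)
            if entry is not None and entry[1] > now:
                # Key already present and not expired: treat as a duplicate.
                return False
            self._entries[key] = (value, now + timeout)
            return True


if __name__ == "__main__":
    cache = InMemoryCache()
    print(cache.add("myapp:message-1", True))  # first time the key is seen
    print(cache.add("myapp:message-1", True))  # same key again
```

An in-memory cache is only visible to one process, so it cannot deduplicate across several consumer processes; a shared backend such as the Django cache above is the better fit there.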

Idle Handler for Periodic Tasks

def maintenance_task():
    print("Performing periodic maintenance...")
    # Clean up resources, update statistics, etc.

consumer = Consumer(
    # ... other parameters
    idle_handler=maintenance_task,
    idle_interval=3600  # Run maintenance every hour
)

API Reference

AMQPClient

Base class for both Producer and Consumer.

  • __init__(amqp_url="amqp://guest:guest@localhost:5672/")
  • setup_exchange(exchange, exchange_type)
  • close()

Producer

  • __init__(amqp_url, exchange, exchange_type)
  • publish(routing_key, body, content_type, delivery_mode)
  • call(routing_key, body, content_type, delivery_mode, timeout)

Consumer

  • __init__(amqp_url, exchange, exchange_type, threads, routing_keys, callback, idle_handler, idle_interval, prefetch_count, cache, cache_key_prefix)
  • run()

ThreadedConsumer

Extends Consumer to run in a separate thread. As in the Django example above, start it with start() and block until it finishes with join().

Development

  1. Clone the repository
  2. Install dependencies: poetry install
  3. Run tests: poetry run pytest

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.
