
tchu-tchu

A modern Celery-based messaging library with true broadcast support for microservices. Designed to replicate the simplicity of the original tchu library while leveraging Celery workers for execution.


🚀 v2.4.0 Released - Native Celery retry support via celery_options! Pass autoretry_for, retry_backoff, and max_retries through TchuEvent or @subscribe - no Celery imports needed in your app! See Celery Retry Options →

🚨 v2.2.30 Released - CRITICAL FIX for ServerlessProducer bytes encoding! If you are using v2.2.29, upgrade immediately. See Serverless Guide →

🚀 v2.2.26 Released - Extended Celery class with a cleaner API! Import Celery from tchu-tchu for seamless integration. See Quick Start →

❌ Getting "No handlers found" errors? See Troubleshooting Guide →

Features

  • ✨ True Broadcast Messaging - One event reaches multiple microservices
  • 🚀 Uses Existing Celery Workers - No separate listener process needed
  • 🎯 Topic-based Routing - RabbitMQ topic exchange with wildcard patterns
  • 🔄 Drop-in Replacement - Compatible with the original tchu API
  • 📦 Pydantic Integration - Type-safe event serialization
  • 🛡️ Django Support - Built-in Django REST Framework integration with one-line setup
  • ⚡ Fast - No task discovery or inspection overhead
  • 🎨 Simple - One function call replaces 60+ lines of boilerplate configuration
  • 🔁 Native Celery Retry - Pass celery_options for automatic retries with exponential backoff

Architecture

How It Works:

┌─────────────┐
│  Publisher  │──┐
│   (Any App) │  │
└─────────────┘  │
                 ▼
         ┌──────────────┐
         │Topic Exchange│ (RabbitMQ)
         │ tchu_events  │
         └──────┬───────┘
                │
        ┌───────┴───────┐
        │               │
        ▼               ▼
   ┌────────┐      ┌────────┐
   │Queue 1 │      │Queue 2 │
   │(App A) │      │(App B) │
   └───┬────┘      └───┬────┘
       │               │
       ▼               ▼
   [Worker A]      [Worker B]
   Receives event  Receives event

Each microservice:

  1. Creates its own unique queue
  2. Binds the queue to the topic exchange with routing key patterns
  3. Runs Celery workers that consume from the queue
  4. As a result, all services with matching patterns receive the same event simultaneously

RPC vs Broadcast Events

tchu-tchu supports two messaging patterns:

🔄 Broadcast Events (Pub/Sub)

One publisher โ†’ Multiple subscribers receive the same event

# Publisher
client.publish('user.created', {'user_id': 123})

# Subscriber 1 (notifications service)
@subscribe('user.created')
def send_welcome_email(event):
    send_email(event['user_id'])

# Subscriber 2 (analytics service)
@subscribe('user.created')
def track_signup(event):
    track_event(event['user_id'])

Use for: Event notifications, logging, analytics, cache invalidation

⚡ RPC (Request-Response)

One caller โ†’ One responder โ†’ Returns result

# Caller
result = client.call('rpc.app_name.documents.list', {'company_id': 67})
print(result)  # Returns document list

# Responder (only data-room service handles this)
@subscribe('rpc.app_name.documents.list')
def list_documents(data):
    docs = Document.objects.filter(company_id=data['company_id'])
    return [doc.to_dict() for doc in docs]  # Returned to caller

Use for: Querying data, validation, calculations, inter-service API calls

Key Difference: Broadcast events go to ALL subscribers; RPC calls go to ONE handler and return a response.

Native Celery Retry Support

🆕 v2.4.0 - Pass Celery retry options through tchu-tchu without importing Celery in your consuming app!

Via TchuEvent

from tchu_tchu.events import TchuEvent

class DataExchangeRunInitiatedEvent(TchuEvent):
    class Meta:
        topic = "data_exchange.run.initiated"
        request_serializer_class = DataExchangeRunSerializer

def execute_data_exchange_run_task(event):
    # Your handler - just a regular function
    run_id = event.get('run_id')
    # ... do work that might fail ...

# Pass celery_options - tchu-tchu handles everything internally!
DataExchangeRunInitiatedEvent(
    handler=execute_data_exchange_run_task,
    celery_options={
        "autoretry_for": (ConnectionError, TimeoutError, IOError),
        "retry_backoff": True,           # Exponential backoff
        "retry_backoff_max": 600,        # Max 10 minutes between retries
        "retry_jitter": True,            # Add randomness to prevent thundering herd
        "max_retries": 5,                # Maximum retry attempts
    }
).subscribe()

Via @subscribe Decorator

from tchu_tchu.subscriber import subscribe

@subscribe(
    'data_exchange.run.initiated',
    celery_options={
        "autoretry_for": (ConnectionError, TimeoutError),
        "retry_backoff": True,
        "retry_backoff_max": 600,
        "max_retries": 5,
    }
)
def my_handler(event):
    # Your handler logic
    ...

Supported celery_options

| Option | Type | Description |
|--------|------|-------------|
| autoretry_for | tuple | Exception classes that trigger automatic retry |
| retry_backoff | bool/int | Enable exponential backoff (True or base multiplier) |
| retry_backoff_max | int | Maximum backoff time in seconds (default: 600) |
| retry_jitter | bool | Add randomness to backoff to prevent thundering herd |
| max_retries | int | Maximum number of retry attempts |
| default_retry_delay | int | Default delay between retries in seconds |
| acks_late | bool | Acknowledge the message only after the task completes |
| reject_on_worker_lost | bool | Reject the task if the worker dies |
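These options combine as in Celery's documented exponential-backoff formula. The helper below is a hedged sketch of that computation for intuition only (backoff_delay is not a tchu-tchu or Celery function):

```python
import random

def backoff_delay(retries, factor=1, maximum=600, jitter=True):
    """Approximate the delay Celery computes under retry_backoff.

    retries=0 is the first retry. Delay grows exponentially, capped at
    retry_backoff_max; with retry_jitter a random value in [0, delay]
    is used instead, to avoid thundering-herd retries.
    """
    countdown = min(maximum, factor * (2 ** retries))
    if jitter:
        # Full jitter: pick a random delay in [0, countdown]
        countdown = random.randrange(countdown + 1)
    return countdown

# Without jitter the delays grow 1s, 2s, 4s, 8s, 16s ... capped at 600s
print([backoff_delay(r, jitter=False) for r in range(5)])  # [1, 2, 4, 8, 16]
```

With retry_jitter enabled (the example configurations above), each worker waits a different, random fraction of that delay.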

How It Works

When you pass celery_options, tchu-tchu dynamically creates a Celery task with those native options and dispatches your handler via .delay(). Your consuming app never needs to import or configure Celery directly - everything goes through tchu-tchu!

Benefits:

  • ✅ No Celery imports in your consuming app
  • ✅ Full native retry support (exponential backoff, jitter, etc.)
  • ✅ Per-handler configuration - different retry strategies for different events
  • ✅ Clean separation - tchu-tchu handles the Celery complexity

Installation

pip install tchu-tchu

Quick Start

Django + Celery (Simplified Setup)

🚀 tchu-tchu v2.2.26+ provides an extended Celery class for the cleanest API:

# myapp/celery.py
import os
import django

from tchu_tchu.django import Celery  # Import extended Celery from tchu-tchu
from tchu_tchu.events import TchuEvent

# 1. Initialize Django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "settings.production")
django.setup()

# 2. Create Celery app (with tchu-tchu extensions)
app = Celery("my_app")
app.config_from_object("django.conf:settings", namespace="CELERY")

# 3. Optional: Set context helper for Django request reconstruction
def create_django_context(event_data):
    from types import SimpleNamespace
    user_data = event_data.get("user")
    if not user_data:
        return {}
    mock_request = SimpleNamespace()
    mock_request.user = SimpleNamespace(**user_data)
    return {"request": mock_request}

TchuEvent.set_context_helper(create_django_context)

# 4. Configure message broker in one method call! 🎉
app.message_broker(
    queue_name="my_queue",
    include=[
        "app1.subscribers",
        "app2.subscribers",
        "app3.subscribers",
    ],
)

That's it! This automatically:

  • ✅ Imports all subscriber modules after Django is ready
  • ✅ Collects all routing keys from @subscribe decorators
  • ✅ Creates queue bindings to the tchu_events exchange
  • ✅ Configures Celery queues and task routes
  • ✅ Sets up cross-service RPC messaging
  • ✅ Creates the dispatcher task

Auto-Discovery: Alternatively, omit the include argument to message_broker() and list your subscriber modules in Celery's own include parameter:

from tchu_tchu.django import Celery

# Specify full module paths in Celery's 'include'
app = Celery("my_app", include=["app1.subscribers", "app2.subscribers", "app3.subscribers"])
app.config_from_object("django.conf:settings", namespace="CELERY")

# Uses modules from Celery's include parameter
app.message_broker(queue_name="my_queue")

Alternative: You can still use the standalone setup_celery_queue() function:

from celery import Celery  # Standard Celery
from tchu_tchu.django import setup_celery_queue

app = Celery("my_app")
app.config_from_object("django.conf:settings", namespace="CELERY")

setup_celery_queue(
    app,
    queue_name="my_queue",
    subscriber_modules=["app1.subscribers", "app2.subscribers"],
)

Settings needed in Django:

# settings/base.py
INSTALLED_APPS = [
    # ... other apps ...
    "myapp.apps.MyAppConfig",  # Use explicit AppConfig path
]

# Make sure you have explicit AppConfig paths for your main apps
# This avoids "Apps aren't loaded yet" errors

📖 For complete documentation on the extended Celery class, see EXTENDED_CELERY_USAGE.md

Generic Celery Setup (Non-Django)

For non-Django apps, or if you prefer manual configuration:

# myapp/celery.py
from celery import Celery
from kombu import Exchange, Queue, binding
from tchu_tchu import create_topic_dispatcher, get_subscribed_routing_keys
from tchu_tchu.events import TchuEvent

app = Celery('myapp')

# Configure context helper (optional)
def my_context_helper(event_data):
    """Reconstruct request context from event data."""
    from types import SimpleNamespace
    user = event_data.get('user')
    if user:
        mock_request = SimpleNamespace()
        mock_request.user = SimpleNamespace(**user)
        return {'request': mock_request}
    return {}

TchuEvent.set_context_helper(my_context_helper)

# Import subscribers FIRST so @subscribe decorators run
app.autodiscover_tasks(['myapp.subscribers'])

# Auto-configure queue bindings from subscribed routing keys
tchu_exchange = Exchange('tchu_events', type='topic', durable=True)

# ✅ IMPORTANT: Pass celery_app to force immediate handler registration
all_routing_keys = get_subscribed_routing_keys(celery_app=app)

# Build bindings automatically
all_bindings = [
    binding(tchu_exchange, routing_key=key) 
    for key in all_routing_keys
]

# Configure queues
app.conf.task_queues = (
    Queue(
        'myapp_queue',
        exchange=tchu_exchange,
        bindings=all_bindings,
        durable=True,
        auto_delete=False,
    ),
)

# Route dispatcher to the queue
app.conf.task_routes = {
    'tchu_tchu.dispatch_event': {'queue': 'myapp_queue'},
}

# Create the dispatcher task
dispatcher = create_topic_dispatcher(app)

Key Points:

  • ✅ Always pass celery_app=app to get_subscribed_routing_keys() - this ensures handlers are registered before queue configuration
  • ✅ Queue bindings are automatically generated from your @subscribe decorators
  • ✅ Both broadcast events and RPC calls can use the same queue

2. Subscribe to Events

# myapp/subscribers.py
from tchu_tchu import subscribe

@subscribe('user.created')
def handle_user_created(event_data):
    """This handler will be called when user.created events are published"""
    print(f"User created: {event_data}")
    # Process the event...

@subscribe('user.*')  # Wildcard pattern
def handle_any_user_event(event_data):
    """This handler receives all user.* events"""
    print(f"User event: {event_data}")

3. Publish Events

# In any microservice
from tchu_tchu import TchuClient

client = TchuClient()

# Publish an event (all subscribed apps receive it)
client.publish('user.created', {
    'user_id': 123,
    'email': 'user@example.com',
    'name': 'John Doe'
})

# RPC call (request-response pattern)
try:
    result = client.call('user.validate', {
        'email': 'user@example.com'
    }, timeout=5)
    print(f"Validation result: {result}")
except TimeoutError:
    print("No response within timeout")

Multiple Microservices Example

App 1: User Service (Publisher + Subscriber)

# users/celery.py
from celery import Celery
from kombu import Exchange, Queue, binding
from tchu_tchu import create_topic_dispatcher, get_subscribed_routing_keys

app = Celery('users')
app.autodiscover_tasks(['users.subscribers'])

tchu_exchange = Exchange('tchu_events', type='topic', durable=True)

# Auto-collect routing keys from handlers
all_routing_keys = get_subscribed_routing_keys(celery_app=app)
all_bindings = [binding(tchu_exchange, routing_key=key) for key in all_routing_keys]

app.conf.task_queues = (
    Queue(
        'users_queue',
        exchange=tchu_exchange,
        bindings=all_bindings,
        durable=True,
    ),
)

app.conf.task_routes = {
    'tchu_tchu.dispatch_event': {'queue': 'users_queue'},
}

dispatcher = create_topic_dispatcher(app)

# users/subscribers.py
from tchu_tchu import subscribe

@subscribe('user.deleted')
def cleanup_user_data(event):
    print(f"Cleaning up data for user {event['user_id']}")

# users/views.py
from tchu_tchu import TchuClient

client = TchuClient()

def create_user(request):
    user = User.objects.create(...)
    
    # Publish event - all apps receive it!
    client.publish('user.created', {
        'user_id': user.id,
        'email': user.email
    })
    
    return Response(...)

App 2: Notifications Service (Subscriber Only)

# notifications/celery.py
from celery import Celery
from kombu import Exchange, Queue, binding
from tchu_tchu import create_topic_dispatcher, get_subscribed_routing_keys

app = Celery('notifications')
app.autodiscover_tasks(['notifications.subscribers'])

tchu_exchange = Exchange('tchu_events', type='topic', durable=True)

# Auto-collect routing keys from handlers
all_routing_keys = get_subscribed_routing_keys(celery_app=app)
all_bindings = [binding(tchu_exchange, routing_key=key) for key in all_routing_keys]

app.conf.task_queues = (
    Queue(
        'notifications_queue',
        exchange=tchu_exchange,
        bindings=all_bindings,
        durable=True,
    ),
)

app.conf.task_routes = {
    'tchu_tchu.dispatch_event': {'queue': 'notifications_queue'},
}

dispatcher = create_topic_dispatcher(app)

# notifications/subscribers.py
from tchu_tchu import subscribe

@subscribe('user.created')
def send_welcome_email(event):
    print(f"Sending welcome email to {event['email']}")
    # Send email...

App 3: Analytics Service (Subscriber Only)

# analytics/celery.py
from celery import Celery
from kombu import Exchange, Queue, binding
from tchu_tchu import create_topic_dispatcher, get_subscribed_routing_keys

app = Celery('analytics')
app.autodiscover_tasks(['analytics.subscribers'])

tchu_exchange = Exchange('tchu_events', type='topic', durable=True)

# Auto-collect routing keys from handlers
all_routing_keys = get_subscribed_routing_keys(celery_app=app)
all_bindings = [binding(tchu_exchange, routing_key=key) for key in all_routing_keys]

app.conf.task_queues = (
    Queue(
        'analytics_queue',
        exchange=tchu_exchange,
        bindings=all_bindings,
        durable=True,
    ),
)

app.conf.task_routes = {
    'tchu_tchu.dispatch_event': {'queue': 'analytics_queue'},
}

dispatcher = create_topic_dispatcher(app)

# analytics/subscribers.py
from tchu_tchu import subscribe

@subscribe('user.*')  # All user events
def track_user_event(event):
    print(f"Tracking event: {event}")
    # Store in analytics DB...

Routing Key Patterns

RabbitMQ topic exchanges support powerful routing patterns using wildcards:

Wildcard Types

  • * (star) - Matches exactly one word

    • user.* matches user.created, user.updated, user.deleted
    • *.created matches user.created, order.created, etc.
    • Does NOT match user.profile.updated (two words after user)
  • # (hash) - Matches zero or more words

    • user.# matches user.created, user.profile.updated, user.anything.else.here
    • # matches ALL events
    • order.# matches order.created, order.payment.completed, order.shipping.tracking.updated
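The two wildcard rules can be expressed as a small matcher. This is an illustrative reimplementation of RabbitMQ's topic-matching semantics, not code from tchu-tchu or RabbitMQ:

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Check an AMQP topic pattern against a routing key:
    '*' matches exactly one word, '#' matches zero or more words."""
    def match(pat, words):
        if not pat:
            return not words  # pattern exhausted: key must be too
        head, rest = pat[0], pat[1:]
        if head == "#":
            # '#' may absorb zero or more leading words
            return any(match(rest, words[i:]) for i in range(len(words) + 1))
        if not words:
            return False
        return head in ("*", words[0]) and match(rest, words[1:])
    return match(pattern.split("."), routing_key.split("."))

print(topic_matches("user.*", "user.created"))          # True
print(topic_matches("user.*", "user.profile.updated"))  # False
print(topic_matches("user.#", "user.profile.updated"))  # True
```

Running your intended patterns through a checker like this is a quick way to verify queue bindings before deploying them.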

Examples

| Pattern | Matches | Doesn't Match |
|---------|---------|---------------|
| user.created | user.created | user.updated, order.created |
| user.* | user.created, user.deleted | user.profile.updated |
| user.# | user.created, user.profile.updated | order.created |
| *.created | user.created, order.created | user.updated |
| # | Everything | Nothing |
| rpc.app_name.* | rpc.app_name.documents, rpc.app_name.exports | rpc.app_name.documents.list |

RPC Naming Convention

By convention, RPC routing keys start with rpc.:

  • rpc.app_name.documents.list - RPC call to data-room service
  • rpc.app_name.products.validate - RPC call to app_name service
  • app_name.sub_app_or_model.order.enriched - Broadcast event (not RPC)

This helps distinguish between point-to-point RPC calls and broadcast events.

Serverless Environments (Cloud Functions, Lambda)

v2.2.30+ includes a serverless producer that works in serverless environments where Celery's connection pooling doesn't work well.

Why a Serverless Producer?

  • Cloud Functions are short-lived and don't maintain persistent connections
  • Celery uses connection pooling, which expects long-running processes
  • The serverless producer uses pika directly (like the original tchu library)
  • Creates short-lived connections that work perfectly in serverless environments

Usage in Cloud Functions

# cloud_function/main.py
import os
from tchu_tchu.serverless_producer import ServerlessClient

# Get broker URL from environment
BROKER_URL = os.environ.get('RABBITMQ_URL', 'amqp://guest:guest@localhost:5672//')

def publish_event(request):
    """Cloud Function that publishes events to tchu-tchu."""
    # Create client with broker URL
    client = ServerlessClient(broker_url=BROKER_URL)
    
    # Publish event
    client.publish('coolset.data_room_consumer.data_point.submitted', {
        'data_point_id': 123,
        'value': 456.78,
        'timestamp': '2024-11-06T10:30:00Z'
    })
    
    # Close connection (optional, but recommended)
    client.close()
    
    return {'status': 'published'}

def publish_with_context_manager(request):
    """Using context manager (automatically closes connection)."""
    with ServerlessClient(broker_url=BROKER_URL) as client:
        client.publish('user.created', {'user_id': 123})
    
    return {'status': 'published'}

Using ServerlessProducer Directly

from tchu_tchu.serverless_producer import ServerlessProducer

# Create producer
producer = ServerlessProducer(
    broker_url='amqp://user:pass@rabbitmq-host:5672//',
    exchange_name='tchu_events',  # Optional, default: 'tchu_events'
    connection_timeout=10,  # Optional, default: 10 seconds
)

# Publish messages
message_id = producer.publish(
    routing_key='user.created',
    body={'user_id': 123},
    delivery_mode=2,  # 1=non-persistent, 2=persistent
)

# Close when done
producer.close()

Configuration

Environment Variables:

# Set RabbitMQ connection URL
export RABBITMQ_URL="amqp://username:password@rabbitmq-host:5672//"

Requirements:

tchu-tchu>=2.2.30

Network Access Requirements

Your cloud function must have network access to RabbitMQ:

Google Cloud Functions:

# Create VPC connector
gcloud compute networks vpc-access connectors create rabbitmq-connector \
    --region=us-central1 \
    --network=your-vpc \
    --range=10.8.0.0/28

# Deploy function with VPC access
gcloud functions deploy my-function \
    --vpc-connector=rabbitmq-connector \
    --set-env-vars RABBITMQ_URL="amqp://user:pass@internal-ip:5672//"

AWS Lambda:

  • Place Lambda in same VPC as RabbitMQ (or use Amazon MQ)
  • Configure security groups to allow access to RabbitMQ ports

Azure Functions:

  • Use VNet Integration to connect to RabbitMQ
  • Or use Azure Service Bus as an alternative

Comparison: Serverless vs Celery-based

| Feature | ServerlessClient | TchuClient (Celery) |
|---------|------------------|---------------------|
| Use Case | Cloud Functions, Lambda | Long-running services |
| Connection | Short-lived | Persistent pool |
| Dependencies | pika only | Celery + kombu |
| Overhead | Minimal | Higher (connection pooling) |
| Publish Support | ✅ Yes | ✅ Yes |
| RPC Support | ❌ No | ✅ Yes |
| Subscribe Support | ❌ No (publish-only) | ✅ Yes |

Key Difference: ServerlessClient is publish-only and perfect for serverless environments where you only need to send events, not consume them.

RPC (Request-Response) Pattern

tchu-tchu supports RPC-style calls where you send a message and wait for a response:

from tchu_tchu import TchuClient, subscribe

# Publisher (any app)
client = TchuClient()

try:
    result = client.call('order.calculate_total', {
        'items': [{'id': 1, 'quantity': 2}],
        'discount_code': 'SAVE10'
    }, timeout=10)
    
    print(f"Order total: ${result['total']}")
except TimeoutError:
    print("No response within 10 seconds")
except Exception as e:
    print(f"RPC call failed: {e}")

# Subscriber (handler app)
@subscribe('order.calculate_total')
def calculate_order_total(data):
    items = data['items']
    total = sum(item['quantity'] * get_price(item['id']) for item in items)
    
    # Apply discount if provided
    if data.get('discount_code'):
        total = apply_discount(total, data['discount_code'])
    
    # Return value will be sent back to caller
    return {'total': total, 'currency': 'USD'}

Important Notes:

  • RPC calls are point-to-point (only one worker processes the request)
  • The first handler to respond wins (if multiple handlers exist)
  • Requires a result backend (Redis, database, etc.) configured in Celery
  • Use appropriate timeouts to avoid hanging requests

RPC Error Handling with TchuRPCException

Use TchuRPCException to return business logic errors as responses (not exceptions). Define separate serializers for success and error cases:

from tchu_tchu import TchuEvent, TchuRPCException
from rest_framework import serializers

# Define your serializers
class RequestSerializer(serializers.Serializer):
    user_id = serializers.IntegerField()
    amount = serializers.DecimalField(max_digits=10, decimal_places=2)

class SuccessResponseSerializer(serializers.Serializer):
    transaction_id = serializers.CharField()
    amount = serializers.DecimalField(max_digits=10, decimal_places=2)
    status = serializers.CharField()

class ErrorResponseSerializer(serializers.Serializer):
    success = serializers.BooleanField()
    error_message = serializers.CharField()
    error_code = serializers.CharField()
    error_details = serializers.JSONField(required=False, default=dict)

# Define your event
class ProcessPaymentEvent(TchuEvent):
    class Meta:
        topic = "rpc.payments.process"
        request_serializer_class = RequestSerializer
        response_serializer_class = SuccessResponseSerializer
        error_serializer_class = ErrorResponseSerializer

# Handler - raise TchuRPCException for business logic errors
def process_payment_handler(event):
    user_id = event["user_id"]
    amount = event["amount"]
    
    # Business validation - raise TchuRPCException
    if amount <= 0:
        raise TchuRPCException(
            message="Amount must be positive",
            code="INVALID_AMOUNT",
            details={"amount": str(amount)}
        )
    
    if not user_exists(user_id):
        raise TchuRPCException(
            message="User not found",
            code="USER_NOT_FOUND",
            details={"user_id": user_id}
        )
    
    # Process and return success response
    transaction = charge_user(user_id, amount)
    return {
        "transaction_id": transaction.id,
        "amount": transaction.amount,
        "status": "completed"
    }

ProcessPaymentEvent(handler=process_payment_handler).subscribe()

# Caller - check for error_code to detect errors
event = ProcessPaymentEvent()
event.serialize_request({"user_id": 123, "amount": 50.00})
response = event.call(timeout=30)

if "error_code" in response:
    print(f"Failed: {response['error_message']} ({response['error_code']})")
else:
    print(f"Success: {response['transaction_id']}")

Benefits:

  • Clean separation: Success and error responses have separate schemas
  • No mixed serializers: No need to make all fields optional
  • Errors as responses: Business logic errors returned as proper responses (not exceptions)
  • Better validation: Each response type is validated against its own serializer
  • No timeouts: Errors don't cause "No handlers found" or timeout errors
  • Clear error handling: Caller can distinguish between business logic errors and system failures

Error Response Structure:

{
    "success": False,
    "error_message": "User not found",
    "error_code": "USER_NOT_FOUND",
    "error_details": {"user_id": 123}
}

Success Response Structure:

{
    "transaction_id": "txn_123",
    "amount": 50.00,
    "status": "completed"
}

Note: If you don't define error_serializer_class, error responses will still work but won't be validated.

Custom RPC Exceptions

Create domain-specific exceptions by subclassing TchuRPCException and overriding to_response_dict():

from tchu_tchu import TchuRPCException

class PaymentException(TchuRPCException):
    """Custom payment error with specific fields."""
    
    def __init__(self, payment_id: str, error_type: str, message: str, retry_after: int = None):
        self.payment_id = payment_id
        self.error_type = error_type
        self.retry_after = retry_after
        super().__init__(message=message, code=error_type)
    
    def to_response_dict(self) -> dict:
        """Return custom response structure."""
        response = {
            "payment_id": self.payment_id,
            "error_type": self.error_type,
            "message": self.message,
        }
        if self.retry_after:
            response["retry_after"] = self.retry_after
        return response

# Handler
def payment_handler(event):
    if insufficient_funds():
        raise PaymentException(
            payment_id="pay_123",
            error_type="INSUFFICIENT_FUNDS",
            message="Your account balance is too low",
            retry_after=3600  # Retry after 1 hour
        )
    # ...

# Define matching error serializer
class PaymentErrorSerializer(serializers.Serializer):
    payment_id = serializers.CharField()
    error_type = serializers.CharField()
    message = serializers.CharField()
    retry_after = serializers.IntegerField(required=False)

Exception Guide

Use TchuRPCException for:

  • ✅ Business logic errors in RPC handlers
  • ✅ Validation failures (user input, business rules)
  • ✅ Expected error conditions (user not found, insufficient funds)
  • 🎯 Effect: Returns an error response to the caller (not an exception)

if not user_exists(user_id):
    raise TchuRPCException("User not found", code="USER_NOT_FOUND")

Use other exceptions for:

  • ✅ System/infrastructure errors (database down, broker unavailable)
  • ✅ Unexpected errors (bugs, null pointers)
  • ✅ Broadcast event handler failures (no caller to return to)
  • 🎯 Effect: Propagates as an exception, and the RPC call times out

from tchu_tchu.utils.error_handling import SerializationError, PublishError

# Serialization errors
if not serializer.is_valid():
    raise SerializationError(f"Invalid data: {serializer.errors}")

# Broker errors
if not broker.is_connected():
    raise PublishError("Failed to connect to RabbitMQ")

Framework Integration

Custom Context Reconstruction

tchu-tchu is framework-agnostic. To integrate with your framework's auth/context system, provide a context helper:

from tchu_tchu.events import TchuEvent

# Define your context reconstruction logic
def my_context_helper(event_data):
    """
    Reconstruct request context from event data.
    
    Args:
        event_data: Dict containing fields from your event
        
    Returns:
        Context dict for use with serializers
    """
    user = event_data.get('user')
    if not user:
        return {}
    
    # Reconstruct your framework's request/context object
    # Example for Django:
    from types import SimpleNamespace
    mock_request = SimpleNamespace()
    mock_request.user = SimpleNamespace(**user)
    return {'request': mock_request}

# Set globally (affects all events)
TchuEvent.set_context_helper(my_context_helper)

# Or per-instance
event = MyEvent(context_helper=my_context_helper)

โš ๏ธ Important: Context helpers must be regular functions, not bound methods. If you encounter TypeError: context_helper() takes 1 positional argument but 2 were given, ensure you're passing a function, not a method:

# โŒ WRONG - Don't pass bound methods
class MyClass:
    def my_helper(self, event_data):
        return {}

obj = MyClass()
TchuEvent.set_context_helper(obj.my_helper)  # Will pass (self, event_data)

# ✅ CORRECT - Use standalone functions
def my_helper(event_data):
    return {}

TchuEvent.set_context_helper(my_helper)  # Will pass (event_data)

# ✅ WORKAROUND - Use *args if you need to support both
def safe_context_helper(*args, **kwargs):
    """Handles both (event_data) and (self, event_data) signatures."""
    event_data = args[1] if len(args) >= 2 else args[0]
    # Your logic here
    return create_context(event_data)

Django Integration

Recommended: Put context helper in your celery.py file

# myapp/celery.py
from celery import Celery
from kombu import Exchange, Queue
from tchu_tchu import create_topic_dispatcher
from tchu_tchu.events import TchuEvent

# 1. Define your Django context helper
def create_django_request_context(event_data):
    """Reconstruct Django request context from event data."""
    from types import SimpleNamespace
    
    user_data = event_data.get("user")
    company_data = event_data.get("company")
    user_company_data = event_data.get("user_company")
    
    mock_request = SimpleNamespace()
    
    # If no auth data, return empty context
    if not all([user_data, company_data, user_company_data]):
        return {"request": mock_request}
    
    # Build mock user with company and user_company
    mock_user = SimpleNamespace()
    mock_user.id = user_data.get("id")
    mock_user.email = user_data.get("email")
    mock_user.first_name = user_data.get("first_name")
    mock_user.last_name = user_data.get("last_name")
    
    mock_user.company = SimpleNamespace()
    mock_user.company.id = company_data.get("id")
    mock_user.company.name = company_data.get("name")
    
    mock_user.user_company = SimpleNamespace()
    mock_user.user_company.id = user_company_data.get("id")
    
    mock_request.user = mock_user
    return {"request": mock_request}

# Set it globally (runs when celery.py is imported)
TchuEvent.set_context_helper(create_django_request_context)

# ... rest of your Celery configuration ...
app = Celery('myapp')
# etc.

Or: Create a separate helper module and import it

# myapp/events/django_context_helper.py
from types import SimpleNamespace

def create_django_request_context(event_data):
    # ... same as above ...
    pass

# myapp/celery.py
from tchu_tchu.events import TchuEvent
from myapp.events.django_context_helper import create_django_request_context

# Set it globally
TchuEvent.set_context_helper(create_django_request_context)

Then define your events:

# myapp/events.py
from tchu_tchu.events import TchuEvent
from rest_framework import serializers

class RequestSerializer(serializers.Serializer):
    user_id = serializers.IntegerField()
    email = serializers.EmailField()
    name = serializers.CharField()

class UserCreatedEvent(TchuEvent):
    class Meta:
        topic = "user.created"
        request_serializer_class = RequestSerializer

Publish events:

# myapp/views.py
event = UserCreatedEvent()
event.serialize_request({
    'user_id': 123,
    'email': 'user@example.com',
    'name': 'John Doe'
})
event.publish()

Subscribe to events:

# myapp/subscribers.py
from tchu_tchu import subscribe
from myapp.events import UserCreatedEvent

@subscribe('user.created')
def handle_user_created(event_data):
    print(f"User {event_data['email']} was created")

    # Or use TchuEvent for context:
    event = UserCreatedEvent()
    event.serialize_request(event_data)
    user_id = event.request_context['request'].user.id  # ✅ Works!

Model Signal Integration

from django.db.models.signals import post_save
from django.dispatch import receiver
from tchu_tchu import TchuClient
from myapp.models import User

client = TchuClient()

@receiver(post_save, sender=User)
def publish_user_created(sender, instance, created, **kwargs):
    if created:
        client.publish('user.created', {
            'user_id': instance.id,
            'email': instance.email
        })

Configuration

Celery Settings

# settings.py or celery.py

from kombu import Exchange, Queue

# Broker URL
broker_url = 'amqp://guest:guest@localhost:5672//'

# Define the topic exchange
tchu_exchange = Exchange('tchu_events', type='topic', durable=True)

# Configure your app's queue
task_queues = (
    Queue(
        'myapp_queue',
        exchange=tchu_exchange,
        routing_key='user.*',  # Routing key pattern(s)
        durable=True,
        auto_delete=False,
    ),
)

# Route the dispatcher task to your queue
task_routes = {
    'tchu_tchu.dispatch_event': {'queue': 'myapp_queue'},
}

# Optional: Recommended Celery settings
task_serializer = 'json'
accept_content = ['json']
result_serializer = 'json'
timezone = 'UTC'
enable_utc = True

Multiple Routing Keys

If you want to subscribe to multiple patterns, create multiple queue bindings:

from kombu import Queue, binding

task_queues = (
    Queue(
        'myapp_queue',
        exchange=tchu_exchange,
        bindings=[
            binding(tchu_exchange, routing_key='user.*'),
            binding(tchu_exchange, routing_key='order.*'),
            binding(tchu_exchange, routing_key='payment.*'),
        ],
        durable=True,
    ),
)

API Reference

Extended Celery Class

🆕 v2.2.26+ - Import from tchu-tchu for extended functionality:

from tchu_tchu.django import Celery

app = Celery("my_app")
app.config_from_object("django.conf:settings", namespace="CELERY")

# Extended method for message broker configuration
app.message_broker(
    queue_name="my_queue",
    include=["app1.subscribers", "app2.subscribers"],
    exchange_name="tchu_events",  # Optional
    exchange_type="topic",  # Optional
    durable=True,  # Optional
    auto_delete=False,  # Optional
)

Methods:

  • All standard celery.Celery methods (fully compatible)
  • message_broker() - Configure tchu-tchu message broker with subscriber modules

See EXTENDED_CELERY_USAGE.md for complete documentation.

TchuClient

from tchu_tchu import TchuClient

client = TchuClient(celery_app=None, serializer=None)

Methods:

  • publish(topic, data, **kwargs) - Publish an event (broadcast to all subscribers)
  • call(topic, data, timeout=30, **kwargs) - RPC call (request-response, returns result)

subscribe()

from tchu_tchu import subscribe

@subscribe(routing_key, name=None, handler_id=None, metadata=None, celery_options=None)
def my_handler(event_data):
    pass

Parameters:

  • routing_key - Topic pattern to subscribe to (e.g., user.created, user.*, #)
  • name - Optional handler name
  • handler_id - Optional unique handler ID
  • metadata - Optional metadata dict
  • celery_options - Optional dict of Celery task options for native retry support:
    • autoretry_for - Tuple of exception classes to auto-retry on
    • retry_backoff - Enable exponential backoff (bool or int)
    • retry_backoff_max - Maximum backoff time in seconds
    • retry_jitter - Add randomness to backoff
    • max_retries - Maximum retry attempts
    • default_retry_delay - Default delay between retries
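As a rough sketch of what these options imply, the delay before retry number n follows Celery's documented exponential backoff: factor * 2**n seconds, capped at retry_backoff_max, with jitter picking a random value below that. The retry_delay helper below is purely illustrative (it is not part of tchu-tchu or Celery's public API):

```python
import random

def retry_delay(retries, retry_backoff=True, retry_backoff_max=600, retry_jitter=False):
    # Approximate the countdown (seconds) before retry number `retries`,
    # following Celery's documented formula: factor * 2**retries, capped.
    factor = 1 if retry_backoff is True else int(retry_backoff)
    delay = min(factor * (2 ** retries), retry_backoff_max)
    if retry_jitter:
        # Jitter replaces the delay with a random value in [0, delay]
        delay = random.randrange(delay + 1)
    return delay

# Without jitter, delays grow 1, 2, 4, 8, ... up to retry_backoff_max
print([retry_delay(n) for n in range(5)])  # [1, 2, 4, 8, 16]
```

With max_retries=5 and retry_backoff_max=600 (as in the changelog example below), a handler that keeps failing is retried after roughly 1, 2, 4, 8, and 16 seconds before giving up.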

get_subscribed_routing_keys()

from tchu_tchu import get_subscribed_routing_keys

# ✅ IMPORTANT: Always pass celery_app
all_routing_keys = get_subscribed_routing_keys(
    celery_app=app,  # Required to force handler registration
    exclude_patterns=None,  # Optional: list of patterns to exclude (e.g., ['rpc.*'])
    force_import=True,  # Default: True (forces immediate task import)
)

Returns: List of all routing keys that have handlers registered via @subscribe or Event().subscribe().

Important: Without celery_app, handlers may not be registered yet (due to lazy autodiscover_tasks()), resulting in an empty list and incorrect queue bindings.

setup_celery_queue()

🆕 v2.2.12+ - Django helper that simplifies Celery configuration:

from tchu_tchu import setup_celery_queue

setup_celery_queue(
    celery_app,
    queue_name,
    subscriber_modules,
    exchange_name='tchu_events',
    exchange_type='topic',
    durable=True,
    auto_delete=False
)

Parameters:

  • celery_app - Celery app instance
  • queue_name - Name of the queue (e.g., "my_app_queue")
  • subscriber_modules - List of module paths containing @subscribe decorators (e.g., ["app1.subscribers", "app2.subscribers"])
  • exchange_name - RabbitMQ exchange name (default: "tchu_events")
  • exchange_type - Exchange type (default: "topic")
  • durable - Whether queue is durable (default: True)
  • auto_delete - Whether queue auto-deletes (default: False)

What it does:

  1. Imports all subscriber modules after Django is ready (avoids AppRegistryNotReady errors)
  2. Collects all routing keys from registered handlers
  3. Creates queue bindings to the exchange
  4. Configures Celery queues and task routes
  5. Sets up cross-service RPC messaging with proper exchange config
  6. Creates the dispatcher task

Example:

# myapp/celery.py
import django

from celery import Celery
from tchu_tchu import setup_celery_queue

django.setup()

app = Celery("my_app")
app.config_from_object("django.conf:settings", namespace="CELERY")

setup_celery_queue(
    app,
    queue_name="my_queue",
    subscriber_modules=["app1.subscribers", "app2.subscribers"],
)

create_topic_dispatcher()

from tchu_tchu import create_topic_dispatcher

dispatcher = create_topic_dispatcher(
    celery_app,
    task_name='tchu_tchu.dispatch_event',
    serializer=None
)

Creates a Celery task that dispatches incoming events to local handlers. Call this in your celery.py after configuring queues.

Note: If using setup_celery_queue(), you don't need to call this manually - it's handled automatically.

Migration from v1.x

v2.0.0 is a breaking change with a completely different architecture:

What Changed

v1.x                                 v2.0.0
Task-based (point-to-point)          Exchange-based (broadcast)
Task discovery/inspection            Static queue configuration
Automatic remote task registration   Manual queue bindings
register_remote_task() required      Not needed (deprecated)
Slow (task inspection overhead)      Fast (direct routing)

Migration Steps

  1. Update Celery Config - Add queue bindings to tchu_events exchange
  2. Create Dispatcher - Call create_topic_dispatcher(app) in your Celery app
  3. Remove register_remote_task() calls - No longer needed
  4. Test - Ensure events are received by all subscribing apps

Example Migration

Before (v1.x):

# No special Celery config needed
# Tasks auto-discovered via inspection

from tchu_tchu import subscribe, register_remote_task

@subscribe('user.created')
def handle_user_created(event):
    pass

After (v2.0.0):

# celery.py - NEW: Configure queue bindings
from kombu import Exchange, Queue
from tchu_tchu import create_topic_dispatcher

tchu_exchange = Exchange('tchu_events', type='topic', durable=True)

app.conf.task_queues = (
    Queue('myapp_queue', exchange=tchu_exchange, routing_key='user.*'),
)

app.conf.task_routes = {
    'tchu_tchu.dispatch_event': {'queue': 'myapp_queue'},
}

dispatcher = create_topic_dispatcher(app)  # NEW

# subscribers.py - Same as before!
from tchu_tchu import subscribe

@subscribe('user.created')
def handle_user_created(event):
    pass

# No register_remote_task() needed!

Troubleshooting

Broadcast Events Not Received (v2.2.11 Fix)

Symptom: RPC calls work, but broadcast events fail silently.

Cause: You're not passing celery_app to get_subscribed_routing_keys(), resulting in empty routing keys [] and incorrect queue bindings.

Solution:

# โŒ WRONG (returns empty list)
all_routing_keys = get_subscribed_routing_keys()

# โœ… CORRECT (forces handler registration)
all_routing_keys = get_subscribed_routing_keys(celery_app=app)

If you upgraded from v2.2.9, you MUST:

  1. Update code to pass celery_app=app
  2. Delete old queues: rabbitmqctl delete_queue your_queue_name
  3. Restart services

See MIGRATION_2.2.11.md for details.

Verify Queue Bindings

Check that your queue has wildcard bindings:

rabbitmqctl list_bindings | grep "your_queue_name"

Should see:

tchu_events  exchange  your_queue  queue  app_name.sub_app_or_model.#  []  ✅
tchu_events  exchange  your_queue  queue  app_name.#  []  ✅

NOT:

tchu_events  exchange  your_queue  queue  your_queue  []  ❌ Wrong!

Events Still Not Received

  1. Check Celery is running: celery -A myapp worker -l info
  2. Check handler registration: Look for Registered handler 'X' for routing key 'Y' in logs
  3. Check routing keys match: Publisher routing key must match subscriber pattern
  4. Check task routes: Verify tchu_tchu.dispatch_event routes to your queue
  5. Check queue bindings: Verify in RabbitMQ that patterns match your handlers
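For item 3, topic patterns follow RabbitMQ's rules: `*` matches exactly one dot-separated word and `#` matches zero or more words. The standalone sketch below (a hypothetical `topic_matches` helper, not part of tchu-tchu) lets you check a publisher's routing key against a subscriber pattern offline:

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    # AMQP topic semantics: '*' matches exactly one word, '#' matches zero or more.
    words_p, words_k = pattern.split("."), routing_key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(words_p):
            return j == len(words_k)
        if words_p[i] == "#":
            # '#' may absorb zero or more of the remaining words
            return any(match(i + 1, j2) for j2 in range(j, len(words_k) + 1))
        if j < len(words_k) and words_p[i] in ("*", words_k[j]):
            return match(i + 1, j + 1)
        return False

    return match(0, 0)

print(topic_matches("user.*", "user.created"))          # True
print(topic_matches("user.*", "user.profile.updated"))  # False: '*' is one word only
print(topic_matches("user.#", "user.profile.updated"))  # True
```

A common gotcha this catches: `user.*` does not match `user.profile.updated`, so multi-level topics need a `user.#` binding.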

"Received unregistered task" Error

This means Celery received a message for a task it doesn't recognize. Check:

  • create_topic_dispatcher(app) was called
  • Task routes are configured correctly
  • The dispatcher task name matches in config and code

Context Helper TypeError

Error: TypeError: context_helper() takes 1 positional argument but 2 were given

Cause: You passed a bound method (instance method) instead of a standalone function.

Solution:

# โŒ WRONG - Bound method
obj = MyClass()
TchuEvent.set_context_helper(obj.my_helper)

# โœ… CORRECT - Standalone function
def my_context_helper(event_data):
    return create_context(event_data)

TchuEvent.set_context_helper(my_context_helper)

# โœ… WORKAROUND - Support both signatures
def safe_helper(*args, **kwargs):
    event_data = args[1] if len(args) >= 2 else args[0]
    return create_context(event_data)

See Framework Integration for more details.

Multiple Apps Not Receiving Events

Each app MUST have:

  1. Its own unique queue name
  2. Queue bound to the tchu_events exchange with correct routing patterns
  3. The create_topic_dispatcher() call in its Celery config
  4. Celery worker running
  5. Handlers registered (visible in logs at startup)
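As a sketch of requirements 1 and 2, two services subscribing to the same topic each declare their own uniquely named queue bound to the shared tchu_events exchange (the service names, queue names, and patterns here are illustrative, not prescribed by tchu-tchu):

```python
# billing_service/celery.py (service A)
from kombu import Exchange, Queue

tchu_exchange = Exchange('tchu_events', type='topic', durable=True)

task_queues = (
    # Unique queue name per service; both services bind to the same exchange
    Queue('billing_queue', exchange=tchu_exchange, routing_key='order.*'),
)
task_routes = {'tchu_tchu.dispatch_event': {'queue': 'billing_queue'}}

# notifications_service/celery.py (service B)
task_queues = (
    Queue('notifications_queue', exchange=tchu_exchange, routing_key='order.*'),
)
task_routes = {'tchu_tchu.dispatch_event': {'queue': 'notifications_queue'}}
```

With this layout, one order.created event published once is delivered to both queues in parallel; sharing a single queue name across services would instead round-robin events between them.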

Performance

v2.0.0 is significantly faster than v1.x:

  • No task discovery - Static configuration means no inspection overhead
  • No queue inspection - Direct routing via AMQP
  • Parallel delivery - RabbitMQ broadcasts to all queues simultaneously

Expected latency: < 10ms (vs 500-1500ms in v1.x due to inspection)

Changelog

For detailed version history, see CHANGELOG.md.

v2.4.0 (2024-12-29) - Current

🚀 New: Native Celery Retry Support via celery_options

Pass Celery retry options through TchuEvent or @subscribe - your consuming app never needs to import Celery directly!

Added:

  • NEW: celery_options parameter for TchuEvent and @subscribe decorator
  • Native Celery retry support: autoretry_for, retry_backoff, retry_backoff_max, retry_jitter, max_retries
  • Dynamic Celery task creation with full native retry capabilities
  • Per-handler retry configuration without touching Celery directly

Example:

from tchu_tchu.events import TchuEvent

# Pass celery_options - tchu-tchu handles everything!
DataExchangeRunInitiatedEvent(
    handler=execute_task,
    celery_options={
        "autoretry_for": (ConnectionError, TimeoutError),
        "retry_backoff": True,
        "retry_backoff_max": 600,
        "retry_jitter": True,
        "max_retries": 5,
    }
).subscribe()

v2.3.1 (2024-12-15)

Minor bugfixes and documentation updates.


v2.2.26 (2024-11-05)

🚀 New: Extended Celery Class

Added:

  • NEW: Extended Celery class that wraps standard Celery with tchu-tchu integration
  • Import with from tchu_tchu.django import Celery for seamless integration
  • New app.message_broker() method for cleaner configuration API
  • Fully compatible with standard Celery - all original methods preserved
  • More Pythonic and intuitive than standalone function approach

Example:

from tchu_tchu.django import Celery  # Extended Celery class

app = Celery("my_app")
app.config_from_object("django.conf:settings", namespace="CELERY")

# Clean method-based API
app.message_broker(
    queue_name="my_queue",
    subscriber_modules=["app1.subscribers", "app2.subscribers"],
)

Migration: Optional - setup_celery_queue() function still available and fully supported.


v2.2.11 (2025-10-28)

🚨 Critical Fix: Broadcast Events Now Work

Fixed:

  • CRITICAL: Fixed broadcast events not being received due to incorrect RabbitMQ queue bindings
  • get_subscribed_routing_keys() was being called before handlers were registered
  • RPC calls worked, but broadcast events failed silently

Added:

  • New celery_app parameter to get_subscribed_routing_keys() to force immediate handler registration
  • New force_import parameter (default: True) to control import behavior
  • Improved documentation and migration guide

Migration Required:

# โŒ OLD (v2.2.9)
all_routing_keys = get_subscribed_routing_keys()

# โœ… NEW (v2.2.11)
all_routing_keys = get_subscribed_routing_keys(celery_app=app)

IMPORTANT: If upgrading from v2.2.9:

  1. Update code to pass celery_app=app
  2. Delete old queues from RabbitMQ
  3. Restart services

See MIGRATION_2.2.11.md for complete upgrade instructions.


Recent Versions

v2.2.3 - Auto-configuration with get_subscribed_routing_keys()
v2.2.2 - Fixed JSONField serialization errors
v2.2.0 - Framework-agnostic with injectable context helpers
v2.1.0 - RPC (request-response) support
v2.0.0 - Complete redesign with topic exchanges (100x faster)

License

MIT License - see LICENSE file for details.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Project details


Download files

Download the file for your platform.

Source Distribution

tchu_tchu-3.0.1.tar.gz (57.6 kB)

Uploaded Source

Built Distribution


tchu_tchu-3.0.1-py3-none-any.whl (56.2 kB)

Uploaded Python 3

File details

Details for the file tchu_tchu-3.0.1.tar.gz.

File metadata

  • Download URL: tchu_tchu-3.0.1.tar.gz
  • Upload date:
  • Size: 57.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.11.3 Darwin/24.5.0

File hashes

Hashes for tchu_tchu-3.0.1.tar.gz
Algorithm Hash digest
SHA256 8633c6cf70e74f9b43b78a113bf5079f7006e5e34c63b112663a437cb80a2682
MD5 4313d6f5b2c709a798aa36657bea63ed
BLAKE2b-256 9934e986fadd1bbb56474c75759071d570e38c35d81569bb222964051ca318e6


File details

Details for the file tchu_tchu-3.0.1-py3-none-any.whl.

File metadata

  • Download URL: tchu_tchu-3.0.1-py3-none-any.whl
  • Upload date:
  • Size: 56.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.11.3 Darwin/24.5.0

File hashes

Hashes for tchu_tchu-3.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 87b934c6136ab94c10bbddc4d057d3c6ed10a9c70f749e3ba617589862c7ecf6
MD5 3acc4eb5ce7319a6e06814aef6afb1e7
BLAKE2b-256 74815dfd2488f30312a1cd21b323b54dd6abf1353ba4a5be39459b8c71bed944

