Skip to main content

A modern Celery-based messaging library with true broadcast support for microservices

Project description

tchu-tchu

A modern Celery-based messaging library with true broadcast support for microservices. Designed to replicate the simplicity of the original tchu library while leveraging Celery workers for execution.

License: MIT PyPI version

Features

  • โœจ True Broadcast Messaging - One event reaches multiple microservices
  • ๐Ÿš€ Uses Existing Celery Workers - No separate listener process needed
  • ๐ŸŽฏ Topic-based Routing - RabbitMQ topic exchange with wildcard patterns
  • ๐Ÿ”„ Drop-in Replacement - Compatible with original tchu API
  • ๐Ÿ“ฆ Pydantic Integration - Type-safe event serialization
  • ๐Ÿ›ก๏ธ Django Support - Built-in Django REST Framework integration
  • โšก Fast - No task discovery or inspection overhead

Architecture

How It Works:

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚  Publisher  โ”‚โ”€โ”€โ”
โ”‚   (Any App) โ”‚  โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ”‚
                 โ–ผ
         โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
         โ”‚Topic Exchangeโ”‚ (RabbitMQ)
         โ”‚ tchu_events  โ”‚
         โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                โ”‚
        โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
        โ”‚               โ”‚
        โ–ผ               โ–ผ
   โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”      โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
   โ”‚Queue 1 โ”‚      โ”‚Queue 2 โ”‚
   โ”‚(App A) โ”‚      โ”‚(App B) โ”‚
   โ””โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”˜      โ””โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”˜
       โ”‚               โ”‚
       โ–ผ               โ–ผ
   [Worker A]      [Worker B]
   Receives event  Receives event

Each microservice:

  1. Creates its own unique queue
  2. Binds the queue to the topic exchange with routing key patterns
  3. Runs Celery workers that consume from the queue
  4. All matching apps receive the same event simultaneously

Installation

pip install tchu-tchu

Quick Start

1. Configure Celery (Each Microservice)

Each microservice needs to configure its Celery app to consume from a queue bound to the tchu_events exchange:

# myapp/celery.py
from celery import Celery
from kombu import Exchange, Queue
from tchu_tchu import create_topic_dispatcher

app = Celery('myapp')

# Define the topic exchange
tchu_exchange = Exchange('tchu_events', type='topic', durable=True)

# Each app has its own unique queue bound to the exchange
app.conf.task_queues = (
    Queue(
        'myapp_queue',                  # Unique queue name for this app
        exchange=tchu_exchange,
        routing_key='user.*',           # Subscribe to user.* events
        durable=True,
        auto_delete=False,
    ),
)

# Route the dispatcher task to your app's queue
app.conf.task_routes = {
    'tchu_tchu.dispatch_event': {'queue': 'myapp_queue'},
}

# Create the dispatcher task (handles incoming events)
dispatcher = create_topic_dispatcher(app)

2. Subscribe to Events

# myapp/subscribers.py
from tchu_tchu import subscribe

@subscribe('user.created')
def handle_user_created(event_data):
    """This handler will be called when user.created events are published"""
    print(f"User created: {event_data}")
    # Process the event...

@subscribe('user.*')  # Wildcard pattern
def handle_any_user_event(event_data):
    """This handler receives all user.* events"""
    print(f"User event: {event_data}")

3. Publish Events

# In any microservice
from tchu_tchu import TchuClient

client = TchuClient()

# Publish an event (all subscribed apps receive it)
client.publish('user.created', {
    'user_id': 123,
    'email': 'user@example.com',
    'name': 'John Doe'
})

Multiple Microservices Example

App 1: User Service (Publisher + Subscriber)

# users/celery.py
from celery import Celery
from kombu import Exchange, Queue
from tchu_tchu import create_topic_dispatcher

app = Celery('users')

tchu_exchange = Exchange('tchu_events', type='topic', durable=True)

app.conf.task_queues = (
    Queue('users_queue', exchange=tchu_exchange, routing_key='user.*'),
)

app.conf.task_routes = {
    'tchu_tchu.dispatch_event': {'queue': 'users_queue'},
}

dispatcher = create_topic_dispatcher(app)

# users/subscribers.py
from tchu_tchu import subscribe

@subscribe('user.deleted')
def cleanup_user_data(event):
    print(f"Cleaning up data for user {event['user_id']}")

# users/views.py
from tchu_tchu import TchuClient

client = TchuClient()

def create_user(request):
    user = User.objects.create(...)
    
    # Publish event - all apps receive it!
    client.publish('user.created', {
        'user_id': user.id,
        'email': user.email
    })
    
    return Response(...)

App 2: Notifications Service (Subscriber Only)

# notifications/celery.py
from celery import Celery
from kombu import Exchange, Queue
from tchu_tchu import create_topic_dispatcher

app = Celery('notifications')

tchu_exchange = Exchange('tchu_events', type='topic', durable=True)

app.conf.task_queues = (
    Queue('notifications_queue', exchange=tchu_exchange, routing_key='user.*'),
)

app.conf.task_routes = {
    'tchu_tchu.dispatch_event': {'queue': 'notifications_queue'},
}

dispatcher = create_topic_dispatcher(app)

# notifications/subscribers.py
from tchu_tchu import subscribe

@subscribe('user.created')
def send_welcome_email(event):
    print(f"Sending welcome email to {event['email']}")
    # Send email...

App 3: Analytics Service (Subscriber Only)

# analytics/celery.py
from celery import Celery
from kombu import Exchange, Queue
from tchu_tchu import create_topic_dispatcher

app = Celery('analytics')

tchu_exchange = Exchange('tchu_events', type='topic', durable=True)

app.conf.task_queues = (
    Queue('analytics_queue', exchange=tchu_exchange, routing_key='#'),  # All events
)

app.conf.task_routes = {
    'tchu_tchu.dispatch_event': {'queue': 'analytics_queue'},
}

dispatcher = create_topic_dispatcher(app)

# analytics/subscribers.py
from tchu_tchu import subscribe

@subscribe('user.*')  # All user events
def track_user_event(event):
    print(f"Tracking event: {event}")
    # Store in analytics DB...

Routing Key Patterns

RabbitMQ topic exchanges support powerful routing patterns:

  • user.created - Exact match only
  • user.* - Matches user.created, user.updated, user.deleted
  • *.created - Matches user.created, order.created, etc.
  • # - Matches all events
  • order.# - Matches order.created, order.payment.completed, etc.

Django Integration

Using with TchuEvent

from tchu_tchu.events import TchuEvent
from rest_framework import serializers

class UserCreatedEvent(TchuEvent):
    topic = "user.created"
    
    class RequestSerializer(serializers.Serializer):
        user_id = serializers.IntegerField()
        email = serializers.EmailField()
        name = serializers.CharField()

# Publish
event = UserCreatedEvent(request_data={
    'user_id': 123,
    'email': 'user@example.com',
    'name': 'John Doe'
})
event.publish()

# Subscribe
from tchu_tchu import subscribe

@subscribe('user.created')
def handle_user_created(event_data):
    # event_data is validated by the RequestSerializer
    print(f"User {event_data['email']} was created")

Model Signal Integration

from django.db.models.signals import post_save
from django.dispatch import receiver
from tchu_tchu import TchuClient

client = TchuClient()

@receiver(post_save, sender=User)
def publish_user_created(sender, instance, created, **kwargs):
    if created:
        client.publish('user.created', {
            'user_id': instance.id,
            'email': instance.email
        })

Configuration

Celery Settings

# settings.py or celery.py

from kombu import Exchange, Queue

# Broker URL
broker_url = 'amqp://guest:guest@localhost:5672//'

# Define the topic exchange
tchu_exchange = Exchange('tchu_events', type='topic', durable=True)

# Configure your app's queue
task_queues = (
    Queue(
        'myapp_queue',
        exchange=tchu_exchange,
        routing_key='user.*',  # Routing key pattern(s)
        durable=True,
        auto_delete=False,
    ),
)

# Route the dispatcher task to your queue
task_routes = {
    'tchu_tchu.dispatch_event': {'queue': 'myapp_queue'},
}

# Optional: Recommended Celery settings
task_serializer = 'json'
accept_content = ['json']
result_serializer = 'json'
timezone = 'UTC'
enable_utc = True

Multiple Routing Keys

If you want to subscribe to multiple patterns, create multiple queue bindings:

from kombu import Queue, binding

task_queues = (
    Queue(
        'myapp_queue',
        exchange=tchu_exchange,
        bindings=[
            binding(tchu_exchange, routing_key='user.*'),
            binding(tchu_exchange, routing_key='order.*'),
            binding(tchu_exchange, routing_key='payment.*'),
        ],
        durable=True,
    ),
)

API Reference

TchuClient

from tchu_tchu import TchuClient

client = TchuClient(celery_app=None, serializer=None)

Methods:

  • publish(topic, data, **kwargs) - Publish an event (broadcast)
  • call(topic, data, timeout=30, **kwargs) - RPC call (not yet implemented)

subscribe()

from tchu_tchu import subscribe

@subscribe(routing_key, name=None, handler_id=None, metadata=None)
def my_handler(event_data):
    pass

Parameters:

  • routing_key - Topic pattern to subscribe to
  • name - Optional handler name
  • handler_id - Optional unique handler ID
  • metadata - Optional metadata dict

create_topic_dispatcher()

from tchu_tchu import create_topic_dispatcher

dispatcher = create_topic_dispatcher(
    celery_app,
    task_name='tchu_tchu.dispatch_event',
    serializer=None
)

Creates a Celery task that dispatches incoming events to local handlers.

Migration from v1.x

v2.0.0 is a breaking change with a completely different architecture:

What Changed

v1.x v2.0.0
Task-based (point-to-point) Exchange-based (broadcast)
Task discovery/inspection Static queue configuration
Automatic remote task registration Manual queue bindings
register_remote_task() required Not needed (deprecated)
Slow (task inspection overhead) Fast (direct routing)

Migration Steps

  1. Update Celery Config - Add queue bindings to tchu_events exchange
  2. Create Dispatcher - Call create_topic_dispatcher(app) in your Celery app
  3. Remove register_remote_task() calls - No longer needed
  4. Test - Ensure events are received by all subscribing apps

Example Migration

Before (v1.x):

# No special Celery config needed
# Tasks auto-discovered via inspection

from tchu_tchu import subscribe, register_remote_task

@subscribe('user.created')
def handle_user_created(event):
    pass

# In remote app
register_remote_task('user.created', 'app.tasks.handle_user_created')

After (v2.0.0):

# celery.py - NEW: Configure queue bindings
from kombu import Exchange, Queue
from tchu_tchu import create_topic_dispatcher

tchu_exchange = Exchange('tchu_events', type='topic', durable=True)

app.conf.task_queues = (
    Queue('myapp_queue', exchange=tchu_exchange, routing_key='user.*'),
)

app.conf.task_routes = {
    'tchu_tchu.dispatch_event': {'queue': 'myapp_queue'},
}

dispatcher = create_topic_dispatcher(app)  # NEW

# subscribers.py - Same as before!
from tchu_tchu import subscribe

@subscribe('user.created')
def handle_user_created(event):
    pass

# No register_remote_task() needed!

Troubleshooting

Events Not Received

  1. Check Celery is running: celery -A myapp worker -l info
  2. Check queue bindings: Use RabbitMQ management UI to verify queue is bound to exchange
  3. Check routing keys: Ensure publisher routing key matches subscriber patterns
  4. Check task routes: Verify tchu_tchu.dispatch_event routes to your queue
  5. Check logs: Look for "Registered handler" and "Published message" log entries

"Received unregistered task" Error

This means Celery received a message for a task it doesn't recognize. Check:

  • create_topic_dispatcher(app) was called
  • Task routes are configured correctly
  • The dispatcher task name matches in config and code

Multiple Apps Not Receiving Events

Each app MUST have:

  1. Its own unique queue name
  2. Queue bound to the tchu_events exchange
  3. The create_topic_dispatcher() call in its Celery config
  4. Celery worker running

Performance

v2.0.0 is significantly faster than v1.x:

  • No task discovery - Static configuration means no inspection overhead
  • No queue inspection - Direct routing via AMQP
  • Parallel delivery - RabbitMQ broadcasts to all queues simultaneously

Expected latency: < 10ms (vs 500-1500ms in v1.x due to inspection)

Changelog

v2.0.0 (2025-10-26) - BREAKING CHANGE

Complete redesign using RabbitMQ topic exchanges for true broadcast messaging.

Added:

  • Topic exchange-based architecture
  • create_topic_dispatcher() for creating event dispatcher tasks
  • True broadcast support (multiple apps receive same event)
  • Kombu integration for direct exchange publishing
  • Comprehensive documentation and examples

Changed:

  • Producer now uses topic exchange instead of task discovery
  • Subscriber uses dispatcher pattern instead of individual tasks
  • Configuration now requires queue bindings in Celery config
  • Terminology: "topic" โ†’ "routing_key" throughout

Removed:

  • Task discovery/inspection logic (no longer needed)
  • register_remote_task() (deprecated, kept for compatibility)
  • RPC support (temporarily removed, will be re-added)

Performance:

  • 100x faster than v1.x (no inspection overhead)
  • Reduced latency from ~1000ms to ~10ms

v1.4.0 (2025-10-26)

  • Added generic queue routing via Celery inspect (replaced hardcoded app names)
  • Fixed cross-app communication with dynamic queue detection

v1.3.2 (2025-10-26)

  • Added queue routing based on task names (bad design, later removed)

v1.3.1 (2025-10-26)

  • Updated documentation for "Received unregistered task" errors

v1.3.0 (2025-10-26)

  • Added automatic task discovery via inspect.registered()
  • Enabled cross-app event publishing and consumption

v1.2.0 (2025-10-25)

  • Fixed context serialization with DRF serializers
  • Fixed authentication details preservation in event context

v1.1.0 (2025-10-25)

  • Added cross-app task scanning
  • Fixed task registry lookup

v1.0.3 (2025-10-25)

  • Fixed UnboundLocalError in DRF to Pydantic conversion
  • Fixed callable defaults handling

v1.0.2 (2025-10-25)

  • Fixed context handling in DRF serializers
  • Added mock request objects for event context

v1.0.1 (2025-10-25)

  • Initial release with Celery-based messaging

License

MIT License - see LICENSE file for details.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

tchu_tchu-2.0.0.tar.gz (31.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

tchu_tchu-2.0.0-py3-none-any.whl (36.9 kB view details)

Uploaded Python 3

File details

Details for the file tchu_tchu-2.0.0.tar.gz.

File metadata

  • Download URL: tchu_tchu-2.0.0.tar.gz
  • Upload date:
  • Size: 31.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.11.3 Darwin/24.5.0

File hashes

Hashes for tchu_tchu-2.0.0.tar.gz
Algorithm Hash digest
SHA256 f2ba587838ba92285923fd33fbbe49df96e94ba79986197f27996df0b670dbb5
MD5 1b36697b451b6ef2bbaa156b09e859b4
BLAKE2b-256 75758d3a01a672b2129f4760609757a69596f97afc7c9aa018afe9c6a4495232

See more details on using hashes here.

File details

Details for the file tchu_tchu-2.0.0-py3-none-any.whl.

File metadata

  • Download URL: tchu_tchu-2.0.0-py3-none-any.whl
  • Upload date:
  • Size: 36.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.11.3 Darwin/24.5.0

File hashes

Hashes for tchu_tchu-2.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 021a55d39f0797a452e3c1ca8c2721de68ba344a8f73ce90d5acf8e7560f38c3
MD5 d1055896d2a0103a0cfa578b6477bfc8
BLAKE2b-256 edf584070c0de06fc51efebee1bf37640499f4badadddb46af747df84c38eb97

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page