
A modern Celery-based messaging library with Pydantic serialization and Django integration


tchu-tchu

tchu-tchu is a modern, Celery-based messaging library that provides high-performance event publishing and consumption with Pydantic serialization and Django integration. It serves as a drop-in replacement for the original tchu package while leveraging Celery's robust task management system.

License: MIT | Python 3.8+

Features

  • 🚀 High Performance: Pydantic serialization for fast data validation and serialization
  • 🔄 Drop-in Replacement: Compatible API with existing tchu-based systems
  • 📊 Built-in Metrics: Comprehensive metrics collection with Prometheus support
  • 🏗️ Django Integration: Automatic model event publishing with decorators
  • 🔍 Structured Logging: JSON-formatted logs with correlation tracking
  • 🎯 Topic Patterns: Support for wildcard topic subscriptions (e.g., "user.*")
  • ⚡ Parallel Processing: Multiple handlers per topic with parallel execution
  • 🔒 Type Safety: Full typing support with Pydantic models
  • 📈 Scalable: Built on Celery's proven distributed task system

Installation

pip install tchu-tchu

Optional Dependencies

# For Django integration
pip install tchu-tchu[django]

# For Protobuf support (future feature)
pip install tchu-tchu[protobuf]

# Install all optional dependencies
pip install tchu-tchu[all]

Quick Start

1. Basic Publishing and Subscription

from tchu_tchu import TchuClient, subscribe

# Subscribe to events
def handle_user_created(data):
    print(f"User created: {data['user_id']}")

subscribe("user.created", handle_user_created)

# Publish events
client = TchuClient()
client.publish("user.created", {
    "user_id": "123",
    "name": "John Doe",
    "email": "john@example.com"
})

2. Using with Your Existing TchuEvent Classes

from tchu_tchu import TchuEvent
from rest_framework import serializers

class UserCreatedEventRequest(serializers.Serializer):
    user_id = serializers.CharField()
    name = serializers.CharField()
    email = serializers.EmailField()

class UserCreatedEvent(TchuEvent):
    class Meta:
        topic = "user.created"
        request_serializer_class = UserCreatedEventRequest

# Publishing events (same API as before)
event = UserCreatedEvent()
event.serialize_request({
    "user_id": "123",
    "name": "John Doe",
    "email": "john@example.com"
})
event.publish()

# Subscribing to events (same API as before)
def handle_user_created(event_instance):
    user_id = event_instance.get("user_id")
    print(f"Handling user creation: {user_id}")

UserCreatedEvent(handler=handle_user_created).subscribe()

3. Django Model Auto-Publishing

from django.db import models
from tchu_tchu.django import auto_publish

@auto_publish(
    topic_prefix="myapp.users",
    include_fields=["id", "username", "email", "is_active"],
    publish_on=["created", "updated"]
)
class User(models.Model):
    username = models.CharField(max_length=150)
    email = models.EmailField()
    is_active = models.BooleanField(default=True)

# Events are automatically published:
# - myapp.users.user.created (when user is created)
# - myapp.users.user.updated (when user is updated)
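The topic naming above follows a `<topic_prefix>.<model name, lowercased>.<action>` scheme. A minimal sketch of that scheme (the `derived_topic` helper is hypothetical, for illustration only, not part of the library):

```python
def derived_topic(topic_prefix: str, model_name: str, action: str) -> str:
    """Build an event topic following the '<prefix>.<model>.<action>' scheme
    shown in the comments above (hypothetical helper)."""
    return f"{topic_prefix}.{model_name.lower()}.{action}"

print(derived_topic("myapp.users", "User", "created"))  # myapp.users.user.created
print(derived_topic("myapp.users", "User", "updated"))  # myapp.users.user.updated
```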

Advanced Usage

Multiple Handlers per Topic

from tchu_tchu import subscribe

def send_welcome_email(data):
    print(f"Sending welcome email to {data['email']}")

def update_analytics(data):
    print(f"Updating analytics for user {data['user_id']}")

def sync_to_crm(data):
    print(f"Syncing user {data['user_id']} to CRM")

# All handlers will run in parallel when a message is published
subscribe("user.created", send_welcome_email)
subscribe("user.created", update_analytics)
subscribe("user.created", sync_to_crm)
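Conceptually, repeated `subscribe()` calls append to a per-topic handler list, and a published message fans out to every handler. The toy registry below models that shape (it is an illustrative sketch, not tchu-tchu's implementation; the real fan-out runs as parallel Celery tasks):

```python
from collections import defaultdict

# Toy model of the subscription registry: one topic, many handlers.
_registry = defaultdict(list)

def subscribe_local(topic, handler):
    """Register an additional handler for a topic."""
    _registry[topic].append(handler)

def publish_local(topic, data):
    """Fan a message out to every handler registered for the topic.
    tchu-tchu dispatches these as parallel Celery tasks; here they run
    sequentially just to show the one-topic/many-handlers dispatch shape."""
    return [handler(data) for handler in _registry[topic]]
```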

Wildcard Topic Subscriptions

from tchu_tchu import subscribe

def handle_all_user_events(data):
    print(f"User event received: {data}")

def handle_all_order_events(data):
    print(f"Order event received: {data}")

# Subscribe to all user-related events
subscribe("user.*", handle_all_user_events)

# Subscribe to all order-related events  
subscribe("order.*", handle_all_order_events)
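Wildcard patterns can be thought of as segment-by-segment matching on the dotted topic, as in AMQP topic routing. The sketch below assumes `*` matches exactly one segment; it is illustrative only, and the library's actual matcher may treat `*` differently (e.g. matching multiple segments):

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Segment-wise wildcard match where '*' matches exactly one dotted
    segment (assumed semantics, modeled on AMQP topic routing)."""
    p_parts = pattern.split(".")
    t_parts = topic.split(".")
    if len(p_parts) != len(t_parts):
        return False
    return all(p == "*" or p == t for p, t in zip(p_parts, t_parts))

print(topic_matches("user.*", "user.created"))     # True
print(topic_matches("user.*", "order.created"))    # False
print(topic_matches("user.*", "user.created.v2"))  # False (under this assumption)
```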

RPC-Style Messaging

from tchu_tchu import TchuClient, subscribe

# Set up RPC handler
def validate_user(data):
    user_id = data.get("user_id")
    # Perform validation logic
    return {
        "valid": True,
        "user_id": user_id,
        "status": "active"
    }

subscribe("user.validate", validate_user)

# Make RPC call
client = TchuClient()
try:
    response = client.call("user.validate", {"user_id": "123"}, timeout=5)
    print(f"Validation result: {response}")
except TimeoutError:
    print("Validation timed out")

Django Model Mixin

from django.db import models
from tchu_tchu.django.mixins import EventPublishingMixin

class Product(EventPublishingMixin, models.Model):
    name = models.CharField(max_length=200)
    price = models.DecimalField(max_digits=10, decimal_places=2)
    
    class Meta:
        tchu_topic_prefix = "ecommerce.products"
        tchu_publish_on = ["created", "updated", "deleted"]
        tchu_include_fields = ["id", "name", "price"]

# Manual event publishing
product = Product.objects.get(id=1)
product.publish_event("price_changed", {
    "old_price": "10.00",
    "new_price": "12.00"
})

Metrics and Monitoring

from tchu_tchu.metrics import get_metrics_collector, MetricsReporter
from tchu_tchu.metrics.exporters import PrometheusExporter, JSONExporter
from datetime import timedelta

# Get metrics summary
collector = get_metrics_collector()
summary = collector.get_summary(time_window=timedelta(hours=1))
print(f"Messages in last hour: {summary['total_messages']}")

# Export metrics
reporter = MetricsReporter(exporters=[
    PrometheusExporter("/tmp/metrics.prom"),
    JSONExporter("/tmp/metrics.json")
])
reporter.export_report(time_window=timedelta(hours=24))

# Topic-specific metrics
topic_stats = collector.get_topic_stats("user.created")
print(f"User creation events: {topic_stats}")

Custom Serialization

from tchu_tchu import TchuClient
from tchu_tchu.serializers import PydanticSerializer
from pydantic import BaseModel
from typing import Optional

class UserModel(BaseModel):
    user_id: str
    name: str
    email: str
    age: Optional[int] = None

# Use custom serializer
serializer = PydanticSerializer(UserModel)
client = TchuClient(serializer=serializer)

client.publish("user.created", {
    "user_id": "123",
    "name": "John Doe", 
    "email": "john@example.com",
    "age": 30
})

Cross-App Communication

tchu-tchu supports publishing events from one app and consuming them in another. Here's what you need to know:

Requirements

  1. Shared Celery Broker: All apps must connect to the same Redis/RabbitMQ broker
  2. Consistent Celery App Name: Not strictly required, but recommended for consistency
  3. Task Discovery: Consumer apps must be running before publishing, so that their tasks are registered

Setup Example

App A (Consumer - Scranton Service):

# scranton/celery.py
from celery import Celery

app = Celery('scranton')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

# scranton/subscribers/information_request_subscriber.py
from celery import shared_task
from tchu_tchu.events import TchuEvent
# Import your DRF serializer from wherever it lives in your project, e.g.:
# from .serializers import InformationRequestSerializer

class InformationRequestPreparedEvent(TchuEvent):
    class Meta:
        topic = "coolset.scranton.information_request.prepared"
        request_serializer_class = InformationRequestSerializer

@shared_task
def execute_information_request_task(event, **kwargs):
    # Handle the event
    pass

# Register the handler
InformationRequestPreparedEvent(handler=execute_information_request_task).subscribe()

App B (Publisher - API Service):

# api/views.py
from scranton.events import InformationRequestPreparedEvent

def create_information_request(request):
    # Publish event - will be handled by Scranton service
    event = InformationRequestPreparedEvent()
    event.serialize_request(
        {"information_request": {"order_id": 123}},
        context={"request": request}
    )
    event.publish()  # Scranton service will receive this!

How It Works

  1. Consumer App Starts: Registers Celery tasks with predictable names like:

    tchu_tchu.topics.coolset_scranton_information_request_prepared.execute_information_request_task
    
  2. Publisher Discovers Tasks: When publishing, checks Celery's task registry for matching tasks across all connected apps

  3. Message Routing: Celery routes the task to the appropriate worker (could be same or different app)
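The predictable task name from step 1 can be reconstructed from the topic: dots become underscores under the `tchu_tchu.topics.` prefix, followed by the handler name. A sketch of that mapping (the `task_name_for` helper is hypothetical, shown only to make the pattern concrete):

```python
def task_name_for(topic: str, handler_name: str) -> str:
    """Rebuild the predictable Celery task name for a topic handler,
    following the pattern shown above: dots in the topic become
    underscores under the 'tchu_tchu.topics.' prefix (hypothetical helper)."""
    return f"tchu_tchu.topics.{topic.replace('.', '_')}.{handler_name}"

print(task_name_for("coolset.scranton.information_request.prepared",
                    "execute_information_request_task"))
# tchu_tchu.topics.coolset_scranton_information_request_prepared.execute_information_request_task
```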

Troubleshooting

"No handlers found for topic" warning:

  • Make sure the consumer app is running and has registered its handlers
  • Check that both apps connect to the same Celery broker
  • Verify the topic names match exactly between publisher and subscriber

Tasks not being discovered:

  • Ensure consumer app's Celery workers are running: celery -A myapp worker
  • Check that subscribe() was called during app startup (not just defined)
  • Verify Celery broker URL is the same across apps

Configuration

Celery Configuration

# celery_config.py
from celery import Celery

app = Celery('myapp')
app.config_from_object({
    'broker_url': 'redis://localhost:6379/0',
    'result_backend': 'redis://localhost:6379/0',
    'task_serializer': 'json',
    'accept_content': ['json'],
    'result_serializer': 'json',
    'timezone': 'UTC',
    'enable_utc': True,
})

# Use with tchu-tchu
from tchu_tchu import TchuClient
client = TchuClient(celery_app=app)

Django Settings

# settings.py
INSTALLED_APPS = [
    # ... other apps
    'tchu_tchu.django',
]

# Celery configuration
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

# tchu-tchu specific settings
TCHU_METRICS_ENABLED = True
TCHU_LOG_LEVEL = 'INFO'

Migration from Original tchu

1. Update Imports

# Before (original tchu)
from tchu import Producer, Consumer
from cs_common.events.clients.tchu_client import TchuClient

# After (tchu-tchu)
from tchu_tchu import CeleryProducer, TchuClient, subscribe

2. Replace Consumers with Subscriptions

# Before (original tchu)
def message_handler(ch, method, properties, body, is_rpc):
    # Handle message
    pass

consumer = Consumer(
    amqp_url="amqp://localhost:5672/",
    exchange="my-exchange",
    routing_keys=["user.*"],
    callback=message_handler
)
consumer.run()

# After (tchu-tchu)
def message_handler(data):
    # Handle message (simplified signature)
    pass

subscribe("user.*", message_handler)
# No need to run consumer - Celery handles it

3. Update TchuClient Usage

# Your existing TchuEvent classes work unchanged!
class MyEvent(TchuEvent):
    class Meta:
        topic = "my.topic"
        request_serializer_class = MyRequestSerializer

# Same API
event = MyEvent()
event.serialize_request(data)
event.publish()  # Now uses Celery instead of RabbitMQ directly

Performance Benefits

  • Faster Serialization: Pydantic is significantly faster than DRF serializers
  • Better Concurrency: Celery's worker pools handle concurrent processing
  • Reduced Memory Usage: No persistent RabbitMQ connections per consumer
  • Horizontal Scaling: Easy to scale by adding more Celery workers
  • Built-in Retries: Celery's robust retry mechanisms

Development

Running Tests

# Install development dependencies
pip install -e .[dev]

# Run tests
pytest

# Run with coverage
pytest --cov=tchu_tchu --cov-report=html

Code Quality

# Format code
black tchu_tchu/
isort tchu_tchu/

# Lint code
flake8 tchu_tchu/
mypy tchu_tchu/

Troubleshooting

Common Issues

  1. No handlers registered for topic: Make sure you call subscribe() before publishing messages
  2. Celery workers not processing tasks: Ensure Celery workers are running with celery -A myapp worker
  3. Import errors: Check that optional dependencies are installed if using Django features

Debugging

# Enable debug logging
import logging
logging.basicConfig(level=logging.DEBUG)

# Check registered handlers
from tchu_tchu.subscriber import list_subscriptions
print(list_subscriptions())

# View metrics
from tchu_tchu.metrics import get_metrics_collector
collector = get_metrics_collector()
print(collector.get_summary())

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Changelog

v1.1.0

  • MAJOR FIX: Cross-app event handling now works correctly
  • Producer now discovers Celery tasks across all apps in the cluster, not just local registry
  • Events published from App A can now trigger handlers registered in App B
  • Uses Celery's task registry for task discovery instead of in-memory registry only
  • Better logging when no handlers are found for a topic
  • Compatible with distributed microservices architecture

v1.0.3

  • CRITICAL FIX: Properly handle DRF serializers with EventAuthorizationSerializer and HiddenFields
  • DRF serializers now use the actual request context during publishing to populate auth fields
  • Event handlers receive reconstructed context via event.request_context with user/company data
  • Fixes TypeError: 'NoneType' object is not subscriptable in handlers using InformationRequestSerializer
  • Support for skip_authorization parameter passed through to DRF serializers
  • Hidden fields (company, user_company, user) are now properly serialized and transmitted

v1.0.2

  • Context (authentication) data transmission improvements
  • Authentication data automatically extracted and included in messages
  • Mock request objects for handler context reconstruction

v1.0.1

  • Fixed UnboundLocalError in DRF to Pydantic conversion when using Any type
  • Fixed context handling for DRF serializers with callable defaults
  • Improved error handling for fields that depend on request context

v1.0.0

  • Initial release
  • Celery-based message processing
  • Pydantic serialization
  • Django integration
  • Metrics collection
  • Drop-in replacement for original tchu
