# tchu-tchu
tchu-tchu is a modern, Celery-based messaging library that provides high-performance event publishing and consumption with Pydantic serialization and Django integration. It serves as a drop-in replacement for the original tchu package while leveraging Celery's robust task management system.
## Features
- 🚀 High Performance: Pydantic serialization for fast data validation and serialization
- 🔄 Drop-in Replacement: Compatible API with existing tchu-based systems
- 📊 Built-in Metrics: Comprehensive metrics collection with Prometheus support
- 🏗️ Django Integration: Automatic model event publishing with decorators
- 🔍 Structured Logging: JSON-formatted logs with correlation tracking
- 🎯 Topic Patterns: Support for wildcard topic subscriptions (e.g., "user.*")
- ⚡ Parallel Processing: Multiple handlers per topic with parallel execution
- 🔒 Type Safety: Full typing support with Pydantic models
- 📈 Scalable: Built on Celery's proven distributed task system
## Installation

```bash
pip install tchu-tchu
```

### Optional Dependencies

```bash
# For Django integration
pip install tchu-tchu[django]

# For Protobuf support (future feature)
pip install tchu-tchu[protobuf]

# Install all optional dependencies
pip install tchu-tchu[all]
```
## Quick Start

### 1. Basic Publishing and Subscription

```python
from tchu_tchu import TchuClient, subscribe

# Subscribe to events
def handle_user_created(data):
    print(f"User created: {data['user_id']}")

subscribe("user.created", handle_user_created)

# Publish events
client = TchuClient()
client.publish("user.created", {
    "user_id": "123",
    "name": "John Doe",
    "email": "john@example.com"
})
```
### 2. Using with Your Existing TchuEvent Classes

```python
from tchu_tchu import TchuEvent
from rest_framework import serializers

class UserCreatedEventRequest(serializers.Serializer):
    user_id = serializers.CharField()
    name = serializers.CharField()
    email = serializers.EmailField()

class UserCreatedEvent(TchuEvent):
    class Meta:
        topic = "user.created"
        request_serializer_class = UserCreatedEventRequest

# Publishing events (same API as before)
event = UserCreatedEvent()
event.serialize_request({
    "user_id": "123",
    "name": "John Doe",
    "email": "john@example.com"
})
event.publish()

# Subscribing to events (same API as before)
def handle_user_created(event_instance):
    user_id = event_instance.get("user_id")
    print(f"Handling user creation: {user_id}")

UserCreatedEvent(handler=handle_user_created).subscribe()
```
### 3. Django Model Auto-Publishing

```python
from django.db import models
from tchu_tchu.django import auto_publish

@auto_publish(
    topic_prefix="myapp.users",
    include_fields=["id", "username", "email", "is_active"],
    publish_on=["created", "updated"]
)
class User(models.Model):
    username = models.CharField(max_length=150)
    email = models.EmailField()
    is_active = models.BooleanField(default=True)

# Events are automatically published:
# - myapp.users.user.created (when a user is created)
# - myapp.users.user.updated (when a user is updated)
```
## Advanced Usage

### Multiple Handlers per Topic

```python
from tchu_tchu import subscribe

def send_welcome_email(data):
    print(f"Sending welcome email to {data['email']}")

def update_analytics(data):
    print(f"Updating analytics for user {data['user_id']}")

def sync_to_crm(data):
    print(f"Syncing user {data['user_id']} to CRM")

# All handlers will run in parallel when a message is published
subscribe("user.created", send_welcome_email)
subscribe("user.created", update_analytics)
subscribe("user.created", sync_to_crm)
```
### Wildcard Topic Subscriptions

```python
from tchu_tchu import subscribe

def handle_all_user_events(data):
    print(f"User event received: {data}")

def handle_all_order_events(data):
    print(f"Order event received: {data}")

# Subscribe to all user-related events
subscribe("user.*", handle_all_user_events)

# Subscribe to all order-related events
subscribe("order.*", handle_all_order_events)
```
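As a rough mental model of what a pattern like `"user.*"` matches, AMQP-style routing treats `*` as exactly one dot-separated segment. The sketch below is purely illustrative (an assumption about the matching rules, not tchu-tchu's actual matcher):

```python
# Illustrative sketch of AMQP-style topic matching, where '*' matches
# exactly one dot-separated segment. This is an assumption about how
# "user.*" patterns behave, not tchu-tchu's internal implementation.
def topic_matches(pattern: str, topic: str) -> bool:
    p_parts = pattern.split(".")
    t_parts = topic.split(".")
    if len(p_parts) != len(t_parts):
        return False
    return all(p == "*" or p == t for p, t in zip(p_parts, t_parts))

print(topic_matches("user.*", "user.created"))        # True
print(topic_matches("user.*", "order.created"))       # False
print(topic_matches("user.*", "user.created.audit"))  # False: '*' is one segment
```

If the library's patterns instead behave like shell globs (where `*` can cross dot boundaries), `"user.*"` would also match multi-segment topics; check the behavior against your broker before relying on either interpretation.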
### RPC-Style Messaging

```python
from tchu_tchu import TchuClient, subscribe

# Set up RPC handler
def validate_user(data):
    user_id = data.get("user_id")
    # Perform validation logic
    return {
        "valid": True,
        "user_id": user_id,
        "status": "active"
    }

subscribe("user.validate", validate_user)

# Make RPC call
client = TchuClient()
try:
    response = client.call("user.validate", {"user_id": "123"}, timeout=5)
    print(f"Validation result: {response}")
except TimeoutError:
    print("Validation timed out")
```
### Django Model Mixin

```python
from django.db import models
from tchu_tchu.django.mixins import EventPublishingMixin

class Product(EventPublishingMixin, models.Model):
    name = models.CharField(max_length=200)
    price = models.DecimalField(max_digits=10, decimal_places=2)

    class Meta:
        tchu_topic_prefix = "ecommerce.products"
        tchu_publish_on = ["created", "updated", "deleted"]
        tchu_include_fields = ["id", "name", "price"]

# Manual event publishing
product = Product.objects.get(id=1)
product.publish_event("price_changed", {
    "old_price": "10.00",
    "new_price": "12.00"
})
```
### Metrics and Monitoring

```python
from datetime import timedelta

from tchu_tchu.metrics import get_metrics_collector, MetricsReporter
from tchu_tchu.metrics.exporters import PrometheusExporter, JSONExporter

# Get metrics summary
collector = get_metrics_collector()
summary = collector.get_summary(time_window=timedelta(hours=1))
print(f"Messages in last hour: {summary['total_messages']}")

# Export metrics
reporter = MetricsReporter(exporters=[
    PrometheusExporter("/tmp/metrics.prom"),
    JSONExporter("/tmp/metrics.json")
])
reporter.export_report(time_window=timedelta(hours=24))

# Topic-specific metrics
topic_stats = collector.get_topic_stats("user.created")
print(f"User creation events: {topic_stats}")
```
### Custom Serialization

```python
from typing import Optional

from pydantic import BaseModel

from tchu_tchu import TchuClient
from tchu_tchu.serializers import PydanticSerializer

class UserModel(BaseModel):
    user_id: str
    name: str
    email: str
    age: Optional[int] = None

# Use custom serializer
serializer = PydanticSerializer(UserModel)
client = TchuClient(serializer=serializer)
client.publish("user.created", {
    "user_id": "123",
    "name": "John Doe",
    "email": "john@example.com",
    "age": 30
})
```
## Cross-App Communication

tchu-tchu uses Celery's routing system for cross-app messaging. Publisher apps need to register remote tasks as proxies.

### Requirements

- **Shared Celery broker**: All apps must connect to the same Redis/RabbitMQ broker
- **Task name**: Know the full task name registered by the consumer app

### Setup Example

**Step 1: Consumer App (Scranton Service) - Register Handler**
```python
# scranton/subscribers/information_request_subscriber.py
import celery

from tchu_tchu.events import TchuEvent

class InformationRequestPreparedEvent(TchuEvent):
    class Meta:
        topic = "coolset.scranton.information_request.prepared"
        request_serializer_class = InformationRequestSerializer

@celery.shared_task
def execute_information_request_task(event, **kwargs):
    # Handle the event
    information_request_data = event.get("information_request")
    serializer = InformationRequestSerializer(
        data=information_request_data,
        context=event.request_context
    )
    if serializer.is_valid():
        return serializer.save()

# Subscribe - creates task:
# tchu_tchu.topics.coolset_scranton_information_request_prepared.InformationRequestPreparedEvent_execute_information_request_task
InformationRequestPreparedEvent(handler=execute_information_request_task).subscribe()
```
**Step 2: Publisher App (API/Pulse Service) - Register Remote Task**

```python
# api/apps.py or pulse/__init__.py
from tchu_tchu import register_remote_task

def ready():
    # Register the remote task from the Scranton service
    register_remote_task(
        topic="coolset.scranton.information_request.prepared",
        task_name="tchu_tchu.topics.coolset_scranton_information_request_prepared.InformationRequestPreparedEvent_execute_information_request_task"
    )
```
**Step 3: Publish from Any App**

```python
# api/views.py or pulse/views.py
from scranton.events import InformationRequestPreparedEvent

def create_information_request(request):
    event = InformationRequestPreparedEvent()
    event.serialize_request(
        {"information_request": {"order_id": 123}},
        context={"request": request}
    )
    event.publish()  # Routes to the Scranton worker via Celery!
```
### How It Works (The Proper Celery Way)

- **Consumer registers task**: `subscribe()` creates a Celery `@shared_task` with a predictable name
- **Publisher registers proxy**: `register_remote_task()` tells the publisher about the remote task name
- **Publisher uses `send_task()`**: Sends the task by name to the Celery broker
- **Celery routes it**: The broker routes to any worker with that task registered (the consumer)
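The publisher-side registry can be pictured as a plain mapping from topic to remote task names. The sketch below is an illustrative model of the flow only - the `_remote_tasks` dict and these helper functions are hypothetical, not tchu-tchu internals; the real library dispatches each registered name via Celery's `send_task()`:

```python
from collections import defaultdict

# Hypothetical publisher-side registry: topic -> remote Celery task names.
_remote_tasks = defaultdict(list)

def register_remote_task(topic, task_name):
    """Record a remote task name as a proxy for a topic."""
    _remote_tasks[topic].append(task_name)

def publish(topic, payload, celery_app=None):
    """Dispatch the payload to every task name registered for the topic.

    With a real Celery app, each dispatch would be
    celery_app.send_task(name, args=[payload]); without one, we simply
    return the task names that would have been dispatched.
    """
    names = list(_remote_tasks.get(topic, []))
    if celery_app is not None:
        for name in names:
            celery_app.send_task(name, args=[payload])
    return names

register_remote_task(
    "coolset.scranton.information_request.prepared",
    "tchu_tchu.topics.coolset_scranton_information_request_prepared."
    "InformationRequestPreparedEvent_execute_information_request_task",
)
print(publish("coolset.scranton.information_request.prepared", {"order_id": 123}))
```

Because the broker matches on the task *name*, any worker that has registered that name will receive the message - which is exactly why the names must match character for character.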
### Multiple Consumers for One Event

Register handlers with different names in each app:

```python
# Scranton app
InformationRequestPreparedEvent(handler=execute_in_scranton).subscribe()
# Creates: tchu_tchu.topics....execute_in_scranton

# Pulse app
InformationRequestPreparedEvent(handler=execute_in_pulse).subscribe()
# Creates: tchu_tchu.topics....execute_in_pulse

# Publisher app - register both
register_remote_task(topic, "tchu_tchu.topics....execute_in_scranton")
register_remote_task(topic, "tchu_tchu.topics....execute_in_pulse")
```

Now when you publish, BOTH apps will process the event!
### Finding Task Names

Check your consumer app's Celery worker logs when it starts:

```
[tasks]
  . tchu_tchu.topics.coolset_scranton_information_request_prepared.InformationRequestPreparedEvent_execute_information_request_task
```

Or use Celery inspect:

```bash
celery -A your_app inspect registered
```
### Troubleshooting

**"No handlers found for topic"**:

- The publisher app needs to call `register_remote_task()` for each remote handler
- Make sure the task name matches exactly (copy it from the consumer logs)

**Task not executing**:

- Verify the consumer app's Celery worker is running
- Check that both apps use the same broker URL
- Confirm the task name is correct
## Configuration

### Celery Configuration

```python
# celery_config.py
from celery import Celery

app = Celery('myapp')
app.config_from_object({
    'broker_url': 'redis://localhost:6379/0',
    'result_backend': 'redis://localhost:6379/0',
    'task_serializer': 'json',
    'accept_content': ['json'],
    'result_serializer': 'json',
    'timezone': 'UTC',
    'enable_utc': True,
})

# Use with tchu-tchu
from tchu_tchu import TchuClient

client = TchuClient(celery_app=app)
```
### Django Settings

```python
# settings.py
INSTALLED_APPS = [
    # ... other apps
    'tchu_tchu.django',
]

# Celery configuration
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

# tchu-tchu specific settings
TCHU_METRICS_ENABLED = True
TCHU_LOG_LEVEL = 'INFO'
```
## Migration from Original tchu

### 1. Update Imports

```python
# Before (original tchu)
from tchu import Producer, Consumer
from cs_common.events.clients.tchu_client import TchuClient

# After (tchu-tchu)
from tchu_tchu import CeleryProducer, TchuClient, subscribe
```
### 2. Replace Consumers with Subscriptions

```python
# Before (original tchu)
def message_handler(ch, method, properties, body, is_rpc):
    # Handle message
    pass

consumer = Consumer(
    amqp_url="amqp://localhost:5672/",
    exchange="my-exchange",
    routing_keys=["user.*"],
    callback=message_handler
)
consumer.run()

# After (tchu-tchu)
def message_handler(data):
    # Handle message (simplified signature)
    pass

subscribe("user.*", message_handler)
# No need to run a consumer - Celery handles it
```
### 3. Update TchuClient Usage

```python
# Your existing TchuEvent classes work unchanged!
class MyEvent(TchuEvent):
    class Meta:
        topic = "my.topic"
        request_serializer_class = MyRequestSerializer

# Same API
event = MyEvent()
event.serialize_request(data)
event.publish()  # Now uses Celery instead of RabbitMQ directly
```
## Performance Benefits
- Faster Serialization: Pydantic is significantly faster than DRF serializers
- Better Concurrency: Celery's worker pools handle concurrent processing
- Reduced Memory Usage: No persistent RabbitMQ connections per consumer
- Horizontal Scaling: Easy to scale by adding more Celery workers
- Built-in Retries: Celery's robust retry mechanisms
## Development

### Running Tests

```bash
# Install development dependencies
pip install -e .[dev]

# Run tests
pytest

# Run with coverage
pytest --cov=tchu_tchu --cov-report=html
```

### Code Quality

```bash
# Format code
black tchu_tchu/
isort tchu_tchu/

# Lint code
flake8 tchu_tchu/
mypy tchu_tchu/
```
## Troubleshooting

### Common Issues

- **No handlers registered for topic**: Make sure you call `subscribe()` before publishing messages
- **Celery workers not processing tasks**: Ensure Celery workers are running with `celery -A myapp worker`
- **Import errors**: Check that optional dependencies are installed if you are using Django features
### Debugging

```python
# Enable debug logging
import logging
logging.basicConfig(level=logging.DEBUG)

# Check registered handlers
from tchu_tchu.subscriber import list_subscriptions
print(list_subscriptions())

# View metrics
from tchu_tchu.metrics import get_metrics_collector
collector = get_metrics_collector()
print(collector.get_summary())
```
## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

1. Fork the repository
2. Create your feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request
## License
This project is licensed under the MIT License - see the LICENSE file for details.
## Changelog

### v1.2.1

- IMPROVED: `publish()` now logs a warning instead of raising an exception when no handlers are found
- Better for model signal-triggered events that may not always have handlers
- RPC `call()` still raises an exception (as you expect a response)
- More helpful warning message explaining when this is normal behavior

### v1.2.0

- PROPER CELERY IMPLEMENTATION: Cross-app messaging using `send_task()`
- New `register_remote_task()` function for registering remote handlers
- Producer now uses `send_task()` instead of `apply_async()` for proper cross-worker routing
- Simplified architecture - no complex task discovery needed
- Publisher apps explicitly register remote tasks as proxies
- Follows Celery best practices for distributed task execution
- Comprehensive cross-app communication documentation

### v1.1.0

- Initial attempt at cross-app event handling (improved in v1.2.0)
- Task discovery across apps
- Better logging for missing handlers

### v1.0.3

- CRITICAL FIX: Properly handle DRF serializers with `EventAuthorizationSerializer` and hidden fields
- DRF serializers now use the actual request context during publishing to populate auth fields
- Event handlers receive reconstructed context via `event.request_context` with user/company data
- Fixes `TypeError: 'NoneType' object is not subscriptable` in handlers using `InformationRequestSerializer`
- Support for the `skip_authorization` parameter passed through to DRF serializers
- Hidden fields (company, user_company, user) are now properly serialized and transmitted

### v1.0.2

- Context (authentication) data transmission improvements
- Authentication data automatically extracted and included in messages
- Mock request objects for handler context reconstruction

### v1.0.1

- Fixed `UnboundLocalError` in DRF-to-Pydantic conversion when using the `Any` type
- Fixed context handling for DRF serializers with callable defaults
- Improved error handling for fields that depend on request context

### v1.0.0

- Initial release
- Celery-based message processing
- Pydantic serialization
- Django integration
- Metrics collection
- Drop-in replacement for the original tchu