Event-driven architecture library for Python that extends Celery with event publishing/subscribing patterns
CelerySalt
A modern, event-driven architecture library for Python that extends Celery with schema-validated event publishing/subscribing patterns. Built with Pydantic for type safety and import-time schema registration.
🚀 v1.0.0 - First release! Pydantic-based event schemas, import-time registration, RPC support, and protocol compatibility with tchu-tchu.
Features
- ✨ Pydantic-Based Schemas - Type-safe event definitions with automatic validation
- 🚀 Import-Time Registration - Schemas registered at import time for early error detection
- 📡 Broadcast Events - Fire-and-forget pub/sub messaging (one event → many subscribers)
- ⚡ RPC Support - Synchronous request/response with response/error schema validation
- 🔄 Protocol Compatible - Works with existing tchu-tchu applications
- 🎯 Framework Agnostic - Core library works with any Python app (Django optional)
- 🛡️ Schema Registry - Centralized schema management (in-memory, PostgreSQL, Cloud)
- 🔁 Celery Integration - Full Celery features: retries, time limits, rate limiting, monitoring
- 📦 Simple API - Decorator-based: @event and @subscribe
Quick Start
Installation
pip install celery-salt
Broadcast Example
from celerysalt import event, subscribe
# Define event schema
@event("user.signup.completed")
class UserSignupCompleted:
    user_id: int
    email: str
    company_id: int
    signup_source: str = "web"
# Publish event
UserSignupCompleted.publish(
    user_id=123,
    email="alice@example.com",
    company_id=1,
    signup_source="web",
)
# Subscribe to event
@subscribe("user.signup.completed")
def send_welcome_email(data: UserSignupCompleted):
    print(f"Sending welcome email to {data.email}")
RPC Example
from celerysalt import event, subscribe, RPCError
# Define RPC request/response schemas
@event("rpc.calculator.add", mode="rpc")
class CalculatorAddRequest:
    a: float
    b: float
@event.response("rpc.calculator.add")
class CalculatorAddResponse:
    result: float
    operation: str = "add"
@event.error("rpc.calculator.add")
class CalculatorAddError:
    error_code: str
    error_message: str
# Handler
@subscribe("rpc.calculator.add")
def handle_add(data: CalculatorAddRequest) -> CalculatorAddResponse:
    return CalculatorAddResponse(result=data.a + data.b, operation="add")
# Client call
response = CalculatorAddRequest.call(a=10, b=5, timeout=10)
print(f"Result: {response.result}")  # 15.0
Architecture
Publisher → RabbitMQ Exchange (tchu_events) → Subscribers
- Exchange: tchu_events (topic exchange, protocol compatible)
- Routing: Topic-based with wildcard support (user.*, #)
- Serialization: JSON with Pydantic validation
- Result Backend: Redis (required for RPC)
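The wildcard routing mentioned above follows the usual AMQP topic-exchange rules: routing keys are dot-separated words, `*` matches exactly one word, and `#` matches zero or more words. The following is a minimal standard-library sketch of that matching logic, meant only to illustrate which keys a binding pattern would receive. The function name `topic_matches` is hypothetical and is not part of CelerySalt; the real matching is performed by RabbitMQ.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if an AMQP-style topic pattern matches a routing key.

    Illustrative only: '*' matches exactly one word, '#' matches zero or more.
    """
    p = pattern.split(".")
    k = routing_key.split(".") if routing_key else []

    def match(i: int, j: int) -> bool:
        if i == len(p):
            # Pattern exhausted: match only if the key is exhausted too.
            return j == len(k)
        if p[i] == "#":
            # '#' may consume any number of remaining words, including none.
            return any(match(i + 1, jj) for jj in range(j, len(k) + 1))
        if j == len(k):
            return False
        if p[i] == "*" or p[i] == k[j]:
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


print(topic_matches("user.*", "user.created"))
print(topic_matches("user.*", "user.signup.completed"))
print(topic_matches("#", "user.signup.completed"))
```

Note that under these rules `user.*` does not match the three-word key `user.signup.completed`; a pattern such as `user.#` or `user.signup.*` would be needed for that.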
Documentation
- Examples - Working examples for broadcast and RPC
- Design Document - Full architecture and design
- Implementation Context - Development context
Requirements
- Python 3.10+
- Celery 5.3+
- RabbitMQ (message broker)
- Redis (optional; required only for RPC)
Installation
# Basic installation
pip install celery-salt
# With Django support (quoted so shells such as zsh do not expand the brackets)
pip install "celery-salt[django]"
# With all extras
pip install "celery-salt[all]"
Examples
See the examples directory for complete working examples:
- Basic Broadcast - Pub/sub messaging
- Basic RPC - Request/response pattern
Run examples with Docker Compose:
cd examples
docker-compose up -d # Starts RabbitMQ and Redis
Key Concepts
Event Schemas
Schemas are defined using Pydantic models and registered at import time:
from datetime import datetime
from celerysalt import event
@event("user.created")
class UserCreated:
    user_id: int
    email: str
    created_at: datetime
Publishing Events
# Broadcast (fire-and-forget)
UserCreated.publish(user_id=123, email="user@example.com", created_at=datetime.now())
# RPC (synchronous)
response = CalculatorAddRequest.call(a=10, b=5, timeout=10)
Subscribing to Events
@subscribe("user.created")
def handle_user_created(data: UserCreated):
    # Process event
    pass
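The broadcast model (one event, many subscribers) can be pictured with a small in-process sketch. This is an illustration under stated assumptions, not how CelerySalt delivers messages: in the real library delivery goes through RabbitMQ and Celery workers, whereas here handlers are called directly. The names `toy_subscribe`, `toy_publish`, and `_handlers` are hypothetical.

```python
from typing import Callable

# Hypothetical handler table: topic -> handlers registered for it.
_handlers: dict[str, list[Callable[[dict], None]]] = {}


def toy_subscribe(topic: str):
    """Toy decorator that records a handler for a topic."""

    def decorator(fn):
        _handlers.setdefault(topic, []).append(fn)
        return fn

    return decorator


def toy_publish(topic: str, payload: dict) -> int:
    """Call every handler registered for the topic; return how many ran."""
    handlers = _handlers.get(topic, [])
    for handler in handlers:
        handler(payload)
    return len(handlers)


calls: list[tuple[str, int]] = []


@toy_subscribe("user.created")
def send_welcome_email(data: dict) -> None:
    calls.append(("email", data["user_id"]))


@toy_subscribe("user.created")
def update_analytics(data: dict) -> None:
    calls.append(("analytics", data["user_id"]))


delivered = toy_publish("user.created", {"user_id": 123})
print(delivered, calls)
```

The publisher does not know or care how many handlers exist, which is what makes the pattern fire-and-forget.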
RPC Response Validation
@event.response("rpc.calculator.add")
class CalculatorAddResponse:
    result: float
@event.error("rpc.calculator.add")
class CalculatorAddError:
    error_code: str
    error_message: str
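As a rough picture of what validating an RPC reply against a declared schema involves, the sketch below checks a payload dictionary against a dataclass using only the standard library. It is an assumption-laden illustration: CelerySalt uses Pydantic for this, and the helper `toy_validate` and the `Toy*` classes are hypothetical names that do not exist in the library.

```python
from dataclasses import dataclass, fields


@dataclass
class ToyAddResponse:
    result: float
    operation: str = "add"


@dataclass
class ToyAddError:
    error_code: str
    error_message: str


def toy_validate(schema, payload: dict):
    """Build `schema` from `payload`, rejecting unknown fields and wrong types."""
    expected = {f.name: f.type for f in fields(schema)}
    unknown = set(payload) - set(expected)
    if unknown:
        raise ValueError(f"unexpected fields: {sorted(unknown)}")
    instance = schema(**payload)  # raises TypeError if a required field is missing
    for name, expected_type in expected.items():
        value = getattr(instance, name)
        # Accept ints where a float is declared, but never bools.
        if expected_type is float and isinstance(value, int) and not isinstance(value, bool):
            setattr(instance, name, float(value))
        elif not isinstance(value, expected_type):
            raise TypeError(f"{name} must be {expected_type.__name__}")
    return instance


ok = toy_validate(ToyAddResponse, {"result": 15})
err = toy_validate(ToyAddError, {"error_code": "E_OVERFLOW", "error_message": "too large"})
print(ok, err)
```

A malformed reply, for example `{"result": "fifteen"}`, is rejected in this sketch before it reaches the caller, which is the benefit of declaring separate response and error schemas.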
Protocol Compatibility
CelerySalt maintains protocol compatibility with tchu-tchu:
- Same exchange name: tchu_events
- Same message format: _tchu_meta field
- Same routing key conventions
This allows gradual migration: apps using celery-salt can communicate with apps still using tchu-tchu.
Development
# Clone repository
git clone https://github.com/sigularusrex/celery-salt.git
cd celery-salt
# Install development dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Run examples
cd examples
docker-compose up -d
celery -A basic_broadcast.subscriber worker --loglevel=info
License
MIT License - see LICENSE file for details.
Contributing
Contributions welcome! Please open an issue or pull request on GitHub.