Event-driven architecture library for Python that extends Celery with event publishing/subscribing patterns
CelerySalt
A schema-driven event API on top of Celery: publish/subscribe (broadcast) and RPC with Pydantic validation and a schema registry.
Features
- Pydantic schemas: type-checked event payloads with validation.
- Broadcast: fire-and-forget pub/sub (one event to many subscribers).
- RPC: request/response with response and error schema validation.
- Schema registry: schema registration and lookup by topic/version.
- Versioning: topic stays stable; versions are metadata.
- Django integration: optional helpers for wiring queues and module discovery.
- Protocol compatibility: interoperates with tchu-tchu (tchu_events exchange and _tchu_meta).
Quick Start
Installation
pip install celery-salt
Broadcast Example
from celery_salt import event, subscribe

# Define event schema
@event("user.signup.completed")
class UserSignupCompleted:
    user_id: int
    email: str
    company_id: int
    signup_source: str = "web"

# Publish event
UserSignupCompleted.publish(
    user_id=123,
    email="alice@example.com",
    company_id=1,
    signup_source="web",
)

# Subscribe to event
@subscribe("user.signup.completed")
def send_welcome_email(data: UserSignupCompleted):
    print(f"Sending welcome email to {data.email}")
RPC Example
from celery_salt import event, subscribe, RPCError

# Define RPC request/response schemas
@event("rpc.calculator.add", mode="rpc")
class CalculatorAddRequest:
    a: float
    b: float

@event.response("rpc.calculator.add")
class CalculatorAddResponse:
    result: float
    operation: str = "add"

@event.error("rpc.calculator.add")
class CalculatorAddError:
    error_code: str
    error_message: str

# Handler
@subscribe("rpc.calculator.add")
def handle_add(data: CalculatorAddRequest) -> CalculatorAddResponse:
    return CalculatorAddResponse(result=data.a + data.b, operation="add")

# Client call (returns SaltResponse: .event, .data, .payload, attribute access)
response = CalculatorAddRequest.call(a=10, b=5, timeout=10)
print(f"Result: {response.result}")  # 15.0
# For DRF/JsonResponse: use response.payload (JSON-serializable dict/list)
Architecture
Publisher → RabbitMQ Exchange (tchu_events) → Subscribers
- Exchange: tchu_events (topic exchange, protocol compatible)
- Routing: Topic-based with wildcard support (user.*, #)
- Serialization: JSON with Pydantic validation; datetime, UUID, Decimal, etc. are normalized to JSON-safe types automatically
- Result Backend: Redis (required for RPC)
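The wildcard routing mentioned above follows the usual AMQP topic-exchange rules: routing keys are dot-separated words, `*` matches exactly one word, and `#` matches zero or more words. The broker does this matching, not CelerySalt; the function below is only an illustrative sketch of those rules, and the name `topic_matches` is hypothetical.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if an AMQP-style binding pattern matches a routing key.

    Illustrative only: in a real deployment RabbitMQ performs this matching.
    """
    p = pattern.split(".")
    k = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        # Pattern exhausted: succeed only if the key is exhausted too.
        if i == len(p):
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb zero or more of the remaining words.
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j == len(k):
            return False
        # '*' matches any single word; otherwise words must be equal.
        if p[i] == "*" or p[i] == k[j]:
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


print(topic_matches("user.*", "user.created"))
print(topic_matches("user.*", "user.signup.completed"))
print(topic_matches("user.#", "user.signup.completed"))
```

Note that `user.*` does not match `user.signup.completed`, because `*` covers one word only; a subscriber that wants every event under `user` would bind with `user.#`.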
Documentation
- Examples: ./examples/
- Docs: ./docs/
- Unified API (SaltEvent / SaltResponse): ./docs/EVENT_CLASS_UNIFIED_API.md
- Typing subscriber payloads: ./docs/TYPING_SUBSCRIBER_EVENTS.md
- Celery config (Django): ./docs/CELERY_CONFIG_TEMPLATE.md
Requirements
- Python 3.10+
- Celery 5.3+
- RabbitMQ (message broker)
- Redis (result backend; needed only if you use RPC)
Django is optional. The core library is Celery + Pydantic; events and validation do not require Django. Install with pip install celery-salt[django] only if you use the optional Django helpers below.
Django (optional)
If you use Django:
- Wire the Celery app
  Add 'celery_salt.django' to INSTALLED_APPS and set CELERY_APP = "myproject.celery:app" in settings. Then .publish() and .call() work from views with no extra code.
- Configure the worker
  In your celery.py, call setup_salt_queue(app, queue_name="my_queue"). If your app uses Celery(..., include=[...]), that same list is used for subscriber modules; otherwise pass subscriber_modules=[...].
- Optional: auto-publish on model save/delete
  Use the @auto_publish decorator on a Django model to publish events on create/update/delete.
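The settings change described under "Wire the Celery app" might look like the fragment below. Only the 'celery_salt.django' entry and the CELERY_APP value come from the text above; the other INSTALLED_APPS entries are placeholders for whatever your project already lists.

```python
# settings.py (fragment)

INSTALLED_APPS = [
    # Placeholder entries: keep whatever your project already has here.
    "django.contrib.contenttypes",
    "django.contrib.auth",
    # Enables the optional CelerySalt Django helpers.
    "celery_salt.django",
]

# Path to the Celery app instance, written as "module.path:attribute".
CELERY_APP = "myproject.celery:app"
```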
Examples
See the examples directory for complete working examples:
- Basic Broadcast - Pub/sub messaging
- Basic RPC - Request/response pattern
Run examples with Docker Compose:
cd examples
docker-compose up -d # Starts RabbitMQ and Redis
Key Concepts
Event Schemas
Schemas are defined using Pydantic models and registered at import time:
from datetime import datetime

from celery_salt import event

@event("user.created")
class UserCreated:
    user_id: int
    email: str
    created_at: datetime
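To make "registered at import time" concrete, here is a minimal stdlib-only sketch of a decorator that records a class in a registry keyed by topic and version as soon as the module is imported. It is not CelerySalt's implementation: `register`, `lookup`, `_REGISTRY`, and the "v1"/"v2" version labels are all hypothetical, and dataclasses stand in for Pydantic models.

```python
from dataclasses import dataclass

# Hypothetical registry: (topic, version) -> schema class.
_REGISTRY: dict[tuple[str, str], type] = {}


def register(topic: str, version: str = "v1"):
    """Class decorator that records a schema when its module is imported."""
    def decorator(cls):
        key = (topic, version)
        if key in _REGISTRY:
            raise ValueError(f"schema already registered for {key}")
        _REGISTRY[key] = cls
        return cls
    return decorator


def lookup(topic: str, version: str = "v1") -> type:
    """Return the schema class registered for a topic and version."""
    return _REGISTRY[(topic, version)]


@register("user.created")
@dataclass
class UserCreatedV1:
    user_id: int
    email: str


# Same topic, new version: the topic string stays stable.
@register("user.created", version="v2")
@dataclass
class UserCreatedV2:
    user_id: int
    email: str
    locale: str = "en"


print(lookup("user.created").__name__)
print(lookup("user.created", "v2").__name__)
```

Because the decorator runs when the class statement executes, a schema only becomes visible once its module has been imported, which is why subscriber and schema modules need to be discoverable by the worker.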
Publishing Events
# Broadcast (fire-and-forget)
UserCreated.publish(user_id=123, email="user@example.com", created_at=datetime.now())
# RPC (synchronous)
response = CalculatorAddRequest.call(a=10, b=5, timeout=10)
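The publish call above passes a datetime, which plain JSON cannot encode. CelerySalt is described as normalizing such values automatically; the sketch below shows one way that kind of normalization can be done with the standard library alone. The helper name `to_json_safe` and the chosen output forms (ISO 8601 strings for datetimes, strings for UUID and Decimal) are assumptions, not the library's documented behaviour.

```python
import json
from datetime import datetime, timezone
from decimal import Decimal
from uuid import UUID


def to_json_safe(value):
    """Fallback encoder for json.dumps: convert common non-JSON types."""
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, (UUID, Decimal)):
        # Strings avoid float rounding for Decimal and keep UUIDs readable.
        return str(value)
    raise TypeError(f"not JSON serializable: {type(value).__name__}")


payload = {
    "user_id": 123,
    "created_at": datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
    "request_id": UUID("12345678-1234-5678-1234-567812345678"),
    "amount": Decimal("19.99"),
}

# json.dumps calls to_json_safe only for values it cannot encode itself.
encoded = json.dumps(payload, default=to_json_safe)
print(encoded)
```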
Subscribing to Events
Handlers can use Celery task options (priority, retries, time limits, etc.) via **celery_options:
@subscribe("user.created", priority=5, autoretry_for=(Exception,), max_retries=3)
def handle_user_created(data: UserCreated):
    # Process event
    pass
For SaltEvent class-based handlers, subscribe with the event class to receive a full event instance and use event.respond() for RPC:
@subscribe(MyRpcEvent)
def handle_my_rpc(evt: MyRpcEvent):
    # evt is MyRpcEvent; return validated response via event.respond()
    return evt.respond(serializer.data)  # or evt.respond(key=value, ...)
RPC Response Validation
@event.response("rpc.calculator.add")
class CalculatorAddResponse:
    result: float

@event.error("rpc.calculator.add")
class CalculatorAddError:
    error_code: str
    error_message: str
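As a rough illustration of what validating a reply against separate response and error schemas can mean, the sketch below checks a reply dict against one of two dataclasses. It is a stdlib stand-in, not CelerySalt's code: `validate`, `parse_reply`, the class names, and the rule that an "error_code" key marks an error reply are all assumptions made for the example. It is also stricter than Pydantic, which would coerce compatible types.

```python
from dataclasses import MISSING, dataclass, fields


@dataclass
class AddResponse:
    result: float
    operation: str = "add"


@dataclass
class AddError:
    error_code: str
    error_message: str


def validate(schema, payload: dict):
    """Build `schema` from `payload`; reject missing, unknown, or mistyped fields."""
    known = {f.name: f for f in fields(schema)}
    unknown = set(payload) - set(known)
    if unknown:
        raise ValueError(f"unexpected fields: {sorted(unknown)}")
    for name, f in known.items():
        if name not in payload:
            # A field may be omitted only if the schema gives it a default.
            if f.default is MISSING and f.default_factory is MISSING:
                raise ValueError(f"missing field: {name}")
            continue
        if not isinstance(payload[name], f.type):
            raise TypeError(f"{name} must be {f.type.__name__}")
    return schema(**payload)


def parse_reply(payload: dict):
    # Assumption: an "error_code" key marks an error reply.
    if "error_code" in payload:
        return validate(AddError, payload)
    return validate(AddResponse, payload)


print(parse_reply({"result": 15.0}))
print(parse_reply({"error_code": "E_OVERFLOW", "error_message": "too large"}))
```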
Protocol Compatibility
CelerySalt maintains protocol compatibility with tchu-tchu:
- Same exchange name: tchu_events
- Same message format: _tchu_meta field
- Same routing key conventions
This allows gradual migration: apps using celery-salt can communicate with apps still using tchu-tchu.
Development
git clone https://github.com/sigularusrex/celery-salt.git
cd celery-salt
# tests
python -m pytest
License
MIT License - see LICENSE file for details.
Contributing
Please open an issue or pull request on GitHub.
Links
- GitHub: https://github.com/sigularusrex/celery-salt
- Issues: https://github.com/sigularusrex/celery-salt/issues