Skip to main content

Event-driven architecture library for Python that extends Celery with event publishing/subscribing patterns

Project description

CelerySalt

A schema-driven event API on top of Celery: publish/subscribe (broadcast) and RPC with Pydantic validation and a schema registry.

License: MIT Python 3.10+

Features

  • Pydantic schemas: type-checked event payloads with validation.
  • Broadcast: fire-and-forget pub/sub (one event to many subscribers).
  • RPC: request/response with response and error schema validation.
  • Schema registry: schema registration and lookup by topic/version.
  • Versioning: topic stays stable; versions are metadata.
  • Django integration: optional helpers for wiring queues and module discovery.
  • Protocol compatibility: interoperates with tchu-tchu (tchu_events exchange and _tchu_meta).

Quick Start

Installation

pip install celery-salt

Broadcast Example

from celery_salt import event, subscribe

# Define event schema
@event("user.signup.completed")
class UserSignupCompleted:
    user_id: int
    email: str
    company_id: int
    signup_source: str = "web"

# Publish event
UserSignupCompleted.publish(
    user_id=123,
    email="alice@example.com",
    company_id=1,
    signup_source="web"
)

# Subscribe to event
@subscribe("user.signup.completed")
def send_welcome_email(data: UserSignupCompleted):
    print(f"Sending welcome email to {data.email}")

RPC Example

from celery_salt import event, subscribe, RPCError

# Define RPC request/response schemas
@event("rpc.calculator.add", mode="rpc")
class CalculatorAddRequest:
    a: float
    b: float

@event.response("rpc.calculator.add")
class CalculatorAddResponse:
    result: float
    operation: str = "add"

@event.error("rpc.calculator.add")
class CalculatorAddError:
    error_code: str
    error_message: str

# Handler
@subscribe("rpc.calculator.add")
def handle_add(data: CalculatorAddRequest) -> CalculatorAddResponse:
    return CalculatorAddResponse(result=data.a + data.b, operation="add")

# Client call (returns SaltResponse: .event, .data, .payload, attribute access)
response = CalculatorAddRequest.call(a=10, b=5, timeout=10)
print(f"Result: {response.result}")  # 15.0
# For DRF/JsonResponse: use response.payload (JSON-serializable dict/list)

Architecture

Publisher → RabbitMQ Exchange (tchu_events) → Subscribers
  • Exchange: tchu_events (topic exchange, protocol compatible)
  • Routing: Topic-based with wildcard support (user.*, #)
  • Serialization: JSON with Pydantic validation; datetime, UUID, Decimal, etc. are normalized to JSON-safe types automatically
  • Result Backend: Redis (required for RPC)

Documentation

Requirements

  • Python 3.10+
  • Celery 5.3+
  • RabbitMQ (message broker)
  • Redis (optional, required for RPC)

Django is optional. The core library is Celery + Pydantic; events and validation do not require Django. Install with pip install celery-salt[django] only if you use the optional Django helpers below.

Django (optional)

If you use Django:

  1. Wire the Celery app
    Add 'celery_salt.django' to INSTALLED_APPS and set CELERY_APP = "myproject.celery:app" in settings. Then .publish() and .call() work from views with no extra code.

  2. Configure the worker
    In your celery.py, call setup_salt_queue(app, queue_name="my_queue"). If your app uses Celery(..., include=[...]), that same list is used for subscriber modules; otherwise pass subscriber_modules=[...].

  3. Optional: auto-publish on model save/delete
    Use the @auto_publish decorator on a Django model to publish events on create/update/delete.

Examples

See the examples directory for complete working examples:

Run examples with Docker Compose:

cd examples
docker-compose up -d  # Starts RabbitMQ and Redis

Key Concepts

Event Schemas

Schemas are defined using Pydantic models and registered at import time:

@event("user.created")
class UserCreated:
    user_id: int
    email: str
    created_at: datetime

Publishing Events

# Broadcast (fire-and-forget)
UserCreated.publish(user_id=123, email="user@example.com", created_at=datetime.now())

# RPC (synchronous)
response = CalculatorAddRequest.call(a=10, b=5, timeout=10)

Subscribing to Events

Handlers can use Celery task options (priority, retries, time limits, etc.) via **celery_options:

@subscribe("user.created", priority=5, autoretry_for=(Exception,), max_retries=3)
def handle_user_created(data: UserCreated):
    # Process event
    pass

For SaltEvent class-based handlers, subscribe with the event class to receive a full event instance and use event.respond() for RPC:

@subscribe(MyRpcEvent)
def handle_my_rpc(evt: MyRpcEvent):
    # evt is MyRpcEvent; return validated response via event.respond()
    return evt.respond(serializer.data)  # or evt.respond(key=value, ...)

RPC Response Validation

@event.response("rpc.calculator.add")
class CalculatorAddResponse:
    result: float

@event.error("rpc.calculator.add")
class CalculatorAddError:
    error_code: str
    error_message: str

Protocol Compatibility

CelerySalt maintains protocol compatibility with tchu-tchu:

  • Same exchange name: tchu_events
  • Same message format: _tchu_meta field
  • Same routing key conventions

This allows gradual migration: apps using celery-salt can communicate with apps still using tchu-tchu.

Development

git clone https://github.com/sigularusrex/celery-salt.git
cd celery-salt

# tests
python -m pytest

License

MIT License - see LICENSE file for details.

Contributing

Please open an issue or pull request on GitHub.

Links

  • GitHub: https://github.com/sigularusrex/celery-salt
  • Issues: https://github.com/sigularusrex/celery-salt/issues

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

celery_salt-1.5.14.tar.gz (42.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

celery_salt-1.5.14-py3-none-any.whl (53.6 kB view details)

Uploaded Python 3

File details

Details for the file celery_salt-1.5.14.tar.gz.

File metadata

  • Download URL: celery_salt-1.5.14.tar.gz
  • Upload date:
  • Size: 42.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.2 CPython/3.11.3 Darwin/25.3.0

File hashes

Hashes for celery_salt-1.5.14.tar.gz
Algorithm Hash digest
SHA256 6789fbeefc6623a2e30c6250c8b5f7178df3aeea897674744c7125afe5153a1c
MD5 99b4d25dcb00341338ca449e26e49919
BLAKE2b-256 6f66efb7c43991bff29183fdcfbb778fa03a4bc930ea15bbc205c7625ec7ac41

See more details on using hashes here.

File details

Details for the file celery_salt-1.5.14-py3-none-any.whl.

File metadata

  • Download URL: celery_salt-1.5.14-py3-none-any.whl
  • Upload date:
  • Size: 53.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.2 CPython/3.11.3 Darwin/25.3.0

File hashes

Hashes for celery_salt-1.5.14-py3-none-any.whl
Algorithm Hash digest
SHA256 b2a6bfb63796380ed7e54cc184fb79245e58fcc63a0cf373e9a7335d11204463
MD5 8a6c25cfd6f3e3a7867236bd3f28bea9
BLAKE2b-256 a7d799512ff6fe4bf580cbc044eb82cbe0165eb0db812aa39a51a8f3f5cee947

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page