Skip to main content

Event-driven architecture library for Python that extends Celery with event publishing/subscribing patterns

Project description

CelerySalt

A modern, event-driven architecture library for Python that extends Celery with schema-validated event publishing/subscribing patterns. Built with Pydantic for type safety and import-time schema registration.

License: MIT Python 3.10+

🚀 v1.0.0 - First release! Pydantic-based event schemas, import-time registration, RPC support, and protocol compatibility with tchu-tchu.

Features

  • Pydantic-Based Schemas - Type-safe event definitions with automatic validation
  • 🚀 Import-Time Registration - Schemas registered at import time for early error detection
  • 📡 Broadcast Events - Fire-and-forget pub/sub messaging (one event → many subscribers)
  • RPC Support - Synchronous request/response with response/error schema validation
  • 🔄 Protocol Compatible - Works with existing tchu-tchu applications
  • 🎯 Framework Agnostic - Core library works with any Python app (Django optional)
  • 🛡️ Schema Registry - Centralized schema management (in-memory, PostgreSQL, Cloud)
  • 🔁 Celery Integration - Full Celery features: retries, time limits, rate limiting, monitoring
  • 📦 Simple API - Decorator-based: @event and @subscribe

Quick Start

Installation

pip install celery-salt

Broadcast Example

from celerysalt import event, subscribe

# Define event schema
@event("user.signup.completed")
class UserSignupCompleted:
    user_id: int
    email: str
    company_id: int
    signup_source: str = "web"

# Publish event
UserSignupCompleted.publish(
    user_id=123,
    email="alice@example.com",
    company_id=1,
    signup_source="web"
)

# Subscribe to event
@subscribe("user.signup.completed")
def send_welcome_email(data: UserSignupCompleted):
    print(f"Sending welcome email to {data.email}")

RPC Example

from celerysalt import event, subscribe, RPCError

# Define RPC request/response schemas
@event("rpc.calculator.add", mode="rpc")
class CalculatorAddRequest:
    a: float
    b: float

@event.response("rpc.calculator.add")
class CalculatorAddResponse:
    result: float
    operation: str = "add"

@event.error("rpc.calculator.add")
class CalculatorAddError:
    error_code: str
    error_message: str

# Handler
@subscribe("rpc.calculator.add")
def handle_add(data: CalculatorAddRequest) -> CalculatorAddResponse:
    return CalculatorAddResponse(result=data.a + data.b, operation="add")

# Client call
response = CalculatorAddRequest.call(a=10, b=5, timeout=10)
print(f"Result: {response.result}")  # 15.0

Architecture

Publisher → RabbitMQ Exchange (tchu_events) → Subscribers
  • Exchange: tchu_events (topic exchange, protocol compatible)
  • Routing: Topic-based with wildcard support (user.*, #)
  • Serialization: JSON with Pydantic validation
  • Result Backend: Redis (required for RPC)

Documentation

Requirements

  • Python 3.10+
  • Celery 5.3+
  • RabbitMQ (message broker)
  • Redis (optional, required for RPC)

Installation

# Basic installation
pip install celery-salt

# With Django support
pip install celery-salt[django]

# With all extras
pip install celery-salt[all]

Examples

See the examples directory for complete working examples:

Run examples with Docker Compose:

cd examples
docker-compose up -d  # Starts RabbitMQ and Redis

Key Concepts

Event Schemas

Schemas are defined using Pydantic models and registered at import time:

@event("user.created")
class UserCreated:
    user_id: int
    email: str
    created_at: datetime

Publishing Events

# Broadcast (fire-and-forget)
UserCreated.publish(user_id=123, email="user@example.com", created_at=datetime.now())

# RPC (synchronous)
response = CalculatorAddRequest.call(a=10, b=5, timeout=10)

Subscribing to Events

@subscribe("user.created")
def handle_user_created(data: UserCreated):
    # Process event
    pass

RPC Response Validation

@event.response("rpc.calculator.add")
class CalculatorAddResponse:
    result: float

@event.error("rpc.calculator.add")
class CalculatorAddError:
    error_code: str
    error_message: str

Protocol Compatibility

CelerySalt maintains protocol compatibility with tchu-tchu:

  • Same exchange name: tchu_events
  • Same message format: _tchu_meta field
  • Same routing key conventions

This allows gradual migration: apps using celery-salt can communicate with apps still using tchu-tchu.

Development

# Clone repository
git clone https://github.com/sigularusrex/celery-salt.git
cd celery-salt

# Install development dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run examples
cd examples
docker-compose up -d
celery -A basic_broadcast.subscriber worker --loglevel=info

License

MIT License - see LICENSE file for details.

Contributing

Contributions welcome! Please open an issue or pull request on GitHub.

Links

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

celery_salt-1.1.0.tar.gz (37.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

celery_salt-1.1.0-py3-none-any.whl (49.2 kB view details)

Uploaded Python 3

File details

Details for the file celery_salt-1.1.0.tar.gz.

File metadata

  • Download URL: celery_salt-1.1.0.tar.gz
  • Upload date:
  • Size: 37.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.1 CPython/3.11.3 Darwin/24.5.0

File hashes

Hashes for celery_salt-1.1.0.tar.gz
Algorithm Hash digest
SHA256 b8aaeb00038e5b8843593a818bcd24642fe3cde04d9b7634cf858f5abb48f25d
MD5 901a77249c84138f04a0d54079dda762
BLAKE2b-256 d78aa36c7a2eaadd01e2863820cde68241afc0463be4eaa626ca0a8b401cda37

See more details on using hashes here.

File details

Details for the file celery_salt-1.1.0-py3-none-any.whl.

File metadata

  • Download URL: celery_salt-1.1.0-py3-none-any.whl
  • Upload date:
  • Size: 49.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.1 CPython/3.11.3 Darwin/24.5.0

File hashes

Hashes for celery_salt-1.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 bc1ab04dfbc6557e2f8e68dee4c5ed74d5085a4911679b638ef0c1a4e6abaae7
MD5 7181ab72d62f7e3705969a34482bb2ce
BLAKE2b-256 55cc2a97dfa82a99be52fa76711f700d9b7a8d8f98bc48d0c2417729b2c2854b

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page