Skip to main content

Domain Event Message Infrastructure

Project description

eventable

Domain event infrastructure for Python microservices.

Provides the building blocks to implement domain events following Domain-Driven Design: aggregate roots that collect events, an in-process dispatcher, and a RabbitMQ transport — with an optional plug-and-play FastAPI integration.


Features

  • Frozen, serializable DomainEvent dataclasses
  • AggregateRoot (Pydantic) that accumulates and releases domain events
  • EventDispatcher — runs in-process handlers first, then publishes to the broker
  • RabbitMQ publisher & subscriber backed by a durable topic exchange
  • FastAPI plugin: one-line lifespan setup + pure-ASGI middleware that dispatches events after every successful request without swallowing exceptions

Installation

pip install eventable
# or with uv
uv add eventable

Requirements: Python 3.12+, a running RabbitMQ broker (for the infrastructure layer).


Core concepts

Request
  └─ AggregateRoot.add_domain_event(event)   # automatically pushed to context collector
       └─ EventCollector (ContextVar)         # active for the lifetime of the request
            └─ dispatch loop (middleware)     # runs until no new events remain
                 └─ EventDispatcher.dispatch(event)
                      ├─ in-process handlers  # same bounded context (may fire more events)
                      └─ RabbitMQPublisher    # cross-context via broker

The collector is a context variable — it is implicitly active for any code that runs inside the request, including in-process handlers. When a handler modifies an aggregate that fires its own events, those events are picked up in the next pass of the dispatch loop automatically.


Quick start

1. Define a domain event

from dataclasses import dataclass
from uuid import UUID
from eventable.domain_event import DomainEvent

@dataclass(frozen=True)
class OrderPlaced(DomainEvent):
    order_id: UUID
    total: str

2. Define an aggregate root

from uuid import uuid4
from datetime import datetime
from eventable.aggregate_root import AggregateRoot
from eventable.domain_event import generate_event_id

class Order(AggregateRoot):
    order_id: UUID
    total: str

    def place(self) -> None:
        self.add_domain_event(
            OrderPlaced(
                event_id=generate_event_id(),
                occurred_at=datetime.now(),
                order_id=self.order_id,
                total=self.total,
            )
        )

3. Register in-process handlers

Edit event_handlers.py (generated alongside your app):

from eventable.event_managment.event_dispatcher import EventDispatcher
from myapp.events import OrderPlaced

def handle_order_placed(event: OrderPlaced) -> None:
    print(f"Order {event.order_id} placed — sending confirmation email")

def register_handlers(dispatcher: EventDispatcher) -> None:
    dispatcher.register(OrderPlaced, handle_order_placed)

FastAPI integration

Lifespan setup

Eventable connects to RabbitMQ on startup, registers your in-process handlers, and disconnects cleanly on shutdown. Wire it into FastAPI's async lifespan:

from contextlib import asynccontextmanager
from fastapi import FastAPI
from eventable.infrastructure.fastapi.eventable import Eventable, EventableSettings
from eventable.infrastructure.fastapi.middleware import DomainEventMiddleware

settings = EventableSettings(
    rabbit_url="amqp://guest:guest@localhost/",
    exchange_name="domain_events",   # default
)
eventable = Eventable(settings)

@asynccontextmanager
async def lifespan(app: FastAPI):
    await eventable.startup(app)   # connects publisher, wires dispatcher
    yield
    await eventable.shutdown()     # closes RabbitMQ connection

app = FastAPI(lifespan=lifespan)
app.add_middleware(DomainEventMiddleware)

After startup, app.state.publisher and app.state.dispatcher are available throughout the application.

Middleware

DomainEventMiddleware is a pure ASGI middleware (not BaseHTTPMiddleware) so exceptions from your route handlers propagate cleanly to FastAPI's exception handlers — nothing is swallowed.

Per-request lifecycle:

  1. Creates a fresh EventCollector and sets it as the active context collector.
  2. Runs the route handler normally — any add_domain_event call anywhere in the call stack automatically registers the event.
  3. After the response is sent, if the status code is < 400, dispatches all collected events. If a handler fires new events (by modifying aggregates), those are dispatched in the next pass of the loop.
  4. On 4xx/5xx the collector is discarded — no events are dispatched for failed requests.

Collecting events in a route

No collector wiring needed in your routes. Just call your domain methods — events are captured implicitly:

from fastapi import Request
from uuid import uuid4

@app.post("/orders")
def place_order(request: Request):
    order = Order(order_id=uuid4(), total="99.99")
    order.place()   # add_domain_event() inside here is captured automatically

    return {"order_id": str(order.order_id)}

Handlers registered via register_handlers can also fire new events the same way — just call add_domain_event on any aggregate and the middleware loop will pick them up:

def handle_order_placed(event: OrderPlaced) -> None:
    invoice = Invoice.create(order_id=event.order_id)
    invoice.generate()   # fires InvoiceGenerated — dispatched in the next loop pass

RabbitMQ subscriber (cross-context consumer)

Run a long-lived consumer process in a separate service to handle events published by another bounded context:

from eventable import rabbit_subscription
from myapp.events import OrderPlaced

def handle_order_placed(event: OrderPlaced) -> None:
    # react to the event in this bounded context
    ...

rabbit_subscription(
    consumer_name="inventory-service",
    rabbit_url="amqp://guest:guest@localhost/",
    queue_name="inventory.order_placed",
    handlers_map={
        OrderPlaced: (handle_order_placed, {}),
    },
)

rabbit_subscription blocks, handles SIGTERM/SIGINT for graceful shutdown, and nacks messages that fail processing (sending them to the dead-letter queue if configured).


Event serialization

Events are serialized to JSON automatically. The following field types are supported out of the box:

Python type JSON representation
UUID string
datetime ISO 8601 string
Decimal string
ValueObject get_value() result
dict, list passed through recursively

ValueObject

Implement the ValueObject protocol to have your value objects serialize transparently:

from eventable.value_object import ValueObject

class Money:
    def __init__(self, amount: str):
        self.value = amount

    def get_value(self) -> str:
        return self.value

Architecture overview

eventable/
├── domain_event.py          # DomainEvent base dataclass + serialize/deserialize
├── aggregate_root.py        # AggregateRoot (Pydantic BaseModel)
├── value_object.py          # ValueObject protocol
├── event_handlers.py        # register_handlers() — your in-process handler hook
│
├── event_managment/
│   ├── event_collector.py   # ContextVar-based collector, auto-captures events from aggregates
│   ├── event_dispatcher.py  # Runs handlers then publishes to broker
│   ├── event_publisher.py   # Abstract EventPublisher
│   └── event_subscriber.py  # Abstract EventSubscriber
│
└── infrastructure/
    ├── rabbitmq/
    │   ├── event_publisher.py   # RabbitMQPublisher (thread-local channels)
    │   └── event_subscriber.py  # RabbitMQSubscriber (blocking consumer)
    └── fastapi/
        ├── eventable.py         # Eventable + EventableSettings (lifespan plugin)
        └── middleware.py        # DomainEventMiddleware (pure ASGI)

RabbitMQ topology: a single durable topic exchange (domain_events by default). Each event type is routed by its class name as the routing key. Consumers bind a durable queue to the exchange for the routing keys they care about.


Configuration reference

EventableSettings

Field Type Default Description
rabbit_url str AMQP connection URL
exchange_name str "domain_events" Topic exchange name

Running the tests

uv run pytest tests/unit/          # pure unit tests, no broker needed
uv run pytest tests/infrastructure/ -k "not rabbitmq"  # FastAPI plugin tests only

# Full suite including RabbitMQ integration tests:
RABBITMQ_URL=amqp://admin:admin@localhost:5672/ uv run pytest

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

eventable-0.3.0.tar.gz (8.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

eventable-0.3.0-py3-none-any.whl (14.0 kB view details)

Uploaded Python 3

File details

Details for the file eventable-0.3.0.tar.gz.

File metadata

  • Download URL: eventable-0.3.0.tar.gz
  • Upload date:
  • Size: 8.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.6 {"installer":{"name":"uv","version":"0.11.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"13","id":"trixie","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for eventable-0.3.0.tar.gz
Algorithm Hash digest
SHA256 2c93a080d0da13cfe1997e16e1778128111ee0ac3ae593c55ebb84ecc06742d1
MD5 c166ac73e2096d99addc1bb5cb67a042
BLAKE2b-256 c7ded527227e0a2c2865812efeed1ba05e2304760b1bc3648842ddb37b876682

See more details on using hashes here.

File details

Details for the file eventable-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: eventable-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 14.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.6 {"installer":{"name":"uv","version":"0.11.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"13","id":"trixie","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for eventable-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 5ee873e1231489e98ad2442e749c4bc918ac633aa274609646bf2595f6af42aa
MD5 e990227e743090afc94369a7cdf8b853
BLAKE2b-256 a285a9e15aab94c0d021c176e4cab8904a1b0966e7ab89b5d062fd719ca6ee45

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page