Domain Event Message Infrastructure
eventable
Domain event infrastructure for Python microservices.
Provides the building blocks to implement domain events following Domain-Driven Design: aggregate roots that collect events, an in-process dispatcher, and a RabbitMQ transport — with an optional plug-and-play FastAPI integration.
Features
- Frozen, serializable DomainEvent dataclasses
- AggregateRoot (Pydantic) that accumulates and releases domain events
- EventDispatcher: runs in-process handlers first, then publishes to the broker
- RabbitMQ publisher & subscriber backed by a durable topic exchange
- FastAPI plugin: one-line lifespan setup + pure-ASGI middleware that dispatches events after every successful request without swallowing exceptions
Installation
pip install eventable
# or with uv
uv add eventable
Requirements: Python 3.12+, a running RabbitMQ broker (for the infrastructure layer).
Core concepts
Request
  └─ AggregateRoot.add_domain_event(event)    # automatically pushed to context collector
       └─ EventCollector (ContextVar)          # active for the lifetime of the request
            └─ dispatch loop (middleware)      # runs until no new events remain
                 └─ EventDispatcher.dispatch(event)
                      ├─ in-process handlers   # same bounded context (may fire more events)
                      └─ RabbitMQPublisher     # cross-context via broker
The collector is a context variable — it is implicitly active for any code that runs inside the request, including in-process handlers. When a handler modifies an aggregate that fires its own events, those events are picked up in the next pass of the dispatch loop automatically.
Quick start
1. Define a domain event
from dataclasses import dataclass
from uuid import UUID

from eventable.domain_event import DomainEvent


@dataclass(frozen=True)
class OrderPlaced(DomainEvent):
    order_id: UUID
    total: str
2. Define an aggregate root
from datetime import datetime
from uuid import UUID

from eventable.aggregate_root import AggregateRoot
from eventable.domain_event import generate_event_id

from myapp.events import OrderPlaced  # the event defined in step 1


class Order(AggregateRoot):
    order_id: UUID
    total: str

    def place(self) -> None:
        # Record the event on the aggregate; it is dispatched later.
        self.add_domain_event(
            OrderPlaced(
                event_id=generate_event_id(),
                occurred_at=datetime.now(),
                order_id=self.order_id,
                total=self.total,
            )
        )
3. Register in-process handlers
Edit event_handlers.py (generated alongside your app):
from eventable.event_managment.event_dispatcher import EventDispatcher

from myapp.events import OrderPlaced


def handle_order_placed(event: OrderPlaced) -> None:
    print(f"Order {event.order_id} placed, sending confirmation email")


def register_handlers(dispatcher: EventDispatcher) -> None:
    dispatcher.register(OrderPlaced, handle_order_placed)
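To make the "in-process handlers first, then the broker" ordering concrete, here is a minimal stand-in for a dispatcher. It is a sketch under assumptions: `SketchDispatcher` and its `publish` callback are hypothetical, and only the `register(event_type, handler)` shape is taken from the example above.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class OrderPlaced:
    """Hypothetical event used only for this sketch."""
    order_id: str


class SketchDispatcher:
    """Illustrative registry: event type -> handlers, plus a publish callback."""

    def __init__(self, publish: Callable) -> None:
        self._handlers = defaultdict(list)
        self._publish = publish  # stands in for a broker publisher

    def register(self, event_type: type, handler: Callable) -> None:
        self._handlers[event_type].append(handler)

    def dispatch(self, event: object) -> None:
        # In-process handlers run first...
        for handler in self._handlers[type(event)]:
            handler(event)
        # ...then the event is handed to the broker-facing publisher.
        self._publish(event)


log = []
dispatcher = SketchDispatcher(
    publish=lambda event: log.append(f"published {type(event).__name__}")
)
dispatcher.register(OrderPlaced, lambda event: log.append(f"handled {event.order_id}"))
dispatcher.dispatch(OrderPlaced(order_id="o-1"))

print(log)
```

Several handlers can be registered for the same event type; in this sketch they run in registration order.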
FastAPI integration
Lifespan setup
Eventable connects to RabbitMQ on startup, registers your in-process handlers, and disconnects cleanly on shutdown. Wire it into FastAPI's async lifespan:
from contextlib import asynccontextmanager

from fastapi import FastAPI

from eventable.infrastructure.fastapi.eventable import Eventable, EventableSettings
from eventable.infrastructure.fastapi.middleware import DomainEventMiddleware

settings = EventableSettings(
    rabbit_url="amqp://guest:guest@localhost/",
    exchange_name="domain_events",  # default
)
eventable = Eventable(settings)


@asynccontextmanager
async def lifespan(app: FastAPI):
    await eventable.startup(app)  # connects publisher, wires dispatcher
    yield
    await eventable.shutdown()  # closes RabbitMQ connection


app = FastAPI(lifespan=lifespan)
app.add_middleware(DomainEventMiddleware)
After startup, app.state.publisher and app.state.dispatcher are available throughout the application.
Middleware
DomainEventMiddleware is a pure ASGI middleware (not BaseHTTPMiddleware) so exceptions from your route handlers propagate cleanly to FastAPI's exception handlers — nothing is swallowed.
Per-request lifecycle:
- Creates a fresh EventCollector and sets it as the active context collector.
- Runs the route handler normally; any add_domain_event call anywhere in the call stack automatically registers the event.
- After the response is sent, if the status code is < 400, dispatches all collected events. If a handler fires new events (by modifying aggregates), those are dispatched in the next pass of the loop.
- On 4xx/5xx the collector is discarded — no events are dispatched for failed requests.
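The lifecycle above follows a common pure-ASGI pattern: wrap `send` to observe the status code, then act once the app has finished. The sketch below shows that pattern with the standard library only; `AfterResponseMiddleware` and its `on_success` hook are hypothetical and stand in for "dispatch the collected events", not for eventable's actual middleware.

```python
import asyncio


class AfterResponseMiddleware:
    """Pure ASGI middleware sketch: run a hook after successful HTTP responses."""

    def __init__(self, app, on_success):
        self.app = app
        self.on_success = on_success  # hypothetical hook

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            # Lifespan and websocket traffic passes through untouched.
            await self.app(scope, receive, send)
            return

        status = None

        async def send_wrapper(message):
            nonlocal status
            if message["type"] == "http.response.start":
                status = message["status"]  # remember the status code
            await send(message)

        # No try/except here: exceptions raised by the app propagate unchanged.
        await self.app(scope, receive, send_wrapper)

        if status is not None and status < 400:
            await self.on_success()


def make_app(status_code):
    """Build a tiny ASGI app that always answers with the given status."""
    async def app(scope, receive, send):
        await send({"type": "http.response.start", "status": status_code, "headers": []})
        await send({"type": "http.response.body", "body": b""})
    return app


async def run(status_code):
    calls = []

    async def on_success():
        calls.append("dispatched")

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        pass

    middleware = AfterResponseMiddleware(make_app(status_code), on_success)
    await middleware({"type": "http"}, receive, send)
    return calls


ok_calls = asyncio.run(run(201))
failed_calls = asyncio.run(run(422))
print(ok_calls, failed_calls)
```

Since the wrapper never catches exceptions, an error raised by the inner app skips the hook and reaches the framework's own exception handling.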
Collecting events in a route
No collector wiring needed in your routes. Just call your domain methods — events are captured implicitly:
from uuid import uuid4

from fastapi import Request


@app.post("/orders")
def place_order(request: Request):
    order = Order(order_id=uuid4(), total="99.99")
    order.place()  # add_domain_event() inside here is captured automatically
    return {"order_id": str(order.order_id)}
Handlers registered via register_handlers can also fire new events the same way — just call add_domain_event on any aggregate and the middleware loop will pick them up:
def handle_order_placed(event: OrderPlaced) -> None:
    invoice = Invoice.create(order_id=event.order_id)
    invoice.generate()  # fires InvoiceGenerated, dispatched in the next loop pass
RabbitMQ subscriber (cross-context consumer)
Run a long-lived consumer process in a separate service to handle events published by another bounded context:
from eventable import rabbit_subscription

from myapp.events import OrderPlaced


def handle_order_placed(event: OrderPlaced) -> None:
    # react to the event in this bounded context
    ...


rabbit_subscription(
    consumer_name="inventory-service",
    rabbit_url="amqp://guest:guest@localhost/",
    queue_name="inventory.order_placed",
    handlers_map={
        OrderPlaced: (handle_order_placed, {}),
    },
)
rabbit_subscription blocks, handles SIGTERM/SIGINT for graceful shutdown, and nacks messages that fail processing (sending them to the dead-letter queue if configured).
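The ack/nack decision can be pictured as a small function run once per delivery. This is a sketch built on assumptions the text above does not confirm: that the second element of each `handlers_map` tuple holds extra keyword arguments for the handler, that the message body is a flat JSON object, and that unknown event types are nacked. `process` is a hypothetical name.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderPlaced:
    """Hypothetical event used only for this sketch."""
    order_id: str


def process(routing_key: str, body: bytes, handlers_map: dict) -> str:
    """Return "ack" or "nack" for one delivery, mimicking a consumer callback."""
    # Index the map by class name, since events are routed by class name.
    by_name = {
        event_type.__name__: (event_type, entry)
        for event_type, entry in handlers_map.items()
    }
    if routing_key not in by_name:
        return "nack"  # assumption: nothing registered means reject
    event_type, (handler, extra_kwargs) = by_name[routing_key]
    try:
        event = event_type(**json.loads(body))
        handler(event, **extra_kwargs)
    except Exception:
        return "nack"  # processing failed; a broker could dead-letter it
    return "ack"


received = []


def handle_order_placed(event: OrderPlaced) -> None:
    received.append(event.order_id)


handlers_map = {OrderPlaced: (handle_order_placed, {})}
ok = process("OrderPlaced", b'{"order_id": "o-1"}', handlers_map)
bad = process("OrderPlaced", b"not json", handlers_map)
print(ok, bad, received)
```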
Event serialization
Events are serialized to JSON automatically. The following field types are supported out of the box:
| Python type | JSON representation |
|---|---|
| UUID | string |
| datetime | ISO 8601 string |
| Decimal | string |
| ValueObject | get_value() result |
| dict, list | passed through recursively |
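The mapping in the table can be approximated by a small recursive converter. This is an illustrative sketch rather than eventable's serializer: `to_jsonable` is a hypothetical helper, and detecting value objects by the presence of `get_value` is an assumption.

```python
import json
from datetime import datetime, timezone
from decimal import Decimal
from uuid import UUID


def to_jsonable(value):
    """Convert one field value following the table above (illustrative only)."""
    if isinstance(value, UUID):
        return str(value)
    if isinstance(value, datetime):
        return value.isoformat()  # ISO 8601
    if isinstance(value, Decimal):
        return str(value)  # a string keeps the exact decimal digits
    if hasattr(value, "get_value"):
        # Treated as a value object: serialize whatever get_value() returns.
        return to_jsonable(value.get_value())
    if isinstance(value, dict):
        return {key: to_jsonable(item) for key, item in value.items()}
    if isinstance(value, list):
        return [to_jsonable(item) for item in value]
    return value


payload = to_jsonable(
    {
        "order_id": UUID("12345678-1234-5678-1234-567812345678"),
        "occurred_at": datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
        "total": Decimal("99.99"),
        "lines": [Decimal("1.50")],
    }
)
encoded = json.dumps(payload, sort_keys=True)
print(encoded)
```

Encoding Decimal as a string rather than a JSON number avoids float rounding on the consumer side.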
ValueObject
Implement the ValueObject protocol to have your value objects serialize transparently:
from eventable.value_object import ValueObject


class Money:
    def __init__(self, amount: str):
        self.value = amount

    def get_value(self) -> str:
        return self.value
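If the protocol is structural (a `typing.Protocol`), a class such as `Money` satisfies it simply by defining `get_value`, without inheriting from anything. The sketch below shows that idea with a hypothetical `HasValue` protocol; whether eventable's `ValueObject` is declared this way is an assumption.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class HasValue(Protocol):
    """Hypothetical stand-in for the ValueObject protocol."""

    def get_value(self) -> Any: ...


class Money:
    def __init__(self, amount: str) -> None:
        self.value = amount

    def get_value(self) -> str:
        return self.value


class PlainString:
    """No get_value() method, so it does not satisfy the protocol."""

    def __init__(self, text: str) -> None:
        self.text = text


money_matches = isinstance(Money("99.99"), HasValue)
plain_matches = isinstance(PlainString("99.99"), HasValue)
print(money_matches, plain_matches)
```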
Architecture overview
eventable/
├── domain_event.py # DomainEvent base dataclass + serialize/deserialize
├── aggregate_root.py # AggregateRoot (Pydantic BaseModel)
├── value_object.py # ValueObject protocol
├── event_handlers.py # register_handlers() — your in-process handler hook
│
├── event_managment/
│ ├── event_collector.py # ContextVar-based collector, auto-captures events from aggregates
│ ├── event_dispatcher.py # Runs handlers then publishes to broker
│ ├── event_publisher.py # Abstract EventPublisher
│ └── event_subscriber.py # Abstract EventSubscriber
│
└── infrastructure/
    ├── rabbitmq/
    │   ├── event_publisher.py    # RabbitMQPublisher (thread-local channels)
    │   └── event_subscriber.py   # RabbitMQSubscriber (blocking consumer)
    └── fastapi/
        ├── eventable.py          # Eventable + EventableSettings (lifespan plugin)
        └── middleware.py         # DomainEventMiddleware (pure ASGI)
RabbitMQ topology: a single durable topic exchange (domain_events by default). Each event type is routed by its class name as the routing key. Consumers bind a durable queue to the exchange for the routing keys they care about.
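For readers new to topic exchanges, the broker decides which queues receive a message by comparing its routing key with each queue's binding key: words are separated by dots, `*` matches exactly one word, and `#` matches zero or more words. The sketch below reimplements that matching rule in plain Python for illustration; `topic_matches` is a hypothetical helper and not part of eventable or RabbitMQ.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """AMQP topic matching: '*' is exactly one word, '#' is zero or more words."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero words, or one word and stay available for more.
            return match(p + 1, w) or (w < len(words) and match(p, w + 1))
        if w == len(words):
            return False
        return pattern[p] in ("*", words[w]) and match(p + 1, w + 1)

    return match(0, 0)


# Class names contain no dots, so a binding key equal to the class name
# selects exactly that event type, while '#' selects every event.
exact = topic_matches("OrderPlaced", "OrderPlaced")
other = topic_matches("OrderPlaced", "OrderCancelled")
catch_all = topic_matches("#", "OrderPlaced")
print(exact, other, catch_all)
```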
Configuration reference
EventableSettings
| Field | Type | Default | Description |
|---|---|---|---|
| rabbit_url | str | (no default) | AMQP connection URL |
| exchange_name | str | "domain_events" | Topic exchange name |
Running the tests
uv run pytest tests/unit/ # pure unit tests, no broker needed
uv run pytest tests/infrastructure/ -k "not rabbitmq" # FastAPI plugin tests only
# Full suite including RabbitMQ integration tests:
RABBITMQ_URL=amqp://admin:admin@localhost:5672/ uv run pytest
License
MIT