Domain Event Message Infrastructure
eventable
Domain event infrastructure for Python microservices.
Provides the building blocks to implement domain events following Domain-Driven Design: aggregate roots that collect events, an in-process dispatcher, and a RabbitMQ transport — with an optional plug-and-play FastAPI integration.
Features
- Frozen, serializable DomainEvent dataclasses
- AggregateRoot (Pydantic) that accumulates and releases domain events
- EventDispatcher: runs in-process handlers first, then publishes to the broker
- RabbitMQ publisher & subscriber backed by a durable topic exchange
- FastAPI plugin: one-line lifespan setup + pure-ASGI middleware that dispatches events after every successful request without swallowing exceptions
Installation
pip install eventable
# or with uv
uv add eventable
Requirements: Python 3.12+, a running RabbitMQ broker (for the infrastructure layer).
Core concepts
Request
 └─ AggregateRoot.add_domain_event(event)   # collect during business logic
     └─ EventCollector.collect_events()     # pull after request completes
         └─ EventDispatcher.dispatch(event)
             ├─ in-process handlers         # same bounded context
             └─ RabbitMQPublisher           # cross-context via broker
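The flow above can be sketched with the standard library alone. This is an illustrative model, not eventable's implementation: `Event`, `Aggregate`, `Collector`, `Dispatcher`, and the `pull_events` method are hypothetical stand-ins, and the publisher is reduced to a plain callable.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass(frozen=True)
class Event:
    """Hypothetical stand-in for a domain event."""
    name: str


@dataclass
class Aggregate:
    """Stand-in aggregate: buffers events raised by business logic."""
    _events: list[Event] = field(default_factory=list)

    def add_domain_event(self, event: Event) -> None:
        self._events.append(event)

    def pull_events(self) -> list[Event]:
        # Hand the buffered events over and reset the buffer.
        events, self._events = self._events, []
        return events


class Collector:
    """Stand-in collector: remembers aggregates touched during a request."""

    def __init__(self) -> None:
        self._tracked: list[Aggregate] = []

    def track(self, aggregate: Aggregate) -> None:
        self._tracked.append(aggregate)

    def collect_events(self) -> list[Event]:
        return [e for agg in self._tracked for e in agg.pull_events()]


class Dispatcher:
    """Stand-in dispatcher: in-process handlers first, then the publisher."""

    def __init__(self, publish: Callable[[Event], None]) -> None:
        self._handlers: dict[type, list[Callable[[Event], None]]] = {}
        self._publish = publish

    def register(self, event_type: type, handler: Callable[[Event], None]) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    def dispatch(self, event: Event) -> None:
        for handler in self._handlers.get(type(event), []):
            handler(event)
        self._publish(event)


log: list[str] = []
dispatcher = Dispatcher(publish=lambda e: log.append(f"published:{e.name}"))
dispatcher.register(Event, lambda e: log.append(f"handled:{e.name}"))

collector = Collector()
order = Aggregate()
collector.track(order)
order.add_domain_event(Event("OrderPlaced"))

for event in collector.collect_events():
    dispatcher.dispatch(event)
```

After the loop, `log` holds the handler entry before the publish entry, which mirrors the "handlers first, then broker" ordering described in the feature list.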
Quick start
1. Define a domain event
from dataclasses import dataclass
from uuid import UUID

from eventable.domain_event import DomainEvent


@dataclass(frozen=True)
class OrderPlaced(DomainEvent):
    order_id: UUID
    total: str
2. Define an aggregate root
from datetime import datetime
from uuid import UUID, uuid4

from eventable.aggregate_root import AggregateRoot
from eventable.domain_event import generate_event_id

from myapp.events import OrderPlaced  # the event defined in step 1


class Order(AggregateRoot):
    order_id: UUID
    total: str

    def place(self) -> None:
        # Record the event on the aggregate; it is pulled after the request completes.
        self.add_domain_event(
            OrderPlaced(
                event_id=generate_event_id(),
                occurred_at=datetime.now(),
                order_id=self.order_id,
                total=self.total,
            )
        )
3. Register in-process handlers
Edit event_handlers.py (generated alongside your app):
from eventable.event_managment.event_dispatcher import EventDispatcher
from myapp.events import OrderPlaced


def handle_order_placed(event: OrderPlaced) -> None:
    print(f"Order {event.order_id} placed, sending confirmation email")


def register_handlers(dispatcher: EventDispatcher) -> None:
    dispatcher.register(OrderPlaced, handle_order_placed)
FastAPI integration
Lifespan setup
Eventable connects to RabbitMQ on startup, registers your in-process handlers, and disconnects cleanly on shutdown. Wire it into FastAPI's async lifespan:
from contextlib import asynccontextmanager

from fastapi import FastAPI

from eventable.infrastructure.fastapi.eventable import Eventable, EventableSettings
from eventable.infrastructure.fastapi.middleware import DomainEventMiddleware

settings = EventableSettings(
    rabbit_url="amqp://guest:guest@localhost/",
    exchange_name="domain_events",  # default
)
eventable = Eventable(settings)


@asynccontextmanager
async def lifespan(app: FastAPI):
    await eventable.startup(app)  # connects publisher, wires dispatcher
    yield
    await eventable.shutdown()  # closes RabbitMQ connection


app = FastAPI(lifespan=lifespan)
app.add_middleware(DomainEventMiddleware)
After startup, app.state.publisher and app.state.dispatcher are available throughout the application.
Middleware
DomainEventMiddleware is a pure ASGI middleware (not BaseHTTPMiddleware) so exceptions from your route handlers propagate cleanly to FastAPI's exception handlers — nothing is swallowed.
Per-request lifecycle:
- Attaches a fresh EventCollector to request.state.collector.
- Runs the route handler normally.
- After the response is sent, if the status code is < 400, pulls all collected events and dispatches them.
- On 4xx/5xx the collector is discarded — no events are dispatched for failed requests.
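The lifecycle above can be sketched as a pure ASGI callable using only the standard library. This is a minimal illustration, not the actual DomainEventMiddleware: the class name `EventsAfterResponse`, the `on_success` callback, and the plain list used as a collector are all hypothetical. It relies on Starlette exposing `scope["state"]` as `request.state`.

```python
import asyncio


class EventsAfterResponse:
    """Hypothetical pure-ASGI middleware; not eventable's actual class."""

    def __init__(self, app, on_success):
        self.app = app
        self.on_success = on_success  # called with the per-request collector

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        collector: list = []  # stand-in for a per-request EventCollector
        scope.setdefault("state", {})["collector"] = collector
        status = None

        async def send_wrapper(message):
            # Remember the status code as the response starts.
            nonlocal status
            if message["type"] == "http.response.start":
                status = message["status"]
            await send(message)

        # No try/except here: handler exceptions propagate to the caller.
        await self.app(scope, receive, send_wrapper)

        # Dispatch only after the response went out with a non-error status.
        if status is not None and status < 400:
            self.on_success(collector)


def make_app(status_code):
    async def app(scope, receive, send):
        scope["state"]["collector"].append("OrderPlaced")
        await send({"type": "http.response.start", "status": status_code, "headers": []})
        await send({"type": "http.response.body", "body": b""})
    return app


async def call(status_code):
    dispatched = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        pass  # a real server would write the message to the socket

    middleware = EventsAfterResponse(make_app(status_code), dispatched.extend)
    await middleware({"type": "http"}, receive, send)
    return dispatched


ok_events = asyncio.run(call(201))
failed_events = asyncio.run(call(422))
```

In this sketch the 201 request hands its collected event to the callback, while the 422 request hands over nothing.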
Collecting events in a route
from uuid import uuid4

from fastapi import Request

from eventable.event_managment.event_collector import EventCollector

# Order is the aggregate from step 2 of the quick start;
# import it from wherever your app defines it.


@app.post("/orders")
def place_order(request: Request):
    collector: EventCollector = request.state.collector
    order = Order(order_id=uuid4(), total="99.99")
    collector.track(order)  # middleware will pull events from this aggregate
    order.place()
    return {"order_id": str(order.order_id)}
RabbitMQ subscriber (cross-context consumer)
Run a long-lived consumer process in a separate service to handle events published by another bounded context:
from eventable import rabbit_subscription
from myapp.events import OrderPlaced


def handle_order_placed(event: OrderPlaced) -> None:
    # react to the event in this bounded context
    ...


rabbit_subscription(
    consumer_name="inventory-service",
    rabbit_url="amqp://guest:guest@localhost/",
    queue_name="inventory.order_placed",
    handlers_map={
        OrderPlaced: (handle_order_placed, {}),
    },
)
rabbit_subscription blocks, handles SIGTERM/SIGINT for graceful shutdown, and nacks messages that fail processing (sending them to the dead-letter queue if configured).
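The ack/nack decision described above might look roughly like the following standard-library sketch. Several things here are assumptions rather than documented behaviour: `process_message` is a hypothetical helper, the map is keyed by event name instead of event class, the second tuple element is treated as extra keyword arguments for the handler, and messages with no registered handler are simply acknowledged.

```python
import json
from typing import Any, Callable

Handler = Callable[..., None]


def process_message(
    routing_key: str,
    body: bytes,
    handlers_map: dict[str, tuple[Handler, dict[str, Any]]],
) -> str:
    """Return "ack" when the handler succeeds and "nack" when it raises."""
    entry = handlers_map.get(routing_key)
    if entry is None:
        return "ack"  # assumption: unknown event types are acknowledged and ignored
    handler, extra_kwargs = entry
    try:
        # Decoding errors and handler errors both end up as a nack.
        handler(json.loads(body), **extra_kwargs)
    except Exception:
        return "nack"
    return "ack"


seen: list[tuple[str, str]] = []


def reserve_stock(payload: dict, warehouse: str) -> None:
    seen.append((payload["order_id"], warehouse))


def broken_handler(payload: dict) -> None:
    raise RuntimeError("boom")


handlers = {
    "OrderPlaced": (reserve_stock, {"warehouse": "eu-1"}),
    "OrderCancelled": (broken_handler, {}),
}

ok = process_message("OrderPlaced", b'{"order_id": "42"}', handlers)
failed = process_message("OrderCancelled", b"{}", handlers)
```

A real consumer would translate the returned string into the broker's acknowledge or reject call; whether a rejected message reaches a dead-letter queue depends on how the queue is declared.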
Event serialization
Events are serialized to JSON automatically. The following field types are supported out of the box:
| Python type | JSON representation |
|---|---|
| UUID | string |
| datetime | ISO 8601 string |
| Decimal | string |
| ValueObject | get_value() result |
| dict, list | passed through recursively |
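The mapping in the table can be illustrated with a small converter. This is a sketch under stated assumptions, not the library's serializer: `to_jsonable` is a hypothetical name, value objects are detected by the presence of a `get_value` method, and the result of `get_value()` is converted again so that a wrapped Decimal still becomes a string.

```python
import json
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any
from uuid import UUID


def to_jsonable(value: Any) -> Any:
    """Convert a field value into something json.dumps accepts."""
    if isinstance(value, UUID):
        return str(value)
    if isinstance(value, datetime):
        return value.isoformat()  # ISO 8601
    if isinstance(value, Decimal):
        return str(value)  # string keeps the exact digits
    if isinstance(value, dict):
        return {key: to_jsonable(item) for key, item in value.items()}
    if isinstance(value, list):
        return [to_jsonable(item) for item in value]
    if hasattr(value, "get_value"):
        # Assumption: anything with get_value() counts as a value object.
        return to_jsonable(value.get_value())
    return value


class Money:
    """Example value object wrapping a Decimal amount."""

    def __init__(self, amount: Decimal) -> None:
        self._amount = amount

    def get_value(self) -> Decimal:
        return self._amount


payload = to_jsonable(
    {
        "order_id": UUID("12345678-1234-5678-1234-567812345678"),
        "occurred_at": datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
        "total": Money(Decimal("99.99")),
        "lines": [{"price": Decimal("10.50")}],
    }
)
encoded = json.dumps(payload, sort_keys=True)
```

Serializing Decimal as a string rather than a JSON number avoids floating-point rounding on the consumer side.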
ValueObject
Implement the ValueObject protocol to have your value objects serialize transparently:
from eventable.value_object import ValueObject


class Money:
    def __init__(self, amount: str):
        self.value = amount

    def get_value(self) -> str:
        return self.value
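Money above does not inherit from ValueObject, which is consistent with a structural protocol: any class with a matching `get_value` method qualifies. The sketch below shows the idea with `typing.Protocol`. `ValueObjectLike` is a hypothetical stand-in; whether eventable's own ValueObject is marked `runtime_checkable` is an assumption that the source does not confirm.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class ValueObjectLike(Protocol):
    """Hypothetical stand-in for eventable's ValueObject protocol."""

    def get_value(self) -> Any: ...


class Money:
    def __init__(self, amount: str) -> None:
        self.value = amount

    def get_value(self) -> str:
        return self.value


class PlainString:
    """Has no get_value(), so it does not satisfy the protocol."""

    def __init__(self, text: str) -> None:
        self.text = text


# isinstance() on a runtime-checkable protocol only checks that the method exists.
money_matches = isinstance(Money("99.99"), ValueObjectLike)
plain_matches = isinstance(PlainString("hello"), ValueObjectLike)
```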
Architecture overview
eventable/
├── domain_event.py          # DomainEvent base dataclass + serialize/deserialize
├── aggregate_root.py        # AggregateRoot (Pydantic BaseModel)
├── value_object.py          # ValueObject protocol
├── event_handlers.py        # register_handlers(): your in-process handler hook
│
├── event_managment/
│   ├── event_collector.py   # Tracks aggregates, pulls events after request
│   ├── event_dispatcher.py  # Runs handlers then publishes to broker
│   ├── event_publisher.py   # Abstract EventPublisher
│   └── event_subscriber.py  # Abstract EventSubscriber
│
└── infrastructure/
    ├── rabbitmq/
    │   ├── event_publisher.py   # RabbitMQPublisher (thread-local channels)
    │   └── event_subscriber.py  # RabbitMQSubscriber (blocking consumer)
    └── fastapi/
        ├── eventable.py         # Eventable + EventableSettings (lifespan plugin)
        └── middleware.py        # DomainEventMiddleware (pure ASGI)
RabbitMQ topology: a single durable topic exchange (domain_events by default). Each event type is routed by its class name as the routing key. Consumers bind a durable queue to the exchange for the routing keys they care about.
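To make the routing rule concrete, here is an in-memory model of the topology: one exchange, queues bound by routing key, and the event's class name used as the key. `TopicRouter` is a hypothetical illustration only. It does exact-match routing and ignores the `*` and `#` wildcards that a real topic exchange supports.

```python
from collections import defaultdict


class TopicRouter:
    """In-memory stand-in for one exchange; exact-match routing keys only."""

    def __init__(self) -> None:
        # routing key -> names of the queues bound to it
        self._bindings: dict[str, set[str]] = defaultdict(set)

    def bind(self, queue: str, routing_key: str) -> None:
        self._bindings[routing_key].add(queue)

    def route(self, event: object) -> list[str]:
        # The event's class name is the routing key.
        routing_key = type(event).__name__
        return sorted(self._bindings.get(routing_key, ()))


class OrderPlaced:
    pass


class OrderCancelled:
    pass


router = TopicRouter()
router.bind("inventory.order_placed", "OrderPlaced")
router.bind("billing.orders", "OrderPlaced")
router.bind("billing.orders", "OrderCancelled")

placed_queues = router.route(OrderPlaced())
cancelled_queues = router.route(OrderCancelled())
```

Each bound queue receives its own copy of a matching event, so several bounded contexts can consume the same event type independently.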
Configuration reference
EventableSettings
| Field | Type | Default | Description |
|---|---|---|---|
| rabbit_url | str | (no default) | AMQP connection URL |
| exchange_name | str | "domain_events" | Topic exchange name |
Running the tests
uv run pytest tests/unit/ # pure unit tests, no broker needed
uv run pytest tests/infrastructure/ -k "not rabbitmq" # FastAPI plugin tests only
# Full suite including RabbitMQ integration tests:
RABBITMQ_URL=amqp://admin:admin@localhost:5672/ uv run pytest
License
MIT