Domain Event Message Infrastructure

eventable

Domain event infrastructure for Python microservices.

Provides the building blocks to implement domain events following Domain-Driven Design: aggregate roots that collect events, an in-process dispatcher, and a RabbitMQ transport — with an optional plug-and-play FastAPI integration.


Features

  • Frozen, serializable DomainEvent dataclasses
  • AggregateRoot (Pydantic) that accumulates and releases domain events
  • EventDispatcher — runs in-process handlers first, then publishes to the broker
  • RabbitMQ publisher & subscriber backed by a durable topic exchange
  • FastAPI plugin: a lifespan hook plus a pure-ASGI middleware that dispatches events after each successful request (status code < 400) without swallowing exceptions

Installation

pip install eventable
# or with uv
uv add eventable

Requirements: Python 3.12+, a running RabbitMQ broker (for the infrastructure layer).


Core concepts

Request
  └─ AggregateRoot.add_domain_event(event)   # collect during business logic
       └─ EventCollector.collect_events()     # pull after request completes
            └─ EventDispatcher.dispatch(event)
                 ├─ in-process handlers       # same bounded context
                 └─ RabbitMQPublisher         # cross-context via broker
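
The collect-then-pull part of this flow can be illustrated without the library. The following is a minimal sketch using stand-in classes, not eventable's own code: the names SketchEvent, SketchAggregate, SketchCollector, and pull_events are assumptions made for illustration; only add_domain_event, track, and collect_events mirror names used in this README.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class SketchEvent:
    """Stand-in for a domain event (not eventable's DomainEvent)."""
    name: str


@dataclass
class SketchAggregate:
    """Stand-in aggregate: buffers events until someone pulls them."""
    pending: list[SketchEvent] = field(default_factory=list)

    def add_domain_event(self, event: SketchEvent) -> None:
        self.pending.append(event)

    def pull_events(self) -> list[SketchEvent]:
        # Hypothetical helper: hand over the buffer and reset it so that
        # no event is released twice.
        events, self.pending = self.pending, []
        return events


@dataclass
class SketchCollector:
    """Stand-in collector: remembers the aggregates touched by one request."""
    tracked: list[SketchAggregate] = field(default_factory=list)

    def track(self, aggregate: SketchAggregate) -> None:
        self.tracked.append(aggregate)

    def collect_events(self) -> list[SketchEvent]:
        # Drain every tracked aggregate, preserving tracking order.
        return [event for agg in self.tracked for event in agg.pull_events()]


collector = SketchCollector()
order = SketchAggregate()
collector.track(order)                               # during the request
order.add_domain_event(SketchEvent("OrderPlaced"))   # business logic
released = collector.collect_events()                # after the request
```

Each event in released would then be handed to the dispatcher. Clearing the buffer on pull is a design choice of the sketch; it keeps a second collect_events() call from releasing the same events again.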

Quick start

1. Define a domain event

from dataclasses import dataclass
from uuid import UUID
from eventable.domain_event import DomainEvent

@dataclass(frozen=True)
class OrderPlaced(DomainEvent):
    order_id: UUID
    total: str

2. Define an aggregate root

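from datetime import datetime, timezone
from uuid import UUID, uuid4   # uuid4 is used later, when an Order is created

from eventable.aggregate_root import AggregateRoot
from eventable.domain_event import generate_event_id
from myapp.events import OrderPlaced   # the event defined in step 1

class Order(AggregateRoot):
    order_id: UUID
    total: str

    def place(self) -> None:
        # Record the event on the aggregate; it is released after the request.
        self.add_domain_event(
            OrderPlaced(
                event_id=generate_event_id(),
                occurred_at=datetime.now(timezone.utc),  # timezone-aware timestamp
                order_id=self.order_id,
                total=self.total,
            )
        )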

3. Register in-process handlers

Edit event_handlers.py (generated alongside your app):

from eventable.event_managment.event_dispatcher import EventDispatcher
from myapp.events import OrderPlaced

def handle_order_placed(event: OrderPlaced) -> None:
    print(f"Order {event.order_id} placed — sending confirmation email")

def register_handlers(dispatcher: EventDispatcher) -> None:
    dispatcher.register(OrderPlaced, handle_order_placed)
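
To make the "handlers first, then broker" ordering concrete, here is a minimal sketch of a dispatcher with a per-type handler registry. SketchDispatcher and its publish callback are hypothetical stand-ins, not eventable's EventDispatcher; in particular, what the real dispatcher does when a handler raises is not stated in this README, so the sketch simply lets the exception propagate.

```python
from collections import defaultdict
from typing import Any, Callable

Handler = Callable[[Any], None]


class SketchDispatcher:
    """Hypothetical stand-in; not eventable's EventDispatcher."""

    def __init__(self, publish: Handler) -> None:
        self._handlers: dict[type, list[Handler]] = defaultdict(list)
        self._publish = publish   # stands in for the broker publisher

    def register(self, event_type: type, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    def dispatch(self, event: Any) -> None:
        # In-process handlers run first, in registration order...
        for handler in self._handlers.get(type(event), []):
            handler(event)
        # ...and only then is the event handed to the publisher.
        self._publish(event)
```

Keying the registry on the exact event class means a handler registered for a base class would not fire for its subclasses; that is an assumption of the sketch, chosen for simplicity.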

FastAPI integration

Lifespan setup

Eventable connects to RabbitMQ on startup, registers your in-process handlers, and disconnects cleanly on shutdown. Wire it into FastAPI's async lifespan:

from contextlib import asynccontextmanager
from fastapi import FastAPI
from eventable.infrastructure.fastapi.eventable import Eventable, EventableSettings
from eventable.infrastructure.fastapi.middleware import DomainEventMiddleware

settings = EventableSettings(
    rabbit_url="amqp://guest:guest@localhost/",
    exchange_name="domain_events",   # default
)
eventable = Eventable(settings)

@asynccontextmanager
async def lifespan(app: FastAPI):
    await eventable.startup(app)   # connects publisher, wires dispatcher
    yield
    await eventable.shutdown()     # closes RabbitMQ connection

app = FastAPI(lifespan=lifespan)
app.add_middleware(DomainEventMiddleware)

After startup, app.state.publisher and app.state.dispatcher are available throughout the application.

Middleware

DomainEventMiddleware is a pure ASGI middleware (not BaseHTTPMiddleware) so exceptions from your route handlers propagate cleanly to FastAPI's exception handlers — nothing is swallowed.

Per-request lifecycle:

  1. Attaches a fresh EventCollector to request.state.collector.
  2. Runs the route handler normally.
  3. After the response is sent, if the status code is < 400, pulls all collected events and dispatches them.
  4. On 4xx/5xx the collector is discarded — no events are dispatched for failed requests.
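
The lifecycle above can be sketched as a small pure-ASGI middleware. This is an illustration under stated assumptions, not the source of DomainEventMiddleware: the class name SketchEventMiddleware, the dispatch callback, and the plain list stored under scope["state"]["events"] (in place of a real EventCollector) are all hypothetical.

```python
from typing import Any, Awaitable, Callable

Scope = dict[str, Any]
Message = dict[str, Any]
Receive = Callable[[], Awaitable[Message]]
Send = Callable[[Message], Awaitable[None]]
ASGIApp = Callable[[Scope, Receive, Send], Awaitable[None]]


class SketchEventMiddleware:
    """Hypothetical pure-ASGI middleware illustrating the four steps."""

    def __init__(self, app: ASGIApp, dispatch: Callable[[Any], None]) -> None:
        self.app = app
        self.dispatch = dispatch

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] != "http":
            # Lifespan and websocket traffic passes through untouched.
            await self.app(scope, receive, send)
            return

        # Step 1: attach a fresh per-request event buffer (a plain list here).
        events: list[Any] = []
        scope.setdefault("state", {})["events"] = events
        status = 500  # assume failure until a response start is seen

        async def send_wrapper(message: Message) -> None:
            nonlocal status
            if message["type"] == "http.response.start":
                status = message["status"]
            await send(message)

        # Step 2: run the inner app. There is no try/except, so exceptions
        # propagate to the outer layers instead of being swallowed.
        await self.app(scope, receive, send_wrapper)

        # Steps 3 and 4: dispatch only for successful responses.
        if status < 400:
            for event in events:
                self.dispatch(event)
```

Because the wrapper only observes the http.response.start message, it never buffers or alters the response body; if the inner app raises, the dispatch step is simply never reached.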

Collecting events in a route

from uuid import uuid4

from fastapi import Request
from eventable.event_managment.event_collector import EventCollector

# `app` is the FastAPI instance from the lifespan setup above;
# `Order` is the aggregate defined in step 2.
@app.post("/orders")
def place_order(request: Request):
    collector: EventCollector = request.state.collector

    order = Order(order_id=uuid4(), total="99.99")
    collector.track(order)   # middleware will pull events from this aggregate
    order.place()

    return {"order_id": str(order.order_id)}

RabbitMQ subscriber (cross-context consumer)

Run a long-lived consumer process in a separate service to handle events published by another bounded context:

from eventable import rabbit_subscription
from myapp.events import OrderPlaced

def handle_order_placed(event: OrderPlaced) -> None:
    # react to the event in this bounded context
    ...

rabbit_subscription(
    consumer_name="inventory-service",
    rabbit_url="amqp://guest:guest@localhost/",
    queue_name="inventory.order_placed",
    handlers_map={
        OrderPlaced: (handle_order_placed, {}),
    },
)

rabbit_subscription blocks until it is shut down, handles SIGTERM/SIGINT for a graceful stop, and nacks messages whose handler fails (routing them to the dead-letter queue, if one is configured).
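
The ack/nack decision can be sketched independently of any broker client. The function below is a hypothetical illustration, not part of eventable: process_message and its string return values are assumptions, and the map is keyed by class name (the routing key) rather than by event class as in handlers_map above. Nacking a message with no registered handler is also a choice made by the sketch.

```python
import json
from typing import Any, Callable

# Simplified map: routing key (event class name) -> handler taking the payload.
SketchHandlers = dict[str, Callable[[dict[str, Any]], None]]


def process_message(routing_key: str, body: bytes, handlers: SketchHandlers) -> str:
    """Return "ack" if the handler succeeded, "nack" otherwise."""
    handler = handlers.get(routing_key)
    if handler is None:
        # Nothing registered for this event type (sketch's choice: reject).
        return "nack"
    try:
        handler(json.loads(body))
    except Exception:
        # Malformed payloads and handler failures are both rejected, so the
        # broker can dead-letter the message if a DLQ is configured.
        return "nack"
    return "ack"
```

A real consumer would translate the returned decision into the broker client's acknowledge or reject call for that delivery.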


Event serialization

Events are serialized to JSON automatically. The following field types are supported out of the box:

Python type    JSON representation
UUID           string
datetime       ISO 8601 string
Decimal        string
ValueObject    get_value() result
dict, list     passed through recursively
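
The mapping in the table can be sketched as a small recursive converter. This is an illustration of the rules, not eventable's serializer: to_jsonable is a hypothetical name, and recursing into the result of get_value() is an assumption of the sketch.

```python
import json
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any
from uuid import UUID


def to_jsonable(value: Any) -> Any:
    """Convert a field value to something json.dumps accepts."""
    if isinstance(value, UUID):
        return str(value)
    if isinstance(value, datetime):
        return value.isoformat()          # ISO 8601 string
    if isinstance(value, Decimal):
        return str(value)                 # a string keeps the exact digits
    if isinstance(value, dict):
        return {key: to_jsonable(item) for key, item in value.items()}
    if isinstance(value, list):
        return [to_jsonable(item) for item in value]
    if hasattr(value, "get_value"):
        # Value objects contribute whatever get_value() returns.
        return to_jsonable(value.get_value())
    return value


payload = {
    "order_id": UUID("12345678-1234-5678-1234-567812345678"),
    "occurred_at": datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
    "total": Decimal("99.90"),
    "lines": [{"price": Decimal("1.5")}],
}
encoded = json.dumps(to_jsonable(payload))
```

Serializing Decimal as a string rather than a JSON number avoids the rounding that a float round trip could introduce.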

ValueObject

Implement the ValueObject protocol to have your value objects serialize transparently:

from eventable.value_object import ValueObject

class Money:
    def __init__(self, amount: str):
        self.value = amount

    def get_value(self) -> str:
        return self.value
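
Money above does not inherit from anything; it only needs a get_value() method. That is how structural typing with typing.Protocol works, as the following sketch shows. SupportsGetValue is a hypothetical stand-in written for illustration; whether eventable's ValueObject is declared runtime-checkable is not stated in this README.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SupportsGetValue(Protocol):
    """Hypothetical stand-in for a ValueObject-style protocol."""

    def get_value(self) -> Any: ...


class Money:
    def __init__(self, amount: str) -> None:
        self.value = amount

    def get_value(self) -> str:
        return self.value


class Plain:
    """No get_value(), so it does not satisfy the protocol."""


# No inheritance needed: having the method is enough.
money_is_value_object = isinstance(Money("9.99"), SupportsGetValue)
plain_is_value_object = isinstance(Plain(), SupportsGetValue)
```

With runtime_checkable, isinstance only checks that the method exists, not its signature or return type; a static type checker would verify those.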

Architecture overview

eventable/
├── domain_event.py          # DomainEvent base dataclass + serialize/deserialize
├── aggregate_root.py        # AggregateRoot (Pydantic BaseModel)
├── value_object.py          # ValueObject protocol
├── event_handlers.py        # register_handlers() — your in-process handler hook
│
├── event_managment/
│   ├── event_collector.py   # Tracks aggregates, pulls events after request
│   ├── event_dispatcher.py  # Runs handlers then publishes to broker
│   ├── event_publisher.py   # Abstract EventPublisher
│   └── event_subscriber.py  # Abstract EventSubscriber
│
└── infrastructure/
    ├── rabbitmq/
    │   ├── event_publisher.py   # RabbitMQPublisher (thread-local channels)
    │   └── event_subscriber.py  # RabbitMQSubscriber (blocking consumer)
    └── fastapi/
        ├── eventable.py         # Eventable + EventableSettings (lifespan plugin)
        └── middleware.py        # DomainEventMiddleware (pure ASGI)

RabbitMQ topology: a single durable topic exchange (domain_events by default). Each event type is routed by its class name as the routing key. Consumers bind a durable queue to the exchange for the routing keys they care about.
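
On a topic exchange, a queue receives a message when one of its binding keys matches the message's routing key. Keys are dot-separated words; in a binding key, `*` matches exactly one word and `#` matches zero or more. The sketch below reimplements that matching rule in plain Python for illustration only (topic_matches is a hypothetical helper; the broker does this itself). With class names as routing keys, a binding key such as OrderPlaced is an exact match.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if a topic-exchange binding key matches a routing key."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(pattern):
            # Pattern exhausted: match only if all words were consumed too.
            return j == len(words)
        if pattern[i] == "#":
            # '#' may absorb zero or more of the remaining words.
            return any(match(i + 1, n) for n in range(j, len(words) + 1))
        if j == len(words):
            return False
        if pattern[i] == "*" or pattern[i] == words[j]:
            return match(i + 1, j + 1)
        return False

    return match(0, 0)
```

For example, topic_matches("OrderPlaced", "OrderPlaced") is true, while a binding key of "#" would receive every event published to the exchange.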


Configuration reference

EventableSettings

Field          Type   Default            Description
rabbit_url     str    -                  AMQP connection URL
exchange_name  str    "domain_events"    Topic exchange name

Running the tests

uv run pytest tests/unit/          # pure unit tests, no broker needed
uv run pytest tests/infrastructure/ -k "not rabbitmq"  # FastAPI plugin tests only

# Full suite including RabbitMQ integration tests:
RABBITMQ_URL=amqp://admin:admin@localhost:5672/ uv run pytest

License

MIT
