Skip to main content

Domain Event Message Infrastructure

Project description

eventable

Domain event infrastructure for Python microservices.

Provides the building blocks to implement domain events following Domain-Driven Design: aggregate roots that collect events, an in-process dispatcher, and a RabbitMQ transport — with an optional plug-and-play FastAPI integration.


Features

  • Frozen, serializable DomainEvent dataclasses
  • AggregateRoot (Pydantic) that accumulates and releases domain events
  • EventDispatcher — runs in-process handlers first, then publishes to the broker
  • RabbitMQ publisher & subscriber backed by a durable topic exchange
  • FastAPI plugin: one-line lifespan setup + pure-ASGI middleware that dispatches events after every successful request without swallowing exceptions

Installation

pip install eventable
# or with uv
uv add eventable

Requirements: Python 3.12+, a running RabbitMQ broker (for the infrastructure layer).


Core concepts

Request
  └─ AggregateRoot.add_domain_event(event)   # automatically pushed to context collector
       └─ EventCollector (ContextVar)         # active for the lifetime of the request
            └─ dispatch loop (middleware)     # runs until no new events remain
                 └─ EventDispatcher.dispatch(event)
                      ├─ in-process handlers  # same bounded context (may fire more events)
                      └─ RabbitMQPublisher    # cross-context via broker

The collector is a context variable — it is implicitly active for any code that runs inside the request, including in-process handlers. When a handler modifies an aggregate that fires its own events, those events are picked up in the next pass of the dispatch loop automatically.


Quick start

1. Define a domain event

from dataclasses import dataclass
from uuid import UUID
from eventable.domain_event import DomainEvent

@dataclass(frozen=True)
class OrderPlaced(DomainEvent):
    order_id: UUID
    total: str

2. Define an aggregate root

from uuid import uuid4
from datetime import datetime
from eventable.aggregate_root import AggregateRoot
from eventable.domain_event import generate_event_id

class Order(AggregateRoot):
    order_id: UUID
    total: str

    def place(self) -> None:
        self.add_domain_event(
            OrderPlaced(
                event_id=generate_event_id(),
                occurred_at=datetime.now(),
                order_id=self.order_id,
                total=self.total,
            )
        )

3. Register in-process handlers

Edit event_handlers.py (generated alongside your app):

from eventable.event_managment.event_dispatcher import EventDispatcher
from myapp.events import OrderPlaced

def handle_order_placed(event: OrderPlaced) -> None:
    print(f"Order {event.order_id} placed — sending confirmation email")

def register_handlers(dispatcher: EventDispatcher) -> None:
    dispatcher.register(OrderPlaced, handle_order_placed)

FastAPI integration

Lifespan setup

Eventable connects to RabbitMQ on startup, registers your in-process handlers, and disconnects cleanly on shutdown. Wire it into FastAPI's async lifespan:

from contextlib import asynccontextmanager
from fastapi import FastAPI
from eventable.infrastructure.fastapi.eventable import Eventable, EventableSettings
from eventable.infrastructure.fastapi.middleware import DomainEventMiddleware

settings = EventableSettings(
    rabbit_url="amqp://guest:guest@localhost/",
    exchange_name="domain_events",   # default
)
eventable = Eventable(settings)

@asynccontextmanager
async def lifespan(app: FastAPI):
    await eventable.startup(app)   # connects publisher, wires dispatcher
    yield
    await eventable.shutdown()     # closes RabbitMQ connection

app = FastAPI(lifespan=lifespan)
app.add_middleware(DomainEventMiddleware)

After startup, app.state.publisher and app.state.dispatcher are available throughout the application.

Middleware

DomainEventMiddleware is a pure ASGI middleware (not BaseHTTPMiddleware) so exceptions from your route handlers propagate cleanly to FastAPI's exception handlers — nothing is swallowed.

Per-request lifecycle:

  1. Creates a fresh EventCollector and sets it as the active context collector.
  2. Runs the route handler normally — any add_domain_event call anywhere in the call stack automatically registers the event.
  3. After the response is sent, if the status code is < 400, dispatches all collected events. If a handler fires new events (by modifying aggregates), those are dispatched in the next pass of the loop.
  4. On 4xx/5xx the collector is discarded — no events are dispatched for failed requests.

Collecting events in a route

No collector wiring needed in your routes. Just call your domain methods — events are captured implicitly:

from fastapi import Request
from uuid import uuid4

@app.post("/orders")
def place_order(request: Request):
    order = Order(order_id=uuid4(), total="99.99")
    order.place()   # add_domain_event() inside here is captured automatically

    return {"order_id": str(order.order_id)}

Handlers registered via register_handlers can also fire new events the same way — just call add_domain_event on any aggregate and the middleware loop will pick them up:

def handle_order_placed(event: OrderPlaced) -> None:
    invoice = Invoice.create(order_id=event.order_id)
    invoice.generate()   # fires InvoiceGenerated — dispatched in the next loop pass

RabbitMQ subscriber (cross-context consumer)

Run a long-lived consumer process in a separate service to handle events published by another bounded context:

from eventable import rabbit_subscription
from myapp.events import OrderPlaced

def handle_order_placed(event: OrderPlaced) -> None:
    # react to the event in this bounded context
    ...

rabbit_subscription(
    consumer_name="inventory-service",
    rabbit_url="amqp://guest:guest@localhost/",
    queue_name="inventory.order_placed",
    handlers_map={
        OrderPlaced: (handle_order_placed, {}),
    },
)

rabbit_subscription blocks, handles SIGTERM/SIGINT for graceful shutdown, and nacks messages that fail processing (sending them to the dead-letter queue if configured).


Event serialization

Events are serialized to JSON automatically. The following field types are supported out of the box:

Python type JSON representation
UUID string
datetime ISO 8601 string
Decimal string
ValueObject get_value() result
dict, list passed through recursively

ValueObject

Implement the ValueObject protocol to have your value objects serialize transparently:

from eventable.value_object import ValueObject

class Money:
    def __init__(self, amount: str):
        self.value = amount

    def get_value(self) -> str:
        return self.value

Architecture overview

eventable/
├── domain_event.py          # DomainEvent base dataclass + serialize/deserialize
├── aggregate_root.py        # AggregateRoot (Pydantic BaseModel)
├── value_object.py          # ValueObject protocol
├── event_handlers.py        # register_handlers() — your in-process handler hook
│
├── event_managment/
│   ├── event_collector.py   # ContextVar-based collector, auto-captures events from aggregates
│   ├── event_dispatcher.py  # Runs handlers then publishes to broker
│   ├── event_publisher.py   # Abstract EventPublisher
│   └── event_subscriber.py  # Abstract EventSubscriber
│
└── infrastructure/
    ├── rabbitmq/
    │   ├── event_publisher.py   # RabbitMQPublisher (thread-local channels)
    │   └── event_subscriber.py  # RabbitMQSubscriber (blocking consumer)
    └── fastapi/
        ├── eventable.py         # Eventable + EventableSettings (lifespan plugin)
        └── middleware.py        # DomainEventMiddleware (pure ASGI)

RabbitMQ topology: a single durable topic exchange (domain_events by default). Each event type is routed by its class name as the routing key. Consumers bind a durable queue to the exchange for the routing keys they care about.


Configuration reference

EventableSettings

Field Type Default Description
rabbit_url str AMQP connection URL
exchange_name str "domain_events" Topic exchange name

Running the tests

uv run pytest tests/unit/          # pure unit tests, no broker needed
uv run pytest tests/infrastructure/ -k "not rabbitmq"  # FastAPI plugin tests only

# Full suite including RabbitMQ integration tests:
RABBITMQ_URL=amqp://admin:admin@localhost:5672/ uv run pytest

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

eventable-0.2.0.tar.gz (8.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

eventable-0.2.0-py3-none-any.whl (14.0 kB view details)

Uploaded Python 3

File details

Details for the file eventable-0.2.0.tar.gz.

File metadata

  • Download URL: eventable-0.2.0.tar.gz
  • Upload date:
  • Size: 8.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.6 {"installer":{"name":"uv","version":"0.11.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"13","id":"trixie","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for eventable-0.2.0.tar.gz
Algorithm Hash digest
SHA256 896d0346b60af2506e463209007ec474b6469bf79a3c3c631e69860b24e60288
MD5 d5bb540c73ab8a7387e6b5e08c990716
BLAKE2b-256 f91fce3d898fa6d56ab0675ec79c389d6c242da6ea433e509d41ceb16a407aaf

See more details on using hashes here.

File details

Details for the file eventable-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: eventable-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 14.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.6 {"installer":{"name":"uv","version":"0.11.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"13","id":"trixie","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for eventable-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 b6bd5560f6d989f4cb61b5c1ad7f3155a1c4cf474cfdd9d3d295700b089108f7
MD5 f4d4c7a2d44c3dbaee4f0bcb3f456e60
BLAKE2b-256 24c4b656cf5cebfe97b45323fda00eee74d87c34f080ab4a1019a9e0d9580b3b

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page