Production-ready event sourcing library for Python

eventsource-py

Stop losing data. Start capturing history.

Traditional databases overwrite state on every update. Event sourcing captures what happened as a sequence of immutable events, giving you:

  • Complete audit trail - Know exactly what changed, when, and why
  • Time travel - Reconstruct state at any point in history
  • Multiple views - Build different read models from the same events
  • Reliable debugging - Replay events to reproduce any bug

eventsource-py makes this practical for Python applications with a clean, async-first API.

pip install eventsource-py

Quick Start

import asyncio
from uuid import UUID, uuid4
from pydantic import BaseModel
from eventsource import (
    DomainEvent, register_event, AggregateRoot, AggregateRepository,
    InMemoryEventStore, InMemoryEventBus, InMemoryCheckpointRepository,
)
from eventsource.subscriptions import SubscriptionManager, SubscriptionConfig

# 1. Define events - immutable facts that capture what happened
@register_event
class OrderPlaced(DomainEvent):
    event_type: str = "OrderPlaced"
    aggregate_type: str = "Order"
    customer_id: UUID
    total: float

@register_event
class OrderShipped(DomainEvent):
    event_type: str = "OrderShipped"
    aggregate_type: str = "Order"
    tracking_number: str

# 2. Define aggregate state and business logic
class OrderState(BaseModel):
    order_id: UUID
    customer_id: UUID | None = None
    total: float = 0.0
    status: str = "draft"

class Order(AggregateRoot[OrderState]):
    aggregate_type = "Order"

    def _get_initial_state(self) -> OrderState:
        return OrderState(order_id=self.aggregate_id)

    def _apply(self, event: DomainEvent) -> None:
        match event:
            case OrderPlaced():
                self._state = OrderState(
                    order_id=self.aggregate_id,
                    customer_id=event.customer_id,
                    total=event.total,
                    status="placed",
                )
            case OrderShipped():
                self._state = self._state.model_copy(update={"status": "shipped"})

    def place(self, customer_id: UUID, total: float) -> None:
        if self.version > 0:
            raise ValueError("Order already placed")
        self.apply_event(OrderPlaced(
            aggregate_id=self.aggregate_id,
            customer_id=customer_id,
            total=total,
            aggregate_version=self.get_next_version(),
        ))

    def ship(self, tracking_number: str) -> None:
        if self.state.status != "placed":
            raise ValueError("Order must be placed before shipping")
        self.apply_event(OrderShipped(
            aggregate_id=self.aggregate_id,
            tracking_number=tracking_number,
            aggregate_version=self.get_next_version(),
        ))

# 3. Define a projection - build read models from the event stream
class SalesReport:
    """Read model that tracks sales metrics from events."""
    def __init__(self):
        self.total_revenue = 0.0
        self.orders_placed = 0
        self.orders_shipped = 0

    def subscribed_to(self) -> list[type[DomainEvent]]:
        return [OrderPlaced, OrderShipped]

    async def handle(self, event: DomainEvent) -> None:
        match event:
            case OrderPlaced():
                self.total_revenue += event.total
                self.orders_placed += 1
            case OrderShipped():
                self.orders_shipped += 1

# 4. Wire it together
async def main():
    # Infrastructure
    store = InMemoryEventStore()
    bus = InMemoryEventBus()
    repo = AggregateRepository(
        event_store=store,
        aggregate_factory=Order,
        aggregate_type="Order",
        event_publisher=bus,  # Publishes events to the bus after saving
    )

    # Set up subscription manager with our projection
    manager = SubscriptionManager(store, bus, InMemoryCheckpointRepository())
    report = SalesReport()
    await manager.subscribe(report, SubscriptionConfig(start_from="beginning"), name="SalesReport")
    await manager.start()

    # Create some orders, remembering which one gets shipped
    shipped_order_id = None
    for i in range(3):
        order = repo.create_new(uuid4())
        order.place(customer_id=uuid4(), total=100.0 * (i + 1))
        await repo.save(order)

        if i == 0:  # Ship the first order
            order.ship(tracking_number="TRACK-001")
            await repo.save(order)
            shipped_order_id = order.aggregate_id

    await asyncio.sleep(0.1)  # Let events propagate

    # The projection built a read model from the event stream
    print(f"Revenue: ${report.total_revenue}")      # Revenue: $600.0
    print(f"Orders placed: {report.orders_placed}")  # Orders placed: 3
    print(f"Orders shipped: {report.orders_shipped}")  # Orders shipped: 1

    # Events are the source of truth - reload the shipped order from its event history
    order = await repo.load(shipped_order_id)
    print(f"Order status: {order.state.status}")  # Order status: shipped
    print(f"Order version: {order.version}")      # Order version: 2 (placed + shipped)

    await manager.stop()

asyncio.run(main())
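
The Quick Start passes a checkpoint repository to the subscription manager. As a rough illustration of why checkpoints matter, the sketch below shows a subscriber resuming from its last recorded position instead of reprocessing the whole stream. It is a conceptual, library-free sketch: `CountingProjection`, `catch_up`, and the plain `dict` used as a checkpoint store are assumptions made for this example and do not reflect how eventsource-py implements subscriptions.

```python
class CountingProjection:
    """Hypothetical read model that counts the events it has handled."""

    def __init__(self) -> None:
        self.count = 0

    def handle(self, event: str) -> None:
        self.count += 1


def catch_up(
    log: list[str],
    projection: CountingProjection,
    checkpoints: dict[str, int],
    name: str,
) -> int:
    """Feed unseen events to the projection and record the new position."""
    position = checkpoints.get(name, 0)  # no checkpoint means start from the beginning
    for event in log[position:]:
        projection.handle(event)
        position += 1
        checkpoints[name] = position  # persist progress after each handled event
    return position


log = ["OrderPlaced", "OrderShipped"]
checkpoints: dict[str, int] = {}
report = CountingProjection()

catch_up(log, report, checkpoints, "SalesReport")  # handles both existing events
log.append("OrderPlaced")
catch_up(log, report, checkpoints, "SalesReport")  # handles only the new event
print(report.count, checkpoints)
```

Saving the position after every event keeps reprocessing to at most one event if the subscriber stops mid-stream; a real checkpoint store would trade that off against write volume.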

Production Ready

Swap in production backends when you're ready to deploy:

Component   | Development                  | Production
Event Store | InMemoryEventStore           | PostgreSQLEventStore, SQLiteEventStore
Event Bus   | InMemoryEventBus             | RedisEventBus, RabbitMQEventBus, KafkaEventBus
Checkpoints | InMemoryCheckpointRepository | PostgreSQLCheckpointRepository

# Add PostgreSQL + Redis for production (quotes keep shells such as zsh from expanding the brackets)
pip install "eventsource-py[postgresql,redis]"

All installation options

pip install "eventsource-py[postgresql]"  # PostgreSQL event store
pip install "eventsource-py[sqlite]"      # SQLite event store
pip install "eventsource-py[redis]"       # Redis event bus
pip install "eventsource-py[rabbitmq]"    # RabbitMQ event bus
pip install "eventsource-py[kafka]"       # Kafka event bus
pip install "eventsource-py[telemetry]"   # OpenTelemetry tracing
pip install "eventsource-py[all]"         # Everything

Features

  • Event Stores - PostgreSQL, SQLite, In-Memory with optimistic concurrency
  • Event Bus - Redis Streams, RabbitMQ, Kafka, In-Memory with consumer groups
  • Subscriptions - Catch-up from history, live events, checkpointing, graceful shutdown
  • Projections - Declarative handlers, retry logic, dead letter queues
  • Snapshots - Optimize aggregate loading for long event streams
  • Multi-tenancy - Built-in tenant isolation
  • Observability - OpenTelemetry integration
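
The optimistic concurrency mentioned for the event stores can be pictured with a small, library-free sketch: a writer states which stream version it expects, and the append is rejected if another writer got there first. `TinyEventStore` and `ConcurrencyError` are hypothetical names for this illustration only; the library's own stores and exception types may look different.

```python
class ConcurrencyError(Exception):
    """Raised when a stream has moved past the version the writer expected."""


class TinyEventStore:
    """Hypothetical in-memory store that keeps one list of events per stream."""

    def __init__(self) -> None:
        self._streams: dict[str, list[str]] = {}

    def version(self, stream_id: str) -> int:
        return len(self._streams.get(stream_id, []))

    def append(self, stream_id: str, events: list[str], expected_version: int) -> int:
        """Append events only if the stream is still at `expected_version`."""
        actual = self.version(stream_id)
        if actual != expected_version:
            raise ConcurrencyError(
                f"{stream_id}: expected version {expected_version}, found {actual}"
            )
        self._streams.setdefault(stream_id, []).extend(events)
        return self.version(stream_id)


store = TinyEventStore()
store.append("order-1", ["OrderPlaced"], expected_version=0)

# Two writers both loaded the stream at version 1; only the first append succeeds.
store.append("order-1", ["OrderShipped"], expected_version=1)
conflict = None
try:
    store.append("order-1", ["OrderCancelled"], expected_version=1)
except ConcurrencyError as exc:
    conflict = str(exc)

print(store.version("order-1"), conflict)
```

The losing writer typically reloads the stream, re-checks its business rules against the new state, and retries; no locks are held while the aggregate is being modified.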

Documentation

Full Documentation - Guides, examples, and API reference

License

MIT
