Skip to main content

Production-ready event sourcing library for Python

Project description

eventsource-py

PyPI version Python Version CI License: MIT

Stop losing data. Start capturing history.

Traditional databases overwrite state on every update. Event sourcing captures what happened as a sequence of immutable events, giving you:

  • Complete audit trail - Know exactly what changed, when, and why
  • Time travel - Reconstruct state at any point in history
  • Multiple views - Build different read models from the same events
  • Reliable debugging - Replay events to reproduce any bug

eventsource-py makes this practical for Python applications with a clean, async-first API.

pip install eventsource-py

Quick Start

import asyncio
from uuid import UUID, uuid4
from pydantic import BaseModel
from eventsource import (
    DomainEvent, register_event, AggregateRoot, AggregateRepository,
    InMemoryEventStore, InMemoryEventBus, InMemoryCheckpointRepository,
)
from eventsource.subscriptions import SubscriptionManager, SubscriptionConfig

# 1. Define events - immutable facts that capture what happened
@register_event
class OrderPlaced(DomainEvent):
    event_type: str = "OrderPlaced"
    aggregate_type: str = "Order"
    customer_id: UUID
    total: float

@register_event
class OrderShipped(DomainEvent):
    event_type: str = "OrderShipped"
    aggregate_type: str = "Order"
    tracking_number: str

# 2. Define aggregate state and business logic
class OrderState(BaseModel):
    order_id: UUID
    customer_id: UUID | None = None
    total: float = 0.0
    status: str = "draft"

class Order(AggregateRoot[OrderState]):
    aggregate_type = "Order"

    def _get_initial_state(self) -> OrderState:
        return OrderState(order_id=self.aggregate_id)

    def _apply(self, event: DomainEvent) -> None:
        match event:
            case OrderPlaced():
                self._state = OrderState(
                    order_id=self.aggregate_id,
                    customer_id=event.customer_id,
                    total=event.total,
                    status="placed",
                )
            case OrderShipped():
                self._state = self._state.model_copy(update={"status": "shipped"})

    def place(self, customer_id: UUID, total: float) -> None:
        if self.version > 0:
            raise ValueError("Order already placed")
        self.apply_event(OrderPlaced(
            aggregate_id=self.aggregate_id,
            customer_id=customer_id,
            total=total,
            aggregate_version=self.get_next_version(),
        ))

    def ship(self, tracking_number: str) -> None:
        if self.state.status != "placed":
            raise ValueError("Order must be placed before shipping")
        self.apply_event(OrderShipped(
            aggregate_id=self.aggregate_id,
            tracking_number=tracking_number,
            aggregate_version=self.get_next_version(),
        ))

# 3. Define a projection - build read models from the event stream
class SalesReport:
    """Read model that tracks sales metrics from events."""
    def __init__(self):
        self.total_revenue = 0.0
        self.orders_placed = 0
        self.orders_shipped = 0

    def subscribed_to(self) -> list[type[DomainEvent]]:
        return [OrderPlaced, OrderShipped]

    async def handle(self, event: DomainEvent) -> None:
        match event:
            case OrderPlaced():
                self.total_revenue += event.total
                self.orders_placed += 1
            case OrderShipped():
                self.orders_shipped += 1

# 4. Wire it together
async def main():
    # Infrastructure
    store = InMemoryEventStore()
    bus = InMemoryEventBus()
    repo = AggregateRepository(
        event_store=store,
        aggregate_factory=Order,
        aggregate_type="Order",
        event_publisher=bus,  # Publishes events to the bus after saving
    )

    # Set up subscription manager with our projection
    manager = SubscriptionManager(store, bus, InMemoryCheckpointRepository())
    report = SalesReport()
    await manager.subscribe(report, SubscriptionConfig(start_from="beginning"), name="SalesReport")
    await manager.start()

    # Create some orders
    for i in range(3):
        order = repo.create_new(uuid4())
        order.place(customer_id=uuid4(), total=100.0 * (i + 1))
        await repo.save(order)

        if i == 0:  # Ship the first order
            order.ship(tracking_number="TRACK-001")
            await repo.save(order)

    await asyncio.sleep(0.1)  # Let events propagate

    # The projection built a read model from the event stream
    print(f"Revenue: ${report.total_revenue}")      # Revenue: $600.0
    print(f"Orders placed: {report.orders_placed}")  # Orders placed: 3
    print(f"Orders shipped: {report.orders_shipped}")  # Orders shipped: 1

    # Events are the source of truth - reload aggregate from its event history
    order = await repo.load(order.aggregate_id)
    print(f"Order status: {order.state.status}")  # Order status: shipped
    print(f"Order version: {order.version}")      # Order version: 2 (placed + shipped)

    await manager.stop()

asyncio.run(main())

Production Ready

Swap in production backends when you're ready to deploy:

Component Development Production
Event Store InMemoryEventStore PostgreSQLEventStore, SQLiteEventStore
Event Bus InMemoryEventBus RedisEventBus, RabbitMQEventBus, KafkaEventBus
Checkpoints InMemoryCheckpointRepository PostgreSQLCheckpointRepository
# Add PostgreSQL + Redis for production
pip install eventsource-py[postgresql,redis]
All installation options
pip install eventsource-py[postgresql]  # PostgreSQL event store
pip install eventsource-py[sqlite]      # SQLite event store
pip install eventsource-py[redis]       # Redis event bus
pip install eventsource-py[rabbitmq]    # RabbitMQ event bus
pip install eventsource-py[kafka]       # Kafka event bus
pip install eventsource-py[telemetry]   # OpenTelemetry tracing
pip install eventsource-py[all]         # Everything

Features

  • Event Stores - PostgreSQL, SQLite, In-Memory with optimistic concurrency
  • Event Bus - Redis Streams, RabbitMQ, Kafka, In-Memory with consumer groups
  • Subscriptions - Catch-up from history, live events, checkpointing, graceful shutdown
  • Projections - Declarative handlers, retry logic, dead letter queues
  • Snapshots - Optimize aggregate loading for long event streams
  • Multi-tenancy - Built-in tenant isolation
  • Observability - OpenTelemetry integration

Documentation

Full Documentation - Guides, examples, and API reference

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

eventsource_py-0.4.0.tar.gz (1.6 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

eventsource_py-0.4.0-py3-none-any.whl (528.6 kB view details)

Uploaded Python 3

File details

Details for the file eventsource_py-0.4.0.tar.gz.

File metadata

  • Download URL: eventsource_py-0.4.0.tar.gz
  • Upload date:
  • Size: 1.6 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for eventsource_py-0.4.0.tar.gz
Algorithm Hash digest
SHA256 5f7080995c9a440ee1fb495194a072ae02695c9a53e8e03a30aee95cba9c542f
MD5 f62ca37b97770d7153a646564a5159d7
BLAKE2b-256 c02416cb68b7a9555689d1fdcd764f66981fe863c1ab8514915dd85c141239ac

See more details on using hashes here.

Provenance

The following attestation bundles were made for eventsource_py-0.4.0.tar.gz:

Publisher: release.yml on tyevans/eventsource-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file eventsource_py-0.4.0-py3-none-any.whl.

File metadata

  • Download URL: eventsource_py-0.4.0-py3-none-any.whl
  • Upload date:
  • Size: 528.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for eventsource_py-0.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 ca6a076bb016ec5615eff2c5554362e56f6fccca79af60565436d56b0791357d
MD5 1c67df35ab57e9aaaf96b60bde2b8db0
BLAKE2b-256 1c3bbfca7423911fd47196a3bac1d91a866c5a51e11ec374da8fcc7a9f1b5e93

See more details on using hashes here.

Provenance

The following attestation bundles were made for eventsource_py-0.4.0-py3-none-any.whl:

Publisher: release.yml on tyevans/eventsource-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page