Skip to main content

Production-ready event sourcing library for Python

Project description

eventsource-py

PyPI version Python Version CI License: MIT

Stop losing data. Start capturing history.

Traditional databases overwrite state on every update. Event sourcing captures what happened as a sequence of immutable events, giving you:

  • Complete audit trail - Know exactly what changed, when, and why
  • Time travel - Reconstruct state at any point in history
  • Multiple views - Build different read models from the same events
  • Reliable debugging - Replay events to reproduce any bug

eventsource-py makes this practical for Python applications with a clean, async-first API.

pip install eventsource-py

Quick Start

import asyncio
from uuid import UUID, uuid4
from pydantic import BaseModel
from eventsource import (
    DomainEvent, register_event, AggregateRoot, AggregateRepository,
    InMemoryEventStore, InMemoryEventBus, InMemoryCheckpointRepository,
)
from eventsource.subscriptions import SubscriptionManager, SubscriptionConfig

# 1. Define events - immutable facts that capture what happened
@register_event
class OrderPlaced(DomainEvent):
    event_type: str = "OrderPlaced"
    aggregate_type: str = "Order"
    customer_id: UUID
    total: float

@register_event
class OrderShipped(DomainEvent):
    event_type: str = "OrderShipped"
    aggregate_type: str = "Order"
    tracking_number: str

# 2. Define aggregate state and business logic
class OrderState(BaseModel):
    order_id: UUID
    customer_id: UUID | None = None
    total: float = 0.0
    status: str = "draft"

class Order(AggregateRoot[OrderState]):
    aggregate_type = "Order"

    def _get_initial_state(self) -> OrderState:
        return OrderState(order_id=self.aggregate_id)

    def _apply(self, event: DomainEvent) -> None:
        match event:
            case OrderPlaced():
                self._state = OrderState(
                    order_id=self.aggregate_id,
                    customer_id=event.customer_id,
                    total=event.total,
                    status="placed",
                )
            case OrderShipped():
                self._state = self._state.model_copy(update={"status": "shipped"})

    def place(self, customer_id: UUID, total: float) -> None:
        if self.version > 0:
            raise ValueError("Order already placed")
        self.apply_event(OrderPlaced(
            aggregate_id=self.aggregate_id,
            customer_id=customer_id,
            total=total,
            aggregate_version=self.get_next_version(),
        ))

    def ship(self, tracking_number: str) -> None:
        if self.state.status != "placed":
            raise ValueError("Order must be placed before shipping")
        self.apply_event(OrderShipped(
            aggregate_id=self.aggregate_id,
            tracking_number=tracking_number,
            aggregate_version=self.get_next_version(),
        ))

# 3. Define a projection - build read models from the event stream
class SalesReport:
    """Read model that tracks sales metrics from events."""
    def __init__(self):
        self.total_revenue = 0.0
        self.orders_placed = 0
        self.orders_shipped = 0

    def subscribed_to(self) -> list[type[DomainEvent]]:
        return [OrderPlaced, OrderShipped]

    async def handle(self, event: DomainEvent) -> None:
        match event:
            case OrderPlaced():
                self.total_revenue += event.total
                self.orders_placed += 1
            case OrderShipped():
                self.orders_shipped += 1

# 4. Wire it together
async def main():
    # Infrastructure
    store = InMemoryEventStore()
    bus = InMemoryEventBus()
    repo = AggregateRepository(
        event_store=store,
        aggregate_factory=Order,
        aggregate_type="Order",
        event_publisher=bus,  # Publishes events to the bus after saving
    )

    # Set up subscription manager with our projection
    manager = SubscriptionManager(store, bus, InMemoryCheckpointRepository())
    report = SalesReport()
    await manager.subscribe(report, SubscriptionConfig(start_from="beginning"), name="SalesReport")
    await manager.start()

    # Create some orders
    for i in range(3):
        order = repo.create_new(uuid4())
        order.place(customer_id=uuid4(), total=100.0 * (i + 1))
        await repo.save(order)

        if i == 0:  # Ship the first order
            order.ship(tracking_number="TRACK-001")
            await repo.save(order)

    await asyncio.sleep(0.1)  # Let events propagate

    # The projection built a read model from the event stream
    print(f"Revenue: ${report.total_revenue}")      # Revenue: $600.0
    print(f"Orders placed: {report.orders_placed}")  # Orders placed: 3
    print(f"Orders shipped: {report.orders_shipped}")  # Orders shipped: 1

    # Events are the source of truth - reload aggregate from its event history
    order = await repo.load(order.aggregate_id)
    print(f"Order status: {order.state.status}")  # Order status: shipped
    print(f"Order version: {order.version}")      # Order version: 2 (placed + shipped)

    await manager.stop()

asyncio.run(main())

Production Ready

Swap in production backends when you're ready to deploy:

Component Development Production
Event Store InMemoryEventStore PostgreSQLEventStore, SQLiteEventStore
Event Bus InMemoryEventBus RedisEventBus, RabbitMQEventBus, KafkaEventBus
Checkpoints InMemoryCheckpointRepository PostgreSQLCheckpointRepository
# Add PostgreSQL + Redis for production
pip install eventsource-py[postgresql,redis]
All installation options
pip install eventsource-py[postgresql]  # PostgreSQL event store
pip install eventsource-py[sqlite]      # SQLite event store
pip install eventsource-py[redis]       # Redis event bus
pip install eventsource-py[rabbitmq]    # RabbitMQ event bus
pip install eventsource-py[kafka]       # Kafka event bus
pip install eventsource-py[telemetry]   # OpenTelemetry tracing
pip install eventsource-py[all]         # Everything

Features

  • Event Stores - PostgreSQL, SQLite, In-Memory with optimistic concurrency
  • Event Bus - Redis Streams, RabbitMQ, Kafka, In-Memory with consumer groups
  • Subscriptions - Catch-up from history, live events, checkpointing, graceful shutdown
  • Projections - Declarative handlers, retry logic, dead letter queues
  • Snapshots - Optimize aggregate loading for long event streams
  • Multi-tenancy - Built-in tenant isolation
  • Observability - OpenTelemetry integration

Documentation

Full Documentation - Guides, examples, and API reference

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

eventsource_py-0.3.0.tar.gz (1.5 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

eventsource_py-0.3.0-py3-none-any.whl (518.3 kB view details)

Uploaded Python 3

File details

Details for the file eventsource_py-0.3.0.tar.gz.

File metadata

  • Download URL: eventsource_py-0.3.0.tar.gz
  • Upload date:
  • Size: 1.5 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for eventsource_py-0.3.0.tar.gz
Algorithm Hash digest
SHA256 4aca17c995070bf64549694fcce57f61561f10abbda3d1637bebc65740e47f0d
MD5 3aba036e263205e7296a42b2d24fedc5
BLAKE2b-256 8ad10dbb33c86136eb8331fada3b375473c312a83919b30cb8d8b66d242253b5

See more details on using hashes here.

Provenance

The following attestation bundles were made for eventsource_py-0.3.0.tar.gz:

Publisher: release.yml on tyevans/eventsource-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file eventsource_py-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: eventsource_py-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 518.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for eventsource_py-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 1fd07f18eecd536bd44029475ead011537c1ae940d1be89ef1855dd7e05dc620
MD5 386f3903250698fe809f859b15f802af
BLAKE2b-256 87ab7eeaa69f56d00e5a6bd34deb81c35d84e10d0d4d374604e9bbb91847bbff

See more details on using hashes here.

Provenance

The following attestation bundles were made for eventsource_py-0.3.0-py3-none-any.whl:

Publisher: release.yml on tyevans/eventsource-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page