Skip to main content

Production-ready event sourcing library for Python

Project description

eventsource-py

PyPI version Python Version CI License: MIT

Stop losing data. Start capturing history.

Traditional databases overwrite state on every update. Event sourcing captures what happened as a sequence of immutable events, giving you:

  • Complete audit trail - Know exactly what changed, when, and why
  • Time travel - Reconstruct state at any point in history
  • Multiple views - Build different read models from the same events
  • Reliable debugging - Replay events to reproduce any bug

eventsource-py makes this practical for Python applications with a clean, async-first API.

pip install eventsource-py

Quick Start

import asyncio
from uuid import UUID, uuid4
from pydantic import BaseModel
from eventsource import (
    DomainEvent, register_event, AggregateRoot, AggregateRepository,
    InMemoryEventStore, InMemoryEventBus, InMemoryCheckpointRepository,
)
from eventsource.subscriptions import SubscriptionManager, SubscriptionConfig

# 1. Define events - immutable facts that capture what happened
@register_event
class OrderPlaced(DomainEvent):
    event_type: str = "OrderPlaced"
    aggregate_type: str = "Order"
    customer_id: UUID
    total: float

@register_event
class OrderShipped(DomainEvent):
    event_type: str = "OrderShipped"
    aggregate_type: str = "Order"
    tracking_number: str

# 2. Define aggregate state and business logic
class OrderState(BaseModel):
    order_id: UUID
    customer_id: UUID | None = None
    total: float = 0.0
    status: str = "draft"

class Order(AggregateRoot[OrderState]):
    aggregate_type = "Order"

    def _get_initial_state(self) -> OrderState:
        return OrderState(order_id=self.aggregate_id)

    def _apply(self, event: DomainEvent) -> None:
        match event:
            case OrderPlaced():
                self._state = OrderState(
                    order_id=self.aggregate_id,
                    customer_id=event.customer_id,
                    total=event.total,
                    status="placed",
                )
            case OrderShipped():
                self._state = self._state.model_copy(update={"status": "shipped"})

    def place(self, customer_id: UUID, total: float) -> None:
        if self.version > 0:
            raise ValueError("Order already placed")
        self.apply_event(OrderPlaced(
            aggregate_id=self.aggregate_id,
            customer_id=customer_id,
            total=total,
            aggregate_version=self.get_next_version(),
        ))

    def ship(self, tracking_number: str) -> None:
        if self.state.status != "placed":
            raise ValueError("Order must be placed before shipping")
        self.apply_event(OrderShipped(
            aggregate_id=self.aggregate_id,
            tracking_number=tracking_number,
            aggregate_version=self.get_next_version(),
        ))

# 3. Define a projection - build read models from the event stream
class SalesReport:
    """Read model that tracks sales metrics from events."""
    def __init__(self):
        self.total_revenue = 0.0
        self.orders_placed = 0
        self.orders_shipped = 0

    def subscribed_to(self) -> list[type[DomainEvent]]:
        return [OrderPlaced, OrderShipped]

    async def handle(self, event: DomainEvent) -> None:
        match event:
            case OrderPlaced():
                self.total_revenue += event.total
                self.orders_placed += 1
            case OrderShipped():
                self.orders_shipped += 1

# 4. Wire it together
async def main():
    # Infrastructure
    store = InMemoryEventStore()
    bus = InMemoryEventBus()
    repo = AggregateRepository(
        event_store=store,
        aggregate_factory=Order,
        aggregate_type="Order",
        event_publisher=bus,  # Publishes events to the bus after saving
    )

    # Set up subscription manager with our projection
    manager = SubscriptionManager(store, bus, InMemoryCheckpointRepository())
    report = SalesReport()
    await manager.subscribe(report, SubscriptionConfig(start_from="beginning"), name="SalesReport")
    await manager.start()

    # Create some orders
    for i in range(3):
        order = repo.create_new(uuid4())
        order.place(customer_id=uuid4(), total=100.0 * (i + 1))
        await repo.save(order)

        if i == 0:  # Ship the first order
            order.ship(tracking_number="TRACK-001")
            await repo.save(order)

    await asyncio.sleep(0.1)  # Let events propagate

    # The projection built a read model from the event stream
    print(f"Revenue: ${report.total_revenue}")      # Revenue: $600.0
    print(f"Orders placed: {report.orders_placed}")  # Orders placed: 3
    print(f"Orders shipped: {report.orders_shipped}")  # Orders shipped: 1

    # Events are the source of truth - reload aggregate from its event history
    order = await repo.load(order.aggregate_id)
    print(f"Order status: {order.state.status}")  # Order status: shipped
    print(f"Order version: {order.version}")      # Order version: 2 (placed + shipped)

    await manager.stop()

asyncio.run(main())

Production Ready

Swap in production backends when you're ready to deploy:

Component Development Production
Event Store InMemoryEventStore PostgreSQLEventStore, SQLiteEventStore
Event Bus InMemoryEventBus RedisEventBus, RabbitMQEventBus, KafkaEventBus
Checkpoints InMemoryCheckpointRepository PostgreSQLCheckpointRepository
# Add PostgreSQL + Redis for production
pip install eventsource-py[postgresql,redis]
All installation options
pip install eventsource-py[postgresql]  # PostgreSQL event store
pip install eventsource-py[sqlite]      # SQLite event store
pip install eventsource-py[redis]       # Redis event bus
pip install eventsource-py[rabbitmq]    # RabbitMQ event bus
pip install eventsource-py[kafka]       # Kafka event bus
pip install eventsource-py[telemetry]   # OpenTelemetry tracing
pip install eventsource-py[all]         # Everything

Features

  • Event Stores - PostgreSQL, SQLite, In-Memory with optimistic concurrency
  • Event Bus - Redis Streams, RabbitMQ, Kafka, In-Memory with consumer groups
  • Subscriptions - Catch-up from history, live events, checkpointing, graceful shutdown
  • Projections - Declarative handlers, retry logic, dead letter queues
  • Snapshots - Optimize aggregate loading for long event streams
  • Multi-tenancy - Built-in tenant isolation
  • Observability - OpenTelemetry integration

Documentation

Full Documentation - Guides, examples, and API reference

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

eventsource_py-0.3.1.tar.gz (1.5 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

eventsource_py-0.3.1-py3-none-any.whl (524.0 kB view details)

Uploaded Python 3

File details

Details for the file eventsource_py-0.3.1.tar.gz.

File metadata

  • Download URL: eventsource_py-0.3.1.tar.gz
  • Upload date:
  • Size: 1.5 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for eventsource_py-0.3.1.tar.gz
Algorithm Hash digest
SHA256 cca7e83293ac7d87fa1688e6c08fbc4aa4b285d3a97a1a35bbab9cdfdee5a977
MD5 99dcac72506b501bab77cd1822cee389
BLAKE2b-256 83a4c251b2a7019247858eaf0b094c5f955d922cc663d244dd7d301e099ec096

See more details on using hashes here.

Provenance

The following attestation bundles were made for eventsource_py-0.3.1.tar.gz:

Publisher: release.yml on tyevans/eventsource-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file eventsource_py-0.3.1-py3-none-any.whl.

File metadata

  • Download URL: eventsource_py-0.3.1-py3-none-any.whl
  • Upload date:
  • Size: 524.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for eventsource_py-0.3.1-py3-none-any.whl
Algorithm Hash digest
SHA256 d41815a19062f28196ada8d09a8522b271d3be649da5697a1104f96a218f85df
MD5 cd27873e74ce6adf986ceea19392450f
BLAKE2b-256 cc9b295354cf5b2e9f69fd1004fbc90b72e154cd030625bbc081530d43b1a12e

See more details on using hashes here.

Provenance

The following attestation bundles were made for eventsource_py-0.3.1-py3-none-any.whl:

Publisher: release.yml on tyevans/eventsource-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page