eventsource-py

A production-ready event sourcing library for Python 3.11+.

Features

  • Event Store - PostgreSQL, SQLite, and In-Memory backends with optimistic locking
  • Domain Events - Immutable event classes with Pydantic validation and versioning
  • Event Registry - Thread-safe event type registration for serialization/deserialization
  • Aggregate Pattern - Base classes for event-sourced aggregates with state reconstruction
  • Repository Pattern - Clean abstractions for loading and saving aggregates
  • Projection System - Checkpoint tracking, retry logic, and dead letter queue support
  • Event Bus - In-Memory and Redis Streams backends for event distribution
  • Transactional Outbox - Reliable event publishing pattern
  • Multi-tenancy - Built-in tenant isolation support
  • Observability - Optional OpenTelemetry integration
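
The transactional outbox listed above is commonly built by writing the event and an outbox row in the same database transaction, then letting a separate relay publish the pending rows. The following is a minimal sketch of that general pattern using only the standard library's sqlite3 module. It does not show eventsource-py's own outbox API; the table names and the open_db, append_event, and relay functions are hypothetical.

```python
import json
import sqlite3
from typing import Callable


def open_db() -> sqlite3.Connection:
    """Create an in-memory database with hypothetical events and outbox tables."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE events (id INTEGER PRIMARY KEY, type TEXT, payload TEXT)"
    )
    conn.execute(
        "CREATE TABLE outbox ("
        "id INTEGER PRIMARY KEY, event_id INTEGER, published INTEGER DEFAULT 0)"
    )
    return conn


def append_event(conn: sqlite3.Connection, event_type: str, payload: dict) -> int:
    """Store the event and its outbox row atomically."""
    # `with conn` commits both inserts together, or rolls both back on error.
    with conn:
        cur = conn.execute(
            "INSERT INTO events (type, payload) VALUES (?, ?)",
            (event_type, json.dumps(payload)),
        )
        event_id = cur.lastrowid
        conn.execute("INSERT INTO outbox (event_id) VALUES (?)", (event_id,))
    return event_id


def relay(conn: sqlite3.Connection, publish: Callable[[str, dict], None]) -> int:
    """Publish every pending outbox row in order and mark it as published."""
    rows = conn.execute(
        "SELECT o.id, e.type, e.payload FROM outbox o "
        "JOIN events e ON e.id = o.event_id "
        "WHERE o.published = 0 ORDER BY o.id"
    ).fetchall()
    for outbox_id, event_type, payload in rows:
        publish(event_type, json.loads(payload))
        # Marked only after a successful publish, so delivery is at-least-once.
        with conn:
            conn.execute("UPDATE outbox SET published = 1 WHERE id = ?", (outbox_id,))
    return len(rows)
```

Because a row is marked only after publish returns, a crash between the two steps causes a re-publish on the next relay run, so subscribers in this sketch would need to tolerate duplicates.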

Installation

# Basic installation
pip install eventsource-py

# With PostgreSQL support (quotes keep shells such as zsh from expanding the brackets)
pip install "eventsource-py[postgresql]"

# With SQLite support
pip install "eventsource-py[sqlite]"

# With Redis support
pip install "eventsource-py[redis]"

# With OpenTelemetry support
pip install "eventsource-py[telemetry]"

# All optional dependencies
pip install "eventsource-py[all]"

Requirements

  • Python 3.11+
  • pydantic >= 2.0
  • sqlalchemy >= 2.0 (for PostgreSQL backend)

Quick Start

1. Define Your Events

from uuid import UUID
from eventsource import DomainEvent, register_event

@register_event
class OrderCreated(DomainEvent):
    """Event emitted when an order is created."""
    event_type: str = "OrderCreated"
    aggregate_type: str = "Order"
    customer_id: UUID
    total_amount: float

@register_event
class OrderShipped(DomainEvent):
    """Event emitted when an order is shipped."""
    event_type: str = "OrderShipped"
    aggregate_type: str = "Order"
    tracking_number: str
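
To make the idea behind event registration concrete, here is a standard-library sketch of what a registration decorator and an immutable event class can look like in general. It is not how eventsource-py implements register_event or DomainEvent: the register, serialize, and deserialize functions, the _EVENT_TYPES mapping, and the ItemAdded class are all hypothetical, and a frozen dataclass stands in for Pydantic validation.

```python
import threading
from dataclasses import asdict, dataclass

# Hypothetical registry mapping an event type name to its class.
_EVENT_TYPES: dict[str, type] = {}
_REGISTRY_LOCK = threading.Lock()


def register(cls: type) -> type:
    """Class decorator that records the class under its own name."""
    with _REGISTRY_LOCK:  # guard concurrent registration
        _EVENT_TYPES[cls.__name__] = cls
    return cls


@register
@dataclass(frozen=True)  # frozen: attributes cannot be reassigned after creation
class ItemAdded:
    sku: str
    quantity: int


def serialize(event: object) -> dict:
    """Turn an event into a plain record that carries its type name."""
    return {"type": type(event).__name__, "data": asdict(event)}


def deserialize(record: dict) -> object:
    """Look up the class by type name and rebuild the event."""
    cls = _EVENT_TYPES[record["type"]]
    return cls(**record["data"])
```

The stored type name is what lets a reader of the event stream pick the right class again, which is why events must be registered before they are deserialized.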

2. Define Your Aggregate State

from uuid import UUID

from pydantic import BaseModel

class OrderState(BaseModel):
    """State of an Order aggregate."""
    order_id: UUID
    customer_id: UUID | None = None
    total_amount: float = 0.0
    status: str = "draft"
    tracking_number: str | None = None

3. Create Your Aggregate

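from uuid import UUID

from eventsource import AggregateRoot, DomainEvent

# OrderCreated, OrderShipped, and OrderState are defined in steps 1 and 2.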

class OrderAggregate(AggregateRoot[OrderState]):
    """Event-sourced Order aggregate."""
    aggregate_type = "Order"

    def _get_initial_state(self) -> OrderState:
        return OrderState(order_id=self.aggregate_id)

    def _apply(self, event: DomainEvent) -> None:
        if isinstance(event, OrderCreated):
            self._state = OrderState(
                order_id=self.aggregate_id,
                customer_id=event.customer_id,
                total_amount=event.total_amount,
                status="created",
            )
        elif isinstance(event, OrderShipped):
            if self._state:
                self._state = self._state.model_copy(
                    update={
                        "status": "shipped",
                        "tracking_number": event.tracking_number,
                    }
                )

    def create(self, customer_id: UUID, total_amount: float) -> None:
        """Command: Create the order."""
        if self.version > 0:
            raise ValueError("Order already created")

        event = OrderCreated(
            aggregate_id=self.aggregate_id,
            customer_id=customer_id,
            total_amount=total_amount,
            aggregate_version=self.get_next_version(),
        )
        self.apply_event(event)

    def ship(self, tracking_number: str) -> None:
        """Command: Ship the order."""
        if not self.state or self.state.status != "created":
            raise ValueError("Order must be created before shipping")

        event = OrderShipped(
            aggregate_id=self.aggregate_id,
            tracking_number=tracking_number,
            aggregate_version=self.get_next_version(),
        )
        self.apply_event(event)

4. Use the Repository

import asyncio
from uuid import uuid4
from eventsource import InMemoryEventStore, AggregateRepository

# OrderAggregate is the class defined in step 3.
async def main() -> None:
    # Set up event store and repository
    event_store = InMemoryEventStore()
    repo = AggregateRepository(
        event_store=event_store,
        aggregate_factory=OrderAggregate,
        aggregate_type="Order",
    )

    # Create and save a new order
    order_id = uuid4()
    order = repo.create_new(order_id)
    order.create(customer_id=uuid4(), total_amount=99.99)
    await repo.save(order)

    # Load the order and ship it
    loaded_order = await repo.load(order_id)
    loaded_order.ship(tracking_number="TRACK-123")
    await repo.save(loaded_order)

    # Verify the state
    final_order = await repo.load(order_id)
    print(f"Order status: {final_order.state.status}")
    print(f"Tracking: {final_order.state.tracking_number}")

asyncio.run(main())
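
Load-then-save cycles like the one above are where optimistic locking matters: if two writers load the same version, only one save should succeed. One common way to enforce this is an expected-version check on append, sketched here with the standard library. The TinyEventStore and ConcurrencyError names are hypothetical, and this is not the library's InMemoryEventStore.

```python
class ConcurrencyError(Exception):
    """Raised when the stream changed since the writer loaded it."""


class TinyEventStore:
    """Hypothetical in-memory store keyed by stream id."""

    def __init__(self) -> None:
        self._streams: dict[str, list[dict]] = {}

    def read(self, stream_id: str) -> list[dict]:
        # Return a copy so callers cannot mutate stored history.
        return list(self._streams.get(stream_id, []))

    def append(self, stream_id: str, events: list[dict], expected_version: int) -> int:
        """Append only if the stream is still at expected_version."""
        stream = self._streams.setdefault(stream_id, [])
        if len(stream) != expected_version:
            raise ConcurrencyError(
                f"expected version {expected_version}, found {len(stream)}"
            )
        stream.extend(events)
        return len(stream)  # the new version
```

A writer that receives ConcurrencyError would typically reload the stream, re-run its command against the fresh state, and try again.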

Architecture

+-------------------+     +-------------------+     +-------------------+
|                   |     |                   |     |                   |
|    Commands       |---->|    Aggregates     |---->|   Event Store     |
|                   |     |                   |     |                   |
+-------------------+     +-------------------+     +--------+----------+
                                                             |
                                                             v
+-------------------+     +-------------------+     +-------------------+
|                   |     |                   |     |                   |
|   Read Models     |<----|   Projections     |<----|   Event Bus       |
|                   |     |                   |     |                   |
+-------------------+     +-------------------+     +-------------------+
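
The lower half of the diagram (event bus to projections to read models) can be illustrated with a small synchronous publish/subscribe sketch. This uses only the standard library and hypothetical names (TinyBus, OrderStatusProjection, dict-based events); the library's own event bus backends and projection classes are not shown here.

```python
from collections import defaultdict
from typing import Callable

Event = dict
Handler = Callable[[Event], None]


class TinyBus:
    """Hypothetical in-process bus that calls handlers synchronously."""

    def __init__(self) -> None:
        self._handlers: defaultdict[str, list[Handler]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event: Event) -> None:
        # Events with no subscribers are silently ignored.
        for handler in self._handlers.get(event["type"], []):
            handler(event)


class OrderStatusProjection:
    """Maintains a read model: order id -> latest status."""

    def __init__(self) -> None:
        self.status_by_order: dict[str, str] = {}

    def on_created(self, event: Event) -> None:
        self.status_by_order[event["order_id"]] = "created"

    def on_shipped(self, event: Event) -> None:
        self.status_by_order[event["order_id"]] = "shipped"
```

Queries then read status_by_order directly instead of replaying events, which is the point of keeping a separate read model.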

Core Concepts

  • Events - Immutable facts that capture state changes. Events are never deleted or modified.
  • Event Store - Persists events with ordering and optimistic locking guarantees.
  • Aggregates - Consistency boundaries that reconstruct state from event streams.
  • Repository - Abstracts loading/saving aggregates from/to the event store.
  • Projections - Build read-optimized views from event streams.
  • Event Bus - Distributes events to subscribers for async processing.
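
A projection usually remembers how far it has read (its checkpoint), retries a failing handler a few times, and sets aside events it cannot process. The function below is a standard-library sketch of that loop under simplifying assumptions: events live in a list, the checkpoint is the count of events already handled, and run_projection and its parameters are hypothetical rather than part of eventsource-py.

```python
from typing import Any, Callable


def run_projection(
    events: list[Any],
    handler: Callable[[Any], None],
    checkpoint: int = 0,
    max_attempts: int = 3,
) -> tuple[int, list[tuple[int, Any, str]]]:
    """Process events after `checkpoint`; return the new checkpoint and dead letters."""
    dead_letters: list[tuple[int, Any, str]] = []
    for position, event in enumerate(events[checkpoint:], start=checkpoint + 1):
        for attempt in range(1, max_attempts + 1):
            try:
                handler(event)
                break  # handled, stop retrying
            except Exception as exc:
                if attempt == max_attempts:
                    # Give up on this event and record why.
                    dead_letters.append((position, event, repr(exc)))
        # Advance past the event whether it succeeded or was dead-lettered.
        checkpoint = position
    return checkpoint, dead_letters
```

Passing the returned checkpoint into the next call resumes where the previous run stopped, so already-handled events are not processed again.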

Documentation

For comprehensive documentation, see the Documentation Index, which covers:

  • Getting Started
  • Guides
  • API Reference
  • Examples
  • Architecture Decision Records
  • Developer Documentation

Development

# Install in development mode
pip install -e ".[dev]"

# Run tests
pytest

# Run tests with coverage
pytest --cov=eventsource --cov-report=html

# Run type checking
mypy src/eventsource

# Run linting
ruff check src/eventsource

# Format code
ruff format src/eventsource

License

This project is licensed under the MIT License - see the LICENSE file for details.
