Production-ready event sourcing library for Python

eventsource-py

A production-ready event sourcing library for Python 3.11+.

Features

  • Event Store - PostgreSQL, SQLite, and In-Memory backends with optimistic locking
  • Domain Events - Immutable event classes with Pydantic validation and versioning
  • Event Registry - Thread-safe event type registration for serialization/deserialization
  • Aggregate Pattern - Base classes for event-sourced aggregates with state reconstruction
  • Repository Pattern - Clean abstractions for loading and saving aggregates
  • Projection System - Checkpoint tracking, retry logic, and dead letter queue support
  • Event Bus - In-Memory and Redis Streams backends for event distribution
  • Transactional Outbox - Reliable event publishing pattern
  • Multi-tenancy - Built-in tenant isolation support
  • Observability - Optional OpenTelemetry integration

Installation

# Basic installation
pip install eventsource-py

# With PostgreSQL support (extras are quoted so shells such as zsh do not expand the brackets)
pip install "eventsource-py[postgresql]"

# With SQLite support
pip install "eventsource-py[sqlite]"

# With Redis support
pip install "eventsource-py[redis]"

# With OpenTelemetry support
pip install "eventsource-py[telemetry]"

# All optional dependencies
pip install "eventsource-py[all]"

Requirements

  • Python 3.11+
  • pydantic >= 2.0
  • sqlalchemy >= 2.0 (for PostgreSQL backend)

Quick Start

1. Define Your Events

from uuid import UUID
from eventsource import DomainEvent, register_event

@register_event
class OrderCreated(DomainEvent):
    """Event emitted when an order is created."""
    event_type: str = "OrderCreated"
    aggregate_type: str = "Order"
    customer_id: UUID
    total_amount: float

@register_event
class OrderShipped(DomainEvent):
    """Event emitted when an order is shipped."""
    event_type: str = "OrderShipped"
    aggregate_type: str = "Order"
    tracking_number: str

2. Define Your Aggregate State

from uuid import UUID

from pydantic import BaseModel

class OrderState(BaseModel):
    """State of an Order aggregate."""
    order_id: UUID
    customer_id: UUID | None = None
    total_amount: float = 0.0
    status: str = "draft"
    tracking_number: str | None = None

3. Create Your Aggregate

from uuid import UUID

from eventsource import AggregateRoot, DomainEvent

class OrderAggregate(AggregateRoot[OrderState]):
    """Event-sourced Order aggregate."""
    aggregate_type = "Order"

    def _get_initial_state(self) -> OrderState:
        return OrderState(order_id=self.aggregate_id)

    def _apply(self, event: DomainEvent) -> None:
        if isinstance(event, OrderCreated):
            self._state = OrderState(
                order_id=self.aggregate_id,
                customer_id=event.customer_id,
                total_amount=event.total_amount,
                status="created",
            )
        elif isinstance(event, OrderShipped):
            if self._state:
                self._state = self._state.model_copy(
                    update={
                        "status": "shipped",
                        "tracking_number": event.tracking_number,
                    }
                )

    def create(self, customer_id: UUID, total_amount: float) -> None:
        """Command: Create the order."""
        if self.version > 0:
            raise ValueError("Order already created")

        event = OrderCreated(
            aggregate_id=self.aggregate_id,
            customer_id=customer_id,
            total_amount=total_amount,
            aggregate_version=self.get_next_version(),
        )
        self.apply_event(event)

    def ship(self, tracking_number: str) -> None:
        """Command: Ship the order."""
        if not self.state or self.state.status != "created":
            raise ValueError("Order must be created before shipping")

        event = OrderShipped(
            aggregate_id=self.aggregate_id,
            tracking_number=tracking_number,
            aggregate_version=self.get_next_version(),
        )
        self.apply_event(event)

4. Use the Repository

import asyncio
from uuid import uuid4
from eventsource import InMemoryEventStore, AggregateRepository

async def main():
    # Set up event store and repository
    event_store = InMemoryEventStore()
    repo = AggregateRepository(
        event_store=event_store,
        aggregate_factory=OrderAggregate,
        aggregate_type="Order",
    )

    # Create and save a new order
    order_id = uuid4()
    order = repo.create_new(order_id)
    order.create(customer_id=uuid4(), total_amount=99.99)
    await repo.save(order)

    # Load the order and ship it
    loaded_order = await repo.load(order_id)
    loaded_order.ship(tracking_number="TRACK-123")
    await repo.save(loaded_order)

    # Verify the state
    final_order = await repo.load(order_id)
    print(f"Order status: {final_order.state.status}")
    print(f"Tracking: {final_order.state.tracking_number}")

asyncio.run(main())
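
The load, modify, save cycle above relies on the event store's optimistic locking to stay safe when two writers touch the same order. How the library reports a conflict is not shown here, so the sketch below uses a toy store to illustrate the general technique: an append succeeds only if the stream is still at the version the writer loaded. `ToyEventStore`, `ConflictError`, and `expected_version` are hypothetical names.

```python
class ConflictError(Exception):
    """Raised when a stream has moved past the version the writer expected."""


class ToyEventStore:
    """Hypothetical in-memory store with an expected-version check on append."""

    def __init__(self):
        self._streams: dict[str, list[dict]] = {}

    def read(self, stream_id: str) -> list[dict]:
        return list(self._streams.get(stream_id, []))

    def append(self, stream_id: str, events: list[dict], expected_version: int) -> int:
        stream = self._streams.setdefault(stream_id, [])
        if len(stream) != expected_version:
            raise ConflictError(
                f"expected version {expected_version}, stream is at {len(stream)}"
            )
        stream.extend(events)
        return len(stream)


store = ToyEventStore()
store.append("order-1", [{"type": "OrderCreated"}], expected_version=0)

# Two writers load the same stream and both see version 1.
version_seen_by_a = len(store.read("order-1"))
version_seen_by_b = len(store.read("order-1"))

# Writer A saves first and moves the stream to version 2.
store.append("order-1", [{"type": "OrderShipped"}], expected_version=version_seen_by_a)

# Writer B still expects version 1, so its append is rejected.
try:
    store.append("order-1", [{"type": "OrderCancelled"}], expected_version=version_seen_by_b)
    conflict = False
except ConflictError:
    conflict = True
```

A rejected writer would typically reload the aggregate, re-run its command against the new state, and try again.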

Architecture

+-------------------+     +-------------------+     +-------------------+
|                   |     |                   |     |                   |
|    Commands       |---->|    Aggregates     |---->|   Event Store     |
|                   |     |                   |     |                   |
+-------------------+     +-------------------+     +--------+----------+
                                                             |
                                                             v
+-------------------+     +-------------------+     +-------------------+
|                   |     |                   |     |                   |
|   Read Models     |<----|   Projections     |<----|   Event Bus       |
|                   |     |                   |     |                   |
+-------------------+     +-------------------+     +-------------------+
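
The lower half of the diagram can be sketched in a few lines of plain Python: an event is persisted, published on a bus, and consumed by a projection that updates a read model. This is a toy illustration of the data flow under stated assumptions, not the library's event bus. `ToyEventBus`, `record`, and `project_order_status` are hypothetical names.

```python
from collections import defaultdict


class ToyEventBus:
    """Hypothetical in-process bus: calls every handler subscribed to an event type."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def subscribe(self, event_type, handler):
        self._handlers[event_type].append(handler)

    def publish(self, event):
        for handler in self._handlers[event["type"]]:
            handler(event)


event_log = []   # stands in for the event store (write side)
read_model = {}  # stands in for a read model: order id -> latest status


def project_order_status(event):
    """Projection: keep only the latest status per order."""
    read_model[event["order_id"]] = event["status"]


bus = ToyEventBus()
bus.subscribe("OrderCreated", project_order_status)
bus.subscribe("OrderShipped", project_order_status)


def record(event):
    """What follows an accepted command: persist the event, then distribute it."""
    event_log.append(event)
    bus.publish(event)


record({"type": "OrderCreated", "order_id": "order-1", "status": "created"})
record({"type": "OrderShipped", "order_id": "order-1", "status": "shipped"})
```

The event log keeps both events, while the read model holds only the current status, which is the point of separating the two sides.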

Core Concepts

  • Events - Immutable facts that capture state changes. Events are never deleted or modified.
  • Event Store - Persists events with ordering and optimistic locking guarantees.
  • Aggregates - Consistency boundaries that reconstruct state from event streams.
  • Repository - Abstracts loading/saving aggregates from/to the event store.
  • Projections - Build read-optimized views from event streams.
  • Event Bus - Distributes events to subscribers for async processing.
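
To make the projection concept concrete, the sketch below builds a read-optimized total from an event stream and tracks a checkpoint so that catching up more than once does not double count. It uses only the standard library. `RevenueProjection`, `catch_up`, and `checkpoint` are hypothetical names, and the library's own projection API may look different.

```python
class RevenueProjection:
    """Hypothetical projection: total order revenue, with a checkpoint."""

    def __init__(self):
        self.checkpoint = 0  # index of the next event to process
        self.revenue = 0

    def _handle(self, event):
        if event["type"] == "OrderCreated":
            self.revenue += event["total_amount"]

    def catch_up(self, log):
        """Process only the events that arrived after the checkpoint."""
        for event in log[self.checkpoint:]:
            self._handle(event)
            self.checkpoint += 1


log = [
    {"type": "OrderCreated", "total_amount": 100},
    {"type": "OrderShipped", "tracking_number": "TRACK-123"},
    {"type": "OrderCreated", "total_amount": 50},
]

projection = RevenueProjection()
projection.catch_up(log)
projection.catch_up(log)  # nothing new, so this call changes nothing

log.append({"type": "OrderCreated", "total_amount": 25})
projection.catch_up(log)  # processes only the newly appended event
```

Because the read model is derived entirely from the log, it can also be discarded and rebuilt by resetting the checkpoint to zero and catching up again.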

Documentation

For comprehensive documentation, see the Documentation Index, which is organized into the following sections:

  • Getting Started
  • Guides
  • API Reference
  • Examples
  • Architecture Decision Records
  • Developer Documentation

Development

# Install in development mode
pip install -e ".[dev]"

# Run tests
pytest

# Run tests with coverage
pytest --cov=eventsource --cov-report=html

# Run type checking
mypy src/eventsource

# Run linting
ruff check src/eventsource

# Format code
ruff format src/eventsource

License

This project is licensed under the MIT License - see the LICENSE file for details.
