Production-ready event sourcing library for Python

eventsource-py

A production-ready event sourcing library for Python 3.11+.

Features

  • Event Store - PostgreSQL, SQLite, and In-Memory backends with optimistic locking
  • Domain Events - Immutable event classes with Pydantic validation and versioning
  • Event Registry - Thread-safe event type registration for serialization/deserialization
  • Aggregate Pattern - Base classes for event-sourced aggregates with state reconstruction
  • Repository Pattern - Clean abstractions for loading and saving aggregates
  • Projection System - Checkpoint tracking, retry logic, and dead letter queue support
  • Event Bus - In-Memory and Redis Streams backends for event distribution
  • Transactional Outbox - Reliable event publishing pattern
  • Multi-tenancy - Built-in tenant isolation support
  • Observability - Optional OpenTelemetry integration
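To make the transactional outbox item concrete, here is a minimal sketch of the general pattern using only the standard library's sqlite3 module. It is not this library's implementation or API: the table layout and the names `open_db`, `append_with_outbox`, and `publish_pending` are assumptions made for illustration.

```python
import json
import sqlite3
import uuid


def open_db() -> sqlite3.Connection:
    """Create an in-memory database with an events table and an outbox table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (id TEXT PRIMARY KEY, type TEXT, payload TEXT)")
    conn.execute(
        "CREATE TABLE outbox (id TEXT PRIMARY KEY, payload TEXT, published INTEGER DEFAULT 0)"
    )
    conn.commit()
    return conn


def append_with_outbox(conn: sqlite3.Connection, event_type: str, data: dict) -> str:
    """Store the event and its outbox row in one transaction."""
    event_id = str(uuid.uuid4())
    payload = json.dumps({"type": event_type, "data": data})
    with conn:  # commits on success, rolls back if either insert fails
        conn.execute("INSERT INTO events VALUES (?, ?, ?)", (event_id, event_type, payload))
        conn.execute("INSERT INTO outbox (id, payload) VALUES (?, ?)", (event_id, payload))
    return event_id


def publish_pending(conn: sqlite3.Connection, publish) -> int:
    """Relay step: publish unpublished rows, then mark each one as published."""
    rows = conn.execute("SELECT id, payload FROM outbox WHERE published = 0").fetchall()
    for row_id, payload in rows:
        publish(json.loads(payload))  # may run again after a crash, so consumers should be idempotent
        with conn:
            conn.execute("UPDATE outbox SET published = 1 WHERE id = ?", (row_id,))
    return len(rows)


conn = open_db()
append_with_outbox(conn, "OrderCreated", {"total_amount": 99.99})
delivered = []
count = publish_pending(conn, delivered.append)
```

Because the event and the outbox row commit together, a crash between storing and publishing cannot lose the message. The relay simply picks the row up on its next run, which gives at-least-once delivery.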

Installation

# Basic installation
pip install eventsource-py

# Extras are quoted so that shells such as zsh do not expand the brackets

# With PostgreSQL support
pip install "eventsource-py[postgresql]"

# With SQLite support
pip install "eventsource-py[sqlite]"

# With Redis support
pip install "eventsource-py[redis]"

# With OpenTelemetry support
pip install "eventsource-py[telemetry]"

# All optional dependencies
pip install "eventsource-py[all]"

Requirements

  • Python 3.11+
  • pydantic >= 2.0
  • sqlalchemy >= 2.0 (for PostgreSQL backend)

Quick Start

1. Define Your Events

from uuid import UUID
from eventsource import DomainEvent, register_event

@register_event
class OrderCreated(DomainEvent):
    """Event emitted when an order is created."""
    event_type: str = "OrderCreated"
    aggregate_type: str = "Order"
    customer_id: UUID
    total_amount: float

@register_event
class OrderShipped(DomainEvent):
    """Event emitted when an order is shipped."""
    event_type: str = "OrderShipped"
    aggregate_type: str = "Order"
    tracking_number: str
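The events above are registered with a decorator so they can be looked up by type name during deserialization. The following standard-library sketch shows the general idea of such a registry. It does not reproduce how `register_event` works internally: `register`, `serialize`, `deserialize`, and `OrderShippedRecord` are hypothetical names, and frozen dataclasses stand in for the Pydantic-based event classes.

```python
import json
import threading
from dataclasses import asdict, dataclass

_REGISTRY: dict[str, type] = {}  # hypothetical registry: type name -> class
_REGISTRY_LOCK = threading.Lock()  # guards registration from multiple threads


def register(cls: type) -> type:
    """Class decorator that records the class under its own name."""
    with _REGISTRY_LOCK:
        _REGISTRY[cls.__name__] = cls
    return cls


@register
@dataclass(frozen=True)  # frozen: attributes cannot be reassigned after creation
class OrderShippedRecord:
    tracking_number: str


def serialize(event) -> str:
    """Store the type name next to the payload so it can be restored later."""
    return json.dumps({"event_type": type(event).__name__, "data": asdict(event)})


def deserialize(raw: str):
    """Look up the class by type name and rebuild the event."""
    doc = json.loads(raw)
    cls = _REGISTRY[doc["event_type"]]  # raises KeyError for unregistered types
    return cls(**doc["data"])


raw = serialize(OrderShippedRecord(tracking_number="TRACK-123"))
restored = deserialize(raw)
```

Keeping the type name in the stored payload is what lets a reader turn a row back into the right class without knowing in advance which event it holds.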

2. Define Your Aggregate State

from uuid import UUID

from pydantic import BaseModel

class OrderState(BaseModel):
    """State of an Order aggregate."""
    order_id: UUID
    customer_id: UUID | None = None
    total_amount: float = 0.0
    status: str = "draft"
    tracking_number: str | None = None

3. Create Your Aggregate

from uuid import UUID

from eventsource import AggregateRoot, DomainEvent

class OrderAggregate(AggregateRoot[OrderState]):
    """Event-sourced Order aggregate."""
    aggregate_type = "Order"

    def _get_initial_state(self) -> OrderState:
        return OrderState(order_id=self.aggregate_id)

    def _apply(self, event: DomainEvent) -> None:
        if isinstance(event, OrderCreated):
            self._state = OrderState(
                order_id=self.aggregate_id,
                customer_id=event.customer_id,
                total_amount=event.total_amount,
                status="created",
            )
        elif isinstance(event, OrderShipped):
            if self._state:
                self._state = self._state.model_copy(
                    update={
                        "status": "shipped",
                        "tracking_number": event.tracking_number,
                    }
                )

    def create(self, customer_id: UUID, total_amount: float) -> None:
        """Command: Create the order."""
        if self.version > 0:
            raise ValueError("Order already created")

        event = OrderCreated(
            aggregate_id=self.aggregate_id,
            customer_id=customer_id,
            total_amount=total_amount,
            aggregate_version=self.get_next_version(),
        )
        self.apply_event(event)

    def ship(self, tracking_number: str) -> None:
        """Command: Ship the order."""
        if not self.state or self.state.status != "created":
            raise ValueError("Order must be created before shipping")

        event = OrderShipped(
            aggregate_id=self.aggregate_id,
            tracking_number=tracking_number,
            aggregate_version=self.get_next_version(),
        )
        self.apply_event(event)

4. Use the Repository

import asyncio
from uuid import uuid4
from eventsource import InMemoryEventStore, AggregateRepository

async def main():
    # Set up event store and repository
    event_store = InMemoryEventStore()
    repo = AggregateRepository(
        event_store=event_store,
        aggregate_factory=OrderAggregate,
        aggregate_type="Order",
    )

    # Create and save a new order
    order_id = uuid4()
    order = repo.create_new(order_id)
    order.create(customer_id=uuid4(), total_amount=99.99)
    await repo.save(order)

    # Load the order and ship it
    loaded_order = await repo.load(order_id)
    loaded_order.ship(tracking_number="TRACK-123")
    await repo.save(loaded_order)

    # Verify the state
    final_order = await repo.load(order_id)
    print(f"Order status: {final_order.state.status}")
    print(f"Tracking: {final_order.state.tracking_number}")

asyncio.run(main())

Architecture

+-------------------+     +-------------------+     +-------------------+
|                   |     |                   |     |                   |
|    Commands       |---->|    Aggregates     |---->|   Event Store     |
|                   |     |                   |     |                   |
+-------------------+     +-------------------+     +--------+----------+
                                                             |
                                                             v
+-------------------+     +-------------------+     +-------------------+
|                   |     |                   |     |                   |
|   Read Models     |<----|   Projections     |<----|   Event Bus       |
|                   |     |                   |     |                   |
+-------------------+     +-------------------+     +-------------------+
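The flow in the diagram can be traced end to end with a small, synchronous, in-memory sketch. None of the classes below belong to this library: `TinyBus`, `TinyStore`, `OrderStatusProjection`, and `ship_order` are hypothetical stand-ins that only show how a command leads to a stored event, which the bus hands to a projection that updates a read model.

```python
from collections import defaultdict
from typing import Callable

Event = dict  # plain dicts stand in for event objects in this sketch


class TinyBus:
    """Calls every subscriber for each published event."""

    def __init__(self) -> None:
        self._subscribers: list[Callable[[Event], None]] = []

    def subscribe(self, handler: Callable[[Event], None]) -> None:
        self._subscribers.append(handler)

    def publish(self, event: Event) -> None:
        for handler in self._subscribers:
            handler(event)


class TinyStore:
    """Appends events per stream, then hands them to the bus."""

    def __init__(self, bus: TinyBus) -> None:
        self.streams: dict[str, list[Event]] = defaultdict(list)
        self._bus = bus

    def append(self, stream_id: str, event: Event) -> None:
        self.streams[stream_id].append(event)
        self._bus.publish(event)


class OrderStatusProjection:
    """Maintains a read model: order id -> latest status."""

    def __init__(self) -> None:
        self.read_model: dict[str, str] = {}

    def handle(self, event: Event) -> None:
        if event["type"] == "OrderCreated":
            self.read_model[event["order_id"]] = "created"
        elif event["type"] == "OrderShipped":
            self.read_model[event["order_id"]] = "shipped"


def ship_order(store: TinyStore, order_id: str) -> None:
    """Command handler: check the stream's history, then record a new event."""
    history = store.streams[order_id]
    if not any(e["type"] == "OrderCreated" for e in history):
        raise ValueError("Order must be created before shipping")
    store.append(order_id, {"type": "OrderShipped", "order_id": order_id})


bus = TinyBus()
projection = OrderStatusProjection()
bus.subscribe(projection.handle)
store = TinyStore(bus)

store.append("order-1", {"type": "OrderCreated", "order_id": "order-1"})
ship_order(store, "order-1")
```

In this sketch, queries read `projection.read_model` and never touch the event streams, while commands validate against the stream history and never touch the read model.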

Core Concepts

  • Events - Immutable facts that capture state changes. Events are never deleted or modified.
  • Event Store - Persists events with ordering and optimistic locking guarantees.
  • Aggregates - Consistency boundaries that reconstruct state from event streams.
  • Repository - Abstracts loading/saving aggregates from/to the event store.
  • Projections - Build read-optimized views from event streams.
  • Event Bus - Distributes events to subscribers for async processing.
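The optimistic locking guarantee mentioned for the event store can be illustrated with an expected-version check on append. This is a sketch of the general technique, not the library's event store interface: `VersionedStore` and `ConcurrencyConflict` are hypothetical names, and the version is simply the number of events in the stream.

```python
class ConcurrencyConflict(Exception):
    """Raised when the stream changed since the writer last read it."""


class VersionedStore:
    def __init__(self) -> None:
        self._streams: dict[str, list[dict]] = {}

    def read(self, stream_id: str) -> tuple[list[dict], int]:
        """Return a copy of the stream's events and its current version."""
        events = self._streams.get(stream_id, [])
        return list(events), len(events)

    def append(self, stream_id: str, new_events: list[dict], expected_version: int) -> int:
        """Append only if nobody else has written since expected_version."""
        events = self._streams.setdefault(stream_id, [])
        if len(events) != expected_version:
            raise ConcurrencyConflict(
                f"expected version {expected_version}, stream is at {len(events)}"
            )
        events.extend(new_events)
        return len(events)


store = VersionedStore()
store.append("order-1", [{"type": "OrderCreated"}], expected_version=0)

# Two writers load the same stream and both see version 1.
_, seen_by_a = store.read("order-1")
_, seen_by_b = store.read("order-1")

# Writer A saves first and moves the stream to version 2.
store.append("order-1", [{"type": "OrderShipped"}], expected_version=seen_by_a)

# Writer B still expects version 1, so its append is rejected.
try:
    store.append("order-1", [{"type": "OrderCancelled"}], expected_version=seen_by_b)
    conflict = False
except ConcurrencyConflict:
    conflict = True
```

No lock is held between reading and writing. A writer that loses the race gets an error, and it can reload the stream, re-run its command against the new state, and try again.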

Documentation

📚 Full Documentation

Development

# Install in development mode
pip install -e ".[dev]"

# Run tests
pytest

# Run tests with coverage
pytest --cov=eventsource --cov-report=html

# Run type checking
mypy src/eventsource

# Run linting
ruff check src/eventsource

# Format code
ruff format src/eventsource

License

This project is licensed under the MIT License. See the LICENSE file for details.
