Skip to main content

Production-ready event sourcing library for Python

Project description

eventsource-py

Python Version License: MIT PyPI version CI Documentation Code style: ruff

A production-ready event sourcing library for Python 3.11+.

Features

  • Event Store - PostgreSQL, SQLite, and In-Memory backends with optimistic locking
  • Domain Events - Immutable event classes with Pydantic validation and versioning
  • Event Registry - Thread-safe event type registration for serialization/deserialization
  • Aggregate Pattern - Base classes for event-sourced aggregates with state reconstruction
  • Repository Pattern - Clean abstractions for loading and saving aggregates
  • Aggregate Snapshotting - Optimize load performance for long-lived aggregates
  • Projection System - Checkpoint tracking, retry logic, and dead letter queue support
  • Event Bus - In-Memory, Redis Streams, RabbitMQ, and Kafka backends for event distribution
  • Transactional Outbox - Reliable event publishing pattern
  • Multi-tenancy - Built-in tenant isolation support
  • Observability - Optional OpenTelemetry integration

Installation

Basic Installation

pip install eventsource-py

With Optional Dependencies

eventsource uses optional dependencies to keep the core package lightweight. Install only what you need:

# PostgreSQL support - production event store with asyncpg
pip install eventsource-py[postgresql]

# SQLite support - lightweight deployments
pip install eventsource-py[sqlite]

# Redis support - distributed event bus with Redis Streams
pip install eventsource-py[redis]

# RabbitMQ support - distributed event bus with RabbitMQ
pip install eventsource-py[rabbitmq]

# Kafka support - distributed event bus with Apache Kafka
pip install eventsource-py[kafka]

# Telemetry support - OpenTelemetry tracing integration
pip install eventsource-py[telemetry]

# All optional dependencies
pip install eventsource-py[all]

# Multiple extras
pip install eventsource-py[postgresql,redis,telemetry]
Extra Enables Dependencies
postgresql PostgreSQLEventStore, checkpoint/outbox/DLQ repositories asyncpg
sqlite SQLiteEventStore for lightweight deployments aiosqlite
redis RedisEventBus with consumer groups and DLQ redis
rabbitmq RabbitMQEventBus with exchange routing and DLQ aio-pika
kafka KafkaEventBus with consumer groups, DLQ, and tracing aiokafka
telemetry Distributed tracing for event operations opentelemetry-api, opentelemetry-sdk
all All of the above All of the above
dev Development tools pytest, mypy, ruff, pre-commit

For detailed installation instructions, troubleshooting, and version compatibility, see the Installation Guide.

Requirements

  • Python 3.11+
  • pydantic >= 2.0
  • sqlalchemy >= 2.0

Quick Start

1. Define Your Events

from uuid import UUID
from eventsource import DomainEvent, register_event

@register_event
class OrderCreated(DomainEvent):
    """Event emitted when an order is created."""
    event_type: str = "OrderCreated"
    aggregate_type: str = "Order"
    customer_id: UUID
    total_amount: float

@register_event
class OrderShipped(DomainEvent):
    """Event emitted when an order is shipped."""
    event_type: str = "OrderShipped"
    aggregate_type: str = "Order"
    tracking_number: str

2. Define Your Aggregate State

from pydantic import BaseModel

class OrderState(BaseModel):
    """State of an Order aggregate."""
    order_id: UUID
    customer_id: UUID | None = None
    total_amount: float = 0.0
    status: str = "draft"
    tracking_number: str | None = None

3. Create Your Aggregate

from eventsource import AggregateRoot

class OrderAggregate(AggregateRoot[OrderState]):
    """Event-sourced Order aggregate."""
    aggregate_type = "Order"

    def _get_initial_state(self) -> OrderState:
        return OrderState(order_id=self.aggregate_id)

    def _apply(self, event: DomainEvent) -> None:
        if isinstance(event, OrderCreated):
            self._state = OrderState(
                order_id=self.aggregate_id,
                customer_id=event.customer_id,
                total_amount=event.total_amount,
                status="created",
            )
        elif isinstance(event, OrderShipped):
            if self._state:
                self._state = self._state.model_copy(
                    update={
                        "status": "shipped",
                        "tracking_number": event.tracking_number,
                    }
                )

    def create(self, customer_id: UUID, total_amount: float) -> None:
        """Command: Create the order."""
        if self.version > 0:
            raise ValueError("Order already created")

        event = OrderCreated(
            aggregate_id=self.aggregate_id,
            customer_id=customer_id,
            total_amount=total_amount,
            aggregate_version=self.get_next_version(),
        )
        self.apply_event(event)

    def ship(self, tracking_number: str) -> None:
        """Command: Ship the order."""
        if not self.state or self.state.status != "created":
            raise ValueError("Order must be created before shipping")

        event = OrderShipped(
            aggregate_id=self.aggregate_id,
            tracking_number=tracking_number,
            aggregate_version=self.get_next_version(),
        )
        self.apply_event(event)

4. Use the Repository

import asyncio
from uuid import uuid4
from eventsource import InMemoryEventStore, AggregateRepository

async def main():
    # Set up event store and repository
    event_store = InMemoryEventStore()
    repo = AggregateRepository(
        event_store=event_store,
        aggregate_factory=OrderAggregate,
        aggregate_type="Order",
    )

    # Create and save a new order
    order_id = uuid4()
    order = repo.create_new(order_id)
    order.create(customer_id=uuid4(), total_amount=99.99)
    await repo.save(order)

    # Load the order and ship it
    loaded_order = await repo.load(order_id)
    loaded_order.ship(tracking_number="TRACK-123")
    await repo.save(loaded_order)

    # Verify the state
    final_order = await repo.load(order_id)
    print(f"Order status: {final_order.state.status}")
    print(f"Tracking: {final_order.state.tracking_number}")

asyncio.run(main())

Architecture

+-------------------+     +-------------------+     +-------------------+
|                   |     |                   |     |                   |
|    Commands       |---->|    Aggregates     |---->|   Event Store     |
|                   |     |                   |     |                   |
+-------------------+     +-------------------+     +--------+----------+
                                                             |
                                                             v
+-------------------+     +-------------------+     +-------------------+
|                   |     |                   |     |                   |
|   Read Models     |<----|   Projections     |<----|   Event Bus       |
|                   |     |                   |     |                   |
+-------------------+     +-------------------+     +-------------------+

Core Concepts

  • Events - Immutable facts that capture state changes. Events are never deleted or modified.
  • Event Store - Persists events with ordering and optimistic locking guarantees.
  • Aggregates - Consistency boundaries that reconstruct state from event streams.
  • Repository - Abstracts loading/saving aggregates from/to the event store.
  • Projections - Build read-optimized views from event streams.
  • Event Bus - Distributes events to subscribers for async processing.

Documentation

📚 Full Documentation

Development

# Install in development mode
pip install -e ".[dev]"

# Run tests
pytest

# Run tests with coverage
pytest --cov=eventsource --cov-report=html

# Run type checking
mypy src/eventsource

# Run linting
ruff check src/eventsource

# Format code
ruff format src/eventsource

License

This project is licensed under the MIT License - see the LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

eventsource_py-0.2.0.tar.gz (766.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

eventsource_py-0.2.0-py3-none-any.whl (242.1 kB view details)

Uploaded Python 3

File details

Details for the file eventsource_py-0.2.0.tar.gz.

File metadata

  • Download URL: eventsource_py-0.2.0.tar.gz
  • Upload date:
  • Size: 766.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for eventsource_py-0.2.0.tar.gz
Algorithm Hash digest
SHA256 a8f9d67a8f55b5ea17a03064e8572fb20f56335263979d1d6303df008519a6d2
MD5 480b1b14e5bff3426f1394ecafbced9a
BLAKE2b-256 b45b419daf0f07a0bc2d30816d3201cccaf2c6dad11875da41ed55100ff7643b

See more details on using hashes here.

Provenance

The following attestation bundles were made for eventsource_py-0.2.0.tar.gz:

Publisher: release.yml on tyevans/eventsource-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file eventsource_py-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: eventsource_py-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 242.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for eventsource_py-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 257e21282f9b11e86b36c6b98c47013387c59979b799619cd15eec5fed0750c7
MD5 a0ff7f73e1800080c650c6ecd2c19d4b
BLAKE2b-256 eb891bb460603c4b2b4051c5292909d0dde51e9f7c940eb764a07b609a1422fc

See more details on using hashes here.

Provenance

The following attestation bundles were made for eventsource_py-0.2.0-py3-none-any.whl:

Publisher: release.yml on tyevans/eventsource-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page