Production-ready event sourcing library for Python
Project description
eventsource-py
A production-ready event sourcing library for Python 3.11+.
Features
- Event Store - PostgreSQL, SQLite, and In-Memory backends with optimistic locking
- Domain Events - Immutable event classes with Pydantic validation and versioning
- Event Registry - Thread-safe event type registration for serialization/deserialization
- Aggregate Pattern - Base classes for event-sourced aggregates with state reconstruction
- Repository Pattern - Clean abstractions for loading and saving aggregates
- Projection System - Checkpoint tracking, retry logic, and dead letter queue support
- Event Bus - In-Memory and Redis Streams backends for event distribution
- Transactional Outbox - Reliable event publishing pattern
- Multi-tenancy - Built-in tenant isolation support
- Observability - Optional OpenTelemetry integration
Installation
# Basic installation
pip install eventsource-py
# With PostgreSQL support
pip install eventsource-py[postgresql]
# With SQLite support
pip install eventsource-py[sqlite]
# With Redis support
pip install eventsource-py[redis]
# With OpenTelemetry support
pip install eventsource-py[telemetry]
# All optional dependencies
pip install eventsource-py[all]
Requirements
- Python 3.11+
- pydantic >= 2.0
- sqlalchemy >= 2.0 (for PostgreSQL backend)
Quick Start
1. Define Your Events
from uuid import UUID
from eventsource import DomainEvent, register_event
@register_event
class OrderCreated(DomainEvent):
"""Event emitted when an order is created."""
event_type: str = "OrderCreated"
aggregate_type: str = "Order"
customer_id: UUID
total_amount: float
@register_event
class OrderShipped(DomainEvent):
"""Event emitted when an order is shipped."""
event_type: str = "OrderShipped"
aggregate_type: str = "Order"
tracking_number: str
2. Define Your Aggregate State
from pydantic import BaseModel
class OrderState(BaseModel):
"""State of an Order aggregate."""
order_id: UUID
customer_id: UUID | None = None
total_amount: float = 0.0
status: str = "draft"
tracking_number: str | None = None
3. Create Your Aggregate
from eventsource import AggregateRoot
class OrderAggregate(AggregateRoot[OrderState]):
"""Event-sourced Order aggregate."""
aggregate_type = "Order"
def _get_initial_state(self) -> OrderState:
return OrderState(order_id=self.aggregate_id)
def _apply(self, event: DomainEvent) -> None:
if isinstance(event, OrderCreated):
self._state = OrderState(
order_id=self.aggregate_id,
customer_id=event.customer_id,
total_amount=event.total_amount,
status="created",
)
elif isinstance(event, OrderShipped):
if self._state:
self._state = self._state.model_copy(
update={
"status": "shipped",
"tracking_number": event.tracking_number,
}
)
def create(self, customer_id: UUID, total_amount: float) -> None:
"""Command: Create the order."""
if self.version > 0:
raise ValueError("Order already created")
event = OrderCreated(
aggregate_id=self.aggregate_id,
customer_id=customer_id,
total_amount=total_amount,
aggregate_version=self.get_next_version(),
)
self.apply_event(event)
def ship(self, tracking_number: str) -> None:
"""Command: Ship the order."""
if not self.state or self.state.status != "created":
raise ValueError("Order must be created before shipping")
event = OrderShipped(
aggregate_id=self.aggregate_id,
tracking_number=tracking_number,
aggregate_version=self.get_next_version(),
)
self.apply_event(event)
4. Use the Repository
import asyncio
from uuid import uuid4
from eventsource import InMemoryEventStore, AggregateRepository
async def main():
# Set up event store and repository
event_store = InMemoryEventStore()
repo = AggregateRepository(
event_store=event_store,
aggregate_factory=OrderAggregate,
aggregate_type="Order",
)
# Create and save a new order
order_id = uuid4()
order = repo.create_new(order_id)
order.create(customer_id=uuid4(), total_amount=99.99)
await repo.save(order)
# Load the order and ship it
loaded_order = await repo.load(order_id)
loaded_order.ship(tracking_number="TRACK-123")
await repo.save(loaded_order)
# Verify the state
final_order = await repo.load(order_id)
print(f"Order status: {final_order.state.status}")
print(f"Tracking: {final_order.state.tracking_number}")
asyncio.run(main())
Architecture
+-------------------+ +-------------------+ +-------------------+
| | | | | |
| Commands |---->| Aggregates |---->| Event Store |
| | | | | |
+-------------------+ +-------------------+ +--------+----------+
|
v
+-------------------+ +-------------------+ +-------------------+
| | | | | |
| Read Models |<----| Projections |<----| Event Bus |
| | | | | |
+-------------------+ +-------------------+ +-------------------+
Core Concepts
- Events - Immutable facts that capture state changes. Events are never deleted or modified.
- Event Store - Persists events with ordering and optimistic locking guarantees.
- Aggregates - Consistency boundaries that reconstruct state from event streams.
- Repository - Abstracts loading/saving aggregates from/to the event store.
- Projections - Build read-optimized views from event streams.
- Event Bus - Distributes events to subscribers for async processing.
Documentation
For comprehensive documentation, see the Documentation Index.
Getting Started
- Getting Started Guide - Installation and first steps
- Architecture Overview - System design and concepts
- FAQ - Frequently asked questions
Guides
- Multi-tenant Setup - Tenant isolation patterns and configuration
- Error Handling - Exception handling best practices
- Authentication - Security integration patterns
- Production Deployment - Production readiness checklist
API Reference
- Events API - DomainEvent and EventRegistry
- Event Stores - EventStore interface and implementations
- Aggregates - AggregateRoot and Repository
- Projections - Projection system
- Event Bus - Event distribution
Examples
- Basic Usage - Simple order example
- Multi-tenant Setup - Multi-tenancy configuration
- Projections - Building read models
- Testing Patterns - Unit and integration testing
Architecture Decision Records
- ADR Index - All architectural decisions
- ADR-0001: Async-First Design
- ADR-0002: Pydantic Event Models
- ADR-0003: Optimistic Locking
Developer Documentation
- Code Structure - Project organization
- Testing Guide - Testing strategies and patterns
Development
# Install in development mode
pip install -e ".[dev]"
# Run tests
pytest
# Run tests with coverage
pytest --cov=eventsource --cov-report=html
# Run type checking
mypy src/eventsource
# Run linting
ruff check src/eventsource
# Format code
ruff format src/eventsource
License
This project is licensed under the MIT License - see the LICENSE file for details.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file eventsource_py-0.1.0.tar.gz.
File metadata
- Download URL: eventsource_py-0.1.0.tar.gz
- Upload date:
- Size: 437.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b3b1341de89f3ff14cc62db37519c6500395daea379e6735c047c47df57494bf
|
|
| MD5 |
bf5791ae09a124e4280f79eeb3b99835
|
|
| BLAKE2b-256 |
d45cf8efa2cc783c9687a96c512f3625002d939e0b6bf92f5436ee4a39a0d3b0
|
Provenance
The following attestation bundles were made for eventsource_py-0.1.0.tar.gz:
Publisher:
release.yml on tyevans/eventsource-py
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
eventsource_py-0.1.0.tar.gz -
Subject digest:
b3b1341de89f3ff14cc62db37519c6500395daea379e6735c047c47df57494bf - Sigstore transparency entry: 748149431
- Sigstore integration time:
-
Permalink:
tyevans/eventsource-py@a752c7bd2b7e01ca431dd39ae1dff38110d5e348 -
Branch / Tag:
refs/tags/v0.1.0 - Owner: https://github.com/tyevans
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@a752c7bd2b7e01ca431dd39ae1dff38110d5e348 -
Trigger Event:
push
-
Statement type:
File details
Details for the file eventsource_py-0.1.0-py3-none-any.whl.
File metadata
- Download URL: eventsource_py-0.1.0-py3-none-any.whl
- Upload date:
- Size: 145.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
30b979d4cc5beafdf86151d7ef567a24b0326b97e25861fb06bcec81cb22e62e
|
|
| MD5 |
c4f2a54fc55ba05405f2f6db08636389
|
|
| BLAKE2b-256 |
801264034ec3ed998769c4cd0f579e8eaf8ec563fcb2b68a61cd620523488ca8
|
Provenance
The following attestation bundles were made for eventsource_py-0.1.0-py3-none-any.whl:
Publisher:
release.yml on tyevans/eventsource-py
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
eventsource_py-0.1.0-py3-none-any.whl -
Subject digest:
30b979d4cc5beafdf86151d7ef567a24b0326b97e25861fb06bcec81cb22e62e - Sigstore transparency entry: 748149432
- Sigstore integration time:
-
Permalink:
tyevans/eventsource-py@a752c7bd2b7e01ca431dd39ae1dff38110d5e348 -
Branch / Tag:
refs/tags/v0.1.0 - Owner: https://github.com/tyevans
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@a752c7bd2b7e01ca431dd39ae1dff38110d5e348 -
Trigger Event:
push
-
Statement type: