Rillo is a lightweight, type-safe Event Sourcing framework for Python, built on top of Pydantic.
Rillo
Installation
Install the core library with pip:
    pip install rillo
Install with NATS JetStream support for repositories and snapshot stores:
    pip install 'rillo[nats]'
Install with uv (quote the extra so the shell does not expand the brackets):
    uv add rillo
    uv add 'rillo[nats]'
Usage
Defining an Aggregate
Rillo uses Pydantic models for State, Events, and Commands. To create an Aggregate, subclass it with three type parameters (State, Event, Command) and implement the abstract apply() and execute() methods.
    from typing import Annotated, Literal

    from pydantic import BaseModel, Field
    from rillo import Aggregate

    # 1. Define events
    class UserSignedUp(BaseModel):
        type: Literal["UserSignedUpV1"] = "UserSignedUpV1"
        username: str

    class AccountDeleted(BaseModel):
        type: Literal["AccountDeletedV1"] = "AccountDeletedV1"

    # 2. Define commands
    class SignUp(BaseModel):
        type: Literal["SignUpV1"] = "SignUpV1"
        username: str

    class DeleteAccount(BaseModel):
        type: Literal["DeleteAccountV1"] = "DeleteAccountV1"

    # 3. Define aggregate state
    class State(BaseModel):
        type: Literal["UserStateV1"] = "UserStateV1"
        username: str
        account_deleted: bool

    # Type aliases with discriminators for union types
    # (the `type` statement requires Python 3.12 or newer)
    type Event = Annotated[UserSignedUp | AccountDeleted, Field(discriminator="type")]
    type Command = Annotated[SignUp | DeleteAccount, Field(discriminator="type")]

    # 4. Create the Aggregate with [State, Event, Command] type parameters
    class User(Aggregate[State, Event, Command]):
        # apply() maps each event to a state mutation
        def apply(self, event: Event) -> None:
            match event:
                case UserSignedUp(username=username):
                    self._state = State(username=username, account_deleted=False)
                case AccountDeleted():
                    if self._state is not None:
                        self._state.account_deleted = True

        # execute() contains business logic and emits events via _emit()
        def execute(self, command: Command) -> None:
            match command:
                case SignUp(username=username):
                    if self._state is not None:
                        raise ValueError("User already exists.")
                    self._emit(UserSignedUp(username=username))
                case DeleteAccount():
                    if self._state is None:
                        raise ValueError("User does not exist.")
                    if self._state.account_deleted:
                        raise ValueError("Account is already deleted.")
                    self._emit(AccountDeleted())

    # Using the aggregate
    user = User(id="user-1")
    user.execute(SignUp(username="alice"))
    user.execute(DeleteAccount())

    # Pending events are stored and ready to be committed
    events = user.pending_events
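The split between execute() and apply() can be illustrated without Rillo at all. The following is a minimal standard-library sketch of the general pattern, not Rillo's implementation: it assumes that emitting an event applies it immediately and queues it as pending. The names `SketchAccount` and `Deposited` are hypothetical and exist only for this illustration.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Deposited:
    """Hypothetical event: records that an amount was deposited."""
    amount: int


@dataclass
class SketchAccount:
    """Hypothetical stand-in for an event-sourced aggregate."""
    id: str
    balance: int = 0
    pending_events: list = field(default_factory=list)

    def apply(self, event: Deposited) -> None:
        # State changes happen only here, so replaying events is deterministic.
        self.balance += event.amount

    def _emit(self, event: Deposited) -> None:
        # Assumption: emitting applies the event and records it for persistence.
        self.apply(event)
        self.pending_events.append(event)

    def deposit(self, amount: int) -> None:
        # Business rules are checked before any event is emitted.
        if amount <= 0:
            raise ValueError("Deposit must be positive.")
        self._emit(Deposited(amount))


account = SketchAccount(id="acc-1")
account.deposit(10)
account.deposit(5)

# Rebuilding a fresh instance from the recorded events yields the same state.
replayed = SketchAccount(id="acc-1")
for event in account.pending_events:
    replayed.apply(event)
```

Because validation lives in the command handler and mutation lives in apply(), replaying stored events never re-runs business rules; it only rebuilds state.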
Repositories and Snapshot Stores
Rillo provides a Repository base class for saving and loading events, and a SnapshotStore base class for capturing aggregate snapshots to shorten load times. Both have built-in NATS JetStream implementations (NATSRepository and NATSSnapshotStore).
    import asyncio

    from nats.aio.client import Client as NATS
    from rillo.nats import NATSRepository, NATSSnapshotStore

    # User and SignUp are the classes defined in the previous example

    async def main():
        nc = NATS()
        await nc.connect("nats://localhost:4222")
        js = nc.jetstream()

        # Create a repository instance
        repository = NATSRepository[User](
            js=js,
            stream_name="USERS",
            subject_prefix="users.events"
        )

        user = User("user-123")
        user.execute(SignUp(username="alice"))

        # Persist pending events into NATS JetStream
        await repository.save(user)

        # Rehydrate aggregate state back from the event stream
        loaded_user = User("user-123")
        await repository.load(loaded_user)

        # Snapshot store uses a NATS KV bucket to cache aggregate state
        kv = await js.key_value("users-snapshots")
        snapshot_store = NATSSnapshotStore[User](kv=kv)

        # Save a snapshot of the current aggregate state
        await snapshot_store.save(loaded_user)

        # Load the snapshot before replaying remaining events
        restored_user = User("user-123")
        await snapshot_store.load(restored_user)
        await repository.load(restored_user)  # replays only events after the snapshot

    if __name__ == "__main__":
        asyncio.run(main())
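The "snapshot first, then replay the rest" step can be sketched in memory. This is a hedged illustration of the general technique and makes no claim about how NATSRepository or NATSSnapshotStore track positions internally. It assumes the aggregate carries a version counter equal to the number of events applied; `Counter` and `InMemoryStore` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Counter:
    """Hypothetical aggregate whose state is a running total."""
    total: int = 0
    version: int = 0  # number of events applied so far

    def apply(self, amount: int) -> None:
        self.total += amount
        self.version += 1


@dataclass
class InMemoryStore:
    """Hypothetical store holding an event log and the latest snapshot."""
    events: list = field(default_factory=list)
    snapshot: Optional[tuple] = None  # (total, version)

    def save_snapshot(self, agg: Counter) -> None:
        self.snapshot = (agg.total, agg.version)

    def load(self, agg: Counter) -> int:
        # Restore the snapshot first, if one exists.
        if self.snapshot is not None:
            agg.total, agg.version = self.snapshot
        # Replay only the events recorded after the snapshot's version.
        remaining = self.events[agg.version:]
        for amount in remaining:
            agg.apply(amount)
        return len(remaining)


store = InMemoryStore(events=[1, 2, 3])
first = Counter()
store.load(first)            # no snapshot yet: replays all three events
store.save_snapshot(first)   # snapshot taken at version 3
store.events.extend([4, 5])  # two more events arrive later

second = Counter()
replayed = store.load(second)  # replays only the two newer events
```

The saving grows with the length of the log: the cost of a load depends on the number of events since the last snapshot rather than on the full history.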
Download files
File details
Details for the file rillo-0.2.2.tar.gz.
File metadata
- Download URL: rillo-0.2.2.tar.gz
- Upload date:
- Size: 4.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 7fc96adcad1435e77b9b3f9ec577c8f1d9f2a9034fc0f86e21e7d8cae8bb6b0c |
| MD5 | 159556f6b61d4e8461885031b100ed4c |
| BLAKE2b-256 | e2fcd977cce967b6a70e8d988c6818bc70c1d5b535f70354922fdecdf1ca0f68 |
File details
Details for the file rillo-0.2.2-py3-none-any.whl.
File metadata
- Download URL: rillo-0.2.2-py3-none-any.whl
- Upload date:
- Size: 6.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | d5c644a9103ed1f9798c3a0c90ab30ccf9cbd8d045b602b888980190cc1d072c |
| MD5 | b3f9ca019323fe20376bb9b7a6f87702 |
| BLAKE2b-256 | 4d2f19bb496852fee4a13ddf1d1e9219c262210d4172de89b0dc059196ed6259 |