Rillo is a lightweight, type-safe Event Sourcing framework for Python, built on top of Pydantic.

Rillo

Documentation

Installation

Install the core library with pip:

pip install rillo

Install with NATS JetStream support for repositories and snapshot stores:

pip install 'rillo[nats]'

Install with uv:

uv add rillo
uv add 'rillo[nats]'

Usage

Defining an Aggregate

Rillo uses Pydantic models for State, Events, and Commands. To create an Aggregate, subclass it with three type parameters (State, Event, Command) and implement the abstract apply() and execute() methods.

from typing import Annotated, Literal
from pydantic import BaseModel, Field
from rillo import Aggregate

# 1. Define events
class UserSignedUp(BaseModel):
    type: Literal["UserSignedUpV1"] = "UserSignedUpV1"
    username: str

class AccountDeleted(BaseModel):
    type: Literal["AccountDeletedV1"] = "AccountDeletedV1"

# 2. Define commands
class SignUp(BaseModel):
    type: Literal["SignUpV1"] = "SignUpV1"
    username: str

class DeleteAccount(BaseModel):
    type: Literal["DeleteAccountV1"] = "DeleteAccountV1"

# 3. Define aggregate state
class State(BaseModel):
    type: Literal["UserStateV1"] = "UserStateV1"
    username: str
    account_deleted: bool

# Type aliases with discriminators for union types
# (the `type` statement requires Python 3.12 or newer)
type Event = Annotated[UserSignedUp | AccountDeleted, Field(discriminator="type")]
type Command = Annotated[SignUp | DeleteAccount, Field(discriminator="type")]

# 4. Create the Aggregate with [State, Event, Command] type parameters
class User(Aggregate[State, Event, Command]):

    # apply() maps each event to a state mutation
    def apply(self, event: Event) -> None:
        match event:
            case UserSignedUp(username=username):
                self._state = State(username=username, account_deleted=False)
            case AccountDeleted():
                if self._state is not None:
                    self._state.account_deleted = True

    # execute() contains business logic and emits events via _emit()
    def execute(self, command: Command) -> None:
        match command:
            case SignUp(username=username):
                if self._state is not None:
                    raise ValueError("User already exists.")
                self._emit(UserSignedUp(username=username))
            case DeleteAccount():
                if self._state is None:
                    raise ValueError("User does not exist.")
                if self._state.account_deleted:
                    raise ValueError("Account is already deleted.")
                self._emit(AccountDeleted())

# Using the aggregate
user = User(id="user-1")
user.execute(SignUp(username="alice"))
user.execute(DeleteAccount())

# Pending events are stored and ready to be committed
events = user.pending_events

Repositories and Snapshot Stores

Rillo provides a Repository base class for saving and loading events, and a SnapshotStore base class for capturing aggregate snapshots to shorten load times. Both have built-in NATS JetStream implementations: NATSRepository and NATSSnapshotStore.

# Assumes User and SignUp from the previous example are in scope
import asyncio
from nats.aio.client import Client as NATS
from rillo.nats import NATSRepository, NATSSnapshotStore

async def main():
    nc = NATS()
    await nc.connect("nats://localhost:4222")
    js = nc.jetstream()

    # Create a repository instance
    repository = NATSRepository[User](
        js=js,
        stream_name="USERS",
        subject_prefix="users.events"
    )

    user = User("user-123")
    user.execute(SignUp(username="alice"))

    # Persist pending events into NATS JetStream
    await repository.save(user)

    # Rehydrate aggregate state back from the event stream
    loaded_user = User("user-123")
    await repository.load(loaded_user)

    # Snapshot store uses a NATS KV bucket to cache aggregate state.
    # js.key_value() binds to an existing bucket; create it first if needed.
    kv = await js.key_value("users-snapshots")
    snapshot_store = NATSSnapshotStore[User](kv=kv)

    # Save a snapshot of the current aggregate state
    await snapshot_store.save(loaded_user)

    # Load the snapshot before replaying remaining events
    restored_user = User("user-123")
    await snapshot_store.load(restored_user)
    await repository.load(restored_user)  # replays only events after the snapshot

    # Close the NATS connection when done
    await nc.close()

if __name__ == "__main__":
    asyncio.run(main())
