Rillo is a lightweight, type-safe Event Sourcing framework for Python, built on top of Pydantic.

Documentation

Installation

Install the core library using pip:

pip install rillo

Install with NATS JetStream support for repositories and snapshot stores:

pip install 'rillo[nats]'

Install using uv:

uv add rillo
uv add 'rillo[nats]'

Usage

Defining an Aggregate

Rillo uses Pydantic models for State, Events, and Commands. To create an Aggregate, subclass it with three type parameters (State, Event, Command) and implement the abstract apply() and execute() methods.

from typing import Annotated, Literal
from pydantic import BaseModel, Field
from rillo import Aggregate

# 1. Define events
class UserSignedUp(BaseModel):
    type: Literal["UserSignedUpV1"] = "UserSignedUpV1"
    username: str

class AccountDeleted(BaseModel):
    type: Literal["AccountDeletedV1"] = "AccountDeletedV1"

# 2. Define commands
class SignUp(BaseModel):
    type: Literal["SignUpV1"] = "SignUpV1"
    username: str

class DeleteAccount(BaseModel):
    type: Literal["DeleteAccountV1"] = "DeleteAccountV1"

# 3. Define aggregate state
class State(BaseModel):
    type: Literal["UserStateV1"] = "UserStateV1"
    username: str
    account_deleted: bool

# Type aliases with discriminators for union types
# (the `type` statement requires Python 3.12 or newer)
type Event = Annotated[UserSignedUp | AccountDeleted, Field(discriminator="type")]
type Command = Annotated[SignUp | DeleteAccount, Field(discriminator="type")]

# 4. Create the Aggregate with [State, Event, Command] type parameters
class User(Aggregate[State, Event, Command]):

    # apply() maps each event to a state mutation
    def apply(self, event: Event) -> None:
        match event:
            case UserSignedUp(username=username):
                self._state = State(username=username, account_deleted=False)
            case AccountDeleted():
                if self._state is not None:
                    self._state.account_deleted = True

    # execute() contains business logic and emits events via _emit()
    def execute(self, command: Command) -> None:
        match command:
            case SignUp(username=username):
                if self._state is not None:
                    raise ValueError("User already exists.")
                self._emit(UserSignedUp(username=username))
            case DeleteAccount():
                if self._state is None:
                    raise ValueError("User does not exist.")
                if self._state.account_deleted:
                    raise ValueError("Account is already deleted.")
                self._emit(AccountDeleted())

# Using the aggregate
user = User(id="user-1")
user.execute(SignUp(username="alice"))
user.execute(DeleteAccount())

# Pending events are stored and ready to be committed
events = user.pending_events
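
To make the apply()/execute() split concrete, here is a minimal, standard-library-only sketch of what an aggregate base class of this shape might do internally. It is not Rillo's implementation: MiniAggregate, replay(), and the Incremented event are hypothetical names invented for illustration, and only _state, _emit(), and pending_events mirror the example above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Incremented:
    amount: int


class MiniAggregate:
    """Hypothetical stand-in for an event-sourced aggregate base class."""

    def __init__(self, id: str) -> None:
        self.id = id
        self._state: int | None = None  # None until the first event is applied
        self._pending: list[Incremented] = []

    @property
    def pending_events(self) -> list[Incremented]:
        # Return a copy so callers cannot mutate the internal buffer.
        return list(self._pending)

    def _emit(self, event: Incremented) -> None:
        # Emitting applies the event immediately and queues it for persistence.
        self.apply(event)
        self._pending.append(event)

    def replay(self, events: list[Incremented]) -> None:
        # Rehydration applies stored events without queuing them again.
        for event in events:
            self.apply(event)

    def apply(self, event: Incremented) -> None:
        # State changes happen only here, driven by events.
        self._state = (self._state or 0) + event.amount

    def execute(self, amount: int) -> None:
        # Business rules are checked here; state is never touched directly.
        if amount <= 0:
            raise ValueError("Amount must be positive.")
        self._emit(Incremented(amount))


counter = MiniAggregate(id="counter-1")
counter.execute(2)
counter.execute(3)

# A fresh instance reaches the same state by replaying the recorded events.
rebuilt = MiniAggregate(id="counter-1")
rebuilt.replay(counter.pending_events)
```

Because execute() only validates and emits while apply() is the single place that changes state, replaying the same events on a fresh instance reproduces the state without re-running any business rules.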

Repositories and Snapshot Stores

Rillo provides a Repository base class for saving and loading events, and a SnapshotStore base class for capturing aggregate snapshots to reduce load times. Both have built-in NATS JetStream implementations (NATSRepository and NATSSnapshotStore).

import asyncio
from nats.aio.client import Client as NATS
from rillo.nats import NATSRepository, NATSSnapshotStore

# User and SignUp are the classes defined in the previous example.

async def main():
    nc = NATS()
    await nc.connect("nats://localhost:4222")
    js = nc.jetstream()

    # Create a repository instance
    repository = NATSRepository[User](
        js=js,
        stream_name="USERS",
        subject_prefix="users.events"
    )

    user = User("user-123")
    user.execute(SignUp(username="alice"))

    # Persist pending events into NATS JetStream
    await repository.save(user)

    # Rehydrate aggregate state back from the event stream
    loaded_user = User("user-123")
    await repository.load(loaded_user)

    # Snapshot store uses a NATS KV bucket to cache aggregate state
    # (js.key_value() binds to an existing bucket, so create it beforehand)
    kv = await js.key_value("users-snapshots")
    snapshot_store = NATSSnapshotStore[User](kv=kv)

    # Save a snapshot of the current aggregate state
    await snapshot_store.save(loaded_user)

    # Load the snapshot before replaying remaining events
    restored_user = User("user-123")
    await snapshot_store.load(restored_user)
    await repository.load(restored_user)  # replays only events after the snapshot

    # Close the NATS connection when done
    await nc.close()

if __name__ == "__main__":
    asyncio.run(main())
