Rillo

Rillo is a lightweight, type-safe Event Sourcing framework for Python, built on top of Pydantic.

Documentation

Installation

Install the core library with pip:

pip install rillo

Install with NATS JetStream support for repositories and snapshot stores:

pip install 'rillo[nats]'

Install with uv (quote the extra so the shell does not expand the brackets):

uv add rillo
uv add 'rillo[nats]'

Usage

Defining an Aggregate

Rillo uses Pydantic models for State, Events, and Commands. To create an Aggregate, subclass Aggregate with three type parameters (State, Event, Command) and implement the abstract apply() and execute() methods.

from typing import Annotated, Literal
from pydantic import BaseModel, Field
from rillo import Aggregate

# 1. Define events
class UserSignedUp(BaseModel):
    type: Literal["UserSignedUpV1"] = "UserSignedUpV1"
    username: str

class AccountDeleted(BaseModel):
    type: Literal["AccountDeletedV1"] = "AccountDeletedV1"

# 2. Define commands
class SignUp(BaseModel):
    type: Literal["SignUpV1"] = "SignUpV1"
    username: str

class DeleteAccount(BaseModel):
    type: Literal["DeleteAccountV1"] = "DeleteAccountV1"

# 3. Define aggregate state
class State(BaseModel):
    type: Literal["UserStateV1"] = "UserStateV1"
    username: str
    account_deleted: bool

# Type aliases with discriminators for union types
# (the `type` statement requires Python 3.12 or newer)
type Event = Annotated[UserSignedUp | AccountDeleted, Field(discriminator="type")]
type Command = Annotated[SignUp | DeleteAccount, Field(discriminator="type")]

# 4. Create the Aggregate with [State, Event, Command] type parameters
class User(Aggregate[State, Event, Command]):

    # apply() maps each event to a state mutation
    def apply(self, event: Event) -> None:
        match event:
            case UserSignedUp(username=username):
                self._state = State(username=username, account_deleted=False)
            case AccountDeleted():
                if self._state is not None:
                    self._state.account_deleted = True

    # execute() contains business logic and emits events via _emit()
    def execute(self, command: Command) -> None:
        match command:
            case SignUp(username=username):
                if self._state is not None:
                    raise ValueError("User already exists.")
                self._emit(UserSignedUp(username=username))
            case DeleteAccount():
                if self._state is None:
                    raise ValueError("User does not exist.")
                if self._state.account_deleted:
                    raise ValueError("Account is already deleted.")
                self._emit(AccountDeleted())

# Using the aggregate
user = User(id="user-1")
user.execute(SignUp(username="alice"))
user.execute(DeleteAccount())

# Emitted events accumulate in pending_events until they are committed
events = user.pending_events

Repositories and Snapshot Stores

Rillo provides a Repository base class for saving and loading an aggregate's events, and a SnapshotStore base class for capturing aggregate snapshots so that loading does not have to replay the full event history. Both have NATS JetStream implementations: NATSRepository and NATSSnapshotStore.

import asyncio
from nats.aio.client import Client as NATS
from rillo.nats import NATSRepository, NATSSnapshotStore

# User and SignUp are the classes defined in the previous example

async def main():
    nc = NATS()
    await nc.connect("nats://localhost:4222")
    js = nc.jetstream()

    # Create a repository instance
    repository = NATSRepository[User](
        js=js,
        stream_name="USERS",
        subject_prefix="users.events"
    )

    user = User("user-123")
    user.execute(SignUp(username="alice"))

    # Persist pending events into NATS JetStream
    await repository.save(user)

    # Rehydrate aggregate state back from the event stream
    loaded_user = User("user-123")
    await repository.load(loaded_user)

    # Snapshot store uses a NATS KV bucket to cache aggregate state
    # (key_value() looks up an existing bucket; create_key_value() creates one)
    kv = await js.key_value("users-snapshots")
    snapshot_store = NATSSnapshotStore[User](kv=kv)

    # Save a snapshot of the current aggregate state
    await snapshot_store.save(loaded_user)

    # Load the snapshot before replaying remaining events
    restored_user = User("user-123")
    await snapshot_store.load(restored_user)
    await repository.load(restored_user)  # replays only events after the snapshot

    # Close the connection when finished
    await nc.close()

if __name__ == "__main__":
    asyncio.run(main())
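
To illustrate why loading a snapshot first shortens the replay, here is a small in-memory sketch. It does not use Rillo or NATS. CounterState, apply, and load are hypothetical names, and the version counter used to locate the events after the snapshot is an assumption made for this sketch; how Rillo tracks the snapshot position is not described here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CounterState:
    value: int = 0
    version: int = 0  # number of events folded into this state


def apply(state: CounterState, amount: int) -> CounterState:
    # Each event here is just an integer increment, to keep the sketch small.
    return CounterState(value=state.value + amount, version=state.version + 1)


def load(events, snapshot=None):
    """Return (state, number_of_events_replayed)."""
    state = snapshot if snapshot is not None else CounterState()
    remaining = events[state.version:]  # skip events the snapshot already covers
    for amount in remaining:
        state = apply(state, amount)
    return state, len(remaining)


log = [5, 3, 2, 10]

# Full replay from an empty state.
full_state, full_replayed = load(log)

# Take a snapshot after the first three events, then load using it.
snapshot, _ = load(log[:3])
fast_state, fast_replayed = load(log, snapshot)

assert fast_state == full_state == CounterState(value=20, version=4)
assert (full_replayed, fast_replayed) == (4, 1)
```

Both paths reach the same state; the snapshot only changes how many events have to be replayed to get there.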
