py-bondi

A library for creating event driven systems using domain driven design.

Installation

pip install pybondi

Introduction

This library provides a framework for modeling complex domains using an event-driven architecture and the pub/sub pattern. It provides:

  • An in-memory message bus for handling events and commands.
  • A simple in-memory publisher for publishing messages to external systems.
  • A base aggregate class and a base aggregate root that can collect domain events.
  • A base repository class for storing and retrieving aggregates.
  • A session class for managing transactions and the unit of work.
  • Default events for handling an aggregate's state when it is added to a session, saved, or rolled back.

Write domain logic using commands and events

You can define commands and events as classes that inherit from the Command and Event classes. A command is a request to do something, and an event is a notification that something has happened.

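You handle commands and events by defining "handlers": functions that take a command or an event as their first argument. Commands are registered with messagebus.register and events with messagebus.on, which accepts one or more event types. You can pass dependencies to handlers using dependency injection, just like in FastAPI.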

from pybondi.messagebus import Event, Command
from pybondi.messagebus import Messagebus, Depends

class MakeSomethingHappen(Command):
    def __init__(self, message: str):
        self.message = message

class SomethingHappened(Event):
    def __init__(self, message: str):
        self.message = message

class AnotherThingHappened(Event):
    def __init__(self, message: str):
        self.message = message

class ABCRepositoryDependency:
    ### Stands in for a repository dependency that we want to inject into our handlers.
    ### This can be a real database client, file system client, or any other service that we want to inject.
    data: list[str]

messagebus = Messagebus()

def get_repository_deps() -> ABCRepositoryDependency:
    raise NotImplementedError("Override this dependency using messagebus.dependency_overrides.")

@messagebus.on(SomethingHappened, AnotherThingHappened)
def handle_something(event: SomethingHappened | AnotherThingHappened, repository: ABCRepositoryDependency = Depends(get_repository_deps)):
    repository.data.append(event.message)

@messagebus.on(AnotherThingHappened)
def handle_more(event: AnotherThingHappened, repository: ABCRepositoryDependency = Depends(get_repository_deps)):
    repository.data.append(event.message)

@messagebus.register(MakeSomethingHappen)
def handle_make_something(command: MakeSomethingHappen, repository: ABCRepositoryDependency = Depends(get_repository_deps)):
    repository.data.append(command.message)

class RepositoryDependency(ABCRepositoryDependency):
    def __init__(self):
        self.data = list()

repository = RepositoryDependency()

def actual_repository():
    return repository

### We override the abstract dependency with the concrete repository, just like in FastAPI.
### Overrides are optional in general, but very useful for swapping implementations.
messagebus.dependency_overrides[get_repository_deps] = actual_repository
messagebus.handle(MakeSomethingHappen("Hello"))
messagebus.handle(SomethingHappened("World"))
messagebus.handle(AnotherThingHappened("!"))
print(repository.data) # ["Hello", "World", "!", "!"] ("!" appears twice: both event handlers receive AnotherThingHappened)
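
To make the dependency injection above less magical, here is a minimal, self-contained sketch of how a bus could resolve Depends markers by inspecting handler signatures. It is not pybondi's implementation: MiniBus, this Depends class, Greeted, and get_log are hypothetical names used only for illustration.

```python
import inspect
from collections import defaultdict
from typing import Any, Callable


class Depends:
    """Marks a handler parameter whose value comes from a provider function."""

    def __init__(self, provider: Callable[[], Any]):
        self.provider = provider


class MiniBus:
    """Toy bus (hypothetical): one handler per command type, many per event type."""

    def __init__(self) -> None:
        self.command_handlers: dict[type, Callable] = {}
        self.event_handlers: dict[type, list[Callable]] = defaultdict(list)
        self.dependency_overrides: dict[Callable, Callable] = {}

    def register(self, command_type: type):
        def decorator(handler: Callable) -> Callable:
            self.command_handlers[command_type] = handler
            return handler
        return decorator

    def on(self, *event_types: type):
        def decorator(handler: Callable) -> Callable:
            for event_type in event_types:
                self.event_handlers[event_type].append(handler)
            return handler
        return decorator

    def _call(self, handler: Callable, message: Any) -> None:
        # Fill every parameter whose default is a Depends marker,
        # preferring an override when one has been registered.
        kwargs = {}
        for name, parameter in inspect.signature(handler).parameters.items():
            if isinstance(parameter.default, Depends):
                provider = parameter.default.provider
                provider = self.dependency_overrides.get(provider, provider)
                kwargs[name] = provider()
        handler(message, **kwargs)

    def handle(self, message: Any) -> None:
        if type(message) in self.command_handlers:
            self._call(self.command_handlers[type(message)], message)
        for handler in self.event_handlers.get(type(message), []):
            self._call(handler, message)


class Greeted:
    def __init__(self, name: str):
        self.name = name


def get_log() -> list:
    raise NotImplementedError("Override this provider before handling messages.")


bus = MiniBus()
log: list[str] = []


@bus.on(Greeted)
def record(event: Greeted, sink: list = Depends(get_log)):
    sink.append(event.name)


bus.dependency_overrides[get_log] = lambda: log
bus.handle(Greeted("Ada"))
print(log)  # ['Ada']
```

The override table is keyed by the provider function itself, which is why the handlers never need to change when you swap a placeholder provider for a concrete one.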

Publish messages to external systems (Pubsub)

You can publish messages to external systems by subscribing handlers to topics on a publisher. The built-in publisher is an in-memory implementation of the pub/sub pattern, but you can replace it with a real pub/sub system such as Redis or RabbitMQ.

from pybondi.publisher import Message, Publisher, Depends

class ExternalAPI: ### This is a mock class for an external API that we want to publish messages to.
    ### This can be a real API client, or any other external system that we want to inject.
    def __init__(self):
        self.data = []

def abc_dependency() -> ExternalAPI: ...

publisher = Publisher()

@publisher.subscribe("topic-1")
def subscriber1(message: Message, api: ExternalAPI = Depends(abc_dependency)):
    api.data.append(message.payload)

@publisher.subscribe("topic-1", "topic-2")
def subscriber2(message: Message, api: ExternalAPI = Depends(abc_dependency)):
    api.data.append(message.payload)

api = ExternalAPI()
### We override the dependency with our mock API, just like in FastAPI.
### This is not necessary, but super useful.
publisher.dependency_overrides[abc_dependency] = lambda: api

def test_publisher():
    publisher.publish("topic-1", Message("Hi"))
    publisher.rollback() ### "Hi" is discarded and never reaches the subscribers.
    publisher.publish("topic-2", Message("Hello"))
    publisher.publish("topic-2", Message("World"))
    publisher.publish("topic-1", Message("!")) ### Both subscribers listen to "topic-1".
    publisher.commit()

    print(api.data) ### ["Hello", "World", "!", "!"]
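
One way to get the commit and rollback behavior shown above is to buffer messages until commit. The following is a minimal sketch under that assumption, not pybondi's actual Publisher: BufferedPublisher and the subscriber names are hypothetical, and payloads are plain strings instead of Message objects.

```python
from collections import defaultdict
from typing import Any, Callable


class BufferedPublisher:
    """Toy publisher (hypothetical): deliver on commit, drop on rollback."""

    def __init__(self) -> None:
        self.subscribers: dict[str, list[Callable[[Any], None]]] = defaultdict(list)
        self.pending: list[tuple[str, Any]] = []

    def subscribe(self, *topics: str):
        def decorator(handler: Callable[[Any], None]):
            for topic in topics:
                self.subscribers[topic].append(handler)
            return handler
        return decorator

    def publish(self, topic: str, payload: Any) -> None:
        # Nothing is delivered yet; the message only joins the buffer.
        self.pending.append((topic, payload))

    def rollback(self) -> None:
        self.pending.clear()

    def commit(self) -> None:
        # Swap the buffer out first so handlers that publish again
        # do not modify the list being iterated.
        pending, self.pending = self.pending, []
        for topic, payload in pending:
            for handler in self.subscribers.get(topic, []):
                handler(payload)


received: list[str] = []
toy_publisher = BufferedPublisher()


@toy_publisher.subscribe("topic-1")
def first(payload: str) -> None:
    received.append(payload)


@toy_publisher.subscribe("topic-1", "topic-2")
def second(payload: str) -> None:
    received.append(payload)


toy_publisher.publish("topic-1", "Hi")
toy_publisher.rollback()
toy_publisher.publish("topic-2", "Hello")
toy_publisher.publish("topic-2", "World")
toy_publisher.publish("topic-1", "!")
toy_publisher.commit()
print(received)  # ['Hello', 'World', '!', '!']
```

A real backend such as Redis or RabbitMQ would replace the in-process handler calls inside commit with network sends, while the buffering logic could stay the same.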

Define aggregates and repositories

You can define aggregates as classes that inherit from the Aggregate class. An aggregate is a collection of domain objects that are treated as a single unit. Each aggregate should have a root entity that is responsible for maintaining the consistency of the aggregate using events.

You can also define repositories as classes that inherit from the Repository class. A repository is an object that mediates between the domain objects and the database or other data store.

from pybondi.aggregate import Aggregate, Root
from pybondi.repository import Repository

class User(Aggregate):
    def __init__(self, id: int, name: str):
        self.root = Root(id)
        self.name = name

    def change_name(self, name: str):
        ### UsernameChanged is an Event subclass that you define in your own domain.
        self.root.publish(UsernameChanged(name))


class Users(Repository):
    def __init__(self):
        super().__init__()

    def store(self, user: User):
        database.save(user) ### Persists the user to the database (`database` is a placeholder for your data store client).

    def restore(self, user: User):
        return database.load(user) ### Restores the user from the database to its original state.
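
The snippet above relies on an undefined database object and event class. As a runnable illustration of the same idea, here is a self-contained sketch in which the root collects events and an in-memory repository snapshots and restores state. ToyRoot, ToyUser, InMemoryUsers, and NameChanged are hypothetical stand-ins, not pybondi classes.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class NameChanged:
    name: str


@dataclass
class ToyRoot:
    """Hypothetical root entity: holds the identity and collects events."""
    id: Any
    events: list = field(default_factory=list)

    def publish(self, event: Any) -> None:
        self.events.append(event)


class ToyUser:
    def __init__(self, id: int, name: str):
        self.root = ToyRoot(id)
        self.name = name

    def change_name(self, name: str) -> None:
        self.name = name
        self.root.publish(NameChanged(name))


class InMemoryUsers:
    """Hypothetical repository backed by a dict of snapshots."""

    def __init__(self) -> None:
        self.snapshots: dict[Any, dict] = {}

    def store(self, user: ToyUser) -> None:
        self.snapshots[user.root.id] = {"name": user.name}

    def restore(self, user: ToyUser) -> None:
        # Put the aggregate back into its last stored state
        # and drop any events collected since then.
        user.name = self.snapshots[user.root.id]["name"]
        user.root.events.clear()


users = InMemoryUsers()
user = ToyUser(1, "Ada")
users.store(user)

user.change_name("Grace")
collected = list(user.root.events)  # events gathered before the restore

users.restore(user)
print(collected, user.name)  # [NameChanged(name='Grace')] Ada
```

Keeping the event list on the root means a session only needs the aggregate itself to find out what happened to it.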

Manage transactions and unit of work

Finally, you can wrap your domain logic in a session object that manages the transaction and the unit of work. A session represents a single transaction and coordinates the messagebus, publisher, and repositories, so that if something goes wrong while handling a command or event, all changes can be rolled back.

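from dataclasses import dataclass

from pybondi import Command
from pybondi.session import Session
### Added, RolledBack and Commited are the library's default session events (their import is omitted here).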


@messagebus.on(Added, RolledBack)
def bring_user_up_to_date(event: Added[User] | RolledBack[User]):
    event.user.age = fetch_user_age(event.user) ### Fetch the user's age from somewhere.
    event.user.root.publish(AgeChanged(event.user.age)) ### You can publish events from event handlers.
                                                        ### This way you can create a chain of events that
                                                        ### will be executed in order.

@messagebus.on(Commited)
def save_user(event: Commited[User]):
    save_user_age(event.user) ### Send the user's age somewhere; this is just an example.
    
    
@dataclass
class BumpAge(Command):
    user: User

    def execute(self):
        self.user.age += 1 

with Session(messagebus, repository, publisher) as session:
    session.add(user)
    session.execute(BumpAge(user))

### user is now 1 year older

with Session(messagebus) as session:
    session.add(user)
    session.execute(BumpAge(user))
    raise Exception("Something went wrong")

### user does not get older because the session was rolled back
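
The commit-or-rollback behavior of a session can be pictured as a context manager. The sketch below is a simplified assumption of how such a unit of work could be built, not pybondi's Session: ToySession and Counter are hypothetical, and this version re-raises the exception after rolling back, which may differ from what the library does.

```python
class Counter:
    """Hypothetical resource with a working value and a saved value."""

    def __init__(self) -> None:
        self.value = 0
        self.saved = 0

    def commit(self) -> None:
        self.saved = self.value

    def rollback(self) -> None:
        self.value = self.saved


class ToySession:
    """Toy unit of work: commit every resource on success, roll back on error."""

    def __init__(self, *resources: Counter) -> None:
        self.resources = resources

    def __enter__(self) -> "ToySession":
        return self

    def __exit__(self, exc_type, exc, traceback) -> bool:
        for resource in self.resources:
            if exc_type is None:
                resource.commit()
            else:
                resource.rollback()
        return False  # do not swallow the exception


age = Counter()

with ToySession(age):
    age.value += 1  # committed when the block exits normally

try:
    with ToySession(age):
        age.value += 1
        raise RuntimeError("Something went wrong")
except RuntimeError:
    pass  # the session already rolled the change back

print(age.value)  # 1
```

Because every resource exposes the same commit and rollback pair, the session can coordinate a message bus, a publisher, and repositories without knowing their internals.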

License

This project is licensed under the terms of the MIT license.
