A lightweight library for creating event-driven systems using domain-driven design.
py-bondi
Installation
pip install pybondi
Introduction
This library provides a framework for modeling complex domains using an event-driven architecture and the pub/sub pattern. It includes:
- An in-memory message bus for handling events and commands.
- A simple in-memory publisher for publishing messages to external systems.
- A base aggregate class and a base aggregate root that can collect domain events.
- A base repository class for storing and retrieving aggregates.
- A session class for managing transactions and the unit of work.
- Default events for handling an aggregate's state when it is added to a session, saved, or rolled back.
Write domain logic using commands and events
You can define commands and events as classes that inherit from the Command and Event classes. A command is a request to do something; an event is a notification that something has happened.
You handle commands and events with handlers: functions that take a command or event as their first argument. Dependencies are passed to handlers through dependency injection, in the same style as FastAPI's Depends. In the example, event handlers are attached with messagebus.on, which accepts one or more event types, and the command handler is attached with messagebus.register.
from pybondi.messagebus import Event, Command
from pybondi.messagebus import Messagebus, Depends

class MakeSomethingHappen(Command):
    def __init__(self, message: str):
        self.message = message

class SomethingHappened(Event):
    def __init__(self, message: str):
        self.message = message

class AnotherThingHappened(Event):
    def __init__(self, message: str):
        self.message = message

class ABCRepositoryDependency:
    ### Stands in for a repository dependency that we want to inject into our handlers.
    ### It can be a real database client, a file system client, or any other service.
    data: list[str]

messagebus = Messagebus()

def get_repository_deps() -> ABCRepositoryDependency:
    ### Placeholder provider: it is replaced through dependency_overrides further down.
    raise NotImplementedError("Override get_repository_deps through messagebus.dependency_overrides.")

@messagebus.on(SomethingHappened, AnotherThingHappened)
def handle_something(event: SomethingHappened | AnotherThingHappened, repository: ABCRepositoryDependency = Depends(get_repository_deps)):
    repository.data.append(event.message)

@messagebus.on(AnotherThingHappened)
def handle_more(event: AnotherThingHappened, repository: ABCRepositoryDependency = Depends(get_repository_deps)):
    repository.data.append(event.message)

@messagebus.register(MakeSomethingHappen)
def handle_make_something(command: MakeSomethingHappen, repository: ABCRepositoryDependency = Depends(get_repository_deps)):
    repository.data.append(command.message)

class RepositoryDependency(ABCRepositoryDependency):
    def __init__(self):
        self.data = list()

repository = RepositoryDependency()

def actual_repository():
    return repository

### Override the placeholder dependency with the concrete repository, just like in FastAPI.
### Overrides are optional in general, but very useful for swapping implementations.
messagebus.dependency_overrides[get_repository_deps] = actual_repository

messagebus.handle(MakeSomethingHappen("Hello"))
messagebus.handle(SomethingHappened("World"))
messagebus.handle(AnotherThingHappened("!")) ### Handled twice: by handle_something and by handle_more.
print(repository.data) # ['Hello', 'World', '!', '!']
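To make the injection mechanism less magical, here is a minimal, standalone sketch of how a Depends marker and a dependency_overrides table could be resolved with the standard library's inspect module. It is not pybondi's implementation: MiniBus, Greeted, get_log, and record are hypothetical names invented for this illustration, and only the general idea (default-value markers resolved at dispatch time) is assumed to carry over.

```python
import inspect
from typing import Any, Callable


class Depends:
    """Marker used as a default value to name a dependency provider."""

    def __init__(self, provider: Callable[[], Any]):
        self.provider = provider


class MiniBus:
    """Hypothetical, simplified bus; not pybondi's Messagebus."""

    def __init__(self) -> None:
        self.handlers: dict[type, list[Callable]] = {}
        self.dependency_overrides: dict[Callable, Callable] = {}

    def on(self, *message_types: type):
        def decorator(handler: Callable) -> Callable:
            for message_type in message_types:
                self.handlers.setdefault(message_type, []).append(handler)
            return handler
        return decorator

    def _resolve(self, handler: Callable) -> dict[str, Any]:
        # Replace every Depends default with the value from its provider,
        # preferring an override when one is registered.
        resolved = {}
        for name, parameter in inspect.signature(handler).parameters.items():
            if isinstance(parameter.default, Depends):
                provider = parameter.default.provider
                provider = self.dependency_overrides.get(provider, provider)
                resolved[name] = provider()
        return resolved

    def handle(self, message: Any) -> None:
        # Dispatch on the exact message type to every registered handler.
        for handler in self.handlers.get(type(message), []):
            handler(message, **self._resolve(handler))


class Greeted:
    def __init__(self, text: str):
        self.text = text


def get_log() -> list[str]:
    raise NotImplementedError("Override get_log before handling messages.")


bus = MiniBus()
log: list[str] = []
bus.dependency_overrides[get_log] = lambda: log


@bus.on(Greeted)
def record(event: Greeted, sink: list[str] = Depends(get_log)) -> None:
    sink.append(event.text)


bus.handle(Greeted("hello"))
print(log)  # ['hello']
```

Because providers are looked up at dispatch time rather than at decoration time, an override can be registered before or after the handlers, which is what makes the pattern convenient in tests.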
Publish messages to external systems (Pubsub)
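You can publish messages to external systems by adding subscribers to a publisher. The bundled Publisher is an in-memory implementation of the pub/sub pattern, but you can replace it with a real pub/sub system such as Redis or RabbitMQ. In the example, the message published before rollback() never reaches a subscriber, while the messages published afterwards are delivered once commit() is called.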
from pybondi.publisher import Message, Publisher, Depends

class ExternalAPI:
    ### A mock class for an external API that we want to publish messages to.
    ### It can be a real API client or any other external system that we want to inject.
    def __init__(self):
        self.data = []

def abc_dependency() -> ExternalAPI: ...

publisher = Publisher()

@publisher.subscribe("topic-1")
def subscriber1(message: Message, api: ExternalAPI = Depends(abc_dependency)):
    api.data.append(message.payload)

@publisher.subscribe("topic-1", "topic-2")
def subscriber2(message: Message, api: ExternalAPI = Depends(abc_dependency)):
    api.data.append(message.payload)

api = ExternalAPI()

### Override the dependency with our mock API, just like in FastAPI.
### Overrides are optional in general, but very useful for testing.
publisher.dependency_overrides[abc_dependency] = lambda: api

def test_publisher():
    publisher.publish("topic-1", Message("Hi"))
    publisher.rollback() ### "Hi" is discarded.
    publisher.publish("topic-2", Message("Hello"))
    publisher.publish("topic-2", Message("World"))
    publisher.publish("topic-1", Message("!")) ### Received by both subscribers.
    publisher.commit()

test_publisher()
print(api.data) ### ['Hello', 'World', '!', '!']
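The commit and rollback behaviour in the example is consistent with a publisher that queues messages and only delivers them on commit. The following standalone sketch shows that idea under that assumption; BufferedPublisher and its internals are hypothetical and are not taken from pybondi's source.

```python
from collections import defaultdict
from typing import Any, Callable


class BufferedPublisher:
    """Hypothetical sketch: queue on publish, deliver on commit, drop on rollback."""

    def __init__(self) -> None:
        self.subscribers: dict[str, list[Callable[[Any], None]]] = defaultdict(list)
        self.pending: list[tuple[str, Any]] = []

    def subscribe(self, *topics: str):
        def decorator(subscriber: Callable[[Any], None]):
            for topic in topics:
                self.subscribers[topic].append(subscriber)
            return subscriber
        return decorator

    def publish(self, topic: str, payload: Any) -> None:
        # Nothing is delivered yet; the message only joins the queue.
        self.pending.append((topic, payload))

    def commit(self) -> None:
        # Swap the queue out first so subscribers can publish again safely.
        pending, self.pending = self.pending, []
        for topic, payload in pending:
            for subscriber in self.subscribers[topic]:
                subscriber(payload)

    def rollback(self) -> None:
        self.pending.clear()


received: list[str] = []
publisher = BufferedPublisher()


@publisher.subscribe("topic-1")
def first(payload: str) -> None:
    received.append(f"first:{payload}")


@publisher.subscribe("topic-1", "topic-2")
def second(payload: str) -> None:
    received.append(f"second:{payload}")


publisher.publish("topic-1", "Hi")
publisher.rollback()  # "Hi" is discarded before anyone sees it
publisher.publish("topic-2", "Hello")
publisher.publish("topic-1", "!")
publisher.commit()
print(received)  # ['second:Hello', 'first:!', 'second:!']
```

Deferring delivery is what lets a failed transaction leave external systems untouched: a message that was never sent does not need to be retracted.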
Define aggregates and repositories
You can define aggregates as classes that inherit from the Aggregate class. An aggregate is a cluster of domain objects that is treated as a single unit. Each aggregate has a root entity that is responsible for keeping the aggregate consistent, and it does so by publishing events.
You can also define repositories as classes that inherit from the Repository class. A repository is an object that mediates between the domain objects and the database or other data store.
from pybondi.aggregate import Aggregate, Root
from pybondi.messagebus import Event
from pybondi.repository import Repository

class UsernameChanged(Event): ### Illustrative event, assumed here so that the example is complete.
    def __init__(self, name: str):
        self.name = name

class User(Aggregate):
    def __init__(self, id: int, name: str):
        self.root = Root(id)
        self.name = name

    def change_name(self, name: str):
        self.root.publish(UsernameChanged(name))

class Users(Repository):
    def __init__(self):
        super().__init__()

    def store(self, user: User):
        database.save(user) ### Persists the user. `database` is a placeholder for your own storage client.

    def restore(self, user: User):
        return database.load(user) ### Loads the user from the database, back to its original state.
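Since the example above depends on a database placeholder, a self-contained sketch may help show how the pieces interact: a root that queues events, an aggregate that publishes through it, and a repository whose store and restore methods save and reset state. Every class here (NameChanged, EventRoot, Customer, InMemoryCustomers) is hypothetical and only mirrors the shape of the example; in particular, updating the name inside change_name and clearing queued events on restore are assumptions of this sketch, not documented pybondi behaviour.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class NameChanged:
    new_name: str


@dataclass
class EventRoot:
    """Hypothetical root: holds the identity and queues domain events."""
    id: int
    events: list[Any] = field(default_factory=list)

    def publish(self, event: Any) -> None:
        self.events.append(event)


class Customer:
    def __init__(self, id: int, name: str):
        self.root = EventRoot(id)
        self.name = name

    def change_name(self, name: str) -> None:
        # Assumption: the aggregate updates itself and records what happened.
        self.name = name
        self.root.publish(NameChanged(name))


class InMemoryCustomers:
    """Hypothetical repository; a dict stands in for the database."""

    def __init__(self) -> None:
        self.saved: dict[int, str] = {}

    def store(self, customer: Customer) -> None:
        self.saved[customer.root.id] = customer.name

    def restore(self, customer: Customer) -> None:
        # Reset to the last stored state and forget uncommitted events.
        customer.name = self.saved[customer.root.id]
        customer.root.events.clear()


customers = InMemoryCustomers()
customer = Customer(1, "Ada")
customers.store(customer)

customer.change_name("Grace")
queued = list(customer.root.events)  # copy before the rollback clears them

customers.restore(customer)
print(queued, customer.name)  # [NameChanged(new_name='Grace')] Ada
```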
Manage transactions and unit of work
Finally, you can wrap your domain logic in a session object that manages the transaction as a unit of work. A session represents a single transaction and coordinates the message bus, the publisher, and the repositories, so that if something goes wrong while handling a command or an event, all changes can be rolled back.
from dataclasses import dataclass
from pybondi import Command
from pybondi.session import Session

### Added, RolledBack, and Commited (the library's spelling) are session events; their imports are
### omitted here, as are AgeChanged, fetch_user_age, and save_user_age, which stand for your own code.

@messagebus.on(Added, RolledBack)
def bring_user_up_to_date(event: Added[User] | RolledBack[User]):
    event.user.age = fetch_user_age(event.user) ### Fetch the user's age from somewhere.
    ### You can publish events from event handlers. This way you can create
    ### a chain of events that will be executed in order.
    event.user.root.publish(AgeChanged(event.user.age))

@messagebus.on(Commited)
def save_user(event: Commited[User]):
    save_user_age(event.user) ### Send the user's age somewhere. This is just an example.

@dataclass
class BumpAge(Command):
    user: User

    def execute(self):
        self.user.age += 1

with Session(messagebus, repository, publisher) as session:
    session.add(user)
    session.execute(BumpAge(user))
### The user is now one year older.

with Session(messagebus) as session:
    session.add(user)
    session.execute(BumpAge(user))
    raise Exception("Something went wrong")
### The user does not get older because the session was rolled back.
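The rollback-on-exception behaviour shown above follows the usual unit-of-work pattern built on a context manager. Below is a minimal standalone sketch of that pattern, assuming a snapshot-and-restore strategy; MiniSession and Person are hypothetical, and pybondi's Session may track and restore state differently (for example, through the repository's restore method).

```python
import copy
from typing import Any


class MiniSession:
    """Hypothetical unit of work: snapshot on add, restore on failure."""

    def __init__(self) -> None:
        self.tracked: list[tuple[Any, dict]] = []

    def add(self, obj: Any) -> None:
        # Remember the object's attributes as they were when it was added.
        self.tracked.append((obj, copy.deepcopy(vars(obj))))

    def rollback(self) -> None:
        for obj, snapshot in self.tracked:
            vars(obj).clear()
            vars(obj).update(copy.deepcopy(snapshot))

    def __enter__(self) -> "MiniSession":
        return self

    def __exit__(self, exc_type, exc, traceback) -> bool:
        if exc_type is not None:
            self.rollback()
        self.tracked.clear()
        return False  # let the exception propagate to the caller


class Person:
    def __init__(self, age: int):
        self.age = age


person = Person(30)

with MiniSession() as session:
    session.add(person)
    person.age += 1
print(person.age)  # 31

try:
    with MiniSession() as session:
        session.add(person)
        person.age += 1
        raise RuntimeError("Something went wrong")
except RuntimeError:
    pass
print(person.age)  # 31, the second increment was rolled back
```

Returning False from __exit__ is a deliberate choice in this sketch: the state is restored, but the caller still sees the error instead of having it silently swallowed.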
License
This project is licensed under the terms of the MIT license.