meowmx - Managed Event Orchestration With Multiple eXecutors
This takes many of the ideas from Eugene Khyst's postgresql-event-sourcing project and implements them worse in Python.
The end result, meowmx, lets you use PostgreSQL to:
- write events containing plain JSON data, which can later be looked up by category or aggregate IDs
- subscribe to events, allowing you to create workers that iterate through them.
Also, check out this cat!
 ^__^
( o.o )  < M E O W >
 > ^ <
Why would anyone want to do this?
Think of how many systems have different components that communicate by passing events. Usually some component gets notified of a change, or is otherwise spurred to action, by an event payload. It then loads a bunch of persisted data, does some stuff, and moves on.
The thing is, quite often the persisted data should be the events themselves. For instance, let's say you have multiple events concerning "orders" in a system, i.e. a customer initiates an order, an order is confirmed, an order is shipped, etc. The traditional way to handle this is to model the current state of the order as a row in a SQL database. Then you have all these event handlers (maybe they're Lambda functions, maybe they listen to RabbitMQ, etc.) notifying components, all of which then have to load the current order row, figure out how it should be modified, consider whether another component has also updated the row, and so on.
To make a gross simplification, event sourcing just says: hey, maybe all those events are the persisted state. Load them all to figure out what the current situation is, then append new events to the same set. As a cherry on top, use optimistic concurrency so the write fails if some other system updated our set of events between the time we read them just now and the time we went to write, in which case we cancel our write, reload all the events, and try the logic again.
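The read-check-append loop above can be sketched without any database at all. The in-memory `store` dict, the `ConflictError` exception, and the helper names below are illustrative assumptions, not meowmx's actual API; the real library does the version check inside Postgres.

```python
# A minimal, library-agnostic sketch of the optimistic-concurrency loop.
# A plain dict stands in for the event store; none of these names come
# from meowmx itself.

class ConflictError(Exception):
    """Raised when another writer updated the stream first."""

def append_events(store: dict, stream: str, new_events: list, expected_version: int) -> None:
    current = store.setdefault(stream, [])
    if len(current) != expected_version:
        # someone else wrote between our read and our write
        raise ConflictError(stream)
    current.extend(new_events)

def save_with_retry(store: dict, stream: str, make_events, max_attempts: int = 5) -> int:
    for _ in range(max_attempts):
        events = store.get(stream, [])
        version = len(events)  # version observed at read time
        try:
            # make_events sees the current history, so it can rebuild
            # state before deciding what new events to emit
            append_events(store, stream, make_events(events), version)
            return version + len(make_events(events))
        except ConflictError:
            continue  # reload the events and retry the whole unit of work
    raise ConflictError("gave up after repeated conflicts")
```

On a conflict the loop simply starts over: reload, rebuild state, regenerate events, attempt the append again.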
There's also the notion of aggregates, which are basically objects that can be constructed by reading a set of events. In my experience that kind of "helper" code is extremely easy to write but obscures the basic utility of event sourcing libraries like this one. This project offers a helper to save and load aggregates using a simple protocol to get the pending set of events from any object. For details on this see this test.
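To make the aggregate idea concrete, here is a hypothetical sketch of that kind of "helper" code. The method names (`apply`, `ship`, `pending_events`) are illustrative assumptions, not meowmx's actual protocol; see the project's tests for the real one.

```python
# Hypothetical aggregate: state is rebuilt by replaying events, and new
# events are both applied locally and queued for saving.

class OrderAggregate:
    def __init__(self) -> None:
        self.shipped = False
        self.version = 0
        self._pending: list[dict] = []

    def apply(self, event: dict) -> None:
        # rebuild state from one historical event
        if event["event_type"] == "OrderShipped":
            self.shipped = True
        self.version += 1

    def ship(self) -> None:
        # record a new event: applied locally and queued for persistence
        event = {"event_type": "OrderShipped"}
        self.apply(event)
        self._pending.append(event)

    def pending_events(self) -> list[dict]:
        return list(self._pending)

def load_aggregate(events: list[dict]) -> OrderAggregate:
    agg = OrderAggregate()
    for event in events:
        agg.apply(event)
    return agg
```

The version counted during replay is what gets handed to the save call for the optimistic-concurrency check.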
Notes on SQLAlchemy
This code assumes Postgres accessed via SQLAlchemy.
The code also has nerfed support for other databases through SQLAlchemy, but only to be useful for testing. In-memory databases have some errors when it comes to listening for subscribers, so only file-based SQLite databases are supported for now.
Usage
Create an engine with SQLAlchemy, then create a meowmx client by passing it in:
import meowmx
import sqlalchemy
import sqlalchemy.orm

test_db_url = "postgresql+psycopg://user:pass@localhost:5445/myapp?sslmode=disable"
engine = sqlalchemy.create_engine(test_db_url)

# note: the session_maker arg is optional
session_maker = sqlalchemy.orm.sessionmaker(autocommit=False, autoflush=False, bind=engine)

meow = meowmx.Client(engine=engine, session_maker=session_maker)
Initial table setup
To initially create the tables used by meowmx, call setup_tables on the client:
meow.setup_tables()
Note: by default meowmx creates a UUID column for the aggregate IDs, but this can be changed by passing an argument:
meow.setup_tables(aggregate_id_column_type="CHAR(20)")
This argument has no effect if a database other than Postgres is used; in that case the column type is CHAR(64). The argument also only works the first time setup_tables is called.
For a production-ready app you probably already have a method of standing up your tables. You can see what tables meowmx builds by looking at migrations.py, which was mostly lifted from postgresql-event-sourcing.
Writing Events
Call meow.save_events to persist / publish events:
from datetime import datetime

customer_id = "<customer-id-here>"
order_id = "<order-id-here>"

order_created = meowmx.NewEvent(
    event_type="OrderCreated",
    json={
        "customer_id": customer_id,
        "order_id": order_id,
        "time": datetime.now().isoformat(),
    },
)
meow.save_events("order", order_id, [order_created], version=0)

# save a second event
order_shipped = meowmx.NewEvent(
    event_type="OrderShipped",
    json={
        "order_id": order_id,
        "time": datetime.now().isoformat(),
    },
)
meow.save_events("order", order_id, [order_shipped], version=1)
Subscribing to Events
Let's say you want to create a read model for an aggregate that is updated every time an event is written for the aggregate.
One way to do that is by subscribing to all changes in the events from another process:
import logging

import sqlalchemy
from sqlalchemy import orm

log = logging.getLogger(__name__)

Base = orm.declarative_base()

class Order(Base):
    __tablename__ = "orders"

    id = orm.mapped_column(sqlalchemy.CHAR(20), primary_key=True)
    version = orm.mapped_column(sqlalchemy.Integer, nullable=False)
    customer_id = orm.mapped_column(sqlalchemy.String, nullable=False)
    shipped = orm.mapped_column(sqlalchemy.Boolean, nullable=False)

def build_read_model(events: list[meowmx.RecordedEvent]) -> Order:
    order = Order()
    for event in events:
        if event.event_type == "OrderCreated":
            order.id = event.json["order_id"]
            order.customer_id = event.json["customer_id"]
            order.shipped = False
        elif event.event_type == "OrderShipped":
            order.shipped = True
        else:
            # don't raise an exception as the events are historical
            log.warning("unknown event type")
        order.version = event.version
    return order

# runs until the process is killed
def start_subscription(meow: meowmx.Client) -> None:
    def handler(session: meowmx.Session, event: meowmx.RecordedEvent) -> None:
        order_events = meow.load(event.aggregate_type, event.aggregate_id)
        order = build_read_model(order_events)
        session.merge(order)
        session.commit()

    meow.sub(
        subscription_name="order-rm-builder",
        aggregate_name="order",
        batch_size=10,
        max_sleep_time=30,
        handler=handler,
    )
See the files in examples.
Setup:
just start-docker-db
just usql # open repl
just test-psql
just examples read-events # view all events written by the tests
just examples # see examples