meowmx - Managed Event Orchestration With Multiple eXecutors

This takes many of the ideas from Eugene Khyst's postgresql-event-sourcing project and implements them worse in Python.

The end result, meowmx, lets you use PostgreSQL to:

  • write events containing plain JSON data, which can later be looked up by category or aggregate ID
  • subscribe to events, allowing you to create workers that iterate through them

Also, check out this cat!

        ^__^         
    \   o . o    <  M E O W >
     |    ---
     ..    ..

Why would anyone want to do this?

Think of how many systems have different components that communicate by passing events. Usually some component gets notified of a change, or is otherwise spurred to action, by an event payload. It then loads a bunch of persisted data, does some stuff, and moves on.

The thing is, quite often the persisted data should be the event itself. For instance, let's say you have multiple events concerning "orders" in a system, i.e. a customer initiates an order, an order is confirmed, an order is shipped, etc. The traditional way to handle this is to model the current state of the order as a row in a SQL database. Then you have all these event handlers (maybe they're Lambda functions, maybe they listen to RabbitMQ, etc.) notifying components in a modern system, which all then have to load the current order row, figure out how it should be modified, consider whether another component has also updated the row, etc.

To make a gross simplification, event sourcing just says: hey, maybe all those events are the persisted state. Just load them all to figure out what the current situation is, then append an update to the same set of events. As a cherry on top, use optimistic concurrency so the update fails if some other system updated our set of events between the time we read them and the time we went to write them, in which case we cancel our write, reload all the events, and try the logic again.
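The read, decide, write, retry loop described above can be sketched with a toy in-memory store. This is only an illustration of optimistic concurrency, not meowmx's implementation: InMemoryEventStore, VersionConflict and ship_order are hypothetical names, and the sketch assumes the expected version is simply the number of events already in the stream.

```python
from dataclasses import dataclass, field


class VersionConflict(Exception):
    """Raised when the stream changed since the caller last read it."""


@dataclass
class InMemoryEventStore:
    # Hypothetical stand-in for a database: aggregate ID -> ordered events.
    streams: dict[str, list[dict]] = field(default_factory=dict)

    def load(self, aggregate_id: str) -> list[dict]:
        return list(self.streams.get(aggregate_id, []))

    def append(self, aggregate_id: str, events: list[dict], expected_version: int) -> None:
        stream = self.streams.setdefault(aggregate_id, [])
        # The write only succeeds if nobody appended since we read.
        if len(stream) != expected_version:
            raise VersionConflict(
                f"expected version {expected_version}, found {len(stream)}"
            )
        stream.extend(events)


def ship_order(store: InMemoryEventStore, order_id: str, max_attempts: int = 3) -> bool:
    """Read the events, decide, write; reload and retry on a conflict."""
    for _ in range(max_attempts):
        events = store.load(order_id)
        if any(e["type"] == "OrderShipped" for e in events):
            return False  # already shipped, nothing to do
        try:
            store.append(order_id, [{"type": "OrderShipped"}], expected_version=len(events))
            return True
        except VersionConflict:
            continue  # someone else wrote first; reload and try again
    raise VersionConflict("gave up after repeated conflicts")


store = InMemoryEventStore()
store.append("order-1", [{"type": "OrderCreated"}], expected_version=0)
shipped = ship_order(store, "order-1")
```

The important part is that the version check and the append happen together; in a real database that check would need to be enforced atomically, for example by a constraint or inside a transaction.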

There's also the notion of aggregates, which are basically objects that can be constructed by reading a set of events. In my experience that kind of "helper" code is extremely easy to write but obscures the basic utility of event sourcing libraries like this one. This project offers a helper to save and load aggregates using a simple protocol to get the pending set of events from any object. For details on this see this test.
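To make the idea of an aggregate concrete, here is a minimal sketch of an object that is rebuilt from past events and collects new ones until they are saved. The class and its attribute names (OrderAggregate, from_events, pending_events) are hypothetical and chosen for illustration; the protocol meowmx actually expects may look different.

```python
from dataclasses import dataclass, field


@dataclass
class OrderAggregate:
    order_id: str = ""
    shipped: bool = False
    version: int = 0  # number of events already persisted
    pending_events: list[dict] = field(default_factory=list)

    def _apply(self, event: dict) -> None:
        # State changes only ever happen by applying an event.
        if event["type"] == "OrderCreated":
            self.order_id = event["order_id"]
        elif event["type"] == "OrderShipped":
            self.shipped = True

    @classmethod
    def from_events(cls, events: list[dict]) -> "OrderAggregate":
        """Rebuild the current state by replaying the stored events."""
        order = cls()
        for event in events:
            order._apply(event)
            order.version += 1
        return order

    def ship(self) -> None:
        """Validate the command, then record a new, not yet saved event."""
        if self.shipped:
            raise ValueError("order already shipped")
        event = {"type": "OrderShipped", "order_id": self.order_id}
        self._apply(event)
        self.pending_events.append(event)


history = [{"type": "OrderCreated", "order_id": "order-1"}]
order = OrderAggregate.from_events(history)
order.ship()
```

After the call to ship, order.pending_events holds the one event that still has to be written, and order.version is the version the write should expect to find.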

Notes on SQLAlchemy

This code assumes Postgres via SQLAlchemy.

The code also has nerfed support for other databases through SQLAlchemy, but this exists only to be useful for testing. In-memory databases have some errors when it comes to listening to subscriptions, so only file-based SQLite databases are supported for now.

Usage

Create an engine with SQLAlchemy, then create a meowmx client by passing it in:

import meowmx
import sqlalchemy.orm

test_db_url = "postgresql+psycopg://user:pass@localhost:5445/myapp?sslmode=disable"
engine = sqlalchemy.create_engine(test_db_url)
# note: the session_maker arg is optional
session_maker = sqlalchemy.orm.sessionmaker(autocommit=False, autoflush=False, bind=engine)
meow = meowmx.Client(engine=engine, session_maker=session_maker)

Initial table setup

To initially create the tables used by meowmx, call setup_tables() on the client:

meow.setup_tables()

Note: by default meowmx creates a UUID column for the aggregate IDs, but this can be changed by passing an argument:

meow.setup_tables(aggregate_id_column_type="CHAR(20)")

This argument has no effect if a database other than Postgres is used; in that case the column type is CHAR(64). The argument also only takes effect the first time setup_tables is called.

For a production-ready app you probably already have a method of standing up your tables. You can see what tables meowmx builds by looking at migrations.py, which was mostly lifted from postgresql-event-sourcing.

Writing Events

Call meow.save_events to persist / publish events:

from datetime import datetime

order_id = "<order-id-here>"
customer_id = "<customer-id-here>"
order_created = meowmx.NewEvent(
    event_type="OrderCreated",
    json={
        "customer_id": customer_id,
        "order_id": order_id,
        "time": datetime.now().isoformat(),
    },
)
# version=0: this is the first write for this aggregate
meow.save_events("order", order_id, [order_created], version=0)

# save a second event; the aggregate now holds one event, so version=1

order_shipped = meowmx.NewEvent(
    event_type="OrderShipped",
    json={
        "order_id": order_id,
        "time": datetime.now().isoformat(),
    },
)
meow.save_events("order", order_id, [order_shipped], version=1)

Subscribing to Events

Let's say you want to create a read model for an aggregate that is updated every time an event is written for the aggregate.

One way to do that is by subscribing to all changes in the events from another process:

import logging

import meowmx
import sqlalchemy
from sqlalchemy import orm

log = logging.getLogger(__name__)


class Base(orm.DeclarativeBase):
    pass


class Order(Base):
    __tablename__ = "orders"

    id = orm.mapped_column(sqlalchemy.CHAR(20), primary_key=True)
    version = orm.mapped_column(sqlalchemy.Integer, nullable=False)
    customer_id = orm.mapped_column(sqlalchemy.String, nullable=False)
    # build_read_model sets this flag, so the table needs a column for it
    shipped = orm.mapped_column(sqlalchemy.Boolean, nullable=False, default=False)


def build_read_model(events: list[meowmx.RecordedEvent]) -> Order:
    order = Order()
    for event in events:
        if event.event_type == "OrderCreated":
            order.id = event.json["order_id"]
            order.customer_id = event.json["customer_id"]
            order.shipped = False
        elif event.event_type == "OrderShipped":
            order.shipped = True
        else:
            log.warning("unknown event type: %s", event.event_type)
            # don't raise an exception as the events are historical
        order.version = event.version

    return order


# runs until the process is killed
def start_subscription(meow: meowmx.Client) -> None:
    def handler(session: meowmx.Session, event: meowmx.RecordedEvent) -> None:
        # reload the full event stream and rebuild the row from scratch
        order_events = meow.load(event.aggregate_type, event.aggregate_id)
        order = build_read_model(order_events)
        session.merge(order)
        session.commit()

    meow.sub(
        subscription_name="order-rm-builder",
        aggregate_name="order",
        batch_size=10,
        max_sleep_time=30,
        handler=handler,
    )
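Conceptually, a subscription like this is a named worker that remembers how far it has read and hands new events to the handler in batches. The sketch below shows that idea with a plain list instead of a database. InMemorySubscription, poll and checkpoint are hypothetical names, and this is an assumption about the general technique, not a description of how meowmx implements meow.sub.

```python
from typing import Callable


class InMemorySubscription:
    """Hands events to a handler in batches and remembers its position."""

    def __init__(self, events: list[dict], batch_size: int) -> None:
        self.events = events        # shared, append-only list of events
        self.batch_size = batch_size
        self.checkpoint = 0         # index of the next unprocessed event

    def poll(self, handler: Callable[[dict], None]) -> int:
        """Process up to batch_size new events; return how many were handled."""
        batch = self.events[self.checkpoint : self.checkpoint + self.batch_size]
        for event in batch:
            handler(event)
            # Advance only after the handler succeeds, so a failure is retried.
            self.checkpoint += 1
        return len(batch)


event_log = [{"id": n, "type": "OrderCreated"} for n in range(5)]
seen: list[int] = []
sub = InMemorySubscription(event_log, batch_size=2)

# Keep polling until a poll finds nothing new.
while sub.poll(lambda event: seen.append(event["id"])):
    pass  # a long-running worker would sleep after an empty poll instead of exiting
```

Because the checkpoint only moves forward after the handler returns, a handler that raises leaves the event in place for the next poll, which is why handlers for this kind of loop are usually written to be safe to run more than once.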

See the files in examples.

Setup:

just start-docker-db
just usql  # open repl
just test-psql
just examples read-events # view all events written by the tests
just examples # see examples
