Managed Event Orchestration With Multiple eXecutors

meowmx - Managed Event Orchestration With Multiple eXecutors

This takes many of the ideas from Eugene Khyst's postgresql-event-sourcing project and implements them worse in Python.

The end result, meowmx, lets you use PostgreSQL to:

  • write events containing plain JSON data, which can later be looked up by category or aggregate IDs
  • subscribe to events, allowing you to create workers that iterate through them.

Also, check out this cat!

        ^__^         
    \   o . o    <  M E O W >
     |    ---
     ..    ..

Why would anyone want to do this?

Think of how many systems have different components that communicate by passing events. Usually some component gets notified of a change, or is otherwise spurred to action, by an event payload. It then loads a bunch of persisted data, does some stuff, and moves on.

The thing is, quite often the persisted data should be the event itself. For instance, let's say you have multiple events concerning "orders" in a system, e.g. a customer initiates an order, an order is confirmed, an order is shipped, etc. The traditional way to handle this is to model the current state of the order as a row in a SQL database. Then you have all these event handlers (maybe they're Lambda functions, maybe they listen to RabbitMQ, etc.) notifying components in a modern system, which all then have to load the current order row, figure out how it should be modified, consider whether another component has also updated the row, etc.

To make a gross simplification, event sourcing just says: hey, maybe all those events are the persisted state. Just load them all to figure out what the current situation is, then append an update to the same set of events. As a cherry on top, use optimistic concurrency so the update fails if some other system updated our set of events between the time we read them and the time we went to write them. In that case we cancel our write, reload all the events, and try the logic again.
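The load / decide / write / retry loop described above can be sketched without any database at all. This is only an illustration of the idea, not meowmx's implementation: InMemoryEventStore, VersionConflict and ship_order are hypothetical names, and a plain dict stands in for the events table.

```python
from dataclasses import dataclass, field


class VersionConflict(Exception):
    """Raised when another writer appended to the stream first."""


@dataclass
class InMemoryEventStore:
    # Hypothetical stand-in for an events table: aggregate ID -> ordered events.
    streams: dict[str, list[dict]] = field(default_factory=dict)

    def load(self, aggregate_id: str) -> list[dict]:
        # Return a copy so callers cannot mutate the stored history.
        return list(self.streams.get(aggregate_id, []))

    def append(self, aggregate_id: str, events: list[dict], expected_version: int) -> None:
        stream = self.streams.setdefault(aggregate_id, [])
        # Optimistic concurrency check: the write only succeeds if nobody
        # else has appended since the caller read the stream.
        if len(stream) != expected_version:
            raise VersionConflict(f"expected {expected_version}, found {len(stream)}")
        stream.extend(events)


def ship_order(store: InMemoryEventStore, order_id: str, max_attempts: int = 3) -> bool:
    """Load, decide, write; on a conflict reload everything and try again."""
    for _ in range(max_attempts):
        history = store.load(order_id)
        if any(e["type"] == "OrderShipped" for e in history):
            return False  # already shipped, nothing to write
        try:
            store.append(order_id, [{"type": "OrderShipped"}], expected_version=len(history))
            return True
        except VersionConflict:
            continue  # someone else wrote first; reload and re-decide
    raise RuntimeError("gave up after repeated conflicts")


store = InMemoryEventStore()
store.append("order-1", [{"type": "OrderCreated"}], expected_version=0)
shipped = ship_order(store, "order-1")
shipped_again = ship_order(store, "order-1")
```

The point of the sketch is that the writer never locks anything: it states which version it based its decision on, and the store rejects the write if that version is stale.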

There's also the notion of aggregates, which are basically objects that can be constructed by reading a set of events. In my experience that kind of "helper" code is extremely easy to write but obscures the basic utility of event sourcing libraries like this one. This project offers a helper to save and load aggregates using a simple protocol to get the pending set of events from any object. For details on this see this test.
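To give a feel for how little code an aggregate needs, here is a rough sketch. It does not show the protocol meowmx actually uses (see the test mentioned above for that); OrderAggregate, from_events and pending_events are hypothetical names, and events are plain dicts.

```python
from dataclasses import dataclass, field


@dataclass
class OrderAggregate:
    order_id: str = ""
    shipped: bool = False
    version: int = 0
    # Events raised since loading that still need to be saved.
    pending_events: list[dict] = field(default_factory=list)

    @classmethod
    def from_events(cls, events: list[dict]) -> "OrderAggregate":
        # Rebuild the current state by replaying the history in order.
        order = cls()
        for event in events:
            order._apply(event)
            order.version += 1
        return order

    def _apply(self, event: dict) -> None:
        if event["type"] == "OrderCreated":
            self.order_id = event["order_id"]
        elif event["type"] == "OrderShipped":
            self.shipped = True

    def ship(self) -> None:
        # Validate against current state, then record the change as an event.
        if self.shipped:
            raise ValueError("order already shipped")
        event = {"type": "OrderShipped", "order_id": self.order_id}
        self._apply(event)
        self.pending_events.append(event)


history = [{"type": "OrderCreated", "order_id": "order-1"}]
order = OrderAggregate.from_events(history)
order.ship()
```

After ship() the object holds one pending event, which a save helper would write using order.version as the version it was loaded at.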

Notes on SQLAlchemy

This code assumes Postgres via SQLAlchemy.

The code also has nerfed support for other databases via SQLAlchemy, but this is only meant to be useful for testing. In-memory databases have some errors when it comes to listening to subscribers, so only file-based SQLite databases are supported for now.
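For tests, that means pointing SQLAlchemy at a SQLite file rather than at an in-memory database. A small sketch of building such a URL follows; sqlite_file_url is a hypothetical helper, and the file name is made up.

```python
import tempfile
from pathlib import Path


def sqlite_file_url(db_path: Path) -> str:
    """Build a SQLAlchemy URL for a file-based SQLite database."""
    # SQLAlchemy expects "sqlite:///" followed by the path to the file.
    # An in-memory database would be "sqlite://" or "sqlite:///:memory:",
    # which is the case to avoid here.
    return f"sqlite:///{db_path.resolve()}"


test_db_url = sqlite_file_url(Path(tempfile.gettempdir()) / "meowmx-test.db")
# The URL can then be handed to sqlalchemy.create_engine(test_db_url).
```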

Usage

Create an engine with SQLAlchemy, then create a meowmx client by passing it in:

import meowmx
import sqlalchemy.orm

test_db_url = "postgresql+psycopg://user:pass@localhost:5445/myapp?sslmode=disable"
engine = sqlalchemy.create_engine(test_db_url)
# note: the session_maker arg is optional
session_maker = sqlalchemy.orm.sessionmaker(autocommit=False, autoflush=False, bind=engine)
meow = meowmx.Client(engine=engine, session_maker=session_maker)

Initial table setup

To initially create the tables used by meowmx, call setup_tables() on the client:

meow.setup_tables()

Note: by default meowmx creates a UUID column for the aggregate IDs, but this can be changed if an argument is passed in as follows:

meow.setup_tables(aggregate_id_column_type="CHAR(20)")

This argument has no effect if a database other than Postgres is used; in that case the column type is CHAR(64). It also only takes effect the first time setup_tables is called.

For a production-ready app you probably already have a method of standing up your tables. You can see what tables meowmx builds by looking at migrations.py, which was mostly lifted from postgresql-event-sourcing.

Writing Events

Call meow.save_events to persist / publish events:

from datetime import datetime

customer_id = "<customer-id-here>"
order_id = "<order-id-here>"
order_created = meowmx.NewEvent(
    event_type="OrderCreated",
    json={
        "customer_id": customer_id,
        "order_id": order_id,
        "time": datetime.now().isoformat(),
    },
)
meow.save_events("order", order_id, [order_created], version=0)

# save a second event

order_shipped = meowmx.NewEvent(
    event_type="OrderShipped",
    json={
        "order_id": order_id,
        "time": datetime.now().isoformat(),
    },
)
meow.save_events("order", order_id, [order_shipped], version=1)

Subscribing to Events

Let's say you want to create a read model for an aggregate that is updated every time an event is written for the aggregate.

One way to do that is by subscribing to all changes in the events from another process:

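import logging

import sqlalchemy
from sqlalchemy import orm

log = logging.getLogger(__name__)

class Base(orm.DeclarativeBase):
    pass

class Order(Base):
    __tablename__ = "orders"

    id = orm.mapped_column(sqlalchemy.CHAR(20), primary_key=True)
    version = orm.mapped_column(sqlalchemy.Integer, nullable=False)
    customer_id = orm.mapped_column(sqlalchemy.String, nullable=False)
    # build_read_model below sets this flag, so it needs a column too
    shipped = orm.mapped_column(sqlalchemy.Boolean, nullable=False, default=False)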

def build_read_model(events: list[meowmx.RecordedEvent]) -> Order:
    order = Order()
    for event in events:
        if event.event_type == "OrderCreated":
            order.id = event.json["order_id"]
            order.customer_id = event.json["customer_id"]
            order.shipped = False
        elif event.event_type == "OrderShipped":
            order.shipped = True
        else:
            log.warning("unknown event type")
            # don't raise an exception as the events are historical
        order.version = event.version
    
    return order


# runs until the process is killed
def start_subscription(meow: meowmx.Client) -> None:
    def handler(session: meowmx.Session, event: meowmx.RecordedEvent) -> None:
        order_events = meow.load(event.aggregate_type, event.aggregate_id)
        order = build_read_model(order_events)
        session.merge(order)
        session.commit()

    meow.sub(
        subscription_name="order-rm-builder", 
        aggregate_name="order", 
        batch_size=10,
        max_sleep_time=30,
        handler=handler,
    )
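If you're wondering what a named subscription with a batch size boils down to, the general pattern is a stored checkpoint per subscription that advances as events are handled. The sketch below shows that pattern only; it is not how meowmx implements sub. poll_once, event_log and checkpoints are hypothetical names, with a list standing in for the events table and a dict for the saved checkpoints.

```python
from typing import Callable


def poll_once(
    event_log: list[dict],
    checkpoints: dict[str, int],
    subscription_name: str,
    handler: Callable[[dict], None],
    batch_size: int = 10,
) -> int:
    """Handle up to batch_size unseen events and return how many were handled."""
    start = checkpoints.get(subscription_name, 0)
    batch = event_log[start : start + batch_size]
    for offset, event in enumerate(batch, start=1):
        handler(event)
        # Advance the checkpoint only after the handler succeeds, so a
        # crash causes the event to be retried rather than skipped.
        checkpoints[subscription_name] = start + offset
    return len(batch)


event_log = [{"id": i, "type": "OrderCreated"} for i in range(1, 26)]
checkpoints: dict[str, int] = {}
seen: list[dict] = []

# Keep polling until the subscription has caught up with the log.
batch_sizes = []
while (handled := poll_once(event_log, checkpoints, "order-rm-builder", seen.append)) > 0:
    batch_sizes.append(handled)
```

A real worker would sleep between polls instead of stopping when it catches up, and each subscription name would get its own independent checkpoint.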

See the files in examples.

Setup:

just start-docker-db
just usql  # open repl
just test-psql
just examples read-events # view all events written by the tests
just examples # see examples
