Production-ready event-driven architecture toolkit with Transactional Inbox/Outbox patterns

EventFlow

Production-ready event-driven infrastructure for Python microservices.
Reliable consumption with the Transactional Inbox pattern on top of Redis Streams + SQLAlchemy.


Requires Python 3.10+. License: MIT.

eventflow is a small, battle-tested toolkit for building reliable event-driven services. It focuses on the consumer side: it ingests events from Redis Streams, stores them in a durable inbox table, and processes them with retries and dead-lettering.

Note: the producer-side Transactional Outbox is intentionally not implemented yet (OutboxPublisher raises NotImplementedError).

Quick Start

Install:

pip install eventflow asyncpg

asyncpg is the PostgreSQL async driver used in the examples; you can use a different SQLAlchemy async driver if needed.

Create the inbox table (standalone usage):

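import asyncio

from sqlalchemy.ext.asyncio import create_async_engine
from eventflow.patterns.inbox.models import Base

# Replace the credentials and database name with your own.
engine = create_async_engine("postgresql+asyncpg://postgres:1234@localhost:5432/mydb")

async def main():
    # create_all is synchronous, so run it through the async connection.
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

asyncio.run(main())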

Run a consumer:

import asyncio

from eventflow import InboxConsumer, RedisStreamsTransport
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

class Handlers:
    # Receives a database session and the inbox row for one event.
    async def handle_event(self, session, inbox):
        print(inbox.event_type, inbox.payload)

async def main():
    engine = create_async_engine("postgresql+asyncpg://postgres:1234@localhost:5432/mydb")
    Session = async_sessionmaker(engine, expire_on_commit=False)

    redis = RedisStreamsTransport(host="localhost", port=6379).build_client()

    consumer = InboxConsumer(
        redis_client=redis,
        session_factory=Session,
        stream_name="my-events",
        consumer_group="my-service",
        consumer_name_prefix="worker",
        event_handlers=Handlers(),
    )

    # "await" is only valid inside a coroutine, so start the consumer here.
    await consumer.start()

asyncio.run(main())
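
Real services usually handle more than one event type. The following is a minimal sketch of one way to route on inbox.event_type inside handle_event. The routing table, the on_order_created method, and the SimpleNamespace stand-in for the inbox row are hypothetical and not part of eventflow. It also assumes that raising an exception is how a handler signals failure to the consumer.

```python
import asyncio
from types import SimpleNamespace


class Handlers:
    """Routes inbox rows to per-type coroutines (hypothetical layout)."""

    def __init__(self):
        self.seen = []
        # Map event_type -> coroutine method; the names are illustrative.
        self._routes = {"OrderCreated": self.on_order_created}

    async def handle_event(self, session, inbox):
        handler = self._routes.get(inbox.event_type)
        if handler is None:
            # Assumption: an exception marks the event as failed.
            raise LookupError(f"no handler for {inbox.event_type!r}")
        await handler(session, inbox)

    async def on_order_created(self, session, inbox):
        self.seen.append(inbox.payload["order_id"])


# Stand-in for an inbox row; the real object is supplied by the consumer.
fake_inbox = SimpleNamespace(event_type="OrderCreated", payload={"order_id": "o-123"})
handlers = Handlers()
asyncio.run(handlers.handle_event(session=None, inbox=fake_inbox))
```

Keeping the routing table in one place makes an unhandled event type fail loudly rather than being silently ignored.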

Features

  • Transactional Inbox (effectively-once processing): Redis may redeliver an event, but idempotent persistence keyed by event_id ensures each event is stored only once.
  • Redis Streams transport: consumer groups + acknowledgement handling.
  • Safe concurrency: workers cooperate via SELECT ... FOR UPDATE SKIP LOCKED.
  • Retries + dead-lettering: exponential backoff capped at 15 minutes.
  • Type-safe event model: BaseEvent + EventMetadata, full type hints and mypy support.
  • SQLite-friendly tests: JSON payloads fall back cleanly for unit tests (JSONBCompat).
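
To illustrate what "idempotent persistence keyed by event_id" means, here is a small sketch using the standard-library sqlite3 module. It is not eventflow's implementation: the reduced table and the insert_pending helper are stand-ins written for this example. On PostgreSQL the same effect is typically achieved with INSERT ... ON CONFLICT (event_id) DO NOTHING.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Reduced, illustrative version of the inbox table.
conn.execute(
    "CREATE TABLE event_inbox ("
    " event_id TEXT NOT NULL UNIQUE,"
    " event_type TEXT NOT NULL,"
    " status TEXT NOT NULL DEFAULT 'pending')"
)


def insert_pending(conn, event_id, event_type):
    """Insert a row once; return True if it was new, False on a duplicate."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO event_inbox (event_id, event_type) VALUES (?, ?)",
        (event_id, event_type),
    )
    conn.commit()
    # rowcount is 0 when the UNIQUE constraint caused the insert to be skipped.
    return cur.rowcount == 1


first = insert_pending(conn, "e-1", "OrderCreated")
second = insert_pending(conn, "e-1", "OrderCreated")  # redelivery of the same event
rows = conn.execute("SELECT COUNT(*) FROM event_inbox").fetchone()[0]
```

The duplicate delivery leaves the table unchanged, so the handler later sees the event a single time.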

Architecture Flow

flowchart LR
    Producer[Producer] --> Stream[(Redis Stream)]

    subgraph Consumers["Consumers (same consumer group)"]
        C1[InboxConsumer]
        C2[InboxConsumer]
    end

    Stream -->|XREADGROUP| C1
    Stream -->|XREADGROUP| C2

    C1 -->|insert_pending<br/>(idempotent)| Inbox[(event_inbox)]
    C2 -->|insert_pending<br/>(idempotent)| Inbox

    Inbox -->|acquire_due_events<br/>(SKIP LOCKED)| C1
    Inbox -->|acquire_due_events<br/>(SKIP LOCKED)| C2

    C1 --> Handler[Your handler<br/>handle_event(session, inbox)]
    C2 --> Handler

    Handler -->|success| Inbox
    Handler -->|failure<br/>schedule retry / dead-letter| Inbox
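
The success and failure edges in the flow above can be sketched as status transitions on an inbox row. This is a guess at the shape of the logic, not eventflow's code: the InboxRow dataclass, the mark_success and mark_failure helpers, the use of the 'failed' status while a retry is pending, and the retry_count >= max_retries threshold are all assumptions. Only the status names and column names come from the reference schema.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class InboxRow:
    """Illustrative subset of the event_inbox columns."""
    status: str = "pending"
    retry_count: int = 0
    max_retries: int = 3
    next_retry_at: Optional[datetime] = None
    processed_at: Optional[datetime] = None
    error_message: Optional[str] = None


def mark_success(row: InboxRow, now: datetime) -> None:
    row.status = "processed"
    row.processed_at = now


def mark_failure(row: InboxRow, now: datetime, error: str, delay: timedelta) -> None:
    row.retry_count += 1
    row.error_message = error
    if row.retry_count >= row.max_retries:
        # Assumed threshold: give up once the retry budget is spent.
        row.status = "dead_letter"
        row.next_retry_at = None
    else:
        # Assumed: 'failed' means "waiting for the next retry".
        row.status = "failed"
        row.next_retry_at = now + delay


now = datetime(2025, 1, 1, tzinfo=timezone.utc)
row = InboxRow()
mark_failure(row, now, "boom", timedelta(seconds=2))
status_after_first_failure = row.status
mark_failure(row, now, "boom", timedelta(seconds=4))
mark_failure(row, now, "boom", timedelta(seconds=8))
status_after_third_failure = row.status
```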

Event Format

The consumer supports two common Redis Stream payload styles:

  1. A single data field containing JSON (recommended):

     redis-cli XADD my-events '*' data '{"event_id":"e-1","event_type":"OrderCreated","aggregate_id":"7c8f0a6a-7b7c-4c74-9cfb-2e2e2b9b1d33","occurred_on":"2025-01-01T00:00:00Z","payload":{"order_id":"o-123"}}'

  2. A “flattened” entry with top-level fields (event_id, event_type, ...).
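
As a rough illustration of how a consumer might accept both styles, the sketch below normalizes a stream entry into a single event dict. The normalize_entry function is hypothetical and not eventflow's parser. It assumes field names and values arrive as str (for example, a Redis client created with decode_responses=True) and that a flattened entry carries its payload as a JSON string.

```python
import json


def normalize_entry(fields):
    """Return an event dict from either payload style (illustrative only)."""
    if "data" in fields:
        # Style 1: the whole event is JSON inside the "data" field.
        event = json.loads(fields["data"])
    else:
        # Style 2: top-level fields; payload is assumed to be a JSON string.
        event = dict(fields)
        if isinstance(event.get("payload"), str):
            event["payload"] = json.loads(event["payload"])
    missing = {"event_id", "event_type"} - event.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    return event


wrapped = normalize_entry(
    {"data": '{"event_id": "e-1", "event_type": "OrderCreated", "payload": {"order_id": "o-123"}}'}
)
flattened = normalize_entry(
    {"event_id": "e-1", "event_type": "OrderCreated", "payload": '{"order_id": "o-123"}'}
)
```

Both calls produce the same dict, which is why downstream code does not need to care which style the producer chose.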

Schema Options

If you already have your own SQLAlchemy Base, use the mixin:

from eventflow.patterns.inbox.models import EventInboxMixin

class EventInbox(EventInboxMixin, YourBase):
    __tablename__ = "event_inbox"

PostgreSQL SQL schema (reference)

-- For gen_random_uuid()
CREATE EXTENSION IF NOT EXISTS pgcrypto;

CREATE TABLE event_inbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    event_id VARCHAR(255) NOT NULL UNIQUE,
    stream_id VARCHAR(255) NOT NULL,
    event_type VARCHAR(128) NOT NULL,
    aggregate_id UUID NOT NULL,
    correlation_id VARCHAR(255),
    occurred_on TIMESTAMP WITH TIME ZONE NOT NULL,
    payload JSONB NOT NULL,
    status VARCHAR(50) NOT NULL DEFAULT 'pending',
    retry_count INTEGER NOT NULL DEFAULT 0,
    max_retries INTEGER NOT NULL DEFAULT 3,
    received_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    processed_at TIMESTAMP WITH TIME ZONE,
    next_retry_at TIMESTAMP WITH TIME ZONE,
    error_message TEXT,
    last_error_at TIMESTAMP WITH TIME ZONE,
    CONSTRAINT chk_event_inbox_status CHECK (
        status IN ('pending', 'processing', 'processed', 'failed', 'dead_letter')
    )
);

-- Optional: the UNIQUE constraint on event_id above already creates a unique index.
CREATE UNIQUE INDEX uq_event_inbox_event_id ON event_inbox(event_id);
CREATE INDEX ix_event_inbox_status_next_retry ON event_inbox(status, next_retry_at);
CREATE INDEX ix_event_inbox_aggregate_received ON event_inbox(aggregate_id, received_at);

Configuration & Tuning

  • Batch size: InboxConsumer.BATCH_SIZE (default: 10)
  • Read block time: InboxConsumer.BLOCK_MS (default: 1000)
  • Retry policy: stored per row (max_retries, retry_count, next_retry_at); backoff is exponential and capped at 15 minutes.
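
For intuition about "exponential and capped at 15 minutes", here is a minimal sketch of such a delay function. The 1-second base delay, the doubling factor, and the backoff_seconds name are assumptions made for the example. The source states only that the backoff is exponential and capped at 15 minutes.

```python
BASE_DELAY_SECONDS = 1        # assumption: the real base delay is not documented here
MAX_DELAY_SECONDS = 15 * 60   # the 15-minute cap stated above


def backoff_seconds(retry_count: int) -> int:
    """Delay before the next attempt: base * 2**retry_count, capped."""
    return min(BASE_DELAY_SECONDS * 2 ** retry_count, MAX_DELAY_SECONDS)


delays = [backoff_seconds(n) for n in range(12)]
```

With these assumed constants the delay doubles on each retry until it reaches the cap, after which every further retry waits the full 15 minutes.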

Development

poetry install
poetry run pytest
poetry run mypy eventflow
poetry run black --check eventflow tests

License

MIT License. See LICENSE.
