Skip to main content

Universal event infrastructure with transactional outbox and Kafka integration

Project description

Eventing

Documentation Status Tests Python License Code Style Validate Dependencies

Table of contents

Package-first universal event infrastructure for microservices.

๐Ÿ“š Full Documentation - Comprehensive guides and API reference

Installation

pip install messagekit
# or
poetry add messagekit

Import package name: messagekit (distribution is messagekit)

from messagekit.core import BaseEvent
from messagekit.infrastructure import SqlAlchemyOutboxRepository

Requirements:

  • Python 3.12+
  • PostgreSQL (outbox persistence)
  • Kafka Connect with Debezium CDC (publishing infrastructure)

Support scale: โŒ none, โœ… basic, โœ…โœ… strong, โœ…โœ…โœ… first-class

Comparison with alternatives

messagekit prioritizes durable messaging (transactional outbox + CDC) and Kafka/RabbitMQ integration over in-process event simplicity:

Capability messagekit pyventus fastapi-events Notes
Transactional outbox โœ…โœ…โœ… โŒ โŒ Durable local DB plus outbox boundary is a core feature here
Kafka data plane โœ…โœ…โœ… โŒ โŒ This package is built for Kafka-backed microservice messaging
DLQ handling โœ…โœ…โœ… โŒ โŒ Leverages native RabbitMQ DLX and Kafka Connect DLQ SMT with database bookkeeping
Health checks for eventing runtime โœ…โœ…โœ… โŒ โŒ Outbox health checks plus FastStream ASGI broker health endpoint
Typed cross-service event contracts โœ…โœ… โœ… โœ…โœ… messagekit and fastapi-events are stronger on explicit payload modeling
Decorator subscriber registration โœ…โœ… โœ…โœ…โœ… โœ…โœ… EventBus.subscriber(...) exists now; pyventus is still the most polished here
In-process dispatch backend abstraction โœ…โœ… โœ…โœ…โœ… โœ… DispatchBackend exists here; pyventus offers a broader processor model
Lifecycle hooks / callbacks โœ…โœ… โœ…โœ…โœ… โœ… DispatchHooks covers dispatch, success, failure, disabled, and debug
Debug / disable controls โœ…โœ… โœ…โœ… โœ…โœ…โœ… DispatchSettings(enabled, debug) is implemented; fastapi-events is strongest for app-level toggling
Observability / telemetry polish โœ…โœ… โœ… โœ…โœ…โœ… FastStream native middlewares (KafkaTelemetryMiddleware, KafkaPrometheusMiddleware) integrated
Resilience middleware โœ…โœ… โŒ โŒ CircuitBreakerMiddleware (prevents cascading failures) and RateLimiterMiddleware (throttles consumption rate)
CDC-based outbox publishing โœ…โœ…โœ… โŒ โŒ Kafka Connect with Debezium CDC handles outbox-to-Kafka publishing
Consumer dedup helper โœ…โœ…โœ… โŒ โŒ IdempotentConsumerBase now uses a durable processed-message store instead of process memory
Durable cross-service idempotency โœ…โœ…โœ… โŒ โŒ IProcessedMessageStore plus SqlAlchemyProcessedMessageStore provide transactional duplicate protection
Consumer batch handling โŒ โŒ โœ…โœ…โœ… fastapi-events supports handle_many(...); this package stays one-message-per-consume today
FastAPI-local event flow โŒ โœ… โœ…โœ…โœ… This package intentionally avoids request-lifecycle middleware eventing

When to use this package

Use messagekit if you need:

  • Guaranteed event delivery via transactional outbox pattern
  • Kafka-based microservice messaging with CDC publishing
  • Dead letter queue handling with database bookkeeping
  • Idempotent consumer patterns with durable deduplication
  • Native broker integration (FastStream, Debezium CDC, RabbitMQ DLX)

Consider alternatives if:

  • Simple in-process events only โ†’ pyventus
  • FastAPI request-scoped events โ†’ fastapi-events
  • Non-Kafka message brokers without CDC support
  • No need for durable outbox persistence

Scope

Included:

  • Transactional outbox primitives (write-side only; CDC handles publishing)
  • Event contracts and registry
  • Kafka/RabbitMQ consumer base classes with idempotency
  • Native broker integration (Kafka Connect CDC, RabbitMQ DLX, FastStream middlewares)
  • In-process emitter/subscriber facade and hooks
  • DLQ bookkeeping consumer for database flag synchronization

NOT included (delegated to external systems):

  • Event publishing (handled by Kafka Connect with Debezium CDC)
  • Message broker infrastructure setup (use official Kafka/RabbitMQ documentation)
  • Schema registry management (use Confluent Schema Registry or alternatives)
  • Request-scoped FastAPI event middleware (intentionally avoided)
  • Consumer batch handling (use fastapi-events if needed)

Documentation

๐Ÿ“– Integration Guide - Step-by-step integration instructions

๐Ÿ” API Reference - Complete API documentation

๐Ÿ“‹ Event Catalog - Available event types and contracts

Key topics

Architecture note: This package handles the write side of the outbox pattern (persisting events transactionally with business data). Publishing is delegated to Kafka Connect with Debezium CDC, which captures outbox table changes and publishes to Kafka. The bridge component (part of standard architecture) forwards events from Kafka to RabbitMQ for services preferring AMQP. Dead letter handling leverages native broker mechanisms (RabbitMQ DLX, Kafka Connect DLQ SMT) with a minimal bookkeeping consumer to maintain database failed-event flags.

Architecture

Quick Overview

Cross-service communication pattern: Each microservice has its own PostgreSQL database (database-per-service). Services communicate via Kafka (shared event backbone) and RabbitMQ (dual-broker pattern via bridge).

flowchart LR
    subgraph Service_A["Service A (Producer)"]
        AppA[FastAPI Route]
        DB1[(PostgreSQL A<br/>outbox_events)]
    end

    subgraph Infrastructure["Shared Infrastructure"]
        CDC[Kafka Connect CDC]
        Kafka([Apache Kafka<br/>Topic: events])
        Bridge[Eventing Bridge<br/>Kafka โ†’ RabbitMQ]
        RMQ([RabbitMQ<br/>Exchange: events])
    end

    subgraph Service_B["Service B (Kafka Consumer)"]
        Handler1[Event Handler]
        DB2[(PostgreSQL B<br/>processed_messages)]
    end

    subgraph Service_C["Service C (RabbitMQ Consumer)"]
        Handler2[Event Handler]
        DB3[(PostgreSQL C<br/>processed_messages)]
    end

    DLQ([DLQ: Native Broker])

    %% Primary Path (Kafka)
    AppA -->|1. Atomic Write| DB1
    DB1 -.->|2. CDC Monitor| CDC
    CDC -->|3. Publish| Kafka
    Kafka -->|4. Subscribe| Handler1
    Handler1 -->|5. Dedup Check| DB2

    %% Bridge Path (RabbitMQ)
    Kafka -.->|6. Bridge Forward| Bridge
    Bridge -->|7. Publish| RMQ
    RMQ -.->|8. Subscribe| Handler2
    Handler2 -->|9. Dedup Check| DB3

    %% Error Flow
    Handler1 -.->|Exception| DLQ
    Handler2 -.->|Exception| DLQ

    %% Styling
    classDef app fill:#4CAF50,stroke:#2E7D32,color:#fff
    classDef db fill:#2196F3,stroke:#1565C0,color:#fff
    classDef broker fill:#FF9800,stroke:#E65100,color:#fff
    classDef infra fill:#9C27B0,stroke:#6A1B9A,color:#fff
    classDef error fill:#F44336,stroke:#C62828,color:#fff

    class AppA,Handler1,Handler2 app
    class DB1,DB2,DB3 db
    class Kafka,RMQ broker
    class CDC,Bridge infra
    class DLQ error

Key points:

  • Each service has separate PostgreSQL database (DB1, DB2, DB3)
  • Kafka is the shared event bus connecting all services
  • Kafka Connect CDC watches Service A's outbox and publishes to Kafka
  • Service B consumes directly from Kafka
  • Bridge forwards Kafka โ†’ RabbitMQ for services preferring AMQP
  • Service C consumes from RabbitMQ
  • Both Kafka and RabbitMQ are part of the standard architecture

Guarantees:

  • Write Phase: โœ… Atomic (business data + event in same transaction, same database)
  • Publish Phase: โœ… At-least-once (CDC retries on failure)
  • Consume Phase: โœ… Exactly-once (idempotency via processed message store in consumer's database)

๐Ÿ“– Cross-Service Communication Guide - Detailed explanation with production deployment patterns

Database isolation and event flow

Critical architecture principle: Each service maintains its own PostgreSQL database (database-per-service pattern). Services do NOT directly access each other's databases. Kafka acts as the shared event bus connecting isolated services.

Complete event flow:

  1. Service A (Producer) writes event to its own outbox_events table (in postgres-a)
  2. Kafka Connect CDC monitors Service A's outbox table via PostgreSQL WAL
  3. CDC publishes event to Kafka topic (shared infrastructure)
  4. Service B (Consumer) subscribes to Kafka topic, processes event, stores idempotency check in its own processed_messages table (in postgres-b)
  5. Bridge Service forwards events from Kafka โ†’ RabbitMQ (part of standard architecture)
  6. Service C (Consumer) subscribes to RabbitMQ exchange, processes event, stores idempotency check in its own processed_messages table (in postgres-c)

Result: Services remain isolated (no shared database), communicate via Kafka, and maintain exactly-once processing guarantees via their own idempotency stores.

Full Component Architecture

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                         YOUR SERVICE (FastAPI)                               โ”‚
โ”‚                                                                              โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”      โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”‚
โ”‚  โ”‚   Routes     โ”‚โ”€โ”€โ”ฌโ”€โ”€โ–ถโ”‚      messagekit (SHARED CONTRACTS)       โ”‚    โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ”‚   โ”‚                                               โ”‚    โ”‚
โ”‚                    โ”‚   โ”‚  โ€ข BaseEvent (Pydantic base)                  โ”‚    โ”‚
โ”‚                    โ”‚   โ”‚  โ€ข IOutboxRepository (Protocol)               โ”‚    โ”‚
โ”‚                    โ”‚   โ”‚  โ€ข IProcessedMessageStore (Protocol)          โ”‚    โ”‚
โ”‚                    โ”‚   โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ”‚
โ”‚                    โ”‚                       โ”‚ Uses contracts                 โ”‚
โ”‚                    โ”‚                       โ–ผ                                โ”‚
โ”‚                    โ”‚   โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”‚
โ”‚                    โ”‚   โ”‚         messagekit (implementations)     โ”‚    โ”‚
โ”‚                    โ”‚   โ”‚                                               โ”‚    โ”‚
โ”‚                    โ”‚   โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”‚    โ”‚
โ”‚                    โ”‚   โ”‚  โ”‚  WRITE SIDE (Primary)                  โ”‚  โ”‚    โ”‚
โ”‚                    โ””โ”€โ”€โ”€โ”ผโ”€โ”€โ”‚  โ€ข SqlAlchemyOutboxRepository          โ”‚  โ”‚    โ”‚
โ”‚                        โ”‚  โ”‚    (implements IOutboxRepository)      โ”‚  โ”‚    โ”‚
โ”‚                        โ”‚  โ”‚  โ€ข OutboxEventHandler                  โ”‚  โ”‚    โ”‚
โ”‚                        โ”‚  โ”‚                                        โ”‚  โ”‚    โ”‚
โ”‚                        โ”‚  โ”‚  OPTIONAL (In-process dispatch)        โ”‚  โ”‚    โ”‚
โ”‚                        โ”‚  โ”‚  โ€ข EventBus (NOT wired to outbox)      โ”‚  โ”‚    โ”‚
โ”‚                        โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ”‚    โ”‚
โ”‚                        โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜    โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                                     โ”‚ Writes to
                                     โ–ผ
                          โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
                          โ”‚   PostgreSQL DB     โ”‚
                          โ”‚                     โ”‚
                          โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚
                          โ”‚ โ”‚ Business Tables โ”‚ โ”‚
                          โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚
                          โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚โ—€โ”€โ”
                          โ”‚ โ”‚ outbox_event_   โ”‚ โ”‚  โ”‚ Monitors
                          โ”‚ โ”‚ record          โ”‚ โ”‚  โ”‚ (WAL)
                          โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚  โ”‚
                          โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚  โ”‚
                          โ”‚ โ”‚ processed_      โ”‚ โ”‚  โ”‚
                          โ”‚ โ”‚ messages        โ”‚ โ”‚  โ”‚
                          โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚  โ”‚
                          โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ”‚
                                     โ”‚              โ”‚
                 โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                 โ”‚                   โ”‚
                 โ”‚                   โ–ผ
                 โ”‚    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
                 โ”‚    โ”‚  EXTERNAL INFRASTRUCTURE         โ”‚
                 โ”‚    โ”‚  (Not provided by this package)  โ”‚
                 โ”‚    โ”‚                                  โ”‚
                 โ”‚    โ”‚  Kafka Connect + Debezium CDC    โ”‚
                 โ”‚    โ”‚  - PostgreSQL Connector          โ”‚
                 โ”‚    โ”‚  - Outbox Event Router SMT       โ”‚
                 โ”‚    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                 โ”‚                   โ”‚ Publishes
                 โ”‚                   โ–ผ
                 โ”‚            โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
                 โ”‚            โ”‚    Kafka     โ”‚
                 โ”‚            โ”‚   Topics     โ”‚
                 โ”‚            โ””โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”˜
                 โ”‚               โ”‚        โ”‚
                 โ”‚               โ”‚        โ”‚
            Bookkeeping          โ”‚        โ”‚
            updates              โ”‚        โ”‚
                 โ”‚               โ”‚        โ”‚
                 โ”‚               โ–ผ        โ–ผ
                 โ”‚    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
                 โ”‚    โ”‚         FASTSTREAM BROKER LAYER                      โ”‚
                 โ”‚    โ”‚  (Broker abstraction + middleware)                   โ”‚
                 โ”‚    โ”‚                                                      โ”‚
                 โ”‚    โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚
                 โ”‚    โ”‚  โ”‚  KafkaBroker (from faststream.confluent)       โ”‚ โ”‚
                 โ”‚    โ”‚  โ”‚  โ€ข @broker.subscriber("topic") decorators      โ”‚ โ”‚
                 โ”‚    โ”‚  โ”‚  โ€ข broker.publish() wrapper                    โ”‚ โ”‚
                 โ”‚    โ”‚  โ”‚  โ€ข Pydantic auto-deserialization               โ”‚ โ”‚
                 โ”‚    โ”‚  โ”‚  โ€ข make_ping_asgi health endpoint              โ”‚ โ”‚
                 โ”‚    โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚
                 โ”‚    โ”‚                                                      โ”‚
                 โ”‚    โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚
                 โ”‚    โ”‚  โ”‚  MIDDLEWARE STACK (Wired)                      โ”‚ โ”‚
                 โ”‚    โ”‚  โ”‚  โ€ข CircuitBreakerMiddleware (resilience)       โ”‚ โ”‚
                 โ”‚    โ”‚  โ”‚  โ€ข RateLimiterMiddleware (optional, disabled)  โ”‚ โ”‚
                 โ”‚    โ”‚  โ”‚  โ€ข KafkaPrometheusMiddleware (metrics)         โ”‚ โ”‚
                 โ”‚    โ”‚  โ”‚  โ€ข KafkaTelemetryMiddleware (OTel)             โ”‚ โ”‚
                 โ”‚    โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚
                 โ”‚    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                 โ”‚                           โ”‚
                 โ”‚                           โ”‚ Consumes
                 โ”‚                           โ–ผ
                 โ”‚    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
                 โ”‚    โ”‚  DOWNSTREAM SERVICES                                 โ”‚
                 โ”‚    โ”‚                                                      โ”‚
                 โ”‚    โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”   โ”‚
                 โ”‚    โ”‚  โ”‚  READ SIDE (messagekit)                 โ”‚   โ”‚
                 โ”‚    โ”‚  โ”‚                                              โ”‚   โ”‚
                 โ”‚    โ”‚  โ”‚  @broker.subscriber("user.created")          โ”‚   โ”‚
                 โ”‚    โ”‚  โ”‚  async def handle(event: UserCreated):       โ”‚   โ”‚
                 โ”‚    โ”‚  โ”‚      # FastStream deserializes to Pydantic   โ”‚   โ”‚
                 โ”‚    โ”‚  โ”‚      ...                                     โ”‚   โ”‚
                 โ”‚    โ”‚  โ”‚                                              โ”‚   โ”‚
                 โ”‚    โ”‚  โ”‚  OR use IdempotentConsumerBase for:          โ”‚   โ”‚
                 โ”‚    โ”‚  โ”‚  โ€ข Programmatic idempotency control          โ”‚   โ”‚
                 โ”‚    โ”‚  โ”‚  โ€ข Dynamic/polymorphic event types           โ”‚   โ”‚
                 โ”‚    โ”‚  โ”‚  โ€ข Legacy dict-based handling                โ”‚   โ”‚
                 โ”‚    โ”‚  โ”‚                                              โ”‚   โ”‚
                 โ”‚    โ”‚  โ”‚  Components:                                 โ”‚   โ”‚
                โ”‚    โ”‚  โ”‚  โ€ข EventRegistry (event typeโ†’class mapping)  โ”‚   โ”‚
                โ”‚    โ”‚  โ”‚  โ€ข SqlAlchemyProcessedMessageStore           โ”‚   โ”‚
                โ”‚    โ”‚  โ”‚    (implements IProcessedMessageStore)       โ”‚   โ”‚
                โ”‚    โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜   โ”‚
                โ”‚    โ”‚                                                      โ”‚
                โ”‚    โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”   โ”‚
                โ”‚    โ”‚  โ”‚  BRIDGE COMPONENT (Standard Architecture)    โ”‚   โ”‚
                โ”‚    โ”‚  โ”‚  โ€ข BridgeConsumer (Kafkaโ†’RabbitMQ bridge)    โ”‚   โ”‚
                โ”‚    โ”‚  โ”‚    (manual idempotency, not base class)      โ”‚   โ”‚
                โ”‚    โ”‚  โ”‚  โ€ข Part of dual-broker event distribution    โ”‚   โ”‚
                โ”‚    โ”‚  โ”‚                                              โ”‚   โ”‚
                โ”‚    โ”‚  โ”‚  RabbitMQ Broker Middleware Stack:           โ”‚   โ”‚
                โ”‚    โ”‚  โ”‚  โ€ข CircuitBreakerMiddleware (resilience)     โ”‚   โ”‚
                โ”‚    โ”‚  โ”‚  โ€ข RateLimiterMiddleware (optional)          โ”‚   โ”‚
                โ”‚    โ”‚  โ”‚  โ€ข RabbitPrometheusMiddleware (metrics)      โ”‚   โ”‚
                โ”‚    โ”‚  โ”‚  โ€ข TelemetryMiddleware (OTel, shared)        โ”‚   โ”‚
                 โ”‚    โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜   โ”‚
                 โ”‚    โ”‚                                  โ”‚                   โ”‚
                 โ”‚    โ”‚                                  โ”‚ On failure        โ”‚
                 โ”‚    โ”‚                                  โ–ผ                   โ”‚
                 โ”‚    โ”‚                        โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”          โ”‚
                 โ”‚    โ”‚                        โ”‚ EXTERNAL DLQ     โ”‚          โ”‚
                 โ”‚    โ”‚                        โ”‚ (broker native)  โ”‚          โ”‚
                 โ”‚    โ”‚                        โ”‚ โ€ข Kafka Connect  โ”‚          โ”‚
                 โ”‚    โ”‚                        โ”‚   DLQ SMT        โ”‚          โ”‚
                 โ”‚    โ”‚                        โ”‚ โ€ข RabbitMQ DLX   โ”‚          โ”‚
                 โ”‚    โ”‚                        โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜          โ”‚
                 โ”‚    โ”‚                                  โ”‚                   โ”‚
                 โ”‚    โ”‚                                  โ–ผ                   โ”‚
                 โ”‚    โ”‚                        โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”          โ”‚
                 โ”‚    โ”‚                        โ”‚  DLQ Bookkeeper  โ”‚          โ”‚
                 โ”‚    โ”‚                        โ”‚  Consumer        โ”‚          โ”‚
                 โ”‚    โ”‚                        โ”‚  (@broker.sub)   โ”‚          โ”‚
                 โ”‚    โ”‚                        โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜          โ”‚
                 โ”‚    โ”‚                                  โ”‚                   โ”‚
                 โ””โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜                   โ”‚
                      โ”‚                                                      โ”‚
                      โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                                                 โ”‚
                                                 โ–ผ
                                    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
                                    โ”‚  Admin/Ops API      โ”‚
                                    โ”‚  โ€ข DLQAdminService  โ”‚
                                    โ”‚  โ€ข Replay API       โ”‚
                                    โ”‚  โ€ข Health Checks    โ”‚
                                    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

FastStream: The Broker Adapter

FastStream is the Pythonic adapter that wraps low-level Kafka/RabbitMQ clients (like confluent-kafka, aio-pika) and provides a decorator-based, async/await, type-safe API.

Think of it like SQLAlchemy for message brokers - it hides 200+ lines of boilerplate:

# WITHOUT FastStream (raw Kafka client)
from confluent_kafka import Consumer
consumer = Consumer({'bootstrap.servers': 'localhost:9092', ...})
consumer.subscribe(['user.created'])
while True:
    msg = consumer.poll(1.0)
    if msg:
        event_dict = json.loads(msg.value().decode())
        event = UserCreatedEvent(**event_dict)  # Manual conversion
        await handle_user_created(event)
        consumer.commit(msg)

# WITH FastStream (what you actually write)
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")

@broker.subscriber("user.created")
async def handle_user_created(event: UserCreatedEvent):  # โœ… Already a Pydantic model!
    # FastStream automatically:
    # - Polls Kafka, decodes bytes โ†’ JSON โ†’ Pydantic
    # - Runs middleware (telemetry, circuit breaker, rate limiter)
    # - Commits offset on success, sends to DLQ on failure
    print(f"User {event.user_id} created!")

What FastStream provides:

  • ๐ŸŽฏ Decorator-based subscriptions: @broker.subscriber("topic")
  • ๐Ÿ”„ Automatic Pydantic conversion: Type hints โ†’ auto-deserialization
  • ๐Ÿ”Œ Middleware hooks: Circuit breakers, rate limiting, telemetry, metrics
  • ๐Ÿฅ Native health checks: make_ping_asgi endpoint for broker status
  • ๐Ÿ”€ Unified API: Same code works for Kafka, RabbitMQ, NATS, Redis Streams

messagekit uses FastStream for all Kafka/RabbitMQ interactions, giving you a clean, Pythonic API while handling all the low-level complexity.

Setup

Database schema

The outbox table stores events transactionally with your business data:

CREATE TABLE outbox_events (
    event_id VARCHAR(36) PRIMARY KEY,
    event_type VARCHAR(255) NOT NULL,
    aggregate_id VARCHAR(255) NOT NULL,
    payload JSONB NOT NULL,
    occurred_at TIMESTAMPTZ NOT NULL,
    published BOOLEAN DEFAULT FALSE NOT NULL,
    failed BOOLEAN DEFAULT FALSE NOT NULL,
    attempt_count INTEGER DEFAULT 0 NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    published_at TIMESTAMPTZ,
    failed_at TIMESTAMPTZ,
    error_message TEXT
);

CREATE INDEX idx_outbox_unpublished ON outbox_events (published, created_at);

Application startup

Initialize the outbox repository and event bus at application startup:

from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine
from messagekit.infrastructure import SqlAlchemyOutboxRepository
from messagekit.core import build_event_bus

app = FastAPI()

@app.on_event("startup")
async def startup():
    # Database engine
    engine = create_async_engine("postgresql+asyncpg://...")

    # Outbox repository
    outbox_repo = SqlAlchemyOutboxRepository(engine)
    app.state.outbox_repository = outbox_repo

    # Optional: EventBus for advanced patterns
    event_bus = build_event_bus()
    app.state.event_bus = event_bus

CDC Publishing: Kafka Connect with Debezium CDC automatically detects outbox table changes and publishes to Kafka. See debezium-cdc-architecture.md for configuration.

Quick Start: Transactional Outbox

The core pattern is the transactional outbox - persist events atomically with your business data:

from fastapi import Depends
from messagekit.core import BaseEvent
from messagekit.infrastructure import SqlAlchemyOutboxRepository

# Define domain event
class UserCreated(BaseEvent):
    event_type: str = "user.created"
    aggregate_id: str
    user_id: int
    email: str

# Simple, direct approach (recommended)
@app.post("/users")
async def create_user(
    data: CreateUserRequest,
    session = Depends(get_session),
    outbox_repo: SqlAlchemyOutboxRepository = Depends(get_outbox_repo)
):
    # 1. Business logic
    user = User(**data.dict())
    session.add(user)

    # 2. Persist event to outbox (same transaction)
    await outbox_repo.add_event(
        UserCreated(
            aggregate_id=f"user-{user.id}",
            user_id=user.id,
            email=user.email,
        ),
        session=session
    )

    # 3. Commit both atomically
    await session.commit()

    # 4. Kafka Connect (Debezium CDC) detects the outbox insert and publishes to Kafka
    return {"user_id": user.id}

Result: Guaranteed delivery, no lost events, atomic writes.


Advanced: EventBus (Optional)

For decoupled architectures with multiple side effects per event, use the EventBus abstraction layer:

from messagekit.core import BaseEvent
from messagekit.infrastructure import OutboxEventHandler

# Access EventBus (initialized at startup)
event_bus = request.app.state.event_bus
outbox_repo = request.app.state.outbox_repository

# Register handler (typically at startup)
event_bus.register(UserCreated, OutboxEventHandler(outbox_repo))

# Dispatch (same result as direct add_event, but decoupled)
await event_bus.dispatch(UserCreated(...))

When to use EventBus:

  • โœ… Multiple side effects per event (audit, metrics, cache)
  • โœ… Need lifecycle hooks for observability
  • โœ… Testing isolation (enable/disable toggle)
  • โœ… Decorator-based handler registration

When NOT needed:

  • โŒ Simple event persistence (use direct outbox_repo.add_event())
  • โŒ Single handler per event
  • โŒ No need for hooks/tracing

๐Ÿ“š EventBus Documentation - Complete guide for advanced patterns

Distribution

  • PyPI distribution name: messagekit
  • Python import package: messagekit
# Install
pip install messagekit

# Import
from messagekit.core import BaseEvent
from messagekit.infrastructure import SqlAlchemyOutboxRepository

Services should consume the published package rather than source checkout. Kafka remains shared infrastructure with local producer/consumer clients per service.

Local development

poetry install
poetry build
poetry run pytest

Testing

Local (Windows):

  • Run non-RabbitMQ tests: poetry run pytest tests/ -v -m "not requires_rabbitmq"
  • RabbitMQ integration tests fail due to Docker Desktop networking limitations

CI/CD (Recommended for complete coverage):

  • GitHub Actions runs all 192 tests on Linux (100% pass rate)
  • Workflows configured in .github/workflows/
  • See .github/workflows/README.md for details

For complete local test coverage on Windows: See Windows Testing with WSL2 (requires Docker Desktop WSL integration setup).

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

messagekit-0.1.0.tar.gz (64.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

messagekit-0.1.0-py3-none-any.whl (100.7 kB view details)

Uploaded Python 3

File details

Details for the file messagekit-0.1.0.tar.gz.

File metadata

  • Download URL: messagekit-0.1.0.tar.gz
  • Upload date:
  • Size: 64.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.10.11 Windows/10

File hashes

Hashes for messagekit-0.1.0.tar.gz
Algorithm Hash digest
SHA256 fafd78d92f4f76b54c32207f39babbe70b75417e08e934c4d36700469e63e44e
MD5 1deb974f75b5da76991ad98ef5d0b19f
BLAKE2b-256 21bdfce327b7e8453969b4fa418d67fece00f899fc8912e1159d099ed144a46c

See more details on using hashes here.

File details

Details for the file messagekit-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: messagekit-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 100.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.10.11 Windows/10

File hashes

Hashes for messagekit-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 7278cd3cffde1bf542f2b72912dad75007f48dc3e4e604650ae2e90cfa7cb5cb
MD5 5a0be44b603ec67ef39eccd261eed860
BLAKE2b-256 a5792614bf3474da8129870dc1d2548a65d439680d1d5de375e50ff5bec2648e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page