Skip to main content

Core abstractions for implementing the Transactional Outbox pattern with Kafka

Project description

Python Outbox Core

Core abstractions for implementing the Transactional Outbox pattern with Kafka/FastStream

๐Ÿ“ฆ What is this?

This library provides framework-agnostic base classes and interfaces for implementing the Transactional Outbox Pattern, ensuring atomic writes to your database and reliable event publishing.

๐ŸŽฏ Problem Solved

The Dual Write Problem:

  • Writing to DB + Publishing to Kafka = Two separate systems
  • If app crashes between the two โ†’ Event lost
  • If Kafka is down โ†’ Event lost
  • Result: Inconsistent state

The Outbox Solution:

  1. Store events in outbox table (same DB transaction as your domain changes)
  2. Background worker publishes events to Kafka asynchronously
  3. โœ… Atomic: DB writes + event storage in single transaction
  4. โœ… Durable: Events never lost (stored in DB)
  5. โœ… Reliable: Publishing retries handled separately

๐Ÿ”€ When to Use This Library?

Use IOutboxEvent (this library) for:

  • ๐ŸŒ Cross-service communication (other microservices need to react)
  • ๐ŸŒ External integrations (analytics, audit services, third-party)
  • ๐ŸŒ Event-driven architecture (Kafka, message brokers)
  • ๐ŸŒ Guaranteed delivery (at-least-once semantics)

Do NOT use for:

  • ๐Ÿ”ท In-process events (use BaseDomainEvent from python-domain-primitives)
  • ๐Ÿ”ท Same service side effects (send email, update cache within same service)
  • ๐Ÿ”ท Direct function calls (no message broker needed)

Event Type Decision Tree

Need to publish an event?
โ”‚
โ”œโ”€ Same service only?
โ”‚  โ””โ”€ Use BaseDomainEvent (python-domain-primitives)
โ”‚     Examples: Send email, log activity, update cache
โ”‚
โ””โ”€ Other services/external?
   โ””โ”€ Use IOutboxEvent (python-outbox-core) โญ THIS LIBRARY
      Examples: Notify microservices, analytics, audit

Related Libraries:

  • ๐Ÿ”ท python-domain-primitives - For internal domain events (BaseDomainEvent)
  • ๐ŸŒ python-outbox-core - For external integration events (IOutboxEvent) โญ YOU ARE HERE
  • ๐Ÿ”ง python-outbox-fastapi - FastAPI lifespan helpers
  • ๐Ÿ”ง python-outbox-celery - Celery background workers
  • ๐Ÿ”ง python-outbox-faststream - Kafka/FastStream publishers

๐Ÿ—๏ธ What's Included

Core Abstractions

from python_outbox_core import IOutboxEvent, IOutboxRepository, OutboxPublisherBase

# 1. Event Interface
class MyDomainEvent(IOutboxEvent):
    """Your domain events implement this interface."""
    pass

# 2. Repository Interface
class MyOutboxRepo(IOutboxRepository):
    """Your SQLAlchemy repository implements this."""
    pass

# 3. Publisher Base Class
class MyOutboxWorker(OutboxPublisherBase):
    """Your background worker extends this."""
    pass

๐Ÿ“š Components

Component Purpose LOC
IOutboxEvent Base interface for outbox events with metadata ~20
IOutboxRepository Repository contract for outbox persistence ~25
OutboxPublisherBase Reusable publishing logic for workers ~40

NOTE: No custom serializer! Use Pydantic's model_dump_json() & FastStream's auto-serialization.

๐Ÿ’ก RECOMMENDED: FastStream (by ag2ai) - cutting-edge event-driven framework, ideal for startups. Or implement custom serializers as needed.

๐Ÿ”ง Tech Stack Integration

  • SQLAlchemy - For ORM models and async sessions
  • Kafka - Event streaming platform
  • FastStream - High-level Kafka abstraction
  • Kong Events Gateway - API gateway for event routing

๐Ÿ“– Usage Example

from python_outbox_core import IOutboxEvent, IOutboxRepository, OutboxPublisherBase
from pydantic import BaseModel, Field
from datetime import datetime
from uuid import UUID, uuid4

# 1. Define your domain event
class InviteCreatedEvent(BaseModel, IOutboxEvent):
    event_id: UUID = Field(default_factory=uuid4)
    event_type: str = "invite.created"
    aggregate_id: str
    occurred_at: datetime = Field(default_factory=datetime.utcnow)

    # Domain-specific payload
    email: str
    role: str

    def to_message(self) -> dict:
        return self.model_dump(mode='json')

# 2. Implement repository (your SQLAlchemy code)
class OutboxRepository(IOutboxRepository):
    def __init__(self, session: AsyncSession):
        self.session = session

    async def add_event(self, event: IOutboxEvent) -> None:
        outbox_record = OutboxEventModel(**event.to_message())
        self.session.add(outbox_record)

    # ... implement other methods

# 3. Create background worker
class KafkaOutboxWorker(OutboxPublisherBase):
    async def schedule_publishing(self) -> None:
        while True:
            published = await self.publish_batch(limit=100)
            await asyncio.sleep(5)  # Poll every 5 seconds

๐Ÿš€ Installation

cd C:\coding\gridflow
pip install -e ./python-web-toolkit/packages/python-outbox-core

๐Ÿ”— Related Libraries

  • sqlalchemy-async-repositories - Base repository patterns
  • python-infrastructure-exceptions - Infrastructure errors
  • python-structlog-config - Structured logging with OTel

๐Ÿ“„ License

MIT

๐Ÿค Contributing

This is part of the GridFlow python-web-toolkit monorepo. Follow the 100-line file limit rule.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

python_outbox_core-0.1.0.tar.gz (17.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

python_outbox_core-0.1.0-py3-none-any.whl (18.2 kB view details)

Uploaded Python 3

File details

Details for the file python_outbox_core-0.1.0.tar.gz.

File metadata

  • Download URL: python_outbox_core-0.1.0.tar.gz
  • Upload date:
  • Size: 17.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.11

File hashes

Hashes for python_outbox_core-0.1.0.tar.gz
Algorithm Hash digest
SHA256 80e54b4f3381e847abec7fc202868402f5a82c538d09448926f9f454d1f69359
MD5 e3e19b2bf4d9ab3610238afbf72574eb
BLAKE2b-256 bc2f9436f27bfa9c28bd202816eef5d66f8cb78ceab6cdd13022b525a72e100a

See more details on using hashes here.

File details

Details for the file python_outbox_core-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for python_outbox_core-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 a2310d54fab48deb93e3192c0fabe422f362a30dbbf7fba3664578e90fd6bf42
MD5 46126db144e6f8f89104fb76b1cf6765
BLAKE2b-256 65646d3747ea269ebcc98435b8ea70536f6a9225bd4af374c6b281eda0fc9ac6

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page