Core abstractions for implementing the Transactional Outbox pattern with Kafka
Project description
Python Outbox Core
Core abstractions for implementing the Transactional Outbox pattern with Kafka/FastStream
๐ฆ What is this?
This library provides framework-agnostic base classes and interfaces for implementing the Transactional Outbox Pattern, ensuring atomic writes to your database and reliable event publishing.
๐ฏ Problem Solved
The Dual Write Problem:
- Writing to DB + Publishing to Kafka = Two separate systems
- If app crashes between the two โ Event lost
- If Kafka is down โ Event lost
- Result: Inconsistent state
The Outbox Solution:
- Store events in outbox table (same DB transaction as your domain changes)
- Background worker publishes events to Kafka asynchronously
- โ Atomic: DB writes + event storage in single transaction
- โ Durable: Events never lost (stored in DB)
- โ Reliable: Publishing retries handled separately
๐ When to Use This Library?
Use IOutboxEvent (this library) for:
- ๐ Cross-service communication (other microservices need to react)
- ๐ External integrations (analytics, audit services, third-party)
- ๐ Event-driven architecture (Kafka, message brokers)
- ๐ Guaranteed delivery (at-least-once semantics)
Do NOT use for:
- ๐ท In-process events (use
BaseDomainEventfrompython-domain-primitives) - ๐ท Same service side effects (send email, update cache within same service)
- ๐ท Direct function calls (no message broker needed)
Event Type Decision Tree
Need to publish an event?
โ
โโ Same service only?
โ โโ Use BaseDomainEvent (python-domain-primitives)
โ Examples: Send email, log activity, update cache
โ
โโ Other services/external?
โโ Use IOutboxEvent (python-outbox-core) โญ THIS LIBRARY
Examples: Notify microservices, analytics, audit
Related Libraries:
- ๐ท
python-domain-primitives- For internal domain events (BaseDomainEvent) - ๐
python-outbox-core- For external integration events (IOutboxEvent) โญ YOU ARE HERE - ๐ง
python-outbox-fastapi- FastAPI lifespan helpers - ๐ง
python-outbox-celery- Celery background workers - ๐ง
python-outbox-faststream- Kafka/FastStream publishers
๐๏ธ What's Included
Core Abstractions
from python_outbox_core import IOutboxEvent, IOutboxRepository, OutboxPublisherBase
# 1. Event Interface
class MyDomainEvent(IOutboxEvent):
"""Your domain events implement this interface."""
pass
# 2. Repository Interface
class MyOutboxRepo(IOutboxRepository):
"""Your SQLAlchemy repository implements this."""
pass
# 3. Publisher Base Class
class MyOutboxWorker(OutboxPublisherBase):
"""Your background worker extends this."""
pass
๐ Components
| Component | Purpose | LOC |
|---|---|---|
IOutboxEvent |
Base interface for outbox events with metadata | ~20 |
IOutboxRepository |
Repository contract for outbox persistence | ~25 |
OutboxPublisherBase |
Reusable publishing logic for workers | ~40 |
NOTE: No custom serializer! Use Pydantic's model_dump_json() & FastStream's auto-serialization.
๐ก RECOMMENDED: FastStream (by ag2ai) - cutting-edge event-driven framework, ideal for startups. Or implement custom serializers as needed.
๐ง Tech Stack Integration
- SQLAlchemy - For ORM models and async sessions
- Kafka - Event streaming platform
- FastStream - High-level Kafka abstraction
- Kong Events Gateway - API gateway for event routing
๐ Usage Example
from python_outbox_core import IOutboxEvent, IOutboxRepository, OutboxPublisherBase
from pydantic import BaseModel, Field
from datetime import datetime
from uuid import UUID, uuid4
# 1. Define your domain event
class InviteCreatedEvent(BaseModel, IOutboxEvent):
event_id: UUID = Field(default_factory=uuid4)
event_type: str = "invite.created"
aggregate_id: str
occurred_at: datetime = Field(default_factory=datetime.utcnow)
# Domain-specific payload
email: str
role: str
def to_message(self) -> dict:
return self.model_dump(mode='json')
# 2. Implement repository (your SQLAlchemy code)
class OutboxRepository(IOutboxRepository):
def __init__(self, session: AsyncSession):
self.session = session
async def add_event(self, event: IOutboxEvent) -> None:
outbox_record = OutboxEventModel(**event.to_message())
self.session.add(outbox_record)
# ... implement other methods
# 3. Create background worker
class KafkaOutboxWorker(OutboxPublisherBase):
async def schedule_publishing(self) -> None:
while True:
published = await self.publish_batch(limit=100)
await asyncio.sleep(5) # Poll every 5 seconds
๐ Installation
cd C:\coding\gridflow
pip install -e ./python-web-toolkit/packages/python-outbox-core
๐ Related Libraries
sqlalchemy-async-repositories- Base repository patternspython-infrastructure-exceptions- Infrastructure errorspython-structlog-config- Structured logging with OTel
๐ License
MIT
๐ค Contributing
This is part of the GridFlow python-web-toolkit monorepo. Follow the 100-line file limit rule.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file python_outbox_core-0.1.0.tar.gz.
File metadata
- Download URL: python_outbox_core-0.1.0.tar.gz
- Upload date:
- Size: 17.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
80e54b4f3381e847abec7fc202868402f5a82c538d09448926f9f454d1f69359
|
|
| MD5 |
e3e19b2bf4d9ab3610238afbf72574eb
|
|
| BLAKE2b-256 |
bc2f9436f27bfa9c28bd202816eef5d66f8cb78ceab6cdd13022b525a72e100a
|
File details
Details for the file python_outbox_core-0.1.0-py3-none-any.whl.
File metadata
- Download URL: python_outbox_core-0.1.0-py3-none-any.whl
- Upload date:
- Size: 18.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a2310d54fab48deb93e3192c0fabe422f362a30dbbf7fba3664578e90fd6bf42
|
|
| MD5 |
46126db144e6f8f89104fb76b1cf6765
|
|
| BLAKE2b-256 |
65646d3747ea269ebcc98435b8ea70536f6a9225bd4af374c6b281eda0fc9ac6
|