Unified transactional messaging (Outbox & Inbox) library for async Python applications
Project description
omni-box
Unified transactional messaging primitives for the Transactional Outbox and Transactional Inbox patterns in async Python services.
The library ships:
- Domain entities (
OutboxEvent,InboxEvent) and aOmniBoxDomainServicefactory. - Storage-agnostic repository protocols and a production-ready PostgreSQL implementation (SQLAlchemy 2 + asyncpg).
- A composable pipeline (
EventProcessorBuilder,EventBatchProcessor) with built-in steps for metrics, OpenTelemetry, DLQ, circuit breaker and inbox deduplication. - High-level orchestrators:
OutboxPublisher(background publisher) andInboxConsumerRunner(broker consumer with configurable commit semantics). - A pure-
aiokafkabroker adapter (publisher and consumer).
The library does not provide a Unit-of-Work implementation. The transactional boundary that ties business state with the outbox row (or with the inbox row plus side effects) is owned by the calling service.
Requirements
- Python 3.12+ (uses PEP 695 generics).
- The core package only depends on
pydantic,orjson, andstructlog.
Installation
pip install omni-box
Optional extras (declared under [project.optional-dependencies] in pyproject.toml):
| Extra | Pulls in | Used by |
|---|---|---|
postgres |
sqlalchemy[asyncio], asyncpg |
omni_box.infra.storage.postgres |
kafka |
aiokafka |
omni_box.infra.brokers.kafka |
metrics |
prometheus-client |
omni_box.infra.metrics |
opentelemetry |
opentelemetry-api, opentelemetry-sdk |
OpenTelemetryStep |
settings |
pydantic-settings |
omni_box.contrib.settings |
dishka |
dishka |
omni_box.contrib.dishka |
Combine as needed, e.g. pip install "omni-box[postgres,kafka,metrics]".
Outbox Quick Start
from omni_box import OmniBoxDomainService, OutboxPublisher
from omni_box.core.converters import EnvelopeEventConverter
from omni_box.infra.brokers.kafka import KafkaEventPublisher
# 1. Persist the event in the same DB transaction as your business state.
domain = OmniBoxDomainService()
event = domain.create_outbox_event(
aggregate_type="user",
aggregate_id=user_id,
event_type="user.created",
topic="users.events",
partition_key=str(user_id),
payload={"email": "user@example.com"},
)
async with uow.transaction() as tx: # your own UoW, not part of omni-box
await tx.users.create(user)
await tx.outbox.create(event)
# 2. A background worker reads pending rows and publishes them.
broker = KafkaEventPublisher(producer=kafka_producer, converter=EnvelopeEventConverter())
publisher = OutboxPublisher(repo=outbox_repo, broker=broker)
while not shutdown:
result = await publisher.publish_batch(worker_id="publisher-1", batch_size=100)
if not result.processed_event_ids:
await asyncio.sleep(1.0)
OutboxPublisher is defined in omni_box.application.services.publish. Under the hood it builds an EventBatchProcessor via create_outbox_processor, so you get fetch, lock, retry, metrics and (optionally) DLQ for free.
Inbox Quick Start
InboxConsumerRunner consumes from a broker and lands every message in the inbox table inside a transaction. The transaction is opened via a user-supplied InboxTransactionProviderProtocol, which yields an InboxEventRepository bound to the open session — this keeps the library free of any UoW.
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from omni_box import AckStrategy, InboxConsumerRunner
from omni_box.core.protocols import InboxEventRepository
from omni_box.core.protocols.transaction import InboxTransactionProviderProtocol
class InboxTxProvider(InboxTransactionProviderProtocol):
"""Bridges your session/UoW to the runner."""
def __init__(self, session_factory, repo_factory) -> None:
self._session_factory = session_factory
self._repo_factory = repo_factory
@asynccontextmanager
async def transaction(self) -> AsyncIterator[InboxEventRepository]:
async with self._session_factory() as session, session.begin():
yield self._repo_factory(session)
runner = InboxConsumerRunner(
consumer=kafka_inbox_consumer, # your EventConsumer adapter
transaction_provider=InboxTxProvider(...),
handler=handle_inbox_event, # optional: process within the same tx
worker_id="worker-1",
consumer_group="identity-service",
ack_strategy=AckStrategy.EXACTLY_ONCE_INBOX,
)
await runner.start()
try:
await runner.run_forever()
finally:
await runner.stop()
Commit semantics are configurable via ack_strategy (AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE_INBOX) and commit_offset_policy (ON_PERSIST, ON_SUCCESS). See omni_box.application.services.consume for the full contract.
Public API
The package re-exports its stable surface from omni_box:
import omni_box
print(omni_box.__all__)
print(omni_box.__version__)
For detailed component reference see docs/api_reference.md.
Documentation
License
Apache 2.0 — see LICENSE.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file omni_box-0.1.0.tar.gz.
File metadata
- Download URL: omni_box-0.1.0.tar.gz
- Upload date:
- Size: 60.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.17 {"installer":{"name":"uv","version":"0.11.17","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
dca94f834ac6990307641e9fba337749e6b7877785577d967ee3858595fc0549
|
|
| MD5 |
7cad44789b7ca38d1e6536a1cda2ed26
|
|
| BLAKE2b-256 |
092074f7af809b70da736edf628d3e0f6fea3b6d71d6416b004f15e91335c555
|
File details
Details for the file omni_box-0.1.0-py3-none-any.whl.
File metadata
- Download URL: omni_box-0.1.0-py3-none-any.whl
- Upload date:
- Size: 91.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.17 {"installer":{"name":"uv","version":"0.11.17","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
98a2f10dbc11cc9208591b56ada7ccab0a884a915f607709854fcc945385c00f
|
|
| MD5 |
fde31dd43979af55ba343f674c0a0728
|
|
| BLAKE2b-256 |
2f80caa402795becdbaf04fa37ee9b5a0ebfb78857c8eedc90043078f6f909d5
|