Skip to main content

Unified transactional messaging (Outbox & Inbox) library for async Python applications

Project description

omni-box

Unified transactional messaging primitives for the Transactional Outbox and Transactional Inbox patterns in async Python services.

PyPI Python License CI codecov Docs

The library ships:

  • Domain entities (OutboxEvent, InboxEvent) and a OmniBoxDomainService factory.
  • Storage-agnostic repository protocols and a production-ready PostgreSQL implementation (SQLAlchemy 2 + asyncpg).
  • A composable pipeline (EventProcessorBuilder, EventBatchProcessor) with built-in steps for metrics, OpenTelemetry, DLQ, circuit breaker and inbox deduplication.
  • High-level orchestrators: OutboxPublisher (background publisher) and InboxConsumerRunner (broker consumer with configurable commit semantics).
  • A pure-aiokafka broker adapter (publisher and consumer).

The library does not provide a Unit-of-Work implementation. The transactional boundary that ties business state with the outbox row (or with the inbox row plus side effects) is owned by the calling service.

Requirements

  • Python 3.12+ (uses PEP 695 generics).
  • The core package only depends on pydantic, orjson, and structlog.

Installation

pip install omni-box

Optional extras (declared under [project.optional-dependencies] in pyproject.toml):

Extra Pulls in Used by
postgres sqlalchemy[asyncio], asyncpg omni_box.infra.storage.postgres
kafka aiokafka omni_box.infra.brokers.kafka
metrics prometheus-client omni_box.infra.metrics
opentelemetry opentelemetry-api, opentelemetry-sdk OpenTelemetryStep
settings pydantic-settings omni_box.contrib.settings
dishka dishka omni_box.contrib.dishka

Combine as needed, e.g. pip install "omni-box[postgres,kafka,metrics]".

Outbox Quick Start

from omni_box import OmniBoxDomainService, OutboxPublisher
from omni_box.core.converters import EnvelopeEventConverter
from omni_box.infra.brokers.kafka import KafkaEventPublisher

# 1. Persist the event in the same DB transaction as your business state.
domain = OmniBoxDomainService()
event = domain.create_outbox_event(
    aggregate_type="user",
    aggregate_id=user_id,
    event_type="user.created",
    topic="users.events",
    partition_key=str(user_id),
    payload={"email": "user@example.com"},
)

async with uow.transaction() as tx:        # your own UoW, not part of omni-box
    await tx.users.create(user)
    await tx.outbox.create(event)

# 2. A background worker reads pending rows and publishes them.
broker = KafkaEventPublisher(producer=kafka_producer, converter=EnvelopeEventConverter())
publisher = OutboxPublisher(repo=outbox_repo, broker=broker)

while not shutdown:
    result = await publisher.publish_batch(worker_id="publisher-1", batch_size=100)
    if not result.processed_event_ids:
        await asyncio.sleep(1.0)

OutboxPublisher is defined in omni_box.application.services.publish. Under the hood it builds an EventBatchProcessor via create_outbox_processor, so you get fetch, lock, retry, metrics and (optionally) DLQ for free.

Inbox Quick Start

InboxConsumerRunner consumes from a broker and lands every message in the inbox table inside a transaction. The transaction is opened via a user-supplied InboxTransactionProviderProtocol, which yields an InboxEventRepository bound to the open session — this keeps the library free of any UoW.

from contextlib import asynccontextmanager
from collections.abc import AsyncIterator

from omni_box import AckStrategy, InboxConsumerRunner
from omni_box.core.protocols import InboxEventRepository
from omni_box.core.protocols.transaction import InboxTransactionProviderProtocol


class InboxTxProvider(InboxTransactionProviderProtocol):
    """Bridges your session/UoW to the runner."""

    def __init__(self, session_factory, repo_factory) -> None:
        self._session_factory = session_factory
        self._repo_factory = repo_factory

    @asynccontextmanager
    async def transaction(self) -> AsyncIterator[InboxEventRepository]:
        async with self._session_factory() as session, session.begin():
            yield self._repo_factory(session)


runner = InboxConsumerRunner(
    consumer=kafka_inbox_consumer,            # your EventConsumer adapter
    transaction_provider=InboxTxProvider(...),
    handler=handle_inbox_event,               # optional: process within the same tx
    worker_id="worker-1",
    consumer_group="identity-service",
    ack_strategy=AckStrategy.EXACTLY_ONCE_INBOX,
)

await runner.start()
try:
    await runner.run_forever()
finally:
    await runner.stop()

Commit semantics are configurable via ack_strategy (AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE_INBOX) and commit_offset_policy (ON_PERSIST, ON_SUCCESS). See omni_box.application.services.consume for the full contract.

Public API

The package re-exports its stable surface from omni_box:

import omni_box
print(omni_box.__all__)
print(omni_box.__version__)

For detailed component reference see docs/api_reference.md.

Documentation

License

Apache 2.0 — see LICENSE.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

omni_box-0.1.0.tar.gz (60.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

omni_box-0.1.0-py3-none-any.whl (91.6 kB view details)

Uploaded Python 3

File details

Details for the file omni_box-0.1.0.tar.gz.

File metadata

  • Download URL: omni_box-0.1.0.tar.gz
  • Upload date:
  • Size: 60.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.17 {"installer":{"name":"uv","version":"0.11.17","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for omni_box-0.1.0.tar.gz
Algorithm Hash digest
SHA256 dca94f834ac6990307641e9fba337749e6b7877785577d967ee3858595fc0549
MD5 7cad44789b7ca38d1e6536a1cda2ed26
BLAKE2b-256 092074f7af809b70da736edf628d3e0f6fea3b6d71d6416b004f15e91335c555

See more details on using hashes here.

File details

Details for the file omni_box-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: omni_box-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 91.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.17 {"installer":{"name":"uv","version":"0.11.17","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for omni_box-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 98a2f10dbc11cc9208591b56ada7ccab0a884a915f607709854fcc945385c00f
MD5 fde31dd43979af55ba343f674c0a0728
BLAKE2b-256 2f80caa402795becdbaf04fa37ee9b5a0ebfb78857c8eedc90043078f6f909d5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page