Skip to main content

FastStream broker integration for the transactional outbox pattern: a Postgres table is the queue

Project description

faststream-outbox

Supported versions downloads

faststream-outbox is a FastStream broker integration for the transactional outbox pattern — a Postgres table is the message queue.

A producer writes a domain entity and an outbox row in the same SQLAlchemy transaction by calling broker.publish(body, queue=..., session=session). A subscriber polls the table directly with FOR UPDATE SKIP LOCKED, runs the handler, and deletes the row on success. No downstream broker, no separate relay process — the table is the queue.

from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from faststream import FastStream
from faststream_outbox import OutboxBroker, make_outbox_table

metadata = MetaData()
outbox_table = make_outbox_table(metadata, table_name="outbox")

engine = create_async_engine("postgresql+asyncpg://localhost/app")
broker = OutboxBroker(engine, outbox_table=outbox_table)
app = FastStream(broker)

@broker.subscriber("orders", max_workers=4)
async def handle(order_id: int) -> None:
    print(f"order {order_id}")

# Producer side — share the caller's open transaction:
session_factory = async_sessionmaker(engine, expire_on_commit=False)
async with session_factory() as session, session.begin():
    session.add(Order(id=1))
    await broker.publish(1, queue="orders", session=session)

How it works

make_outbox_table(metadata, table_name="outbox") returns a sqlalchemy.Table that you attach to your own MetaData and migrate via Alembic. The package does not own your schema; it only describes the columns it needs.

broker.publish(body, *, queue, session, headers=None, correlation_id=None) inserts one outbox row through the caller's AsyncSession. It does not flush, commit, or open its own transaction — the whole point is that the row commits atomically with the caller's domain writes. Use it inside an async with session.begin(): block.

broker.publish_batch(*bodies, queue, session, headers=None) inserts many rows in a single round-trip with the same transactional contract.

A subscriber owns two async loops:

  1. fetch — claims available rows via SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING * in a single CTE. A row is "available" iff its lease is unset or expired (acquired_at < now() - lease_ttl_seconds), so the fetch query reclaims stuck rows inline — no separate reaper is needed. With the asyncpg driver, the loop also LISTENs on outbox_<table> and publish emits pg_notify(...), so idle dispatch latency is sub-100ms instead of up to max_fetch_interval. Polling stays as the fallback.
  2. workersmax_workers) — dispatch to the handler. On success, DELETE WHERE id=:id AND acquired_token=:token. On failure, the retry strategy decides: schedule another attempt, or terminal DELETE.

The acquired_token is critical: a slow handler whose lease expired and was re-claimed by another worker will find its terminal DELETE/UPDATE to be a no-op (the token no longer matches), preventing it from clobbering the new lease holder's row.

lease_ttl_seconds (default 60.0) must exceed your handler's P99 duration with margin — otherwise healthy in-flight handlers race their own lease expiry and the row gets re-claimed by another worker, triggering a duplicate delivery.

Schema validation

Schema validation is opt-in:

await broker.validate_schema()  # raises if user's table drifts from expected columns

Call it from a /health endpoint or startup hook — not at broker.start(), so Alembic can run migrations against the same DB without a startup loop.

Retry strategies

from faststream_outbox import ExponentialRetry, ConstantRetry, LinearRetry, NoRetry

@broker.subscriber(
    "orders",
    retry_strategy=ExponentialRetry(
        initial_delay_seconds=1.0,
        max_delay_seconds=300.0,
        max_attempts=5,
        jitter_factor=0.5,
    ),
)
async def handle(order_id: int) -> None: ...

Strategies receive the raised exception so users may subclass for "retry only on transient errors":

class TransientOnly(ExponentialRetry):
    def get_next_attempt_at(self, *, exception=None, **kw):
        if exception and not isinstance(exception, TransientError):
            return None
        return super().get_next_attempt_at(exception=exception, **kw)

Failure modes

  • Handlers must be idempotent. Crash between commit-of-handler-side-effects and the broker's DELETE re-delivers the message.
  • Best-effort ordering only. FOR UPDATE SKIP LOCKED does not preserve strict order under concurrent workers. If you need strict per-aggregate ordering, route to a single subscriber and run a single worker.
  • No DLQ / archive. Terminal failures DELETE the row. Hook on_terminal_failure(row) to capture them in your own table or alerting.

Connection ownership

OutboxBroker does not close the AsyncEngine you pass in — the caller owns its lifecycle.

Tuning

Per-subscriber knobs (passed to @broker.subscriber("…", …)):

  • max_workers (default 1) — concurrent handlers per subscriber.
  • fetch_batch_size (default 10) — rows claimed per fetch cycle.
  • min_fetch_interval / max_fetch_interval (default 1.0 / 10.0 s) — base + ceiling for the adaptive idle backoff with jitter.
  • lease_ttl_seconds (default 60.0 s) — how long a claim is valid before another fetch may reclaim it. Must exceed your handler's P99 duration with margin.
  • max_deliveries (default None — unbounded) — total claims (including lease-expiry re-claims) after which the row is dropped without invoking the handler. Defends against handlers that consistently wedge.

📝 License

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

faststream_outbox-0.2.0.tar.gz (20.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

faststream_outbox-0.2.0-py3-none-any.whl (28.9 kB view details)

Uploaded Python 3

File details

Details for the file faststream_outbox-0.2.0.tar.gz.

File metadata

  • Download URL: faststream_outbox-0.2.0.tar.gz
  • Upload date:
  • Size: 20.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.11 {"installer":{"name":"uv","version":"0.11.11","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for faststream_outbox-0.2.0.tar.gz
Algorithm Hash digest
SHA256 9ab36d7953b586c18ab6fb5da53d38adada2234685bc721b9c1e73fed196cb3c
MD5 1c212424b31bf00c67f361580a33b886
BLAKE2b-256 a067af5f2a1fbf001accfb3ead71bc6d1b1cfb4e4cd91e99f79eb01e8ea28572

See more details on using hashes here.

File details

Details for the file faststream_outbox-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: faststream_outbox-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 28.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.11 {"installer":{"name":"uv","version":"0.11.11","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for faststream_outbox-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 2e689fc0fc088e073aec7dfcaeb741083f75564278323d94c874bb75ff02a41c
MD5 76b1757cd288cc1bee6a7b2c0a736a96
BLAKE2b-256 e4a61f54ef09fcc4f91d9d0dd17c6fd48a63b25b1fa25f1700e926dba3889dd9

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page