FastStream broker integration for the transactional outbox pattern: a Postgres table is the queue

faststream-outbox

faststream-outbox is a FastStream broker integration for the transactional outbox pattern — a Postgres table is the message queue.

A producer writes a domain entity and an outbox row in the same SQLAlchemy transaction by calling broker.publish(body, queue=..., session=session). A subscriber polls the table directly with FOR UPDATE SKIP LOCKED, runs the handler, and deletes the row on success. No downstream broker, no separate relay process — the table is the queue.

from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from faststream import FastStream
from faststream_outbox import OutboxBroker, make_outbox_table

metadata = MetaData()
outbox_table = make_outbox_table(metadata, table_name="outbox")

engine = create_async_engine("postgresql+asyncpg://localhost/app")
broker = OutboxBroker(engine, outbox_table=outbox_table)
app = FastStream(broker)

@broker.subscriber("orders", max_workers=4)
async def handle(order_id: int) -> None:
    print(f"order {order_id}")

# Producer side: share the caller's open transaction.
# `Order` stands in for one of your own SQLAlchemy ORM models.
session_factory = async_sessionmaker(engine, expire_on_commit=False)

async def create_order() -> None:
    async with session_factory() as session, session.begin():
        session.add(Order(id=1))
        await broker.publish(1, queue="orders", session=session)

How it works

make_outbox_table(metadata, table_name="outbox") returns a sqlalchemy.Table that you attach to your own MetaData and migrate via Alembic. The package does not own your schema; it only describes the columns it needs.

broker.publish(body, *, queue, session, headers=None, correlation_id=None) inserts one outbox row through the caller's AsyncSession. It does not flush, commit, or open its own transaction — the whole point is that the row commits atomically with the caller's domain writes. Use it inside an async with session.begin(): block.

broker.publish_batch(*bodies, queue, session, headers=None) inserts many rows in a single round-trip with the same transactional contract.

A subscriber owns three async loops:

  1. fetch — claims due rows via SELECT … FOR UPDATE SKIP LOCKED → UPDATE state='processing', acquired_token=:uuid RETURNING * in a single CTE.
  2. workers (max_workers) — dispatch to the handler. On success, DELETE WHERE id=:id AND acquired_token=:token. On failure, the retry strategy decides: schedule another attempt, or terminal DELETE.
  3. release-stuck — periodically flips processing rows back to pending if their lease is older than release_stuck_timeout. Wrapped in a Postgres advisory lock so multiple processes don't compete.
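
The fetch step above can be pictured with a query along the following lines. This is a minimal sketch and not the statement the package actually runs: the columns queue, state, next_attempt_at and acquired_token are named in this README, while acquired_at, the bind-parameter names and the claim_params helper are assumptions made for illustration.

```python
import uuid

# Hypothetical claim statement for the fetch loop (PostgreSQL syntax).
# `acquired_at` and the bind-parameter names are assumptions, not the
# package's real schema or query.
CLAIM_SQL = """
WITH due AS (
    SELECT id
    FROM outbox
    WHERE queue = :queue
      AND state = 'pending'
      AND next_attempt_at <= now()
    ORDER BY next_attempt_at
    LIMIT :batch_size
    FOR UPDATE SKIP LOCKED          -- skip rows another worker already locked
)
UPDATE outbox AS o
SET state = 'processing',
    acquired_token = :token,        -- fresh token identifies this lease
    acquired_at = now()
FROM due
WHERE o.id = due.id
RETURNING o.*
"""


def claim_params(queue: str, batch_size: int) -> dict[str, object]:
    """Build bind parameters for one fetch cycle, with a new lease token."""
    return {"queue": queue, "batch_size": batch_size, "token": str(uuid.uuid4())}
```

Selecting and updating in one statement means a row is never visible as locked but still pending to another fetcher, which is why a single CTE is attractive here.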

The acquired_token is critical: a slow handler whose lease expired and was re-claimed by another worker will find its terminal DELETE/UPDATE to be a no-op (the token no longer matches), preventing it from clobbering the new lease holder's row.
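
The token check can be tried out in isolation. The sketch below uses an in-memory SQLite table purely as a stand-in for the Postgres outbox; the three-column table and the claim, release_stuck and ack helpers are hypothetical and only mimic the behaviour described above.

```python
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbox (id INTEGER PRIMARY KEY, state TEXT, acquired_token TEXT)"
)
conn.execute("INSERT INTO outbox (id, state) VALUES (1, 'pending')")


def claim(row_id: int) -> str:
    """Mark the row as processing under a fresh lease token and return the token."""
    token = str(uuid.uuid4())
    conn.execute(
        "UPDATE outbox SET state = 'processing', acquired_token = ? "
        "WHERE id = ? AND state = 'pending'",
        (token, row_id),
    )
    return token


def release_stuck(row_id: int) -> None:
    """Stand-in for the release-stuck loop: put the row back to pending."""
    conn.execute(
        "UPDATE outbox SET state = 'pending', acquired_token = NULL WHERE id = ?",
        (row_id,),
    )


def ack(row_id: int, token: str) -> int:
    """Delete the row only if the caller still holds the lease; return rows deleted."""
    cur = conn.execute(
        "DELETE FROM outbox WHERE id = ? AND acquired_token = ?", (row_id, token)
    )
    return cur.rowcount


slow_token = claim(1)   # worker A claims the row, then stalls
release_stuck(1)        # the lease expires and the row returns to pending
fast_token = claim(1)   # worker B re-claims it under a new token

stale_deleted = ack(1, slow_token)  # worker A finishes late: matches no row
fresh_deleted = ack(1, fast_token)  # worker B's ack removes the row
```

Worker A's late DELETE affects zero rows because its token was overwritten by the re-claim, so only the current lease holder can remove the row.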

Recommended index

Add this to your Alembic migration alongside the table:

CREATE INDEX outbox_pending_idx ON outbox (queue, next_attempt_at)
  WHERE state = 'pending';

Schema validation

Schema validation is opt-in:

await broker.validate_schema()  # raises if user's table drifts from expected columns

Call it from a /health endpoint or a startup hook. It deliberately does not run inside broker.start(), so the service can boot while Alembic is still migrating the same database instead of getting stuck in a restart loop.

Retry strategies

from faststream_outbox import ExponentialRetry, ConstantRetry, LinearRetry, NoRetry

@broker.subscriber(
    "orders",
    retry_strategy=ExponentialRetry(
        initial_delay_seconds=1.0,
        max_delay_seconds=300.0,
        max_attempts=5,
        jitter_factor=0.5,
    ),
)
async def handle(order_id: int) -> None: ...

Strategies receive the raised exception so users may subclass for "retry only on transient errors":

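# TransientError stands in for your own exception type for retryable failures.
class TransientError(Exception): ...

class TransientOnly(ExponentialRetry):
    def get_next_attempt_at(self, *, exception=None, **kw):
        # None here signals that no further attempt should be scheduled.
        if exception and not isinstance(exception, TransientError):
            return None
        return super().get_next_attempt_at(exception=exception, **kw)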
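
To give a feel for how the ExponentialRetry parameters interact, here is a rough standalone calculation. The doubling formula, the symmetric jitter and the reading of max_attempts as the total number of attempts are all assumptions; the package's actual arithmetic may differ, and exponential_delay is a hypothetical helper, not part of its API.

```python
from __future__ import annotations

import random


def exponential_delay(
    attempt: int,
    *,
    initial_delay_seconds: float = 1.0,
    max_delay_seconds: float = 300.0,
    max_attempts: int = 5,
    jitter_factor: float = 0.5,
    rng: random.Random | None = None,
) -> float | None:
    """Return seconds to wait before the next try, or None to give up.

    `attempt` is the 1-based number of the attempt that just failed.
    """
    if attempt >= max_attempts:
        return None  # attempts exhausted: treat as a terminal failure
    # Double the delay on each failed attempt, capped at the maximum.
    base = min(initial_delay_seconds * 2 ** (attempt - 1), max_delay_seconds)
    # Spread the delay by up to +/- jitter_factor so workers do not retry in lockstep.
    spread = base * jitter_factor
    return base + (rng or random).uniform(-spread, spread)


# With jitter disabled the schedule is deterministic.
schedule = [exponential_delay(n, jitter_factor=0.0) for n in range(1, 6)]
```

Under these assumptions, a jitter_factor of 0.5 lets the third retry land anywhere between 2 and 6 seconds instead of exactly 4.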

Failure modes

  • Handlers must be idempotent. A crash between the commit of the handler's side effects and the broker's DELETE causes the message to be delivered again.
  • Best-effort ordering only. FOR UPDATE SKIP LOCKED does not preserve strict order under concurrent workers. If you need strict per-aggregate ordering, route to a single subscriber and run a single worker.
  • No DLQ / archive. Terminal failures DELETE the row. Hook on_terminal_failure(row) to capture them in your own table or alerting.
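
One common way to meet the idempotency requirement is a dedup table written in the same transaction as the side effect. The sketch below uses SQLite only so that it runs standalone; the processed_messages and shipments tables, the handle_once function and the idea of a stable message_id (for example a correlation id you set when publishing) are assumptions for illustration, not part of faststream-outbox.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_messages (message_id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE shipments (order_id INTEGER)")


def handle_once(message_id: str, order_id: int) -> bool:
    """Apply the side effect at most once per message_id; return True if applied."""
    with conn:  # one transaction: dedup marker and side effect commit together
        cur = conn.execute(
            "INSERT OR IGNORE INTO processed_messages (message_id) VALUES (?)",
            (message_id,),
        )
        if cur.rowcount == 0:
            return False  # marker already present: this is a redelivery
        conn.execute("INSERT INTO shipments (order_id) VALUES (?)", (order_id,))
        return True


first = handle_once("msg-1", 42)   # first delivery does the work
second = handle_once("msg-1", 42)  # redelivery after a crash is skipped
```

Because the marker and the side effect commit together, a crash before the commit leaves neither behind, and a redelivery after the commit finds the marker and does nothing.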

Connection ownership

OutboxBroker does not close the AsyncEngine you pass in — the caller owns its lifecycle.

Tuning

Per-subscriber knobs (passed to @broker.subscriber("…", …)):

  • max_workers (default 1) — concurrent handlers per subscriber.
  • fetch_batch_size (default 10) — rows claimed per fetch cycle.
  • min_fetch_interval / max_fetch_interval (default 1.0 / 10.0 s) — base + ceiling for the adaptive idle backoff with jitter.
  • release_stuck_timeout (default 300.0 s) — how long a processing row may live before being released back to pending.
  • release_stuck_interval (default release_stuck_timeout / 2) — how often the release-stuck loop runs.
  • max_deliveries (default None — unbounded) — total claims (including stuck-recovery re-claims) after which the row is dropped without invoking the handler. Defends against handlers that consistently wedge.
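
As an illustration of how min_fetch_interval and max_fetch_interval might shape polling, the sketch below doubles the wait after each empty fetch and resets it once rows arrive. The doubling rule, the jitter parameter and the next_fetch_interval function are assumptions; the README only states that the backoff is adaptive, bounded by these two values, and jittered.

```python
import random


def next_fetch_interval(
    current: float,
    fetched_rows: int,
    *,
    min_fetch_interval: float = 1.0,
    max_fetch_interval: float = 10.0,
    jitter: float = 0.1,
) -> float:
    """Return the sleep in seconds before the next fetch cycle.

    A non-empty fetch resets to the base; an empty one doubles the wait
    up to the ceiling.
    """
    if fetched_rows > 0:
        return min_fetch_interval
    grown = min(max(current, min_fetch_interval) * 2, max_fetch_interval)
    # Shave off a random fraction so idle subscribers do not poll in lockstep.
    return grown * (1 - random.uniform(0, jitter))


# Five idle cycles in a row with jitter disabled, then one busy cycle.
interval = 1.0
idle_intervals = []
for _ in range(5):
    interval = next_fetch_interval(interval, 0, jitter=0.0)
    idle_intervals.append(interval)
after_work = next_fetch_interval(interval, 3, jitter=0.0)
```

With the defaults, an idle queue under this scheme settles at one poll roughly every 10 seconds and returns to 1-second polling as soon as a fetch finds work.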

📝 License
