FastStream broker integration for the transactional outbox pattern: a Postgres table is the queue

faststream-outbox

faststream-outbox is a FastStream broker integration for the transactional outbox pattern — a Postgres table is the message queue.

A producer writes a domain entity and an outbox row in the same SQLAlchemy transaction by calling broker.publish(body, queue=..., session=session). A subscriber polls the table directly with FOR UPDATE SKIP LOCKED, runs the handler, and deletes the row on success. No downstream broker, no separate relay process — the table is the queue.

from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from faststream import FastStream
from faststream_outbox import OutboxBroker, make_outbox_table

metadata = MetaData()
outbox_table = make_outbox_table(metadata, table_name="outbox")

engine = create_async_engine("postgresql+asyncpg://localhost/app")
broker = OutboxBroker(engine, outbox_table=outbox_table)
app = FastStream(broker)

@broker.subscriber("orders", max_workers=4)
async def handle(order_id: int) -> None:
    print(f"order {order_id}")

# Producer side: share the caller's open transaction.
# Order is the application's own ORM model (not defined in this snippet).
session_factory = async_sessionmaker(engine, expire_on_commit=False)

async def create_order() -> None:
    async with session_factory() as session, session.begin():
        session.add(Order(id=1))
        await broker.publish(1, queue="orders", session=session)

How it works

A subscriber owns two async loops: a fetch loop claims available rows via a single CTE (SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *), and max_workers worker loops dispatch to the handler. On success, DELETE WHERE id=:id AND acquired_token=:token; on failure, the retry strategy schedules another attempt or terminally drops the row. Terminal failures DELETE by default; pass dlq_table=make_dlq_table(metadata) to atomically archive them into a sibling audit table instead — see Dead-letter queue.
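The claim step can be pictured with a minimal sketch like the one below. It is not the package's actual statement: only the overall shape (SELECT … FOR UPDATE SKIP LOCKED feeding an UPDATE that sets acquired_token and acquired_at, with RETURNING) comes from the description above. The helper names build_claim_sql and new_claim_params, the id and queue columns, the :batch_size parameter, and the simplified availability check are all assumptions made for illustration.

```python
import uuid


def build_claim_sql(table: str = "outbox") -> str:
    """Return a claim statement shaped like the CTE described above (hypothetical helper)."""
    if not table.isidentifier():
        raise ValueError(f"unsafe table name: {table!r}")
    return f"""
WITH candidates AS (
    SELECT id
    FROM {table}
    WHERE queue = :queue          -- assumed column name
      AND acquired_token IS NULL  -- simplified; the real availability rule is not shown in the text
    ORDER BY id
    LIMIT :batch_size
    FOR UPDATE SKIP LOCKED        -- skip rows another fetcher currently has locked
)
UPDATE {table} AS o
SET acquired_token = :token, acquired_at = now()
FROM candidates
WHERE o.id = candidates.id
RETURNING o.*
""".strip()


def new_claim_params(queue: str, batch_size: int) -> dict[str, object]:
    """Bind parameters for one fetch; every claim gets a fresh UUID token."""
    return {"queue": queue, "batch_size": batch_size, "token": str(uuid.uuid4())}


claim_sql = build_claim_sql("outbox")
params = new_claim_params("orders", batch_size=10)
```

Because selection and update happen in one statement, a row is never visible as "selected but not yet stamped", and concurrent fetchers skip each other's locked rows instead of blocking on them.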

The acquired_token is the load-bearing invariant: a slow handler whose lease expired and was re-claimed by another worker finds its terminal DELETE to be a no-op (the token no longer matches), preventing it from clobbering the new lease holder.
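The effect of the token guard can be reproduced in a small simulation. The sketch below uses an in-memory SQLite table purely as a stand-in for Postgres, and the claim and finish helpers are hypothetical names, not part of faststream-outbox. It shows only the invariant described above: a DELETE that also matches on acquired_token does nothing once the row has been re-claimed.

```python
import sqlite3
import uuid

# SQLite is a stand-in here; the real table lives in Postgres.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbox (id INTEGER PRIMARY KEY, body TEXT, acquired_token TEXT)"
)
conn.execute("INSERT INTO outbox (id, body) VALUES (1, 'order 1')")


def claim(row_id: int) -> str:
    """Take (or re-take) the lease by stamping a fresh token on the row."""
    token = str(uuid.uuid4())
    conn.execute("UPDATE outbox SET acquired_token = ? WHERE id = ?", (token, row_id))
    return token


def finish(row_id: int, token: str) -> bool:
    """Token-guarded delete; True only if the caller still held the lease."""
    cur = conn.execute(
        "DELETE FROM outbox WHERE id = ? AND acquired_token = ?", (row_id, token)
    )
    return cur.rowcount == 1


slow_token = claim(1)   # worker A claims the row, then stalls
fresh_token = claim(1)  # the lease expires and worker B re-claims with a new token

stale_delete = finish(1, slow_token)   # A finishes late: token mismatch, no-op
rows_after_stale = conn.execute("SELECT count(*) FROM outbox").fetchone()[0]
fresh_delete = finish(1, fresh_token)  # B finishes: token matches, row removed
rows_after_fresh = conn.execute("SELECT count(*) FROM outbox").fetchone()[0]
```

Matching on id alone would have let the stalled worker delete a row that the new lease holder was still processing.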

With the asyncpg driver, the fetch loop also LISTENs on outbox_<table> and publish emits pg_notify(...), so idle dispatch latency is sub-100ms instead of up to max_fetch_interval.
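The wake-up behaviour amounts to "sleep until notified, but never longer than max_fetch_interval". The sketch below simulates that with an asyncio.Event in place of a real Postgres LISTEN/NOTIFY channel, so it runs without a database. The wait_for_work and demo functions are illustrative names, and the package's real fetch loop may be structured differently.

```python
import asyncio


async def wait_for_work(wakeup: asyncio.Event, max_fetch_interval: float) -> str:
    """Sleep until notified or until the polling interval elapses."""
    try:
        await asyncio.wait_for(wakeup.wait(), timeout=max_fetch_interval)
    except asyncio.TimeoutError:
        return "timeout"  # nothing was signalled; fall back to a regular poll
    wakeup.clear()
    return "notified"  # a publisher signalled that new rows exist


async def demo() -> tuple[str, str]:
    wakeup = asyncio.Event()
    loop = asyncio.get_running_loop()
    # Stand-in for a publisher's pg_notify arriving 10 ms after the loop goes idle.
    loop.call_later(0.01, wakeup.set)
    first = await wait_for_work(wakeup, max_fetch_interval=1.0)
    # Nothing is published this time, so the wait runs to the (short) interval.
    second = await wait_for_work(wakeup, max_fetch_interval=0.05)
    return first, second


results = asyncio.run(demo())
```

In the first wait the loop wakes after roughly 10 ms rather than the full one-second interval, which is the latency gain the notification path is meant to provide; the timeout remains as a safety net if a notification is missed.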

See the How it works page in the documentation for the full architecture.

Optional extras

  • faststream-outbox[asyncpg] — asyncpg driver (enables LISTEN/NOTIFY for sub-100ms idle dispatch)
  • faststream-outbox[fastapi] — FastAPI integration via OutboxRouter
  • faststream-outbox[validate] — Alembic for broker.validate_schema()
  • faststream-outbox[prometheus] — Prometheus metrics adapter
  • faststream-outbox[opentelemetry] — OpenTelemetry metrics adapter

Acknowledgements

The architecture of this package is heavily informed by Arseniy Popov's PR #2704 (feat: add sqla broker) on upstream FastStream: the FastStream broker/registrator/subscriber wiring, the SELECT … FOR UPDATE SKIP LOCKED fetch-and-claim CTE, the retry strategy hierarchy, and the in-transaction publish contract all originate from there. This package is a Postgres-only reimplementation that diverges in storage model (lease tokens instead of an explicit state column, with an opt-in archive table), loop structure (two loops instead of four), and wake-up mechanism (LISTEN/NOTIFY), and that adds timer mechanics. Credit for the original design belongs to Arseniy.

Part of modern-python

Browse the full list of templates and libraries in modern-python — see the org profile for the categorized index.

📚 Documentation

📦 PyPI

📝 License
