FastStream broker integration for the transactional outbox pattern: a Postgres table is the queue
Project description
faststream-outbox
faststream-outbox is a FastStream broker integration for the transactional outbox pattern — a Postgres table is the message queue.
A producer writes a domain entity and an outbox row in the same SQLAlchemy transaction by calling broker.publish(body, queue=..., session=session). A separate subscriber polls the table and relays each row to a real message bus (Kafka, RabbitMQ, NATS, Redis…) with a single decorator — or processes the rows in-place if you don't have a downstream broker.
Quickstart — outbox relay to Kafka
Write the outbox row in your domain transaction; relay rows to Kafka with a stacked decorator.
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from faststream import FastStream
from faststream.kafka import KafkaBroker
from faststream_outbox import OutboxBroker, make_outbox_table
metadata = MetaData()
outbox_table = make_outbox_table(metadata, table_name="outbox")
engine = create_async_engine("postgresql+asyncpg://localhost/app")
broker_outbox = OutboxBroker(engine, outbox_table=outbox_table)
broker_kafka = KafkaBroker("127.0.0.1:9092")
publisher_kafka = broker_kafka.publisher("orders")
@publisher_kafka
@broker_outbox.subscriber("orders_outbox")
async def relay(body: dict) -> dict:
return body
app = FastStream(broker_outbox, on_startup=[broker_kafka.connect])
# Producer side — share the caller's open transaction; on commit both the
# domain row and the outbox row land atomically. The relay subscriber picks
# the outbox row up and publishes it to Kafka with at-least-once delivery.
session_factory = async_sessionmaker(engine, expire_on_commit=False)
async with session_factory() as session, session.begin():
session.add(Order(id=1))
await broker_outbox.publish(
{"order_id": 1},
queue="orders_outbox",
session=session,
)
The same one-decorator pattern works for RabbitMQ, NATS, Redis, and Confluent. See the relay tutorial for the FastAPI lifecycle, header propagation, router shapes, and the at-least-once contract.
Quickstart — standalone outbox queue
If you don't have a downstream broker, the same broker can process outbox rows in-place — the table is the queue.
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from faststream import FastStream
from faststream_outbox import OutboxBroker, make_outbox_table
metadata = MetaData()
outbox_table = make_outbox_table(metadata, table_name="outbox")
engine = create_async_engine("postgresql+asyncpg://localhost/app")
broker = OutboxBroker(engine, outbox_table=outbox_table)
app = FastStream(broker)
@broker.subscriber("orders", max_workers=4)
async def handle(order_id: int) -> None:
print(f"order {order_id}")
# Producer side — share the caller's open transaction:
session_factory = async_sessionmaker(engine, expire_on_commit=False)
async with session_factory() as session, session.begin():
session.add(Order(id=1))
await broker.publish(1, queue="orders", session=session)
How it works
A subscriber owns two async loops: a fetch loop claims available rows via a single CTE (SELECT … FOR UPDATE SKIP LOCKED → UPDATE acquired_token=:uuid, acquired_at=now() RETURNING *), and max_workers worker loops dispatch to the handler. On success, DELETE WHERE id=:id AND acquired_token=:token; on failure, the retry strategy schedules another attempt or terminally drops the row. Terminal failures DELETE by default; pass dlq_table=make_dlq_table(metadata) to atomically archive them into a sibling audit table instead — see Dead-letter queue.
The acquired_token is the load-bearing invariant: a slow handler whose lease expired and was re-claimed by another worker finds its terminal DELETE to be a no-op (the token no longer matches), preventing it from clobbering the new lease holder.
With the asyncpg driver, the fetch loop also LISTENs on outbox_<table> and publish emits pg_notify(...), so idle dispatch latency is sub-100ms instead of up to max_fetch_interval.
See How it works for the full architecture.
Optional extras
faststream-outbox[asyncpg]— asyncpg driver (enablesLISTEN/NOTIFYfor sub-100ms idle dispatch)faststream-outbox[fastapi]— FastAPI integration viaOutboxRouterfaststream-outbox[validate]— Alembic forbroker.validate_schema()faststream-outbox[prometheus]— Prometheus metrics adapterfaststream-outbox[opentelemetry]— OpenTelemetry metrics adapter
Acknowledgements
The architecture of this package is heavily informed by Arseniy Popov's PR #2704 (feat: add sqla broker) on upstream FastStream — the FastStream broker/registrator/subscriber wiring, the SELECT … FOR UPDATE SKIP LOCKED fetch-and-claim CTE, the retry strategy hierarchy, and the in-transaction publish contract all originate from there. This package is a Postgres-only reimplementation that diverges in storage model (lease tokens instead of an explicit state column, archive table is opt-in), loop structure (two loops instead of four), wake-up mechanism (LISTEN/NOTIFY), and adds timer mechanics. Credit for the original design belongs to Arseniy.
Part of modern-python
Browse the full list of templates and libraries in
modern-python — see the org profile for the
categorized index.
📚 Documentation
📦 PyPi
📝 License
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file faststream_outbox-0.8.0.tar.gz.
File metadata
- Download URL: faststream_outbox-0.8.0.tar.gz
- Upload date:
- Size: 61.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.19 {"installer":{"name":"uv","version":"0.11.19","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a09b8a317fee32df65bca632c81256995d5637282c2d614b5677a8a0f61d5fa6
|
|
| MD5 |
ad1ac65dec06d0422240be45933eb1cb
|
|
| BLAKE2b-256 |
bf3eb110fc98959d12f58aeeb67df99079ee4aa8f43cf17e6f9e1fabfa61d697
|
File details
Details for the file faststream_outbox-0.8.0-py3-none-any.whl.
File metadata
- Download URL: faststream_outbox-0.8.0-py3-none-any.whl
- Upload date:
- Size: 81.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.19 {"installer":{"name":"uv","version":"0.11.19","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8315ac5fef4c29634b40c21d6e838ded705aba8b9b9a76434e92ba8cadac3c6e
|
|
| MD5 |
8cd23a848c098dc03f4dbcd8c1f5c3fc
|
|
| BLAKE2b-256 |
fa6b891ded74c84d40efb1f93a9aa5f34b7183f056fd86244d944ac286a8010f
|