FastStream broker integration for the transactional outbox pattern: a Postgres table is the queue
Project description
faststream-outbox
faststream-outbox is a FastStream broker integration for the transactional outbox pattern — a Postgres table is the message queue.
A producer writes a domain entity and an outbox row in the same SQLAlchemy transaction by calling broker.publish(body, queue=..., session=session). A subscriber polls the table directly with FOR UPDATE SKIP LOCKED, runs the handler, and deletes the row on success. No downstream broker, no separate relay process — the table is the queue.
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from faststream import FastStream
from faststream_outbox import OutboxBroker, make_outbox_table
metadata = MetaData()
outbox_table = make_outbox_table(metadata, table_name="outbox")
engine = create_async_engine("postgresql+asyncpg://localhost/app")
broker = OutboxBroker(engine, outbox_table=outbox_table)
app = FastStream(broker)
@broker.subscriber("orders", max_workers=4)
async def handle(order_id: int) -> None:
print(f"order {order_id}")
# Producer side — share the caller's open transaction:
session_factory = async_sessionmaker(engine, expire_on_commit=False)
async with session_factory() as session, session.begin():
session.add(Order(id=1))
await broker.publish(1, queue="orders", session=session)
How it works
make_outbox_table(metadata, table_name="outbox") returns a sqlalchemy.Table that you attach to your own MetaData and migrate via Alembic. The package does not own your schema; it only describes the columns it needs.
broker.publish(body, *, queue, session, headers=None, correlation_id=None) inserts one outbox row through the caller's AsyncSession. It does not flush, commit, or open its own transaction — the whole point is that the row commits atomically with the caller's domain writes. Use it inside an async with session.begin(): block.
broker.publish_batch(*bodies, queue, session, headers=None) inserts many rows in a single round-trip with the same transactional contract.
A subscriber owns three async loops:
- fetch — claims due rows via
SELECT … FOR UPDATE SKIP LOCKED → UPDATE state='processing', acquired_token=:uuid RETURNING *in a single CTE. - workers (×
max_workers) — dispatch to the handler. On success,DELETE WHERE id=:id AND acquired_token=:token. On failure, the retry strategy decides: schedule another attempt, or terminalDELETE. - release-stuck — periodically flips
processingrows back topendingif their lease is older thanrelease_stuck_timeout. Wrapped in a Postgres advisory lock so multiple processes don't compete.
The acquired_token is critical: a slow handler whose lease expired and was re-claimed by another worker will find its terminal DELETE/UPDATE to be a no-op (the token no longer matches), preventing it from clobbering the new lease holder's row.
Recommended index
Add this to your Alembic migration alongside the table:
CREATE INDEX outbox_pending_idx ON outbox (queue, next_attempt_at)
WHERE state = 'pending';
Schema validation
Schema validation is opt-in:
await broker.validate_schema() # raises if user's table drifts from expected columns
Call it from a /health endpoint or startup hook — not at broker.start(), so Alembic can run migrations against the same DB without a startup loop.
Retry strategies
from faststream_outbox import ExponentialRetry, ConstantRetry, LinearRetry, NoRetry
@broker.subscriber(
"orders",
retry_strategy=ExponentialRetry(
initial_delay_seconds=1.0,
max_delay_seconds=300.0,
max_attempts=5,
jitter_factor=0.5,
),
)
async def handle(order_id: int) -> None: ...
Strategies receive the raised exception so users may subclass for "retry only on transient errors":
class TransientOnly(ExponentialRetry):
def get_next_attempt_at(self, *, exception=None, **kw):
if exception and not isinstance(exception, TransientError):
return None
return super().get_next_attempt_at(exception=exception, **kw)
Failure modes
- Handlers must be idempotent. Crash between commit-of-handler-side-effects and the broker's
DELETEre-delivers the message. - Best-effort ordering only.
FOR UPDATE SKIP LOCKEDdoes not preserve strict order under concurrent workers. If you need strict per-aggregate ordering, route to a single subscriber and run a single worker. - No DLQ / archive. Terminal failures
DELETEthe row. Hookon_terminal_failure(row)to capture them in your own table or alerting.
Connection ownership
OutboxBroker does not close the AsyncEngine you pass in — the caller owns its lifecycle.
Tuning
Per-subscriber knobs (passed to @broker.subscriber("…", …)):
max_workers(default1) — concurrent handlers per subscriber.fetch_batch_size(default10) — rows claimed per fetch cycle.min_fetch_interval/max_fetch_interval(default1.0/10.0s) — base + ceiling for the adaptive idle backoff with jitter.release_stuck_timeout(default300.0s) — how long aprocessingrow may live before being released back topending.release_stuck_interval(defaultrelease_stuck_timeout / 2).max_deliveries(defaultNone— unbounded) — total claims (including stuck-recovery re-claims) after which the row is dropped without invoking the handler. Defends against handlers that consistently wedge.
📝 License
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file faststream_outbox-0.1.0.tar.gz.
File metadata
- Download URL: faststream_outbox-0.1.0.tar.gz
- Upload date:
- Size: 18.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.11 {"installer":{"name":"uv","version":"0.11.11","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
86c8f2d4e1c81ab21b8786744a4addc69ea5447f2f019ab263db99bf46a4f64b
|
|
| MD5 |
7f9819f230b601e79459bb28e4c71c6a
|
|
| BLAKE2b-256 |
f4e76fe08e06029f6023dd7976167a98969e25029b521bc18707e677177effe1
|
File details
Details for the file faststream_outbox-0.1.0-py3-none-any.whl.
File metadata
- Download URL: faststream_outbox-0.1.0-py3-none-any.whl
- Upload date:
- Size: 26.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.11 {"installer":{"name":"uv","version":"0.11.11","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2202ddb4fbab63df49e51656bd6c3fb0a278d006c9cb4d4539db1ba0eb05a991
|
|
| MD5 |
d40043b5fe25add3837be04bebd198df
|
|
| BLAKE2b-256 |
ed1f11f50bd2126de105c68a0ae293342dcaf8bdfb38272c8a5cc4cc7e9d6803
|