FastStream SQL broker

faststream-sqlbroker

A SQL-backed broker for FastStream. Documentation.

Transactional outbox

The transactional outbox pattern takes only a few lines of code.

Publish messages in the same database transaction as your other database operations, so a message is stored only if that transaction commits.

from sqlalchemy.ext.asyncio import create_async_engine

from faststream import AckPolicy, FastStream
from faststream.kafka import KafkaBroker

from faststream_sqlbroker.sqlbroker import SqlBroker
from faststream_sqlbroker.sqlbroker.retry import ExponentialBackoffRetryStrategy

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/mydb")
broker_sqlbroker = SqlBroker(engine=engine)
broker_kafka = KafkaBroker("127.0.0.1:9092")
app = FastStream(broker_sqlbroker, on_startup=[broker_kafka.connect])
publisher_sqlbroker = broker_sqlbroker.publisher()


@app.after_startup  # just an example
async def publish_examples():
    async with engine.begin() as connection:
        # ... your other database operations using `connection` ...
        await publisher_sqlbroker.publish(
            {"message": "Hello, SqlBroker!"},
            queue="sqlbroker_queue",
            connection=connection,
        )
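To make the guarantee concrete, here is a minimal sketch of the outbox idea itself, independent of this library. It uses the standard-library `sqlite3` module so that it runs anywhere; the `orders` and `outbox` tables and the `place_order` and `count` helpers are hypothetical names chosen for illustration and are not part of faststream-sqlbroker or its schema. The business row and the message row are written in one transaction, so a failure before commit discards both.

```python
import json
import sqlite3

# Illustrative schema only; not the tables faststream-sqlbroker creates.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, item TEXT NOT NULL)")
conn.execute(
    "CREATE TABLE outbox ("
    "id INTEGER PRIMARY KEY, queue TEXT NOT NULL, payload TEXT NOT NULL)"
)
conn.commit()


def place_order(item: str, fail: bool = False) -> None:
    """Write the business row and the outbox message in one transaction."""
    # `with conn` commits on success and rolls back if an exception escapes.
    with conn:
        conn.execute("INSERT INTO orders (item) VALUES (?)", (item,))
        conn.execute(
            "INSERT INTO outbox (queue, payload) VALUES (?, ?)",
            ("sqlbroker_queue", json.dumps({"item": item})),
        )
        if fail:
            raise RuntimeError("simulated failure before commit")


def count(table: str) -> int:
    """Return the number of rows in one of the two demo tables."""
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


place_order("book")  # commits both rows
try:
    place_order("pen", fail=True)  # rolls back both rows
except RuntimeError:
    pass

print(count("orders"), count("outbox"))  # prints "1 1"
```

The failed call leaves neither an order nor a message behind, which is the property the `connection=connection` argument in the example above is meant to provide.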

Then relay the messages from the database to another broker, such as Kafka.

publisher_kafka = broker_kafka.publisher("kafka_topic")


@publisher_kafka
@broker_sqlbroker.subscriber(
    queues=["sqlbroker_queue"],
    max_workers=10,
    retry_strategy=ExponentialBackoffRetryStrategy(
        initial_delay_seconds=1,
        multiplier=2,
        max_delay_seconds=60 * 5,
        max_total_delay_seconds=60 * 60 * 6,
        max_attempts=None,
    ),
    max_fetch_interval=1,
    min_fetch_interval=0,
    fetch_batch_size=10,
    overfetch_factor=1.5,
    flush_interval=3,
    release_stuck_interval=5,
    release_stuck_timeout=60 * 60,
    max_deliveries=20,
    ack_policy=AckPolicy.NACK_ON_ERROR,
)
async def handle_msg(msg_body: dict) -> dict:
    return msg_body
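The retry parameters in the subscriber above suggest an exponential backoff schedule. The sketch below shows one plausible reading of them: each delay is the previous one multiplied by `multiplier`, capped at `max_delay_seconds`, and retries stop once the cumulative delay would exceed `max_total_delay_seconds` or `max_attempts` is reached. This interpretation is an assumption based on the parameter names alone; `backoff_delays` is a hypothetical helper and not the implementation of `ExponentialBackoffRetryStrategy`.

```python
from typing import Optional


def backoff_delays(
    initial_delay_seconds: float,
    multiplier: float,
    max_delay_seconds: float,
    max_total_delay_seconds: float,
    max_attempts: Optional[int] = None,
) -> list[float]:
    """Return the retry delays under the assumed semantics described above."""
    delays: list[float] = []
    total = 0.0
    delay = float(initial_delay_seconds)
    while max_attempts is None or len(delays) < max_attempts:
        capped = min(delay, float(max_delay_seconds))
        # Stop once another retry would exceed the total delay budget.
        if total + capped > max_total_delay_seconds:
            break
        delays.append(capped)
        total += capped
        delay *= multiplier
    return delays


# Same values as the subscriber example above.
delays = backoff_delays(
    initial_delay_seconds=1,
    multiplier=2,
    max_delay_seconds=60 * 5,
    max_total_delay_seconds=60 * 60 * 6,
    max_attempts=None,
)
print(delays[:10])  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 300.0]
print(len(delays), sum(delays))  # 79 21511.0
```

Under this reading, the delay reaches the five-minute cap on the tenth retry, and the six-hour budget allows 79 retries in total. In the subscriber example, `max_deliveries=20` would presumably stop redelivery well before that budget is exhausted, although how the two limits interact is not stated here.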

Origins

Originated as a PR to FastStream.
