Async Kafka producer/consumer helpers for FastAPI projects

pronto-kafka

Async Kafka producer/consumer helpers for FastAPI (and any async Python) projects. Wraps aiokafka with a module-level producer, context-manager consumers, and ready-made FastAPI lifespan hooks.

Installation

pip install pronto-kafka

Quick start — FastAPI producer

import json

from fastapi import FastAPI
from pronto_kafka.v1 import make_producer_lifespan, send

# The lifespan starts the producer on startup and stops it on shutdown.
app = FastAPI(lifespan=make_producer_lifespan("KAFKA_BOOTSTRAP_SERVERS"))

@app.post("/orders")
async def create_order(order: dict):
    # The value is sent as bytes, so serialise the order to JSON first.
    await send("orders", json.dumps(order).encode())
    return {"status": "queued"}

Set your bootstrap servers in the environment:

KAFKA_BOOTSTRAP_SERVERS=localhost:9092

Pass any env var name to make_producer_lifespan: MY_KAFKA_SERVERS, or whatever fits your project.

Quick start — consumer

from pronto_kafka.v1 import create_consumer_from_env

async def process_orders():
    async with create_consumer_from_env("orders", group_id="order-svc") as consumer:
        async for msg in consumer:
            print(msg.topic, msg.value)
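
The producer quick start sends JSON-encoded bytes, so a consumer will usually want to decode msg.value before using it. The following is a minimal sketch, assuming the payload is UTF-8 JSON. decode_order and FakeRecord are hypothetical names used only for illustration; FakeRecord stands in for the record object so the snippet runs without a broker.

```python
import json
from dataclasses import dataclass


@dataclass
class FakeRecord:
    """Hypothetical stand-in exposing only the two attributes used here."""
    topic: str
    value: bytes


def decode_order(msg) -> dict:
    """Decode a record whose value is UTF-8 encoded JSON (an assumption)."""
    return json.loads(msg.value.decode("utf-8"))


# Simulate one message as it might arrive on the "orders" topic.
record = FakeRecord(topic="orders", value=b'{"id": 7, "qty": 2}')
order = decode_order(record)
print(record.topic, order["id"], order["qty"])
```

Inside the quick-start loop, the same function could be called on each msg in place of the print statement.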

Quick start — consumer as FastAPI background task

from fastapi import FastAPI
from pronto_kafka.v1 import make_consumer_lifespan

async def handle(msg):
    print(f"received: {msg.value}")

app = FastAPI(
    lifespan=make_consumer_lifespan(
        "orders",
        group_id="order-svc",
        handler=handle,
    )
)

Producer API

make_producer_lifespan(env_var="KAFKA_BOOTSTRAP_SERVERS", **kwargs)

Returns a FastAPI-compatible lifespan that starts the producer on startup and stops it on shutdown. Extra kwargs are forwarded to AIOKafkaProducer.

init_producer(bootstrap_servers, **kwargs)

Start the module-level producer with an explicit bootstrap-servers string.

init_producer_from_env(env_var="KAFKA_BOOTSTRAP_SERVERS", **kwargs)

Start the producer by reading bootstrap servers from an environment variable. Raises RuntimeError if the variable is missing or empty.
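
The missing-or-empty check described above can be pictured with a small standard-library sketch. This is not the library's implementation: read_bootstrap_servers is a hypothetical helper, and stripping surrounding whitespace is an assumption made here for illustration.

```python
import os


def read_bootstrap_servers(env_var: str = "KAFKA_BOOTSTRAP_SERVERS") -> str:
    """Hypothetical helper: return the variable's value or raise RuntimeError."""
    value = os.environ.get(env_var, "").strip()  # stripping is an assumption
    if not value:
        raise RuntimeError(f"environment variable {env_var!r} is missing or empty")
    return value


# A custom variable name works the same way as the default one.
os.environ["MY_KAFKA_SERVERS"] = "localhost:9092"
servers = read_bootstrap_servers("MY_KAFKA_SERVERS")

# An unset variable is reported as an error rather than silently defaulted.
os.environ.pop("UNSET_KAFKA_VAR", None)
try:
    read_bootstrap_servers("UNSET_KAFKA_VAR")
    missing_raised = False
except RuntimeError:
    missing_raised = True
```

Failing at startup keeps a misconfigured deployment from starting up and only failing later, on the first send.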

close_producer()

Stop and tear down the module-level producer.

get_producer() -> AIOKafkaProducer

Return the active producer instance. Raises RuntimeError if not initialised.
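
To make the "module-level producer" idea concrete, here is a hedged sketch of the general pattern: one instance stored in a module global, with an accessor that fails loudly when nothing has been initialised. All names (FakeProducer, init_fake_producer, get_fake_producer, close_fake_producer) are hypothetical, and the real package may be structured differently.

```python
from typing import Optional


class FakeProducer:
    """Hypothetical stand-in for a started producer."""


# The single shared instance; None means "not initialised".
_producer: Optional[FakeProducer] = None


def init_fake_producer() -> None:
    global _producer
    _producer = FakeProducer()


def close_fake_producer() -> None:
    global _producer
    _producer = None


def get_fake_producer() -> FakeProducer:
    if _producer is None:
        raise RuntimeError("producer is not initialised")
    return _producer


# Accessing the producer before initialisation raises RuntimeError.
try:
    get_fake_producer()
    raised_before_init = False
except RuntimeError:
    raised_before_init = True

# After initialisation every caller receives the same instance.
init_fake_producer()
same_instance = get_fake_producer() is get_fake_producer()
close_fake_producer()
```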

send(topic, value=None, key=None, headers=None, partition=None, **kwargs) -> RecordMetadata

Convenience wrapper around producer.send_and_wait(...). Raises RuntimeError if the producer is not initialised.

meta = await send("events", b'{"type": "login"}', key=b"user-42")
print(meta.topic, meta.partition, meta.offset)

Consumer API

create_consumer(*topics, group_id, bootstrap_servers, **kwargs)

Async context manager that yields a started AIOKafkaConsumer. Stops the consumer on exit.

async with create_consumer("orders", group_id="svc", bootstrap_servers="localhost:9092") as consumer:
    async for msg in consumer:
        process(msg)
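
The start-on-enter, stop-on-exit behaviour can be sketched with contextlib.asynccontextmanager. This is an illustration of the pattern under stated assumptions, not the package's source: FakeConsumer and managed_consumer are hypothetical names, and FakeConsumer replays a fixed list instead of talking to a broker.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeConsumer:
    """Hypothetical consumer that replays a fixed list of messages."""

    def __init__(self, messages):
        self._messages = list(messages)
        self.started = False
        self.stopped = False

    async def start(self):
        self.started = True

    async def stop(self):
        self.stopped = True

    def __aiter__(self):
        return self._iterate()

    async def _iterate(self):
        for message in self._messages:
            yield message


@asynccontextmanager
async def managed_consumer(consumer):
    await consumer.start()
    try:
        yield consumer
    finally:
        # Runs on normal exit and when the body raises.
        await consumer.stop()


async def main():
    consumer = FakeConsumer([b"a", b"b"])
    seen = []
    try:
        async with managed_consumer(consumer) as active:
            async for msg in active:
                seen.append(msg)
                raise ValueError("simulated processing failure")
    except ValueError:
        pass
    return consumer, seen


consumer, seen = asyncio.run(main())
print(consumer.started, consumer.stopped, seen)
```

The try/finally around the yield is what guarantees that the consumer is stopped even when message processing fails partway through.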

create_consumer_from_env(*topics, group_id, env_var="KAFKA_BOOTSTRAP_SERVERS", **kwargs)

Same as create_consumer but reads bootstrap servers from an environment variable.

make_consumer_lifespan(*topics, group_id, env_var="KAFKA_BOOTSTRAP_SERVERS", handler, **kwargs)

Returns a FastAPI-compatible lifespan that runs a consumer loop as an asyncio background task. Supply an async handler function that receives each ConsumerRecord.

async def handle(msg):
    await process(msg.value)

app = FastAPI(lifespan=make_consumer_lifespan("orders", group_id="svc", handler=handle))
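
How a lifespan can run a consumer loop as a background task is sketched below using only asyncio. It is an assumption-laden illustration rather than the package's code: make_background_lifespan and fake_stream are hypothetical names, and an asyncio.Queue stands in for the Kafka topic.

```python
import asyncio
from contextlib import asynccontextmanager, suppress


async def fake_stream(queue):
    """Hypothetical message source: yields queue items until cancelled."""
    while True:
        yield await queue.get()


def make_background_lifespan(queue, handler):
    @asynccontextmanager
    async def lifespan(app):
        async def loop():
            async for msg in fake_stream(queue):
                await handler(msg)
                queue.task_done()

        # Startup: launch the loop without blocking the application.
        task = asyncio.create_task(loop())
        try:
            yield
        finally:
            # Shutdown: cancel the loop and wait for it to finish.
            task.cancel()
            with suppress(asyncio.CancelledError):
                await task

    return lifespan


async def main():
    queue = asyncio.Queue()
    handled = []

    async def handle(msg):
        handled.append(msg)

    lifespan = make_background_lifespan(queue, handle)
    async with lifespan(None):
        await queue.put(b"order-1")
        await queue.put(b"order-2")
        await queue.join()  # wait until both messages were handled
    return handled


handled = asyncio.run(main())
print(handled)
```

Cancelling the task on shutdown lets the application exit promptly even while the loop is idle and waiting for the next message.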

Manual lifespan (without helpers)

from contextlib import asynccontextmanager
from fastapi import FastAPI
from pronto_kafka.v1 import init_producer_from_env, close_producer

@asynccontextmanager
async def lifespan(app):
    await init_producer_from_env("KAFKA_BOOTSTRAP_SERVERS")
    try:
        yield
    finally:
        # Stop the producer even if shutdown is triggered by an error.
        await close_producer()

app = FastAPI(lifespan=lifespan)

Development

pip install -e ".[dev]"
pytest

Integration tests run automatically when KAFKA_BOOTSTRAP_SERVERS is set (e.g. pointing at a local Kafka running in Docker); otherwise they are skipped.

Versioning

Versions are derived from git tags via hatch-vcs.

git tag v0.1.0
hatch build
