Async Kafka producer/consumer helpers for FastAPI projects
Project description
pronto-kafka
Async Kafka producer/consumer helpers for FastAPI (and any async Python) projects. Wraps aiokafka with a module-level producer, context-manager consumers, and ready-made FastAPI lifespan hooks.
Installation
pip install pronto-kafka
Quick start — FastAPI producer
from fastapi import FastAPI
from pronto_kafka.v1 import make_producer_lifespan, send
app = FastAPI(lifespan=make_producer_lifespan("KAFKA_BOOTSTRAP_SERVERS"))
@app.post("/orders")
async def create_order(order: dict):
import json
await send("orders", json.dumps(order).encode())
return {"status": "queued"}
Set your bootstrap servers in the environment:
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
Pass any env var name to make_producer_lifespan — MY_KAFKA_SERVERS, whatever fits your project.
Quick start — consumer
from pronto_kafka.v1 import create_consumer_from_env
async def process_orders():
async with create_consumer_from_env("orders", group_id="order-svc") as consumer:
async for msg in consumer:
print(msg.topic, msg.value)
Quick start — consumer as FastAPI background task
from fastapi import FastAPI
from pronto_kafka.v1 import make_consumer_lifespan
async def handle(msg):
print(f"received: {msg.value}")
app = FastAPI(
lifespan=make_consumer_lifespan(
"orders",
group_id="order-svc",
handler=handle,
)
)
Producer API
make_producer_lifespan(env_var="KAFKA_BOOTSTRAP_SERVERS", **kwargs)
Returns a FastAPI-compatible lifespan that starts the producer on startup and stops it on shutdown. Extra kwargs are forwarded to AIOKafkaProducer.
init_producer(bootstrap_servers, **kwargs)
Start the module-level producer with an explicit bootstrap-servers string.
init_producer_from_env(env_var="KAFKA_BOOTSTRAP_SERVERS", **kwargs)
Start the producer by reading bootstrap servers from an environment variable. Raises RuntimeError if the variable is missing or empty.
close_producer()
Stop and tear down the module-level producer.
get_producer() -> AIOKafkaProducer
Return the active producer instance. Raises RuntimeError if not initialised.
send(topic, value=None, key=None, headers=None, partition=None, **kwargs) -> RecordMetadata
Convenience wrapper around producer.send_and_wait(...). Raises RuntimeError if the producer is not initialised.
meta = await send("events", b'{"type": "login"}', key=b"user-42")
print(meta.topic, meta.partition, meta.offset)
Consumer API
create_consumer(*topics, group_id, bootstrap_servers, **kwargs)
Async context manager that yields a started AIOKafkaConsumer. Stops the consumer on exit.
async with create_consumer("orders", group_id="svc", bootstrap_servers="localhost:9092") as consumer:
async for msg in consumer:
process(msg)
create_consumer_from_env(*topics, group_id, env_var="KAFKA_BOOTSTRAP_SERVERS", **kwargs)
Same as create_consumer but reads bootstrap servers from an environment variable.
make_consumer_lifespan(*topics, group_id, env_var="KAFKA_BOOTSTRAP_SERVERS", handler, **kwargs)
Returns a FastAPI-compatible lifespan that runs a consumer loop as an asyncio background task. Supply a handler async function that receives each ConsumerRecord.
async def handle(msg):
await process(msg.value)
app = FastAPI(lifespan=make_consumer_lifespan("orders", group_id="svc", handler=handle))
Manual lifespan (without helpers)
from contextlib import asynccontextmanager
from fastapi import FastAPI
from pronto_kafka.v1 import init_producer_from_env, close_producer
@asynccontextmanager
async def lifespan(app):
await init_producer_from_env("KAFKA_BOOTSTRAP_SERVERS")
yield
await close_producer()
app = FastAPI(lifespan=lifespan)
Development
pip install -e ".[dev]"
pytest
Integration tests run automatically when KAFKA_BOOTSTRAP_SERVERS is set (e.g. a local Docker Kafka), otherwise they are skipped.
Versioning
Versions are derived from git tags via hatch-vcs.
git tag v0.1.0
hatch build
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pronto_kafka-0.0.1.tar.gz.
File metadata
- Download URL: pronto_kafka-0.0.1.tar.gz
- Upload date:
- Size: 10.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e04930194110c3f435d7051366b43e80280294f31fd7bcc14dda0d7fb22d5698
|
|
| MD5 |
91a4ab4169c6f671b1bf90393ec623c7
|
|
| BLAKE2b-256 |
5b10cdf8273ca5eb0d571a2ec7f27281c6da0422e42a59eacad51e2f387cf4c8
|
Provenance
The following attestation bundles were made for pronto_kafka-0.0.1.tar.gz:
Publisher:
publish.yml on justTil/pronto-kafka
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
pronto_kafka-0.0.1.tar.gz -
Subject digest:
e04930194110c3f435d7051366b43e80280294f31fd7bcc14dda0d7fb22d5698 - Sigstore transparency entry: 1629570506
- Sigstore integration time:
-
Permalink:
justTil/pronto-kafka@2c1ecaa4de67562c00217478da429bd128baec13 -
Branch / Tag:
refs/tags/v0.0.1 - Owner: https://github.com/justTil
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@2c1ecaa4de67562c00217478da429bd128baec13 -
Trigger Event:
push
-
Statement type:
File details
Details for the file pronto_kafka-0.0.1-py3-none-any.whl.
File metadata
- Download URL: pronto_kafka-0.0.1-py3-none-any.whl
- Upload date:
- Size: 6.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6b194a154ae17024245f93c2de179407a1c73c88e0aea0f7aefbeb59cdf96deb
|
|
| MD5 |
f68bc9c5df499b2dc3d02bd510383ae5
|
|
| BLAKE2b-256 |
2e43549279565da319aca8c129c3cc80ead51e9c5d64b313ee7ed175f81150a8
|
Provenance
The following attestation bundles were made for pronto_kafka-0.0.1-py3-none-any.whl:
Publisher:
publish.yml on justTil/pronto-kafka
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
pronto_kafka-0.0.1-py3-none-any.whl -
Subject digest:
6b194a154ae17024245f93c2de179407a1c73c88e0aea0f7aefbeb59cdf96deb - Sigstore transparency entry: 1629570517
- Sigstore integration time:
-
Permalink:
justTil/pronto-kafka@2c1ecaa4de67562c00217478da429bd128baec13 -
Branch / Tag:
refs/tags/v0.0.1 - Owner: https://github.com/justTil
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@2c1ecaa4de67562c00217478da429bd128baec13 -
Trigger Event:
push
-
Statement type: