Concurrent message processing middleware for FastStream with aiokafka
Project description
faststream-concurrent-aiokafka
Concurrent message processing middleware for FastStream with aiokafka.
By default FastStream processes Kafka messages sequentially — one message at a time per subscriber. This library turns each incoming message into an asyncio task so multiple messages are handled concurrently, while keeping offset commits correct and shutdown graceful.
Features
- Concurrent message processing via asyncio tasks
- Configurable concurrency limit (semaphore-based)
- Batch offset committing per partition after each task completes
- Graceful shutdown: waits up to 10 s for in-flight tasks before exiting
- Signal handling (SIGTERM / SIGINT / SIGQUIT) triggers graceful shutdown
- Background observer task to detect and discard stale completed tasks
- Handler exceptions are logged but do not crash the consumer
- Health check helper to probe handler status from a
ContextRepo
📦 PyPi
📝 License
Installation
pip install faststream-concurrent-aiokafka
Quick Start
ack_policy=AckPolicy.MANUAL is required on every subscriber — the middleware enforces this at runtime.
Without it, aiokafka's auto-commit timer would commit offsets before processing tasks complete, causing silent message loss on crash.
from faststream import FastStream, ContextRepo
from faststream.kafka import KafkaBroker
from faststream.middlewares import AckPolicy
from faststream_concurrent_aiokafka import (
KafkaConcurrentProcessingMiddleware,
initialize_concurrent_processing,
is_kafka_handler_healthy,
stop_concurrent_processing,
)
broker = KafkaBroker(middlewares=[KafkaConcurrentProcessingMiddleware])
app = FastStream(broker)
@app.on_startup
async def on_startup(context: ContextRepo) -> None:
await initialize_concurrent_processing(
context=context,
concurrency_limit=20, # max concurrent tasks (minimum: 1)
commit_batch_size=100, # commit after this many completed tasks
commit_batch_timeout_sec=5.0, # or after this many seconds
)
@app.on_shutdown
async def on_shutdown(context: ContextRepo) -> None:
await stop_concurrent_processing(context)
@broker.subscriber("my-topic", group_id="my-group", ack_policy=AckPolicy.MANUAL)
async def handle(msg: str) -> None:
# runs concurrently with other messages
...
Core Concepts
KafkaConcurrentProcessingMiddleware
A FastStream BaseMiddleware subclass. Add it to your broker to enable concurrent processing. It wraps each incoming message in an asyncio task submitted to KafkaConcurrentHandler.
KafkaConcurrentHandler
The processing engine. Manages:
- An
asyncio.Semaphoreto enforceconcurrency_limit - A set of in-flight asyncio tasks
- A background observer that periodically discards stale completed tasks
- Signal handlers for graceful shutdown
KafkaBatchCommitter
Runs as a background asyncio task. Receives KafkaCommitTask objects, waits for each task's asyncio future to complete, then commits the max offset per partition to Kafka. Batching is triggered by size or timeout. If the committer's task dies, CommitterIsDeadError is raised to callers.
API Reference
initialize_concurrent_processing(context, ...)
Create and start the concurrent processing handler; store it in FastStream's context.
| Parameter | Default | Description |
|---|---|---|
context |
required | FastStream ContextRepo instance |
concurrency_limit |
10 |
Max concurrent asyncio tasks (minimum: 1) |
commit_batch_size |
10 |
Max messages per commit batch |
commit_batch_timeout_sec |
10.0 |
Max seconds before flushing a batch |
Returns the KafkaConcurrentHandler instance.
stop_concurrent_processing(context)
Flush pending commits, wait for in-flight tasks (up to 10 s), then stop the handler.
is_kafka_handler_healthy(context)
Returns True if the KafkaConcurrentHandler stored in context is running and healthy, False otherwise (not initialized, stopped, or observer task dead). Useful for readiness/liveness probes.
KafkaConcurrentProcessingMiddleware
FastStream middleware class. Pass it to KafkaBroker(middlewares=[...]) or broker.add_middleware(...).
How It Works
-
Message dispatch: On each incoming message,
consume_scopecallshandle_task(), which acquires a semaphore slot then fires the handler coroutine as a backgroundasyncio.Task. -
Concurrency control: The semaphore blocks new tasks when
concurrency_limitis reached. The slot is released via a done-callback when the task finishes or fails. -
Offset committing: Each dispatched task is paired with its Kafka offset and consumer reference and enqueued in
KafkaBatchCommitter. Once the task completes, the committer groups offsets by partition and callsconsumer.commit(partitions_to_offsets)withoffset + 1(Kafka's "next offset to fetch" convention). -
Graceful shutdown:
stop_concurrent_processingsets the shutdown event, flushes the committer, cancels the observer task, and callsasyncio.gatherwith a 10-second timeout to wait for all in-flight tasks.
Requirements
- Python >= 3.11
faststream[kafka]
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file faststream_concurrent_aiokafka-0.3.1.tar.gz.
File metadata
- Download URL: faststream_concurrent_aiokafka-0.3.1.tar.gz
- Upload date:
- Size: 8.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.6 {"installer":{"name":"uv","version":"0.11.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0eade7ef700ebb4cf06ee51dee81efe671f0fb8d22a3e589c16c17903f557957
|
|
| MD5 |
0a6c1cabee9a9e2b61a5e37a3c5a3662
|
|
| BLAKE2b-256 |
82fb84394a99853501d6e7990711543e06c16a0cf3e90cf619eccdf9577b9144
|
File details
Details for the file faststream_concurrent_aiokafka-0.3.1-py3-none-any.whl.
File metadata
- Download URL: faststream_concurrent_aiokafka-0.3.1-py3-none-any.whl
- Upload date:
- Size: 10.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.6 {"installer":{"name":"uv","version":"0.11.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
4b60b769d382fc85babb9e2bcaebe025cb4bbf2639c1b916768db80ce4a08428
|
|
| MD5 |
e7d78a3af4d3d76c0996e1ed8f7f069b
|
|
| BLAKE2b-256 |
5d575dc5e41578447c116b4d2b0deef2f2a6e482423c4def1369c63c4e2a1518
|