Concurrent message processing middleware for FastStream with aiokafka
faststream-concurrent-aiokafka
By default, FastStream processes Kafka messages sequentially. This library lets you process multiple messages concurrently using asyncio tasks, with optional batched offset commits.
Installation
pip install faststream-concurrent-aiokafka
Usage
from faststream import FastStream, ContextRepo
from faststream.kafka import KafkaBroker
from faststream_concurrent_aiokafka import (
    KafkaConcurrentProcessingMiddleware,
    initialize_concurrent_processing,
    stop_concurrent_processing,
)

broker = KafkaBroker(middlewares=[KafkaConcurrentProcessingMiddleware])
app = FastStream(broker)


@app.on_startup
async def on_startup(context: ContextRepo) -> None:
    await initialize_concurrent_processing(
        context=context,
        concurrency_limit=20,  # max concurrent tasks (0 = unlimited)
    )


@app.on_shutdown
async def on_shutdown(context: ContextRepo) -> None:
    await stop_concurrent_processing(context)


@broker.subscriber("my-topic", group_id="my-group")
async def handle(msg: str) -> None:
    # runs concurrently with other messages
    ...
Batch offset committing
By default, aiokafka auto-commits offsets. If you manage commits manually, set enable_batch_commit=True to have the library commit offsets in batches as tasks complete:
await initialize_concurrent_processing(
    context=context,
    concurrency_limit=20,
    commit_batch_size=100,
    commit_batch_timeout_sec=5,
    enable_batch_commit=True,
)
With batch commit enabled, offsets are committed per partition at the highest completed offset in each batch.
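The per-partition rule can be sketched in a few lines of plain Python. This is an illustration under stated assumptions, not the library's code: highest_offsets and to_commit_positions are hypothetical helpers, and the batch is modelled as (topic, partition, offset) tuples. The "+1" step reflects the general Kafka convention that a committed position is the next offset to read; whether this library applies it internally is not stated here.

```python
from collections.abc import Iterable

TopicPartitionKey = tuple[str, int]


def highest_offsets(
    completed: Iterable[tuple[str, int, int]],
) -> dict[TopicPartitionKey, int]:
    """Map each (topic, partition) to the highest completed offset in the batch."""
    result: dict[TopicPartitionKey, int] = {}
    for topic, partition, offset in completed:
        key = (topic, partition)
        if offset > result.get(key, -1):
            result[key] = offset
    return result


def to_commit_positions(
    highest: dict[TopicPartitionKey, int],
) -> dict[TopicPartitionKey, int]:
    """Convert last-processed offsets into positions to commit.

    Kafka convention: the committed position is the offset of the next
    message to read, i.e. last processed offset + 1.
    """
    return {key: offset + 1 for key, offset in highest.items()}


# Tasks may finish out of order, so the batch is not sorted by offset.
batch = [("my-topic", 0, 5), ("my-topic", 1, 2), ("my-topic", 0, 7), ("my-topic", 0, 6)]
positions = to_commit_positions(highest_offsets(batch))
```

Grouping by partition means one commit entry per partition per batch, however many messages the batch contains.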
Consumer group filtering
When multiple consumer groups subscribe to the same topic, producers can tag messages with a topic_group header to direct them to a specific group. The middleware skips messages whose topic_group header doesn't match the consumer's group ID. Messages with no topic_group header are always processed.
# Producer side: send to a specific consumer group only
await broker.publish(
    {"data": "..."},
    topic="my-topic",
    headers={"topic_group": "group-a"},
)
Parameters
initialize_concurrent_processing
| Parameter | Default | Description |
|---|---|---|
| concurrency_limit | 10 | Max concurrent asyncio tasks. 0 disables the limit. |
| commit_batch_size | 10 | Max messages per commit batch. |
| commit_batch_timeout_sec | 10 | Max seconds before flushing a batch. |
| enable_batch_commit | False | Enable manual batch offset committing. |
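One way to read commit_batch_size and commit_batch_timeout_sec together is as two independent flush triggers: whichever is reached first closes the batch. The sketch below illustrates that reading only; should_flush is a hypothetical function, and the exact trigger logic inside the library is an assumption.

```python
def should_flush(
    pending: int,
    seconds_since_first: float,
    commit_batch_size: int = 10,
    commit_batch_timeout_sec: float = 10,
) -> bool:
    """Decide whether a commit batch should be flushed now.

    pending: number of completed messages waiting to be committed.
    seconds_since_first: age of the oldest pending message, in seconds.
    """
    if pending == 0:
        return False  # nothing to commit
    size_reached = pending >= commit_batch_size
    timeout_reached = seconds_since_first >= commit_batch_timeout_sec
    return size_reached or timeout_reached
```

Under this reading, a small batch size keeps committed offsets close to actual progress, and the timeout bounds how long a partially filled batch can wait on a quiet topic.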
Requirements
- Python >= 3.11
- faststream[kafka]