Skip to main content

Concurrent message processing middleware for FastStream with aiokafka

Project description

faststream-concurrent-aiokafka

Concurrent message processing middleware for FastStream with aiokafka.

By default FastStream processes Kafka messages sequentially. This library allows you to process multiple messages concurrently using asyncio tasks, with optional batch offset committing.

Installation

pip install faststream-concurrent-aiokafka

Usage

from faststream import FastStream, ContextRepo
from faststream.kafka import KafkaBroker
from faststream_concurrent_aiokafka import (
    KafkaConcurrentProcessingMiddleware,
    initialize_concurrent_processing,
    stop_concurrent_processing,
)

broker = KafkaBroker(middlewares=[KafkaConcurrentProcessingMiddleware])
app = FastStream(broker)


@app.on_startup
async def on_startup(context: ContextRepo) -> None:
    await initialize_concurrent_processing(
        context=context,
        concurrency_limit=20,  # max concurrent tasks (0 = unlimited)
    )


@app.on_shutdown
async def on_shutdown(context: ContextRepo) -> None:
    await stop_concurrent_processing(context)


@broker.subscriber("my-topic", group_id="my-group")
async def handle(msg: str) -> None:
    # runs concurrently with other messages
    ...

Batch offset committing

By default aiokafka auto-commits offsets. If you manage commits manually, enable enable_batch_commit=True to have the library commit offsets in batches after each task completes:

await initialize_concurrent_processing(
    context=context,
    concurrency_limit=20,
    commit_batch_size=100,
    commit_batch_timeout_sec=5,
    enable_batch_commit=True,
)

With batch commit enabled, offsets are committed per partition at the highest completed offset in each batch.

Consumer group filtering

When multiple consumer groups subscribe to the same topic, producers can tag messages with a topic_group header to direct them to a specific group. The middleware skips messages whose topic_group header doesn't match the consumer's group ID. Messages with no topic_group header are always processed.

# Producer side — send to a specific consumer group only
await broker.publish(
    {"data": "..."},
    topic="my-topic",
    headers={"topic_group": "group-a"},
)

Parameters

initialize_concurrent_processing

Parameter Default Description
concurrency_limit 10 Max concurrent asyncio tasks. 0 disables the limit.
commit_batch_size 10 Max messages per commit batch.
commit_batch_timeout_sec 10 Max seconds before flushing a batch.
enable_batch_commit False Enable manual batch offset committing.

Requirements

  • Python >= 3.11
  • faststream[kafka]

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

faststream_concurrent_aiokafka-0.0.1.tar.gz (6.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

File details

Details for the file faststream_concurrent_aiokafka-0.0.1.tar.gz.

File metadata

  • Download URL: faststream_concurrent_aiokafka-0.0.1.tar.gz
  • Upload date:
  • Size: 6.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.5 {"installer":{"name":"uv","version":"0.11.5","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for faststream_concurrent_aiokafka-0.0.1.tar.gz
Algorithm Hash digest
SHA256 d4da94924cf99fdd3e856c6413169ad203a0681f6c9a59f77263a89c56a05222
MD5 d61c37228f4a9694136b81203c732bfd
BLAKE2b-256 56e6f64cf2ce13a5ccd8c39128254ded4b633af05468ded1504d336d3030c2f7

See more details on using hashes here.

File details

Details for the file faststream_concurrent_aiokafka-0.0.1-py3-none-any.whl.

File metadata

  • Download URL: faststream_concurrent_aiokafka-0.0.1-py3-none-any.whl
  • Upload date:
  • Size: 8.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.5 {"installer":{"name":"uv","version":"0.11.5","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for faststream_concurrent_aiokafka-0.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 afc6dc20b82bbb43f9f1677ede99acb73941fa7922a4374f95d293ae037b12b8
MD5 aa7cb4c2caec0366e94bf89befc94d84
BLAKE2b-256 9b1aeff4830fc9237b7e1aed6552076bade6dbf244726bb2da1c96ad4a1d796e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page