faststream-concurrent-aiokafka

Concurrent message processing middleware for FastStream with aiokafka.

By default, FastStream processes Kafka messages sequentially. This library lets you process multiple messages concurrently as asyncio tasks, with optional batch offset committing.

Installation

pip install faststream-concurrent-aiokafka

Usage

from faststream import FastStream, ContextRepo
from faststream.kafka import KafkaBroker
from faststream_concurrent_aiokafka import (
    KafkaConcurrentProcessingMiddleware,
    initialize_concurrent_processing,
    stop_concurrent_processing,
)

broker = KafkaBroker(middlewares=[KafkaConcurrentProcessingMiddleware])
app = FastStream(broker)


@app.on_startup
async def on_startup(context: ContextRepo) -> None:
    await initialize_concurrent_processing(
        context=context,
        concurrency_limit=20,  # max concurrent tasks (0 = unlimited)
    )


@app.on_shutdown
async def on_shutdown(context: ContextRepo) -> None:
    await stop_concurrent_processing(context)


@broker.subscriber("my-topic", group_id="my-group")
async def handle(msg: str) -> None:
    # runs concurrently with other messages
    ...
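
To make the idea of a concurrency limit concrete, here is a minimal, standalone sketch of bounded concurrent processing with plain asyncio. It is not this library's implementation: `process_concurrently`, `run_one`, and `demo` are hypothetical names used only for illustration, and a semaphore is just one common way to cap the number of tasks in flight.

```python
import asyncio


async def process_concurrently(messages, handler, concurrency_limit):
    """Run handler on every message, at most concurrency_limit at a time."""
    # Assumption for this sketch: a limit of 0 means "no limit".
    semaphore = asyncio.Semaphore(concurrency_limit) if concurrency_limit > 0 else None

    async def run_one(message):
        if semaphore is None:
            return await handler(message)
        async with semaphore:  # waits here while the limit is reached
            return await handler(message)

    # gather() returns results in the same order as the inputs.
    return await asyncio.gather(*(run_one(m) for m in messages))


async def demo():
    active = 0
    peak = 0

    async def handler(message):
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # stand-in for real I/O
        active -= 1
        return message.upper()

    results = await process_concurrently(["a", "b", "c", "d"], handler, concurrency_limit=2)
    return results, peak


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a limit of 2, the handler never runs more than twice at once, even though four messages are submitted together.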

Batch offset committing

By default, aiokafka auto-commits offsets. If you manage commits manually, set enable_batch_commit=True to have the library commit offsets in batches as tasks complete:

await initialize_concurrent_processing(
    context=context,
    concurrency_limit=20,
    commit_batch_size=100,
    commit_batch_timeout_sec=5,
    enable_batch_commit=True,
)

With batch commit enabled, offsets are committed per partition at the highest completed offset in each batch.
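
The per-partition rule can be sketched as a small pure function. This is an illustration under stated assumptions, not the library's code: `highest_offsets` is a hypothetical helper, and the batch is modelled as a list of `(topic, partition, offset)` tuples.

```python
def highest_offsets(completed):
    """Map each (topic, partition) to the highest offset seen in the batch."""
    result = {}
    for topic, partition, offset in completed:
        key = (topic, partition)
        if key not in result or offset > result[key]:
            result[key] = offset
    return result


batch = [
    ("my-topic", 0, 41),
    ("my-topic", 1, 7),
    ("my-topic", 0, 43),
    ("my-topic", 0, 42),
]
print(highest_offsets(batch))
```

In Kafka, a committed offset conventionally marks the next message to read, so clients typically commit the last processed offset plus one. Whether and where the library applies that adjustment is not stated here.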

Consumer group filtering

When multiple consumer groups subscribe to the same topic, producers can tag messages with a topic_group header to direct them to a specific group. The middleware skips messages whose topic_group header doesn't match the consumer's group ID. Messages with no topic_group header are always processed.

# Producer side — send to a specific consumer group only
await broker.publish(
    {"data": "..."},
    topic="my-topic",
    headers={"topic_group": "group-a"},
)
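
On the consumer side, the filtering rule described above amounts to a small predicate. The sketch below is a hypothetical helper (`should_process` is not part of the library's documented API) and assumes headers arrive as a dict whose values may be `str` or raw `bytes`.

```python
def should_process(headers, group_id):
    """Return True unless a topic_group header names a different group."""
    target = headers.get("topic_group")
    if target is None:
        # No header: the message is meant for every consumer group.
        return True
    if isinstance(target, bytes):
        # Kafka header values are bytes on the wire; decode before comparing.
        target = target.decode("utf-8")
    return target == group_id


print(should_process({}, "group-a"))
print(should_process({"topic_group": "group-a"}, "group-a"))
print(should_process({"topic_group": "group-b"}, "group-a"))
```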

Parameters

initialize_concurrent_processing

Parameter                 Default  Description
concurrency_limit         10       Max concurrent asyncio tasks. 0 disables the limit.
commit_batch_size         10       Max messages per commit batch.
commit_batch_timeout_sec  10       Max seconds before flushing a batch.
enable_batch_commit       False    Enable manual batch offset committing.
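
How commit_batch_size and commit_batch_timeout_sec might interact can be sketched with a small batching helper: a batch is flushed when it is full or when it has been open too long, whichever comes first. `CommitBatch` is a hypothetical class written for this illustration. In particular, starting the timer at the first item of a batch is an assumption, not documented behaviour.

```python
import time


class CommitBatch:
    """Collects items and reports when the batch should be flushed."""

    def __init__(self, batch_size, timeout_sec, clock=time.monotonic):
        self.batch_size = batch_size
        self.timeout_sec = timeout_sec
        self.clock = clock  # injectable so the timing can be tested
        self.items = []
        self.started_at = None

    def add(self, item):
        if not self.items:
            # Assumption: the timeout is measured from the first item.
            self.started_at = self.clock()
        self.items.append(item)

    def should_flush(self):
        if not self.items:
            return False
        full = len(self.items) >= self.batch_size
        expired = self.clock() - self.started_at >= self.timeout_sec
        return full or expired

    def flush(self):
        """Return the collected items and start a fresh batch."""
        items, self.items = self.items, []
        self.started_at = None
        return items
```

A caller would check `should_flush()` after each `add()` and also on a periodic timer, so that a partially filled batch is still committed once the timeout passes.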

Requirements

  • Python >= 3.11
  • faststream[kafka]
