Skip to main content

Concurrent message processing middleware for FastStream with aiokafka

Project description

faststream-concurrent-aiokafka

Supported versions downloads GitHub stars

Concurrent message processing middleware for FastStream with aiokafka.

By default FastStream processes Kafka messages sequentially — one message at a time per subscriber. This library turns each incoming message into an asyncio task so multiple messages are handled concurrently, while keeping offset commits correct and shutdown graceful.

Features

  • Concurrent message processing via asyncio tasks
  • Configurable concurrency limit (semaphore-based)
  • Batch offset committing per partition after each task completes
  • Graceful shutdown: waits up to 10 s for in-flight tasks before exiting
  • Signal handling (SIGTERM / SIGINT / SIGQUIT) triggers graceful shutdown
  • Background observer task to detect and discard stale completed tasks
  • Handler exceptions are logged but do not crash the consumer

📦 PyPi

📝 License

Installation

pip install faststream-concurrent-aiokafka

Quick Start

ack_policy=AckPolicy.MANUAL is required on every subscriber — the middleware enforces this at runtime. Without it, aiokafka's auto-commit timer would commit offsets before processing tasks complete, causing silent message loss on crash.

from faststream import FastStream, ContextRepo
from faststream.kafka import KafkaBroker
from faststream.middlewares import AckPolicy
from faststream_concurrent_aiokafka import (
    KafkaConcurrentProcessingMiddleware,
    initialize_concurrent_processing,
    stop_concurrent_processing,
)

broker = KafkaBroker(middlewares=[KafkaConcurrentProcessingMiddleware])
app = FastStream(broker)


@app.on_startup
async def on_startup(context: ContextRepo) -> None:
    await initialize_concurrent_processing(
        context=context,
        concurrency_limit=20,       # max concurrent tasks (minimum: 1)
        commit_batch_size=100,      # commit after this many completed tasks
        commit_batch_timeout_sec=5.0,  # or after this many seconds
    )


@app.on_shutdown
async def on_shutdown(context: ContextRepo) -> None:
    await stop_concurrent_processing(context)


@broker.subscriber("my-topic", group_id="my-group", ack_policy=AckPolicy.MANUAL)
async def handle(msg: str) -> None:
    # runs concurrently with other messages
    ...

Core Concepts

KafkaConcurrentProcessingMiddleware

A FastStream BaseMiddleware subclass. Add it to your broker to enable concurrent processing. It wraps each incoming message in an asyncio task submitted to KafkaConcurrentHandler.

KafkaConcurrentHandler

The processing engine. Manages:

  • An asyncio.Semaphore to enforce concurrency_limit
  • A set of in-flight asyncio tasks
  • A background observer that periodically discards stale completed tasks
  • Signal handlers for graceful shutdown

KafkaBatchCommitter

Runs as a background asyncio task. Receives KafkaCommitTask objects, waits for each task's asyncio future to complete, then commits the max offset per partition to Kafka. Batching is triggered by size or timeout. If the committer's task dies, CommitterIsDeadError is raised to callers.

API Reference

initialize_concurrent_processing(context, ...)

Create and start the concurrent processing handler; store it in FastStream's context.

Parameter Default Description
context required FastStream ContextRepo instance
concurrency_limit 10 Max concurrent asyncio tasks (minimum: 1)
commit_batch_size 10 Max messages per commit batch
commit_batch_timeout_sec 10.0 Max seconds before flushing a batch

Returns the KafkaConcurrentHandler instance.

stop_concurrent_processing(context)

Flush pending commits, wait for in-flight tasks (up to 10 s), then stop the handler.

KafkaConcurrentProcessingMiddleware

FastStream middleware class. Pass it to KafkaBroker(middlewares=[...]) or broker.add_middleware(...).

How It Works

  1. Message dispatch: On each incoming message, consume_scope calls handle_task(), which acquires a semaphore slot then fires the handler coroutine as a background asyncio.Task.

  2. Concurrency control: The semaphore blocks new tasks when concurrency_limit is reached. The slot is released via a done-callback when the task finishes or fails.

  3. Offset committing: Each dispatched task is paired with its Kafka offset and consumer reference and enqueued in KafkaBatchCommitter. Once the task completes, the committer groups offsets by partition and calls consumer.commit(partitions_to_offsets) with offset + 1 (Kafka's "next offset to fetch" convention).

  4. Graceful shutdown: stop_concurrent_processing sets the shutdown event, flushes the committer, cancels the observer task, and calls asyncio.gather with a 10-second timeout to wait for all in-flight tasks.

Requirements

  • Python >= 3.11
  • faststream[kafka]

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

faststream_concurrent_aiokafka-0.2.0.tar.gz (7.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

File details

Details for the file faststream_concurrent_aiokafka-0.2.0.tar.gz.

File metadata

  • Download URL: faststream_concurrent_aiokafka-0.2.0.tar.gz
  • Upload date:
  • Size: 7.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.6 {"installer":{"name":"uv","version":"0.11.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for faststream_concurrent_aiokafka-0.2.0.tar.gz
Algorithm Hash digest
SHA256 10892c94aabdf836abd03f168be145dd7fb41814182136b04e05aeb75b7e2a0d
MD5 378cc9105091e7876dcb8a0b5984b634
BLAKE2b-256 57fd792e33ef601894318529b41634f22e604caa19e6fffad35c88a5e58ac113

See more details on using hashes here.

File details

Details for the file faststream_concurrent_aiokafka-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: faststream_concurrent_aiokafka-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 9.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.6 {"installer":{"name":"uv","version":"0.11.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for faststream_concurrent_aiokafka-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 521f4bef67f6a13196c48ed2220a2a132daaa1f4dae787d329d84609a9701fa6
MD5 49a4bc1403d350abd47dca6b88ed765a
BLAKE2b-256 807641a205b0f8e1563542ad353fc0eac7aeff630147e99ff721e32ea5e8e438

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page