kafklient

Async Kafka utilities built on confluent-kafka (librdkafka).

This library wraps the synchronous Consumer/Producer with dedicated thread executors so Kafka operations do not block the event loop, and provides typed streams and RPC utilities via Parser[T].
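The executor idea can be illustrated with the standard library alone. This is a minimal sketch, not kafklient's actual implementation: `blocking_poll` is a hypothetical stand-in for a synchronous client call, and the single-worker executor is an assumption about how a "dedicated thread" might be arranged.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_poll(timeout: float) -> str:
    """Hypothetical stand-in for a synchronous call such as a consumer poll."""
    time.sleep(timeout)
    return "record"


async def poll_without_blocking(executor: ThreadPoolExecutor, timeout: float) -> str:
    # Run the blocking call on the executor thread; the event loop stays free.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, blocking_poll, timeout)


async def demo() -> tuple[str, int]:
    ticks = 0

    async def ticker() -> None:
        # Counts how often the event loop gets to run while the poll is pending.
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    # One worker thread means all calls on this executor run sequentially,
    # which suits client objects that should be driven from a single thread.
    with ThreadPoolExecutor(max_workers=1, thread_name_prefix="kafka-consumer") as executor:
        task = asyncio.create_task(ticker())
        record = await poll_without_blocking(executor, 0.2)
        task.cancel()
        await asyncio.gather(task, return_exceptions=True)
    return record, ticks
```

Calling `asyncio.run(demo())` returns the record together with a non-zero tick count, which shows that other coroutines kept running while the blocking call was in flight.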

What's inside

  • KafkaListener: Subscribe to topics and stream parsed objects as TypeStream[T].
  • KafkaRPC: Send requests and await responses matched by correlation id.
  • KafkaRPCServer: Consume request topics and produce responses to reply topics specified in headers.
  • MCP over Kafka (optional): Run MCP (JSON-RPC) over Kafka topics, with stdio bridges available via the CLI (kafklient mcp-client / kafklient mcp-server).

This library supports consumer-group subscribe mode only. (Manual assign is intentionally not supported.)

Requirements

  • Python >= 3.12
  • A reachable Kafka broker
  • confluent-kafka >= 2.12.0

Install

pip install kafklient

Optional extras:

# MCP server/bridge support
pip install "kafklient[mcp]"

# Dev tools (pyright/ruff)
pip install "kafklient[all]"

Local Kafka for development (optional)

This repository includes a single-node Kafka docker-compose.yml for local testing.

docker compose up -d

Default bootstrap server: 127.0.0.1:9092.

Core concepts

Consumer group (group.id)

  • Same group.id: competing consumers / load balancing (each record is delivered to one member in the group).
  • Different group.id: each instance receives the full stream (broadcast-style consumption).
  • Important: do not share a group.id between different roles (e.g. RPC clients vs RPC servers).
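One way to follow the rules above is to derive group.id from the role. The sketch below uses plain dicts with the same keys as the configs in the examples; the helper names and the `rpc-client-` prefix with a random suffix are assumptions for illustration, not part of kafklient.

```python
import uuid


def server_consumer_config(bootstrap: str) -> dict[str, str]:
    # All server instances share one group.id, so requests are load-balanced.
    return {"bootstrap.servers": bootstrap, "group.id": "rpc-server"}


def client_consumer_config(bootstrap: str) -> dict[str, str]:
    # Each client gets its own group.id, so every client sees the full reply stream.
    return {
        "bootstrap.servers": bootstrap,
        "group.id": f"rpc-client-{uuid.uuid4().hex}",
    }
```

Two calls to `server_consumer_config` yield the same group.id, while two calls to `client_consumer_config` yield different ones, and neither role reuses the other's group.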

Start-from-latest behavior (seek_to_end_on_assign)

With the default seek_to_end_on_assign=True, the consumer seeks to the end when partitions are assigned. This focuses processing on messages produced after the client becomes ready and reduces accidental reprocessing of old data.

To read from older offsets:

  • seek_to_end_on_assign=False
  • optionally set consumer_config["auto.offset.reset"] = "earliest"
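The interaction between these two settings can be modelled as a small pure function. This is a simplified model of general Kafka consumer behavior, not code from kafklient: `first_offset`, `committed`, `low`, and `high` are hypothetical names, where `low` and `high` stand for the partition's earliest available offset and its end offset.

```python
from typing import Literal, Optional


def first_offset(
    *,
    seek_to_end_on_assign: bool,
    committed: Optional[int],
    auto_offset_reset: Literal["earliest", "latest"],
    low: int,
    high: int,
) -> int:
    """Return the offset a consumer would start reading from (simplified)."""
    if seek_to_end_on_assign:
        # Seeking to the end on assignment overrides everything else.
        return high
    if committed is not None:
        # A committed offset for the group takes precedence over the reset policy.
        return committed
    # auto.offset.reset only applies when the group has no committed offset.
    return low if auto_offset_reset == "earliest" else high
```

Under this model, auto.offset.reset matters only for a group without committed offsets, which is why setting it to "earliest" is listed as optional.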

Parser[T]

Parser[T] declares which topics to parse and what type to parse into.

  • Recommended: provide a factory for JSON/custom binary payloads.
  • factory can be sync or async; the input is a Message.
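A sync and an async factory can look almost identical. The sketch below does not import kafklient: `FakeMessage` is a hypothetical stand-in that only mimics the `value()` method used in the examples, and `call_factory` is an assumed illustration of how a caller might accept both kinds of factory, not the library's dispatch code.

```python
import asyncio
import inspect
import json
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class FakeMessage:
    """Hypothetical stand-in exposing value() like a Kafka message."""
    payload: Optional[bytes]

    def value(self) -> Optional[bytes]:
        return self.payload


@dataclass(frozen=True)
class Hello:
    message: str
    count: int


def parse_hello(rec: FakeMessage) -> Hello:
    # Sync factory: decode JSON and fall back to defaults for missing fields.
    data = json.loads((rec.value() or b"{}").decode("utf-8"))
    return Hello(message=str(data.get("message", "")), count=int(data.get("count", 0)))


async def parse_hello_async(rec: FakeMessage) -> Hello:
    # Async factory: same result, but free to await I/O (e.g. a schema lookup).
    await asyncio.sleep(0)
    return parse_hello(rec)


async def call_factory(factory: Callable[[FakeMessage], Any], rec: FakeMessage) -> Any:
    # Accept both kinds of factory: await the result only when it is awaitable.
    result = factory(rec)
    if inspect.isawaitable(result):
        return await result
    return result
```

Both `call_factory(parse_hello, msg)` and `call_factory(parse_hello_async, msg)` produce the same `Hello` instance for the same payload.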

Examples

KafkaListener: consume as a typed stream

This example parses JSON into a Hello dataclass and consumes a TypeStream[Hello].

import asyncio
import json
from dataclasses import dataclass

from kafklient import ConsumerConfig, KafkaListener, Message, Parser, ProducerConfig


@dataclass(frozen=True, slots=True)
class Hello:
    message: str
    count: int


def parse_hello(rec: Message) -> Hello:
    raw = rec.value() or b"{}"
    data = json.loads(raw.decode("utf-8"))
    return Hello(
        message=str(data.get("message", "")),
        count=int(data.get("count", 0)),
    )


async def main() -> None:
    topic = "hello-events"

    consumer_config: ConsumerConfig = {
        "bootstrap.servers": "127.0.0.1:9092",
        "group.id": "hello-listener",
        "auto.offset.reset": "latest",
    }
    producer_config: ProducerConfig = {"bootstrap.servers": "127.0.0.1:9092"}

    async with KafkaListener(
        parsers=[Parser[Hello](topics=[topic], factory=parse_hello)],
        consumer_config=consumer_config,
        producer_config=producer_config,
        auto_create_topics=True,
    ) as listener:
        stream = await listener.subscribe(Hello)

        # Demo only: produce and consume in the same process
        await listener.produce(
            topic,
            json.dumps({"message": "hi", "count": 1}).encode("utf-8"),
            flush=True,
        )

        async def receive_one() -> Hello:
            async for item in stream:
                return item
            raise RuntimeError("stream stopped before receiving a message")

        msg = await asyncio.wait_for(receive_one(), timeout=5.0)
        print(msg)


if __name__ == "__main__":
    asyncio.run(main())

KafkaRPC + KafkaRPCServer: request/response

RPC follows these rules:

  • Request topic: consumed by KafkaRPCServer (share the same group.id across servers to load-balance).
  • Reply topic: consumed by KafkaRPC (clients typically should use a unique group.id).
  • Reply routing: passed via one or more x-reply-topic headers on the request message.
  • Correlation matching: by default uses the message key (or the x-corr-id header).

import asyncio
from dataclasses import dataclass

from kafklient import ConsumerConfig, KafkaRPC, KafkaRPCServer, Message, Parser, ProducerConfig


@dataclass(frozen=True, slots=True)
class EchoRequest:
    data: bytes


def parse_echo_request(msg: Message) -> EchoRequest:
    return EchoRequest(data=msg.value() or b"")


def parse_bytes(msg: Message) -> bytes:
    return msg.value() or b""


async def run_server(*, ready: asyncio.Event, stop: asyncio.Event) -> None:
    request_topic = "rpc-requests"

    server_consumer_config: ConsumerConfig = {
        "bootstrap.servers": "127.0.0.1:9092",
        "group.id": "rpc-server",
        "auto.offset.reset": "latest",
    }
    server_producer_config: ProducerConfig = {"bootstrap.servers": "127.0.0.1:9092"}

    server = KafkaRPCServer(
        parsers=[Parser[EchoRequest](topics=[request_topic], factory=parse_echo_request)],
        consumer_config=server_consumer_config,
        producer_config=server_producer_config,
        auto_create_topics=True,
    )

    @server.handler(EchoRequest)
    async def echo(req: EchoRequest, message: Message) -> bytes:  # pyright: ignore[reportUnusedFunction]
        return req.data

    await server.start()
    ready.set()
    try:
        await stop.wait()
    finally:
        await server.stop()


async def main() -> None:
    request_topic = "rpc-requests"
    reply_topic = "rpc-replies"

    server_ready = asyncio.Event()
    server_stop = asyncio.Event()
    server_task = asyncio.create_task(run_server(ready=server_ready, stop=server_stop))

    rpc_consumer_config: ConsumerConfig = {
        "bootstrap.servers": "127.0.0.1:9092",
        "group.id": "rpc-client-1",
        "auto.offset.reset": "latest",
    }
    rpc_producer_config: ProducerConfig = {"bootstrap.servers": "127.0.0.1:9092"}

    rpc = KafkaRPC(
        parsers=[Parser[bytes](topics=[reply_topic], factory=parse_bytes)],
        consumer_config=rpc_consumer_config,
        producer_config=rpc_producer_config,
        auto_create_topics=True,
    )

    try:
        await server_ready.wait()
        await rpc.start()

        res = await rpc.request(
            req_topic=request_topic,
            req_value=b"ping",
            req_headers_reply_to=[reply_topic],
            res_timeout=5.0,
            res_expect_type=bytes,
        )
        print(res)
    finally:
        await rpc.stop()
        server_stop.set()
        await server_task


if __name__ == "__main__":
    asyncio.run(main())
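The reply-routing and correlation rules listed before the example can be sketched without a broker. This is an illustration only: the header names come from the rules above, but `build_request_headers`, `correlation_id`, and `PendingReplies` are hypothetical, and checking the key before the x-corr-id header is an assumption about precedence.

```python
import asyncio
import uuid
from typing import Optional

Headers = list[tuple[str, bytes]]


def build_request_headers(reply_topics: list[str], corr_id: str) -> Headers:
    # One x-reply-topic header per reply topic, plus the correlation id.
    headers = [("x-reply-topic", topic.encode("utf-8")) for topic in reply_topics]
    headers.append(("x-corr-id", corr_id.encode("utf-8")))
    return headers


def correlation_id(key: Optional[bytes], headers: Headers) -> Optional[str]:
    # Assumed precedence: message key first, then the x-corr-id header.
    if key:
        return key.decode("utf-8")
    for name, value in headers:
        if name == "x-corr-id":
            return value.decode("utf-8")
    return None


class PendingReplies:
    """Maps correlation ids to futures awaited by the requesting coroutine."""

    def __init__(self) -> None:
        self._waiting: dict[str, asyncio.Future] = {}

    def expect(self, corr_id: str) -> asyncio.Future:
        future = asyncio.get_running_loop().create_future()
        self._waiting[corr_id] = future
        return future

    def resolve(self, corr_id: Optional[str], payload: bytes) -> bool:
        # Replies with an unknown or missing correlation id are ignored.
        future = self._waiting.pop(corr_id, None) if corr_id else None
        if future is None or future.done():
            return False
        future.set_result(payload)
        return True


async def demo() -> bytes:
    pending = PendingReplies()
    corr_id = uuid.uuid4().hex
    headers = build_request_headers(["rpc-replies"], corr_id)
    future = pending.expect(corr_id)
    # Simulate a reply that carries the correlation id only in its headers.
    pending.resolve(correlation_id(None, headers), b"pong")
    return await asyncio.wait_for(future, timeout=1.0)
```

Because unmatched replies are dropped, a client that shares a reply topic with other clients only completes its own requests.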

Automatic topic creation

All clients (KafkaListener/KafkaRPC/KafkaRPCServer) can create topics in one of two ways:

  • auto_create_topics=True: create topics referenced by parsers on start (before subscribing).
  • Manual call: await client.create_topics("a", "b", ...)

Options:

  • topic_num_partitions (default: 1)
  • topic_replication_factor (default: 1)

Internally this uses AdminClient and requires broker permissions.
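For orientation, topic creation through confluent-kafka's AdminClient generally looks like the sketch below. It is not kafklient's internal code: `unique_topics` and `create_topics_sketch` are hypothetical helpers, and treating an already existing topic as success is an assumption about the desired behavior. The confluent_kafka imports are deferred so the module loads without the package installed.

```python
def unique_topics(*names: str) -> list[str]:
    # Drop duplicates while preserving order (several parsers may share a topic).
    return list(dict.fromkeys(names))


def create_topics_sketch(
    bootstrap_servers: str,
    *names: str,
    num_partitions: int = 1,
    replication_factor: int = 1,
    timeout: float = 10.0,
) -> None:
    # Deferred imports: only needed when topics are actually created.
    from confluent_kafka import KafkaError, KafkaException
    from confluent_kafka.admin import AdminClient, NewTopic

    admin = AdminClient({"bootstrap.servers": bootstrap_servers})
    new_topics = [
        NewTopic(name, num_partitions=num_partitions, replication_factor=replication_factor)
        for name in unique_topics(*names)
    ]
    # create_topics returns one future per topic name.
    futures = admin.create_topics(new_topics, operation_timeout=timeout)
    for future in futures.values():
        try:
            future.result()
        except KafkaException as exc:
            # Assumption: an existing topic is fine; anything else is an error.
            if exc.args[0].code() != KafkaError.TOPIC_ALREADY_EXISTS:
                raise
```

The call fails with an authorization error if the client is not permitted to create topics, which is the broker permission requirement mentioned above.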

MCP over Kafka (optional)

You can run MCP (JSON-RPC) over Kafka topics and connect any stdio-based MCP client via a bridge.

Install

pip install "kafklient[mcp]"

Server example (FastMCP + run_server)

import logging

from fastmcp import FastMCP

from kafklient.mcp.server import run_server

logging.basicConfig(level=logging.INFO)

mcp = FastMCP("My Kafka MCP Server")


@mcp.tool()
def echo(message: str) -> str:
    return f"Echo: {message}"


if __name__ == "__main__":
    run_server(
        mcp,
        bootstrap_servers="127.0.0.1:9092",
        consumer_topic="mcp-requests",
        producer_topic="mcp-responses",
        consumer_group_id="mcp-server",
        auto_create_topics=True,
        show_banner=False,
        log_level="info",
    )

CLI: server (kafklient mcp-server)

The CLI can load a FastMCP instance from a module or a Python file and run it over Kafka.

Supported --mcp formats:

  • Module: mypkg.myserver:mcp (or mypkg.myserver:mcp.some_attr)
  • File: ./myserver.py:mcp (or ./myserver.py:mcp.some_attr)
  • If : is omitted, it defaults to :mcp (e.g. mypkg.myserver == mypkg.myserver:mcp)

# Module-based
kafklient mcp-server --mcp mypkg.myserver:mcp --bootstrap-servers 127.0.0.1:9092

# File-based
kafklient mcp-server --mcp ./myserver.py:mcp --bootstrap-servers 127.0.0.1:9092
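The --mcp formats listed above can be parsed with a few lines of Python. This is a sketch of the described rules, not the CLI's actual parser: `split_mcp_spec` and `resolve_attr` are hypothetical names, and detecting a file by its .py suffix is an assumption.

```python
from functools import reduce
from typing import Any


def split_mcp_spec(spec: str, default_attr: str = "mcp") -> tuple[str, list[str], bool]:
    """Split "target:attr.path" into (target, attribute path, is_file)."""
    # Split on the last colon. Caveat: a Windows path such as C:/x/server.py
    # given without an attribute would be mis-split by this simple approach.
    target, sep, attr_path = spec.rpartition(":")
    if not sep:
        # No colon: the whole spec is the target and the attribute defaults to mcp.
        target, attr_path = spec, default_attr
    return target, attr_path.split("."), target.endswith(".py")


def resolve_attr(obj: Any, path: list[str]) -> Any:
    # Follow a dotted path such as ["mcp", "some_attr"] starting from obj.
    return reduce(getattr, path, obj)
```

For example, `split_mcp_spec("mypkg.myserver")` and `split_mcp_spec("mypkg.myserver:mcp")` give the same result, matching the default described in the list.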

CLI: stdio <-> Kafka bridge (kafklient mcp-client)

uv run kafklient mcp-client --bootstrap-servers 127.0.0.1:9092

Useful flags:

  • --consumer-topic: topic to read responses/notifications from (default: mcp-responses)
  • --producer-topic: topic to write requests to (default: mcp-requests)
  • Session isolation is always enabled (responses are filtered using x-session-id).
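Filtering by x-session-id can be pictured as follows. This is a minimal sketch under stated assumptions: headers are modelled as a list of (name, bytes) tuples, the session id is assumed to be UTF-8 text, and `session_id_of` and `for_session` are hypothetical helpers rather than the bridge's real code.

```python
from typing import Optional

Headers = list[tuple[str, bytes]]


def session_id_of(headers: Optional[Headers]) -> Optional[str]:
    # Return the first x-session-id header value, if any.
    for name, value in headers or []:
        if name == "x-session-id":
            return value.decode("utf-8")
    return None


def for_session(messages: list[tuple[Headers, bytes]], session_id: str) -> list[bytes]:
    # Keep only payloads whose x-session-id matches this bridge's session.
    return [payload for headers, payload in messages if session_id_of(headers) == session_id]
```

With this kind of filter, several bridges can read the same response topic while each one only forwards the responses addressed to its own session.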

For local MCP Inspector experiments against this checkout, run the bridge through uv --directory:

uv --directory C:/Projects/kafklient run kafklient mcp-client -b 127.0.0.1:9092

Development

Lint/type-check:

uv run ruff check .
uv run pyright

If Kafka is running (e.g. after docker compose up -d), run the tests:

uv run kafklient-test

License

MIT
