kafklient
Async Kafka helpers that wrap confluent_kafka with typed listeners and RPC utilities.
What's inside
- KafkaListener: subscribe to topics and stream parsed objects.
- KafkaRPC: request/response helper using correlation IDs over Kafka.
- MCP over Kafka (optional): run an MCP server over Kafka and provide a stdio bridge (kafklient-mcp-client) so any MCP stdio client can connect.
- Group-managed subscriptions only (subscribe-based); manual assign is removed.
Requirements
- Python >= 3.12
- Kafka cluster reachable from your app
- confluent-kafka >= 2.6.0
Install
pip install kafklient
MCP (Model Context Protocol) over Kafka (optional)
If you want to expose tools/resources/prompts to MCP clients over Kafka, install the optional extra:
pip install "kafklient[mcp]"
Quickstart (requires a reachable Kafka broker):
# 1) Start YOUR MCP server that speaks MCP(JSON-RPC) over Kafka
# (example below: my_mcp_server.py)
uv run python my_mcp_server.py
# 2) Start a stdio <-> Kafka bridge for MCP stdio clients (LangChain, etc.)
uv run kafklient-mcp-client --bootstrap-servers localhost:9092
Example: custom MCP server over Kafka
Create my_mcp_server.py:
```python
import logging

from fastmcp import FastMCP

from kafklient.mcp.server import run_server

logging.basicConfig(level=logging.INFO)

mcp = FastMCP("My Kafka MCP Server")

@mcp.tool()
def echo(message: str) -> str:
    return f"Echo: {message}"

@mcp.tool()
def add(a: int, b: int) -> int:
    return a + b

if __name__ == "__main__":
    run_server(
        mcp,
        bootstrap_servers="localhost:9092",
        consumer_topic="mcp-requests",
        producer_topic="mcp-responses",
        show_banner=False,
        log_level="info",
    )
```
Notes:
- The server <-> bridge communication goes through Kafka topics (mcp-requests/mcp-responses by default).
- The client <-> bridge communication is stdio (JSON-RPC over stdin/stdout), so avoid printing to stdout in the bridge process.
Core concepts
- Group-managed subscribe: consumers must have a non-empty group_id to join a consumer group. Kafka partitions are assigned and rebalanced by the group coordinator.
- group_id strategy:
  - Same group_id among instances → competing consumers (load-balancing; each record is processed once by the group).
  - Different group_id among instances → each instance receives the full stream (broadcast-style consumption).
  - Never share group_id between logically different roles (e.g., RPC clients vs RPC servers).
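These strategies come down to the group.id you put in each consumer config. A small illustration (broker address and group names are made up):

```python
# Two instances sharing a group.id: Kafka load-balances partitions between
# them, so each record is handled once by the group (competing consumers).
worker_a = {"bootstrap.servers": "127.0.0.1:9092", "group.id": "order-workers"}
worker_b = {"bootstrap.servers": "127.0.0.1:9092", "group.id": "order-workers"}

# Distinct group.ids: every instance receives the full stream (broadcast-style).
audit = {"bootstrap.servers": "127.0.0.1:9092", "group.id": "audit"}
metrics = {"bootstrap.servers": "127.0.0.1:9092", "group.id": "metrics"}

assert worker_a["group.id"] == worker_b["group.id"]  # competing consumers
assert audit["group.id"] != metrics["group.id"]      # broadcast-style
```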
ParserSpec
You declare which topics a client parses and how to parse them.
```python
import json

from kafklient import Message, ParserSpec

def parse_json(rec: Message) -> dict[str, object]:
    return json.loads(rec.value() or b"{}")

spec: ParserSpec[dict[str, object]] = {
    "topics": ["events"],
    "type": dict[str, object],
    "parser": parse_json,
}
```
KafkaListener quickstart
```python
import asyncio

from kafklient import KafkaListener

async def main() -> None:
    async with KafkaListener(
        parsers=[
            {
                "topics": ["my-topic"],
                "type": dict[str, object],
                "parser": lambda r: {"topic": r.topic(), "value": (r.value() or b"").decode("utf-8")},
            }
        ],
        consumer_factory={
            "bootstrap.servers": "127.0.0.1:9092",
            "auto.offset.reset": "latest",
        },
    ) as listener:
        stream = await listener.subscribe(dict[str, object])
        async for item in stream:
            print("got:", item)

asyncio.run(main())
```
KafkaRPC quickstart
```python
import asyncio

from kafklient import KafkaRPC

async def main() -> None:
    async with KafkaRPC(
        parsers=[{"topics": ["my-topic"], "type": bytes, "parser": lambda r: r.value() or b""}],
        producer_factory={"bootstrap.servers": "127.0.0.1:9092"},
        consumer_factory={
            "bootstrap.servers": "127.0.0.1:9092",
            "auto.offset.reset": "latest",
        },
    ) as rpc:
        res = await rpc.request(
            req_topic="request",
            req_value=b"hello",
            # Optionally direct the server to respond on specific topics
            req_headers_reply_to=["reply"],
            res_expect_type=bytes,
        )
        print("response:", res)

asyncio.run(main())
```
RPC server pattern
Typical layout: servers consume from a request topic and produce to the reply topics passed in headers.
Guidelines:
- Server instances should share the same group_id to load-balance requests.
- Servers must NOT share a group_id with clients.
- A server reads a request, extracts the x-reply-topic headers, and produces the response to that topic. If multiple reply topics are present, produce to each (or choose a policy).
Group_id guidance
- Listener
- Same group_id → scale-out (each record processed once by the group).
- Different group_id → broadcast (each listener gets all records).
- RPC server (responders)
- Same group_id among servers → load-balancing for requests.
- Different group_id among servers → all servers handle each request (usually wrong for RPC).
- RPC client (requesters)
- Each client should have a unique group_id to avoid competing on replies.
- Do not reuse server group_id.
Offsets & auto commit
The example consumer factories set auto.offset.reset to latest, so a new consumer group will skip records produced before it joined; set it to earliest if new groups should replay existing records.
Correlation IDs
- KafkaBaseClient extracts correlation IDs from headers (e.g. x-corr-id) or from the key when present.
- KafkaRPC.request can propagate the correlation ID in the key and/or a header of your choice (default header: x-corr-id).
Thread-based async implementation
This library uses sync Consumer and Producer from confluent-kafka, wrapped with dedicated thread executors (DedicatedThreadExecutor) to provide a non-blocking async API. This approach:
- Avoids blocking the event loop during Kafka operations
- Uses separate dedicated threads for consumer and producer operations
- Provides stable, production-ready Kafka client behavior
- Works with all existing confluent-kafka features and configurations
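The underlying pattern is the standard one of pushing a blocking call onto a dedicated thread via run_in_executor. A self-contained sketch with a stand-in for the blocking Consumer.poll() call (kafklient's DedicatedThreadExecutor presumably wraps something along these lines):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_poll(timeout: float) -> str:
    # Stand-in for confluent_kafka.Consumer.poll(), which blocks its thread.
    time.sleep(timeout)
    return "record"

async def main() -> str:
    loop = asyncio.get_running_loop()
    # A single dedicated thread keeps calls serialized; the confluent-kafka
    # Consumer is not safe for concurrent calls from multiple threads.
    with ThreadPoolExecutor(max_workers=1) as consumer_thread:
        # The event loop stays free while the poll runs on the worker thread.
        return await loop.run_in_executor(consumer_thread, blocking_poll, 0.01)

print(asyncio.run(main()))  # record
```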
Auto topic creation
Both KafkaListener and KafkaRPC can automatically create topics if they don't exist. Enable this feature with auto_create_topics=True:
```python
async with KafkaListener(
    parsers=[{"topics": ["my-topic"], "type": bytes, "parser": lambda r: r.value() or b""}],
    consumer_factory={"bootstrap.servers": "127.0.0.1:9092"},
    auto_create_topics=True,
    topic_num_partitions=3,  # default: 1
    topic_replication_factor=1,  # default: 1
) as listener:
    # Topics are created before subscribing
    stream = await listener.subscribe(bytes)
    async for item in stream:
        print(item)
```
Options:
- auto_create_topics: bool = False - enable automatic topic creation
- topic_num_partitions: int = 1 - number of partitions for auto-created topics
- topic_replication_factor: int = 1 - replication factor for auto-created topics
Note: This uses AdminClient internally and requires appropriate broker permissions.
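Conceptually this amounts to diffing the desired topics against the cluster's metadata and creating whatever is missing. The diff step is pure and easy to sketch (the function name is illustrative; the AdminClient calls mentioned in the comments are the standard confluent-kafka ones):

```python
def topics_to_create(desired: list[str], existing: set[str]) -> list[str]:
    """Return the desired topics that do not yet exist, preserving order."""
    return [topic for topic in desired if topic not in existing]

# With AdminClient, `existing` would come from admin.list_topics().topics,
# and the result would feed NewTopic(...) objects into admin.create_topics().
print(topics_to_create(["my-topic", "replies"], existing={"replies"}))  # ['my-topic']
```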
Production notes
- Always set an explicit group_id in your provided consumer_factory.
- Use dedicated topics for requests and for replies; avoid sending replies to the request topic.
- Isolate roles with different group_ids (clients vs servers).
- Ensure idempotency in servers when necessary.
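Idempotency is usually easiest to enforce by deduplicating on the correlation ID before performing side effects. A toy in-memory sketch (a real deployment would need a bounded or persistent store, since this set grows forever):

```python
class IdempotentHandler:
    """Process each correlation ID at most once (toy in-memory version)."""

    def __init__(self) -> None:
        self._seen: set[bytes] = set()
        self.processed = 0

    def handle(self, corr_id: bytes, payload: bytes) -> bool:
        if corr_id in self._seen:
            return False  # duplicate delivery: skip side effects
        self._seen.add(corr_id)
        self.processed += 1  # stand-in for the real side effect
        return True

handler = IdempotentHandler()
assert handler.handle(b"req-1", b"hello") is True
assert handler.handle(b"req-1", b"hello") is False  # redelivery is ignored
assert handler.processed == 1
```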
API reference (selected)
KafkaListener
- KafkaListener(parsers: Iterable[ParserSpec[object]], ...)
- subscribe(tp: Type[T], *, queue_maxsize: int = 0, fresh: bool = False) -> TypeStream[T]
KafkaRPC
- KafkaRPC(parsers: Iterable[ParserSpec[object]], ...)
- request(req_topic: str, req_value: bytes, *, req_key: bytes | None = None, req_headers: list[tuple[str, str | bytes]] | None = None, req_headers_reply_to: list[str] | None = None, res_timeout: float = 30.0, res_expect_type: Type[T] | None = None, correlation_id: bytes | None = None, propagate_corr_to: str = "both", correlation_header_key: str = "x-corr-id") -> T
License
MIT