Skip to main content

Async Kafka helpers built on confluent_kafka.experimental.aio.

Project description

kafklient

Async Kafka client utilities for SDML built on confluent_kafka with thread-based async wrappers.

What's inside

  • KafkaListener: subscribe to topics and stream parsed objects.
  • KafkaRPC: request/response helper using correlation IDs over Kafka.
  • Group-managed subscriptions only (subscribe-based). Manual assign is removed.

Requirements

  • Python >= 3.12
  • Kafka cluster reachable from your app
  • confluent-kafka >= 2.6.0

Install

pip install sdml-kafka-client

Core concepts

  • Group-managed subscribe: Consumers must have a non-empty group_id to join a consumer group. Kafka partitions are assigned and rebalanced by the coordinator.
  • group_id strategy:
    • Same group_id among instances → competing consumers (load-balancing, each record processed once by the group).
    • Different group_id among instances → each instance receives the full stream (broadcast-style consumption).
    • Never share group_id between logically different roles (e.g., RPC clients vs RPC servers).

ParserSpec

You declare which topics a client parses and how to parse them.

from kafklient.types import ParserSpec
from confluent_kafka import Message

def parse_json(rec: Message) -> dict:
    import json
    return json.loads(rec.value or b"{}")

spec: ParserSpec[dict] = {
    "topics": ["events"],
    "type": dict,
    "parser": parse_json,
}

KafkaListener quickstart

import asyncio
from kafklient.clients import KafkaListener
from kafklient.types import ParserSpec
from confluent_kafka import Consumer

specs: list[ParserSpec[dict]] = [
    {
        "topics": ["events"],
        "type": dict,
        "parser": lambda r: {"topic": r.topic, "value": (r.value or b"").decode("utf-8")},
    }
]

listener = KafkaListener(
    parsers=specs,
    auto_commit={"every": 100, "interval_s": 5.0},
    consumer_factory=lambda: Consumer(
        {
            "bootstrap.servers": "127.0.0.1:9092",
            "group.id": "listener",              # required
            "auto.offset.reset": "latest",
        }
    ),
)

async def main() -> None:
    await listener.start()
    stream = await listener.subscribe(dict)
    async for item in stream:
        print("got:", item)

asyncio.run(main())

KafkaRPC quickstart

import asyncio
from kafklient.clients import KafkaRPC
from kafklient.types import ParserSpec
from confluent_kafka import Consumer, Message

def parse_reply(rec: Message) -> bytes:
    return rec.value or b""

rpc = KafkaRPC(
    parsers=[{"topics": ["reply"], "type": bytes, "parser": parse_reply}],
    consumer_factory=lambda: Consumer(
        {
            "bootstrap.servers": "127.0.0.1:9092",
            "group.id": "rpc-client-unique",     # must be unique per requester instance
            "auto.offset.reset": "latest",
        }
    ),
)

async def main() -> None:
    await rpc.start()
    res = await rpc.request(
        req_topic="request",
        req_value=b"hello",
        # Optionally direct server to respond to specific topics
        req_headers_reply_to=["reply"],
        res_expect_type=bytes,
    )
    print("response:", res)

asyncio.run(main())

RPC server pattern

Typical layout: servers consume from request and produce to reply topics passed in headers.

Guidelines:

  • Server instances should share the same group_id to load-balance requests.
  • Servers must NOT share group_id with clients.
  • Server reads request, extracts x-reply-topic headers, and produces the response to that topic. If multiple reply topics are present, produce to each (or choose policy).

Group_id guidance

  • Listener
    • Same group_id → scale-out (each record processed once by the group).
    • Different group_id → broadcast (each listener gets all records).
  • RPC server (responders)
    • Same group_id among servers → load-balancing for requests.
    • Different group_id among servers → all servers handle each request (usually wrong for RPC).
  • RPC client (requesters)
    • Each client should have a unique group_id to avoid competing on replies.
    • Do not reuse server group_id.

Offsets & auto commit

  • auto_commit is optional. When provided, commits happen by message count and/or interval.
  • auto_offset_reset defaults are set on consumer factories in examples to latest.

Correlation IDs

  • KafkaBaseClient extracts correlation IDs from headers (request_id, correlation_id, x-correlation-id) or from the key when present.
  • KafkaRPC.request can propagate the correlation ID in key and/or a header you choose (default header: request_id).

Thread-based async implementation

This library uses sync Consumer and Producer from confluent-kafka, wrapped with asyncio.to_thread() to provide a non-blocking async API. This approach:

  • Avoids blocking the event loop during Kafka operations
  • Provides stable, production-ready Kafka client behavior
  • Works with all existing confluent-kafka features and configurations

Production notes

  • Always set explicit group_id in your provided consumer_factory.
  • Use dedicated topics for requests and for replies. Avoid sending replies to the request topic.
  • Isolate roles with different group_ids (clients vs servers).
  • Ensure idempotency in servers when necessary.

API reference (selected)

  • KafkaListener(parsers: Iterable[ParserSpec[object]], ...)
    • subscribe(tp: Type[T], queue_maxsize: int = 0, fresh: bool = False) -> TypeStream[T]
  • KafkaRPC(parsers: Iterable[ParserSpec[object]], ...)
    • request(req_topic: str, req_value: bytes, *, req_key: bytes | None = None, req_headers: list[tuple[str, bytes]] | None = None, req_headers_reply_to: list[str] | None = None, res_timeout: float = 30.0, res_expect_type: Type[T] | None = None, correlation_id: bytes | None = None, propagate_corr_to: str = "both", correlation_header_key: str = "request_id") -> T

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kafklient-0.2.0.tar.gz (29.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

kafklient-0.2.0-py3-none-any.whl (35.7 kB view details)

Uploaded Python 3

File details

Details for the file kafklient-0.2.0.tar.gz.

File metadata

  • Download URL: kafklient-0.2.0.tar.gz
  • Upload date:
  • Size: 29.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.8.22

File hashes

Hashes for kafklient-0.2.0.tar.gz
Algorithm Hash digest
SHA256 091699fc47f280759e6c4467d3d2d7d721f75cd30937e03637bcee287c07df5c
MD5 9e1351bfc143ce0320196a5e34a135a5
BLAKE2b-256 33f3ab6502503567b982b6a3d7be46e9a38128d7c331a2e08c8b1b6829e36c42

See more details on using hashes here.

File details

Details for the file kafklient-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: kafklient-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 35.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.8.22

File hashes

Hashes for kafklient-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 fbe1a0dc613f8b1b61007f9552c8e962e6613dfcb7c0bc709e0f9e5abc226554
MD5 4d39cba406a92a46fd17ff61c68c7ece
BLAKE2b-256 c27f126c5e4e47d24e38433e48c3132d9ad595e24b989f45c96ad531caee5bd0

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page