A well-documented, fully type-hinted Kafka client for Python
typedkafka
Overview
typedkafka provides a modern Python interface to Apache Kafka with comprehensive documentation, full type hints, and developer-friendly features. Built on confluent-kafka for performance and reliability.
Key Features:
- Full type hints and IDE autocomplete on every public method
- Type-safe topics with generic serialization/deserialization
- Transactions, async, retry, and batch operations built in
- Pluggable serializers: JSON, string, Protobuf, Avro/Schema Registry
- Structured logging, metrics, OpenTelemetry tracing, and dead letter queues
- Testing mocks with full API parity — no broker needed for unit tests
- Type-safe config builders with presets, validation, and security helpers
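To illustrate what broker-free unit testing can look like, here is a minimal sketch that uses a hand-rolled fake rather than typedkafka's own mocks, whose API is not shown in this README. `SupportsSend`, `RecordingProducer`, and `publish_greeting` are hypothetical names, and the two-argument `send` signature is assumed from the Quick Start example.

```python
from typing import List, Protocol, Tuple


class SupportsSend(Protocol):
    """Hypothetical structural type: anything with send(topic, value)."""

    def send(self, topic: str, value: bytes) -> None: ...


class RecordingProducer:
    """Hand-rolled fake (not typedkafka's mock): keeps sends in memory."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, bytes]] = []

    def send(self, topic: str, value: bytes) -> None:
        self.sent.append((topic, value))


def publish_greeting(producer: SupportsSend, name: str) -> None:
    """Code under test: it only depends on the send() method."""
    producer.send("my-topic", f"Hello, {name}!".encode("utf-8"))


# In a unit test, pass the fake instead of a real producer.
fake = RecordingProducer()
publish_greeting(fake, "Kafka")
```

Because `publish_greeting` depends only on the `send` method, the same function should accept a real producer in production code, provided its `send` is compatible with the assumed signature.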
Why typedkafka?
confluent-kafka is fast and reliable, but its Python API lacks type hints, has sparse docs, and surfaces raw C-level errors. typedkafka wraps it with a modern, Pythonic interface:
- Type safety: full type hints, generic TypedTopic[T], and IDE autocomplete so you catch mistakes before runtime
- Batteries included: transactions, async, retry, serialization, logging, metrics, and dead letter queues out of the box
- Testable: mock producer/consumer with full API parity; write unit tests without Docker
- Observable: structured logging, OpenTelemetry tracing, and metrics collection built in
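The idea behind a generic typed topic can be sketched with the standard library alone. The example below is an illustration of the concept, not typedkafka's actual `TypedTopic[T]` API, which this README does not document; `SketchTopic`, `Click`, and the two codec functions are hypothetical names.

```python
import json
from dataclasses import asdict, dataclass
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class SketchTopic(Generic[T]):
    """Hypothetical stand-in: a topic name bound to a value type and codec."""

    name: str
    serialize: Callable[[T], bytes]
    deserialize: Callable[[bytes], T]


@dataclass
class Click:
    user_id: int
    action: str


def click_to_bytes(event: Click) -> bytes:
    # sort_keys keeps the encoded payload deterministic.
    return json.dumps(asdict(event), sort_keys=True).encode("utf-8")


def click_from_bytes(raw: bytes) -> Click:
    return Click(**json.loads(raw.decode("utf-8")))


# A type checker can now flag attempts to serialize anything but a Click.
clicks: SketchTopic[Click] = SketchTopic("events", click_to_bytes, click_from_bytes)

payload = clicks.serialize(Click(user_id=123, action="click"))
restored = clicks.deserialize(payload)
```

Binding the value type to the topic lets mypy or an IDE reject a mismatched payload before the code runs, which is the benefit the bullet on type safety describes.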
Installation
```bash
pip install typedkafka

# With Avro/Schema Registry support (quoted so shells such as zsh do not expand the brackets)
pip install "typedkafka[avro]"

# With Protobuf support
pip install "typedkafka[protobuf]"

# Everything
pip install "typedkafka[all]"
```
Requires Python 3.9+.
Quick Start
```python
from typedkafka import KafkaProducer

# Send raw bytes, then a JSON-encoded dict, and flush before leaving the block.
with KafkaProducer({"bootstrap.servers": "localhost:9092"}) as producer:
    producer.send("my-topic", b"Hello, Kafka!")
    producer.send_json("events", {"user_id": 123, "action": "click"})
    producer.flush()
```

```python
from typedkafka import KafkaConsumer

# Iterate over incoming messages and commit each one after handling it.
with KafkaConsumer({"bootstrap.servers": "localhost:9092", "group.id": "my-group"}) as consumer:
    consumer.subscribe(["my-topic"])
    for msg in consumer:
        print(msg.value_as_json())
        consumer.commit(msg)
```
Configuration Builders
```python
from typedkafka import ProducerConfig, KafkaProducer

config = (
    ProducerConfig()
    .bootstrap_servers("broker:9093")
    .sasl_scram("user", "password")
    .acks("all")
    .compression("gzip")
    .build(validate=True)
)

producer = KafkaProducer(config)
```
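For readers curious how a fluent builder like this can map onto the flat configuration dictionary that confluent-kafka expects, here is a minimal sketch. It is not typedkafka's implementation: `SketchProducerConfig` is a hypothetical class, and the choice of `SASL_SSL` and `SCRAM-SHA-256` as defaults is an assumption. The dictionary keys themselves are standard librdkafka property names.

```python
from typing import Dict

# Codec names accepted by librdkafka's "compression.type" property.
_CODECS = {"none", "gzip", "snappy", "lz4", "zstd"}


class SketchProducerConfig:
    """Hypothetical fluent builder; not typedkafka's ProducerConfig."""

    def __init__(self) -> None:
        self._conf: Dict[str, str] = {}

    def bootstrap_servers(self, servers: str) -> "SketchProducerConfig":
        self._conf["bootstrap.servers"] = servers
        return self

    def sasl_scram(self, username: str, password: str,
                   mechanism: str = "SCRAM-SHA-256") -> "SketchProducerConfig":
        # Assumption: SCRAM over TLS; the real helper may choose differently.
        self._conf.update({
            "security.protocol": "SASL_SSL",
            "sasl.mechanism": mechanism,
            "sasl.username": username,
            "sasl.password": password,
        })
        return self

    def acks(self, value: str) -> "SketchProducerConfig":
        self._conf["acks"] = value
        return self

    def compression(self, codec: str) -> "SketchProducerConfig":
        # Reject typos early instead of letting the client fail later.
        if codec not in _CODECS:
            raise ValueError(f"unknown compression codec: {codec!r}")
        self._conf["compression.type"] = codec
        return self

    def build(self, validate: bool = False) -> Dict[str, str]:
        if validate and "bootstrap.servers" not in self._conf:
            raise ValueError("bootstrap.servers is required")
        return dict(self._conf)  # copy, so later builder calls cannot mutate it


conf = (
    SketchProducerConfig()
    .bootstrap_servers("broker:9093")
    .sasl_scram("user", "password")
    .acks("all")
    .compression("gzip")
    .build(validate=True)
)
```

Each method returns the builder itself, which is what makes the chained call style possible; validation at `build` time surfaces a missing broker address as a Python exception rather than a client-level error.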
See the examples/ directory for more: transactions, async, retry, serializers, batch send, testing mocks, config builders, metrics, and dead letter queues.
Development
```bash
git clone https://github.com/Jgprog117/typedkafka.git
cd typedkafka
pip install -e ".[dev]"
pytest
ruff check .
mypy src
```
License
MIT License - see LICENSE file for details.
Changelog
See CHANGELOG.md for the full release history.
File details
Details for the file typedkafka-0.7.0.tar.gz.
File metadata
- Download URL: typedkafka-0.7.0.tar.gz
- Upload date:
- Size: 87.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 1c4ff7541d3f0066dbc7cd4bfd2f8c88a0e762988b8a0503502f10809b4f9e9d |
| MD5 | d924c5ac8a1e608e74539ef9fef17644 |
| BLAKE2b-256 | eb2311f9aff29cfc1bb06c57f7cb0c527deda63400beb9f0ce40c4d69faa31c3 |
Provenance
The following attestation bundles were made for typedkafka-0.7.0.tar.gz:
Publisher: publish.yml on Jgprog117/typedkafka
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: typedkafka-0.7.0.tar.gz
- Subject digest: 1c4ff7541d3f0066dbc7cd4bfd2f8c88a0e762988b8a0503502f10809b4f9e9d
- Sigstore transparency entry: 893924323
- Sigstore integration time:
- Permalink: Jgprog117/typedkafka@3dc4340d9e5308b66b0d048abd54fdc64e21ccc0
- Branch / Tag: refs/tags/v0.7.0
- Owner: https://github.com/Jgprog117
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@3dc4340d9e5308b66b0d048abd54fdc64e21ccc0
- Trigger Event: release
File details
Details for the file typedkafka-0.7.0-py3-none-any.whl.
File metadata
- Download URL: typedkafka-0.7.0-py3-none-any.whl
- Upload date:
- Size: 45.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 6fb98a04e64f6b58445b1f323c806d343ea869c9bf225b9c3f8f00469e11d52c |
| MD5 | a2c4a8f8cf50461f672422d610a11755 |
| BLAKE2b-256 | a36d40f73357dd5a9ba70baa359c920ca915d27495772810a95b9ba500f191d1 |
Provenance
The following attestation bundles were made for typedkafka-0.7.0-py3-none-any.whl:
Publisher: publish.yml on Jgprog117/typedkafka
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: typedkafka-0.7.0-py3-none-any.whl
- Subject digest: 6fb98a04e64f6b58445b1f323c806d343ea869c9bf225b9c3f8f00469e11d52c
- Sigstore transparency entry: 893924396
- Sigstore integration time:
- Permalink: Jgprog117/typedkafka@3dc4340d9e5308b66b0d048abd54fdc64e21ccc0
- Branch / Tag: refs/tags/v0.7.0
- Owner: https://github.com/Jgprog117
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@3dc4340d9e5308b66b0d048abd54fdc64e21ccc0
- Trigger Event: release