A well-documented, fully type-hinted Kafka client for Python
typedkafka
Overview
typedkafka provides a modern Python interface to Apache Kafka with comprehensive documentation, full type hints, and developer-friendly features. It is built on confluent-kafka for performance and reliability.
Key Features:
- Comprehensive docstrings for every class and method
- Full type hints for IDE autocomplete and type checking
- Convenient helper methods for JSON and string messages
- Testing utilities (MockProducer/MockConsumer) for unit tests
- Type-safe configuration builders
- Admin client for topic management
- Context managers for automatic resource cleanup
Installation
pip install typedkafka
Requires Python 3.9+ and installs confluent-kafka as a dependency.
Quick Start
Producer
from typedkafka import KafkaProducer

with KafkaProducer({"bootstrap.servers": "localhost:9092"}) as producer:
    # Send bytes
    producer.send("my-topic", b"Hello, Kafka!")

    # Send JSON (automatic serialization)
    producer.send_json("events", {"user_id": 123, "action": "click"})

    # Send string
    producer.send_string("logs", "Application started")

    producer.flush()
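Kafka payloads are plain bytes, so JSON and string helpers have to encode the value at some point before sending. The following is a minimal, standard-library sketch of that encoding step. The names `encode_json` and `encode_string`, the UTF-8 encoding, and the default `json.dumps` settings are assumptions made for illustration; they are not confirmed details of typedkafka.

```python
import json
from typing import Any


def encode_json(payload: Any) -> bytes:
    """Serialize a JSON-compatible object to UTF-8 bytes (hypothetical helper)."""
    return json.dumps(payload).encode("utf-8")


def encode_string(text: str) -> bytes:
    """Encode a string to UTF-8 bytes (hypothetical helper)."""
    return text.encode("utf-8")


# The same payloads used in the quick start, converted to raw bytes.
json_bytes = encode_json({"user_id": 123, "action": "click"})
text_bytes = encode_string("Application started")
```

Bytes produced this way could be passed to a bytes-only call such as `producer.send(topic, value)`.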
Consumer
from typedkafka import KafkaConsumer

config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "my-consumer-group",
    "auto.offset.reset": "earliest",
}

with KafkaConsumer(config) as consumer:
    consumer.subscribe(["my-topic"])

    for msg in consumer:
        # Convenient deserialization
        data = msg.value_as_json()
        print(f"Received: {data}")
        consumer.commit(msg)
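On the consuming side, a helper like `value_as_json()` presumably decodes the raw bytes and parses them. Here is a small sketch of that idea using only the standard library. `SketchMessage` and `value_as_string` are hypothetical names, and UTF-8 is an assumed encoding; this is not the library's message class.

```python
import json
from dataclasses import dataclass
from typing import Any


@dataclass
class SketchMessage:
    """Hypothetical stand-in for a consumed message; not a typedkafka class."""

    value: bytes

    def value_as_string(self) -> str:
        # Assumes the producer encoded the payload as UTF-8.
        return self.value.decode("utf-8")

    def value_as_json(self) -> Any:
        # Decode first, then parse; invalid JSON raises json.JSONDecodeError.
        return json.loads(self.value_as_string())


msg = SketchMessage(b'{"user_id": 123, "action": "click"}')
data = msg.value_as_json()
```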
Testing Utilities
Mock implementations for testing without running Kafka:
from typedkafka.testing import MockProducer, MockConsumer


def test_my_function():
    producer = MockProducer()
    my_function(producer)

    # Verify what was sent
    assert len(producer.messages["events"]) == 1
    msg = producer.messages["events"][0]
    assert msg.value == b"expected"


def test_message_processing():
    consumer = MockConsumer()
    consumer.add_json_message("events", {"user_id": 123})

    result = process_messages(consumer)
    assert result is not None
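The idea behind a mock producer is to record messages in memory instead of contacting a broker, so that tests can inspect them afterwards. The sketch below shows one way such a recorder could work. `RecordingProducer`, `RecordedMessage`, and `publish_click` are hypothetical names invented for this illustration; the real `MockProducer` may be implemented differently.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class RecordedMessage:
    """One captured send call (hypothetical record type)."""

    topic: str
    value: bytes
    key: Optional[bytes] = None


class RecordingProducer:
    """Hypothetical in-memory fake; illustrates the idea, not MockProducer itself."""

    def __init__(self) -> None:
        # Messages are grouped by topic, in the order they were sent.
        self.messages: Dict[str, List[RecordedMessage]] = defaultdict(list)

    def send(self, topic: str, value: bytes, key: Optional[bytes] = None) -> None:
        self.messages[topic].append(RecordedMessage(topic, value, key))


def publish_click(producer: RecordingProducer) -> None:
    """Example code under test: sends a single message to 'events'."""
    producer.send("events", b"expected")


producer = RecordingProducer()
publish_click(producer)
```

Because the code under test only calls `send`, it can accept either a real producer or the fake, which keeps unit tests independent of a running Kafka cluster.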
Type-Safe Configuration
Fluent builders with IDE autocomplete:
from typedkafka import ProducerConfig, KafkaProducer

config = (ProducerConfig()
    .bootstrap_servers("localhost:9092")
    .acks("all")
    .compression("gzip")
    .linger_ms(10)
    .build())

producer = KafkaProducer(config)
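A fluent builder works by having each setter store a value and return the builder itself, so calls can be chained. The sketch below illustrates the pattern. `SketchProducerConfig` is a hypothetical class, and the assumption that `build()` yields a plain dict keyed by librdkafka property names (`bootstrap.servers`, `acks`, `compression.type`, `linger.ms`) is mine, not something the project states.

```python
from typing import Dict, Union

ConfigValue = Union[str, int]


class SketchProducerConfig:
    """Hypothetical fluent builder; not typedkafka's ProducerConfig."""

    def __init__(self) -> None:
        self._settings: Dict[str, ConfigValue] = {}

    def bootstrap_servers(self, servers: str) -> "SketchProducerConfig":
        self._settings["bootstrap.servers"] = servers
        return self  # returning self is what enables chaining

    def acks(self, acks: str) -> "SketchProducerConfig":
        self._settings["acks"] = acks
        return self

    def compression(self, codec: str) -> "SketchProducerConfig":
        self._settings["compression.type"] = codec
        return self

    def linger_ms(self, milliseconds: int) -> "SketchProducerConfig":
        self._settings["linger.ms"] = milliseconds
        return self

    def build(self) -> Dict[str, ConfigValue]:
        # Return a copy so later builder calls cannot mutate the result.
        return dict(self._settings)


config = (
    SketchProducerConfig()
    .bootstrap_servers("localhost:9092")
    .acks("all")
    .compression("gzip")
    .linger_ms(10)
    .build()
)
```

Typed setter signatures are what let an IDE autocomplete option names and a type checker reject, for example, a string passed to `linger_ms`.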
Admin Operations
Manage topics and cluster configuration:
from typedkafka import KafkaAdmin
admin = KafkaAdmin({"bootstrap.servers": "localhost:9092"})
# Create topic
admin.create_topic("events", num_partitions=10, replication_factor=3)
# List topics
topics = admin.list_topics()
# Get topic details
info = admin.describe_topic("events")
# Delete topic
admin.delete_topic("old-topic")
Comprehensive Documentation
Every method includes detailed documentation:
def send(
    self,
    topic: str,
    value: bytes,
    key: Optional[bytes] = None,
    partition: Optional[int] = None,
    on_delivery: Optional[Callable] = None,
) -> None:
    """
    Send a message to a Kafka topic.

    This method is asynchronous - returns immediately after queuing.
    Use flush() to wait for delivery confirmation.

    Args:
        topic: The topic name to send the message to
        value: The message payload as bytes
        key: Optional message key as bytes. Messages with the same
            key go to the same partition.
        partition: Optional partition number. If None, partition is
            chosen by the partitioner.
        on_delivery: Optional callback function called when delivery
            succeeds or fails.

    Raises:
        ProducerError: If the message cannot be queued

    Examples:
        >>> producer.send("my-topic", b"Hello!")
        >>> producer.send("events", b"data", key=b"user-123")
    """
Better Error Messages
Clear, actionable errors with context:
try:
    producer.send_json("topic", non_serializable_object)
except SerializationError as e:
    # Error includes:
    # - Clear message
    # - The problematic value (e.value)
    # - Original error (e.original_error)
    print(f"Failed to serialize: {e}")
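To show how an error can carry both the offending value and the underlying exception, here is a standard-library sketch of the wrapping pattern. `SketchSerializationError` and `to_json_bytes` are hypothetical names; only the attribute names `value` and `original_error` come from the description above, and the library's own exception class may differ.

```python
import json
from typing import Any, Optional


class SketchSerializationError(Exception):
    """Hypothetical error type carrying context about the failure."""

    def __init__(self, message: str, value: Any, original_error: Exception) -> None:
        super().__init__(message)
        self.value = value
        self.original_error = original_error


def to_json_bytes(value: Any) -> bytes:
    """Serialize to UTF-8 JSON, wrapping failures with extra context."""
    try:
        return json.dumps(value).encode("utf-8")
    except (TypeError, ValueError) as exc:
        raise SketchSerializationError(
            f"Cannot serialize {type(value).__name__} to JSON: {exc}",
            value,
            exc,
        ) from exc


# A bare object() is not JSON serializable, so this triggers the error path.
bad_value = {"when": object()}
caught: Optional[SketchSerializationError] = None
try:
    to_json_bytes(bad_value)
except SketchSerializationError as err:
    caught = err
```

Raising with `from exc` keeps the original traceback chained to the new error, so the root cause remains visible while debugging.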
Development
# Clone the repository
git clone https://github.com/Jgprog117/typedkafka.git
cd typedkafka
# Install in editable mode with dev dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Run linter
ruff check .
License
MIT License - see LICENSE file for details.
Changelog
0.2.0 (2026-01-31)
- Added testing utilities (MockProducer, MockConsumer)
- Added type-safe configuration builders (ProducerConfig, ConsumerConfig)
- Added Admin client wrapper for topic management
- Improved documentation and examples
0.1.0 (2026-01-31)
- Initial release
- KafkaProducer with comprehensive documentation
- KafkaConsumer with helper methods
- Full type hints throughout
- Context manager support
- JSON and string convenience methods