A well-documented, fully type-hinted Kafka client for Python

typedkafka

Overview

typedkafka provides a modern Python interface to Apache Kafka with comprehensive documentation, full type hints, and developer-friendly features. It is built on confluent-kafka for performance and reliability.

Key Features:

  • Comprehensive docstrings for every class and method
  • Full type hints for IDE autocomplete and type checking
  • Convenient helper methods for JSON and string messages
  • Testing utilities (MockProducer/MockConsumer) for unit tests
  • Type-safe configuration builders
  • Admin client for topic management
  • Context managers for automatic resource cleanup

Installation

pip install typedkafka

Requires Python 3.9+ and installs confluent-kafka as a dependency.

Quick Start

Producer

from typedkafka import KafkaProducer

with KafkaProducer({"bootstrap.servers": "localhost:9092"}) as producer:
    # Send bytes
    producer.send("my-topic", b"Hello, Kafka!")

    # Send JSON (automatic serialization)
    producer.send_json("events", {"user_id": 123, "action": "click"})

    # Send string
    producer.send_string("logs", "Application started")

    producer.flush()
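
To illustrate what the JSON and string helpers save you from writing, the sketch below encodes payloads to bytes by hand. The function names encode_json and encode_string are hypothetical, and the use of json.dumps defaults with UTF-8 is an assumption; typedkafka's actual serialization may differ.

```python
import json
from typing import Any


def encode_json(payload: Any) -> bytes:
    """Serialize a JSON-compatible object to UTF-8 bytes (assumed encoding)."""
    return json.dumps(payload).encode("utf-8")


def encode_string(text: str) -> bytes:
    """Encode a string to UTF-8 bytes (assumed encoding)."""
    return text.encode("utf-8")


# These byte strings are what you would otherwise build yourself
# before calling producer.send(topic, value).
json_bytes = encode_json({"user_id": 123, "action": "click"})
text_bytes = encode_string("Application started")
```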

Consumer

from typedkafka import KafkaConsumer

config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "my-consumer-group",
    "auto.offset.reset": "earliest"
}

with KafkaConsumer(config) as consumer:
    consumer.subscribe(["my-topic"])

    for msg in consumer:
        # Convenient deserialization
        data = msg.value_as_json()
        print(f"Received: {data}")

        consumer.commit(msg)
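
How value_as_json() reacts to a payload that is not valid JSON is not stated here, so a consumer loop may want to guard the decode step. The following is a minimal sketch on raw bytes; decode_json is a hypothetical helper, and returning None for bad input is one possible policy rather than the library's behavior.

```python
import json
from typing import Any, Optional


def decode_json(raw: Optional[bytes]) -> Optional[Any]:
    """Decode UTF-8 JSON bytes; return None for missing or malformed payloads."""
    if raw is None:
        return None
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Invalid UTF-8 or invalid JSON: signal the failure with None.
        return None


good = decode_json(b'{"user_id": 123}')
bad = decode_json(b"not json")
```

With a policy like this, the loop can skip or dead-letter a message when the result is None instead of crashing before the offset is committed.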

Testing Utilities

Mock implementations for testing without running Kafka:

from typedkafka.testing import MockProducer, MockConsumer

# my_function and process_messages below stand in for your own code under test.

def test_my_function():
    producer = MockProducer()
    my_function(producer)

    # Verify what was sent
    assert len(producer.messages["events"]) == 1
    msg = producer.messages["events"][0]
    assert msg.value == b"expected"

def test_message_processing():
    consumer = MockConsumer()
    consumer.add_json_message("events", {"user_id": 123})

    result = process_messages(consumer)
    assert result is not None
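
The idea behind a mock producer can be sketched with the standard library alone: record each message in memory, keyed by topic, instead of contacting a broker. RecordingProducer, RecordedMessage, and publish_greeting are hypothetical names for illustration only; this is not typedkafka's MockProducer.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import DefaultDict, List, Optional


@dataclass
class RecordedMessage:
    topic: str
    value: bytes
    key: Optional[bytes] = None


class RecordingProducer:
    """Hypothetical in-memory stand-in; not typedkafka's MockProducer."""

    def __init__(self) -> None:
        self.messages: DefaultDict[str, List[RecordedMessage]] = defaultdict(list)

    def send(self, topic: str, value: bytes, key: Optional[bytes] = None) -> None:
        # Record the message instead of contacting a broker.
        self.messages[topic].append(RecordedMessage(topic, value, key))


def publish_greeting(producer: RecordingProducer) -> None:
    """Example code under test: sends one message to the 'events' topic."""
    producer.send("events", b"expected")


producer = RecordingProducer()
publish_greeting(producer)
```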

Type-Safe Configuration

Fluent builders with IDE autocomplete:

from typedkafka import ProducerConfig, KafkaProducer

config = (ProducerConfig()
    .bootstrap_servers("localhost:9092")
    .acks("all")
    .compression("gzip")
    .linger_ms(10)
    .build())

producer = KafkaProducer(config)
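
For readers unfamiliar with fluent builders, here is a rough sketch of the pattern: each setter stores one value and returns the builder itself, which is what allows chaining. SketchProducerConfig is a hypothetical class, and the assumption that build() yields a plain dict of librdkafka-style keys is not confirmed by this README.

```python
from typing import Dict, Union

ConfigValue = Union[str, int]


class SketchProducerConfig:
    """Hypothetical fluent builder; not typedkafka's ProducerConfig."""

    def __init__(self) -> None:
        self._settings: Dict[str, ConfigValue] = {}

    def bootstrap_servers(self, servers: str) -> "SketchProducerConfig":
        self._settings["bootstrap.servers"] = servers
        return self  # returning self is what enables chaining

    def acks(self, value: str) -> "SketchProducerConfig":
        self._settings["acks"] = value
        return self

    def compression(self, codec: str) -> "SketchProducerConfig":
        self._settings["compression.type"] = codec
        return self

    def linger_ms(self, millis: int) -> "SketchProducerConfig":
        self._settings["linger.ms"] = millis
        return self

    def build(self) -> Dict[str, ConfigValue]:
        # Return a copy so later builder calls cannot mutate the result.
        return dict(self._settings)


config = (SketchProducerConfig()
    .bootstrap_servers("localhost:9092")
    .acks("all")
    .compression("gzip")
    .linger_ms(10)
    .build())
```

Typed setter signatures are what give the IDE something to autocomplete and let a type checker flag a call such as linger_ms("10").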

Admin Operations

Manage topics and cluster configuration:

from typedkafka import KafkaAdmin

admin = KafkaAdmin({"bootstrap.servers": "localhost:9092"})

# Create topic
admin.create_topic("events", num_partitions=10, replication_factor=3)

# List topics
topics = admin.list_topics()

# Get topic details
info = admin.describe_topic("events")

# Delete topic
admin.delete_topic("old-topic")

Comprehensive Documentation

Every method includes detailed documentation:

def send(
    self,
    topic: str,
    value: bytes,
    key: Optional[bytes] = None,
    partition: Optional[int] = None,
    on_delivery: Optional[Callable] = None,
) -> None:
    """
    Send a message to a Kafka topic.

    This method is asynchronous: it returns immediately after queuing
    the message. Use flush() to wait for delivery confirmation.

    Args:
        topic: The topic name to send the message to
        value: The message payload as bytes
        key: Optional message key as bytes. Messages with the same
             key go to the same partition.
        partition: Optional partition number. If None, partition is
                  chosen by the partitioner.
        on_delivery: Optional callback function called when delivery
                    succeeds or fails.

    Raises:
        ProducerError: If the message cannot be queued

    Examples:
        >>> producer.send("my-topic", b"Hello!")
        >>> producer.send("events", b"data", key=b"user-123")
    """

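The docstring's note that messages with the same key go to the same partition follows from hashing the key. The sketch below assumes a simple CRC32-modulo scheme; partition_for_key is a hypothetical helper, and the partitioner actually used depends on the client configuration and may use a different hash.

```python
import zlib


def partition_for_key(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition with CRC32 modulo the partition count."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return zlib.crc32(key) % num_partitions


# The same key always maps to the same partition for a fixed partition count.
first = partition_for_key(b"user-123", 10)
second = partition_for_key(b"user-123", 10)
```

Under a modulo scheme like this one, changing the number of partitions can move a key to a different partition, so per-key ordering holds only while the partition count stays fixed.
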
Better Error Messages

Clear, actionable errors with context:

try:
    producer.send_json("topic", non_serializable_object)
except SerializationError as e:
    # Error includes:
    # - Clear message
    # - The problematic value (e.value)
    # - Original error (e.original_error)
    print(f"Failed to serialize: {e}")
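
An error that carries its own context can be sketched as an exception subclass that stores the failing value and the underlying cause. The attribute names value and original_error come from the example above; the class SketchSerializationError and the helper to_json_bytes are hypothetical and do not reflect typedkafka's actual implementation.

```python
import json
from typing import Any


class SketchSerializationError(Exception):
    """Hypothetical error type carrying the failing value and root cause."""

    def __init__(self, message: str, value: Any, original_error: Exception) -> None:
        super().__init__(message)
        self.value = value
        self.original_error = original_error


def to_json_bytes(value: Any) -> bytes:
    try:
        return json.dumps(value).encode("utf-8")
    except (TypeError, ValueError) as exc:
        # Wrap the low-level error and keep both the value and the cause.
        raise SketchSerializationError(
            f"Cannot serialize {type(value).__name__} to JSON", value, exc
        ) from exc


bad_value = {1, 2, 3}  # sets are not JSON-serializable
try:
    to_json_bytes(bad_value)
    caught = None
except SketchSerializationError as err:
    caught = err
```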

Development

# Clone the repository
git clone https://github.com/Jgprog117/typedkafka.git
cd typedkafka

# Install in editable mode with dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run linter
ruff check .

License

MIT License - see LICENSE file for details.

Changelog

0.2.0 (2026-01-31)

  • Added testing utilities (MockProducer, MockConsumer)
  • Added type-safe configuration builders (ProducerConfig, ConsumerConfig)
  • Added Admin client wrapper for topic management
  • Improved documentation and examples

0.1.0 (2026-01-31)

  • Initial release
  • KafkaProducer with comprehensive documentation
  • KafkaConsumer with helper methods
  • Full type hints throughout
  • Context manager support
  • JSON and string convenience methods
