A well-documented, fully type-hinted Kafka client for Python

typedkafka

Overview

typedkafka provides a modern Python interface to Apache Kafka with comprehensive documentation, full type hints, and developer-friendly features. It is built on confluent-kafka for performance and reliability.

Key Features:

  • Comprehensive docstrings for every class and method
  • Full type hints for IDE autocomplete and type checking
  • Convenient helper methods for JSON and string messages
  • Testing utilities (MockProducer/MockConsumer) for unit tests
  • Type-safe configuration builders
  • Admin client for topic management
  • Context managers for automatic resource cleanup

Installation

pip install typedkafka

Requires Python 3.9+ and installs confluent-kafka as a dependency.

Quick Start

Producer

from typedkafka import KafkaProducer

with KafkaProducer({"bootstrap.servers": "localhost:9092"}) as producer:
    # Send bytes
    producer.send("my-topic", b"Hello, Kafka!")

    # Send JSON (automatic serialization)
    producer.send_json("events", {"user_id": 123, "action": "click"})

    # Send string
    producer.send_string("logs", "Application started")

    producer.flush()
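
The JSON and string helpers presumably reduce to encoding the payload to bytes before it is handed to send(). The sketch below shows that equivalence with the standard library only. UTF-8 and the default json.dumps settings are assumptions, and encode_json and encode_string are hypothetical names, not part of typedkafka.

```python
import json
from typing import Any


def encode_json(value: Any) -> bytes:
    """Serialize a JSON-compatible object to UTF-8 bytes (assumed encoding)."""
    return json.dumps(value).encode("utf-8")


def encode_string(value: str) -> bytes:
    """Encode a text payload to UTF-8 bytes (assumed encoding)."""
    return value.encode("utf-8")


payload = encode_json({"user_id": 123, "action": "click"})
log_line = encode_string("Application started")
print(payload)   # b'{"user_id": 123, "action": "click"}'
print(log_line)  # b'Application started'
```

Under these assumptions, either result could be passed to producer.send() as the value argument.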

Consumer

from typedkafka import KafkaConsumer

config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "my-consumer-group",
    "auto.offset.reset": "earliest"
}

with KafkaConsumer(config) as consumer:
    consumer.subscribe(["my-topic"])

    for msg in consumer:
        # Convenient deserialization
        data = msg.value_as_json()
        print(f"Received: {data}")

        consumer.commit(msg)
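
If a topic may carry malformed payloads, it can help to decode defensively before committing. The following is a minimal sketch of such a decode step using only the standard library. safe_json is a hypothetical helper, and how value_as_json() itself behaves on bad input is not stated in this README.

```python
import json
from typing import Any, Optional


def safe_json(raw: Optional[bytes]) -> Optional[Any]:
    """Return the decoded JSON value, or None for missing or malformed payloads.

    Note: a payload that is the JSON literal null also yields None.
    """
    if raw is None:
        return None
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


good = safe_json(b'{"user_id": 123}')
bad = safe_json(b"not json")
print(good)  # {'user_id': 123}
print(bad)   # None
```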

Testing Utilities

Mock implementations for testing without running Kafka:

from typedkafka.testing import MockProducer, MockConsumer

# my_function and process_messages are placeholders for your own code under test.

def test_my_function():
    producer = MockProducer()
    my_function(producer)

    # Verify what was sent
    assert len(producer.messages["events"]) == 1
    msg = producer.messages["events"][0]
    assert msg.value == b"expected"

def test_message_processing():
    consumer = MockConsumer()
    consumer.add_json_message("events", {"user_id": 123})

    result = process_messages(consumer)
    assert result is not None
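
Mocks are easiest to substitute when the code under test depends on a narrow interface rather than a concrete client class. The sketch below uses typing.Protocol to express that idea. JsonSender, RecordingSender, and publish_click are hypothetical names introduced for illustration, and RecordingSender only imitates what a mock producer might record.

```python
from typing import Any, Dict, List, Protocol, Tuple


class JsonSender(Protocol):
    """Structural type for anything exposing send_json(topic, value)."""

    def send_json(self, topic: str, value: Dict[str, Any]) -> None: ...


class RecordingSender:
    """Hypothetical stand-in for a mock producer; records every call."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, Dict[str, Any]]] = []

    def send_json(self, topic: str, value: Dict[str, Any]) -> None:
        self.sent.append((topic, value))


def publish_click(producer: JsonSender, user_id: int) -> None:
    """Code under test: relies only on the send_json method."""
    producer.send_json("events", {"user_id": user_id, "action": "click"})


sender = RecordingSender()
publish_click(sender, 123)
print(sender.sent)  # [('events', {'user_id': 123, 'action': 'click'})]
```

With this shape, a test can pass any object that offers send_json, while production code passes the real producer.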

Type-Safe Configuration

Fluent builders with IDE autocomplete:

from typedkafka import ProducerConfig, KafkaProducer

config = (ProducerConfig()
    .bootstrap_servers("localhost:9092")
    .acks("all")
    .compression("gzip")
    .linger_ms(10)
    .build())

producer = KafkaProducer(config)
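
To make the builder pattern concrete, here is an illustrative sketch of how a fluent builder of this kind can work. It assumes build() yields a plain dict keyed by librdkafka property names. ProducerConfigSketch is a hypothetical class and not typedkafka's implementation.

```python
from typing import Any, Dict


class ProducerConfigSketch:
    """Illustrative fluent builder; not typedkafka's actual implementation."""

    def __init__(self) -> None:
        self._settings: Dict[str, Any] = {}

    def _set(self, key: str, value: Any) -> "ProducerConfigSketch":
        # Store the setting and return self so calls can be chained.
        self._settings[key] = value
        return self

    def bootstrap_servers(self, servers: str) -> "ProducerConfigSketch":
        return self._set("bootstrap.servers", servers)

    def acks(self, acks: str) -> "ProducerConfigSketch":
        return self._set("acks", acks)

    def compression(self, codec: str) -> "ProducerConfigSketch":
        return self._set("compression.type", codec)

    def linger_ms(self, ms: int) -> "ProducerConfigSketch":
        return self._set("linger.ms", ms)

    def build(self) -> Dict[str, Any]:
        # Return a copy so later builder calls cannot mutate the result.
        return dict(self._settings)


sketch_config = (ProducerConfigSketch()
    .bootstrap_servers("localhost:9092")
    .acks("all")
    .compression("gzip")
    .linger_ms(10)
    .build())
print(sketch_config)
```

Each method returns the builder itself, which is what allows the chained call style and lets an IDE suggest the next option.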

Admin Operations

Manage topics and cluster configuration:

from typedkafka import KafkaAdmin

admin = KafkaAdmin({"bootstrap.servers": "localhost:9092"})

# Create topic
admin.create_topic("events", num_partitions=10, replication_factor=3)

# List topics
topics = admin.list_topics()

# Get topic details
info = admin.describe_topic("events")

# Delete topic
admin.delete_topic("old-topic")
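
A common pattern on top of these calls is creating a topic only when it does not exist yet. The sketch below assumes list_topics() returns a collection of topic names, which this README does not confirm. ensure_topic and StubAdmin are hypothetical, and the stub exists only so the example runs without a broker.

```python
from typing import Any, List


class StubAdmin:
    """Hypothetical in-memory stand-in exposing the two methods used below."""

    def __init__(self) -> None:
        self.topics: List[str] = []

    def list_topics(self) -> List[str]:
        return list(self.topics)

    def create_topic(self, name: str, num_partitions: int, replication_factor: int) -> None:
        self.topics.append(name)


def ensure_topic(admin: Any, name: str, num_partitions: int = 1,
                 replication_factor: int = 1) -> bool:
    """Create the topic if it is not listed; return True when it was created."""
    if name in admin.list_topics():
        return False
    admin.create_topic(name, num_partitions=num_partitions,
                       replication_factor=replication_factor)
    return True


stub = StubAdmin()
first = ensure_topic(stub, "events", num_partitions=10, replication_factor=3)
second = ensure_topic(stub, "events", num_partitions=10, replication_factor=3)
print(first, second)  # True False
```

The check and the create call are two separate steps, so against a real cluster another client could create the topic in between; callers may still need to handle an "already exists" error.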

Comprehensive Documentation

Every method includes detailed documentation:

def send(
    self,
    topic: str,
    value: bytes,
    key: Optional[bytes] = None,
    partition: Optional[int] = None,
    on_delivery: Optional[Callable] = None,
) -> None:
    """
    Send a message to a Kafka topic.

    This method is asynchronous - returns immediately after queuing.
    Use flush() to wait for delivery confirmation.

    Args:
        topic: The topic name to send the message to
        value: The message payload as bytes
        key: Optional message key as bytes. Messages with the same
             key go to the same partition.
        partition: Optional partition number. If None, partition is
                  chosen by the partitioner.
        on_delivery: Optional callback function called when delivery
                    succeeds or fails.

    Raises:
        ProducerError: If the message cannot be queued

    Examples:
        >>> producer.send("my-topic", b"Hello!")
        >>> producer.send("events", b"data", key=b"user-123")
    """

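Because send() returns before delivery is confirmed, a callback is the usual way to learn the outcome of each message. The sketch below assumes the callback is invoked as callback(err, msg), as in the underlying confluent-kafka client, and that msg offers topic(), partition(), and offset() methods. DeliveryTracker and FakeMessage are hypothetical names.

```python
from typing import Any, List, Optional


class DeliveryTracker:
    """Callable that records delivery outcomes reported by the client."""

    def __init__(self) -> None:
        self.delivered: List[str] = []
        self.failed: List[str] = []

    def __call__(self, err: Optional[Any], msg: Any) -> None:
        if err is not None:
            # Delivery failed; keep the error text for later inspection.
            self.failed.append(str(err))
        else:
            self.delivered.append(f"{msg.topic()}[{msg.partition()}]@{msg.offset()}")


class FakeMessage:
    """Hypothetical stand-in for a delivered message, for demonstration only."""

    def topic(self) -> str:
        return "my-topic"

    def partition(self) -> int:
        return 0

    def offset(self) -> int:
        return 42


tracker = DeliveryTracker()
tracker(None, FakeMessage())                  # simulated success
tracker("broker unavailable", FakeMessage())  # simulated failure
print(tracker.delivered)  # ['my-topic[0]@42']
print(tracker.failed)     # ['broker unavailable']
```

Under these assumptions, the tracker could be passed as on_delivery=tracker and inspected after flush() returns.
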
Better Error Messages

Clear, actionable errors with context:

try:
    producer.send_json("topic", non_serializable_object)
except SerializationError as e:
    # Error includes:
    # - Clear message
    # - The problematic value (e.value)
    # - Original error (e.original_error)
    print(f"Failed to serialize: {e}")
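
The idea of an error that carries both the offending value and the root cause can be sketched with the standard library alone. DemoSerializationError and to_json_bytes below are hypothetical stand-ins that mirror the value and original_error attributes mentioned above; they are not typedkafka's own classes.

```python
import json
from typing import Any


class DemoSerializationError(Exception):
    """Stand-in exception carrying the offending value and the root cause."""

    def __init__(self, message: str, value: Any, original_error: Exception) -> None:
        super().__init__(message)
        self.value = value
        self.original_error = original_error


def to_json_bytes(value: Any) -> bytes:
    """Encode value as UTF-8 JSON, wrapping failures with extra context."""
    try:
        return json.dumps(value).encode("utf-8")
    except (TypeError, ValueError) as exc:
        raise DemoSerializationError(
            f"Cannot serialize {type(value).__name__} to JSON", value, exc
        ) from exc


try:
    to_json_bytes({"tags": {1, 2}})  # sets are not JSON serializable
except DemoSerializationError as err:
    caught = err
    print(err)                              # Cannot serialize dict to JSON
    print(type(err.original_error).__name__)  # TypeError
```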

Development

# Clone the repository
git clone https://github.com/Jgprog117/typedkafka.git
cd typedkafka

# Install in editable mode with dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run linter
ruff check .

License

MIT License - see LICENSE file for details.

Changelog

0.2.0 (2026-01-31)

  • Added testing utilities (MockProducer, MockConsumer)
  • Added type-safe configuration builders (ProducerConfig, ConsumerConfig)
  • Added Admin client wrapper for topic management
  • Improved documentation and examples

0.1.0 (2026-01-31)

  • Initial release
  • KafkaProducer with comprehensive documentation
  • KafkaConsumer with helper methods
  • Full type hints throughout
  • Context manager support
  • JSON and string convenience methods
