Skip to main content

Kafka clients

Project description

dgkafka

Python package for working with Apache Kafka supporting multiple data formats.

Installation

pip install dgkafka

For Avro support (requires additional dependencies):

pip install dgkafka[avro]

For Json support (requires additional dependencies):

pip install dgkafka[json]

Features

  • Producers and consumers for different data formats:
    • Raw messages (bytes/strings)
    • JSON
    • Avro (with Schema Registry integration)
  • Robust error handling
  • Comprehensive operation logging
  • Context manager support
  • Flexible configuration

Quick Start

Basic Producer/Consumer

from dgkafka import KafkaProducer, KafkaConsumer

# Producer
with KafkaProducer(bootstrap_servers='localhost:9092') as producer:
    producer.produce('test_topic', 'Hello, Kafka!')

# Consumer
with KafkaConsumer(bootstrap_servers='localhost:9092', group_id='test_group') as consumer:
    consumer.subscribe(['test_topic'])
    for msg in consumer.consume():
        print(msg.value())

JSON Support

from dgkafka import JsonKafkaProducer, JsonKafkaConsumer

# Producer
with JsonKafkaProducer(bootstrap_servers='localhost:9092') as producer:
    producer.produce('json_topic', {'key': 'value'})

# Consumer
with JsonKafkaConsumer(bootstrap_servers='localhost:9092', group_id='json_group') as consumer:
    consumer.subscribe(['json_topic'])
    for msg in consumer.consume():
        print(msg.value())  # Automatically deserialized JSON

Avro Support

from dgkafka import AvroKafkaProducer, AvroKafkaConsumer

# Producer
value_schema = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"}
    ]
}

with AvroKafkaProducer(
    schema_registry_url='http://localhost:8081',
    bootstrap_servers='localhost:9092',
    default_value_schema=value_schema
) as producer:
    producer.produce('avro_topic', {'name': 'Alice', 'age': 30})

# Consumer
with AvroKafkaConsumer(
    schema_registry_url='http://localhost:8081',
    bootstrap_servers='localhost:9092',
    group_id='avro_group'
) as consumer:
    consumer.subscribe(['avro_topic'])
    for msg in consumer.consume():
        print(msg.value())  # Automatically deserialized Avro object

Classes

Base Classes

  • KafkaProducer - base message producer
  • KafkaConsumer - base message consumer

Specialized Classes

  • JsonKafkaProducer - JSON message producer (inherits from KafkaProducer)
  • JsonKafkaConsumer - JSON message consumer (inherits from KafkaConsumer)
  • AvroKafkaProducer - Avro message producer (inherits from KafkaProducer)
  • AvroKafkaConsumer - Avro message consumer (inherits from KafkaConsumer)

Configuration

All classes accept standard Kafka configuration parameters:

config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my_group',
    'auto.offset.reset': 'earliest'
}

Avro classes require additional parameter:

  • schema_registry_url - Schema Registry URL

Logging

All classes use dglog.Logger for logging. You can provide a custom logger:

from dglog import Logger

logger = Logger()
producer = KafkaProducer(logger_=logger, ...)

Best Practices

  1. Always use context managers (with) for proper resource cleanup
  2. Implement error handling and retry logic for production use
  3. Pre-register Avro schemas in Schema Registry
  4. Configure appropriate acks and retries parameters for producers
  5. Monitor consumer lag and producer throughput

Advanced Usage

Custom Serialization

# Custom Avro serializer
class CustomAvroProducer(AvroKafkaProducer):
    def _serialize_value(self, value):
        # Custom serialization logic
        return super()._serialize_value(value)

Message Headers

# Adding headers to messages
headers = {
    'correlation_id': '12345',
    'message_type': 'user_update'
}

producer.produce(
    topic='events',
    value=message_data,
    headers=headers
)

Error Handling

from confluent_kafka import KafkaException

try:
    with AvroKafkaProducer(...) as producer:
        producer.produce(...)
except KafkaException as e:
    print(f"Kafka error occurred: {e}")

Performance Tips

  1. Batch messages when possible (batch.num.messages config)
  2. Adjust linger.ms for better batching
  3. Use compression.type (lz4, snappy, or gzip)
  4. Tune fetch.max.bytes and max.partition.fetch.bytes for consumers

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dgkafka-1.0.0a2.tar.gz (11.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dgkafka-1.0.0a2-py3-none-any.whl (11.8 kB view details)

Uploaded Python 3

File details

Details for the file dgkafka-1.0.0a2.tar.gz.

File metadata

  • Download URL: dgkafka-1.0.0a2.tar.gz
  • Upload date:
  • Size: 11.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.2

File hashes

Hashes for dgkafka-1.0.0a2.tar.gz
Algorithm Hash digest
SHA256 b018062f9585de47804104256625f8b0950a026b2b9330860a3bbd124e6910f5
MD5 c89666f338efe695a7a933c62d22383c
BLAKE2b-256 847f545a471d57d3486de15aa990913cc7448acff5263e65d0aca68eaa9aa8b5

See more details on using hashes here.

File details

Details for the file dgkafka-1.0.0a2-py3-none-any.whl.

File metadata

  • Download URL: dgkafka-1.0.0a2-py3-none-any.whl
  • Upload date:
  • Size: 11.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.2

File hashes

Hashes for dgkafka-1.0.0a2-py3-none-any.whl
Algorithm Hash digest
SHA256 88f6294b415e07fea7d31ea2de7ad6412f8864daee6a8913d6b2fe3f5eddc7a5
MD5 193b42341182075813bc3e37f88e05d2
BLAKE2b-256 375fac9a7284a94fc441d0569c4fc5a6eb9581e28ebed80de5c85602f035f6a5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page