dgkafka
Python package for working with Apache Kafka, supporting multiple data formats.
Installation
```bash
pip install dgkafka
```

For Avro support (requires additional dependencies):

```bash
pip install dgkafka[avro]
```

For JSON support (requires additional dependencies):

```bash
pip install dgkafka[json]
```
Features
- Producers and consumers for different data formats:
  - Raw messages (bytes/strings)
  - JSON
  - Avro (with Schema Registry integration)
- Robust error handling
- Comprehensive operation logging
- Context manager support
- Flexible configuration
Quick Start
Basic Producer/Consumer
```python
from dgkafka import KafkaProducer, KafkaConsumer

# Producer
with KafkaProducer(bootstrap_servers='localhost:9092') as producer:
    producer.produce('test_topic', 'Hello, Kafka!')

# Consumer
with KafkaConsumer(bootstrap_servers='localhost:9092', group_id='test_group') as consumer:
    consumer.subscribe(['test_topic'])
    for msg in consumer.consume():
        print(msg.value())
```
JSON Support
```python
from dgkafka import JsonKafkaProducer, JsonKafkaConsumer

# Producer
with JsonKafkaProducer(bootstrap_servers='localhost:9092') as producer:
    producer.produce('json_topic', {'key': 'value'})

# Consumer
with JsonKafkaConsumer(bootstrap_servers='localhost:9092', group_id='json_group') as consumer:
    consumer.subscribe(['json_topic'])
    for msg in consumer.consume():
        print(msg.value())  # Automatically deserialized JSON
```
Avro Support
```python
from dgkafka import AvroKafkaProducer, AvroKafkaConsumer

# Producer
value_schema = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"}
    ]
}

with AvroKafkaProducer(
    schema_registry_url='http://localhost:8081',
    bootstrap_servers='localhost:9092',
    default_value_schema=value_schema
) as producer:
    producer.produce('avro_topic', {'name': 'Alice', 'age': 30})

# Consumer
with AvroKafkaConsumer(
    schema_registry_url='http://localhost:8081',
    bootstrap_servers='localhost:9092',
    group_id='avro_group'
) as consumer:
    consumer.subscribe(['avro_topic'])
    for msg in consumer.consume():
        print(msg.value())  # Automatically deserialized Avro object
```
Classes
Base Classes
- `KafkaProducer` - base message producer
- `KafkaConsumer` - base message consumer
Specialized Classes
- `JsonKafkaProducer` - JSON message producer (inherits from `KafkaProducer`)
- `JsonKafkaConsumer` - JSON message consumer (inherits from `KafkaConsumer`)
- `AvroKafkaProducer` - Avro message producer (inherits from `KafkaProducer`)
- `AvroKafkaConsumer` - Avro message consumer (inherits from `KafkaConsumer`)
Configuration
All classes accept standard Kafka configuration parameters:
```python
config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my_group',
    'auto.offset.reset': 'earliest'
}
```
Avro classes require an additional parameter:

- `schema_registry_url` - Schema Registry URL
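The constructors in the examples above take these options as underscore-style keyword arguments (e.g. `bootstrap_servers`); whether they also accept a ready-made dict of dotted keys is not documented here. As a minimal sketch, assuming only the keyword style shown in this README, a dict like the one above could be translated before use:

```python
# Hypothetical translation step: map dotted librdkafka keys onto the
# underscore keyword style used by the examples in this README.
kwargs = {key.replace('.', '_'): value for key, value in config.items()}

consumer = KafkaConsumer(**kwargs)  # assumes the constructor accepts these kwargs
```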
Logging
All classes use dglog.Logger for logging. You can provide a custom logger:
```python
from dglog import Logger

logger = Logger()
producer = KafkaProducer(logger_=logger, ...)
```
Best Practices
- Always use context managers (`with`) for proper resource cleanup
- Implement error handling and retry logic for production use (see the retry sketch after this list)
- Pre-register Avro schemas in Schema Registry
- Configure appropriate `acks` and `retries` parameters for producers
- Monitor consumer lag and producer throughput
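A minimal sketch of the retry point above, using plain Python with no extra dependencies. `KafkaException` comes from confluent-kafka, and the `produce` call mirrors the examples in this README; adjust both to your actual setup:

```python
import time

from confluent_kafka import KafkaException


def produce_with_retry(producer, topic, value, retries=3, backoff_s=1.0):
    """Retry a failed produce call with exponential backoff (illustrative only).

    Production code would also distinguish retriable from fatal errors
    and cap the total elapsed time.
    """
    for attempt in range(1, retries + 1):
        try:
            producer.produce(topic, value)
            return
        except KafkaException:
            if attempt == retries:
                raise  # give up after the final attempt
            time.sleep(backoff_s * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...
```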
Advanced Usage
Custom Serialization
```python
from dgkafka import AvroKafkaProducer

# Custom Avro serializer
class CustomAvroProducer(AvroKafkaProducer):
    def _serialize_value(self, value):
        # Custom serialization logic
        return super()._serialize_value(value)
```
Message Headers
```python
# Adding headers to messages
headers = {
    'correlation_id': '12345',
    'message_type': 'user_update'
}

producer.produce(
    topic='events',
    value=message_data,
    headers=headers
)
```
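On the consumer side, headers can be read back from the message object. This sketch assumes `consume()` yields confluent-kafka `Message` objects, whose `headers()` method returns a list of `(key, value)` tuples (values as bytes) or `None` when no headers were set:

```python
# Sketch: reading headers back when consuming (assumes confluent-kafka
# Message objects; headers() returns None if the message has no headers).
for msg in consumer.consume():
    for key, value in (msg.headers() or []):
        print(key, value.decode('utf-8'))
```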
Error Handling
```python
from confluent_kafka import KafkaException

try:
    with AvroKafkaProducer(...) as producer:
        producer.produce(...)
except KafkaException as e:
    print(f"Kafka error occurred: {e}")
```
Performance Tips
- Batch messages when possible (`batch.num.messages` config; see the tuned config sketch after this list)
- Adjust `linger.ms` for better batching
- Use `compression.type` (lz4, snappy, or gzip)
- Tune `fetch.max.bytes` and `max.partition.fetch.bytes` for consumers
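Putting these tips together, a throughput-oriented producer configuration might look like the sketch below. The keys are standard librdkafka settings; the values are illustrative starting points to benchmark against your own workload, not recommendations:

```python
producer_config = {
    'bootstrap.servers': 'localhost:9092',
    'linger.ms': 50,              # wait up to 50 ms to fill batches
    'batch.num.messages': 10000,  # cap on messages per batch
    'compression.type': 'lz4',    # compress batches on the wire
    'acks': 'all',                # trade latency for durability
}
```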
License
MIT