A wrapper for confluent-kafka producer and consumer
Project description
A wrapper for a Kafka producer and consumer that can be used as a decorator on a function. The decorated function keeps consuming data, processes it, and broadcasts the result to the next topics/queues.
This uses the confluent-kafka Python package to create the producer and consumer, and then wraps them. So, big thanks to them!
Installation
$ pip install kafka-client-decorator
Usage
Define a function that processes the data the way you need, then decorate it.
from kafka_client_decorator.kafka_client import KafkaClient

@KafkaClient(bootstrap_servers, security_protocol, sasl_username, sasl_password).consumer_producer(
    consumer_from_topic='my-topic-1',
    group_id='pdf',
    produce_to_topic=['my-topic-2'],
)
def process_data(data=None):
    # Call your driver modules here to process the data.
    # Driver is a placeholder for your own processing code.
    result = Driver(data)
    return result
NOTE: If you want your driver's result to be pushed to the next topic/queue, pass produce_to_topic as an argument to the decorator's 'consumer_producer' method.
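To make the consume, process, produce flow concrete, here is a minimal sketch of the decorator pattern. It is not the package's implementation: `InMemoryBroker` and this `consumer_producer` function are hypothetical stand-ins that use in-memory queues instead of Kafka, and the sketch assumes the loop starts when the decorated function is called and stops once the source topic is empty.

```python
import functools
from collections import deque


class InMemoryBroker:
    """Hypothetical stand-in for a Kafka cluster: one FIFO queue per topic."""

    def __init__(self):
        self.topics = {}

    def produce(self, topic, value):
        self.topics.setdefault(topic, deque()).append(value)

    def poll(self, topic):
        # Return the next message, or None when the topic is empty or unknown.
        queue = self.topics.get(topic)
        return queue.popleft() if queue else None


def consumer_producer(broker, consumer_from_topic, produce_to_topic=None):
    """Decorator sketch: consume, call the wrapped function, forward the result."""

    def decorator(func):
        @functools.wraps(func)
        def run():
            processed = 0
            # A real consumer would keep polling; this sketch stops when drained.
            while (message := broker.poll(consumer_from_topic)) is not None:
                result = func(message)
                for topic in produce_to_topic or []:
                    broker.produce(topic, result)
                processed += 1
            return processed

        return run

    return decorator


broker = InMemoryBroker()
broker.produce("my-topic-1", "hello")
broker.produce("my-topic-1", "world")


@consumer_producer(broker, "my-topic-1", produce_to_topic=["my-topic-2"])
def process_data(data=None):
    # Stand-in for the driver call in the usage example.
    return data.upper()


handled = process_data()
forwarded = list(broker.topics["my-topic-2"])
```

Leaving `produce_to_topic` unset in this sketch makes the function a pure consumer, which mirrors the optional argument described in the note above.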
To only produce to topic(s) -
from kafka_client_decorator.client_producer import ClientProducer
producer = ClientProducer(bootstrap_servers, security_protocol, sasl_username, sasl_password)
producer.produce_to_broker(data, topics_list)
NOTE: If your Kafka broker does not use the SASL or SSL protocol, there is no need to pass 'sasl_username' and 'sasl_password'.
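As an illustration of why the credentials are optional, the sketch below builds a configuration dictionary of the kind confluent-kafka clients accept, adding SASL settings only for SASL protocols. The helper name `build_client_config` is hypothetical, the `PLAIN` mechanism is an assumption, and the wrapper may assemble its configuration differently.

```python
def build_client_config(bootstrap_servers, security_protocol="PLAINTEXT",
                        sasl_username=None, sasl_password=None):
    """Return a librdkafka-style config dict (illustrative helper, not the package API)."""
    config = {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": security_protocol,
    }
    # Credentials only matter for SASL_PLAINTEXT and SASL_SSL.
    if security_protocol.upper().startswith("SASL"):
        if sasl_username is None or sasl_password is None:
            raise ValueError("SASL protocols need sasl_username and sasl_password")
        config.update({
            "sasl.mechanisms": "PLAIN",  # assumed mechanism; brokers may require another
            "sasl.username": sasl_username,
            "sasl.password": sasl_password,
        })
    return config


plain_config = build_client_config("localhost:9092")
secured_config = build_client_config("broker:9093", "SASL_SSL", "user", "secret")
```

A dictionary like this can be passed to `confluent_kafka.Producer` or `confluent_kafka.Consumer` (the consumer also needs a `group.id` entry).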
NOTE: If you want to work with multiple partitions in Kafka, use the method below to produce (it sends a custom unique key with each message):
producer.produce_to_broker_with_key(data, topics_list)
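The reason a key helps with multiple partitions is that key-based partitioners map each key to a partition by hashing it, so distinct keys spread messages across partitions. The sketch below is a simplified illustration under stated assumptions: `make_unique_key` and `pick_partition` are hypothetical names, a UUID is only one possible way to build a unique key, and the CRC32-modulo rule stands in for whatever partitioner the client is actually configured with.

```python
import uuid
import zlib


def make_unique_key():
    """Generate a unique message key (assumed scheme: a random UUID)."""
    return uuid.uuid4().hex


def pick_partition(key, num_partitions):
    """Simplified key-hash partitioner: CRC32 of the key modulo the partition count."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Spread a batch of uniquely keyed messages over three partitions.
keys = [make_unique_key() for _ in range(100)]
assignments = [pick_partition(key, 3) for key in keys]
```

The same key always lands on the same partition in this scheme, while fresh unique keys distribute messages roughly evenly.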
Hashes for kafka-client-decorator-1.7.tar.gz
Algorithm | Hash digest
---|---
SHA256 | 79d2324d5a5f2dbd9d728cdde12d54aa1fe4c1f8d261ea667ded7ec78f232e47
MD5 | 240cfde8112ee781afaa39515690e412
BLAKE2b-256 | 609af3ef48d8464ff021ae2ddb775bf481497b67e9a76cdd4004065a53db87f6
Hashes for kafka_client_decorator-1.7-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | 58bf112731319d71dd9f3d81fa2574d64d808d3b24be5f166ffcbef910cd70fe
MD5 | 4f4b37ca82301024409c50bbfd55340f
BLAKE2b-256 | aa75e5c96038d853d9f32991188ccd54376521386829522aa5994ccdb91f7c88