Skip to main content

A wrapper for confluent-kafka producer and consumer

Project description

A wrapper for kafka producer and consumer that can be used as decorator for a function which can keep consuming data, process this data and broadcast it to next topics/queues.

This uses confluent-kafka python package to create prooducer, consumer and then wraps it. So, big thanks to them!

Installation

$ pip install kafka-client-decorator

Usage

Define your function how you want to process the data and then decorate it.

from kafka_client_decorator.kafka_client import KafkaClient

@KafkaClient(bootstrap_servers, security_protocol, sasl_username, sasl_password).consumer_producer(consumer_from_topic='my-topic-1', group_id='pdf', produce_to_topic=['my-topic-2'])
def process_data(data = None):
    # Call your driver modules here to process the data
    result = Driver(data)
    return result

NOTE: If you want the your driver result to be pushed to next topic/queue, you can simply pass produce_to_topic as arg in decorator 'consumer_prodcuer' method.

To only produce to topic(s) -

from kafka_client_decorator.client_producer import ClientProducer

producer = ClientProducer(bootstrap_servers, security_protocol, sasl_username, sasl_password)
prodcuer.produce_to_broker(data, topics_list)

NOTE: If your kafka broker does not uses SASL or SSL protocol, no need to pass 'sasl_username' and 'sasl_password'.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kafka-client-decorator-1.5.tar.gz (4.9 kB view details)

Uploaded Source

File details

Details for the file kafka-client-decorator-1.5.tar.gz.

File metadata

  • Download URL: kafka-client-decorator-1.5.tar.gz
  • Upload date:
  • Size: 4.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.10.10

File hashes

Hashes for kafka-client-decorator-1.5.tar.gz
Algorithm Hash digest
SHA256 61d0ef738a0abffe88acc71e1f475c5d3f3d2e34fca717ee5478d1c96ed56007
MD5 09930f67a39cf6e69d7dc54be51863c9
BLAKE2b-256 be371b886ab5565e6c18ac9e383ebdf5c4e8fc9ba74ad58b115bbfec0d7a6816

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page