Decorator interface to pykafka

Project description

This module is based on the pykafka project (https://github.com/Parsely/pykafka.git). It defines a class, KafkaDecorator, and its four decorators:

@KafkaDecorator.host defines the connection parameters. It takes the same parameters as pykafka.KafkaClient.

@KafkaDecorator.balanced_consumer takes the topic name, followed by the same parameters as the get_balanced_consumer method of pykafka.topic.Topic.

@KafkaDecorator.simple_consumer takes the topic name, followed by the same parameters as the get_simple_consumer method of pykafka.topic.Topic.

@KafkaDecorator.producer takes the topic name, followed by the same parameters as the get_producer method of pykafka.topic.Topic.
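To make the layout of one class decorator plus several method decorators more concrete, here is a minimal sketch of how such a registry could be wired together in plain Python. It is not the library's implementation and never talks to Kafka: `MiniDecorator`, `deliver`, `Demo`, and the `outbox` list are hypothetical names invented for illustration. The point it shows is ordering: method decorators run while the class body is evaluated, so the class decorator runs last and can collect everything they registered.

```python
import functools


class MiniDecorator:
    """Toy registry imitating the decorator layout; it never talks to Kafka."""

    def __init__(self):
        self._handlers = {}  # topic -> (method name, consumer options)

    def host(self, **connection_args):
        """Class decorator: attach connection settings and collected handlers."""
        def wrap(cls):
            cls.connection_args = dict(connection_args)
            cls.handlers = dict(self._handlers)
            self._handlers.clear()
            return cls
        return wrap

    def consumer(self, topic, **options):
        """Method decorator: remember which method handles which topic."""
        def wrap(func):
            self._handlers[topic] = (func.__name__, options)
            return func
        return wrap

    def producer(self, topic):
        """Method decorator: replace the body with a call that records a send."""
        def wrap(func):
            @functools.wraps(func)
            def send(instance, *args, **kwargs):
                # Hypothetical stand-in for handing the message to a real producer.
                instance.outbox.append((topic, args, kwargs))
            return send
        return wrap


def deliver(instance, topic, message):
    """Simulate a message arriving on a topic by calling its registered handler."""
    method_name, _options = instance.handlers[topic]
    getattr(instance, method_name)(message)


mini = MiniDecorator()


@mini.host(hosts='localhost:9092')
class Demo:
    def __init__(self):
        self.outbox = []  # stands in for the broker

    @mini.consumer('test1', consumer_group='testgroup3')
    def get(self, msg):
        self.send(msg)  # forward what was received on 'test1' to 'test2'

    @mini.producer('test2')
    def send(self, msg):
        pass  # body is never run; the decorator supplies the behaviour


demo = Demo()
deliver(demo, 'test1', b'Hello')
print(demo.outbox)
```

In this sketch the decorated producer method keeps an empty body, mirroring the `pass` bodies in the usage example: the wrapper, not the method, decides what a call does.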

Install

pip install kafka-client-decorators

How to use

from kafka_client_decorators import KafkaDecorator
from kafka_client_decorators.kafka.logging_helper import setDebugLevel
import logging


kc = KafkaDecorator()

# Alternative: connect through ZooKeeper instead of a broker list.
# @kc.host(zookeeper_hosts='localhost:2181')
@kc.host(hosts='localhost:9092')
class A:
	def __init__(self, testA, cls):
		self.a = testA
		self.cls = cls

	# Handles each message read from 'test1' and forwards its payload to 'test2'.
	@kc.balanced_consumer('test1', consumer_group='testgroup3', auto_commit_enable=True, managed=True, consumer_timeout_ms=1000)
	def get(self, msg):
		print(f'{self.a} Receive offset {msg.offset} key {msg.partition_key} message: {msg.value}')
		self.send(msg.value)

	# Handles each message read from 'test2', then asks B to stop this instance.
	@kc.simple_consumer('test2', consumer_group='testgroup4', auto_commit_enable=True, consumer_timeout_ms=1000)
	def get2(self, msg):
		print(f'{self.a} Receive offset {msg.offset}, message: {msg.value}')
		self.cls.stop(self)

	# Producer methods keep an empty body; the decorator binds them to a topic.
	@kc.producer('test2')
	def send(self, msg):
		pass

	@kc.producer('test1')
	def sendKey(self, msg, key):
		pass

class B:

	def __init__(self):
		pass

	def stop(self, conn):
		conn.stop()

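a = A('Example', B())
a.start()

# Message flow: sendKey -> 'test1' -> get -> send -> 'test2' -> get2 -> stop.
# The payload and the key are both passed as UTF-8 encoded bytes.
a.sendKey('Hello'.encode('utf-8'), partition_key='world'.encode('utf-8'))

a.wait()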
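The example above encodes both the payload and the partition key to UTF-8 bytes before sending, which matches pykafka's requirement that produced messages be bytes. A small pair of helpers can keep that conversion in one place. This is only a sketch: `to_bytes` and `to_text` are hypothetical names and are not part of kafka-client-decorators or pykafka.

```python
def to_bytes(value, encoding='utf-8'):
    """Return value as bytes; str is encoded, bytes pass through unchanged."""
    if isinstance(value, bytes):
        return value
    if isinstance(value, str):
        return value.encode(encoding)
    raise TypeError(f'expected str or bytes, got {type(value).__name__}')


def to_text(value, encoding='utf-8'):
    """Return value as str; bytes are decoded, str passes through unchanged."""
    if isinstance(value, str):
        return value
    if isinstance(value, bytes):
        return value.decode(encoding)
    raise TypeError(f'expected str or bytes, got {type(value).__name__}')
```

With these helpers, the send call could be written as `a.sendKey(to_bytes('Hello'), partition_key=to_bytes('world'))`, and a consumer method could print `to_text(msg.value)` instead of the raw bytes.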

Download files

Source distribution: kafka_client_decorators-0.8.1.tar.gz (8.1 kB)

Built distribution: kafka_client_decorators-0.8.1-py3-none-any.whl (12.2 kB, Python 3)
