An extension to the aiokafka library with enhanced producer and consumer functionalities
Zhatlebaye Kafka
Zhatlebaye Kafka is an extension to the aiokafka library with enhanced producer and consumer functionalities.
Installation
You can install the Zhatlebaye Kafka library using pip:
pip install zhatlebaye_kafka
Classes and Functions
KafkaProducer
The KafkaProducer class is used to produce messages to Kafka topics. It uses the aiokafka.AIOKafkaProducer class under the hood. It has the following methods:
connect(): Connects to the Kafka server.
disconnect(): Disconnects from the Kafka server.
send_message(topic, message, callback=None): Sends a message to a Kafka topic. If a callback function is provided, it will be called with the result of the send operation.
send_message_sync(topic, message): A synchronous version of send_message(). It blocks until the message has been sent.
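To make the callback contract concrete, here is a minimal in-memory sketch that mirrors the method names listed above without talking to a broker. The InMemoryProducer class, the JSON encoding, and the shape of the result dictionary are assumptions made for illustration; this README does not document the library's actual serialization or result type.

```python
import asyncio
import json


class InMemoryProducer:
    """Hypothetical stand-in for KafkaProducer; stores messages in a list."""

    def __init__(self):
        self.connected = False
        self.sent = []  # (topic, payload bytes) pairs, in send order

    async def connect(self):
        self.connected = True

    async def disconnect(self):
        self.connected = False

    async def send_message(self, topic, message, callback=None):
        if not self.connected:
            raise RuntimeError("connect() must be called before sending")
        payload = json.dumps(message).encode("utf-8")  # assumed encoding
        self.sent.append((topic, payload))
        result = {"topic": topic, "offset": len(self.sent) - 1}  # assumed shape
        if callback is not None:
            callback(result)  # the callback receives the send result
        return result


async def demo():
    results = []
    producer = InMemoryProducer()
    await producer.connect()
    await producer.send_message("my-topic", {"key": "value"}, callback=results.append)
    await producer.disconnect()
    return producer, results
```

Calling asyncio.run(demo()) exercises the connect, send, callback, and disconnect sequence in the order a real producer would follow.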
KafkaProducerSingleton
The KafkaProducerSingleton class is a singleton version of the KafkaProducer class. It has the same methods as KafkaProducer, but they are class methods instead of instance methods.
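One way such a class-method singleton could be structured is sketched below. The names ProducerSingleton and SharedClient are hypothetical and only illustrate the pattern of sharing one client across the whole process; the library's internals may differ.

```python
import asyncio


class SharedClient:
    """Hypothetical client; stands in for a real producer connection."""

    def __init__(self, bootstrap_servers):
        self.bootstrap_servers = bootstrap_servers
        self.sent = []


class ProducerSingleton:
    """Sketch of a class-method singleton: one shared client per process."""

    _client = None  # created on the first connect() call

    @classmethod
    async def connect(cls, bootstrap_servers):
        # Reuse the existing client if connect() was already called.
        if cls._client is None:
            cls._client = SharedClient(bootstrap_servers)
        return cls._client

    @classmethod
    async def send_message(cls, topic, message):
        if cls._client is None:
            raise RuntimeError("connect() must be called first")
        cls._client.sent.append((topic, message))

    @classmethod
    async def disconnect(cls):
        cls._client = None


async def demo():
    first = await ProducerSingleton.connect("localhost:9092")
    second = await ProducerSingleton.connect("localhost:9092")
    await ProducerSingleton.send_message("my-topic", {"key": "value"})
    return first, second
```

Because the state lives on the class, any module can call ProducerSingleton.send_message() without passing a producer object around.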
KafkaRouter
The KafkaRouter class is used to route messages from Kafka topics to their respective handlers. It has the following methods:
add_handler(topic, handler): Adds a handler function for a Kafka topic. The handler function will be called with the message whenever a message is received from the topic.
handle_message(message): Calls the handler function for the topic of the given message with the message as an argument.
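The dispatch described above can be pictured as a dictionary lookup keyed by topic. The sketch below is not the library's implementation: TopicRouter and Record are hypothetical names, and it assumes each message exposes a topic attribute (as aiokafka consumer records do) and that messages without a registered handler are silently ignored.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass
class Record:
    """Hypothetical message type with the two fields the sketch needs."""
    topic: str
    value: Any


class TopicRouter:
    """Sketch of topic-based dispatch; not the library's implementation."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[Record], Any]] = {}

    def add_handler(self, topic, handler):
        # One handler per topic; a later registration replaces an earlier one.
        self._handlers[topic] = handler

    def handle_message(self, message):
        handler = self._handlers.get(message.topic)
        if handler is None:
            return None  # assumption: messages without a handler are ignored
        return handler(message)
```

For example, after router.add_handler("my-topic", print), calling router.handle_message(Record("my-topic", {"key": "value"})) passes the record to print, while a record for any other topic is dropped.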
KafkaConsumer
The KafkaConsumer class is used to consume messages from Kafka topics. It uses the aiokafka.AIOKafkaConsumer class under the hood. It has the following methods:
start(): Starts the consumer. It will begin consuming messages from its topics and passing them to their respective handlers.
stop(): Stops the consumer.
run(): Starts the consumer and keeps it running until stop() is called.
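The relationship between start(), stop(), and run() can be illustrated with a small asyncio sketch. It assumes that start() launches a background task and returns, and that run() blocks until stop() is called; the QueueConsumer class is hypothetical and reads from an asyncio.Queue rather than a Kafka broker, so it shows the control flow only.

```python
import asyncio


class QueueConsumer:
    """Hypothetical consumer that reads from an asyncio.Queue, not Kafka."""

    def __init__(self, source, handler):
        self._source = source
        self._handler = handler
        self._task = None
        self._stopped = asyncio.Event()

    async def _consume(self):
        # Pass every incoming message to the handler until cancelled.
        while True:
            message = await self._source.get()
            self._handler(message)

    async def start(self):
        # Begin consuming in a background task and return immediately.
        self._stopped.clear()
        if self._task is None:
            self._task = asyncio.create_task(self._consume())

    async def stop(self):
        # Cancel the background task, wait for it to end, and release run().
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        self._stopped.set()

    async def run(self):
        # Start, then block until stop() is called from elsewhere.
        await self.start()
        await self._stopped.wait()


async def demo():
    queue = asyncio.Queue()
    seen = []
    consumer = QueueConsumer(queue, seen.append)
    runner = asyncio.create_task(consumer.run())
    await queue.put("a")
    await queue.put("b")
    await asyncio.sleep(0.05)  # give the background task time to drain the queue
    await consumer.stop()
    await asyncio.wait_for(runner, timeout=1)
    return seen
```

In this sketch the consumer is created inside a running coroutine so that its Event and Queue belong to the active event loop.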
Usage
KafkaProducer
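import asyncio
from zhatlebaye_kafka import KafkaProducer
async def main():
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    await producer.connect()
    await producer.send_message('my-topic', {'key': 'value'})
    await producer.disconnect()  # release the connection when done
asyncio.run(main())  # await is only valid inside a coroutine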
KafkaProducerSingleton
import asyncio
from zhatlebaye_kafka import KafkaProducerSingleton
async def main():
    await KafkaProducerSingleton.connect(bootstrap_servers='localhost:9092')  # class method, no instance needed
    await KafkaProducerSingleton.send_message_sync('my-topic', {'key': 'value'})
asyncio.run(main())
KafkaRouter
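from zhatlebaye_kafka import KafkaRouter
def my_handler(message):
    print(f"Received message: {message}")
router = KafkaRouter()
router.add_handler('my-topic', my_handler)  # my_handler is called for each message on 'my-topic'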
KafkaRouter with KafkaConsumer
A KafkaRouter routes messages from Kafka topics to their respective handlers. Pass it to a KafkaConsumer to start receiving messages. Here's a basic example:
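import asyncio
from zhatlebaye_kafka import KafkaRouter, KafkaConsumer
def my_handler(message):
    # Called for every message received from 'my-topic'.
    print(f"Received message: {message}")
async def main():
    router = KafkaRouter()
    router.add_handler('my-topic', my_handler)
    consumer = KafkaConsumer(group_id='my-group', bootstrap_servers='localhost:9092', router=router)
    # Use consumer.run() instead to keep consuming until stop() is called.
    await consumer.start()
asyncio.run(main())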
License
This project is licensed under the terms of the MIT license.
Contact
Yerlan Yesmoldin - e_esmoldin@kbtu.kz
Project Link: https://gitlab.com/di-halyk-academy-zhatlebaye/zhatlebaye-kafka