Zhatlebaye Kafka

Zhatlebaye Kafka is an extension to the aiokafka library with enhanced producer and consumer functionalities.

Installation

You can install the Zhatlebaye Kafka library using pip:

pip install zhatlebaye_kafka

Classes and Functions

KafkaProducer

The KafkaProducer class is used to produce messages to Kafka topics. It uses the aiokafka.AIOKafkaProducer class under the hood. It has the following methods:

  • connect(): Connects to the Kafka server.
  • disconnect(): Disconnects from the Kafka server.
  • send_message(topic, message, callback=None): Sends a message to a Kafka topic. If a callback function is provided, it will be called with the result of the send operation.
  • send_message_sync(topic, message): A variant of send_message() that does not return until the message has been sent. As in the usage example, it is still called with await.
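
The callback behaviour of send_message() can be illustrated with a small stand-in that talks to no broker. This is only a sketch: SketchProducer, _deliver, and the shape of the result dictionary are hypothetical and are not taken from zhatlebaye_kafka or aiokafka.

```python
import asyncio


class SketchProducer:
    """Hypothetical stand-in; NOT the zhatlebaye_kafka implementation."""

    def __init__(self):
        # Records (topic, message) pairs instead of contacting a broker.
        self.sent = []

    async def _deliver(self, topic, message):
        await asyncio.sleep(0)  # stands in for the network round trip
        self.sent.append((topic, message))
        # Assumed result shape, chosen only for this illustration.
        return {"topic": topic, "offset": len(self.sent) - 1}

    async def send_message(self, topic, message, callback=None):
        result = await self._deliver(topic, message)
        if callback is not None:
            callback(result)  # hand the send result to the caller's callback
        return result


async def demo_callback():
    results = []
    producer = SketchProducer()
    await producer.send_message("my-topic", {"key": "value"}, callback=results.append)
    return results
```

Calling asyncio.run(demo_callback()) returns a list holding the single result that was passed to the callback.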

KafkaProducerSingleton

The KafkaProducerSingleton class is a singleton version of the KafkaProducer class. It has the same methods as KafkaProducer, but they are class methods instead of instance methods.
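
One common way to get this behaviour is to keep the shared state on the class itself and expose it through class methods. The sketch below shows that pattern under stated assumptions: SketchProducerSingleton and its attributes are hypothetical, and the real KafkaProducerSingleton may be implemented differently.

```python
import asyncio


class SketchProducerSingleton:
    """Hypothetical illustration of a class-level singleton; not the library's code."""

    # State lives on the class, so every caller shares the same "connection".
    _connected = False
    _sent = []

    @classmethod
    async def connect(cls, bootstrap_servers):
        cls._bootstrap_servers = bootstrap_servers
        cls._connected = True

    @classmethod
    async def send_message(cls, topic, message):
        if not cls._connected:
            raise RuntimeError("call connect() first")
        cls._sent.append((topic, message))  # stands in for a real send

    @classmethod
    async def disconnect(cls):
        cls._connected = False


async def demo_singleton():
    # No instance is created: the class itself is used everywhere.
    await SketchProducerSingleton.connect(bootstrap_servers="localhost:9092")
    await SketchProducerSingleton.send_message("my-topic", {"key": "value"})
    await SketchProducerSingleton.disconnect()
    return list(SketchProducerSingleton._sent)
```

Because nothing is instantiated, any module that imports the class reaches the same shared state.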

KafkaRouter

The KafkaRouter class is used to route messages from Kafka topics to their respective handlers. It has the following methods:

  • add_handler(topic, handler): Adds a handler function for a Kafka topic. The handler function will be called with the message whenever a message is received from the topic.
  • handle_message(message): Calls the handler function for the topic of the given message with the message as an argument.
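
The routing described above amounts to a lookup from topic name to handler. The following is a minimal sketch of that idea, not the library's code: SketchRouter is hypothetical, messages are assumed to expose a topic attribute (as aiokafka's consumer records do), and ignoring messages for unregistered topics is an assumption.

```python
from types import SimpleNamespace


class SketchRouter:
    """Hypothetical topic-to-handler dispatcher; not the zhatlebaye_kafka implementation."""

    def __init__(self):
        self._handlers = {}  # topic name -> handler function

    def add_handler(self, topic, handler):
        self._handlers[topic] = handler

    def handle_message(self, message):
        handler = self._handlers.get(message.topic)
        if handler is None:
            return None  # assumption: messages for unregistered topics are ignored
        return handler(message)


received = []
router = SketchRouter()
router.add_handler("my-topic", received.append)

# SimpleNamespace stands in for a consumed Kafka record.
router.handle_message(SimpleNamespace(topic="my-topic", value=b"hello"))
router.handle_message(SimpleNamespace(topic="other-topic", value=b"ignored"))
```

After these calls, received holds only the message from my-topic, since no handler was registered for other-topic.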

KafkaConsumer

The KafkaConsumer class is used to consume messages from Kafka topics. It uses the aiokafka.AIOKafkaConsumer class under the hood. It has the following methods:

  • start(): Starts the consumer. It will begin consuming messages from its topics and passing them to their respective handlers.
  • stop(): Stops the consumer.
  • run(): Starts the consumer and keeps it running until stop() is called.
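
The relationship between start(), stop(), and run() can be sketched with plain asyncio. Everything here is an assumption made for illustration: SketchConsumer is hypothetical, it reads from an in-memory list rather than from Kafka, and the real KafkaConsumer may manage its lifecycle differently.

```python
import asyncio


class SketchConsumer:
    """Hypothetical lifecycle illustration; not the zhatlebaye_kafka implementation."""

    def __init__(self, messages, handler):
        self._messages = list(messages)  # stands in for records fetched from Kafka
        self._handler = handler
        self._task = None
        self._stopped = None

    async def _consume(self):
        for message in self._messages:
            self._handler(message)
            await asyncio.sleep(0)  # yield control, as a real fetch would

    async def start(self):
        # Begin consuming in the background and return immediately.
        self._stopped = asyncio.Event()
        self._task = asyncio.create_task(self._consume())

    async def stop(self):
        if self._task is not None:
            self._task.cancel()  # no effect if the task has already finished
            await asyncio.gather(self._task, return_exceptions=True)
        if self._stopped is not None:
            self._stopped.set()

    async def run(self):
        # Start, then stay alive until stop() is called.
        await self.start()
        await self._stopped.wait()


async def demo_consumer():
    seen = []
    consumer = SketchConsumer(["a", "b"], seen.append)
    runner = asyncio.create_task(consumer.run())
    await asyncio.sleep(0.01)  # give the background task time to process
    await consumer.stop()
    await runner  # run() returns once stop() has been called
    return seen
```

In this sketch run() is simply start() followed by waiting on an event that stop() sets, which is one way to get the "keeps running until stop() is called" behaviour.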

Usage

KafkaProducer

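import asyncio

from zhatlebaye_kafka import KafkaProducer

async def main():
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    await producer.connect()
    await producer.send_message('my-topic', {'key': 'value'})
    await producer.disconnect()  # disconnect from the Kafka server when done

asyncio.run(main())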

KafkaProducerSingleton

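import asyncio

from zhatlebaye_kafka import KafkaProducerSingleton

async def main():
    # The methods are class methods, so no instance is created
    await KafkaProducerSingleton.connect(bootstrap_servers='localhost:9092')
    await KafkaProducerSingleton.send_message_sync('my-topic', {'key': 'value'})

asyncio.run(main())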

KafkaRouter

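from zhatlebaye_kafka import KafkaRouter

# Handler called with each message received from 'my-topic'
def my_handler(message):
    print(f"Received message: {message}")

router = KafkaRouter()
router.add_handler('my-topic', my_handler)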

KafkaConsumer

The KafkaConsumer takes a KafkaRouter and passes each message it consumes to the handler registered for that message's topic. Here's a basic example of how to use the two together:

import asyncio

from zhatlebaye_kafka import KafkaRouter, KafkaConsumer

def my_handler(message):
    print(f"Received message: {message}")

router = KafkaRouter()
router.add_handler('my-topic', my_handler)

async def main():
    consumer = KafkaConsumer(group_id='my-group', bootstrap_servers='localhost:9092', router=router)
    # Begin consuming from the router's topics and dispatching to handlers
    await consumer.start()

asyncio.run(main())

License

This project is licensed under the terms of the MIT license.

Contact

Yerlan Yesmoldin - e_esmoldin@kbtu.kz

Project Link: https://gitlab.com/di-halyk-academy-zhatlebaye/zhatlebaye-kafka


Source distribution: zhatlebaye_kafka-0.1.6.tar.gz (4.3 kB)
