Simple interface for sending messages through Kafka
Project description
Kafka Messaging
Kafka Messaging is a minimalist interface for using the confluent-kafka package in Python 3.
It can be installed with the following command:
pip3 install kafka_messaging
Usage
Kafka Messaging uses Senders to send messages and Listeners to receive them. Optionally, you may also use ListenerPool, an interface to synchronize and act on multiple Listeners.
Sender
To use the Sender, instantiate one in your script. The send() method then sends a message to the topic given as its first argument.
Each message is a collection of data that may be split across several keyword arguments. The argument names can later be used to retrieve the corresponding data with a Listener.
Data can be sent like so:
from kafka_messaging import Sender

sender = Sender()
while True:
    sender.send("my_topic", info1="Hello", info_two=["World"])
As shown, the data can be of different types (string, list, dictionary, or anything else that can be serialized with pickle).
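To illustrate what "serializable with pickle" means in practice, here is a minimal sketch that uses only the standard library. The helpers pack_message and unpack_message are hypothetical names, not part of kafka_messaging, and the library's actual wire format may differ. The sketch only shows that keyword arguments of picklable types survive a round trip through bytes.

```python
import pickle


def pack_message(**fields) -> bytes:
    """Serialize keyword arguments into bytes (hypothetical helper)."""
    return pickle.dumps(fields)


def unpack_message(payload: bytes) -> dict:
    """Restore the original fields. Only unpickle data from a trusted source."""
    return pickle.loads(payload)


payload = pack_message(info1="Hello", info_two=["World"])
fields = unpack_message(payload)
print(fields["info1"], fields["info_two"])
```

Values that pickle cannot handle, such as lambdas or open sockets, would generally fail at the serialization step, so it is worth keeping message fields to plain data.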
A Sender instance can also be customized with different Kafka Producer configurations, which can be set at instantiation like so:
sender = Sender(ip="127.0.0.1", port=9092, acks=0)
Listener
To receive messages sent by a Sender, a Listener can be used. A Listener can be configured in initialization just like a Sender, with the addition that a topic to listen to must be specified:
listener = Listener(topic="my_topic", ip="127.0.0.1", port=9092)
Keep in mind that the Senders are an interface for Kafka Producers, and Listeners for Kafka Consumers, so they don't have the same configuration attributes.
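As a rough illustration of why the two sets of options differ, the sketch below builds producer-style and consumer-style configuration dictionaries. The helper names and the way ip and port are combined are assumptions made for this example, not the library's confirmed behavior. The keys bootstrap.servers, acks, group.id and auto.offset.reset are standard Kafka client settings: acks applies to producers, while group.id and auto.offset.reset apply to consumers.

```python
def producer_config(ip="127.0.0.1", port=9092, **options) -> dict:
    """Build a producer-style config dict (hypothetical helper)."""
    config = {"bootstrap.servers": f"{ip}:{port}"}
    config.update(options)  # e.g. acks=0
    return config


def consumer_config(group_id, ip="127.0.0.1", port=9092, **options) -> dict:
    """Build a consumer-style config dict (hypothetical helper)."""
    config = {"bootstrap.servers": f"{ip}:{port}", "group.id": group_id}
    config.update(options)  # e.g. {"auto.offset.reset": "earliest"}
    return config


print(producer_config(acks=0))
print(consumer_config("my_group", **{"auto.offset.reset": "earliest"}))
```

Passing a producer-only option such as acks to a consumer, or a consumer-only option such as group.id to a producer, is the kind of mismatch the note above warns about.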
The listen() method of a Listener activates it. Be careful not to block your program: listen() loops until stop() is called, so it is usually run in a separate thread.
A minimal usage of a Listener looks like this:
from threading import Thread
from kafka_messaging import Listener

def listener_thread(listener: Listener):
    """Calls the listening loop of the received Listener."""
    listener.listen()

my_listener = Listener("my_topic")
# Run the blocking listen() loop in its own thread
thread = Thread(target=listener_thread, args=[my_listener])
thread.start()

# Wait until information is received
while not my_listener.has_info("info1"):
    pass

print(my_listener.get_info("info1"))
print(my_listener.get_info("info_two"))
my_listener.stop()
thread.join()
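The pattern of a loop that runs until stop() is called can be sketched without a Kafka broker. StubListener below is a hypothetical stand-in that only mimics the listen(), stop(), has_info() and get_info() methods used above. It is not the library's implementation, and wait_for_info is an illustrative helper that polls with a short sleep and a timeout instead of spinning in a tight loop.

```python
import threading
import time


class StubListener:
    """Hypothetical stand-in for a Listener; it does not talk to Kafka."""

    def __init__(self):
        self._stop_requested = threading.Event()
        self._lock = threading.Lock()
        self._info = {}

    def listen(self):
        # Loop until stop() is called, like the blocking listen() described above.
        while not self._stop_requested.is_set():
            with self._lock:
                self._info["info1"] = "Hello"  # pretend a message arrived
            time.sleep(0.01)

    def stop(self):
        self._stop_requested.set()

    def has_info(self, name):
        with self._lock:
            return name in self._info

    def get_info(self, name):
        with self._lock:
            return self._info[name]


def wait_for_info(listener, name, timeout=5.0, poll_interval=0.05):
    """Poll has_info() with a short sleep; return False if the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if listener.has_info(name):
            return True
        time.sleep(poll_interval)
    return listener.has_info(name)


stub = StubListener()
worker = threading.Thread(target=stub.listen)
worker.start()
received = wait_for_info(stub, "info1")
stub.stop()
worker.join()
print(received, stub.get_info("info1"))
```

Sleeping between polls keeps the waiting thread from consuming a full CPU core, and the timeout gives the caller a way out if no message ever arrives.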
ListenerPool
The ListenerPool can be used to synchronize multiple Listeners and execute methods on the synchronized Listeners.
The condition that defines when Listeners are synchronized is fully customizable, and so are the methods executed after synchronization, as shown below:
from kafka_messaging import ListenerPool

def synchronization(listeners: dict):
    """Considers the listeners synchronized when all of them have received
    info1."""
    synced = True
    for key in listeners.keys():
        if not listeners[key].has_info("info1"):
            synced = False
    return synced

def execute(listeners: dict, fps):
    """Prints each listener's info1 and info_two once they are synchronized,
    then unbinds itself."""
    for key in listeners.keys():
        print(listeners[key].get_info("info1"))
        print(listeners[key].get_info("info_two"))
    return True  # Returning True unbinds this method from the ListenerPool

pool = ListenerPool("*")
pool.set_sync_condition(synchronization)
pool.bind(execute)
pool.start()
pool.get_syncer_thread().join()
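A synchronization condition is just a function from the dictionary of listeners to a boolean, so it can be tried out in isolation. The sketch below assumes, as in the example above, that the dictionary values expose has_info(). StubListener is a hypothetical stand-in used only so the condition can run without Kafka, and the all() form is simply a more compact way of writing the same check.

```python
class StubListener:
    """Hypothetical stand-in exposing has_info() and get_info() only."""

    def __init__(self, **info):
        self._info = dict(info)

    def has_info(self, name):
        return name in self._info

    def get_info(self, name):
        return self._info[name]


def synchronization(listeners: dict) -> bool:
    """True once every listener has received info1."""
    return all(listener.has_info("info1") for listener in listeners.values())


ready = {"a": StubListener(info1="Hello"), "b": StubListener(info1="Hi")}
waiting = {"a": StubListener(info1="Hello"), "b": StubListener()}
print(synchronization(ready), synchronization(waiting))
```

Note that, like the loop-based version, this condition is satisfied for an empty dictionary, so a stricter condition may be preferable if an empty pool should not count as synchronized.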
Project details
Hashes for kafka_messaging-1.1.1-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | 04cfc1acd9b9e5ac2b601394fb80fcb5539fd47ef124508b0e28e0b3a5b04fcd
MD5 | 1a8d4508b74f67679f7eac15df288f91
BLAKE2b-256 | 11c082e0b63766246b36c689eb2d98cf301e6706c0cd0c775ce5292269e83366