Simple interface for sending messages through Kafka

Kafka Messaging

Kafka Messaging is a minimalist interface for using the confluent-kafka package in Python 3.

It can be installed with the following command:

pip3 install kafka_messaging

Usage

Kafka Messaging uses Senders to send messages and Listeners to receive them. Optionally, you may also use ListenerPool, an interface to synchronize and act on multiple Listeners.

Sender

To use the Sender, instantiate one in your script. Its send() method then sends a message to the topic given as the first argument.

The message is a collection of data that may be split across different keyword arguments. The argument names can then be used to retrieve that data with a Listener.

Data can be sent like so:

from kafka_messaging import Sender

sender = Sender()
while True:
    sender.send("my_topic", info1="Hello", info_two=["World"])

As shown, the data can take different forms (string, list, dictionary, or anything else that can be serialized with pickle).
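To illustrate what "serialized with pickle" implies for the keyword arguments, here is a minimal sketch of a round trip. The helper names pack_message and unpack_message are hypothetical and are not part of kafka_messaging. The sketch also assumes the arguments travel as one pickled dictionary, which may differ from the package's actual wire format.

```python
import pickle


def pack_message(**info):
    """Serialize keyword arguments into one bytes payload (assumed format)."""
    return pickle.dumps(info)


def unpack_message(payload: bytes) -> dict:
    """Recover the name-to-value mapping from a pickled payload."""
    return pickle.loads(payload)


payload = pack_message(info1="Hello", info_two=["World"])
message = unpack_message(payload)
print(message["info1"])     # Hello
print(message["info_two"])  # ['World']
```

Since unpickling can execute arbitrary code, pickled payloads should only be consumed from topics whose producers you trust.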

A Sender instance can also be customized with different Kafka Producer configurations, which are set at instantiation like so:

sender = Sender(ip="127.0.0.1", port=9092, acks=0)
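As a rough picture of how such arguments could end up as a Producer configuration, the sketch below builds the kind of dictionary that confluent-kafka clients accept. The build_producer_config helper and its default values are assumptions made for illustration. The package's real mapping may differ.

```python
def build_producer_config(ip="127.0.0.1", port=9092, **options):
    """Hypothetical helper: turn Sender-style arguments into a config dict."""
    # confluent-kafka expects the broker address under "bootstrap.servers".
    config = {"bootstrap.servers": f"{ip}:{port}"}
    # Any remaining keyword arguments are passed through unchanged.
    config.update(options)
    return config


config = build_producer_config(ip="127.0.0.1", port=9092, acks=0)
print(config)  # {'bootstrap.servers': '127.0.0.1:9092', 'acks': 0}
```

With acks=0 the producer does not wait for broker acknowledgement, trading delivery guarantees for lower latency.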

Listener

To receive messages sent by a Sender, a Listener can be used. A Listener can be configured in initialization just like a Sender, with the addition that a topic to listen to must be specified:

listener = Listener(topic="my_topic", ip="127.0.0.1", port=9092)

Keep in mind that the Senders are an interface for Kafka Producers, and Listeners for Kafka Consumers, so they don't have the same configuration attributes.

Calling a Listener's listen() method activates it. The call blocks, looping until stop() is called, so run it in a separate thread if the rest of your program must keep going.

A minimal usage of a Listener looks like this:

from threading import Thread

from kafka_messaging import Listener


def listener_thread(listener: Listener):
    """Calls the listening loop of the received Listener. """
    listener.listen()


my_listener = Listener("my_topic")
thread = Thread(target=listener_thread, args=[my_listener])
thread.start()  # Start the listening loop in the background

# Wait until information is received
while not my_listener.has_info("info1"):
    pass
print(my_listener.get_info("info1"))
print(my_listener.get_info("info_two"))
my_listener.stop()
thread.join()
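The busy loop above keeps a CPU core occupied and never gives up. One alternative, sketched here, is to poll with a short sleep and a timeout. The wait_for_info helper is hypothetical and relies only on the has_info() and get_info() methods shown above. FakeListener is a stand-in so the sketch runs without a Kafka broker.

```python
import time


def wait_for_info(listener, name, timeout=5.0, poll_interval=0.05):
    """Poll listener.has_info(name) until it is true or the timeout elapses.

    Returns the stored value, or None if the timeout is reached first.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if listener.has_info(name):
            return listener.get_info(name)
        time.sleep(poll_interval)  # Yield the CPU between checks
    return None


class FakeListener:
    """Stand-in exposing the two Listener methods used by wait_for_info."""

    def __init__(self, data):
        self._data = data

    def has_info(self, name):
        return name in self._data

    def get_info(self, name):
        return self._data[name]


fake = FakeListener({"info1": "Hello"})
print(wait_for_info(fake, "info1"))                 # Hello
print(wait_for_info(fake, "missing", timeout=0.2))  # None
```

With a real Listener, the same helper would be called after the listening thread has been started.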

ListenerPool

The ListenerPool can be used to synchronize multiple Listeners and execute methods on the synchronized Listeners.

The condition that defines when Listeners are synchronized is completely customizable, and so are the methods executed after synchronization, as shown below:

from kafka_messaging import ListenerPool


def synchronization(listeners: dict):
    """Considers the listeners synchronized when all of them have received info1
    """
    synced = True
    for key in listeners.keys():
        if not listeners[key].has_info("info1"):
            synced = False
    return synced


def execute(listeners: dict, fps):
    """Prints each listener's info1 and info_two once they are synchronized,
    then unbinds itself. """
    for key in listeners.keys():
        print(listeners[key].get_info("info1"))
        print(listeners[key].get_info("info_two"))
    return True  # Returning True unbinds this method from the ListenerPool


pool = ListenerPool("*")
pool.set_sync_condition(synchronization)
pool.bind(execute)
pool.start()
pool.get_syncer_thread().join()
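A synchronization condition like the one above can also be written more compactly and reused for different argument names. The sketch below is one possible way to do so. The all_have factory and StubListener are hypothetical names. Only has_info() and the dict-of-listeners argument come from the example above.

```python
def all_have(name):
    """Build a sync condition that holds once every listener has `name`."""
    def condition(listeners: dict) -> bool:
        return all(listener.has_info(name) for listener in listeners.values())
    return condition


class StubListener:
    """Stand-in exposing has_info() so the sketch runs without Kafka."""

    def __init__(self, data):
        self._data = data

    def has_info(self, name):
        return name in self._data


synchronization = all_have("info1")
ready = {"a": StubListener({"info1": 1}), "b": StubListener({"info1": 2})}
waiting = {"a": StubListener({"info1": 1}), "b": StubListener({})}
print(synchronization(ready))    # True
print(synchronization(waiting))  # False
```

The resulting function takes the same listeners dictionary as the synchronization function above, so it could be passed to pool.set_sync_condition() in the same way. Like the original loop, it reports an empty pool as synchronized.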
