Kafka framework for Python.
Overview
PyMom is a Python framework for building event-driven systems on top of messaging systems such as Kafka and Google Cloud Pub/Sub.
Installation
The package can be installed with pip:
pip3 install pymom
Configuration
PyMom expects the environment variable PYMOM_CONFIG_FILE to be set to the name of the configuration file. This file specifies the Kafka bootstrap brokers and ZooKeeper servers in this format:
[DEFAULT]
kafka-bootstrap-brokers = localhost:9092
zookeeper-servers = localhost:2181
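As an illustration of the file format only, the following sketch reads such a file with Python's standard configparser module. It is not PyMom's own loader: the function name load_pymom_config is hypothetical, and splitting each value on commas to allow several servers is an assumption, since the example above lists a single server per key.

```python
import configparser
import os


def load_pymom_config(path=None):
    """Return (bootstrap_brokers, zookeeper_servers) as lists of host:port strings.

    Hypothetical helper for illustration; PyMom's own loader may differ.
    """
    # Fall back to the environment variable that PyMom expects to be set.
    path = path or os.environ["PYMOM_CONFIG_FILE"]
    parser = configparser.ConfigParser()
    with open(path) as handle:
        parser.read_file(handle)
    section = parser["DEFAULT"]
    # Assumption: several servers, if present, are separated by commas.
    brokers = [b.strip() for b in section["kafka-bootstrap-brokers"].split(",")]
    zookeepers = [z.strip() for z in section["zookeeper-servers"].split(",")]
    return brokers, zookeepers
```

Calling load_pymom_config() with no argument uses the file named by PYMOM_CONFIG_FILE and raises KeyError if the variable is not set.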
Message Formats
The current implementation of PyMom uses the PyMomMessage format. All messages on Kafka are in JSON format and must include at least two elements:
id is a key that correlates multiple related messages. It is used for partitioning Kafka topics.
payload is the content of the message. It can be any valid JSON.
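To make the two required elements concrete, here is a minimal sketch that builds and checks a message using only the standard json module. The helper names encode_message and decode_message are hypothetical and are not part of PyMom; the sketch shows the shape of a message, not how PyMom serializes it internally.

```python
import json


def encode_message(message_id, payload):
    """Serialize an id/payload pair to a JSON string."""
    return json.dumps({"id": message_id, "payload": payload})


def decode_message(raw):
    """Parse a JSON message and check that both required elements are present."""
    message = json.loads(raw)
    if not isinstance(message, dict):
        raise ValueError("message must be a JSON object")
    missing = [key for key in ("id", "payload") if key not in message]
    if missing:
        raise ValueError("message is missing: {}".format(", ".join(missing)))
    return message
```

For example, encode_message("order-42", {"status": "paid"}) produces a JSON object whose id can correlate every message about the same order, while the payload carries arbitrary JSON.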
Producing Messages
PyMom provides a producer class for writing messages to a Kafka topic. PyMomTestProducer.py demonstrates how to use it:
import sys
from pymom import PyMom  # Assumed import path; adjust to the installed package layout.

pymom = PyMom()
producer = pymom.producer('test.pymom.consume')  # The topic name.
producer.write(sys.argv[1], sys.argv[2])  # The ID and payload.
Consuming Messages
To receive messages from Kafka, a class must derive from PyMomAbstractConsumer and implement the on_message method. The class must then be registered with PyMom. PyMomTestConsumer.py demonstrates how to do this:
import sys
# Assumed import path; adjust to the installed package layout.
from pymom import PyMom, PyMomAbstractConsumer

class PyMomTestConsumer(PyMomAbstractConsumer):
    def __init__(self, pymom):
        # Reuse the framework instance passed in by the caller.
        self.pymom = pymom
        self.producer = self.pymom.producer('test.pymom.produce')

    def on_message(self, message):
        """ Process messages. """
        message_id = message['id']
        payload = message['payload']
        print("Received message: ({}) {}".format(message_id, payload))
        try:
            # Forward the message unchanged to the output topic.
            self.producer.write(message_id, payload)
            print("Wrote message.")
        except Exception as error:
            print("Unable to send message: {}".format(error))

if __name__ == "__main__":
    pymom = PyMom()
    consumer = PyMomTestConsumer(pymom)
    # Arguments: the consumer, its consumer group, and the topic to read from.
    pymom.register(consumer, 'PyMomTestConsumerGroup', 'test.pymom.consume')
    pymom.run()
    print("PyMomTestConsumer terminated.")
    sys.exit(0)
on_message can either return normally or raise one of two exceptions. If a recoverable error occurs (e.g., an external API is temporarily unavailable), raise PyMomRecoverableError. If an unrecoverable error occurs (e.g., the message format is invalid), raise PyMomUnrecoverableError. In both cases, the error is logged. After a recoverable error, the message is retried. After an unrecoverable error, processing continues with the next message.
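The retry-or-skip behavior can be pictured with a small stand-alone sketch. It does not use PyMom: RecoverableError and UnrecoverableError are local stand-ins for PyMomRecoverableError and PyMomUnrecoverableError, the dispatch function is hypothetical, and the max_attempts limit is an assumption, since this README does not say how many times a message is retried.

```python
class RecoverableError(Exception):
    """Local stand-in for PyMomRecoverableError."""


class UnrecoverableError(Exception):
    """Local stand-in for PyMomUnrecoverableError."""


def dispatch(messages, on_message, max_attempts=3):
    """Deliver each message; retry recoverable failures, skip unrecoverable ones.

    Returns the ids of the messages that were given up on.
    """
    skipped = []
    for message in messages:
        for attempt in range(1, max_attempts + 1):
            try:
                on_message(message)
                break  # Handled successfully; move on to the next message.
            except RecoverableError as error:
                # Log and loop again to retry the same message.
                print("Recoverable error (attempt {}): {}".format(attempt, error))
            except UnrecoverableError as error:
                # Log and continue with the next message.
                print("Unrecoverable error, skipping: {}".format(error))
                skipped.append(message["id"])
                break
        else:
            # Assumed policy: give up once the retry limit is reached.
            skipped.append(message["id"])
    return skipped
```

A handler that raises RecoverableError once for a message is called again with the same message, while a handler that raises UnrecoverableError is not called again for it.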