Kafka framework for Python.

Overview

PyMom is a Python framework that enables the creation of event-driven systems leveraging messaging systems like Kafka and Google's Cloud Pub/Sub.

Installation

PyMom can be installed with pip:

pip3 install pymom

Configuration

PyMom expects the environment variable PYMOM_CONFIG_FILE to be set to the name of the configuration file. This file specifies the Kafka bootstrap brokers and ZooKeeper servers in this format:

[DEFAULT]
kafka-bootstrap-brokers = localhost:9092
zookeeper-servers = localhost:2181
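
As a rough illustration of the file format, the sketch below reads such a file with Python's standard configparser module. It is not PyMom's own loading code: the helper name load_pymom_config is hypothetical, and the only things taken from the description above are the environment variable name, the [DEFAULT] section, and the two keys.

```python
import configparser
import os


def load_pymom_config(path=None):
    """Read the two settings described above from an INI-style file.

    Hypothetical helper for illustration; PyMom's internal loader may differ.
    """
    # Fall back to the environment variable named in the Configuration section.
    path = path or os.environ["PYMOM_CONFIG_FILE"]
    parser = configparser.ConfigParser()
    # ConfigParser.read() returns the list of files it could actually parse.
    if not parser.read(path):
        raise FileNotFoundError(f"Cannot read configuration file: {path}")
    defaults = parser["DEFAULT"]
    return {
        "kafka-bootstrap-brokers": defaults["kafka-bootstrap-brokers"],
        "zookeeper-servers": defaults["zookeeper-servers"],
    }
```

Calling load_pymom_config() with no argument uses PYMOM_CONFIG_FILE, which makes it a quick way to check that the variable points at a readable file before starting a producer or consumer.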

Message Formats

The current implementation of PyMom uses the PyMomMessage format. All messages on Kafka are in JSON format and must include at least two elements:

  • id is a key that correlates multiple related messages. It is used for partitioning Kafka topics.
  • payload is the content of the message. It can be any valid JSON.
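
The two required elements can be sketched with the standard json module. The helper names build_message and parse_message are hypothetical and exist only for this example; whether PyMom performs a similar check internally is not stated here.

```python
import json


def build_message(message_id, payload):
    """Serialize a message with the two required elements, 'id' and 'payload'."""
    return json.dumps({"id": message_id, "payload": payload})


def parse_message(raw):
    """Parse a JSON message and check that both required elements are present."""
    message = json.loads(raw)
    if not isinstance(message, dict):
        raise ValueError("Message must be a JSON object")
    missing = [key for key in ("id", "payload") if key not in message]
    if missing:
        raise ValueError(f"Message is missing required element(s): {missing}")
    return message


# The payload may be any valid JSON value; here it is an object.
raw = build_message("order-42", {"status": "shipped", "items": 3})
message = parse_message(raw)
print(message["id"], message["payload"])
```

Messages that share an id (here "order-42") are the ones the id element is meant to correlate.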

Producing Messages

PyMom provides access to a class for writing messages to a Kafka topic. PyMomTestProducer.py demonstrates how to use it:

import sys
from pymom import PyMom  # Assumed import path; check the installed package.

pymom = PyMom()
producer = pymom.producer('test.pymom.consume')  # The topic name.
producer.write(sys.argv[1], sys.argv[2])  # The ID and payload.

Consuming Messages

To receive messages from Kafka, a class must derive from PyMomAbstractConsumer and implement the on_message method. The class must then be registered with PyMom. PyMomTestConsumer.py demonstrates how to do this:

import sys

# Assumed import path; adjust to match the installed package layout.
from pymom import PyMom, PyMomAbstractConsumer


class PyMomTestConsumer(PyMomAbstractConsumer):
    def __init__(self, pymom):
        # Reuse the framework instance passed in rather than creating a new one.
        self.pymom = pymom
        self.producer = self.pymom.producer('test.pymom.produce')

    def on_message(self, message):
        """Process a message by echoing it to the output topic."""
        message_id = message['id']
        payload = message['payload']
        print("Received message:  ({}) {}".format(message_id, payload))
        try:
            self.producer.write(message_id, payload)
            print("Wrote message.")
        except Exception as error:
            print("Unable to send message:  {}".format(error))


if __name__ == "__main__":
    pymom = PyMom()
    consumer = PyMomTestConsumer(pymom)
    # Arguments: the consumer, its consumer group, and the topic to read from.
    pymom.register(consumer, 'PyMomTestConsumerGroup', 'test.pymom.consume')
    pymom.run()

    print("PyMomTestConsumer terminated.")
    sys.exit(0)

on_message can either return normally or raise one of two exceptions. If a recoverable error occurs (e.g. an external API is temporarily unavailable), raise PyMomRecoverableError. If an unrecoverable error occurs (e.g. the message format is invalid), raise PyMomUnrecoverableError. In both cases, the error is logged. After a recoverable error, the message is retried; after an unrecoverable error, processing continues with the next message.
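
The retry-versus-skip behaviour can be simulated without a Kafka broker. The sketch below is only a model of the semantics described above, not PyMom's dispatch code: RecoverableError and UnrecoverableError are local stand-ins for PyMomRecoverableError and PyMomUnrecoverableError, the dispatch function is hypothetical, and the max_attempts limit is an assumption, since nothing here says whether PyMom caps the number of retries.

```python
class RecoverableError(Exception):
    """Local stand-in for PyMomRecoverableError."""


class UnrecoverableError(Exception):
    """Local stand-in for PyMomUnrecoverableError."""


def dispatch(messages, on_message, max_attempts=3):
    """Feed messages to on_message, retrying or skipping as described above.

    max_attempts is an assumption of this sketch, not a documented setting.
    """
    handled, skipped = [], []
    for message in messages:
        for attempt in range(1, max_attempts + 1):
            try:
                on_message(message)
                handled.append(message["id"])
                break
            except RecoverableError as error:
                # Log and fall through to the next attempt on the same message.
                print(f"Recoverable error (attempt {attempt}): {error}")
            except UnrecoverableError as error:
                # Log and move on to the next message.
                print(f"Unrecoverable error, skipping: {error}")
                skipped.append(message["id"])
                break
        else:
            # Every attempt raised a recoverable error; give up on this message.
            skipped.append(message["id"])
    return handled, skipped


attempts = {}


def on_message(message):
    """Example handler: fails once for 'flaky', always rejects bad payloads."""
    attempts[message["id"]] = attempts.get(message["id"], 0) + 1
    if not isinstance(message["payload"], dict):
        raise UnrecoverableError("payload must be a JSON object")
    if message["id"] == "flaky" and attempts["flaky"] == 1:
        raise RecoverableError("external API temporarily unavailable")


handled, skipped = dispatch(
    [
        {"id": "ok", "payload": {}},
        {"id": "flaky", "payload": {}},
        {"id": "bad", "payload": "not-an-object"},
    ],
    on_message,
)
```

In this run the "flaky" message succeeds on its second attempt, while the "bad" message is skipped after a single attempt.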
