
Python utilities used for interacting with Apache Kafka

Project description

Christopher H. Todd's Python Library For Interacting With Kafka

The ctodd-python-lib-kafka project is responsible for interacting with Apache Kafka. This includes producing and consuming records from topics, utilizing the .avro format, and other tasks involved in creating event-driven applications with Python.

Table of Contents

  • Dependencies
  • Libraries
  • Example Scripts
  • Notes
  • TODO

Dependencies

Python Packages

  • confluent-kafka==0.11.6
  • simplejson==3.16.0

Libraries

kafka_admin_helpers.py

This library is used for interacting with Kafka admin functionality. This includes getting the admin client object that can return details about Kafka state.

Functions:

def get_kafka_admin_client(kafka_brokers):
    """
    Purpose:
        Get a Kafka Admin Client Object. Allows for polling information about Kafka
        configuration and creating objects in Kafka
    Args:
        kafka_brokers (List of Strings): List of host:port combinations for Kafka
            brokers
    Return:
        kafka_admin_client (Kafka Admin Client Obj): Kafka Admin Client Obj for the
            brokers
    """

kafka_consumer_helpers.py

This library is used to aid in creating Kafka consumers.

Functions:

def get_kafka_consumer(
    kafka_brokers,
    consumer_group="default",
    timeout=6000,
    offset_start="latest",
    get_stats=True
):
    """
    Purpose:
        Get a Kafka Consumer Object (not yet connected to a topic)
    Args:
        kafka_brokers (List of Strings): List of host:port combinations for Kafka brokers
        consumer_group (String): Consumer group to consume as. Default is "default"
        timeout (Int): Timeout in ms if no messages are found (during poll). Default
            is 6000
        offset_start (String): Where to start consuming with respect to the consumer
            group/topic offset. Default is "latest", which ignores any messages in the
            topic before the consumer begins consuming
        get_stats (Bool): Whether or not to print statistics. Default is True
    Return:
        kafka_consumer (Kafka Consumer Obj): Kafka Consumer Object
    """
def consume_topic(kafka_consumer, kafka_topics):
    """
    Purpose:
        Consume Kafka Topics
    Args:
        kafka_consumer (Kafka Consumer Obj): Kafka Consumer Object
        kafka_topics (List of Strings): List of Kafka Topics to Consume.
    Yields:
        msg (Kafka Message Obj): Message Obj returned from the topic
    """

kafka_exceptions.py

File for holding custom exception types that will be raised by the kafka_helpers libraries.

Classes:

class TopicNotFound(Exception):
    """
    Purpose:
        The TopicNotFound will be raised when attempting to consume a topic that
        does not exist
    """

kafka_general_helpers.py

This library is used for general interactions with Kafka that are not specifically related to consuming or producing messages.

Functions:

N/A

kafka_producer_helpers.py

This library is used to aid in creating Kafka producers.

Functions:

def get_kafka_producer(kafka_brokers, get_stats=True):
    """
    Purpose:
        Get a Kafka Producer Object (not yet connected to a topic)
    Args:
        kafka_brokers (List of Strings): List of host:port combinations for Kafka brokers
        get_stats (Bool): Whether or not to print statistics. Default is True
    Return:
        kafka_producer (Kafka Producer Obj): Kafka Producer Object
    """
def produce_message(kafka_producer, kafka_topic, msg):
    """
    Purpose:
        Produce a message to a Kafka Topic
    Args:
        kafka_producer (Kafka Producer Obj): Kafka Producer Object
        kafka_topic (String): Kafka Topic to Produce message to.
        msg (String): Message to produce to Kafka
    Returns:
        N/A
    """
def produce_results_callback(err, msg):
    """
    Purpose:
        Optional per-message delivery callback (triggered by poll() or
        flush()) when a message has been successfully delivered or
        permanently failed delivery (after retries).
    Args:
        err (String): Error Message
        msg (Object): Kafka Callback Message Object
    Return:
        N/A
    """

kafka_topic_helpers.py

This library is used to interact with kafka topics. This includes getting a list of the topics, finding details about a topic, creating topics, and more.

Functions:

def get_topics(kafka_admin_client, return_system_topics=False):
    """
    Purpose:
        Get a List of Kafka Topics.
    Args:
        kafka_admin_client (Kafka Admin Client Obj): Kafka Admin Client Obj for the
            brokers
        return_system_topics (Bool): Whether to also return Kafka's internal/system
            topics. Default is False
    Return:
        kafka_topics (Dict of Kafka Topics): Key is the topic name and value is a
            Kafka metadata object that has basic topic information
    """
def create_kafka_topic(
    kafka_admin_client, topic_name, topic_replication=1, topic_partitions=1
):
    """
    Purpose:
        Create a Kafka Topic
    Args:
        kafka_admin_client (Kafka Admin Client Obj): Kafka Admin Client Obj for the
            brokers
        topic_name (String): Name of the topic to create
        topic_replication (Int): Replication factor for the new topic
        topic_partitions (Int): Number of partitions to divide the topic into
    Return:
        N/A
    """

Example Scripts

Example executable Python scripts/modules for testing and interacting with the library. These show example use cases for the libraries and can be used as templates for developing with the libraries or as one-off development efforts.

consume_from_kafka_topic.py

    Purpose:
        Consume from a Kafka Topic

    Steps:
        - Connect to Kafka
        - Create Consumer Object
        - Poll Topic
        - Parse Message
        - Print Message

    example script call:
        python3 consume_from_kafka_topic.py --topic="test-env-topic" \
            --broker="0.0.0.0:9092" --consumer-group="test-env-consumer"

produce_to_kafka_topic.py

    Purpose:
        Produce to a Kafka Topic

    Steps:
        - Connect to Kafka
        - Create Producer Object
        - Prompt for Input
        - Parse Input
        - Produce Input to Kafka

    example script call:
        python3 produce_to_kafka_topic.py --topic="test-env-topic" \
            --broker="localhost:9092"

create_kafka_topic.py

    Purpose:
        Create a Kafka Topic. Takes in replication and partition information

    Steps:
        - Connect to Kafka
        - Create Kafka Admin Client
        - Create Topic In Kafka

    example script call:
        python3 create_kafka_topic.py --topic-name="test-env-topic" \
            --topic-replication=3 --topic-partitions=4 \
            --broker="localhost:9092"

Notes

  • Relies on f-string notation, which requires Python 3.6+. Refactoring these out would allow development with Python 3.0.x through 3.5.x

TODO

  • Unittest framework in place, but lacking tests


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ctodd-python-lib-kafka-1.0.2.tar.gz (9.5 kB)

Uploaded Source

Built Distribution

ctodd_python_lib_kafka-1.0.2-py3-none-any.whl (11.7 kB)

Uploaded Python 3

File details

Details for the file ctodd-python-lib-kafka-1.0.2.tar.gz.

File metadata

  • Download URL: ctodd-python-lib-kafka-1.0.2.tar.gz
  • Upload date:
  • Size: 9.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.5.0.1 requests/2.21.0 setuptools/40.9.0 requests-toolbelt/0.9.1 tqdm/4.31.1 CPython/3.6.5

File hashes

Hashes for ctodd-python-lib-kafka-1.0.2.tar.gz
Algorithm Hash digest
SHA256 1ec6dac71d07a6a29728975c260e7a5b4313bae186386bbf7b7fedf1e9b44e77
MD5 0fe49def6e15b66e9717690da88baf00
BLAKE2b-256 e2c9cf4ed2616b045e21cae69810e6609707b88410e12b39e6d0a0fd96e5ca5c


File details

Details for the file ctodd_python_lib_kafka-1.0.2-py3-none-any.whl.

File metadata

  • Download URL: ctodd_python_lib_kafka-1.0.2-py3-none-any.whl
  • Upload date:
  • Size: 11.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.5.0.1 requests/2.21.0 setuptools/40.9.0 requests-toolbelt/0.9.1 tqdm/4.31.1 CPython/3.6.5

File hashes

Hashes for ctodd_python_lib_kafka-1.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 bf7b5b400d0177f69a189f4d1c50d0f62bbf29e6ffa6587256ab6261b16409c5
MD5 138b71ffb1513a9331e7aa1724634cc4
BLAKE2b-256 3e8f2fd2e8678d0e8ebfa96fd5a6a95167f7085b01b68f4fbeb0779df86eee59

