Python utilities used for interacting with Apache Kafka
Christopher H. Todd's Python Library For Interacting With Kafka
The ctodd-python-lib-kafka project is responsible for interacting with Apache Kafka. This includes producing and consuming records from topics, using the Avro (.avro) format, and other tasks involved in creating event-driven applications with Python.
Dependencies
Python Packages
- confluent-kafka==0.11.6
- simplejson==3.16.0
Libraries
kafka_admin_helpers.py
This library is used to interact with Kafka admin functionality. This includes getting the admin client object, which returns details about Kafka state.
Functions:
def get_kafka_admin_client(kafka_brokers):
"""
Purpose:
Get a Kafka Admin Client Object. Allows for polling information about Kafka
configuration and creating objects in Kafka
Args:
kafka_brokers (List of Strings): List of host:port combinations for kafka
brokers
Return:
kafka_admin_client (Kafka Admin Client Obj): Kafka Admin Client Obj for the
brokers
"""
kafka_consumer_helpers.py
This library is used to aid in creating kafka consumers.
Functions:
def get_kafka_consumer(
kafka_brokers,
consumer_group="default",
timeout=6000,
offset_start="latest",
get_stats=True
):
"""
Purpose:
Get a Kafka Consumer Object (not yet connected to a topic)
Args:
kafka_brokers (List of Strings): List of host:port combinations for kafka brokers
consumer_group (String): Consumer group to consume as. default is "default"
timeout (Int): Timeout in ms if no messages are found (during poll). Default
is 6000
offset_start (String): Where to start consuming with respect to the consumer
group/topic offset. Default is "latest", which ignores any messages in the
topic before the consumer begins consuming
get_stats (Bool): Whether or not to print statistics. Default is True
Return:
kafka_consumer (Kafka Consumer Obj): Kafka Consumer Object
"""
def consume_topic(kafka_consumer, kafka_topics):
"""
Purpose:
Consume Kafka Topics
Args:
kafka_consumer (Kafka Consumer Obj): Kafka Consumer Object
kafka_topics (List of Strings): List of Kafka Topics to Consume.
Yields:
msg (Kafka Message Obj): Message Obj returned from the topic
"""
kafka_exceptions.py
File for holding custom exception types that will be raised by the kafka_helpers libraries
Classes:
class TopicNotFound(Exception):
"""
Purpose:
The TopicNotFound will be raised when attempting to consume a topic that
does not exist
"""
kafka_general_helpers.py
This library is used to interact with kafka in ways not specifically related to consuming or producing messages
Functions:
N/A
kafka_producer_helpers.py
This library is used to aid in creating kafka producers.
Functions:
def get_kafka_producer(kafka_brokers, get_stats=True):
"""
Purpose:
Get a Kafka Producer Object (not yet connected to a topic)
Args:
kafka_brokers (List of Strings): List of host:port combinations for kafka brokers
get_stats (Bool): Whether or not to print statistics. Default is True
Return:
kafka_producer (Kafka Producer Obj): Kafka Producer Object
"""
def produce_message(kafka_producer, kafka_topic, msg):
"""
Purpose:
Produce a Message to a Kafka Topic
Args:
kafka_producer (Kafka Producer Obj): Kafka Producer Object
kafka_topic (String): Kafka Topic to Produce message to.
msg (String): Message to produce to Kafka
Returns:
N/A
"""
def produce_results_callback(err, msg):
"""
Purpose:
Optional per-message delivery callback (triggered by poll() or
flush()) when a message has been successfully delivered or
permanently failed delivery (after retries).
Args:
err (String): Error Message
msg (Object): Kafka Callback Message Object
Return:
N/A
"""
kafka_topic_helpers.py
This library is used to interact with kafka topics. This includes getting a list of the topics, finding details about a topic, creating topics, and more.
Functions:
def get_topics(kafka_admin_client, return_system_topics=False):
"""
Purpose:
Get a List of Kafka Topics.
Args:
kafka_admin_client (Kafka Admin Client Obj): Kafka Admin Client Obj for the
brokers
Return:
kafka_topics (Dict of Kafka Topics): Key is the topic name and value is a
Kafka metadata object that has basic topic information
"""
def create_kafka_topic(
kafka_admin_client, topic_name, topic_replication=1, topic_partitions=1
):
"""
Purpose:
Create a Kafka Topic
Args:
kafka_admin_client (Kafka Admin Client Obj): Kafka Admin Client Obj for the
brokers
topic_name (String): Name of the topic to create
topic_replication (Int): Replication factor for the new topic
topic_partitions (Int): Number of partitions to divide the topic into
Return:
N/A
"""
Example Scripts
Example executable Python scripts/modules for testing and interacting with the library. These show example use-cases for the libraries and can be used as templates for developing with the libraries or to use as one-off development efforts.
consume_from_kafka_topic.py
Purpose:
Consume from a Kafka Topic
Steps:
- Connect to Kafka
- Create Consumer Object
- Poll Topic
- Parse Message
- Print Message
example script call:
python3 consume_from_kafka_topic.py --topic="test-env-topic" \
--broker="0.0.0.0:9092" --consumer-group="test-env-consumer"
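The flags in the call above could be parsed with `argparse` roughly as follows. This is a sketch based only on the flag names shown; the defaults, help text, and the name `build_arg_parser` are assumptions, and the actual script may parse its options differently.

```python
import argparse


def build_arg_parser():
    """Build a parser for the flags shown in the example call (assumed options)."""
    parser = argparse.ArgumentParser(description="Consume from a Kafka topic")
    parser.add_argument("--topic", required=True, help="topic to consume")
    parser.add_argument("--broker", required=True, help="broker as host:port")
    parser.add_argument(
        "--consumer-group", default="default", help="consumer group to consume as"
    )
    return parser
```

`argparse` converts `--consumer-group` into the attribute `consumer_group`, so the value is read as `args.consumer_group`.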
produce_to_kafka_topic.py
Purpose:
Produce to a Kafka Topic
Steps:
- Connect to Kafka
- Create Producer Object
- Prompt for Input
- Parse Input
- Produce Input to Kafka
example script call:
python3 produce_to_kafka_topic.py --topic="test-env-topic" \
--broker="localhost:9092"
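The prompt, parse, and produce steps above can be sketched as a small loop. How the real script parses input is not documented here, so this sketch assumes each non-empty line becomes one message; `produce_lines` and its `send` parameter are hypothetical names.

```python
def produce_lines(lines, send):
    """Send each non-empty line as one message and return the count (assumed parsing)."""
    sent = 0
    for line in lines:
        text = line.strip()
        if not text:
            # Skip blank input rather than producing empty messages
            continue
        send(text)
        sent += 1
    return sent
```

In a script, `lines` could be `sys.stdin` and `send` a small function that forwards each string to `produce_message` for the chosen producer and topic.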
create_kafka_topic.py
Purpose:
Create a Kafka Topic. Takes in replication and partition information
Steps:
- Connect to Kafka
- Create Kafka Admin Client
- Create Topic In Kafka
function call:
---
example script call:
python3 create_kafka_topic.py --topic-name="test-env-topic" \
--topic-replication=3 --topic-partitions=4 \
--broker="localhost:9092"
Notes
- Relies on f-string notation, which requires Python 3.6 or later. A refactor to remove these could allow for development with Python 3.0.x through 3.5.x
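To illustrate the kind of refactor this note describes, the two hypothetical functions below produce the same string. The first uses an f-string, and the second uses `str.format`, which is also available on Python versions before 3.6.

```python
def describe_offset_fstring(topic, offset):
    """f-string version: needs Python 3.6 or later."""
    return f"{topic} is at offset {offset}"


def describe_offset_format(topic, offset):
    """str.format version: also runs on earlier Python 3 releases."""
    return "{} is at offset {}".format(topic, offset)
```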
TODO
- Unittest framework in place, but lacking tests
Hashes for ctodd-python-lib-kafka-1.0.2.tar.gz
Algorithm | Hash digest
---|---
SHA256 | 1ec6dac71d07a6a29728975c260e7a5b4313bae186386bbf7b7fedf1e9b44e77
MD5 | 0fe49def6e15b66e9717690da88baf00
BLAKE2b-256 | e2c9cf4ed2616b045e21cae69810e6609707b88410e12b39e6d0a0fd96e5ca5c
Hashes for ctodd_python_lib_kafka-1.0.2-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | bf7b5b400d0177f69a189f4d1c50d0f62bbf29e6ffa6587256ab6261b16409c5
MD5 | 138b71ffb1513a9331e7aa1724634cc4
BLAKE2b-256 | 3e8f2fd2e8678d0e8ebfa96fd5a6a95167f7085b01b68f4fbeb0779df86eee59