A Python library for managing Kafka Producers, Consumers and Topics

Kafka Manager is a Python utility class that simplifies Kafka interactions by providing a high-level abstraction for managing Producers, Consumers, and Topics. It wraps the kafka-python library behind a simpler interface, which makes Kafka-related applications quicker to develop and easier to maintain.

Requirements

  • Python 3.7+

  • kafka-python

Installation

$ pip install kafka-manager

Features

Producer Management

It provides interfaces to start and stop producers, send messages to topics, and check whether a producer is running. It initializes and terminates the producer instances used to publish messages to Kafka, and the status check lets an application confirm that a producer is running before it sends.

import json

from kafka_manager.kafka_manager import KafkaManager

bootstrap_servers = ['localhost:9092']  # Replace with your Kafka broker addresses
topic_name = 'example_topic'  # Replace topic name with your choice
group_id = 'example_group'  # Replace consumer group ID with your choice

# Create a KafkaManager instance
kafka_manager = KafkaManager(bootstrap_servers=bootstrap_servers)

# Start the Kafka producer
kafka_manager.start_producer()

# Send Kafka message
try:
    message_payload = json.dumps({
        "message_key": "message_value"
    })
    metadata = kafka_manager.send_message(topic=topic_name, value=message_payload)
    if metadata:
        print(f'Message sent successfully to Kafka topic: "{topic_name}"')
    else:
        print(f'Failed to send message to Kafka topic: "{topic_name}"')
except Exception as e:
    print(f'Error in sending message to Kafka topic: {e}')

# Stop Kafka producer
kafka_manager.stop_producer()

Consumer Management

It supports configurable creation and management of consumers and provides an interface to start and stop them. Developers can create consumers to suit their application's needs, for example with different deserialization methods or offset management strategies. Messages are consumed through a user-defined callback function, which lets developers define custom logic for processing each received message.

from kafka_manager.kafka_manager import KafkaManager

bootstrap_servers = ['localhost:9092']  # Replace with your Kafka broker addresses
topic_name = 'example_topic'  # Replace topic name with your choice
group_id = 'example_group'  # Replace consumer group ID with your choice

# Create a KafkaManager instance
kafka_manager = KafkaManager(bootstrap_servers=bootstrap_servers)

# Create a Kafka Consumer
consumer = kafka_manager.create_consumer(topics=[topic_name], group_id=group_id, auto_offset_reset='earliest')

# Start the Kafka Consumer
kafka_manager.start_consumer(consumer_id=group_id)

def message_handler(message):
    """
    Callback invoked by the consumer for each new message that arrives.

    In a real-world production application, the received message might be processed as follows:
    - Deserialize the message
    - Perform some business logic
    - Store the message in a database for further processing
    - etc.

    :param message: Message received from the consumer.
    """
    print(f'Received message: Partition={message.partition}, Offset={message.offset}, Value={message.value}')

# Consume Messages
kafka_manager.consume_messages(consumer_id=group_id, message_handler=message_handler)
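The callback is also where deserialization usually happens. Below is a minimal sketch of a handler that decodes JSON payloads, assuming message values arrive as JSON-encoded bytes or strings (as produced by the json.dumps call in the producer example). The names decode_json_value and json_message_handler are hypothetical helpers for illustration and are not part of kafka-manager.

```python
import json
from types import SimpleNamespace


def decode_json_value(raw):
    """Decode a message value (bytes or str) into a Python object.

    Returns None when the payload is not valid JSON, so that one
    malformed message does not stop the consumer loop.
    """
    if isinstance(raw, (bytes, bytearray)):
        try:
            raw = raw.decode("utf-8")
        except UnicodeDecodeError:
            return None
    try:
        return json.loads(raw)
    except (TypeError, ValueError):
        return None


def json_message_handler(message):
    """Callback sketch: decode the value, then hand it to application logic."""
    payload = decode_json_value(message.value)
    if payload is None:
        print(f"Skipping undecodable message at offset {message.offset}")
        return None
    print(f"Decoded message at offset {message.offset}: {payload}")
    return payload


# Stand-in for a consumed record, used only to exercise the handler locally.
sample = SimpleNamespace(partition=0, offset=7, value=b'{"message_key": "message_value"}')
result = json_message_handler(sample)
```

To try it against a broker, pass message_handler=json_message_handler to consume_messages in place of the handler shown above.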

Topic Management

Kafka Manager allows developers to create and delete topics dynamically. Topics are the categories to which messages are published, so managing them is essential for handling data streams as application requirements evolve.

from kafka_manager.kafka_manager import KafkaManager

bootstrap_servers = ['localhost:9092']  # Replace with your Kafka broker addresses
topic_name = 'example_topic'  # Replace topic name with your choice
group_id = 'example_group'  # Replace consumer group ID with your choice

# Create a KafkaManager instance
kafka_manager = KafkaManager(bootstrap_servers=bootstrap_servers)

# For topic management connect to Kafka admin client
kafka_manager.connect_admin_client()

# Create a topic - (if it doesn't exist)
kafka_manager.create_topic(topic_name=topic_name, num_partitions=1, replication_factor=1)
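When topic names are built dynamically, it can help to check them before calling create_topic. The sketch below applies Kafka's topic naming rules (ASCII letters, digits, '.', '_' and '-', at most 249 characters, and not "." or ".."). The helper is_valid_topic_name is a hypothetical name for illustration and is not part of kafka-manager.

```python
import re

# Kafka's limits for topic names.
MAX_TOPIC_NAME_LENGTH = 249
LEGAL_TOPIC_CHARS = re.compile(r"[a-zA-Z0-9._-]+")


def is_valid_topic_name(name):
    """Return True if `name` satisfies Kafka's topic naming rules."""
    if not isinstance(name, str) or not name:
        return False
    if name in (".", ".."):
        return False
    if len(name) > MAX_TOPIC_NAME_LENGTH:
        return False
    # fullmatch rejects any character outside the legal set, including newlines.
    return LEGAL_TOPIC_CHARS.fullmatch(name) is not None


print(is_valid_topic_name("example_topic"))
print(is_valid_topic_name("bad topic name"))
```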

Admin Client

It provides interfaces to connect to the Kafka Admin client and allows developers to perform administrative operations such as creating and deleting topics. The admin-client connection is also required for many advanced Kafka management tasks, such as describing cluster configurations and managing Kafka ACLs.

from kafka_manager.kafka_manager import KafkaManager

bootstrap_servers = ['localhost:9092']  # Replace with your Kafka broker addresses
topic_name = 'example_topic'  # Replace topic name with your choice
group_id = 'example_group'  # Replace consumer group ID with your choice

# Create a KafkaManager instance
kafka_manager = KafkaManager(bootstrap_servers=bootstrap_servers)

# Connect to Kafka admin client
admin_client = kafka_manager.connect_admin_client()

# Listing Consumer Groups
consumers_groups = admin_client.list_consumer_groups()
print(consumers_groups)

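# Describing Consumer Groups
# Assuming admin_client is a kafka-python KafkaAdminClient, list_consumer_groups()
# returns (group_id, protocol_type) tuples, so extract the group IDs first.
group_ids = [group[0] for group in consumers_groups]
print(admin_client.describe_consumer_groups(group_ids))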

Error Handling

Kafka errors can result from network failures, broker failures, or misconfigurations. Kafka Manager handles these exceptions to keep the application stable.
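Applications may still want their own policy for transient failures. Below is a minimal sketch of a retry wrapper with exponential backoff, assuming (as in the producer example) that a send call returns a truthy value on success and either returns a falsy value or raises on failure. The helper send_with_retry and its parameters are hypothetical and are not part of kafka-manager.

```python
import time


def send_with_retry(send, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call `send()` until it returns a truthy result or attempts run out.

    Waits base_delay, then 2 * base_delay, and so on between attempts.
    If every attempt fails, re-raises the most recent exception when
    there was one, otherwise returns None.
    """
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            result = send()
            if result:
                return result
        except Exception as error:  # narrow this to the errors you expect
            last_error = error
        if attempt < attempts:
            sleep(base_delay * 2 ** (attempt - 1))
    if last_error is not None:
        raise last_error
    return None
```

With the producer example, this could be called as send_with_retry(lambda: kafka_manager.send_message(topic=topic_name, value=message_payload)). Retrying is only appropriate when duplicate messages are acceptable or handled downstream.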

Resource Management

Kafka Manager ensures that all connections to Kafka are closed correctly. It provides a close() function for proper shutdown, which prevents resource leaks and potential data corruption and helps maintain data integrity across the application and the Kafka cluster.
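One way to make sure close() always runs, even when an error occurs, is a small context manager. This is a sketch assuming close() is a method on the KafkaManager instance; closing_on_exit is a hypothetical helper name, and the standard library's contextlib.closing behaves the same way.

```python
from contextlib import contextmanager


@contextmanager
def closing_on_exit(manager):
    """Yield `manager` and always call its close() on exit, even on error."""
    try:
        yield manager
    finally:
        manager.close()
```

For example: with closing_on_exit(KafkaManager(bootstrap_servers=bootstrap_servers)) as kafka_manager: followed by the producer or consumer calls in the body of the with block.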

License

MIT License. See LICENSE.
