Wrapper around aiokafka for sending and receiving messages via an asyncio queue

Project description

AsyncKafkaEngine

AsyncKafkaEngine is an asynchronous Kafka consumer and producer package built using the aiokafka library. This package allows efficient and scalable message handling with Kafka by providing classes for consuming and producing messages asynchronously.

Features

  • Asynchronous Kafka consumer and producer
  • JSON message serialization and deserialization
  • Internal message queue management
  • Periodic logging of message throughput
  • Graceful shutdown of consumer and producer
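
The JSON feature listed above presumably amounts to encoding Python objects as UTF-8 bytes before they are sent and decoding them again on receipt. A minimal sketch of that round trip using only the standard library follows; the helper names `serialize` and `deserialize` are hypothetical and are not part of the AsyncKafkaEngine API.

```python
import json
from typing import Any


def serialize(value: Any) -> bytes:
    """Encode a JSON-compatible Python object as UTF-8 bytes."""
    return json.dumps(value).encode("utf-8")


def deserialize(payload: bytes) -> Any:
    """Decode UTF-8 JSON bytes back into a Python object."""
    return json.loads(payload.decode("utf-8"))


# Round trip: what goes in should come back out unchanged.
message = {"id": 1, "tags": ["a", "b"]}
assert deserialize(serialize(message)) == message
```

Only JSON-compatible values (dicts, lists, strings, numbers, booleans, None) survive this round trip; other types would need a custom encoder.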

Installation

Install the package using pip:

pip install AsyncKafkaEngine

Usage

ConsumerEngine

The ConsumerEngine class manages the consumption of messages from Kafka topics asynchronously and places them into an internal queue. It also logs consumption statistics periodically.

Example

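import asyncio
from AsyncKafkaEngine import ConsumerEngine

async def main():
    # Create a consumer; statistics are logged once per report_interval.
    consumer = ConsumerEngine(
        bootstrap_servers='localhost:9092',
        group_id='my-group',
        report_interval=5
    )
    # Consume from the listed topics into the internal queue.
    await consumer.start_engine(['my_topic'])

asyncio.run(main())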
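
The example above starts the engine but does not show how messages are read. Assuming `get_queue()` returns a standard `asyncio.Queue` (the package summary describes receiving via an asyncio queue), a reading loop might look like the sketch below. The queue here is a hand-filled stand-in so the code runs without Kafka, and `drain` is a hypothetical helper.

```python
import asyncio


async def drain(queue: asyncio.Queue, limit: int) -> list:
    """Take `limit` messages off the queue, waiting for each one."""
    received = []
    for _ in range(limit):
        message = await queue.get()   # suspends until a message is available
        received.append(message)
        queue.task_done()             # mark the message as processed
    return received


async def main() -> list:
    # Stand-in for consumer.get_queue(); filled by hand for illustration.
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(3):
        await queue.put({"offset": i})
    return await drain(queue, limit=3)


messages = asyncio.run(main())
print(messages)
```

Whether `start_engine` returns once consumption has begun or runs until the engine is stopped is not stated here. If it runs until stopped, it would need to be scheduled with `asyncio.create_task` so that the reading loop can run alongside it.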

ProducerEngine

The ProducerEngine class manages the production of messages to a Kafka topic asynchronously by retrieving messages from an internal queue. It also logs production statistics periodically.

Example

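import asyncio
from AsyncKafkaEngine import ProducerEngine

async def main():
    # Create a producer; statistics are logged once per report_interval.
    producer = ProducerEngine(
        bootstrap_servers='localhost:9092',
        report_interval=5
    )
    # Send messages taken from the internal queue to the given topic.
    await producer.start_engine('my_topic')

asyncio.run(main())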
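
Since the producer sends whatever it finds on its internal queue, application code would enqueue messages rather than call a send method directly. The sketch below illustrates that hand-off with a stand-in queue and a stand-in send loop. It assumes the queue is an `asyncio.Queue`; the `forward` function and the `None` sentinel are hypothetical and exist only so the demo terminates.

```python
import asyncio


async def forward(queue: asyncio.Queue, sent: list) -> None:
    """Stand-in for the engine's send loop: move queued messages into `sent`."""
    while True:
        message = await queue.get()
        if message is None:          # hypothetical sentinel used to end this demo
            queue.task_done()
            break
        sent.append(message)         # a real engine would send to Kafka here
        queue.task_done()


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()   # stand-in for producer.get_queue()
    sent: list = []
    worker = asyncio.create_task(forward(queue, sent))
    for i in range(3):
        await queue.put({"event": i})        # application code enqueues messages
    await queue.put(None)                    # stop the stand-in loop
    await worker
    return sent


sent_messages = asyncio.run(main())
print(sent_messages)
```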

API

ConsumerEngine

    __init__(bootstrap_servers, group_id=None, report_interval=5, queue_size=None)
        bootstrap_servers: Kafka server addresses.
        group_id: Consumer group ID (optional).
        report_interval: Interval for logging consumption statistics.
        queue_size: Maximum size of the internal message queue (optional).

    async start_engine(topics)
        topics: List of Kafka topics to consume from.

    async stop_engine()
        Stops the consumer gracefully.

    get_queue()
        Returns the internal queue holding consumed messages.
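
One way to make sure `stop_engine()` is always called is to wrap the work in `try`/`finally`. The sketch below uses a hypothetical `StubEngine` that only mirrors the method names listed above, and it assumes `start_engine` returns once the engine has started; the real class may behave differently.

```python
import asyncio


class StubEngine:
    """Hypothetical stand-in exposing start_engine/stop_engine as listed above."""

    def __init__(self) -> None:
        self.events: list = []

    async def start_engine(self, topics) -> None:
        self.events.append(("start", list(topics)))

    async def stop_engine(self) -> None:
        self.events.append(("stop",))


async def run(engine, topics, work) -> None:
    """Start the engine, run `work`, and always stop the engine afterwards."""
    await engine.start_engine(topics)
    try:
        await work()
    finally:
        await engine.stop_engine()   # runs even if `work` raises or is cancelled


async def failing_work() -> None:
    raise RuntimeError("processing failed")


async def main() -> list:
    engine = StubEngine()
    try:
        await run(engine, ["my_topic"], failing_work)
    except RuntimeError:
        pass                         # the failure is expected in this demo
    return engine.events


events = asyncio.run(main())
print(events)
```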

ProducerEngine

    __init__(bootstrap_servers, report_interval=5, queue_size=None)
        bootstrap_servers: Kafka server addresses.
        report_interval: Interval for logging production statistics.
        queue_size: Maximum size of the internal message queue (optional).

    async start_engine(topic)
        topic: Kafka topic to produce messages to.

    async stop_engine()
        Stops the producer gracefully.

    get_queue()
        Returns the internal queue holding messages to be sent.
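
The `queue_size` parameter caps the internal queue. If that queue is an `asyncio.Queue` and `queue_size` is passed through as its `maxsize` (both assumptions), a full queue applies backpressure: `put()` suspends until a slot frees up. The sketch below demonstrates that behaviour on a plain `asyncio.Queue`, without the package.

```python
import asyncio


async def main() -> tuple:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)   # bounded stand-in queue
    await queue.put("a")
    await queue.put("b")
    was_full = queue.full()

    try:
        # put() would suspend here until a slot frees up; time out instead.
        await asyncio.wait_for(queue.put("c"), timeout=0.05)
        blocked = False
    except asyncio.TimeoutError:
        blocked = True

    await queue.get()          # free one slot
    await queue.put("c")       # now completes without waiting
    return was_full, blocked, queue.qsize()


result = asyncio.run(main())
print(result)
```

For a plain `asyncio.Queue`, a `maxsize` of zero or less means the queue is unbounded. How the package maps `queue_size=None` onto the queue is not documented here.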

Logging

The package uses the logging module to log debug information about the number of messages consumed and produced per report interval. Configure logging in your application as needed.
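
Because the statistics are logged at debug level, they stay hidden under Python's default logging configuration, which only shows warnings and above. A minimal sketch of enabling debug output for a single logger follows. The logger name `"AsyncKafkaEngine"` is a guess and `enable_debug_logging` is a hypothetical helper; check the package source for the name it actually uses.

```python
import logging


def enable_debug_logging(logger_name: str) -> logging.Logger:
    """Attach a stream handler and enable DEBUG output for one logger."""
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
    )
    logger.addHandler(handler)
    return logger


# "AsyncKafkaEngine" is an assumed logger name, not confirmed by the package.
log = enable_debug_logging("AsyncKafkaEngine")
log.debug("debug logging enabled")
```

If the logger name is unknown, `logging.basicConfig(level=logging.DEBUG)` enables debug output for every logger in the process, at the cost of more noise from other libraries.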

Contributing

Contributions are welcome! Please submit a pull request or open an issue on GitHub.

License

This project is licensed under the BSD 2-Clause License.
