Wrapper around aiokafka to send and receive messages via an asyncio queue
Project description
AsyncKafkaEngine
AsyncKafkaEngine is an asynchronous Kafka consumer and producer package built on the aiokafka library. It enables efficient, scalable message handling with Kafka by providing classes that consume and produce messages asynchronously through internal asyncio queues.
Features
- Asynchronous Kafka consumer and producer
- JSON message serialization and deserialization
- Internal message queue management
- Periodic logging of message throughput
- Graceful shutdown of consumer and producer
Installation
Install the package using pip:
pip install aiokafkaengine
Usage
ConsumerEngine
The ConsumerEngine class manages the consumption of messages from Kafka topics asynchronously and places them into an internal queue. It also logs consumption statistics periodically.
Example
import asyncio
from AsyncKafkaEngine import ConsumerEngine

async def main():
    consumer = ConsumerEngine(
        bootstrap_servers='localhost:9092',
        group_id='my-group',
        report_interval=5
    )
    await consumer.start_engine(['my_topic'])

asyncio.run(main())
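Consumed messages are retrieved from the internal queue returned by get_queue(). The sketch below is illustrative only: it assumes that start_engine returns once consumption has begun, that the queue is a standard asyncio.Queue, and that each item is the deserialized JSON payload, details the README does not spell out.

import asyncio
from AsyncKafkaEngine import ConsumerEngine

async def main():
    consumer = ConsumerEngine(
        bootstrap_servers='localhost:9092',
        group_id='my-group',
        report_interval=5
    )
    await consumer.start_engine(['my_topic'])

    # Consumed messages accumulate on the internal queue.
    queue = consumer.get_queue()
    try:
        for _ in range(10):  # drain a handful of messages for illustration
            message = await queue.get()  # assumed to be the deserialized JSON payload
            print(message)
    finally:
        await consumer.stop_engine()  # graceful shutdown

asyncio.run(main())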
ProducerEngine
The ProducerEngine class manages the production of messages to a Kafka topic asynchronously by retrieving messages from an internal queue. It also logs production statistics periodically.
Example
import asyncio
from AsyncKafkaEngine import ProducerEngine

async def main():
    producer = ProducerEngine(
        bootstrap_servers='localhost:9092',
        report_interval=5
    )
    await producer.start_engine('my_topic')

asyncio.run(main())
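To publish, your application puts messages onto the queue returned by get_queue(), and ProducerEngine drains it in the background. The sketch below rests on the same assumptions as the consumer sketch above: start_engine returns once the producer is running, the queue is an asyncio.Queue, and queued objects are JSON-serialized by the engine.

import asyncio
from AsyncKafkaEngine import ProducerEngine

async def main():
    producer = ProducerEngine(
        bootstrap_servers='localhost:9092',
        report_interval=5
    )
    await producer.start_engine('my_topic')

    # Enqueue a few JSON-serializable messages for the engine to send.
    queue = producer.get_queue()
    for i in range(3):
        await queue.put({'event': 'example', 'id': i})

    # Graceful shutdown; assumed to flush any messages still on the queue.
    await producer.stop_engine()

asyncio.run(main())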
API
ConsumerEngine
__init__(bootstrap_servers, group_id=None, report_interval=5, queue_size=None)
- bootstrap_servers: Kafka server addresses.
- group_id: Consumer group ID (optional).
- report_interval: Interval for logging consumption statistics.
- queue_size: Maximum size of the internal message queue (optional).
async start_engine(topics)
- topics: List of Kafka topics to consume from.
async stop_engine()
Stops the consumer gracefully.
get_queue()
Returns the internal queue holding consumed messages.
ProducerEngine
__init__(bootstrap_servers, report_interval=5, queue_size=None)
- bootstrap_servers: Kafka server addresses.
- report_interval: Interval for logging production statistics.
- queue_size: Maximum size of the internal message queue (optional).
async start_engine(topic)
- topic: Kafka topic to produce messages to.
async stop_engine()
Stops the producer gracefully.
get_queue()
Returns the internal queue holding messages to be sent.
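Because both engines expose their queues, they compose naturally into a pipeline that consumes from one topic, transforms each message, and produces to another. The sketch below uses only the methods documented above; the topic names and transformation step are hypothetical, and it again assumes that start_engine returns once each engine is running.

import asyncio
from AsyncKafkaEngine import ConsumerEngine, ProducerEngine

async def main():
    consumer = ConsumerEngine(bootstrap_servers='localhost:9092', group_id='pipeline')
    producer = ProducerEngine(bootstrap_servers='localhost:9092')

    await consumer.start_engine(['input_topic'])
    await producer.start_engine('output_topic')

    in_queue = consumer.get_queue()
    out_queue = producer.get_queue()

    try:
        while True:
            message = await in_queue.get()
            # Hypothetical transformation; replace with real processing logic.
            await out_queue.put({'processed': message})
    finally:
        await consumer.stop_engine()
        await producer.stop_engine()

asyncio.run(main())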
Logging
The package uses the logging module to log debug information about the number of messages consumed and produced per report interval. Configure logging in your application as needed.
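A minimal way to surface those debug messages is a root-level configuration from the standard library logging module; a DEBUG threshold on the root logger captures the package's output regardless of which logger name it uses (the README does not specify one).

import logging

# Root-level DEBUG configuration; captures AsyncKafkaEngine's throughput logs
# along with any other DEBUG output from your application and its libraries.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(name)s %(levelname)s %(message)s',
)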
Contributing
Contributions are welcome! Please submit a pull request or open an issue on GitHub.
License
This project is licensed under the BSD 2-Clause License.
File details
Details for the file aiokafkaengine-0.0.4.tar.gz
File metadata
- Download URL: aiokafkaengine-0.0.4.tar.gz
- Upload date:
- Size: 8.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/5.0.0 CPython/3.12.3
File hashes
Algorithm | Hash digest
---|---
SHA256 | 62173f0fbbe95ad452808e3478d2e071c5079b99c9f3c24ee30efce73a3cae98
MD5 | f3e9a75f4c0ca58b95d08f74ac9c2477
BLAKE2b-256 | 3c1724ef437cf8a9b8532c4a5e59007fd2c2155733f3c33ae4f5ac47a2240c87
File details
Details for the file aiokafkaengine-0.0.4-py3-none-any.whl
File metadata
- Download URL: aiokafkaengine-0.0.4-py3-none-any.whl
- Upload date:
- Size: 4.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/5.0.0 CPython/3.12.3
File hashes
Algorithm | Hash digest
---|---
SHA256 | b293b7db58652d338c8ef2bdddab6c2fa1868ce0a6078400474c7f81cb02286e
MD5 | 3349faab8e5d00dfd5ff82c6dcc00c33
BLAKE2b-256 | a6b50f07b7d0727ba20535f48dba1fde5a22b285c51c63f609f9b4919c7346f5