Wrapper around aiokafka to send and receive messages via an asyncio queue
AsyncKafkaEngine
AsyncKafkaEngine is a wrapper around the aiokafka package.
It is built around the idea of decoupling the reading and batching task from your main application task. This keeps the queue of batches filled, and reading from Kafka is not blocked by the work of your application logic.
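The decoupling idea can be illustrated without Kafka at all. The following is a minimal sketch using only `asyncio`; the names `read_batches`, `process_batches`, and `demo` are hypothetical and are not part of AioKafkaEngine. One task groups items into batches and puts them on a bounded queue while a separate task processes them.

```python
import asyncio


async def read_batches(source, queue, batch_size):
    """Hypothetical reader: group items from `source` into batches and enqueue them."""
    batch = []
    for item in source:
        batch.append(item)
        if len(batch) == batch_size:
            await queue.put(batch)  # waits here only when the queue is full
            batch = []
    if batch:
        await queue.put(batch)  # flush the final partial batch
    await queue.put(None)  # sentinel: no more batches


async def process_batches(queue):
    """Hypothetical application task: consume batches until the sentinel arrives."""
    processed = []
    while True:
        batch = await queue.get()
        if batch is None:
            break
        await asyncio.sleep(0)  # stand-in for real application work
        processed.append(sum(batch))
    return processed


async def demo():
    queue = asyncio.Queue(maxsize=4)  # bounded, so the reader cannot run ahead forever
    reader = asyncio.create_task(read_batches(range(10), queue, batch_size=3))
    totals = await process_batches(queue)
    await reader
    return totals


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the reader runs as its own task, it keeps filling the queue whenever the processing task yields control, which is the behaviour the engine aims for with Kafka as the source.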
Installation
Install the package using uv:
uv add AioKafkaEngine
Install the package using pip:
pip install AioKafkaEngine
Usage
ConsumerEngine
The ConsumerEngine class manages the consumption of messages from Kafka topics asynchronously and places them into a queue. Your application can then consume from that queue.
Example
import asyncio
import json

from aiokafka import AIOKafkaConsumer
from AioKafkaEngine import ConsumerEngine


async def work(queue):
    # Take one item off the shared queue and handle it.
    message = await queue.get()
    print(message)


async def main():
    """
    Consume from Kafka in the background while workers read from the queue.
    """
    test_queue = asyncio.Queue()
    engine = ConsumerEngine(
        consumer=AIOKafkaConsumer(
            "test_topic",
            bootstrap_servers="localhost:9092",
            group_id="my_group",
            value_deserializer=lambda x: json.loads(x.decode("utf-8")),
            auto_offset_reset="earliest",
        ),
        queue=test_queue,
        batch_size=10,
        timeout=1,
    )
    await engine.start()
    consume_task = asyncio.create_task(engine.consume())
    # create workers; they must read from the same queue the engine fills
    workers = [asyncio.create_task(work(test_queue)) for _ in range(10)]
    # will never exit
    await asyncio.gather(consume_task, *workers)


asyncio.run(main())
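The `work` coroutine in the example handles a single item and then returns. In a long-running service you would more likely want workers that loop. The following is a minimal sketch of that pattern using only `asyncio`; `worker` and `demo` are hypothetical names, and the queue is filled by hand here rather than by a ConsumerEngine.

```python
import asyncio


async def worker(name, queue, results):
    """Process items forever; the caller cancels the task when the work is done."""
    while True:
        item = await queue.get()
        try:
            results.append((name, item))  # stand-in for real message handling
        finally:
            queue.task_done()  # lets queue.join() track completion


async def demo():
    queue = asyncio.Queue()
    results = []
    for i in range(5):
        queue.put_nowait(i)

    workers = [asyncio.create_task(worker(f"w{n}", queue, results)) for n in range(2)]
    await queue.join()  # returns once every queued item has been marked done

    # Workers loop forever, so stop them explicitly.
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(item for _, item in results)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Calling `task_done()` in a `finally` block means `queue.join()` still completes if handling one item raises.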
ProducerEngine
The ProducerEngine class manages the production of messages to a Kafka topic asynchronously by retrieving messages from an internal queue. It also logs production statistics periodically.
Example
import asyncio
import json

from aiokafka import AIOKafkaProducer
from AioKafkaEngine import ProducerEngine


async def work(queue):
    # Put one message on the shared queue for the engine to send.
    await queue.put({"key": 1})


async def main():
    """
    Produce to Kafka in the background while workers put messages on the queue.
    """
    queue = asyncio.Queue()
    await queue.put({"key": "k", "key2": 2})
    engine = ProducerEngine(
        producer=AIOKafkaProducer(
            bootstrap_servers="localhost:9092",
            value_serializer=lambda v: json.dumps(v).encode(),
            acks="all",
        ),
        queue=queue,
        topic="produce_topic",
    )
    await engine.start()
    produce_task = asyncio.create_task(engine.produce())
    # create workers
    workers = [asyncio.create_task(work(queue)) for _ in range(10)]
    # will never exit
    await asyncio.gather(produce_task, *workers)


asyncio.run(main())
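The example uses an unbounded `asyncio.Queue`, so workers can enqueue messages faster than they are sent. If that is a concern, a bounded queue is one option. The sketch below shows only the standard `asyncio.Queue(maxsize=...)` behaviour and assumes nothing about how ProducerEngine drains the queue; `demo` is a hypothetical name.

```python
import asyncio


async def demo():
    queue = asyncio.Queue(maxsize=2)  # at most two pending messages
    await queue.put({"key": 1})
    await queue.put({"key": 2})

    try:
        # The queue is full, so this put() waits until it times out.
        await asyncio.wait_for(queue.put({"key": 3}), timeout=0.05)
        blocked = False
    except asyncio.TimeoutError:
        blocked = True

    queue.get_nowait()  # a consumer takes one message, freeing a slot
    await queue.put({"key": 3})  # now succeeds without waiting
    return blocked, queue.qsize()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a bounded queue, `await queue.put(...)` in a worker pauses that worker until space is available, which limits memory use at the cost of slowing producers down.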
Contributing
Contributions are welcome! Please submit a pull request or open an issue on GitHub.
License
This project is licensed under the BSD 2-Clause License.