
Wrapper around aiokafka for producing and consuming messages via an asyncio queue

AsyncKafkaEngine

AsyncKafkaEngine is a wrapper around the aiokafka package.

It is built around the idea of decoupling the reading and batching task from your main application task. This keeps the queue of batches topped up, and reading from Kafka is not blocked by the work of your application logic.
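
The decoupling idea can be illustrated without Kafka at all. The following is a minimal sketch using only asyncio; `fake_fetch_batch`, `reader`, `worker`, and `run_demo` are hypothetical names standing in for the reading task and your application logic, and are not part of the AioKafkaEngine API.

```python
import asyncio


async def fake_fetch_batch(batch_no: int, batch_size: int) -> list[int]:
    """Hypothetical stand-in for a Kafka fetch; returns a batch of integers."""
    await asyncio.sleep(0)  # yield to the event loop, as a network call would
    start = batch_no * batch_size
    return list(range(start, start + batch_size))


async def reader(queue: asyncio.Queue, n_batches: int, batch_size: int) -> None:
    """Keep the queue supplied with batches, independent of the worker's pace."""
    for batch_no in range(n_batches):
        batch = await fake_fetch_batch(batch_no, batch_size)
        await queue.put(batch)  # waits only when the bounded queue is full
    await queue.put(None)  # sentinel: no more batches


async def worker(queue: asyncio.Queue) -> int:
    """Application logic: process batches as they become available."""
    processed = 0
    while True:
        batch = await queue.get()
        if batch is None:
            return processed
        await asyncio.sleep(0.01)  # simulate slow application work
        processed += len(batch)


async def run_demo(n_batches: int = 5, batch_size: int = 10) -> int:
    queue: asyncio.Queue = asyncio.Queue(maxsize=3)  # bounded buffer of batches
    reader_task = asyncio.create_task(reader(queue, n_batches, batch_size))
    processed = await worker(queue)
    await reader_task
    return processed


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

Because the reader runs as its own task, it can fetch the next batches while the worker is still busy with the current one; the bounded queue caps how far ahead it reads.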

Installation

Install the package using uv:

uv add AioKafkaEngine

Install the package using pip:

pip install AioKafkaEngine

Usage

ConsumerEngine

The ConsumerEngine class consumes messages from Kafka topics asynchronously and places them into a queue, which your application can then read from. Example:

import asyncio
import json

from aiokafka import AIOKafkaConsumer
from AioKafkaEngine import ConsumerEngine


async def work(queue):
    # Process items from the queue as they arrive.
    while True:
        message = await queue.get()
        print(message)


async def main():
    """
    Consume from Kafka into a queue and process the queue with ten workers.
    """
    queue = asyncio.Queue()

    engine = ConsumerEngine(
        consumer=AIOKafkaConsumer(
            "test_topic",
            bootstrap_servers="localhost:9092",
            group_id="my_group",
            value_deserializer=lambda x: json.loads(x.decode("utf-8")),
            auto_offset_reset="earliest",
        ),
        queue=queue,
        batch_size=10,
        timeout=1,
    )
    await engine.start()
    consume_task = asyncio.create_task(engine.consume())

    # create workers
    workers = [asyncio.create_task(work(queue)) for _ in range(10)]

    # will never exit
    await asyncio.gather(consume_task, *workers)


asyncio.run(main())
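
Since the gather call above never returns, an application usually needs some way to stop its tasks. The sketch below shows one generic asyncio approach, task cancellation, using only the standard library. `forever`, `worker`, and `run_for` are hypothetical names; `forever` merely stands in for a long-running task such as the consume task. Whether the engine exposes its own stop or cleanup method is not shown in this README, so check the package source before relying on cancellation alone.

```python
import asyncio


async def forever(queue: asyncio.Queue) -> None:
    """Hypothetical stand-in for a long-running task that fills the queue."""
    n = 0
    while True:
        await queue.put(n)
        n += 1
        await asyncio.sleep(0.001)


async def worker(queue: asyncio.Queue, seen: list) -> None:
    """Record every item taken from the queue."""
    while True:
        seen.append(await queue.get())


async def run_for(seconds: float) -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)
    seen: list = []
    tasks = [
        asyncio.create_task(forever(queue)),
        asyncio.create_task(worker(queue, seen)),
    ]
    await asyncio.sleep(seconds)  # let the tasks run for a while
    for task in tasks:
        task.cancel()  # request cancellation of each task
    # return_exceptions=True collects the CancelledError results instead of raising
    await asyncio.gather(*tasks, return_exceptions=True)
    return seen


if __name__ == "__main__":
    print(len(asyncio.run(run_for(0.05))))
```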

ProducerEngine

The ProducerEngine class produces messages to a Kafka topic asynchronously by retrieving them from an asyncio queue. It also logs production statistics periodically. Example:

import asyncio
import json

from aiokafka import AIOKafkaProducer
from AioKafkaEngine import ProducerEngine


async def work(queue):
    # Enqueue one message for the engine to produce.
    await queue.put({"key": 1})


async def main():
    """
    Produce queued messages to Kafka while ten workers enqueue items.
    """
    queue = asyncio.Queue()
    await queue.put({"key": "k", "key2": 2})

    engine = ProducerEngine(
        producer=AIOKafkaProducer(
            bootstrap_servers="localhost:9092",
            value_serializer=lambda v: json.dumps(v).encode(),
            acks="all",
        ),
        queue=queue,
        topic="produce_topic",
    )
    await engine.start()

    produce_task = asyncio.create_task(engine.produce())

    # create workers
    workers = [asyncio.create_task(work(queue)) for _ in range(10)]

    # will never exit
    await asyncio.gather(produce_task, *workers)


asyncio.run(main())
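
To make the "drain a queue and log statistics periodically" pattern concrete, here is a minimal standard-library sketch. It is an illustration of the general technique, not the ProducerEngine implementation; `drain_and_count`, `fake_send`, `run_demo`, and the `log_interval` parameter are all hypothetical, and a `None` sentinel is assumed as the stop signal.

```python
import asyncio
import logging
import time

logger = logging.getLogger("producer_sketch")


async def drain_and_count(queue, send, log_interval=1.0):
    """Send queued items until a None sentinel arrives; log throughput periodically."""
    total = 0
    since_log = 0
    last_log = time.monotonic()
    while True:
        item = await queue.get()
        if item is None:  # assumed stop signal
            return total
        await send(item)
        total += 1
        since_log += 1
        now = time.monotonic()
        if now - last_log >= log_interval:
            logger.info("sent %d messages in %.1fs", since_log, now - last_log)
            since_log, last_log = 0, now


async def run_demo(n_items: int = 5):
    queue: asyncio.Queue = asyncio.Queue()
    sent = []

    async def fake_send(item):
        """Hypothetical stand-in for a Kafka send call."""
        sent.append(item)

    for i in range(n_items):
        await queue.put({"key": i})
    await queue.put(None)
    total = await drain_and_count(queue, fake_send, log_interval=0.0)
    return total, sent


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print(asyncio.run(run_demo()))
```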

Contributing

Contributions are welcome! Please submit a pull request or open an issue on GitHub.

License

This project is licensed under the BSD 2-Clause License.
