
Kafka integration with asyncio

Project description


asyncio client for Kafka

AIOKafkaProducer

AIOKafkaProducer is a high-level, asynchronous message producer.

Example of AIOKafkaProducer usage:

from aiokafka import AIOKafkaProducer
import asyncio

async def send_one():
    producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
    # Get cluster layout and initial topic/partition leadership information
    await producer.start()
    try:
        # Produce message
        await producer.send_and_wait("my_topic", b"Super message")
    finally:
        # Wait for all pending messages to be delivered or expire.
        await producer.stop()

asyncio.run(send_one())
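
send_and_wait() waits for the broker to acknowledge each message before returning. For higher throughput, a minimal sketch (assuming the same local broker and topic as above) can enqueue several messages with send(), which returns a delivery future, and await the acknowledgements together:

from aiokafka import AIOKafkaProducer
import asyncio

async def send_many():
    producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
    await producer.start()
    try:
        # send() enqueues a message and returns a future that resolves
        # once the broker acknowledges delivery.
        futures = [
            await producer.send("my_topic", b"Message %d" % i, key=b"same-key")
            for i in range(10)
        ]
        # Messages sharing a key land on the same partition; wait for all
        # acknowledgements at once.
        await asyncio.gather(*futures)
    finally:
        # Flush any pending messages before closing.
        await producer.stop()

asyncio.run(send_many())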

AIOKafkaConsumer

AIOKafkaConsumer is a high-level, asynchronous message consumer. It interacts with the assigned Kafka Group Coordinator node, allowing multiple consumers to load-balance consumption of topics (requires Kafka >= 0.9.0.0).

Example of AIOKafkaConsumer usage:

from aiokafka import AIOKafkaConsumer
import asyncio

async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic', 'my_other_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group")
    # Get cluster layout and join group `my-group`
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()

asyncio.run(consume())
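
The async-for loop above relies on automatic offset commits. As a sketch (assuming the same broker and group as above), offsets can instead be committed manually after processing a batch fetched with getmany():

from aiokafka import AIOKafkaConsumer
import asyncio

async def consume_batches():
    consumer = AIOKafkaConsumer(
        'my_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group",
        enable_auto_commit=False)  # take control of offset commits
    await consumer.start()
    try:
        while True:
            # Fetch up to 100 records, waiting at most one second.
            batches = await consumer.getmany(timeout_ms=1000, max_records=100)
            for tp, messages in batches.items():
                for msg in messages:
                    print("consumed:", tp.topic, tp.partition, msg.offset, msg.value)
            if batches:
                # Commit the offsets of everything consumed so far.
                await consumer.commit()
    finally:
        await consumer.stop()

asyncio.run(consume_batches())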

Running tests

Docker is required to run tests. See https://docs.docker.com/engine/installation for installation notes. Also note that the lz4 compression libraries for Python require the python-dev package (Python source header files) to compile on Linux. NOTE: You will also need a valid Java installation. It is required for the keytool utility, which is used to generate SSL certificates for some tests.

Setting up test requirements (assuming you're within a virtualenv on Ubuntu 14.04+):

sudo apt-get install -y libsnappy-dev libzstd-dev libkrb5-dev krb5-user
make setup

Running tests with coverage:

make cov

To run tests with a specific version of Kafka (the default is 1.0.2), set the KAFKA_VERSION variable:

make cov KAFKA_VERSION=0.10.2.1

Test running cheatsheet (an example combining flags follows the list):

  • make test FLAGS="-l -x --ff" - stop after the first failure, rerunning previously failed tests first. Great for cleaning up a lot of errors, say after a big refactor.

  • make test FLAGS="-k consumer" - run only the consumer tests.

  • make test FLAGS="-m 'not ssl'" - run tests excluding ssl.

  • make test FLAGS="--no-pull" - do not try to pull new docker image before test run.
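
Since FLAGS forwards options to the test runner, the options above can be combined. For example (a plausible invocation, assuming the Makefile accepts these together), run only the consumer tests, skip ssl-marked ones, and reuse the existing docker image:

make test FLAGS="-k consumer -m 'not ssl' --no-pull"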



Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

divyanx_aiokafka-0.10.6204991.tar.gz (558.7 kB)

Uploaded Source

Built Distribution

divyanx_aiokafka-0.10.6204991-cp39-cp39-macosx_11_0_arm64.whl (353.7 kB)

Uploaded CPython 3.9 macOS 11.0+ ARM64

File details

Details for the file divyanx_aiokafka-0.10.6204991.tar.gz.

File metadata

File hashes

Hashes for divyanx_aiokafka-0.10.6204991.tar.gz
Algorithm Hash digest
SHA256 401fcea88cf361e07586ba8b922f61562bd25e59ddcc0f475f58b96e14d52abf
MD5 910e63146c727c1cc83c700370c72af6
BLAKE2b-256 f364028294d174f68f7d6180fb89734044f35024f7572411e5f2181be7aa3730


File details

Details for the file divyanx_aiokafka-0.10.6204991-cp39-cp39-macosx_11_0_arm64.whl.

File metadata

File hashes

Hashes for divyanx_aiokafka-0.10.6204991-cp39-cp39-macosx_11_0_arm64.whl
Algorithm Hash digest
SHA256 2a96eb1d3038af248d5159be2b5d6293e89e99561810c93255341745f4536e6b
MD5 a9bd7cea86308fddd7931ae79eb82237
BLAKE2b-256 22135cb76a1e4fa5a1c5edf4164a18d3caddf09383ac95a32548b759c900ed74

