
Kafka integration with asyncio



asyncio client for Kafka

AIOKafkaProducer

AIOKafkaProducer is a high-level, asynchronous message producer.

Example of AIOKafkaProducer usage:

from aiokafka import AIOKafkaProducer
import asyncio

async def send_one():
    producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
    # Get cluster layout and initial topic/partition leadership information
    await producer.start()
    try:
        # Produce message
        await producer.send_and_wait("my_topic", b"Super message")
    finally:
        # Wait for all pending messages to be delivered or expire.
        await producer.stop()

asyncio.run(send_one())
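The producer can also serialize application objects itself via the `value_serializer` argument, and `send()` (unlike `send_and_wait()`) returns a future so you can keep producing while delivery is in flight. A minimal sketch, assuming a broker on `localhost:9092` and a hypothetical topic name; aiokafka is imported lazily so the serializer can be exercised without a broker:

```python
import asyncio
import json

def serialize(value) -> bytes:
    # Encode any JSON-serializable value as UTF-8 bytes for the broker
    return json.dumps(value).encode("utf-8")

async def send_json():
    # Lazy import so serialize() above can be tried without aiokafka installed
    from aiokafka import AIOKafkaProducer
    producer = AIOKafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=serialize)
    await producer.start()
    try:
        # send() returns a future; awaiting it yields the RecordMetadata
        fut = await producer.send("my_topic", {"user": 1, "action": "click"})
        metadata = await fut
        print(metadata.topic, metadata.partition, metadata.offset)
    finally:
        await producer.stop()

# asyncio.run(send_json())  # requires a running broker
```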

AIOKafkaConsumer

AIOKafkaConsumer is a high-level, asynchronous message consumer. It interacts with the assigned Kafka Group Coordinator node, allowing multiple consumers to load-balance consumption of topics (requires Kafka >= 0.9.0.0).

Example of AIOKafkaConsumer usage:

from aiokafka import AIOKafkaConsumer
import asyncio

async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic', 'my_other_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group")
    # Get cluster layout and join group `my-group`
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()

asyncio.run(consume())
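Besides the `async for` style above, the consumer supports a `value_deserializer` and batch fetching with manual offset commits. A sketch under the same assumptions (broker on `localhost:9092`, hypothetical topic; aiokafka imported lazily so the deserializer can be tested without a broker):

```python
import asyncio
import json

def deserialize(raw: bytes):
    # Decode UTF-8 JSON bytes from the broker back into Python objects
    return json.loads(raw.decode("utf-8"))

async def consume_json():
    # Lazy import so deserialize() above can be tried without aiokafka installed
    from aiokafka import AIOKafkaConsumer
    consumer = AIOKafkaConsumer(
        'my_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group",
        value_deserializer=deserialize,
        enable_auto_commit=False)  # commit offsets manually instead
    await consumer.start()
    try:
        # getmany() returns a batch: {TopicPartition: [messages]}
        batches = await consumer.getmany(timeout_ms=1000)
        for tp, messages in batches.items():
            for msg in messages:
                print(tp.topic, tp.partition, msg.offset, msg.value)
        # Commit only after the whole batch has been processed
        await consumer.commit()
    finally:
        await consumer.stop()

# asyncio.run(consume_json())  # requires a running broker
```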

Running tests

Docker is required to run tests. See https://docs.docker.com/engine/installation for installation notes. Also note that the lz4 compression libraries for Python require the python-dev package (or the Python source header files) for compilation on Linux. NOTE: You will also need a valid Java installation. It's required for the keytool utility, which is used to generate SSL keys for some tests.

Setting up the test requirements (assuming you're within a virtualenv on Ubuntu 14.04+):

sudo apt-get install -y libsnappy-dev libzstd-dev libkrb5-dev krb5-user
make setup

Running tests with coverage:

make cov

To run tests with a specific version of Kafka (the default is 1.0.2), use the KAFKA_VERSION variable:

make cov KAFKA_VERSION=0.10.2.1

Test running cheatsheet:

  • make test FLAGS="-l -x --ff" - stop at the first failure, rerunning previously failed tests first. Great for cleaning up a lot of errors, say, after a big refactor.

  • make test FLAGS="-k consumer" - run only the consumer tests.

  • make test FLAGS="-m 'not ssl'" - run all tests except those marked ssl.

  • make test FLAGS="--no-pull" - do not try to pull new docker image before test run.

Download files

Download the file for your platform.

Source Distribution

divyanx_aiokafka-0.0.620499.tar.gz (558.8 kB)

Uploaded: Source

Built Distribution


divyanx_aiokafka-0.0.620499-cp39-cp39-macosx_11_0_arm64.whl (353.8 kB)

Uploaded: CPython 3.9, macOS 11.0+ ARM64

File details

Details for the file divyanx_aiokafka-0.0.620499.tar.gz.

File metadata

  • Download URL: divyanx_aiokafka-0.0.620499.tar.gz
  • Upload date:
  • Size: 558.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.9.19

File hashes

Hashes for divyanx_aiokafka-0.0.620499.tar.gz
Algorithm Hash digest
SHA256 293ab3615b84d8e7b38f805c37639e0c68851207eed136d817c836077d73a9ac
MD5 55d19ca065fa1e46416ab4a46407d29c
BLAKE2b-256 a4fd5f425fd58f88b0385b8b0ca620d409546310bda849e716c9037d12ae8470
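The published digests can be checked against a downloaded file with Python's standard hashlib; a minimal sketch (the file path and comparison are illustrative, assuming the archive has been downloaded locally):

```python
import hashlib

def sha256_of(path: str, chunk_size: int = 1 << 16) -> str:
    # Stream the file in chunks so large archives aren't loaded into memory
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

# Compare against the published SHA256 digest, e.g.:
# expected = "293ab3615b84d8e7b38f805c37639e0c68851207eed136d817c836077d73a9ac"
# assert sha256_of("divyanx_aiokafka-0.0.620499.tar.gz") == expected
```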


File details

Details for the file divyanx_aiokafka-0.0.620499-cp39-cp39-macosx_11_0_arm64.whl.

File metadata

File hashes

Hashes for divyanx_aiokafka-0.0.620499-cp39-cp39-macosx_11_0_arm64.whl
Algorithm Hash digest
SHA256 d7dcb92b17c420983c839198ce6a780c1f43daafb9f2503166ea2e44dd1f7ad2
MD5 13e3f578a416b878de1a0ee2b5620f9f
BLAKE2b-256 1ae5291d8767dca3e6368928e8820d729294e314efccfc507f67afe701b0eb47

