Kafka integration with asyncio.

asyncio client for Kafka


AIOKafkaProducer is a high-level, asynchronous message producer.

Example of AIOKafkaProducer usage:

.. code-block:: python

    import asyncio
    from aiokafka import AIOKafkaProducer

    @asyncio.coroutine
    def produce(loop):
        # Queue the message for sending; send() returns a future
        future = yield from producer.send('foobar', b'some_message_bytes')
        # Wait for the message to be delivered
        resp = yield from future
        print("Message produced: partition {}; offset {}".format(
            resp.partition, resp.offset))
        # Alternatively, use a helper to send and wait in one call
        resp = yield from producer.send_and_wait(
            'foobar', key=b'foo', value=b'bar')
        resp = yield from producer.send_and_wait(
            'foobar', b'message for partition 1', partition=1)

    loop = asyncio.get_event_loop()
    producer = AIOKafkaProducer(loop=loop, bootstrap_servers='localhost:9092')
    # Bootstrap client, will get initial cluster metadata
    loop.run_until_complete(producer.start())
    loop.run_until_complete(produce(loop))
    # Wait for all pending messages to be delivered or expire
    loop.run_until_complete(producer.stop())
    loop.close()
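
The two-step pattern in the producer example (``send`` hands back a future, and waiting on that future yields the delivery result) can be tried without a broker. The following is a minimal sketch, assuming Python 3.7+ and ``async``/``await`` syntax. ``InMemoryProducer`` is a hypothetical stand-in that is not part of aiokafka; it only mimics the call shape, and the ``(partition, offset)`` tuples it returns are an assumption made for illustration.

```python
import asyncio


class InMemoryProducer:
    """Hypothetical stand-in for a Kafka producer; it talks to no broker."""

    def __init__(self, num_partitions=2):
        # One list per partition plays the role of a partition log.
        self._logs = [[] for _ in range(num_partitions)]

    async def send(self, topic, value, partition=0):
        # Queue the message and immediately hand back a future.
        future = asyncio.get_running_loop().create_future()
        asyncio.ensure_future(self._deliver(future, value, partition))
        return future

    async def _deliver(self, future, value, partition):
        await asyncio.sleep(0)  # yield control once, as a real send would
        log = self._logs[partition]
        log.append(value)
        # Resolve the future with (partition, offset) of the stored message.
        future.set_result((partition, len(log) - 1))

    async def send_and_wait(self, topic, value, partition=0):
        # Helper that combines both steps into a single call.
        future = await self.send(topic, value, partition)
        return await future


async def demo():
    producer = InMemoryProducer()
    future = await producer.send('foobar', b'first')  # step 1: enqueue
    first = await future                              # step 2: delivery
    second = await producer.send_and_wait('foobar', b'second')
    third = await producer.send_and_wait('foobar', b'other', partition=1)
    return [first, second, third]


results = asyncio.run(demo())
print(results)
```

Splitting the call in two lets a caller queue many messages first and wait for the deliveries later, while ``send_and_wait`` is the convenient form when each message must be confirmed before continuing.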


AIOKafkaConsumer is a high-level, asynchronous message consumer.
It interacts with the assigned Kafka Group Coordinator node to allow multiple consumers to load balance consumption of topics (requires kafka >=

Example of AIOKafkaConsumer usage:

.. code-block:: python

    import asyncio
    from kafka.common import KafkaError
    from aiokafka import AIOKafkaConsumer

    @asyncio.coroutine
    def consume_task(consumer):
        while True:
            try:
                msg = yield from consumer.getone()
                print("consumed: ", msg.topic, msg.partition, msg.offset,
                      msg.key, msg.value, msg.timestamp)
            except KafkaError as err:
                print("error while consuming message: ", err)

    loop = asyncio.get_event_loop()
    consumer = AIOKafkaConsumer(
        'topic1', 'topic2', loop=loop, bootstrap_servers='localhost:1234')
    # Bootstrap client, will get initial cluster metadata
    loop.run_until_complete(consumer.start())
    c_task = loop.create_task(consume_task(consumer))
    try:
        loop.run_forever()
    finally:
        # Will gracefully leave consumer group; perform autocommit if enabled
        loop.run_until_complete(consumer.stop())
        c_task.cancel()
        loop.close()
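
To make the load balancing mentioned above more concrete, here is a simplified sketch of how the partitions of the subscribed topics might be divided among the members of one consumer group. The ``assign_round_robin`` function and the member names are hypothetical and written only for illustration; this is not aiokafka's assignment code, and real assignment strategies may distribute partitions differently.

```python
def assign_round_robin(consumers, partitions):
    """Deal partitions out to consumers one at a time, in a fixed order."""
    members = sorted(consumers)  # stable order so the result is repeatable
    assignment = {member: [] for member in members}
    for index, partition in enumerate(sorted(partitions)):
        owner = members[index % len(members)]
        assignment[owner].append(partition)
    return assignment


# Two topics with three partitions each, shared by two group members.
partitions = [(topic, p) for topic in ('topic1', 'topic2') for p in range(3)]
assignment = assign_round_robin(['consumer-a', 'consumer-b'], partitions)
for member, owned in assignment.items():
    print(member, owned)
```

Each partition ends up with exactly one owner in the group, so adding a second consumer roughly halves the work of the first. When membership changes, the group rebalances and an assignment of this kind is computed again.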

Running tests

Docker is required to run tests. See the Docker documentation for installation notes. Also note that the `lz4` compression libraries for Python require the `python-dev` package,
or the Python source header files, for compilation on Linux.

Setting up test requirements (assuming you're within a virtualenv on Ubuntu 14.04+)::

    sudo apt-get install -y libsnappy-dev
    make setup

Running tests::

    make cov

To run tests with a specific version of Kafka instead of the default one, use the KAFKA_VERSION variable.



0.2.2 (2017-04-17)

* Reconnect after KafkaTimeoutException. (PR #149 by @Artimi)
* Fixed compacted topic handling. It could skip messages if those were
  compacted (issue #71)
* Fixed old issue with new topics not adding to subscription on pattern
  (issue #46)
* Another fix for Consumer race condition on JoinGroup. This forces Leader to
  wait for new metadata before assigning partitions. (issue #118)
* Changed metadata listener in Coordinator to avoid 2 rejoins in a rare
  condition (issue #108)
* `getmany` will not return 0 results until we hit timeout. (issue #117)

Big thanks to @Artimi for pointing out several of those issues.

0.2.1 (2017-02-19)

* Add a check to wait for topic autocreation in Consumer, instead of raising UnknownTopicOrPartitionError (PR #92 by fabregas)
* Consumer now stops consumption after `consumer.stop()` call. Any new `get*` calls will result in ConsumerStoppedError (PR #81)
* Added `exclude_internal_topics` option for Consumer (PR #111)
* Better support for pattern subscription when used with `group_id` (part of PR #111)
* Fix for Consumer `subscribe` and JoinGroup race condition (issue #88). Coordinator will now notice subscription changes during rebalance and will join group again. (PR #106)
* Changed logging messages according to KAFKA-3318. Now INFO level should be less messy and more informative. (PR #110)
* Add support for connections_max_idle_ms config (PR #113)

0.2.0 (2016-12-18)

* Added SSL support. (PR #81 by Drizzt1991)
* Fixed UnknownTopicOrPartitionError error on first message for autocreated topic (PR #96 by fabregas)
* Fixed `next_record` recursion (PR #94 by fabregas)
* Fixed Heartbeat fail if no consumers (PR #92 by fabregas)
* Added docs addressing kafka-python and aiokafka differences (PR #70 by Drizzt1991)
* Added `max_poll_records` option for Consumer (PR #72 by Drizzt1991)
* Fix kafka-python typos in docs (PR #69 by jeffwidman)
* Topics and partitions are now randomized on each Fetch request (PR #66 by Drizzt1991)

0.1.4 (2016-11-07)

* Bumped kafka-python version to 1.3.1 and Kafka to
* Fixed auto version detection, to correctly handle version
* Updated Fetch and Produce requests to use v2 with v0.10.0 message format on brokers.
  This allows a ``timestamp`` to be associated with messages.
* Changed lz4 compression framing, as it was changed due to KIP-57 in new message format.
* Minor refactorings

Big thanks to @fabregas for the hard work on this release (PR #60)

0.1.3 (2016-10-18)

* Fixed bug with infinite loop on heartbeats with autocommit=True. #44
* Bumped kafka-python to version 1.1.1
* Fixed docker test runner with multiple interfaces
* Minor documentation fixes

0.1.2 (2016-04-30)

* Added Python3.5 usage example to docs
* Don't raise retriable exceptions in 3.5's async for iterator
* Fix Cancellation issue with producer's `send_and_wait` method

0.1.1 (2016-04-15)

* Fix packaging issues. Removed unneeded files from package.

0.1.0 (2016-04-15)

Initial release

Added full support for Kafka 0.9.0. Older Kafka versions are not tested.