Kafka integration with asyncio.
Project description
aiokafka
========
.. image:: https://travis-ci.org/aio-libs/aiokafka.svg?branch=master
:target: https://travis-ci.org/aio-libs/aiokafka
:alt: |Build status|
.. image:: https://codecov.io/github/aio-libs/aiokafka/coverage.svg?branch=master
:target: https://codecov.io/gh/aio-libs/aiokafka/branch/master
:alt: |Coverage|
asyncio client for Kafka
AIOKafkaProducer
****************
AIOKafkaProducer is a high-level, asynchronous message producer.
Example of AIOKafkaProducer usage:
.. code-block:: python
import asyncio
from aiokafka import AIOKafkaProducer
@asyncio.coroutine
def produce(loop):
# Just adds message to sending queue
future = yield from producer.send('foobar', b'some_message_bytes')
# waiting for message to be delivered
resp = yield from future
print("Message produced: partition {}; offset {}".format(
resp.partition, resp.offset))
# Also can use a helper to send and wait in 1 call
resp = yield from producer.send_and_wait(
'foobar', key=b'foo', value=b'bar')
resp = yield from producer.send_and_wait(
'foobar', b'message for partition 1', partition=1)
loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(loop=loop, bootstrap_servers='localhost:9092')
# Bootstrap client, will get initial cluster metadata
loop.run_until_complete(producer.start())
loop.run_until_complete(produce(loop))
# Wait for all pending messages to be delivered or expire
loop.run_until_complete(producer.stop())
loop.close()
AIOKafkaConsumer
****************
AIOKafkaConsumer is a high-level, asynchronous message consumer.
It interacts with the assigned Kafka Group Coordinator node to allow multiple consumers to load balance consumption of topics (requires kafka >= 0.9.0.0).
Example of AIOKafkaConsumer usage:
.. code-block:: python
import asyncio
from kafka.common import KafkaError
from aiokafka import AIOKafkaConsumer
@asyncio.coroutine
def consume_task(consumer):
while True:
try:
msg = yield from consumer.getone()
print("consumed: ", msg.topic, msg.partition, msg.offset,
msg.key, msg.value, msg.timestamp)
except KafkaError as err:
print("error while consuming message: ", err)
loop = asyncio.get_event_loop()
consumer = AIOKafkaConsumer(
'topic1', 'topic2', loop=loop, bootstrap_servers='localhost:1234')
# Bootstrap client, will get initial cluster metadata
loop.run_until_complete(consumer.start())
c_task = loop.create_task(consume_task(consumer))
try:
loop.run_forever()
finally:
# Will gracefully leave consumer group; perform autocommit if enabled
loop.run_until_complete(consumer.stop())
c_task.cancel()
loop.close()
Running tests
-------------
Docker is required to run tests. See https://docs.docker.com/engine/installation for installation notes. Also note, that `lz4` compression libraries for python will require `python-dev` package,
or python source header files for compilation on Linux.
Setting up tests requirements (assuming you're within virtualenv on ubuntu 14.04+)::
sudo apt-get install -y libsnappy-dev
make setup
Running tests::
make cov
To run tests with a specific version of Kafka (default one is 0.10.1.0) use KAFKA_VERSION variable::
make cov KAFKA_VERSION=0.10.0.0
CHANGES
--------
0.2.3 (2017-07-23)
^^^^^^^^^^^^^^^^^^
* Fixed retry problem in Producer, when buffer is not reset to 0 offset.
Thanks to @ngavrysh for the fix in Tubular/aiokafka fork. (issue #184)
* Fixed how Producer handles retries on Leader node failure. It just did not
work before... Thanks to @blugowski for the help in locating the problem.
(issue #176, issue #173)
* Fixed degrade in v0.2.2 on Consumer with no group_id. (issue #166)
0.2.2 (2017-04-17)
^^^^^^^^^^^^^^^^^^
* Reconnect after KafkaTimeoutException. (PR #149 by @Artimi)
* Fixed compacted topic handling. It could skip messages if those were
compacted (issue #71)
* Fixed old issue with new topics not adding to subscription on pattern
(issue #46)
* Another fix for Consumer race condition on JoinGroup. This forces Leader to
wait for new metadata before assigning partitions. (issue #118)
* Changed metadata listener in Coordinator to avoid 2 rejoins in a rare
condition (issue #108)
* `getmany` will not return 0 results until we hit timeout. (issue #117)
Big thanks to @Artimi for pointing out several of those issues.
0.2.1 (2017-02-19)
^^^^^^^^^^^^^^^^^^
* Add a check to wait topic autocreation in Consumer, instead of raising UnknownTopicOrPartitionError (PR #92 by fabregas)
* Consumer now stops consumption after `consumer.stop()` call. Any new `get*` calls will result in ConsumerStoppedError (PR #81)
* Added `exclude_internal_topics` option for Consumer (PR #111)
* Better support for pattern subscription when used with `group_id` (part of PR #111)
* Fix for Consumer `subscribe` and JoinGroup race condition (issue #88). Coordinator will now notice subscription changes during rebalance and will join group again. (PR #106)
* Changed logging messages according to KAFKA-3318. Now INFO level should be less messy and more informative. (PR #110)
* Add support for connections_max_idle_ms config (PR #113)
0.2.0 (2016-12-18)
^^^^^^^^^^^^^^^^^^
* Added SSL support. (PR #81 by Drizzt1991)
* Fixed UnknownTopicOrPartitionError error on first message for autocreated topic (PR #96 by fabregas)
* Fixed `next_record` recursion (PR #94 by fabregas)
* Fixed Heartbeat fail if no consumers (PR #92 by fabregas)
* Added docs addressing kafka-python and aiokafka differences (PR #70 by Drizzt1991)
* Added `max_poll_records` option for Consumer (PR #72 by Drizzt1991)
* Fix kafka-python typos in docs (PR #69 by jeffwidman)
* Topics and partitions are now randomized on each Fetch request (PR #66 by Drizzt1991)
0.1.4 (2016-11-07)
^^^^^^^^^^^^^^^^^^
* Bumped kafka-python version to 1.3.1 and Kafka to 0.10.1.0.
* Fixed auto version detection, to correctly handle 0.10.0.0 version
* Updated Fetch and Produce requests to use v2 with v0.10.0 message format on brokers.
This allows a ``timestamp`` to be associated with messages.
* Changed lz4 compression framing, as it was changed due to KIP-57 in new message format.
* Minor refactorings
Big thanks to @fabregas for the hard work on this release (PR #60)
0.1.3 (2016-10-18)
^^^^^^^^^^^^^^^^^^
* Fixed bug with infinite loop on heartbeats with autocommit=True. #44
* Bumped kafka-python to version 1.1.1
* Fixed docker test runner with multiple interfaces
* Minor documentation fixes
0.1.2 (2016-04-30)
^^^^^^^^^^^^^^^^^^
* Added Python3.5 usage example to docs
* Don't raise retriable exceptions in 3.5's async for iterator
* Fix Cancellation issue with producer's `send_and_wait` method
0.1.1 (2016-04-15)
^^^^^^^^^^^^^^^^^^
* Fix packaging issues. Removed unneded files from package.
0.1.0 (2016-04-15)
^^^^^^^^^^^^^^^^^^
Initial release
Added full support for Kafka 9.0. Older Kafka versions are not tested.
========
.. image:: https://travis-ci.org/aio-libs/aiokafka.svg?branch=master
:target: https://travis-ci.org/aio-libs/aiokafka
:alt: |Build status|
.. image:: https://codecov.io/github/aio-libs/aiokafka/coverage.svg?branch=master
:target: https://codecov.io/gh/aio-libs/aiokafka/branch/master
:alt: |Coverage|
asyncio client for Kafka
AIOKafkaProducer
****************
AIOKafkaProducer is a high-level, asynchronous message producer.
Example of AIOKafkaProducer usage:
.. code-block:: python
import asyncio
from aiokafka import AIOKafkaProducer
@asyncio.coroutine
def produce(loop):
# Just adds message to sending queue
future = yield from producer.send('foobar', b'some_message_bytes')
# waiting for message to be delivered
resp = yield from future
print("Message produced: partition {}; offset {}".format(
resp.partition, resp.offset))
# Also can use a helper to send and wait in 1 call
resp = yield from producer.send_and_wait(
'foobar', key=b'foo', value=b'bar')
resp = yield from producer.send_and_wait(
'foobar', b'message for partition 1', partition=1)
loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(loop=loop, bootstrap_servers='localhost:9092')
# Bootstrap client, will get initial cluster metadata
loop.run_until_complete(producer.start())
loop.run_until_complete(produce(loop))
# Wait for all pending messages to be delivered or expire
loop.run_until_complete(producer.stop())
loop.close()
AIOKafkaConsumer
****************
AIOKafkaConsumer is a high-level, asynchronous message consumer.
It interacts with the assigned Kafka Group Coordinator node to allow multiple consumers to load balance consumption of topics (requires kafka >= 0.9.0.0).
Example of AIOKafkaConsumer usage:
.. code-block:: python
import asyncio
from kafka.common import KafkaError
from aiokafka import AIOKafkaConsumer
@asyncio.coroutine
def consume_task(consumer):
while True:
try:
msg = yield from consumer.getone()
print("consumed: ", msg.topic, msg.partition, msg.offset,
msg.key, msg.value, msg.timestamp)
except KafkaError as err:
print("error while consuming message: ", err)
loop = asyncio.get_event_loop()
consumer = AIOKafkaConsumer(
'topic1', 'topic2', loop=loop, bootstrap_servers='localhost:1234')
# Bootstrap client, will get initial cluster metadata
loop.run_until_complete(consumer.start())
c_task = loop.create_task(consume_task(consumer))
try:
loop.run_forever()
finally:
# Will gracefully leave consumer group; perform autocommit if enabled
loop.run_until_complete(consumer.stop())
c_task.cancel()
loop.close()
Running tests
-------------
Docker is required to run tests. See https://docs.docker.com/engine/installation for installation notes. Also note, that `lz4` compression libraries for python will require `python-dev` package,
or python source header files for compilation on Linux.
Setting up tests requirements (assuming you're within virtualenv on ubuntu 14.04+)::
sudo apt-get install -y libsnappy-dev
make setup
Running tests::
make cov
To run tests with a specific version of Kafka (default one is 0.10.1.0) use KAFKA_VERSION variable::
make cov KAFKA_VERSION=0.10.0.0
CHANGES
--------
0.2.3 (2017-07-23)
^^^^^^^^^^^^^^^^^^
* Fixed retry problem in Producer, when buffer is not reset to 0 offset.
Thanks to @ngavrysh for the fix in Tubular/aiokafka fork. (issue #184)
* Fixed how Producer handles retries on Leader node failure. It just did not
work before... Thanks to @blugowski for the help in locating the problem.
(issue #176, issue #173)
* Fixed degrade in v0.2.2 on Consumer with no group_id. (issue #166)
0.2.2 (2017-04-17)
^^^^^^^^^^^^^^^^^^
* Reconnect after KafkaTimeoutException. (PR #149 by @Artimi)
* Fixed compacted topic handling. It could skip messages if those were
compacted (issue #71)
* Fixed old issue with new topics not adding to subscription on pattern
(issue #46)
* Another fix for Consumer race condition on JoinGroup. This forces Leader to
wait for new metadata before assigning partitions. (issue #118)
* Changed metadata listener in Coordinator to avoid 2 rejoins in a rare
condition (issue #108)
* `getmany` will not return 0 results until we hit timeout. (issue #117)
Big thanks to @Artimi for pointing out several of those issues.
0.2.1 (2017-02-19)
^^^^^^^^^^^^^^^^^^
* Add a check to wait topic autocreation in Consumer, instead of raising UnknownTopicOrPartitionError (PR #92 by fabregas)
* Consumer now stops consumption after `consumer.stop()` call. Any new `get*` calls will result in ConsumerStoppedError (PR #81)
* Added `exclude_internal_topics` option for Consumer (PR #111)
* Better support for pattern subscription when used with `group_id` (part of PR #111)
* Fix for Consumer `subscribe` and JoinGroup race condition (issue #88). Coordinator will now notice subscription changes during rebalance and will join group again. (PR #106)
* Changed logging messages according to KAFKA-3318. Now INFO level should be less messy and more informative. (PR #110)
* Add support for connections_max_idle_ms config (PR #113)
0.2.0 (2016-12-18)
^^^^^^^^^^^^^^^^^^
* Added SSL support. (PR #81 by Drizzt1991)
* Fixed UnknownTopicOrPartitionError error on first message for autocreated topic (PR #96 by fabregas)
* Fixed `next_record` recursion (PR #94 by fabregas)
* Fixed Heartbeat fail if no consumers (PR #92 by fabregas)
* Added docs addressing kafka-python and aiokafka differences (PR #70 by Drizzt1991)
* Added `max_poll_records` option for Consumer (PR #72 by Drizzt1991)
* Fix kafka-python typos in docs (PR #69 by jeffwidman)
* Topics and partitions are now randomized on each Fetch request (PR #66 by Drizzt1991)
0.1.4 (2016-11-07)
^^^^^^^^^^^^^^^^^^
* Bumped kafka-python version to 1.3.1 and Kafka to 0.10.1.0.
* Fixed auto version detection, to correctly handle 0.10.0.0 version
* Updated Fetch and Produce requests to use v2 with v0.10.0 message format on brokers.
This allows a ``timestamp`` to be associated with messages.
* Changed lz4 compression framing, as it was changed due to KIP-57 in new message format.
* Minor refactorings
Big thanks to @fabregas for the hard work on this release (PR #60)
0.1.3 (2016-10-18)
^^^^^^^^^^^^^^^^^^
* Fixed bug with infinite loop on heartbeats with autocommit=True. #44
* Bumped kafka-python to version 1.1.1
* Fixed docker test runner with multiple interfaces
* Minor documentation fixes
0.1.2 (2016-04-30)
^^^^^^^^^^^^^^^^^^
* Added Python3.5 usage example to docs
* Don't raise retriable exceptions in 3.5's async for iterator
* Fix Cancellation issue with producer's `send_and_wait` method
0.1.1 (2016-04-15)
^^^^^^^^^^^^^^^^^^
* Fix packaging issues. Removed unneded files from package.
0.1.0 (2016-04-15)
^^^^^^^^^^^^^^^^^^
Initial release
Added full support for Kafka 9.0. Older Kafka versions are not tested.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
aiokafka-0.2.3.tar.gz
(49.6 kB
view hashes)