Kafka integration with asyncio.
asyncio client for Kafka
AIOKafkaProducer is a high-level, asynchronous message producer.
Example of AIOKafkaProducer usage:

    from aiokafka import AIOKafkaProducer
    import asyncio

    loop = asyncio.get_event_loop()

    async def send_one():
        producer = AIOKafkaProducer(
            loop=loop, bootstrap_servers='localhost:9092')
        # Get cluster layout and initial topic/partition leadership information
        await producer.start()
        try:
            # Produce message
            await producer.send_and_wait("my_topic", b"Super message")
        finally:
            # Wait for all pending messages to be delivered or expire.
            await producer.stop()

    loop.run_until_complete(send_one())

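The producer example sends a raw bytes payload. Structured data has to be encoded to bytes first; the following is a minimal sketch of one way to do that with JSON, using only the standard library. The helper names `serialize_json` and `deserialize_json` are hypothetical. A function like the first one could be passed as the producer's `value_serializer` argument, and the second as the consumer's `value_deserializer`, but the sketch itself does not touch Kafka.

```python
import json


def serialize_json(value):
    """Encode a JSON-compatible object as UTF-8 bytes."""
    # sort_keys keeps the output stable for equal dictionaries.
    return json.dumps(value, sort_keys=True).encode("utf-8")


def deserialize_json(raw):
    """Decode UTF-8 JSON bytes back into a Python object."""
    return json.loads(raw.decode("utf-8"))


payload = serialize_json({"id": 1, "name": "a"})
print(payload)                    # b'{"id": 1, "name": "a"}'
print(deserialize_json(payload))  # {'id': 1, 'name': 'a'}
```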
AIOKafkaConsumer is a high-level, asynchronous message consumer. It interacts with the assigned Kafka Group Coordinator node to allow multiple consumers to load-balance consumption of topics (requires Kafka >= 0.9.0.0).
Example of AIOKafkaConsumer usage:

    from aiokafka import AIOKafkaConsumer
    import asyncio

    loop = asyncio.get_event_loop()

    async def consume():
        consumer = AIOKafkaConsumer(
            'my_topic', 'my_other_topic',
            loop=loop, bootstrap_servers='localhost:9092',
            group_id="my-group")
        # Get cluster layout and join group `my-group`
        await consumer.start()
        try:
            # Consume messages
            async for msg in consumer:
                print("consumed: ", msg.topic, msg.partition, msg.offset,
                      msg.key, msg.value, msg.timestamp)
        finally:
            # Will leave consumer group; perform autocommit if enabled.
            await consumer.stop()

    loop.run_until_complete(consume())

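The consumer example prints `msg.timestamp` as a raw number. Below is a small sketch of a formatting helper, assuming the timestamp is expressed in milliseconds since the Unix epoch and may be missing for messages stored in an older format. The name `describe` and the `SimpleNamespace` stand-in record are hypothetical and only serve to try the helper without a broker.

```python
from datetime import datetime, timezone
from types import SimpleNamespace


def describe(msg):
    """Format a record; `msg.timestamp` is assumed to be ms since the epoch."""
    if msg.timestamp is None:
        when = "no timestamp"
    else:
        when = datetime.fromtimestamp(
            msg.timestamp / 1000, tz=timezone.utc).isoformat()
    return "%s[%d] offset=%d at %s" % (
        msg.topic, msg.partition, msg.offset, when)


# Stand-in for a consumed record, so the helper can run without Kafka.
fake = SimpleNamespace(topic="my_topic", partition=0, offset=42, timestamp=0)
print(describe(fake))  # my_topic[0] offset=42 at 1970-01-01T00:00:00+00:00
```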
Docker is required to run tests. See https://docs.docker.com/engine/installation for installation notes. Also note that the lz4 compression libraries for Python require the python-dev package, or the Python source header files, to compile on Linux.
Setting up test requirements (assuming you're in a virtualenv on Ubuntu 14.04+):

    sudo apt-get install -y libsnappy-dev
    make setup

To run tests with a specific version of Kafka (the default is 0.10.1.0), use the KAFKA_VERSION variable:

    make cov KAFKA_VERSION=0.10.0.0

- Added AIOKafkaProducer.flush() method. (PR #209 by @vineet-rh)
- Fixed a bug with uvloop involving float("inf") for timeout. (PR #210 by dmitry-moroz)
- Changed test runner to allow running tests on OSX. (PR #213 by @shargan)
- Moved all public structures and errors to aiokafka namespace. You will no longer need to import from kafka namespace.
- Changed ConsumerRebalanceListener to support either function or coroutine for on_partitions_assigned and on_partitions_revoked callbacks. (PR #190 by @ask)
- Added support for the offsets_for_times, beginning_offsets and end_offsets APIs. (issue #164)
- Coordinator requests are now sent using a separate socket. Fixes slow commit issue. (issue #137, issue #128)
- Added the seek_to_end and seek_to_beginning APIs. (issue #154)
- Updated documentation to provide more useful usage guide on both Consumer and Producer interface.
- Fixed retry problem in Producer, when buffer is not reset to 0 offset. Thanks to @ngavrysh for the fix in Tubular/aiokafka fork. (issue #184)
- Fixed how Producer handles retries on Leader node failure. It just did not work before… Thanks to @blugowski for the help in locating the problem. (issue #176, issue #173)
- Fixed a regression introduced in v0.2.2 on Consumer with no group_id. (issue #166)
- Reconnect after KafkaTimeoutException. (PR #149 by @Artimi)
- Fixed compacted topic handling. It could skip messages if those were compacted. (issue #71)
- Fixed an old issue where new topics were not added to a pattern subscription. (issue #46)
- Another fix for Consumer race condition on JoinGroup. This forces Leader to wait for new metadata before assigning partitions. (issue #118)
- Changed metadata listener in Coordinator to avoid 2 rejoins in a rare condition (issue #108)
- getmany will not return 0 results until we hit timeout. (issue #117)
Big thanks to @Artimi for pointing out several of those issues.
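One of the changes listed above lets ConsumerRebalanceListener callbacks be either plain functions or coroutines. The following is a minimal sketch of how such a dispatch can work, using only the standard library. `call_listener` and both callbacks are hypothetical names, partitions are represented as plain tuples, and this is not taken from aiokafka's actual implementation.

```python
import asyncio
import inspect


async def call_listener(callback, *args):
    """Call `callback`; await its result only if it is awaitable."""
    result = callback(*args)
    if inspect.isawaitable(result):
        result = await result
    return result


# Hypothetical stand-ins for on_partitions_revoked / on_partitions_assigned.
def on_revoked(partitions):
    return "revoked %d" % len(partitions)


async def on_assigned(partitions):
    await asyncio.sleep(0)  # placeholder for real async work
    return "assigned %d" % len(partitions)


async def main():
    partitions = [("my_topic", 0), ("my_topic", 1)]
    return [
        await call_listener(on_revoked, partitions),
        await call_listener(on_assigned, partitions),
    ]


if __name__ == "__main__":
    print(asyncio.run(main()))  # ['revoked 2', 'assigned 2']
```

Checking the returned value rather than the callback itself means the same path also handles callables that return an awaitable without being declared with `async def`.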
- Added a check to wait for topic autocreation in Consumer, instead of raising UnknownTopicOrPartitionError (PR #92 by fabregas)
- Consumer now stops consumption after consumer.stop() call. Any new get* calls will result in ConsumerStoppedError (PR #81)
- Added exclude_internal_topics option for Consumer (PR #111)
- Better support for pattern subscription when used with group_id (part of PR #111)
- Fix for Consumer subscribe and JoinGroup race condition (issue #88). Coordinator will now notice subscription changes during rebalance and will join group again. (PR #106)
- Changed logging messages according to KAFKA-3318. Now INFO level should be less messy and more informative. (PR #110)
- Add support for connections_max_idle_ms config (PR #113)
- Added SSL support. (PR #81 by Drizzt1991)
- Fixed UnknownTopicOrPartitionError error on first message for autocreated topic (PR #96 by fabregas)
- Fixed next_record recursion (PR #94 by fabregas)
- Fixed Heartbeat fail if no consumers (PR #92 by fabregas)
- Added docs addressing kafka-python and aiokafka differences (PR #70 by Drizzt1991)
- Added max_poll_records option for Consumer (PR #72 by Drizzt1991)
- Fix kafka-python typos in docs (PR #69 by jeffwidman)
- Topics and partitions are now randomized on each Fetch request (PR #66 by Drizzt1991)
- Bumped kafka-python version to 1.3.1 and Kafka to 0.10.1.0.
- Fixed auto version detection, to correctly handle 0.10.0.0 version
- Updated Fetch and Produce requests to use v2 with v0.10.0 message format on brokers. This allows a timestamp to be associated with messages.
- Changed lz4 compression framing, as it was changed due to KIP-57 in new message format.
- Minor refactorings
Big thanks to @fabregas for the hard work on this release (PR #60)
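The Fetch randomization mentioned in the list above can be pictured as reshuffling the assigned topics and partitions before each request is built, so that no partition is always served last. Here is a rough sketch with the standard library only. `fetch_order` and the shape of `assignment` (topic name mapped to a list of partition ids) are assumptions made for illustration, not aiokafka internals.

```python
import random


def fetch_order(assignment, rng=random):
    """Return (topic, partition) pairs in a fresh random order.

    Partitions of one topic stay grouped together; both the topic order
    and the partition order within each topic are shuffled.
    """
    topics = list(assignment)
    rng.shuffle(topics)
    order = []
    for topic in topics:
        partitions = list(assignment[topic])  # copy, do not mutate input
        rng.shuffle(partitions)
        order.extend((topic, p) for p in partitions)
    return order


assignment = {"my_topic": [0, 1, 2], "my_other_topic": [0]}
# Each call may yield a different order over the same four pairs.
print(fetch_order(assignment))
```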
- Fixed bug with infinite loop on heartbeats with autocommit=True. #44
- Bumped kafka-python to version 1.1.1
- Fixed docker test runner with multiple interfaces
- Minor documentation fixes
- Added Python3.5 usage example to docs
- Don’t raise retriable exceptions in 3.5’s async for iterator
- Fix Cancellation issue with producer’s send_and_wait method
- Fix packaging issues. Removed unneeded files from package.
Added full support for Kafka 0.9.0. Older Kafka versions are not tested.