Kafka integration with asyncio
Project description
asyncio client for Kafka
AIOKafkaProducer
AIOKafkaProducer is a high-level, asynchronous message producer.
Example of AIOKafkaProducer usage:
from aiokafka import AIOKafkaProducer
import asyncio
async def send_one():
producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
# Get cluster layout and initial topic/partition leadership information
await producer.start()
try:
# Produce message
await producer.send_and_wait("my_topic", b"Super message")
finally:
# Wait for all pending messages to be delivered or expire.
await producer.stop()
asyncio.run(send_one())
AIOKafkaConsumer
AIOKafkaConsumer is a high-level, asynchronous message consumer. It interacts with the assigned Kafka Group Coordinator node to allow multiple consumers to load balance consumption of topics (requires kafka >= 0.9.0.0).
Example of AIOKafkaConsumer usage:
from aiokafka import AIOKafkaConsumer
import asyncio
async def consume():
consumer = AIOKafkaConsumer(
'my_topic', 'my_other_topic',
bootstrap_servers='localhost:9092',
group_id="my-group")
# Get cluster layout and join group `my-group`
await consumer.start()
try:
# Consume messages
async for msg in consumer:
print("consumed: ", msg.topic, msg.partition, msg.offset,
msg.key, msg.value, msg.timestamp)
finally:
# Will leave consumer group; perform autocommit if enabled.
await consumer.stop()
asyncio.run(consume())
Running tests
Docker is required to run tests. See https://docs.docker.com/engine/installation for installation notes. Also note, that lz4 compression libraries for python will require python-dev package, or python source header files for compilation on Linux. NOTE: You will also need a valid java installation. It’s required for the keytool utility, used to generate ssh keys for some tests.
Setting up tests requirements (assuming you’re within virtualenv on ubuntu 14.04+):
sudo apt-get install -y libsnappy-dev libzstd-dev libkrb5-dev krb5-user make setup
Running tests with coverage:
make cov
To run tests with a specific version of Kafka (default one is 1.0.2) use KAFKA_VERSION variable:
make cov KAFKA_VERSION=0.10.2.1
Test running cheatsheat:
make test FLAGS="-l -x --ff" - run until 1 failure, rerun failed tests first. Great for cleaning up a lot of errors, say after a big refactor.
make test FLAGS="-k consumer" - run only the consumer tests.
make test FLAGS="-m 'not ssl'" - run tests excluding ssl.
make test FLAGS="--no-pull" - do not try to pull new docker image before test run.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file divyanx_aiokafka-0.10.6204991.tar.gz
.
File metadata
- Download URL: divyanx_aiokafka-0.10.6204991.tar.gz
- Upload date:
- Size: 558.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.0 CPython/3.9.19
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 401fcea88cf361e07586ba8b922f61562bd25e59ddcc0f475f58b96e14d52abf |
|
MD5 | 910e63146c727c1cc83c700370c72af6 |
|
BLAKE2b-256 | f364028294d174f68f7d6180fb89734044f35024f7572411e5f2181be7aa3730 |
File details
Details for the file divyanx_aiokafka-0.10.6204991-cp39-cp39-macosx_11_0_arm64.whl
.
File metadata
- Download URL: divyanx_aiokafka-0.10.6204991-cp39-cp39-macosx_11_0_arm64.whl
- Upload date:
- Size: 353.7 kB
- Tags: CPython 3.9, macOS 11.0+ ARM64
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.0 CPython/3.9.19
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 2a96eb1d3038af248d5159be2b5d6293e89e99561810c93255341745f4536e6b |
|
MD5 | a9bd7cea86308fddd7931ae79eb82237 |
|
BLAKE2b-256 | 22135cb76a1e4fa5a1c5edf4164a18d3caddf09383ac95a32548b759c900ed74 |