Pure Python client for Apache Kafka

Project description

Compatible with Kafka brokers 0.8 through 4.3. License: Apache 2.

kafka-python is a pure-Python client library for Apache Kafka, the distributed event streaming platform. It has no required external dependencies and no Cython, C, or Rust core, which makes it simple to install and manage across a wide variety of environments. It provides high-level classes for consumer, producer, and admin clients, as well as CLI scripts for quick interactive tasks.

The kafka-python admin CLI is a simple alternative to Apache Kafka's bin/ scripts, particularly when you do not have easy access to an installed, compatible JVM. Admin commands are available both as the kafka-python admin script and as python -m kafka.admin.

For higher throughput, install the optional crc32c package (pip install crc32c). It offloads checksum computation, one of the most CPU-intensive subsystems, to an optimized C library.

pip install kafka-python

# callable as module or as cli-script
kafka-python admin -b localhost:9092 cluster describe

# Create a topic with the admin cli
python -m kafka.admin -b localhost:9092 topics create -t foo-topic

# Produce messages
echo "foo message" | python -m kafka.producer -b localhost:9092 -t foo-topic

# Consume messages
python -m kafka.consumer -b localhost:9092 -C auto_offset_reset=earliest -C consumer_timeout_ms=1000 -g foo-group -t foo-topic

What’s New in 3.0

  • Protocol Stack dynamically generated from Apache Kafka json message schemas.

  • Encode/decode performance optimizations with compiled/cached python bytecode.

  • Expanded KIP feature support, including Cooperative Rebalance (KIP-429), Rack-aware Fetch (KIP-392), Log-Truncation detection (KIP-320), Transactional Producer improvements (KIP-360, KIP-447, KIP-654), Sticky Partitioner (KIP-480), and splitting oversized producer batches (KIP-126).

  • Full refactor and expansion of KafkaAdminClient.

  • Networking changes that leverage the kafka.net event loop and async/await syntax.

  • Python 3.8+ is required.

KafkaConsumer

KafkaConsumer is a high-level message consumer, intended to operate as similarly as possible to the official Java client. See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html for API and configuration details.

The consumer iterator returns ConsumerRecords, which are simple namedtuples that expose basic message attributes: topic, partition, offset, key, and value:

from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic')
for msg in consumer:
    print(msg)

# join a consumer group for dynamic partition assignment and offset commits
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
for msg in consumer:
    print(msg)

# manually assign the partition list for the consumer
from kafka import KafkaConsumer, TopicPartition
consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
consumer.assign([TopicPartition('foobar', 2)])
msg = next(consumer)
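Because each record exposes topic, partition, and offset as plain attributes, per-partition progress can be tracked with ordinary Python. The sketch below is illustrative only: the Record namedtuple is a hypothetical stand-in limited to the five attributes named above (the real ConsumerRecord may carry more fields, so rely on attribute names, not positions), and next_offsets is a hypothetical helper. It follows Kafka's convention that the offset to commit is the next one to read, i.e. last offset + 1.

```python
from collections import namedtuple

# Hypothetical stand-in for kafka-python's ConsumerRecord (subset of fields only)
Record = namedtuple('Record', ['topic', 'partition', 'offset', 'key', 'value'])


def next_offsets(records):
    """Map each (topic, partition) to the offset just past the highest one seen."""
    positions = {}
    for rec in records:
        tp = (rec.topic, rec.partition)
        # Kafka commits the *next* offset to consume, hence offset + 1
        positions[tp] = max(positions.get(tp, 0), rec.offset + 1)
    return positions


batch = [
    Record('foobar', 0, 5, None, b'a'),
    Record('foobar', 1, 3, None, b'b'),
    Record('foobar', 0, 6, None, b'c'),
]
print(next_offsets(batch))
```

Any object with topic, partition, and offset attributes works, so the same helper can be fed records straight from the consumer iterator.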

Keys and values returned by KafkaConsumer are raw bytes by default. Use key_deserializer and value_deserializer to decode them automatically. Helpers are available for simple UTF-8 string decoding (DefaultSerializer) and JSON (JsonSerializer).

# Deserialize json-encoded values
from kafka import KafkaConsumer, JsonSerializer
consumer = KafkaConsumer(value_deserializer=JsonSerializer())
consumer.subscribe(['json-foo'])
for msg in consumer:
    assert isinstance(msg.value, dict)

# Access record headers. The returned value is a list of
# (str, bytes) tuples, representing the header key and value.
for msg in consumer:
    print(msg.headers)

# Read only committed messages from transactional topic
from kafka import KafkaConsumer, IsolationLevel
consumer = KafkaConsumer(isolation_level=IsolationLevel.READ_COMMITTED)
consumer.subscribe(['txn_topic'])
for msg in consumer:
    print(msg)

# Get consumer metrics
metrics = consumer.metrics()
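If you prefer plain functions to the helper classes, the sketch below shows a JSON value decoder and a header-inspection helper. Both names are hypothetical. kafka-python 2.x documented value_deserializer as any callable that takes the raw bytes; we assume 3.0 still accepts a callable alongside JsonSerializer, so check the KafkaConsumer docs before relying on it. The None check guards against records with a null value (such as tombstones on compacted topics).

```python
import json


def json_value_deserializer(raw):
    """Decode a UTF-8 JSON message value; pass None (e.g. a tombstone) through."""
    if raw is None:
        return None
    return json.loads(raw.decode('utf-8'))


def headers_to_dict(headers):
    """Turn a list of (str, bytes) header tuples into a dict.

    Kafka allows repeated header keys; here the last occurrence wins.
    """
    return dict(headers or [])


# Assumed usage (not verified against the 3.0 API):
#   consumer = KafkaConsumer('json-foo', value_deserializer=json_value_deserializer)
print(json_value_deserializer(b'{"foo": "bar"}'))
print(headers_to_dict([('content-encoding', b'base64')]))
```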

KafkaProducer

KafkaProducer is a high-level, asynchronous message producer. The class is intended to operate as similarly as possible to the official Java client. See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html for more details.

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:1234')
for _ in range(100):
    # Fire-and-forget: send() is async and returns before delivery
    producer.send('foobar', b'some_message_bytes')
# To check the status of an async message delivery, use .get()
future = producer.send('foobar', b'another_message')
# future.get() will block until it can return the result or raise on error
result = future.get(timeout=60)
# Block until all pending messages are at least put on the network
# NOTE: This does not guarantee delivery or success! It is really
# only useful if you configure internal batching using linger_ms
producer.flush()

The producer hashes each message key to choose a partition, so all messages with the same key land on the same partition. Both keys and values should be raw bytes unless a serializer is configured.

# Use a key for hashed-partitioning
producer.send('foobar', key=b'foo', value=b'bar')

# Serialize json messages
from kafka import KafkaProducer, JsonSerializer
producer = KafkaProducer(value_serializer=JsonSerializer())
producer.send('fizzbuzz', {'foo': 'bar'})

# Serialize string keys
from kafka import KafkaProducer, DefaultSerializer
producer = KafkaProducer(key_serializer=DefaultSerializer())
producer.send('flipflap', key='ping', value=b'1234')
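The text above does not say which hash the default partitioner uses. As an illustration of key-hashed partitioning, the sketch below follows the scheme used by the Java client's default partitioner: a 32-bit murmur2 hash of the key bytes, with the sign bit cleared, modulo the partition count. It has not been checked bit-for-bit against kafka-python's own partitioner, so treat murmur2 and partition_for_key as hypothetical helpers, not a way to predict production placement. Records without a key are handled differently (see the Sticky Partitioner, KIP-480).

```python
def murmur2(data: bytes) -> int:
    """32-bit murmur2 hash (seed 0x9747b28c), returned as an unsigned int."""
    mask = 0xFFFFFFFF
    m, r = 0x5BD1E995, 24
    length = len(data)
    h = (0x9747B28C ^ length) & mask

    # Mix the input four bytes at a time (little-endian words)
    whole = length - length % 4
    for i in range(0, whole, 4):
        k = int.from_bytes(data[i:i + 4], 'little')
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k

    # Fold in the 1 to 3 trailing bytes, if any
    tail = data[whole:]
    if len(tail) == 3:
        h ^= tail[2] << 16
    if len(tail) >= 2:
        h ^= tail[1] << 8
    if len(tail) >= 1:
        h ^= tail[0]
        h = (h * m) & mask

    # Final avalanche step
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def partition_for_key(key: bytes, num_partitions: int) -> int:
    """Clear the sign bit, then reduce the hash modulo the partition count."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


# Same key, same partition, every time
print(partition_for_key(b'foo', 6), partition_for_key(b'foo', 6))
```

The property that matters is determinism: as long as the partition count does not change, every message keyed b'foo' goes to the same partition, which preserves per-key ordering.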

Compression can be used to reduce message size on the wire. Gzip is supported via python stdlib. For other compression types you must install optional dependencies.

# Compress messages
producer = KafkaProducer(compression_type='gzip')
for i in range(1000):
    producer.send('foobar', b'msg %d' % i)
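The producer compresses whole record batches rather than individual messages, so many small, similar messages compress well together. The stdlib-only sketch below approximates that effect by gzipping the concatenated payloads of the 1000 messages above. It is not Kafka's record batch encoding, just raw bytes, and the exact sizes will differ from what goes on the wire.

```python
import gzip

# Payloads matching the loop above, joined into one blob to mimic batching
messages = [b'msg %d' % i for i in range(1000)]
batch = b''.join(messages)

compressed = gzip.compress(batch)
print(len(batch), 'bytes raw ->', len(compressed), 'bytes gzipped')

# Decompression restores the exact original bytes
assert gzip.decompress(compressed) == batch
```

Larger batches (for example via a non-zero linger_ms) generally give the compressor more redundancy to exploit.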

KafkaProducer also supports transactions and message headers when needed.

# Use transactions
producer = KafkaProducer(transactional_id='fizzbuzz')
producer.init_transactions()
producer.begin_transaction()
future = producer.send('txn_topic', value=b'yes')
future.get() # wait for successful produce
producer.commit_transaction() # commit the transaction

producer.begin_transaction()
future = producer.send('txn_topic', value=b'no')
future.get() # wait for successful produce
producer.abort_transaction() # abort the transaction

# Include record headers: a list of (str key, bytes value) tuples
producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])

# Get producer performance metrics
metrics = producer.metrics()
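The transaction calls above can be wrapped so a failed send aborts instead of committing. This is a minimal sketch using only the methods shown above (begin_transaction, send, future.get, commit_transaction, abort_transaction); send_in_transaction is a hypothetical helper, and it assumes init_transactions() has already been called once on the producer. In the Java client some errors, such as a fenced producer, are fatal and the producer should be closed rather than aborted; kafka-python may behave similarly, so check the KafkaProducer docs.

```python
def send_in_transaction(producer, topic, values, timeout=60):
    """Send all values atomically: commit if every send succeeds, abort otherwise.

    Assumes producer.init_transactions() was already called.
    """
    producer.begin_transaction()
    try:
        for value in values:
            # .get() blocks until the send completes, raising on failure
            producer.send(topic, value=value).get(timeout=timeout)
    except Exception:
        producer.abort_transaction()
        raise
    producer.commit_transaction()


# Assumed usage with a real producer:
#   producer = KafkaProducer(transactional_id='fizzbuzz')
#   producer.init_transactions()
#   send_in_transaction(producer, 'txn_topic', [b'yes', b'also yes'])
```

Waiting on each future before committing mirrors the future.get() calls in the example above; it trades some latency for a clear all-or-nothing outcome.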

Module CLI Interface

kafka-python also provides simple command-line interfaces for consumer, producer, and admin clients. Access via python -m kafka.consumer, python -m kafka.producer, and python -m kafka.admin. See https://kafka-python.readthedocs.io/en/master/usage.html for more details.

Compression

kafka-python supports the following compression formats:

  • gzip (via stdlib)

  • LZ4 (via python-lz4, lz4tools, or py-lz4framed)

  • Snappy (via python-snappy)

  • Zstandard (via python-zstandard)

gzip is supported natively; the others require installing additional libraries. See https://kafka-python.readthedocs.io/en/master/install.html for more information.
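To see which of these codecs your environment can use before picking a compression_type, you can probe for the libraries with importlib. The module import names below (lz4, lz4tools, lz4framed, snappy, zstandard) are our assumption about how the listed packages are imported, and kafka-python's own detection logic may differ; available_codecs is a hypothetical helper.

```python
import importlib.util

# compression_type value -> candidate import names (assumed, see above)
CODEC_MODULES = {
    'gzip': ['gzip'],
    'lz4': ['lz4', 'lz4tools', 'lz4framed'],
    'snappy': ['snappy'],
    'zstd': ['zstandard'],
}


def available_codecs():
    """Return the compression types whose backing library is importable."""
    return sorted(
        codec
        for codec, modules in CODEC_MODULES.items()
        if any(importlib.util.find_spec(name) is not None for name in modules)
    )


print(available_codecs())  # always includes 'gzip' (stdlib)
```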

Optimized CRC32 Validation

Kafka uses CRC32 checksums to validate messages; the current record batch format uses the CRC-32C (Castagnoli) variant. kafka-python includes a pure-Python implementation for compatibility. To improve performance in high-throughput applications, kafka-python uses the crc32c package's optimized native code when it is installed. See https://kafka-python.readthedocs.io/en/master/install.html for installation instructions, and https://pypi.org/project/crc32c/ for details on the underlying crc32c library.
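To show why a native implementation helps, here is a minimal bitwise CRC-32C in pure Python. It is an illustration of the algorithm, not kafka-python's own implementation (which may be table-driven and faster): the inner loop does eight shift-and-xor steps per input byte, exactly the kind of work that is slow in the interpreter and fast in C. The function name crc32c here is our own and is unrelated to the crc32c package's API.

```python
CASTAGNOLI_POLY = 0x82F63B78  # bit-reflected form of the CRC-32C polynomial 0x1EDC6F41


def crc32c(data: bytes, crc: int = 0) -> int:
    """Bitwise CRC-32C. Pass a previous result as `crc` to checksum data in chunks."""
    crc ^= 0xFFFFFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            # Shift right one bit; if the bit shifted out was 1, xor in the polynomial
            if crc & 1:
                crc = (crc >> 1) ^ CASTAGNOLI_POLY
            else:
                crc >>= 1
    return crc ^ 0xFFFFFFFF


# Standard CRC-32C check value for the ASCII string "123456789"
print(hex(crc32c(b'123456789')))  # 0xe3069283
```

Because the initial and final inversions cancel, feeding a previous result back in as `crc` continues the checksum, so crc32c(b'6789', crc32c(b'12345')) equals crc32c(b'123456789').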

Protocol

A secondary goal of kafka-python is to provide an easy-to-use protocol layer for interacting with Kafka brokers from the Python REPL. This is useful for testing, probing, and general experimentation. In version 3.0 the protocol layer was rewritten to generate encoder/decoder classes from JSON message definitions imported directly from the Apache Kafka project source.
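For a sense of what those generated encoders produce, the hand-rolled sketch below packs a Kafka request header with struct: big-endian int16 api_key, int16 api_version, int32 correlation_id, and a nullable string client_id (int16 length, -1 for null), with the whole request prefixed by its int32 size. This is not kafka-python's protocol API; encode_request_header and frame are hypothetical helpers written to illustrate the wire layout.

```python
import struct


def encode_request_header(api_key, api_version, correlation_id, client_id):
    """Sketch of a request header (v1 layout): int16, int16, int32, nullable string."""
    header = struct.pack('>hhi', api_key, api_version, correlation_id)
    if client_id is None:
        header += struct.pack('>h', -1)  # nullable string: length -1 means null
    else:
        encoded = client_id.encode('utf-8')
        header += struct.pack('>h', len(encoded)) + encoded
    return header


def frame(payload):
    """Prefix a request with its int32 byte length, as the wire protocol requires."""
    return struct.pack('>i', len(payload)) + payload


# ApiVersions (api_key 18) v0 has an empty request body, so the header is the whole request
request = frame(encode_request_header(18, 0, 1, 'py'))
print(request)
```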

Debugging

Use python’s logging module to view internal operational events. See https://docs.python.org/3/howto/logging.html for overview / howto.

import logging
logging.basicConfig(level=logging.DEBUG)
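Setting the root logger to DEBUG also turns on debug output from every other library in the process. A narrower sketch keeps the root at WARNING and enables DEBUG only for the kafka logger namespace. It assumes kafka-python logs under loggers named after its modules (for example kafka.conn), as code using logging.getLogger(__name__) inside the kafka package would.

```python
import logging

# Root handler with a readable format; everything defaults to WARNING
logging.basicConfig(
    level=logging.WARNING,
    format='%(asctime)s %(name)s %(levelname)s %(message)s',
)

# Enable DEBUG for the 'kafka' namespace only (assumed logger naming, see above).
# Child loggers such as 'kafka.conn' inherit this level, and their records
# propagate to the root handler, which has no level filter of its own.
logging.getLogger('kafka').setLevel(logging.DEBUG)

logging.getLogger('kafka.conn').debug('visible: kafka.* is at DEBUG')
```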

Download files

Source Distribution

kafka_python-3.0.6.tar.gz (511.6 kB)

Built Distribution

kafka_python-3.0.6-py3-none-any.whl (613.9 kB)

File details

Details for the file kafka_python-3.0.6.tar.gz.

File metadata

  • Download URL: kafka_python-3.0.6.tar.gz
  • Size: 511.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv 0.9.30 (NixOS 25.11)

File hashes

Hashes for kafka_python-3.0.6.tar.gz
  Algorithm     Hash digest
  SHA256        4723f06e6989956cb4ee4e210442d5f7ec716acd59f961070e39a429c50d696e
  MD5           42158efd5ac54f1f996b21a04866fce0
  BLAKE2b-256   41f95a44953b12b44501969ffb143b5f62bac137c05073f1d3cc97f35786c1da

File details

Details for the file kafka_python-3.0.6-py3-none-any.whl.

File metadata

  • Download URL: kafka_python-3.0.6-py3-none-any.whl
  • Size: 613.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv 0.9.30 (NixOS 25.11)

File hashes

Hashes for kafka_python-3.0.6-py3-none-any.whl
  Algorithm     Hash digest
  SHA256        fdaabef28fe3a67fcd59d3637873505922c8f1b0e6f14fc3014bdd60ff615e44
  MD5           d18687e9662259c860f0dece109fa3b1
  BLAKE2b-256   9e3a7e5fd7b0f324d211da6c3267df7d823745cbb6189e38135d370e1944f5a6
