Pure Python client for Apache Kafka

kafka-python is a pure-Python client library for Apache Kafka, the distributed stream processing engine. It has no external dependencies and no Cython/C/Rust core, which keeps installation simple across a wide variety of environments. It provides high-level classes for consumer, producer, and admin clients, as well as CLI scripts for quick interactive tasks.

kafka-python admin serves as a simple alternative to the Apache Kafka bin/ scripts, particularly when you do not have easy access to an installed, compatible JVM. The CLI for admin commands is available as kafka-python admin and python -m kafka.admin.

Users who need more raw throughput can pip install crc32c as an optional dependency, offloading one of the most CPU-intensive subsystems to an optimized C library.

pip install kafka-python

# callable as module or as cli-script
kafka-python admin -b localhost:9092 cluster describe

# Create a topic with the admin cli
python -m kafka.admin -b localhost:9092 topics create -t foo-topic

# Produce messages
echo "foo message" | python -m kafka.producer -b localhost:9092 -t foo-topic

# Consume messages
python -m kafka.consumer -b localhost:9092 -C auto_offset_reset=earliest -C consumer_timeout_ms=1000 -g foo-group -t foo-topic

What’s New in 3.0

  • Protocol stack dynamically generated from Apache Kafka JSON message schemas.

  • Encode/decode performance optimizations with compiled/cached Python bytecode.

  • Expanded KIP feature support, including Cooperative Rebalance (KIP-429), Rack-aware Fetch (KIP-392), Log-Truncation detection (KIP-320), Transactional Producer improvements (KIP-360, KIP-447, KIP-654), Sticky Partitioner (KIP-480), and splitting oversized producer batches (KIP-126).

  • Full refactor and expansion of KafkaAdminClient.

  • Networking changes to leverage the kafka.net event loop and async/await syntax.

  • Python 3.8+ required.

KafkaConsumer

KafkaConsumer is a high-level message consumer, intended to operate as similarly as possible to the official Java client. See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html for API and configuration details.

The consumer iterator returns ConsumerRecords, which are simple namedtuples that expose basic message attributes: topic, partition, offset, key, and value:

from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic')
for msg in consumer:
    print(msg)

# join a consumer group for dynamic partition assignment and offset commits
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
for msg in consumer:
    print(msg)

# manually assign the partition list for the consumer
from kafka import KafkaConsumer, TopicPartition
consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
consumer.assign([TopicPartition('foobar', 2)])
msg = next(consumer)
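
To make the record shape concrete without a running broker, here is a minimal sketch using a hypothetical stand-in type. Record is an illustrative name, not the library's class, and it covers only the five attributes listed above; the library's record type also carries further fields such as headers.

```python
from collections import namedtuple

# Hypothetical stand-in for the library's record type, limited to the
# five attributes named above (topic, partition, offset, key, value).
Record = namedtuple("Record", ["topic", "partition", "offset", "key", "value"])

msg = Record(topic="my_favorite_topic", partition=0, offset=42,
             key=None, value=b"hello")

# Fields are readable by attribute name...
print("%s[%d] @ %d: key=%r value=%r"
      % (msg.topic, msg.partition, msg.offset, msg.key, msg.value))

# ...and, because a namedtuple is a tuple, by positional unpacking.
topic, partition, offset, key, value = msg
```

Attribute access is usually the safer choice in application code, since it does not depend on field order.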

Keys and values returned by KafkaConsumer are raw bytes by default. Use a value_deserializer to decode them automatically. Helpers are available for simple UTF-8 string decoding (DefaultSerializer) and JSON (JsonSerializer).

# Deserialize json-encoded values
from kafka import KafkaConsumer, JsonSerializer
consumer = KafkaConsumer(value_deserializer=JsonSerializer())
consumer.subscribe(['json-foo'])
for msg in consumer:
    assert isinstance(msg.value, dict)

# Access record headers. The returned value is a list of
# (str, bytes) tuples, representing the header key and value.
for msg in consumer:
    print(msg.headers)

# Read only committed messages from transactional topic
from kafka import KafkaConsumer, IsolationLevel
consumer = KafkaConsumer(isolation_level=IsolationLevel.READ_COMMITTED)
consumer.subscribe(['txn_topic'])
for msg in consumer:
    print(msg)

# Get consumer metrics
metrics = consumer.metrics()
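
Conceptually, a deserializer is a function from raw bytes to a Python object. The sketch below shows a hand-written one for JSON. It assumes that value_deserializer also accepts a plain callable taking the raw bytes, as earlier kafka-python releases did; decode_json is a hypothetical helper name, not part of the library.

```python
import json

def decode_json(raw):
    """Decode UTF-8 JSON bytes into a Python object.

    None is passed through unchanged, since a record may carry no value.
    """
    if raw is None:
        return None
    return json.loads(raw.decode("utf-8"))

# Assumed usage (needs a broker, so it is not executed here):
#   consumer = KafkaConsumer("json-foo", value_deserializer=decode_json)
print(decode_json(b'{"foo": "bar"}'))
```

Writing the function by hand is mainly useful when the decoding needs custom behaviour, such as tolerating malformed payloads.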

KafkaProducer

KafkaProducer is a high-level, asynchronous message producer. The class is intended to operate as similarly as possible to the official Java client. See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html for more details.

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:1234')
for _ in range(100):
    # Fire-and-forget: send() is async and returns before delivery
    producer.send('foobar', b'some_message_bytes')

# To check the status of an async message delivery, use .get()
future = producer.send('foobar', b'another_message')
# future.get() will block until it can return the result or raise on error
result = future.get(timeout=60)

# Block until all pending messages are at least put on the network
# NOTE: This does not guarantee delivery or success! It is really
# only useful if you configure internal batching using linger_ms
producer.flush()
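
Between fire-and-forget and a blocking future.get() there is a middle ground: attach callbacks to the future and let them run when the result arrives. The sketch below assumes the future returned by send() exposes add_callback and add_errback, and that the success value has topic, partition, and offset attributes, as in earlier kafka-python releases. The function names are hypothetical, and the producer is passed in so the snippet does not need a broker to load.

```python
import logging

log = logging.getLogger(__name__)

def on_send_success(record_metadata):
    # Assumption: the metadata object exposes topic, partition and offset.
    log.info("delivered to %s[%d] @ %d", record_metadata.topic,
             record_metadata.partition, record_metadata.offset)

def on_send_error(exc):
    log.error("delivery failed: %r", exc)

def send_with_callbacks(producer, topic, value):
    """Send without blocking and report the outcome through callbacks."""
    future = producer.send(topic, value)
    # Assumption: add_callback / add_errback exist on the returned future.
    future.add_callback(on_send_success)
    future.add_errback(on_send_error)
    return future
```

Callbacks typically run on the producer's internal thread, so they should stay short and avoid blocking.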

Message keys determine partitioning: messages with the same key are hashed to the same partition. Both keys and values should be raw bytes unless a serializer is configured.

# Use a key for hashed-partitioning
producer.send('foobar', key=b'foo', value=b'bar')

# Serialize json messages
from kafka import KafkaProducer, JsonSerializer
producer = KafkaProducer(value_serializer=JsonSerializer())
producer.send('fizzbuzz', {'foo': 'bar'})

# Serialize string keys
from kafka import KafkaProducer, DefaultSerializer
producer = KafkaProducer(key_serializer=DefaultSerializer())
producer.send('flipflap', key='ping', value=b'1234')
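
The idea behind hashed partitioning can be sketched in a few lines: hash the key bytes and take the result modulo the partition count. This is an illustration only. zlib.crc32 is a stand-in hash chosen for the sketch, pick_partition is a hypothetical name, and the library's actual hash function may differ, so the partition numbers printed here will not match what a real producer picks.

```python
import zlib

def pick_partition(key, num_partitions):
    """Map key bytes to a partition index in [0, num_partitions)."""
    # Stand-in hash for illustration; not the library's partitioner.
    return zlib.crc32(key) % num_partitions

# Equal keys always land on the same partition.
for key in (b"foo", b"foo", b"bar"):
    print(key, "->", pick_partition(key, 6))
```

Because the result depends on the partition count, adding partitions to a topic generally changes where existing keys are routed.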

Compression can be used to reduce message size on the wire. Gzip is supported via python stdlib. For other compression types you must install optional dependencies.

# Compress messages
from kafka import KafkaProducer
producer = KafkaProducer(compression_type='gzip')
for i in range(1000):
    producer.send('foobar', b'msg %d' % i)
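
To get a feel for the potential savings, the sketch below compresses a payload resembling the messages above with the stdlib gzip module. It is a rough stand-in for what happens on the wire, not a measurement of the producer: Kafka producers generally compress whole batches, so the actual ratio depends on batch size and message content.

```python
import gzip

# 1000 short, repetitive messages joined into one buffer, roughly
# approximating a batch of the messages sent above.
payload = b"".join(b"msg %d\n" % i for i in range(1000))
compressed = gzip.compress(payload)

print(len(payload), "bytes raw ->", len(compressed), "bytes gzipped")

# Compression is lossless: the round trip restores the original bytes.
restored = gzip.decompress(compressed)
```

Very small or already-compressed payloads can come out larger after gzip, so it is worth checking with representative data.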

KafkaProducer also supports transactions and message headers when needed.

# Use transactions
producer = KafkaProducer(transactional_id='fizzbuzz')
producer.init_transactions()
producer.begin_transaction()
future = producer.send('txn_topic', value=b'yes')
future.get()  # wait for successful produce
producer.commit_transaction()  # commit the transaction

producer.begin_transaction()
future = producer.send('txn_topic', value=b'no')
future.get()  # wait for successful produce
producer.abort_transaction()  # abort the transaction

# Include record headers. The format is list of tuples with string key
# and bytes value.
producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])

# Get producer performance metrics
metrics = producer.metrics()
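
In application code the commit and abort paths are usually combined: commit when every send succeeds, abort otherwise. The sketch below wraps the calls shown above in a hypothetical helper, send_in_transaction, which is not part of the library. It assumes init_transactions() has already been called once on the producer, and it treats every exception the same way; some failures (for example a fenced producer) may not be recoverable by aborting.

```python
def send_in_transaction(producer, topic, values, timeout=60):
    """Send all values in one transaction; abort and re-raise on failure."""
    producer.begin_transaction()
    try:
        futures = [producer.send(topic, value=v) for v in values]
        for future in futures:
            # get() blocks until the send completes and raises if it failed.
            future.get(timeout=timeout)
        producer.commit_transaction()
    except Exception:
        # Simplification: every error leads to an abort.
        producer.abort_transaction()
        raise
```

A consumer using the READ_COMMITTED isolation level would then see either all of the values or none of them.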

Module CLI Interface

kafka-python also provides simple command-line interfaces for consumer, producer, and admin clients. Access via python -m kafka.consumer, python -m kafka.producer, and python -m kafka.admin. See https://kafka-python.readthedocs.io/en/master/usage.html for more details.

Compression

kafka-python supports the following compression formats:

  • gzip (via stdlib)

  • LZ4 (via python-lz4, lz4tools, or py-lz4framed)

  • Snappy (via python-snappy)

  • Zstandard (via python-zstandard)

gzip is supported natively; the others require installing additional libraries. See https://kafka-python.readthedocs.io/en/master/install.html for more information.

Optimized CRC32 Validation

Kafka uses CRC32 checksums to validate messages. kafka-python includes a pure-Python implementation for compatibility. To improve performance for high-throughput applications, kafka-python uses the optimized native code in crc32c when it is installed. See https://kafka-python.readthedocs.io/en/master/install.html for installation instructions. See https://pypi.org/project/crc32c/ for details on the underlying crc32c lib.

Protocol

A secondary goal of kafka-python is to provide an easy-to-use protocol layer for interacting with Kafka brokers via the Python REPL. This is useful for testing, probing, and general experimentation. In version 3.0 the protocol layer was rewritten to generate encoder/decoder classes using JSON message definitions imported directly from the Apache Kafka project source.

Debugging

Use Python’s logging module to view internal operational events. See https://docs.python.org/3/howto/logging.html for an overview and how-to.

import logging
logging.basicConfig(level=logging.DEBUG)
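
Setting the root logger to DEBUG also turns on debug output from every other library in the process. A narrower variant is sketched below. It assumes kafka-python's loggers are named after its module paths and therefore sit under the "kafka" namespace; the format string is only an example.

```python
import logging

# Keep the root logger at INFO so unrelated libraries stay quiet.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)

# Assumption: the library's loggers live under the "kafka" namespace,
# so raising this one logger to DEBUG covers its submodules as well.
logging.getLogger("kafka").setLevel(logging.DEBUG)
```

Child loggers inherit the level of their nearest configured ancestor, which is why a single setLevel call is enough.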
