CLI tool for managing Apache Kafka clusters
Kafka CLI (kafka)
CLI tool for managing Apache Kafka clusters — topics, consumer groups, messages, Schema Registry, ACLs, and multi-cluster configurations.
Installation
Requires Python 3.10+.
pip install kafka-cli
Or with pipx (recommended for CLI tools):
pipx install kafka-cli
With optional serialization support:
pipx install "kafka-cli[avro]" # Avro support
pipx install "kafka-cli[protobuf]" # Protobuf support
pipx install "kafka-cli[all]" # All formats
Verify:
kafka --help
From Source
git clone <repo-url>
cd kafka-cli
poetry install
Quick Start
# 1. Initialize — set up your first org & cluster
kafka init
# 2. Check cluster health
kafka cluster info
# 3. List topics
kafka topic list
# 4. Create a topic
kafka topic create my-topic --partitions 6 --replication 3
# 5. Produce a message
kafka produce my-topic --message "hello world"
# 6. Consume messages
kafka consume my-topic --from-beginning --limit 10
Commands
kafka init
Interactive setup wizard. Creates ~/.kafka-cli/config.yaml if it does not exist, or adds organizations and clusters to it.
kafka init
The first run prompts for an organization ID and name, a cluster name, bootstrap servers, authentication settings, and an optional Schema Registry URL.
kafka config
Manage configurations and switch clusters.
kafka config list # List all orgs & clusters
kafka config current # Show current active config
kafka config use # Switch cluster (interactive)
kafka config use acme-corp prod # Switch cluster (direct)
kafka config test # Test current connection
kafka config delete acme-corp staging --yes # Remove a cluster config
Config file: ~/.kafka-cli/config.yaml
Example config structure:
organizations:
  acme-corp:
    name: Acme Corporation
    clusters:
      dev:
        bootstrap_servers: localhost:9092
        auth:
          mechanism: none
        schema_registry:
          url: http://localhost:8081
      prod:
        bootstrap_servers: kafka-prod-1:9092,kafka-prod-2:9092
        auth:
          mechanism: sasl_scram_256
          username: ${KAFKA_USER}
          password: ${KAFKA_PASSWORD}
          security_protocol: SASL_SSL
default:
  organization: acme-corp
  cluster: dev
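The `${KAFKA_USER}` and `${KAFKA_PASSWORD}` values in the example above look like environment-variable placeholders. How the CLI itself resolves them is not described here. As a minimal sketch of that kind of substitution, assuming `${NAME}` syntax, Python's `string.Template` can expand placeholders across a nested config dictionary. `expand_placeholders` is a hypothetical helper, not part of kafka-cli.

```python
import os
from string import Template


def expand_placeholders(value, env=None):
    """Recursively replace ${NAME} placeholders in strings with values from env."""
    env = os.environ if env is None else env
    if isinstance(value, dict):
        return {k: expand_placeholders(v, env) for k, v in value.items()}
    if isinstance(value, list):
        return [expand_placeholders(v, env) for v in value]
    if isinstance(value, str):
        # safe_substitute leaves unknown placeholders untouched instead of raising
        return Template(value).safe_substitute(env)
    return value


# Mirrors the auth block of the prod cluster in the example config
auth = {
    "mechanism": "sasl_scram_256",
    "username": "${KAFKA_USER}",
    "password": "${KAFKA_PASSWORD}",
}
resolved = expand_placeholders(auth, env={"KAFKA_USER": "svc-orders"})
print(resolved["username"])  # svc-orders
print(resolved["password"])  # ${KAFKA_PASSWORD} (not set, left as-is)
```

Keeping credentials in the environment rather than in the YAML file means the config can be shared or committed without exposing secrets.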
kafka cluster
Cluster information and health.
kafka cluster info # Cluster metadata
kafka cluster health # Broker health check
kafka cluster brokers # List brokers
kafka cluster topics-summary # Topic/partition overview
kafka topic
Manage Kafka topics.
kafka topic list # List all topics
kafka topic list --filter "order*" # Glob filter
kafka topic create my-topic -p 6 -r 3 # Create topic
kafka topic describe my-topic # Full topic detail
kafka topic alter my-topic --config retention.ms=604800000
kafka topic delete my-topic # Delete topic
kafka topic consume-offsets my-topic # Consumer group lag per topic
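The `retention.ms` value in the `alter` example is expressed in milliseconds, and 604800000 ms is seven days. A small sketch for deriving such values from a readable duration, using only the standard library (`retention_ms` is an illustrative helper name, not part of the CLI):

```python
from datetime import timedelta


def retention_ms(**duration) -> int:
    """Convert a duration (days=, hours=, minutes=, ...) to whole milliseconds."""
    return timedelta(**duration) // timedelta(milliseconds=1)


print(retention_ms(days=7))    # 604800000
print(retention_ms(hours=12))  # 43200000
```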
Global flags, accepted by all cluster commands, select the organization and cluster to run against:
kafka topic list --org acme-corp --cluster prod
kafka group
Manage consumer groups.
kafka group list # List all groups
kafka group list --state stable # Filter by state
kafka group describe my-group # Group detail + offsets
kafka group delete my-group # Delete group
kafka group reset-offsets my-group \
--topic my-topic --to-earliest --execute # Reset offsets
kafka produce
Send messages to a topic.
kafka produce my-topic # Interactive mode
kafka produce my-topic -m "hello world" # Single message
kafka produce my-topic -k "key1" -m '{"data": 1}' # With key
kafka produce my-topic --file data.jsonl # Bulk from file
kafka produce my-topic --file data.jsonl --key-field "id" --rate 100
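The bulk examples above read `data.jsonl` and take `--key-field "id"`. The exact file format the CLI expects is not specified here. Assuming the conventional JSON Lines layout (one JSON object per line) and that `--key-field` names a top-level field of each object, an input file could be generated along these lines. The `amount` field and the record values are made up for illustration.

```python
import json
import tempfile
from pathlib import Path


def write_jsonl(path, records):
    """Write one compact JSON object per line (JSON Lines)."""
    with Path(path).open("w", encoding="utf-8") as fh:
        for record in records:
            fh.write(json.dumps(record, separators=(",", ":")) + "\n")


# Hypothetical order events; "id" is the field intended as the message key
records = [{"id": f"order-{n}", "amount": n * 10} for n in range(1, 4)]

# A temporary directory keeps the demo from touching the working directory
path = Path(tempfile.mkdtemp()) / "data.jsonl"
write_jsonl(path, records)

print(path.read_text(encoding="utf-8"), end="")
```

The resulting file can then be passed to `kafka produce my-topic --file data.jsonl --key-field "id"`.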
kafka consume
Read messages from a topic.
kafka consume my-topic # Live tail
kafka consume my-topic --from-beginning --limit 10 # First 10 messages
kafka consume my-topic --group my-reader # With consumer group
kafka consume my-topic --follow # Continuous (tail -f)
kafka consume my-topic --output json # JSON for piping
kafka consume my-topic --output raw # Values only
kafka consume my-topic --schema-registry # Auto-deserialize
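`--output json` is described above as "JSON for piping". The shape of that output is not documented here. Assuming it is one JSON object per line, a downstream script might parse it as sketched below. `read_records` is a hypothetical helper and makes no assumption about field names.

```python
import io
import json


def read_records(stream):
    """Yield parsed objects from newline-delimited JSON, skipping blank lines."""
    for line in stream:
        line = line.strip()
        if line:
            yield json.loads(line)


# Stand-in for piped input; in a real pipeline pass sys.stdin instead
sample = io.StringIO('{"n": 1}\n\n{"n": 2}\n')
records = list(read_records(sample))
print(len(records))  # 2
```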
kafka schema
Schema Registry management (Avro, JSON Schema, Protobuf).
kafka schema list # List subjects
kafka schema get my-topic-value # Get latest schema
kafka schema get my-topic-value --version 3 # Specific version
kafka schema versions my-topic-value # List versions
kafka schema create my-topic-value --file schema.avsc # Register schema
kafka schema test my-topic-value --file schema.avsc # Test compatibility
kafka schema delete my-topic-value # Soft delete
kafka schema config --level BACKWARD # Set compatibility
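`kafka schema create` and `kafka schema test` take a schema file such as `schema.avsc`. An Avro schema file is a JSON document, so one can be written with the standard library. The sketch below uses a hypothetical `Order` record: the name, namespace, and fields are illustrative only. The second version adds an optional field with a default, which is the kind of change BACKWARD compatibility generally permits.

```python
import json
import tempfile
from pathlib import Path

# Hypothetical record; the name, namespace, and fields are illustrative only
schema_v1 = {
    "type": "record",
    "name": "Order",
    "namespace": "com.example",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "amount", "type": "double"},
    ],
}

# v2 adds an optional field; in a union with a null default, "null" comes first
schema_v2 = {
    **schema_v1,
    "fields": schema_v1["fields"] + [
        {"name": "currency", "type": ["null", "string"], "default": None},
    ],
}

# A temporary directory keeps the demo from touching the working directory
path = Path(tempfile.mkdtemp()) / "schema.avsc"
path.write_text(json.dumps(schema_v2, indent=2), encoding="utf-8")
print(path.read_text(encoding="utf-8"))
```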
kafka acl
ACL (Access Control List) management.
kafka acl list # List all ACLs
kafka acl list --principal "User:alice" # Filter by user
kafka acl create --principal "User:alice" \
--topic my-topic --operation read --permission allow
kafka acl delete --principal "User:alice" \
--topic my-topic --operation read --yes
kafka history
View CLI command history.
kafka history # Show recent commands (default: 50)
kafka history -n 10 # Last 10
kafka history --clear # Clear history
Multi-Cluster Workflow
# Set up dev cluster
kafka init
# Org: acme-corp, Cluster: dev, Bootstrap: localhost:9092
# Add prod cluster
kafka init
# Select: acme-corp, Cluster: prod, Bootstrap: kafka-prod-1:9092,...
# Switch between clusters
kafka config use acme-corp dev
kafka config use acme-corp prod
# Run against specific cluster without switching
kafka topic list --org acme-corp --cluster prod
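Because `--org` and `--cluster` select a cluster per invocation, the same command can be scripted across clusters without switching the active config. The sketch below builds the `kafka topic list` command for each cluster and runs it only when asked to and when a `kafka` executable is on the PATH. `topic_list_cmd` and `run_on_clusters` are hypothetical helpers, and the org and cluster names come from the example above.

```python
import shutil
import subprocess


def topic_list_cmd(org, cluster):
    """Build the argv for listing topics on one cluster, using the flags shown above."""
    return ["kafka", "topic", "list", "--org", org, "--cluster", cluster]


def run_on_clusters(org, clusters, dry_run=True):
    """Return (cluster, output) pairs; in dry-run mode the output is the command line."""
    results = []
    for cluster in clusters:
        cmd = topic_list_cmd(org, cluster)
        if dry_run or shutil.which("kafka") is None:
            results.append((cluster, " ".join(cmd)))
        else:
            done = subprocess.run(cmd, capture_output=True, text=True, check=False)
            results.append((cluster, done.stdout))
    return results


for cluster, output in run_on_clusters("acme-corp", ["dev", "prod"]):
    print(f"[{cluster}] {output}")
```

Passing `dry_run=False` executes the commands and collects their standard output instead of printing the command lines.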
Development
# Install with dev dependencies
poetry install
# Run tests
pytest
# Run tests with coverage
pytest --cov=kafka_cli --cov-report=html
# Lint
ruff check src/
ruff format src/
# Type check
mypy src/
Project Structure
src/kafka_cli/
    main.py                 # Typer app entry point
    commands/
        init.py             # kafka init
        config.py           # kafka config [list|use|current|delete|test]
        cluster.py          # kafka cluster [info|health|brokers|topics-summary]
        topic.py            # kafka topic [list|create|describe|alter|delete]
        group.py            # kafka group [list|describe|delete|reset-offsets]
        produce.py          # kafka produce
        consume.py          # kafka consume
        schema.py           # kafka schema [list|get|create|delete|test|versions|config]
        acl.py              # kafka acl [list|create|delete]
        history.py          # kafka history
    core/
        config.py           # Config load/save (~/.kafka-cli/)
        context.py          # Current org/cluster context
        client.py           # KafkaClientFactory (confluent-kafka wrapper)
        admin_client.py     # Admin operations (topics, groups, ACLs)
        producer.py         # Producer wrapper
        consumer.py         # Consumer wrapper
        schema_client.py    # Schema Registry HTTP client
        history.py          # Command history tracking
    models/                 # Pydantic schemas
    serializers/            # Message serialization (string, JSON, Avro, Protobuf)
    ui/
        console.py          # Rich console helpers
        prompts.py          # Questionary wrappers
        tables.py           # Rich table builders
    utils/
        validators.py       # Input validators
        clipboard.py        # Copy to clipboard
        formatters.py       # Byte/timestamp formatting
Documentation
- Project Structure — Architecture & module details
- Command Reference — All commands, flags, output examples
- Configuration — Config schema & multi-cluster setup
- Authentication — SASL, SSL, mTLS, Confluent Cloud
- Schema Registry — Avro, JSON Schema, Protobuf workflows
- Examples — Real-world usage examples
License
GPL-3.0-or-later