CLI tool for managing Apache Kafka clusters

Kafka CLI (kafka)

CLI tool for managing Apache Kafka clusters — topics, consumer groups, messages, Schema Registry, ACLs, and multi-cluster configurations.

Installation

Requires Python 3.10+.

pip install kafka-cli

Or with pipx (recommended for CLI tools):

pipx install kafka-cli

With optional serialization support:

pipx install "kafka-cli[avro]"       # Avro support
pipx install "kafka-cli[protobuf]"   # Protobuf support
pipx install "kafka-cli[all]"        # All formats

Verify:

kafka --help

From Source

git clone <repo-url>
cd kafka-cli
poetry install

Quick Start

# 1. Initialize — set up your first org & cluster
kafka init

# 2. Check cluster health
kafka cluster info

# 3. List topics
kafka topic list

# 4. Create a topic
kafka topic create my-topic --partitions 6 --replication 3

# 5. Produce a message
kafka produce my-topic --message "hello world"

# 6. Consume messages
kafka consume my-topic --from-beginning --limit 10

Commands

kafka init

Interactive setup wizard. Creates or adds organizations and clusters to ~/.kafka-cli/config.yaml.

kafka init

On the first run, the wizard prompts for an organization ID and name, a cluster name, bootstrap servers, authentication settings, and an optional Schema Registry URL. Later runs can add a cluster to an existing organization.


kafka config

Manage configurations and switch clusters.

kafka config list                              # List all orgs & clusters
kafka config current                           # Show current active config
kafka config use                               # Switch cluster (interactive)
kafka config use acme-corp prod                # Switch cluster (direct)
kafka config test                              # Test current connection
kafka config delete acme-corp staging --yes    # Remove a cluster config

Config file: ~/.kafka-cli/config.yaml

Example config structure:

organizations:
  acme-corp:
    name: Acme Corporation
    clusters:
      dev:
        bootstrap_servers: localhost:9092
        auth:
          mechanism: none
        schema_registry:
          url: http://localhost:8081
      prod:
        bootstrap_servers: kafka-prod-1:9092,kafka-prod-2:9092
        auth:
          mechanism: sasl_scram_256
          username: ${KAFKA_USER}
          password: ${KAFKA_PASSWORD}
          security_protocol: SASL_SSL
default:
  organization: acme-corp
  cluster: dev
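
The `${KAFKA_USER}` and `${KAFKA_PASSWORD}` placeholders keep credentials out of the config file. How kafka-cli resolves them is not described here; the sketch below assumes they come from environment variables when the config is loaded. `expand_env` is a hypothetical helper, not part of the tool, and it works on an already-parsed config dictionary.

```python
import os
from string import Template


def expand_env(node, env=None):
    """Recursively replace ${VAR} placeholders in a parsed config tree.

    Hypothetical helper: kafka-cli's real substitution logic may differ.
    """
    env = os.environ if env is None else env
    if isinstance(node, dict):
        return {key: expand_env(value, env) for key, value in node.items()}
    if isinstance(node, list):
        return [expand_env(item, env) for item in node]
    if isinstance(node, str):
        # substitute() raises KeyError when a variable is unset, so a
        # missing credential fails loudly instead of being sent as "${...}".
        return Template(node).substitute(env)
    return node


auth = {
    "mechanism": "sasl_scram_256",
    "username": "${KAFKA_USER}",
    "password": "${KAFKA_PASSWORD}",
    "security_protocol": "SASL_SSL",
}
resolved = expand_env(auth, {"KAFKA_USER": "alice", "KAFKA_PASSWORD": "s3cret"})
print(resolved["username"])
```

Failing on an unset variable is a design choice in this sketch; a literal `$` in a value would need to be written as `$$` with `string.Template`.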

kafka cluster

Cluster information and health.

kafka cluster info                             # Cluster metadata
kafka cluster health                           # Broker health check
kafka cluster brokers                          # List brokers
kafka cluster topics-summary                   # Topic/partition overview

kafka topic

Manage Kafka topics.

kafka topic list                               # List all topics
kafka topic list --filter "order*"             # Glob filter
kafka topic create my-topic -p 6 -r 3          # Create topic
kafka topic describe my-topic                  # Full topic detail
kafka topic alter my-topic --config retention.ms=604800000  # Set retention to 7 days
kafka topic delete my-topic                    # Delete topic
kafka topic consume-offsets my-topic           # Consumer group lag per topic
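
Two details in the commands above are easy to get wrong by hand: what a glob such as "order*" matches, and how a retention period translates to `retention.ms`. The sketch below illustrates both with the Python standard library. The helper names are hypothetical, and whether kafka-cli matches globs exactly this way (for example, case-sensitively) is an assumption.

```python
from datetime import timedelta
from fnmatch import fnmatchcase


def filter_topics(topics, pattern):
    """Return the topic names matching a shell-style glob such as "order*"."""
    return [name for name in topics if fnmatchcase(name, pattern)]


def retention_ms(**duration):
    """Convert a duration (days=7, hours=12, ...) to a retention.ms value."""
    # Dividing one timedelta by another yields an exact integer count.
    return timedelta(**duration) // timedelta(milliseconds=1)


print(filter_topics(["orders", "order-events", "payments"], "order*"))
print(retention_ms(days=7))
```

Seven days is 7 × 24 × 3600 × 1000 = 604800000 ms, which is the value used with `kafka topic alter` above.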

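Global flags, accepted by every command that talks to a cluster, select an organization and cluster without switching the active configuration: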

kafka topic list --org acme-corp --cluster prod

kafka group

Manage consumer groups.

kafka group list                               # List all groups
kafka group list --state stable                # Filter by state
kafka group describe my-group                  # Group detail + offsets
kafka group delete my-group                    # Delete group
kafka group reset-offsets my-group \
  --topic my-topic --to-earliest --execute     # Reset offsets
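
Consumer lag, and the effect of resetting offsets, both follow from one relation: lag on a partition is the log end offset minus the group's committed offset. The sketch below shows that arithmetic on plain dictionaries. `partition_lag` is a hypothetical helper and does not reflect how kafka-cli fetches or displays offsets.

```python
def partition_lag(end_offsets, committed_offsets):
    """Return lag per partition: log end offset minus committed offset."""
    lag = {}
    for partition, end in end_offsets.items():
        committed = committed_offsets.get(partition)
        # A partition the group has never committed to has no position yet;
        # report None rather than guessing a lag value.
        lag[partition] = None if committed is None else max(end - committed, 0)
    return lag


end_offsets = {0: 100, 1: 50}
print(partition_lag(end_offsets, {0: 90}))

# After a reset to earliest, the committed offset equals the earliest
# retained offset, so lag becomes the number of retained messages.
earliest = {0: 20, 1: 0}
print(partition_lag(end_offsets, earliest))
```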

kafka produce

Send messages to a topic.

kafka produce my-topic                                # Interactive mode
kafka produce my-topic -m "hello world"               # Single message
kafka produce my-topic -k "key1" -m '{"data": 1}'    # With key
kafka produce my-topic --file data.jsonl              # Bulk from file
kafka produce my-topic --file data.jsonl --key-field "id" --rate 100
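
Bulk production from a JSON Lines file can be pictured as two steps: derive a key from each record, then pace the sends. The sketch below assumes `--key-field` names a top-level JSON field and `--rate` means messages per second; neither is confirmed by this README. Both function names are hypothetical, and no Kafka client is involved.

```python
import json
import time


def records_from_jsonl(lines, key_field=None):
    """Yield (key, value) pairs from JSON Lines input, skipping blank lines."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        key = None
        if key_field is not None:
            # Parse only to extract the key; the original line is the value.
            # A record without the field raises KeyError.
            key = str(json.loads(line)[key_field])
        yield key, line


def throttle(items, rate, sleep=time.sleep, clock=time.monotonic):
    """Yield items no faster than `rate` per second."""
    interval = 1.0 / rate
    next_slot = clock()
    for item in items:
        delay = next_slot - clock()
        if delay > 0:
            sleep(delay)
        next_slot = max(next_slot, clock()) + interval
        yield item


lines = ['{"id": 1, "data": "a"}', "", '{"id": 2, "data": "b"}']
for key, value in throttle(records_from_jsonl(lines, key_field="id"), rate=100):
    print(key, value)
```

Passing `sleep` and `clock` as parameters keeps the pacing logic testable without real delays.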

kafka consume

Read messages from a topic.

kafka consume my-topic                                # Live tail
kafka consume my-topic --from-beginning --limit 10    # First 10 messages
kafka consume my-topic --group my-reader              # With consumer group
kafka consume my-topic --follow                       # Continuous (tail -f)
kafka consume my-topic --output json                  # JSON for piping
kafka consume my-topic --output raw                   # Values only
kafka consume my-topic --schema-registry              # Auto-deserialize
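
Auto-deserialization depends on finding the schema for each message. Confluent's Schema Registry serializers frame every message with a magic byte (0) followed by a 4-byte big-endian schema ID, and the sketch below parses that header. Whether kafka-cli relies on this exact framing is an assumption, and `split_confluent_frame` is a hypothetical name.

```python
import struct

MAGIC_BYTE = 0


def split_confluent_frame(raw: bytes):
    """Return (schema_id, payload) for a message in Confluent wire format."""
    if len(raw) < 5 or raw[0] != MAGIC_BYTE:
        raise ValueError("not a Schema Registry framed message")
    # Bytes 1..4 hold the schema ID as an unsigned big-endian integer.
    (schema_id,) = struct.unpack(">I", raw[1:5])
    return schema_id, raw[5:]


frame = b"\x00" + struct.pack(">I", 42) + b"payload"
print(split_confluent_frame(frame))
```

The schema ID is then looked up in the registry, and the remaining bytes are decoded with that schema. For Protobuf, the payload carries additional message-index bytes before the encoded message.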

kafka schema

Schema Registry management (Avro, JSON Schema, Protobuf).

kafka schema list                                     # List subjects
kafka schema get my-topic-value                       # Get latest schema
kafka schema get my-topic-value --version 3           # Specific version
kafka schema versions my-topic-value                  # List versions
kafka schema create my-topic-value --file schema.avsc # Register schema
kafka schema test my-topic-value --file schema.avsc   # Test compatibility
kafka schema delete my-topic-value                    # Soft delete
kafka schema config --level BACKWARD                  # Set compatibility
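
Under BACKWARD compatibility, a consumer using the new schema must be able to read data written with the previous one. For Avro records this roughly means that fields may be removed, while added fields need a default. The sketch below checks only that rule. It is a deliberately simplified illustration, not the Schema Registry's algorithm, and `backward_problems` is a hypothetical helper.

```python
def backward_problems(old_schema: dict, new_schema: dict) -> list[str]:
    """List reasons the new record schema cannot read old data."""
    old_fields = {f["name"]: f for f in old_schema.get("fields", [])}
    problems = []
    for field in new_schema.get("fields", []):
        name = field["name"]
        if name not in old_fields:
            # Old data lacks this field, so the reader needs a default.
            if "default" not in field:
                problems.append(f"new field '{name}' has no default")
        elif field["type"] != old_fields[name]["type"]:
            # Stricter than Avro, which permits promotions such as int to long.
            problems.append(f"field '{name}' changed type")
    return problems


old = {"type": "record", "name": "Order",
       "fields": [{"name": "id", "type": "string"}]}
new = {"type": "record", "name": "Order",
       "fields": [{"name": "id", "type": "string"},
                  {"name": "note", "type": ["null", "string"], "default": None}]}
print(backward_problems(old, new))
```

The authoritative answer still comes from the registry, for example via `kafka schema test`.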

kafka acl

ACL (Access Control List) management.

kafka acl list                                        # List all ACLs
kafka acl list --principal "User:alice"               # Filter by user
kafka acl create --principal "User:alice" \
  --topic my-topic --operation read --permission allow
kafka acl delete --principal "User:alice" \
  --topic my-topic --operation read --yes
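
When both allow and deny rules exist, Kafka's ACL authorizer gives deny rules precedence, and a request with no matching rule is rejected unless the broker is configured otherwise. The sketch below models only that precedence for exact topic matches. The `Acl` class and `is_authorized` function are hypothetical, and wildcards, prefixed resources, hosts, and superusers are left out.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Acl:
    principal: str   # e.g. "User:alice"
    topic: str
    operation: str   # e.g. "read"
    permission: str  # "allow" or "deny"


def is_authorized(acls, principal, topic, operation):
    """Evaluate exact-match ACLs: deny wins, and no match means denied."""
    matching = [
        acl for acl in acls
        if (acl.principal, acl.topic, acl.operation) == (principal, topic, operation)
    ]
    if any(acl.permission == "deny" for acl in matching):
        return False
    return any(acl.permission == "allow" for acl in matching)


acls = [Acl("User:alice", "my-topic", "read", "allow")]
print(is_authorized(acls, "User:alice", "my-topic", "read"))
print(is_authorized(acls, "User:bob", "my-topic", "read"))
```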

kafka history

View CLI command history.

kafka history                  # Show recent commands (default: 50)
kafka history -n 10            # Last 10
kafka history --clear          # Clear history

Multi-Cluster Workflow

# Set up dev cluster
kafka init
# Org: acme-corp, Cluster: dev, Bootstrap: localhost:9092

# Add prod cluster
kafka init
# Select: acme-corp, Cluster: prod, Bootstrap: kafka-prod-1:9092,...

# Switch between clusters
kafka config use acme-corp dev
kafka config use acme-corp prod

# Run against specific cluster without switching
kafka topic list --org acme-corp --cluster prod
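
The workflow above implies a simple precedence: explicit `--org` and `--cluster` values win, otherwise the active configuration applies. The sketch below assumes the active selection lives in the `default` block of the config file, as in the example config structure. `resolve_cluster` is a hypothetical helper operating on an already-parsed dictionary.

```python
def resolve_cluster(config, org=None, cluster=None):
    """Pick cluster settings: explicit arguments first, then the default block."""
    default = config.get("default", {})
    org = org or default.get("organization")
    cluster = cluster or default.get("cluster")
    try:
        settings = config["organizations"][org]["clusters"][cluster]
    except KeyError:
        raise KeyError(
            f"no cluster '{cluster}' configured for organization '{org}'"
        ) from None
    return org, cluster, settings


config = {
    "organizations": {
        "acme-corp": {
            "name": "Acme Corporation",
            "clusters": {
                "dev": {"bootstrap_servers": "localhost:9092"},
                "prod": {"bootstrap_servers": "kafka-prod-1:9092,kafka-prod-2:9092"},
            },
        }
    },
    "default": {"organization": "acme-corp", "cluster": "dev"},
}
print(resolve_cluster(config))
print(resolve_cluster(config, cluster="prod"))
```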

Development

# Install with dev dependencies
poetry install

# Run tests
pytest

# Run tests with coverage
pytest --cov=kafka_cli --cov-report=html

# Lint
ruff check src/
ruff format src/

# Type check
mypy src/

Project Structure

src/kafka_cli/
  main.py              # Typer app entry point
  commands/
    init.py            # kafka init
    config.py          # kafka config [list|use|current|delete|test]
    cluster.py         # kafka cluster [info|health|brokers|topics-summary]
    topic.py           # kafka topic [list|create|describe|alter|delete|consume-offsets]
    group.py           # kafka group [list|describe|delete|reset-offsets]
    produce.py         # kafka produce
    consume.py         # kafka consume
    schema.py          # kafka schema [list|get|create|delete|test|versions|config]
    acl.py             # kafka acl [list|create|delete]
    history.py         # kafka history
  core/
    config.py          # Config load/save (~/.kafka-cli/)
    context.py         # Current org/cluster context
    client.py          # KafkaClientFactory (confluent-kafka wrapper)
    admin_client.py    # Admin operations (topics, groups, ACLs)
    producer.py        # Producer wrapper
    consumer.py        # Consumer wrapper
    schema_client.py   # Schema Registry HTTP client
    history.py         # Command history tracking
  models/              # Pydantic schemas
  serializers/         # Message serialization (string, JSON, Avro, Protobuf)
  ui/
    console.py         # Rich console helpers
    prompts.py         # Questionary wrappers
    tables.py          # Rich table builders
  utils/
    validators.py      # Input validators
    clipboard.py       # Copy to clipboard
    formatters.py      # Byte/timestamp formatting

License

GPL-3.0-or-later
