Skip to main content

Model Context Protocol server for Apache Kafka

Project description

MCP Kafka

Category Status
Build & CI CI CodeQL Pre-commit Dependency Review License Compliance
SonarCloud Quality Gate Status Coverage Maintainability Rating Reliability Rating Security Rating
Security Bandit Dependabot
Technology Python 3.11-3.14 Kafka License: MIT Code style: ruff type-checked: mypy MCP

A Model Context Protocol (MCP) server for Apache Kafka that provides AI assistants with safe, controlled access to Kafka clusters.

Features

  • 12 Kafka tools for topic management, message operations, consumer groups, and cluster info
  • 2-tier access control: READ (default) and READ/WRITE modes
  • Universal Kafka support: SASL (PLAIN, SCRAM, GSSAPI) and mTLS authentication
  • Safety controls: Protected internal topics, consumer group validation, message size limits
  • Built with FastMCP: Modern MCP server implementation

Installation

# Using uv (recommended)
uv add mcp-kafka

# Using pip
pip install mcp-kafka

Quick Start

1. Configure Environment

# Basic connection
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092

# SASL authentication
export KAFKA_SECURITY_PROTOCOL=SASL_SSL
export KAFKA_SASL_MECHANISM=SCRAM-SHA-256
export KAFKA_SASL_USERNAME=your-username
export KAFKA_SASL_PASSWORD=your-password

# Enable write operations (disabled by default)
export SAFETY_ALLOW_WRITE_OPERATIONS=true

2. Run the Server

# stdio transport (default, for MCP clients)
uv run mcp-kafka

# HTTP transport (for web integrations)
uv run mcp-kafka --transport http --host 127.0.0.1 --port 8000

# Using convenience scripts
./scripts/http-read.sh      # Read-only HTTP server
./scripts/http-readwrite.sh # Read-write HTTP server

CLI Options

Option Default Description
--transport stdio Transport type: stdio or http
--host 127.0.0.1 Host to bind (HTTP only)
--port 8000 Port to bind (HTTP only)
--health-check - Run health check and exit
--version, -v - Show version and exit

3. Connect to MCP Client

Add to your MCP client configuration (e.g., Claude Desktop):

{
  "mcpServers": {
    "kafka": {
      "command": "uv",
      "args": ["run", "mcp-kafka"],
      "env": {
        "KAFKA_BOOTSTRAP_SERVERS": "localhost:9092"
      }
    }
  }
}

Available Tools

Topic Management

Tool Access Description
kafka_list_topics READ List all topics with partition counts
kafka_describe_topic READ Get detailed topic info and configuration
kafka_create_topic READ/WRITE Create a new topic with partitions, replication factor, and config

Message Operations

Tool Access Description
kafka_consume_messages READ Peek at messages (no offset commit)
kafka_produce_message READ/WRITE Produce a message with optional key, headers, and partition

Consumer Group Management

Tool Access Description
kafka_list_consumer_groups READ List all consumer groups
kafka_describe_consumer_group READ Get group details, members, and lag
kafka_get_consumer_lag READ Get lag per partition
kafka_reset_offsets READ/WRITE Reset consumer group offsets to earliest, latest, or specific offset

Cluster Information

Tool Access Description
kafka_cluster_info READ Get cluster metadata
kafka_list_brokers READ List all brokers
kafka_get_watermarks READ Get topic partition watermarks

Configuration

For detailed configuration options and examples, see CONFIGURATION.md.

Environment Variables

Kafka Connection

Variable Default Description
KAFKA_BOOTSTRAP_SERVERS localhost:9092 Kafka broker addresses
KAFKA_SECURITY_PROTOCOL PLAINTEXT Security protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL)
KAFKA_CLIENT_ID mcp-kafka Client identifier
KAFKA_TIMEOUT 30 Operation timeout in seconds

SASL Authentication

Variable Default Description
KAFKA_SASL_MECHANISM - SASL mechanism (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI)
KAFKA_SASL_USERNAME - SASL username
KAFKA_SASL_PASSWORD - SASL password
KAFKA_SASL_KERBEROS_SERVICE_NAME kafka Kerberos service name
KAFKA_SASL_KERBEROS_KEYTAB - Path to Kerberos keytab
KAFKA_SASL_KERBEROS_PRINCIPAL - Kerberos principal

SSL/TLS

Variable Default Description
KAFKA_SSL_CA_LOCATION - CA certificate path
KAFKA_SSL_CERTIFICATE_LOCATION - Client certificate path
KAFKA_SSL_KEY_LOCATION - Client key path
KAFKA_SSL_KEY_PASSWORD - Client key password

Safety Controls

Variable Default Description
SAFETY_ALLOW_WRITE_OPERATIONS false Enable READ/WRITE tools
SAFETY_MAX_MESSAGE_SIZE 1048576 Max message size in bytes (1MB)
SAFETY_MAX_CONSUME_MESSAGES 100 Max messages per consume request
SAFETY_TOPIC_BLOCKLIST - Comma-separated blocked topic patterns

Security Controls

Variable Default Description
SECURITY_RATE_LIMIT_ENABLED true Enable rate limiting
SECURITY_RATE_LIMIT_RPM 60 Max requests per minute
SECURITY_AUDIT_LOG_ENABLED true Enable audit logging
SECURITY_AUDIT_LOG_FILE mcp_audit.log Audit log file path
SECURITY_OAUTH_ENABLED false Enable OAuth/OIDC authentication
SECURITY_OAUTH_ISSUER - OAuth issuer URL (e.g., https://auth.example.com)
SECURITY_OAUTH_AUDIENCE - Expected JWT audience claim
SECURITY_OAUTH_JWKS_URL - JWKS endpoint URL (auto-derived from issuer if not set)

Access Control

MCP Kafka uses a simple 2-tier access control system:

READ Access (Default)

  • List topics, consumer groups, and brokers
  • Describe topics and consumer groups
  • Consume messages (read-only peek)
  • Get cluster info and watermarks

READ/WRITE Access

Requires SAFETY_ALLOW_WRITE_OPERATIONS=true:

  • Create topics
  • Produce messages
  • Reset consumer group offsets

Protected Resources

The following resources are always protected:

  • Internal topics: __consumer_offsets, __transaction_state
  • Internal consumer groups: Groups starting with __
  • Topics in blocklist: Configured via SAFETY_TOPIC_BLOCKLIST

Development

Prerequisites

  • Python 3.11+
  • uv package manager

Setup

# Clone repository
git clone https://github.com/williajm/mcp_kafka.git
cd mcp_kafka

# Install dependencies
uv sync --all-extras

# Run tests
uv run pytest

# Run linting
uv run ruff check src/ tests/
uv run ruff format --check src/ tests/

# Run type checking
uv run mypy src/

Local Kafka for Testing

A Docker Compose environment is provided for local development:

# Start Kafka
docker compose -f docker/docker-compose.yml up -d

# Create test topics
docker compose -f docker/docker-compose.yml exec kafka \
  kafka-topics --create --topic test-topic --partitions 3 --replication-factor 1 \
  --bootstrap-server localhost:9092

# Stop Kafka
docker compose -f docker/docker-compose.yml down

License

MIT License - see LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mcp_kafka-0.1.1.tar.gz (90.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mcp_kafka-0.1.1-py3-none-any.whl (52.9 kB view details)

Uploaded Python 3

File details

Details for the file mcp_kafka-0.1.1.tar.gz.

File metadata

  • Download URL: mcp_kafka-0.1.1.tar.gz
  • Upload date:
  • Size: 90.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for mcp_kafka-0.1.1.tar.gz
Algorithm Hash digest
SHA256 0a8e9dc6ce50497a600a8f1d5377eab3528ca255f998f3bb366e4cd85fdcf969
MD5 64b335be5633c23ca499fadfc8f81382
BLAKE2b-256 938fdc7d8b599eb94d3e13d7f36932cf49407e9ae26557ca72a1d18a9d3112bf

See more details on using hashes here.

Provenance

The following attestation bundles were made for mcp_kafka-0.1.1.tar.gz:

Publisher: release.yml on williajm/mcp_kafka

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file mcp_kafka-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: mcp_kafka-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 52.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for mcp_kafka-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 15550d79a11c2b2fab42d3f3e90bf31b73a7c1e934d85d6582ceddfe0d10aaa2
MD5 f7e0419b3054ccf2701101b6b2d76544
BLAKE2b-256 e23aebec970d6dc39f81c5fdbb93334b9f07b3625902fa16b4b1da8a595c97a0

See more details on using hashes here.

Provenance

The following attestation bundles were made for mcp_kafka-0.1.1-py3-none-any.whl:

Publisher: release.yml on williajm/mcp_kafka

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page