Model Context Protocol server for Apache Kafka

MCP Kafka

Category     Status
Build & CI   CI, CodeQL, Pre-commit, Dependency Review, License Compliance
SonarCloud   Quality Gate Status, Coverage, Maintainability Rating, Reliability Rating, Security Rating
Security     Bandit, Dependabot
Technology   Python 3.11-3.14, Kafka, License: MIT, Code style: ruff, type-checked: mypy, MCP

A Model Context Protocol (MCP) server for Apache Kafka that provides AI assistants with safe, controlled access to Kafka clusters.

Features

  • 12 Kafka tools for topic management, message operations, consumer groups, and cluster info
  • 2-tier access control: READ (default) and READ/WRITE modes
  • Authentication support: SASL (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI) and mTLS
  • Safety controls: Protected internal topics, consumer group validation, message size limits
  • Built with FastMCP: Modern MCP server implementation

Installation

# Using uv (recommended)
uv add mcp-kafka

# Using pip
pip install mcp-kafka

Quick Start

1. Configure Environment

# Basic connection
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092

# SASL authentication
export KAFKA_SECURITY_PROTOCOL=SASL_SSL
export KAFKA_SASL_MECHANISM=SCRAM-SHA-256
export KAFKA_SASL_USERNAME=your-username
export KAFKA_SASL_PASSWORD=your-password

# Enable write operations (disabled by default)
export SAFETY_ALLOW_WRITE_OPERATIONS=true

2. Run the Server

# stdio transport (default, for MCP clients)
uv run mcp-kafka

# HTTP transport (for web integrations)
uv run mcp-kafka --transport http --host 127.0.0.1 --port 8000

# Using convenience scripts
./scripts/http-read.sh      # Read-only HTTP server
./scripts/http-readwrite.sh # Read-write HTTP server

CLI Options

Option           Default     Description
--transport      stdio       Transport type: stdio or http
--host           127.0.0.1   Host to bind (HTTP only)
--port           8000        Port to bind (HTTP only)
--health-check   -           Run health check and exit
--version, -v    -           Show version and exit
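
The option table above can be sketched as an argparse parser. This is only an illustration of the documented flags and defaults: the function name build_parser is hypothetical, and the project's real CLI may be built differently.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical parser mirroring the documented options (not the project's own code)."""
    parser = argparse.ArgumentParser(prog="mcp-kafka")
    parser.add_argument("--transport", choices=["stdio", "http"], default="stdio",
                        help="Transport type: stdio or http")
    parser.add_argument("--host", default="127.0.0.1", help="Host to bind (HTTP only)")
    parser.add_argument("--port", type=int, default=8000, help="Port to bind (HTTP only)")
    parser.add_argument("--health-check", action="store_true",
                        help="Run health check and exit")
    parser.add_argument("--version", "-v", action="store_true",
                        help="Show version and exit")
    return parser


# Parse the same flags used in the HTTP example, with a different port.
args = build_parser().parse_args(["--transport", "http", "--port", "9000"])
print(args.transport, args.host, args.port)
```

Omitting every flag yields the stdio transport, which matches the default used by MCP clients.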

3. Connect to an MCP Client

Add the server to your MCP client configuration (e.g., Claude Desktop):

{
  "mcpServers": {
    "kafka": {
      "command": "uv",
      "args": ["run", "mcp-kafka"],
      "env": {
        "KAFKA_BOOTSTRAP_SERVERS": "localhost:9092"
      }
    }
  }
}

Available Tools

Topic Management

Tool                   Access       Description
kafka_list_topics      READ         List all topics with partition counts
kafka_describe_topic   READ         Get detailed topic info and configuration
kafka_create_topic     READ/WRITE   Create a new topic with partitions, replication factor, and config

Message Operations

Tool                     Access       Description
kafka_consume_messages   READ         Peek at messages (no offset commit)
kafka_produce_message    READ/WRITE   Produce a message with optional key, headers, and partition

Consumer Group Management

Tool                            Access       Description
kafka_list_consumer_groups      READ         List all consumer groups
kafka_describe_consumer_group   READ         Get group details, members, and lag
kafka_get_consumer_lag          READ         Get lag per partition
kafka_reset_offsets             READ/WRITE   Reset consumer group offsets to earliest, latest, or specific offset

Cluster Information

Tool                   Access   Description
kafka_cluster_info     READ     Get cluster metadata
kafka_list_brokers     READ     List all brokers
kafka_get_watermarks   READ     Get topic partition watermarks

Configuration

For detailed configuration options and examples, see CONFIGURATION.md.

Environment Variables

Kafka Connection

Variable                  Default          Description
KAFKA_BOOTSTRAP_SERVERS   localhost:9092   Kafka broker addresses
KAFKA_SECURITY_PROTOCOL   PLAINTEXT        Security protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL)
KAFKA_CLIENT_ID           mcp-kafka        Client identifier
KAFKA_TIMEOUT             30               Operation timeout in seconds

SASL Authentication

Variable                           Default   Description
KAFKA_SASL_MECHANISM               -         SASL mechanism (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI)
KAFKA_SASL_USERNAME                -         SASL username
KAFKA_SASL_PASSWORD                -         SASL password
KAFKA_SASL_KERBEROS_SERVICE_NAME   kafka     Kerberos service name
KAFKA_SASL_KERBEROS_KEYTAB         -         Path to Kerberos keytab
KAFKA_SASL_KERBEROS_PRINCIPAL      -         Kerberos principal

SSL/TLS

Variable                         Default   Description
KAFKA_SSL_CA_LOCATION            -         CA certificate path
KAFKA_SSL_CERTIFICATE_LOCATION   -         Client certificate path
KAFKA_SSL_KEY_LOCATION           -         Client key path
KAFKA_SSL_KEY_PASSWORD           -         Client key password
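
The connection, SASL, and SSL variable names closely follow librdkafka configuration properties. The sketch below maps them onto those property names to show how the settings fit together. Whether MCP Kafka performs this exact translation internally is an assumption, and build_client_config is a hypothetical helper.

```python
import os
from collections.abc import Mapping

# Documented environment variables mapped to librdkafka property names.
ENV_TO_PROPERTY = {
    "KAFKA_BOOTSTRAP_SERVERS": "bootstrap.servers",
    "KAFKA_SECURITY_PROTOCOL": "security.protocol",
    "KAFKA_CLIENT_ID": "client.id",
    "KAFKA_SASL_MECHANISM": "sasl.mechanism",
    "KAFKA_SASL_USERNAME": "sasl.username",
    "KAFKA_SASL_PASSWORD": "sasl.password",
    "KAFKA_SASL_KERBEROS_SERVICE_NAME": "sasl.kerberos.service.name",
    "KAFKA_SASL_KERBEROS_KEYTAB": "sasl.kerberos.keytab",
    "KAFKA_SASL_KERBEROS_PRINCIPAL": "sasl.kerberos.principal",
    "KAFKA_SSL_CA_LOCATION": "ssl.ca.location",
    "KAFKA_SSL_CERTIFICATE_LOCATION": "ssl.certificate.location",
    "KAFKA_SSL_KEY_LOCATION": "ssl.key.location",
    "KAFKA_SSL_KEY_PASSWORD": "ssl.key.password",
}

# Defaults taken from the Kafka Connection table.
DEFAULTS = {
    "KAFKA_BOOTSTRAP_SERVERS": "localhost:9092",
    "KAFKA_SECURITY_PROTOCOL": "PLAINTEXT",
    "KAFKA_CLIENT_ID": "mcp-kafka",
}


def build_client_config(env: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Translate set variables (plus documented defaults) into a property dict."""
    provided = {k: v for k, v in env.items() if k in ENV_TO_PROPERTY and v}
    merged = {**DEFAULTS, **provided}
    return {ENV_TO_PROPERTY[name]: value for name, value in merged.items()}


# mTLS example with placeholder certificate paths.
print(build_client_config({
    "KAFKA_SECURITY_PROTOCOL": "SSL",
    "KAFKA_SSL_CA_LOCATION": "/etc/kafka/ca.pem",
    "KAFKA_SSL_CERTIFICATE_LOCATION": "/etc/kafka/client.pem",
    "KAFKA_SSL_KEY_LOCATION": "/etc/kafka/client.key",
}))
```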

Safety Controls

Variable                        Default   Description
SAFETY_ALLOW_WRITE_OPERATIONS   false     Enable READ/WRITE tools
SAFETY_MAX_MESSAGE_SIZE         1048576   Max message size in bytes (1 MiB)
SAFETY_MAX_CONSUME_MESSAGES     100       Max messages per consume request
SAFETY_TOPIC_BLOCKLIST          -         Comma-separated blocked topic patterns
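
To make the two numeric limits concrete, here is a minimal sketch of how such checks could work. Both function names are hypothetical, and it is an assumption that an oversized message is rejected while an oversized consume request is clamped; the server may handle either case differently.

```python
MAX_MESSAGE_SIZE = 1_048_576   # documented default for SAFETY_MAX_MESSAGE_SIZE
MAX_CONSUME_MESSAGES = 100     # documented default for SAFETY_MAX_CONSUME_MESSAGES


def check_message_size(payload: bytes, limit: int = MAX_MESSAGE_SIZE) -> None:
    """Raise ValueError if the payload exceeds the limit (assumed behavior)."""
    if len(payload) > limit:
        raise ValueError(f"message is {len(payload)} bytes, limit is {limit}")


def clamp_consume_count(requested: int, limit: int = MAX_CONSUME_MESSAGES) -> int:
    """Bound a requested message count to the range 1..limit (assumed behavior)."""
    return max(1, min(requested, limit))


check_message_size(b"x" * 1024)        # within the limit, no exception
print(clamp_consume_count(500))        # request larger than the limit
```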

Security Controls

Variable                      Default         Description
SECURITY_RATE_LIMIT_ENABLED   true            Enable rate limiting
SECURITY_RATE_LIMIT_RPM       60              Max requests per minute
SECURITY_AUDIT_LOG_ENABLED    true            Enable audit logging
SECURITY_AUDIT_LOG_FILE       mcp_audit.log   Audit log file path
SECURITY_OAUTH_ENABLED        false           Enable OAuth/OIDC authentication
SECURITY_OAUTH_ISSUER         -               OAuth issuer URL (e.g., https://auth.example.com)
SECURITY_OAUTH_AUDIENCE       -               Expected JWT audience claim
SECURITY_OAUTH_JWKS_URL       -               JWKS endpoint URL (auto-derived from issuer if not set)
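
A requests-per-minute limit such as SECURITY_RATE_LIMIT_RPM can be enforced in several ways. The sketch below uses a sliding window for illustration; the README does not say which algorithm the server uses, so the class name and the approach are assumptions.

```python
import time
from collections import deque
from collections.abc import Callable


class SlidingWindowLimiter:
    """Allow at most max_requests calls within any window_seconds span."""

    def __init__(self, max_requests: int = 60, window_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._max = max_requests
        self._window = window_seconds
        self._clock = clock
        self._stamps: deque[float] = deque()

    def allow(self) -> bool:
        now = self._clock()
        # Drop timestamps that have aged out of the window.
        while self._stamps and now - self._stamps[0] >= self._window:
            self._stamps.popleft()
        if len(self._stamps) >= self._max:
            return False
        self._stamps.append(now)
        return True


limiter = SlidingWindowLimiter(max_requests=60)
accepted = sum(limiter.allow() for _ in range(65))
print(accepted)  # requests beyond the 60th inside one minute are refused
```

Passing a custom clock makes the limiter easy to test without sleeping.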

Access Control

MCP Kafka uses a simple 2-tier access control system:

READ Access (Default)

  • List topics, consumer groups, and brokers
  • Describe topics and consumer groups
  • Consume messages (read-only peek)
  • Get cluster info and watermarks

READ/WRITE Access

Requires SAFETY_ALLOW_WRITE_OPERATIONS=true:

  • Create topics
  • Produce messages
  • Reset consumer group offsets
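
The two tiers amount to a single gate in front of the three READ/WRITE tools. A minimal sketch follows; the function names are hypothetical, and accepting only the literal string "true" is an assumption, since the server may accept other boolean spellings.

```python
from collections.abc import Mapping

# The three tools marked READ/WRITE in the tool tables.
WRITE_TOOLS = {"kafka_create_topic", "kafka_produce_message", "kafka_reset_offsets"}


def writes_enabled(env: Mapping[str, str]) -> bool:
    """Read SAFETY_ALLOW_WRITE_OPERATIONS; it defaults to false when unset."""
    return env.get("SAFETY_ALLOW_WRITE_OPERATIONS", "false").strip().lower() == "true"


def is_tool_allowed(tool_name: str, allow_write: bool) -> bool:
    """READ tools are always allowed; READ/WRITE tools need the flag."""
    return allow_write or tool_name not in WRITE_TOOLS


allow = writes_enabled({})  # nothing set, so the server stays read-only
print(is_tool_allowed("kafka_list_topics", allow),
      is_tool_allowed("kafka_produce_message", allow))
```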

Protected Resources

The following resources are always protected:

  • Internal topics: __consumer_offsets, __transaction_state
  • Internal consumer groups: Groups starting with __
  • Topics in blocklist: Configured via SAFETY_TOPIC_BLOCKLIST
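
The three protection rules can be sketched as simple predicates. The README does not specify the pattern syntax for SAFETY_TOPIC_BLOCKLIST, so shell-style globs via fnmatch are an assumption here, and all function names are hypothetical.

```python
from fnmatch import fnmatchcase

INTERNAL_TOPICS = {"__consumer_offsets", "__transaction_state"}


def parse_blocklist(raw: str) -> list[str]:
    """Split the comma-separated SAFETY_TOPIC_BLOCKLIST value into patterns."""
    return [pattern.strip() for pattern in raw.split(",") if pattern.strip()]


def is_topic_protected(topic: str, blocklist: list[str]) -> bool:
    """Internal topics are always protected; others only if a pattern matches."""
    if topic in INTERNAL_TOPICS:
        return True
    # Assumption: patterns are shell-style globs such as "secret-*".
    return any(fnmatchcase(topic, pattern) for pattern in blocklist)


def is_group_protected(group_id: str) -> bool:
    """Consumer groups whose ID starts with a double underscore are internal."""
    return group_id.startswith("__")


blocklist = parse_blocklist("secret-*, billing")
for name in ("orders", "secret-keys", "__consumer_offsets"):
    print(name, is_topic_protected(name, blocklist))
```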

Development

Prerequisites

  • Python 3.11+
  • uv package manager

Setup

# Clone repository
git clone https://github.com/williajm/mcp_kafka.git
cd mcp_kafka

# Install dependencies
uv sync --all-extras

# Run tests
uv run pytest

# Run linting
uv run ruff check src/ tests/
uv run ruff format --check src/ tests/

# Run type checking
uv run mypy src/

Local Kafka for Testing

A Docker Compose environment is provided for local development:

# Start Kafka
docker compose -f docker/docker-compose.yml up -d

# Create test topics
docker compose -f docker/docker-compose.yml exec kafka \
  kafka-topics --create --topic test-topic --partitions 3 --replication-factor 1 \
  --bootstrap-server localhost:9092

# Stop Kafka
docker compose -f docker/docker-compose.yml down

License

MIT License - see LICENSE file for details.


