Model Context Protocol server for Apache Kafka
MCP Kafka
A Model Context Protocol (MCP) server for Apache Kafka that provides AI assistants with safe, controlled access to Kafka clusters.
Features
- 12 Kafka tools for topic management, message operations, consumer groups, and cluster info
- 2-tier access control: READ (default) and READ/WRITE modes
- Universal Kafka support: SASL (PLAIN, SCRAM, GSSAPI) and mTLS authentication
- Safety controls: Protected internal topics, consumer group validation, message size limits
- Built with FastMCP: Modern MCP server implementation
Installation
# Using uv (recommended)
uv add mcp-kafka
# Using pip
pip install mcp-kafka
Quick Start
1. Configure Environment
# Basic connection
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
# SASL authentication
export KAFKA_SECURITY_PROTOCOL=SASL_SSL
export KAFKA_SASL_MECHANISM=SCRAM-SHA-256
export KAFKA_SASL_USERNAME=your-username
export KAFKA_SASL_PASSWORD=your-password
# Enable write operations (disabled by default)
export SAFETY_ALLOW_WRITE_OPERATIONS=true
2. Run the Server
# stdio transport (default, for MCP clients)
uv run mcp-kafka
# HTTP transport (for web integrations)
uv run mcp-kafka --transport http --host 127.0.0.1 --port 8000
# Using convenience scripts
./scripts/http-read.sh # Read-only HTTP server
./scripts/http-readwrite.sh # Read-write HTTP server
CLI Options
| Option | Default | Description |
|---|---|---|
| `--transport` | `stdio` | Transport type: `stdio` or `http` |
| `--host` | `127.0.0.1` | Host to bind (HTTP only) |
| `--port` | `8000` | Port to bind (HTTP only) |
| `--health-check` | - | Run health check and exit |
| `--version`, `-v` | - | Show version and exit |
3. Connect to MCP Client
Add to your MCP client configuration (e.g., Claude Desktop):
{
"mcpServers": {
"kafka": {
"command": "uv",
"args": ["run", "mcp-kafka"],
"env": {
"KAFKA_BOOTSTRAP_SERVERS": "localhost:9092"
}
}
}
}
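For a secured cluster, the same client entry can carry the SASL variables from step 1. The sketch below generates such an entry; `build_client_config` is a hypothetical helper written for illustration, and the broker address and credentials are placeholders.

```python
import json


def build_client_config(env: dict[str, str]) -> dict:
    """Return an MCP client entry that launches mcp-kafka with the given environment."""
    return {
        "mcpServers": {
            "kafka": {
                "command": "uv",
                "args": ["run", "mcp-kafka"],
                "env": dict(env),
            }
        }
    }


config = build_client_config(
    {
        "KAFKA_BOOTSTRAP_SERVERS": "broker.example.com:9093",  # placeholder address
        "KAFKA_SECURITY_PROTOCOL": "SASL_SSL",
        "KAFKA_SASL_MECHANISM": "SCRAM-SHA-256",
        "KAFKA_SASL_USERNAME": "your-username",
        "KAFKA_SASL_PASSWORD": "your-password",
    }
)
print(json.dumps(config, indent=2))
```

Write operations stay disabled in this entry because `SAFETY_ALLOW_WRITE_OPERATIONS` is not set.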
Available Tools
Topic Management
| Tool | Access | Description |
|---|---|---|
| `kafka_list_topics` | READ | List all topics with partition counts |
| `kafka_describe_topic` | READ | Get detailed topic info and configuration |
| `kafka_create_topic` | READ/WRITE | Create a new topic with partitions, replication factor, and config |
Message Operations
| Tool | Access | Description |
|---|---|---|
| `kafka_consume_messages` | READ | Peek at messages (no offset commit) |
| `kafka_produce_message` | READ/WRITE | Produce a message with optional key, headers, and partition |
Consumer Group Management
| Tool | Access | Description |
|---|---|---|
| `kafka_list_consumer_groups` | READ | List all consumer groups |
| `kafka_describe_consumer_group` | READ | Get group details, members, and lag |
| `kafka_get_consumer_lag` | READ | Get lag per partition |
| `kafka_reset_offsets` | READ/WRITE | Reset consumer group offsets to earliest, latest, or specific offset |
Cluster Information
| Tool | Access | Description |
|---|---|---|
| `kafka_cluster_info` | READ | Get cluster metadata |
| `kafka_list_brokers` | READ | List all brokers |
| `kafka_get_watermarks` | READ | Get topic partition watermarks |
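Watermarks and consumer lag are related: a partition's lag is conventionally its high watermark minus the group's committed offset. The sketch below shows that arithmetic on plain dictionaries. It is not the server's implementation, and the data shapes are assumptions. For a partition with no committed offset it assumes the group would start from the low watermark, which in practice depends on the consumer's offset reset policy.

```python
def consumer_lag(
    watermarks: dict[int, tuple[int, int]],
    committed: dict[int, int],
) -> dict[int, int]:
    """Compute lag per partition.

    watermarks maps partition -> (low, high); committed maps partition -> offset.
    """
    lag = {}
    for partition, (low, high) in watermarks.items():
        # Assumption: with no committed offset, count from the low watermark.
        position = committed.get(partition, low)
        lag[partition] = max(high - position, 0)
    return lag


watermarks = {0: (0, 120), 1: (10, 80), 2: (5, 30)}
committed = {0: 100, 1: 80}  # partition 2 has no committed offset

lag = consumer_lag(watermarks, committed)
print(lag)                # {0: 20, 1: 0, 2: 25}
print(sum(lag.values()))  # 45
```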
Configuration
For detailed configuration options and examples, see CONFIGURATION.md.
Environment Variables
Kafka Connection
| Variable | Default | Description |
|---|---|---|
| `KAFKA_BOOTSTRAP_SERVERS` | `localhost:9092` | Kafka broker addresses |
| `KAFKA_SECURITY_PROTOCOL` | `PLAINTEXT` | Security protocol (`PLAINTEXT`, `SSL`, `SASL_PLAINTEXT`, `SASL_SSL`) |
| `KAFKA_CLIENT_ID` | `mcp-kafka` | Client identifier |
| `KAFKA_TIMEOUT` | `30` | Operation timeout in seconds |
SASL Authentication
| Variable | Default | Description |
|---|---|---|
| `KAFKA_SASL_MECHANISM` | - | SASL mechanism (`PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`, `GSSAPI`) |
| `KAFKA_SASL_USERNAME` | - | SASL username |
| `KAFKA_SASL_PASSWORD` | - | SASL password |
| `KAFKA_SASL_KERBEROS_SERVICE_NAME` | `kafka` | Kerberos service name |
| `KAFKA_SASL_KERBEROS_KEYTAB` | - | Path to Kerberos keytab |
| `KAFKA_SASL_KERBEROS_PRINCIPAL` | - | Kerberos principal |
SSL/TLS
| Variable | Default | Description |
|---|---|---|
| `KAFKA_SSL_CA_LOCATION` | - | CA certificate path |
| `KAFKA_SSL_CERTIFICATE_LOCATION` | - | Client certificate path |
| `KAFKA_SSL_KEY_LOCATION` | - | Client key path |
| `KAFKA_SSL_KEY_PASSWORD` | - | Client key password |
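To illustrate how the connection, SASL, and SSL variables fit together, here is a minimal sketch that folds them into one client configuration dictionary. The target key names follow librdkafka conventions (`bootstrap.servers`, `ssl.ca.location`, and so on). That mapping is an assumption, since this README does not state which Kafka client library the server uses. `client_config_from_env` is a hypothetical helper, and the Kerberos variables are left out for brevity.

```python
import os
from collections.abc import Mapping

# (environment variable, librdkafka-style key, documented default or None)
ENV_TO_CLIENT_KEY = [
    ("KAFKA_BOOTSTRAP_SERVERS", "bootstrap.servers", "localhost:9092"),
    ("KAFKA_SECURITY_PROTOCOL", "security.protocol", "PLAINTEXT"),
    ("KAFKA_CLIENT_ID", "client.id", "mcp-kafka"),
    ("KAFKA_SASL_MECHANISM", "sasl.mechanism", None),
    ("KAFKA_SASL_USERNAME", "sasl.username", None),
    ("KAFKA_SASL_PASSWORD", "sasl.password", None),
    ("KAFKA_SSL_CA_LOCATION", "ssl.ca.location", None),
    ("KAFKA_SSL_CERTIFICATE_LOCATION", "ssl.certificate.location", None),
    ("KAFKA_SSL_KEY_LOCATION", "ssl.key.location", None),
    ("KAFKA_SSL_KEY_PASSWORD", "ssl.key.password", None),
]


def client_config_from_env(env: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Build a client config dict, applying the documented defaults."""
    config: dict[str, str] = {}
    for var, key, default in ENV_TO_CLIENT_KEY:
        value = env.get(var) or default  # unset and empty both fall back
        if value:
            config[key] = value
    # Assumption: a SASL protocol without a mechanism is a configuration error.
    if config["security.protocol"].startswith("SASL") and "sasl.mechanism" not in config:
        raise ValueError("KAFKA_SASL_MECHANISM is required for SASL protocols")
    return config


example = client_config_from_env(
    {
        "KAFKA_SECURITY_PROTOCOL": "SASL_SSL",
        "KAFKA_SASL_MECHANISM": "SCRAM-SHA-256",
        "KAFKA_SASL_USERNAME": "your-username",
        "KAFKA_SASL_PASSWORD": "your-password",
    }
)
print(example)
```

Passing an explicit mapping instead of reading `os.environ` directly keeps the helper easy to test.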
Safety Controls
| Variable | Default | Description |
|---|---|---|
| `SAFETY_ALLOW_WRITE_OPERATIONS` | `false` | Enable READ/WRITE tools |
| `SAFETY_MAX_MESSAGE_SIZE` | `1048576` | Max message size in bytes (1 MiB) |
| `SAFETY_MAX_CONSUME_MESSAGES` | `100` | Max messages per consume request |
| `SAFETY_TOPIC_BLOCKLIST` | - | Comma-separated blocked topic patterns |
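The two numeric limits can be pictured as simple guards around produce and consume calls. The sketch below is an illustration under assumptions, not the server's code. It measures only the message value (not key or headers), and it clamps an oversized consume request to the cap, whereas the server may instead reject it. All function names are hypothetical.

```python
import os
from collections.abc import Mapping


def load_limits(env: Mapping[str, str] = os.environ) -> tuple[int, int]:
    """Read both limits, falling back to the documented defaults."""
    max_size = int(env.get("SAFETY_MAX_MESSAGE_SIZE", "1048576"))
    max_messages = int(env.get("SAFETY_MAX_CONSUME_MESSAGES", "100"))
    return max_size, max_messages


def check_message_size(value: bytes, max_size: int) -> None:
    """Raise if the message value exceeds the configured size limit."""
    if len(value) > max_size:
        raise ValueError(f"message is {len(value)} bytes, limit is {max_size}")


def clamp_consume_count(requested: int, max_messages: int) -> int:
    """Assumption: oversized requests are clamped to the cap, not rejected."""
    return max(1, min(requested, max_messages))


max_size, max_messages = load_limits({})
check_message_size(b"x" * 1024, max_size)        # within the limit, no error
print(clamp_consume_count(500, max_messages))    # 100
```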
Security Controls
| Variable | Default | Description |
|---|---|---|
| `SECURITY_RATE_LIMIT_ENABLED` | `true` | Enable rate limiting |
| `SECURITY_RATE_LIMIT_RPM` | `60` | Max requests per minute |
| `SECURITY_AUDIT_LOG_ENABLED` | `true` | Enable audit logging |
| `SECURITY_AUDIT_LOG_FILE` | `mcp_audit.log` | Audit log file path |
| `SECURITY_OAUTH_ENABLED` | `false` | Enable OAuth/OIDC authentication |
| `SECURITY_OAUTH_ISSUER` | - | OAuth issuer URL (e.g., `https://auth.example.com`) |
| `SECURITY_OAUTH_AUDIENCE` | - | Expected JWT audience claim |
| `SECURITY_OAUTH_JWKS_URL` | - | JWKS endpoint URL (auto-derived from issuer if not set) |
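A requests-per-minute limit such as `SECURITY_RATE_LIMIT_RPM=60` is often enforced with a sliding window. This README does not say which algorithm the server uses, so the sketch below is only one plausible approach. `SlidingWindowRateLimiter` is a hypothetical class, and the clock is injectable so the behaviour can be checked without waiting.

```python
import time
from collections import deque
from collections.abc import Callable


class SlidingWindowRateLimiter:
    """Allow at most max_requests calls within any window_seconds span."""

    def __init__(
        self,
        max_requests: int = 60,
        window_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._clock = clock
        self._timestamps: deque[float] = deque()

    def allow(self) -> bool:
        now = self._clock()
        # Drop requests that have aged out of the window.
        while self._timestamps and now - self._timestamps[0] >= self.window_seconds:
            self._timestamps.popleft()
        if len(self._timestamps) >= self.max_requests:
            return False
        self._timestamps.append(now)
        return True


fake_now = [0.0]
limiter = SlidingWindowRateLimiter(max_requests=2, clock=lambda: fake_now[0])
results = [limiter.allow(), limiter.allow(), limiter.allow()]
fake_now[0] = 60.0  # one full window later
results.append(limiter.allow())
print(results)  # [True, True, False, True]
```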
Access Control
MCP Kafka uses a simple 2-tier access control system:
READ Access (Default)
- List topics, consumer groups, and brokers
- Describe topics and consumer groups
- Consume messages (read-only peek)
- Get cluster info and watermarks
READ/WRITE Access
Requires SAFETY_ALLOW_WRITE_OPERATIONS=true:
- Create topics
- Produce messages
- Reset consumer group offsets
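The two tiers amount to a single gate in front of the three READ/WRITE tools. The sketch below shows that check using the tool names from the tables above. `check_tool_access` is a hypothetical function, and treating only the literal value `true` (case-insensitive) as enabling writes is an assumption.

```python
import os
from collections.abc import Mapping

# Tools marked READ/WRITE in the tool tables; every other tool is READ.
WRITE_TOOLS = frozenset(
    {"kafka_create_topic", "kafka_produce_message", "kafka_reset_offsets"}
)


def writes_enabled(env: Mapping[str, str] = os.environ) -> bool:
    """Assumption: only the value 'true' enables write operations."""
    return env.get("SAFETY_ALLOW_WRITE_OPERATIONS", "false").strip().lower() == "true"


def check_tool_access(tool: str, env: Mapping[str, str] = os.environ) -> None:
    """Raise PermissionError when a write tool is called in READ mode."""
    if tool in WRITE_TOOLS and not writes_enabled(env):
        raise PermissionError(
            f"{tool} requires SAFETY_ALLOW_WRITE_OPERATIONS=true"
        )


check_tool_access("kafka_list_topics", {})  # READ tool, always allowed
check_tool_access("kafka_produce_message", {"SAFETY_ALLOW_WRITE_OPERATIONS": "true"})
try:
    check_tool_access("kafka_produce_message", {})
except PermissionError as exc:
    print(exc)
```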
Protected Resources
The following resources are always protected:
- Internal topics: `__consumer_offsets`, `__transaction_state`
- Internal consumer groups: Groups starting with `__`
- Topics in blocklist: Configured via `SAFETY_TOPIC_BLOCKLIST`
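These three rules can be expressed as a small predicate. The sketch below is illustrative only. The README does not specify the blocklist pattern syntax, so shell-style globs via `fnmatch` are an assumption, and all function names are hypothetical.

```python
from collections.abc import Iterable
from fnmatch import fnmatchcase

INTERNAL_TOPICS = frozenset({"__consumer_offsets", "__transaction_state"})


def parse_blocklist(raw: str) -> list[str]:
    """Split a comma-separated SAFETY_TOPIC_BLOCKLIST value into patterns."""
    return [part.strip() for part in raw.split(",") if part.strip()]


def is_topic_protected(topic: str, blocklist: Iterable[str] = ()) -> bool:
    """True for internal topics and for topics matching a blocklist pattern."""
    if topic in INTERNAL_TOPICS:
        return True
    # Assumption: patterns are shell-style globs such as "payments.*".
    return any(fnmatchcase(topic, pattern) for pattern in blocklist)


def is_group_protected(group_id: str) -> bool:
    """True for internal consumer groups, identified by a leading '__'."""
    return group_id.startswith("__")


patterns = parse_blocklist("payments.*, audit-*")
print(is_topic_protected("__consumer_offsets"))      # True
print(is_topic_protected("payments.raw", patterns))  # True
print(is_topic_protected("orders", patterns))        # False
print(is_group_protected("__internal"))              # True
```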
Development
Prerequisites
- Python 3.11+
- uv package manager
Setup
# Clone repository
git clone https://github.com/williajm/mcp_kafka.git
cd mcp_kafka
# Install dependencies
uv sync --all-extras
# Run tests
uv run pytest
# Run linting
uv run ruff check src/ tests/
uv run ruff format --check src/ tests/
# Run type checking
uv run mypy src/
Local Kafka for Testing
A Docker Compose environment is provided for local development:
# Start Kafka
docker compose -f docker/docker-compose.yml up -d
# Create test topics
docker compose -f docker/docker-compose.yml exec kafka \
kafka-topics --create --topic test-topic --partitions 3 --replication-factor 1 \
--bootstrap-server localhost:9092
# Stop Kafka
docker compose -f docker/docker-compose.yml down
License
MIT License - see LICENSE file for details.