MCP server for managing Kafka Connect clusters
kafka-connect-mcp
An MCP server that exposes Kafka Connect REST API operations as tools, letting LLMs manage connectors, tasks, and plugins through natural language.
Tools
| Tool | Kafka Connect Endpoint | Description |
|---|---|---|
| get_cluster_info | GET / | Cluster version and metadata |
| list_connectors | GET /connectors | List all connector names |
| get_connector | GET /connectors/{name} | Connector info, config, and tasks |
| get_connector_status | GET /connectors/{name}/status | Connector and task states |
| get_connector_config | GET /connectors/{name}/config | Connector configuration |
| create_connector | POST /connectors | Create a new connector |
| update_connector_config | PUT /connectors/{name}/config | Replace connector configuration |
| delete_connector | DELETE /connectors/{name} | Delete a connector |
| pause_connector | PUT /connectors/{name}/pause | Pause a connector |
| resume_connector | PUT /connectors/{name}/resume | Resume a paused connector |
| restart_connector | POST /connectors/{name}/restart | Restart a connector (optionally tasks) |
| get_task_status | GET /connectors/{name}/tasks/{id}/status | Status of a specific task |
| restart_task | POST /connectors/{name}/tasks/{id}/restart | Restart a specific task |
| list_connector_plugins | GET /connector-plugins | Available plugins on the cluster |
| validate_connector_config | PUT /connector-plugins/{name}/config/validate | Validate config against plugin schema |
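To make the tool-to-endpoint mapping concrete, here is a minimal sketch of how a tool name could be turned into an HTTP method and URL. The `ENDPOINTS` table and the `build_request` helper are hypothetical names used for illustration only; they are not taken from this project's source, and only a few of the tools are listed.

```python
from urllib.parse import quote

# Hypothetical lookup table: tool name -> (HTTP method, path template).
# Tool names and paths come from the table above; the structure is illustrative.
ENDPOINTS = {
    "get_cluster_info": ("GET", "/"),
    "list_connectors": ("GET", "/connectors"),
    "get_connector_status": ("GET", "/connectors/{name}/status"),
    "restart_task": ("POST", "/connectors/{name}/tasks/{id}/restart"),
    "validate_connector_config": ("PUT", "/connector-plugins/{name}/config/validate"),
}


def build_request(base_url: str, tool: str, **params: object) -> tuple[str, str]:
    """Return (method, url) for a tool, percent-encoding each path parameter."""
    method, template = ENDPOINTS[tool]
    # Encode every character that is not URL-safe, including "/", so a
    # connector name cannot change the shape of the path.
    encoded = {key: quote(str(value), safe="") for key, value in params.items()}
    return method, base_url.rstrip("/") + template.format(**encoded)


method, url = build_request(
    "http://localhost:8083", "get_connector_status", name="datagen-users"
)
```

Percent-encoding the path parameters is a design choice of this sketch: connector names are user-supplied, so they are escaped before being placed in the URL.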
Setup
Prerequisites
- Python 3.12+
- uv
- A running Kafka Connect cluster (or use the included Docker Compose)
Install
uv sync
Add to Claude Code
claude mcp add kafka-connect \
  -e KAFKA_CONNECT_URL=http://localhost:8083 \
  -- uv --directory /path/to/kafka-connect-mcp run kafka-connect-mcp
Configuration
| Environment Variable | Default | Description |
|---|---|---|
| KAFKA_CONNECT_URL | http://localhost:8083 | Kafka Connect REST API base URL |
| KAFKA_CONNECT_ENABLE_CREATE | false | Allow create_connector |
| KAFKA_CONNECT_ENABLE_UPDATE | false | Allow update_connector_config |
| KAFKA_CONNECT_ENABLE_DELETE | false | Allow delete_connector |
| KAFKA_CONNECT_ENABLE_PAUSE_RESUME | false | Allow pause_connector and resume_connector |
| KAFKA_CONNECT_ENABLE_RESTART | false | Allow restart_connector and restart_task |
| KAFKA_CONNECT_MUTATION_ALLOWLIST | (empty) | Optional comma-separated connector allowlist for mutating operations |
Safe mode (capability-gated)
This server is read-only by default because all mutation capabilities default to false.
Mutating tools are blocked unless you explicitly enable the specific capability.
Examples:
Restart-only mode for selected connectors:
KAFKA_CONNECT_ENABLE_RESTART=true
KAFKA_CONNECT_MUTATION_ALLOWLIST=payments-sink,inventory-source
Enable all mutating operations (for development only):
KAFKA_CONNECT_ENABLE_CREATE=true
KAFKA_CONNECT_ENABLE_UPDATE=true
KAFKA_CONNECT_ENABLE_DELETE=true
KAFKA_CONNECT_ENABLE_PAUSE_RESUME=true
KAFKA_CONNECT_ENABLE_RESTART=true
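The gating described in this section can be sketched roughly as follows. This is not the project's safety.py: `CAPABILITIES`, `required_capability`, and `is_allowed` are hypothetical names, and two behaviours are assumptions: only the literal value "true" (case-insensitive) enables a capability, and an empty allowlist places no restriction on connector names.

```python
import os

# Capability variable -> mutating tools it unlocks (from the configuration table).
CAPABILITIES = {
    "KAFKA_CONNECT_ENABLE_CREATE": {"create_connector"},
    "KAFKA_CONNECT_ENABLE_UPDATE": {"update_connector_config"},
    "KAFKA_CONNECT_ENABLE_DELETE": {"delete_connector"},
    "KAFKA_CONNECT_ENABLE_PAUSE_RESUME": {"pause_connector", "resume_connector"},
    "KAFKA_CONNECT_ENABLE_RESTART": {"restart_connector", "restart_task"},
}


def required_capability(tool):
    """Return the variable that gates a tool, or None for read-only tools."""
    for variable, tools in CAPABILITIES.items():
        if tool in tools:
            return variable
    return None


def is_allowed(tool, connector, env=None):
    """Decide whether a tool may act on a connector under the given environment."""
    env = os.environ if env is None else env
    variable = required_capability(tool)
    if variable is None:
        return True  # read-only tools are not gated
    # Assumption: only "true" (any case) switches a capability on.
    if env.get(variable, "false").strip().lower() != "true":
        return False
    raw = env.get("KAFKA_CONNECT_MUTATION_ALLOWLIST", "")
    allowlist = {name.strip() for name in raw.split(",") if name.strip()}
    # Assumption: an empty allowlist means every connector is eligible.
    return not allowlist or connector in allowlist


restart_only = {
    "KAFKA_CONNECT_ENABLE_RESTART": "true",
    "KAFKA_CONNECT_MUTATION_ALLOWLIST": "payments-sink,inventory-source",
}
```

With the restart-only environment above, `is_allowed("restart_connector", "payments-sink", restart_only)` passes, while a delete on the same connector, or a restart of a connector outside the allowlist, is refused.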
Running
stdio (default, for Claude Code)
KAFKA_CONNECT_URL=http://localhost:8083 uv run kafka-connect-mcp
SSE (for Docker / remote)
uv run kafka-connect-mcp --transport sse --host 0.0.0.0 --port 8000
Docker Compose (full stack)
Spins up ZooKeeper, Kafka, Kafka Connect (with the Datagen plugin), and the MCP server:
docker compose up --build -d
| Service | Port | Description |
|---|---|---|
| zookeeper | 2181 | ZooKeeper |
| kafka | 9092 | Kafka broker |
| kafka-connect | 8083 | Kafka Connect REST API |
| mcp-server | 8000 | MCP server (SSE transport) |
Example: create a Datagen connector
Once the stack is up, ask Claude to create a Datagen connector, or create it directly:
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "datagen-users",
    "config": {
      "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
      "kafka.topic": "users",
      "quickstart": "users",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false",
      "max.interval": "1000",
      "tasks.max": "1"
    }
  }'
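For readers who prefer Python to curl, the same request can be built with the standard library. This is a sketch only: `build_create_request` is a hypothetical helper, not part of this package, and the request is constructed without being sent so the snippet runs without a cluster.

```python
import json
import urllib.request


def build_create_request(base_url, name, config):
    """Build (but do not send) the POST /connectors request for a new connector."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        base_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Same payload as the curl example above.
request = build_create_request(
    "http://localhost:8083",
    "datagen-users",
    {
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "kafka.topic": "users",
        "quickstart": "users",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "max.interval": "1000",
        "tasks.max": "1",
    },
)

# With the Docker Compose stack running, send it with:
# urllib.request.urlopen(request)
```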
Tests
uv run pytest tests/ -v
Tests use respx to mock HTTP calls to the Kafka Connect API -- no running cluster required.
Releases
Releases are automated from main:
- Bump project.version in pyproject.toml (for example, 0.1.0 -> 0.1.1).
- Merge to main.
- GitHub Actions creates the tag and release vX.Y.Z.
- The release is automatically built and published to PyPI.
One-time PyPI setup
Set up PyPI Trusted Publishing for project kafka-connect-mcp:
- Owner: lawrencemq
- Repository: kafka-connect-mcp
- Workflow: .github/workflows/release.yml
- Environment: (none required by this workflow)
Project structure
kafka-connect-mcp/
├── pyproject.toml
├── Dockerfile
├── docker-compose.yml
├── src/kafka_connect_mcp/
│   ├── __init__.py
│   ├── safety.py    # Read-only and mutation policy gates
│   └── server.py    # MCP tools and entry point
└── tests/
    ├── conftest.py  # Shared fixtures (mock URL + respx router)
    ├── test_cluster.py
    ├── test_connectors.py
    ├── test_safety.py
    ├── test_tasks.py
    └── test_plugins.py
License
Licensed under the Apache License, Version 2.0. See LICENSE.