
kafka-connect-mcp

MCP server for managing Kafka Connect clusters

An MCP server that exposes Kafka Connect REST API operations as tools, letting LLMs manage connectors, tasks, and plugins through natural language.

Tools

| Tool | Kafka Connect Endpoint | Description |
| --- | --- | --- |
| `get_cluster_info` | `GET /` | Cluster version and metadata |
| `list_connectors` | `GET /connectors` | List all connector names |
| `get_connector` | `GET /connectors/{name}` | Connector info, config, and tasks |
| `get_connector_status` | `GET /connectors/{name}/status` | Connector and task states |
| `get_connector_config` | `GET /connectors/{name}/config` | Connector configuration |
| `create_connector` | `POST /connectors` | Create a new connector |
| `update_connector_config` | `PUT /connectors/{name}/config` | Replace connector configuration |
| `delete_connector` | `DELETE /connectors/{name}` | Delete a connector |
| `pause_connector` | `PUT /connectors/{name}/pause` | Pause a connector |
| `resume_connector` | `PUT /connectors/{name}/resume` | Resume a paused connector |
| `restart_connector` | `POST /connectors/{name}/restart` | Restart a connector (optionally its tasks) |
| `get_task_status` | `GET /connectors/{name}/tasks/{id}/status` | Status of a specific task |
| `restart_task` | `POST /connectors/{name}/tasks/{id}/restart` | Restart a specific task |
| `list_connector_plugins` | `GET /connector-plugins` | Available plugins on the cluster |
| `validate_connector_config` | `PUT /connector-plugins/{name}/config/validate` | Validate config against plugin schema |
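Each tool is essentially a thin wrapper over one REST call. As an illustration only (a hypothetical mapping, not the server's actual internals), the tool-to-endpoint relationship can be sketched as:

```python
# Illustrative sketch: two tools from the table above mapped to their
# REST endpoints. The real server's internal structure may differ.
TOOL_ENDPOINTS = {
    "get_connector_status": ("GET", "/connectors/{name}/status"),
    "restart_task": ("POST", "/connectors/{name}/tasks/{id}/restart"),
}

def render_endpoint(tool: str, **params: str) -> str:
    """Fill a tool's path template with concrete parameters."""
    method, path = TOOL_ENDPOINTS[tool]
    return f"{method} {path.format(**params)}"

print(render_endpoint("get_connector_status", name="datagen-users"))
# GET /connectors/datagen-users/status
```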

Setup

Prerequisites

  • Python 3.12+
  • uv
  • A running Kafka Connect cluster (or use the included Docker Compose)

Install

uv sync

Add to Claude Code

claude mcp add kafka-connect \
  -e KAFKA_CONNECT_URL=http://localhost:8083 \
  -- uv --directory /path/to/kafka-connect-mcp run kafka-connect-mcp

Configuration

| Environment Variable | Default | Description |
| --- | --- | --- |
| `KAFKA_CONNECT_URL` | `http://localhost:8083` | Kafka Connect REST API base URL |
| `KAFKA_CONNECT_ENABLE_CREATE` | `false` | Allow `create_connector` |
| `KAFKA_CONNECT_ENABLE_UPDATE` | `false` | Allow `update_connector_config` |
| `KAFKA_CONNECT_ENABLE_DELETE` | `false` | Allow `delete_connector` |
| `KAFKA_CONNECT_ENABLE_PAUSE_RESUME` | `false` | Allow `pause_connector` and `resume_connector` |
| `KAFKA_CONNECT_ENABLE_RESTART` | `false` | Allow `restart_connector` and `restart_task` |
| `KAFKA_CONNECT_MUTATION_ALLOWLIST` | (empty) | Optional comma-separated connector allowlist for mutating operations |

Safe mode (capability-gated)

This server is read-only by default because all mutation capabilities default to false. Mutating tools are blocked unless you explicitly enable the specific capability.
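The gate described above can be sketched with a hypothetical helper. This assumes only the documented environment variables; the actual logic in `safety.py` may differ:

```python
import os

def is_mutation_allowed(capability_env: str, connector: str) -> bool:
    """Sketch of a capability gate: a mutating tool runs only if its
    capability flag is set to 'true' AND the connector passes the
    optional allowlist."""
    if os.environ.get(capability_env, "false").lower() != "true":
        return False
    allowlist = os.environ.get("KAFKA_CONNECT_MUTATION_ALLOWLIST", "").strip()
    if not allowlist:
        return True  # empty allowlist means all connectors are allowed
    allowed = {name.strip() for name in allowlist.split(",")}
    return connector in allowed
```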

Examples:

Restart-only mode for selected connectors:

KAFKA_CONNECT_ENABLE_RESTART=true
KAFKA_CONNECT_MUTATION_ALLOWLIST=payments-sink,inventory-source

Enable all mutating operations (for development only):

KAFKA_CONNECT_ENABLE_CREATE=true
KAFKA_CONNECT_ENABLE_UPDATE=true
KAFKA_CONNECT_ENABLE_DELETE=true
KAFKA_CONNECT_ENABLE_PAUSE_RESUME=true
KAFKA_CONNECT_ENABLE_RESTART=true

Running

stdio (default, for Claude Code)

KAFKA_CONNECT_URL=http://localhost:8083 uv run kafka-connect-mcp

SSE (for Docker / remote)

uv run kafka-connect-mcp --transport sse --host 0.0.0.0 --port 8000

Docker Compose (full stack)

Spins up Zookeeper, Kafka, Kafka Connect (with the Datagen plugin), and the MCP server:

docker compose up --build -d
| Service | Port | Description |
| --- | --- | --- |
| `zookeeper` | 2181 | ZooKeeper |
| `kafka` | 9092 | Kafka broker |
| `kafka-connect` | 8083 | Kafka Connect REST API |
| `mcp-server` | 8000 | MCP server (SSE transport) |

Example: create a Datagen connector

Once the stack is up, ask Claude to create a Datagen connector, or do it directly with curl:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "datagen-users",
    "config": {
      "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
      "kafka.topic": "users",
      "quickstart": "users",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false",
      "max.interval": "1000",
      "tasks.max": "1"
    }
  }'
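After creation, `GET /connectors/datagen-users/status` reports connector and task states. A small sketch of summarizing that JSON response (the sample payload below follows the shape returned by the Kafka Connect REST API status endpoint):

```python
def summarize_status(status: dict) -> str:
    """Condense a /connectors/{name}/status response into one line."""
    connector_state = status["connector"]["state"]
    task_states = [t["state"] for t in status.get("tasks", [])]
    return f"{status['name']}: {connector_state}, tasks={task_states}"

# Example payload in the shape returned by the status endpoint:
sample = {
    "name": "datagen-users",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "connect:8083"}],
}
print(summarize_status(sample))
# datagen-users: RUNNING, tasks=['RUNNING']
```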

Tests

uv run pytest tests/ -v

Tests use respx to mock HTTP calls to the Kafka Connect API; no running cluster is required.

Releases

Releases are automated from main:

  1. Bump `project.version` in `pyproject.toml` (for example `0.1.0` -> `0.1.1`).
  2. Merge to `main`.
  3. GitHub Actions creates the tag and release `vX.Y.Z`.
  4. The release is built and published to PyPI automatically.

One-time PyPI setup

Set up PyPI Trusted Publishing for project kafka-connect-mcp:

  • Owner: lawrencemq
  • Repository: kafka-connect-mcp
  • Workflow: .github/workflows/release.yml
  • Environment: (none required by this workflow)

Project structure

kafka-connect-mcp/
├── pyproject.toml
├── Dockerfile
├── docker-compose.yml
├── src/kafka_connect_mcp/
│   ├── __init__.py
│   ├── safety.py            # Read-only and mutation policy gates
│   └── server.py            # MCP tools and entry point
└── tests/
    ├── conftest.py           # Shared fixtures (mock URL + respx router)
    ├── test_cluster.py
    ├── test_connectors.py
    ├── test_safety.py
    ├── test_tasks.py
    └── test_plugins.py

License

Licensed under the Apache License, Version 2.0. See LICENSE.
