
A client for the Confluent Platform Kafka Connect REST API.


Kafka Connect Python

The Kafka Connect REST API allows you to manage connectors that move data between Apache Kafka and other systems.

The Kafka Connect command line tool, also known as kc or kafka-connect, lets users manage their Kafka Connect cluster and its connectors. With this tool, users can retrieve information about the cluster and its connectors; create, update, and delete connectors; and pause, resume, or restart them.

This project aims to support all features of the Kafka Connect REST API.

Install

pip install kafka-connect-py

Command Line Usage

This CLI tool is written with the Python Click library.

Group options

Connect to a custom endpoint.

kc --url https://connect.example.com <sub-command>

Connect with basic auth.

kc --auth="thisismyusername:thisismypass" <sub-command>
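`--auth` takes a `username:password` pair. Assuming kc sends it as standard HTTP Basic authentication (RFC 7617), the header it produces would look like the one built below. `basic_auth_header` is a hypothetical helper for illustration, not part of kafka-connect-py.

```python
import base64


def basic_auth_header(auth):
    """Turn a 'username:password' string into an HTTP Basic Authorization header.

    Hypothetical helper: splits on the first colon only, so passwords
    containing ':' are preserved.
    """
    username, sep, password = auth.partition(":")
    if not sep:
        raise ValueError("expected 'username:password'")
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


print(basic_auth_header("thisismyusername:thisismypass"))
```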

Connect without verifying the server's SSL certificate (for example, a self-signed certificate).

kc --no-ssl-verify <sub-command>
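Skipping certificate verification is only appropriate for test clusters with self-signed certificates. With the Python standard library, the equivalent client-side setting is an `ssl.SSLContext` that checks neither the hostname nor the certificate chain. How kc implements `--no-ssl-verify` internally is not documented here, so treat this as an illustrative sketch; `make_ssl_context` is a hypothetical name.

```python
import ssl


def make_ssl_context(verify=True):
    """Return an SSL context; verify=False mirrors the intent of --no-ssl-verify."""
    context = ssl.create_default_context()
    if not verify:
        # check_hostname must be disabled before verify_mode can be set to CERT_NONE.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
    return context


insecure = make_ssl_context(verify=False)
# Pass it to urllib, e.g. urllib.request.urlopen(url, context=insecure)
```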

Change the log level.

kc --log-level=[critical|error|warning|info|debug|notset] <sub-command>

Group options must be placed before the sub-command. Please see Click: Commands and Groups for more information.

Get Kafka Connect cluster info

kc info
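`kc info` corresponds to the root resource of the Kafka Connect REST API (`GET /`), which reports the worker's version, commit ID, and Kafka cluster ID. A minimal standard-library equivalent is sketched below. `build_request` and `get_cluster_info_raw` are hypothetical names, and `http://localhost:8083` (Kafka Connect's default REST port) is assumed as the endpoint.

```python
import json
import urllib.request


def build_request(base_url, path, method="GET", body=None):
    """Build a JSON request against a Kafka Connect worker (hypothetical helper)."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    return urllib.request.Request(
        base_url.rstrip("/") + path,
        data=data,
        method=method,
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )


def get_cluster_info_raw(base_url="http://localhost:8083"):
    """GET / returns a JSON object with version, commit and kafka_cluster_id."""
    with urllib.request.urlopen(build_request(base_url, "/"), timeout=10) as resp:
        return json.load(resp)


# info = get_cluster_info_raw()  # requires a running Kafka Connect worker
```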

Get a list of all connectors

kc list [--expand=status|info] [--pattern=regex] [--state=running|paused|unassigned|failed]
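The REST API itself only offers `GET /connectors`, optionally with `expand=status` and/or `expand=info` (the parameter can be repeated). It has no server-side filter by name or state, so `--pattern` and `--state` are presumably applied by the client after fetching. The sketch below shows one way to do that against the `expand=status` response shape. `list_url` and `filter_connectors` are hypothetical, the connector names are made up, and whether kc uses `re.search` or a full match is an assumption.

```python
import re
from urllib.parse import urlencode


def list_url(base_url, expand=()):
    """GET /connectors, repeating the expand parameter for each value."""
    query = urlencode([("expand", e) for e in expand])
    return f"{base_url.rstrip('/')}/connectors" + (f"?{query}" if query else "")


def filter_connectors(expanded, pattern=None, state=None):
    """Filter an expand=status response by name regex and connector state."""
    selected = []
    for name, details in expanded.items():
        if pattern and not re.search(pattern, name):
            continue
        connector_state = details["status"]["connector"]["state"]
        if state and connector_state.lower() != state.lower():
            continue
        selected.append(name)
    return sorted(selected)


# Hypothetical response to GET /connectors?expand=status, trimmed to the fields used above.
sample = {
    "orders-sink": {"status": {"connector": {"state": "RUNNING"}, "tasks": []}},
    "orders-source": {"status": {"connector": {"state": "FAILED"}, "tasks": []}},
    "users-source": {"status": {"connector": {"state": "PAUSED"}, "tasks": []}},
}
print(filter_connectors(sample, pattern="^orders", state="failed"))  # ['orders-source']
```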

Get the details of a single connector

kc get <connector>

Get the config of a connector

kc config <connector>
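`kc get` and `kc config` correspond to `GET /connectors/{name}` (the connector's name, configuration, and task list) and `GET /connectors/{name}/config` (the configuration map only). Connector names can contain characters that are not safe in a URL path, so a client should percent-encode them. `connector_url` below is a hypothetical helper showing that step.

```python
from urllib.parse import quote


def connector_url(base_url, name, suffix=""):
    """Build /connectors/{name}[/suffix], percent-encoding the connector name."""
    # safe="" also encodes "/", so a name like "db/orders" stays one path segment.
    return f"{base_url.rstrip('/')}/connectors/{quote(name, safe='')}{suffix}"


print(connector_url("http://localhost:8083", "my-connector"))
print(connector_url("http://localhost:8083", "db/orders v2", "/config"))
# http://localhost:8083/connectors/db%2Forders%20v2/config
```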

Create a new connector with a JSON file

kc create --config-file <config-file>

Create a new connector with inline JSON data

kc create --config-data <config-data>
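`POST /connectors` expects a JSON object with a `name` string and a `config` map. A sketch of how `--config-file` and `--config-data` could be reduced to one validated payload follows. `load_connector_payload` is hypothetical, and requiring exactly one of the two sources is an assumption about kc's behaviour.

```python
import json


def load_connector_payload(config_file=None, config_data=None):
    """Return a POST /connectors body from a JSON file path or an inline JSON string."""
    if (config_file is None) == (config_data is None):
        raise ValueError("pass exactly one of config_file or config_data")
    if config_file is not None:
        with open(config_file, encoding="utf-8") as fh:
            payload = json.load(fh)
    else:
        payload = json.loads(config_data)
    # The create endpoint needs {"name": "...", "config": {...}}.
    if not isinstance(payload, dict) or not isinstance(payload.get("name"), str) \
            or not isinstance(payload.get("config"), dict):
        raise ValueError('payload needs a string "name" and a "config" object')
    return payload


inline = ('{"name": "my-connector", "config": {"connector.class": '
          '"io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1"}}')
print(load_connector_payload(config_data=inline)["name"])  # my-connector
```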

Update the configuration for an existing connector with a JSON file

kc update <connector> --config-file <config-file>

Update the configuration for an existing connector with inline JSON data

kc update <connector> --config-data <config-data>
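At the REST level, `PUT /connectors/{name}/config` takes the flat configuration map, not the `{"name": ..., "config": ...}` wrapper used for creation. It creates the connector if it does not exist yet and otherwise replaces the whole configuration; if the map contains `name`, it must match the name in the URL. Whether kc strips a create-style wrapper for you is not stated here, so the hypothetical `to_update_body` below makes that step explicit.

```python
def to_update_body(name, payload):
    """Return the flat config map expected by PUT /connectors/{name}/config."""
    if isinstance(payload.get("config"), dict):
        # Create-style wrapper: {"name": ..., "config": {...}}.
        if payload.get("name", name) != name:
            raise ValueError(f"payload name {payload['name']!r} does not match {name!r}")
        config = dict(payload["config"])
    else:
        config = dict(payload)  # already flat; copy so the caller's dict is untouched
    if config.get("name", name) != name:
        raise ValueError(f"config name {config['name']!r} does not match {name!r}")
    return config


print(to_update_body("my-connector", {"name": "my-connector", "config": {"tasks.max": "2"}}))
```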

Restart a connector

kc restart <connector> [--include-tasks] [--only-failed] | jq

Restart all connectors

kc restart --all [--pattern=regex] [--state=running|paused|unassigned|failed] [--include-tasks] [--only-failed]

The --state option filters on the connector's status, whereas --include-tasks and --only-failed apply to the connector's tasks.
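Since Kafka 3.0 (KIP-745), `POST /connectors/{name}/restart` accepts `includeTasks` and `onlyFailed` query parameters, which is what the two flags correspond to; earlier workers do not offer them. When either is set, the worker replies with a JSON status body (202 Accepted), which is presumably why the single-connector example pipes the output to `jq`. The sketch builds that URL; `restart_url` is a hypothetical helper.

```python
from urllib.parse import quote, urlencode


def restart_url(base_url, name, include_tasks=False, only_failed=False):
    """Build the restart URL; the flags become lowercase boolean query parameters."""
    path = f"{base_url.rstrip('/')}/connectors/{quote(name, safe='')}/restart"
    params = {}
    if include_tasks:
        params["includeTasks"] = "true"
    if only_failed:
        params["onlyFailed"] = "true"
    return f"{path}?{urlencode(params)}" if params else path


print(restart_url("http://localhost:8083", "my-connector", include_tasks=True, only_failed=True))
```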

Pause a connector

kc pause <connector>

Pause all connectors

kc pause --all [--pattern=regex] [--state=running|paused|unassigned|failed]

Resume a connector

kc resume <connector>

Resume all connectors

kc resume --all [--pattern=regex] [--state=running|paused|unassigned|failed]
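The pause and resume endpoints (`PUT /connectors/{name}/pause` and `PUT /connectors/{name}/resume`) are asynchronous: they return 202 Accepted and the state change follows shortly after. A bulk `--all` operation therefore amounts to one request per selected connector. The hypothetical `plan_pause_resume` below builds that list and, as an optional assumption not confirmed for kc, skips connectors already in the target state.

```python
from urllib.parse import quote


def plan_pause_resume(states, action, skip_noops=True):
    """Return (method, path) pairs for pausing or resuming every connector in states.

    states maps connector name -> connector state, e.g. taken from
    GET /connectors?expand=status.
    """
    if action not in ("pause", "resume"):
        raise ValueError("action must be 'pause' or 'resume'")
    target = "PAUSED" if action == "pause" else "RUNNING"
    plan = []
    for name in sorted(states):
        if skip_noops and states[name] == target:
            continue  # already in the requested state
        plan.append(("PUT", f"/connectors/{quote(name, safe='')}/{action}"))
    return plan


states = {"orders-sink": "RUNNING", "users-source": "PAUSED"}
print(plan_pause_resume(states, "pause"))  # [('PUT', '/connectors/orders-sink/pause')]
```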

Delete a connector

kc delete <connector>

Delete all connectors

kc delete --all [--pattern=regex] [--state=running|paused|unassigned|failed]
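`kc delete --all` is destructive, and a missing or overly broad `--pattern` selects every connector. When scripting the same operation, previewing the selection first is a cheap safeguard. The hypothetical `select_for_deletion` below uses `re.fullmatch`, a deliberately stricter choice than a substring search, and refuses to select everything unless explicitly told to.

```python
import re


def select_for_deletion(names, pattern=None, allow_all=False):
    """Return the connector names a bulk delete would remove, sorted for review."""
    if pattern is None:
        if not allow_all:
            raise ValueError("refusing to select every connector without allow_all=True")
        return sorted(names)
    regex = re.compile(pattern)
    return sorted(n for n in names if regex.fullmatch(n))


names = ["orders-sink", "orders-source", "users-source"]
print(select_for_deletion(names, pattern="orders-.*"))  # ['orders-sink', 'orders-source']
# Each selected name would then get its own DELETE /connectors/{name} request.
```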

Python

import json

# Import the client class
from kafka_connect import KafkaConnect

# Instantiate the client
client = KafkaConnect(url="http://localhost:8083")

# Get the version and other details of the Kafka Connect cluster
cluster = client.get_cluster_info()
print(cluster)

# List all connectors, including their status
connectors = client.list_connectors(expand="status")
print(json.dumps(connectors, indent=2))

# Create a new connector
config = {
    "name": "my-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:postgresql://localhost:5432/mydatabase",
        "connection.user": "myuser",
        "connection.password": "mypassword",
        "table.whitelist": "mytable",
        "mode": "timestamp+incrementing",
        "timestamp.column.name": "modified_at",
        "validate.non.null": "false",
        "incrementing.column.name": "id",
        "topic.prefix": "my-connector-",
    },
}
response = client.create_connector(config)
print(response)

# Update an existing connector
new_config = {
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:postgresql://localhost:5432/mydatabase",
        "connection.user": "myuser",
        "connection.password": "mypassword",
        "table.whitelist": "mytable",
        "mode": "timestamp+incrementing",
        "timestamp.column.name": "modified_at",
        "validate.non.null": "false",
        "incrementing.column.name": "id",
        "topic.prefix": "my-connector-",
    },
}
response = client.update_connector("my-connector", new_config)
print(response)

# Restart a connector
response = client.restart_connector("my-connector")
print(response)

# Delete a connector
response = client.delete_connector("my-connector")
print(response)
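The update example repeats every key of the original configuration because `PUT /connectors/{name}/config` replaces the configuration as a whole rather than merging it. One way to avoid copying the block by hand is to derive the new configuration from the existing dict, as sketched here. The `tasks.max` change is only an illustration, and the dict is trimmed for brevity.

```python
import copy

# Trimmed version of the create payload from the example above.
config = {
    "name": "my-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "topic.prefix": "my-connector-",
    },
}

# Deep-copy so the original payload is left untouched, then change one key.
new_config = {"config": copy.deepcopy(config["config"])}
new_config["config"]["tasks.max"] = "2"

# response = client.update_connector("my-connector", new_config)
```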

Tests

python3 -m unittest tests/test_kafka_connect.py -v



Download files


Source Distribution

kafka_connect_py-0.10.0.tar.gz (9.7 kB)

Built Distribution

kafka_connect_py-0.10.0-py3-none-any.whl (9.2 kB, Python 3)
