A client for the Confluent Platform Kafka Connect REST API.
Kafka Connect Python
The Kafka Connect REST API allows you to manage connectors that move data between Apache Kafka and other systems.
The kc command line tool provides commands for getting information about the Kafka Connect cluster and its connectors, and for creating, updating, and deleting connectors.
This project aims to support all features of the Kafka Connect REST API.
Install
pip install kafka-connect-py
Command Line Usage
Retrieve the version and other details of the Kafka Connect cluster.
$ kc get-cluster
Retrieve the details of a single connector.
$ kc get-connector <connector>
Retrieve a list of active connectors. The `--expand` option retrieves additional information about each connector: its status or its info (configuration and task metadata).
$ kc get-connectors [--expand=status|info]
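For readers calling the REST API directly, the expand option corresponds to the `expand` query parameter on the `/connectors` endpoint. The following is a minimal sketch of how such a URL could be built. The helper name `connectors_url` is hypothetical and is not part of this package, and passing both values at once is an assumption about the REST API, not about the kc tool.

```python
from urllib.parse import urlencode


def connectors_url(base_url, expand=()):
    """Build the URL for listing connectors (hypothetical helper).

    `expand` is an iterable of values such as "status" or "info".
    Each value becomes its own `expand=` query parameter.
    """
    url = base_url.rstrip("/") + "/connectors"
    if expand:
        query = urlencode([("expand", value) for value in expand])
        url = f"{url}?{query}"
    return url


print(connectors_url("http://localhost:8083"))
print(connectors_url("http://localhost:8083", ["status", "info"]))
```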
Create a new connector using the configuration specified in the given file. If the connector already exists or a rebalance is in progress, the request returns a status code of 409.
$ kc create-connector <config_file>
Update the configuration for an existing connector. If a rebalance is in progress, the request returns a status code of 409.
$ kc update-connector <connector> <config_file>
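Because a 409 during a rebalance is usually temporary, a caller may want to retry the create or update request a few times. The sketch below shows one way to do that. `call_with_retry` is a hypothetical helper, not part of this package, and it assumes the caller supplies a `send` function that performs the request and returns the HTTP status code. Note that a 409 caused by an already existing connector will not go away on retry.

```python
import time


def call_with_retry(send, attempts=3, delay=1.0, sleep=time.sleep):
    """Call `send()` until it returns something other than 409.

    `send` is assumed to perform one request and return its status code.
    Returns the last status code seen, which may still be 409.
    """
    status = None
    for attempt in range(attempts):
        status = send()
        if status != 409:
            return status
        # Wait before the next try, but not after the final attempt.
        if attempt < attempts - 1:
            sleep(delay)
    return status


# Simulated request that conflicts twice and then succeeds.
responses = iter([409, 409, 201])
print(call_with_retry(lambda: next(responses), delay=0))
```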
Retrieve the config of a connector.
$ kc get-connector-config <connector>
Retrieve the status of a connector.
$ kc get-connector-status <connector>
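The Kafka Connect REST API reports status as a JSON object with a `connector` entry and a `tasks` list, each carrying a `state`. Assuming the command prints that JSON unchanged, a small script could flag unhealthy connectors as sketched here. The function names and the sample data are illustrative only.

```python
def failed_tasks(status):
    """Return the ids of tasks whose state is FAILED."""
    return [
        task["id"]
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    ]


def is_healthy(status):
    """True when the connector is RUNNING and no task has failed."""
    connector_state = status.get("connector", {}).get("state")
    return connector_state == "RUNNING" and not failed_tasks(status)


# Made-up sample in the shape of a Kafka Connect status response.
sample = {
    "name": "my-connector",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "10.0.0.2:8083"},
    ],
}

print(failed_tasks(sample))
print(is_healthy(sample))
```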
Retrieve the tasks of a connector. The `--include-tasks` option includes task information in the response.
$ kc get-connector-tasks <connector> [--include-tasks]
Pause a connector.
$ kc pause-connector <connector>
Resume a connector that was previously paused.
$ kc resume-connector <connector>
Delete a connector.
$ kc delete-connector <connector>
Validate the configuration specified in the given file. If the configuration is valid, the request returns a status code of 200.
$ kc validate-connector-config <config_file>
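In the Kafka Connect REST API, the validation response body also carries an `error_count` field and a `configs` list whose entries hold per-field `errors`, so it can be worth inspecting the body as well as the status code. The sketch below assumes that response shape and gathers the messages by field name. `collect_config_errors` is a hypothetical helper and the sample data is made up.

```python
def collect_config_errors(validation):
    """Map each config field name to its list of validation errors.

    Fields without errors are left out of the result.
    """
    errors = {}
    for entry in validation.get("configs", []):
        value = entry.get("value", {})
        if value.get("errors"):
            errors[value.get("name")] = list(value["errors"])
    return errors


# Made-up sample in the shape of a validation response.
sample_validation = {
    "name": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "error_count": 1,
    "configs": [
        {"value": {"name": "tasks.max", "value": "1", "errors": []}},
        {
            "value": {
                "name": "connection.url",
                "value": None,
                "errors": ["Missing required configuration"],
            }
        },
    ],
}

for field, messages in collect_config_errors(sample_validation).items():
    print(field, messages)
```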
Retrieve metadata about the specified connector plugin.
$ kc get-connector-plugin <connector>
Retrieve metadata about all available connector plugins.
$ kc get-connector-plugins
Python
# Import the class
from kafka_connect import KafkaConnect
# Instantiate the client
client = KafkaConnect(endpoint="http://localhost:8083")
# Get the version and other details of the Kafka Connect cluster
cluster = client.get_info()
print(cluster)
# Get a list of active connectors
connectors = client.get_connectors()
print(connectors)
# Create a new connector
config = {
    "name": "my-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:postgresql://localhost:5432/mydatabase",
        "connection.user": "myuser",
        "connection.password": "mypassword",
        "table.whitelist": "mytable",
        "mode": "timestamp+incrementing",
        "timestamp.column.name": "modified_at",
        "validate.non.null": "false",
        "incrementing.column.name": "id",
        "topic.prefix": "my-connector-",
    },
}
response = client.create_connector(config)
print(response)
# Update an existing connector
new_config = {
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:postgresql://localhost:5432/mydatabase",
        "connection.user": "myuser",
        "connection.password": "mypassword",
        "table.whitelist": "mytable",
        "mode": "timestamp+incrementing",
        "timestamp.column.name": "modified_at",
        "validate.non.null": "false",
        "incrementing.column.name": "id",
        "topic.prefix": "my-connector-",
    },
}
response = client.update_connector("my-connector", new_config)
print(response)
# Restart a connector
response = client.restart_connector("my-connector")
print(response)
# Delete a connector
response = client.delete_connector("my-connector")
print(response)
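Connector definitions are often kept in JSON files and not inline in code. As a rough sketch, the helpers below load such a file, check that it has the `name` and `config` keys used in the create example above, and pass it to `create_connector`. Both helper names are hypothetical and not part of this package, and `client` is assumed to be a `KafkaConnect` instance or any object with a compatible `create_connector` method.

```python
import json


def load_connector_config(path):
    """Read a connector definition from a JSON file and check its keys."""
    with open(path, encoding="utf-8") as handle:
        payload = json.load(handle)
    missing = [key for key in ("name", "config") if key not in payload]
    if missing:
        raise ValueError(f"{path} is missing required keys: {', '.join(missing)}")
    return payload


def create_from_file(client, path):
    """Create a connector from a JSON file using the given client."""
    return client.create_connector(load_connector_config(path))
```

With a client instantiated as shown earlier, this could be used as `create_from_file(client, "my-connector.json")`, where the file name is only an example.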
Tests
python3 -m unittest tests/test_kafka_connect.py -v