
Kafka Connect API connectors synchronization library


Kafka Connect Sync


About

The kafkaconnectsync library lets you incorporate Kafka Connect connectors into your deployment code.

When Kafka Connect runs in distributed mode, connectors must be added through its REST API once the API is running. Creating connectors shouldn't be a manual process, so kafkaconnectsync provides functions to manage connectors as part of your infrastructure/deployment.
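For context, the Kafka Connect REST API exposes `PUT /connectors/<name>/config`, which creates the connector if it does not exist and replaces its configuration otherwise. The sketch below only builds such a request with Python's standard library, to illustrate the kind of REST call involved; it is not kafkaconnectsync's internal code, and `build_put_config_request` is a hypothetical helper.

```python
import json
import urllib.parse
import urllib.request


def build_put_config_request(base_url, connector):
    """Build (but do not send) a PUT /connectors/<name>/config request.

    Hypothetical helper: `connector` is assumed to follow the
    {"config": {"name": ..., ...}} shape used in connectors.json.
    """
    config = connector["config"]
    # Percent-encode the name so it is safe as a URL path segment
    name = urllib.parse.quote(config["name"], safe="")
    url = f"{base_url.rstrip('/')}/connectors/{name}/config"
    # The /config endpoint takes the flat config map as the JSON body
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


req = build_put_config_request(
    "https://my-kafka-connect-api.com",
    {"config": {"name": "my_connector_name", "tasks.max": "1"}},
)
print(req.get_method(), req.full_url)
# → PUT https://my-kafka-connect-api.com/connectors/my_connector_name/config
```

Sending it would be a matter of `urllib.request.urlopen(req)`; because the same endpoint handles both creation and update, a sync tool mainly has to decide which connectors to delete.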

To sync connectors, kafkaconnectsync reads your list of connectors and, based on the connectors that already exist on the API, decides whether each one needs to be created, updated or deleted.
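The decision step can be sketched as a pure function that compares the desired list with what the API currently holds. This is a minimal sketch under stated assumptions: `plan_sync` is a hypothetical name, the existing connectors are passed as a dict mapping each name to its current config (for example gathered with `GET /connectors` and `GET /connectors/<name>/config`), and a connector counts as changed whenever its config differs. The library's actual comparison rules may differ.

```python
def plan_sync(desired, existing, strict=True):
    """Return (to_create, to_update, to_delete) lists of connector names.

    Hypothetical sketch: `desired` is a list of {"config": {...}} objects,
    `existing` maps connector name -> current config dict on the API.
    """
    to_create, to_update = [], []
    desired_names = set()
    for connector in desired:
        config = connector["config"]
        name = config["name"]
        desired_names.add(name)
        if name not in existing:
            to_create.append(name)
        elif existing[name] != config:
            # Present on the API but with a different config
            to_update.append(name)
    # Only strict mode removes connectors that are missing from the list
    to_delete = sorted(set(existing) - desired_names) if strict else []
    return to_create, to_update, to_delete


desired = [
    {"config": {"name": "a", "tasks.max": "1"}},
    {"config": {"name": "b", "tasks.max": "2"}},
]
existing = {
    "b": {"name": "b", "tasks.max": "1"},
    "old": {"name": "old", "tasks.max": "1"},
}
print(plan_sync(desired, existing))                # → (['a'], ['b'], ['old'])
print(plan_sync(desired, existing, strict=False))  # → (['a'], ['b'], [])
```

Keeping the planning step free of network calls makes it easy to test and to print as a dry run before any request is sent.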

Installing

Install using pip:

$ pip3 install kafkaconnectsync

Alternatively, you can use setup.py to install by cloning this repository and running:

$ python setup.py install

Usage

  1. Define your connectors:
# connectors.json

[
    {
        "config": {
            "name": "my_connector_name",
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "tasks.max": "1",
            "topics": "my-topic",
            "locale": "en_US",
            "timezone": "UTC",
            "flush.size": "3"
        }
    },
    {
        "config": {
            "name": "my_connector_name_two",
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "s3.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
            "topics.dir": "data",
            "file.delim": "-",
            "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner"
        }
    }
]
  2. Import the sync function from the package. Call it after your app deployment has finished.
# Other imports...
import json

from kafkaconnectsync import sync

url = 'https://my-kafka-connect-api.com'

# Load the connector definitions from the JSON file
with open('connectors.json') as f:
    connectors = json.load(f)

# ...
# Deploy your app here...
# ...

# Sync connectors
sync(url, connectors, strict=True, wait_for_deployment=True, verbose=True)
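Before calling sync, it can help to check that the connector list has the shape used in connectors.json. The function below is a hypothetical pre-flight check, not part of kafkaconnectsync; it assumes every entry needs a `config` object with `name` and `connector.class`, and that names must be unique.

```python
import json


def validate_connectors(connectors):
    """Raise ValueError if the connector list is malformed.

    Hypothetical pre-flight check; kafkaconnectsync may not require it.
    """
    seen = set()
    for index, connector in enumerate(connectors):
        config = connector.get("config")
        if not isinstance(config, dict):
            raise ValueError(f"entry {index}: missing 'config' object")
        for key in ("name", "connector.class"):
            if key not in config:
                raise ValueError(f"entry {index}: missing '{key}'")
        if config["name"] in seen:
            raise ValueError(f"duplicate connector name: {config['name']}")
        seen.add(config["name"])
    return connectors


connectors = validate_connectors(json.loads("""
[
    {"config": {"name": "my_connector_name",
                "connector.class": "io.confluent.connect.s3.S3SinkConnector"}}
]
"""))
```

Failing early on a malformed file avoids a partial sync in which some connectors were already created before the bad entry is reached.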

Documentation

  • sync(url, connectors=[], strict=True, wait_for_deployment=True, verbose=False):
    • url: Your Kafka Connect REST API URL (e.g. https://my-kafka-connect-api.com).
    • connectors: The list of connector objects to sync with Kafka Connect.
    • strict: When strict is enabled, in addition to creating/updating the connectors in the list, sync deletes every connector on the API that is not in the list, so the API ends up matching your list. Default: True.
    • wait_for_deployment: If True, sync keeps sending requests to the Kafka Connect host until it becomes available. Useful if you're deploying your app and the function should wait for the deployment to finish. Default: True.
    • verbose: Set this flag to True to print action logs to your terminal. Default: False.
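The `wait_for_deployment` behaviour amounts to polling until the API answers. A minimal sketch, assuming the root endpoint `GET /` (which Kafka Connect answers with its version information) serves as the health probe and assuming a fixed delay between attempts; `wait_until_available`, `http_probe` and the `probe`, `attempts` and `delay` parameters are hypothetical, and the library's actual retry policy is not documented here.

```python
import time
import urllib.request


def http_probe(url):
    """Return True if GET <url>/ answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url.rstrip("/") + "/", timeout=5) as resp:
            return resp.status == 200
    except OSError:
        # URLError and HTTPError subclass OSError: covers refused
        # connections, DNS failures, timeouts and error statuses
        return False


def wait_until_available(url, probe=http_probe, attempts=30, delay=2.0):
    """Call probe(url) until it succeeds; return False after `attempts` tries."""
    for attempt in range(attempts):
        if probe(url):
            return True
        if attempt < attempts - 1:
            time.sleep(delay)
    return False


# Usage with a fake probe that pretends the API comes up on the third try
calls = []


def flaky_probe(url):
    calls.append(url)
    return len(calls) >= 3


ready = wait_until_available(
    "https://my-kafka-connect-api.com", probe=flaky_probe, delay=0
)
print(ready, len(calls))  # → True 3
```

Injecting the probe keeps the retry loop testable without a live Kafka Connect cluster; bounding the number of attempts stops a failed deployment from hanging the pipeline forever.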

Development

Clone this repo to your machine:

$ git clone https://github.com/venturachrisdev/kafka-connect-sync.git
$ cd kafka-connect-sync

Install dependencies using pip:

$ pip3 install -r requirements-dev.txt

Run the pylint linter on the project:

$ pylint kafkaconnectsync/; pylint tests/

To apply PEP 8 formatting rules to the codebase, use the following command:

$ autopep8 --in-place --recursive kafkaconnectsync/ tests/

Run tests locally using:

$ pytest tests/



Download files


Source Distribution

kafkaconnectsync-0.0.3.tar.gz (5.6 kB)

File details

Details for the file kafkaconnectsync-0.0.3.tar.gz.

File metadata

  • Download URL: kafkaconnectsync-0.0.3.tar.gz
  • Upload date:
  • Size: 5.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/40.8.0 requests-toolbelt/0.9.1 tqdm/4.46.1 CPython/3.7.3

File hashes

Hashes for kafkaconnectsync-0.0.3.tar.gz
Algorithm     Hash digest
SHA256        5ba71b9b16c8d3f84ffb3f2ed5f8b28c2512e46c9f78086c86154cfc720db61c
MD5           2edd9d5f49e7d83651a4f475c0ee274d
BLAKE2b-256   24c249cb6b12b2c5cab4a995677d14b1bc1e5727f375c5c114561f09801eacd2
