Skip to main content

Kafka Connect API connectors synchronization library

Project description

Kafka Connect Sync

Build Status PyPI Version MIT License

Kafka Connect API connectors synchronization library

About

The kafkaconnectsync library allows you to incorporate the Kafka Connect connectors/sink to your deployment code.

When running Kafka Connect in distribute mode, connectors need to be added using REST methods after the API is running. Creating connectors shouldn't be a manual process so kafkaconnectsync provides functions to manage connectors as an infrastructure/deployment component.

To sync connectors, kafkaconnectsync reads the list of connectors and decides if it needs to create, update or delete them depending on the status of the API (i.e the existing connectors).

Installing

Install using pip:

$ pip3 install kafkaconnectsync

Alternatively, you can use setup.py to install by cloning this repository and running:

$ python setup.py install

Usage

  1. Define your connectors:
# connectors.json

[
    {
        "config": {
            "name": "my_connector_name",
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "tasks.max": "1",
            "topics": "my-topic",
            "locale": "en_US",
            "timezone": "UTC",
            "flush.size": "3",
        }
    },
    {
        "config": {
            "name": "my_connector_name_two",
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "s3.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
            "topics.dir": "data",
            "file.delim": "-",
            "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
        }
    }
]
  1. Import the sync function from the package. Make sure to call it after your app deployment has been done.
# Other imports...
import sync from kafkaconnectsync

url = 'https://my-kafka-connect-api.com'
connectors = json.loads(open('connectors.json').read())

"""
 ...
 Deploy your app here...
 ...
"""

# Sync connectors
sync(url, connectors, strict=True, wait_for_deployment=True, verbose=True)

Documentation

  • sync(url, connectors=[], wait_for_deployment=True, verbose=False):
    • url: You Kafka Connect API hostname.
    • connectors: The array of connectors objects to sync on Kafka Connect.
    • strict: When strict is enabled, apart from creating/updating connectors from the list, the sync function will remove all the API connectors that are not present on this list as a way to synchronize your list with the API. Default: True
    • wait_for_deployment: If True, it will keep sending requests to the Kafka Connect hosts until it becomes available. Useful if your deploying your app and the function should wait for the deployment to finish. Default: True.
    • verbose: Set this flag to True if you want to output action logs to your terminal. Default: False.

Development

Clone this repo to your machine:

$ git clone https://github.com/venturachrisdev/kafka-connect-sync.git
$ cd kafka-connect-sync

Install dependencies using pip:

$ pip3 install -r requirements-dev.txt

Use pylint to run linter on the project:

$ pylint kafkaconnectsync/; pylint tests/

To apply pep8 rules to the codebase, use the following command:

$ autopep8 --in-place --recursive kafkaconnectsync/ tests/

Run tests locally using:

$ pytest tests/

Contributors

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kafkaconnectsync-0.0.4.tar.gz (5.6 kB view details)

Uploaded Source

File details

Details for the file kafkaconnectsync-0.0.4.tar.gz.

File metadata

  • Download URL: kafkaconnectsync-0.0.4.tar.gz
  • Upload date:
  • Size: 5.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/40.8.0 requests-toolbelt/0.9.1 tqdm/4.46.1 CPython/3.7.3

File hashes

Hashes for kafkaconnectsync-0.0.4.tar.gz
Algorithm Hash digest
SHA256 5f9ac6e8db6630a7e39a99eb03be62bf3cf317b11215e66cdd3fd1a22ae3af3b
MD5 5fcf5f67bd87caff652e07f28e1a5df8
BLAKE2b-256 356c78dc2000f0787d73add8a90a00b499db9092baba2b2d0acbf4565d79b994

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page