
Singer.io tap for extracting data from a Kafka topic - PipelineWise compatible


pipelinewise-tap-kafka

License: MIT

This is a Singer tap that reads data from a Kafka topic and produces JSON-formatted data following the Singer spec.

This is a PipelineWise compatible tap connector.

How to use it

The recommended way to run this tap is from PipelineWise. When you run it from PipelineWise, you don't need to configure the tap with JSON files, and most things are automated. For details, see the Kafka page of the PipelineWise documentation.

If you want to run this Singer tap independently, read on.

Install and Run

First, make sure Python 3 is installed on your system or follow these installation instructions for Mac or Ubuntu.

It's recommended to use a virtualenv:

  python3 -m venv venv
  . venv/bin/activate
  pip install pipelinewise-tap-kafka

or, to install from a local checkout of the source repository:

  python3 -m venv venv
  . venv/bin/activate
  pip install --upgrade pip
  pip install .

Configuration

Create a config.json file:

{
  "group_id": "1",
  "bootstrap_servers": "foo.com,bar.com",
  "topic": "messages",
  "primary_keys": {
    "id": "$.jsonpath.to.primary_key"
  }
}
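A quick sanity check before running the tap can catch typos in config.json. The following is a minimal sketch, not part of the tap, and `check_config` is a hypothetical helper. It treats only the `group_id`, `bootstrap_servers` and `topic` keys from the example above as required, because the output table below marks primary keys as optional; the tap itself may require or accept other keys. It also assumes `bootstrap_servers` is a comma-separated host list, as the example value suggests.

```python
import json
from typing import Any, Dict, List

# Assumption: these are the required keys, taken from the example config.json.
# The real tap may require or accept additional keys.
REQUIRED_KEYS = ("group_id", "bootstrap_servers", "topic")


def check_config(config: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: return a list of problems found in the config (empty if none)."""
    problems = [f"missing key: {key}" for key in REQUIRED_KEYS if key not in config]

    # Assumption: bootstrap_servers is a comma-separated string such as "foo.com,bar.com".
    servers = config.get("bootstrap_servers")
    if isinstance(servers, str):
        hosts = [host.strip() for host in servers.split(",") if host.strip()]
        if not hosts:
            problems.append("bootstrap_servers lists no hosts")

    # primary_keys is optional; when present it maps column names to JSONPath strings.
    primary_keys = config.get("primary_keys", {})
    if not isinstance(primary_keys, dict):
        problems.append("primary_keys must be an object")
    else:
        for column, path in primary_keys.items():
            if not isinstance(path, str) or not path.startswith("$"):
                problems.append(f"primary key {column!r} is not a JSONPath: {path!r}")
    return problems


example = json.loads("""
{
  "group_id": "1",
  "bootstrap_servers": "foo.com,bar.com",
  "topic": "messages",
  "primary_keys": {"id": "$.jsonpath.to.primary_key"}
}
""")
print(check_config(example))  # → []
```

To check a real file, load it with `json.load` and pass the resulting dict to `check_config`.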

This tap reads Kafka messages and generates Singer-compatible SCHEMA and RECORD messages in the following format:

Property Name            Description
MESSAGE                  The original Kafka message
DYNAMIC_PRIMARY_KEY(S)   (Optional) Dynamically added primary key values, extracted from the Kafka message
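The sketch below shows how one Kafka message value could be turned into a Singer RECORD message using the `primary_keys` setting from config.json. It is not the tap's own code, and several parts are assumptions:

- `resolve_simple_jsonpath` and `build_record` are hypothetical helpers.
- The resolver handles only plain dot paths like `$.jsonpath.to.primary_key`, with no wildcards, filters or array indexes.
- The column names `MESSAGE` and `id` come from the table and the example config, so the exact casing the tap emits may differ.
- The stream name is assumed to equal the topic.

```python
import json
from typing import Any, Dict


def resolve_simple_jsonpath(document: Any, path: str) -> Any:
    """Hypothetical helper: resolve a dot-only JSONPath such as '$.a.b.c'.

    Wildcards, filters and array indexes are NOT supported here; the tap
    itself presumably relies on a full JSONPath implementation.
    """
    if not path.startswith("$."):
        raise ValueError(f"unsupported JSONPath: {path}")
    value = document
    for key in path[2:].split("."):
        value = value[key]  # raises KeyError if the path is absent
    return value


def build_record(raw_value: bytes, primary_keys: Dict[str, str]) -> Dict[str, Any]:
    """Hypothetical helper: wrap a JSON Kafka message value plus extracted primary key columns."""
    message = json.loads(raw_value)
    record: Dict[str, Any] = {"MESSAGE": message}  # column name taken from the table above
    for column, path in primary_keys.items():
        record[column] = resolve_simple_jsonpath(message, path)
    return record


primary_keys = {"id": "$.jsonpath.to.primary_key"}  # same mapping as the example config.json
raw_value = b'{"jsonpath": {"to": {"primary_key": 42}}, "text": "hello"}'
record = build_record(raw_value, primary_keys)

# A Singer RECORD message is one JSON object per line on stdout.
# Assumption: the stream name equals the topic ("messages" in the example config).
print(json.dumps({"type": "RECORD", "stream": "messages", "record": record}))
```

The source does not say whether the real tap fails or emits null when a primary key path is missing. This sketch simply raises `KeyError`.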

Run the tap in Discovery Mode

tap-kafka --config config.json --discover                # Should dump a Catalog to stdout
tap-kafka --config config.json --discover > catalog.json # Capture the Catalog

Add Metadata to the Catalog

Each entry under the Catalog's "streams" key needs the following metadata:

{
  "streams": [
    {
      "stream_name": "my_topic",
      "metadata": [{
        "breadcrumb": [],
        "metadata": {
          "selected": true
        }
      }]
    }
  ]
}
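Editing catalog.json by hand gets tedious with several topics. Here is a minimal Python sketch that adds the metadata shown above to every stream. `select_all_streams` is a hypothetical helper. It assumes the discovered catalog uses the `streams` list and the `breadcrumb`/`metadata` layout from the example.

```python
import json
from typing import Any, Dict


def select_all_streams(catalog: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: set "selected": true on each stream's top-level metadata entry."""
    for stream in catalog.get("streams", []):
        metadata = stream.setdefault("metadata", [])
        # The stream-level entry is the one whose breadcrumb is an empty list.
        top_level = next((m for m in metadata if m.get("breadcrumb") == []), None)
        if top_level is None:
            top_level = {"breadcrumb": [], "metadata": {}}
            metadata.append(top_level)
        top_level.setdefault("metadata", {})["selected"] = True
    return catalog


# In-memory example. For a real file, json.load catalog.json, call the
# function, then json.dump the result back to the same path.
catalog = {"streams": [{"stream_name": "my_topic", "metadata": []}]}
print(json.dumps(select_all_streams(catalog), indent=2))
```

The helper updates an existing stream-level entry rather than appending a duplicate, so running it twice gives the same result.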

Run the tap in Sync Mode

tap-kafka --config config.json --properties catalog.json

The tap writes bookmarks to stdout, which can be captured and passed to the tap with the optional --state state.json parameter on the next sync.
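In Singer, bookmarks are emitted as STATE messages of the form `{"type": "STATE", "value": ...}`. The file passed with `--state` conventionally holds the `value` of the latest such message. The following is a minimal sketch that picks that value out of the tap's output lines. `last_state` is a hypothetical helper, and the bookmark contents in the sample (`{"offset": 10}`) are made up purely for illustration; this README does not describe the real structure.

```python
import json
from typing import Any, Iterable, Optional


def last_state(lines: Iterable[str]) -> Optional[Any]:
    """Hypothetical helper: return the value of the last Singer STATE message, or None."""
    state = None
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip anything that is not a JSON message
        if isinstance(message, dict) and message.get("type") == "STATE":
            state = message.get("value")
    return state


# Sample tap output; the bookmark contents are HYPOTHETICAL, for illustration only.
tap_output = [
    '{"type": "SCHEMA", "stream": "messages", "schema": {}, "key_properties": []}',
    '{"type": "RECORD", "stream": "messages", "record": {"MESSAGE": {"text": "hello"}}}',
    '{"type": "STATE", "value": {"bookmarks": {"messages": {"offset": 10}}}}',
]
state = last_state(tap_output)
print(json.dumps(state))
```

In practice, you might save the tap's stdout to a file with `tee` while it feeds your target. You would then pass that file's lines to `last_state`, write `json.dumps(state)` to state.json, and supply `--state state.json` on the next run.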

To run tests:

  1. Install Python test dependencies in a virtual env:
  python3 -m venv venv
  . venv/bin/activate
  pip install --upgrade pip
  pip install .[test]
  2. Run the unit and integration tests:
  pytest tests

To run pylint:

  1. Install Python dependencies and run the Python linter:
  python3 -m venv venv
  . venv/bin/activate
  pip install --upgrade pip
  pip install .[test]
  pylint tap_kafka -d C,W,unexpected-keyword-arg,duplicate-code

Download files


Source Distribution

pipelinewise-tap-kafka-2.1.0.tar.gz (5.7 kB)

Built Distribution

pipelinewise_tap_kafka-2.1.0-py3-none-any.whl (17.7 kB, Python 3)
