
Singer.io tap for extracting data from a Kafka topic - PipelineWise compatible


pipelinewise-tap-kafka

License: MIT

This is a Singer tap that reads data from a Kafka topic and produces JSON-formatted data following the Singer spec.

This is a PipelineWise compatible tap connector.

How to use it

The recommended way to run this tap is from PipelineWise. When running it from PipelineWise, you don't need to configure the tap with JSON files, and most things are automated. Please check the related PipelineWise documentation page for Kafka.

If you want to run this Singer tap independently, read on.

Install and Run

First, make sure Python 3 is installed on your system, or follow the installation instructions for Mac or Ubuntu.

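It's recommended to use a virtualenv. Install the published package from PyPI:

  python3 -m venv venv
  . venv/bin/activate
  pip install pipelinewise-tap-kafka

or install from a local checkout of the source repository:

  python3 -m venv venv
  . venv/bin/activate
  pip install --upgrade pip
  pip install .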

Configuration

Create a config.json file:

{
  "bootstrap_servers": "foo.com,bar.com",
  "group_id": "my_group",
  "topic": "my_topic",
  "primary_keys": {
    "id": "/path/to/primary_key"
  }
}
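If you generate the configuration from a script instead of writing it by hand, a small guard against missing required options (bootstrap_servers, group_id and topic) can save a failed run. The following is a minimal sketch, not part of tap-kafka: render_config, write_config, REQUIRED_KEYS and EXAMPLE_CONFIG are hypothetical names, and the only check performed is that the required keys are present and non-empty.

```python
import json
from pathlib import Path

# Options marked as required in the options list (hypothetical helper constant).
REQUIRED_KEYS = ("bootstrap_servers", "group_id", "topic")

# The example configuration above, expressed as a Python dict.
EXAMPLE_CONFIG = {
    "bootstrap_servers": "foo.com,bar.com",
    "group_id": "my_group",
    "topic": "my_topic",
    "primary_keys": {"id": "/path/to/primary_key"},
}


def render_config(config):
    """Return config as pretty-printed JSON; raise ValueError if a required key is missing or empty."""
    missing = [key for key in REQUIRED_KEYS if not config.get(key)]
    if missing:
        raise ValueError("missing required config keys: " + ", ".join(missing))
    return json.dumps(config, indent=2) + "\n"


def write_config(path, config):
    """Validate config and write it to path, for example "config.json"."""
    Path(path).write_text(render_config(config), encoding="utf-8")
```

Calling write_config("config.json", EXAMPLE_CONFIG) writes a file equivalent to the one shown above.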

Full list of options in config.json:

| Property | Type | Required? | Description |
|----------|------|-----------|-------------|
| bootstrap_servers | String | Yes | host[:port] string (or a comma-separated list of host[:port] strings) that the consumer should contact to bootstrap initial cluster metadata. |
| group_id | String | Yes | Name of the consumer group to join for dynamic partition assignment (if enabled) and to use for fetching and committing offsets. |
| topic | String | Yes | Name of the Kafka topic to subscribe to. |
| partitions | List | No | (Default: [], meaning all partitions) Partition(s) of the topic to consume, for example [0,4]. |
| primary_keys | Object | No | Optional primary key definition for the consumed messages. Each entry maps a column name to a /slashed/path, an XPath-like selector that extracts the value from the Kafka message. The extracted column is added to every output Singer message. |
| use_message_key | Bool | No | (Default: true) Whether to use the Kafka message key as the primary key of the record. Note: if custom primary keys are defined, they are used instead of the message key. |
| initial_start_time | String | No | (Default: latest) Where to start consuming messages when state.json has no bookmarked position. One of: beginning, earliest, latest, or an ISO-8601 formatted timestamp string. |
| max_runtime_ms | Integer | No | (Default: 300000) Maximum time the tap spends collecting new messages from the Kafka topic. When this time is exceeded, the tap flushes the batch and closes the Kafka connection. |
| commit_interval_ms | Integer | No | (Default: 5000) Number of milliseconds between two commits. This is different from Kafka's auto-commit feature: tap-kafka commits automatically, but only after the consumed data has been successfully persisted to the local store. |
| consumer_timeout_ms | Integer | No | (Default: 10000) KafkaConsumer setting. Number of milliseconds to block during message iteration before raising StopIteration. |
| session_timeout_ms | Integer | No | (Default: 30000) KafkaConsumer setting. The timeout used to detect failures when using Kafka's group management facilities. |
| heartbeat_interval_ms | Integer | No | (Default: 10000) KafkaConsumer setting. The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management facilities. |
| max_poll_interval_ms | Integer | No | (Default: 300000) KafkaConsumer setting. The maximum delay between invocations of poll() when using consumer group management. |
| message_format | String | No | (Default: json) Supported message formats are json and protobuf. |
| proto_schema | String | If message_format is protobuf | Protobuf message format in .proto syntax. |
| proto_classes_dir | String | No | (Default: current working directory) |
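To see how the optional settings combine with the required ones, here is a sketch that fills in the documented defaults and checks the two constraints stated in the options table: initial_start_time must be a keyword or an ISO-8601 timestamp, and proto_schema is needed when message_format is protobuf. This is an illustration under those assumptions, not the tap's own validation code; resolve_config and the constant names are hypothetical.

```python
import copy
from datetime import datetime

# Defaults copied from the options table. proto_classes_dir is left out because
# its default (the current working directory) depends on where the tap runs.
DEFAULTS = {
    "partitions": [],  # empty list means all partitions
    "use_message_key": True,
    "initial_start_time": "latest",
    "max_runtime_ms": 300000,
    "commit_interval_ms": 5000,
    "consumer_timeout_ms": 10000,
    "session_timeout_ms": 30000,
    "heartbeat_interval_ms": 10000,
    "max_poll_interval_ms": 300000,
    "message_format": "json",
}
REQUIRED_KEYS = ("bootstrap_servers", "group_id", "topic")
START_KEYWORDS = {"beginning", "earliest", "latest"}
MESSAGE_FORMATS = {"json", "protobuf"}


def resolve_config(user_config):
    """Return a new dict with defaults filled in; raise ValueError on invalid options."""
    missing = [key for key in REQUIRED_KEYS if not user_config.get(key)]
    if missing:
        raise ValueError("missing required options: " + ", ".join(missing))

    # Deep copy so callers never share the mutable default list.
    config = copy.deepcopy(DEFAULTS)
    config.update(user_config)

    start = config["initial_start_time"]
    if start not in START_KEYWORDS:
        # Anything else must be an ISO-8601 timestamp. Before Python 3.11,
        # datetime.fromisoformat accepts only a subset of ISO-8601 (no trailing "Z").
        try:
            datetime.fromisoformat(start)
        except (TypeError, ValueError) as err:
            raise ValueError(f"invalid initial_start_time: {start!r}") from err

    if config["message_format"] not in MESSAGE_FORMATS:
        raise ValueError("message_format must be json or protobuf")
    if config["message_format"] == "protobuf" and not config.get("proto_schema"):
        raise ValueError("proto_schema is required when message_format is protobuf")
    return config
```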

This tap reads Kafka messages and generates Singer-compatible SCHEMA and RECORD messages with the following properties:

| Property Name | Description |
|---------------|-------------|
| MESSAGE_TIMESTAMP | Timestamp extracted from the Kafka metadata |
| MESSAGE_OFFSET | Offset extracted from the Kafka metadata |
| MESSAGE_PARTITION | Partition extracted from the Kafka metadata |
| MESSAGE | The original Kafka message |
| MESSAGE_KEY | (Optional) The Kafka message key. Added by default when no custom primary keys are defined (see use_message_key). |
| DYNAMIC_PRIMARY_KEY(S) | (Optional) Primary key values extracted dynamically from the Kafka message, as configured in primary_keys |
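The sketch below shows how a RECORD message with these properties could be assembled, including resolving a /slashed/path primary key against a decoded JSON message. It is an approximation for illustration only: build_record and extract_by_path are hypothetical, the stream name is assumed to be the topic name, the key is assumed to be already decoded to a string, and the timestamp is passed through unchanged because its exact representation in the tap's output is not specified here.

```python
import json


def extract_by_path(message, path):
    """Follow a /slashed/path through nested dicts; return None if any step is missing."""
    value = message
    for part in path.strip("/").split("/"):
        if not isinstance(value, dict) or part not in value:
            return None
        value = value[part]
    return value


def build_record(stream, raw_value, key, timestamp, offset, partition,
                 primary_keys=None, use_message_key=True):
    """Build a Singer RECORD message from one JSON-encoded Kafka message."""
    message = json.loads(raw_value)
    record = {
        "MESSAGE_TIMESTAMP": timestamp,
        "MESSAGE_OFFSET": offset,
        "MESSAGE_PARTITION": partition,
        "MESSAGE": message,
    }
    if primary_keys:
        # Custom primary keys take precedence over the message key.
        for column, path in primary_keys.items():
            record[column] = extract_by_path(message, path)
    elif use_message_key:
        record["MESSAGE_KEY"] = key
    return {"type": "RECORD", "stream": stream, "record": record}
```

With primary_keys set to {"id": "/path/to/primary_key"}, a message such as {"path": {"to": {"primary_key": 42}}} yields an id column of 42 and no MESSAGE_KEY column.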

Run the tap in Discovery Mode

tap-kafka --config config.json --discover                # Should dump a Catalog to stdout
tap-kafka --config config.json --discover > catalog.json # Capture the Catalog

Add Metadata to the Catalog

Each entry under the Catalog's "streams" key needs the following metadata:

{
  "streams": [
    {
      "stream_name": "my_topic",
      "metadata": [{
        "breadcrumb": [],
        "metadata": {
          "selected": true
        }
      }]
    }
  ]
}
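Editing catalog.json by hand works for a single topic; a script can add the same stream-level metadata to every stream in the discovered catalog instead. This is a minimal sketch that assumes the catalog layout shown above (a "streams" list whose entries carry a "metadata" list, with stream-level metadata under an empty breadcrumb); select_all_streams and select_catalog_file are hypothetical helpers, not part of tap-kafka.

```python
import json


def select_all_streams(catalog):
    """Set selected=true on the stream-level (empty breadcrumb) metadata of every stream."""
    for stream in catalog.get("streams", []):
        metadata = stream.setdefault("metadata", [])
        top_level = next((m for m in metadata if m.get("breadcrumb") == []), None)
        if top_level is None:
            # No stream-level entry yet: create one.
            top_level = {"breadcrumb": [], "metadata": {}}
            metadata.append(top_level)
        top_level.setdefault("metadata", {})["selected"] = True
    return catalog


def select_catalog_file(path):
    """Rewrite a catalog file (for example "catalog.json") in place with all streams selected."""
    with open(path, encoding="utf-8") as f:
        catalog = json.load(f)
    select_all_streams(catalog)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(catalog, f, indent=2)
```

Existing metadata keys are preserved; only "selected" is added or overwritten.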

Run the tap in Sync Mode

tap-kafka --config config.json --properties catalog.json

The tap writes bookmarks to stdout. Capture them and pass them to the next sync with the optional --state state.json parameter.
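Per the Singer spec, bookmarks are emitted as STATE messages of the form {"type": "STATE", "value": {...}}, and the state file passed with --state holds the value of the most recent one. A minimal sketch of extracting it from captured output follows; last_state is a hypothetical helper, and it does not assume any particular bookmark layout inside "value".

```python
import json


def last_state(lines):
    """Return the value of the last Singer STATE message in an iterable of stdout lines."""
    state = None
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # not a JSON message; skip it
        if isinstance(message, dict) and message.get("type") == "STATE":
            state = message.get("value")
    return state
```

For example, redirect the sync output to a file (tap-kafka --config config.json --properties catalog.json > output.jsonl), then write json.dumps(last_state(open("output.jsonl"))) to state.json. Returning None when no STATE message was seen lets the caller decide whether to keep the previous state file.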

To run tests:

  1. Install the Python test dependencies in a virtual env (the unit and integration tests run with nose):
       python3 -m venv venv
       . venv/bin/activate
       pip install --upgrade pip
       pip install -e '.[test]'
  2. To run unit tests:
       make unit_test
  3. To run integration tests:
       make integration_test

To run pylint:

  1. Install the Python dependencies in a virtual env:
       python3 -m venv venv
       . venv/bin/activate
       pip install --upgrade pip
       pip install -e '.[test]'
  2. Run the linter:
       pylint tap_kafka -d C,W,unexpected-keyword-arg,duplicate-code


Download files


Source Distribution

pipelinewise-tap-kafka-8.1.0.tar.gz (23.4 kB)

Uploaded Source

Built Distribution


pipelinewise_tap_kafka-8.1.0-py3-none-any.whl (25.7 kB)

Uploaded Python 3

File details

Details for the file pipelinewise-tap-kafka-8.1.0.tar.gz.

File metadata

  • Download URL: pipelinewise-tap-kafka-8.1.0.tar.gz
  • Upload date:
  • Size: 23.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.11.4

File hashes

Hashes for pipelinewise-tap-kafka-8.1.0.tar.gz
Algorithm Hash digest
SHA256 97b3bf1f44c2d7f9bf92274a9dc2cbf2e16dce95d8ecb0087164caaf6b24ff27
MD5 96a46e2999ba60528c98e3094e1b162c
BLAKE2b-256 e31850e051f88e6fa89a0585864112822c505d409048d1e3916dcf5ed8a3b5f5


File details

Details for the file pipelinewise_tap_kafka-8.1.0-py3-none-any.whl.


File hashes

Hashes for pipelinewise_tap_kafka-8.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 cc6bbf1baad361c3c0f2f51abcf0281784873b70e8aeffb759acf8b845db298e
MD5 8bff48b6ebcd7dba510a8e666fa5105a
BLAKE2b-256 bf5b5cf49ed5ee699cbbbfca9b7050a188013c1cd90d989198e1744c078f5e90

