Singer.io tap for extracting data from a Kafka topic - PipelineWise compatible
pipelinewise-tap-kafka
This is a Singer tap that reads data from a Kafka topic and produces JSON-formatted data following the Singer spec.
This is a PipelineWise compatible tap connector.
How to use it
The recommended way to run this tap is from PipelineWise. When running it from PipelineWise you don't need to configure this tap with JSON files, and most things are automated. Please check the related documentation at Kafka.
If you want to run this Singer tap independently, read on.
Install and Run
First, make sure Python 3 is installed on your system or follow these installation instructions for Mac or Ubuntu.
It's recommended to use a virtualenv:
python3 -m venv venv
. venv/bin/activate
pip install pipelinewise-tap-kafka
or, to install from a local checkout of the source code:
python3 -m venv venv
. venv/bin/activate
pip install --upgrade pip
pip install .
Configuration
Create a config.json file:
{
  "group_id": "1",
  "bootstrap_servers": "foo.com,bar.com",
  "topic": "messages",
  "primary_keys": {
    "id": "$.jsonpath.to.primary_key"
  }
}
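If you generate config.json from a script rather than by hand, a minimal Python sketch could look like the one below. It only reproduces the keys shown in the example above; treating group_id, bootstrap_servers and topic as mandatory is an assumption of this sketch, and build_config and write_config are hypothetical helpers, not part of the tap.

```python
import json
from pathlib import Path

# Keys taken from the example config.json; treating them as mandatory
# is an assumption of this sketch, not a documented rule of the tap.
REQUIRED_KEYS = ("group_id", "bootstrap_servers", "topic")


def build_config(group_id, bootstrap_servers, topic, primary_keys=None):
    """Return a dict shaped like the example config.json."""
    config = {
        "group_id": str(group_id),
        # The example passes the brokers as one comma-separated string
        "bootstrap_servers": ",".join(bootstrap_servers),
        "topic": topic,
    }
    if primary_keys:
        # Output column name -> JSONPath into the Kafka message
        config["primary_keys"] = dict(primary_keys)
    missing = [key for key in REQUIRED_KEYS if not config.get(key)]
    if missing:
        raise ValueError(f"missing config values: {missing}")
    return config


def write_config(config, path="config.json"):
    """Serialise the config as pretty-printed JSON and return the path."""
    path = Path(path)
    path.write_text(json.dumps(config, indent=2))
    return path
```

Calling build_config("1", ["foo.com", "bar.com"], "messages", {"id": "$.jsonpath.to.primary_key"}) yields the same structure as the example file.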
This tap reads Kafka messages and generates Singer-compatible SCHEMA and RECORD messages with the following properties:
Property Name | Description
---|---
MESSAGE | The original Kafka message
DYNAMIC_PRIMARY_KEY(S) | (Optional) Dynamically added primary key values, extracted from the Kafka message using the JSONPaths in primary_keys
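The record shape described in the table can be illustrated with a minimal Python sketch. It assumes the Kafka message value is JSON and supports only simple dotted JSONPaths of the form $.a.b (no wildcards, filters or array indexes); the tap itself may use a full JSONPath implementation and different property-name casing, so resolve_simple_jsonpath, build_record and the MESSAGE key used here are illustrative only. The RECORD envelope (type, stream, record) follows the Singer spec.

```python
import json


def resolve_simple_jsonpath(document, path):
    """Resolve a dotted JSONPath such as '$.a.b' against a parsed dict.

    Only the '$.key.key' subset is supported. Returns None when any
    key along the path is missing.
    """
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path!r}")
    value = document
    for key in path[2:].split("."):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def build_record(raw_message, primary_keys):
    """Build a record holding the original message plus extracted keys."""
    message = json.loads(raw_message)
    # "MESSAGE" mirrors the table above; the tap's actual casing may differ
    record = {"MESSAGE": message}
    for column, path in primary_keys.items():
        record[column] = resolve_simple_jsonpath(message, path)
    return record


def singer_record_message(stream, record):
    """Wrap a record in a Singer RECORD message (one JSON line)."""
    return json.dumps({"type": "RECORD", "stream": stream, "record": record})
```

With primary_keys set to {"id": "$.id"}, a message such as {"id": 7} becomes a record with both the full message and a top-level id column.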
Run the tap in Discovery Mode
tap-kafka --config config.json --discover # Should dump a Catalog to stdout
tap-kafka --config config.json --discover > catalog.json # Capture the Catalog
Add Metadata to the Catalog
Each entry under the Catalog's "streams" key needs the following metadata:
{
  "streams": [
    {
      "stream_name": "my_topic",
      "metadata": [{
        "breadcrumb": [],
        "metadata": {
          "selected": true
        }
      }]
    }
  ]
}
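Adding this metadata by hand gets tedious when the catalog has many streams. A minimal Python sketch, assuming the catalog layout shown above, that marks every stream as selected by setting "selected": true on its stream-level entry (the one whose breadcrumb is []); select_all_streams and select_streams_in_file are hypothetical helpers, not part of the tap.

```python
import json


def select_all_streams(catalog):
    """Mark every stream in a Singer catalog as selected, in place.

    Updates the stream-level metadata entry (breadcrumb == []) or
    appends one if the stream has none.
    """
    for stream in catalog.get("streams", []):
        metadata = stream.setdefault("metadata", [])
        for entry in metadata:
            if entry.get("breadcrumb") == []:
                entry.setdefault("metadata", {})["selected"] = True
                break
        else:
            # No stream-level entry found: add one
            metadata.append({"breadcrumb": [], "metadata": {"selected": True}})
    return catalog


def select_streams_in_file(path="catalog.json"):
    """Load a catalog file, select all streams and write it back."""
    with open(path) as f:
        catalog = json.load(f)
    select_all_streams(catalog)
    with open(path, "w") as f:
        json.dump(catalog, f, indent=2)
    return catalog
```

Running select_streams_in_file("catalog.json") after the discovery step leaves a catalog ready for Sync Mode.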
Run the tap in Sync Mode
tap-kafka --config config.json --properties catalog.json
The tap writes bookmarks to stdout, which can be captured and passed to the tap as the optional --state state.json parameter on the next sync.
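Bookmarks travel as Singer STATE messages, one JSON object per line, interleaved with the SCHEMA and RECORD messages. A minimal Python sketch, assuming the tap's stdout was saved to a file, that keeps the value of the last STATE message and writes it to state.json; last_state and save_state are hypothetical helpers, not part of the tap.

```python
import json


def last_state(lines):
    """Return the value of the last Singer STATE message in lines, or None.

    Each line is expected to hold one JSON-encoded Singer message;
    blank or non-JSON lines are skipped.
    """
    state = None
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue
        if isinstance(message, dict) and message.get("type") == "STATE":
            state = message.get("value")
    return state


def save_state(lines, path="state.json"):
    """Write the last STATE value to path and return it (None if absent)."""
    state = last_state(lines)
    if state is not None:
        with open(path, "w") as f:
            json.dump(state, f)
    return state
```

For example, redirect the sync output with tap-kafka --config config.json --properties catalog.json > output.jsonl, call save_state on the opened output.jsonl, then pass --state state.json on the next run. In a real pipeline the tap's stdout is usually piped into a Singer target instead, which typically emits the state itself.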
To run tests:
- Install Python test dependencies in a virtual env:
python3 -m venv venv
. venv/bin/activate
pip install --upgrade pip
pip install .[test]
- Run the unit and integration tests with pytest:
pytest tests
To run pylint:
- Install Python dependencies and run the Python linter:
python3 -m venv venv
. venv/bin/activate
pip install --upgrade pip
pip install .[test]
pylint tap_kafka -d C,W,unexpected-keyword-arg,duplicate-code
Source Distribution
Hashes for pipelinewise-tap-kafka-2.1.0.tar.gz
Algorithm | Hash digest
---|---
SHA256 | ef5852fd010afe4931e61dc25e650e209aa46b9e2cd15c85d63c3c5d58032469
MD5 | 1e9f19208a686267697870fa20d103de
BLAKE2b-256 | 9e604b1614a396c44e82593405f64a58819b5e9b839ba039c25fbd298943ece2
Built Distribution
Hashes for pipelinewise_tap_kafka-2.1.0-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | 376a9372ecafb963c2d400105f8c707397343bce6d3b2fbdb4ac13dfdea83e9a
MD5 | 9f572ef76c70046d278dc98423098876
BLAKE2b-256 | 75ece30cb936f1f14b274775a453cb9cb13e496c42c2dc3a238a305b0f57346b
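To check a downloaded file against the digests listed for the 2.1.0 release files, a minimal sketch using only Python's standard hashlib module could look like this; file_digests and verify_sha256 are hypothetical helper names. BLAKE2b-256 is computed as BLAKE2b with a 32-byte (256-bit) digest.

```python
import hashlib


def file_digests(path, chunk_size=65536):
    """Compute the SHA256, MD5 and BLAKE2b-256 hex digests of a file."""
    hashers = {
        "SHA256": hashlib.sha256(),
        "MD5": hashlib.md5(),
        # BLAKE2b-256 means BLAKE2b truncated to a 32-byte digest
        "BLAKE2b-256": hashlib.blake2b(digest_size=32),
    }
    with open(path, "rb") as f:
        # Read in chunks so large files are not loaded into memory at once
        for chunk in iter(lambda: f.read(chunk_size), b""):
            for h in hashers.values():
                h.update(chunk)
    return {name: h.hexdigest() for name, h in hashers.items()}


def verify_sha256(path, expected_hex):
    """Return True if the file's SHA256 digest matches expected_hex."""
    return file_digests(path)["SHA256"] == expected_hex.lower()
```

For an intact download, verify_sha256("pipelinewise_tap_kafka-2.1.0-py3-none-any.whl", "376a9372ecafb963c2d400105f8c707397343bce6d3b2fbdb4ac13dfdea83e9a") should return True.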