
Project description

strefi


Stream each new row of a file and write it to Kafka.

Installation

PyPI

pip install strefi

Git

git clone https://github.com/VictorMeyer77/strefi.git
cd strefi
make virtualenv # if you want to create a new environment
source .venv/bin/activate # if you want to activate the new environment
make install

Usage

Read the complete example for more details.

usage: strefi [-h] [-c CONFIG] [-i JOBID] [-l LOG] command

Stream each new rows of a file and write in kafka

positional arguments:
  command               "start" to launch stream or "stop" to kill stream

options:
  -h, --help            show this help message and exit
  -c CONFIG, --config CONFIG
                        configuration file path
  -i JOBID, --jobid JOBID
                        stream id
  -l LOG, --log LOG     log configuration file path (configparser file format)

Launch a job

strefi start -c config.json

Stop a job

strefi stop -i {job_id}

Stop all jobs

strefi stop -i all

List job statuses

strefi ls

Configuration

Strefi configuration is stored in a simple JSON file.

{
   "producer":{
      "bootstrap_servers":"localhost:9092",
      "acks":0,
      "retries":0
   },
   "headers":{
      "version":"0.1",
      "type":"json"
   },
   "defaults":{
      "key_one":"value_one",
      "key_two":"value_two"
   },
   "files":{
      "/path/to/file_1":"target_topic",
      "/path/to/file_2":"target_topic"
   }
}

files

Specify in the "files" object the paths of all the files you want to stream. Each field key is a file path and its value is the target topic.

"files":{
  "/path/to/file_1":"target_topic",
  "/path/to/file_2":"target_topic"
}
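
For illustration, here is a minimal Python sketch (not strefi's actual implementation) of how this mapping could be consumed, reading the configuration file and iterating the file/topic pairs:

import json

# Read the strefi configuration and walk the "files" mapping.
with open("config.json") as f:
    config = json.load(f)

for path, topic in config["files"].items():
    print(f"streaming {path} -> {topic}")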

producer

The producer configuration must contain at least the bootstrap_servers field. All fields are passed as parameters to the KafkaProducer. One producer is created for each file to stream, but all producers share the same configuration.

"producer":{
  "bootstrap_servers":"localhost:9092",
  "acks":0,
  "retries":0
}
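
Since every field is forwarded to the producer, the construction is roughly equivalent to this sketch (assuming the kafka-python library, which provides the KafkaProducer class; treat the exact library as an assumption):

from kafka import KafkaProducer  # assumption: kafka-python

producer_config = {
    "bootstrap_servers": "localhost:9092",
    "acks": 0,
    "retries": 0,
}

# Every key of the "producer" object becomes a KafkaProducer argument.
producer = KafkaProducer(**producer_config)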

defaults

This field can be empty. In that case, the record sent to Kafka is composed only of the streamed file path and the file row.

{"file": "/path/to/file_1", "row": "last file row"}

You can enrich the records sent to the topic with the "defaults" object.

"defaults":{
  "key_one":"value_one",
  "key_two":"value_two"
}

With this configuration, the record sent to Kafka also contains these values.

{"file":"/path/to/file_1", "row":"last file row", "key_one":"value_one","key_two":"value_two"}

headers

You can attach headers to the record with the "headers" field. It can be empty if you don't want headers.

"headers":{
  "version":"0.1",
  "type":"json"
}

These headers will be converted into the following list of tuples. Each header key must be a string, and each value will be encoded to bytes.

[("version", b"0.1"), ("type", b"json")]

License

strefi is released under the GPL-3.0 license. See LICENSE.


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

strefi-0.2.0.tar.gz (26.2 kB)

Built Distribution

strefi-0.2.0-py3-none-any.whl (9.5 kB)
