strefi
Stream each new row of a file and write it to Kafka.
Installation
PyPi
pip install strefi
Git
git clone https://github.com/VictorMeyer77/strefi.git
cd strefi
make virtualenv # if you want to create a new environment
source .venv/bin/activate # if you want to activate the new environment
make install
Usage
Read the complete example for more details.
usage: strefi [-h] [-c CONFIG] [-i JOBID] [-l LOG] command
Stream each new rows of a file and write in kafka
positional arguments:
command "start" to launch stream or "stop" to kill stream
options:
-h, --help show this help message and exit
-c CONFIG, --config CONFIG
configuration file path
-i JOBID, --jobid JOBID
stream id
-l LOG, --log LOG log configuration file path (configparser file format)
Launch job
strefi start -c config.json
Stop a job
strefi stop -i {job_id}
Stop all jobs
strefi stop -i all
List job statuses
strefi ls
Configuration
Strefi configuration is stored in a simple JSON file.
{
"producer":{
"bootstrap_servers":"localhost:9092",
"acks":0,
"retries":0
},
"headers":{
"version":"0.1",
"type":"json"
},
"defaults":{
"key_one":"value_one",
"key_two":"value_two"
},
"files":{
"/path/to/file_1":"target_topic",
"/path/to/file_2":"target_topic"
}
}
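As a quick sanity check, the configuration file can be loaded with the standard json module (this snippet is illustrative, not part of strefi itself):

```python
import json

# Inline copy of the example configuration above.
config_text = """
{
  "producer": {"bootstrap_servers": "localhost:9092", "acks": 0, "retries": 0},
  "headers": {"version": "0.1", "type": "json"},
  "defaults": {"key_one": "value_one", "key_two": "value_two"},
  "files": {"/path/to/file_1": "target_topic", "/path/to/file_2": "target_topic"}
}
"""

config = json.loads(config_text)

# The four expected top-level sections.
print(sorted(config))
```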
files
Specify in the "files" objects the paths of all files you want stream. The field key is file path and the field value is the topic.
"files":{
"/path/to/file_1":"target_topic",
"/path/to/file_2":"target_topic"
}
producer
Producer configuration must have at least the field bootstrap_servers. All fields will be passed as parameters to the KafkaProducer. One producer is created for each file to stream, but all producers share the same configuration.
"producer":{
"bootstrap_servers":"localhost:9092",
"acks":0,
"retries":0
}
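A sketch of how this presumably maps onto producers, assuming the "producer" fields are unpacked as KafkaProducer keyword arguments (the KafkaProducer call is commented out so the snippet runs without a broker):

```python
# Example configuration fragments from above.
producer_conf = {"bootstrap_servers": "localhost:9092", "acks": 0, "retries": 0}
files = {"/path/to/file_1": "target_topic", "/path/to/file_2": "target_topic"}

# One producer per streamed file, all built from the same configuration:
# from kafka import KafkaProducer              # requires kafka-python
# producers = {path: KafkaProducer(**producer_conf) for path in files}

# Stand-in showing the same per-file fan-out without a Kafka dependency.
producers = {path: dict(producer_conf) for path in files}
print(len(producers))
```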
defaults
This field can be empty. In that case, the record sent to Kafka contains only the streamed file path and the file row.
{"file": "/path/to/file_1", "row": "last file row"}
You can enhance records sent to the topic with the "defaults" object.
"defaults":{
"key_one":"value_one",
"key_two":"value_two"
}
With this configuration, the record sent to Kafka also includes these values.
{"file":"/path/to/file_1", "row":"last file row", "key_one":"value_one","key_two":"value_two"}
headers
You can join headers with the record with the "headers" field. It can be empty if you don't want headers.
"headers":{
"version":"0.1",
"type":"json"
}
These headers will be converted into the following list of tuples. Header keys must be strings, and values will be encoded as bytes.
[("version", b"0.1"), ("type", b"json")]
License
strefi is released under GPL-3.0 license. See LICENSE.