ETL数据处理/数据迁移/数据分析工具
Project description
NiceFlow
类似Kettle数据ETL工具,同时比Kettle更加易用和轻量
特性
- 基于python的插件机制,目前提供70+插件,同时支持自定义插件
- 基于json的flow任务,支持自定义任务配置
安装依赖
pip install NiceFlow
测试案例
- plugin_test.py 测试插件功能
- flow_test.py 测试flow功能
cli使用
安装
pip install NiceFlow
exec模式执行flow任务
NiceFlow exec --path csv_input_ck_output.json
# 1.json中的参数可以使用param参数传入
NiceFlow exec --path 1.json --param '{"name":"test"}'
sql模式执行flow任务
# --sql_script,"sql脚本语句,支持多行"
# --sql_path,"sql脚本文件,支持多行,和sql_script二选一"
# --db_path, "输入duckdb数据库的路径,不存在则为内存模式[可选]"
# --res_path,"输入文件路径,该路径下的文件会被自动加载到db中[可选]"
# --function_path,"输入函数路径,该路径为python文件,可以作为数据库自定义函数使用[可选]"
NiceFlow sql --sql_path 1.sql \
--res_path='C:/Users/xiaow/Desktop/22/test' \
--function_path='C:/Users/xiaow/Desktop/22/test/1.python'
# sql语句
copy (select f_print(d_date) from msd_2024 where d_date = '2023-12-31') to 'C:/Users/xiaow/Desktop/22/test/2.csv';
select f_print(d_date) from msd_2024 where d_date = '2023-12-31';
# python文件中定义函数
def f_print(x:str)->str:
return x+"___";
代码使用
- faker_input_console.json
{
"flow": {
"name": "",
"uid": "",
"param": {
}
},
"nodes": [
{
"id": "FakerInput",
"name": "read1",
"type": "input",
"properties": {
"rows": 10000,
"columns": [
"name",
"address",
"city",
"street_address",
"date_of_birth",
"phone_number"
],
"randoms": [
{
"key": "sex",
"values": [
"男",
"女",
"未知"
]
}
]
}
},
{
"id": "Console",
"name": "write1",
"type": "output",
"properties": {
"row": 100
}
}
],
"edges": [
{
"startId": "read1",
"endId": "write1"
}
]
}
import os
from NiceFlow.core.flow import Flow
from NiceFlow.core.manager import FlowManager
def getProjectPath() -> str:
# 获取当前文件的绝对路径
current_file = os.path.abspath(__file__)
# 获取当前文件所在目录的绝对路径
current_directory = os.path.dirname(current_file)
# 获取当前项目的根目录
project_root = os.path.dirname(os.path.dirname(current_directory))
return project_root
def test_base():
path = getProjectPath() + "/doc/faker_input_console.json"
myFlow: Flow = FlowManager.read(path)
myFlow.run()
if __name__ == '__main__':
test_base()
架构图
插件使用说明文档
输入
插件 | 功能 | 完成情况 | 文档 |
---|---|---|---|
Starter | 启动器 | 完成 | 启动器 |
CsvInput | 读取CSV数据 | 完成 | CSV输入 |
FakerInput | 假数据生成 | 完成 | 假数据生成 |
ParquetInput | 读取Parquet数据 | 完成 | Parquet输入 |
ExcelInput | 读取Excel数据 | 完成 | Excel输入 |
MySQLInput | 读取MySQL数据 | 完成 | MySQL输入 |
DuckDBInput | 读取DuckDB数据 | 完成 | DuckDB输入 |
AkshareInput | 读取金融股票等财经数据 | 完成 | Akshare输入 |
ClickHouseInput | 读取ClickHouse数据 | 完成 | ClickHouse输入 |
OdpsInput | 读取MaxCompute数据 | 完成 | Odps输入 |
ESInput | 读取Elasticsearch数据 | 完成 | Elasticsearch输入 |
MongoDBInput | 读取MongoDB数据 | 完成 | MongoDB输入 |
MqttInput | 从Mqtt Broker读取数据 | MqttInput输入 |
转换
插件 | 功能 | 完成情况 | 文档 |
---|---|---|---|
Filter | 过滤器 | 完成 | 过滤器 |
Mapping | 映射器 | 完成 | 映射器 |
For | 遍历器 | 完成 | 遍历器 |
IF | 条件判断器 | 完成 | 条件判断器 |
Join | 连接器 | 完成 | 连接器 |
Mask | 脱敏器 | 完成 | 脱敏器 |
Pivot | 透视表 | 完成 | 透视表 |
Printer | 打印器 | 完成 | 打印器 |
RegularExtract | 正则提取器 | 正则提取器 | |
Rename | 重命名器 | 完成 | 重命名器 |
Samples | 采样器 | 完成 | 采样器 |
Sort | 排序器 | 完成 | 排序器 |
Sql | SQL转换器 | 完成 | SQL转换器 |
Switch | 条件转换器 | 条件转换器 | |
Unpivot | 取消透视表 | 完成 | 取消透视表 |
Variable | 变量转换器 | 完成 | 变量转换器 |
While | 循环转换器 | 完成 | 循环转换器 |
Duplicate | 去重器 | 完成 | 去重器 |
Console | 控制台打印 | 完成 | 控制台输出 |
输出
插件 | 功能 | 完成情况 | 文档 |
---|---|---|---|
FileOutput | 文件输出 | 完成 | 文件输出 |
KafkaOutput | Kafka输出 | 完成 | Kafka输出 |
SqlServerOutput | SQLServer输出 | 完成 | SQLServer输出 |
S3Output | S3输出 | 完成 | S3输出 |
PulsarOutput | Pulsar输出 | 完成 | Pulsar输出 |
PostgresOutput | Postgres输出 | 完成 | Postgres输出 |
ParquetOutput | Parquet输出 | 完成 | Parquet输出 |
PaimonOutput | Paimon输出 | 完成 | Paimon输出 |
OracleOutput | Oracle输出 | 完成 | Oracle输出 |
OdpsOutput | MaxCompute输出 | 完成 | MaxCompute输出 |
MySQLOutput | MySQL输出 | 完成 | MySQL输出 |
MqttOutput | MQTT输出 | ||
MongoDBOutput | MongoDB输出 | 完成 | MongoDB输出 |
MarkdownOutput | Markdown输出 | 完成 | Markdown输出 |
HttpOutput | Http输出 | ||
HiveOutput | Hive输出 | ||
HdfsOutput | HDFS输出 | ||
FtpOutput | FTP输出 | ||
ExcelOutput | Excel输出 | 完成 | Excel输出 |
EsOutput | Elasticsearch输出 | ||
DuckOutput | DuckDB输出 | ||
CsvOutput | CSV输出 | 完成 | CSV输出 |
CosOutput | COS输出 | ||
ClickHouseOutput | ClickHouse输出 |
数据实战
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
No source distribution files available for this release.See tutorial on generating distribution archives.
Built Distribution
NiceFlow-0.0.4-py3-none-any.whl
(95.9 kB
view details)
File details
Details for the file NiceFlow-0.0.4-py3-none-any.whl
.
File metadata
- Download URL: NiceFlow-0.0.4-py3-none-any.whl
- Upload date:
- Size: 95.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.10.12
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | f6a0250b7f4b53a010a6198b449a277e661ad1dc314e15e5dcab2d5da82aee5b |
|
MD5 | c93f95ec0238d10bf5fee935e500c340 |
|
BLAKE2b-256 | 2192a3d7b36feac4c5efee6cd153e4656ebfde7008ae5eb67804b8689b63eb5b |