Skip to main content

ETL数据处理/数据迁移/数据分析工具

Project description

NiceFlow

类似Kettle数据ETL工具,同时比Kettle更加易用和轻量

特性

  • 基于python的插件机制,目前提供70+插件,同时支持自定义插件
  • 基于json的flow任务,支持自定义任务配置

img.png

安装依赖

pip install NiceFlow

测试案例

  • plugin_test.py 测试插件功能
  • flow_test.py 测试flow功能

cli使用

安装
pip install NiceFlow
exec模式执行flow任务
NiceFlow exec --path csv_input_ck_output.json

# 1.json中的参数可以使用param参数传入
NiceFlow exec --path 1.json --param '{"name":"test"}'
sql模式执行flow任务
# --sql_script,"sql脚本语句,支持多行"
# --sql_path,"sql脚本文件,支持多行,和sql_script二选一"
# --db_path, "输入duckdb数据库的路径,不存在则为内存模式[可选]"
# --res_path,"输入文件路径,该路径下的文件会被自动加载到db中[可选]"
# --function_path,"输入函数路径,该路径为python文件,可以作为数据库自定义函数使用[可选]"
NiceFlow sql --sql_path 1.sql \   
--res_path='C:/Users/xiaow/Desktop/22/test' \
--function_path='C:/Users/xiaow/Desktop/22/test/1.python' 

# sql语句
copy  (select f_print(d_date) from msd_2024 where d_date = '2023-12-31') to  'C:/Users/xiaow/Desktop/22/test/2.csv';
select f_print(d_date) from msd_2024 where d_date = '2023-12-31';


# python文件中定义函数
def f_print(x:str)->str:
    return x+"___";

代码使用

  • faker_input_console.json
{
  "flow": {
    "name": "",
    "uid": "",
    "param": {
    }
  },
  "nodes": [
    {
      "id": "FakerInput",
      "name": "read1",
      "type": "input",
      "properties": {
        "rows": 10000,
        "columns": [
          "name",
          "address",
          "city",
          "street_address",
          "date_of_birth",
          "phone_number"
        ],
        "randoms": [
          {
            "key": "sex",
            "values": [
              "男",
              "女",
              "未知"
            ]
          }
        ]
      }
    },
    {
      "id": "Console",
      "name": "write1",
      "type": "output",
      "properties": {
        "row": 100
      }
    }
  ],
  "edges": [
    {
      "startId": "read1",
      "endId": "write1"
    }
  ]
}
import os
from NiceFlow.core.flow import Flow
from NiceFlow.core.manager import FlowManager


def getProjectPath() -> str:
    # 获取当前文件的绝对路径
    current_file = os.path.abspath(__file__)
    # 获取当前文件所在目录的绝对路径
    current_directory = os.path.dirname(current_file)
    # 获取当前项目的根目录
    project_root = os.path.dirname(os.path.dirname(current_directory))
    return project_root


def test_base():
    path = getProjectPath() + "/doc/faker_input_console.json"
    myFlow: Flow = FlowManager.read(path)
    myFlow.run()


if __name__ == '__main__':
    test_base()

架构图

插件使用说明文档

输入

插件 功能 完成情况 文档
Starter 启动器 完成 启动器
CsvInput 读取CSV数据 完成 CSV输入
FakerInput 假数据生成 完成 假数据生成
ParquetInput 读取Parquet数据 完成 Parquet输入
ExcelInput 读取Excel数据 完成 Excel输入
MySQLInput 读取MySQL数据 完成 MySQL输入
DuckDBInput 读取DuckDB数据 完成 DuckDB输入
AkshareInput 读取金融股票等财经数据 完成 Akshare输入
ClickHouseInput 读取ClickHouse数据 完成 ClickHouse输入
OdpsInput 读取MaxCompute数据 完成 Odps输入
ESInput 读取Elasticsearch数据 完成 Elasticsearch输入
MongoDBInput 读取MongoDB数据 完成 MongoDB输入
MqttInput 从Mqtt Broker读取数据 MqttInput输入

转换

插件 功能 完成情况 文档
Filter 过滤器 完成 过滤器
Mapping 映射器 完成 映射器
For 遍历器 完成 遍历器
IF 条件判断器 完成 条件判断器
Join 连接器 完成 连接器
Mask 脱敏器 完成 脱敏器
Pivot 透视表 完成 透视表
Printer 打印器 完成 打印器
RegularExtract 正则提取器 正则提取器
Rename 重命名器 完成 重命名器
Samples 采样器 完成 采样器
Sort 排序器 完成 排序器
Sql SQL转换器 完成 SQL转换器
Switch 条件转换器 条件转换器
Unpivot 取消透视表 完成 取消透视表
Variable 变量转换器 完成 变量转换器
While 循环转换器 完成 循环转换器
Duplicate 去重器 完成 去重器
Console 控制台打印 完成 控制台输出

输出

插件 功能 完成情况 文档
FileOutput 文件输出 完成 文件输出
KafkaOutput Kafka输出 完成 Kafka输出
SqlServerOutput SQLServer输出 完成 SQLServer输出
S3Output S3输出 完成 S3输出
PulsarOutput Pulsar输出 完成 Pulsar输出
PostgresOutput Postgres输出 完成 Postgres输出
ParquetOutput Parquet输出 完成 Parquet输出
PaimonOutput Paimon输出 完成 Paimon输出
OracleOutput Oracle输出 完成 Oracle输出
OdpsOutput MaxCompute输出 完成 MaxCompute输出
MySQLOutput MySQL输出 完成 MySQL输出
MqttOutput MQTT输出
MongoDBOutput MongoDB输出 完成 MongoDB输出
MarkdownOutput Markdown输出 完成 Markdown输出
HttpOutput Http输出
HiveOutput Hive输出
HdfsOutput HDFS输出
FtpOutput FTP输出
ExcelOutput Excel输出 完成 Excel输出
EsOutput Elasticsearch输出
DuckOutput DuckDB输出
CsvOutput CSV输出 完成 CSV输出
CosOutput COS输出
ClickHouseOutput ClickHouse输出

数据实战

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

NiceFlow-0.0.4-py3-none-any.whl (95.9 kB view details)

Uploaded Python 3

File details

Details for the file NiceFlow-0.0.4-py3-none-any.whl.

File metadata

  • Download URL: NiceFlow-0.0.4-py3-none-any.whl
  • Upload date:
  • Size: 95.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.10.12

File hashes

Hashes for NiceFlow-0.0.4-py3-none-any.whl
Algorithm Hash digest
SHA256 f6a0250b7f4b53a010a6198b449a277e661ad1dc314e15e5dcab2d5da82aee5b
MD5 c93f95ec0238d10bf5fee935e500c340
BLAKE2b-256 2192a3d7b36feac4c5efee6cd153e4656ebfde7008ae5eb67804b8689b63eb5b

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page