Skip to main content

ETL数据处理/数据迁移/数据分析工具

Project description

NiceFlow

类似Kettle数据ETL工具,同时比Kettle更加易用和轻量,底层基于duckdb,速度超快,是一款可以让普通用户快速使用的数据处理工具,基于插件机制, 可以快速配置各种数据处理工作流,让数据处理工作流就像搭积木一样,简单易用。

特性

  • 基于python的插件机制,目前提供70+插件,同时支持自定义插件
  • 基于json的flow任务,支持自定义任务配置
  • 底层基于duckdb的内存数据库,支持sql脚本和json配置,支持亿级别的数据进行join查询,并且毫秒出结果

示例

img1.png

img2.png

安装依赖

pip install NiceFlow

测试案例

  • plugin_test.py 测试插件功能
  • flow_test.py 测试flow功能

cli使用

安装
pip install NiceFlow
exec模式执行flow任务
NiceFlow exec --path csv_input_ck_output.json

# 1.json中的参数可以使用param参数传入
NiceFlow exec --path 1.json --param '{"name":"test"}'
sql模式执行flow任务
# --sql_script,"sql脚本语句,支持多行"
# --sql_path,"sql脚本文件,支持多行,和sql_script二选一"
# --db_path, "输入duckdb数据库的路径,不存在则为内存模式[可选]"
# --res_path,"输入文件路径,该路径下的文件会被自动加载到db中[可选]"
# --function_path,"输入函数路径,该路径为python文件,可以作为数据库自定义函数使用[可选]"
NiceFlow sql --sql_path 1.sql \   
--res_path='C:/Users/xiaow/Desktop/22/test' \
--function_path='C:/Users/xiaow/Desktop/22/test/1.python' 

# sql语句
copy  (select f_print(d_date) from msd_2024 where d_date = '2023-12-31') to  'C:/Users/xiaow/Desktop/22/test/2.csv';
select f_print(d_date) from msd_2024 where d_date = '2023-12-31';


# python文件中定义函数
def f_print(x:str)->str:
    return x+"___";

代码使用

  • faker_input_console.json
{
  "flow": {
    "name": "",
    "uid": "",
    "param": {
    }
  },
  "nodes": [
    {
      "id": "FakerInput",
      "name": "read1",
      "type": "input",
      "properties": {
        "rows": 10000,
        "columns": [
          "name",
          "address",
          "city",
          "street_address",
          "date_of_birth",
          "phone_number"
        ],
        "randoms": [
          {
            "key": "sex",
            "values": [
              "男",
              "女",
              "未知"
            ]
          }
        ]
      }
    },
    {
      "id": "Console",
      "name": "write1",
      "type": "output",
      "properties": {
        "row": 100
      }
    }
  ],
  "edges": [
    {
      "startId": "read1",
      "endId": "write1"
    }
  ]
}
import os
from NiceFlow.core.flow import Flow
from NiceFlow.core.manager import FlowManager


def getProjectPath() -> str:
    # 获取当前文件的绝对路径
    current_file = os.path.abspath(__file__)
    # 获取当前文件所在目录的绝对路径
    current_directory = os.path.dirname(current_file)
    # 获取当前项目的根目录
    project_root = os.path.dirname(os.path.dirname(current_directory))
    return project_root


def test_base():
    path = getProjectPath() + "/doc/faker_input_console.json"
    myFlow: Flow = FlowManager.read(path)
    myFlow.run()


if __name__ == '__main__':
    test_base()

架构图

插件使用说明文档

输入

插件 功能 完成情况 文档
Starter 启动器 启动器
CsvInput 读取CSV数据 完成 CSV输入
FakerInput 假数据生成 完成 假数据生成
ParquetInput 读取Parquet数据 完成 Parquet输入
ExcelInput 读取Excel数据 完成 Excel输入
MySQLInput 读取MySQL数据 完成 MySQL输入
DuckDBInput 读取DuckDB数据 完成 DuckDB输入
AkshareInput 读取金融股票等财经数据 完成 Akshare输入
ClickHouseInput 读取ClickHouse数据 完成 ClickHouse输入
OdpsInput 读取MaxCompute数据 完成 Odps输入
ESInput 读取Elasticsearch数据 完成 Elasticsearch输入
MongoDBInput 读取MongoDB数据 完成 MongoDB输入
MqttInput 从Mqtt Broker读取数据 完成 MqttInput输入

转换

插件 功能 完成情况 文档
Filter 过滤器 完成 过滤器
Mapping 映射器 完成 映射器
For 遍历器 完成 遍历器
IF 条件判断器 完成 条件判断器
Join 连接器 完成 连接器
Mask 脱敏器 完成 脱敏器
Pivot 透视表 完成 透视表
Printer 打印器 完成 打印器
RegularExtract 正则提取器 正则提取器
Rename 重命名器 完成 重命名器
Samples 采样器 完成 采样器
Sort 排序器 完成 排序器
Sql SQL转换器 完成 SQL转换器
Switch 条件转换器 条件转换器
Unpivot 取消透视表 完成 取消透视表
Variable 变量转换器 完成 变量转换器
While 循环转换器 完成 循环转换器
Duplicate 去重器 完成 去重器
Console 控制台打印 完成 控制台输出
SplitFieldToRows 列拆分为多行 完成 列转行
Function 动态函数 完成 动态函数

输出

插件 功能 完成情况 文档
FileOutput 文件输出 完成 文件输出
KafkaOutput Kafka输出 完成 Kafka输出
SqlServerOutput SQLServer输出 完成 SQLServer输出
S3Output S3输出 完成 S3输出
PulsarOutput Pulsar输出 完成 Pulsar输出
PostgresOutput Postgres输出 完成 Postgres输出
ParquetOutput Parquet输出 完成 Parquet输出
PaimonOutput Paimon输出 完成 Paimon输出
OracleOutput Oracle输出 完成 Oracle输出
OdpsOutput MaxCompute输出 完成 MaxCompute输出
MySQLOutput MySQL输出 完成 MySQL输出
MqttOutput MQTT输出
MongoDBOutput MongoDB输出 完成 MongoDB输出
MarkdownOutput Markdown输出 完成 Markdown输出
HttpOutput Http输出
HiveOutput Hive输出
HdfsOutput HDFS输出
FtpOutput FTP输出
ExcelOutput Excel输出 完成 Excel输出
EsOutput Elasticsearch输出
DuckOutput DuckDB输出
CsvOutput CSV输出 完成 CSV输出
CosOutput COS输出
ClickHouseOutput ClickHouse输出

自定义脚本插件[PyScript]

  • 系统内置PyScript插件,该插件没有固定内容,可以自定义脚本,如下所示
{
  "flow": {
    "name": "",
    "uid": "",
    "param": {

    } },
  "nodes": [
    {
      "id": "FakerInput",
      "name": "read1",
      "type": "input",
      "properties": {
        "rows":10000,
        "columns": ["name","address","city","street_address","date_of_birth","phone_number"],
        "randoms":[
          {"key":"sex","values":["男","女","未知"]}
        ]
      }
    },
    {
      "id": "PyScript",
      "name": "write1",
      "type": "output",
      "properties": {
        "content": "import json\n\nfrom NiceFlow.core.flow import Flow\nfrom NiceFlow.core.plugin import IPlugin\n\n\nclass PyScript(IPlugin):\n\n    def init(self, param: json, flow: Flow):\n        super(PyScript, self).init(param, flow)\n\n    def execute(self):\n        super(PyScript, self).execute()\n        row = int(self.param.get(\"row\",10))\n\n        # 获取上一步结果\n        pre_node = self.pre_nodes[0]\n        PyScript_df = self._pre_result_dict[pre_node.name]\n        PyScript_df.limit(row).show()\n        self.set_result(PyScript_df)\n\n\n    def to_json(self):\n        super(PyScript, self).to_json()\n\n    def close(self):\n        super(PyScript, self).close()"
      }
    }
  ],
  "edges": [
    {
      "startId": "read1",
      "endId": "write1"
    }
  ]
}

数据实战

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

NiceFlow-0.0.5-py3-none-any.whl (101.5 kB view details)

Uploaded Python 3

File details

Details for the file NiceFlow-0.0.5-py3-none-any.whl.

File metadata

  • Download URL: NiceFlow-0.0.5-py3-none-any.whl
  • Upload date:
  • Size: 101.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.10.12

File hashes

Hashes for NiceFlow-0.0.5-py3-none-any.whl
Algorithm Hash digest
SHA256 4c691c8d501e8e049a888c41a26800761ca03039f5f43323bcf4ef91862ea3cf
MD5 1b7b4ade24a209c6b8c8e3530ba68e51
BLAKE2b-256 543c07aecd59a994d5cc591fddae8c66bdf523d73bc697b715bb38afaaefc607

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page