Skip to main content

DataFushion的SparkPy算法插件

Project description

DataFushion_Plugins_SparkPy说明

1.简介

针对Spark的Python版本算法(pyspark)在DataFushion平台使用所给出的插件,主要用于规范化算法的输入输出

2.使用

  • Step1:引入datafushion_spark包中的operation模块
  • Step2:使用资源管理器进行数据拆解处理,并在其中实现自己需要实现的业务算法逻辑
from datafushion_spark import operation, HandleDataFrameSet, HandleInputDataStruct, DataFrame, SparkSession, \
    FileExtractFormatEnum


if __name__ == '__main__':
    print("start")
    with operation(app_name="test", master="local") as destruction:  # type: HandleDataFrameSet
        input_data_struct_list = destruction.input_data_struct_list
        param_map = destruction.param_map
        spark = destruction.spark  # type:SparkSession

        data_result = None  # type DataFrame

        # 算法逻辑部分
        for index, input_data_struct in enumerate(input_data_struct_list):  # type: HandleInputDataStruct
            # 注意:此时的DataFrame的列名已经是映射过的列名,可以直接使用
            data_list = input_data_struct.data_list  # type: DataFrame
            if index == 0:
                data_result = data_list.groupby("status").agg({
                    "power": "mean"
                }).withColumnRenamed("avg(power)", "powerAvg")
            else:
                data_result = data_result.union(data_list.groupby("status").agg({
                    "power": "mean"
                }).withColumnRenamed("avg(power)", "powerAvg"))

        # 保存最终结果
        destruction.data_result = data_result
        # 保存存储的格式,需要与打包的配置文件对应
        destruction.output_type = FileExtractFormatEnum.JSON.value

注意:


destruction为解构的HandleDataFrameSet实体类


input_data_struct_list中包含了输入数据的封装,其类型为List

其元素为HandleInputDataStruct类,包含的属性为file_type,file_path,file_input_mapping,data_list

算法需要使用的是file_input_mapping和data_list

data_list是输入数据的DataFrame

file_input_mapping为输入数据字段的映射


param_map为算法的参数字典


在对数据进行业务算法处理完成后,需要将拆解的destruction中的data_result属性赋值为业务算法的最终数据结果


在对数据进行业务算法处理完成后,需要将拆解的destruction中的output_type属性赋值为业务算法需要输出的文件格式FileExtractFormatEnum.JSON.value中提供了JSON,CSV,PARQUET,GENERAL四类格式


目前PARQUET类的输出格式只支持作为Spark类型的算法积木中的输入

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

datafushion_plugins_sparkpy-1.0.1.tar.gz (4.5 kB view hashes)

Uploaded Source

Built Distribution

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page