SparkPy Algorithm Plugin for DataFushion


DataFushion_Plugins_SparkPy Documentation

1. Introduction

A plugin for running Python-based Spark algorithms (pyspark) on the DataFushion platform. Its main purpose is to standardize an algorithm's input and output.

2. Usage for Ordinary Algorithms

  • Step1: Import the operation module from the datafushion_spark package
  • Step2: Use the context manager to destructure the input data, and implement your business algorithm logic inside it
from datafushion_spark import operation, HandleDataFrameSet, HandleInputDataStruct, DataFrame, SparkSession, \
    FileExtractFormatEnum


if __name__ == '__main__':
    with operation(app_name="AvgWindPowerByStatus", master="local") as destruction:  # type:HandleDataFrameSet
        input_data_struct_list = destruction.input_data_struct_list
        mapping_flags = destruction.mapping_flags
        param_map = destruction.param_map
        spark = destruction.spark  # type:SparkSession

        data_result = None  # type: DataFrame

        # Algorithm logic
        for index, input_data_struct in enumerate(input_data_struct_list):  # type: HandleInputDataStruct
            # Note: the DataFrame column names have already been mapped, so they can be used directly
            data_list = input_data_struct.data_list  # type: DataFrame
            data_list.show()
            if index == 0:
                data_result = data_list.groupby("status").agg({
                    "power": "mean"
                }).withColumnRenamed("avg(power)", "powerAvg")
            else:
                data_result = data_result.union(data_list.groupby("status").agg({
                    "power": "mean"
                }).withColumnRenamed("avg(power)", "powerAvg"))

        # Save the final result
        destruction.data_result = data_result
        # Set the output format; it must match the packaging configuration file
        destruction.output_type = FileExtractFormatEnum.JSON.value

Notes:


If you develop on Windows, add the following lines at the top of your script. Install findspark yourself; it is not declared as a dependency of this package.

import findspark


findspark.init()

destruction is the destructured HandleDataFrameSet entity


input_data_struct_list wraps the input data; its type is List

Each element is a HandleInputDataStruct with the attributes file_type, file_path, file_input_mapping and data_list

An algorithm normally only needs file_input_mapping and data_list

data_list is the DataFrame holding the input data

file_input_mapping is the field mapping for the input data

spark is the SparkSession object

mapping_flags is the dictionary of mapping flags: each key is the mapping flag of an individual input, and the corresponding value is that input's field mapping


param_map is the algorithm's parameter dictionary
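
For orientation, here is a minimal sketch of inspecting these attributes inside the with block. The parameter name 'threshold' and the float conversion are illustrative assumptions, not part of the plugin's contract:

# Illustrative sketch: inspect the destructured attributes
first_input = destruction.input_data_struct_list[0]  # type: HandleInputDataStruct
print(first_input.file_type, first_input.file_path)
print(first_input.file_input_mapping)  # field mapping of this input
first_input.data_list.printSchema()    # DataFrame with already-mapped column names
print(list(mapping_flags.keys()))      # mapping flag of each individual input
threshold = float(param_map.get('threshold', 0.5))  # hypothetical parameter with a default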


After the business algorithm has finished processing the data, assign its final result to the data_result attribute of the destructured destruction object


Likewise, assign the desired output file format to the output_type attribute of destruction (e.g. FileExtractFormatEnum.JSON.value); FileExtractFormatEnum provides four formats: JSON, CSV, PARQUET and GENERAL


Currently the PARQUET output format can only be used as input to Spark-type algorithm blocks
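
For example, to emit CSV instead of JSON, only the final assignments of the first example change (a sketch; everything else stays as above):

# Emit the result as CSV instead of JSON
destruction.data_result = data_result
destruction.output_type = FileExtractFormatEnum.CSV.value  # must match the packaging configuration file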

3. Usage for Model-Training Algorithms

  • Step1: Import the operation module from the datafushion_spark package

  • Step2: Use the context manager to destructure the input data, and implement your business algorithm logic inside it

    The example below trains a logistic regression model on the Iris dataset

    from datafushion_spark import operation, HandleDataFrameSet, HandleInputDataStruct, DataFrame, SparkSession, \
        FileExtractFormatEnum, TrainFiledEnum, TrainModelResult
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
    from pyspark.ml import Pipeline, PipelineModel
    
    
    if __name__ == '__main__':
        with operation(app_name="IrisClassify", mapping_data=False,
                       master="local") as destruction:  # type:HandleDataFrameSet
            input_data_struct_list = destruction.input_data_struct_list
            mapping_flags = destruction.mapping_flags  # type: dict
            param_map = destruction.param_map
            spark = destruction.spark  # type:SparkSession
    
            algo_iter = param_map['iter']
            algo_reg = param_map['reg']
            algo_elastic_net = param_map['elasticNet']
            mapping_list = []
            for k, v in mapping_flags.items():
                mapping_list.append(v)
            data_result = None
    
            # Algorithm logic
            for index, input_data_struct in enumerate(input_data_struct_list):  # type: HandleInputDataStruct
                # Note: the DataFrame column names have already been mapped, so they can be used directly
                data = input_data_struct.data_list  # type: DataFrame
                mapping = mapping_list[index]
                feature_fields = mapping[TrainFiledEnum.FEATURE.value]
                label_field = mapping[TrainFiledEnum.LABEL.value][0]
                train_data = data.withColumnRenamed(label_field, TrainFiledEnum.LABEL.value)
                featureAssembler = VectorAssembler().setInputCols(feature_fields).setOutputCol('features')
                logistic_regression = LogisticRegression().setMaxIter(algo_iter).setRegParam(algo_reg).setElasticNetParam(
                        algo_elastic_net)
                pipeline_model: PipelineModel = Pipeline().setStages([featureAssembler, logistic_regression]).fit(
                        train_data)
                # Wrap the result in a TrainModelResult instance
                data_result = TrainModelResult(train_data=train_data, pipeline_model=pipeline_model)
                lg_model: LogisticRegressionModel = pipeline_model.stages[1]
                for item in lg_model.summary.objectiveHistory:
                    print(item)
    
            # Save the final result
            destruction.data_result = data_result
            # Set the output format; it must match the packaging configuration file
            destruction.output_type = FileExtractFormatEnum.MODEL.value
    

    Notes:


    If you need to train a model, in general:

    1. Set mapping_data=False in the operation call, because you usually need to decide for yourself, based on the mapping flags, how to process the feature data

    2. Set data_result to a TrainModelResult instance; TrainModelResult holds train_data and pipeline_model, i.e. the training data and the pipeline model

    3. Finally, set the output_type of the destructured callback object to the model format: destruction.output_type = FileExtractFormatEnum.MODEL.value
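
    As a quick sanity check before handing the result back, the fitted pipeline can be applied to the training data with the standard pyspark API (a sketch reusing the names from the example above; this step is optional and not required by the plugin):

    # Optional sanity check: apply the fitted pipeline back to the training data
    predictions = pipeline_model.transform(train_data)
    predictions.select(TrainFiledEnum.LABEL.value, 'prediction').show(5)
    print(lg_model.summary.accuracy)  # training accuracy from the LogisticRegression summary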
