SparkPy algorithm plugin for DataFushion
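DataFushion_Plugins_SparkPy Guide
1. Introduction
This plugin lets Python Spark algorithms (pyspark) run on the DataFushion platform. Its main purpose is to standardize algorithm input and output.
2. General algorithm usage
- Step1: Import operation from the datafushion_spark package.
- Step2: Use the operation context manager to destructure the input data, and implement your business algorithm logic inside it.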
from datafushion_spark import operation, HandleDataFrameSet, HandleInputDataStruct, DataFrame, SparkSession, \
    FileExtractFormatEnum

if __name__ == '__main__':
    with operation(app_name="AvgWindPowerByStatus", master="local") as destruction:  # type: HandleDataFrameSet
        input_data_struct_list = destruction.input_data_struct_list
        mapping_flags = destruction.mapping_flags
        param_map = destruction.param_map
        spark = destruction.spark  # type: SparkSession
        data_result = None  # type: DataFrame
        # Algorithm logic
        for index, input_data_struct in enumerate(input_data_struct_list):  # type: HandleInputDataStruct
            # Note: the DataFrame column names are already the mapped names and can be used directly
            data_list = input_data_struct.data_list  # type: DataFrame
            data_list.show()
            if index == 0:
                data_result = data_list.groupby("status").agg({
                    "power": "mean"
                }).withColumnRenamed("avg(power)", "powerAvg")
            else:
                data_result = data_result.union(data_list.groupby("status").agg({
                    "power": "mean"
                }).withColumnRenamed("avg(power)", "powerAvg"))
        # Save the final result
        destruction.data_result = data_result
        # Set the storage format; it must match the packaged configuration file
        destruction.output_type = FileExtractFormatEnum.JSON.value
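The index == 0 branch in the loop above exists only to seed data_result. One possible alternative, shown here as a minimal sketch and not as part of the plugin, is to aggregate each input first and then fold the per-input results together with functools.reduce. The helper name union_all is hypothetical. It relies only on each result having a union method, which a Spark DataFrame provides, so the demonstration uses frozensets as stand-ins and runs without a Spark session.

```python
from functools import reduce


def union_all(parts):
    """Fold a non-empty sequence of results into one by calling .union() pairwise."""
    parts = list(parts)
    if not parts:
        raise ValueError("at least one input is required")
    return reduce(lambda left, right: left.union(right), parts)


# Stand-in data: frozensets expose .union() as a Spark DataFrame does,
# so the helper can be exercised without Spark.
# Caveat: set union removes duplicates, while DataFrame.union keeps them.
per_input_results = [frozenset({"a"}), frozenset({"b"}), frozenset({"a", "c"})]
combined = union_all(per_input_results)
print(sorted(combined))
```

With the plugin, the same helper could receive the aggregated DataFrame of every element of input_data_struct_list, and its return value would be assigned to destruction.data_result.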
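Note:
When developing on Windows, add the following two lines at the top of the script. findspark must be installed separately, because the package does not declare it as a dependency.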
import findspark
findspark.init()
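If the same script has to run both on a Windows development machine and on other platforms, the two lines above can be guarded. The following is a minimal sketch under that assumption; the function name init_findspark_if_needed is hypothetical and not part of the plugin.

```python
import sys


def init_findspark_if_needed(platform=sys.platform):
    """Call findspark.init() on Windows only; return True if it was called."""
    if not platform.startswith("win"):
        return False
    try:
        import findspark  # installed separately; not a dependency of the plugin
    except ImportError:
        print("findspark is not installed; install it separately")
        return False
    findspark.init()
    return True


initialized = init_findspark_if_needed()
```

Calling the function before importing datafushion_spark keeps the rest of the script identical on every platform.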
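destruction is the destructured HandleDataFrameSet entity object.
input_data_struct_list wraps the input data; its type is List.
Each element is a HandleInputDataStruct with the attributes file_type, file_path, file_input_mapping, and data_list.
An algorithm needs only file_input_mapping and data_list:
data_list is the DataFrame holding the input data.
file_input_mapping is the field mapping of the input data.
spark is the SparkSession object.
mapping_flags is the dictionary of mapping identifiers: each key is the mapping identifier of one individual input, and each value is that input's mapping.
param_map is the dictionary of algorithm parameters.
After the business algorithm has processed the data, assign its final result to the data_result attribute of destruction.
Likewise, assign the desired output file format to the output_type attribute of destruction. FileExtractFormatEnum provides four formats for this: JSON, CSV, PARQUET, and GENERAL (for example, FileExtractFormatEnum.JSON.value).
Currently, PARQUET output is supported only as input to Spark-type algorithm blocks.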
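To illustrate why data_result and output_type must be assigned inside the with block, here is a minimal sketch of how a context manager of this kind could hand out a holder object and check it on exit. It is not the plugin's implementation: fake_operation, ResultHolder, OutputFormat, the enum string values, and the sink list are all hypothetical stand-ins.

```python
from contextlib import contextmanager
from enum import Enum


class OutputFormat(Enum):
    """Hypothetical stand-in for FileExtractFormatEnum; the values are assumed."""
    JSON = "json"
    CSV = "csv"
    PARQUET = "parquet"
    GENERAL = "general"


class ResultHolder:
    """Hypothetical stand-in for the object yielded by operation()."""

    def __init__(self, param_map):
        self.param_map = param_map
        self.data_result = None
        self.output_type = None


@contextmanager
def fake_operation(param_map, sink):
    holder = ResultHolder(param_map)
    yield holder
    # This part runs only if the with-body finished without raising.
    if holder.data_result is None:
        raise ValueError("data_result was not set")
    allowed = {fmt.value for fmt in OutputFormat}
    if holder.output_type not in allowed:
        raise ValueError("unsupported output_type: %r" % (holder.output_type,))
    sink.append((holder.output_type, holder.data_result))


saved = []
with fake_operation({"threshold": 3}, saved) as destruction:
    rows = [1, 5, 7]
    destruction.data_result = [r for r in rows if r > destruction.param_map["threshold"]]
    destruction.output_type = OutputFormat.JSON.value
print(saved)
```

In this sketch the write step happens when the with block exits, so a value assigned after the block would never be stored.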
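3. Model training algorithm usage
- Step1: Import operation from the datafushion_spark package.
- Step2: Use the operation context manager to destructure the input data, and implement your business algorithm logic inside it.
This example trains a logistic regression model on the Iris dataset.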
from datafushion_spark import operation, HandleDataFrameSet, HandleInputDataStruct, DataFrame, SparkSession, \
    FileExtractFormatEnum, TrainFiledEnum, TrainModelResult
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml import Pipeline, PipelineModel

if __name__ == '__main__':
    with operation(app_name="IrisClassify", mapping_data=False, master="local") as destruction:  # type: HandleDataFrameSet
        input_data_struct_list = destruction.input_data_struct_list
        mapping_flags = destruction.mapping_flags  # type: dict
        param_map = destruction.param_map
        spark = destruction.spark  # type: SparkSession
        algo_iter = param_map['iter']
        algo_reg = param_map['reg']
        algo_elastic_net = param_map['elasticNet']
        mapping_list = []
        for k, v in mapping_flags.items():
            mapping_list.append(v)
        data_result = None
        # Algorithm logic
        for index, input_data_struct in enumerate(input_data_struct_list):  # type: HandleInputDataStruct
            # Note: the DataFrame column names are already the mapped names and can be used directly
            data = input_data_struct.data_list  # type: DataFrame
            mapping = mapping_list[index]
            feature_fields = mapping[TrainFiledEnum.FEATURE.value]
            label_field = mapping[TrainFiledEnum.LABEL.value][0]
            train_data = data.withColumnRenamed(label_field, TrainFiledEnum.LABEL.value)
            featureAssembler = VectorAssembler().setInputCols(feature_fields).setOutputCol('features')
            logistic_regression = LogisticRegression().setMaxIter(algo_iter).setRegParam(algo_reg).setElasticNetParam(
                algo_elastic_net)
            pipeline_model: PipelineModel = Pipeline().setStages([featureAssembler, logistic_regression]).fit(
                train_data)
            # Instantiate data_result as a TrainModelResult object
            data_result = TrainModelResult(train_data=train_data, pipeline_model=pipeline_model)
            lg_model: LogisticRegressionModel = pipeline_model.stages[1]
            for item in lg_model.summary.objectiveHistory:
                print(item)
        # Save the final result
        destruction.data_result = data_result
        # Set the storage format; it must match the packaged configuration file
        destruction.output_type = FileExtractFormatEnum.MODEL.value
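The training example reads iter, reg, and elasticNet straight from param_map. This document does not say whether the platform delivers those values as numbers or as strings, so a defensive script may want to convert them before passing them to the Spark ML setters. The helper below is a minimal sketch under that assumption; read_param and the sample values are hypothetical.

```python
def read_param(param_map, key, cast, default=None):
    """Return param_map[key] converted with cast, or default if the key is absent."""
    if key not in param_map:
        if default is None:
            raise KeyError("missing required algorithm parameter: %s" % key)
        return default
    try:
        return cast(param_map[key])
    except (TypeError, ValueError) as exc:
        raise ValueError(
            "parameter %r has invalid value %r" % (key, param_map[key])
        ) from exc


# Hypothetical parameter values, mixing a string and a number on purpose.
example_params = {"iter": "20", "reg": 0.01}
algo_iter = read_param(example_params, "iter", int)
algo_reg = read_param(example_params, "reg", float)
algo_elastic_net = read_param(example_params, "elasticNet", float, default=0.0)
print(algo_iter, algo_reg, algo_elastic_net)
```

A missing key without a default raises KeyError early, which tends to be easier to diagnose than a failure inside the pipeline fit.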
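Note:
When training a model, you generally need to:
1. Set mapping_data=False in operation, because you usually decide for yourself, based on the mapping identifiers, how to process the feature data.
2. Set data_result to a TrainModelResult instance. TrainModelResult holds train_data and pipeline_model, that is, the training data and the pipeline model.
3. Finally, set output_type on the destructured callback object to the model format: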
destruction.output_type = FileExtractFormatEnum.MODEL.value
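With mapping_data=False, the script itself has to turn each input's mapping into feature columns and a label column. The sketch below shows one way to do that with plain dictionaries. The key strings "feature" and "label" are assumptions standing in for TrainFiledEnum.FEATURE.value and TrainFiledEnum.LABEL.value, and the mapping identifier and column names are invented for illustration. Unlike the training example, which simply takes the first label entry, this helper rejects mappings that do not contain exactly one label field.

```python
FEATURE_KEY = "feature"  # assumed value of TrainFiledEnum.FEATURE.value
LABEL_KEY = "label"      # assumed value of TrainFiledEnum.LABEL.value


def split_mapping(mapping):
    """Return (feature_fields, label_field) from one input's mapping."""
    feature_fields = list(mapping.get(FEATURE_KEY, []))
    label_fields = mapping.get(LABEL_KEY, [])
    if not feature_fields:
        raise ValueError("mapping has no feature fields")
    if len(label_fields) != 1:
        raise ValueError(
            "expected exactly one label field, got %d" % len(label_fields)
        )
    return feature_fields, label_fields[0]


# Hypothetical mapping_flags for a single Iris input.
mapping_flags = {
    "input_0": {
        "feature": ["sepal_length", "sepal_width", "petal_length", "petal_width"],
        "label": ["species_index"],
    }
}
mapping_list = list(mapping_flags.values())
feature_fields, label_field = split_mapping(mapping_list[0])
print(feature_fields, label_field)
```

The returned feature_fields would go to VectorAssembler.setInputCols, and label_field would be the column renamed to the label name, as in the training example.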