DataFushion的SparkPy算法插件
Project description
DataFushion_Plugins_SparkPy说明
1.简介
针对Spark的Python版本算法(pyspark)在DataFushion平台使用所给出的插件,主要用于规范化算法的输入输出
2.使用
- Step1:引入datafushion_spark包中的operation模块
- Step2:使用资源管理器进行数据拆解处理,并在其中实现自己需要实现的业务算法逻辑
from datafushion_spark import operation, HandleDataFrameSet, HandleInputDataStruct, DataFrame, SparkSession, \
FileExtractFormatEnum
if __name__ == '__main__':
print("start")
with operation(app_name="test", master="local") as destruction: # type: HandleDataFrameSet
input_data_struct_list = destruction.input_data_struct_list
param_map = destruction.param_map
spark = destruction.spark # type:SparkSession
data_result = None # type DataFrame
# 算法逻辑部分
for index, input_data_struct in enumerate(input_data_struct_list): # type: HandleInputDataStruct
# 注意:此时的DataFrame的列名已经是映射过的列名,可以直接使用
data_list = input_data_struct.data_list # type: DataFrame
if index == 0:
data_result = data_list.groupby("status").agg({
"power": "mean"
}).withColumnRenamed("avg(power)", "powerAvg")
else:
data_result = data_result.union(data_list.groupby("status").agg({
"power": "mean"
}).withColumnRenamed("avg(power)", "powerAvg"))
# 保存最终结果
destruction.data_result = data_result
# 保存存储的格式,需要与打包的配置文件对应
destruction.output_type = FileExtractFormatEnum.JSON.value
注意:
destruction为解构的HandleDataFrameSet
实体类
input_data_struct_list中包含了输入数据的封装,其类型为List
其元素为HandleInputDataStruct
类,包含的属性为file_type,file_path,file_input_mapping,data_list
算法需要使用的是file_input_mapping和data_list
data_list是输入数据的DataFrame
file_input_mapping为输入数据字段的映射
param_map为算法的参数字典
在对数据进行业务算法处理完成后,需要将拆解的destruction中的data_result属性赋值为业务算法的最终数据结果
在对数据进行业务算法处理完成后,需要将拆解的destruction中的output_type属性赋值为业务算法需要输出的文件格式FileExtractFormatEnum.JSON.value
中提供了JSON,CSV,PARQUET,GENERAL
四类格式
目前PARQUET
类的输出格式只支持作为Spark类型的算法积木中的输入
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Close
Hashes for datafushion_plugins_sparkpy-1.0.4.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | f402aa4c8c10a1842944ccc6b005822795ee6a9bf0ba016d6735223c00584fd3 |
|
MD5 | fdc9e95d358dbfa2b3879b862f491326 |
|
BLAKE2b-256 | 3e6eb53e914634bc896da21b7b9526e0cd403f2540277422318f6c27fb33195a |
Close
Hashes for datafushion_plugins_sparkpy-1.0.4-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | aacd9de60b9c7c0cf25a7810469337abb758df9c4b83fc46926b493ddb6f04b9 |
|
MD5 | 87f603d1c7bdc019247500bec9a3ed3a |
|
BLAKE2b-256 | ccd1ec31e3732f218c12bce7eee2f4f13a2db00e0286cff4a0ecf9382a20a505 |