Alibaba Cloud PAI Python SDK
Alibaba PAI Python SDK
The AliPAI Python SDK is provided by the PAI team of the Alibaba computing platform. It gives users a convenient way to access PAI services on Alibaba Cloud.
The SDK currently supports PAIFlow (the ML pipeline service of PAI). Other PAI services, such as EAS (Elastic Algorithm Service) and Blade, will be included soon.
Installation
To install the PAI SDK, run the following command in a terminal:
python -m pip install alipai
Usage
Setup default PAI session
Before using PAI services through the SDK, initialize the default PAI session by providing your credentials and the region_id of the service.
The pipeline service of PAI is currently available in the cn-shanghai region only.
from pai.core.session import setup_default_session
session = setup_default_session(access_key_id="your_access_key", access_key_secret="your_access_secret", region_id="your_region_id")
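Hard-coded credentials are easy to leak through version control. A minimal sketch, assuming the credentials are exported as environment variables: the variable names `PAI_ACCESS_KEY_ID`, `PAI_ACCESS_KEY_SECRET`, and `PAI_REGION_ID` and the helper `load_session_kwargs` are hypothetical and are not defined by the SDK.

```python
import os


def load_session_kwargs(env=None):
    """Collect keyword arguments for setup_default_session from a mapping.

    The variable names below are illustrative assumptions, not SDK conventions.
    """
    env = os.environ if env is None else env
    names = {
        "access_key_id": "PAI_ACCESS_KEY_ID",
        "access_key_secret": "PAI_ACCESS_KEY_SECRET",
        "region_id": "PAI_REGION_ID",
    }
    # Fail early with a clear message if any variable is unset or empty.
    missing = [var for var in names.values() if not env.get(var)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return {arg: env[var] for arg, var in names.items()}


# Usage (requires the alipai package and real credentials):
# from pai.core.session import setup_default_session
# session = setup_default_session(**load_session_kwargs())
```

The helper only builds the keyword arguments; the session itself is still created by setup_default_session.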
Access Pipeline Service
Use PipelineTemplate
A PipelineTemplate instance holds the definition of a workflow used in the PAI pipeline service. It can be fetched from the remote PAI service or constructed from a local Pipeline/Component.
A saved pipeline template has a unique pipeline_id, which is generated by the pipeline service. A remote pipeline template can be fetched either by its identifier-provider-version triple or by its pipeline_id.
PAI provides a list of public pipeline templates, which can be run directly as workflow templates or used as steps to build a new pipeline. They are accessible by specifying the provider as pai.common.ProviderAlibabaPAI in PipelineTemplate.list.
from pai.pipeline import PipelineTemplate
from pai.common import ProviderAlibabaPAI
# Search for a PipelineTemplate provided by `PAI` whose identifier includes `xflow`.
template = next(PipelineTemplate.list(identifier="xflow", provider=ProviderAlibabaPAI))
# View the template and its inputs/outputs.
template
template.inputs
template.outputs
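The built-in `next` raises `StopIteration` when the iterator is empty, so the search above fails with a traceback if no template matches. A minimal sketch of a safer pattern, using a plain iterator as a stand-in for the result of PipelineTemplate.list (the helper name `first_or_none` is hypothetical):

```python
def first_or_none(iterable):
    """Return the first item of an iterable, or None when it is empty."""
    return next(iter(iterable), None)


# Stand-in for PipelineTemplate.list(...); a real call needs a PAI session.
no_matches = iter([])
template = first_or_none(no_matches)
if template is None:
    print("no template matched the search")
```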
A PipelineTemplate can be run once the required arguments are supplied. To inspect the workflow DAG, execution log, and outputs of the pipeline, visit the job detail URL printed in the console.
from pai.common import ProviderAlibabaPAI
from pai.pipeline import PipelineTemplate
# Get a specific template by identifier-provider-version.
template = PipelineTemplate.get_by_identifier(identifier="split-xflow-maxCompute",
    provider=ProviderAlibabaPAI, version="v1")
xflow_execution = {
    "odpsInfoFile": "/share/base/odpsInfo.ini",
    "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
    "logViewHost": "http://logview.odps.aliyun.com",
    "odpsProject": "your_odps_project",
}
# Run the pipeline with the provided arguments.
job = template.run(job_name="demo-split-job", arguments={
    "inputArtifact": "odps://pai_online_project/tables/mnist_data",
    "execution": xflow_execution, "fraction": 0.7}, wait=True)
job.get_outputs()
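The same execution settings are passed to every run in this document, so it can help to build them in one place. A minimal sketch, assuming only the MaxCompute project name changes between users: the helper `make_xflow_execution` is hypothetical, and every other value is copied unchanged from the example above.

```python
def make_xflow_execution(odps_project):
    """Build the "execution" argument for an xflow-based template.

    Only the project name varies; the other values are copied from the
    example in this document and are not validated here.
    """
    if not odps_project:
        raise ValueError("odps_project must be a non-empty string")
    return {
        "odpsInfoFile": "/share/base/odpsInfo.ini",
        "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
        "logViewHost": "http://logview.odps.aliyun.com",
        "odpsProject": odps_project,
    }


xflow_execution = make_xflow_execution("your_odps_project")
```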
Build runnable and reusable pipeline
The PAI pipeline service supports nested, user-defined workflows. A composite pipeline can be run by providing the required arguments. A saved pipeline template can be used as a step to build a new pipeline (using a local pipeline template as a step will be supported soon).
# PipelineParameter, PipelineArtifact, ArtifactMetadata, ArtifactDataType,
# ArtifactLocationType, PipelineStep and Pipeline come from the PAI SDK.
# gen_temp_table() is assumed to be a helper that returns a temporary table name.
def create_composite_pipeline():
    # Define the input parameters of the pipeline.
    execution_input = PipelineParameter(name="execution", typ=dict)
    cols_to_double_input = PipelineParameter(name="cols_to_double")
    table_input = PipelineArtifact(name="data_source", metadata=ArtifactMetadata(
        data_type=ArtifactDataType.DataSet,
        location_type=ArtifactLocationType.MaxComputeTable))
    # Pipeline step from the remote PAI service.
    type_transform_step = PipelineStep(
        identifier="type-transform-xflow-maxCompute", provider=ProviderAlibabaPAI,
        version="v1", name="typeTransform", inputs={
            "inputArtifact": table_input, "execution": execution_input,
            "outputTable": gen_temp_table(), "cols_to_double": cols_to_double_input,
        }
    )
    split_template = PipelineTemplate.get_by_identifier(identifier="split-xflow-maxCompute",
        provider=ProviderAlibabaPAI, version="v1")
    # The split step consumes the first output of the type-transform step.
    split_step = split_template.as_step(inputs={"inputArtifact": type_transform_step.outputs[0],
        "execution": execution_input, "output1TableName": gen_temp_table(),
        "fraction": 0.5, "output2TableName": gen_temp_table(),
    })
    # Initialize the pipeline instance by specifying the steps and outputs.
    p = Pipeline(
        steps=[split_step],
        outputs=split_step.outputs[:2],
    )
    return p
p = create_composite_pipeline()
# Run the pipeline with the required arguments.
pipeline_run = p.run(job_name="demo-composite-pipeline-run", arguments={
    "execution": xflow_execution,
    "cols_to_double": "time,hour,pm2,pm10,so2,co,no2",
    "data_source": "odps://pai_online_project/tables/wumai_data",
}, wait=True)
# Save the pipeline.
p.save(identifier="demo-composite-pipeline", version="v1")
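The composite pipeline example calls gen_temp_table() without defining it. A minimal sketch of one possible implementation, assuming the helper only needs to return a table name that is unlikely to collide with an existing one: the `pai_temp` prefix and the use of a random UUID are assumptions, and the SDK may ship its own helper or expect a different name format.

```python
import uuid


def gen_temp_table(prefix="pai_temp"):
    """Return a table name that is unlikely to collide with existing tables.

    Hypothetical helper: the prefix and the naming scheme are assumptions.
    """
    # uuid4().hex is 32 lowercase hexadecimal characters with no dashes.
    return "{}_{}".format(prefix, uuid.uuid4().hex)
```

Each call returns a fresh name, so the three output tables in the example would not overwrite one another.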