
Python Metaai Pipeline Components SDK

Quick Start

Install python-metaai

pip install -U python-metaai

Usage

Kubeflow pipeline components

Component code demo

from typing import *
from metaai.pipeline.components import component, upload_model, image_datasets
from metaai.pipeline import pipeline
from metaai.pipeline.components.constants import ImagePullPolicy

from kfp.components import InputPath, OutputPath


@component(
    # If you need extra packages, specify them here. We recommend installing only
    # small packages this way; if you have your own custom packages, build your
    # own image following the image build guide.
    packages_to_install=["dill==0.3.5.1", "requests==2.28.1"],
    # enable_cache controls whether this op uses the cache.
    # If nothing has changed, we recommend leaving caching on; the default is True.
    enable_cache=False,
)
def training(datasets_path: InputPath(str), training_config: Dict, model_path: OutputPath(str)):
    import json
    import requests
    # dill is an extended wrapper around pickle: it can also dump closures.
    import dill
    
    # dill is imported because this whole locally defined class needs to be dumped.
    class MM:
        
        def fit(self, x):
            setattr(self, "X", x)
        
        def predict(self):
            return {"prediction": self.X}
    
    with open(datasets_path, "r") as fp:
        datasets = json.load(fp)
    
    items = datasets.get("items")
    if not items or not isinstance(items, list):
        raise ValueError(f"datasets input error! now datasets is {datasets}")
    
    for item in items:
        image_data = requests.get(item["image_url"]).content
        print(len(image_data))
        label_data = requests.get(item["label_url"]).content
        print(len(label_data))
    
    with open(model_path, "wb") as model_fp:
        dill.dump({"model": MM(), "training_config": training_config}, model_fp)

@pipeline(
    name="pipeline-test",
    description="python-metaai test pipeline",
    # If you need the container to re-pull the image on every run, set this to Always,
    # e.g. when your base image changes often but the pipeline code does not.
    image_pull_policy=ImagePullPolicy.Always.value,
)
# The parameters your pipeline accepts.
# Specifying defaults renders default key-value pairs in the UI.
def main(
    dataset: Dict = {"dataFiles": [], "labelFiles": []},
    train: Dict = {},
    model: Dict = {"modelName": "model-test"},
):
    datasets_op = image_datasets(dataset, enable_cache=False)

    train_op = training(datasets=datasets_op.outputs["datasets"], training_config=train)
    # train_op.set_gpu_limit(1)
    # Parameters annotated with OutputPath or InputPath have their '_file' or '_path'
    # suffix trimmed, which is why outputs["model"] is used directly here.
    # To avoid ambiguity, python-metaai never uses return values to pass data between ops.
    upload_model(train_op.outputs["model"], model, {"lalala": "lalala"})


if __name__ == "__main__":
    my_first_pipeline = main()
    # to yaml
    my_first_pipeline.to_yaml("zt-sdk-test")
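
The compiled YAML can then be uploaded through the Kubeflow Pipelines UI. Alternatively, assuming a reachable KFP endpoint and assuming to_yaml("zt-sdk-test") writes a file named zt-sdk-test.yaml (neither is confirmed here), a run could be submitted with the standard kfp client:

# A minimal sketch using the standard kfp client; the host URL and the file
# name "zt-sdk-test.yaml" are assumptions for illustration only.
import kfp

client = kfp.Client(host="http://your-kfp-endpoint")  # hypothetical endpoint
client.create_run_from_pipeline_package(
    "zt-sdk-test.yaml",  # assumed output of my_first_pipeline.to_yaml("zt-sdk-test")
    arguments={},        # pipeline parameters; the defaults from main() apply if empty
)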

Building a serving image

The directory layout must follow this tree:

. project_dir
├── my_module         # your module directory
│   ├── __init__.py
│   ├── main.py
│   └── pkgxxx
├── requirements.txt  # must exist at the same level as serving.py; it may be empty
└── serving.py        # must exist, at the same level as your module directory
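
For the serving demo below, my_module/main.py only needs to expose the preprocess_func that serving.py imports. A minimal sketch (the body is hypothetical; the function must be async because the demo awaits it):

# my_module/main.py -- a minimal sketch of the module imported by serving.py
from typing import Any


async def preprocess_func(request: Any) -> Any:
    # Hypothetical preprocessing: pass the request through unchanged.
    # serving.py awaits this call, so it is defined as async.
    return request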

Serving code demo, using the model produced by the demo pipeline above:

import dill
from typing import *

# Import these from the metaai package
from metaai.serving.models.commons.model import Model
from metaai.serving.models.commons.model_server import ModelServer
from metaai.serving.utils import success_response, failed_response_with_break



# Be careful with imports here: do not use relative imports.
# Import directly from the top level of your module.
from my_module.main import preprocess_func

class MyModelService(Model):
    
    # If you override __init__, call super().__init__ so the base instance attributes exist.
    def __init__(self, name: str):
        super().__init__(name)
        
        
    def _validate(self, request) -> Any:
        # Called after _preprocess.
        # Use this hook to run custom validation on the input,
        # e.g. when you have specific requirements on the request.
        if "balabala" not in request:
            failed_response_with_break(message="expected key 'balabala' in request json!")
        return super()._validate(request)
    
    # You can declare this as an async function or as a plain one.
    async def _preprocess(self, request: Any) -> Any:
        # Data preprocessing hook.
        # _validate is called after it runs.
        resp = await preprocess_func(request)
        return resp

    
    def load(self) -> bool:
        # Load the model in whatever way suits it.
        print("loading.....")
        print(self.model_path)
        
        with open(self.model_path, "rb") as fp:
            pkl_res = dill.load(fp)
            
            print(pkl_res)
            self._model = pkl_res["model"]

        self.ready = True
        return self.ready
    
    
    def predict(self, request: Any):
        print(request)
        # Prediction hook.
        print("in predict")
        self._model.fit(request)
        return self._model.predict()


    def _postprocess(self, response: Any) -> Dict:
        # Data postprocessing hook; called after predict.
        return success_response(super()._postprocess(response))    


# Required to start the server; be sure to include this.
if __name__ == '__main__':
    
    model = MyModelService(name="custom")
    
    ModelServer.start(model)
        

Build and push the Docker image with the CLI

# activate your virtual environment
metaai --help

cd "your working directory"
metaai serving-build -t {image_name}:{image_tag} -s .
# 'docker images' shows the image you just built
# 'docker login' to log in to your registry
# 'docker push {image_name}:{image_tag}' pushes the image to the remote registry

Local debugging and usage
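
The local run below expects a model.pkl on disk. If you do not have one from a pipeline run yet, a compatible file can be produced by hand, mirroring what the demo training component dumps (a sketch; the MM class is copied from the component demo above):

# make_model.py -- write a model.pkl compatible with the serving demo
import dill


class MM:
    
    def fit(self, x):
        setattr(self, "X", x)
    
    def predict(self):
        return {"prediction": self.X}


with open("model.pkl", "wb") as fp:
    # the same structure the demo training component dill-dumps
    dill.dump({"model": MM(), "training_config": {}}, fp)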

  • Run the code locally

    # cd "你的工作目录"
    export MODEL_PATH=/home/{user1}/models/model.pkl
    python3 serving.py
    

  • Run in a container

    # run the metaai docker image built with the command above,
    # e.g. with the full image name metaai-serving:0.0.1
    
    docker run -it --name metaai-serving -v /home/{user1}/models/model.pkl:/mnt/models/model.pkl \
    -p 8089:8089 -e MODEL_PATH=/mnt/models/model.pkl metaai-serving:0.0.1
    
  • Output

    # OUTPUT
    loading.....
    /home/zt/Workspace/demo-serving/model.pkl
    {'model': <class '__main__.tt.<locals>.MM'>}
    INFO:     Started server process [120675]
    INFO:     Waiting for application startup.
    INFO:     Application startup complete.
    INFO:     Uvicorn running on http://0.0.0.0:8089 (Press CTRL+C to quit)
    
    
    

Calling the service

  • Request the predict/ endpoint
curl --request POST \
  --url http://127.0.0.1:8089/predict/ \
  --header 'Content-Type: application/json' \
  --data '{
	"body":"123"
}' -vv
  • Output
# output
Note: Unnecessary use of -X or --request, POST is already inferred.
*   Trying 127.0.0.1:8089...
* TCP_NODELAY set
* Connected to 127.0.0.1 (127.0.0.1) port 8089 (#0)
> POST /predict/ HTTP/1.1
> Host: 127.0.0.1:8089
> User-Agent: curl/7.68.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 17
> 
* upload completely sent off: 17 out of 17 bytes
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< date: Mon, 01 Aug 2022 10:39:04 GMT
< server: uvicorn
< content-length: 29
< content-type: application/json
< 
* Connection #0 to host 127.0.0.1 left intact

{"prediction":{"body":"123"}}
