Skip to main content

streaming_infer

Project description

流式推理框架

目录结构说明

  • config: 作业级别配置相关的解析代码
  • pipeline:任务流水线相关的代码,包括线程的创建、推理客户端封装、推理请求metrics生成等
  • products:实际的各个推理场景的代码
    • example:这里是一些示例代码
    • soe: 仿真优化推理任务
  • streamz_ext: 对streamz的扩展,实现一些额外的source、窗口、sink算子
  • tasks: 任务级别的配置管理、任务发现
  • supervisor.py: 管理者,用于创建WorkerActor和维护WorkerActor中的流水线
  • worker.py: 实际任务流水线运行的一个容器,一个Worker可以包含多个任务流水线
  • main.py: ray作业提交的driver入口

开发步骤

对于一个新的推理场景,开发流水线的步骤如下:

  1. 在products目录下创建一个对应产品的目录(以example为例)),并且在products/init.py中加上import. 后续步骤创建的类的文件都放在此目录中, 且必须在example/init.py中import, 否则可能会初始化失败。

    from . import example
    
  2. 开发一个InferTaskConfig的子类,一般放在task.py。 其中包含一个任务需要的配置信息。包括: 1) 数据源信息:jetstream的subject、消费者等信息 2)输出目标信息:可能是数据库、jetstream

  3. 开发一个TaskManager的子类,从数据库或其他来源获取上述InferTaskConfig的全部对象。

  4. 开发一个InferPipeline的子类,用于实现推理任务的流水线。 一般只需要实现其中的__exec__方法。exec 目前没有标准化的模板,主要包括以下几步: 1) source:数据源。一般都是jetstream,已经内置好。 2) 预处理:将数据源的消息转换成内部对象。 需要自行开发,一般就是一个map算子。 3)窗口算子: 如果有窗口需求,可以增加一个窗口算子,包含时间窗口、计数窗口等。 窗口算子都已经内置实现,只需要引用即可。 4) 推理预处理:将一个窗口中的元素集合转换成推理服务的请求。 定制开发,要看推理服务的请求需要如何组织。 5) 调用推理服务。 这一般是一个map算子,是个通用的算子。 6) 写入目标。 根据具体需求引用sink算子将数据发送给下游。

example的运行方法

在本地ray中运行

# 在streaming-infer目录中执行以下命令, 将在本地启动
export RAY_PARAMETERS='{"task_type": "example"}'; python3.9 -m streaming_infer.main

其中任务的配置是在example/task_manager.py中写死的,可以用nats命令向写死的nats发送消息

cur_sec=`date '+%s'`; nats -s "ns://10.31.76.16:8221" pub js-test-7 --count=3 --sleep 1s "{\"time\": $cur_sec}"

在本地运行单个任务

这种情况下不会启动ray, 因为pipeline的运行本身是不依赖ray的。 但是目前只运行单个线程,这种情况下可以添加断点来调试。

RAY_PARAMETERS='{"task_type": "example"}'; python3.9 -m streaming_infer.local_test --id ex_1

发送消息的方法同上。

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

streaming_infer-0.1.1.tar.gz (39.5 kB view hashes)

Uploaded Source

Built Distribution

streaming_infer-0.1.1-py3-none-any.whl (55.9 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page