Skip to main content

streaming_infer

Project description

流式推理框架

目录结构说明

  • config: 作业级别配置相关的解析代码
  • pipeline:任务流水线相关的代码,包括线程的创建、推理客户端封装、推理请求metrics生成等
  • products:实际的各个推理场景的代码
    • example:这里是一些示例代码
    • soe: 仿真优化推理任务
  • streamz_ext: 对streamz的扩展,实现一些额外的source、窗口、sink算子
  • tasks: 任务级别的配置管理、任务发现
  • supervisor.py: 管理者,用于创建WorkerActor和维护WorkerActor中的流水线
  • worker.py: 实际任务流水线运行的一个容器,一个Worker可以包含多个任务流水线
  • main.py: ray作业提交的driver入口

开发步骤

对于一个新的推理场景,开发流水线的步骤如下:

  1. 在products目录下创建一个对应产品的目录(以example为例)),并且在products/init.py中加上import. 后续步骤创建的类的文件都放在此目录中, 且必须在example/init.py中import, 否则可能会初始化失败。

    from . import example
    
  2. 开发一个InferTaskConfig的子类,一般放在task.py。 其中包含一个任务需要的配置信息。包括: 1) 数据源信息:jetstream的subject、消费者等信息 2)输出目标信息:可能是数据库、jetstream

  3. 开发一个TaskManager的子类,从数据库或其他来源获取上述InferTaskConfig的全部对象。

  4. 开发一个InferPipeline的子类,用于实现推理任务的流水线。 一般只需要实现其中的__exec__方法。exec 目前没有标准化的模板,主要包括以下几步: 1) source:数据源。一般都是jetstream,已经内置好。 2) 预处理:将数据源的消息转换成内部对象。 需要自行开发,一般就是一个map算子。 3)窗口算子: 如果有窗口需求,可以增加一个窗口算子,包含时间窗口、计数窗口等。 窗口算子都已经内置实现,只需要引用即可。 4) 推理预处理:将一个窗口中的元素集合转换成推理服务的请求。 定制开发,要看推理服务的请求需要如何组织。 5) 调用推理服务。 这一般是一个map算子,是个通用的算子。 6) 写入目标。 根据具体需求引用sink算子将数据发送给下游。

example的运行方法

在本地ray中运行

# 在streaming-infer目录中执行以下命令, 将在本地启动
export RAY_PARAMETERS='{"task_type": "example"}'; python3.9 -m streaming_infer.main

其中任务的配置是在example/task_manager.py中写死的,可以用nats命令向写死的nats发送消息

cur_sec=`date '+%s'`; nats -s "ns://10.31.76.16:8221" pub js-test-7 --count=3 --sleep 1s "{\"time\": $cur_sec}"

在本地运行单个任务

这种情况下不会启动ray, 因为pipeline的运行本身是不依赖ray的。 但是目前只运行单个线程,这种情况下可以添加断点来调试。

RAY_PARAMETERS='{"task_type": "example"}'; python3.9 -m streaming_infer.local_test --id ex_1

发送消息的方法同上。

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

streaming_infer-0.1.1.tar.gz (39.5 kB view details)

Uploaded Source

Built Distribution

streaming_infer-0.1.1-py3-none-any.whl (55.9 kB view details)

Uploaded Python 3

File details

Details for the file streaming_infer-0.1.1.tar.gz.

File metadata

  • Download URL: streaming_infer-0.1.1.tar.gz
  • Upload date:
  • Size: 39.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.9.13

File hashes

Hashes for streaming_infer-0.1.1.tar.gz
Algorithm Hash digest
SHA256 434c971daa5ab3a5f741019fd4df8f96c27e0a8e953c34baa816ea582eb9afef
MD5 c9633418b0989c57736bce6f8849e960
BLAKE2b-256 0ac6c8b6fef0b620a13675ef95647a9dd93bb8af1af78e3373f77751a95266f1

See more details on using hashes here.

File details

Details for the file streaming_infer-0.1.1-py3-none-any.whl.

File metadata

File hashes

Hashes for streaming_infer-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 8dfeb58b2b5c6e2fade85d1f77d73d1cdfa93e1088920dc2b20d1215bf349510
MD5 cadd133318ca2a5c4fd86154ffd8a8d8
BLAKE2b-256 2a5575859680301b7f267495c5491ec16b2f9775a90168149a9cf25474cae8a0

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page