
Python pipeline with asyncio support

apipeline

Requires Python >= 3.10; built on asyncio tasks.

Installation

  • Install apipeline (pipeline-python) locally from source:
git clone https://github.com/weedge/pipeline-py
cd pipeline-py
# regular install
pip install .
# or: editable (development) install
pip install -e .
  • Install apipeline from PyPI:
pip install apipeline

Design

see docs/design.md

image

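System frames

  • CancelFrame: system exit instruction; sent when the system receives an exit signal so that it can clean up and exit
  • ErrorFrame: runtime error instruction
  • StopTaskFrame: stop-task instruction
  • StartInterruptionFrame: interruption instruction. It interrupts asynchronous processors by dropping the async buffer and restarting the queue_frame handling. If a model has a built-in interruption instruction, the frame is passed to the underlying model to trigger the model's interrupt or to stop streaming output
  • StopInterruptionFrame: stop-interruption instruction
  • MetricsFrame: system monitoring metrics (processor run time; time to first token/chunk/byte)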

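Control frames

  • StartFrame: start instruction; carries initial parameters such as whether interruption is allowed and whether metrics are enabled
  • EndFrame: end-of-run instruction
  • SyncFrame: used by parallel-sync pipelines; the output side waits for a SyncFrame before emitting the buffered results
  • SyncNotifyFrame: used for event notification and broadcast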

Data frames (mainly multimodal data for chatbots)

  • TextFrame: text
  • AudioRawFrame: raw audio frame
  • ImageRawFrame: raw image frame
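
The three frame categories above can be pictured as a small class hierarchy. The following is a minimal sketch using plain dataclasses, not apipeline's actual definitions: the frame names mirror the lists above, but the intermediate base classes (`SystemFrame`, `ControlFrame`, `DataFrame`), every field name and default, and the `is_urgent` helper are assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass
class Frame:
    """Base class for everything that flows through a pipeline (hypothetical)."""


@dataclass
class SystemFrame(Frame):
    """System-level instructions: cancel, error, interruption, metrics."""


@dataclass
class ControlFrame(Frame):
    """Control-level instructions: start, end, sync."""


@dataclass
class DataFrame(Frame):
    """Payload-carrying frames: text, audio, image."""


@dataclass
class CancelFrame(SystemFrame):
    pass


@dataclass
class ErrorFrame(SystemFrame):
    error: str = ""  # assumed field


@dataclass
class StartFrame(ControlFrame):
    allow_interruptions: bool = False  # assumed field names
    enable_metrics: bool = False


@dataclass
class EndFrame(ControlFrame):
    pass


@dataclass
class TextFrame(DataFrame):
    text: str = ""


@dataclass
class AudioRawFrame(DataFrame):
    audio: bytes = b""
    sample_rate: int = 16000  # assumed default
    num_channels: int = 1


def is_urgent(frame: Frame) -> bool:
    """Illustrative rule: system frames should not wait behind buffered data."""
    return isinstance(frame, SystemFrame)
```

Grouping frames under a few base classes lets a processor decide how to treat a frame with a single `isinstance` check instead of enumerating every concrete type.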

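Application frames are defined per business scenario; a base class (structure) is provided here.

Serialization is mainly Protocol Buffers (which requires an IDL schema definition) and JSON.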
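
As an illustration of the JSON path only, here is a minimal round-trip sketch built on the standard library. It is not apipeline's serializer: the `TextFrame` field, the `type`/`data` envelope, the `FRAME_TYPES` registry and the `to_json`/`from_json` helpers are all hypothetical. The Protocol Buffers path is not shown because it depends on a generated schema.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class TextFrame:
    text: str


# Registry mapping the "type" tag back to a class (hypothetical convention).
FRAME_TYPES = {"TextFrame": TextFrame}


def to_json(frame) -> str:
    """Wrap the frame's fields in an envelope that records its class name."""
    payload = {"type": type(frame).__name__, "data": asdict(frame)}
    return json.dumps(payload, ensure_ascii=False)


def from_json(raw: str):
    """Look up the class by its tag and rebuild the frame from its fields."""
    payload = json.loads(raw)
    cls = FRAME_TYPES[payload["type"]]
    return cls(**payload["data"])


encoded = to_json(TextFrame(text="hello"))
decoded = from_json(encoded)
```

The explicit type tag is what allows the receiving side to pick the right class; without it, frames with identical fields would be indistinguishable after decoding.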

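Processors are divided into:

  • Sync and async processors, which mainly handle system-level and control-level frames
  • Aggregator processors for multimodal data
  • Filter processors, which filter frames using a user-defined filter handler
  • Logger processors, which print logs according to filter conditions to make frames easier to trace and debug
  • Input/output processors, which classify incoming and outgoing frames by type; subclasses or composite classes implement the handling for each frame type directly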
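
To make the filter-processor idea concrete, the sketch below links three synchronous processors into a chain and drops frames that the filter handler rejects. All class and method names here (`Processor`, `link`, `push_frame`, `FilterProcessor`, `Collector`) are hypothetical stand-ins chosen for the example, not apipeline's API; only `process_frame` is a name the README itself mentions.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class TextFrame:
    text: str


class Processor:
    """Hypothetical synchronous processor that can be linked to a next one."""

    def __init__(self) -> None:
        self._next: Optional["Processor"] = None

    def link(self, nxt: "Processor") -> "Processor":
        self._next = nxt
        return nxt  # returning nxt allows a.link(b).link(c)

    def push_frame(self, frame) -> None:
        if self._next is not None:
            self._next.process_frame(frame)

    def process_frame(self, frame) -> None:
        self.push_frame(frame)  # default: pass the frame through unchanged


class FilterProcessor(Processor):
    """Forwards only the frames accepted by the filter handler."""

    def __init__(self, handler: Callable[[object], bool]) -> None:
        super().__init__()
        self._handler = handler

    def process_frame(self, frame) -> None:
        if self._handler(frame):
            self.push_frame(frame)


class Collector(Processor):
    """Terminal processor that records what reaches the end of the chain."""

    def __init__(self) -> None:
        super().__init__()
        self.frames: List[object] = []

    def process_frame(self, frame) -> None:
        self.frames.append(frame)


def non_empty_text(frame) -> bool:
    return isinstance(frame, TextFrame) and frame.text != ""


source = Processor()
sink = Collector()
source.link(FilterProcessor(non_empty_text)).link(sink)

for frame in (TextFrame("hi"), TextFrame(""), TextFrame("there")):
    source.process_frame(frame)
```

After the loop, `sink.frames` holds only the two non-empty text frames; the empty one was stopped at the filter.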

Pipelines are divided into serial, parallel, and parallel-sync.

The overall definitions are summarized in red text in the figure.

pipeline

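  • Serial synchronous processing: composed of sync processors; upstream/downstream frames are executed in chain order. Each pipeline has a head and a tail, the Source and the Sinker, which are used to connect it to the inputs/outputs of other pipelines;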
image
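  • Serial asynchronous processing (may include sync processors): composed of async and sync processors. An async processor extends a sync processor with upstream/downstream async queue buffers and matching async handlers for upstream/downstream frames, still executed in chain order. (If there is no need to wait for the next processor's result, use the async mode: write the frame straight into the queue buffer with queue_frame, and the handler inside the async processor calls process_frame asynchronously.)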
image
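  • Parallel asynchronous processing: multiple serial pipelines run in parallel, with no synchronization between them;

    (Note: "parallel" here refers to the shape of the flow; at run time the tasks actually execute concurrently.)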

image
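  • Parallel synchronous processing: multiple serial pipelines run in parallel, and the upstream/downstream frames they produce must be synchronized at the exit. Synchronization uses up/down queues to cache those frames; when a SyncFrame is processed, the cached frames are written to upstream/downstream. The figure shows the overall flow.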
image
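
The asynchronous and parallel-sync modes described above can be sketched with `asyncio.Queue`. This is an illustrative model under stated assumptions, not apipeline's implementation: `AsyncProcessor`, `SyncGate`, `stop` and `run_demo` are hypothetical names, and the gate here flushes on the first SyncFrame it sees, whereas a real parallel-sync pipeline may coordinate SyncFrames from every branch. Only `queue_frame`, `process_frame` and `SyncFrame` are names taken from the README.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class TextFrame:
    text: str


@dataclass
class SyncFrame:
    pass


class SyncGate:
    """Buffers frames and releases them only when a SyncFrame arrives."""

    def __init__(self) -> None:
        self.buffer = []
        self.output = []

    async def process_frame(self, frame) -> None:
        if isinstance(frame, SyncFrame):
            self.output.extend(self.buffer)  # flush everything buffered so far
            self.buffer.clear()
        else:
            self.buffer.append(frame)


class AsyncProcessor:
    """queue_frame() only enqueues; a background task does the real work."""

    def __init__(self, name: str, downstream: SyncGate) -> None:
        self.name = name
        self._downstream = downstream
        self._queue: asyncio.Queue = asyncio.Queue()
        self._task = asyncio.create_task(self._handler())

    async def queue_frame(self, frame) -> None:
        await self._queue.put(frame)

    async def _handler(self) -> None:
        while True:
            frame = await self._queue.get()
            await self.process_frame(frame)
            self._queue.task_done()

    async def process_frame(self, frame) -> None:
        await asyncio.sleep(0)  # stand-in for real asynchronous work
        tagged = TextFrame(f"{self.name}:{frame.text}")
        await self._downstream.process_frame(tagged)

    async def stop(self) -> None:
        await self._queue.join()  # wait until every queued frame is handled
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass


async def run_demo():
    gate = SyncGate()
    branches = [AsyncProcessor("a", gate), AsyncProcessor("b", gate)]
    # Both branches receive the same frame and work concurrently.
    await asyncio.gather(*(b.queue_frame(TextFrame("hi")) for b in branches))
    await asyncio.gather(*(b.stop() for b in branches))
    before_sync = list(gate.output)  # nothing has been released yet
    await gate.process_frame(SyncFrame())
    return before_sync, sorted(f.text for f in gate.output)


before_sync, after_sync = asyncio.run(run_demo())
```

The producer never waits for `process_frame` to finish, which is the point of the queue buffer; the gate then holds both branches' results until the SyncFrame arrives.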

pipeline task runner

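Multiple pipelines (for example the three types above: serial, parallel, and parallel-sync) are assembled into one DAG processor. The pipeline Task defines a Source processor that connects to the assembled DAG processor. When the program starts, the runner sends control/system instructions (StartFrame / MetricsFrame). For example, in a real-time chat scenario, when a user opens a session connection and the pipeline Task starts, a StartFrame is initialized from the PipelineParams initial parameters (a MetricsFrame is also sent to collect monitoring information if metrics are enabled). After the processors in the DAG receive the StartFrame, they run their startup logic (processors that handle MetricsFrame write their monitoring information into it for collection). When the session ends, an EndFrame is sent; to end the task (or when a system error requires it), a StopTaskFrame is sent; if a process exit signal is triggered, a CancelFrame is sent to clean up and exit.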

image
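
The lifecycle described above can be sketched as follows. This is a simplified model, not apipeline's Task or runner: `Task`, `RecordingPipeline`, the `PipelineParams` field names and the demo helpers are assumptions, task cancellation stands in for a process exit signal, and StopTaskFrame handling is left out to keep the example short.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class PipelineParams:
    allow_interruptions: bool = False  # assumed field names
    enable_metrics: bool = False


@dataclass
class StartFrame:
    allow_interruptions: bool
    enable_metrics: bool


@dataclass
class TextFrame:
    text: str


class MetricsFrame:
    pass


class EndFrame:
    pass


class CancelFrame:
    pass


class RecordingPipeline:
    """Stand-in for the assembled DAG processor: records frame type names."""

    def __init__(self) -> None:
        self.seen = []

    async def process_frame(self, frame) -> None:
        self.seen.append(type(frame).__name__)


class Task:
    def __init__(self, pipeline, params: PipelineParams) -> None:
        self._pipeline = pipeline
        self._params = params

    async def run(self, frames) -> None:
        push = self._pipeline.process_frame
        params = self._params
        try:
            await push(StartFrame(params.allow_interruptions, params.enable_metrics))
            if params.enable_metrics:
                await push(MetricsFrame())
            async for frame in frames:
                await push(frame)
            await push(EndFrame())  # the session ended normally
        except asyncio.CancelledError:
            await push(CancelFrame())  # exit signal: let processors clean up
            raise


async def one_frame():
    yield TextFrame("hi")


async def one_frame_then_hang():
    yield TextFrame("hi")
    await asyncio.Event().wait()  # a session that never ends on its own


async def demo():
    normal = RecordingPipeline()
    await Task(normal, PipelineParams(enable_metrics=True)).run(one_frame())

    cancelled = RecordingPipeline()
    task = Task(cancelled, PipelineParams())
    running = asyncio.create_task(task.run(one_frame_then_hang()))
    await asyncio.sleep(0.01)  # give the task time to start
    running.cancel()
    try:
        await running
    except asyncio.CancelledError:
        pass
    return normal.seen, cancelled.seen


normal_seen, cancelled_seen = asyncio.run(demo())
```

In the normal run the pipeline sees StartFrame, MetricsFrame, the data frame and EndFrame in that order; in the cancelled run the EndFrame is replaced by a CancelFrame.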

Examples

see examples

Acknowledgements

  1. Borrowed a lot of code from pipecat.

License

