流水线工厂模块,支持多节点、多工位、资源控制、磁盘溢出
Project description
Factory Queue
流水线工厂模块,支持多节点、多工位、资源控制、磁盘溢出。
功能特性
- ✅ 流水线设计 - 链式创建节点,自动管理依赖
- ✅ 多工位并行 - 每个节点支持多线程同时处理
- ✅ 多分支流水线 - 节点可输出多种数据给不同下游节点
- ✅ 自动通知 - 上游节点完成后自动通知下游
- ✅ 资源控制 - 可设置内存上限、队列大小
- ✅ 磁盘溢出 - 内存不足时自动写入磁盘,防止OOM
- ✅ 优雅退出 - 完整的节点同步机制
- ✅ 实时监控 - 定时输出队列和节点状态
- ✅ 彩色日志 - 不同级别日志使用不同颜色显示
安装
pip install factory-queue
快速开始
流水线模式(推荐)
from factory_queue import Factory, Node, ResourceConfig
import logging
# 设置日志
logging.basicConfig(level=logging.INFO)
# 定义处理函数
def fetch_data(data_id):
"""头节点:获取数据"""
raw_data = {"id": data_id, "value": data_id * 10}
# 返回多种产物
if data_id % 2 == 0:
return {0: raw_data} # feed 0: 偶数数据
else:
return {1: raw_data} # feed 1: 奇数数据
def process_even(data):
"""处理偶数数据"""
data["processed"] = data["value"] * 2
return data
def process_odd(data):
"""处理奇数数据"""
data["processed"] = data["value"] * 3
return data
def save_result(data):
"""叶子节点:保存结果(不需要返回值)"""
print(f"保存数据: {data}")
# 创建流水线
config = ResourceConfig(max_memory_mb=512, max_queue_size=10000)
with Factory(resource_config=config, enable_monitor=True) as factory:
# 创建头节点(2个工位并行)
head = factory.head(func=fetch_data, node_num=2, name="数据获取")
# 创建分支1:处理偶数(使用 feed=0)
node_even = head.create_node(func=process_even, node_num=2, feed=0, name="偶数处理")
node_even_save = node_even.create_node(func=save_result, node_num=1, feed=0, name="偶数保存")
# 创建分支2:处理奇数(使用 feed=1)
node_odd = head.create_node(func=process_odd, node_num=1, feed=1, name="奇数处理")
node_odd_save = node_odd.create_node(func=save_result, node_num=1, feed=0, name="奇数保存")
# 启动流水线
factory.start()
# 投放数据
for i in range(100):
factory.feed(i)
# 通知结束投喂
factory.end_feed()
# 等待完成
factory.wait_complete()
主要类说明
Factory
流水线工厂主类,管理整个流水线流程。
主要方法:
head(func, node_num, name)- 创建头节点feed(data)- 投放数据end_feed()- 通知结束投喂start()- 启动流水线wait_complete(timeout)- 等待完成stop()- 停止流水线close()- 关闭并清理资源
Node
流水线节点,支持链式创建下游节点。
主要方法:
create_node(func, node_num, feed, name)- 创建下游节点
ResourceConfig
资源配置类。
参数:
max_memory_mb- 最大内存使用量(MB),默认1024max_queue_size- 队列最大长度,默认10000disk_overflow_threshold- 磁盘溢出阈值,默认0.8temp_dir- 临时目录,默认系统临时目录
流水线架构
线性流水线
input -> 头节点 -> 节点1 -> 节点2 -> 叶子节点
分支流水线
┌─> 偶数处理 -> 保存数据库
输入 -> 头节点 ┤
└─> 奇数处理 -> 生成报告
节点类型
- 头节点:接收初始数据,可以产生多种产物
- 中间节点:处理数据并传递给下游
- 叶子节点:最终处理,不产生输出
高级功能
多产物输出
def head_process(data):
# 返回字典,key 为 feed 索引
return {
0: data * 2, # feed 0: 给第一个分支
1: data * 3, # feed 1: 给第二个分支
}
# 创建两个分支
使用内存比例配置
# 使用系统内存的 10%
许可证
MIT License
作者
stabvale
贡献
欢迎提交 Issue 和 Pull Request!
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
factory_queue-0.1.8.tar.gz
(165.2 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file factory_queue-0.1.8.tar.gz.
File metadata
- Download URL: factory_queue-0.1.8.tar.gz
- Upload date:
- Size: 165.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
893caa89d5b1e51380d1354f413301122e98ccd60cee3752c04a46b3469447bc
|
|
| MD5 |
edf028e09ec3e2ede5ad588028513cab
|
|
| BLAKE2b-256 |
3e4d3397145c2b7eddb0e2c65044d499687547a2188dcbb9ce3f081090ccd10e
|
File details
Details for the file factory_queue-0.1.8-py3-none-any.whl.
File metadata
- Download URL: factory_queue-0.1.8-py3-none-any.whl
- Upload date:
- Size: 58.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
3efadee017f9d1b1edad405af507667d1aeaf62247c9ba559cce12d13336160f
|
|
| MD5 |
35bd5a0497c7ebfc9d350d27f85c7b40
|
|
| BLAKE2b-256 |
b0f267b97acb761d8e2d8a7adadb97b7672ec9b380eb79584e316ccfb252bc61
|