Skip to main content

流水线工厂模块,支持多节点、多工位、资源控制、磁盘溢出

Project description

Factory Queue

PyPI version Python License

流水线工厂模块,支持多节点、多工位、资源控制、磁盘溢出。

功能特性

  • 流水线设计 - 链式创建节点,自动管理依赖
  • 多工位并行 - 每个节点支持多线程同时处理
  • 多分支流水线 - 节点可输出多种数据给不同下游节点
  • 自动通知 - 上游节点完成后自动通知下游
  • 资源控制 - 可设置内存上限、队列大小
  • 磁盘溢出 - 内存不足时自动写入磁盘,防止OOM
  • 优雅退出 - 完整的节点同步机制
  • 实时监控 - 定时输出队列和节点状态
  • 彩色日志 - 不同级别日志使用不同颜色显示

安装

pip install factory-queue

快速开始

流水线模式(推荐)

from factory_queue import Factory, Node, ResourceConfig
import logging

# 设置日志
logging.basicConfig(level=logging.INFO)

# 定义处理函数
def fetch_data(data_id):
    """头节点:获取数据"""
    raw_data = {"id": data_id, "value": data_id * 10}
    
    # 返回多种产物
    if data_id % 2 == 0:
        return {0: raw_data}  # feed 0: 偶数数据
    else:
        return {1: raw_data}  # feed 1: 奇数数据

def process_even(data):
    """处理偶数数据"""
    data["processed"] = data["value"] * 2
    return data

def process_odd(data):
    """处理奇数数据"""
    data["processed"] = data["value"] * 3
    return data

def save_result(data):
    """叶子节点:保存结果(不需要返回值)"""
    print(f"保存数据: {data}")

# 创建流水线
config = ResourceConfig(max_memory_mb=512, max_queue_size=10000)

with Factory(resource_config=config, enable_monitor=True) as factory:
    # 创建头节点(2个工位并行)
    head = factory.head(func=fetch_data, node_num=2, name="数据获取")
    
    # 创建分支1:处理偶数(使用 feed=0)
    node_even = head.create_node(func=process_even, node_num=2, feed=0, name="偶数处理")
    node_even_save = node_even.create_node(func=save_result, node_num=1, feed=0, name="偶数保存")
    
    # 创建分支2:处理奇数(使用 feed=1)
    node_odd = head.create_node(func=process_odd, node_num=1, feed=1, name="奇数处理")
    node_odd_save = node_odd.create_node(func=save_result, node_num=1, feed=0, name="奇数保存")
    
    # 启动流水线
    factory.start()
    
    # 投放数据
    for i in range(100):
        factory.feed(i)
    
    # 通知结束投喂
    factory.end_feed()
    
    # 等待完成
    factory.wait_complete()

主要类说明

Factory

流水线工厂主类,管理整个流水线流程。

主要方法:

  • head(func, node_num, name) - 创建头节点
  • feed(data) - 投放数据
  • end_feed() - 通知结束投喂
  • start() - 启动流水线
  • wait_complete(timeout) - 等待完成
  • stop() - 停止流水线
  • close() - 关闭并清理资源

Node

流水线节点,支持链式创建下游节点。

主要方法:

  • create_node(func, node_num, feed, name) - 创建下游节点

ResourceConfig

资源配置类。

参数:

  • max_memory_mb - 最大内存使用量(MB),默认1024
  • max_queue_size - 队列最大长度,默认10000
  • disk_overflow_threshold - 磁盘溢出阈值,默认0.8
  • temp_dir - 临时目录,默认系统临时目录

流水线架构

线性流水线

input -> 头节点 -> 节点1 -> 节点2 -> 叶子节点

分支流水线

              ┌─> 偶数处理 -> 保存数据库
输入 -> 头节点 ┤
              └─> 奇数处理 -> 生成报告

节点类型

  • 头节点:接收初始数据,可以产生多种产物
  • 中间节点:处理数据并传递给下游
  • 叶子节点:最终处理,不产生输出

高级功能

多产物输出

def head_process(data):
    # 返回字典,key 为 feed 索引
    return {
        0: data * 2,      # feed 0: 给第一个分支
        1: data * 3,      # feed 1: 给第二个分支
    }

# 创建两个分支

使用内存比例配置

# 使用系统内存的 10%

许可证

MIT License

作者

stabvale

贡献

欢迎提交 Issue 和 Pull Request!

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

factory_queue-0.1.7.tar.gz (164.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

factory_queue-0.1.7-py3-none-any.whl (58.2 kB view details)

Uploaded Python 3

File details

Details for the file factory_queue-0.1.7.tar.gz.

File metadata

  • Download URL: factory_queue-0.1.7.tar.gz
  • Upload date:
  • Size: 164.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.13

File hashes

Hashes for factory_queue-0.1.7.tar.gz
Algorithm Hash digest
SHA256 db057f5e41fd89b0f52d5f9fe0f474ff9e89e8ea40baf18734ec5f49a5a00338
MD5 9e8d0d0d247e2a44f6c7eddaf8a9eee5
BLAKE2b-256 9545f20a188604ae1e28536445d0b7624a1541045b8988f6f6a8c21e9397c16c

See more details on using hashes here.

File details

Details for the file factory_queue-0.1.7-py3-none-any.whl.

File metadata

  • Download URL: factory_queue-0.1.7-py3-none-any.whl
  • Upload date:
  • Size: 58.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.13

File hashes

Hashes for factory_queue-0.1.7-py3-none-any.whl
Algorithm Hash digest
SHA256 464fd82869a0f1a875f097e9ca39976f9e4b3faf0cc066e88cc796a0e4b1805d
MD5 44e365f9e213be09ab742165bad972d2
BLAKE2b-256 527dae41e912bf77d2e0692612396acefdf924626eb2a4cab858a422f2c7a005

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page