Skip to main content

一个用于管理和监控多个任务执行的Python工具

Project description

MultiTaskFlow 多任务流管理工具

MultiTaskFlow 是一个轻量级的多任务流管理工具,用于按顺序执行和监控一系列任务。它可以帮助您管理数据处理、模型训练、评估等一系列需要顺序执行的任务,并提供实时状态更新和执行结果跟踪。

功能特点

  • 基于YAML配置文件定义任务流
  • 支持Python脚本和Shell命令的执行
  • 提供任务状态实时监控
  • 自动执行失败任务的重试逻辑
  • 支持任务之间的依赖关系
  • 完整的日志记录和任务执行历史
  • 进程PID跟踪与管理
  • 优雅的信号处理和任务终止

安装方法

要求

  • Python 3.7+
  • PyYAML
  • 其他依赖库(如有)

配置消息推送令牌

在使用消息推送功能前,需要配置 MSG_PUSH_TOKEN 环境变量。以下是配置方法:

1. 永久配置(推荐)

~/.bashrc~/.zshrc 文件中添加:

# MultiTaskFlow 消息推送配置
export MSG_PUSH_TOKEN="your_pushplus_token_here"

然后重新加载配置:

source ~/.bashrc  # 或 source ~/.zshrc

2. 临时配置

在运行命令前设置:

MSG_PUSH_TOKEN=your_token python your_script.py

3. 开发模式配置

在项目根目录创建 .env 文件:

echo "MSG_PUSH_TOKEN=your_token" > .env

获取 Token

  1. 访问 PushPlus 官网
  2. 注册并登录
  3. 在个人中心获取您的 token

(方法1)从PyPI安装

# 使用pip直接安装
pip install multitaskflow

(方法2)从源码安装

# 克隆仓库
git clone https://github.com/Polaris-F/MultiTaskFlow.git
cd MultiTaskFlow

# 方法1: 使用pip直接安装
pip install .

# 方法2: 开发模式安装
pip install -e .

(方法3)构建离线包方法

如果您想构建wheel包或源码分发包,可以使用以下命令:

# 安装构建工具
pip install build

# 构建分发包
python -m build

# 构建的包会在dist/目录下生成

使用方法

如果需要使用 消息接收功能,请访问 https://www.pushplus.plus/ 获取您的token

1. 创建任务配置文件

创建一个YAML格式的任务配置文件,定义您要执行的任务序列:

# tasks.yaml 示例
- name: "任务1-数据准备"
  command: "python scripts/prepare_data.py --input data/raw --output data/processed"
  status: "pending"
  # silent: false  # 默认会发送消息通知

- name: "任务2-模型训练"
  command: "python scripts/train_model.py --data data/processed --epochs 10"
  status: "pending"
  silent: true  # 静默模式,不发送消息通知

- name: "任务3-结果评估"
  command: "python scripts/evaluate.py --model-path models/latest.pt"
  status: "pending"
  # silent: false  # 默认会发送消息通知

2. (方法一)使用Python API (推荐使用方法二、三)

在您的Python代码中使用MultiTaskFlow:

from multitaskflow import TaskFlow

# 创建任务流管理器
task_manager = TaskFlow("path/to/your/tasks.yaml")

# 启动任务执行
task_manager.run()

# 您也可以动态添加任务
task_manager.add_task_by_config(
    name="额外任务", 
    command="echo '这是一个动态添加的任务'",
    silent=True  # 设置为静默模式,不发送消息通知
)

2. (方法二)使用命令行工具【使用场景:不需要后台运行,可实时查看输出】

安装后,您可以直接使用taskflow命令行工具:

# 使用配置文件运行任务流
taskflow path/to/your/tasks.yaml

# 使用默认配置
# 如果不提供配置文件路径,将在examples/tasks.yaml创建示例配置
taskflow

# 查看帮助
taskflow --help

2. (方法三)使用sh脚本工具【使用场景:需要后台运行,通过log查看输出】

首先taskflowPro.sh修改脚本中 TASK_CONFIG为任务流yaml路径

chmod +x taskflowPro.sh
./taskflowPro.sh start  # 开始运行
./taskflowPro.sh stop   # 结束运行

效果展示

您可以运行我们提供的演示脚本,查看任务管理和消息接收的实际效果。演示脚本模拟了一个完整的深度学习工作流,包括数据预处理、模型训练、模型评估和数据归档等步骤。

运行演示脚本

# 安装完成后,直接运行示例脚本
python -m multitaskflow.examples.demo

# 或使用命令行工具
taskflow examples/tasks.yaml

演示内容

演示脚本将依次执行以下任务:

  1. 数据预处理 - 模拟数据集加载、清洗和处理过程
  2. 模型训练-阶段1 - 模拟第一阶段模型训练过程
  3. 模型评估-阶段1 - 模拟对第一阶段训练模型的评估
  4. 模型训练-阶段2 - 模拟基于第一阶段模型继续训练
  5. 模型评估-阶段2 - 模拟对第二阶段训练模型的评估
  6. 数据归档 - 模拟模型和结果数据的归档过程

每个任务都会显示详细的执行进度和模拟输出,让您直观了解MultiTaskFlow的任务管理能力。所有演示任务都是模拟执行,不会创建实际文件或占用大量资源。

期望效果

运行示例后,您将看到:

  • 任务管理器启动和初始化过程
  • 任务状态的实时更新(等待中→执行中→完成/失败)
  • 每个任务的详细输出和进度信息
  • 任务完成后的状态汇总

通过观察演示效果,您可以了解MultiTaskFlow如何帮助管理复杂的多步骤工作流程,以及它如何提供清晰的任务执行状态和结果反馈。

运行效果截图

任务管理和执行效果

实际运行时在控制台中会看到详细的输出,显示任务状态和进度信息

高级功能(TODO)

任务配置选项

任务配置文件支持以下选项:

- name: "示例任务"
  command: "python script.py"
  status: "pending"  # pending, running, completed, failed
  retry: 3  # 失败后重试次数 (TODO)
  timeout: 3600  # 任务超时时间(秒)(TODO)
  depends_on: ["前置任务名称"]  # 依赖的任务 (TODO)
  silent: false  # 是否静默执行(不发送消息通知)

静默模式

MultiTaskFlow 支持静默模式,可以通过配置让某些任务不发送消息通知。这对于以下场景非常有用:

  • 中间过程任务:对于工作流中的中间步骤,可能不需要收到每个步骤的通知
  • 调试阶段任务:在开发和调试阶段,可以关闭消息通知以避免干扰
  • 高频执行任务:对于频繁执行的任务,可以只关注最终结果而不是每次执行

配置静默模式:

  1. 在YAML配置文件中

    - name: "静默任务"
      command: "python script.py"
      silent: true  # 设置为静默模式
    
  2. 通过API动态添加

    task_manager.add_task_by_config(
        name="静默任务", 
        command="echo '这是静默任务'",
        silent=True
    )
    

当任务设置为静默模式时:

  • 任务执行时不会发送消息通知
  • 任务信息仍会记录在日志文件中
  • 如果所有任务都是静默模式,任务流管理器结束时也不会发送总结报告

自定义通知

您可以配置系统在任务状态变更时发送通知:

from multitaskflow import TaskFlow, Msg_push

# 创建消息推送实例
notifier = Msg_push(
    webhook_url="your_webhook_url",
    channel="your_channel"
)

# 创建带通知功能的任务流管理器
task_manager = TaskFlow(
    "tasks.yaml",
    msg_push=notifier
)

自定义与扩展

MultiTaskFlow设计为可扩展的,您可以:

  • 自定义任务状态处理逻辑
  • 添加新的任务类型
  • 扩展监控和报告功能

自定义任务处理器示例

from multitaskflow import TaskFlow

class CustomTaskFlow(TaskFlow):
    def process_task_output(self, task, output):
        # 自定义输出处理逻辑
        print(f"处理任务 {task.name} 的输出: {output}")
        # 继续处理...
        super().process_task_output(task, output)

常见问题(FAQ)

Q: XXXX?

贡献指南

欢迎贡献代码、报告问题或提出新功能建议!

  1. Fork 这个仓库
  2. 创建您的特性分支 (git checkout -b feature/amazing-feature)
  3. 提交您的更改 (git commit -m 'Add some amazing feature')
  4. 推送到分支 (git push origin feature/amazing-feature)
  5. 打开一个 Pull Request

版本历史

  • 1.0.0 - 2024-03-15
    • 首次发布
    • 基本任务管理功能
    • 命令行工具支持

许可证

本项目采用MIT许可证 - 详情请查看 LICENSE 文件

作者与致谢

  • 主要开发者: Polaris
  • 感谢所有贡献者和使用者的宝贵反馈

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

multitaskflow-0.1.2.tar.gz (21.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

multitaskflow-0.1.2-py3-none-any.whl (19.8 kB view details)

Uploaded Python 3

File details

Details for the file multitaskflow-0.1.2.tar.gz.

File metadata

  • Download URL: multitaskflow-0.1.2.tar.gz
  • Upload date:
  • Size: 21.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.16

File hashes

Hashes for multitaskflow-0.1.2.tar.gz
Algorithm Hash digest
SHA256 b313331303c9986ac06109b5d2479ec4440b24d220f940b071cd918c7abee732
MD5 ffc6c614a009e1caf076678c26c39e3e
BLAKE2b-256 92e148ab495d34197ebab6dcd142a79d9ff8a74eaa435030afbaff0ebf7a6008

See more details on using hashes here.

File details

Details for the file multitaskflow-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: multitaskflow-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 19.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.16

File hashes

Hashes for multitaskflow-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 1ed6982cfac201059d11e867213454a3a5fff852c9843a9c7a58a5b8baebe127
MD5 7aab0a5c67c602a59818e3d15e104a5c
BLAKE2b-256 b8610a97d8e67203ff81b39c7b02a82f454f9cd3e07baccaee311c629e3d8030

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page