YAWE - Yet Another Workflow Engine: 一个轻量级、配置驱动的工作流任务编排框架
Project description
YAWE (Yet Another Workflow Engine)
一个轻量级、灵活、配置驱动的工作流任务编排框架。
核心特性
- 完全配置驱动: 通过 YAML 配置文件定义工作流,无需编写代码
- 断点续传: 任务级别的状态管理和恢复,失败后可从断点继续
- SSH 远程执行: 内置 SSH 执行器,支持远程命令执行和文件传输
- 灵活的错误处理: 支持任务级别的错误处理策略和通知
- 任务间变量传递: 支持任务导出变量供后续任务使用
- Jinja2 模板: 命令支持模板渲染和变量替换
- 并发防护: 使用文件锁防止工作流并发执行
快速开始
安装
# 方式1: 从源码安装(开发模式)
git clone https://github.com/tomjamescn/yawe.git
cd yawe
pip install -e .
# 方式2: 从 PyPI 安装(待发布)
pip install yawe
# 方式3: 最小安装(仅依赖)
pip install -r requirements.txt
5分钟上手
1. 创建配置文件 my_workflow.yaml:
logger:
log_dir: logs
level: INFO
workflow:
settings:
stop_on_first_error: true
tasks:
- name: hello
type: command
executor: local
command: "echo 'Hello from YAWE!'"
2. 编写执行脚本 run.py:
from workflow_engine import Config, Logger, WorkflowEngine
config = Config("my_workflow.yaml")
logger = Logger(log_dir=config.log_dir, level=config.log_level)
context = {'logger': logger, 'config': config}
engine = WorkflowEngine(config.get('workflow'), context)
exit_code = engine.run()
print(f"Exit code: {exit_code}")
3. 运行:
python run.py
常用任务示例
SSH远程命令
- name: remote_task
type: command
executor: ssh
host: my_server # 在 ~/.ssh/config 中配置
command: "python script.py"
timeout: 3600
本地命令带参数
- name: template_task
type: command
executor: local
command_template: |
echo "Processing {{ count }} items"
for i in {1..{{ count }}}; do
echo "Item $i"
done
params:
count: 5
文件传输
- name: sync_files
type: transfer
host: my_server
direction: remote_to_local # 或 local_to_remote
params:
items:
- remote: /data/results
local: ./results
recursive: true
exclude:
- "*.tmp"
- "*.log"
带重试的SSH任务
- name: critical_job
type: command
executor: ssh
host: my_server
command: "python important_script.py"
retry:
max_retries: 3
retry_interval: 300 # 5分钟
SSH配置
在 ~/.ssh/config 中配置主机:
Host my_server
HostName 192.168.1.100
User username
Port 22
IdentityFile ~/.ssh/id_rsa
然后在任务中使用 host: my_server。
支持的任务类型
1. Command Task (命令任务)
执行 SSH 远程命令或本地命令。
- name: my_command
type: command
executor: ssh # 或 local
host: server1 # SSH时必需
command_template: |
cd {{ workspace }}
python script.py --arg {{ value }}
params:
workspace: /home/user
value: 123
timeout: 3600
高级特性:
- Jinja2 模板渲染
- 多重验证机制(退出码、错误关键词、成功关键词、输出文件)
- 自动重试机制
- 使用前置任务的导出变量
2. Transfer Task (文件传输)
支持 rsync 和 scp 两种传输方式。
- name: sync_data
type: transfer
host: server1
direction: remote_to_local # 或 local_to_remote
transfer_method: rsync # 或 scp
pre_compress: true # 预压缩传输(适合慢速网络)
params:
items:
- remote: /data/model
local: ./models
recursive: true
exclude:
- "*.tmp"
- "*.log"
断点续传
系统自动保存工作流状态,失败后可以从断点恢复:
# 正常运行
python run.py
# 从上次失败处恢复
python run.py -r
# 从指定任务开始执行
python run.py --from-task "task_name"
任务间变量传递
任务可以导出变量供后续任务使用:
workflow:
tasks:
# 任务1: 上传文件到服务器(预压缩,不解压)
- name: upload_data
type: transfer
host: file_server
direction: local_to_remote
pre_compress: true
decompress: false # 保留压缩文件
params:
items:
- local: ./data
remote: /www/downloads
# 任务2: 使用任务1导出的变量
- name: download_on_target
type: command
executor: ssh
host: target_machine
command_template: |
# 使用前置任务导出的变量
wget http://file_server/downloads/{{ upload_data.archive_name }}
tar -xzf {{ upload_data.archive_name }}
配置说明
全局配置
# 通知配置(可选)
notifier:
api_url: "https://your-notification-api.com"
timeout: 10
verify_ssl: false
# 日志配置
logger:
log_dir: logs
level: INFO # DEBUG, INFO, WARNING, ERROR, CRITICAL
# 任务默认配置
tasks:
command_timeout: 3600 # 命令默认超时(秒)
local_shell: "/bin/bash" # 本地命令默认shell
transfer:
timeout: 600 # 传输默认超时(秒)
show_progress: true
compress: true
preserve_times: true
任务通知
每个任务可以配置独立的成功/失败通知:
- name: critical_task
type: command
executor: ssh
host: server1
command: "python important_script.py"
notify_on_success: true
notify_on_failure: true
notification:
success:
title: "重要任务完成"
message: "任务执行成功"
failure:
title: "重要任务失败"
message: "请检查日志"
扩展开发
自定义任务类型
from workflow_engine.tasks import Task, TaskFactory
from typing import Tuple
class MyCustomTask(Task):
"""自定义任务"""
def execute(self) -> Tuple[bool, str]:
"""执行任务逻辑"""
self.logger.info(f"执行自定义任务: {self.name}")
# 获取配置参数
param1 = self.get_param('param1')
# 执行自定义逻辑
# ...
return True, "任务执行成功"
def export_context(self) -> dict:
"""导出变量供后续任务使用"""
return {
'result': 'some_value',
'timestamp': '2025-11-29'
}
# 注册任务类型
TaskFactory.register('my_custom', MyCustomTask)
然后在配置文件中使用:
workflow:
tasks:
- name: custom_job
type: my_custom
params:
param1: value1
更多示例
查看 examples/ 目录:
basic_workflow.yaml- 基础工作流run_basic.py- 运行示例
系统要求
- Python 3.6+
- rsync(用于文件传输,可选)
- SSH 配置(~/.ssh/config)
License
MIT License - 详见 LICENSE 文件
贡献
欢迎提交 Issue 和 Pull Request!
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file yawe-1.0.0.tar.gz.
File metadata
- Download URL: yawe-1.0.0.tar.gz
- Upload date:
- Size: 35.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.2
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f5ced4f08af111f3bc81f3b3affc2a7a50440959cdf1d6e12d95e229863743a2
|
|
| MD5 |
d87fa22b49d6675f86375796d973b99b
|
|
| BLAKE2b-256 |
531edb0d37735d643376f0e49a44b48f9e62fec5487a1f655ee99be148ae43d0
|
File details
Details for the file yawe-1.0.0-py3-none-any.whl.
File metadata
- Download URL: yawe-1.0.0-py3-none-any.whl
- Upload date:
- Size: 39.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.2
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
4f0a498a6ce3277c52474024cebc2f034150540ca41387f3a77159e5736d0fad
|
|
| MD5 |
9262db87945e516dd3c38fc1c7c571a6
|
|
| BLAKE2b-256 |
8213613ae85e63719a155768cfd4fefe8e6822f3b58e07ef6e32864583a211f3
|