No project description provided
Project description
Duowen Workflow Engine
Duowen Workflow Engine 是一个强大的Python工作流引擎,专为构建和执行复杂业务流程而设计。它提供了节点化执行、变量管理、状态跟踪等功能,支持自定义节点扩展,适用于自动化任务、数据处理、决策引擎等多种场景。
核心特性
- 可视化工作流:通过节点连接构建复杂业务流程
- 变量管理系统:支持多种数据类型和线程安全操作
- 执行跟踪:完整记录工作流执行过程
- 插件系统:支持自定义节点扩展
- 表达式引擎:支持Python表达式和Jinja模板
- 类型安全:基于Pydantic的强类型验证
- 线程安全:所有关键操作均使用锁保护
安装
pip install duowen_workflow
快速开始
定义工作流
from duowen_workflow.entities import TapeSchema, BaseStep, VariablePool
from duowen_workflow.engine import EngineFlow
# 1. 定义工作流蓝图
tape = TapeSchema(
tape_id="demo_workflow",
tape_name="示例工作流",
steps=[
BaseStep(
stepSeq="1",
stepLabel="开始",
stepInst="Start",
stepConfig={
"input": {
"vars_name": [
{"type": "str", "value": "username"}
]
}
}
),
BaseStep(
stepSeq="2",
stepLabel="打印欢迎",
stepInst="Print",
preStepId="1",
stepConfig={
"input": {
"text": {
"type": "jinja",
"value": "你好, {{ username }}!"
}
}
}
)
]
)
# 2. 初始化变量池
variables = VariablePool({"username": "张三"})
# 3. 执行工作流
engine = EngineFlow(tape_schema=tape)
result = engine.run(
session_id="session_20240617",
variable_pool=variables
)
# 4. 查看执行轨迹
for trace_item in engine.trace.trace_status.deque:
print(trace_item.to_dict())
核心概念
工作流蓝图 (TapeSchema)
定义工作流的整体结构:
from duowen_workflow.entities import TapeSchema, BaseStep
tape = TapeSchema(
tape_id="order_processing",
tape_name="订单处理工作流",
steps=[
BaseStep(
stepSeq="1",
stepLabel="开始",
stepInst="Start"
),
# 更多步骤...
]
)
变量池 (VariablePool)
线程安全的变量存储容器:
from duowen_workflow.entities import VariablePool
# 初始化变量池
variables = VariablePool({
"user_id": 12345,
"order_items": [/*...*/]
})
# 添加变量
variables.append_variable("discount_rate", 0.9)
# 获取变量
user_id = variables.get_variable_value("user_id")
节点系统
工作流的基本执行单元:
内置节点
- Start: 工作流起始节点,验证输入变量
- Print: 打印变量内容
- If: 条件分支节点
- For: 循环节点(开发中)
- CodeExec: 执行Python代码
使用内置节点
from duowen_workflow.nodes.builtin import Print
from duowen_workflow.entities import JinjaInput
printer = Print(
input=PrintInput(
text=JinjaInput(value="用户ID: {{ user_id }}")
)
)
跟踪系统
完整记录工作流执行过程:
{
"tape_id": "order_processing",
"stepSeq": "2",
"stepLabel": "验证订单",
"node_exec_id": "a1b2c3d4e5",
"type": "in", # 节点开始执行
"data": {
"order_id": "ORD20240617",
"user_id": 12345
},
"date": 1718611200000
}
自定义节点开发
创建自定义节点
from duowen_workflow.entities import NodeBase, VariablePool, Trace
from pydantic import BaseModel
import uuid
class ValidatorInput(BaseModel):
input_param: str
class CustomValidator(NodeBase):
input: ValidatorInput
def run(self, variable_pool: VariablePool, stepSeq: str, trace: Trace, **kwargs):
node_exec_id = f"{stepSeq}_{uuid.uuid4().hex[:8]}"
# 记录节点开始
trace.add_trace_in(stepSeq=stepSeq, node_exec_id=node_exec_id)
try:
# 获取输入参数
param = variable_pool.get_variable_value(self.input.input_param)
# 执行业务逻辑
result = self.validate(param)
# 存储结果
variable_pool.append_variable("validation_result", result)
# 记录节点完成
trace.add_trace_out(stepSeq=stepSeq, node_exec_id=node_exec_id)
return result
except Exception as e:
trace.add_trace_log(stepSeq=stepSeq, node_exec_id=node_exec_id,
error=str(e), level="error")
raise
def validate(self, data):
"""自定义验证逻辑"""
return len(data) > 5
注册自定义节点
- 设置环境变量指定插件目录:
export WORKFLOW_NODES_PLUGINS_DIR=/path/to/plugins
- 在插件目录创建节点文件(如
custom_nodes.py)
表达式引擎
支持表达式类型
-
Jinja模板:
JinjaInput(value="欢迎, {{ user.name }}! 您的余额是{{ user.balance }}")
-
Python表达式:
ExprInput(value="total_price * discount_rate if is_vip else total_price")
-
变量引用:
VarsInput(value="order_items")
使用示例
from duowen_workflow.utils import execute_expression
from duowen_workflow.entities import JinjaInput, ExprInput
# 渲染Jinja模板
welcome_msg = execute_expression(
JinjaInput(value="欢迎, {{ user.name }}!"),
variable_pool
)
# 执行Python表达式
total_price = execute_expression(
ExprInput(value="sum(item['price'] for item in order_items"),
variable_pool
)
高级用法
变量操作指南
# 添加基本类型变量
variables.append_variable("username", "john_doe")
# 添加列表
variables.append_variable("scores", [85, 92, 78])
# 追加列表元素
variables.append_to_variable("scores", 95)
# 存储Pydantic对象
variables.append_obj_variable("user_profile", user_model)
# 读取Pydantic对象
profile = variables.get_obj_variable_value("user_profile", UserProfile)
条件分支示例
# If节点配置
stepConfig={
"input": {
"cond": [
{
"next_id": "4",
"expr": {
"type": "expr",
"value": "user['is_vip']"
}
},
{
"next_id": "5",
"expr": {
"type": "expr",
"value": "not user['is_vip']"
}
}
]
}
}
代码执行节点
stepConfig={
"input": {
"code_text": {
"type": "str",
"value": """
def main(a, b):
return a + b
"""
}
},
"output": {
"result1": {"name": "sum_result"} # 输出变量名
}
}
错误处理
工作流引擎定义了以下异常类型:
- WorkFlowExecError:工作流执行异常
- VarNameError:非法变量名异常
- ValidationError:输入参数验证失败
最佳实践
-
命名规范:
- 变量名使用蛇形命名法(snake_case)
- 节点类名使用驼峰命名法(CamelCase)
- 步骤ID使用简单字符串("step1", "validate")
-
错误处理:
- 在节点中使用try-except捕获异常
- 通过trace.add_trace_log记录错误详情
- 对关键操作添加重试机制
-
性能优化:
- 避免在循环中频繁访问变量池
- 对大文件使用流式处理
- 对数据库查询添加缓存机制
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
duowen_workflow-0.1.0.tar.gz
(13.2 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file duowen_workflow-0.1.0.tar.gz.
File metadata
- Download URL: duowen_workflow-0.1.0.tar.gz
- Upload date:
- Size: 13.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d44e42b6b56d804cd4640b925d12756c68aab4ee1606879a9c67a322dc7d727d
|
|
| MD5 |
f427b3bdd5417c157777700cd07f4766
|
|
| BLAKE2b-256 |
59b4eb5b0cfaef0f84a2f61be8aa3bc07bd8873d7364fab20caad6b247392c0b
|
File details
Details for the file duowen_workflow-0.1.0-py3-none-any.whl.
File metadata
- Download URL: duowen_workflow-0.1.0-py3-none-any.whl
- Upload date:
- Size: 13.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
bf08d39461d2727d0221c24f45cfa6338c8b63de8833671a996c6a04744d57cf
|
|
| MD5 |
8202a5605900987fadd285ea2b77fc55
|
|
| BLAKE2b-256 |
28d4ad11d916a0a0abf1e9d08e9a8f12621469e659ee25837164d5d248ad94d8
|