Skip to main content

No project description provided

Project description

Duowen Workflow Engine

Duowen Workflow Engine 是一个强大的Python工作流引擎,专为构建和执行复杂业务流程而设计。它提供了节点化执行、变量管理、状态跟踪等功能,支持自定义节点扩展,适用于自动化任务、数据处理、决策引擎等多种场景。

核心特性

  • 可视化工作流:通过节点连接构建复杂业务流程
  • 变量管理系统:支持多种数据类型和线程安全操作
  • 执行跟踪:完整记录工作流执行过程
  • 插件系统:支持自定义节点扩展
  • 表达式引擎:支持Python表达式和Jinja模板
  • 类型安全:基于Pydantic的强类型验证
  • 线程安全:所有关键操作均使用锁保护

安装

pip install duowen_workflow

快速开始

定义工作流

from duowen_workflow.entities import TapeSchema, BaseStep, VariablePool
from duowen_workflow.engine import EngineFlow

# 1. 定义工作流蓝图
tape = TapeSchema(
    tape_id="demo_workflow",
    tape_name="示例工作流",
    steps=[
        BaseStep(
            stepSeq="1",
            stepLabel="开始",
            stepInst="Start",
            stepConfig={
                "input": {
                    "vars_name": [
                        {"type": "str", "value": "username"}
                    ]
                }
            }
        ),
        BaseStep(
            stepSeq="2",
            stepLabel="打印欢迎",
            stepInst="Print",
            preStepId="1",
            stepConfig={
                "input": {
                    "text": {
                        "type": "jinja",
                        "value": "你好, {{ username }}!"
                    }
                }
            }
        )
    ]
)

# 2. 初始化变量池
variables = VariablePool({"username": "张三"})

# 3. 执行工作流
engine = EngineFlow(tape_schema=tape)
result = engine.run(
    session_id="session_20240617",
    variable_pool=variables
)

# 4. 查看执行轨迹
for trace_item in engine.trace.trace_status.deque:
    print(trace_item.to_dict())

核心概念

工作流蓝图 (TapeSchema)

定义工作流的整体结构:

from duowen_workflow.entities import TapeSchema, BaseStep

tape = TapeSchema(
    tape_id="order_processing",
    tape_name="订单处理工作流",
    steps=[
        BaseStep(
            stepSeq="1",
            stepLabel="开始",
            stepInst="Start"
        ),
        # 更多步骤...
    ]
)

变量池 (VariablePool)

线程安全的变量存储容器:

from duowen_workflow.entities import VariablePool

# 初始化变量池
variables = VariablePool({
    "user_id": 12345,
    "order_items": [/*...*/]
})

# 添加变量
variables.append_variable("discount_rate", 0.9)

# 获取变量
user_id = variables.get_variable_value("user_id")

节点系统

工作流的基本执行单元:

内置节点

  • Start: 工作流起始节点,验证输入变量
  • Print: 打印变量内容
  • If: 条件分支节点
  • For: 循环节点(开发中)
  • CodeExec: 执行Python代码

使用内置节点

from duowen_workflow.nodes.builtin import Print
from duowen_workflow.entities import JinjaInput

printer = Print(
    input=PrintInput(
        text=JinjaInput(value="用户ID: {{ user_id }}")
    )
)

跟踪系统

完整记录工作流执行过程:

{
    "tape_id": "order_processing",
    "stepSeq": "2",
    "stepLabel": "验证订单",
    "node_exec_id": "a1b2c3d4e5",
    "type": "in",  # 节点开始执行
    "data": {
        "order_id": "ORD20240617",
        "user_id": 12345
    },
    "date": 1718611200000
}

自定义节点开发

创建自定义节点

from duowen_workflow.entities import NodeBase, VariablePool, Trace
from pydantic import BaseModel
import uuid

class ValidatorInput(BaseModel):
    input_param: str

class CustomValidator(NodeBase):
    input: ValidatorInput

    def run(self, variable_pool: VariablePool, stepSeq: str, trace: Trace, **kwargs):
        node_exec_id = f"{stepSeq}_{uuid.uuid4().hex[:8]}"
        
        # 记录节点开始
        trace.add_trace_in(stepSeq=stepSeq, node_exec_id=node_exec_id)
        
        try:
            # 获取输入参数
            param = variable_pool.get_variable_value(self.input.input_param)
            
            # 执行业务逻辑
            result = self.validate(param)
            
            # 存储结果
            variable_pool.append_variable("validation_result", result)
            
            # 记录节点完成
            trace.add_trace_out(stepSeq=stepSeq, node_exec_id=node_exec_id)
            return result
        except Exception as e:
            trace.add_trace_log(stepSeq=stepSeq, node_exec_id=node_exec_id, 
                                error=str(e), level="error")
            raise
    
    def validate(self, data):
        """自定义验证逻辑"""
        return len(data) > 5

注册自定义节点

  1. 设置环境变量指定插件目录:
export WORKFLOW_NODES_PLUGINS_DIR=/path/to/plugins
  1. 在插件目录创建节点文件(如custom_nodes.py

表达式引擎

支持表达式类型

  1. Jinja模板

    JinjaInput(value="欢迎, {{ user.name }}! 您的余额是{{ user.balance }}")
    
  2. Python表达式

    ExprInput(value="total_price * discount_rate if is_vip else total_price")
    
  3. 变量引用

    VarsInput(value="order_items")
    

使用示例

from duowen_workflow.utils import execute_expression
from duowen_workflow.entities import JinjaInput, ExprInput

# 渲染Jinja模板
welcome_msg = execute_expression(
    JinjaInput(value="欢迎, {{ user.name }}!"),
    variable_pool
)

# 执行Python表达式
total_price = execute_expression(
    ExprInput(value="sum(item['price'] for item in order_items"),
    variable_pool
)

高级用法

变量操作指南

# 添加基本类型变量
variables.append_variable("username", "john_doe")

# 添加列表
variables.append_variable("scores", [85, 92, 78])

# 追加列表元素
variables.append_to_variable("scores", 95)

# 存储Pydantic对象
variables.append_obj_variable("user_profile", user_model)

# 读取Pydantic对象
profile = variables.get_obj_variable_value("user_profile", UserProfile)

条件分支示例

# If节点配置
stepConfig={
    "input": {
        "cond": [
            {
                "next_id": "4",
                "expr": {
                    "type": "expr",
                    "value": "user['is_vip']"
                }
            },
            {
                "next_id": "5",
                "expr": {
                    "type": "expr",
                    "value": "not user['is_vip']"
                }
            }
        ]
    }
}

代码执行节点

stepConfig={
    "input": {
        "code_text": {
            "type": "str",
            "value": """
def main(a, b):
    return a + b
"""
        }
    },
    "output": {
        "result1": {"name": "sum_result"}  # 输出变量名
    }
}

错误处理

工作流引擎定义了以下异常类型:

  1. WorkFlowExecError:工作流执行异常
  2. VarNameError:非法变量名异常
  3. ValidationError:输入参数验证失败

最佳实践

  1. 命名规范

    • 变量名使用蛇形命名法(snake_case)
    • 节点类名使用驼峰命名法(CamelCase)
    • 步骤ID使用简单字符串("step1", "validate")
  2. 错误处理

    • 在节点中使用try-except捕获异常
    • 通过trace.add_trace_log记录错误详情
    • 对关键操作添加重试机制
  3. 性能优化

    • 避免在循环中频繁访问变量池
    • 对大文件使用流式处理
    • 对数据库查询添加缓存机制

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

duowen_workflow-0.1.0.tar.gz (13.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

duowen_workflow-0.1.0-py3-none-any.whl (13.5 kB view details)

Uploaded Python 3

File details

Details for the file duowen_workflow-0.1.0.tar.gz.

File metadata

  • Download URL: duowen_workflow-0.1.0.tar.gz
  • Upload date:
  • Size: 13.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.11.11

File hashes

Hashes for duowen_workflow-0.1.0.tar.gz
Algorithm Hash digest
SHA256 d44e42b6b56d804cd4640b925d12756c68aab4ee1606879a9c67a322dc7d727d
MD5 f427b3bdd5417c157777700cd07f4766
BLAKE2b-256 59b4eb5b0cfaef0f84a2f61be8aa3bc07bd8873d7364fab20caad6b247392c0b

See more details on using hashes here.

File details

Details for the file duowen_workflow-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for duowen_workflow-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 bf08d39461d2727d0221c24f45cfa6338c8b63de8833671a996c6a04744d57cf
MD5 8202a5605900987fadd285ea2b77fc55
BLAKE2b-256 28d4ad11d916a0a0abf1e9d08e9a8f12621469e659ee25837164d5d248ad94d8

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page