A simple agent flow engine for Python
Project description
RioFlux
一个基于 DAG(有向无环图)的 Agent 工作流引擎,用于构建和执行复杂的任务流程。
特性
- 基于 DAG 的任务编排
- 灵活的任务依赖关系管理
- 支持条件分支控制
- 任务间数据共享机制
- Python 原生任务支持
- Agent 任务支持(开发中)
安装
pip install rioflux
快速开始
以下是一个简单的示例,展示如何使用 RioFlux 创建和执行一个包含分支逻辑的工作流:
from rioflux import DAG, PythonOperator, BranchPythonOperator
# 定义任务函数
def load_data(context):
data = {"value": 5}
context.set_var("loaded_data", data)
return data
def process_high(context):
data = context.get_var("loaded_data")
return f"Processing high value: {data['value']}"
def process_low(context):
data = context.get_var("loaded_data")
return f"Processing low value: {data['value']}"
# 定义分支条件
def branch_func(context):
data = context.get_var("loaded_data")
return "process_high" if data["value"] > 10 else "process_low"
# 创建 DAG 并定义任务
with DAG(dag_id="example_dag") as dag:
load_task = PythonOperator(
task_id="load_task",
python_callable=load_data
)
branch_task = BranchPythonOperator(
task_id="branch_task",
python_callable=branch_func
)
high_task = PythonOperator(
task_id="process_high",
python_callable=process_high
)
low_task = PythonOperator(
task_id="process_low",
python_callable=process_low
)
# 定义任务依赖
load_task >> branch_task >> [high_task, low_task]
# 执行 DAG
dag.run()
核心组件
BaseOperator
- 所有具体任务的基类
- 提供任务 ID 管理
- 实现任务依赖关系的管理
- 支持
>>和<<操作符来定义任务依赖
DAG
- 管理任务集合和依赖关系
- 提供任务执行的调度逻辑
- 维护任务状态
- 管理上下文数据
DAGContext
- 提供任务间数据共享机制
- 存储任务执行结果
- 支持变量的设置和获取
内置操作符
- PythonOperator: 执行 Python 可调用对象
- BranchPythonOperator: 实现条件分支控制
- BaseAgent: Agent 任务基类(开发中)
最佳实践
-
任务粒度
- 保持任务功能单一
- 合理划分任务边界
- 避免任务间过度耦合
-
错误处理
- 在任务中妥善处理异常
- 提供清晰的错误信息
- 考虑添加重试机制
-
上下文使用
- 合理使用上下文共享数据
- 避免在上下文中存储过大数据
- 及时清理不需要的上下文数据
开发计划
- 支持任务并行执行
- 添加任务重试机制
- 提供任务执行监控和可视化
- 支持子 DAG
- 添加更多类型的操作符
要求
- Python >= 3.13
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
No source distribution files available for this release.See tutorial on generating distribution archives.
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file rioflux-0.1.1-py3-none-any.whl.
File metadata
- Download URL: rioflux-0.1.1-py3-none-any.whl
- Upload date:
- Size: 8.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.5.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
05f123a38238ed487bb7d19a7af5fa11365386af841fd04b9f580fae5630ab54
|
|
| MD5 |
e156817799adc6453f1546f4898f8c7c
|
|
| BLAKE2b-256 |
c1ff61f1ca7483714bc1d9f37aef92a8daa7809ce58bc700fd33ce6247e5d51d
|