Skip to main content

A simple agent flow engine for Python

Project description

RioFlux

一个基于 DAG(有向无环图)的 Agent 工作流引擎,用于构建和执行复杂的任务流程。

特性

  • 基于 DAG 的任务编排
  • 灵活的任务依赖关系管理
  • 支持条件分支控制
  • 任务间数据共享机制
  • Python 原生任务支持
  • Agent 任务支持(开发中)

安装

pip install rioflux

快速开始

以下是一个简单的示例,展示如何使用 RioFlux 创建和执行一个包含分支逻辑的工作流:

from rioflux import DAG, PythonOperator, BranchPythonOperator

# 定义任务函数
def load_data(context):
    data = {"value": 5}
    context.set_var("loaded_data", data)
    return data

def process_high(context):
    data = context.get_var("loaded_data")
    return f"Processing high value: {data['value']}"

def process_low(context):
    data = context.get_var("loaded_data")
    return f"Processing low value: {data['value']}"

# 定义分支条件
def branch_func(context):
    data = context.get_var("loaded_data")
    return "process_high" if data["value"] > 10 else "process_low"

# 创建 DAG 并定义任务
with DAG(dag_id="example_dag") as dag:
    load_task = PythonOperator(
        task_id="load_task",
        python_callable=load_data
    )
    
    branch_task = BranchPythonOperator(
        task_id="branch_task",
        python_callable=branch_func
    )
    
    high_task = PythonOperator(
        task_id="process_high",
        python_callable=process_high
    )
    
    low_task = PythonOperator(
        task_id="process_low",
        python_callable=process_low
    )
    
    # 定义任务依赖
    load_task >> branch_task >> [high_task, low_task]

# 执行 DAG
dag.run()

核心组件

BaseOperator

  • 所有具体任务的基类
  • 提供任务 ID 管理
  • 实现任务依赖关系的管理
  • 支持 >><< 操作符来定义任务依赖

DAG

  • 管理任务集合和依赖关系
  • 提供任务执行的调度逻辑
  • 维护任务状态
  • 管理上下文数据

DAGContext

  • 提供任务间数据共享机制
  • 存储任务执行结果
  • 支持变量的设置和获取

内置操作符

  • PythonOperator: 执行 Python 可调用对象
  • BranchPythonOperator: 实现条件分支控制
  • BaseAgent: Agent 任务基类(开发中)

最佳实践

  1. 任务粒度

    • 保持任务功能单一
    • 合理划分任务边界
    • 避免任务间过度耦合
  2. 错误处理

    • 在任务中妥善处理异常
    • 提供清晰的错误信息
    • 考虑添加重试机制
  3. 上下文使用

    • 合理使用上下文共享数据
    • 避免在上下文中存储过大数据
    • 及时清理不需要的上下文数据

开发计划

  • 支持任务并行执行
  • 添加任务重试机制
  • 提供任务执行监控和可视化
  • 支持子 DAG
  • 添加更多类型的操作符

要求

  • Python >= 3.13

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

rioflux-0.1.1-py3-none-any.whl (8.8 kB view details)

Uploaded Python 3

File details

Details for the file rioflux-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: rioflux-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 8.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.5.13

File hashes

Hashes for rioflux-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 05f123a38238ed487bb7d19a7af5fa11365386af841fd04b9f580fae5630ab54
MD5 e156817799adc6453f1546f4898f8c7c
BLAKE2b-256 c1ff61f1ca7483714bc1d9f37aef92a8daa7809ce58bc700fd33ce6247e5d51d

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page