Skip to main content

A flexible GRAPH-based task orchestration framework.

Project description

CelestialFlow ——一个轻量级、可并行、基于图结构的 Python 任务调度框架

CelestialFlow Logo

中文 | English | 日本語

CelestialFlow 是一个轻量级但功能完全的任务流框架,适合需要 复杂依赖关系灵活执行模型跨设备运行实时可视化监控 的中/大型 Python 任务系统。

  • 相比 Airflow/Dagster 更轻、更快开始
  • 相比 multiprocessing/threading 更结构化,可直接表达 loop / complete graph 等复杂依赖模式

框架的基本单元为 TaskExecutor,可独立运行,并支持三种执行模式:

  • 线性(serial)
  • 多线程(thread)
  • 协程(async)

TaskExecutor 实现了对任务的结果缓存,任务去重,进度条显示,多执行模式比较等功能,单独使用也很好用。

但除去直接使用 TaskExecutor,更重要的是使用其子类TaskStage。TaskStage 可以互相连接,形成具有上游与下游依赖关系的任务图(TaskGraph)。下游 stage 会自动接收上游执行完成的结果作为输入,从而形成明确的数据流。

TaskStage 的任务执行模式只有两种:

  • 线性(serial)
  • 多线程(thread)

在图级别上,每个 Stage 支持三种上下文模式:

  • 线性执行(serial layout):当前节点执行完毕再启动下一节点(下游节点可提前接收任务但不会立即执行)。
  • 线程执行(thread layout):当前节点在主进程的独立线程中启动,适合 I/O 密集型任务和不可 pickle 的函数(如 lambda)。
  • 并行执行(process layout):当前节点启动后立刻前去启动下一节点。

TaskGraph 能构建完整的 有向图结构(Directed Graph),不仅支持传统的有向无环图(DAG),也能灵活表达 树形(Tree)环形(loop) 乃至于 完全图(Complete Graph) 形式的任务依赖。

在执行与调度之外,CelestialFlow 进一步引入 CelestialTree(简称: ctree) 事件追踪系统,为每一个任务及其衍生行为(成功、失败、重试、拆分、路由等)记录明确的因果关系。借助 ctree,可以从任意一个初始任务出发,完整还原其在 TaskGraph 中的传播路径与执行轨迹,使任务系统可以进行完整的追溯、分析、解释

在此基础上,CelestialFlow 支持 Web 可视化监控,并可通过 Redis 实现跨进程、跨设备协作;同时引入基于 Go 的外部 worker(通过 Redis 通信),用于承载 CPU 密集型任务,弥补 Python 在该场景下的性能瓶颈。

项目结构(Project Structure)

flowchart LR

    %% ===== TaskGraph =====
    subgraph TG[TaskGraph]
        direction LR

        S1[TaskStage A]
        S2[TaskStage B]
        S3[TaskStage C]
        S4[TaskStage D]

        S1 --> S2 --> S3 --> S1
        S1 --> S4

    end

    %% 美化 TaskGraph 外框
    style TG fill:#e8f2ff,stroke:#6b93d6,stroke-width:2px,color:#0b1e3f,rx:10px,ry:10px

    %% 统一美化格式
    classDef blueNode fill:#ffffff,stroke:#6b93d6,rx:6px,ry:6px;

    %% 美化 TaskStages
    class S1,S2,S3,S4 blueNode;

    %% ===== WebUI =====
    subgraph W[WebUI]
        JS
        HTML
    end

    style W fill:#ffeaf0,stroke:#d66b8c,stroke-width:2px,rx:10px,ry:10px
    style JS fill:#ffffff,stroke:#d66b8c,rx:5px,ry:5px
    style HTML fill:#ffffff,stroke:#d66b8c,rx:5px,ry:5px

    R[TaskWeb]
    style R fill:#f0e9ff,stroke:#8a6bc9,stroke-width:2px,rx:8px,ry:8px

    %% ===== Links =====
    TG --> R 
    R --> TG 
    R --> W
    W --> R

快速开始(Quick Start)

安装 CelestialFlow:

# 推荐使用 `uv` 管理依赖与环境
uv pip install celestialflow

# 不过也可以直接使用 `pip`
pip install celestialflow

一个简单的可运行代码:

from celestialflow import TaskStage, TaskGraph

def add(x, y): 
    return x + y

def square(x): 
    return x ** 2

if __name__ == "__main__":
    # 定义两个任务节点
    stage1 = TaskStage(add, execution_mode="thread", unpack_task_args=True)
    stage2 = TaskStage(square, execution_mode="thread")

    # 构建任务图结构
    stage1 = TaskStage(func=add, execution_mode="thread", unpack_task_args=True, stage_mode="process", name="Adder")
    stage2 = TaskStage(func=square, execution_mode="thread", stage_mode="process", name="Squarer")

    graph = TaskGraph()
    graph.set_stages(root_stages=[stage1], stages=[stage2])
    graph.connect([stage1], [stage2])

    # 初始化任务并启动
    graph.start_graph({stage1.get_tag(): [(1, 2), (3, 4), (5, 6)]})

注意不要在.ipynb中运行。

👉 想查看完整Quick Start,请见Quick Start

深入阅读(Further Reading)

若你想了解框架的整体结构与核心组件,下面的参考文档会对你有帮助:

推荐阅读顺序:

flowchart TD
    classDef core fill:#e6efff,stroke:#3b82f6,color:#1e3a8a;
    classDef runtime fill:#e9f8ef,stroke:#22c55e,color:#14532d;
    classDef structure fill:#fff6e6,stroke:#f59e0b,color:#78350f;
    classDef execution fill:#f3e8ff,stroke:#a855f7,color:#581c87;
    classDef web fill:#ffeaea,stroke:#ef4444,color:#7f1d1d;

    TM[TaskExecutor.md] --> TS[TaskStage.md] --> TG[TaskGraph.md]
    TM --> TP[TaskProgress.md]
    TM --> TME[TaskMetrics.md]

    TG --> TQ[TaskQueue.md]
    TG --> TN[TaskStages.md]
    TG --> TR[TaskReport.md]
    TG --> TSR[TaskStructure.md]

    TR --> TW[TaskWeb.md]
    TN --> GW[Go Worker.md]

    class TM,TS,TG core;
    class TP,TME runtime;
    class TSR structure;
    class TQ,TN,GW execution;
    class TR,TW web;

以下三篇可以作为补充阅读:

如果你更喜欢通过完整案例理解框架的运行方式,可以参考这篇利用 TaskGraph 从零开始构建项目的教程:

📘案例教程

如果你对3.0.7版本加入的ctree_client与其功能感兴趣, 可以看看这一篇:

📚CelestialTreeClient

你可以继续运行更多的演示代码,这里记录了各个演示文件与其中的演示函数说明:

🎮demo/

​如果你想运行测试代码, 可以先查看如下文档内容:

🧪tests/

如果你想查看bench内容, 这里的数据成为框架中部分设计的决策依据:

⚡bench/

环境要求(Requirements)

CelestialFlow 基于 Python 3.10+,并依赖以下核心组件。
请确保你的环境能够正常安装这些依赖(pip install celestialflow 会自动安装)。

依赖包 说明
Python ≥ 3.10 运行环境,建议使用 3.10 及以上版本
tqdm 控制台进度条显示,用于任务执行可视化
fastapi Web 服务接口框架(用于任务可视化与远程控制)
uvicorn FastAPI 的高性能 ASGI 服务器
requests HTTP 客户端库,用于任务状态上报与远程调用
networkx 任务图(TaskGraph)结构与依赖分析
jinja2 FastAPI 模板引擎,用于 Web 可视化界面渲染
redis 可选组件,用于分布式任务通信(TaskRedis* 系列模块)
celestialtree 可选组件,用于任务状态上报与远程调用(ctree_client

文件结构(File Structure)

FileStructure
celestial-flow 3.1.8

(该视图由我的另一个项目CelestialVault中inst_file.FileTree.print_tree()生成。转换为图片则借助Carbon。)

版本日志(Version Log)

  • 3.1.8
    • feat:
      • [Important] 为stage_mode新增"thread"模式, 适合I/O密集型任务和不可pickle的函数;
        • 在io密集任务的bench测试中, "thread-thread"相比最慢的"serial-serial"快8.7x, 并且优于之前最快的"process-thread";
      • 新增docs/.../bench与docs/.../demo文档, 补充bench实测结果;
    • refactor:
      • [Important] 删除stage.set_stage_context, 改用graph.connect;
        • 接口破坏性重构, 需要修改大量graph相关代码;
      • [Important] name作为第一位参数在stage/executor中强制设定;
        • 接口破坏性重构
      • 优化async的executor_mode, 使其同时获取与消费任务, 而非集中消费;
      • 为上版本retry机制的重构收尾;
    • fix:
      • 修复find_unpickleable对bound method的误报;
      • 修复stage重复检测错误;
      • 修复retry任务无法被去重的问题;
    • chore:
      • 将demo/中所有test_函数名改为demo_函数名;
      • 重组docs目录结构, 将reference改为src;
      • docs中新增en/与ja/, 并将原有中文文档移至zh-CN/;

更多过往日志可看:

change_log.md

Star 历史趋势(Star History)

如果对项目感兴趣的话,欢迎star。如果有问题或者建议的话, 欢迎提交Issues或者在Discussion中告诉我。

Star History Chart

许可(License)

This project is licensed under the MIT License - see the LICENSE file for details.

作者(Author)

Author: Mr-xiaotian Email: mingxiaomingtian@gmail.com Project Link: https://github.com/Mr-xiaotian/CelestialFlow

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

celestialflow-3.1.8.tar.gz (1.5 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

celestialflow-3.1.8-py3-none-any.whl (1.5 MB view details)

Uploaded Python 3

File details

Details for the file celestialflow-3.1.8.tar.gz.

File metadata

  • Download URL: celestialflow-3.1.8.tar.gz
  • Upload date:
  • Size: 1.5 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.6 {"installer":{"name":"uv","version":"0.10.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for celestialflow-3.1.8.tar.gz
Algorithm Hash digest
SHA256 f6dbe1922193ba146a0f966322a9b76a6b70ce40693bdcc0977df47cc626c38b
MD5 c0b4f66bf3a36e6b3b82e346a0a222c6
BLAKE2b-256 a60d59f4c63bad53245313f54df7bc07adc09000c6c5a4b54c7fffe5bc4c3d22

See more details on using hashes here.

File details

Details for the file celestialflow-3.1.8-py3-none-any.whl.

File metadata

  • Download URL: celestialflow-3.1.8-py3-none-any.whl
  • Upload date:
  • Size: 1.5 MB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.6 {"installer":{"name":"uv","version":"0.10.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for celestialflow-3.1.8-py3-none-any.whl
Algorithm Hash digest
SHA256 cf86fc4faa51d44ea4a765f9e3c46646e24277452adeaa942fca16f57e4166ea
MD5 f95a4d990e56512ccf1a345925dc259f
BLAKE2b-256 c459d341e792996fb0d0e6d20cccfa6e5cbf6ca7b62e4aea133dc7e9bf9cc90d

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page