Skip to main content

A flexible GRAPH-based task orchestration framework.

Project description

CelestialFlow ——一个轻量级、可并行、基于图结构的 Python 任务调度框架

CelestialFlow Logo

CelestialFlow 是一个轻量级但功能完全的任务流框架,适合需要 复杂依赖关系灵活执行模型跨设备运行实时可视化监控 的中/大型 Python 任务系统。

  • 相比 Airflow/Dagster 更轻、更快开始
  • 相比 multiprocessing/threading 更结构化,可直接表达 loop / complete graph 等复杂依赖模式

框架的基本单元为 TaskExecutor,可独立运行,并支持三种执行模式:

  • 线性(serial)
  • 多线程(thread)
  • 协程(async)

TaskExecutor 实现了对任务的结果缓存,任务去重,进度条显示,多执行模式比较等功能,单独使用也很好用。

但除去直接使用 TaskExecutor,更重要的是使用其子类TaskStage。TaskStage 可以互相连接,形成具有上游与下游依赖关系的任务图(TaskGraph)。下游 stage 会自动接收上游执行完成的结果作为输入,从而形成明确的数据流。

TaskStage 的任务执行模式只有两种:

  • 线性(serial)
  • 多线程(thread)

在图级别上,每个 Stage 支持两种上下文模式:

  • 线性执行(serial layout):当前节点执行完毕再启动下一节点(下游节点可提前接收任务但不会立即执行)。
  • 并行执行(process layout):当前节点启动后立刻前去启动下一节点。

TaskGraph 能构建完整的 有向图结构(Directed Graph),不仅支持传统的有向无环图(DAG),也能灵活表达 树形(Tree)环形(loop) 乃至于 完全图(Complete Graph) 形式的任务依赖。

在执行与调度之外,CelestialFlow 进一步引入 CelestialTree(简称: ctree) 事件追踪系统,为每一个任务及其衍生行为(成功、失败、重试、拆分、路由等)记录明确的因果关系。借助 ctree,可以从任意一个初始任务出发,完整还原其在 TaskGraph 中的传播路径与执行轨迹,使任务系统可以进行完整的追溯、分析、解释

在此基础上,CelestialFlow 支持 Web 可视化监控,并可通过 Redis 实现跨进程、跨设备协作;同时引入基于 Go 的外部 worker(通过 Redis 通信),用于承载 CPU 密集型任务,弥补 Python 在该场景下的性能瓶颈。

项目结构(Project Structure)

flowchart LR

    %% ===== TaskGraph =====
    subgraph TG[TaskGraph]
        direction LR

        S1[TaskStage A]
        S2[TaskStage B]
        S3[TaskStage C]
        S4[TaskStage D]

        S1 --> S2 --> S3 --> S1
        S1 --> S4

    end

    %% 美化 TaskGraph 外框
    style TG fill:#e8f2ff,stroke:#6b93d6,stroke-width:2px,color:#0b1e3f,rx:10px,ry:10px

    %% 统一美化格式
    classDef blueNode fill:#ffffff,stroke:#6b93d6,rx:6px,ry:6px;

    %% 美化 TaskStages
    class S1,S2,S3,S4 blueNode;

    %% ===== WebUI =====
    subgraph W[WebUI]
        JS
        HTML
    end

    style W fill:#ffeaf0,stroke:#d66b8c,stroke-width:2px,rx:10px,ry:10px
    style JS fill:#ffffff,stroke:#d66b8c,rx:5px,ry:5px
    style HTML fill:#ffffff,stroke:#d66b8c,rx:5px,ry:5px

    R[TaskWeb]
    style R fill:#f0e9ff,stroke:#8a6bc9,stroke-width:2px,rx:8px,ry:8px

    %% ===== Links =====
    TG --> R 
    R --> TG 
    R --> W
    W --> R

快速开始(Quick Start)

安装 CelestialFlow:

# 推荐使用 `uv` 管理依赖与环境
uv pip install celestialflow

# 不过也可以直接使用 `pip`
pip install celestialflow

一个简单的可运行代码:

from celestialflow import TaskStage, TaskGraph

def add(x, y): 
    return x + y

def square(x): 
    return x ** 2

if __name__ == "__main__":
    # 定义两个任务节点
    stage1 = TaskStage(add, execution_mode="thread", unpack_task_args=True)
    stage2 = TaskStage(square, execution_mode="thread")

    # 构建任务图结构
    stage1.set_graph_context([stage2], stage_mode="process", stage_name="Adder")
    stage2.set_graph_context([], stage_mode="process", stage_name="Squarer")
    graph = TaskGraph([stage1])

    # 初始化任务并启动
    graph.start_graph({stage1.get_tag(): [(1, 2), (3, 4), (5, 6)]})

注意不要在.ipynb中运行。

👉 想查看完整Quick Start,请见Quick Start

深入阅读(Further Reading)

(以下文档完善中)

你可以继续运行更多的测试代码,这里记录了各个测试文件与其中的测试函数说明:

📄tests/README.md

若你想了解框架的整体结构与核心组件,下面的参考文档会对你有帮助:

推荐阅读顺序:

flowchart TD
    classDef core fill:#e6efff,stroke:#3b82f6,color:#1e3a8a;
    classDef runtime fill:#e9f8ef,stroke:#22c55e,color:#14532d;
    classDef structure fill:#fff6e6,stroke:#f59e0b,color:#78350f;
    classDef execution fill:#f3e8ff,stroke:#a855f7,color:#581c87;
    classDef web fill:#ffeaea,stroke:#ef4444,color:#7f1d1d;

    TM[TaskExecutor.md] --> TS[TaskStage.md] --> TG[TaskGraph.md]
    TM --> TP[TaskProgress.md]
    TM --> TME[TaskMetrics.md]

    TG --> TQ[TaskQueue.md]
    TG --> TN[TaskStages.md]
    TG --> TR[TaskReport.md]
    TG --> TSR[TaskStructure.md]

    TR --> TW[TaskWeb.md]
    TN --> GW[Go Worker.md]

    class TM,TS,TG core;
    class TP,TME runtime;
    class TSR structure;
    class TQ,TN,GW execution;
    class TR,TW web;

以下三篇可以作为补充阅读:

如果你更喜欢通过完整案例理解框架的运行方式,可以参考这篇从零开始构建 TaskGraph 的教程:

📘案例教程

如果你对3.0.7版本加入的ctree_client与其功能感兴趣, 可以看看这一篇:

📚CelestialTreeClient

环境要求(Requirements)

CelestialFlow 基于 Python 3.10+,并依赖以下核心组件。
请确保你的环境能够正常安装这些依赖(pip install celestialflow 会自动安装)。

依赖包 说明
Python ≥ 3.10 运行环境,建议使用 3.10 及以上版本
tqdm 控制台进度条显示,用于任务执行可视化
fastapi Web 服务接口框架(用于任务可视化与远程控制)
uvicorn FastAPI 的高性能 ASGI 服务器
requests HTTP 客户端库,用于任务状态上报与远程调用
networkx 任务图(TaskGraph)结构与依赖分析
jinja2 FastAPI 模板引擎,用于 Web 可视化界面渲染
redis 可选组件,用于分布式任务通信(TaskRedis* 系列模块)
celestialtree 可选组件,用于任务状态上报与远程调用(ctree_client

文件结构(File Structure)

FileStructure
celestial-flow 3.1.7

(该视图由我的另一个项目CelestialVault中inst_file.FileTree.print_tree()生成。转换为图片则借助Carbon。)

版本日志(Version Log)

  • 3.1.7
    • feat:
      • [Important] 删除executor中的"process"模式, 它跟新的retry机制实在难以匹配;
    • refactor:
      • [Important] 大幅重构retry机制, 现在不再让retry任务重进task_queue, 而是在worker中直接解决;
        • 模仿CelestialForge中grow的做法;
      • 大幅重构原本成功与失败结果的缓存机制, 并将原有get_success/error_dict改名为get_success/error_pairs, 以避免task无法作为键时会导致的问题;
        • fail数据从fail.jsonl中提取;
        • success数据从result_queue中直接获取, 为此添加了SuccessSpout作为集合端;
    • fix:
      • 修复部分任务类型无法在log_spout中dump进jsonl文件的问题;
      • 修复在executor运行时, retry添加任务total数的问题, 虽然这样很帅;

Star 历史趋势(Star History)

如果对项目感兴趣的话,欢迎star。如果有问题或者建议的话, 欢迎提交Issues或者在Discussion中告诉我。

Star History Chart

许可(License)

This project is licensed under the MIT License - see the LICENSE file for details.

作者(Author)

Author: Mr-xiaotian Email: mingxiaomingtian@gmail.com Project Link: https://github.com/Mr-xiaotian/CelestialFlow

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

celestialflow-3.1.7.tar.gz (1.5 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

celestialflow-3.1.7-py3-none-any.whl (1.5 MB view details)

Uploaded Python 3

File details

Details for the file celestialflow-3.1.7.tar.gz.

File metadata

  • Download URL: celestialflow-3.1.7.tar.gz
  • Upload date:
  • Size: 1.5 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.6 {"installer":{"name":"uv","version":"0.10.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for celestialflow-3.1.7.tar.gz
Algorithm Hash digest
SHA256 da5a48685b7797061c6848a3c1b0629cbd5b2e87fcab30a6283ce7a298d97106
MD5 4f636e08494e767159fc6dc001089f17
BLAKE2b-256 c4fcd2250fa11d2fdd1c924610cdbaa705002848d31242364db5e6e0a41d589a

See more details on using hashes here.

File details

Details for the file celestialflow-3.1.7-py3-none-any.whl.

File metadata

  • Download URL: celestialflow-3.1.7-py3-none-any.whl
  • Upload date:
  • Size: 1.5 MB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.6 {"installer":{"name":"uv","version":"0.10.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for celestialflow-3.1.7-py3-none-any.whl
Algorithm Hash digest
SHA256 462068de1d0b7799db6251a1c6f74832fdcea5378de1990f957fbf7d96ec81d8
MD5 a14faf140d2edc78aca5e085480bd9ff
BLAKE2b-256 f65d4e8c10e208079d45224d51a26b3e24fbe4d51d95317a7d91d8298ebbda9c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page