A flexible GRAPH-based task orchestration framework.
Project description
CelestialFlow ——一个轻量级、可并行、基于图结构的 Python 任务调度框架
CelestialFlow 是一个轻量级但功能完全的任务流框架,适合需要 复杂依赖关系、灵活执行模型、跨设备运行与实时可视化监控 的中/大型 Python 任务系统。
- 相比 Airflow/Dagster 更轻、更快开始
- 相比 multiprocessing/threading 更结构化,可直接表达 loop / complete graph 等复杂依赖模式
框架的基本单元为 TaskExecutor,可独立运行,并支持三种执行模式:
- 线性(serial)
- 多线程(thread)
- 协程(async)
TaskExecutor 实现了对任务的结果缓存,任务去重,进度条显示,多执行模式比较等功能,单独使用也很好用。
但除去直接使用 TaskExecutor,更重要的是使用其子类TaskStage。TaskStage 可以互相连接,形成具有上游与下游依赖关系的任务图(TaskGraph)。下游 stage 会自动接收上游执行完成的结果作为输入,从而形成明确的数据流。
TaskStage 的任务执行模式同样包含三种,与TaskExecutor中一致。
在图级别上,每个 Stage 支持三种上下文模式:
- 线性执行(serial layout):当前节点执行完毕再启动下一节点(下游节点可提前接收任务但不会立即执行)。
- 线程执行(thread layout):当前节点在主进程的独立线程中启动,适合 I/O 密集型任务和不可 pickle 的函数(如 lambda)。
- 并行执行(process layout):当前节点启动后立刻前去启动下一节点。
TaskGraph 能构建完整的 有向图结构(Directed Graph),不仅支持传统的有向无环图(DAG),也能灵活表达 树形(Tree)、环形(loop) 乃至于 完全图(Complete Graph) 形式的任务依赖。
在执行与调度之外,CelestialFlow 进一步引入 CelestialTree(简称: ctree) 事件追踪系统,为每一个任务及其衍生行为(成功、失败、重试、拆分、路由等)记录明确的因果关系。借助 ctree,可以从任意一个初始任务出发,完整还原其在 TaskGraph 中的传播路径与执行轨迹,使任务系统可以进行完整的追溯、分析、解释。
在此基础上,CelestialFlow 支持 Web 可视化监控,并可通过 Redis 实现跨进程、跨设备协作;同时引入基于 Go 的外部 worker(通过 Redis 通信),用于承载 CPU 密集型任务,弥补 Python 在该场景下的性能瓶颈。
项目结构(Project Structure)
flowchart LR
%% ===== TaskGraph =====
subgraph TG[TaskGraph]
direction LR
S1[TaskStage A]
S2[TaskStage B]
S3[TaskStage C]
S4[TaskStage D]
S1 --> S2 --> S3 --> S1
S1 --> S4
end
%% 美化 TaskGraph 外框
style TG fill:#e8f2ff,stroke:#6b93d6,stroke-width:2px,color:#0b1e3f,rx:10px,ry:10px
%% 统一美化格式
classDef blueNode fill:#ffffff,stroke:#6b93d6,rx:6px,ry:6px;
%% 美化 TaskStages
class S1,S2,S3,S4 blueNode;
%% ===== WebUI =====
subgraph W[WebUI]
JS
HTML
end
style W fill:#ffeaf0,stroke:#d66b8c,stroke-width:2px,rx:10px,ry:10px
style JS fill:#ffffff,stroke:#d66b8c,rx:5px,ry:5px
style HTML fill:#ffffff,stroke:#d66b8c,rx:5px,ry:5px
R[TaskWeb]
style R fill:#f0e9ff,stroke:#8a6bc9,stroke-width:2px,rx:8px,ry:8px
%% ===== Links =====
TG --> R
R --> TG
R --> W
W --> R
快速开始(Quick Start)
安装 CelestialFlow:
# 推荐使用 `uv` 管理依赖与环境
uv pip install celestialflow
# 不过也可以直接使用 `pip`
pip install celestialflow
一个简单的可运行代码:
from celestialflow import TaskStage, TaskGraph
def add(x, y):
return x + y
def square(x):
return x ** 2
if __name__ == "__main__":
# 定义两个任务节点
stage1 = TaskStage(name="Adder", func=add, execution_mode="thread", unpack_task_args=True, stage_mode="process")
stage2 = TaskStage(name="Squarer"func=square, execution_mode="thread", stage_mode="process")
# 构建任务图结构
graph = TaskGraph()
graph.set_stages(stages=[stage1, stage2])
graph.connect([stage1], [stage2])
# 初始化任务并启动
graph.start_graph({stage1.get_tag(): [(1, 2), (3, 4), (5, 6)]})
注意不要在.ipynb中运行。
👉 想查看完整Quick Start,请见Quick Start
深入阅读(Further Reading)
若你想了解框架的整体结构与核心组件,下面的参考文档会对你有帮助:
- stage/core_executor.md
- stage/core_stage.md
- graph/core_graph.md
- observability/core_progress.md
- runtime/core_metrics.md
- runtime/core_queue.md
- stage/core_stages.md
- observability/core_report.md
- graph/core_structure.md
- web/core_server.md
- other/go_worker.md
推荐阅读顺序:
flowchart TD
classDef core fill:#e6efff,stroke:#3b82f6,color:#1e3a8a;
classDef runtime fill:#e9f8ef,stroke:#22c55e,color:#14532d;
classDef structure fill:#fff6e6,stroke:#f59e0b,color:#78350f;
classDef execution fill:#f3e8ff,stroke:#a855f7,color:#581c87;
classDef web fill:#ffeaea,stroke:#ef4444,color:#7f1d1d;
TM[TaskExecutor.md] --> TS[TaskStage.md] --> TG[TaskGraph.md]
TM --> TP[TaskProgress.md]
TM --> TME[TaskMetrics.md]
TG --> TQ[TaskQueue.md]
TG --> TN[TaskStages.md]
TG --> TR[TaskReport.md]
TG --> TSR[TaskStructure.md]
TR --> TW[TaskWeb.md]
TN --> GW[Go Worker.md]
class TM,TS,TG core;
class TP,TME runtime;
class TSR structure;
class TQ,TN,GW execution;
class TR,TW web;
以下三篇可以作为补充阅读:
- runtime/util_queue.md
- runtime/util_types.md
- runtime/util_errors.md
- persistence/core_fail.md
- persistence/core_log.md
如果你更喜欢通过完整案例理解框架的运行方式,可以参考这篇利用 TaskGraph 从零开始构建项目的教程:
如果你对3.0.7版本加入的ctree_client与其功能感兴趣, 可以看看这一篇:
你可以继续运行更多的演示代码,这里记录了各个演示文件与其中的演示函数说明:
如果你想运行测试代码, 可以先查看如下文档内容:
如果你想查看bench内容, 这里的数据成为框架中部分设计的决策依据:
环境要求(Requirements)
CelestialFlow 基于 Python 3.10+,并依赖以下核心组件。
请确保你的环境能够正常安装这些依赖(pip install celestialflow 会自动安装)。
| 依赖包 | 说明 |
|---|---|
| Python ≥ 3.10 | 运行环境,建议使用 3.10 及以上版本 |
| fastapi | Web 服务接口框架(用于任务可视化与远程控制) |
| uvicorn | FastAPI 的高性能 ASGI 服务器 |
| requests | HTTP 客户端库,用于任务状态上报与远程调用 |
| networkx | 任务图(TaskGraph)结构与依赖分析 |
| jinja2 | FastAPI 模板引擎,用于 Web 可视化界面渲染 |
| tqdm | 可选组件,进度条显示,用于任务执行可视化 |
| redis | 可选组件,用于分布式任务通信(TaskRedis* 系列模块) |
| celestialtree | 可选组件,用于任务状态上报与远程调用(ctree_client) |
文件结构(File Structure)
celestial-flow 3.2.0
(该视图由我的另一个项目CelestialVault中inst_file.FileTree.print_tree()生成。转换为图片则借助Carbon。)
版本日志(Version Log)
- 3.2.0
- feat:
- [Important] 彻底废弃
stage_mode="process", 移除所有 multiprocessing 依赖(MPValue, MPQueue, multiprocessing.Process);- bench_graph_mode 数据表明 process 模式在所有场景下均慢于 thread 模式, 且引入大量序列化开销和 pickle 限制;
- [Important] 删除原有set_stages中手动输入的
root_stages参数, 取而代之为通过scc缩合图计算出的一组source_stages- 重补了不少图论课
- 将graph/stage/executor的默认log level从
SUCCESS改为INFO, 也就是默认只显示开启关闭信息与错误 - 在web页面中添加配置按钮
- 当前仅支持设置刷新间隔与历史长度, 之后可以进行更多设置
- [Important] 彻底废弃
- refactor:
- 由于stage_mode中取消
process, 框架中部分为了适配process而进行的设计进行删除或者重构- 例如将所有的MPValue和MPQueue改为int与Queue
- 尚未进行严格的bench测试, 但应该会带来一定的性能优化
- 重构networkx图的建立过程, 现在直接通过节点与出边进行建立, 不再依赖递归
- 在重试检测机制中计算bytes类型的hash, 而非原本的string类型
- 根据 bench_hash_memory, 节省内存约23%
- 将节点状态中的deltas数据放在web端由js计算, 减少不必要的通信数据
- 由于stage_mode中取消
- fix:
- 删除InQueue.get中的错误捕获, 这会导致错过panic级error
- feat:
更多过往日志可看:
Star 历史趋势(Star History)
如果对项目感兴趣的话,欢迎star。如果有问题或者建议的话, 欢迎提交Issues或者在Discussion中告诉我。
许可(License)
This project is licensed under the MIT License - see the LICENSE file for details.
作者(Author)
Author: Mr-xiaotian Email: mingxiaomingtian@gmail.com Project Link: https://github.com/Mr-xiaotian/CelestialFlow
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file celestialflow-3.2.0.tar.gz.
File metadata
- Download URL: celestialflow-3.2.0.tar.gz
- Upload date:
- Size: 1.5 MB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.6 {"installer":{"name":"uv","version":"0.10.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c5ac6322d7a6d6a3e40e95d1a45c7995e343de942b1a0f0b5f1e82c454c9f8c4
|
|
| MD5 |
85e4e4041439573d3a7992fb22fbc5c9
|
|
| BLAKE2b-256 |
18e57cfa03984fae6bb15308cf8d1e829eea5a962332765bf542df94a53f3c9d
|
File details
Details for the file celestialflow-3.2.0-py3-none-any.whl.
File metadata
- Download URL: celestialflow-3.2.0-py3-none-any.whl
- Upload date:
- Size: 1.5 MB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.6 {"installer":{"name":"uv","version":"0.10.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6ae3a9505bb3afc74402fb30f271294eda4cb81cbbf90625df98826979022d2e
|
|
| MD5 |
7b4fd1b12d8eba35e82eb6028cf01c2b
|
|
| BLAKE2b-256 |
3c036c7314d0a02e875d233ceb51c31259f2642cbc765e7a6ed3c41d07e6f8c4
|