A flexible GRAPH-based task orchestration framework.
Project description
CelestialFlow ——一个轻量级、可并行、基于图结构的 Python 任务调度框架
CelestialFlow 是一个轻量级但功能完全的任务流框架,适合需要 复杂依赖关系、灵活执行模型、跨设备运行与实时可视化监控 的中/大型 Python 任务系统。
- 相比 Airflow/Dagster 更轻、更快开始
- 相比 multiprocessing/threading 更结构化,可直接表达 loop / complete graph 等复杂依赖模式
框架的基本单元为 TaskExecutor,可独立运行,并支持四种执行模式:
- 线性(serial)
- 多线程(thread)
- 多进程(process)
- 协程(async)
TaskExecutor 实现了对任务的结果缓存,任务去重,进度条显示,多执行模式比较等功能,单独使用也很好用。
但除去直接使用 TaskExecutor,更重要的是使用其子类TaskStage。TaskStage 可以互相连接,形成具有上游与下游依赖关系的任务图(TaskGraph)。下游 stage 会自动接收上游执行完成的结果作为输入,从而形成明确的数据流。
TaskStage 的任务执行模式只有两种:
- 线性(serial)
- 多线程(thread)
在图级别上,每个 Stage 支持两种上下文模式:
- 线性执行(serial layout):当前节点执行完毕再启动下一节点(下游节点可提前接收任务但不会立即执行)。
- 并行执行(process layout):当前节点启动后立刻前去启动下一节点。
TaskGraph 能构建完整的 有向图结构(Directed Graph),不仅支持传统的有向无环图(DAG),也能灵活表达 树形(Tree)、环形(loop) 乃至于 完全图(Complete Graph) 形式的任务依赖。
在执行与调度之外,CelestialFlow 进一步引入 CelestialTree(简称: ctree) 事件追踪系统,为每一个任务及其衍生行为(成功、失败、重试、拆分、路由等)记录明确的因果关系。借助 ctree,可以从任意一个初始任务出发,完整还原其在 TaskGraph 中的传播路径与执行轨迹,使任务系统可以进行完整的追溯、分析、解释。
在此基础上,CelestialFlow 支持 Web 可视化监控,并可通过 Redis 实现跨进程、跨设备协作;同时引入基于 Go 的外部 worker(通过 Redis 通信),用于承载 CPU 密集型任务,弥补 Python 在该场景下的性能瓶颈。
项目结构(Project Structure)
flowchart LR
%% ===== TaskGraph =====
subgraph TG[TaskGraph]
direction LR
S1[TaskStage A]
S2[TaskStage B]
S3[TaskStage C]
S4[TaskStage D]
S1 --> S2 --> S3 --> S1
S1 --> S4
end
%% 美化 TaskGraph 外框
style TG fill:#e8f2ff,stroke:#6b93d6,stroke-width:2px,color:#0b1e3f,rx:10px,ry:10px
%% 统一美化格式
classDef blueNode fill:#ffffff,stroke:#6b93d6,rx:6px,ry:6px;
%% 美化 TaskStages
class S1,S2,S3,S4 blueNode;
%% ===== WebUI =====
subgraph W[WebUI]
JS
HTML
end
style W fill:#ffeaf0,stroke:#d66b8c,stroke-width:2px,rx:10px,ry:10px
style JS fill:#ffffff,stroke:#d66b8c,rx:5px,ry:5px
style HTML fill:#ffffff,stroke:#d66b8c,rx:5px,ry:5px
R[TaskWeb]
style R fill:#f0e9ff,stroke:#8a6bc9,stroke-width:2px,rx:8px,ry:8px
%% ===== Links =====
TG --> R
R --> TG
R --> W
W --> R
快速开始(Quick Start)
安装 CelestialFlow:
# 推荐使用 `uv` 管理依赖与环境
uv pip install celestialflow
# 不过也可以直接使用 `pip`
uv pip install celestialflow
一个简单的可运行代码:
from celestialflow import TaskStage, TaskGraph
def add(x, y):
return x + y
def square(x):
return x ** 2
if __name__ == "__main__":
# 定义两个任务节点
stage1 = TaskStage(add, execution_mode="thread", unpack_task_args=True)
stage2 = TaskStage(square, execution_mode="thread")
# 构建任务图结构
stage1.set_graph_context([stage2], stage_mode="process", stage_name="Adder")
stage2.set_graph_context([], stage_mode="process", stage_name="Squarer")
graph = TaskGraph([stage1])
# 初始化任务并启动
graph.start_graph({stage1.get_tag(): [(1, 2), (3, 4), (5, 6)]})
注意不要在.ipynb中运行。
👉 想查看完整Quick Start,请见Quick Start
深入阅读(Further Reading)
(以下文档完善中)
你可以继续运行更多的测试代码,这里记录了各个测试文件与其中的测试函数说明:
若你想了解框架的整体结构与核心组件,下面的参考文档会对你有帮助:
- stage/executor.md
- stage/stage.md
- graph/graph.md
- runtime/progress.md
- runtime/metrics.md
- runtime/queue.md
- stage/nodes.md
- observability/report.md
- graph/structure.md
- web/server.md
- other/go_worker.md
推荐阅读顺序:
flowchart TD
classDef core fill:#e6efff,stroke:#3b82f6,color:#1e3a8a;
classDef runtime fill:#e9f8ef,stroke:#22c55e,color:#14532d;
classDef structure fill:#fff6e6,stroke:#f59e0b,color:#78350f;
classDef execution fill:#f3e8ff,stroke:#a855f7,color:#581c87;
classDef web fill:#ffeaea,stroke:#ef4444,color:#7f1d1d;
TM[TaskExecutor.md] --> TS[TaskStage.md] --> TG[TaskGraph.md]
TM --> TP[TaskProgress.md]
TM --> TME[TaskMetrics.md]
TG --> TQ[TaskQueue.md]
TG --> TN[TaskNodes.md]
TG --> TR[TaskReport.md]
TG --> TSR[TaskStructure.md]
TR --> TW[TaskWeb.md]
TN --> GW[Go Worker.md]
class TM,TS,TG core;
class TP,TME runtime;
class TSR structure;
class TQ,TN,GW execution;
class TR,TW web;
以下三篇可以作为补充阅读:
如果你更喜欢通过完整案例理解框架的运行方式,可以参考这篇从零开始构建 TaskGraph 的教程:
如果你对3.0.7版本加入的ctree_client与其功能感兴趣, 可以看看这一篇:
环境要求(Requirements)
CelestialFlow 基于 Python 3.8+,并依赖以下核心组件。
请确保你的环境能够正常安装这些依赖(pip install celestialflow 会自动安装)。
| 依赖包 | 说明 |
|---|---|
| Python ≥ 3.10 | 运行环境,建议使用 3.10 及以上版本 |
| tqdm | 控制台进度条显示,用于任务执行可视化 |
| fastapi | Web 服务接口框架(用于任务可视化与远程控制) |
| uvicorn | FastAPI 的高性能 ASGI 服务器 |
| requests | HTTP 客户端库,用于任务状态上报与远程调用 |
| networkx | 任务图(TaskGraph)结构与依赖分析 |
| jinja2 | FastAPI 模板引擎,用于 Web 可视化界面渲染 |
| redis | 可选组件,用于分布式任务通信(TaskRedis* 系列模块) |
| celestialtree | 可选组件,用于任务状态上报与远程调用(ctree_client) |
文件结构(File Structure)
celestial-flow 3.1.6
(该视图由我的另一个项目CelestialVault中inst_file.FileTree.print_tree()生成。转换为图片则借助Carbon。)
版本日志(Version Log)
- 3.1.6
- feat
- 前端现在只储存一页的error数据, 有效减少了运行大规模任务时前端内存飙升的问题;
- 优化任务数显示, 大于1*10^7时显示科学计数法, 否则显示英式计数;
- 优化小屏模式下表格的显示: 改为用卡片式显示;
- 在error表格中加入index项;
- 大幅调整任务颜色分配, 现在重复任务使用黄色系, 等待任务使用灰色系;
- 将节点卡片中的已消耗时间颜色拟合为下方进度条颜色;
- refactor
- 用mypy整理了一遍类型标注;
- 删除与整合部分css代码;
- 将字体等适合rem单位的地方全部从px替换为rem, 并且都部分size进行统一;
- fix
- 修复总体剩余时间在特殊情况下显示0的问题(这玩意真麻烦);
- 修复部分小屏下的显示问题, 但折线图不显示的问题不太好解决;
- feat
Star 历史趋势(Star History)
如果对项目感兴趣的话,欢迎star。如果有问题或者建议的话, 欢迎提交Issues或者在Discussion中告诉我。
许可(License)
This project is licensed under the MIT License - see the LICENSE file for details.
作者(Author)
Author: Mr-xiaotian Email: mingxiaomingtian@gmail.com Project Link: https://github.com/Mr-xiaotian/CelestialFlow
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file celestialflow-3.1.6.tar.gz.
File metadata
- Download URL: celestialflow-3.1.6.tar.gz
- Upload date:
- Size: 1.5 MB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.6 {"installer":{"name":"uv","version":"0.10.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a8e934225314d42da0a35865aee146b633aeddfdf9042d48a971adce5ca51040
|
|
| MD5 |
f73bd6921acf39502f8b02cc255efd88
|
|
| BLAKE2b-256 |
a12bf6af40097de90eb6aae75a7c01dd0a33ec1380ed8c7e7fb78d6fe029e030
|
File details
Details for the file celestialflow-3.1.6-py3-none-any.whl.
File metadata
- Download URL: celestialflow-3.1.6-py3-none-any.whl
- Upload date:
- Size: 1.5 MB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.6 {"installer":{"name":"uv","version":"0.10.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
262e5be846b23c54da3d4565476a1a43fe0e43dac5dd68f83665be17592d9c50
|
|
| MD5 |
b156be9e18bd75f759eaf0d387fa3efa
|
|
| BLAKE2b-256 |
d346ca96473de678fc793f485ad48e3a63d194d474cb5aa3a3a87a460ef01043
|