A lightweight, in-process async DAG orchestrator for high-performance task execution.
Project description
Astrum
A lightweight, in-process async DAG orchestrator for Python.
Astrum 是一个轻量、进程内运行的 async DAG 编排库,用 Python 函数声明任务图,自动并行执行无依赖分支,并返回结构化执行报告。
Astrum is designed for local workflow orchestration where you want the ergonomics of plain Python functions, the clarity of a DAG, and the performance profile of asyncio without introducing a distributed workflow service.
Installation / 安装
pip install astrum
If you want terminal visualization powered by Rich:
pip install "astrum[viz]"
如果需要在终端查看 DAG 和数据传输可视化,请安装 viz 可选依赖。
Examples that render Rich tables or trees, such as the coffee shop workflow, require the viz extra.
Quick Start / 快速开始
The recommended entry point is decorator mode: register functions with @task, declare execution dependencies with depends_on, pass upstream values with Ref / F, and call run().
推荐从装饰器模式开始:用 @task 注册任务,用 depends_on 声明执行依赖,用 Ref / F 传递上游返回值,然后调用 run()。
import asyncio
from astrum import F, AstrumConfig, Ref, run, task
@task("load_users")
async def load_users() -> dict:
await asyncio.sleep(0.1)
return {"users": ["Alice", "Bob"]}
@task("load_orders")
async def load_orders() -> dict:
await asyncio.sleep(0.1)
return {"orders": ["A-001", "A-002", "A-003"]}
@task("build_report", depends_on=["load_users", "load_orders"])
async def build_report(
users: Ref[list, F("load_users", "users")],
orders: Ref[list, F("load_orders", "orders")],
) -> dict:
return {"summary": f"{len(users)} users, {len(orders)} orders"}
@task("publish_report", depends_on=["build_report"])
async def publish_report(
summary: Ref[str, F("build_report", "summary")],
) -> None:
print(summary)
async def main() -> None:
report = await run(
target_tasks=["publish_report"],
config=AstrumConfig(skip_type_check=True, silence_warnings=True),
)
print(report.execution_state)
print(f"{report.successful_tasks}/{report.total_tasks} tasks completed")
asyncio.run(main())
Expected output:
2 users, 3 orders
completed
4/4 tasks completed
load_users and load_orders have no input parameters, so they can run in parallel. build_report and publish_report receive data from upstream task results.
load_users 和 load_orders 没有入参,会并行执行;build_report 和 publish_report 通过 Ref/F 接收上游任务结果。
Why Astrum / 为什么使用 Astrum
- Plain Python tasks / 普通 Python 函数即任务:同步函数和异步函数都可以注册为 DAG task。
- Async DAG execution / 异步 DAG 执行:没有依赖关系的任务自动并行启动。
- Structured reports / 结构化报告:执行状态、耗时、失败摘要、取消状态和重试记录都会进入
ExecutionReport。 - Data transport / 数据传输:用
Ref[T, F("task", "field")]声明下游参数来自哪里。 - Namespaces and registries / 命名空间与注册表:可用
namespace、use_namespace()或SchedulerRegistry隔离工作流。 - Retries / 重试:用
@task(..., retry=N)为单个任务设置失败重试次数。
When Not to Use It / 不适合场景
Astrum is intentionally small and in-process. It is not a replacement for distributed workflow engines.
Astrum 当前不是分布式调度系统。如果你的需求是下面这些场景,通常应该选择更重的工作流平台或队列系统:
- Cross-machine scheduling / 跨机器调度
- Persistent queues / 持久化队列
- Cron-like scheduled jobs / 定时任务平台
- Long-running distributed workflows / 长生命周期分布式工作流
- Worker fleet management / 多 worker 集群管理
Core Concepts / 核心概念
- Task / 任务:一个已注册的 Python callable。任务可以是
async def,也可以是普通def。 - DAG / 有向无环图:任务之间的依赖关系。某个任务只有在上游依赖完成后才会启动。
- Scheduler / 调度器:把 DAG 拆成执行阶段,启动可并行任务,并收集执行结果。
- ExecutionReport / 执行报告:
run()和scheduler.execute()的返回值,包含状态、耗时、统计和错误摘要。 - Data Transport / 数据传输:把上游任务的返回值注入下游函数参数。推荐使用
Ref/F注解。
Documentation / 文档
The full documentation is bilingual and built with MkDocs + Material for MkDocs.
完整文档支持中英文,并使用 MkDocs + Material for MkDocs 构建。
- 中文文档入口
- English documentation
- 快速开始 / Quickstart
- 中文使用指南
- English Usage Guide
- 外部 API Reference
- Internal API Reference
The local Markdown files remain available in docs/ for offline reading and contributions.
Examples / 示例
- Fast Start:从串行、并行、fan-in 到异步重试的工作流模式速览。
- Coffee Shop:用咖啡店流程解释
TaskData/DataItem/DTRela数据流。 - Stateless Text Retriever:用复杂检索链路展示 fan-out、fan-in、多分支评分和 rerank。
The corresponding English pages are available next to the Chinese pages with .en.md filenames.
Development / 开发
This project uses uv for dependency management.
uv sync --extra docs --extra viz
uv run pytest
uv run mkdocs serve
uv run mkdocs build --strict
Common checks before publishing:
uv run pytest
uv run mkdocs build --strict
Project Status / 项目状态
Current version: 0.1.0.
Astrum is preparing for its first public PyPI release. The core workflow model is usable, while APIs may still receive small refinements as the project stabilizes.
Astrum 正在准备首次发布到 PyPI。当前核心工作流模型已经可用,但在 0.1.x 阶段仍可能围绕 API 命名、文档和类型检查体验做小幅改进。
License / 许可证
Astrum is released under the MIT License.
Astrum 使用 MIT 许可证发布。
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file astrum-0.1.0.tar.gz.
File metadata
- Download URL: astrum-0.1.0.tar.gz
- Upload date:
- Size: 64.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.8.4
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
dd8262afd91305b49eb97905f56792ba8371d9aa007ab5f10a1dab41871a0d5a
|
|
| MD5 |
4be629a60dccd87fb0e8dd58885d8c9a
|
|
| BLAKE2b-256 |
e9c0b2ba0d8ea9d1e68e1526694f236207b412714f596b1a64cac310894cd90b
|
File details
Details for the file astrum-0.1.0-py3-none-any.whl.
File metadata
- Download URL: astrum-0.1.0-py3-none-any.whl
- Upload date:
- Size: 40.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.8.4
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
40ee9f59f8b0ce08ba24566f7eb0cc797ad87f53e47b86516550491b8d9da5da
|
|
| MD5 |
73683b832a49472576bbdd75750f93b1
|
|
| BLAKE2b-256 |
b651c0386f1679a881ab1d3d9db1ea05e0db41af8d60befe8d7f9c4e850e6908
|