A lightweight, in-process async DAG orchestrator for high-performance task execution.
Astrum
A lightweight, in-process async DAG orchestrator for Python.
Astrum is a lightweight, in-process library for scheduling asynchronous tasks with complex, multi-way dependencies. You declare the task graph with Python decorators or a static DAG (directed acyclic graph) and pass data between tasks declaratively. Astrum automatically runs in parallel every branch that has no dependencies, or whose dependencies have already completed, and returns a structured execution report.
Because such branches start as soon as they are ready, Astrum reduces the unnecessary waiting that easily creeps into hand-orchestrated async workflows. Typical use cases are business code that orchestrates a workflow inside a single Python process, and test environments that need to simulate complex DAG execution, for example multi-agent systems, complex data-processing pipelines, and general async task orchestration.
Astrum is still early, and community feedback matters a great deal right now. I am especially looking for feedback from people who try it in real workflows: if the API feels awkward, a concept is unclear, the behavior seems too magical, the documentation leaves something unexplained, or you think a capability should (or should not) exist, please open an issue. This feedback will directly shape the API design before 0.2.0.
Installation
```shell
pip install astrum
```
For terminal visualization of the DAG and of data transport, powered by Rich, install the viz extra:
```shell
pip install "astrum[viz]"
```
Examples that render Rich tables or trees, such as the Coffee Shop workflow, require the viz extra.
Current status
Astrum is currently in the early 0.1.x stage.
The core execution model is usable today, but the public API may still receive small refinements before 0.2.0.
I am actively looking for community feedback, especially from people building real async Python workflows, AI Agent pipelines, RAG systems, backend task flows, local automation tools, and small ETL-style pipelines.
If you try Astrum and something feels unclear, awkward, too magical, too limited, or badly named, please open an issue. Real usage feedback is extremely important for shaping the API before it becomes stable.
Why Astrum?
Plain asyncio.gather is great when you only need to run a list of coroutines concurrently.
But real workflows often look more like this:
- some tasks can run in parallel;
- some tasks depend on previous results;
- downstream tasks need only part of an upstream result;
- failures and retries should be visible;
- the final execution should produce a structured report;
- the whole thing should stay inside a normal Python process.
Astrum provides a small task graph layer for these cases.
It is not a workflow platform. It does not require a scheduler, database, webserver, worker cluster, or message broker.
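For comparison, this is roughly how the Quick Start workflow (further down) looks with plain asyncio. It is a standard-library sketch, not Astrum code: the stage boundaries and the field lookups are wired by hand, and nothing records timings, retries, or failures.

```python
import asyncio


async def load_users() -> dict:
    await asyncio.sleep(0.1)
    return {"users": ["Alice", "Bob"]}


async def load_orders() -> dict:
    await asyncio.sleep(0.1)
    return {"orders": ["A-001", "A-002", "A-003"]}


async def build_report(users: list, orders: list) -> dict:
    return {"summary": f"{len(users)} users, {len(orders)} orders"}


async def main() -> str:
    # Stage 1: the two loaders are independent, so gather runs them concurrently.
    users_result, orders_result = await asyncio.gather(load_users(), load_orders())
    # Stage 2: which field goes into which parameter is wired by hand.
    report = await build_report(users_result["users"], orders_result["orders"])
    return report["summary"]


summary = asyncio.run(main())
print(summary)  # 2 users, 3 orders
```

This is manageable for two stages, but every new branch means re-deciding what to gather and in which order.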
How convenient is Astrum to use?
The following is only a simplified sketch of the task declarations. In real code, dependencies are declared explicitly with depends_on, and upstream values can be injected into downstream parameters with Ref / F.
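```python
from astrum import task

# Simplified sketch: parameters, Ref / F injection, and bodies are omitted.
# depends_on mirrors the execution diagram below.

@task("expand_query")
async def expand_query() -> None: ...

@task("embed_query")
async def embed_query() -> None: ...

@task("bm25_search", depends_on=["expand_query"])
async def bm25_search() -> None: ...

@task("vector_search", depends_on=["expand_query"])
async def vector_search() -> None: ...

@task("merge_scores", depends_on=["bm25_search", "vector_search", "embed_query"])
async def merge_scores() -> None: ...

@task("rerank", depends_on=["merge_scores"])
async def rerank() -> None: ...
```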
What actually happens:
```text
---time--->
expand_query ─┬─ bm25_search ───┐
              └─ vector_search ─┼─ merge_scores ─ rerank
embed_query ────────────────────┘
```
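The diagram can be reproduced by grouping tasks into stages, where each stage holds the tasks whose dependencies all sit in earlier stages. The sketch below does this with the standard library's graphlib.TopologicalSorter; it is not Astrum's scheduler, and the dependency map is an assumed reading of the diagram (in particular, that embed_query feeds merge_scores directly).

```python
from graphlib import TopologicalSorter

# Assumed dependency map read off the diagram: task -> set of upstream tasks.
deps = {
    "expand_query": set(),
    "embed_query": set(),
    "bm25_search": {"expand_query"},
    "vector_search": {"expand_query"},
    "merge_scores": {"bm25_search", "vector_search", "embed_query"},
    "rerank": {"merge_scores"},
}


def execution_stages(graph: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks so every task's dependencies are in earlier stages."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph is not a DAG
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # all tasks whose deps are done
        stages.append(ready)
        sorter.done(*ready)
    return stages


for i, stage in enumerate(execution_stages(deps)):
    print(f"stage {i}: {stage}")
# stage 0: ['embed_query', 'expand_query']
# stage 1: ['bm25_search', 'vector_search']
# stage 2: ['merge_scores']
# stage 3: ['rerank']
```

Tasks within one stage are independent of each other, which is what makes them safe to start concurrently.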
Quick Start
The recommended entry point is decorator mode: register functions with @task, declare execution dependencies with depends_on, pass upstream values with Ref / F, and call run().
```python
import asyncio

from astrum import F, AstrumConfig, Ref, run, task


# Two independent loaders: no depends_on, so they can start in parallel.
@task("load_users")
async def load_users() -> dict:
    await asyncio.sleep(0.1)
    return {"users": ["Alice", "Bob"]}


@task("load_orders")
async def load_orders() -> dict:
    await asyncio.sleep(0.1)
    return {"orders": ["A-001", "A-002", "A-003"]}


# Waits for both loaders; Ref / F inject the "users" and "orders" fields of their results.
@task("build_report", depends_on=["load_users", "load_orders"])
async def build_report(
    users: Ref[list, F("load_users", "users")],
    orders: Ref[list, F("load_orders", "orders")],
) -> dict:
    return {"summary": f"{len(users)} users, {len(orders)} orders"}


@task("publish_report", depends_on=["build_report"])
async def publish_report(
    summary: Ref[str, F("build_report", "summary")],
) -> None:
    print(summary)


async def main() -> None:
    # Targeting the final task also runs its upstream tasks (4/4 in the output below).
    report = await run(
        target_tasks=["publish_report"],
        config=AstrumConfig(skip_type_check=True, silence_warnings=True),
    )
    print(report.execution_state)
    print(f"{report.successful_tasks}/{report.total_tasks} tasks completed")


asyncio.run(main())
```
Expected output:
```text
2 users, 3 orders
completed
4/4 tasks completed
```
load_users and load_orders declare no depends_on and take no Ref / F parameters, so they have no upstream dependencies and run in parallel. build_report and publish_report receive data from upstream task results through Ref / F.
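One way to picture what Ref / F does is a parameter annotation that names the upstream task and the field to pull from its result. The sketch below builds that idea from the standard library's typing.Annotated; From and resolve_kwargs are hypothetical names for illustration, and Astrum's actual Ref / F mechanism may work differently (the skip_type_check option above suggests it also checks types, which this sketch does not).

```python
import asyncio
import inspect
from dataclasses import dataclass
from typing import Annotated, Any, get_type_hints


@dataclass(frozen=True)
class From:
    """Hypothetical marker: take `field` from the result of upstream task `task`."""
    task: str
    field: str


def resolve_kwargs(func, results: dict[str, dict[str, Any]]) -> dict[str, Any]:
    """Build keyword arguments for `func` from upstream results via its annotations."""
    hints = get_type_hints(func, include_extras=True)  # keep Annotated metadata
    kwargs = {}
    for name in inspect.signature(func).parameters:
        # Annotated[T, From(...)] exposes its extras in __metadata__.
        for meta in getattr(hints.get(name), "__metadata__", ()):
            if isinstance(meta, From):
                kwargs[name] = results[meta.task][meta.field]
    return kwargs


async def build_report(
    users: Annotated[list, From("load_users", "users")],
    orders: Annotated[list, From("load_orders", "orders")],
) -> dict:
    return {"summary": f"{len(users)} users, {len(orders)} orders"}


# Results that the upstream tasks would have returned.
upstream = {
    "load_users": {"users": ["Alice", "Bob"]},
    "load_orders": {"orders": ["A-001", "A-002", "A-003"]},
}
kwargs = resolve_kwargs(build_report, upstream)
print(asyncio.run(build_report(**kwargs)))  # {'summary': '2 users, 3 orders'}
```

Keeping the source in the annotation means the function body stays ordinary Python and can still be called directly in tests.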
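Key Features
- Plain Python tasks: both synchronous and asynchronous functions can be registered as DAG tasks.
- Async DAG execution: tasks with no dependencies between them are started in parallel automatically.
- Structured reports: execution state, durations, failure summaries, cancellation state, and retry records all end up in the ExecutionReport.
- Data transport: declare where a downstream parameter comes from with Ref[T, F("task", "field")].
- Namespaces and registries: isolate workflows with namespace, use_namespace(), or SchedulerRegistry.
- Retries: set the number of retries on failure for an individual task with @task(..., retry=N).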
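A minimal sketch of per-task retry behavior, assuming retry=N means up to N extra attempts after the first failure and that each caught error is kept as a retry record. call_with_retry is a hypothetical helper, not part of Astrum, and Astrum's exact counting may differ.

```python
import asyncio


async def call_with_retry(func, retry: int):
    """Await func() up to 1 + retry times; return (result, caught errors)."""
    errors: list[Exception] = []
    for _ in range(retry + 1):
        try:
            return await func(), errors
        except Exception as exc:
            errors.append(exc)  # keep a retry record, as a report might
    raise RuntimeError(f"failed after {retry + 1} attempts") from errors[-1]


# A flaky task that fails twice, then succeeds on the third attempt.
calls = {"n": 0}


async def flaky_fetch() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError(f"attempt {calls['n']} failed")
    return "ok"


result, errors = asyncio.run(call_with_retry(flaky_fetch, retry=2))
print(result, len(errors))  # ok 2
```

With retry=1 the same task would exhaust its attempts and raise, with the last ConnectionError attached as the cause.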
When Not to Use It
Astrum is intentionally small and in-process. It is not a replacement for distributed workflow engines.
Astrum is currently not a distributed scheduling system. If you need any of the following, a heavier workflow platform or queue system is usually the better choice:
- Cross-machine scheduling
- Persistent queues
- Cron-like scheduled jobs
- Long-running distributed workflows
- Worker fleet management
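Core Concepts
- Task: a registered Python callable. A task can be an async def or a plain def.
- DAG (directed acyclic graph): the dependency relationships between tasks. A task starts only after its upstream dependencies have completed.
- Scheduler: splits the DAG into execution stages, starts the tasks that can run in parallel, and collects the execution results.
- ExecutionReport: the return value of run() and scheduler.execute(), containing the execution state, durations, statistics, and an error summary.
- Data Transport: injects the return values of upstream tasks into the parameters of downstream functions. Ref / F annotations are the recommended way to declare this.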
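To make the Scheduler and ExecutionReport concepts concrete, here is a toy stage-based executor built only on the standard library. It is a hypothetical sketch, not Astrum's implementation: MiniReport and run_dag are invented names, tasks take no arguments (data transport is left out), plain def tasks run through asyncio.to_thread, and a task is skipped when any upstream task failed or was skipped.

```python
import asyncio
import inspect
import time
from dataclasses import dataclass, field
from graphlib import TopologicalSorter
from typing import Any, Callable


@dataclass
class MiniReport:
    """Hypothetical stand-in for an execution report (not Astrum's ExecutionReport)."""
    results: dict[str, Any] = field(default_factory=dict)
    errors: dict[str, str] = field(default_factory=dict)
    skipped: list[str] = field(default_factory=list)
    durations: dict[str, float] = field(default_factory=dict)


async def _run_one(name: str, func: Callable[[], Any], report: MiniReport) -> None:
    start = time.perf_counter()
    try:
        if inspect.iscoroutinefunction(func):
            report.results[name] = await func()
        else:
            # A plain def runs in a worker thread so it does not block the event loop.
            report.results[name] = await asyncio.to_thread(func)
    except Exception as exc:
        report.errors[name] = repr(exc)
    finally:
        report.durations[name] = time.perf_counter() - start


async def run_dag(tasks: dict[str, Callable[[], Any]],
                  deps: dict[str, set[str]]) -> MiniReport:
    report = MiniReport()
    graph = {name: deps.get(name, set()) for name in tasks}
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError for a cyclic graph
    while sorter.is_active():
        stage = list(sorter.get_ready())
        runnable = []
        for name in stage:
            # Skip a task whose upstream task failed or was itself skipped.
            if any(d in report.errors or d in report.skipped for d in graph[name]):
                report.skipped.append(name)
            else:
                runnable.append(name)
        if runnable:
            # Tasks in one stage are independent, so they start concurrently.
            await asyncio.gather(*(_run_one(n, tasks[n], report) for n in runnable))
        sorter.done(*stage)
    return report


async def fetch() -> str:
    await asyncio.sleep(0.05)
    return "fetched"


def parse() -> str:  # a plain synchronous task
    return "parsed"


def broken() -> str:
    raise ValueError("bad input")


def summarize() -> str:
    return "summary"


report = asyncio.run(run_dag(
    {"fetch": fetch, "parse": parse, "broken": broken, "summarize": summarize},
    {"parse": {"fetch"}, "summarize": {"parse", "broken"}},
))
print(report.results)  # {'fetch': 'fetched', 'parse': 'parsed'}
print(report.errors)   # {'broken': "ValueError('bad input')"}
print(report.skipped)  # ['summarize']
```

A failure in one branch (broken) does not stop the independent branch (fetch, parse); it only prevents the tasks that depend on it from running.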
Documentation
The full documentation is bilingual (Chinese and English) and built with MkDocs + Material for MkDocs.
- Chinese documentation (home)
- English documentation
- Quickstart
- Chinese Usage Guide
- English Usage Guide
- External API Reference
- Internal API Reference
The local Markdown files remain available in docs/ for offline reading and contributions.
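Examples
- Fast Start: a quick tour of workflow patterns, from serial and parallel execution to fan-in and async retries.
- Coffee Shop: uses a coffee-shop process to explain the TaskData / DataItem / DTRela data flow.
- Stateless Text Retriever: uses a complex retrieval pipeline to demonstrate fan-out, fan-in, multi-branch scoring, and rerank.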
The corresponding English pages are available next to the Chinese pages with .en.md filenames.
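Development
This project uses uv for dependency management.
```shell
uv sync --extra docs --extra viz   # install dependencies, including the docs and viz extras
uv run pytest                      # run the test suite
uv run mkdocs serve                # preview the documentation locally
uv run mkdocs build --strict       # build the docs, treating warnings as errors
```
Common checks before publishing:
```shell
uv run pytest
uv run mkdocs build --strict
```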
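Project Status
Current version: 0.1.1.
Astrum 0.1.1 is the recommended early testing release; APIs may change within 0.1.x.
Astrum has had its first release on PyPI. During the 0.1.x series it may still receive small improvements to API naming, documentation, and the type-checking experience.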
License
Astrum is released under the MIT License.
Download files
- Source distribution: astrum-0.1.1.tar.gz
- Built distribution: astrum-0.1.1-py3-none-any.whl
File details
Details for the file astrum-0.1.1.tar.gz.
File metadata
- Download URL: astrum-0.1.1.tar.gz
- Upload date:
- Size: 65.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | 83114e476797f0caa567245f1410d20db257c01b80d19aaee0b3dc67755fef3f |
| MD5 | 133c628fca476ab790c1cb2f0a56eeb3 |
| BLAKE2b-256 | 8acd0b4ee66390986c956df70b2c3be22fddfe3b8b46b3f0c66e315e5a765edf |

Provenance
The following attestation bundles were made for astrum-0.1.1.tar.gz:
Publisher: publish.yml on DuskXi/astrum
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: astrum-0.1.1.tar.gz
- Subject digest: 83114e476797f0caa567245f1410d20db257c01b80d19aaee0b3dc67755fef3f
- Sigstore transparency entry: 1732952505
- Sigstore integration time:
- Permalink: DuskXi/astrum@95e59f32708486d2d6caa46939853dc4327f08a7
- Branch / Tag: refs/tags/v0.1.1
- Owner: https://github.com/DuskXi
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@95e59f32708486d2d6caa46939853dc4327f08a7
- Trigger Event: push
File details
Details for the file astrum-0.1.1-py3-none-any.whl.
File metadata
- Download URL: astrum-0.1.1-py3-none-any.whl
- Upload date:
- Size: 41.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | e23261427d9bf05fa2f2558e565ccda69722b6904c71efbe047e4fbcfc4fb84f |
| MD5 | f01822ba261ebbce7a74dad959dfac3d |
| BLAKE2b-256 | 472794698434371b46152dbfe8f531e06929b6f4d0ba89caf55deaeaf8be82be |

Provenance
The following attestation bundles were made for astrum-0.1.1-py3-none-any.whl:
Publisher: publish.yml on DuskXi/astrum
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: astrum-0.1.1-py3-none-any.whl
- Subject digest: e23261427d9bf05fa2f2558e565ccda69722b6904c71efbe047e4fbcfc4fb84f
- Sigstore transparency entry: 1732952528
- Sigstore integration time:
- Permalink: DuskXi/astrum@95e59f32708486d2d6caa46939853dc4327f08a7
- Branch / Tag: refs/tags/v0.1.1
- Owner: https://github.com/DuskXi
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@95e59f32708486d2d6caa46939853dc4327f08a7
- Trigger Event: push