
A lightweight, in-process async DAG orchestrator for high-performance task execution.


Astrum


A lightweight, in-process async DAG orchestrator for Python.

Astrum is a lightweight, in-process, asynchronous task scheduling library for workflows with complex, multi-way dependencies. You declare the task graph with Python decorators or as a static DAG (directed acyclic graph) and pass upstream results downstream declaratively. Astrum automatically runs independent branches, and branches whose dependencies have completed, in parallel, then returns a structured execution report.

Because each branch starts as soon as it has no unfinished dependencies, Astrum reduces the unnecessary waiting that easily creeps into hand-orchestrated async workflows. Typical use cases are business code that orchestrates a workflow inside a single Python process and test environments that simulate complex DAG execution, for example multi-agent systems, complex data processing pipelines, and general async task orchestration.


Astrum is still early, and community feedback matters a great deal right now. I especially want to hear from people who try it in real workflows: if the API feels awkward, a concept is unclear, the behavior feels too magical, the documentation doesn't explain something, or you think a capability should (or should not) exist, please open an issue. This feedback will directly shape the API design on the path to 0.2.0.

Installation

pip install astrum

If you want terminal visualization powered by Rich:

pip install "astrum[viz]"

The viz extra is needed to view DAG and data-transport visualizations in the terminal. Examples that render Rich tables or trees, such as the coffee shop workflow, also require it.

Current status

Astrum is currently in the early 0.1.x stage.

The core execution model is usable today, but the public API may still receive small refinements before 0.2.0.

I am actively looking for community feedback, especially from people building real async Python workflows, AI Agent pipelines, RAG systems, backend task flows, local automation tools, and small ETL-style pipelines.

If you try Astrum and something feels unclear, awkward, too magical, too limited, or badly named, please open an issue. Real usage feedback is extremely important for shaping the API before it becomes stable.

Why Astrum?

Plain asyncio.gather is great when you only need to run a list of coroutines concurrently.

But real workflows often look more like this:

  • some tasks can run in parallel;
  • some tasks depend on previous results;
  • downstream tasks need only part of an upstream result;
  • failures and retries should be visible;
  • the final execution should produce a structured report;
  • the whole thing should stay inside a normal Python process.

Astrum provides a small task graph layer for these cases.

It is not a workflow platform. It does not require a scheduler, database, webserver, worker cluster, or message broker.
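To make the "unnecessary waiting" concrete, here is a plain-asyncio sketch with no Astrum involved; the task names and sleep durations are made up. Running the graph stage by stage with `asyncio.gather` makes `fast_child` wait for the unrelated `slow` task, while wiring each task to await only its own parent lets `fast_child` start as soon as `fast` finishes. That manual wiring is the kind of bookkeeping a task graph layer is meant to take over.

```python
import asyncio


async def step(name: str, delay: float, log: list) -> str:
    # Record start and end so the interleaving can be inspected afterwards.
    log.append(f"start {name}")
    await asyncio.sleep(delay)
    log.append(f"end {name}")
    return name


async def staged(log: list) -> None:
    # Stage 1: both roots run concurrently, but gather waits for the slowest one.
    await asyncio.gather(step("fast", 0.01, log), step("slow", 0.3, log))
    # Stage 2: fast_child only needs "fast", yet it cannot start until "slow" ends.
    await step("fast_child", 0.01, log)


async def dependency_driven(log: list) -> None:
    fast = asyncio.create_task(step("fast", 0.01, log))
    slow = asyncio.create_task(step("slow", 0.3, log))

    async def fast_child() -> str:
        await fast  # wait only for the task this one actually depends on
        return await step("fast_child", 0.01, log)

    await asyncio.gather(fast_child(), slow)


async def main():
    staged_log, driven_log = [], []
    await staged(staged_log)
    await dependency_driven(driven_log)
    return staged_log, driven_log


staged_log, driven_log = asyncio.run(main())
print(staged_log.index("start fast_child") > staged_log.index("end slow"))  # True
print(driven_log.index("start fast_child") < driven_log.index("end slow"))  # True
```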

How convenient is it to use?

The following is only a simplified sketch of the task declarations. In real code, dependencies are declared explicitly with depends_on, and upstream values can be injected into downstream parameters with Ref / F.

@workflow.task("expand_query")
async def expand_query(...)->...:
    ...
@workflow.task("bm25_search")
async def bm25_search(...)->...:
    ...
@workflow.task("vector_search")
async def vector_search(...)->...:
    ...
@workflow.task("merge_scores")
async def merge_scores(...)->...:
    ...
@workflow.task("rerank")
async def rerank(...)->...:
    ...
@workflow.task("embed_query")
async def embed_query(...)->...:
    ...

What actually happens at run time:

---time--->
expand_query ─┬─ bm25_search  ─┐
              └─ vector_search ├─ merge_scores ─ rerank
embed_query ───────────────────┘
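The timeline above follows a simple rule: a task can start once all of its upstream tasks have finished, so its earliest start time is the latest finish time among its dependencies. The plain-Python sketch below computes those times with the standard library's `graphlib`. The dependency edges are read off the diagram, but the durations are invented purely for illustration, and this is not how Astrum itself measures or schedules anything.

```python
from graphlib import TopologicalSorter

# Edges read off the timeline diagram: task -> set of upstream tasks.
DEPENDS_ON = {
    "expand_query": set(),
    "embed_query": set(),
    "bm25_search": {"expand_query"},
    "vector_search": {"expand_query"},
    "merge_scores": {"bm25_search", "vector_search", "embed_query"},
    "rerank": {"merge_scores"},
}

# Hypothetical durations in milliseconds, invented only to make the arithmetic visible.
DURATION_MS = {
    "expand_query": 200,
    "embed_query": 500,
    "bm25_search": 100,
    "vector_search": 400,
    "merge_scores": 100,
    "rerank": 300,
}


def earliest_starts(deps, duration):
    """Earliest start of each task, assuming unlimited parallelism."""
    start = {}
    # static_order() yields each task only after all of its upstream tasks.
    for name in TopologicalSorter(deps).static_order():
        start[name] = max((start[d] + duration[d] for d in deps[name]), default=0)
    return start


starts = earliest_starts(DEPENDS_ON, DURATION_MS)
for name, t in starts.items():
    print(f"{name:<14} starts at {t:>4} ms")

makespan = max(starts[n] + DURATION_MS[n] for n in starts)
print(f"makespan: {makespan} ms, sequential: {sum(DURATION_MS.values())} ms")
# makespan: 1000 ms, sequential: 1600 ms
```

With these made-up numbers, merge_scores starts at 600 ms because it waits for vector_search, the slowest of its three inputs, while embed_query overlaps completely with the expand_query branch.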

Quick Start

The recommended entry point is decorator mode: register functions with @task, declare execution dependencies with depends_on, pass upstream values with Ref / F, and then call run().

import asyncio

from astrum import F, AstrumConfig, Ref, run, task


@task("load_users")
async def load_users() -> dict:
    await asyncio.sleep(0.1)
    return {"users": ["Alice", "Bob"]}


@task("load_orders")
async def load_orders() -> dict:
    await asyncio.sleep(0.1)
    return {"orders": ["A-001", "A-002", "A-003"]}


@task("build_report", depends_on=["load_users", "load_orders"])
async def build_report(
    users: Ref[list, F("load_users", "users")],
    orders: Ref[list, F("load_orders", "orders")],
) -> dict:
    return {"summary": f"{len(users)} users, {len(orders)} orders"}


@task("publish_report", depends_on=["build_report"])
async def publish_report(
    summary: Ref[str, F("build_report", "summary")],
) -> None:
    print(summary)


async def main() -> None:
    report = await run(
        target_tasks=["publish_report"],
        config=AstrumConfig(skip_type_check=True, silence_warnings=True),
    )
    print(report.execution_state)
    print(f"{report.successful_tasks}/{report.total_tasks} tasks completed")


asyncio.run(main())

Expected output:

2 users, 3 orders
completed
4/4 tasks completed

load_users and load_orders declare no dependencies (and take no parameters), so they run in parallel. build_report and publish_report receive data from upstream task results through Ref / F.
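`Ref[list, F("load_users", "users")]` reads like an annotated type: the first argument is the value's type, and the second names the upstream task and the key to take from its result. The standard-library sketch below shows one way such metadata could be resolved with `typing.Annotated` (Python 3.9+). `From` and `resolve_kwargs` are hypothetical names invented for this illustration; Astrum's real `Ref` / `F` may be implemented quite differently.

```python
from dataclasses import dataclass
from typing import Annotated, get_args, get_origin, get_type_hints


@dataclass(frozen=True)
class From:
    """Hypothetical marker: take `field` from the result of upstream `task`."""
    task: str
    field: str


def build_report(
    users: Annotated[list, From("load_users", "users")],
    orders: Annotated[list, From("load_orders", "orders")],
) -> dict:
    return {"summary": f"{len(users)} users, {len(orders)} orders"}


def resolve_kwargs(fn, results: dict) -> dict:
    """Build keyword arguments for `fn` from already-finished upstream results."""
    kwargs = {}
    hints = get_type_hints(fn, include_extras=True)  # keep Annotated metadata
    for param, hint in hints.items():
        if param == "return" or get_origin(hint) is not Annotated:
            continue
        expected_type, *metadata = get_args(hint)
        for marker in metadata:
            if isinstance(marker, From):
                value = results[marker.task][marker.field]
                # Simple runtime check; only plain classes such as `list` are supported here.
                if not isinstance(value, expected_type):
                    raise TypeError(f"{param}: expected {expected_type.__name__}")
                kwargs[param] = value
    return kwargs


upstream = {
    "load_users": {"users": ["Alice", "Bob"]},
    "load_orders": {"orders": ["A-001", "A-002", "A-003"]},
}
print(build_report(**resolve_kwargs(build_report, upstream)))
# {'summary': '2 users, 3 orders'}
```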

Features

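  • Plain Python tasks: both synchronous and asynchronous functions can be registered as DAG tasks.
  • Async DAG execution: tasks with no dependency between them are started in parallel automatically.
  • Structured reports: execution state, durations, failure summaries, cancellation state, and retry records all go into the ExecutionReport.
  • Data transport: use Ref[T, F("task", "field")] to declare where a downstream parameter comes from.
  • Namespaces and registries: isolate workflows with namespace, use_namespace(), and SchedulerRegistry.
  • Retries: set a per-task retry count with @task(..., retry=N).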
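Two items in this list, plain def tasks alongside async def tasks and per-task retries, raise the question of how a single task invocation might be wrapped. The sketch below is one plausible shape using only `asyncio`: coroutine functions are awaited directly, regular functions are pushed to a worker thread with `asyncio.to_thread` so they don't block the event loop, and failures are retried. `call_task` is a hypothetical helper, and it assumes `retry=N` means up to N extra attempts after the first one; Astrum's actual retry semantics, and whether it uses threads for sync tasks, may differ.

```python
import asyncio
import inspect


async def call_task(fn, *args, retry: int = 0, **kwargs):
    """Hypothetical helper (not Astrum's API): run a sync or async callable.

    Makes up to 1 + `retry` attempts and returns (result, attempts_used).
    If every attempt fails, the last exception is re-raised.
    """
    if retry < 0:
        raise ValueError("retry must be >= 0")
    last_error = None
    for attempt in range(1, retry + 2):  # first try plus `retry` retries
        try:
            if inspect.iscoroutinefunction(fn):
                result = await fn(*args, **kwargs)
            else:
                # Run blocking sync code in a worker thread, off the event loop.
                result = await asyncio.to_thread(fn, *args, **kwargs)
            return result, attempt
        except Exception as exc:  # a real scheduler would likely record each failure
            last_error = exc
    raise last_error


calls = {"n": 0}


async def flaky_fetch() -> str:
    # Fails twice, then succeeds, to exercise the retry loop.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "payload"


def parse(text: str) -> int:  # a plain `def` task
    return len(text)


async def main():
    payload, attempts = await call_task(flaky_fetch, retry=2)
    size, _ = await call_task(parse, payload)
    return payload, attempts, size


result = asyncio.run(main())
print(result)  # ('payload', 3, 7)
```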

When Not to Use It

Astrum is intentionally small and in-process. It is not a replacement for distributed workflow engines.

Astrum is not currently a distributed scheduling system. If your requirements look like the following, a heavier workflow platform or queue system is usually the better choice:

  • Cross-machine scheduling
  • Persistent queues
  • Cron-like scheduled jobs
  • Long-running distributed workflows
  • Worker fleet management

Core Concepts

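  • Task: a registered Python callable. A task can be an async def or a plain def function.
  • DAG (directed acyclic graph): the dependency relationships between tasks. A task starts only after its upstream dependencies have completed.
  • Scheduler: splits the DAG into execution stages, launches the tasks that can run in parallel, and collects their results.
  • ExecutionReport: the value returned by run() or scheduler.execute(), containing state, durations, statistics, and an error summary.
  • Data Transport: injects upstream task return values into downstream function parameters. Ref / F annotations are the recommended way to do this.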
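The Scheduler and ExecutionReport concepts can be illustrated with the standard library alone. The sketch below uses `graphlib.TopologicalSorter` to start every task whose dependencies have finished, lets ready tasks run concurrently, never starts tasks downstream of a failure, and returns a small report. The field names `execution_state`, `successful_tasks`, and `total_tasks` mirror the attributes printed in the Quick Start; everything else (`MiniReport`, `execute`, `make`, the state strings) is a hypothetical stand-in rather than Astrum's implementation, and data passing between tasks is left out for brevity.

```python
import asyncio
from dataclasses import dataclass, field
from graphlib import TopologicalSorter


@dataclass
class MiniReport:
    """Hypothetical stand-in for an execution report."""
    execution_state: str
    total_tasks: int
    successful_tasks: int
    results: dict = field(default_factory=dict)
    errors: dict = field(default_factory=dict)


async def execute(funcs, deps) -> MiniReport:
    """funcs: name -> zero-argument async callable.
    deps: name -> set of upstream names (every task must appear as a key).
    """
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    running = {}  # asyncio.Task -> task name
    results, errors = {}, {}

    def launch_ready() -> None:
        # Start every task whose upstream tasks have all been marked done.
        for name in sorter.get_ready():
            running[asyncio.create_task(funcs[name]())] = name

    launch_ready()
    while running:
        finished, _ = await asyncio.wait(set(running), return_when=asyncio.FIRST_COMPLETED)
        for t in finished:
            name = running.pop(t)
            if t.exception() is None:
                results[name] = t.result()
                sorter.done(name)  # only a success unlocks downstream tasks
            else:
                errors[name] = repr(t.exception())
        launch_ready()

    state = "completed" if len(results) == len(funcs) else "failed"
    return MiniReport(state, len(funcs), len(results), results, errors)


order = []


def make(name, delay, fail=False):
    async def run_it():
        order.append(f"start {name}")
        await asyncio.sleep(delay)
        if fail:
            raise RuntimeError(f"{name} failed")
        return name
    return run_it


deps = {
    "load_users": set(),
    "load_orders": set(),
    "build_report": {"load_users", "load_orders"},
    "publish_report": {"build_report"},
}

report = asyncio.run(execute({n: make(n, 0.01) for n in deps}, deps))
print(report.execution_state, f"{report.successful_tasks}/{report.total_tasks}")
# completed 4/4

broken = {n: make(n, 0.01, fail=(n == "load_orders")) for n in deps}
failed = asyncio.run(execute(broken, deps))
print(failed.execution_state, f"{failed.successful_tasks}/{failed.total_tasks}")
# failed 1/4
```

Here the "stages" are not computed up front: they emerge from `get_ready()`, which is why build_report starts only once both loaders are done and never starts at all when load_orders fails.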

Documentation

The full documentation is bilingual (Chinese and English) and built with MkDocs + Material for MkDocs.

The local Markdown files remain available in docs/ for offline reading and contributions.

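Examples

  • Fast Start: a quick tour of workflow patterns, from sequential and parallel execution to fan-in and async retries.
  • Coffee Shop: uses a coffee-shop workflow to explain data flow with TaskData / DataItem / DTRela.
  • Stateless Text Retriever: uses a complex retrieval pipeline to demonstrate fan-out, fan-in, multi-branch scoring, and rerank.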

The corresponding English pages are available next to the Chinese pages with .en.md filenames.

Development

This project uses uv for dependency management.

uv sync --extra docs --extra viz
uv run pytest
uv run mkdocs serve
uv run mkdocs build --strict

Common checks before publishing:

uv run pytest
uv run mkdocs build --strict

Project Status

Current version: 0.1.1.

Astrum 0.1.1 is the recommended early testing release; APIs may change in 0.1.x.

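Astrum has had its initial release on PyPI. During 0.1.x, small improvements to API naming, documentation, and the type-checking experience are still possible.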

License

Astrum is released under the MIT License.

Download files

Source Distribution

astrum-0.1.1.tar.gz (65.1 kB)

Built Distribution

astrum-0.1.1-py3-none-any.whl (41.5 kB)

File details

Details for the file astrum-0.1.1.tar.gz.

File metadata

  • Download URL: astrum-0.1.1.tar.gz
  • Size: 65.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for astrum-0.1.1.tar.gz:

  • SHA256: 83114e476797f0caa567245f1410d20db257c01b80d19aaee0b3dc67755fef3f
  • MD5: 133c628fca476ab790c1cb2f0a56eeb3
  • BLAKE2b-256: 8acd0b4ee66390986c956df70b2c3be22fddfe3b8b46b3f0c66e315e5a765edf


Provenance

The following attestation bundles were made for astrum-0.1.1.tar.gz:

Publisher: publish.yml on DuskXi/astrum


File details

Details for the file astrum-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: astrum-0.1.1-py3-none-any.whl
  • Size: 41.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for astrum-0.1.1-py3-none-any.whl:

  • SHA256: e23261427d9bf05fa2f2558e565ccda69722b6904c71efbe047e4fbcfc4fb84f
  • MD5: f01822ba261ebbce7a74dad959dfac3d
  • BLAKE2b-256: 472794698434371b46152dbfe8f531e06929b6f4d0ba89caf55deaeaf8be82be


Provenance

The following attestation bundles were made for astrum-0.1.1-py3-none-any.whl:

Publisher: publish.yml on DuskXi/astrum

