reflow
Decorator-based HPC workflow engine with Result-based data wiring, reusable workflows, and an auto-generated CLI.
python -m pip install reflow-hpc
import reflow
from pathlib import Path
from typing import Annotated

wf = reflow.Workflow("climate")


@wf.job(cpus=4, time="02:00:00", mem="16G")
def prepare(
    start: Annotated[str, reflow.Param(help="Start date")],
    run_dir: reflow.RunDir = reflow.RunDir(),
) -> list[str]:
    """Download and preprocess input files."""
    return [str(f) for f in (run_dir / "input").glob("*.nc")]


@wf.job(array=True, cpus=8, time="04:00:00", mem="32G")
def compute(
    nc_file: Annotated[str, reflow.Result(step="prepare")],
    run_dir: reflow.RunDir = reflow.RunDir(),
) -> str:
    """Process a single input file (one per array element)."""
    return str(run_dir / "output" / Path(nc_file).name)


if __name__ == "__main__":
    wf.cli()
$ python pipeline.py submit --run-dir /scratch/run1 --start 2025-01-01
Created run climate-20250301-a1b2 in /scratch/run1
$ python pipeline.py status --run-id climate-20250301-a1b2
Features
- Decorator-driven: define tasks with @wf.job(), wire data between them with Result, and let reflow handle the rest.
- Automatic fan-out: return a list from a task, mark the downstream task as array=True, and reflow submits one array element per item with no extra boilerplate.
- Merkle-DAG caching: each task instance gets a content-addressed identity. Re-runs skip tasks whose inputs haven't changed.
- Reusable flows: build a library of Flow objects and compose them into workflows with wf.include(flow, prefix="...").
- Auto-generated CLI: wf.cli() produces a full argparse CLI with submit, status, cancel, retry, dag, describe, and runs subcommands. Parameters annotated with Param become CLI flags.
- Multi-scheduler: works with Slurm, PBS Pro / Torque, LSF, SGE / UGE, and Flux out of the box. Write scheduler-agnostic config and reflow translates it to the right flags.
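The caching bullet above can be illustrated with a minimal sketch of a content-addressed task identity. This is not reflow's actual implementation; `task_identity` and the fields it hashes are hypothetical names chosen for illustration. The idea is that a task's key covers its name, version, parameters, and the keys of its upstream tasks, so any upstream change propagates downstream.

```python
import hashlib
import json


def task_identity(name, version, params, upstream_ids):
    """Return a hex digest identifying one task instance (hypothetical helper)."""
    payload = json.dumps(
        {
            "name": name,
            "version": version,
            "params": params,
            # Sorting makes the identity independent of dependency order.
            "upstream": sorted(upstream_ids),
        },
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


prepare_id = task_identity("prepare", "1", {"start": "2025-01-01"}, [])
compute_id = task_identity("compute", "1", {"index": 0}, [prepare_id])

# Changing an upstream input changes the downstream identity as well.
prepare_changed = task_identity("prepare", "1", {"start": "2025-02-01"}, [])
compute_changed = task_identity("compute", "1", {"index": 0}, [prepare_changed])
```

A cache lookup keyed on such a digest would skip `compute` only while `prepare` and its inputs stay unchanged, and bumping `version` would force a new key.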
Installation
pip install reflow-hpc
Requires Python 3.10+. The only runtime dependency is
tomli on Python 3.10 (stdlib
tomllib is used on 3.11+).
Scheduler backends
Reflow supports five workload managers. Set the backend in your
config file (~/.config/reflow/config.toml) or via the REFLOW_MODE
environment variable:
| Backend | mode value | Submit | Cancel | Status |
|---|---|---|---|---|
| Slurm | sbatch (default) | sbatch | scancel | sacct |
| PBS Pro / Torque | qsub-pbs | qsub | qdel | qstat |
| LSF | bsub | bsub | bkill | bjobs |
| SGE / UGE | qsub-sge | qsub | qdel | qstat |
| Flux | flux | flux submit | flux cancel | flux jobs |
Use dry-run as the mode to log commands without submitting.
Scheduler-agnostic config
Submit options do not depend on which scheduler is active. Use either
partition or queue; reflow maps both to the flag the active backend expects:
# ~/.config/reflow/config.toml
[executor]
mode = "qsub-pbs" # or "sbatch", "bsub", "qsub-sge", "flux"
[executor.submit_options]
partition = "compute" # → -q compute on PBS, --partition compute on Slurm
account = "my_project" # → -A my_project on PBS, --account my_project on Slurm
Or set the executor at submit time:
# Config-driven (reads mode from config/env)
run = wf.submit(run_dir="/scratch/run1", start="2025-01-01")
# Explicit shorthand
run = wf.submit(run_dir="/scratch/run1", start="2025-01-01", executor="pbs")
# Explicit instance with custom paths
from reflow import PBSExecutor
exc = PBSExecutor(qsub="/opt/pbs/bin/qsub", array_flag="-t")
run = wf.submit(run_dir="/scratch/run1", start="2025-01-01", executor=exc)
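To make the translation described in this section concrete, here is a minimal sketch of how scheduler-agnostic options could be turned into backend flags. The flag spellings for Slurm and PBS are the ones given in the config comments above; `FLAG_TABLE` and `translate_options` are hypothetical names, and reflow's internal mapping may be structured differently.

```python
# Flag spellings taken from the config comments above; other backends omitted.
FLAG_TABLE = {
    "sbatch": {"partition": "--partition", "account": "--account"},
    "qsub-pbs": {"partition": "-q", "account": "-A"},
}


def translate_options(mode, options):
    """Turn scheduler-agnostic options into a flat argument list (hypothetical)."""
    flags = FLAG_TABLE[mode]
    # Treat "queue" as an alias for "partition", as the text describes.
    normalized = {("partition" if k == "queue" else k): v for k, v in options.items()}
    args = []
    for key, value in normalized.items():
        args.extend([flags[key], str(value)])
    return args


opts = {"partition": "compute", "account": "my_project"}
pbs_args = translate_options("qsub-pbs", opts)
slurm_args = translate_options("sbatch", opts)
```

With this table, the same `opts` dictionary yields `-q compute -A my_project` for PBS and `--partition compute --account my_project` for Slurm.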
Environment variables
All config values can be overridden with REFLOW_* environment
variables. The most common ones:
| Variable | Effect |
|---|---|
| REFLOW_MODE | Scheduler backend (sbatch, qsub-pbs, bsub, …) |
| REFLOW_PYTHON | Python interpreter for worker jobs |
| REFLOW_PARTITION | Default partition / queue |
| REFLOW_ACCOUNT | Default account / project |
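The override rule can be sketched as a simple lookup in which an environment variable, when set, takes precedence over the config file value. `resolve_setting` is a hypothetical helper written for this illustration, not part of reflow's API; the `environ` argument exists only so the example runs without touching the real environment.

```python
import os


def resolve_setting(key, config, environ=None):
    """Return the REFLOW_<KEY> environment value if set, else the config value."""
    environ = os.environ if environ is None else environ
    return environ.get(f"REFLOW_{key.upper()}", config.get(key))


config = {"mode": "sbatch", "partition": "compute"}
fake_env = {"REFLOW_MODE": "qsub-pbs"}

mode = resolve_setting("mode", config, fake_env)            # environment wins
partition = resolve_setting("partition", config, fake_env)  # falls back to config
```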
Core concepts
Tasks
A task is a Python function decorated with @wf.job(). Parameters
control scheduler resources:
@wf.job(
    cpus=4,                # cores per task
    time="02:00:00",       # walltime
    mem="16G",             # memory
    array=True,            # run as an array job
    array_parallelism=10,  # max concurrent array elements
    cache=True,            # enable Merkle-DAG caching (default)
    version="2",           # bump to invalidate cache
    after=["cleanup"],     # explicit ordering dependency
    partition="gpu",       # scheduler-agnostic: works on all backends
)
def my_task(...) -> ...:
    ...
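As a rough illustration of what these resource parameters could mean on the Slurm backend, the sketch below builds the corresponding sbatch arguments. The flags themselves (`--cpus-per-task`, `--time`, `--mem`, and `--array` with a `%` throttle) are standard sbatch options, but `sbatch_resource_args` is a hypothetical helper and the exact arguments reflow emits are an assumption.

```python
def sbatch_resource_args(cpus, time, mem, array_size=None, array_parallelism=None):
    """Build sbatch arguments for one task (hypothetical helper)."""
    args = [f"--cpus-per-task={cpus}", f"--time={time}", f"--mem={mem}"]
    if array_size is not None and array_size > 0:
        # Array indices run from 0 to array_size - 1.
        spec = f"0-{array_size - 1}"
        if array_parallelism is not None:
            # "%N" limits how many array elements run at the same time.
            spec += f"%{array_parallelism}"
        args.append(f"--array={spec}")
    return args


args = sbatch_resource_args(4, "02:00:00", "16G", array_size=25, array_parallelism=10)
```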
Data wiring with Result
Result declares that a parameter receives output from an upstream
task. Reflow resolves it at dispatch time:
@wf.job()
def step_a() -> list[str]:
    return ["file1.nc", "file2.nc"]


@wf.job(array=True)
def step_b(
    nc_file: Annotated[str, Result(step="step_a")],
) -> str:
    # Each array element gets one item from step_a's list.
    ...
Wire modes are inferred from types:
| Upstream | Downstream | Mode |
|---|---|---|
| T (singleton) | T (singleton) | Direct |
| list[T] (singleton) | T (array) | Fan-out: one element per array slot |
| T (array) | list[T] (singleton) | Gather: collect all into a list |
| list[T] (array) | list[T] (singleton) | Gather + flatten |
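The four rows of the table can be read as a small decision function plus two data movements. The sketch below is only an illustration of that reading: `infer_wire_mode` and `gather` are hypothetical helpers, and combinations outside the table are rejected rather than guessed.

```python
from itertools import chain


def infer_wire_mode(up_is_list, up_is_array, down_is_list, down_is_array):
    """Map the four rows of the table above to a mode name (hypothetical helper)."""
    if up_is_array and down_is_list and not down_is_array:
        return "gather+flatten" if up_is_list else "gather"
    if up_is_list and not up_is_array and down_is_array and not down_is_list:
        return "fan-out"
    if not (up_is_list or up_is_array or down_is_list or down_is_array):
        return "direct"
    raise ValueError("combination not covered by the table")


def gather(array_results, flatten=False):
    """Collect per-element results; optionally flatten lists of lists."""
    return list(chain.from_iterable(array_results)) if flatten else list(array_results)


# Fan-out: each item of the upstream list feeds one array element.
slots = ["file1.nc", "file2.nc"]
per_slot_outputs = [f"out/{name}" for name in slots]

collected = gather(per_slot_outputs)
flattened = gather([["a", "b"], ["c"]], flatten=True)
```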
CLI parameters with Param
from typing import Annotated, Literal
from reflow import Param


@wf.job()
def ingest(
    start: Annotated[str, Param(help="Start date, ISO-8601")],
    model: Annotated[Literal["era5", "icon"], Param(help="Model")] = "era5",
    chunk: Annotated[int, Param(help="Chunk size", namespace="local")] = 256,
) -> list[str]:
    ...
This generates:
$ python pipeline.py submit --help
--start START Start date, ISO-8601 (required)
--model {era5,icon} Model (default: era5)
--ingest-chunk CHUNK Chunk size (default: 256)
namespace="local" prefixes the flag with the task name to avoid
collisions.
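For comparison, the flags shown above correspond to roughly the following hand-written argparse setup. This is a sketch of an equivalent parser, not the code reflow generates; it only shows how a required string, a Literal choice, and a task-prefixed integer flag would look in plain argparse.

```python
import argparse

parser = argparse.ArgumentParser(prog="pipeline.py submit")
# No default in the signature, so the flag is required.
parser.add_argument("--start", required=True, help="Start date, ISO-8601")
# A Literal annotation becomes a fixed set of choices.
parser.add_argument("--model", choices=["era5", "icon"], default="era5", help="Model")
# namespace="local": the flag is prefixed with the task name ("ingest").
parser.add_argument("--ingest-chunk", type=int, default=256, help="Chunk size")

args = parser.parse_args(["--start", "2025-01-01", "--ingest-chunk", "512"])
```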
RunDir
Parameters typed as RunDir (or named run_dir) receive a
pathlib.Path pointing to the shared working directory. They
never appear on the CLI.
Reusable Flows
Build libraries of task groups and compose them:
from typing import Annotated
from reflow import Flow, Result, Workflow

preprocessing = Flow("preprocess")

@preprocessing.job()
def download(...) -> list[str]: ...

@preprocessing.job(array=True)
def convert(item: Annotated[str, Result(step="download")]) -> str: ...

# In your workflow:
wf = Workflow("experiment")
wf.include(preprocessing, prefix="pre")
# Tasks become "pre.download", "pre.convert"
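The renaming that include performs can be pictured as prefixing every task name and every reference between tasks inside the flow. The sketch below models a flow as a plain dictionary of task name to upstream names; `include_with_prefix` is a hypothetical stand-in, and reflow's internal representation is likely richer.

```python
def include_with_prefix(tasks, prefix):
    """Return a copy of `tasks` with names and internal dependencies prefixed."""
    return {
        f"{prefix}.{name}": [f"{prefix}.{dep}" for dep in deps]
        for name, deps in tasks.items()
    }


# "convert" depends on "download", mirroring the Result(step="download") wiring.
preprocess_tasks = {"download": [], "convert": ["download"]}
included = include_with_prefix(preprocess_tasks, "pre")
```

Prefixing the dependencies together with the names keeps the wiring inside the flow intact, which is what allows the same flow to be included more than once under different prefixes.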
CLI reference
$ python pipeline.py submit --run-dir DIR [--param VALUE ...]
$ python pipeline.py status --run-id ID
$ python pipeline.py cancel --run-id ID [--task NAME]
$ python pipeline.py retry --run-id ID [--task NAME]
$ python pipeline.py dag # print task dependency graph
$ python pipeline.py describe # JSON workflow manifest
$ python pipeline.py runs # list all runs
Python API
run = wf.submit(
    run_dir="/scratch/run1",
    start="2025-01-01",
    executor="pbs",          # or "lsf", "sge", "flux", "local"
    force=True,              # skip cache, re-run everything
    force_tasks=["step_a"],  # skip cache for specific tasks only
    verify=True,             # verify cached Path outputs still exist
)
run.status() # dict with run state + per-task breakdown
run.cancel() # cancel all active jobs
run.retry() # resubmit failed/cancelled tasks
Configuration
Run python -c "from reflow import ensure_config_exists; ensure_config_exists()" to
generate a fully commented config at ~/.config/reflow/config.toml.
Key sections:
[executor]
mode = "sbatch" # scheduler backend
python = "/path/to/python" # interpreter for workers
[executor.submit_options]
partition = "compute" # or queue = "batch" — both work
account = "my_project"
signal = "B:INT@60" # pre-termination signal
[notifications]
mail_user = "you@example.org"
mail_type = "FAIL"
[dispatch]
cpus = 1
time = "00:10:00"
mem = "1G"
[defaults]
run_dir = "/scratch/$USER/reflow"
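To inspect such a file outside of reflow, it can be parsed with the same TOML libraries named in the installation notes. The sketch below parses an abbreviated copy of the config from a string so that it runs anywhere; how reflow itself loads and merges the file is not shown here.

```python
try:
    import tomllib  # standard library on Python 3.11+
except ModuleNotFoundError:
    import tomli as tomllib  # backport used on Python 3.10

CONFIG_TEXT = """
[executor]
mode = "sbatch"

[executor.submit_options]
partition = "compute"
account = "my_project"

[dispatch]
cpus = 1
time = "00:10:00"
mem = "1G"
"""

config = tomllib.loads(CONFIG_TEXT)
mode = config["executor"]["mode"]
# Nested tables such as [executor.submit_options] become nested dictionaries.
submit_options = config["executor"]["submit_options"]
dispatch_cpus = config["dispatch"]["cpus"]
```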
Development
git clone https://github.com/freva-org/reflow
cd reflow
pip install -e ".[dev]"
tox -e test
tox -e lint
tox -e types
License
MIT — see LICENSE for details.