reflow
Decorator-based HPC workflow engine with Result-based data wiring, reusable workflows, and an auto-generated CLI.
pip install reflow-hpc
from typing import Annotated

from reflow import Workflow, Param, Result

wf = Workflow("etl")


@wf.job(cpus=2, time="00:10:00", mem="4G")
def extract(
    source: Annotated[str, Param(help="Input file path")],
) -> list[str]:
    """Read a data source and split it into chunks."""
    return [f"chunk_{i}" for i in range(5)]


@wf.job(array=True, cpus=4, time="01:00:00", mem="8G")
def transform(
    chunk: Annotated[str, Result(step="extract")],
) -> str:
    """Process one chunk. Runs as a parallel array job."""
    return chunk.upper()


@wf.job(time="00:05:00")
def load(
    results: Annotated[list[str], Result(step="transform")],
) -> str:
    """Collect all results."""
    return f"loaded {len(results)} items"


if __name__ == "__main__":
    wf.cli()
$ python pipeline.py submit --run-dir /scratch/run1 --source data.csv
Created run etl-20260401-a1b2 in /scratch/run1
$ python pipeline.py status etl-20260401-a1b2
Features
- Decorator-driven: define tasks with @wf.job(), wire data between them with Result, and let reflow handle the rest.
- Automatic fan-out: return a list from a task, mark the downstream task as array=True, and reflow submits one array element per item.
- Merkle-DAG caching: each task gets a content-addressed identity. Re-runs skip tasks whose inputs haven't changed.
- Broadcast mode: pass a value to every array element without splitting it, using Result(step="config", broadcast=True).
- Reusable flows: build a library of Flow objects and compose them into workflows with wf.include(flow, prefix="...").
- Auto-generated CLI: wf.cli() produces a full argparse CLI with submit, status, cancel, retry, dag, and describe.
- Multi-scheduler: works with Slurm, PBS Pro / Torque, LSF, SGE / UGE, and Flux. Write scheduler-agnostic config and reflow translates it to the right flags.
- Local execution: run the full pipeline on your laptop with wf.run_local(); no scheduler needed.
- Typo protection: misspell a task name in Result(step=...) and reflow suggests the closest match.
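To illustrate the typo protection mentioned in the list above, here is a minimal sketch of how a closest-match suggestion can be produced with the standard library. The README does not say how reflow implements this; the `suggest_step` helper and the 0.6 cutoff are assumptions made for the example.

```python
import difflib
from typing import Optional


def suggest_step(name: str, known_steps: list[str]) -> Optional[str]:
    """Return the most similar known task name, or None if nothing is close.

    Hypothetical helper for illustration; not part of reflow's API.
    """
    matches = difflib.get_close_matches(name, known_steps, n=1, cutoff=0.6)
    return matches[0] if matches else None


known = ["extract", "transform", "load"]
suggestion = suggest_step("transfrom", known)
print(f"Unknown step 'transfrom'. Did you mean '{suggestion}'?")
```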
Installation
pip install reflow-hpc
Requires Python 3.10+.
Quick start
Data wiring
Result declares that a parameter receives output from an upstream
task. Reflow infers the wiring mode from the types:
| Upstream returns | Downstream takes | Mode |
|---|---|---|
| T | T | Direct |
| list[T] | T (array job) | Fan-out |
| T (array job) | list[T] | Gather |
| T (array job) | T (array job) | Chain |
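The table can be read as a small decision function. The sketch below encodes the four rows in plain Python; `infer_mode` and its arguments are hypothetical names chosen for the example and are not claimed to match reflow's internals.

```python
from typing import get_origin


def infer_mode(upstream_returns, upstream_is_array: bool,
               downstream_takes, downstream_is_array: bool) -> str:
    """Map one upstream/downstream pair to a wiring mode from the table."""
    returns_list = get_origin(upstream_returns) is list
    takes_list = get_origin(downstream_takes) is list

    if upstream_is_array and downstream_is_array:
        return "chain"      # element i feeds element i
    if upstream_is_array and takes_list:
        return "gather"     # all array outputs are collected into one list
    if returns_list and downstream_is_array and not takes_list:
        return "fan-out"    # one array element per list item
    if not upstream_is_array and not downstream_is_array:
        return "direct"     # the value is passed through unchanged
    raise TypeError("combination not covered by the wiring table")


# The quick-start pipeline: extract -> transform (array) -> load
print(infer_mode(list[str], False, str, True))   # extract -> transform
print(infer_mode(str, True, list[str], False))   # transform -> load
```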
Broadcast
Pass a whole value to every array element instead of splitting it:
@wf.job()
def load_config() -> dict:
    return {"threshold": 0.5}


# Assumes an upstream task named "find_files" that returns a list.
@wf.job(array=True)
def process(
    item: Annotated[str, Result(step="find_files")],
    config: Annotated[dict, Result(step="load_config", broadcast=True)],
) -> str:
    return f"{item}:{config['threshold']}"
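Conceptually, each array element receives one item from the split input and the same copy of every broadcast input. The following sketch shows that pairing in plain Python; `build_array_inputs` is a hypothetical helper, and the file names are made up for the example.

```python
def build_array_inputs(split: dict[str, list], broadcast: dict[str, object]) -> list[dict]:
    """Build one keyword-argument dict per array element.

    Split inputs contribute their i-th item; broadcast inputs are
    passed whole to every element.
    """
    lengths = {len(values) for values in split.values()}
    if len(lengths) != 1:
        raise ValueError("split inputs must be non-empty and equally long")
    count = lengths.pop()
    return [
        {**{name: values[i] for name, values in split.items()}, **broadcast}
        for i in range(count)
    ]


def process(item: str, config: dict) -> str:
    return f"{item}:{config['threshold']}"


elements = build_array_inputs(
    split={"item": ["a.nc", "b.nc"]},
    broadcast={"config": {"threshold": 0.5}},
)
outputs = [process(**kwargs) for kwargs in elements]
print(outputs)
```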
CLI parameters
@wf.job()
def ingest(
    source: Annotated[str, Param(help="Input file")],
    limit: Annotated[int, Param(help="Max rows")] = 100,
) -> str:
    return source
$ python pipeline.py submit --help
--source SOURCE Input file (required)
--limit LIMIT Max rows (default: 100)
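As a rough picture of how annotated parameters can become command-line options, the sketch below walks a function signature and registers one argparse option per annotated parameter. The `Param` class here is a local stand-in defined for the example, not reflow's own class, and `build_parser` is a hypothetical helper.

```python
import argparse
import inspect
from dataclasses import dataclass
from typing import Annotated, get_args, get_origin, get_type_hints


@dataclass
class Param:
    """Stand-in for illustration only; not reflow's Param."""
    help: str = ""


def ingest(
    source: Annotated[str, Param(help="Input file")],
    limit: Annotated[int, Param(help="Max rows")] = 100,
) -> str:
    return source


def build_parser(func) -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="submit")
    hints = get_type_hints(func, include_extras=True)
    for name, param in inspect.signature(func).parameters.items():
        hint = hints.get(name)
        if get_origin(hint) is not Annotated:
            continue  # only annotated parameters become options
        base_type, *metadata = get_args(hint)
        meta = next((m for m in metadata if isinstance(m, Param)), None)
        if meta is None:
            continue
        # Parameters without a default are required on the command line.
        required = param.default is inspect.Parameter.empty
        parser.add_argument(
            f"--{name.replace('_', '-')}",
            dest=name,
            type=base_type,
            required=required,
            default=None if required else param.default,
            help=meta.help,
        )
    return parser


args = build_parser(ingest).parse_args(["--source", "data.csv"])
print(args.source, args.limit)
```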
Local execution
Run the full pipeline in-process — no scheduler, no subprocesses:
run = wf.run_local(
    run_dir="/tmp/test",
    source="data.csv",
    max_workers=4,        # parallelize array jobs
    on_error="continue",  # don't stop on first failure
)
run.status()
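The effect of max_workers and on_error="continue" can be pictured with a small thread-pool sketch. This is only an illustration of the idea, not reflow's implementation; `run_array` is a hypothetical helper, and the "stop" value is an assumed counterpart to "continue".

```python
from concurrent.futures import ThreadPoolExecutor


def run_array(func, items, max_workers=4, on_error="stop"):
    """Run func over items in parallel and collect results per index.

    With on_error="continue", failures are recorded and the remaining
    elements are still collected; otherwise the first failure encountered
    is re-raised once the pool has shut down.
    """
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {i: pool.submit(func, item) for i, item in enumerate(items)}
        for i, future in futures.items():
            try:
                results[i] = future.result()
            except Exception as exc:
                if on_error != "continue":
                    raise
                errors[i] = exc
    return results, errors


def transform(chunk: str) -> str:
    if chunk == "chunk_2":
        raise ValueError("bad chunk")
    return chunk.upper()


results, errors = run_array(
    transform, [f"chunk_{i}" for i in range(5)], on_error="continue"
)
print(sorted(results), sorted(errors))
```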
Python API
run = wf.submit(
    run_dir="/scratch/run1",
    source="data.csv",
    executor="pbs",  # or "slurm", "lsf", "sge", "flux"
    force=True,      # skip cache
)
run.status()
run.cancel()
run.retry()
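The cache that force=True bypasses relies on content-addressed task identities. The README does not specify what goes into the hash, so the sketch below is an assumption: it hashes the task name, a version string, the parameters, and the identities of upstream tasks, which is the general shape of a Merkle DAG.

```python
import hashlib
import json


def task_identity(name: str, version: str, params: dict, upstream_ids: list[str]) -> str:
    """Hash a task together with the identities of its upstream tasks.

    Hypothetical scheme for illustration: a change anywhere upstream
    changes every identity downstream of it.
    """
    payload = json.dumps(
        {
            "name": name,
            "version": version,
            "params": params,
            "upstream": sorted(upstream_ids),
        },
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


extract_id = task_identity("extract", "v1", {"source": "data.csv"}, [])
transform_id = task_identity("transform", "v1", {}, [extract_id])

# Changing the input of extract also changes the identity of transform,
# so a cached transform result would no longer be reused.
changed_extract = task_identity("extract", "v1", {"source": "other.csv"}, [])
changed_transform = task_identity("transform", "v1", {}, [changed_extract])
print(transform_id != changed_transform)
```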
Scheduler backends
Set the backend in ~/.config/reflow/config.toml or via the
REFLOW_MODE environment variable:
| Backend | mode value |
|---|---|
| Slurm | sbatch (default) |
| PBS Pro / Torque | qsub-pbs |
| LSF | bsub |
| SGE / UGE | qsub-sge |
| Flux | flux |
Use the dry-run mode to log the scheduler commands without submitting anything.
The config is scheduler-agnostic: use partition or queue, and reflow
maps them to the right flags:
[executor]
mode = "qsub-pbs"
[executor.submit_options]
partition = "compute"
account = "my_project"
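To make the translation idea concrete, here is a sketch that maps the generic option names to scheduler flags. The flag table is an assumption based on the commonly documented options of sbatch, qsub, and bsub; it is not taken from reflow, and reflow's actual mapping (including Flux, omitted here) may differ.

```python
# Assumed mapping from generic option names to per-scheduler flags.
_FLAGS = {
    "sbatch": {"partition": "--partition", "account": "--account"},
    "qsub-pbs": {"partition": "-q", "account": "-A"},
    "bsub": {"partition": "-q", "account": "-P"},
    "qsub-sge": {"partition": "-q", "account": "-A"},
}

# "queue" is treated as another spelling of "partition".
_ALIASES = {"queue": "partition"}


def translate(mode: str, submit_options: dict[str, str]) -> list[str]:
    """Turn scheduler-agnostic options into command-line arguments."""
    table = _FLAGS[mode]
    args: list[str] = []
    for key, value in submit_options.items():
        key = _ALIASES.get(key, key)
        args += [table[key], str(value)]
    return args


options = {"partition": "compute", "account": "my_project"}
print(translate("qsub-pbs", options))
print(translate("sbatch", options))
```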
CLI reference
$ python pipeline.py submit --run-dir DIR [--param VALUE ...] [--force] [--force-tasks TASK ...]
$ python pipeline.py status RUN_ID
$ python pipeline.py cancel RUN_ID [--task NAME]
$ python pipeline.py retry RUN_ID [--task NAME]
$ python pipeline.py dag
$ python pipeline.py describe
$ python pipeline.py runs
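The dag subcommand works on the dependency graph implied by the Result(step=...) declarations. As an illustration only, the quick-start pipeline's graph can be written out by hand and ordered with the standard library; how reflow builds or prints its graph is not described here.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on, as declared via
# Result(step=...) in the quick-start example.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(" -> ".join(order))
```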
Configuration
Generate a fully commented config:
python -c "from reflow import ensure_config_exists; ensure_config_exists()"
Key sections:
[executor]
mode = "sbatch"
python = "/path/to/python"
[executor.submit_options]
partition = "compute"
account = "my_project"
[dispatch]
cpus = 1
time = "00:10:00"
mem = "1G"
All values can be overridden with REFLOW_* environment variables
(REFLOW_MODE, REFLOW_PARTITION, REFLOW_ACCOUNT, etc.).
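A sketch of how such overrides could be layered on top of the file-based config follows. The mapping from variable names to config keys is an assumption inferred from the examples above, and `apply_env_overrides` is a hypothetical helper rather than part of reflow.

```python
import copy
import os

# Assumed mapping from environment variables to config keys.
_ENV_MAP = {
    "REFLOW_MODE": ("executor", "mode"),
    "REFLOW_PARTITION": ("executor", "submit_options", "partition"),
    "REFLOW_ACCOUNT": ("executor", "submit_options", "account"),
}


def apply_env_overrides(config: dict, environ=None) -> dict:
    """Return a copy of config with REFLOW_* variables applied on top."""
    environ = os.environ if environ is None else environ
    merged = copy.deepcopy(config)
    for variable, path in _ENV_MAP.items():
        if variable not in environ:
            continue
        node = merged
        for key in path[:-1]:
            node = node.setdefault(key, {})
        node[path[-1]] = environ[variable]
    return merged


file_config = {
    "executor": {
        "mode": "sbatch",
        "submit_options": {"partition": "compute", "account": "my_project"},
    }
}
effective = apply_env_overrides(file_config, {"REFLOW_MODE": "qsub-pbs"})
print(effective["executor"]["mode"])
```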
Development
git clone https://github.com/freva-org/reflow
cd reflow
pip install -e ".[dev]"
tox -e test
tox -e lint
tox -e types
License
MIT — see LICENSE for details.