
Decorator-based HPC workflow engine with Result-based data wiring, reusable Flows, and an auto-generated CLI


reflow

Decorator-based HPC workflow engine with Result-based data wiring, reusable workflows, and an auto-generated CLI.


pip install reflow-hpc

from typing import Annotated
from reflow import Workflow, Param, Result

wf = Workflow("etl")

@wf.job(cpus=2, time="00:10:00", mem="4G")
def extract(
    source: Annotated[str, Param(help="Input file path")],
) -> list[str]:
    """Read a data source and split it into chunks."""
    return [f"chunk_{i}" for i in range(5)]

@wf.job(array=True, cpus=4, time="01:00:00", mem="8G")
def transform(
    chunk: Annotated[str, Result(step="extract")],
) -> str:
    """Process one chunk. Runs as a parallel array job."""
    return chunk.upper()

@wf.job(time="00:05:00")
def load(
    results: Annotated[list[str], Result(step="transform")],
) -> str:
    """Collect all results."""
    return f"loaded {len(results)} items"

if __name__ == "__main__":
    wf.cli()

Save the script as pipeline.py, then submit a run and check its status:

$ python pipeline.py submit --run-dir /scratch/run1 --source data.csv
Created run etl-20260401-a1b2 in /scratch/run1

$ python pipeline.py status etl-20260401-a1b2
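Each Result(step=...) annotation adds an edge to the workflow's dependency graph. As a hedged illustration using only the standard library (this is not reflow's internal code), the example's graph and a valid execution order can be computed with graphlib:

```python
from graphlib import TopologicalSorter

# Dependencies read off the Result(step=...) annotations in the example:
# each task maps to the set of tasks whose output it consumes.
deps: dict[str, set[str]] = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

# static_order() yields every task after all of its dependencies.
order = list(TopologicalSorter(deps).static_order())
print(order)  # ['extract', 'transform', 'load']
```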

Features

Decorator-driven: define tasks with @wf.job(), wire data between them with Result, and let reflow handle the rest.

Automatic fan-out: return a list from a task, mark the downstream as array=True, and reflow submits one array element per item.

Merkle-DAG caching: each task gets a content-addressed identity. Re-runs skip tasks whose inputs haven't changed.

Broadcast mode: pass a value to every array element without splitting it — Result(step="config", broadcast=True).

Reusable flows: build a library of Flow objects and compose them into workflows with wf.include(flow, prefix="...").

Auto-generated CLI: wf.cli() produces a full argparse CLI with submit, status, cancel, retry, dag, describe, and runs.

Multi-scheduler: works with Slurm, PBS Pro / Torque, LSF, SGE / UGE, and Flux. Write scheduler-agnostic config and reflow translates to the right flags.

Local execution: run the full pipeline on your laptop with wf.run_local() — no scheduler needed.

Typo protection: misspell a task name in Result(step=...) and reflow suggests the closest match.

Installation

pip install reflow-hpc

Requires Python 3.10+.

Quick start

Data wiring

Result declares that a parameter receives output from an upstream task. Reflow infers the wiring mode from the types:

Upstream returns   Downstream takes   Mode
T                  T                  Direct
list[T]            T (array job)      Fan-out
T (array job)      list[T]            Gather
T (array job)      T (array job)      Chain
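The table above can be read as a small decision procedure over the annotated types and the array=True flag. The sketch below encodes it with typing.get_origin; infer_mode is a hypothetical helper written for illustration, not reflow's implementation, and it simply rejects combinations the table does not cover:

```python
from typing import get_origin


def infer_mode(up_type, up_array: bool, down_type, down_array: bool) -> str:
    """Classify an upstream -> downstream edge following the wiring table."""
    up_is_list = get_origin(up_type) is list
    down_is_list = get_origin(down_type) is list
    if up_array and down_array:
        return "chain"    # element i feeds element i
    if up_array and down_is_list:
        return "gather"   # all element outputs collected into one list
    if down_array and up_is_list:
        return "fan-out"  # one array element per list item
    if not up_array and not down_array:
        return "direct"   # value passed through unchanged
    raise TypeError("combination not covered by the wiring table")


print(infer_mode(list[str], False, str, True))  # fan-out
print(infer_mode(str, True, list[str], False))  # gather
```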

Broadcast

Pass a whole value to every array element instead of splitting it:

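# Assumes wf, Annotated and Result from the quick-start example.
@wf.job()
def find_files() -> list[str]:
    # Hypothetical upstream step: each file becomes one array element
    return ["a.nc", "b.nc", "c.nc"]

@wf.job()
def load_config() -> dict:
    # Returned once and passed whole to every array element
    return {"threshold": 0.5}

@wf.job(array=True)
def process(
    item: Annotated[str, Result(step="find_files")],
    config: Annotated[dict, Result(step="load_config", broadcast=True)],
) -> str:
    return f"{item}:{config['threshold']}"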
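Conceptually, the process task above receives one list item per array element plus the same broadcast value in every element. A minimal sketch of that expansion in plain Python; expand_array_inputs and the file names are hypothetical and only model the idea:

```python
from typing import Any


def expand_array_inputs(
    fanned: dict[str, list[Any]],
    broadcast: dict[str, Any],
) -> list[dict[str, Any]]:
    """Build per-element kwargs: split fan-out inputs, copy broadcast inputs."""
    lengths = {len(v) for v in fanned.values()}
    if len(lengths) != 1:
        raise ValueError("fan-out inputs must all have the same length")
    (n,) = lengths
    return [
        {**{k: v[i] for k, v in fanned.items()}, **broadcast}
        for i in range(n)
    ]


elements = expand_array_inputs(
    fanned={"item": ["a.nc", "b.nc", "c.nc"]},
    broadcast={"config": {"threshold": 0.5}},
)
print(len(elements))  # 3
print(elements[1])    # {'item': 'b.nc', 'config': {'threshold': 0.5}}
```

Without broadcast=True, a list-valued config would instead be split across elements, which is why the flag matters for values that should stay whole.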

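CLI parameters

Arguments annotated with Param become flags of the generated submit command. An argument without a default is required; one with a default is optional: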

@wf.job()
def ingest(
    source: Annotated[str, Param(help="Input file")],
    limit: Annotated[int, Param(help="Max rows")] = 100,
) -> str:
    return source

$ python pipeline.py submit --help
  --source SOURCE   Input file (required)
  --limit LIMIT     Max rows (default: 100)
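The help output above shows Annotated metadata turning into argparse flags. A minimal sketch of that idea with the standard library only; the Param class here is a local stand-in (not reflow's), build_parser is hypothetical, and the underscore-to-hyphen flag naming is an assumption:

```python
import argparse
import inspect
from dataclasses import dataclass
from typing import Annotated, get_args, get_origin, get_type_hints


@dataclass(frozen=True)
class Param:
    """Stand-in for reflow's Param marker; only the help text is modelled."""
    help: str = ""


def build_parser(func) -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="submit")
    # include_extras=True keeps the Annotated[...] wrapper and its metadata.
    hints = get_type_hints(func, include_extras=True)
    for name, param in inspect.signature(func).parameters.items():
        hint = hints.get(name)
        if get_origin(hint) is not Annotated:
            continue
        base, *extras = get_args(hint)
        meta = next((e for e in extras if isinstance(e, Param)), None)
        if meta is None:
            continue  # e.g. Result-wired arguments would be skipped here
        required = param.default is inspect.Parameter.empty
        parser.add_argument(
            f"--{name.replace('_', '-')}",  # assumed naming convention
            type=base,
            required=required,
            default=None if required else param.default,
            help=meta.help,
        )
    return parser


def ingest(
    source: Annotated[str, Param(help="Input file")],
    limit: Annotated[int, Param(help="Max rows")] = 100,
) -> str:
    return source


args = build_parser(ingest).parse_args(["--source", "data.csv"])
print(args.source, args.limit)  # data.csv 100
```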

Local execution

Run the full pipeline in-process — no scheduler, no subprocesses:

run = wf.run_local(
    run_dir="/tmp/test",
    source="data.csv",
    max_workers=4,          # parallelize array jobs
    on_error="continue",    # don't stop on first failure
)
run.status()
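The max_workers and on_error options suggest array elements running in a worker pool. The sketch below shows one plausible interpretation with a thread pool (consistent with "no subprocesses"); run_array and its semantics are assumptions for illustration, not the behaviour of run_local:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


def run_array(
    func: Callable[[Any], Any],
    items: list[Any],
    max_workers: int = 4,
    on_error: str = "stop",
) -> tuple[list[Any], dict[int, Exception]]:
    """Apply func to every item in a thread pool (hypothetical helper).

    on_error="stop": cancel elements that have not started, then re-raise.
    on_error="continue": record the failure and keep the other results.
    """
    results: list[Any] = [None] * len(items)
    errors: dict[int, Exception] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(func, item) for item in items]
        for i, fut in enumerate(futures):
            try:
                results[i] = fut.result()
            except Exception as exc:
                if on_error == "stop":
                    for pending in futures:
                        pending.cancel()  # no effect on running/finished ones
                    raise
                errors[i] = exc
    return results, errors


def transform(chunk: str) -> str:
    if chunk == "bad":
        raise ValueError("cannot process chunk")
    return chunk.upper()


results, errors = run_array(transform, ["a", "bad", "c"], on_error="continue")
print(results)       # ['A', None, 'C']
print(list(errors))  # [1]
```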

Python API

Submit to a scheduler from Python and control the run through the returned object:

run = wf.submit(
    run_dir="/scratch/run1",
    source="data.csv",
    executor="pbs",         # or "slurm", "lsf", "sge", "flux"
    force=True,             # skip cache
)

run.status()
run.cancel()
run.retry()
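force=True bypasses the Merkle-DAG cache listed under Features. To illustrate how content-addressed identities propagate through a DAG, here is a sketch with a hypothetical key scheme (task name, parameters and parent keys hashed with SHA-256); reflow's actual key format is not documented here, and this sketch ignores changes to the task's code:

```python
import hashlib
import json
from typing import Any


def task_key(name: str, params: dict[str, Any], parent_keys: list[str]) -> str:
    """Hash the task name, its parameters and the keys of its upstream tasks."""
    payload = json.dumps(
        {"task": name, "params": params, "parents": sorted(parent_keys)},
        sort_keys=True,  # stable serialization -> stable hash
    )
    return hashlib.sha256(payload.encode()).hexdigest()


extract_key = task_key("extract", {"source": "data.csv"}, [])
transform_key = task_key("transform", {}, [extract_key])

# Changing an upstream parameter changes every downstream key,
# so a lookup by key misses and the affected tasks rerun.
extract_key2 = task_key("extract", {"source": "other.csv"}, [])
transform_key2 = task_key("transform", {}, [extract_key2])
print(transform_key == transform_key2)  # False
```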

Scheduler backends

Set the backend in ~/.config/reflow/config.toml or via the REFLOW_MODE environment variable:

Backend            mode value
Slurm              sbatch (default)
PBS Pro / Torque   qsub-pbs
LSF                bsub
SGE / UGE          qsub-sge
Flux               flux

Set mode to dry-run to log the submission commands without submitting anything.

Scheduler-agnostic config — use partition or queue and reflow maps them to the right flags:

[executor]
mode = "qsub-pbs"

[executor.submit_options]
partition = "compute"
account = "my_project"
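The exact flags reflow emits are not listed here. As a sketch of the translation idea, the table below uses the standard queue and accounting flags of sbatch, PBS qsub and bsub; FLAG_MAP and submit_flags are hypothetical names, SGE and Flux are omitted, and treating queue as an alias of partition follows the sentence above:

```python
# Hypothetical translation table: generic option -> scheduler-specific flag.
FLAG_MAP: dict[str, dict[str, str]] = {
    "sbatch":   {"partition": "--partition", "account": "--account"},
    "qsub-pbs": {"partition": "-q", "account": "-A"},
    "bsub":     {"partition": "-q", "account": "-P"},
}


def submit_flags(mode: str, options: dict[str, str]) -> list[str]:
    """Turn scheduler-agnostic submit options into command-line arguments."""
    mapping = FLAG_MAP[mode]
    flags: list[str] = []
    for key, value in options.items():
        key = "partition" if key == "queue" else key  # queue is an alias
        flags += [mapping[key], value]
    return flags


print(submit_flags("qsub-pbs", {"partition": "compute", "account": "my_project"}))
# ['-q', 'compute', '-A', 'my_project']
```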

CLI reference

$ python pipeline.py submit    --run-dir DIR [--param VALUE ...] [--force] [--force-tasks TASK ...]
$ python pipeline.py status    RUN_ID
$ python pipeline.py cancel    RUN_ID [--task NAME]
$ python pipeline.py retry     RUN_ID [--task NAME]
$ python pipeline.py dag
$ python pipeline.py describe
$ python pipeline.py runs
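If forcing a task with --force-tasks should also rerun everything that consumes its output (an assumption, not confirmed by this README; check `submit --help`), the affected set is the forced tasks plus their transitive descendants. A breadth-first sketch with a hypothetical tasks_to_rerun helper:

```python
from collections import deque


def tasks_to_rerun(forced: list[str], downstream: dict[str, list[str]]) -> set[str]:
    """Forced tasks plus every task that (transitively) consumes their output."""
    seen = set(forced)
    queue = deque(forced)
    while queue:
        task = queue.popleft()
        for child in downstream.get(task, []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen


# Edges of the quick-start pipeline, pointing downstream.
downstream = {"extract": ["transform"], "transform": ["load"], "load": []}
print(sorted(tasks_to_rerun(["transform"], downstream)))  # ['load', 'transform']
```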

Configuration

Generate a fully commented config:

python -c "from reflow import ensure_config_exists; ensure_config_exists()"

Key sections:

[executor]
mode = "sbatch"
python = "/path/to/python"

[executor.submit_options]
partition = "compute"
account = "my_project"

[dispatch]
cpus = 1
time = "00:10:00"
mem = "1G"

All values can be overridden with REFLOW_* environment variables (REFLOW_MODE, REFLOW_PARTITION, REFLOW_ACCOUNT, etc.).
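A sketch of layering REFLOW_* environment variables over the TOML settings. Only the three variable names come from this README; the mapping to config paths and the apply_env_overrides helper are assumptions for illustration:

```python
import copy
import os
from collections.abc import Mapping
from typing import Any

# Assumed mapping from environment variable to config path.
ENV_KEYS: dict[str, tuple[str, ...]] = {
    "REFLOW_MODE": ("executor", "mode"),
    "REFLOW_PARTITION": ("executor", "submit_options", "partition"),
    "REFLOW_ACCOUNT": ("executor", "submit_options", "account"),
}


def apply_env_overrides(
    config: dict[str, Any], environ: Mapping[str, str] = os.environ
) -> dict[str, Any]:
    """Return a copy of config with REFLOW_* values layered on top."""
    merged = copy.deepcopy(config)  # leave the caller's dict untouched
    for var, path in ENV_KEYS.items():
        if var in environ:
            node = merged
            for key in path[:-1]:
                node = node.setdefault(key, {})
            node[path[-1]] = environ[var]
    return merged


config = {"executor": {"mode": "sbatch", "submit_options": {"partition": "compute"}}}
merged = apply_env_overrides(
    config, {"REFLOW_MODE": "qsub-pbs", "REFLOW_ACCOUNT": "my_project"}
)
print(merged["executor"]["mode"])            # qsub-pbs
print(merged["executor"]["submit_options"])  # {'partition': 'compute', 'account': 'my_project'}
```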

Development

git clone https://github.com/freva-org/reflow
cd reflow
pip install -e ".[dev]"
tox -e test
tox -e lint
tox -e types

License

MIT — see LICENSE for details.

Download files

Source Distribution

reflow_hpc-2604.4.0.tar.gz (154.5 kB)

Uploaded Source

Built Distribution

reflow_hpc-2604.4.0-py3-none-any.whl (74.5 kB)

Uploaded Python 3

File details

Details for the file reflow_hpc-2604.4.0.tar.gz.

File metadata

  • Download URL: reflow_hpc-2604.4.0.tar.gz
  • Upload date:
  • Size: 154.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for reflow_hpc-2604.4.0.tar.gz
Algorithm Hash digest
SHA256 2cfaa5e7b42a10d0b55fa17eeddc82607364c890d4431849da9fa70725239e25
MD5 e802b0a5825f4a341a4fa6919af69f97
BLAKE2b-256 eb9f063aa807c94d04c998808e4aedde2809c0348fd647452354859dba74c509

File details

Details for the file reflow_hpc-2604.4.0-py3-none-any.whl.

File metadata

  • Download URL: reflow_hpc-2604.4.0-py3-none-any.whl
  • Upload date:
  • Size: 74.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for reflow_hpc-2604.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 15a4bb9fcccd25172352e630e851c231ef921b8fd8cc05be97516da18ab8eaf0
MD5 c072bcec76fd1254d0f19e754884fddb
BLAKE2b-256 1486b45ff91df4037dcb6f9e6daf08c308ecac053ef0b0f00f694e0f1423bb63
