
Dflow is a Python framework for constructing scientific computing workflows employing Argo Workflows as the workflow engine.

Project description

DFLOW

Dflow is a Python framework for constructing scientific computing workflows (e.g. concurrent learning workflows) employing Argo Workflows as the workflow engine.

For dflow's users (e.g. ML application developers), dflow offers user-friendly functional programming interfaces for building their own workflows. Users need not be concerned with process control, task scheduling, observability or disaster tolerance. They can track workflow status and handle exceptions through APIs as well as from the frontend UI, and can therefore concentrate on implementing operations (OPs) and orchestrating workflows.

For dflow's developers, dflow wraps the argo SDK, hides the details of computing and storage resources from users, and provides extension points. While argo is a cloud-native workflow engine, dflow uses containers to decouple computing logic from scheduling logic, and uses Kubernetes to make workflows observable, reproducible and robust. Dflow is designed to run on a distributed, heterogeneous infrastructure. The most common computing resources in scientific computing are probably HPC clusters. Users can either use an executor to manage HPC jobs within dflow (via dflow-extender or the DPDispatcher plugin), or use the virtual node technique to manage HPC resources uniformly in the framework of Kubernetes (wlm-operator).

Dflow's OPs can be reused among workflows and shared among users. Dflow provides a cookiecutter recipe, dflow-op-cutter, for templating a new OP package. Start developing an OP package right away with

pip install cookiecutter
cookiecutter https://github.com/deepmodeling/dflow-op-cutter.git

Dflow provides a debug mode for running workflows directly on the local machine. Its backend is implemented in dflow in pure Python, independent of Argo/Kubernetes. The debug mode uses the local environment to execute OPs instead of containers. It implements most APIs of the default mode in order to provide an identical user experience. The debug mode is convenient for debugging or testing without containers. For clusters where deploying Docker and Kubernetes is problematic and which are difficult to access from outside, the debug mode may also be used for production, at the cost of reduced robustness and observability.

1. Overview

1.1. Architecture

Dflow consists of a common layer and an interface layer. The interface layer takes various OP templates from users, usually in the form of Python classes, and transforms them into base OP templates that the common layer can handle.

[Figure: dflow architecture]

1.2. Common layer

The common layer is an extension of the argo client that provides functionalities such as file processing, computing resource management, and workflow submission and management.

1.2.1. Parameters and artifacts

Parameters and artifacts are data stored by the workflow and passed within the workflow. Parameters are saved as strings which can be displayed in the UI, while artifacts are saved as files.

1.2.2. OP template

An OP template (shown as base OP in the figure above) is the fundamental building block of a workflow. It defines an operation to be executed given the input and output. Both the input and the output can be parameters and/or artifacts. The most common OP template is the container OP template. The arguments that must be defined for the operation are the container image and the script to be executed. Currently, two types of container OP templates are supported: ShellOPTemplate and PythonScriptOPTemplate. The shell OP template (ShellOPTemplate) defines an operation by a shell script, and the Python script OP template (PythonScriptOPTemplate) defines an operation by a Python script.

To use the ShellOPTemplate:

from dflow import ShellOPTemplate

simple_example_templ = ShellOPTemplate(
    name="Hello",
    image="alpine:latest",
    script="cp /tmp/foo.txt /tmp/bar.txt && echo {{inputs.parameters.msg}} > /tmp/msg.txt",
)

The above example defines a ShellOPTemplate with name "Hello" and container image alpine:latest. The operation copies /tmp/foo.txt (the input artifact) to /tmp/bar.txt (the output artifact), then echoes the value of the input parameter msg and redirects it to /tmp/msg.txt, whose content becomes the value of the output parameter msg.

To define the parameters and artifacts of this OPTemplate:

from dflow import InputParameter, InputArtifact, OutputParameter, OutputArtifact

# define input
simple_example_templ.inputs.parameters = {"msg": InputParameter()}
simple_example_templ.inputs.artifacts = {"inp_art": InputArtifact(path="/tmp/foo.txt")}
# define output
simple_example_templ.outputs.parameters = {
    "msg": OutputParameter(value_from_path="/tmp/msg.txt")
}
simple_example_templ.outputs.artifacts = {
    "out_art": OutputArtifact(path="/tmp/bar.txt")
}

In the above example, there are three things to clarify.

  1. The value of an input parameter is optional for the OP template. If provided, it is treated as the default value, which can be overridden at run time.
  2. For an output parameter, the source of its value must be specified. For the container OP template, the value may come from a file generated in the container (value_from_path).
  3. The paths of the input and output artifacts in the container must be specified.

On the same level, one can also define a PythonScriptOPTemplate to achieve the same operation.

1.2.3. Step

Step is the central block for building a workflow. A step is created by instantiating an OP template. When a step is initialized, values of all input parameters and sources of all input artifacts declared in the OP template must be specified.

from dflow import Step

simple_example_step = Step(
    name="step0",
    template=simple_example_templ,
    parameters={"msg": "HelloWorld!"},
    artifacts={"inp_art": foo},
)

This step instantiates the OP template created in 1.2.2. Note that foo is an artifact that is either uploaded from the local machine or produced as the output of another step.

1.2.4. Workflow

Workflow is the connecting block for building a workflow. A workflow is created by adding steps together.

from dflow import Workflow

wf = Workflow(name="hello-world")
wf.add(simple_example_step)

Submit a workflow by

wf.submit()

One can also add a list of steps to a workflow to make them run in parallel

wf.add([hello2, hello3])

An example using all the elements discussed in 1.2 is shown here:

1.3. Interface layer

The interface layer handles more Python-native OPs defined as classes.

1.3.1. Python OP

PythonOPTemplate is another kind of OP template. It inherits from PythonScriptOPTemplate but allows users to define an operation (OP) in the form of a Python class. As Python is a dynamically typed language, we impose strict type checking on Python OPs to reduce ambiguity and unexpected behavior.

The structures of the inputs and outputs of a Python OP are defined in the class methods get_input_sign and get_output_sign. Each of them returns an OPIOSign object, which is a dictionary mapping the name of a parameter/artifact to its sign.

The execution of the Python OP is defined in the execute method. The execute method receives an OPIO object as input and returns an OPIO object. OPIO is a dictionary mapping the name of a parameter/artifact to its value/path. The type of the parameter value or the artifact path must agree with the type declared in the sign. Type checking is performed before and after the execute method.

from dflow.python import OP, OPIO, OPIOSign, Artifact
from pathlib import Path
import shutil


class SimpleExample(OP):
    def __init__(self):
        pass

    @classmethod
    def get_input_sign(cls):
        return OPIOSign(
            {
                "msg": str,
                "inp_art": Artifact(Path),
            }
        )

    @classmethod
    def get_output_sign(cls):
        return OPIOSign(
            {
                "msg": str,
                "out_art": Artifact(Path),
            }
        )

    @OP.exec_sign_check
    def execute(
        self,
        op_in: OPIO,
    ) -> OPIO:
        shutil.copy(op_in["inp_art"], "bar.txt")
        out_msg = op_in["msg"]
        op_out = OPIO(
            {
                "msg": out_msg,
                "out_art": Path("bar.txt"),
            }
        )
        return op_out

The above example defines an OP SimpleExample. The operation copies the input artifact inp_art to bar.txt, which becomes the output artifact out_art, and passes the value of the input parameter msg through as the output parameter msg.
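The type checking performed before and after execute can be pictured with a small, library-independent sketch. The helper check_sign, the decorator checked, and the plain dictionaries used here are hypothetical stand-ins for OPIOSign, OP.exec_sign_check and OPIO; they illustrate the idea and are not dflow's implementation.

```python
from pathlib import Path


def check_sign(values: dict, sign: dict) -> None:
    """Raise if `values` does not match `sign` (name -> expected type)."""
    missing = set(sign) - set(values)
    extra = set(values) - set(sign)
    if missing or extra:
        raise KeyError(f"missing={sorted(missing)}, unexpected={sorted(extra)}")
    for name, expected in sign.items():
        if not isinstance(values[name], expected):
            raise TypeError(
                f"{name!r} should be {expected.__name__}, "
                f"got {type(values[name]).__name__}"
            )


def checked(input_sign: dict, output_sign: dict):
    """Decorator that validates inputs before and outputs after a function."""
    def wrap(func):
        def inner(op_in: dict) -> dict:
            check_sign(op_in, input_sign)      # check before execution
            op_out = func(op_in)
            check_sign(op_out, output_sign)    # check after execution
            return op_out
        return inner
    return wrap


@checked({"msg": str, "inp_art": Path}, {"msg": str, "out_art": Path})
def execute(op_in: dict) -> dict:
    # Pass the message through and report the path of the produced file.
    return {"msg": op_in["msg"], "out_art": Path("bar.txt")}
```

Calling execute with an integer msg raises TypeError before the body runs, which is the kind of early failure the sign mechanism is meant to provide.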

One may also define an OP using the decorator @OP.function and Python annotations:

from dflow.python import OP, Artifact
from pathlib import Path
import shutil

@OP.function
def SimpleExample(
    msg: str,
    inp_art: Artifact(Path),
) -> {"msg": str, "out_art": Artifact(Path)}:
    shutil.copy(inp_art, "bar.txt")
    out_msg = msg
    return {"msg": out_msg, "out_art": Path("bar.txt")}

We recommend python>=3.9 for this syntactic sugar. See more about Python annotations in the official Python HOWTOs.

To use the above class as a PythonOPTemplate, pass it to PythonOPTemplate and specify the container image. Note that pydflow must be installed in this image.

from dflow.python import PythonOPTemplate

simple_example_templ = PythonOPTemplate(SimpleExample, image="python:3.8")

An example using all the elements discussed in 1.3 is shown here:

2. Quick Start

2.1. Prepare Kubernetes cluster

First, you will need a Kubernetes cluster. To set up a Kubernetes cluster on your laptop, you can install Minikube and make sure Docker is up and running on your machine.

After installing Minikube, start the Kubernetes cluster with:

minikube start

2.2. Setup Argo Workflows

To get started quickly, you can use the quick start manifest. It will install Argo Workflows as well as some commonly used components:

kubectl create ns argo
kubectl apply -n argo -f https://raw.githubusercontent.com/deepmodeling/dflow/master/manifests/quick-start-postgres.yaml

If you are running Argo Workflows locally (e.g. using Minikube or Docker for Desktop), open a port-forward so you can access the namespace:

kubectl -n argo port-forward deployment/argo-server 2746:2746 --address 0.0.0.0

This will serve the user interface on https://localhost:2746

For access to the minio object storage, open a port-forward for minio

kubectl -n argo port-forward deployment/minio 9000:9000 --address 0.0.0.0

2.3. Install dflow

Make sure your Python version is not less than 3.6 and install dflow

pip install pydflow

2.4. Run an example

Submit a simple workflow

python examples/test_steps.py

Then you can check the submitted workflow through argo's UI.

3. User Guide (dflow-doc)

3.1. Common layer

3.1.1. Workflow management

After submitting a workflow with wf.submit(), or retrieving a previously submitted workflow with wf = Workflow(id="xxx"), one can track its real-time status with the following APIs:

  • wf.id: workflow ID in argo
  • wf.query_status(): query the workflow status; returns "Pending", "Running", "Succeeded", etc.
  • wf.query_step(name=None): query steps by name (regex supported); returns a list of argo step objects
    • step.phase: phase of a step, "Pending", "Running", "Succeeded", etc.
    • step.outputs.parameters: a dictionary of output parameters
    • step.outputs.artifacts: a dictionary of output artifacts
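As an illustration of how these APIs might be used to wait for a workflow, here is a minimal polling sketch. The function wait_for_workflow is hypothetical, and the set of terminal phases is an assumption: the text above only names "Succeeded", while "Failed" and "Error" follow Argo's phase naming.

```python
import time
from typing import Callable

# Assumed terminal phases; only "Succeeded" is named in the text above.
TERMINAL_PHASES = {"Succeeded", "Failed", "Error"}


def wait_for_workflow(query_status: Callable[[], str],
                      interval: float = 5.0,
                      timeout: float = 3600.0) -> str:
    """Poll `query_status` until a terminal phase is reported or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while True:
        status = query_status()
        if status in TERMINAL_PHASES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"workflow still {status!r} after {timeout} s")
        time.sleep(interval)
```

With a submitted workflow, this would be called as wait_for_workflow(wf.query_status). Passing the bound method keeps the helper independent of dflow, so it can be tested with any callable.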

3.1.2. Upload/download artifact

Dflow offers tools for uploading files to Minio and downloading files from Minio (the default object storage in the quick start). Users can upload a list of files or directories and get an artifact object, which can be used as an argument of a step:

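from dflow import Step, upload_artifact

# upload local files/directories and pass the resulting artifact to a step
artifact = upload_artifact([path1, path2])
step = Step(
    ...,
    artifacts={"foo": artifact}
)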

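Users can also download the output artifact of a step queried from a workflow (to the current directory by default):

from dflow import download_artifact

# query_step returns a list of steps, so pick the first match
step = wf.query_step(name="hello")[0]
download_artifact(step.outputs.artifacts["bar"])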

Modify dflow.s3_config to configure S3 globally.

Note: during uploading, dflow retains the relative path of the uploaded file/directory with respect to the current directory. If a file/directory outside the current directory is uploaded, its absolute path is used as the relative path in the artifact. If you want the directory structure in the artifact to differ from the local one, you can create soft links and then upload them.

3.1.3. Steps

Steps is another kind of OP template, defined by its constituent steps instead of a container. It can be seen as a sub-workflow or a super OP template consisting of smaller OPs. A Steps is a sequential array of groups of concurrent Step objects. A simple example is [[s00,s01],[s10,s11,s12]], where each inner array represents concurrent tasks and the outer array is sequential. Add a step to a Steps just as for a workflow:

steps.add(step)

Steps can be used as the template to define a bigger step, so one can construct complex workflows with nested structure. One can also recursively use a Steps as the template of a building block inside itself to achieve a dynamic loop.
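The execution order implied by a nested list such as [[s00,s01],[s10,s11,s12]] can be sketched in plain Python. The functions run_steps and make_task are hypothetical and use local threads only to illustrate the ordering; in dflow the scheduling is handled by the workflow engine.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def run_steps(groups: List[List[Callable[[], str]]]) -> List[List[str]]:
    """Run groups one after another; run the members of each group concurrently."""
    results = []
    for group in groups:                                   # outer list: sequential
        with ThreadPoolExecutor(max_workers=max(1, len(group))) as pool:
            futures = [pool.submit(task) for task in group]  # inner list: concurrent
            results.append([future.result() for future in futures])
    return results


def make_task(name: str) -> Callable[[], str]:
    """Build a trivial task that just reports its own name."""
    return lambda: name


layout = [["s00", "s01"], ["s10", "s11", "s12"]]
results = run_steps([[make_task(name) for name in group] for group in layout])
```

The second group is only submitted after every task of the first group has returned, which mirrors the sequential outer array.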

3.1.4. DAG

DAG is another kind of OP template which is defined by its constituent tasks and their dependencies. The usage of DAG is similar to that of steps. To add a task to a DAG, use

dag.add(task)

The usage of task is also similar to that of step. Dflow will automatically detect dependencies among tasks of a DAG (from input/output relations). Additional dependencies can be declared by

task_3 = Task(..., dependencies=[task_1, task_2])
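How dependencies derived from input/output relations translate into an execution order can be sketched without dflow. Both functions below are hypothetical illustrations: infer_dependencies treats a task as dependent on any other task that produces something it consumes, and execution_waves groups tasks so that each wave only contains tasks whose dependencies are already finished.

```python
from typing import Dict, List, Set


def infer_dependencies(consumes: Dict[str, Set[str]],
                       produces: Dict[str, Set[str]]) -> Dict[str, Set[str]]:
    """Task A depends on task B if A consumes something that B produces."""
    deps: Dict[str, Set[str]] = {task: set() for task in consumes}
    for task, needed in consumes.items():
        for other, made in produces.items():
            if other != task and needed & made:
                deps[task].add(other)
    return deps


def execution_waves(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group tasks into waves; tasks in one wave have no unmet dependencies."""
    remaining = {task: set(d) for task, d in deps.items()}
    waves = []
    while remaining:
        ready = sorted(task for task, d in remaining.items() if not d)
        if not ready:
            raise ValueError("dependency cycle detected")
        waves.append(ready)
        for task in ready:
            del remaining[task]
        for d in remaining.values():
            d.difference_update(ready)
    return waves


produces = {"task_1": {"a"}, "task_2": {"b"}, "task_3": {"c"}}
consumes = {"task_1": set(), "task_2": set(), "task_3": {"a", "b"}}
deps = infer_dependencies(consumes, produces)
waves = execution_waves(deps)
```

Here task_1 and task_2 can run together, and task_3 waits for both, matching the explicit declaration dependencies=[task_1, task_2].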

3.1.5. Output parameters and artifacts of Steps

An output parameter of a Steps can be set to come from one of its steps by steps.outputs.parameters["msg"].value_from_parameter = step.outputs.parameters["msg"]. Here, step must be contained in steps. To assign an output artifact of a Steps, use steps.outputs.artifacts["foo"]._from = step.outputs.artifacts["foo"].

3.1.6. Conditional step, parameters and artifacts

Set a step to be conditional by Step(..., when=expr), where expr is a boolean expression in string format, such as "%s < %s" % (par1, par2). The when argument is often used as the breaking condition of recursive steps. An output parameter of a Steps can be assigned conditionally by

steps.outputs.parameters["msg"].value_from_expression = if_expression(
    _if=par1 < par2,
    _then=par3,
    _else=par4
)

Similarly, an output artifact of a Steps can be assigned conditionally by

steps.outputs.artifacts["foo"].from_expression = if_expression(
    _if=par1 < par2,
    _then=art1,
    _else=art2
)

3.1.7. Produce parallel steps using loop

with_param and with_sequence are 2 arguments of Step for automatically generating a list of parallel steps. These steps share a common OP template, and only differ in the input parameters.

A step using the with_param option generates parallel steps over a list (either a constant list or a reference to another parameter, e.g. an output parameter of another step or an input parameter of the Steps or DAG context). The parallelism equals the length of the list. Each parallel step picks an item from the list by "{{item}}", such as

step = Step(
    ...
    parameters={"msg": "{{item}}"},
    with_param=steps.inputs.parameters["msg_list"]
)

A step using the with_sequence option generates parallel steps over a numeric sequence. with_sequence is usually used together with argo_sequence, which returns an Argo sequence. For argo_sequence, the number at which the sequence starts is specified by start (default: 0). One can specify either the number of elements in the sequence by count or the number at which the sequence ends by end. A printf format string can be specified by format to format the values in the sequence. Each argument can be passed as a parameter; argo_len, which returns the length of a list, may be useful here. Each parallel step picks an element from the sequence by "{{item}}", such as

step = Step(
    ...
    parameters={"i": "{{item}}"},
    with_sequence=argo_sequence(argo_len(steps.inputs.parameters["msg_list"]))
)
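To make the fan-out concrete, the following sketch expands a sequence and substitutes "{{item}}" locally. Both helpers are hypothetical and only mimic the behavior described above. Treating end as inclusive and counting down when end is smaller than start are assumptions about Argo's sequence semantics, and fmt is used instead of format to avoid shadowing the Python builtin.

```python
from typing import List, Optional


def expand_sequence(start: int = 0,
                    count: Optional[int] = None,
                    end: Optional[int] = None,
                    fmt: str = "%d") -> List[str]:
    """Expand a numeric sequence into the strings each parallel step would receive."""
    if (count is None) == (end is None):
        raise ValueError("specify exactly one of count or end")
    if count is not None:
        numbers = range(start, start + count)
    else:
        # Assumption: `end` is inclusive; count down if end < start.
        step = 1 if end >= start else -1
        numbers = range(start, end + step, step)
    return [fmt % n for n in numbers]


def fan_out(parameters: dict, items: list) -> List[dict]:
    """Create one parameter set per item by substituting the "{{item}}" placeholder."""
    return [
        {name: str(value).replace("{{item}}", str(item))
         for name, value in parameters.items()}
        for item in items
    ]


msg_list = ["a", "b", "c"]
per_step = fan_out({"i": "{{item}}"}, expand_sequence(count=len(msg_list)))
```

Each dictionary in per_step corresponds to the parameters of one parallel step, so the number of generated steps equals the length of the sequence.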

3.1.8. Timeout

Set the timeout of a step by Step(..., timeout=t), where t is given in seconds.

3.1.9. Continue on failed

Set the workflow to continue when a step fails by Step(..., continue_on_failed=True).

3.1.10. Continue on success number/ratio of parallel steps

Set the workflow to continue when a certain number/ratio of parallel steps succeed by Step(..., continue_on_num_success=n) or Step(..., continue_on_success_ratio=r).
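The decision these two options describe can be written as a small predicate. The function may_continue is a hypothetical sketch; in particular, using "greater than or equal" for both thresholds and requiring all steps to succeed when neither is set are assumptions, not documented dflow behavior.

```python
from typing import List, Optional


def may_continue(phases: List[str],
                 num_success: Optional[int] = None,
                 success_ratio: Optional[float] = None) -> bool:
    """Decide whether the workflow may proceed past a group of parallel steps."""
    succeeded = sum(1 for phase in phases if phase == "Succeeded")
    if num_success is not None:
        return succeeded >= num_success
    if success_ratio is not None:
        # An empty group never satisfies a ratio threshold in this sketch.
        return bool(phases) and succeeded / len(phases) >= success_ratio
    # Neither threshold set: assume every parallel step has to succeed.
    return succeeded == len(phases)
```

For three parallel steps of which two succeed, num_success=2 lets the workflow continue while success_ratio=0.8 does not.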

3.1.11. Optional input artifacts

Set an input artifact to be optional by op_template.inputs.artifacts["foo"].optional = True.

3.1.12. Default value for output parameters

Set default value for an output parameter by op_template.outputs.parameters["msg"].default = default_value. The default value will be used when the expression in value_from_expression fails or the step is skipped.

3.1.13. Key of a step

You can set a key for a step by Step(..., key="some-key") to make the step easier to locate. The key can be regarded as an input parameter, which may contain references to other parameters. For instance, the key of a step can change with the iterations of a dynamic loop. Once a key is assigned to a step, the step can be queried by wf.query_step(key="some-key"). If the key is unique within the workflow, the query_step method returns a list consisting of only one element.

3.1.14. Resubmit a workflow

Workflows often have some steps that are expensive to compute. The outputs of previously run steps can be reused when submitting a new workflow. For example, a failed workflow can be restarted from a certain point after modifying the workflow template or even the outputs of completed steps. Submit a workflow with reused steps by wf.submit(reuse_step=[step0, step1]). Here, step0 and step1 are previously run steps returned by the query_step method. Before the new workflow runs a step, it checks whether there is a reused step whose key matches that of the step about to run. If a match is found, the workflow skips the step and sets its outputs to those of the reused step. To modify the outputs of a step before reusing it, use step0.modify_output_parameter(par_name, value) for parameters and step0.modify_output_artifact(art_name, artifact) for artifacts.
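The key-matching logic described above can be sketched as a simple cache lookup. The class FinishedStep and the function run_with_reuse are hypothetical; they only model the behavior "skip the step and adopt the recorded outputs when a key matches" and are not how dflow implements it.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Tuple


@dataclass
class FinishedStep:
    """Hypothetical record of a previously run step."""
    key: str
    outputs: Dict[str, object] = field(default_factory=dict)


def run_with_reuse(plan: List[Tuple[str, Callable[[], Dict[str, object]]]],
                   reuse_step: List[FinishedStep]):
    """Run the planned steps in order, skipping those whose key was already computed."""
    cache = {step.key: step.outputs for step in reuse_step}
    outputs: Dict[str, Dict[str, object]] = {}
    executed: List[str] = []
    for key, run in plan:
        if key in cache:
            outputs[key] = cache[key]      # match: skip and adopt recorded outputs
        else:
            outputs[key] = run()           # no match: actually run the step
            executed.append(key)
    return outputs, executed


old = FinishedStep(key="iter-0", outputs={"msg": "cached"})
old.outputs["msg"] = "edited"              # modify an output before reusing it
outputs, executed = run_with_reuse(
    [("iter-0", lambda: {"msg": "fresh"}), ("iter-1", lambda: {"msg": "new"})],
    reuse_step=[old],
)
```

Only iter-1 is executed; iter-0 takes its (edited) outputs from the reused record, which is why unique, stable keys matter for resubmission.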

3.1.15. Executor

By default, for a "script step" (a step whose template is a script OP template), the shell script or Python script runs in the container directly. Alternatively, one can modify the executor that runs the script. Dflow offers an extension point for script steps: Step(..., executor=my_executor). Here, my_executor should be an instance of a class derived from Executor. An Executor-derived class should implement a render method, which converts the original template to a new template.

class Executor(object):
    def render(self, template):
        pass

A context is similar to an executor, but it is assigned to a workflow by Workflow(context=...) and affects every step.
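The render contract of an executor can be illustrated with a self-contained sketch. ScriptTemplate is a hypothetical stand-in for a script OP template, and TimedExecutor is an invented example; only the idea that render takes a template and returns a new one comes from the text above.

```python
import copy
from dataclasses import dataclass
from typing import List


@dataclass
class ScriptTemplate:
    """Hypothetical stand-in for a script OP template."""
    name: str
    image: str
    command: List[str]
    script: str


class Executor:
    """Local copy of the interface described above."""
    def render(self, template):
        raise NotImplementedError


class TimedExecutor(Executor):
    """Hypothetical executor that prints the date before and after the script."""
    def render(self, template: ScriptTemplate) -> ScriptTemplate:
        new_template = copy.deepcopy(template)     # leave the original untouched
        new_template.name = template.name + "-timed"
        new_template.script = "date\n" + template.script + "\ndate\n"
        return new_template


original = ScriptTemplate(name="hello", image="alpine:latest",
                          command=["sh"], script="echo hello")
rendered = TimedExecutor().render(original)
```

Returning a modified copy rather than mutating the input keeps the original template reusable by steps that do not use the executor.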

3.1.16. Submit Slurm job via slurm executor

SlurmRemoteExecutor is provided as an example of an executor. The executor submits a Slurm job to a remote host and synchronizes its status and logs to the dflow step. The central logic of the executor is implemented in the Golang project dflow-extender. If you want to run a step on a Slurm cluster remotely, do something like

Step(
    ...,
    executor=SlurmRemoteExecutor(host="1.2.3.4",
        username="myuser",
        header="""#!/bin/bash
                  #SBATCH -N 1
                  #SBATCH -n 1
                  #SBATCH -p cpu""")
)

There are three options for SSH authentication: use a password, specify the path of a private key file locally, or upload an authorized private key to each node (or equivalently add each node to the authorized host list).

3.1.17. Submit HPC job via dispatcher plugin

DPDispatcher is a Python package that generates job input scripts for HPC scheduler systems (Slurm/PBS/LSF), submits these scripts to HPC systems, and polls until the jobs finish. Dflow provides a simple interface to invoke the dispatcher as an executor to complete script steps. E.g.

from dflow.plugins.dispatcher import DispatcherExecutor
Step(
    ...,
    executor=DispatcherExecutor(host="1.2.3.4",
        username="myuser",
        queue_name="V100")
)

For SSH authentication, one can either specify the path of a private key file locally, or upload an authorized private key to each node (or equivalently add each node to the authorized host list). To configure extra machine, resources or task parameters for the dispatcher, use DispatcherExecutor(..., machine_dict=m, resources_dict=r, task_dict=t).

3.1.18. Submit Slurm job via virtual node

Follow the installation steps in the wlm-operator project to add Slurm partitions as virtual nodes to Kubernetes (use the manifests configurator.yaml, operator-rbac.yaml and operator.yaml in this project, which modify some RBAC configurations):

$ kubectl get nodes
NAME                            STATUS   ROLES                  AGE    VERSION
minikube                        Ready    control-plane,master   49d    v1.22.3
slurm-minikube-cpu              Ready    agent                  131m   v1.13.1-vk-N/A
slurm-minikube-dplc-ai-v100x8   Ready    agent                  131m   v1.13.1-vk-N/A
slurm-minikube-v100             Ready    agent                  131m   v1.13.1-vk-N/A

Then you can assign a step to be executed on a virtual node (i.e. submit a Slurm job to the corresponding partition to complete the step)

step = Step(
    ...
    executor=SlurmJobTemplate(
        header="#!/bin/sh\n#SBATCH --nodes=1",
        node_selector={"kubernetes.io/hostname": "slurm-minikube-v100"}
    )
)

3.1.19. Use resources in Kubernetes

A step can also be completed by a Kubernetes resource (e.g. a Job or a custom resource). First, a manifest is applied to Kubernetes. Then the status of the resource is monitored until the success condition or the failure condition is satisfied.

class Resource(object):
    action = None
    success_condition = None
    failure_condition = None
    def get_manifest(self, template):
        pass

3.1.20. Important note: variable names

Dflow has the following restrictions on variable names.

Variable name | Static/Dynamic | Restrictions | Example
Workflow/OP template name | Static | Lowercase RFC 1123 subdomain (must consist of lower case alphanumeric characters, '-' or '.', and must start and end with an alphanumeric character) | my-name
Step/Task name | Static | Must consist of alpha-numeric characters or '-', and must start with an alpha-numeric character | My-name1-2, 123-NAME
Parameter/Artifact name | Static | Must consist of alpha-numeric characters, '_' or '-' | my_param_1, MY-PARAM-1
Key name | Dynamic | Lowercase RFC 1123 subdomain (must consist of lower case alphanumeric characters, '-' or '.', and must start and end with an alphanumeric character) | my-name
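These restrictions can be checked before submission with a few regular expressions. The patterns and function names below are a hypothetical sketch that follows the wording of the restrictions literally; they do not check length limits and are not taken from dflow.

```python
import re

# Lower case alphanumerics, '-' or '.', starting and ending with an alphanumeric.
RFC1123_SUBDOMAIN = re.compile(r"[a-z0-9]([a-z0-9.-]*[a-z0-9])?")
# Alphanumerics or '-', starting with an alphanumeric.
STEP_NAME = re.compile(r"[A-Za-z0-9][A-Za-z0-9-]*")
# Alphanumerics, '_' or '-'.
PARAMETER_NAME = re.compile(r"[A-Za-z0-9_-]+")


def is_valid_template_name(name: str) -> bool:
    """Workflow/OP template names and keys: lowercase RFC 1123 subdomain."""
    return RFC1123_SUBDOMAIN.fullmatch(name) is not None


def is_valid_step_name(name: str) -> bool:
    """Step/Task names."""
    return STEP_NAME.fullmatch(name) is not None


def is_valid_parameter_name(name: str) -> bool:
    """Parameter/Artifact names."""
    return PARAMETER_NAME.fullmatch(name) is not None
```

Because keys are dynamic, a check like is_valid_template_name can only be applied to a key after its parameter references have been resolved.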

3.1.21. Debug mode: dflow independent of Kubernetes

The debug mode is enabled by setting

from dflow import config
config["mode"] = "debug"

Before running a workflow locally, make sure that the dependencies of all OPs in the workflow are properly configured in the local environment, unless the dispatcher executor is employed to submit jobs to a remote environment. The debug mode uses the current directory as the working directory by default. Each workflow creates a new directory there, with a structure like

python-lsev6
├── status
└── step-penf5
    ├── inputs
    │   ├── artifacts
    │   │   ├── dflow_python_packages
    │   │   ├── foo
    │   │   └── idir
    │   └── parameters
    │       ├── msg
    │       └── num
    ├── log.txt
    ├── outputs
    │   ├── artifacts
    │   │   ├── bar
    │   │   └── odir
    │   └── parameters
    │       └── msg
    ├── phase
    ├── script
    ├── type
    └── workdir
        ├── ...

The top level contains the status and all steps of the workflow. The directory name for each step will be its key if provided, or generated from its name otherwise. The step directory contains the input/output parameters/artifacts, the type and the phase of the step. For a step of type "Pod", its directory also includes the script, the log file and the working directory for the step.
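Given this layout, the state of a debug-mode run can be inspected with ordinary file operations. The helper read_step_phases is hypothetical, and it assumes that each phase file is a plain-text file holding the phase name, which the directory listing suggests but does not state.

```python
from pathlib import Path
from typing import Dict


def read_step_phases(workflow_dir: Path) -> Dict[str, str]:
    """Map each step directory name to the content of its `phase` file."""
    phases = {}
    for step_dir in sorted(p for p in workflow_dir.iterdir() if p.is_dir()):
        phase_file = step_dir / "phase"
        # Assumption: `phase` is a plain-text file containing the phase name.
        if phase_file.is_file():
            phases[step_dir.name] = phase_file.read_text().strip()
    return phases
```

For the example tree above, read_step_phases(Path("python-lsev6")) would return a dictionary with the single key "step-penf5".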

3.2. Interface layer

3.2.1. Slices

Slices helps users slice input parameters/artifacts (which must be lists) to feed parallel steps, and stack their output parameters/artifacts into lists in the same pattern. For example,

step = Step(name="parallel-tasks",
    template=PythonOPTemplate(
        ...,
        slices=Slices("{{item}}",
            input_parameter=["msg"],
            input_artifact=["data"],
            output_artifact=["log"])
    ),
    parameters = {
        "msg": msg_list
    },
    artifacts={
        "data": data_list
    },
    with_param=argo_range(5)
)

In this example, each item in msg_list is passed to a parallel step as the input parameter msg, each part in data_list is passed to a parallel step as the input artifact data. Finally, the output artifacts log of all parallel steps are collected to one artifact step.outputs.artifacts["log"].
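The slice-and-stack pattern can be expressed in a few lines of plain Python. The function run_sliced is a hypothetical sketch: it runs the slices in a loop rather than in parallel and assumes all sliced inputs have the same length.

```python
from typing import Callable, Dict, List


def run_sliced(op: Callable[[Dict[str, object]], Dict[str, object]],
               sliced_inputs: Dict[str, list],
               sliced_outputs: List[str]) -> Dict[str, list]:
    """Feed the i-th item of every sliced input to `op` and stack the outputs."""
    lengths = {len(values) for values in sliced_inputs.values()}
    if len(lengths) != 1:
        raise ValueError("all sliced inputs must have the same length")
    n = lengths.pop()
    stacked: Dict[str, list] = {name: [] for name in sliced_outputs}
    for i in range(n):  # in a real workflow these iterations run in parallel
        op_out = op({name: values[i] for name, values in sliced_inputs.items()})
        for name in sliced_outputs:
            stacked[name].append(op_out[name])
    return stacked


def write_log(op_in: Dict[str, object]) -> Dict[str, object]:
    """Toy OP: combine one message and one data item into a log entry."""
    return {"log": f"{op_in['msg']}:{op_in['data']}"}


stacked = run_sliced(write_log,
                     {"msg": ["a", "b"], "data": [1, 2]},
                     sliced_outputs=["log"])
```

The stacked["log"] list keeps the order of the inputs, so the i-th output always corresponds to the i-th slice.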

Note that by default this feature passes the full input artifacts to each parallel step, even though each step may use only some slices of them. In comparison, the subpath mode of slices passes only a single slice of the input artifacts to each parallel step. To use the subpath mode of slices,

step = Step(name="parallel-tasks",
    template=PythonOPTemplate(
        ...,
        slices=Slices(sub_path=True,
            input_parameter=["msg"],
            input_artifact=["data"],
            output_artifact=["log"])
    ),
    parameters = {
        "msg": msg_list
    },
    artifacts={
        "data": data_list
    })

Here, the slice pattern ({{item}}) of PythonOPTemplate and the with_param argument of the Step need not be set, because they are fixed in this mode. All input parameters and artifacts to be sliced must have the same length, and the parallelism equals this length. Also note that in order to use subpaths of artifacts, these artifacts must be saved without compression when they are generated. E.g. declare Artifact(..., archive=None) in the output signs of the Python OP, or specify upload_artifact(..., archive=None) while uploading artifacts. In addition, one can use dflow.config["archive_mode"] = None to set the default archive mode to no compression globally.

3.2.2. Retry and error handling

Dflow catches TransientError and FatalError raised from an OP. Users can set the maximum number of retries on TransientError by PythonOPTemplate(..., retry_on_transient_error=n). A timeout error is regarded as a fatal error by default. To treat a timeout error as a transient error, set PythonOPTemplate(..., timeout_as_transient_error=True).
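The retry policy described above can be sketched as a loop. The exception classes below are local stand-ins defined only for this example, and run_with_retry is hypothetical; the argument names are borrowed from the PythonOPTemplate options for readability.

```python
class TransientError(Exception):
    """Local stand-in: an error worth retrying."""


class FatalError(Exception):
    """Local stand-in: an error that stops execution immediately."""


def run_with_retry(func,
                   retry_on_transient_error: int = 0,
                   timeout_as_transient_error: bool = False):
    """Call `func`, retrying up to `retry_on_transient_error` times on transient errors."""
    attempts = 0
    while True:
        try:
            return func()
        except (TransientError, TimeoutError) as err:
            if isinstance(err, TimeoutError) and not timeout_as_transient_error:
                # Timeouts are fatal unless explicitly treated as transient.
                raise FatalError("timeout") from err
            if attempts >= retry_on_transient_error:
                raise                      # retries exhausted: re-raise the error
            attempts += 1
        # A FatalError raised by `func` is not caught and propagates at once.
```

With retry_on_transient_error=2, a function that fails twice with TransientError and then succeeds is called three times in total.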

3.2.3. Progress

An OP can update its progress at runtime so that users can track its real-time progress:

class Progress(OP):
    progress_total = 100
    ...
    def execute(self, op_in):
        for i in range(10):
            # report progress in steps of 10 out of progress_total
            self.progress_current = 10 * (i + 1)
            ...

3.2.4. Upload python packages for development

To avoid rebuilding the image frequently during development, dflow offers an interface to upload local packages into the container and add them to $PYTHONPATH, such as PythonOPTemplate(..., python_packages=["/opt/anaconda3/lib/python3.9/site-packages/numpy"]). One can also globally specify packages to be uploaded, which will affect all OPs:

from dflow.python import upload_packages
upload_packages.append("/opt/anaconda3/lib/python3.9/site-packages/numpy")


