Distributed TensorFlow on a YARN cluster
tf-yarnᵝ
Installation
Install with Pip
$ pip install tf-yarn
Install from source
$ git clone https://github.com/criteo/tf-yarn
$ cd tf-yarn
$ pip install .
Prerequisites
tf-yarn only supports Python ≥3.6.
Make sure to have Tensorflow working with HDFS by setting up all the environment variables as described here.
You can run the check_hadoop_env script to check that your setup is OK (it is installed with tf-yarn):
$ check_hadoop_env
# You should see something like
# INFO:tf_yarn.bin.check_hadoop_env:results will be written in /home/.../shared/Dev/tf-yarn/check_hadoop_env.log
# INFO:tf_yarn.bin.check_hadoop_env:check_env: True
# INFO:tf_yarn.bin.check_hadoop_env:write dummy file to hdfs hdfs://root/tmp/a1df7b99-fa47-4a86-b5f3-9bc09019190f/hello_tf_yarn.txt
# INFO:tf_yarn.bin.check_hadoop_env:check_local_hadoop_tensorflow: True
# INFO:root:Launching remote check
# ...
# INFO:tf_yarn.bin.check_hadoop_env:remote_check: True
# INFO:tf_yarn.bin.check_hadoop_env:Hadoop setup: OK
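Before running check_hadoop_env, a quick local look at the environment can save a round trip to the cluster. The following is a minimal sketch, not part of tf-yarn: the helper name missing_hadoop_env is hypothetical, and the list of variables (JAVA_HOME, HADOOP_HDFS_HOME, LD_LIBRARY_PATH, CLASSPATH) is an assumption that you should adjust to whatever your HDFS setup guide requires.

```python
import os

# Assumed list of variables; adjust it to what your HDFS setup guide requires.
REQUIRED_VARS = ("JAVA_HOME", "HADOOP_HDFS_HOME", "LD_LIBRARY_PATH", "CLASSPATH")


def missing_hadoop_env(environ=None):
    """Return the required variables that are unset or empty (hypothetical helper)."""
    environ = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not environ.get(name)]


if __name__ == "__main__":
    missing = missing_hadoop_env()
    print("missing variables:", missing or "none")
```

An empty result only means the variables are set; check_hadoop_env remains the actual end-to-end test.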
Quickstart
The core abstraction in tf-yarn is called an ExperimentFn. It is a function returning a triple of an Estimator and two specs: TrainSpec and EvalSpec.
Here is a stripped-down experiment_fn from examples/linear_classifier_example.py to give you an idea of how it might look:
import tensorflow as tf
from tf_yarn import Experiment

def experiment_fn():
    # ...
    estimator = tf.estimator.LinearClassifier(...)
    return Experiment(
        estimator,
        tf.estimator.TrainSpec(train_input_fn),
        tf.estimator.EvalSpec(eval_input_fn)
    )
An experiment can be scheduled on YARN using the run_on_yarn function, which takes three required arguments: the Python environment(s), experiment_fn, and a dictionary specifying how many resources to allocate for each of the distributed TensorFlow task types. The example uses the Wine Quality dataset from the UCI ML repository. With just under 5000 training instances available, there is no need for multi-node training, meaning that a "chief" task complemented by an "evaluator" would manage just fine. Note that each task will be executed in its own YARN container.
from tf_yarn import TaskSpec, run_on_yarn
from tf_yarn import packaging

# upload_env_to_hdfs returns the package path and the environment name
pyenv_zip_path, _ = packaging.upload_env_to_hdfs()
run_on_yarn(
    pyenv_zip_path,
    experiment_fn,
    task_specs={
        "chief": TaskSpec(memory=2 * 2**10, vcores=4),
        "evaluator": TaskSpec(memory=2**10, vcores=1),
        "tensorboard": TaskSpec(memory=2**10, vcores=1)
    }
)
The final bit is to forward the winequality.py module to the YARN containers, so that the tasks can import it:
import os
import winequality

run_on_yarn(
    ...,
    files={
        # name inside the container -> local path of the module
        os.path.basename(winequality.__file__): winequality.__file__,
    }
)
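When several local modules have to be forwarded, the same name-to-path mapping can be built in one place. This is a minimal sketch under assumptions: files_for_modules is a hypothetical helper, not a tf-yarn function, and the paths are placeholders.

```python
import os


def files_for_modules(paths):
    """Map each file's base name to its local path, mirroring the files= mapping."""
    files = {}
    for path in paths:
        name = os.path.basename(path)
        if name in files:
            # Two files with the same base name would overwrite each other.
            raise ValueError(f"duplicate file name: {name}")
        files[name] = path
    return files


# Hypothetical local paths, for illustration only.
print(files_for_modules(["/home/me/project/winequality.py", "/home/me/project/features.py"]))
```

The resulting dictionary has the same shape as the files argument shown above.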
Distributed TensorFlow 101
The following is a brief summary of the core distributed TensorFlow concepts relevant to training estimators. Please refer to the official documentation for the full version.
Distributed TensorFlow operates in terms of tasks. A task has a type which defines its purpose in the distributed TensorFlow cluster. "worker" tasks headed by the "chief" worker do model training. The "chief" additionally handles checkpointing, saving/restoring the model, etc. The model itself is stored on one or more "ps" tasks. These tasks typically do not compute anything. Their sole purpose is serving the variables of the model. Finally, the "evaluator" task is responsible for periodically evaluating the model.
At the minimum, a cluster must have a single "chief" task. However, it is a good idea to complement it with the "evaluator" to allow for running the evaluation in parallel with the training.
+-----------+              +---------+   +----------+   +----------+
| evaluator |        +-----+ chief:0 |   | worker:0 |   | worker:1 |
+-----+-----+        |     +----^----+   +-----^----+   +-----^----+
      ^              |          |              |              |
      |              v          |              |              |
      |        +-----+---+      |              |              |
      |        | model   |   +--v---+          |              |
      +--------+ exports |   | ps:0 <----------+--------------+
               +---------+   +------+
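In plain TensorFlow, each estimator task learns the cluster layout and its own role from the TF_CONFIG environment variable. TF_CONFIG is not mentioned elsewhere in this document, and how tf-yarn sets it up on YARN is not shown here. The sketch below only illustrates the shape of such a value; make_tf_config is a hypothetical helper, and the host names and ports are made up.

```python
import json


def make_tf_config(cluster, task_type, task_index):
    """Serialize a TF_CONFIG value for one task of the given cluster."""
    return json.dumps({
        "cluster": cluster,
        "task": {"type": task_type, "index": task_index},
    })


# Hypothetical host names and ports.
cluster = {
    "chief": ["host0:2222"],
    "worker": ["host1:2222", "host2:2222"],
    "ps": ["host3:2222"],
}

# Every task sees the same "cluster" entry but its own "task" entry.
for task_type, addresses in cluster.items():
    for index in range(len(addresses)):
        print(task_type, index, make_tf_config(cluster, task_type, index))
```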
Training with multiple workers
Multi-worker clusters require at least one parameter server, aka "ps" task, to store the variables being updated by the "chief" and "worker" tasks. It is generally a good idea to give "ps" tasks more than one vcore to allow for concurrent I/O processing.
run_on_yarn(
    ...,
    task_specs={
        "chief": TaskSpec(memory=2 * 2**10, vcores=4),
        "worker": TaskSpec(memory=2 * 2**10, vcores=4, instances=8),
        "ps": TaskSpec(memory=2 * 2**10, vcores=8),
        "evaluator": TaskSpec(memory=2**10, vcores=1),
        "tensorboard": TaskSpec(memory=2**10, vcores=1)
    }
)
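Since each task runs in its own YARN container, it can help to add up what a configuration like the one above asks from the queue. The sketch below is illustrative only: Spec is a hypothetical stand-in for TaskSpec so that the code runs without tf-yarn, and it assumes instances defaults to 1 for every task type.

```python
from collections import namedtuple

# Hypothetical stand-in for tf_yarn.TaskSpec, so the sketch runs without tf-yarn.
Spec = namedtuple("Spec", ["memory", "vcores", "instances"])


def total_resources(task_specs):
    """Sum containers, memory and vcores over all task types."""
    containers = sum(s.instances for s in task_specs.values())
    memory = sum(s.memory * s.instances for s in task_specs.values())
    vcores = sum(s.vcores * s.instances for s in task_specs.values())
    return containers, memory, vcores


task_specs = {
    "chief": Spec(memory=2 * 2**10, vcores=4, instances=1),
    "worker": Spec(memory=2 * 2**10, vcores=4, instances=8),
    "ps": Spec(memory=2 * 2**10, vcores=8, instances=1),
    "evaluator": Spec(memory=2**10, vcores=1, instances=1),
    "tensorboard": Spec(memory=2**10, vcores=1, instances=1),
}

containers, memory, vcores = total_resources(task_specs)
print(containers, memory, vcores)  # 12 22528 46
```

If the memory values are in MiB (an assumption suggested by the 2**10 arithmetic), this configuration requests 22 GiB across 12 containers.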
Configuring the Python interpreter and packages
tf-yarn needs to ship an isolated virtual environment to the containers.
You can use the packaging module to generate a package on HDFS based on your currently installed virtual environment. (You should have installed the dependencies from requirements.txt first: pip install -r requirements.txt.)
This works with both conda and virtual environments.
By default, the generated package is a pex package.
pyenv_zip_path, env_name = packaging.upload_env_to_hdfs()
run_on_yarn(
    pyenv_zip_path=pyenv_zip_path
)
If you pass your own packaging.CONDA_PACKER to upload_env_to_hdfs, it will use conda-pack to create the package.
You can also directly use the command line tools provided by conda-pack and pex.
For pex, you can run this command in the root directory to create the package (it includes all requirements from setup.py):
pex . -o myarchive.pex
You can then run tf-yarn with your generated package:
run_on_yarn(
    pyenv_zip_path="myarchive.pex"
)
Running on GPU
YARN does not have first-class support for GPU resources. A common workaround is to use node labels where CPU-only nodes are unlabelled, while the GPU ones have a label. Furthermore, in this setting GPU nodes are typically bound to a separate queue which is different from the default one.
Currently, tf-yarn assumes that the GPU label is "gpu". There are no assumptions on the name of the queue with GPU nodes; however, for the sake of example we will use the name "ml-gpu".
The default behaviour of run_on_yarn is to run on CPU-only nodes. In order to run on the GPU ones:
- Set the queue argument.
- Set TaskSpec.label to NodeLabel.GPU for relevant task types. A good rule of thumb is to run compute-heavy "chief" and "worker" tasks on GPU, while keeping "ps" and "evaluator" on CPU.
- Generate two Python environments: one with TensorFlow for CPUs and one with TensorFlow for GPUs. The additional_packages and ignored_packages parameters of upload_env_to_hdfs are only supported with PEX packages.
from tf_yarn import NodeLabel, TaskSpec, run_on_yarn
from tf_yarn import packaging

pyenv_zip_path_cpu, _ = packaging.upload_env_to_hdfs()
pyenv_zip_path_gpu, _ = packaging.upload_env_to_hdfs(
    # package name -> version
    additional_packages={"tensorflow-gpu": "2.0.0a0"},
    ignored_packages={"tensorflow"}
)
run_on_yarn(
    {NodeLabel.CPU: pyenv_zip_path_cpu, NodeLabel.GPU: pyenv_zip_path_gpu},
    experiment_fn,
    task_specs={
        "chief": TaskSpec(memory=2 * 2**10, vcores=4, label=NodeLabel.GPU),
        "evaluator": TaskSpec(memory=2**10, vcores=1),
        "tensorboard": TaskSpec(memory=2**10, vcores=1)
    },
    queue="ml-gpu"
)
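The rule of thumb for choosing labels can be written down as a small function. This is a sketch, not tf-yarn code: Label is a hypothetical stand-in for NodeLabel (its values follow the text above, where CPU-only nodes are unlabelled and GPU nodes carry the "gpu" label), and label_for is a hypothetical helper.

```python
from enum import Enum


class Label(Enum):
    """Hypothetical stand-in for tf_yarn.NodeLabel."""
    CPU = ""      # CPU-only nodes are unlabelled
    GPU = "gpu"   # GPU nodes carry the "gpu" label


# Task types that do the heavy computation, per the rule of thumb above.
COMPUTE_HEAVY = {"chief", "worker"}


def label_for(task_type):
    """Pick GPU for compute-heavy task types and CPU for everything else."""
    return Label.GPU if task_type in COMPUTE_HEAVY else Label.CPU


for task_type in ("chief", "worker", "ps", "evaluator", "tensorboard"):
    print(task_type, label_for(task_type).name)
```

With the real library, the result of such a function would be passed as the label argument of each TaskSpec.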
Accessing HDFS in the presence of federation
skein, the library underlying tf_yarn, automatically acquires a delegation token for fs.defaultFS on security-enabled clusters. This should be enough for most use cases. However, if your experiment needs to access data on namenodes other than the default one, you have to explicitly list them in the file_systems argument to run_on_yarn. This instructs skein to acquire a delegation token for these namenodes in addition to fs.defaultFS:
run_on_yarn(
    ...,
    file_systems=["hdfs://preprod"]
)
Depending on the cluster configuration, you might need to point libhdfs to a different configuration folder. For instance:
run_on_yarn(
    ...,
    env={"HADOOP_CONF_DIR": "/etc/hadoop/conf.all"}
)
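If an experiment reads from many paths, the list of extra namenodes can be derived from the paths themselves. The following is a minimal sketch under assumptions: extra_file_systems is a hypothetical helper, the paths are made up, and the default file system is passed in by hand rather than read from the Hadoop configuration.

```python
from urllib.parse import urlparse


def extra_file_systems(paths, default_fs):
    """Return the distinct hdfs:// roots used by paths, excluding default_fs."""
    roots = []
    for path in paths:
        parsed = urlparse(path)
        if parsed.scheme != "hdfs" or not parsed.netloc:
            continue  # no explicit HDFS namenode in this path
        root = f"{parsed.scheme}://{parsed.netloc}"
        if root != default_fs and root not in roots:
            roots.append(root)
    return roots


# Hypothetical paths and default file system.
paths = ["hdfs://preprod/data/train", "hdfs://root/tmp/model", "/user/me/eval"]
print(extra_file_systems(paths, default_fs="hdfs://root"))  # ['hdfs://preprod']
```

The returned list has the shape expected by the file_systems argument shown above.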
Tensorboard
You can use Tensorboard with TF Yarn. Tensorboard is automatically spawned when using the default task_specs, and runs as a separate container on YARN. If you use a custom task_specs, you must explicitly add a Tensorboard task to your configuration.
run_on_yarn(
    ...,
    task_specs={
        "chief": TaskSpec(memory=2 * 2**10, vcores=4),
        "worker": TaskSpec(memory=2 * 2**10, vcores=4, instances=8),
        "ps": TaskSpec(memory=2 * 2**10, vcores=8),
        "evaluator": TaskSpec(memory=2**10, vcores=1),
        "tensorboard": TaskSpec(memory=2**10, vcores=1, instances=1, termination_timeout_seconds=30)
    }
)
Both instances and termination_timeout_seconds are optional parameters.
- instances: controls the number of Tensorboard instances to spawn. Defaults to 1
- termination_timeout_seconds: controls how many seconds each tensorboard instance must stay alive after the end of the run. Defaults to 30 seconds
The full access URL of each tensorboard instance is advertised as a url_event starting with "Tensorboard is listening at...". Typically, you will see it appearing on the standard output of a run_on_yarn call.
Environment variables
The following optional environment variables can be passed to the tensorboard task:
- TF_BOARD_MODEL_DIR: configures a model directory. Note that the experiment model dir, if specified, has higher priority. Defaults to None
- TF_BOARD_EXTRA_ARGS: appends command line arguments to the mandatory ones (--logdir and --port). Defaults to None
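A small helper can assemble these variables into a dictionary. This is a sketch under assumptions: tensorboard_env is a hypothetical helper, the model directory is a placeholder, and passing the result through the env argument of run_on_yarn (shown earlier for HADOOP_CONF_DIR) is assumed rather than confirmed to reach the tensorboard task.

```python
def tensorboard_env(model_dir=None, extra_args=()):
    """Build the optional TF_BOARD_* variables, skipping the ones left unset."""
    env = {}
    if model_dir:
        env["TF_BOARD_MODEL_DIR"] = model_dir
    if extra_args:
        # Joined with spaces; how tf-yarn splits this value is not specified here.
        env["TF_BOARD_EXTRA_ARGS"] = " ".join(extra_args)
    return env


# Hypothetical model directory; --reload_interval is a TensorBoard flag.
print(tensorboard_env("hdfs://root/user/me/model", ["--reload_interval", "10"]))
```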