# Neu.ro Airflow plugin
This package helps you execute your ML workloads on the neu.ro platform from an Airflow environment.
Also, take a look at our CLI reference and Python API reference.
## Environment

- Python 3.6+
- apache-airflow >= 1.10.x
- Neuromation >= 20.4.6
## Installation

The plugin is written to automatically register with Airflow, so all you have to do is install it into your Python environment:

```bash
pip install neuro-airflow-plugin
```
## Usage

Before you start, obtain a Neuro token with the CLI command:

```bash
neuro config show-token
```

and set up a Neuro connection (`neuro_default` by default) in Airflow:

```bash
airflow connections --add \
    --conn_id neuro_default \
    --conn_type "neuro" \
    --conn_extra '{"token": "Put your Token here..."}'
Apart from the token, you can also provide these fields as part of the extra JSON:

- `cluster` - name of the cluster used for compute scheduling. The default cluster is used if not provided.
- `api_url` - entry URL for the Neuro Platform. Only needed for custom clusters.

You can also set up the connection from the web UI: just put the same JSON document into the Extra form field. The connection type does not matter, so you can pick any type the UI allows.
For more information on how to set up connections in Airflow, see Managing Connections.
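As a minimal sketch of what the complete extra document might look like, the snippet below builds it in Python; the cluster name and API URL are placeholders, not real values:

```python
import json

# Sketch of the "extra" JSON for a Neuro connection. The cluster name
# and api_url below are placeholders, not real values.
extra = {
    "token": "Put your Token here...",
    "cluster": "my-cluster",
    "api_url": "https://example.com/api/v1",
}

# This is the string you would paste into the Extra form field
# (or pass to `airflow connections --add` via --conn_extra).
print(json.dumps(extra))
```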
## NeuroRunOperator

Runs a Job on the Neuro Platform. Example usage:
```python
from airflow.operators.neuro import NeuroRunOperator

run = NeuroRunOperator(
    task_id="small-deeds",
    job_command="echo 'Big things start from small deeds'",
    job_image="ubuntu:{{ var.value.ubuntu_tag }}",
    job_resources_preset="{% if var.value.use_large_cpu %}cpu-large{% else %}cpu-small{% endif %}",
)
```
For more usage examples, see the `examples/dags` folder of the repository.
### Operator arguments
- `job_command`: str, required - Command to be executed in the Job. If you need to override the entrypoint of an image, see `job_entrypoint` instead.
- `job_image`: str, required - Container image used for the Job. The name can be either a Docker image name hosted on an external public repository or a Neuro image specified by the `image://` scheme.
- `job_name`: str - Optional job name. Note that creating two running jobs with the same name by the same user is forbidden.
- `job_volumes`: list - List of strings describing a volume mount, or `neuromation.Volume` objects. A string description consists of 3 parts separated by colons: storage URI, mount path, mount mode. For example: `storage:my-project:/var/storage:ro`.
- `job_resources_preset`: str - Predefined resource configuration (to see available values, run `neuro config show`).
- `job_resources`: Resources - Custom resource configuration. See the Python API reference for details.
- `job_is_preemptible`: bool - Whether the Job may be run on a preemptible (also known as Spot) instance. Only used with a custom resource configuration.
- `job_extshm`: bool - Request extended `/dev/shm` space. Defaults to `True` and is only used with a predefined resource configuration.
- `job_tags`: list - List of string tags to mark the Job with. Can later be used for filtering, etc.
- `job_description`: str - Optional job description in free format.
- `job_lifespan`: float - Optional job run-time limit in seconds. Unlimited by default.
- `job_environ`: dict - Environment variables to run the Job with. Jinja template support is only provided for values, not for keys; see more details below.
- `job_entrypoint`: str - Override the ENTRYPOINT of the container image.
- `job_http_port`: str - Enable HTTP port forwarding to the specified container port. If used, you can access it from a custom link on the Task panel in the Airflow UI (see the Airflow docs for details on how this works). Disabled by default.
- `job_http_auth`: bool - Disable Neuro authentication on the port exposed by `job_http_port`.
- `job_tty`: bool - Allocate a TTY for the container.
- `job_detach`: bool - Detach after starting the job. If detached, Job logs will not be viewable in the Airflow interface, but the job will not consume an Airflow worker slot. Defaults to `True`.
- `raise_on_errno`: bool - Raise an error if the job returns a non-zero exit code. Ignored if `job_detach` is `True`. Defaults to `True`.
- `neuro_conn_id`: str - Name of the connection to use for Neuro authentication. Defaults to `neuro_default`.
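A volume mount string for `job_volumes` can be assembled from its three colon-separated parts; a small sketch (the project name and paths are illustrative):

```python
# Compose a volume mount string from its three colon-separated parts:
# storage URI, mount path, and mount mode (values are illustrative).
storage_uri = "storage:my-project"
mount_path = "/var/storage"
mount_mode = "ro"

volume = ":".join([storage_uri, mount_path, mount_mode])
print(volume)  # storage:my-project:/var/storage:ro
```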
See also the `neuro run` reference in the CLI documentation.
### Jinja2 template fields

Airflow supports passing custom attributes and dynamic definitions using Jinja templating fields. This operator supports templating on the following fields:

- `job_command`
- `job_image`
- `job_volumes`
- `job_name`
- `job_resources_preset`
- `job_tags`
- `job_environ`
- `job_entrypoint`
- `job_description`
- `job_http_port`
- `neuro_conn_id`
### XCom exports

The operator exports 2 XCom values: `return_value` (Airflow's default for queries) and `assigned_job`. Both are JSON documents with the following fields:

- `id`: str - Job ID assigned by Neuro on start.
- `exit_code`: int - Command return code if the Job has already finished.
- `status`: str - One of the job statuses: `pending`, `running`, `succeeded`, `failed` or `unknown`.
- `http_url`: str - URL of the exposed HTTP port if `job_http_port` is used.
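As a sketch of what a downstream task might receive, here is a document of that shape being parsed with the standard library; the `id` and other field values are made up for illustration:

```python
import json

# Illustrative XCom payload in the shape described above; the id,
# exit_code and status values are made up for this example.
payload = json.loads(
    '{"id": "job-0123", "exit_code": 0,'
    ' "status": "succeeded", "http_url": null}'
)

# A downstream task could branch on the final status.
print(payload["status"])  # succeeded
```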
## NeuroJobStatusSensor

Waits for a Job to be completed, or for any other status transition to happen. Example usage:
```python
from airflow.sensors.neuro import NeuroJobStatusSensor

wait = NeuroJobStatusSensor(
    task_id="wait_close",
    job_id="{{ task_instance.xcom_pull(task_ids='small-deeds')['id'] }}",  # noqa
    poke_interval=5,
    timeout=10 * 60,
)
```
### Operator arguments

- `job_id`: str - Job ID to query for status updates.
- `job_statuses`: list - List of `JobStatus` enum values to wait for.
- `neuro_conn_id`: str - Name of the connection to use for Neuro authentication. Defaults to `neuro_default`.
### Jinja2 template fields

- `job_id`
### XCom exports
Does not export any XCom values.
## NeuroHook

In some cases you may need to access other functionality of the platform. This can be done using the NeuroHook. For example:
```python
import yarl
from neuromation.api import ResourceNotFound

from airflow.hooks.neuro import NeuroHook
from airflow.operators.python_operator import BranchPythonOperator


def check_model(templates_dict, **kw):
    hook = NeuroHook()
    with hook:
        try:
            hook.run(
                hook.client.storage.stat(
                    yarl.URL("storage:" + templates_dict["model_path"])
                )
            )
            return "process_with_model"
        except ResourceNotFound:
            return "process_without_model"


check_model = BranchPythonOperator(
    task_id="check_model_exists",
    python_callable=check_model,
    provide_context=True,
    templates_dict={"model_path": "{{ var.value.project_home }}/model.pth"},
)
```
Explore the Python SDK for more features of the Platform.