
Running Unicore Jobs from airflow DAGs.



This project integrates UNICORE and Apache Airflow. UNICORE is a software suite that, among other functions, provides seamless access to high-performance compute and data resources. Airflow is a platform to programmatically author, schedule and monitor workflows.

In its current state, this project provides a set of Airflow operators that can be used in Airflow workflows to submit jobs to UNICORE, as well as the UnicoreExecutor. So far, the UnicoreExecutor offers only experimental support for Airflow 3; further support is currently being worked on.

Using the UnicoreExecutor

To use the UnicoreExecutor, this library must be installed in your Airflow environment, and some configuration is required.

How to configure these settings depends on your deployment, since the standard Airflow configuration mechanism is used. For a Helm deployment via the official Helm chart, you need to use environment variables: none of the UNICORE-related options are present in the chart, so setting them through the chart's values causes schema validation to fail.

All options belong to the [unicore.executor] section in airflow.cfg. As environment variables, they use the AIRFLOW__UNICORE_EXECUTOR__ prefix (or, for team-scoped options, the AIRFLOW___TEAM__UNICORE_EXECUTOR__ format).
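Airflow derives environment variable names from config options by upper-casing the section and key, replacing dots in the section name with underscores, and joining them with double underscores after the AIRFLOW__ prefix. The sketch below reproduces that convention for the (non-team-scoped) [unicore.executor] options; it is an illustration only and not part of this package.

```python
# Sketch of Airflow's environment-variable naming convention for config
# options. Not part of airflow-unicore-integration; illustration only.

ENV_VAR_PREFIX = "AIRFLOW__"


def env_var_name(section: str, key: str) -> str:
    """Return the environment variable name for an airflow.cfg option."""
    # Dots in the section name become underscores; everything is upper-cased.
    return f"{ENV_VAR_PREFIX}{section.replace('.', '_').upper()}__{key.upper()}"


if __name__ == "__main__":
    for option in ("default_env", "tmp_dir", "sites_config"):
        print(env_var_name("unicore.executor", option))
```

For example, the DEFAULT_ENV option of [unicore.executor] becomes AIRFLOW__UNICORE_EXECUTOR__DEFAULT_ENV.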

| Option name | Default | Description | Team scoped? |
|---|---|---|---|
| EXECUTION_API_SERVER_URL | The default from the Airflow config | The URL to reach the Airflow API server from the execution environment (e.g. compute nodes) | no |
| DEFAULT_ENV | mandatory | The default activation script for a functional Airflow environment on the execution machine | no |
| TMP_DIR | /tmp | A temporary directory to store data such as GitDagBundles | no |
| SITES_CONFIG | mandatory | See below | yes |
| SITES_TOKEN_<SITE_NAME> | mandatory | A valid UNICORE auth token for this site | yes |
| SITES_PROXY_<SITE_NAME> | mandatory if a proxy is enabled for this site | A URL for the HTTP_PROXY/HTTPS_PROXY environment variables (e.g. socks5://user:pass@proxy:port) | yes |

The default environment is loaded by sourcing it (. default_env.sh). It must activate an environment in which a suitable Python version is available, together with the apache-airflow-task-sdk and apache-airflow-providers-git packages. All other dependencies depend on the DAGs to be run, but they must likewise already be installed in this environment.

A simple solution for this may be the "activate" script of a Python venv. If the target system requires additional commands to enable Python (e.g. module load), these can be added to the top of the activate script.
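Before pointing DEFAULT_ENV at such a script, it can help to check the environment it activates. The following is a hypothetical pre-flight check, meant to be run with the interpreter that the activation script puts on the PATH. The package names come from the paragraph above; the minimum Python version is an assumption, since only "a suitable version" is required here, so adjust it to your Airflow release.

```python
# Hypothetical pre-flight check for the execution environment. Run it with the
# Python interpreter made available by the DEFAULT_ENV activation script.
import sys
from importlib.metadata import PackageNotFoundError, version

# Distributions listed above as required in the default environment.
REQUIRED = ("apache-airflow-task-sdk", "apache-airflow-providers-git")

# ASSUMPTION: placeholder minimum Python version; adjust to your Airflow release.
MIN_PYTHON = (3, 10)


def missing_distributions(names) -> list:
    """Return the subset of `names` that is not installed."""
    missing = []
    for name in names:
        try:
            version(name)
        except PackageNotFoundError:
            missing.append(name)
    return missing


def python_ok(minimum=MIN_PYTHON) -> bool:
    """True if the running interpreter is at least `minimum` (major, minor)."""
    return sys.version_info[:2] >= tuple(minimum)


if __name__ == "__main__":
    problems = missing_distributions(REQUIRED)
    if not python_ok():
        problems.append(f"Python >= {MIN_PYTHON[0]}.{MIN_PYTHON[1]}")
    if problems:
        sys.exit("Problems found: " + ", ".join(problems))
    print("Environment provides the packages required by the UnicoreExecutor.")
```

DAG-specific dependencies could be appended to REQUIRED in the same way.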

SITES_CONFIG

SITES_CONFIG is a JSON string describing the preconfigured sites. It is a list of lists, where each inner list describes one site. The elements of an inner list are, in order: site_name, site_url, the site-specific user config, the site-specific precommand, and whether to use a proxy for this site. site_name and site_url must be provided; the remaining values can be left empty and are then treated as not required or false. The site-specific user config sets the User config header for the UNICORE REST API.

Example:

AIRFLOW__UNICORE_EXECUTOR__SITES_CONFIG: |
    [
      [
        "local",
        "https://unicore:8080/DEMO-SITE/rest/core",
        "group:groupname",
        "",
        "False"
      ],
      [
        "juwels",
        "https://unicore:8080/DEMO-SITE/rest/core",
        "",
        "",
        ""
      ],
      [
        "juwels-booster",
        "https://unicore:8080/DEMO-SITE/rest/core",
        "",
        "activate_proxy_env_command",
        "True"
      ]
    ]
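To make the element order concrete, here is a sketch that parses such a SITES_CONFIG string into per-site records. It is not the package's own parser: the Site record, the function name and the case-insensitive "True" handling for the proxy flag are assumptions for illustration.

```python
# Sketch: parse a SITES_CONFIG JSON string into per-site records following the
# element order described above. NOT the package's own parser; the Site class,
# the function name and the "True"/"False" handling are assumptions.
import json
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Site:
    name: str
    url: str
    user_config: Optional[str]  # value for the User config header, if any
    precommand: Optional[str]   # site-specific precommand, if any
    use_proxy: bool             # if True, SITES_PROXY_<SITE_NAME> is needed


def parse_sites_config(raw: str) -> List[Site]:
    sites = []
    for entry in json.loads(raw):
        if len(entry) < 2 or not entry[0] or not entry[1]:
            raise ValueError(f"site_name and site_url are required: {entry!r}")
        # Pad missing optional fields with "" so they count as not required/false.
        name, url, user_config, precommand, proxy = (list(entry) + [""] * 5)[:5]
        sites.append(
            Site(
                name=name,
                url=url,
                user_config=user_config or None,
                precommand=precommand or None,
                use_proxy=str(proxy).strip().lower() == "true",
            )
        )
    return sites


if __name__ == "__main__":
    example = '[["local", "https://unicore:8080/DEMO-SITE/rest/core", "group:groupname", "", "False"]]'
    for site in parse_sites_config(example):
        print(site)
```

Empty strings are mapped to None or False, mirroring the rule that the optional values "will then be treated as not required or false".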

Using the Unicore Operators

This package provides multiple Unicore operators. The most versatile one is the UnicoreGenericOperator, which supports many job parameters. The other operators offer a slightly simpler constructor and are therefore easier to use, but all generic parameters remain available to them as well.

All operators support all possible parameters of the Unicore job description. Here is an excerpt containing some commonly used parameters:

| Parameter name | Type | Default | Description |
|---|---|---|---|
| application_name | str | None | Application name |
| application_version | str | None | Application version |
| executable | str | None | Command-line executable |
| arguments | List(str) | None | Command-line arguments |
| environment | Map(str,str) | None | Environment variables |
| parameters | Map | None | Application parameters |
| project | str | None | Accounting project |
| imports | List(imports) | None | Stage-in/data import (see the Unicore docs) |
| exports | List(exports) | None | Stage-out/data export (see the Unicore docs) |

For details on imports and exports, see the UNICORE documentation.
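As an illustration of how the parameters above relate to a UNICORE job description, here is a hypothetical helper that maps operator-style keyword arguments to a job description dict. The key names (ApplicationName, Executable, Arguments, Environment, Imports, Exports, ...) and the From/To/Data fields of imports and exports are assumed from the UNICORE job description format; the real operators delegate this work to pyunicore and may build the description differently.

```python
# Hypothetical helper: map operator-style keyword arguments to a UNICORE job
# description dict. Key names are ASSUMED from the UNICORE job description
# format; only parameters that are actually set end up in the description.
import json
from typing import Any, Dict

_KEY_MAP = {
    "application_name": "ApplicationName",
    "application_version": "ApplicationVersion",
    "executable": "Executable",
    "arguments": "Arguments",
    "environment": "Environment",
    "parameters": "Parameters",
    "project": "Project",
    "imports": "Imports",
    "exports": "Exports",
}


def build_job_description(**kwargs: Any) -> Dict[str, Any]:
    unknown = set(kwargs) - set(_KEY_MAP)
    if unknown:
        raise TypeError(f"unsupported parameters: {sorted(unknown)}")
    # Parameters left at their default (None) are omitted from the description.
    return {_KEY_MAP[k]: v for k, v in kwargs.items() if v is not None}


if __name__ == "__main__":
    job = build_job_description(
        executable="/bin/echo",
        arguments=["hello", "$GREETING_TARGET"],
        environment={"GREETING_TARGET": "world"},
        project=None,  # omitted from the output
        # Inline-data import and stdout export; field names are assumptions.
        imports=[{"To": "input.txt", "Data": "some inline content"}],
        exports=[{"From": "stdout", "To": "https://storage.example.org/out.txt"}],
    )
    print(json.dumps(job, indent=2))
```

The storage URL in the export is a placeholder; consult the UNICORE documentation for the import and export sources and targets supported by your site.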

The UnicoreGenericOperator supports the following additional parameters:

| Parameter name | Type | Default | Description |
|---|---|---|---|
| name | str | None | Name for the Airflow task and the Unicore job |
| xcom_output_files | List(str) | ["stdout", "stderr"] | List of files whose content should be put into XComs |
| base_url | str | configured in Airflow connections or None | The base URL of the UNICORE/X server to be used by the Unicore client |
| credential | pyunicore credential | configured in Airflow connections or None | A Unicore credential to be used by the Unicore client |
| credential_username | str | configured in Airflow connections or None | Username for the Unicore client credentials |
| credential_password | str | configured in Airflow connections or None | Password for the Unicore client credentials |
| credential_token | str | configured in Airflow connections or None | An OIDC token to be used by the Unicore client |
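The defaults above ("configured in Airflow connections or None") suggest that a value passed to the constructor takes precedence over the value from the Airflow connection. The sketch below expresses that per-field fallback under exactly this assumption; the function name, the selection of string fields and the plain dict standing in for the connection are hypothetical.

```python
# Hypothetical per-field fallback: constructor value first, then the value from
# the Airflow connection, else None. ASSUMED precedence, not the package's code.
from typing import Dict, Optional

FIELDS = ("base_url", "credential_username", "credential_password", "credential_token")


def resolve_settings(
    explicit: Dict[str, Optional[str]],
    connection: Dict[str, Optional[str]],
) -> Dict[str, Optional[str]]:
    """Merge constructor arguments with connection values, field by field."""
    resolved = {}
    for field in FIELDS:
        value = explicit.get(field)
        resolved[field] = value if value is not None else connection.get(field)
    return resolved
```

With this rule, passing only credential_token to an operator would keep the base_url from the connection while overriding the stored token.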

The UnicoreScriptOperator offers an easier way to submit a script as a job; the script content is provided as a string.

| Parameter name | Type | Default | Description |
|---|---|---|---|
| script_content | str | None | The content of the script file |
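Since script_content is just the text of the script, it can be assembled with ordinary string handling in the DAG file. The helper below is hypothetical, and the bash shebang and the `set -e` line are illustrative choices, not requirements of the operator.

```python
# Hypothetical helper to assemble a string for the script_content parameter.
# The shebang and `set -e` line are illustrative choices only.
from typing import Iterable


def make_script(commands: Iterable[str], shell: str = "/bin/bash") -> str:
    lines = [f"#!{shell}", "set -e  # stop at the first failing command"]
    lines.extend(commands)
    # Terminate with a newline so the last command line is complete.
    return "\n".join(lines) + "\n"


script = make_script(["hostname", "date", 'echo "done"'])
```

The resulting string would then be passed as script_content=script when constructing a UnicoreScriptOperator.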

The UnicoreBSSOperator offers a way to submit batch scripts directly from their content, given as a string.

| Parameter name | Type | Default | Description |
|---|---|---|---|
| bss_file_content | str | None | The content of the batch script file |
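bss_file_content holds the complete text of a batch script for the site's batch system. Assuming the target site runs Slurm (the batch system is not specified here), such a script could be generated as below. The helper is hypothetical, and the #SBATCH options shown are standard sbatch long options that must be adapted to your site's policies.

```python
# Hypothetical helper that renders a Slurm batch script for bss_file_content.
# ASSUMES a Slurm-based site; adapt the #SBATCH options to your system.
from typing import Dict, Iterable


def make_slurm_script(options: Dict[str, str], commands: Iterable[str]) -> str:
    lines = ["#!/bin/bash"]
    # One "#SBATCH --key=value" directive per option, in insertion order.
    lines += [f"#SBATCH --{key}={value}" for key, value in options.items()]
    lines += list(commands)
    return "\n".join(lines) + "\n"


bss = make_slurm_script(
    {"nodes": "1", "time": "00:10:00", "job-name": "airflow-demo"},
    ["srun hostname"],
)
```

The string in bss would be passed as bss_file_content=bss to a UnicoreBSSOperator.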

The UnicoreExecutableOperator offers a reduced constructor that only requires an executable.

| Parameter name | Type | Default | Description |
|---|---|---|---|
| executable | str | None | The executable to run for this job |
| xcom_output_files | List(str) | ["stdout", "stderr"] | List of files whose content should be put into XComs |

The UnicoreDateOperator is mainly a testing operator, since it only runs the date executable.

Behaviour on Errors and Success

The Unicore operators perform little error and exception handling and mostly forward any problems to Airflow. All of the Unicore logic is handled by the pyunicore library.

While the resulting Unicore job description is partly validated automatically, it is still possible to build an invalid job description with the operators. This may cause the submission to Unicore to fail, in which case an exception is raised for Airflow to handle.

When a job is submitted successfully, the job's exit code is returned as the task's return value, so that Airflow can handle non-zero exit codes. All operators also append the content of the Unicore job log file to the Airflow task log. In addition, some job results and values are pushed as Airflow XComs:

| XCom name | Description |
|---|---|
| Unicore Job ID | The Unicore ID of the job |
| Unicore Job | The TSI script that was submitted by Unicore |
| BSS_SUBMIT | The batch (BSS) script submitted by Unicore |
| status_message | The status message of the Unicore job |
| log | The Unicore job log |
| workdir_content | Content of the job working directory upon completion |
| [xcom_output_files] | Content of each listed file, each in its own XCom (by default stdout and stderr) |
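A downstream task can read these values back through Airflow's usual XCom mechanisms; since Airflow stores a task's return value under the key return_value, the job exit code should be available there. The sketch below works on a plain dict standing in for the pulled XComs so that it runs without Airflow; in a DAG you would fill it with ti.xcom_pull(task_ids=..., key=...) calls. The function is hypothetical, and keys other than return_value are taken from the table above.

```python
# Hypothetical post-processing of the XComs pushed by a Unicore operator.
# `xcoms` stands in for values pulled via ti.xcom_pull(task_ids=..., key=...).
from typing import Any, Dict


def check_unicore_result(xcoms: Dict[str, Any]) -> str:
    """Return stdout of a successful job, or raise with the job's details."""
    exit_code = xcoms.get("return_value")
    # Accept int or numeric-string exit codes; a missing code counts as failure.
    if exit_code is None or int(exit_code) != 0:
        raise RuntimeError(
            f"Unicore job {xcoms.get('Unicore Job ID')} failed with exit code "
            f"{exit_code}: {xcoms.get('status_message')}\n{xcoms.get('stderr', '')}"
        )
    return xcoms.get("stdout", "")
```

Raising in the downstream task makes a non-zero job exit code visible as a failed Airflow task, which matches the error handling described above.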

Example DAGs

There are some example DAGs in this repository under project-dir/dags.

  • unicore-test-1.py just shows basic date and executable usage.

  • unicore-test-2.py has some basic examples for the generic operator.

  • unicore-test-3.py also includes script-operator examples.

  • unicore-test-4.py has some examples with more arguments.

  • unicore-test-bss.py shows how bss submission can be done (very simple example).

  • unicore-test-credentials.py demonstrates that credentials can not only be taken from the Airflow connections backend but also be provided in the operator's constructor.

  • unicore-test-import-export.py gives short examples for the imports and exports usage.

Set up the testing environment

Ensure a current version of docker is installed.

Run python3 -m build to build the python package.

Run the testing-env/build-image.sh script to create the customized Airflow image, which will contain the newly built Python package.

Run testing-env/run-testing-env.sh init to initialize the airflow containers, database etc. This only needs to be done once.

Run testing-env/run-testing-env.sh up to start the local airflow and Unicore deployment. Airflow will be available on port 8080, Unicore on port 8081.

The run-testing-env.sh script supports the commands up, down, start, stop, ps and init, which map to the corresponding docker compose commands.

Install package via pip

pip install airflow-unicore-integration

Download files

Source distribution

airflow_unicore_integration-0.5.0.tar.gz (24.0 kB)

Built distribution

airflow_unicore_integration-0.5.0-py3-none-any.whl (23.7 kB, Python 3)


Hashes for airflow_unicore_integration-0.5.0.tar.gz
Algorithm Hash digest
SHA256 135990bcb09649103fa52ca9c38efa2541a585ff5727511801082c61d6fe462e
MD5 026ab51a2adf3bb0acf29b0108c9db56
BLAKE2b-256 c0ac67a9a694d8e70d57cadb032373daace294716f388d6d0f42460709791d31


Provenance

The following attestation bundles were made for airflow_unicore_integration-0.5.0.tar.gz:

Publisher: publish-to-pypi.yml on UNICORE-EU/airflow-unicore-integration



Hashes for airflow_unicore_integration-0.5.0-py3-none-any.whl
Algorithm Hash digest
SHA256 e6fa175b51b60b37bbbbb5593bf43847c61c18ab7ff1d04e0f6b200905e805ed
MD5 874977e36cab1e0e22e014d708129215
BLAKE2b-256 95115ece94c78cb6abd9ab14a9743c60a37f8edd41c74ffc5ccccb04b43d3eef


Provenance

The following attestation bundles were made for airflow_unicore_integration-0.5.0-py3-none-any.whl:

Publisher: publish-to-pypi.yml on UNICORE-EU/airflow-unicore-integration

