Running Unicore Jobs from airflow DAGs.
Project description
This project integrates UNICORE and Apache Airflow. UNICORE is a software suite that, among other functions, provides seamless access to high-performance compute and data resources. Airflow is a platform to programmatically author, schedule and monitor workflows.
In its current state, this project provides a set of airflow operators that can be used in airflow workflows to submit jobs to Unicore. The UnicoreExecutor offers only experimental support for airflow 3 so far; further support is being worked on.
Using the Unicore Operators
There are multiple Unicore operators provided by this package. The most versatile one is the UnicoreGenericOperator, which supports a lot of job parameters. All other operators are intended to offer a slightly less complex constructor, and therefore simpler usage, but all generic parameters are still available to be used.
All operators support all possible parameters of the Unicore job description. Here is an excerpt containing some commonly used parameters:
| parameter name | type | default | description |
|---|---|---|---|
| application_name | str | None | Application Name |
| application_version | str | None | Application Version |
| executable | str | None | Command line executable |
| arguments | List(str) | None | Command line arguments |
| environment | Map(str,str) | None | environment arguments |
| parameters | Map | None | Application Parameters |
| project | str | None | Accounting Project |
| imports | List(imports) | None | Stage-in/data import - see Unicore docs |
| exports | List(exports) | None | Stage-out/data export - see Unicore docs |
For details on the imports and exports format, see the Unicore job description documentation.
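To make the parameter table more concrete, here is a minimal sketch of how these parameters could translate into a Unicore job description. The helper `build_job_description` is hypothetical and not part of this package, and the key names (`ApplicationName`, `Executable`, and so on) are assumed from the Unicore job description format; how the operators build the description internally may differ.

```python
from typing import Any, Dict, List, Optional


def build_job_description(
    application_name: Optional[str] = None,
    application_version: Optional[str] = None,
    executable: Optional[str] = None,
    arguments: Optional[List[str]] = None,
    environment: Optional[Dict[str, str]] = None,
    parameters: Optional[Dict[str, Any]] = None,
    project: Optional[str] = None,
    imports: Optional[List[Dict[str, Any]]] = None,
    exports: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: map operator parameters to job description keys.

    The key names are an assumption based on the Unicore job description
    format; they are not taken from this package's source code.
    """
    candidates = {
        "ApplicationName": application_name,
        "ApplicationVersion": application_version,
        "Executable": executable,
        "Arguments": arguments,
        "Environment": environment,
        "Parameters": parameters,
        "Project": project,
        "Imports": imports,
        "Exports": exports,
    }
    # Parameters left at their default of None are omitted entirely.
    return {key: value for key, value in candidates.items() if value is not None}


job = build_job_description(
    executable="/bin/echo",
    arguments=["hello"],
    project="demo",
)
print(job)
```

Only the parameters that were actually set end up in the resulting dictionary, which mirrors the `None` defaults listed in the table.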
The UnicoreGenericOperator supports the following additional parameters:
| parameter name | type | default | description |
|---|---|---|---|
| name | str | None | Name for the airflow task and the Unicore job |
| xcom_output_files | List(str) | ["stdout", "stderr"] | List of files whose content should be put into xcoms |
| base_url | str | configured in airflow connections or None | The base URL of the UNICOREX server to be used for the Unicore client |
| credential | pyunicore credential | configured in airflow connections or None | A Unicore credential to be used for the Unicore client |
| credential_username | str | configured in airflow connections or None | Username for the Unicore client credentials |
| credential_password | str | configured in airflow connections or None | Password for the Unicore client credentials |
| credential_token | str | configured in airflow connections or None | An OIDC token to be used by the Unicore client |
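The "configured in airflow connections or None" defaults can be read as a simple fallback rule: a value given in the constructor is used, otherwise the value from the airflow connection, otherwise `None`. The sketch below illustrates that reading only. The function `resolve_client_settings` and the plain dictionaries standing in for constructor arguments and the airflow connection are hypothetical and not part of this package.

```python
from typing import Any, Dict

# Parameters that, per the table above, fall back to the airflow connection.
CONNECTION_FIELDS = (
    "base_url",
    "credential",
    "credential_username",
    "credential_password",
    "credential_token",
)


def resolve_client_settings(
    constructor_kwargs: Dict[str, Any],
    connection_values: Dict[str, Any],
) -> Dict[str, Any]:
    """Hypothetical helper illustrating the documented defaults.

    An explicit constructor value wins; otherwise the connection value is
    used; otherwise the setting stays None.
    """
    resolved = {}
    for field in CONNECTION_FIELDS:
        explicit = constructor_kwargs.get(field)
        if explicit is not None:
            resolved[field] = explicit
        else:
            resolved[field] = connection_values.get(field)
    return resolved


settings = resolve_client_settings(
    constructor_kwargs={"credential_token": "token-from-dag"},
    connection_values={"base_url": "https://unicore.example.org/rest/core"},
)
print(settings)
```

This sketch does not say which credential type takes precedence when several are set at once; that is decided by the operators and pyunicore and is not described here.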
The UnicoreScriptOperator offers a way to more easily submit a script as a job, where the script content can be provided as a string.
| parameter name | type | default | description |
|---|---|---|---|
| script_content | str | None | The content of the script file |
The UnicoreBSSOperator offers a way to directly submit batch-scripts from their content-strings.
| parameter name | type | default | description |
|---|---|---|---|
| bss_file_content | str | None | The content of the batch script file |
The UnicoreExecutableOperator offers a reduced constructor that only requires an executable.
| parameter name | type | default | description |
|---|---|---|---|
| executable | str | None | The executable to run for this job |
| xcom_output_files | List(str) | ["stdout", "stderr"] | List of files whose content should be put into xcoms |
The UnicoreDateOperator is more of a testing operator, since it will only run the date executable.
Behaviour on Errors and Success
The Unicore Operators do not do a lot of error and exception handling, and mostly just forward any problems to be handled by airflow. All of the Unicore logic is handled by the pyunicore library.
While some validation of the resulting Unicore job description is done automatically, it may still be possible to build an invalid job description with the operators. This may lead to a submission failure with Unicore. In this case, an exception is thrown to be handled by airflow.
After a successful job submission, the job exit code is returned as the task return value, so that airflow can handle non-zero exit codes. All operators also append the content of the Unicore job log file to the airflow task log. In addition, some job results and values are pushed as airflow xcoms:
| xcom name | description |
|---|---|
| Unicore Job ID | the Unicore ID for the job |
| Unicore Job | the TSI script that was submitted by Unicore |
| BSS_SUBMIT | the bss_script submitted by Unicore |
| status_message | the status message for the Unicore job |
| log | the Unicore job log |
| workdir_content | content of the job workdir upon completion |
| [xcom_output_files] | content of each file in their own xcom, by default stdout and stderr |
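A downstream task can combine the returned exit code with these xcoms to decide how to proceed. The following is a minimal sketch of such a check, written as a plain function so it can be tried without an airflow installation. The function name `summarize_unicore_result` is hypothetical, and the dictionary stands in for values that a real task would fetch with airflow's `xcom_pull`, using the xcom names from the table above.

```python
from typing import Any, Dict


def summarize_unicore_result(exit_code: int, xcoms: Dict[str, Any]) -> str:
    """Hypothetical helper: turn the job exit code and xcoms into a summary.

    `xcoms` is assumed to map the xcom names listed above to their values.
    """
    job_id = xcoms.get("Unicore Job ID", "<unknown>")
    status = xcoms.get("status_message", "")
    if exit_code != 0:
        # Include stderr in the error, if it was collected as an xcom.
        stderr = xcoms.get("stderr", "")
        raise RuntimeError(
            f"Unicore job {job_id} exited with code {exit_code}: {status}\n{stderr}"
        )
    return f"Unicore job {job_id} finished: {status}"


summary = summarize_unicore_result(
    exit_code=0,
    xcoms={"Unicore Job ID": "abc123", "status_message": "SUCCESSFUL"},
)
print(summary)
```

Raising an exception on a non-zero exit code lets airflow mark the downstream task as failed and apply its usual retry handling.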
Example DAGs
There are some example DAGs in this repository under project-dir/dags.
unicore-test-1.py just shows basic date and executable usage.
unicore-test-2.py has some basic examples for the generic operator.
unicore-test-3.py also includes script-operator examples.
unicore-test-4.py has some examples with more arguments.
unicore-test-bss.py shows how bss submission can be done (very simple example).
unicore-test-credentials.py demonstrates that not only the credentials from the airflow connections backend can be used, but they can also be provided in the constructor of the operator.
unicore-test-import-export.py gives short examples for the imports and exports usage.
Setup testing env
Ensure a current version of docker is installed.
Run python3 -m build to build the python package.
Run the testing-env/build-image.sh script to create the customized airflow image, which will contain the newly built python package.
Run testing-env/run-testing-env.sh init to initialize the airflow containers, database etc. This only needs to be done once.
Run testing-env/run-testing-env.sh up to start the local airflow and Unicore deployment. Airflow will be available on port 8080, Unicore on port 8081.
The run-testing-env.sh script supports the commands up, down, start, stop, ps and init for matching docker compose functions.
Install package via pip
pip install airflow-unicore-integration