
Exposes 'Xtended' Apache Airflow management capabilities via secure API

Project description

Airflow Xtended API - Plugin

Apache Airflow plugin that exposes 'Xtended' secure API endpoints, similar to the official Airflow REST API (Stable, 1.0.0), providing richer capabilities to support more powerful DAG and job management. Apache Airflow version 2.1.0 or higher is required.

Requirements

  • Apache Airflow 2.1.0 or higher

Installation

python3 -m pip install airflow-xtended-api
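
After installation, restart the Airflow webserver so the plugin is registered. You can optionally confirm that it was picked up (a minimal sketch using the standard Airflow CLI; the exact plugin name in the output may differ):

# list registered plugins and look for the Xtended API entry
airflow plugins | grep -i xtended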

Authentication

The Airflow Xtended API plugin uses the same auth mechanism as the Airflow API (Stable, 1.0.0). By default, the APIs exposed via this plugin respect the auth mechanism used by your Airflow webserver and comply with your existing RBAC policies. Note that you will need to pass credentials as part of each request. Here is a snippet from the official docs, using basic authentication:

curl -X POST 'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/dags/{dag_id}?update_mask=is_paused' \
-H 'Content-Type: application/json' \
--user "username:password" \
-d '{
    "is_paused": true
}'
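
The same credentials can be passed to the endpoints exposed by this plugin, for example (a minimal sketch using the generic endpoint format described in the next section):

# hypothetical call to an Xtended endpoint with the same basic auth credentials
curl -X GET --user "username:password" \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/{ENDPOINT_NAME}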

Using the Custom API

After installing the plugin Python package and restarting your Airflow webserver, you will see a link called 'Reference Docs' under the 'Xtended API' tab on the Airflow webserver homepage. All the necessary documentation for the supported API endpoints resides on that page. You can also navigate to that page directly using the link below.

http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/xtended_api/

All the supported endpoints are exposed in the following format:

http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/{ENDPOINT_NAME}

The following endpoints are currently supported.

deploy_dag

Description:
  • Deploy a new DAG File to the DAGs directory.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/deploy_dag
Method:
  • POST
POST request Arguments:
  • dag_file - file - Upload & Deploy a DAG from .py or .zip files
  • force (optional) - boolean - Force uploading the file if it already exists
  • unpause (optional) - boolean - The DAG will be unpaused on creation (Works only when uploading .py files)
  • otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!
Examples:
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 'dag_file=@test_dag.py' \
 -F 'force=y' \
 -F 'unpause=y' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/deploy_dag
response:
{
  "message": "DAG File [<module '{MODULE_NAME}' from '/{DAG_FOLDER}/exam.py'>] has been uploaded",
  "status": "success"
}
Method:
  • GET
GET request Arguments:
  • dag_file_url - string - A valid URL for fetching .py, .pyc or .zip DAG files
  • filename - string - A valid filename ending with .py, .pyc or .zip
  • force (optional) - boolean - Force uploading the file if it already exists.
  • unpause (optional) - boolean - The DAG will be unpaused on creation (Works only when uploading .py files)
  • otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!
Examples:
curl -X GET --user "username:password" \
'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/deploy_dag?dag_file_url={DAG_FILE_URL}&filename=test_dag.py&force=on&unpause=on'
response:
{
  "message": "DAG File [<module '{MODULE_NAME}' from '/{DAG_FOLDER}/exam.py'>] has been uploaded",
  "status": "success"
}

create_dag

Description:
  • Create a new DAG File in the DAGs directory.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/create_dag
Method:
  • POST
POST request Arguments:
  • filename - string - Name of the Python DAG file
  • dag_code - string (multiline) - Python code of the DAG file
  • force (optional) - boolean - Force uploading the file if it already exists
  • unpause (optional) - boolean - The DAG will be unpaused on creation (Works only when uploading .py files)
  • otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!
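Examples:
A hypothetical sketch, assuming the arguments are sent as multipart form fields like the other POST endpoints in this reference; curl's '-F dag_code=<test_dag.py' syntax sends the file's text content as the field value:
# hypothetical: create a DAG file on the server from local Python source
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 'filename=test_dag.py' \
 -F 'dag_code=<test_dag.py' \
 -F 'force=y' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/create_dag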

s3_sync

Description:
  • Sync DAG files from an S3 bucket.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/s3_sync
Method:
  • POST
POST request Arguments:
  • s3_bucket_name - string - S3 bucket name where DAG files exist
  • s3_region - string - S3 region name where the specified bucket exists
  • s3_access_key - string - IAM access key with at least S3 bucket read access
  • s3_secret_key - string - IAM secret key for the specified access key
  • s3_object_prefix (optional) - string - Filter results by object prefix
  • s3_object_keys (optional) - string - Sync DAG files specified by the object keys. Multiple object keys are separated by commas (,)
  • skip_purge (optional) - boolean - Skip emptying DAGs directory
  • otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!
Examples:
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 's3_bucket_name=test-bucket' \
 -F 's3_region=us-east-1' \
 -F 's3_access_key={IAM_ACCESS_KEY}' \
 -F 's3_secret_key={IAM_SECRET_KEY}' \
 -F 'skip_purge=y' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/s3_sync
response:
{
  "message": "dag files synced from s3",
  "sync_status": {
    "synced": ["test_dag0.py", "test_dag1.py", "test_dag2.py"],
    "failed": []
  },
  "status": "success"
}
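
A hypothetical variant, assuming 's3_object_keys' accepts a comma-separated list as described above, that syncs only the listed objects and leaves the existing DAGs directory untouched:
# hypothetical: sync only the listed object keys and skip purging the DAGs directory
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 's3_bucket_name=test-bucket' \
 -F 's3_region=us-east-1' \
 -F 's3_access_key={IAM_ACCESS_KEY}' \
 -F 's3_secret_key={IAM_SECRET_KEY}' \
 -F 's3_object_keys=test_dag0.py,test_dag1.py' \
 -F 'skip_purge=y' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/s3_sync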

mongo_sync

Description:
  • Sync DAG files from a MongoDB collection.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/mongo_sync
Method:
  • POST
POST request Arguments:
  • connection_string - string - Source MongoDB server connection string
  • db_name - string - Source MongoDB database name
  • collection_name - string - Collection name where DAG data exists in the specified database
  • field_filename - string - DAGs are named using the value of this document field from the specified collection
  • field_dag_source - string - A document field containing the Python source for the yet-to-be-created DAGs
  • query_filter (optional) - string - JSON query string to filter required documents
  • skip_purge (optional) - boolean - Skip emptying DAGs directory
  • otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!
Examples:
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 'connection_string={MONGO_SERVER_CONNECTION_STRING}' \
 -F 'db_name=test_db' \
 -F 'collection_name=test_collection' \
 -F 'field_dag_source=dag_source' \
 -F 'field_filename=dag_filename' \
 -F 'skip_purge=y' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/mongo_sync
response:
{
  "message": "dag files synced from mongo",
  "sync_status": {
    "synced": ["test_dag0.py", "test_dag1.py", "test_dag2.py"],
    "failed": []
  },
  "status": "success"
}
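
A hypothetical variant, assuming 'query_filter' accepts a MongoDB-style JSON query string as described above, that syncs only the matching documents:
# hypothetical: sync only documents matching the JSON query filter
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 'connection_string={MONGO_SERVER_CONNECTION_STRING}' \
 -F 'db_name=test_db' \
 -F 'collection_name=test_collection' \
 -F 'field_dag_source=dag_source' \
 -F 'field_filename=dag_filename' \
 -F 'query_filter={"enabled": true}' \
 -F 'skip_purge=y' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/mongo_sync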

refresh_all_dags

Description:
  • Refresh all DAGs in the webserver.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/refresh_all_dags
Method:
  • GET
GET request Arguments:
  • None
Examples:
curl -X GET --user "username:password" \
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/refresh_all_dags
response:
{
  "message": "All DAGs are now up-to-date!!",
  "status": "success"
}

scan_dags

Description:
  • Check for newly created DAGs.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/scan_dags
Method:
  • GET
GET request Arguments:
  • otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!
Examples:
curl -X GET --user "username:password" \
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/scan_dags
response:
{
  "message": "Ondemand DAG scan complete!!",
  "status": "success"
}

purge_dags

Description:
  • Empty DAG directory.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/purge_dags
Method:
  • GET
GET request Arguments:
  • None
Examples:
curl -X GET --user "username:password" \
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/purge_dags
response:
{
  "message": "DAG directory purged!!",
  "status": "success"
}

delete_dag

Description:
  • Delete a DAG from the Airflow database and the filesystem (it will no longer appear in the webserver).
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/delete_dag
Method:
  • GET
GET request Arguments:
  • dag_id (optional) - string - DAG id
  • filename (optional) - string - Name of the DAG file to be deleted
  • Note: At least one of the arguments 'dag_id' or 'filename' must be specified
Examples:
curl -X GET --user "username:password" \
'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/delete_dag?dag_id=test_dag&filename=test_dag.py'
response:
{
  "message": "DAG [dag_test] deleted",
  "status": "success"
}

upload_file

Description:
  • Upload a new File to the specified directory.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/upload_file
Method:
  • POST
POST request Arguments:
  • file - file - File to be uploaded
  • force (optional) - boolean - Force uploading the file if it already exists
  • path (optional) - string - Location where the file is to be uploaded (Default is the DAGs directory)
Examples:
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 'file=@test_file.py' \
 -F 'force=y' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/upload_file
response:
{
  "message": "File [/{DAG_FOLDER}/dag_test.txt] has been uploaded",
  "status": "success"
}

restart_failed_task

Description:
  • Restart failed tasks with downstream.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/restart_failed_task
Method:
  • GET
GET request Arguments:
  • dag_id - string - DAG id
  • run_id - string - DagRun id
Examples:
curl -X GET --user "username:password" \
'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/restart_failed_task?dag_id=test_dag&run_id=test_run'
response:
{
  "message": {
    "failed_task_count": 1,
    "clear_task_count": 7
  },
  "status": "success"
}

kill_running_tasks

Description:
  • Kill running tasks whose status is in ['none', 'running'].
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/kill_running_tasks
Method:
  • GET
GET request Arguments:
  • dag_id - string - DAG id
  • run_id - string - DagRun id
  • task_id - string - If task_id is not specified, kill all tasks in the run; otherwise, kill only the specified task.
Examples:
curl -X GET --user "username:password" \
'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/kill_running_tasks?dag_id=test_dag&run_id=test_run&task_id=test_task'
response:
{
  "message": "tasks in test_run killed!!",
  "status": "success"
}

run_task_instance

Description:
  • Create a DagRun, run the specified tasks, and skip the rest.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/run_task_instance
Method:
  • POST
POST request Arguments:
  • dag_id - string - DAG id
  • run_id - string - DagRun id
  • tasks - string - Task id(s); multiple tasks are separated by commas (,)
  • conf (optional) - string - Optional configuration for creating the DagRun.
Examples:
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 'dag_id=test_dag' \
 -F 'run_id=test_run' \
 -F 'tasks=test_task' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/run_task_instance
response:
{
  "execution_date": "2021-06-21T05:50:19.740803+0000",
  "status": "success"
}
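
A hypothetical variant that also passes the optional 'conf' argument (assuming it accepts a JSON string, which the reference above does not specify):
# hypothetical: run two tasks and pass optional DagRun configuration as a JSON string
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 'dag_id=test_dag' \
 -F 'run_id=test_run' \
 -F 'tasks=test_task0,test_task1' \
 -F 'conf={"key": "value"}' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/run_task_instance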

skip_task_instance

Description:
  • Skip one task instance.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/skip_task_instance
Method:
  • GET
GET request Arguments:
  • dag_id - string - DAG id
  • run_id - string - DagRun id
  • task_id - string - task id
Examples:
curl -X GET --user "username:password" \
'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/skip_task_instance?dag_id=test_dag&run_id=test_run&task_id=test_task'
response:
{
  "message": "<TaskInstance: test_dag.test_task 2021-06-21 19:59:34.638794+00:00 [skipped]> skipped!!",
  "status": "success"
}

Acknowledgements

Huge shout out to the awesome plugins that contributed to the growth of the Airflow ecosystem and inspired this plugin.
