Skip to main content

Exposes 'Xtended' Apache Airflow management capabilities via secure API

Project description

Airflow Xtended API - Plugin

Apache Airflow plugin that exposes xtended secure API endpoints similar to the official Airflow API (Stable) (1.0.0), providing richer capabilities to support more powerful DAG and job management. Apache Airflow version 2.8.0 or higher is necessary.

Requirements

Installation

python3 -m pip install airflow-xtended-api

Build from source

Build a custom version of this plugin by following the instructions in this doc

Screenshots

screen1 screen2 screen3

Authentication

Airflow Xtended API plugin uses the same auth mechanism as Airflow API (Stable) (1.0.0). So, by default APIs exposed via this plugin respect the auth mechanism used by your Airflow webserver and also complies with the existing RBAC policies. Note that you will need to pass credentials data as part of the request. Here is a snippet from the official docs when basic authorization is used:

curl -X POST 'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/dags/{dag_id}?update_mask=is_paused' \
-H 'Content-Type: application/json' \
--user "username:password" \
-d '{
    "is_paused": true
}'

Using the Custom API

After installing the plugin python package and restarting your airflow webserver, You can see a link under the 'Xtended API' tab called 'Reference Docs' on the airflow webserver homepage. All the necessary documentation for the supported API endpoints resides on that page. You can also directly navigate to that page using below link.

http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/xtended_api/

All the supported endpoints are exposed in the below format:

http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/{ENDPOINT_NAME}

Following are the names of endpoints which are currently supported.

deploy_dag

Description:
  • Deploy a new DAG File to the DAGs directory.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/deploy_dag
Method:
  • POST
POST request Arguments:
  • dag_file - file - Upload & Deploy a DAG from .py or .zip files
  • force (optional) - boolean - Force uploading the file if it already exists
  • unpause (optional) - boolean - The DAG will be unpaused on creation (Works only when uploading .py files)
  • otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!
Examples:
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 'dag_file=@test_dag.py' \
 -F 'force=y' \
 -F 'unpause=y' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/deploy_dag
response:
{
  "message": "DAG File [<module '{MODULE_NAME}' from '/{DAG_FOLDER}/exam.py'>] has been uploaded",
  "status": "success"
}
Method:
  • GET
Get request Arguments:
  • dag_file_url - file - A valid url for fetching .py, .pyc or .zip DAG files
  • filename - string - A valid filename ending with .py, .pyc or .zip
  • force (optional) - boolean - Force uploading the file if it already exists.
  • unpause (optional) - boolean - The DAG will be unpaused on creation (Works only when uploading .py files)
  • otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!
Examples:
curl -X GET --user "username:password" \
'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/deploy_dag?dag_file_url={DAG_FILE_URL}&filename=test_dag.py&force=on&unpause=on'
response:
{
  "message": "DAG File [<module '{MODULE_NAME}' from '/{DAG_FOLDER}/exam.py'>] has been uploaded",
  "status": "success"
}

create_dag

Description:
  • Create a new DAG File in the DAGs directory.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/create_dag
Method:
  • POST
POST request Arguments:
  • filename - string - Name of the python DAG file
  • dag_code - string(multiline) - Python code of the DAG file
  • force (optional) - boolean - Force uploading the file if it already exists
  • unpause (optional) - boolean - The DAG will be unpaused on creation (Works only when uploading .py files)
  • otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!

s3_sync

Description:
  • Sync DAG files from an S3 bucket.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/s3_sync
Method:
  • POST
POST request Arguments:
  • s3_bucket_name - string - S3 bucket name where DAG files exist
  • s3_region - string - S3 region name where the specified bucket exists
  • s3_access_key - string - IAM access key having atleast S3 bucket read access
  • s3_secret_key - string - IAM secret key for the specifed access key
  • s3_object_prefix (optional) - string - Filter results by object prefix
  • s3_object_keys (optional) - string - Sync DAG files specifed by the object keys. Multiple object keys are seperated by comma (,)
  • skip_purge (optional) - boolean - Skip emptying DAGs directory
  • otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!
Examples:
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 's3_bucket_name=test-bucket' \
 -F 's3_region=us-east-1' \
 -F 's3_access_key={IAM_ACCESS_KEY}' \
 -F 's3_secret_key={IAM_SECRET_KEY}' \
 -F 'skip_purge=y' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/s3_sync
response:
{
  "message": "dag files synced from s3",
  "sync_status": {
    "synced": ["test_dag0.py", "test_dag1.py", "test_dag2.py"],
    "failed": []
  },
  "status": "success"
}

mongo_sync

Description:
  • Sync DAG files from a mongo db collection
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/mongo_sync
Method:
  • POST
POST request Arguments:
  • connection_string - string - Source mongo server connection string
  • db_name - string - Source mongo database name
  • collection_name - string - Collection name where DAG data exists in the specified db
  • field_filename - string - DAGs are named using value of this document field from the specified collection
  • field_dag_source - string - A document field referring the Python source for the yet-to-be created DAGs
  • query_filter (optional) - string - JSON query string to filter required documents
  • skip_purge (optional) - boolean - Skip emptying DAGs directory
  • otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!
Examples:
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 'connection_string={MONGO_SERVER_CONNECTION_STRING}' \
 -F 'db_name=test_db' \
 -F 'collection_name=test_collection' \
 -F 'field_dag_source=dag_source' \
 -F 'field_filename=dag_filename' \
 -F 'skip_purge=y' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/mongo_sync
response:
{
  "message": "dag files synced from mongo",
  "sync_status": {
    "synced": ["test_dag0.py", "test_dag1.py", "test_dag2.py"],
    "failed": []
  },
  "status": "success"
}

refresh_all_dags

Description:
  • Refresh all DAGs in the webserver.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/refresh_all_dags
Method:
  • GET
GET request Arguments:
  • None
Examples:
curl -X GET --user "username:password" \
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/refresh_all_dags
response:
{
  "message": "All DAGs are now up-to-date!!",
  "status": "success"
}

scan_dags

Description:
  • Check for newly created DAGs.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/scan_dags
Method:
  • GET
GET request Arguments:
  • otf_sync (optional) - boolean - Check for newly created DAGs On The Fly!
Examples:
curl -X GET --user "username:password" \
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/scan_dags
response:
{
  "message": "Ondemand DAG scan complete!!",
  "status": "success"
}

purge_dags

Description:
  • Empty DAG directory.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/purge_dags
Method:
  • GET
GET request Arguments:
  • None
Examples:
curl -X GET --user "username:password" \
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/purge_dags
response:
{
  "message": "DAG directory purged!!",
  "status": "success"
}

delete_dag

Description:
  • Delete a DAG in the web server from Airflow database and filesystem.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/delete_dag
Method:
  • GET
GET request Arguments:
  • dag_id (optional)- string - DAG id
  • filename (optional) - string - Name of the DAG file that needs to be deleted
  • Note: Atleast one of args 'dag_id' or 'filename' should be specified
Examples:
curl -X GET --user "username:password" \
'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/delete_dag?dag_id=test_dag&filename=test_dag.py'
response:
{
  "message": "DAG [dag_test] deleted",
  "status": "success"
}

upload_file

Description:
  • Upload a new File to the specified directory.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/upload_file
Method:
  • POST
POST request Arguments:
  • file - file - File to be uploaded
  • force (optional) - boolean - Force uploading the file if it already exists
  • path (optional) - string - Location where the file is to be uploaded (Default is the DAGs directory)
Examples:
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 'file=@test_file.py' \
 -F 'force=y' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/upload_file
response:
{
  "message": "File [/{DAG_FOLDER}/dag_test.txt] has been uploaded",
  "status": "success"
}

restart_failed_task

Description:
  • Restart failed tasks with downstream.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/restart_failed_task
Method:
  • GET
GET request Arguments:
  • dag_id - string - DAG id
  • run_id - string - DagRun id
Examples:
curl -X GET --user "username:password" \
'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/restart_failed_task?dag_id=test_dag&run_id=test_run'
response:
{
  "message": {
    "failed_task_count": 1,
    "clear_task_count": 7
  },
  "status": "success"
}

kill_running_tasks

Description:
  • Kill running tasks having status in ['none', 'running'].
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/kill_running_tasks
Method:
  • GET
GET request Arguments:
  • dag_id - string - DAG id
  • run_id - string - DagRun id
  • task_id - string - If task_id is none, kill all tasks, else kill the specified task.
Examples:
curl -X GET --user "username:password" \
'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/kill_running_tasks?dag_id=test_dag&run_id=test_run&task_id=test_task'
response:
{
  "message": "tasks in test_run killed!!",
  "status": "success"
}

run_task_instance

Description:
  • Create DagRun, run the specified tasks, and skip the rest.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/run_task_instance
Method:
  • POST
POST request Arguments:
  • dag_id - string - DAG id
  • run_id - string - DagRun id
  • tasks - string - task id(s), Multiple tasks are separated by comma (,)
  • conf (optional)- string - Optional configuartion for creating DagRun.
Examples:
curl -X POST -H 'Content-Type: multipart/form-data' \
 --user "username:password" \
 -F 'dag_id=test_dag' \
 -F 'run_id=test_run' \
 -F 'tasks=test_task' \
 http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/run_task_instance
response:
{
  "execution_date": "2021-06-21T05:50:19.740803+0000",
  "status": "success"
}

skip_task_instance

Description:
  • Skip one task instance.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/skip_task_instance
Method:
  • GET
GET request Arguments:
  • dag_id - string - DAG id
  • run_id - string - DagRun id
  • task_id - string - task id
Examples:
curl -X GET http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/xtended/skip_task_instance?dag_id=test_dag&run_id=test_run&task_id=test_task
response:
{
  "message": "<TaskInstance: test_dag.test_task 2021-06-21 19:59:34.638794+00:00 [skipped]> skipped!!",
  "status": "success"
}

Acknowledgements

Huge shout out to these awesome plugins that contributed to the growth of Airflow ecosystem, which also inspired this plugin.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

airflow_xtended_api-0.1.6.tar.gz (41.5 kB view details)

Uploaded Source

Built Distribution

airflow_xtended_api-0.1.6-py3-none-any.whl (51.0 kB view details)

Uploaded Python 3

File details

Details for the file airflow_xtended_api-0.1.6.tar.gz.

File metadata

  • Download URL: airflow_xtended_api-0.1.6.tar.gz
  • Upload date:
  • Size: 41.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.9.2

File hashes

Hashes for airflow_xtended_api-0.1.6.tar.gz
Algorithm Hash digest
SHA256 fcfd5261f5581e8e32b7762c04e7ff4adbafb7a7fa4eacec1ae795b78d032ecf
MD5 a34b071e6e279502cbdd84bc1690035e
BLAKE2b-256 f9934ad1ab4ccbbf296173ce4fe93fb02996f8e5bec87ec969b7a08333acf5be

See more details on using hashes here.

File details

Details for the file airflow_xtended_api-0.1.6-py3-none-any.whl.

File metadata

File hashes

Hashes for airflow_xtended_api-0.1.6-py3-none-any.whl
Algorithm Hash digest
SHA256 1fa0e217953cc981c2a796254ce2351ffeb3d73d0b3a948884ebb8546e85f01e
MD5 de7e05d92103fc651607af4c3f76f99e
BLAKE2b-256 74cd7846d1df7fd080ad2474112805778d8f181e70cbba1b162481a7bc617d5f

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page