Skip to main content

Airflow REST APIs to create and manage DAGS

Project description

OpenMetadata Airflow Managed DAGS Api

This is a plugin for Apache Airflow >= 1.10 and Airflow >=2.x that exposes REST APIs to deploy an OpenMetadata workflow definition and manage DAGS and tasks.

Development

You can run make branch=issue-3659-v2 test_up and specify any branch from OpenMetadata that you'd need to test the changes in the APIs. This will prepare a separated airflow container.

The command will build the image by downloading the branch changes inside the container. This helps us test the REST APIs using some ongoing changes on OpenMetadata as well.

Requirements

First, make sure that Airflow is properly installed with the latest version 2.3.3. From the docs:

Then, install following packages in your scheduler and webserver python env.

pip install openmetadata-airflow-managed-apis       

Configuration

Add the following section to airflow.cfg

[openmetadata_airflow_apis]
dag_generated_configs = {AIRFLOW_HOME}/dag_generated_configs

substitute AIRFLOW_HOME with your airflow installation home

Deploy

pip install "apache-airflow==2.3.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.3.3/constraints-3.9.txt"
  1. Install the package

  2. mkdir -p {AIRFLOW_HOME}/dag_generated_configs

  3. (re)start the airflow webserver and scheduler

    airflow webserver
    airflow scheduler
    

Validate

You can check that the plugin is correctly loaded by going to http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/restapi, or accessing the REST_API_PLUGIN view through the Admin dropdown.

APIs

Enable JWT Auth tokens

Plugin enables JWT Token based authentication for Airflow versions 1.10.4 or higher when RBAC support is enabled.

Generating the JWT access token
curl -XPOST http://localhost:8080/api/v1/security/login -H "Content-Type: application/json" -d '{"username":"admin", "password":"admin", "refresh":true, "provider": "db"}'
Examples:
curl -X POST http://localhost:8080/api/v1/security/login -H "Content-Type: application/json" -d '{"username":"admin", "password":"admin", "refresh":true, "provider": "db"}'
Sample response which includes access_token and refresh_token.
{
 "access_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MDQyMTc4MzgsIm5iZiI6MTYwNDIxNzgzOCwianRpIjoiMTI4ZDE2OGQtMTZiOC00NzU0LWJiY2EtMTEyN2E2ZTNmZWRlIiwiZXhwIjoxNjA0MjE4NzM4LCJpZGVudGl0eSI6MSwiZnJlc2giOnRydWUsInR5cGUiOiJhY2Nlc3MifQ.xSWIE4lR-_0Qcu58OiSy-X0XBxuCd_59ic-9TB7cP9Y",
 "refresh_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MDQyMTc4MzgsIm5iZiI6MTYwNDIxNzgzOCwianRpIjoiZjA5NTNkODEtNWY4Ni00YjY0LThkMzAtYzg5NTYzMmFkMTkyIiwiZXhwIjoxNjA2ODA5ODM4LCJpZGVudGl0eSI6MSwidHlwZSI6InJlZnJlc2gifQ.VsiRr8_ulCoQ-3eAbcFz4dQm-y6732QR6OmYXsy4HLk"
}

By default, JWT access token is valid for 15 mins and refresh token is valid for 30 days. You can renew the access token with the help of refresh token as shown below.

Renewing the Access Token
curl -X POST "http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/security/refresh" -H 'Authorization: Bearer <refresh_token>'
Examples:
curl -X POST "http://localhost:8080/api/v1/security/refresh" -H 'Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MDQyMTc4MzgsIm5iZiI6MTYwNDIxNzgzOCwianRpIjoiZjA5NTNkODEtNWY4Ni00YjY0LThkMzAtYzg5NTYzMmFkMTkyIiwiZXhwIjoxNjA2ODA5ODM4LCJpZGVudGl0eSI6MSwidHlwZSI6InJlZnJlc2gifQ.VsiRr8_ulCoQ-3eAbcFz4dQm-y6732QR6OmYXsy4HLk'
sample response returns the renewed access token as shown below.
{
 "access_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MDQyODQ2OTksIm5iZiI6MTYwNDI4NDY5OSwianRpIjoiZDhhN2IzMmYtMWE5Zi00Y2E5LWFhM2ItNDEwMmU3ZmMyMzliIiwiZXhwIjoxNjA0Mjg1NTk5LCJpZGVudGl0eSI6MSwiZnJlc2giOmZhbHNlLCJ0eXBlIjoiYWNjZXNzIn0.qY2e-bNSgOY-YboinOoGqLfKX9aQkdRjo025mZwBadA"
}

Enable API requests with JWT

If the Authorization header is not added in the api request,response error:
{"msg":"Missing Authorization Header"}
Pass the additional Authorization:Bearer <access_token> header in the rest API request.

Examples:

curl -X GET -H 'Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MDQyODQ2OTksIm5iZiI6MTYwNDI4NDY5OSwianRpIjoiZDhhN2IzMmYtMWE5Zi00Y2E5LWFhM2ItNDEwMmU3ZmMyMzliIiwiZXhwIjoxNjA0Mjg1NTk5LCJpZGVudGl0eSI6MSwiZnJlc2giOmZhbHNlLCJ0eXBlIjoiYWNjZXNzIn0.qY2e-bNSgOY-YboinOoGqLfKX9aQkdRjo025mZwBadA' http://localhost:8080/rest_api/api\?api\=dag_state\&dag_id\=dag_test\&run_id\=manual__2020-10-28T17%3A36%3A28.838356%2B00%3A00

Using the API

Once you deploy the plugin and restart the webserver, you can start to use the REST API. Bellow you will see the endpoints that are supported.

Note: If enable RBAC, http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/rest_api/
This web page will show the Endpoints supported and provide a form for you to test submitting to them.

deploy_dag

Description:
  • Deploy a new dag, and refresh dag to session.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/rest_api/api?api=deploy_dag
Method:
  • POST
POST request Arguments:
{
	"workflow": {
		"name": "test_ingestion_x_35",
		"force": "true",
		"pause": "false",
		"unpause": "true",
		"dag_config": {
			"test_ingestion_x_35": {
				"default_args": {
					"owner": "harsha",
					"start_date": "2021-10-29T00:00:00.000Z",
					"end_date": "2021-11-05T00:00:00.000Z",
					"retries": 1,
					"retry_delay_sec": 300
				},
				"schedule_interval": "0 3 * * *",
				"concurrency": 1,
				"max_active_runs": 1,
				"dagrun_timeout_sec": 60,
				"default_view": "tree",
				"orientation": "LR",
				"description": "this is an example dag!",
				"tasks": {
					"task_1": {
						"operator": "airflow.operators.python_operator.PythonOperator",
						"python_callable_name": "metadata_ingestion_workflow",
						"python_callable_file": "metadata_ingestion.py",
						"op_kwargs": {
							"workflow_config": {
								"metadata_server": {
									"config": {
										"api_endpoint": "http://localhost:8585/api",
										"auth_provider_type": "no-auth"
									},
									"type": "metadata-server"
								},
								"sink": {
									"config": {
										"es_host": "localhost",
										"es_port": 9200,
										"index_dashboards": "true",
										"index_tables": "true",
										"index_topics": "true"
									},
									"type": "elasticsearch"
								},
								"source": {
									"config": {
										"include_dashboards": "true",
										"include_tables": "true",
										"include_topics": "true",
										"limit_records": 10
									},
									"type": "metadata"
								}
							}
						}
					}
				}
			}
		}
	}
}
Examples:
curl -H  'Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MzU2NTE1MDAsIm5iZiI6MTYzNTY1MTUwMCwianRpIjoiNWQyZTM3ZDYtNjdiYS00NGZmLThjOWYtMDM0ZTQyNGE3MTZiIiwiZXhwIjoxNjM1NjUyNDAwLCJpZGVudGl0eSI6MSwiZnJlc2giOnRydWUsInR5cGUiOiJhY2Nlc3MifQ.DRUYCAiMh5h2pk1MZZJ4asyVFC20pu35DuAANQ5GxGw' -H 'Content-Type: application/json' -d "@test_ingestion_config.json" -X POST http://localhost:8080/rest_api/api\?api\=deploy_dag```
##### response:
```json
{"message": "Workflow [test_ingestion_x_35] has been created", "status": "success"}

delete_dag

Description:
  • Delete dag based on dag_id.
Endpoint:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/rest_api/api?api=delete_dag&dag_id=value
Method:
  • GET
GET request Arguments:
  • dag_id - string - The id of dag.
Examples:
curl -X GET http://localhost:8080/rest_api/api?api=delete_dag&dag_id=dag_test
response:
{
  "message": "DAG [dag_test] deleted",
  "status": "success"
}

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

openmetadata_managed_apis-0.13.1.1.tar.gz (30.1 kB view details)

Uploaded Source

Built Distributions

openmetadata_managed_apis-0.13.1.1-py3.9.egg (127.6 kB view details)

Uploaded Source

File details

Details for the file openmetadata_managed_apis-0.13.1.1.tar.gz.

File metadata

File hashes

Hashes for openmetadata_managed_apis-0.13.1.1.tar.gz
Algorithm Hash digest
SHA256 7292c21c15c33221954a24020b8109348a4e0d821ba4e102f369f862898bc81c
MD5 352cfce8a197227595cef59a4b24f642
BLAKE2b-256 41b8f75c95e34928f5bd4ae1a366dcec89c6ae5f88952e249c75a0168bd7eaea

See more details on using hashes here.

File details

Details for the file openmetadata_managed_apis-0.13.1.1-py3.9.egg.

File metadata

File hashes

Hashes for openmetadata_managed_apis-0.13.1.1-py3.9.egg
Algorithm Hash digest
SHA256 ec92bad67f10b868d6894a04c2904f92c125a0e92a7ad04f3de571fba81a6cea
MD5 23765f7219742af2dd52867fde7b7818
BLAKE2b-256 754521672e7a090e8df1c8574b38603a6e20b4b0f643f54f2c4d58707c99b361

See more details on using hashes here.

File details

Details for the file openmetadata_managed_apis-0.13.1.1-py3-none-any.whl.

File metadata

File hashes

Hashes for openmetadata_managed_apis-0.13.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 8675a9b03957a9db230853818984dfeabd85648816f7c84f86c9b229b9850fef
MD5 71cca02e85f24397ebf98db081c4cdcc
BLAKE2b-256 5fb37f41e016545458e56be52c699cea6a740eed0f7e672b1bcd7137f9945d83

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page