Dynamically build Airflow DAGs from YAML files
Project description
dag-factory
dag-factory is a library for dynamically generating Apache Airflow DAGs from YAML configuration files.
Installation
To install dag-factory run pip install dag-factory. It requires Python 3.6.0+ and Apache Airflow 2.0+.
Usage
After installing dag-factory in your Airflow environment, there are two steps to creating DAGs. First, we need to create a YAML configuration file. For example:
example_dag1:
default_args:
owner: 'example_owner'
start_date: 2018-01-01 # or '2 days'
end_date: 2018-01-05
retries: 1
retry_delay_sec: 300
schedule_interval: '0 3 * * *'
concurrency: 1
max_active_runs: 1
dagrun_timeout_sec: 60
default_view: 'tree' # or 'graph', 'duration', 'gantt', 'landing_times'
orientation: 'LR' # or 'TB', 'RL', 'BT'
description: 'this is an example dag!'
on_success_callback_name: print_hello
on_success_callback_file: /usr/local/airflow/dags/print_hello.py
on_failure_callback_name: print_hello
on_failure_callback_file: /usr/local/airflow/dags/print_hello.py
tasks:
task_1:
operator: airflow.operators.bash_operator.BashOperator
bash_command: 'echo 1'
task_2:
operator: airflow.operators.bash_operator.BashOperator
bash_command: 'echo 2'
dependencies: [task_1]
task_3:
operator: airflow.operators.bash_operator.BashOperator
bash_command: 'echo 3'
dependencies: [task_1]
Then in the DAGs folder in your Airflow environment you need to create a python file like this:
from airflow import DAG
import dagfactory
dag_factory = dagfactory.DagFactory("/path/to/dags/config_file.yml")
dag_factory.clean_dags(globals())
dag_factory.generate_dags(globals())
And this DAG will be generated and ready to run in Airflow!
If you have several configuration files you can import them like this:
# 'airflow' word is required for the dagbag to parse this file
from dagfactory import load_yaml_dags
load_yaml_dags(globals_dict=globals(), suffix=['dag.yaml'])
Notes
HttpSensor (since 0.10.0)
The package airflow.sensors.http_sensor works with all supported versions of Airflow. In Airflow 2.0+, the new package name can be used in the operator value: airflow.providers.http.sensors.http
The following example shows response_check logic in a python file:
task_2:
operator: airflow.sensors.http_sensor.HttpSensor
http_conn_id: 'test-http'
method: 'GET'
response_check_name: check_sensor
response_check_file: /path/to/example1/http_conn.py
dependencies: [task_1]
The response_check logic can also be provided as a lambda:
task_2:
operator: airflow.sensors.http_sensor.HttpSensor
http_conn_id: 'test-http'
method: 'GET'
response_check_lambda: 'lambda response: "ok" in reponse.text'
dependencies: [task_1]
Benefits
- Construct DAGs without knowing Python
- Construct DAGs without learning Airflow primitives
- Avoid duplicative code
- Everyone loves YAML! ;)
Contributing
Contributions are welcome! Just submit a Pull Request or Github Issue.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file dag-factory-0.18.1.tar.gz.
File metadata
- Download URL: dag-factory-0.18.1.tar.gz
- Upload date:
- Size: 15.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.48.0 CPython/3.7.6
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7fd21c6e382760cb8d7c184b6cebdc7d65f1ed7066ff489e88213a5bfd0057f0
|
|
| MD5 |
6906c09443e1d97ee0e35a245f120882
|
|
| BLAKE2b-256 |
eec865374261a98529845abdebd377c0a0e0e5a444feb58e8a439628ec8b5a2a
|
File details
Details for the file dag_factory-0.18.1-py2.py3-none-any.whl.
File metadata
- Download URL: dag_factory-0.18.1-py2.py3-none-any.whl
- Upload date:
- Size: 16.4 kB
- Tags: Python 2, Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.48.0 CPython/3.7.6
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
52320e93c6e570497d5d5c608ca61e73c958c1c8256730f1957a702692ef4685
|
|
| MD5 |
6df64b030e2fc4b9a2e5506bfb8bf39c
|
|
| BLAKE2b-256 |
bfc2351c33a754da569f2e7f81483cceb4c20acfa0061bf54314c8964d66e13b
|