Dynamically build Apache Airflow DAGs from YAML files
Project description
dag-factory
Welcome to dag-factory! dag-factory is a library for Apache Airflow® to construct DAGs declaratively via configuration files.
The minimum requirements for dag-factory are:
- Python 3.8.0+
- Apache Airflow® 2.0+
For a gentle introduction, please take a look at our Quickstart Guide. For more examples, please see the examples folder.
Quickstart
The following example demonstrates how to create a simple DAG using dag-factory. We will be generating a DAG with three tasks, where task_2
and task_3
depend on task_1
.
These tasks will be leveraging the BashOperator
to execute simple bash commands.
- To install dag-factory, run the following pip command in your Apache Airflow® environment:
pip install dag-factory
- Create a YAML configuration file called
config_file.yml
and save it within your dags folder:
example_dag1:
default_args:
owner: 'example_owner'
retries: 1
start_date: '2024-01-01'
schedule_interval: '0 3 * * *'
catchup: False
description: 'this is an example dag!'
tasks:
task_1:
operator: airflow.operators.bash_operator.BashOperator
bash_command: 'echo 1'
task_2:
operator: airflow.operators.bash_operator.BashOperator
bash_command: 'echo 2'
dependencies: [task_1]
task_3:
operator: airflow.operators.bash_operator.BashOperator
bash_command: 'echo 3'
dependencies: [task_1]
We are setting the execution order of the tasks by specifying the dependencies
key.
- In the same folder, create a python file called
generate_dags.py
. This file is responsible for generating the DAGs from the configuration file and is a one-time setup. You won't need to modify this file unless you want to add more configuration files or change the configuration file name.
from airflow import DAG ## by default, this is needed for the dagbag to parse this file
import dagfactory
from pathlib import Path
config_file = Path.cwd() / "dags/config_file.yml"
dag_factory = dagfactory.DagFactory(config_file)
dag_factory.clean_dags(globals())
dag_factory.generate_dags(globals())
After a few moments, the DAG will be generated and ready to run in Airflow. Unpause the DAG in the Apache Airflow® UI and watch the tasks execute!
Please look at the examples folder for more examples.
Features
Multiple Configuration Files
If you want to split your DAG configuration into multiple files, you can do so by leveraging a suffix in the configuration file name.
# 'airflow' word is required for the dagbag to parse this file
from dagfactory import load_yaml_dags
load_yaml_dags(globals_dict=globals(), suffix=['dag.yaml'])
Dynamically Mapped Tasks
If you want to create a dynamic number of tasks, you can use the mapped_tasks
key in the configuration file. The mapped_tasks
key is a list of dictionaries, where each dictionary represents a task.
...
tasks:
request:
operator: airflow.operators.python.PythonOperator
python_callable_name: example_task_mapping
python_callable_file: /usr/local/airflow/dags/expand_tasks.py # this file should contain the python callable
process:
operator: airflow.operators.python_operator.PythonOperator
python_callable_name: expand_task
python_callable_file: /usr/local/airflow/dags/expand_tasks.py
partial:
op_kwargs:
test_id: "test"
expand:
op_args:
request.output
dependencies: [request]
Datasets
dag-factory supports scheduling DAGs via Apache Airflow Datasets.
To leverage, you need to specify the Dataset
in the outlets
key in the configuration file. The outlets
key is a list of strings that represent the dataset locations.
In the schedule
key of the consumer dag, you can set the Dataset
you would like to schedule against. The key is a list of strings that represent the dataset locations.
The consumer dag will run when all the datasets are available.
producer_dag:
default_args:
owner: "example_owner"
retries: 1
start_date: '2024-01-01'
description: "Example DAG producer simple datasets"
schedule_interval: "0 5 * * *"
tasks:
task_1:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo 1"
outlets: [ 's3://bucket_example/raw/dataset1.json' ]
task_2:
bash_command: "echo 2"
dependencies: [ task_1 ]
outlets: [ 's3://bucket_example/raw/dataset2.json' ]
consumer_dag:
default_args:
owner: "example_owner"
retries: 1
start_date: '2024-01-01'
description: "Example DAG consumer simple datasets"
schedule: [ 's3://bucket_example/raw/dataset1.json', 's3://bucket_example/raw/dataset2.json' ]
tasks:
task_1:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo 'consumer datasets'"
Custom Operators
dag-factory supports using custom operators. To leverage, set the path to the custom operator within the operator
key in the configuration file. You can add any additional parameters that the custom operator requires.
...
tasks:
begin:
operator: airflow.operators.dummy_operator.DummyOperator
make_bread_1:
operator: customized.operators.breakfast_operators.MakeBreadOperator
bread_type: 'Sourdough'
Notes
HttpSensor (since 1.0.0)
The package airflow.providers.http.sensors.http
is available for Airflow 2.0+
The following example shows response_check
logic in a python file:
task_2:
operator: airflow.providers.http.sensors.http.HttpSensor
http_conn_id: 'test-http'
method: 'GET'
response_check_name: check_sensor
response_check_file: /path/to/example1/http_conn.py
dependencies: [task_1]
The response_check
logic can also be provided as a lambda:
task_2:
operator: airflow.providers.http.sensors.http.HttpSensor
http_conn_id: 'test-http'
method: 'GET'
response_check_lambda: 'lambda response: "ok" in reponse.text'
dependencies: [task_1]
Benefits
- Construct DAGs without knowing Python
- Construct DAGs without learning Airflow primitives
- Avoid duplicative code
- Everyone loves YAML! ;)
Contributing
Contributions are welcome! Just submit a Pull Request or Github Issue.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for dag_factory-0.20.0rc1-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | fb11d20c2dc46c1d01e55f496b3b2fb305bc527b8d4c5da908114623378c851f |
|
MD5 | a88621c8d428b64f6d2c8ad2dbb4d398 |
|
BLAKE2b-256 | 73531073e6f6fb1278d72dcd36cacf27bd5b285f8163a8240c2d839fc7d84a91 |