
Dynamically build Airflow DAGs from YAML files

Project description

airflow-dagfactory


dag-factory is a library for dynamically generating Apache Airflow DAGs from YAML configuration files.

Installation

To install dag-factory, run pip install airflow-dagfactory. It requires Python 3.10.0+ and Apache Airflow 2.9.1+.

Usage

Once airflow-dagfactory is installed in your Airflow environment, creating DAGs takes two steps. First, create a YAML configuration file. For example:

example_dag1:
  default_args:
    owner: 'example_owner'
    start_date: 2018-01-01  # or '2 days'
    end_date: 2018-01-05
    retries: 1
    retry_delay_sec: 300
  schedule_interval: '0 3 * * *'
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 60
  default_view: 'tree'  # or 'graph', 'duration', 'gantt', 'landing_times'
  orientation: 'LR'  # or 'TB', 'RL', 'BT'
  description: 'this is an example dag!'
  on_success_callback_name: print_hello
  on_success_callback_file: /usr/local/airflow/dags/print_hello.py
  on_failure_callback_name: print_hello
  on_failure_callback_file: /usr/local/airflow/dags/print_hello.py
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: 'echo 1'
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: 'echo 2'
      dependencies: [task_1]
    task_3:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: 'echo 3'
      dependencies: [task_1]
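The config above points on_success_callback_name and on_failure_callback_name at a print_hello function defined in /usr/local/airflow/dags/print_hello.py. A minimal sketch of what such a callback file might look like (the function body here is an assumption; dag-factory only requires that the named function exist in the named file):

```python
# /usr/local/airflow/dags/print_hello.py -- sketch of a callback file.
# dag-factory imports the function named by on_success_callback_name
# from the file given by on_success_callback_file.

def print_hello(context=None):
    # Airflow passes the task context dict to DAG-level callbacks;
    # this sketch just logs a message and ignores it.
    print("hello from example_dag1!")
    return "hello"
```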

Then, in the DAGs folder of your Airflow environment, create a Python file like this:

from airflow import DAG  # imported so the scheduler's DagBag recognizes this file as containing DAGs
import airflow_dagfactory

dag_factory = airflow_dagfactory.DagFactory("/path/to/dags/config_file.yml")

dag_factory.clean_dags(globals())
dag_factory.generate_dags(globals())

And this DAG will be generated and ready to run in Airflow!

If you have several configuration files you can import them like this:

# 'airflow' word is required for the dagbag to parse this file
from airflow_dagfactory import load_yaml_dags

load_yaml_dags(globals_dict=globals(), suffix=['dag.yaml'])


Notes

HttpSensor (since 0.10.0)

The package airflow.sensors.http_sensor works with all supported versions of Airflow. In Airflow 2.0+, the new package name can be used in the operator value: airflow.providers.http.sensors.http

The following example shows response_check logic defined in a Python file:

task_2:
  operator: airflow.sensors.http_sensor.HttpSensor
  http_conn_id: 'test-http'
  method: 'GET'
  response_check_name: check_sensor
  response_check_file: /path/to/example1/http_conn.py
  dependencies: [task_1]
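The check_sensor function referenced above lives in /path/to/example1/http_conn.py. A hedged sketch of such a file, mirroring the lambda variant shown next (the exact check is an assumption):

```python
# /path/to/example1/http_conn.py -- sketch of a response_check callable.
# HttpSensor calls this with the HTTP response and keeps poking
# until the function returns True.

def check_sensor(response):
    # Consider the poke successful when the body contains "ok".
    return "ok" in response.text
```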

The response_check logic can also be provided as a lambda:

task_2:
  operator: airflow.sensors.http_sensor.HttpSensor
  http_conn_id: 'test-http'
  method: 'GET'
  response_check_lambda: 'lambda response: "ok" in response.text'
  dependencies: [task_1]

Benefits

  • Construct DAGs without knowing Python
  • Construct DAGs without learning Airflow primitives
  • Avoid duplicative code
  • Everyone loves YAML! ;)

Contributing

Contributions are welcome! Just submit a Pull Request or GitHub Issue.

Upload to PyPI
python setup.py sdist bdist_wheel
python -m twine upload dist/* --verbose



Download files

Download the file for your platform.

Source Distribution

airflow-dagfactory-0.19.2.tar.gz (18.3 kB)

Uploaded Source

Built Distribution

airflow_dagfactory-0.19.2-py3-none-any.whl (17.3 kB)

Uploaded Python 3

File details

Details for the file airflow-dagfactory-0.19.2.tar.gz.

File metadata

  • Download URL: airflow-dagfactory-0.19.2.tar.gz
  • Upload date:
  • Size: 18.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.0.0 CPython/3.10.12

File hashes

Hashes for airflow-dagfactory-0.19.2.tar.gz

  • SHA256: aa728638f63abb97dae03777384bd5c5249c95ab0c95118647ad6362d8a5281f
  • MD5: 3ab60489f946ccc23fa145b32271f8d1
  • BLAKE2b-256: 025c29b6fed60955b69306bafe5d4e83b2d5eb426f402e22259bfc2cf2d1b0c9

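To verify a downloaded file against the published hashes above, you can compute its SHA256 digest locally and compare. A minimal sketch using only the standard library:

```python
import hashlib

def sha256_of(path):
    """Compute the SHA256 hex digest of a file, streaming in chunks
    so large archives are not loaded into memory at once."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            digest.update(chunk)
    return digest.hexdigest()

# Compare the result against the published SHA256, e.g.:
#   sha256_of("airflow-dagfactory-0.19.2.tar.gz") == "aa728638f6...5281f"
```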

File details

Details for the file airflow_dagfactory-0.19.2-py3-none-any.whl.

File metadata

File hashes

Hashes for airflow_dagfactory-0.19.2-py3-none-any.whl

  • SHA256: 7a14b1fea32bb48f7968019bdaeb78aa64fba33be5d069cc7bd22d4d48695c65
  • MD5: cb2a608bb7f0065147bed74dafebd486
  • BLAKE2b-256: 35cc35c4ee1a23014c80e8271e7f618bbd58cacebbd39b66d571c314f60af816

