Tooling to help build data-driven DAGs
Overview
data-dag is a library for writing data-defined Airflow DAGs and operators.
Installation
```shell
pip install data-dag
```
Example
Reusable operator and DAG templates can be stored in a central location, such as a custom Airflow plugin (a package within dags/ works fine too):
```python
# plugins/my_factories/download.py
from typing import List
from urllib.request import urlretrieve

from data_dag.dag_factory import DagFactory
from data_dag.operator_factory import OperatorFactory

try:
    from airflow.operators.empty import EmptyOperator
except ImportError:
    # Airflow versions before 2.3 ship the same operator as DummyOperator
    from airflow.operators.dummy import DummyOperator as EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.utils.task_group import TaskGroup


class DownloadOperator(OperatorFactory):
    """An operator factory for safely downloading files to a known location"""

    name: str
    url: str
    path: str

    def make_operator(self):
        # Group both tasks so each download appears as a single unit in the DAG
        with TaskGroup(group_id=f'download_{self.name}') as group:
            # Wait until the URL responds; endpoint is resolved against the
            # sensor's HTTP connection (http_conn_id, 'http_default' by default)
            check = HttpSensor(
                task_id='check_exists',
                endpoint=self.url,
            )
            download = PythonOperator(
                task_id='download',
                python_callable=lambda: urlretrieve(self.url, self.path),
            )
            check >> download
        return group


class DownloaderDag(DagFactory):
    """A DAG factory for producing simple DAGs that just download a bunch of files"""

    downloads: List[DownloadOperator]

    def _make_dag(self):
        start = EmptyOperator(task_id='start')
        end = EmptyOperator(task_id='end')
        # Fan out from start to every download group, then fan back in to end
        for download in self.downloads:
            start >> download.make_operator() >> end
```
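Because the factories are plain data plus a `make_*` method, the graph a `DownloaderDag` produces is fully determined by its fields. The stdlib-only sketch below mirrors that structure without Airflow so it can run anywhere. `Download`, `DownloaderSpec`, and `edges()` are hypothetical names, not part of data-dag, and the `download_<name>.<task>` IDs assume Airflow's default of prefixing task IDs with their TaskGroup ID.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class Download:
    """Hypothetical stand-in for DownloadOperator: only its three data fields."""
    name: str
    url: str
    path: str

    def task_ids(self) -> List[str]:
        # Assumes Airflow's default of prefixing task IDs with the TaskGroup ID
        group_id = f"download_{self.name}"
        return [f"{group_id}.check_exists", f"{group_id}.download"]


@dataclass
class DownloaderSpec:
    """Hypothetical stand-in for DownloaderDag: downloads between start and end."""
    downloads: List[Download] = field(default_factory=list)

    def edges(self) -> List[Tuple[str, str]]:
        edges = []
        for download in self.downloads:
            check, fetch = download.task_ids()
            # start >> group links start to the group's first task,
            # and group >> end links the group's last task to end
            edges += [("start", check), (check, fetch), (fetch, "end")]
        return edges


spec = DownloaderSpec(downloads=[
    Download("data", "https://www.example.com/data.zip", "data.zip"),
    Download("manifest", "https://www.example.com/manifest.json", "manifest.json"),
])
for upstream, downstream in spec.edges():
    print(f"{upstream} >> {downstream}")
```

Each download contributes the same three edges, which is why adding a file to the DAG only ever means adding an entry to its data.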
Then a definition for a particular DAG can live in a data file:
```yaml
# dags/yaml/sample_dag.yaml
dag_id: sample_dag
description: An example of how to write a data-driven DAG
schedule: '@daily'
start_date: '2020-01-01T00:00:00'
downloads:
  - name: data
    url: https://www.example.com/data.zip
    path: data.zip
  - name: manifest
    url: https://www.example.com/manifest.json
    path: manifest.json
```
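Judging by the `model_validate` call used to load it, each factory is a pydantic model, so this file is checked against the factory's typed fields. The stdlib-only sketch below imitates that step for the `downloads` list to make the expected data shape concrete. `parse_downloads` is a hypothetical helper, not data-dag's own validation, and its rejection of repeated names is an added assumption: each name becomes a `download_<name>` TaskGroup ID, and duplicate IDs would make Airflow fail when the DAG is built.

```python
from dataclasses import dataclass, fields
from typing import Any, Dict, List


@dataclass
class Download:
    name: str
    url: str
    path: str


def parse_downloads(dag_data: Dict[str, Any]) -> List[Download]:
    """Hypothetical stand-in for model_validate: check keys and unique names."""
    required = {f.name for f in fields(Download)}
    downloads, seen = [], set()
    for i, entry in enumerate(dag_data.get("downloads", [])):
        missing = required - set(entry)
        if missing:
            raise ValueError(f"downloads[{i}] is missing {sorted(missing)}")
        if entry["name"] in seen:
            # Each name becomes a TaskGroup ID, so duplicates would clash in Airflow
            raise ValueError(f"duplicate download name {entry['name']!r}")
        seen.add(entry["name"])
        # Extra keys are ignored; only the declared fields are kept
        downloads.append(Download(**{key: entry[key] for key in required}))
    return downloads


# The dict that safe_load() returns for sample_dag.yaml
dag_data = {
    "dag_id": "sample_dag",
    "description": "An example of how to write a data-driven DAG",
    "schedule": "@daily",
    "start_date": "2020-01-01T00:00:00",
    "downloads": [
        {"name": "data", "url": "https://www.example.com/data.zip", "path": "data.zip"},
        {"name": "manifest", "url": "https://www.example.com/manifest.json", "path": "manifest.json"},
    ],
}
print([d.name for d in parse_downloads(dag_data)])  # ['data', 'manifest']
```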
That data file can then be loaded into a DAG. Per Airflow's requirements, this must be done in a file located in dags/, and the result must be saved into a uniquely named global variable. The simplest possible example is this:
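```python
# dags/sample_dag.py
from pathlib import Path

from yaml import safe_load

from my_factories.download import DownloaderDag

# Resolve the data file relative to this file, not the scheduler's working directory
yaml_path = Path(__file__).parent / 'yaml' / 'sample_dag.yaml'
with open(yaml_path, 'r') as f:
    dag_data = safe_load(f)

# Validate the raw data against the factory's fields, then build the DAG
dag = DownloaderDag.model_validate(dag_data).make_dag()
```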
Multiple DAGs
Obviously, a template isn't much use if you only fill it in once. Here's a simple loader that reads any number of YAML files from a folder and publishes each one as a DAG in Airflow:
```python
# dags/load_yml_files.py
from pathlib import Path

# Airflow's safe-mode DAG discovery only parses files that mention both "airflow" and "dag"
from airflow import DAG  # noqa: F401
import yaml

# BaseDag stands for whichever DagFactory subclass your plugin package exports
from my_factories import BaseDag

dag_dir = Path(__file__).parent

# For each YAML file in a particular directory (including its subfolders)...
for yaml_file_path in dag_dir.glob('typical_dags/**/*.yml'):
    with open(yaml_file_path) as f:
        dag_metadata = yaml.safe_load(f)

    # ... generate a DAG from that metadata
    dag_metadata_obj = BaseDag.model_validate(dag_metadata)
    dag = dag_metadata_obj.make_dag()

    # Publish each DAG under its own global name so Airflow picks it up
    # See https://www.astronomer.io/guides/dynamically-generating-dags/
    dag_name = yaml_file_path.stem
    globals()[dag_name] = dag
```
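The discovery step is easy to get subtly wrong, so here is a stdlib-only sketch of it that runs outside Airflow. `discover_dag_files` is a hypothetical helper; it assumes, as the loader does, that each file's stem becomes the DAG's global name, and it raises instead of silently overwriting when two files in different subfolders share a stem.

```python
import tempfile
from pathlib import Path
from typing import Dict


def discover_dag_files(root: Path) -> Dict[str, Path]:
    """Map a DAG name (the file stem) to each .yml file under root, at any depth."""
    found: Dict[str, Path] = {}
    # '**/*.yml' recurses into subfolders and also matches files directly in root
    for path in sorted(root.glob("**/*.yml")):
        name = path.stem
        if name in found:
            # Two files with the same stem would overwrite each other in globals()
            raise ValueError(f"{path} and {found[name]} would both publish {name!r}")
        found[name] = path
    return found


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "typical_dags"
    (root / "team_a").mkdir(parents=True)
    (root / "daily.yml").write_text("dag_id: daily\n")
    (root / "team_a" / "hourly.yml").write_text("dag_id: hourly\n")
    print(sorted(discover_dag_files(root)))  # ['daily', 'hourly']
```

Failing loudly on a name clash is a design choice: otherwise the last file globbed would quietly replace the earlier DAG.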
Documentation
Complete docs can be found here
Download files
Source Distribution: data_dag-1.0.0.tar.gz
Built Distribution: data_dag-1.0.0-py3-none-any.whl
File details
Details for the file data_dag-1.0.0.tar.gz.
File metadata
- Download URL: data_dag-1.0.0.tar.gz
- Upload date:
- Size: 59.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: python-requests/2.31.0
File hashes
Algorithm | Hash digest
---|---
SHA256 | c6f14c2963e6928e6762044b06c56e7eb0ef8a1d5739779e9e408c7dba2c76c1
MD5 | f9d5daec6740581747efd66c71499584
BLAKE2b-256 | 33f76c9b57c59da480382b9c318bac766153a5efd736b71d19b6928a286814f8
File details
Details for the file data_dag-1.0.0-py3-none-any.whl.
File metadata
- Download URL: data_dag-1.0.0-py3-none-any.whl
- Upload date:
- Size: 9.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: python-requests/2.31.0
File hashes
Algorithm | Hash digest
---|---
SHA256 | 62e7678363a9921e47457871f033465171d05a89b43a9048342734bc635d0dd5
MD5 | 685f8758fea2abc0bbd521cc314857ff
BLAKE2b-256 | bb2394c6ce5a17b6e20742144a007c8918d490d8d75d6a212e9403f7e0223931