
Tooling to help build data-driven DAGs

Project description


Overview

data-dag is a library for writing data-defined Airflow DAGs and operators.

Installation

pip install data-dag

Example

Re-usable operator and DAG templates can be stored in a central location, such as a custom Airflow plugin (a package inside dags/ works fine too):

# plugins/my_factories/download.py

from data_dag.operator_factory import OperatorFactory
from data_dag.dag_factory import DagFactory

from urllib.request import urlretrieve
from typing import List
# EmptyOperator replaced DummyOperator in Airflow 2.3
try:
    from airflow.operators.empty import EmptyOperator
except ImportError:
    from airflow.operators.dummy import DummyOperator as EmptyOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup


class DownloadOperator(OperatorFactory):
    """An operator factory for safely downloading files to a known location"""

    name: str
    url: str
    path: str

    def make_operator(self):
        with TaskGroup(group_id=f'download_{self.name}') as group:
            check = HttpSensor(
                task_id='check_exists',
                endpoint=self.url
            )
            download = PythonOperator(
                task_id='download',
                python_callable=lambda: urlretrieve(self.url, self.path)
            )
            check >> download

        return group


class DownloaderDag(DagFactory):
    """A DAG factory for producing simple DAGs that just download a bunch of files"""

    downloads: List[DownloadOperator]

    def _make_dag(self):
        start = EmptyOperator(task_id='start')
        end = EmptyOperator(task_id='end')

        for download in self.downloads:
            start >> download.make_operator() >> end

Then a definition for a particular DAG can live in a data file:

# dags/yaml/sample_dag.yaml

dag_id: sample_dag
description: An example of how to write a data-driven DAG
schedule: '@daily'
start_date: '2020-01-01T00:00:00'
downloads:
- name: data
  url: https://www.example.com/data.zip
  path: data.zip
- name: manifest
  url: https://www.example.com/manifest.json
  path: manifest.json

That data file can then be loaded into a DAG. Per Airflow's requirements, this must be done in a file located in dags/ and the result must be saved into a uniquely named global variable. The simplest possible example is this:

# dags/sample_dag.py

from pathlib import Path

from yaml import safe_load
from my_factories.download import DownloaderDag

# Resolve the YAML file relative to this DAG file; Airflow does not
# guarantee the working directory when it parses files under dags/
yaml_path = Path(__file__).parent / 'yaml' / 'sample_dag.yaml'

with open(yaml_path) as f:
    dag_data = safe_load(f)

dag = DownloaderDag.model_validate(dag_data).make_dag()
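
For the two entries in sample_dag.yaml, the generated DAG contains a start task, an end task, and one download_<name> task group (with check_exists and download tasks) in between. A quick way to sanity-check the generated structure outside the Airflow UI is to build the DAG in a test. The following is only an illustrative sketch; the test file name, path, and assertions are assumptions, not part of data-dag:

# tests/test_sample_dag.py (illustrative only)

from yaml import safe_load
from my_factories.download import DownloaderDag


def test_sample_dag_structure():
    # Assumes the repository layout shown above, with tests run from the repo root
    with open('dags/yaml/sample_dag.yaml') as f:
        dag = DownloaderDag.model_validate(safe_load(f)).make_dag()

    # Tasks inside a task group are prefixed with their group_id
    task_ids = set(dag.task_ids)
    assert {'start', 'end'} <= task_ids
    assert 'download_data.check_exists' in task_ids
    assert 'download_manifest.download' in task_ids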


Multiple DAGs

Of course, a template isn't much use if you only fill it in once. Here's a simple loader that reads any number of YAML files from a folder and publishes each one as a DAG in Airflow (BaseDag stands in for whichever factory your files target, such as the DownloaderDag above):

# dags/load_yml_files.py

from pathlib import Path
# DAG is unused directly, but importing it keeps the word "airflow" in this
# file, which Airflow's default DAG discovery (safe mode) looks for
from airflow import DAG
import yaml
from my_factories import BaseDag

dag_dir = Path(__file__).parent

# For each YAML file in a particular directory...
for yaml_file_path in dag_dir.glob('typical_dags/*.yml'):
    with open(yaml_file_path) as f:
        dag_metadata = yaml.safe_load(f)

    # ... generate a DAG from that metadata
    dag_metadata_obj = BaseDag.model_validate(dag_metadata)
    dag = dag_metadata_obj.make_dag()

    # Airflow only picks up DAGs stored in module-level (global) variables,
    # so give each one a unique global name.
    # See https://www.astronomer.io/guides/dynamically-generating-dags/
    dag_name = yaml_file_path.stem
    globals()[dag_name] = dag
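
Because every definition goes through the same pydantic model, the YAML files can also be validated on their own, without a running Airflow instance, e.g. as a CI check. Again, this is only a sketch under assumed paths and test layout:

# tests/test_yaml_definitions.py (illustrative only)

from pathlib import Path

import yaml
from my_factories import BaseDag

# Assumed layout: dags/typical_dags/*.yml, with tests/ alongside dags/
DAG_DIR = Path(__file__).parent.parent / 'dags'


def test_all_yaml_definitions_parse():
    for yaml_file_path in DAG_DIR.glob('typical_dags/*.yml'):
        with open(yaml_file_path) as f:
            # Raises pydantic.ValidationError if a file doesn't match the schema
            BaseDag.model_validate(yaml.safe_load(f))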

Documentation

Complete docs can be found here.

