Airflow utilities for configuration of many DAGs and DAG environments

airflow-config

Apache Airflow utilities for configuration of DAGs and DAG environments

Overview

This library allows for YAML-driven configuration of Airflow, including DAGs, Operators, and declaratively defined DAGs (à la dag-factory). It is built with Pydantic, Hydra, and OmegaConf.

Consider the following basic DAG:

from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator
from datetime import datetime, timedelta

with DAG(
    dag_id="test-dag",
    default_args={
        "depends_on_past": False,
        "email": ["my.email@myemail.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 0,
    },
    description="test that dag is working properly",
    schedule=timedelta(minutes=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["utility", "test"],
):
    BashOperator(
        task_id="test-task",
        bash_command="echo 'test'",
    )

We can already see several options that we might want to drive centrally via config, perhaps based on some notion of environment (e.g. dev, prod):

  • "email": ["my.email@myemail.com"]
  • "email_on_failure": False
  • "email_on_retry": False
  • "retries": 0
  • schedule=timedelta(minutes=1)
  • tags=["utility", "test"]

If we want to change these in our DAG, we need to modify code. With hundreds of DAGs, this can quickly get out of hand: Airflow DAGs are Python code, so every edit risks introducing a syntax error, a stray trailing comma, or another common mistake.

Now consider the alternative, config-driven approach:

config/dev.yaml

# @package _global_
_target_: airflow_config.Configuration
default_args:
  _target_: airflow_config.TaskArgs
  owner: test
  email: [myemail@myemail.com]
  email_on_failure: false
  email_on_retry: false
  retries: 0
  depends_on_past: false
default_dag_args:
  _target_: airflow_config.DagArgs
  schedule: "01:00"
  start_date: "2024-01-01"
  catchup: false
  tags: ["utility", "test"]

from datetime import timedelta

from airflow.providers.standard.operators.bash import BashOperator
from airflow_config import DAG, load_config

config = load_config(config_name="dev")

with DAG(
    dag_id="test-dag",
    description="test that dag is working properly",
    schedule=timedelta(minutes=1),
    config=config
):
    BashOperator(
        task_id="test-task",
        bash_command="echo 'test'",
    )

This has a number of benefits:

  • Make changes without code changes, with static type validation
  • Make changes across any number of DAGs without having to copy-paste
  • Organize collections of DAGs into groups, e.g. via environment like dev, prod, etc

Features

  • Configure DAGs from a central config file or...
  • from multiple env-specific config files (e.g. dev, uat, prod)
  • Specialize DAGs by dag_id from a single file (e.g. set each DAG's schedule from a single shared file)
  • Generate entire DAGs declaratively, like astronomer/dag-factory
  • Configure other extensions via the extensions field described under Configuration
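
To illustrate what specializing DAGs by dag_id from a single file can mean, here is a minimal sketch that layers a per-DAG entry over shared defaults. It assumes per-DAG values take precedence over the defaults; the library's actual merge rules may differ, and `resolve_dag_args` is a hypothetical helper, not an airflow-config function.

```python
from typing import Any, Dict


def resolve_dag_args(
    dag_id: str,
    default_dag_args: Dict[str, Any],
    dags: Dict[str, Dict[str, Any]],
) -> Dict[str, Any]:
    """Layer the per-DAG entry (if any) over the shared defaults."""
    merged = dict(default_dag_args)  # copy so the shared defaults stay untouched
    merged.update(dags.get(dag_id, {}))  # assumed precedence: per-DAG values win
    return merged


# Sample values standing in for a parsed shared config file.
default_dag_args = {"catchup": False, "max_active_runs": 1}
dags = {
    "reboot": {"schedule": "0 0 * * *", "max_active_tasks": 1},
    "clean-logs": {"schedule": "0 4 * * *"},
}

reboot_args = resolve_dag_args("reboot", default_dag_args, dags)
unlisted_args = resolve_dag_args("not-in-file", default_dag_args, dags)
```

A DAG that has no entry in the shared file simply receives the defaults, so adding a new DAG requires no config change until it needs its own schedule.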

Configuration

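# Excerpt of the top-level model; TaskArgs, DagArgs, Dag, and Task are types provided by the library.
class Configuration(BaseModel):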
    # default task args
    # https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator
    default_task_args: TaskArgs

    # default dag args
    # https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html#airflow.models.dag.DAG
    default_dag_args: DagArgs

    # string (dag id) to Dag mapping
    dags: Optional[Dict[str, Dag]]

    # string (dag id) to Task mapping
    tasks: Optional[Dict[str, Task]]

    # used for extensions to inject arbitrary configuration.
    # See e.g.: https://github.com/airflow-laminar/airflow-supervisor?tab=readme-ov-file#example-dag-airflow-config
    extensions: Optional[Dict[str, BaseModel]]
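
Typed fields like these are what let a bad config value fail when it is loaded rather than when a task runs. The sketch below shows the idea with a stdlib dataclass as a stand-in for a Pydantic model. `TaskArgsSketch` is hypothetical and is not `airflow_config.TaskArgs`; Pydantic's real validation and coercion rules are richer than this strict check.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class TaskArgsSketch:
    """Hypothetical stand-in for a typed task-args model."""

    owner: str
    email: List[str] = field(default_factory=list)
    email_on_failure: bool = False
    email_on_retry: bool = False
    retries: int = 0
    depends_on_past: bool = False

    def __post_init__(self) -> None:
        expected = {
            "owner": str,
            "email": list,
            "email_on_failure": bool,
            "email_on_retry": bool,
            "retries": int,
            "depends_on_past": bool,
        }
        for name, expected_type in expected.items():
            value = getattr(self, name)
            # bool is a subclass of int, so reject it explicitly for int fields.
            is_bool_as_int = expected_type is int and isinstance(value, bool)
            if not isinstance(value, expected_type) or is_bool_as_int:
                raise TypeError(
                    f"{name} must be {expected_type.__name__}, "
                    f"got {type(value).__name__}"
                )


valid = TaskArgsSketch(owner="test", retries=0)

try:
    TaskArgsSketch(owner="test", retries="three")  # wrong type for retries
except TypeError as exc:
    error_message = str(exc)
```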

Here is an example configuration defined in YAML:

# @package _global_
_target_: airflow_config.Configuration
default_task_args:
  _target_: airflow_config.TaskArgs
  owner: blerg
  email: []
  email_on_failure: false
  email_on_retry: false
  retries: 0
  depends_on_past: false

default_dag_args:
  _target_: airflow_config.DagArgs
  start_date: ["2025-01-01", "America/New_York"]
  catchup: false
  max_active_runs: 1

dags:
  reboot:
    tags: ["reboot", "utility"]
    description: "Reboot machines"
    schedule: "0 0 * * *"
    max_active_tasks: 1
  clean-logs:
    tags: ["celery", "utility"]
    description: "Clean worker logs"
    schedule: "0 4 * * *"
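
The `_target_` keys in this YAML follow Hydra's instantiate convention: the value is a dotted import path, and the sibling keys become keyword arguments to that callable. The stdlib-only sketch below illustrates the idea. It is not Hydra's implementation, which handles many more cases, and `instantiate_sketch` is a hypothetical name. Stdlib classes are used as targets so the example runs without Airflow installed.

```python
import importlib
from typing import Any, Mapping


def instantiate_sketch(node: Any) -> Any:
    """Recursively build objects from mappings that carry a `_target_` key."""
    if isinstance(node, Mapping):
        # Build children first so nested targets arrive as constructed objects.
        built = {
            key: instantiate_sketch(value)
            for key, value in node.items()
            if key != "_target_"
        }
        if "_target_" not in node:
            return built
        # Assumes a fully dotted path such as "package.module.ClassName".
        module_path, _, attr = node["_target_"].rpartition(".")
        target = getattr(importlib.import_module(module_path), attr)
        return target(**built)
    if isinstance(node, list):
        return [instantiate_sketch(item) for item in node]
    return node


spec = {
    "_target_": "types.SimpleNamespace",
    "owner": "test",
    "retry_delay": {"_target_": "datetime.timedelta", "minutes": 5},
}
args = instantiate_sketch(spec)
```

Because the class is named in the config itself, the same loader can build task arguments, DAG arguments, or extension models without knowing about them in advance.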

Examples

License

This software is licensed under the Apache 2.0 license. See the LICENSE file for details.

Note: This library was generated using copier from the Base Python Project Template repository.
