
Airflow Cluster Policy for BigQuery reservation management


Airflow Reservations Policy


Airflow Cluster Policy plugin for BigQuery reservation management.

This package integrates with Airflow's Cluster Policies to automatically inject BigQuery reservation assignments into your tasks without requiring any changes to your DAG code.

Features

  • Automatic reassignment - intercepts BigQuery operators and assigns them to the appropriate reservation based on the configuration:
    • BigQueryInsertJobOperator - Injects into configuration.reservation field
    • BigQueryExecuteQueryOperator - Injects into api_resource_configs.reservation field (supported in Google provider package versions 2.0.0 to 10.26.0)
  • Lookup-based Configuration - Uses dag_id.task_id → reservation mappings
  • Python API - Provides get_reservation() for custom BigQuery API calls in PythonOperator
  • Performance Optimized - Config caching with file mtime-based invalidation
  • Graceful Error Handling - Won't crash Airflow on config errors

Installation

Add to your requirements.txt:

airflow-reservations==X.Y.Z

Configuration

Generating Configuration

Use Masthead recommendations to generate the reservations_config.json file containing the optimal reservation assignments for your tasks. Users are responsible for pulling this configuration into their Airflow environment.

Typical workflow:

  1. Masthead analyzes your BigQuery workloads
  2. Read Masthead recommendations and generate reservations_config.json with optimal assignments
  3. Merge the config into your DAGs repository
  4. Airflow syncs the updated config file
  5. The policy applies the reservations on the next DAG parse

Configuration Fields

Field | Description
--- | ---
tag | Human-readable label for the reservation group
reservation | Reservation to assign (see values below)
tasks | Array of "dag_id.task_id" keys (matched exactly)

Reservation values:

Value | Behavior
--- | ---
"projects/.../locations/.../reservations/..." | Assigns the task to that reservation
"none" | Explicitly uses on-demand capacity
null | Skips the task: the policy leaves it unchanged

Applying Configuration

Create a reservations_config.json file in your DAGs folder:

{
  "reservation_config": [
    {
      "tag": "standard",
      "reservation": "projects/{project}/locations/{location}/reservations/{name}",
      "tasks": [
        "finance_dag.daily_report",
        "etl_dag.load_analytics",
        "etl_dag.etl_group-transform_data"
      ]
    },
    {
      "tag": "on_demand",
      "reservation": "none",
      "tasks": [
        "adhoc_dag.quick_query"
      ]
    },
    {
      "tag": "default",
      "reservation": null,
      "tasks": [
        "marketing_dag.calculate_roas"
      ]
    }
  ]
}
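Conceptually, the config flattens into one "dag_id.task_id" → reservation lookup. The following is a minimal sketch of that flattening using only the standard library. build_lookup is a hypothetical helper and is not part of the package's API. The "last group wins" handling of duplicate keys is also an assumption.

```python
import json
from typing import Dict, Optional


def build_lookup(config_text: str) -> Dict[str, Optional[str]]:
    """Flatten reservation_config groups into {"dag_id.task_id": reservation}.

    Hypothetical helper for illustration only; not the package's API.
    """
    data = json.loads(config_text)
    lookup: Dict[str, Optional[str]] = {}
    for group in data.get("reservation_config", []):
        # A reservation path string, the literal "none", or None (JSON null).
        reservation = group.get("reservation")
        for key in group.get("tasks", []):
            # Assumption: if a key appears in several groups, the last one wins.
            lookup[key] = reservation
    return lookup


example = """
{
  "reservation_config": [
    {"tag": "standard",
     "reservation": "projects/p/locations/US/reservations/r1",
     "tasks": ["finance_dag.daily_report"]},
    {"tag": "on_demand", "reservation": "none", "tasks": ["adhoc_dag.quick_query"]},
    {"tag": "default", "reservation": null, "tasks": ["marketing_dag.calculate_roas"]}
  ]
}
"""

lookup = build_lookup(example)
```

A task that is absent from the lookup and a task mapped to null both leave the task unchanged. Only a path string or "none" causes an injection.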

By default, the config file is loaded from $AIRFLOW_HOME/dags/reservations_config.json.

Override the path using the RESERVATIONS_CONFIG_PATH environment variable:

export RESERVATIONS_CONFIG_PATH=/custom/path/to/config.json
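The path resolution above, together with the mtime-based caching listed under Features, can be sketched as follows. This is an illustration under stated assumptions, not the package's code. The function names are hypothetical. The fallback of AIRFLOW_HOME to ~/airflow mirrors Airflow's own default. Returning an empty config on errors is one assumed way to implement "won't crash Airflow".

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Dict, Tuple

# Hypothetical cache keyed by path: (mtime at load time, parsed JSON).
_CACHE: Dict[str, Tuple[float, Dict[str, Any]]] = {}


def resolve_config_path() -> Path:
    """RESERVATIONS_CONFIG_PATH wins; else $AIRFLOW_HOME/dags/reservations_config.json."""
    override = os.environ.get("RESERVATIONS_CONFIG_PATH")
    if override:
        return Path(override)
    airflow_home = os.environ.get("AIRFLOW_HOME", os.path.expanduser("~/airflow"))
    return Path(airflow_home) / "dags" / "reservations_config.json"


def load_config(path: Path) -> Dict[str, Any]:
    """Parse the config, re-reading only when the file's mtime changes.

    Returns an empty dict instead of raising, so a missing or broken file
    cannot break DAG parsing.
    """
    try:
        mtime = path.stat().st_mtime
    except OSError:
        return {}
    cached = _CACHE.get(str(path))
    if cached is not None and cached[0] == mtime:
        return cached[1]  # file unchanged since the last load
    try:
        with path.open(encoding="utf-8") as fh:
            data = json.load(fh)
    except (OSError, json.JSONDecodeError):
        return {}
    _CACHE[str(path)] = (mtime, data)
    return data


# Usage: point the override at a temporary file and exercise the cache.
with tempfile.TemporaryDirectory() as tmp:
    cfg = Path(tmp) / "config.json"
    cfg.write_text('{"reservation_config": []}', encoding="utf-8")
    os.environ["RESERVATIONS_CONFIG_PATH"] = str(cfg)
    path = resolve_config_path()
    first = load_config(path)
    second = load_config(path)  # same mtime, so served from the cache
    cfg.write_text("not json", encoding="utf-8")
    os.utime(cfg, (0, 12345))  # force a different mtime
    broken = load_config(path)  # mtime changed, file re-read, invalid JSON

os.environ.pop("RESERVATIONS_CONFIG_PATH", None)
default_path = resolve_config_path()
```

Keying the cache on mtime means that a file synced into the DAGs folder is picked up on the next parse, without re-reading the JSON for every task.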

How It Works

The policy is automatically registered via Airflow's plugin entrypoint system. When Airflow parses your DAGs, this plugin's task_policy hook is called for each task. For BigQuery tasks, it:

  1. Extracts dag_id and task_id from the task
  2. Looks up dag_id.task_id in the configuration file
  3. If found, assigns the task to a reservation
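The three steps above can be sketched as a task_policy-style function that mutates a task in place. This is a hypothetical illustration, not the package's implementation. The lookup is hard-coded. Operators are matched by class name, and SimpleNamespace stand-ins are used so that the sketch runs without Airflow installed. Passing "none" through unchanged to the field is an assumption about how on-demand is requested. The key is built plainly here. TaskGroup tasks need the dash form described in Troubleshooting.

```python
from types import SimpleNamespace
from typing import Any, Dict, Optional

# Hypothetical lookup, as it might be built from reservations_config.json.
LOOKUP: Dict[str, Optional[str]] = {
    "finance_dag.daily_report": "projects/p/locations/US/reservations/r1",
    "adhoc_dag.quick_query": "none",
    "marketing_dag.calculate_roas": None,
}


def task_policy(task: Any) -> None:
    """Sketch: inject a reservation into supported BigQuery tasks in place."""
    operator = type(task).__name__  # simplification; real code would use isinstance
    if operator not in ("BigQueryInsertJobOperator", "BigQueryExecuteQueryOperator"):
        return  # not a BigQuery task: leave it untouched

    reservation = LOOKUP.get(f"{task.dag_id}.{task.task_id}")
    if reservation is None:
        return  # not listed, or listed with null: no change

    if operator == "BigQueryInsertJobOperator":
        # configuration.reservation; other configuration keys are preserved
        task.configuration = {**(task.configuration or {}), "reservation": reservation}
    else:
        # api_resource_configs.reservation for the legacy operator
        task.api_resource_configs = {**(task.api_resource_configs or {}), "reservation": reservation}


# Stand-ins so the sketch runs without Airflow.
class BigQueryInsertJobOperator(SimpleNamespace):
    pass


class BigQueryExecuteQueryOperator(SimpleNamespace):
    pass


class PythonOperator(SimpleNamespace):
    pass


task = BigQueryInsertJobOperator(
    dag_id="finance_dag", task_id="daily_report",
    configuration={"query": {"query": "SELECT 1"}},
)
legacy = BigQueryExecuteQueryOperator(
    dag_id="adhoc_dag", task_id="quick_query", api_resource_configs=None
)
skipped = BigQueryInsertJobOperator(
    dag_id="marketing_dag", task_id="calculate_roas", configuration={}
)
other = PythonOperator(dag_id="finance_dag", task_id="daily_report")

for t in (task, legacy, skipped, other):
    task_policy(t)
```

Because the policy runs at parse time and only changes task attributes, DAG files themselves need no edits.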

Prerequisite: BigQuery Job Labels

Masthead recommendations depend on BigQuery job labels that identify the Airflow task. Ensure every BigQuery job has:

  • airflow-dag
  • airflow-task

Airflow BigQuery operators set these labels by default (Airflow v2.9+), but custom job configuration can override defaults and drop them. When customizing operator configs, merge labels and keep both keys.

For jobs submitted in PythonOperator (or any manual BigQuery client usage), set labels explicitly (see Usage in Python Operators below).

Note: This package applies reservation assignment based on Airflow task identity, but it does not enforce BigQuery labels. Keep label handling in your DAG/operator/client code.

Usage in Python Operators

For custom BigQuery API calls in PythonOperator, use the provided get_reservation function, e.g.:

from google.cloud import bigquery

from airflow_reservations import get_reservation

def my_bigquery_task(**context):
    ti = context["task_instance"]
    client = bigquery.Client()
    sql = "SELECT 1"  # replace with your query

    existing_labels = {"team": "finance"}
    labels = {
        **existing_labels,
        "airflow-dag": ti.dag_id.lower(),
        "airflow-task": ti.task_id.lower().replace(".", "-"),  # dots are not allowed in labels
    }

    # Look up the reservation configured for this task
    reservation = get_reservation(ti.dag_id, ti.task_id)

    # Include both the labels and the reservation in the job config
    job_config = bigquery.QueryJobConfig(
        labels=labels,
        reservation=reservation if reservation else None,
    )

    # Execute with the BigQuery client and wait for the job to finish
    client.query(sql, job_config=job_config).result()

Troubleshooting

Config not loading

Check that:

  1. The config file exists at the expected path
  2. The file contains valid JSON
  3. Airflow has read permissions

Enable debug logging:

import logging
logging.getLogger("airflow_reservations").setLevel(logging.DEBUG)
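To check the three conditions above directly, you could run a small script like the following from the same environment and user as the Airflow process that parses DAGs. check_config is hypothetical and independent of the package. The "dag_id.task_id" shape check is an extra sanity test based on the key format described under Configuration Fields.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import List


def check_config(path: Path) -> List[str]:
    """Return a list of problems found; an empty list means the file looks usable."""
    if not path.is_file():
        return [f"{path} does not exist"]
    if not os.access(path, os.R_OK):
        return [f"{path} is not readable by this process"]
    try:
        text = path.read_text(encoding="utf-8")
    except OSError as exc:
        return [f"could not read {path}: {exc}"]
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        return [f"{path} is not valid JSON: {exc}"]
    groups = data.get("reservation_config") if isinstance(data, dict) else None
    if not isinstance(groups, list):
        return [f"{path} has no 'reservation_config' list"]
    problems = []
    for i, group in enumerate(groups):
        for key in group.get("tasks", []) if isinstance(group, dict) else []:
            if "." not in key:
                problems.append(f"group {i}: {key!r} is not in dag_id.task_id form")
    return problems


# Usage against temporary files (swap in your real path when troubleshooting).
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "good.json").write_text(
        '{"reservation_config": [{"reservation": null, "tasks": ["dag.task"]}]}',
        encoding="utf-8",
    )
    (Path(tmp) / "bad.json").write_text("{", encoding="utf-8")
    results = {
        name: check_config(Path(tmp) / name)
        for name in ("good.json", "bad.json", "missing.json")
    }
```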

Reservations not being applied

Verify:

  1. The task type is BigQueryInsertJobOperator or BigQueryExecuteQueryOperator
  2. BigQuery jobs include airflow-dag and airflow-task labels (see Prerequisite: BigQuery Job Labels above)
  3. The dag_id.task_id key exactly matches the config
  4. For tasks inside a TaskGroup, the task_id includes the group prefix (e.g., dag.group.task); in the config JSON, the dot between the group and the task is replaced with a dash (e.g., dag.group-task).
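The key rule for TaskGroups can be expressed as a one-line helper. This is a sketch. config_key is hypothetical. For nested groups such as "outer.inner.task", it assumes that every dot in the task_id is replaced in the same way, which the rule above does not state explicitly.

```python
def config_key(dag_id: str, task_id: str) -> str:
    """Build the reservations_config.json key for a task (hypothetical helper).

    A TaskGroup task has task_id "group.task"; the config expects "group-task".
    Assumption: nested groups get every dot in the task_id replaced.
    """
    return f"{dag_id}.{task_id.replace('.', '-')}"


grouped = config_key("etl_dag", "etl_group.transform_data")
plain = config_key("finance_dag", "daily_report")
```

The grouped key matches the "etl_dag.etl_group-transform_data" entry in the example config.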

Supported Versions

This package is tested and compatible with:

  • Airflow 2.6+ (including 2.10.x) - ✅ Fully supported and tested
  • Airflow 3.x (including 3.1.x) - ✅ Fully supported and tested (see Airflow 3 notes below)
  • Python 3.8+ (tested with 3.11 and 3.12)

Airflow 3 Notes

In Airflow 3, the Task SDK parses DAGs in a separate process/container from the Scheduler and Worker. To ensure the policy works correctly:

  1. Installation: The airflow-reservations package must be installed in the environment where the Task SDK executes (typically your custom Airflow image).
  2. Config Accessibility: The reservations_config.json file must be accessible to the Task SDK process. If you are using remote DAG storage, ensure the config file is bundled with your DAGs or placed in a shared volume.
  3. Environment Variables: If you use RESERVATIONS_CONFIG_PATH, it must be set in the environment of the worker/execution container as well.



Download files


Source Distribution

airflow_reservations-0.2.2.tar.gz (35.7 kB)

Uploaded Source

Built Distribution


airflow_reservations-0.2.2-py3-none-any.whl (33.6 kB)

Uploaded Python 3

File details

Details for the file airflow_reservations-0.2.2.tar.gz.

File metadata

  • Download URL: airflow_reservations-0.2.2.tar.gz
  • Upload date:
  • Size: 35.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.13

File hashes

Hashes for airflow_reservations-0.2.2.tar.gz
Algorithm | Hash digest
--- | ---
SHA256 | 18b67ca891e42eb5f28c3ea7e59de5b5b12ef3aeb2fb8e30b533a54b63ac2f1d
MD5 | 757b83320457ae3a8991158da560b969
BLAKE2b-256 | 487440ca6a45be955154fd9636c458b2472ad793d1bbee1e2c8426b9521d55f2


Provenance

The following attestation bundles were made for airflow_reservations-0.2.2.tar.gz:

Publisher: ci.yml on masthead-data/airflow-reservations

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file airflow_reservations-0.2.2-py3-none-any.whl.

File metadata

File hashes

Hashes for airflow_reservations-0.2.2-py3-none-any.whl
Algorithm | Hash digest
--- | ---
SHA256 | 9818b7d2bb3ba7299513ce9ff206b516c30feaeb9b95e5b18209b06b7466c522
MD5 | 88d10588fa306e5f5f60c3349c03a494
BLAKE2b-256 | ca728b596bfc2a26d31a8140b6acc43359f58d8b029e598ff86c9c0566fd5f6e


Provenance

The following attestation bundles were made for airflow_reservations-0.2.2-py3-none-any.whl:

Publisher: ci.yml on masthead-data/airflow-reservations

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
