Airflow Cluster Policy for BigQuery reservation management

Airflow Reservations Policy

Airflow Cluster Policy plugin for BigQuery reservation management.

This package integrates with Airflow's Cluster Policies to automatically inject BigQuery reservation assignments into your tasks without requiring any changes to your DAG code.

Features

  • Automatic Re-assignment - Intercepts BigQuery operators and re-assigns them to the appropriate reservation based on the configuration:
    • BigQueryInsertJobOperator - Injects into configuration.reservation field
    • BigQueryExecuteQueryOperator - Injects into api_resource_configs.reservation field (supported in provider package 2.0.0 - 10.26.0)
  • Lookup-based Configuration - Uses dag_id.task_id → reservation mappings
  • Python API - Provides get_reservation() for custom BigQuery API calls in PythonOperator
  • Performance Optimized - Config caching with file mtime-based invalidation
  • Graceful Error Handling - Won't crash Airflow on config errors
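
As a rough illustration of the caching and error-handling features listed above, the sketch below re-reads a JSON file only when its modification time changes and falls back to an empty config on any read or parse error. The names load_config_cached and _cache are hypothetical and are not part of this package's API; the actual implementation may differ.

```python
import json
import os
from typing import Any, Dict, Optional, Tuple

# Hypothetical module-level cache: (path, mtime, parsed config) of the last good load.
_cache: Optional[Tuple[str, float, Dict[str, Any]]] = None


def load_config_cached(path: str) -> Dict[str, Any]:
    """Return the parsed config, re-reading the file only when its mtime changes."""
    global _cache
    try:
        mtime = os.path.getmtime(path)
    except OSError:
        # Missing or inaccessible file: degrade to an empty config instead of raising.
        return {}
    if _cache is not None and _cache[0] == path and _cache[1] == mtime:
        # File unchanged since the last successful load: serve the cached object.
        return _cache[2]
    try:
        with open(path, encoding="utf-8") as fh:
            config = json.load(fh)
    except (OSError, ValueError):
        # Read error or invalid JSON (JSONDecodeError is a ValueError): same fallback.
        return {}
    _cache = (path, mtime, config)
    return config
```

Returning an empty config on failure means a broken file simply results in no reservations being applied, which matches the stated goal of not crashing Airflow on config errors.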

Installation

Add to your requirements.txt:

airflow-reservations==X.Y.Z

The policy is automatically registered via Airflow's plugin entrypoint system.

Configuration

Create a reservations_config.json file in your DAGs folder:

{
  "reservation_config": [
    {
      "tag": "standard",
      "reservation": "projects/{project}/locations/{location}/reservations/{name}",
      "tasks": [
        "finance_dag.daily_report",
        "etl_dag.load_analytics"
      ]
    },
    {
      "tag": "on_demand",
      "reservation": "none",
      "tasks": [
        "adhoc_dag.quick_query"
      ]
    },
    {
      "tag": "default",
      "reservation": null,
      "tasks": [
        "marketing_dag.calculate_roas"
      ]
    }
  ]
}

Configuration Fields

Field        Description
tag          Human-readable label for the reservation group
reservation  See values below
tasks        Array of "dag_id.task_id" patterns

Reservation values:

Value                                          Behavior
"projects/.../locations/.../reservations/..."  Assigns the task to that reservation
"none"                                         Explicitly uses on-demand capacity
null                                           Leaves the task unchanged (no reservation is injected)
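
To make the shape of the configuration concrete, here is a minimal sketch that flattens the reservation_config list into a single dag_id.task_id → reservation dictionary. The function build_lookup is hypothetical, and the sketch assumes entries are matched by exact key rather than by wildcard pattern; it is not the package's own loader.

```python
from typing import Any, Dict, Optional


def build_lookup(config: Dict[str, Any]) -> Dict[str, Optional[str]]:
    """Flatten the config into {"dag_id.task_id": reservation} (exact-key assumption)."""
    lookup: Dict[str, Optional[str]] = {}
    for entry in config.get("reservation_config", []):
        reservation = entry.get("reservation")  # full path, "none", or None (JSON null)
        for key in entry.get("tasks", []):
            lookup[key] = reservation
    return lookup


example = {
    "reservation_config": [
        {"tag": "on_demand", "reservation": "none", "tasks": ["adhoc_dag.quick_query"]},
        {"tag": "default", "reservation": None, "tasks": ["marketing_dag.calculate_roas"]},
    ]
}
lookup = build_lookup(example)
```

In this sketch a task listed with null and a task that is not listed at all can be told apart with `key in lookup`, even though both yield no reservation.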

Configuration Path

By default, the config file is loaded from $AIRFLOW_HOME/dags/reservations_config.json.

Override the path using the RESERVATIONS_CONFIG_PATH environment variable:

export RESERVATIONS_CONFIG_PATH=/custom/path/to/config.json
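
The resolution order described above can be sketched as follows. The function resolve_config_path is hypothetical, and the fallback to ~/airflow when AIRFLOW_HOME is unset is an assumption based on Airflow's own default, not something this package documents.

```python
import os


def resolve_config_path() -> str:
    """Pick the config path: env override first, then $AIRFLOW_HOME/dags."""
    override = os.environ.get("RESERVATIONS_CONFIG_PATH")
    if override:
        return override
    # Assumption: fall back to Airflow's default home when AIRFLOW_HOME is unset.
    airflow_home = os.environ.get("AIRFLOW_HOME", os.path.expanduser("~/airflow"))
    return os.path.join(airflow_home, "dags", "reservations_config.json")
```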

TaskGroups and Dynamic Tasks

For tasks inside TaskGroups, use the full task ID path as it appears in Airflow:

{
  "reservation_config": [
    {
      "tag": "standard",
      "reservation": "projects/{project}/locations/{location}/reservations/{name}",
      "tasks": [
        "my_dag.my_task_group.inner_task"
      ]
    }
  ]
}
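
By default, Airflow prefixes the task ID of a task inside a TaskGroup with the group ID, so the lookup key is simply the DAG ID joined to that full task ID. The helper below is a hypothetical illustration of how such a key is formed, not a function exported by this package.

```python
def lookup_key(dag_id: str, task_id: str) -> str:
    """Join the DAG ID and the full task ID (including any TaskGroup prefix)."""
    return f"{dag_id}.{task_id}"


# A task "inner_task" inside the group "my_task_group" has this full task ID in Airflow:
full_task_id = "my_task_group.inner_task"
key = lookup_key("my_dag", full_task_id)
```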

How It Works

When Airflow parses your DAGs, this plugin's task_policy hook is called for each task. For BigQuery tasks, it:

  1. Extracts dag_id and task_id from the task
  2. Looks up dag_id.task_id in the configuration file
  3. If found, assigns the task to a reservation
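
The three steps above can be sketched as a task policy function. This is a simplified illustration under stated assumptions: FakeInsertJobTask is a stand-in for a BigQueryInsertJobOperator instance so the example runs without Airflow, LOOKUP is a hypothetical in-memory mapping, and special handling of "none" is left out. The package's real hook may be structured differently.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class FakeInsertJobTask:
    """Stand-in for a BigQueryInsertJobOperator instance (illustration only)."""
    dag_id: str
    task_id: str
    configuration: Dict[str, Any] = field(default_factory=dict)


# Hypothetical lookup table, as it might be built from reservations_config.json.
LOOKUP = {
    "finance_dag.daily_report": "projects/my-project/locations/US/reservations/my-res",
}


def task_policy(task: FakeInsertJobTask) -> None:
    """Mutate the task in place, as Airflow cluster policies do."""
    # Steps 1 and 2: build the key from dag_id and task_id, then look it up.
    reservation = LOOKUP.get(f"{task.dag_id}.{task.task_id}")
    if reservation is None:
        return  # not configured: leave the task untouched
    # Step 3: inject into the job configuration's reservation field.
    task.configuration["reservation"] = reservation
```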

Usage in Python Operators

For custom BigQuery API calls in PythonOperator, use the provided API:

from airflow_reservations import get_reservation

def my_bigquery_task(**context):
    dag_id = context['dag'].dag_id
    task_id = context['task'].task_id

    # Placeholder query; substitute your own SQL
    your_sql = "SELECT 1"

    # Look up reservation for this task
    reservation = get_reservation(dag_id, task_id)

    if reservation:
        # Prepend to your SQL
        sql = f"SET @@reservation='{reservation}';\n{your_sql}"
    else:
        sql = your_sql

    # Execute with BigQuery client...

API Reference

get_reservation(dag_id: str, task_id: str) -> str | None

Look up the reservation ID for a specific task.

from airflow_reservations import get_reservation

reservation = get_reservation("my_dag", "my_task")
# Returns: "projects/my-project/locations/US/reservations/my-res" or None

load_config(force_reload: bool = False) -> dict

Load the full configuration dictionary.

from airflow_reservations import load_config

config = load_config()
# Returns: {"reservations": {...}}

Generating Configuration

Use Masthead recommendations to generate the reservations_config.json file containing the recommended reservation assignments for your tasks. You are responsible for pulling this configuration into your Airflow environment.

Typical workflow:

  1. Masthead analyzes your BigQuery workloads
  2. You generate reservations_config.json from the Masthead recommendations
  3. You merge the config into your DAGs repository
  4. Airflow syncs the updated config file
  5. The policy applies reservations on the next task parse
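
Before merging a generated config into your DAGs repository, it can help to run a quick sanity check. The sketch below is a hypothetical pre-merge validator, not a tool shipped with this package; the specific rules (no task listed twice, keys contain a dot, reservation is a full path, "none", or null) are assumptions drawn from the configuration format described earlier.

```python
from collections import Counter
from typing import Any, Dict, List


def find_config_problems(config: Dict[str, Any]) -> List[str]:
    """Return human-readable problems found in a reservations config dict."""
    entries = config.get("reservation_config")
    if not isinstance(entries, list):
        return ['top-level "reservation_config" must be a list']
    problems: List[str] = []
    # A task listed in more than one place would have an ambiguous reservation.
    counts = Counter(key for entry in entries for key in entry.get("tasks", []))
    for key, count in sorted(counts.items()):
        if count > 1:
            problems.append(f"task {key!r} is listed {count} times")
        if "." not in key:
            problems.append(f"task {key!r} is not in dag_id.task_id form")
    for entry in entries:
        reservation = entry.get("reservation")
        if reservation is None or reservation == "none":
            continue
        if not str(reservation).startswith("projects/"):
            problems.append(f"reservation {reservation!r} is not a full resource path")
    return problems
```

An empty list means none of these checks failed; it does not guarantee that the reservation actually exists in BigQuery.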

Troubleshooting

Config not loading

Check that:

  1. The config file exists at the expected path
  2. The file contains valid JSON
  3. Airflow has read permissions
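
The three checks above can be run from a Python shell on the machine where Airflow parses DAGs. The helper diagnose_config is a hypothetical diagnostic written for this section, not part of the package.

```python
import json
import os


def diagnose_config(path: str) -> str:
    """Return "ok" or a short description of the first failing check."""
    if not os.path.isfile(path):
        return "missing"  # check 1: file exists at the expected path
    if not os.access(path, os.R_OK):
        return "unreadable"  # check 3: the current user can read it
    try:
        with open(path, encoding="utf-8") as fh:
            json.load(fh)
    except OSError:
        return "unreadable"
    except ValueError:
        return "invalid JSON"  # check 2: file parses as JSON
    return "ok"
```

Run it as the same user that runs the Airflow processes, since read permissions are checked for the current user.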

Enable debug logging:

import logging
logging.getLogger("airflow_reservations").setLevel(logging.DEBUG)

Reservations not being applied

Verify:

  1. The task type is BigQueryInsertJobOperator or BigQueryExecuteQueryOperator
  2. The dag_id.task_id key exactly matches the config
  3. For TaskGroups, include the full path (e.g., dag.group.task)
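
Key mismatches are often small typos or a missing TaskGroup prefix. As a debugging aid, the sketch below uses Python's difflib to suggest configured keys that are close to the one you expected to match. The function suggest_keys is hypothetical and is not provided by this package.

```python
import difflib
from typing import Iterable, List


def suggest_keys(key: str, configured_keys: Iterable[str]) -> List[str]:
    """Return configured keys similar to `key`, or [] if it already matches exactly."""
    keys = list(configured_keys)
    if key in keys:
        return []
    # Up to three candidates with a similarity ratio of at least 0.6.
    return difflib.get_close_matches(key, keys, n=3, cutoff=0.6)
```

For example, calling it with the key Airflow reports for the task and the list of keys from your config shows whether a near miss, such as a dropped group name, explains why no reservation was applied.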

Development

# Install dev dependencies
pip install -e ".[dev]"

# Run unit tests
pytest tests/ -v

# Run with coverage
pytest tests/ --cov=airflow_reservations

# Run E2E tests (requires Docker)
make e2e

# Run E2E tests with Airflow 2.x
make e2e-airflow2

# Run E2E tests with Airflow 3.x
make e2e-airflow3

# Run E2E tests with all supported Airflow versions
make e2e-all

Supported Versions

This package is tested and compatible with:

  • Airflow 2.6+ (including 2.10.x) - ✅ Fully supported and tested
  • Airflow 3.x (including 3.1.x) - ✅ Fully supported and tested (see Airflow 3 notes below)
  • Python 3.8+ (tested with 3.11 and 3.12)

Airflow 3 Notes

In Airflow 3, the Task SDK parses DAGs in a separate process/container from the Scheduler and Worker. To ensure the policy works correctly:

  1. Installation: The airflow-reservations package must be installed in the environment where the Task SDK executes (typically your custom Airflow image).
  2. Config Accessibility: The reservations_config.json file must be accessible to the Task SDK process. If you are using remote DAG storage, ensure the config file is bundled with your DAGs or placed in a shared volume.
  3. Environment Variables: If you use RESERVATIONS_CONFIG_PATH, it must be set in the environment of the worker/execution container as well.
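
A quick way to confirm these points is to run a small preflight check inside the container where tasks execute. The sketch below is a hypothetical helper, not part of the package; it only reports whether the package can be imported, whether RESERVATIONS_CONFIG_PATH is set, and whether a given config path is readable.

```python
import importlib.util
import os
from typing import Dict


def preflight(config_path: str, package: str = "airflow_reservations") -> Dict[str, bool]:
    """Report basic environment facts relevant to the notes above."""
    return {
        # True when the package can be found on this interpreter's import path.
        "package_installed": importlib.util.find_spec(package) is not None,
        # True when the override variable is present and non-empty.
        "env_var_set": bool(os.environ.get("RESERVATIONS_CONFIG_PATH")),
        # True when the file exists and the current user can read it.
        "config_readable": os.path.isfile(config_path) and os.access(config_path, os.R_OK),
    }
```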

