Airflow Cluster Policy for BigQuery reservation management
Airflow Reservations Policy
Airflow Cluster Policy plugin for BigQuery reservation management.
This package integrates with Airflow's Cluster Policies to automatically inject BigQuery reservation assignments into your tasks without requiring any changes to your DAG code.
Features
- Automatic re-assignment - intercepts BigQuery operators and re-assigns them to the appropriate reservation based on the configuration:
  - `BigQueryInsertJobOperator` - injects into the `configuration.reservation` field
  - `BigQueryExecuteQueryOperator` - injects into the `api_resource_configs.reservation` field (supported in provider package 2.0.0 to 10.26.0)
- Lookup-based configuration - uses `dag_id.task_id` → `reservation` mappings
- Python API - provides `get_reservation()` for custom BigQuery API calls in `PythonOperator`
- Performance optimized - config caching with file mtime-based invalidation
- Graceful error handling - won't crash Airflow on config errors
Installation
Add to your `requirements.txt`:
```text
airflow-reservations==X.Y.Z
```
Configuration
Generating Configuration
Use Masthead recommendations to generate the reservations_config.json file containing the optimal reservation assignments for your tasks. Users are responsible for pulling this configuration into their Airflow environment.
Typical workflow:
- Masthead analyzes your BigQuery workloads
- Read Masthead recommendations and generate `reservations_config.json` with optimal assignments
- Merge the config into your DAGs repository
- Airflow syncs the updated config file
- The policy applies reservations on the next task parse
Configuration Fields
| Field | Description |
|---|---|
| `tag` | Human-readable label for the reservation group |
| `reservation` | See values below |
| `tasks` | Array of `"dag_id.task_id"` patterns |
Reservation values:
| Value | Behavior |
|---|---|
| `"projects/.../locations/.../reservations/..."` | Injects that reservation into the SQL |
| `"none"` | Explicitly uses on-demand capacity |
| `null` | The policy skips the task (no reservation is injected) |
Applying Configuration
Create a `reservations_config.json` file in your DAGs folder:
```json
{
  "reservation_config": [
    {
      "tag": "standard",
      "reservation": "projects/{project}/locations/{location}/reservations/{name}",
      "tasks": [
        "finance_dag.daily_report",
        "etl_dag.load_analytics",
        "etl_dag.etl_group-transform_data"
      ]
    },
    {
      "tag": "on_demand",
      "reservation": "none",
      "tasks": [
        "adhoc_dag.quick_query"
      ]
    },
    {
      "tag": "default",
      "reservation": null,
      "tasks": [
        "marketing_dag.calculate_roas"
      ]
    }
  ]
}
```
By default, the config file is loaded from `$AIRFLOW_HOME/dags/reservations_config.json`.
Override the path using the `RESERVATIONS_CONFIG_PATH` environment variable:
```bash
export RESERVATIONS_CONFIG_PATH=/custom/path/to/config.json
```
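The lookup rules above can be sketched in plain Python. This is a hypothetical illustration, not the package's implementation: `default_config_path`, `ReservationConfig`, and `lookup` are names invented here. The sketch assumes that each task key appears in only one entry (a later entry would silently win) and that a `null` reservation is handled the same way as an unlisted task. It also mimics the mtime-based caching and the "don't crash on bad config" behavior listed under Features.

```python
import json
import os
from pathlib import Path
from typing import Dict, Optional


def default_config_path() -> Path:
    """RESERVATIONS_CONFIG_PATH if set, else $AIRFLOW_HOME/dags/reservations_config.json."""
    override = os.environ.get("RESERVATIONS_CONFIG_PATH")
    if override:
        return Path(override)
    # Assumes Airflow's usual default of ~/airflow when AIRFLOW_HOME is unset
    airflow_home = os.environ.get("AIRFLOW_HOME", os.path.expanduser("~/airflow"))
    return Path(airflow_home) / "dags" / "reservations_config.json"


class ReservationConfig:
    """Hypothetical loader that re-reads the config only when the file's mtime changes."""

    def __init__(self, path: Path) -> None:
        self.path = path
        self._mtime: Optional[float] = None
        self._mapping: Dict[str, Optional[str]] = {}

    def _refresh(self) -> None:
        try:
            mtime = self.path.stat().st_mtime
        except OSError:
            # Missing file: behave as if nothing is configured
            self._mtime, self._mapping = None, {}
            return
        if mtime == self._mtime:
            return  # cached mapping is still current
        try:
            data = json.loads(self.path.read_text())
        except (OSError, ValueError):
            # Unreadable file or invalid JSON: fall back to an empty mapping instead of raising
            self._mtime, self._mapping = None, {}
            return
        entries = data.get("reservation_config", []) if isinstance(data, dict) else []
        mapping: Dict[str, Optional[str]] = {}
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            for key in entry.get("tasks", []):
                # Value is a reservation path, "none", or None (JSON null)
                mapping[key] = entry.get("reservation")
        self._mtime, self._mapping = mtime, mapping

    def lookup(self, task_key: str) -> Optional[str]:
        """Reservation path or "none" to inject; None means leave the task unchanged."""
        self._refresh()
        return self._mapping.get(task_key)
```

For example, `ReservationConfig(default_config_path()).lookup("adhoc_dag.quick_query")` would return `"none"` for the sample config above. Keying on the file's mtime keeps repeated lookups during DAG parsing cheap while still picking up a synced config without a restart.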
How It Works
The policy is automatically registered via Airflow's plugin entrypoint system. When Airflow parses your DAGs, this plugin's task_policy hook is called for each task. For BigQuery tasks, it:
- Extracts `dag_id` and `task_id` from the task
- Looks up `dag_id.task_id` in the configuration file
- If found, assigns the task to the configured reservation
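A rough sketch of those per-task steps, assuming the operator parameter names listed under Features. `SUPPORTED`, `task_key`, and `apply_reservation` are hypothetical names; matching operators by class name (to avoid importing Airflow) and turning every dot in a TaskGroup `task_id` into a dash are assumptions, not confirmed behavior of the package.

```python
from typing import Any, Mapping, Optional

# Operator class name -> parameter that receives the reservation (per Features above).
# Matching by class name keeps this sketch free of Airflow imports.
SUPPORTED = {
    "BigQueryInsertJobOperator": "configuration",
    "BigQueryExecuteQueryOperator": "api_resource_configs",
}


def task_key(dag_id: str, task_id: str) -> str:
    """Config key: dag_id.task_id, with TaskGroup dots in task_id turned into dashes."""
    return f"{dag_id}.{task_id.replace('.', '-')}"


def apply_reservation(task: Any, mapping: Mapping[str, Optional[str]]) -> bool:
    """Hypothetical task_policy body; returns True if a reservation was injected."""
    param = SUPPORTED.get(type(task).__name__)
    if param is None:
        return False  # not one of the supported BigQuery operators
    reservation = mapping.get(task_key(task.dag_id, task.task_id))
    if reservation is None:
        return False  # not listed, or listed with null: leave the task unchanged
    # Shallow copy so a top-level dict shared by several tasks is not mutated in place
    params = dict(getattr(task, param, None) or {})
    params["reservation"] = reservation  # a reservation path or "none"
    setattr(task, param, params)
    return True
```

In a real deployment, a body like `apply_reservation` would run inside the registered `task_policy` hook, with the mapping taken from the loaded config file.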
Prerequisite: BigQuery Job Labels
Masthead recommendations depend on BigQuery job labels that identify the Airflow task. Ensure every BigQuery job has:
- `airflow-dag`
- `airflow-task`
Airflow BigQuery operators set these labels by default (Airflow v2.9+), but custom job configuration can override defaults and drop them. When customizing operator configs, merge labels and keep both keys.
For jobs submitted in PythonOperator (or any manual BigQuery client usage), set labels explicitly (see Usage in Python Operators below).
Note: This package applies reservation assignment based on Airflow task identity, but it does not enforce BigQuery labels. Keep label handling in your DAG/operator/client code.
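When you build a job configuration yourself, one way to keep both labels is to set them last, so caller-supplied labels cannot drop or overwrite them. The helpers below are hypothetical. Their sanitizing rule (lowercase, replace anything outside `[a-z0-9_-]` with `-`, truncate to 63 characters) is an assumption modeled on BigQuery's label restrictions; it is stricter than BigQuery requires, and for IDs made only of letters, digits, underscores, and dots it yields the same values as the `lower()`/`replace(".", "-")` rule used in Usage in Python Operators.

```python
import re
from typing import Dict, Mapping, Optional

# Anything outside lowercase letters, digits, underscore, and dash is replaced
_INVALID = re.compile(r"[^a-z0-9_-]")


def label_value(raw: str) -> str:
    """Lowercase, replace disallowed characters with '-', truncate to 63 chars."""
    return _INVALID.sub("-", raw.lower())[:63]


def with_airflow_labels(
    dag_id: str, task_id: str, existing: Optional[Mapping[str, str]] = None
) -> Dict[str, str]:
    """Copy caller labels, then set airflow-dag/airflow-task so they always survive."""
    labels = dict(existing or {})
    labels["airflow-dag"] = label_value(dag_id)
    labels["airflow-task"] = label_value(task_id)
    return labels


# Example: a BigQueryInsertJobOperator-style configuration with merged labels
configuration = {
    "query": {"query": "SELECT 1", "useLegacySql": False},
    "labels": with_airflow_labels("finance_dag", "daily_report", {"team": "finance"}),
}
```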
Usage in Python Operators
For custom BigQuery API calls in PythonOperator, use the provided get_reservation function, e.g.:
```python
from google.cloud import bigquery

from airflow_reservations import get_reservation


def my_bigquery_task(**context):
    ti = context["task_instance"]

    # Keep your own labels and add the two that Masthead relies on
    existing_labels = {"team": "finance"}
    labels = {
        **existing_labels,
        "airflow-dag": ti.dag_id.lower(),
        "airflow-task": ti.task_id.lower().replace(".", "-"),  # dots are not allowed in labels
    }

    # Look up the reservation configured for this task
    reservation = get_reservation(ti.dag_id, ti.task_id)

    # Include both the labels and the reservation in the job config
    # (assumes a google-cloud-bigquery version that supports QueryJobConfig.reservation)
    job_config = bigquery.QueryJobConfig(
        labels=labels,
        reservation=reservation or None,
    )

    # Execute with the BigQuery client; replace the placeholder SQL with your query
    client = bigquery.Client()
    sql = "SELECT 1"
    client.query(sql, job_config=job_config).result()
```
Troubleshooting
Config not loading
Check that:
- The config file exists at the expected path
- The file contains valid JSON
- Airflow has read permissions
Enable debug logging:
```python
import logging

logging.getLogger("airflow_reservations").setLevel(logging.DEBUG)
```
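To check these points from inside the environment where the policy runs (for example, a worker or DAG-processing container), a small standalone script can help. `diagnose_config` is a hypothetical helper, not part of the package. It resolves the path as described under Applying Configuration (`RESERVATIONS_CONFIG_PATH`, else `$AIRFLOW_HOME/dags/reservations_config.json`) and assumes `AIRFLOW_HOME` falls back to `~/airflow`, Airflow's usual default.

```python
import json
import os
from pathlib import Path
from typing import List


def diagnose_config() -> List[str]:
    """Return problems found with the reservations config (empty list if none)."""
    override = os.environ.get("RESERVATIONS_CONFIG_PATH")
    airflow_home = os.environ.get("AIRFLOW_HOME", os.path.expanduser("~/airflow"))
    path = Path(override) if override else Path(airflow_home) / "dags" / "reservations_config.json"
    if not path.is_file():
        return [f"config file not found at {path}"]
    if not os.access(path, os.R_OK):
        return [f"config file at {path} is not readable by this process"]
    try:
        data = json.loads(path.read_text())
    except ValueError as exc:
        return [f"invalid JSON in {path}: {exc}"]
    if not isinstance(data, dict) or not isinstance(data.get("reservation_config"), list):
        return [f"{path} has no top-level 'reservation_config' array"]
    problems = []
    for i, entry in enumerate(data["reservation_config"]):
        if not isinstance(entry, dict):
            problems.append(f"entry {i} is not an object")
            continue
        if not isinstance(entry.get("tasks"), list):
            problems.append(f"entry {i} ({entry.get('tag')}) has no 'tasks' array")
        res = entry.get("reservation")
        if "reservation" not in entry or not (res is None or isinstance(res, str)):
            problems.append(f"entry {i} ({entry.get('tag')}) needs 'reservation' as a string or null")
    return problems
```

For example, run `print(diagnose_config() or "config looks OK")` in the same environment Airflow uses.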
Reservations not being applied
Verify:
- The task type is `BigQueryInsertJobOperator` or `BigQueryExecuteQueryOperator`
- BigQuery jobs include the `airflow-dag` and `airflow-task` labels (see Prerequisite: BigQuery Job Labels above)
- The `dag_id.task_id` key exactly matches the config
- For TaskGroups, the full path is included (e.g., `dag.group.task`), and in the config JSON the second dot is replaced with a dash (e.g., `dag.group-task`).
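Key mismatches are easiest to find by comparing the config against the tasks Airflow actually knows about. The function below is a hypothetical helper, not part of the package: it builds keys as `dag_id.task_id` with TaskGroup dots turned into dashes (an assumption that extends the single-group rule above to nested groups) and uses `difflib.get_close_matches` to suggest likely typos.

```python
import difflib
from typing import Dict, Iterable, List, Tuple


def unmatched_config_keys(config: Dict, tasks: Iterable[Tuple[str, str]]) -> List[str]:
    """Report config keys that match no known task, with a close match if one exists."""
    # Keys built as dag_id.task_id, with TaskGroup dots turned into dashes (assumed)
    known = {f"{dag_id}.{task_id.replace('.', '-')}" for dag_id, task_id in tasks}
    candidates = sorted(known)  # sorted for deterministic suggestions
    report = []
    for entry in config.get("reservation_config", []):
        for key in entry.get("tasks", []):
            if key in known:
                continue
            hint = difflib.get_close_matches(key, candidates, n=1)
            suffix = f" (did you mean {hint[0]}?)" if hint else ""
            report.append(f"{key}: no such task{suffix}")
    return report
```

The `(dag_id, task_id)` pairs could be collected with Airflow's `DagBag`, for example `[(dag.dag_id, t) for dag in DagBag().dags.values() for t in dag.task_ids]`.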
Supported Versions
This package is tested and compatible with:
- Airflow 2.6+ (including 2.10.x) - ✅ Fully supported and tested
- Airflow 3.x (including 3.1.x) - ✅ Fully supported and tested (see Airflow 3 notes below)
- Python 3.8+ (tested with 3.11 and 3.12)
Airflow 3 Notes
In Airflow 3, the Task SDK parses DAGs in a separate process/container from the Scheduler and Worker. To ensure the policy works correctly:
- Installation: The `airflow-reservations` package must be installed in the environment where the Task SDK executes (typically your custom Airflow image).
- Config accessibility: The `reservations_config.json` file must be accessible to the Task SDK process. If you use remote DAG storage, ensure the config file is bundled with your DAGs or placed in a shared volume.
- Environment variables: If you use `RESERVATIONS_CONFIG_PATH`, it must also be set in the environment of the worker/execution container.
Download files
File details
Details for the file airflow_reservations-0.2.2.tar.gz.
File metadata
- Download URL: airflow_reservations-0.2.2.tar.gz
- Upload date:
- Size: 35.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.13
File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | `18b67ca891e42eb5f28c3ea7e59de5b5b12ef3aeb2fb8e30b533a54b63ac2f1d` |
| MD5 | `757b83320457ae3a8991158da560b969` |
| BLAKE2b-256 | `487440ca6a45be955154fd9636c458b2472ad793d1bbee1e2c8426b9521d55f2` |
Provenance
The following attestation bundles were made for airflow_reservations-0.2.2.tar.gz:
- Publisher: ci.yml on masthead-data/airflow-reservations
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: airflow_reservations-0.2.2.tar.gz
- Subject digest: 18b67ca891e42eb5f28c3ea7e59de5b5b12ef3aeb2fb8e30b533a54b63ac2f1d
- Sigstore transparency entry: 1285630391
- Permalink: masthead-data/airflow-reservations@3c875eacb74ff80cdc858cd2a75c76a8126b4f97
- Branch / Tag: refs/tags/v0.2.2
- Owner: https://github.com/masthead-data
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: ci.yml@3c875eacb74ff80cdc858cd2a75c76a8126b4f97
- Trigger Event: push
File details
Details for the file airflow_reservations-0.2.2-py3-none-any.whl.
File metadata
- Download URL: airflow_reservations-0.2.2-py3-none-any.whl
- Upload date:
- Size: 33.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.13
File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | `9818b7d2bb3ba7299513ce9ff206b516c30feaeb9b95e5b18209b06b7466c522` |
| MD5 | `88d10588fa306e5f5f60c3349c03a494` |
| BLAKE2b-256 | `ca728b596bfc2a26d31a8140b6acc43359f58d8b029e598ff86c9c0566fd5f6e` |
Provenance
The following attestation bundles were made for airflow_reservations-0.2.2-py3-none-any.whl:
- Publisher: ci.yml on masthead-data/airflow-reservations
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: airflow_reservations-0.2.2-py3-none-any.whl
- Subject digest: 9818b7d2bb3ba7299513ce9ff206b516c30feaeb9b95e5b18209b06b7466c522
- Sigstore transparency entry: 1285630433
- Permalink: masthead-data/airflow-reservations@3c875eacb74ff80cdc858cd2a75c76a8126b4f97
- Branch / Tag: refs/tags/v0.2.2
- Owner: https://github.com/masthead-data
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: ci.yml@3c875eacb74ff80cdc858cd2a75c76a8126b4f97
- Trigger Event: push