Airflow Cluster Policy for BigQuery reservation management
Airflow Reservations Policy
Airflow Cluster Policy plugin for BigQuery reservation management.
This package integrates with Airflow's Cluster Policies to automatically inject BigQuery reservation assignments into your tasks without requiring any changes to your DAG code.
Features
- Automatic re-assignment - Intercepts BigQuery operators and re-assigns them to the appropriate reservation based on the configuration:
  - BigQueryInsertJobOperator - Injects into the configuration.query.query dict
  - Any BigQuery operator with a sql attribute (e.g. BigQueryExecuteQueryOperator, BigQueryCheckOperator)
- Lookup-based Configuration - Uses dag_id.task_id → reservation_id mappings
- Python API - Provides get_reservation() for custom BigQuery API calls in PythonOperator
- Performance Optimized - Config caching with file mtime-based invalidation
- Graceful Error Handling - Won't crash Airflow on config errors
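The mtime-based caching and the graceful error handling listed above can be pictured roughly as follows. This is a minimal sketch under assumptions, not the package's actual implementation: the `MtimeCachedConfig` class and its `load` method are hypothetical names used only for illustration.

```python
import json
import os


class MtimeCachedConfig:
    """Hypothetical helper: caches a parsed JSON file until its mtime changes."""

    def __init__(self, path):
        self.path = path
        self._mtime = None
        self._data = {}

    def load(self):
        try:
            mtime = os.stat(self.path).st_mtime_ns
        except OSError:
            # Missing or unreadable file: fall back to an empty config
            # rather than raising inside Airflow.
            self._mtime, self._data = None, {}
            return self._data
        if mtime != self._mtime:
            try:
                with open(self.path, encoding="utf-8") as fh:
                    self._data = json.load(fh)
            except (OSError, ValueError):
                # Invalid JSON is treated as "no configuration".
                self._data = {}
            self._mtime = mtime
        return self._data
```

Repeated calls to `load()` reuse the parsed dictionary and only re-read the file after its modification time changes, which keeps per-task lookups cheap during DAG parsing.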
Installation
pip install airflow-reservations
Or add to your requirements.txt:
airflow-reservations==0.1.0
The policy is automatically registered via Airflow's plugin entrypoint system (requires Airflow 2.6+).
Configuration
Create a reservations_config.json file in your DAGs folder:
{
  "reservation_config": [
    {
      "tag": "standard",
      "reservation": "projects/{project}/locations/{location}/reservations/{name}",
      "tasks": [
        "finance_dag.daily_report",
        "etl_dag.load_analytics"
      ]
    },
    {
      "tag": "on_demand",
      "reservation": "none",
      "tasks": [
        "adhoc_dag.quick_query"
      ]
    },
    {
      "tag": "default",
      "reservation": null,
      "tasks": [
        "marketing_dag.calculate_roas"
      ]
    }
  ]
}
Configuration Fields
| Field | Description |
|---|---|
| tag | Human-readable label for the reservation group |
| reservation | See values below |
| tasks | Array of "dag_id.task_id" patterns |
Reservation values:
| Value | Behavior |
|---|---|
| "projects/.../locations/.../reservations/..." | Injects that reservation into the SQL |
| "none" | Injects SET @@reservation='none'; (explicitly use on-demand capacity) |
| null | Skips the task entirely (no SQL modification) |
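To make the three reservation values concrete, here is a small sketch of how a config in this format could be flattened into a lookup and turned into SQL. The helper names `build_lookup` and `apply_reservation` are hypothetical, and the reservation path uses placeholder project, location, and reservation names; the package's internals may differ.

```python
def build_lookup(config):
    """Flatten reservation_config into {"dag_id.task_id": reservation}."""
    lookup = {}
    for entry in config.get("reservation_config", []):
        for task_key in entry.get("tasks", []):
            lookup[task_key] = entry.get("reservation")
    return lookup


def apply_reservation(sql, reservation):
    """Prepend a SET @@reservation statement; None (JSON null) leaves sql as-is."""
    if reservation is None:
        return sql
    return f"SET @@reservation='{reservation}';\n{sql}"


# Placeholder values for illustration only.
config = {
    "reservation_config": [
        {"tag": "standard",
         "reservation": "projects/p/locations/US/reservations/r",
         "tasks": ["finance_dag.daily_report"]},
        {"tag": "on_demand", "reservation": "none",
         "tasks": ["adhoc_dag.quick_query"]},
        {"tag": "default", "reservation": None,
         "tasks": ["marketing_dag.calculate_roas"]},
    ]
}
lookup = build_lookup(config)
print(apply_reservation("SELECT 1", lookup["adhoc_dag.quick_query"]))
```

Note that "none" is an ordinary string that ends up in the SQL, while null skips the task, so the two must not be confused when editing the file by hand.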
Configuration Path
By default, the config file is loaded from $AIRFLOW_HOME/dags/reservations_config.json.
Override the path using the RESERVATIONS_CONFIG_PATH environment variable:
export RESERVATIONS_CONFIG_PATH=/custom/path/to/config.json
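The resolution order described above might look like the sketch below. `resolve_config_path` is a hypothetical function, and falling back to `~/airflow` when `AIRFLOW_HOME` is unset is an assumption based on Airflow's own default, not something this package documents.

```python
import os


def resolve_config_path(environ=None):
    """Return the config path: env override first, then the DAGs-folder default."""
    env = os.environ if environ is None else environ
    override = env.get("RESERVATIONS_CONFIG_PATH")
    if override:
        return override
    # Assumption: mirror Airflow's default AIRFLOW_HOME when it is not set.
    airflow_home = env.get("AIRFLOW_HOME", os.path.expanduser("~/airflow"))
    return os.path.join(airflow_home, "dags", "reservations_config.json")


print(resolve_config_path({"AIRFLOW_HOME": "/opt/airflow"}))
```

Passing a plain dictionary instead of `os.environ` makes the function easy to exercise in tests without touching the real environment.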
TaskGroups and Dynamic Tasks
For tasks inside TaskGroups, use the full task ID path as it appears in Airflow:
{
  "reservation_config": [
    {
      "tag": "standard",
      "reservation": "projects/{project}/locations/{location}/reservations/{name}",
      "tasks": [
        "my_dag.my_task_group.inner_task"
      ]
    }
  ]
}
How It Works
When Airflow parses your DAGs, this plugin's task_policy hook is called for each task. For BigQuery tasks, it:
- Extracts dag_id and task_id from the task
- Looks up dag_id.task_id in the configuration file
- If found, prepends SET @@reservation='...'; to the SQL query
flowchart LR
A[DAG Parsing] -->|Triggers| B[task_policy hook]
B -->|Looks up| C[reservations_config.json]
C -->|Returns reservation| B
B -->|Re-assigns a query to a reservation| D[BigQuery Operator]
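The steps above can be sketched without Airflow installed by using stand-in task objects. This is an illustration under assumptions, not the plugin's code: `inject_reservation` is a hypothetical function, the stand-ins only mimic the `dag_id`, `task_id`, `sql`, and `configuration` attributes, and the guard against prepending twice is an added design choice in case a policy runs more than once on the same task.

```python
from types import SimpleNamespace

PREFIX = "SET @@reservation="


def inject_reservation(task, lookup):
    """Mutate a BigQuery-like task in place; return True if SQL was changed."""
    reservation = lookup.get(f"{task.dag_id}.{task.task_id}")
    if reservation is None:
        return False  # not configured, or explicitly skipped with null
    statement = f"{PREFIX}'{reservation}';\n"

    # BigQueryInsertJobOperator-style: SQL lives in configuration["query"]["query"].
    configuration = getattr(task, "configuration", None)
    if isinstance(configuration, dict) and isinstance(configuration.get("query"), dict):
        query = configuration["query"]
        current = query.get("query")
        if isinstance(current, str) and not current.startswith(PREFIX):
            query["query"] = statement + current
            return True
        return False

    # Operators exposing a plain `sql` string attribute.
    sql = getattr(task, "sql", None)
    if isinstance(sql, str) and not sql.startswith(PREFIX):
        task.sql = statement + sql
        return True
    return False


# Stand-in for an operator with a `sql` attribute (placeholder values).
check = SimpleNamespace(dag_id="adhoc_dag", task_id="quick_query", sql="SELECT 1")
inject_reservation(check, {"adhoc_dag.quick_query": "none"})
print(check.sql)
```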
Usage in Python Operators
For custom BigQuery API calls in PythonOperator, use the provided API:
from airflow_reservations import get_reservation


def my_bigquery_task(**context):
    dag_id = context['dag'].dag_id
    task_id = context['task'].task_id
    your_sql = "SELECT 1"  # placeholder: replace with your own query

    # Look up reservation for this task
    reservation = get_reservation(dag_id, task_id)
    if reservation:
        # Prepend to your SQL
        sql = f"SET @@reservation='{reservation}';\n{your_sql}"
    else:
        sql = your_sql
    # Execute with BigQuery client...
API Reference
get_reservation(dag_id: str, task_id: str) -> str | None
Look up the reservation ID for a specific task.
from airflow_reservations import get_reservation
reservation = get_reservation("my_dag", "my_task")
# Returns: "projects/my-project/locations/US/reservations/my-res" or None
load_config(force_reload: bool = False) -> dict
Load the full configuration dictionary.
from airflow_reservations import load_config
config = load_config()
# Returns: {"reservations": {...}}
Generating Configuration
Use Masthead recommendations to generate the reservations_config.json file containing the optimal reservation assignments for your tasks. Users are responsible for pulling this configuration into their Airflow environment.
Typical workflow:
- Masthead analyzes your BigQuery workloads
- Read Masthead recommendations and generate reservations_config.json with optimal assignments
- Merge the config into your DAGs repository
- Airflow syncs the updated config file
- The policy applies reservations on next task parse
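As an illustration of the generation step, the sketch below groups per-task assignments into the reservation_config structure shown earlier. The input shape (a dictionary of task key to tag and reservation) is purely hypothetical and does not reflect Masthead's actual recommendation format; `build_config` is likewise an invented helper.

```python
import json


def build_config(assignments):
    """Group {"dag_id.task_id": (tag, reservation)} into reservation_config entries."""
    groups = {}
    for task_key, (tag, reservation) in sorted(assignments.items()):
        groups.setdefault((tag, reservation), []).append(task_key)
    return {
        "reservation_config": [
            {"tag": tag, "reservation": reservation, "tasks": tasks}
            for (tag, reservation), tasks in groups.items()
        ]
    }


# Hypothetical assignments with a placeholder reservation path.
assignments = {
    "finance_dag.daily_report": ("standard", "projects/p/locations/US/reservations/r"),
    "etl_dag.load_analytics": ("standard", "projects/p/locations/US/reservations/r"),
    "adhoc_dag.quick_query": ("on_demand", "none"),
}
print(json.dumps(build_config(assignments), indent=2))
```

Sorting the task keys keeps the generated file stable between runs, which makes diffs in the DAGs repository easier to review.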
Troubleshooting
Config not loading
Check that:
- The config file exists at the expected path
- The file contains valid JSON
- Airflow has read permissions
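The three checks above can be run in one go with a short script. `diagnose_config` is a hypothetical helper written for this README, not part of the package; it only assumes the top-level reservation_config key shown in the Configuration section.

```python
import json
import os


def diagnose_config(path):
    """Return a list of problems found; an empty list means the file looks usable."""
    if not os.path.isfile(path):
        return [f"file not found: {path}"]
    if not os.access(path, os.R_OK):
        return [f"file is not readable: {path}"]
    try:
        with open(path, encoding="utf-8") as fh:
            data = json.load(fh)
    except ValueError as exc:
        return [f"invalid JSON: {exc}"]
    if not isinstance(data, dict) or "reservation_config" not in data:
        return ["missing top-level 'reservation_config' key"]
    return []


print(diagnose_config("/custom/path/to/config.json"))
```

Run it as the same user, and inside the same container, as the Airflow process so that the path and permission checks reflect what Airflow actually sees.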
Enable debug logging:
import logging
logging.getLogger("airflow_reservations").setLevel(logging.DEBUG)
Reservations not being applied
Verify:
- The task type is BigQueryInsertJobOperator or BigQueryExecuteQueryOperator
- The dag_id.task_id key exactly matches the config
- For TaskGroups, include the full path (e.g., dag.group.task)
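Because keys must match exactly, a missing TaskGroup prefix is an easy mistake to make. The sketch below uses the standard library's `difflib` to suggest configured keys that are close to the one Airflow reports; `suggest_keys` is a hypothetical helper and the 0.6 similarity cutoff is an arbitrary choice.

```python
import difflib


def suggest_keys(key, configured_keys, limit=3):
    """Return configured keys similar to `key`, or [] if it matches exactly."""
    if key in configured_keys:
        return []
    return difflib.get_close_matches(key, configured_keys, n=limit, cutoff=0.6)


configured = ["my_dag.my_task_group.inner_task", "other.x"]
print(suggest_keys("my_dag.inner_task", configured))
```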
Development
# Install dev dependencies
pip install -e ".[dev]"
# Run unit tests
pytest tests/ -v
# Run with coverage
pytest tests/ --cov=airflow_reservations
# Run E2E tests (requires Docker)
make e2e
# Run E2E tests with Airflow 2.x
make e2e-airflow2
# Run E2E tests with Airflow 3.x
make e2e-airflow3
# Run E2E tests with all supported Airflow versions
make e2e-all
Supported Versions
This package is tested and compatible with:
- Airflow 2.6+ (including 2.10.x) - ✅ Fully supported and tested
- Airflow 3.x (including 3.1.x) - ✅ Fully supported and tested (see Airflow 3 notes below)
- Python 3.8+ (tested with 3.11 and 3.12)
Airflow 3 Notes
In Airflow 3, the Task SDK parses DAGs in a separate process/container from the Scheduler and Worker. To ensure the policy works correctly:
- Installation: The airflow-reservations package must be installed in the environment where the Task SDK executes (typically your custom Airflow image).
- Config Accessibility: The reservations_config.json file must be accessible to the Task SDK process. If you are using remote DAG storage, ensure the config file is bundled with your DAGs or placed in a shared volume.
- Environment Variables: If you use RESERVATIONS_CONFIG_PATH, it must be set in the environment of the worker/execution container as well.
The plugin uses Airflow's standard Cluster Policies API, which remains the recommended way to implement cross-cutting concerns in Airflow 3.