
An Apache Airflow provider package for CarbonAware.

Project description

Carbon Aware Provider for Apache Airflow

The Carbon Aware Provider for Apache Airflow allows you to optimize your workflows by scheduling tasks to run at times with lower carbon intensity. It introduces a CarbonAwareOperator that can shift the execution of downstream tasks to an optimal window based on forecasted carbon emissions data.

Purpose

This package provides a time-shifting operator for Apache Airflow. Its main goal is to enable users to easily integrate carbon awareness into their data pipelines, reducing the environmental impact of their computations by running them when the energy grid is cleaner.

Prerequisites

  • Apache Airflow >= 2.4
  • Python >= 3.8

Installation

You can install the Carbon Aware Provider using pip:

pip install airflow-provider-carbonaware

This will also install the necessary dependencies, including apache-airflow (if not already present) and carbonaware-scheduler-client.

Usage

To use the CarbonAwareOperator, import it into your DAG file and instantiate it as a task. When it runs, the operator queries the carbon-intensity forecast, identifies the optimal execution window, and then defers itself until that window begins. Tasks downstream of the CarbonAwareOperator therefore do not start until the Airflow scheduler resumes the operator at the optimal time and it completes.

Example DAG

Here's a basic example of how to incorporate the CarbonAwareOperator into your Airflow DAG:

from pendulum import datetime as pendulum_datetime
from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

from carbonaware_provider.operators.carbonaware import CarbonAwareOperator

@dag(
    start_date=pendulum_datetime(2023, 1, 1),
    schedule=None,
    default_args={"retries": 2},
    tags=["example", "carbon-aware"],
)
def my_carbon_aware_dag():
    """
    A DAG demonstrating the CarbonAwareOperator.
    """
    
    setup_task = BashOperator(
        task_id="setup_task",
        bash_command="echo 'Performing initial setup...'"
    )
    
    # This operator will find the best time within the next 2 hours
    # for a task that is expected to run for 30 minutes in the 'aws:us-east-1' zone.
    find_optimal_time = CarbonAwareOperator(
        task_id="find_optimal_carbon_time",
        execution_window_minutes=120,  # Look for optimal time in the next 120 minutes
        task_duration_minutes=30,      # The estimated duration of the carbon-sensitive workload
        zone={"provider": "aws", "region": "us-east-1"} # Specify your cloud provider and region
        # Alternatively, for other providers (consult the carbonaware-scheduler-client
        # docs for the exact supported provider/region values), e.g.:
        # zone={"provider": "azure", "region": "eastus"}
        # zone={"provider": "gcp", "region": "europe-west1"}
    )
    
    def my_data_processing_task():
        print("Running data processing task at the optimal carbon intensity time.")
        # Your data processing logic here

    process_data = PythonOperator(
        task_id="process_data_at_optimal_time",
        python_callable=my_data_processing_task,
    )

    cleanup_task = BashOperator(
        task_id="cleanup_task_after_optimal_run",
        bash_command="echo 'Cleaning up after carbon-aware execution.'"
    )

    # Define dependencies
    # setup_task runs first.
    # find_optimal_time runs next, deferring until the best carbon intensity window.
    # process_data and cleanup_task run only after find_optimal_time completes at the optimal time.
    setup_task >> find_optimal_time >> process_data >> cleanup_task

my_carbon_aware_dag_instance = my_carbon_aware_dag()

Operator Parameters

The CarbonAwareOperator accepts the following key parameters:

  • task_id (str): A unique, descriptive id for the task.
  • execution_window_minutes (int): The time window (in minutes) from the current time within which to find the optimal execution slot.
  • task_duration_minutes (int): The estimated duration (in minutes) of the tasks that will run at the optimal time.
  • zone (dict, optional): Specifies the cloud provider and region (e.g., {"provider": "aws", "region": "us-east-1"}). If not specified, the operator attempts to detect the cloud provider and region from instance metadata.
  • deferrable (bool, optional): Defaults to True. Set to False to make the operator blocking (not recommended for its intended use).

The operator uses the carbonaware-scheduler-client library to fetch forecasted carbon-intensity data and determine the lowest-carbon start time within the execution window.
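Conceptually, the selection can be sketched as follows. This is an illustrative approximation only, not the provider's actual implementation (the real logic lives in carbonaware-scheduler-client and works against live forecast data); the forecast values and 30-minute slot size below are made up for the example:

```python
from datetime import datetime, timedelta

def pick_optimal_start(forecast, window_minutes, duration_minutes, slot_minutes=30):
    """forecast: list of (slot_start, gCO2eq/kWh) tuples in slot_minutes steps.

    Returns the (start_time, average_intensity) of the contiguous run of slots
    long enough for the task that has the lowest average carbon intensity.
    """
    slots_needed = duration_minutes // slot_minutes
    # Latest slot index at which the task can still finish inside the window.
    max_start = window_minutes // slot_minutes - slots_needed
    best = None
    for i in range(max_start + 1):
        avg = sum(v for _, v in forecast[i:i + slots_needed]) / slots_needed
        if best is None or avg < best[1]:
            best = (forecast[i][0], avg)
    return best

# Fabricated 3-hour forecast starting at noon, in 30-minute slots.
now = datetime(2023, 1, 1, 12, 0)
forecast = [(now + timedelta(minutes=30 * i), v)
            for i, v in enumerate([450, 410, 300, 290, 320, 400])]

# Same shape as the DAG example: 120-minute window, 30-minute task.
start, avg = pick_optimal_start(forecast, window_minutes=120, duration_minutes=30)
print(start, avg)  # the lowest-intensity 30-minute slot inside the 2-hour window
```

With these numbers the chosen start is 13:30, the cheapest slot that still falls inside the two-hour window; the later, even cheaper-looking slots are excluded because the task could not finish within the window.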

Important Notes

macOS Proxy Issues

If you are running Airflow on macOS, you might encounter segmentation faults related to system proxy lookups. This is a known issue with Python's urllib (and libraries that use it, such as httpx which is used by carbonaware-scheduler-client) on macOS, especially within complex execution environments like Airflow. To mitigate this, it is recommended to set the following environment variable in your Airflow environment:

export no_proxy='*'

This bypasses the system proxy lookup that can cause the crash.
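If you cannot export the variable in the shell that launches Airflow (for example, in a managed deployment), an alternative is to set it from Python before any HTTP request is made. The names below are the standard proxy-bypass environment variables honored by urllib and httpx; setting both casings covers libraries that check either:

```python
import os

# Must run before httpx/urllib issue any request (ideally at the very top of
# the entrypoint) so the macOS system proxy lookup is never triggered.
os.environ["no_proxy"] = "*"
os.environ["NO_PROXY"] = "*"
```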


