Airflow Ray

Apache Airflow Provider for Ray

A provider you can install into your Airflow environment to access custom Ray XCom backends, Ray Hooks, and Ray Operators.


🧪 Experimental Version

This provider is an experimental alpha containing the components necessary to orchestrate and schedule Ray tasks using Airflow. It is actively maintained and developed to bring production-ready workflows to Ray using Airflow. This release contains everything needed to begin building these workflows using the Airflow TaskFlow API.

Current Release: 0.2.1

Requirements

Visit the Ray Project page for more info on Ray.

⚠️ The server version and client version (build) of Ray MUST be the same.

- Python Version >= 3.7
- Airflow Version >= 2.0.0
- Ray Version == 1.3.0
- Filelock >= 3.0.0

Modules

  • Ray XCom Backend: Custom XCom backend to assist operators in moving data between tasks using the Ray API with its internal Plasma store, thereby allowing for in-memory distributed processing and handling of large data objects.
  • Ray Hook: Extension of the HTTP hook that uses the Ray client to connect to the Ray server.
  • Ray Decorator: Task decorator for use with the TaskFlow API; it combines the existing Airflow @task decorator with ray.remote functionality, so each task executes on the Ray cluster.
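The core idea behind a custom XCom backend can be sketched with the standard library alone. This is a hypothetical illustration, not the provider's implementation: large objects live in a shared store (Ray's Plasma store, in the real backend) and only a small key passes through Airflow's XCom table.

```python
# Hypothetical sketch of the XCom-backend idea: keep large objects
# out-of-band and pass only a small key between tasks.
object_store = {}  # stands in for Ray's in-memory Plasma store

def serialize_value(value, key):
    # real backend: ray.put(value); here we just stash it by key
    object_store[key] = value
    return key  # only the key is written to the Airflow metadata DB

def deserialize_value(key):
    # real backend: ray.get(object_ref)
    return object_store[key]

ref = serialize_value([1, 2, 3], key="task_a.return_value")
print(deserialize_value(ref))  # [1, 2, 3]
```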

Configuration and Usage

  1. Add the provider package wheel file to the root directory of your Airflow project.

  2. In your Airflow Dockerfile, add an environment variable specifying the custom XCom backend, alongside the provider wheel install. Add the following:

    FROM quay.io/astronomer/ap-airflow:2.0.2-1-buster-onbuild
    USER root
    RUN pip uninstall astronomer-airflow-version-check -y
    USER astro
    ENV AIRFLOW__CORE__XCOM_BACKEND=ray_provider.xcom.ray_backend.RayBackend
    

    Check your ap-airflow version; if unsure, change the base image to ap-airflow:latest-onbuild.
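At startup, Airflow resolves the dotted path in AIRFLOW__CORE__XCOM_BACKEND into a class via a dynamic import. A stdlib sketch of that mechanism (demonstrated with a stdlib class, since ray_provider may not be installed wherever you run this):

```python
# Roughly how Airflow turns "ray_provider.xcom.ray_backend.RayBackend"
# into an importable class object.
import importlib

def import_string(dotted_path: str):
    module_path, class_name = dotted_path.rsplit(".", 1)
    module = importlib.import_module(module_path)
    return getattr(module, class_name)

# demonstrated with a stdlib class so the sketch runs anywhere
cls = import_string("collections.OrderedDict")
print(cls.__name__)  # OrderedDict
```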

  3. This provider targets Ray 1.3.0 and Python 3.7. Install the provider package (to use a bleeding-edge build of Ray instead, add the Ray project's nightly wheel URL to your requirements.txt file):

    pip install airflow-provider-ray
    
  4. Configure Ray locally. To run Ray locally, you'll need a minimum of 6 GB of free memory. To start, in an environment with Ray installed, run:

    (venv)$ ray start --num-cpus=8 --object-store-memory=7000000000 --head
    

    If you have extra resources, you can bump the memory up.

    You should now be able to open the ray dashboard at http://127.0.0.1:8265/.

  5. Start your Airflow environment and open the UI.

  6. In the Airflow UI, add an Airflow Pool with the following:

    Pool (name): ray_worker_pool
    Slots: 25
    
  7. In the Airflow UI, add an Airflow Connection with the following:

    Conn Id: ray_cluster_connection
    Conn Type: HTTP
    Host: Cluster IP Address, with basic Auth params if needed
    Port: 10001
    
  8. In your Airflow DAG python file, you must include the following in your default_args dictionary:

    from ray_provider.xcom.ray_backend import RayBackend
    .
    .
    .
    default_args = {
        'on_success_callback': RayBackend.on_success_callback,
        'on_failure_callback': RayBackend.on_failure_callback,
        .
        .
        .
    }
    @dag(
        default_args=default_args,
        .
        .
    )
    def ray_example_dag():
        # do stuff
    
  9. Using the TaskFlow API, your Airflow task should now use the @ray_task decorator for any Ray task, passing the ray_conn_id parameter via task_args:

    from ray_provider.decorators import ray_task
    
    default_args = {
        'on_success_callback': RayBackend.on_success_callback,
        'on_failure_callback': RayBackend.on_failure_callback,
        .
        .
        .
    }
    task_args = {"ray_conn_id": "ray_cluster_connection"}
    .
    .
    .
    @dag(
        default_args=default_args,
        .
        .
    )
    def ray_example_dag():
    
        @ray_task(**task_args)
        def sum_cols(df: pd.DataFrame) -> pd.DataFrame:
            return pd.DataFrame(df.sum()).T
    
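To illustrate what a decorator like @ray_task does mechanically — wrap the function and carry its task arguments along — here is a pure-stdlib stand-in. fake_ray_task is hypothetical; the real decorator additionally ships the wrapped function to the Ray cluster for execution.

```python
# Stdlib stand-in for a parameterized task decorator (hypothetical;
# not the provider's code).
import functools

def fake_ray_task(**task_args):
    def wrapper(fn):
        @functools.wraps(fn)
        def inner(*args, **kwargs):
            # the real decorator would submit fn to the Ray cluster here
            return fn(*args, **kwargs)
        inner.task_args = task_args  # keep the decorator's arguments around
        return inner
    return wrapper

@fake_ray_task(ray_conn_id="ray_cluster_connection")
def sum_values(xs):
    return sum(xs)

print(sum_values([1, 2, 3]))  # 6
print(sum_values.task_args)   # {'ray_conn_id': 'ray_cluster_connection'}
```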

Project Contributors and Maintainers

This project is built in collaboration between Astronomer and Anyscale.

This project is formatted via black:

pip install black
black .

Connections

TBD - [Info on building a connection to Ray]

Download files


Source Distribution

airflow-provider-ray-0.2.1.tar.gz (12.6 kB)

Built Distribution

airflow_provider_ray-0.2.1-py3-none-any.whl (22.3 kB)

File details

Details for the file airflow-provider-ray-0.2.1.tar.gz.

File metadata

  • Download URL: airflow-provider-ray-0.2.1.tar.gz
  • Size: 12.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.4.1 importlib_metadata/4.0.1 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.57.0 CPython/3.7.9

File hashes

Hashes for airflow-provider-ray-0.2.1.tar.gz:

  • SHA256: d0bdd611c6a2016364e9fb19ac8ac0067d6a2e27b0686cc7f92c0bc27e58f3e2
  • MD5: f70457c826fe2ff4762b48d84b96018a
  • BLAKE2b-256: 95c028140c901c77d94878ec6d1cef04f716caf26a65cf7bffce01f9b279265b


File details

Details for the file airflow_provider_ray-0.2.1-py3-none-any.whl.

File metadata

  • Download URL: airflow_provider_ray-0.2.1-py3-none-any.whl
  • Size: 22.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.4.1 importlib_metadata/4.0.1 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.57.0 CPython/3.7.9

File hashes

Hashes for airflow_provider_ray-0.2.1-py3-none-any.whl:

  • SHA256: fefd0724e85553de19bb0a56e208f0179ac99ac11ee327bf5f2d80b210b4eea3
  • MD5: ad01b2aa9ac97abadebdf3b277b7eb7d
  • BLAKE2b-256: 0bb5694769b9348234b71c46355b1a1ec98f099297f59b8ee6f3c2c207c3825f

