Dagster integration library for Ray
dagster-ray
Ray integration library for Dagster.
dagster-ray allows running Ray computations in Dagster pipelines. It provides various Dagster abstractions, the most important being Resource, and helper @ops and @schedules, for multiple backends.
The following backends are implemented:
- local
- KubeRay (Kubernetes)
dagster-ray is tested across multiple version combinations of components such as ray, dagster, KubeRay Operator, and Python.
dagster-ray integrates with Dagster+ out of the box.
Documentation can be found below.
[!NOTE] This project is in early development. Contributions are very welcome! See the Development section below.
Backends
dagster-ray provides a RayResource class, which does not implement any specific backend.
It defines the common interface for all Ray resources.
It can be used for type annotations in your @op and @asset definitions.
Examples:
from dagster import asset
from dagster_ray import RayResource
import ray

@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):
    return ray.get(ray.put(42))
The other resources below are the actual backends that implement the RayResource interface.
Local
These resources can be used for development and testing purposes.
They provide the same interface as the other *Ray resources, but don't require any external infrastructure.
The public objects can be imported from the dagster_ray.local module.
Resources
LocalRay
A dummy resource which is useful for testing and development.
It doesn't do anything, but provides the same interface as the other *Ray resources.
Examples:
Using the LocalRay resource:
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.local import LocalRay
import ray

@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):  # this type annotation only defines the interface
    return ray.get(ray.put(42))

definitions = Definitions(resources={"ray_cluster": LocalRay()}, assets=[my_asset])
Conditionally using the LocalRay resource in development and KubeRayCluster in production:
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.local import LocalRay
from dagster_ray.kuberay import KubeRayCluster
import ray

@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):  # this type annotation only defines the interface
    return ray.get(ray.put(42))

IN_K8s = ...

definitions = Definitions(
    resources={"ray_cluster": KubeRayCluster() if IN_K8s else LocalRay()},
    assets=[my_asset],
)
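The value of IN_K8s is left open above. One possible way to compute it, offered as an assumption rather than something dagster-ray prescribes, is to look for the KUBERNETES_SERVICE_HOST environment variable, which Kubernetes sets in pod containers. The helper name running_in_kubernetes is hypothetical.

```python
import os
from typing import Mapping, Optional


def running_in_kubernetes(env: Optional[Mapping[str, str]] = None) -> bool:
    """Return True if the process appears to run inside a Kubernetes pod.

    Hypothetical helper, not part of dagster-ray. It relies on the
    KUBERNETES_SERVICE_HOST variable that Kubernetes sets in pod containers.
    """
    env = os.environ if env is None else env
    return bool(env.get("KUBERNETES_SERVICE_HOST"))


# Evaluated once at import time, like the IN_K8s placeholder above.
IN_K8s = running_in_kubernetes()
```

Accepting the environment as an argument keeps the check easy to exercise in tests without modifying os.environ.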
KubeRay
This backend requires a Kubernetes cluster with the KubeRay Operator installed.
It integrates with Dagster+ by injecting environment variables such as DAGSTER_CLOUD_DEPLOYMENT_NAME and tags such as dagster/user into default configuration values and RayCluster labels.
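To make the label injection concrete, here is a minimal sketch of how such values could be turned into valid Kubernetes label values. It is not dagster-ray's implementation: the function names and the label keys dagster.io/deployment and dagster.io/user are placeholders chosen for illustration. Sanitizing matters because a dagster/user tag is often an email address, and "@" is not allowed in a label value.

```python
import re
from typing import Dict, Mapping


def to_label_value(value: str) -> str:
    """Coerce a string into a valid Kubernetes label value.

    Label values may contain only alphanumerics, '-', '_' and '.',
    must start and end with an alphanumeric, and are at most 63 characters.
    """
    cleaned = re.sub(r"[^A-Za-z0-9_.-]", "-", value)[:63]
    return cleaned.strip("-_.")


def build_labels(env: Mapping[str, str], run_tags: Mapping[str, str]) -> Dict[str, str]:
    """Collect labels from Dagster+ environment variables and run tags.

    The label keys used here are hypothetical placeholders.
    """
    labels: Dict[str, str] = {}
    deployment = env.get("DAGSTER_CLOUD_DEPLOYMENT_NAME")
    if deployment:
        labels["dagster.io/deployment"] = to_label_value(deployment)
    user = run_tags.get("dagster/user")
    if user:
        labels["dagster.io/user"] = to_label_value(user)
    return labels
```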
The public objects can be imported from the dagster_ray.kuberay module.
Resources
KubeRayCluster
KubeRayCluster can be used for running Ray computations on Kubernetes.
When added as a resource dependency to an @op/@asset, the KubeRayCluster:
- Starts a dedicated RayCluster for it
- Connects to the cluster in client mode with ray.init() (unless skip_init is set to True)
- Tears down the cluster after the step is executed (unless skip_cleanup is set to True)
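The lifecycle above can be pictured as a context manager wrapped around the step. The following toy model only records events and never touches Kubernetes or Ray; the function name ray_cluster_lifecycle is hypothetical, and running the teardown in a finally block (so it also happens when the step fails) is an assumption, since this README does not say how failed steps are handled.

```python
from contextlib import contextmanager
from typing import Iterator, List


@contextmanager
def ray_cluster_lifecycle(
    events: List[str], skip_init: bool = False, skip_cleanup: bool = False
) -> Iterator[None]:
    """Toy model of the per-step lifecycle; it only appends event names."""
    events.append("start RayCluster")
    try:
        if not skip_init:
            events.append("ray.init()")
        yield  # the @op/@asset body runs here
    finally:
        # Assumption: teardown is attempted even if the step raised.
        if not skip_cleanup:
            events.append("delete RayCluster")


events: List[str] = []
with ray_cluster_lifecycle(events, skip_cleanup=True):
    events.append("run step")
# With skip_cleanup=True the cluster is left running after the step.
```

A cluster left behind this way is the kind of resource the cleanup jobs described later are meant to remove.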
RayCluster comes with minimal default configuration, matching KubeRay defaults.
Examples:
Basic usage (will create a single-node, non-scaling RayCluster):
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.kuberay import KubeRayCluster
import ray

@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):  # this type annotation only defines the interface
    return ray.get(ray.put(42))

definitions = Definitions(
    resources={"ray_cluster": KubeRayCluster()}, assets=[my_asset]
)
Larger cluster with auto-scaling enabled:
from dagster_ray.kuberay import KubeRayCluster, RayClusterConfig

ray_cluster = KubeRayCluster(
    ray_cluster=RayClusterConfig(
        enable_in_tree_autoscaling=True,
        worker_group_specs=[
            {
                "groupName": "workers",
                "replicas": 2,
                "minReplicas": 1,
                "maxReplicas": 10,
                # ...
            }
        ],
    )
)
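Because worker group specs are plain dictionaries, a typo in the replica bounds is easy to miss until the cluster is created. The following sketch shows an optional sanity check that could be run before building the config; check_worker_group_specs is a hypothetical helper, not part of dagster-ray, and it only inspects the three replica fields shown above.

```python
from typing import Any, Dict, List


def check_worker_group_specs(specs: List[Dict[str, Any]]) -> List[str]:
    """Return a list of human-readable problems found in the replica bounds."""
    problems: List[str] = []
    for spec in specs:
        name = spec.get("groupName", "<unnamed>")
        low, current, high = spec["minReplicas"], spec["replicas"], spec["maxReplicas"]
        # The initial replica count should sit inside the autoscaling range.
        if not low <= current <= high:
            problems.append(
                f"{name}: expected minReplicas <= replicas <= maxReplicas, "
                f"got {low}, {current}, {high}"
            )
    return problems


specs = [{"groupName": "workers", "replicas": 2, "minReplicas": 1, "maxReplicas": 10}]
problems = check_worker_group_specs(specs)
```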
KubeRayAPI
This resource can be used to interact with the Kubernetes API Server.
Examples:
Listing currently running RayClusters:
from dagster import op, Definitions
from dagster_ray.kuberay import KubeRayAPI

@op
def list_ray_clusters(
    kube_ray_api: KubeRayAPI,
):
    return kube_ray_api.kuberay.list_ray_clusters(k8s_namespace="kuberay")
Jobs
delete_kuberay_clusters
This job can be used to delete RayClusters from a given list of names.
cleanup_old_ray_clusters
This job can be used to delete old RayClusters which no longer correspond to any active Dagster Runs.
They may be left behind if the automatic cluster cleanup was disabled or failed.
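The selection logic behind such a cleanup can be sketched as a pure function over cluster manifests. This is an illustration under stated assumptions, not the job's actual code: the label key in RUN_ID_LABEL and the function find_stale_clusters are hypothetical, and the real job may identify the owning run differently.

```python
from typing import Any, Dict, Iterable, List, Set

RUN_ID_LABEL = "dagster.io/run_id"  # hypothetical label key, for illustration only


def find_stale_clusters(
    clusters: Iterable[Dict[str, Any]], active_run_ids: Set[str]
) -> List[str]:
    """Return names of clusters whose owning run is no longer active."""
    stale: List[str] = []
    for cluster in clusters:
        metadata = cluster.get("metadata", {})
        run_id = metadata.get("labels", {}).get(RUN_ID_LABEL)
        # Clusters without the label are skipped: they may not belong to Dagster.
        if run_id is not None and run_id not in active_run_ids:
            stale.append(metadata["name"])
    return stale


clusters = [
    {"metadata": {"name": "ray-a", "labels": {RUN_ID_LABEL: "run-1"}}},
    {"metadata": {"name": "ray-b", "labels": {RUN_ID_LABEL: "run-2"}}},
    {"metadata": {"name": "unrelated", "labels": {}}},
]
stale = find_stale_clusters(clusters, active_run_ids={"run-1"})
```

The resulting list of names is the kind of input that delete_kuberay_clusters is described as accepting.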
Schedules
Cleanup schedules can be trivially created using the cleanup_old_ray_clusters or delete_kuberay_clusters jobs.
cleanup_old_ray_clusters
dagster-ray provides an example daily cleanup schedule.
Executor
WIP
Development
poetry install --all-extras
poetry shell
pre-commit install
Testing
KubeRay
Required tools:
- docker
- kubectl
- helm
- minikube
Running pytest will automatically:
- build an image with the local dagster-ray code
- start a minikube Kubernetes cluster
- load the built dagster-ray and loaded kuberay-operator images into the cluster
- install the KubeRay Operator in the cluster with helm
- run the tests
Thus, no manual setup is required, just the presence of the tools listed above. This makes testing a breeze!
[!NOTE] Specifying a comma-separated list of KubeRay Operator versions in the KUBE_RAY_OPERATOR_VERSIONS environment variable will spawn a new test for each version.
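As an illustration of how a comma-separated variable can fan out into one test per version, the sketch below parses the value into a list. The helper parse_operator_versions is hypothetical and the version numbers are example values; the project's own test fixtures may work differently.

```python
import os
from typing import List, Mapping, Optional


def parse_operator_versions(
    env: Optional[Mapping[str, str]] = None, default: str = ""
) -> List[str]:
    """Split KUBE_RAY_OPERATOR_VERSIONS into a clean list of version strings."""
    env = os.environ if env is None else env
    raw = env.get("KUBE_RAY_OPERATOR_VERSIONS", default)
    # Ignore surrounding whitespace and empty entries such as a trailing comma.
    return [part.strip() for part in raw.split(",") if part.strip()]


versions = parse_operator_versions({"KUBE_RAY_OPERATOR_VERSIONS": "1.0.0, 1.1.0,"})
# A list like this could then be passed to pytest.mark.parametrize
# so that each version produces its own test case.
```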
[!NOTE] It may take a while to download the minikube and kuberay-operator images and build the local dagster-ray image during the first test invocation.