# dagster-ray

Dagster integration library for Ray.

`dagster-ray` allows creating Ray clusters and running distributed computations from Dagster code. It includes:
- `RayRunLauncher` - a `RunLauncher` which submits Dagster runs as isolated Ray jobs (in cluster mode) to a Ray cluster.
- `ray_executor` - an `Executor` which submits individual Dagster steps as isolated Ray jobs (in cluster mode) to a Ray cluster.
- `RayIOManager` - an `IOManager` which allows storing and retrieving intermediate values in Ray's object store. Ideal in conjunction with `RayRunLauncher` and `ray_executor`.
- `PipesKubeRayJobClient` - a Dagster Pipes client for launching and monitoring KubeRay's `RayJob` CR in Kubernetes. Typically used with external Python scripts. Allows receiving rich logs, events, and metadata from the job.
- `RayResource` - a resource representing a Ray cluster. Interactions with Ray are performed in client mode (requiring a stable, persistent connection), so it's most suitable for relatively short-lived jobs. It has implementations for `KubeRay` and local (mostly for testing) backends. `dagster_ray.RayResource` defines the common interface shared by all backends and can be used for backend-agnostic type annotations.
- Miscellaneous utilities like `@op`, `@job` and `@schedule` definitions for managing `KubeRay` clusters.
`dagster-ray` is tested across multiple versions of Python, Ray, Dagster, and the KubeRay Operator. It integrates with Dagster+ where possible.

Documentation can be found below.
> [!NOTE]
> This project is in early development. APIs are unstable and can change at any time. Contributions are very welcome! See the Development section below.
## Feature Matrix

There are different options available for running Dagster code on Ray. The following table summarizes the features of each option:

| Feature | `RayRunLauncher` | `ray_executor` | `PipesKubeRayJobClient` | `KubeRayCluster` |
|---|---|---|---|---|
| Creates Ray cluster | ❌ | ❌ | ✅ | ✅ |
| Cluster mode | ✅ | ✅ | ✅ | ❌ |
| For long-running jobs | ✅ | ✅ | ✅ | ❌ |
| Enabled per-asset | ❌ | ❌ | ✅ | ✅ |
| Configurable per-asset | ❌ | ✅ | ✅ | ✅ |
| Runs asset/op body on Ray | ✅ | ✅ | ❌ | ❌ |
| Requires configuring Dagster in the Ray cluster | ❌ | ❌ | ✅ | ✅ |
## Examples

See the examples directory.

## Installation

```shell
pip install dagster-ray
```

To install with extra dependencies for a particular backend (like `kuberay`), run:

```shell
pip install 'dagster-ray[kuberay]'
```
## RunLauncher

> [!WARNING]
> The `RayRunLauncher` is a work in progress.

```shell
pip install 'dagster-ray[run_launcher]'
```

The `RayRunLauncher` can be configured via `dagster.yaml`:

```yaml
run_launcher:
  module: dagster_ray
  class: RayRunLauncher
  config:
    ray:
      num_cpus: 1
      num_gpus: 0
```
Individual Runs can override the Ray configuration via the `dagster-ray/config` tag:

```python
from dagster import job, op


@op
def my_op():
    return 42


@job(
    tags={
        "dagster-ray/config": {
            "num_cpus": 16,
            "num_gpus": 1,
        }
    }
)
def my_job():
    return my_op()
```
## Executor

> [!WARNING]
> The `ray_executor` is a work in progress.

```shell
pip install 'dagster-ray[executor]'
```

The `ray_executor` can be used to execute Dagster steps on an existing remote Ray cluster. The executor submits steps as Ray jobs, which are started directly in the Ray cluster. Example:

```python
from dagster import job, op
from dagster_ray import ray_executor


@op(
    tags={
        "dagster-ray/config": {
            "num_cpus": 8,
            "num_gpus": 2,
            "runtime_env": {"pip": {"packages": ["torch"]}},
        }
    }
)
def my_op():
    import torch

    # your expensive computation here
    result = ...

    return result


@job(executor_def=ray_executor.configured({"ray": {"num_cpus": 1}}))
def my_job():
    return my_op()
```

Fields in the `dagster-ray/config` tag override the corresponding fields in the Executor config.
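The executor can also be set as the default for a whole code location via `Definitions` (a minimal sketch, reusing `my_job` from the example above):

```python
from dagster import Definitions
from dagster_ray import ray_executor

definitions = Definitions(
    jobs=[my_job],
    # Used as the default executor for all jobs in this code location
    # that don't specify their own executor_def.
    executor=ray_executor,
)
```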
## IOManager

`RayIOManager` allows storing and retrieving intermediate values in Ray's object store. It can be used in conjunction with `RayRunLauncher` and `ray_executor` to store and retrieve intermediate values in a Ray cluster.

It works by storing Dagster step keys in a global Ray actor. This actor contains a mapping between step keys and Ray `ObjectRef`s. It can be used with any picklable Python objects.
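For intuition, here is a minimal, hypothetical sketch of that mechanism; `StepKeyStore` and its methods are illustrative, not the actual implementation:

```python
import ray

ray.init()


@ray.remote
class StepKeyStore:
    """Hypothetical actor mapping step keys to ObjectRefs."""

    def __init__(self) -> None:
        self._refs: dict = {}

    def put(self, key: str, wrapped_ref: list) -> None:
        # Refs are wrapped in a list so Ray passes them through
        # without dereferencing them into their values.
        self._refs[key] = wrapped_ref[0]

    def get(self, key: str) -> "ray.ObjectRef":
        return self._refs[key]


# A named actor is reachable from every job in the cluster.
store = StepKeyStore.options(name="step_key_store", get_if_exists=True).remote()
ray.get(store.put.remote("upstream", [ray.put(42)]))
inner_ref = ray.get(store.get.remote("upstream"))
assert ray.get(inner_ref) == 42
```

Basic usage of the actual `RayIOManager`: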
```python
from dagster import asset, Definitions
from dagster_ray import RayIOManager


@asset(io_manager_key="ray_io_manager")
def upstream() -> int:
    return 42


@asset
def downstream(upstream: int):
    return 0


definitions = Definitions(
    assets=[upstream, downstream], resources={"ray_io_manager": RayIOManager()}
)
```
It supports partitioned assets:

```python
from dagster import (
    asset,
    Definitions,
    StaticPartitionsDefinition,
    AssetExecutionContext,
)
from dagster_ray import RayIOManager

partitions_def = StaticPartitionsDefinition(["a", "b", "c"])


@asset(io_manager_key="ray_io_manager", partitions_def=partitions_def)
def upstream(context: AssetExecutionContext):
    return context.partition_key


@asset(partitions_def=partitions_def)
def downstream(context: AssetExecutionContext, upstream: str) -> None:
    assert context.partition_key == upstream
```
It supports partition mappings. When loading multiple upstream partitions, they should be annotated with a `Dict[str, ...]`, `dict[str, ...]`, or `Mapping[str, ...]` type hint:

```python
from typing import Dict

from dagster import (
    asset,
    Definitions,
    StaticPartitionsDefinition,
    AssetExecutionContext,
)
from dagster_ray import RayIOManager

partitions_def = StaticPartitionsDefinition(["A", "B", "C"])


@asset(io_manager_key="ray_io_manager", partitions_def=partitions_def)
def upstream(context: AssetExecutionContext):
    return context.partition_key.lower()


@asset
def downstream_unpartitioned(upstream: Dict[str, str]) -> None:
    assert upstream == {"A": "a", "B": "b", "C": "c"}
```
## Backends

### KubeRay

```shell
pip install 'dagster-ray[kuberay]'
```

This backend requires a Kubernetes cluster with the KubeRay Operator installed.

It integrates with Dagster+ by injecting environment variables such as `DAGSTER_CLOUD_DEPLOYMENT_NAME` and tags such as `dagster/user` into default configuration values and Kubernetes labels.

To run `ray` code in client mode (from the Dagster Python process directly), use the `KubeRayCluster` resource (see the KubeRayCluster section).

To run `ray` code in job mode, use the `PipesKubeRayJobClient` with Dagster Pipes (see the Pipes section).

The public objects can be imported from the `dagster_ray.kuberay` module.
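For example (a sketch importing only the names that appear in this README):

```python
from dagster_ray.kuberay import (
    KubeRayClient,
    KubeRayCluster,
    PipesKubeRayJobClient,
    RayClusterConfig,
)
```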
#### Pipes

`dagster-ray` provides the `PipesKubeRayJobClient`, which can be used to execute remote Ray jobs on Kubernetes and receive Dagster events and logs from them.

The `RayJob` will manage the lifecycle of the underlying `RayCluster`, which will be cleaned up after the specified entrypoint exits. This approach doesn't require a persistent connection to the Ray cluster.

Examples:

In Dagster code, import `PipesKubeRayJobClient` and invoke it inside an `@op` or an `@asset`:

```python
from dagster import AssetExecutionContext, Definitions, asset
from dagster_ray.kuberay import PipesKubeRayJobClient


@asset
def my_asset(
    context: AssetExecutionContext, pipes_rayjob_client: PipesKubeRayJobClient
):
    pipes_rayjob_client.run(
        context=context,
        ray_job={
            # RayJob manifest goes here
            # full reference: https://ray-project.github.io/kuberay/reference/api/#rayjob
            "metadata": {
                # .metadata.name is not required and will be generated if not provided
                "namespace": "ray"
            },
            "spec": {
                "entrypoint": "python /app/my_script.py",
                # *.container.image is not required and will be set to the current `dagster/image` tag if not provided
                "rayClusterSpec": {
                    "headGroupSpec": {...},
                    "workerGroupSpecs": [...],
                },
            },
        },
        extra={"param": "value"},
    )


definitions = Definitions(
    resources={"pipes_rayjob_client": PipesKubeRayJobClient()}, assets=[my_asset]
)
```
In the Ray job, import `dagster_pipes` (must be provided as a dependency) and emit regular Dagster events such as logs or asset materializations:

```python
from dagster_pipes import open_dagster_pipes

with open_dagster_pipes() as context:
    assert context.get_extra("param") == "value"
    context.log.info("Hello from Ray Pipes!")
    context.report_asset_materialization(
        metadata={"some_metric": {"raw_value": 57, "type": "int"}},
        data_version="alpha",
    )
```
A convenient way to provide `dagster-pipes` to the Ray job is with the `runtimeEnvYaml` field:

```python
import yaml

ray_job = {"spec": {"runtimeEnvYaml": yaml.safe_dump({"pip": ["dagster-pipes"]})}}
```

Events emitted by the Ray job will be captured by `PipesKubeRayJobClient` and will become available in the Dagster event log. Standard output and standard error streams will be forwarded to the standard output of the Dagster process.
##### Running locally

When running locally, the `port_forward` option has to be set to `True` in the `PipesKubeRayJobClient` resource in order to interact with the Ray job. For convenience, it can be set automatically with:

```python
from dagster_ray.kuberay.configs import in_k8s

pipes_rayjob_client = PipesKubeRayJobClient(..., port_forward=not in_k8s)
```
#### Resources

##### KubeRayCluster

`KubeRayCluster` can be used for running Ray computations on Kubernetes in client (interactive) mode. It requires a stable, persistent connection for the duration of the Dagster step.

When added as a resource dependency to an `@op`/`@asset`, the `KubeRayCluster`:

- Starts a dedicated `RayCluster` for it
- Connects to the cluster in client mode with `ray.init()` (unless `skip_init` is set to `True`)
- Tears down the cluster after the step is executed (unless `skip_cleanup` is set to `True`)

The `RayCluster` comes with minimal default configuration, matching `KubeRay` defaults.
Examples:

Basic usage (will create a single-node, non-scaling `RayCluster`):

```python
import ray
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.kuberay import KubeRayCluster


@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is a backend-agnostic type annotation
):
    return ray.get(ray.put(42))  # interact with the Ray cluster!


definitions = Definitions(
    resources={"ray_cluster": KubeRayCluster()}, assets=[my_asset]
)
```

Larger cluster with auto-scaling enabled:

```python
from dagster_ray.kuberay import KubeRayCluster, RayClusterConfig

ray_cluster = KubeRayCluster(
    ray_cluster=RayClusterConfig(
        enable_in_tree_autoscaling=True,
        worker_group_specs=[
            {
                "groupName": "workers",
                "replicas": 2,
                "minReplicas": 1,
                "maxReplicas": 10,
                # ...
            }
        ],
    )
)
```
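When debugging, it can be useful to keep the cluster alive after the step finishes. A hedged sketch, assuming `skip_cleanup` (described above) is a top-level field on the resource:

```python
from dagster_ray.kuberay import KubeRayCluster

# Assumption: skip_cleanup is a field on KubeRayCluster, as implied by
# the behavior described earlier; the created RayCluster then has to be
# deleted manually (or with the cleanup jobs below).
debug_cluster = KubeRayCluster(skip_cleanup=True)
```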
##### KubeRayClient

This resource can be used to interact with the Kubernetes API server.

Examples:

Listing currently running `RayClusters`:

```python
from dagster import op, Definitions
from dagster_ray.kuberay import KubeRayClient


@op
def list_ray_clusters(
    kube_ray_client: KubeRayClient,
):
    return kube_ray_client.client.list(namespace="kuberay")
```
#### Jobs

##### delete_kuberay_clusters

This `job` can be used to delete `RayClusters` from a given list of names.

##### cleanup_old_ray_clusters

This `job` can be used to delete old `RayClusters` which no longer correspond to any active Dagster Runs. They may be left behind if the automatic cluster cleanup was disabled or failed.

#### Schedules

Cleanup schedules can be trivially created using the `cleanup_old_ray_clusters` or `delete_kuberay_clusters` jobs.
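For instance, a daily cleanup schedule could look like this (a sketch; the import path for the job is assumed from the module layout described above):

```python
from dagster import ScheduleDefinition

# Assumption: the job is importable from dagster_ray.kuberay like the
# other public KubeRay objects in this README.
from dagster_ray.kuberay import cleanup_old_ray_clusters

ray_cleanup_schedule = ScheduleDefinition(
    job=cleanup_old_ray_clusters,
    cron_schedule="0 0 * * *",  # daily at midnight
)
```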
##### cleanup_old_ray_clusters

`dagster-ray` provides an example daily cleanup schedule.
### Local

These resources can be used for development and testing purposes. They provide the same interface as the other `*Ray` resources, but don't require any external infrastructure.

The public objects can be imported from the `dagster_ray.local` module.

#### Resources

##### LocalRay

A dummy resource which is useful for testing and development. It doesn't do anything, but provides the same interface as the other `*Ray` resources.
Examples:

Using the `LocalRay` resource:

```python
import ray
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.local import LocalRay


@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):  # this type annotation only defines the interface
    return ray.get(ray.put(42))


definitions = Definitions(resources={"ray_cluster": LocalRay()}, assets=[my_asset])
```
Conditionally using the `LocalRay` resource in development and `KubeRayCluster` in production:

```python
import ray
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.local import LocalRay
from dagster_ray.kuberay import KubeRayCluster


@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):  # this type annotation only defines the interface
    return ray.get(ray.put(42))


IN_K8s = ...


definitions = Definitions(
    resources={"ray_cluster": KubeRayCluster() if IN_K8s else LocalRay()},
    assets=[my_asset],
)
```
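The `IN_K8s` placeholder is left to the deployment; one option is the `in_k8s` helper shown earlier in the Running locally section (a sketch, reusing the names from the example above):

```python
from dagster_ray.kuberay.configs import in_k8s

definitions = Definitions(
    resources={"ray_cluster": KubeRayCluster() if in_k8s else LocalRay()},
    assets=[my_asset],
)
```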
## Development

```shell
uv sync --all-extras
uv run pre-commit install
```

### Testing

```shell
uv run pytest
```

#### KubeRay

Required tools:

- `docker`
- `kubectl`
- `helm`
- `minikube`

Running `pytest` will automatically:
- build an image with the local `dagster-ray` code
- start a `minikube` Kubernetes cluster
- load the built `dagster-ray` and `kuberay-operator` images into the cluster
- install the KubeRay Operator into the cluster with `helm`
- run the tests

Thus, no manual setup is required, just the presence of the tools listed above. This makes testing a breeze!
> [!NOTE]
> Specifying a comma-separated list of KubeRay Operator versions in the `PYTEST_KUBERAY_VERSIONS` environment variable will spawn a new test for each version.
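For example (the version numbers are placeholders, not a tested matrix):

```shell
PYTEST_KUBERAY_VERSIONS="1.1.0,1.2.2" uv run pytest
```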
> [!NOTE]
> It may take a while to download the `minikube` and `kuberay-operator` images and build the local `dagster-ray` image during the first test invocation.