RayDP: Distributed Data Processing on Ray
Project description
RayDP
RayDP is a distributed data processing library that provides simple APIs for running Spark on Ray and integrating Spark with distributed deep learning and machine learning frameworks. RayDP makes it simple to build distributed end-to-end data analytics and AI pipeline. Instead of using lots of glue code or an orchestration framework to stitch multiple distributed programs, RayDP allows you to write Spark, PyTorch, Tensorflow, XGBoost code in a single python program with increased productivity and performance. You can build an end-to-end pipeline on a single Ray cluster by using Spark for data preprocessing, Ray Train or Horovod for distributed deep learning, RayTune for hyperparameter tuning and RayServe for model serving.
Installation
You can install latest RayDP using pip. RayDP requires Ray and PySpark. Please also make sure java is installed and JAVA_HOME is set properly.
pip install raydp
Or you can install RayDP nightly build:
pip install raydp-nightly
If you'd like to build and install the latest master, use the following command:
./build.sh
pip install dist/raydp*.whl
Spark on Ray
RayDP provides an API for starting a Spark job on Ray without a need to setup a Spark cluster separately. RayDP supports Ray as a Spark resource manager and runs Spark executors in Ray actors. To create a Spark session, call the raydp.init_spark
API. For example:
import ray
import raydp
# connect to ray cluster
ray.init(address='auto')
# create a Spark cluster with specified resource requirements
spark = raydp.init_spark(app_name='RayDP Example',
num_executors=2,
executor_cores=2,
executor_memory='4GB')
# normal data processesing with Spark
df = spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])
df.show()
word_count = df.groupBy('word').count()
word_count.show()
# stop the spark cluster
raydp.stop_spark()
Spark features such as dynamic resource allocation, spark-submit script, etc are also supported. Please refer to Spark on Ray for more details.
Ray Client
RayDP works the same way when using ray client. However, spark driver would be on the local machine. This is convenient if you want to do some experiment in an interactive environment. If this is not desired, e.g. due to performance, you can define an ray actor, which calls init_spark
and performs all the calculation in its method. This way, spark driver will be in the ray cluster, and is rather similar to spark cluster deploy mode.
Pandas on Spark
PySpark 3.2.0 provides Pandas API on Spark(originally Koalas). Users familiar with Pandas can use it to scale current pandas workloads on RayDP.
import ray
import raydp
import pyspark.pandas as ps
# connect to ray cluster
ray.init(address='auto')
# create a Spark cluster with specified resource requirements
spark = raydp.init_spark(app_name='RayDP Example',
num_executors=2,
executor_cores=2,
executor_memory='4GB')
# Use pandas on spark to create a dataframe and aggregate
psdf = ps.range(100)
print(psdf.count())
# stop the spark cluster
raydp.stop_spark()
Machine Learning and Deep Learning With a Spark DataFrame
RayDP provides APIs for converting a Spark DataFrame to a Ray Dataset which can be consumed by XGBoost, Ray Train or Horovod on Ray. RayDP also provides high level scikit-learn style Estimator APIs for distributed training with PyTorch or Tensorflow.
Spark DataFrame <=> Ray Dataset
import ray
import raydp
ray.init()
spark = raydp.init_spark(app_name="RayDP Example",
num_executors=2,
executor_cores=2,
executor_memory="4GB")
# Spark Dataframe to Ray Dataset
df1 = spark.range(0, 1000)
ds1 = ray.data.from_spark(df1)
# Ray Dataset to Spark Dataframe
ds2 = ray.data.from_items([{"id": i} for i in range(1000)])
df2 = ds2.to_spark(spark)
Please refer to Spark+XGBoost on Ray for a full example.
Estimator API
The Estimator APIs allow you to train a deep neural network directly on a Spark DataFrame, leveraging Ray’s ability to scale out across the cluster. The Estimator APIs are wrappers of Ray Train and hide the complexity of converting a Spark DataFrame to a PyTorch/Tensorflow dataset and distributing the training. RayDP provides raydp.torch.TorchEstimator
for PyTorch and raydp.tf.TFEstimator
for Tensorflow. The following is an example of using TorchEstimator.
import ray
import raydp
from raydp.torch import TorchEstimator
ray.init(address="auto")
spark = raydp.init_spark(app_name="RayDP Example",
num_executors=2,
executor_cores=2,
executor_memory="4GB")
# Spark DataFrame Code
df = spark.read.parquet(…)
train_df = df.withColumn(…)
# PyTorch Code
model = torch.nn.Sequential(torch.nn.Linear(2, 1))
optimizer = torch.optim.Adam(model.parameters())
estimator = TorchEstimator(model=model, optimizer=optimizer, ...)
estimator.fit_on_spark(train_df)
raydp.stop_spark()
Please refer to NYC Taxi PyTorch Estimator and NYC Taxi Tensorflow Estimator for full examples.
MPI on Ray
RayDP also provides an API for running MPI job on Ray. We support three types of MPI: intel_mpi
, openmpi
and MPICH
. You can refer to doc/mpi.md for more details.
More Examples
Examples
Not sure how to use RayDP? Check the examples
folder. We have added many examples showing how RayDP works together with PyTorch, TensorFlow, XGBoost, Horovod, and so on. If you still cannot find what you want, feel free to post an issue to ask us!
Google Colab Notebook
You can easily run code on Google Colab with a google account. Maybe this is the easiest way to get started with raydp, you can have a try. Here are two demos: Raytrain example, Pytorch example.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distribution
File details
Details for the file raydp_nightly-2022.10.20.dev1-py3-none-any.whl
.
File metadata
- Download URL: raydp_nightly-2022.10.20.dev1-py3-none-any.whl
- Upload date:
- Size: 323.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.9.15
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | c6d33c7337f84b932a62e8e979563ebae62dd89afde56a7addc33e26a8220fd9 |
|
MD5 | 3ae1efe676ae0ea1edd0d3659f3ded0c |
|
BLAKE2b-256 | 8b468a9f3d3d79f0f9e2db1ac86271a2e0658c35db93ee620d69fb72655d6c18 |