
RayDP: Distributed Data Processing on Ray


RayDP

RayDP is a distributed data processing library that provides simple APIs for running Spark on Ray and integrating Spark with distributed deep learning and machine learning frameworks. RayDP makes it simple to build distributed end-to-end data analytics and AI pipelines. Instead of using lots of glue code or an orchestration framework to stitch together multiple distributed programs, RayDP lets you write Spark, PyTorch, TensorFlow and XGBoost code in a single Python program, with increased productivity and performance. You can build an end-to-end pipeline on a single Ray cluster, using Spark for data preprocessing, RaySGD or Horovod for distributed deep learning, Ray Tune for hyperparameter tuning and Ray Serve for model serving.

Installation

You can install the latest RayDP release using pip. RayDP requires Ray and PySpark. Also make sure Java is installed and JAVA_HOME is set properly.

pip install raydp

Alternatively, you can install the RayDP nightly build:

pip install raydp-nightly

To build and install the latest master, run the following commands:

./build.sh
pip install dist/raydp*.whl
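
Before running the examples, a quick way to check the Java prerequisite mentioned above is a short script like the following. It is a sketch, not part of RayDP: java_problems is a hypothetical helper, and it only checks that JAVA_HOME points at a directory containing bin/java and that a java executable is on PATH; it does not check which Java version is installed.

```python
import os
import shutil
from pathlib import Path


def java_problems(env=None):
    """Return a list of problems with the Java setup; an empty list means it looks fine."""
    env = os.environ if env is None else env
    problems = []

    java_home = env.get("JAVA_HOME")
    if not java_home:
        problems.append("JAVA_HOME is not set")
    else:
        bin_dir = Path(java_home) / "bin"
        # Accept both Unix (java) and Windows (java.exe) launcher names
        if not ((bin_dir / "java").exists() or (bin_dir / "java.exe").exists()):
            problems.append(f"JAVA_HOME={java_home} does not contain bin/java")

    # Search only the PATH taken from `env`, so the check depends on its input alone
    if shutil.which("java", path=env.get("PATH", "")) is None:
        problems.append("no 'java' executable found on PATH")

    return problems


if __name__ == "__main__":
    issues = java_problems()
    print("Java setup looks OK" if not issues else "\n".join(issues))
```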

Spark on Ray

RayDP provides an API for starting a Spark job on Ray without needing to set up a separate Spark cluster. RayDP uses Ray as the Spark resource manager and runs Spark executors in Ray actors. To create a Spark session, call the raydp.init_spark API. For example:

import ray
import raydp

# connect to ray cluster
ray.init(address='auto')

# create a Spark cluster with specified resource requirements
spark = raydp.init_spark(app_name='RayDP Example',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='4GB')

# normal data processing with Spark
df = spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])
df.show()
word_count = df.groupBy('word').count()
word_count.show()

# stop the spark cluster
raydp.stop_spark()
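
For reference, word_count.show() should list each distinct word with its number of occurrences. The same aggregation over the sample rows, written with Python's collections.Counter, gives the counts to expect (the row order in Spark's output is not guaranteed).

```python
from collections import Counter

# The same rows passed to spark.createDataFrame in the example above
words = ["look", "spark", "tutorial", "spark", "look", "python"]

# Equivalent of df.groupBy('word').count(): one entry per distinct word
expected_counts = Counter(words)

for word, count in sorted(expected_counts.items()):
    print(word, count)
```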

Spark features such as dynamic resource allocation and the spark-submit script are also supported. Please refer to Spark on Ray for more details.
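
Settings like dynamic allocation are ordinary Spark properties. The following is a minimal sketch, assuming raydp.init_spark accepts a configs dictionary of extra Spark properties (as in the RayDP repository examples; check the signature in your version). The property names are standard Spark ones, but relying on shuffle tracking rather than an external shuffle service is an assumption here, so check Spark on Ray for what RayDP actually requires. dynamic_allocation_configs and start_spark_with_dynamic_allocation are hypothetical helpers.

```python
def dynamic_allocation_configs(min_executors: int = 1, max_executors: int = 4) -> dict:
    """Standard Spark properties for dynamic executor allocation (all values are strings)."""
    if not 0 <= min_executors <= max_executors:
        raise ValueError("need 0 <= min_executors <= max_executors")
    return {
        "spark.dynamicAllocation.enabled": "true",
        # Track shuffle files on executors so idle ones can be released without an
        # external shuffle service (Spark 3.0+); whether RayDP needs more is an assumption
        "spark.dynamicAllocation.shuffleTracking.enabled": "true",
        "spark.dynamicAllocation.minExecutors": str(min_executors),
        "spark.dynamicAllocation.maxExecutors": str(max_executors),
    }


def start_spark_with_dynamic_allocation(app_name: str = "RayDP Example"):
    # Imported here so the config helper above can be used without Ray installed
    import ray
    import raydp

    ray.init(address="auto")
    # Assumes init_spark accepts a `configs` dict of extra Spark properties
    return raydp.init_spark(app_name=app_name,
                            num_executors=1,
                            executor_cores=2,
                            executor_memory="4GB",
                            configs=dynamic_allocation_configs(1, 4))
```

Building the properties in a separate function keeps them easy to inspect or reuse with spark-submit's --conf flags.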

Ray Client

RayDP works the same way when using Ray Client. However, the Spark driver then runs on the local machine, which is convenient for experimenting in an interactive environment. If this is not desired, e.g. for performance reasons, you can define a Ray actor that calls init_spark and performs all the computation in its methods. The Spark driver then runs inside the Ray cluster, similar to Spark's cluster deploy mode.
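
A minimal sketch of the actor approach described above. SparkDriver and run_remote_word_count are hypothetical names, the Ray Client address is a placeholder for your head node, and the resource settings mirror the earlier example. raydp is imported inside the methods so that it is loaded in the actor's process on the cluster (which also lets this module be imported where Ray is not installed).

```python
class SparkDriver:
    """Runs the Spark driver inside a Ray actor instead of on the client machine."""

    def __init__(self, app_name: str = "RayDP Example"):
        # Imported inside the actor so raydp is loaded in the cluster-side process
        import raydp
        self.spark = raydp.init_spark(app_name=app_name,
                                      num_executors=2,
                                      executor_cores=2,
                                      executor_memory="4GB")

    def word_count(self, words):
        # All Spark work happens here, inside the cluster; only the small
        # result dictionary travels back to the client
        df = self.spark.createDataFrame([(w,) for w in words], ["word"])
        rows = df.groupBy("word").count().collect()
        return {row["word"]: row["count"] for row in rows}

    def stop(self):
        import raydp
        raydp.stop_spark()


def run_remote_word_count(words, address="ray://<head-node-host>:10001"):
    """Connect with Ray Client, start the driver actor, run one job and clean up."""
    import ray
    ray.init(address=address)
    driver = ray.remote(SparkDriver).remote()
    try:
        return ray.get(driver.word_count.remote(words))
    finally:
        ray.get(driver.stop.remote())
```

Only the method calls and their results cross the Ray Client connection, so the local session stays lightweight while the driver sits next to the executors.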

Pandas on Spark

PySpark 3.2.0 and later provide the pandas API on Spark (originally Koalas). Users familiar with pandas can use it to scale existing pandas workloads on RayDP.

import ray
import raydp
import pyspark.pandas as ps

# connect to ray cluster
ray.init(address='auto')

# create a Spark cluster with specified resource requirements
spark = raydp.init_spark(app_name='RayDP Example',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='4GB')

# Use pandas on spark to create a dataframe and aggregate
psdf = ps.range(100)
print(psdf.count())

# stop the spark cluster
raydp.stop_spark()
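
Because pyspark.pandas mirrors the pandas API, code written against pandas can often be moved to RayDP by swapping the module it uses. A small sketch, assuming a Spark session from raydp.init_spark is active whenever pyspark.pandas is passed in; word_counts is a hypothetical helper, and not every pandas operation is available in pyspark.pandas.

```python
import pandas as pd


def word_counts(frame_lib, words):
    """Count occurrences of each word using only pandas-style calls.

    `frame_lib` is either the pandas module or pyspark.pandas; the body is
    identical for both.
    """
    df = frame_lib.DataFrame({"word": words, "n": [1] * len(words)})
    return df.groupby("word")["n"].sum().sort_index()


words = ["look", "spark", "tutorial", "spark", "look", "python"]

# Runs locally with plain pandas
print(word_counts(pd, words))

# With RayDP (after raydp.init_spark), the same call runs on Spark:
#   import pyspark.pandas as ps
#   print(word_counts(ps, words).to_pandas())
```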

Machine Learning and Deep Learning With a Spark DataFrame

RayDP provides APIs for converting a Spark DataFrame to a Ray Dataset or Ray MLDataset, which can be consumed by XGBoost, RaySGD or Horovod on Ray. RayDP also provides high-level, scikit-learn style Estimator APIs for distributed training with PyTorch or TensorFlow.

Spark DataFrame <=> Ray Dataset

import ray
import raydp

ray.init()
spark = raydp.init_spark(app_name="RayDP Example",
                         num_executors=2,
                         executor_cores=2,
                         executor_memory="4GB")

# Spark DataFrame to Ray Dataset
df1 = spark.range(0, 1000)
ds1 = ray.data.from_spark(df1)

# Ray Dataset to Spark DataFrame
ds2 = ray.data.from_items([{"id": i} for i in range(1000)])
df2 = ds2.to_spark(spark)

Please refer to Spark+XGBoost on Ray for a full example.

Spark DataFrame => Ray MLDataset

RayDP provides an API for creating a Ray MLDataset from a Spark DataFrame. An MLDataset can be converted to a PyTorch or TensorFlow dataset for distributed training with Horovod on Ray or RaySGD. MLDataset is also supported as a data source by XGBoost on Ray.

import ray
import raydp
from raydp.spark import RayMLDataset

ray.init()
spark = raydp.init_spark(app_name="RayDP Example",
                         num_executors=2,
                         executor_cores=2,
                         executor_memory="4GB")

df = spark.range(0, 1000)
ds = RayMLDataset.from_spark(df, num_shards=10)

Please refer to Spark+Horovod on Ray for a full example.

Estimator API

The Estimator APIs allow you to train a deep neural network directly on a Spark DataFrame, leveraging Ray's ability to scale out across the cluster. They are wrappers around RaySGD that hide the complexity of converting a Spark DataFrame to a PyTorch/TensorFlow dataset and distributing the training. RayDP provides raydp.torch.TorchEstimator for PyTorch and raydp.tf.TFEstimator for TensorFlow. The following is an example of using TorchEstimator.

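import ray
import raydp
import torch
from pyspark.sql import functions as F
from raydp.torch import TorchEstimator

ray.init(address="auto")
spark = raydp.init_spark(app_name="RayDP Example",
                         num_executors=2,
                         executor_cores=2,
                         executor_memory="4GB")

# Spark DataFrame code: in practice this would be e.g. spark.read.parquet(<your path>);
# a small synthetic DataFrame with two hypothetical feature columns stands in here
df = spark.createDataFrame([(i / 1000.0, (i % 7) / 7.0) for i in range(1000)], ["x1", "x2"])
train_df = df.withColumn("y", 3 * F.col("x1") - 2 * F.col("x2") + 1)

# PyTorch code: a linear model over the two feature columns
model = torch.nn.Sequential(torch.nn.Linear(2, 1))
optimizer = torch.optim.Adam(model.parameters())
loss = torch.nn.MSELoss()

# Argument names follow the repository's examples; check the TorchEstimator
# docstring of your RayDP version for the full parameter list
estimator = TorchEstimator(num_workers=2,
                           model=model,
                           optimizer=optimizer,
                           loss=loss,
                           feature_columns=["x1", "x2"],
                           label_column="y",
                           batch_size=64,
                           num_epochs=5)
estimator.fit_on_spark(train_df)

raydp.stop_spark()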

Please refer to NYC Taxi PyTorch Estimator and NYC Taxi Tensorflow Estimator for full examples.

MPI on Ray

RayDP also provides an API for running MPI jobs on Ray. Three MPI implementations are supported: intel_mpi, openmpi and MPICH. Refer to doc/mpi.md for more details.

More Examples

Examples

Not sure how to use RayDP? Check the examples folder, which contains many examples showing how RayDP works together with PyTorch, TensorFlow, XGBoost, Horovod, and more. If you still cannot find what you need, feel free to open an issue and ask us!

Google Colab Notebook

You can run RayDP code on Google Colab with a Google account; this may be the easiest way to get started with RayDP. Two demos are available: a Ray Train example and a PyTorch example.


Download files


Source Distributions

No source distribution files are available for this release.

Built Distribution

raydp_nightly-2022.9.9.dev1-py3-none-any.whl (8.2 MB, Python 3)

File hashes

Hashes for raydp_nightly-2022.9.9.dev1-py3-none-any.whl:
Algorithm Hash digest
SHA256 8f0bde215d0f6c7d46400c2df85436fe58ec9ad96f62855385cf8cbbadc9cf35
MD5 8f77a3a4c001b89c7bf034a473ad9bd7
BLAKE2b-256 4903fc424defc32c98b88731aca039487497437dfedd3289b6e8b2f8649f9666
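
To confirm that a downloaded wheel matches the SHA256 digest listed above, it can be hashed locally with Python's hashlib. A minimal sketch; sha256_of and verify are hypothetical helper names, and the expected digest is copied from the table above.

```python
import hashlib
from pathlib import Path

# SHA256 digest published above for raydp_nightly-2022.9.9.dev1-py3-none-any.whl
EXPECTED_SHA256 = "8f0bde215d0f6c7d46400c2df85436fe58ec9ad96f62855385cf8cbbadc9cf35"


def sha256_of(path, chunk_size=1 << 20):
    """Hash the file in chunks so a large wheel need not fit in memory."""
    digest = hashlib.sha256()
    with Path(path).open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify(path, expected=EXPECTED_SHA256):
    """True if the file's SHA256 matches `expected` (case-insensitive hex)."""
    return sha256_of(path) == expected.lower()
```

For example, verify("raydp_nightly-2022.9.9.dev1-py3-none-any.whl") returns True only for an unmodified download. pip can enforce the same check itself in hash-checking mode, via a requirements line such as raydp-nightly==2022.9.9.dev1 --hash=sha256:<digest>.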
