RayDP: Distributed Data Processing on Ray
RayDP
RayDP brings popular big data frameworks, including Apache Spark, to the Ray ecosystem and integrates with other Ray libraries. RayDP makes it simple to build distributed end-to-end data analytics and AI pipelines on Ray by using Spark for data preprocessing, RayTune for hyperparameter tuning, RaySGD for distributed deep learning, RLlib for reinforcement learning, and RayServe for model serving.
Key Features
Spark on Ray
RayDP enables you to start a Spark job on Ray from your Python program without setting up a Spark cluster manually. RayDP supports Ray as a Spark resource manager and starts Spark executors directly as Ray actors. RayDP uses Ray's in-memory object store to exchange data efficiently between Spark and other Ray libraries. You can use Spark to read the input data, process it with SQL, the Spark DataFrame API, or the Pandas API (via Koalas), extract and transform features with Spark MLlib, and then use the RayDP Estimator API for distributed training on the preprocessed dataset.
Estimator APIs for Distributed Training
RayDP provides high-level, scikit-learn style Estimator APIs for distributed training. The Estimator APIs allow you to train a deep neural network directly on a Spark DataFrame, leveraging Ray’s ability to scale out across the cluster. The Estimator APIs are wrappers around RaySGD and hide the complexity of converting a Spark DataFrame to a PyTorch/TensorFlow dataset and distributing the training.
Installation
You can install the latest RayDP release using pip. RayDP requires Ray (>=1.1.0) and PySpark (3.0.0 or 3.0.1).
pip install raydp
Or you can install our nightly build:
pip install raydp-nightly
If you'd like to build and install the latest master, use the following command:
./build.sh
pip install dist/raydp*.whl
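The version requirements stated above (Ray >=1.1.0, PySpark 3.0.0 or 3.0.1) can be checked before installing RayDP. The following is a minimal sketch using only the Python standard library; the helper names (parse_version, ray_ok, pyspark_ok, installed_version) are hypothetical and not part of RayDP, and the version parsing is deliberately simple (pre-release suffixes are ignored).

```python
from importlib import metadata


def parse_version(text):
    """Turn '3.0.1' into (3, 0, 1). Suffixes such as 'rc1' are ignored (assumption)."""
    parts = []
    for piece in text.split(".")[:3]:
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits or 0))
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts)


def ray_ok(version):
    """Ray must be at least 1.1.0, as stated in the installation notes."""
    return parse_version(version) >= (1, 1, 0)


def pyspark_ok(version):
    """PySpark must be exactly 3.0.0 or 3.0.1, as stated in the installation notes."""
    return parse_version(version) in {(3, 0, 0), (3, 0, 1)}


def installed_version(package):
    """Return the installed version string of a package, or None if it is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


# Report on the current environment without failing when a package is absent.
for name, check in (("ray", ray_ok), ("pyspark", pyspark_ok)):
    found = installed_version(name)
    if found is None:
        print(f"{name}: not installed")
    else:
        print(f"{name} {found}: {'ok' if check(found) else 'unsupported by this RayDP release'}")
```

Running the script prints one line per package, so an unsupported PySpark version can be spotted before pip install raydp pulls in anything else.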
Getting Started
To start a Spark job on Ray, use the raydp.init_spark API. You can write Spark, PyTorch/TensorFlow, and Ray code in the same Python program to implement an end-to-end pipeline.
Classic Spark Word Count Example
After RayDP initializes a Spark cluster, you can use Spark as usual.
import ray
import raydp
ray.init(address='auto')
spark = raydp.init_spark('word_count',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='1G')
df = spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])
df.show()
word_count = df.groupBy('word').count()
word_count.show()
raydp.stop_spark()
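As a sanity check on the result, the groupBy('word').count() aggregation above computes the same per-word counts as this plain Python sketch, which needs neither Ray nor Spark. It is only an illustration of the expected counts for the six sample rows, not a replacement for the distributed job; Spark does not guarantee the row order shown by word_count.show().

```python
from collections import Counter

# The same six words used to build the Spark DataFrame above.
words = ["look", "spark", "tutorial", "spark", "look", "python"]

# Counter performs the group-and-count step locally.
word_count = Counter(words)

for word, count in sorted(word_count.items()):
    print(word, count)
# look 2
# python 1
# spark 2
# tutorial 1
```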
Integration with PyTorch
Combined with other Ray components such as RaySGD and Ray Serve, RayDP makes it easy to build an end-to-end deep learning pipeline. This example uses the Estimator API, which is a wrapper around RaySGD, to preprocess data with Spark and then train a model with PyTorch.
import ray
import raydp
import torch
from raydp.torch import TorchEstimator
ray.init()
spark = raydp.init_spark(app_name="RayDP example",
                         num_executors=2,
                         executor_cores=2,
                         executor_memory="4GB")
# Spark DataFrame Code
df = spark.read.parquet(…)
train_df = df.withColumn(…)
# PyTorch Code
model = torch.nn.Sequential(torch.nn.Linear(2, 1))
optimizer = torch.optim.Adam(model.parameters())
# You can use the RayDP Estimator API or libraries like RaySGD for distributed training.
estimator = TorchEstimator(model=model, optimizer=optimizer, ...)
estimator.fit_on_spark(train_df)
raydp.stop_spark()
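Both examples size the Spark cluster through num_executors, executor_cores, and executor_memory. As a rough sketch, the total executor footprint requested from Ray can be estimated as below. The helpers memory_to_bytes and total_resources are hypothetical and not part of RayDP; the sketch assumes binary units (1G = 1024^3 bytes, the convention Spark uses for memory strings) and ignores Spark memory overhead and any resources used by Ray itself or by the driver.

```python
import re

# Binary multipliers for the unit suffixes handled by this sketch (assumption).
_UNITS = {"k": 1024, "m": 1024 ** 2, "g": 1024 ** 3, "t": 1024 ** 4}


def memory_to_bytes(text):
    """Convert a memory string such as '1G' or '4GB' to a number of bytes."""
    match = re.fullmatch(r"(\d+)\s*([kmgt])b?", text.strip().lower())
    if match is None:
        raise ValueError(f"unsupported memory string: {text!r}")
    amount, unit = match.groups()
    return int(amount) * _UNITS[unit]


def total_resources(num_executors, executor_cores, executor_memory):
    """Estimate the total cores and executor memory requested across all executors."""
    return {
        "cores": num_executors * executor_cores,
        "memory_bytes": num_executors * memory_to_bytes(executor_memory),
    }


# Word count example: 2 executors x 2 cores x 1G each.
print(total_resources(2, 2, "1G"))
# PyTorch example: 2 executors x 2 cores x 4GB each.
print(total_resources(2, 2, "4GB"))
```

Under these assumptions the word count example asks for 4 cores and 2 GiB of executor memory, and the PyTorch example for 4 cores and 8 GiB, so the Ray cluster needs at least that much free capacity before the executors can start.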
You can find more examples under the examples folder.
File details
Details for the file raydp_nightly-2021.3.17.dev0-py3-none-any.whl.
File metadata
- Download URL: raydp_nightly-2021.3.17.dev0-py3-none-any.whl
- Upload date:
- Size: 10.1 MB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.1 importlib_metadata/3.7.3 pkginfo/1.7.0 requests/2.25.1 requests-toolbelt/0.9.1 tqdm/4.59.0 CPython/3.9.2
File hashes
Algorithm | Hash digest
---|---
SHA256 | e21492bc13a53113e5b9abd736e8c15028f15fd8ea94328b8ad7d64961a2ff3b
MD5 | 77895f32b79aa887f02bb53b4fe43a9c
BLAKE2b-256 | 4190933c408c2154d6121361b1c280a18cf4ab634189d1f6a01f9c315b7ae559