Framework for simpler Spark Pipelines
SparkPipelineFramework
SparkPipelineFramework implements a few design patterns that make it easier to create Spark applications. It:
- Separates data transformation logic from pipeline execution code, so you can compose pipelines by simply stringing together transformers (based on the SparkML Pipeline class, but enhanced to work for both ML and non-ML transformations)
- Enables running SQL transformations without writing any code
- Enables versioning of transformations, so different pipelines can use older or newer versions of each transformer and you can upgrade each pipeline at your own pace
- Enables autocomplete of transformations when creating pipelines (in PyCharm)
- Implements many separation-of-concerns features, e.g., logging, performance monitoring, and error reporting
- Supports non-ML, ML, and mixed workloads
- Has an additional library, SparkPipelineFramework.AWS, that makes running Spark pipelines in AWS easier
- Has a sister library, SparkPipelineFramework.Catalog, that implements a data and ML model catalog so you can load and save data by catalog name instead of path and manage different versions of the data
PyPI Package
This code is available as a package to import into your project. https://pypi.org/project/sparkpipelineframework/
Using it in your project
(For an example project that uses SparkPipelineFramework, see https://github.com/imranq2/TestSparkPipelineFramework)
- Add the sparkpipelineframework package to your project's requirements.txt
- Create a folder called library in your project
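For example, a project that follows these steps might be laid out like this (the features/carriers folder and file names are just an illustration of transformations you might add later):

my_project/
    requirements.txt            # includes: sparkpipelineframework
    library/                    # your transformations live under this folder
        features/
            carriers/
                carriers.sql    # example SQL transformation (see below)
    my_pipeline.py              # your FrameworkPipeline subclass (location is up to you)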
To create a new pipeline
- Create a class derived from FrameworkPipeline
- In your __init__ function, set self.transformers to the list of transformers to run for this pipeline. For example:
class MyPipeline(FrameworkPipeline):
    def __init__(self, parameters: AttrDict, progress_logger: ProgressLogger):
        super(MyPipeline, self).__init__(parameters=parameters,
                                         progress_logger=progress_logger)
        self.transformers = flatten([
            [
                FrameworkCsvLoader(
                    view="flights",
                    path_to_csv=parameters["flights_path"]
                )
            ],
            FeaturesCarriers(parameters=parameters).transformers,
        ])
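Because FrameworkPipeline builds on the SparkML Pipeline, a pipeline like the one above can be run with the usual fit/transform calls. Below is a minimal sketch; the SparkSession setup, the empty seed DataFrame, the parameter value, and the ProgressLogger construction are assumptions for illustration rather than the framework's documented API:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("my_pipeline").getOrCreate()
parameters = {"flights_path": "/data/flights.csv"}  # hypothetical input path

# the transformers read and write Spark views, so the seed DataFrame can be empty
seed_df = spark.createDataFrame([], "placeholder string")

pipeline = MyPipeline(parameters=parameters, progress_logger=ProgressLogger())
pipeline.fit(seed_df).transform(seed_df)

spark.table("flights").show()  # view created by FrameworkCsvLoader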
To Add a SQL transformation
- Create a new folder and a .sql file in that folder. The folder can be the library folder or any subfolder you choose under it.
- The name of the file is the name of the view that will be created/updated with the result of your SQL, e.g., carriers.sql means we will create/update a view called carriers with the results of your SQL.
- Add your SQL to the file. This can be any valid Spark SQL and can refer to any view created by the pipeline before this transformer is run. For example:
SELECT carrier, crsarrtime FROM flights
- Run the generate_proxies command as shown in the Generating Proxies section below
- Now go to your pipeline class's __init__ and add to self.transformers. Start typing the folder name and hit Ctrl+Space for PyCharm to autocomplete the proxy class name.
- That's it. Your SQL has been automatically wrapped in a Transformer that does logging, monitors performance, and does error checking.
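For example, if the SQL above lives in a hypothetical library/features/carriers/carriers.sql, the generated proxy class (FeaturesCarriers in the earlier pipeline example) can be added like this:

self.transformers = flatten([
    [
        FrameworkCsvLoader(
            view="flights",
            path_to_csv=parameters["flights_path"]
        )
    ],
    # proxy generated from library/features/carriers/carriers.sql;
    # it creates/updates the "carriers" view when the pipeline runs
    FeaturesCarriers(parameters=parameters).transformers,
])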
To Add a Python transformation
- Create a new folder and a .py file in that folder. The folder can be the library folder or any subfolder you choose under it.
- In the .py file, create a new class derived from Transformer (from SparkML). Implement the _transform() function. For example:
class MyPythonTransformer(Transformer):
    def _transform(self, df: DataFrame) -> DataFrame:
        # read parameters and do your work here; you can either create/update
        # a view or just update the passed-in DataFrame
        return df
- Run the generate_proxies command as shown in the Generating Proxies section below
- Now go to your pipeline class's __init__ and add to self.transformers. Start typing the folder name and hit Ctrl+Space for PyCharm to autocomplete the proxy class name.
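A slightly fuller sketch of such a transformer (the carrier column and the carriers_clean view are illustrative; the imports are from standard pyspark):

from pyspark.ml import Transformer
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, upper

class MyPythonTransformer(Transformer):
    def _transform(self, df: DataFrame) -> DataFrame:
        # derive a column from the incoming DataFrame and publish the result
        # as a view so later transformers (e.g., SQL ones) can reference it
        result_df = df.withColumn("carrier_upper", upper(col("carrier")))
        result_df.createOrReplaceTempView("carriers_clean")
        return df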
To Add a Machine Learning training transformation (called fit or Estimator in SparkML lingo)
- Create a new folder and a .py file in that folder. The folder can be the library folder or any subfolder you choose under it.
- In the .py file, create a new class derived from Estimator (from SparkML). Implement the fit() function.
- Run the generate_proxies command as shown in the Generating Proxies section below
- Now go to your pipeline class's __init__ and add to self.estimators. Start typing the folder name and hit Ctrl+Space for PyCharm to autocomplete the proxy class name.
To Add a Machine Learning prediction transformation
- Create a new folder and a .py file in that folder. The folder can be the library folder or any subfolder you choose under it.
- In the .py file, create a new class derived from Estimator (from SparkML). Implement the _transform() function. Note that this can be the same class you use for training and prediction.
- Run the generate_proxies command as shown in the Generating Proxies section below
- Now go to your pipeline class's __init__ and add to self.transformers. Start typing the folder name and hit Ctrl+Space for PyCharm to autocomplete the proxy class name.
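Putting the last two sections together, one common SparkML pattern is a pair of classes: an Estimator whose fit step learns something from the data, and a Transformer that applies it at prediction time. A minimal sketch follows; note that pyspark's Estimator base class expects _fit() to be overridden (the public fit() method calls it), and the crsarrtime column and the averaging "model" are purely illustrative:

from pyspark.ml import Estimator, Transformer
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit

class MyPredictor(Transformer):
    def __init__(self, average_arrival_time: float):
        super().__init__()
        self.average_arrival_time = average_arrival_time

    def _transform(self, df: DataFrame) -> DataFrame:
        # "predict" by attaching the learned statistic to every row
        return df.withColumn("predicted_arrival_time", lit(self.average_arrival_time))

class MyTrainer(Estimator):
    def _fit(self, df: DataFrame) -> Transformer:
        # "train" by computing a simple statistic from the training data
        average = df.agg({"crsarrtime": "avg"}).collect()[0][0]
        return MyPredictor(average_arrival_time=average)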
Including pipelines in other pipelines
Pipelines are fully composable, so you can include one pipeline as a transformer in another pipeline. For example:
class MyPipeline(FrameworkPipeline):
    def __init__(self, parameters: AttrDict, progress_logger: ProgressLogger):
        super(MyPipeline, self).__init__(parameters=parameters,
                                         progress_logger=progress_logger)
        self.transformers = flatten([
            [
                FrameworkCsvLoader(
                    view="flights",
                    path_to_csv=parameters["flights_path"]
                )
            ],
            PipelineFoo(parameters=parameters).transformers,
            FeaturesCarriers(parameters=parameters).transformers,
        ])
Generating Proxies
- Run the following command to generate proxy classes. These automatically wrap your SQL with a Spark Transformer that can be included in a pipeline with no additional code.
python3 spark_pipeline_framework/proxy_generator/generate_proxies.py
You can also add this to your project Makefile to make it easier to run:
.PHONY: proxies
proxies:
	python3 spark_pipeline_framework/proxy_generator/generate_proxies.py
Contributing
Run make firstime. This will install Java, Scala, Spark, and other packages.