
SparkML base classes for Transformers and Estimators

Project description

This document shows how to build a custom Estimator and Transformer using the base classes in this repository, and how to integrate them with SparkML Pipelines. For background on SparkML Pipeline concepts and on using the existing Estimators and Transformers in the SparkML module, please refer to the Spark ML Pipelines documentation.

Build a custom Transformer

In this section we build a Transformer that adds a constant to a column and updates the column’s values in-place.

import pyspark.sql.functions as F
from pyspark import keyword_only
from sparkml_base_classes import TransformerBaseClass


class AdditionColumnTransformer(TransformerBaseClass):

    @keyword_only
    def __init__(self, column_name=None, value=None):
        super().__init__()

    def _transform(self, ddf):
        self._logger.info("AdditionColumn transform with column {self._column_name}")
        ddf = ddf.withColumn(self._column_name, F.col(self._column_name) + self._value)
        return ddf
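
The Transformer can also be applied on its own, outside a Pipeline. Below is a minimal usage sketch, assuming a running SparkSession and the AdditionColumnTransformer defined above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# a single-column DataFrame to transform
ddf = spark.createDataFrame([[1], [2], [3]], ["foo"])

# adds 2 to every value of "foo", updating the column in-place:
# "foo" becomes 3, 4, 5
addition_transformer = AdditionColumnTransformer(column_name="foo", value=2)
addition_transformer.transform(ddf).show()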

Build a custom Estimator

In this section we build an Estimator that normalizes the values of a column by their mean. An Estimator’s _fit method must return a Transformer, because using an Estimator consists of two steps:

  1. Fitting the estimator.

    In this step the _fit method calculates some value(s) from the DataFrame and returns a Transformer that stores the calculated value(s) and uses them in its _transform method to transform a DataFrame. In this example the Estimator calculates the mean of the column and returns a Transformer that divides the column by this mean.

  2. Transforming the DataFrame.

    Once the Estimator has been fitted and has returned a Transformer, we use that Transformer to transform the DataFrame. In this case the Transformer divides the specified column by the mean and returns the transformed DataFrame.

import pyspark.sql.functions as F
from pyspark import keyword_only
from sparkml_base_classes import EstimatorBaseClass, TransformerBaseClass

class MeanNormalizerTransformer(TransformerBaseClass):

    @keyword_only
    def __init__(self, column_name=None, mean=None):
        super().__init__()

    def _transform(self, ddf):
        # add your transformation logic here
        self._logger.info("MeanNormalizer transform")
        ddf = ddf.withColumn(self._column_name, F.col(self._column_name) / self._mean)
        return ddf

class MeanNormalizerEstimator(EstimatorBaseClass):

    @keyword_only
    def __init__(self, column_name=None):
        super().__init__()

    def _fit(self, ddf):
        # add your fitting logic here
        self._logger.info("MeanNormalizer fit")
        mean, = ddf.agg(F.mean(self._column_name)).head()
        return MeanNormalizerTransformer(
            column_name=self._column_name,
            mean=mean
        )
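
As with the Transformer, the Estimator can be used on its own. A minimal sketch, assuming a running SparkSession and the two classes defined above; fitting returns a MeanNormalizerTransformer, which then transforms the DataFrame:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

ddf = spark.createDataFrame([[2], [4], [6]], ["foo"])

# step 1: fitting computes the mean of "foo" (4.0) and returns a
# MeanNormalizerTransformer that stores it
mean_normalizer = MeanNormalizerEstimator(column_name="foo").fit(ddf)

# step 2: the returned Transformer divides "foo" by 4.0, giving 0.5, 1.0, 1.5
mean_normalizer.transform(ddf).show()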

Build the Pipeline

In this section we will build a Pipeline containing our custom Transformer and Estimator. We will first initialize both classes and then add them as stages to the Pipeline.

from pyspark.ml import Pipeline

addition_column_transformer = AdditionColumnTransformer(column_name="foo", value=2)
mean_normalizer_estimator = MeanNormalizerEstimator(column_name="foo")
my_pipeline = Pipeline(stages=[addition_column_transformer, mean_normalizer_estimator])

Fit the Pipeline and transform the DataFrame

In this section we will fit the created Pipeline to a DataFrame and then use the fitted Pipeline (a PipelineModel in SparkML terms) to transform a DataFrame. A Pipeline’s fit method produces a PipelineModel, which is itself a Transformer and can later be used to transform any DataFrame. Please refer to the Spark ML Pipelines documentation for an in-depth description.

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

spark = SparkSession.builder.getOrCreate()

ddf = spark.createDataFrame(
    [[1], [2], [3]],
    ["foo"],
)

# the returned object is of PipelineModel type
my_fitted_pipeline = my_pipeline.fit(ddf)
my_fitted_pipeline.transform(ddf).show()

+----+
| foo|
+----+
|0.75|
| 1.0|
|1.25|
+----+
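
To trace the arithmetic: the AdditionColumnTransformer shifts [1, 2, 3] to [3, 4, 5], the MeanNormalizerEstimator is then fitted on this shifted column and computes a mean of 4, and the resulting Transformer divides by that mean, yielding [0.75, 1.0, 1.25]. Note that Pipeline stages run in order, so the mean is computed after the addition.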

Save and load fitted Pipeline

In the previous section we transformed the DataFrame immediately after fitting the Pipeline. In this section we save the fitted Pipeline and load it back, which decouples fitting the Pipeline from transforming the DataFrame.

from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

ddf = spark.createDataFrame(
    [[8], [10], [12]],
    ["foo"],
)

my_fitted_pipeline.save('my_fitted_pipeline.pipeline')
my_fitted_pipeline = PipelineModel.load('my_fitted_pipeline.pipeline')
my_fitted_pipeline.transform(ddf).show()

+---+
|foo|
+---+
|2.5|
|3.0|
|3.5|
+---+
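
Note that the loaded PipelineModel reuses the mean computed at fit time (4, from the first DataFrame) rather than recomputing it: [8, 10, 12] becomes [10, 12, 14] after the addition and [2.5, 3.0, 3.5] after dividing by 4.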

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sparkml_base_classes-0.1.5.tar.gz (6.2 kB)

Built Distribution

sparkml_base_classes-0.1.5-py3-none-any.whl (6.0 kB)
