SparkPipelineFramework

SparkPipelineFramework implements a few design patterns that make it easier to create Spark applications. It:

  1. Separates data transformation logic from pipeline execution code, so you can compose pipelines by simply stringing together transformers (based on the SparkML Pipeline class, but enhanced to work for both ML and non-ML transformations)
  2. Enables running SQL transformations without writing any code
  3. Enables versioning of transformations, so different pipelines can use older or newer versions of each transformer and you can upgrade each pipeline on your own schedule
  4. Enables autocompletion of transformations when creating pipelines (in PyCharm)
  5. Implements many cross-cutting concerns for you, e.g., logging, performance monitoring, and error reporting
  6. Supports non-ML, ML, and mixed workloads
  7. Has an additional (optional) library, SparkAutoMapper (https://github.com/icanbwell/SparkAutoMapper), that enables data engineers and data analysts to easily map data without writing code
  8. Has an additional (optional) library, SparkPipelineFramework.Testing (https://github.com/icanbwell/SparkPipelineFramework.Testing), that allows you to create unit tests without writing code; you just provide the input file and the expected output file

PyPI Package

This code is available as a package to import into your project: https://pypi.org/project/sparkpipelineframework/
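
You can install it with pip:

pip install sparkpipelineframework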

Using it in your project

(For an example project that uses SparkPipelineFramework, see https://github.com/imranq2/TestSparkPipelineFramework)

  1. Add the sparkpipelineframework package to your project's requirements.txt/Pipfile
  2. Run make init (this will set up Spark and Docker, which is used to run Spark)
  3. Create a folder called library in your project (a sample layout is shown below)
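
For illustration only (the folder and file names below are hypothetical; only the top-level library folder is required), a project using SQL transformers might be laid out like this:

my_project/
    requirements.txt                # includes sparkpipelineframework
    library/
        features/
            carriers/
                v1/
                    carriers.sql    # creates/updates a view called carriers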

Using PyCharm

You can run the SparkPipelineFramework project from PyCharm:

  1. Add a new Docker Compose interpreter
  2. Choose docker-compose.yml as the configuration file
  3. Choose dev as the Service
  4. Click OK and give PyCharm a couple of minutes to index the contents of the Docker container
  5. Right-click on the tests folder and click "Run 'pytest in tests'"

To create a new pipeline

  1. Create a class derived from FrameworkPipeline
  2. In your __init__ function, set self.transformers to the list of transformers to run for this pipeline. For example:

class MyPipeline(FrameworkPipeline):
    def __init__(self, parameters: Dict[str, Any], progress_logger: ProgressLogger):
        super().__init__(parameters=parameters,
                         progress_logger=progress_logger)
        self.transformers = self.create_steps([
            FrameworkCsvLoader(
                view="flights",
                path_to_csv=parameters["flights_path"]
            ),
            FeaturesCarriers(parameters=parameters).transformers,
        ])
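
Running a pipeline follows the standard SparkML fit/transform pattern, as in the test example later in this document (sketch only; it assumes you have a DataFrame df and a parameters dict like the one above):

with ProgressLogger() as progress_logger:
    pipeline = MyPipeline(parameters=parameters, progress_logger=progress_logger)
    transformer = pipeline.fit(df)
    transformer.transform(df)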

To Add a SQL transformation

  1. Create a new folder and a .sql file in that folder. This folder should be inside the library folder or any subfolder you choose under the library folder.
  2. The name of the file is the name of the view that will be created/updated to store the result of your SQL code; e.g., carriers.sql means we will create/update a view called carriers with the results of your SQL.
  3. Add your SQL to the file. This can be any valid Spark SQL and can refer to any view created by the pipeline before this transformer is run. For example:
SELECT carrier, crsarrtime FROM flights
  4. Run the generate_proxies command as shown in the Generating Proxies section below
  5. Now go to your Pipeline class __init__ and add it to self.transformers. Start typing the folder name and hit Ctrl-Space for PyCharm to autocomplete the name (an example is shown below)
  6. That's it. Your SQL has been automatically wrapped in a Transformer which will do logging, monitor performance, and do error checking
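
For illustration (assuming the folder library/features/carriers/v1 contains carriers.sql and that generate_proxies produced a proxy class named FeaturesCarriersV1, as in the test examples later in this document), the generated proxy is used like any other transformer:

from library.features.carriers.v1.features_carriers_v1 import FeaturesCarriersV1

# inside your FrameworkPipeline subclass's __init__:
self.transformers = self.create_steps([
    FrameworkCsvLoader(
        view="flights",
        path_to_csv=parameters["flights_path"]
    ),
    # generated proxy that runs carriers.sql and stores the result in the carriers view
    FeaturesCarriersV1(parameters=parameters).transformers,
])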

To Add a Python transformation

  1. Create a new folder and a .py file in that folder. This folder should be inside the library folder or any subfolder you choose under the library folder.
  2. In the .py file, create a new class and derive from Transformer (from Spark ML). Implement the _transform() function. For example:
from typing import Optional, Dict, Any

from pyspark import keyword_only
from pyspark.sql.dataframe import DataFrame

from spark_pipeline_framework.progress_logger.progress_logger import ProgressLogger
from spark_pipeline_framework.proxy_generator.python_proxy_base import PythonProxyBase


class FeatureTransformer(PythonProxyBase):
    # noinspection PyUnusedLocal
    @keyword_only
    def __init__(self,
                 name: Optional[str] = None,
                 parameters: Optional[Dict[str, Any]] = None,
                 progress_logger: Optional[ProgressLogger] = None,
                 verify_count_remains_same: bool = False
                 ) -> None:
        super(FeatureTransformer, self).__init__(name=name,
                                                 parameters=parameters,
                                                 progress_logger=progress_logger,
                                                 verify_count_remains_same=verify_count_remains_same)

    def _transform(self, df: DataFrame) -> DataFrame:
        # implement your transformation here and return the resulting DataFrame
        return df
  3. Run the generate_proxies command as shown in the Generating Proxies section below
  4. Now go to your Pipeline class __init__ and add it to self.transformers. Start typing the folder name and hit Ctrl-Space for PyCharm to autocomplete the name (an example is shown below)
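
As with SQL transformations, after running generate_proxies the Python transformer is exposed as a proxy class that can be dropped into a pipeline; for example (the class name here is taken from the test example later in this document):

from library.features.carriers_python.v1.features_carriers_python_v1 import FeaturesCarriersPythonV1

# inside your FrameworkPipeline subclass's __init__:
self.transformers = self.create_steps([
    FeaturesCarriersPythonV1(parameters=parameters, progress_logger=progress_logger).transformers,
])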

To Add a Machine Learning training transformation (called fit or Estimator in SparkML lingo)

  1. Create a new folder and .py file in that folder. This folder should be in the library folder or any subfolder you choose under the library folder.
  2. In the .py file, create a new class and derive from Estimator (from Spark ML). Implement the fit() function (a minimal sketch follows this list)
  3. Run the generate_proxies command as shown in the Generating Proxies section below
  4. Now go to your Pipeline class __init__ and add it to self.estimators. Start typing the folder name and hit Ctrl-Space for PyCharm to autocomplete the name
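
A minimal sketch using the plain pyspark.ml base classes (the class names here are purely illustrative; note that with the standard Estimator base class the method you actually override is _fit(), which the public fit() calls):

from pyspark.ml.base import Estimator, Model
from pyspark.sql.dataframe import DataFrame


class MyFeatureModel(Model):
    """Fitted model returned by MyFeatureEstimator; used at prediction time."""

    def _transform(self, dataset: DataFrame) -> DataFrame:
        # apply the learned transformation; this placeholder passes the data through unchanged
        return dataset


class MyFeatureEstimator(Estimator):
    """Learns from the training data in _fit() and returns a fitted model."""

    def _fit(self, dataset: DataFrame) -> MyFeatureModel:
        # compute whatever statistics or parameters you need from the dataset here
        return MyFeatureModel()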

To Add a Machine Learning prediction transformation

  1. Create a new folder and .py file in that folder. This folder should be in the library folder or any subfolder you choose under the library folder.
  2. In the .py file, create a new class and derive from Estimator (from Spark ML). Implement the _transform() function. Note that this can be the same class you use for training and prediction.
  3. Run the generate_proxies command as shown in the Generating Proxies section below
  4. Now go to your Pipeline class __init__ and add it to self.transformers. Start typing the folder name and hit Ctrl-Space for PyCharm to autocomplete the name

Including pipelines in other pipelines

Pipelines are fully composable so you can include one pipeline as a transformer in another pipeline. For example:

class MyPipeline(FrameworkPipeline):
    def __init__(self, parameters: Dict[str, Any], progress_logger: ProgressLogger):
        super(MyPipeline, self).__init__(parameters=parameters,
                                         progress_logger=progress_logger)
        self.transformers = self.create_steps([
            FrameworkCsvLoader(
                view="flights",
                path_to_csv=parameters["flights_path"]
            ),
            PipelineFoo(parameters=parameters).transformers,
            FeaturesCarriers(parameters=parameters).transformers,
        ])

Generating Proxies

  1. Run the following command to generate proxy classes. These automatically wrap your SQL and Python transformations in Spark Transformers that can be included in a Pipeline with no additional code.

python3 spark_pipeline_framework/proxy_generator/generate_proxies.py

You can also add this to your project Makefile to make it easier to run:

.PHONY:proxies
proxies:
	python3 spark_pipeline_framework/proxy_generator/generate_proxies.py

Testing

Test a pipeline

A pipeline can be tested by providing test data in CSV (or Parquet) format, running the pipeline, and then asserting on the data in any resulting view or DataFrame.

from pathlib import Path
from typing import Dict, Any

from pyspark.sql.dataframe import DataFrame
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType

from library.features.carriers.v1.features_carriers_v1 import FeaturesCarriersV1
from library.features.carriers_python.v1.features_carriers_python_v1 import FeaturesCarriersPythonV1
from spark_pipeline_framework.pipelines.framework_pipeline import FrameworkPipeline
from spark_pipeline_framework.progress_logger.progress_logger import ProgressLogger
from spark_pipeline_framework.transformers.framework_csv_loader import FrameworkCsvLoader
from spark_pipeline_framework.utilities.flattener import flatten


class MyPipeline(FrameworkPipeline):
    def __init__(self, parameters: Dict[str, Any], progress_logger: ProgressLogger):
        super(MyPipeline, self).__init__(parameters=parameters,
                                         progress_logger=progress_logger)
        self.transformers = self.create_steps([
            FrameworkCsvLoader(
                view="flights",
                path_to_csv=parameters["flights_path"],
                progress_logger=progress_logger
            ),
            FeaturesCarriersV1(parameters=parameters, progress_logger=progress_logger).transformers,
            FeaturesCarriersPythonV1(parameters=parameters, progress_logger=progress_logger).transformers
        ])


def test_can_run_framework_pipeline(spark_session: SparkSession) -> None:
    # Arrange
    data_dir: Path = Path(__file__).parent.joinpath('./')
    flights_path: str = f"file://{data_dir.joinpath('flights.csv')}"

    schema = StructType([])

    df: DataFrame = spark_session.createDataFrame(
        spark_session.sparkContext.emptyRDD(), schema)

    spark_session.sql("DROP TABLE IF EXISTS default.flights")

    # Act
    parameters = {
        "flights_path": flights_path
    }

    with ProgressLogger() as progress_logger:
        pipeline: MyPipeline = MyPipeline(parameters=parameters, progress_logger=progress_logger)
        transformer = pipeline.fit(df)
        transformer.transform(df)

    # Assert
    result_df: DataFrame = spark_session.sql("SELECT * FROM flights2")
    result_df.show()

    assert result_df.count() > 0

Testing a single Transformer directly

Each Transformer can be tested individually by setting up the data to pass into it (e.g., loading it from CSV) and then checking the result of running the transformer.

from pathlib import Path

from pyspark.sql.dataframe import DataFrame
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType
from spark_pipeline_framework.transformers.framework_csv_loader import FrameworkCsvLoader
from spark_pipeline_framework.utilities.attr_dict import AttrDict

from library.features.carriers.v1.features_carriers_v1 import FeaturesCarriersV1


def test_carriers_v1(spark_session: SparkSession):
    # Arrange
    data_dir: Path = Path(__file__).parent.joinpath('./')
    flights_path: str = f"file://{data_dir.joinpath('flights.csv')}"

    schema = StructType([])

    df: DataFrame = spark_session.createDataFrame(
        spark_session.sparkContext.emptyRDD(), schema)

    spark_session.sql("DROP TABLE IF EXISTS default.flights")

    FrameworkCsvLoader(
        view="flights",
        path_to_csv=flights_path
    ).transform(dataset=df)

    parameters = {}

    FeaturesCarriersV1(parameters=parameters).transformers[0].transform(dataset=df)

    result_df: DataFrame = spark_session.sql("SELECT * FROM flights2")
    result_df.show()

    assert result_df.count() > 0

Contributing

Run make init. This will install Java, Scala, Spark, and other required packages.

Publishing a new package

  1. Create a new release
  2. The GitHub Action should automatically kick in and publish the package
  3. You can see the status in the Actions tab
