
A Flake8 plugin to check for PySpark withColumn usage in loops


Flake8-pyspark-with-column


Getting started

pip install flake8-pyspark-with-column
flake8 --select PSPRK001,PSPRK002,PSPRK003,PSPRK004

Alternatively, you can add the following tox.ini file to the root of your project:

[flake8]
select = 
    PSPRK001,
    PSPRK002,
    PSPRK003,
    PSPRK004

About

A flake8 plugin that detects usage of withColumn in a loop or inside reduce. From the PySpark documentation on the withColumn method:

This method introduces a projection internally. Therefore, calling it multiple times, for instance, via loops in order to add multiple columns can generate big plans which can cause performance issues and even StackOverflowException. To avoid this, use select() with multiple columns at once.

What happens under the hood?

When you run a PySpark application, the following happens:

  1. Spark creates an Unresolved Logical Plan as the result of parsing the query
  2. Spark analyzes this plan to create an Analyzed Logical Plan
  3. Spark applies optimization rules to create an Optimized Logical Plan

(diagram: flow from Unresolved Logical Plan to Optimized Logical Plan)

What is the problem with withColumn? Each call creates a single node in the unresolved plan, so calling withColumn 500 times produces an unresolved plan with 500 nodes. During analysis Spark must visit each node to check that the column exists and has the right data type. After that Spark starts applying optimization rules, but rules are applied recursively over the plan, so a chain of 500 withColumn calls requires 500 applications of the corresponding rule. All of that can significantly increase the time it takes to go from the Unresolved Logical Plan to the Optimized Logical Plan:

(benchmark chart)

On the other hand, both withColumns and select(*cols) create only one node in the plan, no matter how many columns we want to add.
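The growth described above can be illustrated without Spark at all. Below is a toy analogy (not Spark's actual plan classes): every withColumn-style call wraps the previous plan in one more projection node, while a single select-style call adds one node regardless of how many columns it produces.

```python
# Toy analogy of logical-plan growth (NOT Spark's real plan classes).
# Each withColumn-style call wraps the plan in one more node, so N calls
# produce a plan of depth N + 1; a single select-style call produces
# depth 2 no matter how many columns it adds.

class Plan:
    def __init__(self, parent=None):
        self.parent = parent

    def depth(self):
        d, node = 0, self
        while node is not None:
            d, node = d + 1, node.parent
        return d

def with_column(plan):          # one new node per call
    return Plan(parent=plan)

def select(plan, n_columns):    # one node for all columns at once
    return Plan(parent=plan)

source = Plan()

looped = source
for _ in range(500):            # 500 withColumn-style calls -> 500 extra nodes
    looped = with_column(looped)

selected = select(source, 500)  # one call -> one extra node

print(looped.depth())    # 501
print(selected.depth())  # 2
```

Spark's analyzer and optimizer walk a structure like this, which is why the left version pays a per-call cost during analysis while the right one does not.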

Rules

This plugin contains the following rules:

  • PSPRK001: Usage of withColumn in a loop detected
  • PSPRK002: Usage of withColumn inside reduce detected
  • PSPRK003: Usage of withColumnRenamed in a loop detected
  • PSPRK004: Usage of withColumnRenamed inside reduce detected

Examples

Let's imagine we want to apply an ML model to our data, but the model expects double values while our table contains decimal values. The goal is to cast all Decimal columns to Double.

Implementation with withColumn (bad example):

from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.sql.types import DecimalType, DoubleType

def cast_to_double(df: DataFrame) -> DataFrame:
    for field in df.schema.fields:
        if isinstance(field.dataType, DecimalType):
            df = df.withColumn(field.name, col(field.name).cast(DoubleType()))
    return df

Implementation without withColumn (good example):

from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.sql.types import DecimalType, DoubleType

def cast_to_double(df: DataFrame) -> DataFrame:
    cols_to_select = []
    for field in df.schema.fields:
        if isinstance(field.dataType, DecimalType):
            cols_to_select.append(col(field.name).cast(DoubleType()).alias(field.name))
        else:
            cols_to_select.append(col(field.name))
    return df.select(*cols_to_select)

Usage

flake8 %your-code-here%

(screenshot of the plugin reporting violations)
