Flake8-pyspark-with-column

A Flake8 plugin to check for PySpark withColumn usage in loops

Getting started

pip install flake8-pyspark-with-column
flake8 --select PSPRK001,PSPRK002,PSPRK003,PSPRK004

Alternatively, you can add the following tox.ini file to the root of your project:

[flake8]
select = 
    PSPRK001,
    PSPRK002,
    PSPRK003,
    PSPRK004

About

A flake8 plugin that detects usage of withColumn in a loop or inside reduce. From the PySpark documentation for the withColumn method:

This method introduces a projection internally. Therefore, calling it multiple times, for instance, via loops in order to add multiple columns can generate big plans which can cause performance issues and even StackOverflowException. To avoid this, use select() with multiple columns at once.

What happens under the hood?

When you run a PySpark application, the following happens:

  1. Spark creates an Unresolved Logical Plan as a result of parsing the query
  2. Spark analyzes this plan to produce an Analyzed Logical Plan
  3. Spark applies optimization rules to produce an Optimized Logical Plan

(Diagram: Spark query planning flow)

What is the problem with withColumn? Each call creates a single node in the unresolved plan, so calling withColumn 500 times produces an unresolved plan with 500 nodes. During analysis Spark must visit every node to check that the referenced column exists and has the right data type. After that Spark starts applying optimization rules, and since each rule is applied recursively over the plan, a chain of 500 withColumn calls requires 500 applications of the corresponding rule. All of this can significantly increase the time it takes to get from the Unresolved Logical Plan to the Optimized Logical Plan:

(Benchmark: time from Unresolved to Optimized Logical Plan)
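
To see the effect yourself, here is a minimal sketch (it assumes an active SparkSession named spark; the column names are illustrative):

from pyspark.sql.functions import lit

df = spark.range(10)
for i in range(500):
  # Each call appends one more Project node to the unresolved plan
  df = df.withColumn(f"c{i}", lit(i))

# Printing the plans makes the nesting visible
df.explain(extended=True)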

On the other hand, both withColumns and select(*cols) create only one node in the plan, no matter how many columns we want to add.
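
For instance, a minimal sketch (again assuming a SparkSession named spark; withColumns requires PySpark 3.3+):

from pyspark.sql.functions import col, lit

base = spark.range(10)

# One plan node, no matter how many columns are added (PySpark 3.3+)
with_cols = base.withColumns({"a": lit(1), "b": col("id") * 2})

# An equivalent single select that also works on older versions
selected = base.select("*", lit(1).alias("a"), (col("id") * 2).alias("b"))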

Rules

This plugin contains the following rules:

  • PSPRK001: Usage of withColumn in a loop detected
  • PSPRK002: Usage of withColumn inside reduce detected (see the sketch after this list)
  • PSPRK003: Usage of withColumnRenamed in a loop detected
  • PSPRK004: Usage of withColumnRenamed inside reduce detected
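
For example, the following sketch would trigger PSPRK002 (the helper name and columns are illustrative):

from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit

def add_flags(df: DataFrame, names: list) -> DataFrame:
  # Flagged by PSPRK002: withColumn is called once per element of names
  return reduce(lambda acc, name: acc.withColumn(name, lit(False)), names, df)

The fix follows the same pattern as for loops: build the columns first and add them with a single select, e.g. df.select("*", *[lit(False).alias(name) for name in names]).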

Examples

Let's imagine we want to apply an ML model to our data, but the model expects double values while our table contains decimal values. The goal is to cast all Decimal columns to Double.

Implementation with withColumn (bad example):

from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.sql.types import DecimalType, DoubleType

def cast_to_double(df: DataFrame) -> DataFrame:
  # Each iteration adds another projection node to the plan
  for field in df.schema.fields:
    if isinstance(field.dataType, DecimalType):
      df = df.withColumn(field.name, col(field.name).cast(DoubleType()))
  return df

Implementation without withColumn (good example):

from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.sql.types import DecimalType, DoubleType

def cast_to_double(df: DataFrame) -> DataFrame:
  cols_to_select = []
  for field in df.schema.fields:
    if isinstance(field.dataType, DecimalType):
      cols_to_select.append(col(field.name).cast(DoubleType()).alias(field.name))
    else:
      cols_to_select.append(col(field.name))
  # A single select adds only one node to the plan
  return df.select(*cols_to_select)
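
On PySpark 3.3+ the same result can be achieved with withColumns, which also adds a single node to the plan; a sketch under that assumption:

from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.sql.types import DecimalType, DoubleType

def cast_to_double(df: DataFrame) -> DataFrame:
  # Build the whole mapping first, then add it in one plan node
  mapping = {
    field.name: col(field.name).cast(DoubleType())
    for field in df.schema.fields
    if isinstance(field.dataType, DecimalType)
  }
  return df.withColumns(mapping)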

Usage

flake8 %your-code-here%

(Screenshot of the plugin output)
