
Helper for handling PySpark DataFrame partition size 📑🎛️

Project description

RepartiPy

RepartiPy helps you handle PySpark DataFrame partition size with precision.

Possible Use Cases

  • Repartition your DataFrame precisely, without knowing the total DataFrame size in advance (i.e. Dynamic Repartition)
  • Estimate the size of your DataFrame with greater accuracy

Why RepartiPy

Although Spark's SizeEstimator can be used to estimate a DataFrame size, it is not always accurate. RepartiPy instead reads Spark's execution plan statistics to work around this (a sketch of reading those statistics directly follows the list below). It offers two approaches:

  • repartipy.SizeEstimator
  • repartipy.SamplingSizeEstimator
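
For context, the execution plan statistics RepartiPy relies on are exposed on the DataFrame's query execution. Below is a minimal sketch of inspecting them directly; note that _jdf is a PySpark-internal handle, so treat this as illustrative rather than a stable API:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(1_000_000)

# Materialize the DataFrame so the optimizer has concrete statistics.
df.cache()
df.count()

# Read sizeInBytes from the optimized logical plan's statistics.
# _jdf is PySpark-internal and may change between Spark versions.
size_in_bytes = df._jdf.queryExecution().optimizedPlan().stats().sizeInBytes()
print(size_in_bytes)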

repartipy.SizeEstimator

Recommended when your executors have enough memory to cache the whole DataFrame. SizeEstimator simply caches the whole DataFrame in memory and extracts the execution plan statistics.

repartipy.SamplingSizeEstimator

Recommended when your executors do NOT have enough memory to cache the whole DataFrame. SamplingSizeEstimator uses a 'disk write and re-read (HDFS)' approach behind the scenes, for two reasons:

  1. It prevents a double read from the source (e.g. S3), which might be inefficient -> better performance
  2. It reduces partition skew by deliberately reading the data back from disk (leveraging spark.sql.files.maxPartitionBytes) -> better sampling results

Therefore, your cluster must have HDFS configured and enough disk space.

The result may be less accurate than SizeEstimator's due to sampling. If you need more accurate results, tune the sample_count option accordingly. This approach is also slower than SizeEstimator, since SamplingSizeEstimator requires disk I/O and additional logic.
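
Conceptually, the flow looks something like the sketch below. This is a hypothetical illustration of the approach described above, not RepartiPy's actual code; the scratch path and file format are assumptions:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(100_000_000)  # stand-in for a DataFrame too large to cache

scratch_path = "hdfs:///tmp/repartipy-scratch"  # assumed HDFS scratch location

# 1. Write once to HDFS, so the original source (e.g. S3) is not read twice.
df.write.mode("overwrite").parquet(scratch_path)

# 2. Re-read from HDFS: Spark splits the files into input partitions of at most
#    spark.sql.files.maxPartitionBytes, which evens out partition sizes.
reloaded = spark.read.parquet(scratch_path)

# 3. Estimate the size of a handful of sampled partitions (sample_count of them)
#    and extrapolate to the whole DataFrame.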

How To Use

Setup

pip install repartipy

Prerequisite

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
input_data = [
        (1, "Seoul"),
        (2, "Busan"),
    ]
df = spark.createDataFrame(data=input_data, schema=["id", "location"])

get_desired_partition_count()

Calculate ideal number of partitions for a DataFrame

SizeEstimator suggests a desired_partition_count such that each partition holds about desired_partition_size_in_bytes (default: 1 GiB) after repartitioning. reproduce() returns exactly the same df, but reproduced internally by the estimator for better performance: SizeEstimator reproduces df from memory (cache), whereas SamplingSizeEstimator reproduces it from disk (HDFS).
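
Presumably, the suggested count boils down to dividing the estimated DataFrame size by the target partition size. The formula below is an assumption about the library's internals, but it matches the documented behavior:

import math

one_gib_in_bytes = 1073741824
df_size_in_bytes = 5 * one_gib_in_bytes  # e.g. obtained from se.estimate()

# Assumed formula: enough partitions that each holds about 1 GiB.
desired_partition_count = math.ceil(df_size_in_bytes / one_gib_in_bytes)  # -> 5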

with SizeEstimator

import repartipy

one_gib_in_bytes = 1073741824

with repartipy.SizeEstimator(spark=spark, df=df) as se:
    desired_partition_count = se.get_desired_partition_count(desired_partition_size_in_bytes=one_gib_in_bytes)
    
    se.reproduce().repartition(desired_partition_count).write.save("your/write/path")
    # or 
    se.reproduce().coalesce(desired_partition_count).write.save("your/write/path")

with SamplingSizeEstimator

import repartipy
    
one_gib_in_bytes = 1073741824

with repartipy.SamplingSizeEstimator(spark=spark, df=df, sample_count=10) as se:
    desired_partition_count = se.get_desired_partition_count(desired_partition_size_in_bytes=one_gib_in_bytes)
    
    se.reproduce().repartition(desired_partition_count).write.save("your/write/path")
    # or 
    se.reproduce().coalesce(desired_partition_count).write.save("your/write/path")
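
Whether to use repartition() or coalesce() in the snippets above is the usual Spark trade-off: repartition() triggers a full shuffle and can either increase or decrease the partition count, while coalesce() avoids a shuffle but can only decrease it.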

estimate()

Estimate size of a DataFrame

with SizeEstimator

import repartipy

with repartipy.SizeEstimator(spark=spark, df=df) as se:
    df_size_in_bytes = se.estimate()

with SamplingSizeEstimator

import repartipy

with repartipy.SamplingSizeEstimator(spark=spark, df=df, sample_count=10) as se:
    df_size_in_bytes = se.estimate()
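
As the variable name suggests, estimate() returns a size in bytes, so converting it for logging is straightforward:

df_size_in_mib = df_size_in_bytes / (1024 * 1024)
print(f"estimated size: {df_size_in_mib:.1f} MiB")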

Benchmark

Overall, there appears to be a slight performance overhead when employing RepartiPy. To give a rough estimate, this benchmark compares the running times of Spark jobs in the following two cases:

  • Static Repartition (repartition without RepartiPy)
# e.g.
df.repartition(123).write.save("your/write/path")
  • Dynamic Repartition (repartition with RepartiPy)
# e.g.
import repartipy

one_gib_in_bytes = 1073741824

with repartipy.SizeEstimator(spark=spark, df=df) as se:
    desired_partition_count = se.get_desired_partition_count(desired_partition_size_in_bytes=one_gib_in_bytes)
    se.reproduce().repartition(desired_partition_count).write.save("your/write/path")

All the other conditions remain the same except the usage of RepartiPy.

Note

The benchmark results provided are for rough reference only, not absolute. Actual performance can vary depending on your circumstances (e.g. your data, your Spark code, your cluster resources, ...).

SizeEstimator

  • DataFrame Size ~= 256 MiB (decompressed size)
               Static    Dynamic
Running Time   8.5 min   8.6 min

SamplingSizeEstimator

  • DataFrame Size ~= 241 GiB (decompressed size)
               Static   Dynamic
Running Time   14 min   16 min

Download files

Download the file for your platform.

Source Distribution

repartipy-0.1.8.tar.gz (10.1 kB)

Uploaded Source

Built Distribution


repartipy-0.1.8-py3-none-any.whl (10.9 kB)

Uploaded Python 3

File details

Details for the file repartipy-0.1.8.tar.gz.

File metadata

  • Download URL: repartipy-0.1.8.tar.gz
  • Upload date:
  • Size: 10.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.7.1 CPython/3.12.1 Darwin/22.6.0

File hashes

Hashes for repartipy-0.1.8.tar.gz
Algorithm    Hash digest
SHA256       c81752d36e942cf5659ffeea8fa891e72fb770084f0bef5bb6b5232bd4d41c44
MD5          6768cb1e712ca19334583af6d2ed7e6a
BLAKE2b-256  13d5a6ad73055174b2bc4c037047d38d39e69178d3cf24374755255dec1f82f4


File details

Details for the file repartipy-0.1.8-py3-none-any.whl.

File metadata

  • Download URL: repartipy-0.1.8-py3-none-any.whl
  • Upload date:
  • Size: 10.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.7.1 CPython/3.12.1 Darwin/22.6.0

File hashes

Hashes for repartipy-0.1.8-py3-none-any.whl
Algorithm    Hash digest
SHA256       286ea8e612b88deadf687c8764d9b34e494e8781bf19612555bf6809a5696256
MD5          ce07bdf56d638cfb1a0396aaeeb9a806
BLAKE2b-256  840b50f43386e382df837c5bcb2fe31cbb3aa0d92d558c55d15a176ba82d2682

