A library that provides useful extensions to Apache Spark.

Spark Extension

This project provides extensions to the Apache Spark project in Scala and Python:

Diff: A diff transformation and application for Datasets that computes the differences between two datasets, i.e. which rows to add, delete or change to get from one dataset to the other.
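To illustrate what such a diff contains, here is a minimal pure-Python sketch that compares two small row collections by a key. It is not the library's implementation or API: the helper name `diff_rows`, the `id` key and the action labels `N`, `C`, `D`, `I` (no change, change, delete, insert) are assumptions made for this illustration.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]


def diff_rows(left: List[Row], right: List[Row], key: str) -> List[Tuple[str, object]]:
    """Return (action, key value) pairs describing how to get from left to right."""
    left_by_key = {row[key]: row for row in left}
    right_by_key = {row[key]: row for row in right}
    result = []
    for k in sorted(set(left_by_key) | set(right_by_key)):
        if k not in right_by_key:
            result.append(("D", k))  # row exists only in left: delete it
        elif k not in left_by_key:
            result.append(("I", k))  # row exists only in right: insert it
        elif left_by_key[k] != right_by_key[k]:
            result.append(("C", k))  # same key, different values: change it
        else:
            result.append(("N", k))  # identical rows: nothing to do
    return result


left = [{"id": 1, "value": "one"}, {"id": 2, "value": "two"}, {"id": 3, "value": "three"}]
right = [{"id": 1, "value": "one"}, {"id": 2, "value": "Two"}, {"id": 4, "value": "four"}]
changes = diff_rows(left, right, "id")
print(changes)
```

The library performs this comparison on distributed Datasets rather than in-memory dictionaries; the sketch only shows the kind of result a diff describes.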

Histogram: A histogram transformation that computes the histogram DataFrame for a value column.
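As a rough illustration of the idea, the following pure-Python sketch counts the values of each group into buckets delimited by thresholds. The helper name `histogram`, the bucket layout (one bucket per threshold plus a final overflow bucket) and the sample data are assumptions for this sketch, not the library's API.

```python
from bisect import bisect_left
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def histogram(rows: Iterable[Tuple[str, int]], thresholds: List[int]) -> Dict[str, List[int]]:
    """Count values per group into buckets: <= t1, <= t2, ..., > last threshold."""
    ordered = sorted(thresholds)
    counts: Dict[str, List[int]] = defaultdict(lambda: [0] * (len(ordered) + 1))
    for group, value in rows:
        # bisect_left gives the index of the first threshold >= value,
        # or len(ordered) when the value exceeds every threshold
        counts[group][bisect_left(ordered, value)] += 1
    return dict(counts)


rows = [
    ("Alice", 101), ("Alice", 221), ("Alice", 211), ("Alice", 176),
    ("Bob", 276), ("Bob", 232), ("Bob", 98),
]
hist = histogram(rows, [100, 200])
print(hist)
```

Each list holds the counts for the buckets "at most 100", "at most 200" and "above 200", in that order.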

Global Row Number: A withRowNumbers transformation that provides the global row number w.r.t. the current order of the Dataset, or any given order. In contrast to the existing SQL function row_number, which requires a window spec, this transformation provides the row number across the entire Dataset without scaling problems.
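One common way to number rows globally without funnelling all data through a single window partition is to count the rows of each partition, turn those counts into per-partition offsets, and add each row's local index to its partition's offset. The pure-Python sketch below shows that arithmetic on plain lists standing in for partitions; it is an illustration under that assumption and not necessarily how the library implements `withRowNumbers`. The helper name `with_row_numbers` is hypothetical.

```python
from itertools import accumulate
from typing import List, Tuple


def with_row_numbers(partitions: List[List[str]]) -> List[List[Tuple[str, int]]]:
    """Attach a 1-based global row number to every row of every partition."""
    sizes = [len(p) for p in partitions]
    # offset of a partition = number of rows in all preceding partitions
    offsets = [0] + list(accumulate(sizes))[:-1]
    return [
        [(row, offset + i) for i, row in enumerate(partition, start=1)]
        for partition, offset in zip(partitions, offsets)
    ]


partitions = [["a", "b"], [], ["c", "d", "e"], ["f"]]
numbered = with_row_numbers(partitions)
print(numbered)
```

Only the partition sizes have to be collected in one place; the rows themselves stay in their partitions, which is why this approach scales.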

Inspect Parquet files: The structure of Parquet files (the metadata, not the data stored in Parquet) can be inspected, similar to parquet-tools or parquet-cli, by reading from a simple Spark data source. This simplifies identifying why some Parquet files cannot be split by Spark into scalable partitions.

Install Python packages into PySpark job: Install Python dependencies via PIP or Poetry programmatically into your running PySpark job (PySpark ≥ 3.1.0):

# noinspection PyUnresolvedReferences
from gresearch.spark import *

# using PIP
spark.install_pip_package("pandas==1.4.3", "pyarrow")
spark.install_pip_package("-r", "requirements.txt")

# using Poetry
spark.install_poetry_project("../my-poetry-project/", poetry_python="../venv-poetry/bin/python")

Count null values: count_null(e: Column): an aggregation function like count that counts null values in column e. This is equivalent to calling count(when(e.isNull(), lit(1))).

.Net DateTime.Ticks: Convert .Net (C#, F#, Visual Basic) DateTime.Ticks into Spark timestamps, seconds and nanoseconds.

Available methods:
dotnet_ticks_to_timestamp(column_or_name)         # returns timestamp as TimestampType
dotnet_ticks_to_unix_epoch(column_or_name)        # returns Unix epoch seconds as DecimalType
dotnet_ticks_to_unix_epoch_nanos(column_or_name)  # returns Unix epoch nanoseconds as LongType

The reverse is provided by (all return LongType .Net ticks):

timestamp_to_dotnet_ticks(column_or_name)
unix_epoch_to_dotnet_ticks(column_or_name)
unix_epoch_nanos_to_dotnet_ticks(column_or_name)
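The arithmetic behind these conversions can be sketched in plain Python. .Net ticks count 100-nanosecond intervals since 0001-01-01T00:00:00, so converting to Unix time means subtracting the tick count at the Unix epoch and rescaling. The function names below are hypothetical stand-ins for illustration and are not the library's functions, which operate on Spark columns.

```python
from datetime import datetime, timedelta, timezone

# .Net ticks are 100-nanosecond intervals counted from 0001-01-01T00:00:00.
TICKS_PER_SECOND = 10_000_000
NANOS_PER_TICK = 100
# Number of ticks at the Unix epoch, 1970-01-01T00:00:00.
UNIX_EPOCH_TICKS = 621_355_968_000_000_000


def ticks_to_unix_epoch_nanos(ticks: int) -> int:
    """Convert .Net ticks to nanoseconds since the Unix epoch."""
    return (ticks - UNIX_EPOCH_TICKS) * NANOS_PER_TICK


def unix_epoch_nanos_to_ticks(nanos: int) -> int:
    """Convert nanoseconds since the Unix epoch to .Net ticks (floors to 100 ns)."""
    return nanos // NANOS_PER_TICK + UNIX_EPOCH_TICKS


def ticks_to_datetime(ticks: int) -> datetime:
    """Convert .Net ticks to a UTC datetime."""
    # datetime has microsecond resolution, so the last tick digit is dropped
    micros = (ticks - UNIX_EPOCH_TICKS) // 10
    return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(microseconds=micros)


one_second_after_epoch = UNIX_EPOCH_TICKS + TICKS_PER_SECOND
print(ticks_to_unix_epoch_nanos(one_second_after_epoch))
print(ticks_to_datetime(one_second_after_epoch))
```

A timestamp with microsecond precision cannot hold the final tick digit, which is one reason to keep a nanosecond or decimal representation when exact round trips matter.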

Spark temporary directory: Create a temporary directory that will be removed on Spark application shutdown.

Example:
# noinspection PyUnresolvedReferences
from gresearch.spark import *

dir = spark.create_temporary_dir("prefix")

Spark job description: Set Spark job description for all Spark jobs within a context.

Example:
from gresearch.spark import job_description, append_job_description

with job_description("parquet file"):
    df = spark.read.parquet("data.parquet")
    with append_job_description("count"):
        count = df.count()
    with append_job_description("write"):
        df.write.csv("data.csv")

For details, see the README.md at the project homepage.

Using Spark Extension

PyPI package (local Spark cluster only)

You may want to install the pyspark-extension Python package from PyPI into your development environment. This gives you code completion, typing and test capabilities during development.

Running your Python application on a Spark cluster will still require one of the ways below to add the Scala package to the Spark environment.

pip install pyspark-extension==2.15.0.3.4

Note: Pick the right Spark version (here 3.4) depending on your PySpark version.

PySpark API

Start a PySpark session with the Spark Extension dependency (version ≥1.1.0) as follows:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "uk.co.gresearch.spark:spark-extension_2.12:2.15.0-3.4") \
    .getOrCreate()

Note: Pick the right Scala version (here 2.12) and Spark version (here 3.4) depending on your PySpark version.

PySpark REPL

Launch the Python Spark REPL with the Spark Extension dependency (version ≥1.1.0) as follows:

pyspark --packages uk.co.gresearch.spark:spark-extension_2.12:2.15.0-3.4

Note: Pick the right Scala version (here 2.12) and Spark version (here 3.4) depending on your PySpark version.

PySpark spark-submit

Run your Python scripts that use PySpark via spark-submit:

spark-submit --packages uk.co.gresearch.spark:spark-extension_2.12:2.15.0-3.4 [script.py]

Note: Pick the right Scala version (here 2.12) and Spark version (here 3.4) depending on your Spark version.

Your favorite Data Science notebook

There are plenty of Data Science notebooks around. To use this library, add a jar dependency to your notebook using these Maven coordinates:

uk.co.gresearch.spark:spark-extension_2.12:2.15.0-3.4

Or download the jar and place it on a filesystem where it is accessible by the notebook, and reference that jar file directly.

Check the documentation of your favorite notebook to learn how to add jars to your Spark environment.

Download files

Source Distribution

pyspark_extension-2.15.0.4.0.tar.gz (365.9 kB view details)

Uploaded Source

Built Distribution

pyspark_extension-2.15.0.4.0-py3-none-any.whl (350.7 kB view details)

Uploaded Python 3

File details

Details for the file pyspark_extension-2.15.0.4.0.tar.gz.

File metadata

  • Download URL: pyspark_extension-2.15.0.4.0.tar.gz
  • Size: 365.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for pyspark_extension-2.15.0.4.0.tar.gz
Algorithm Hash digest
SHA256 ac3554a09878353ec2c38708fec81cca9cb7870c4b8dfe98a52298b859d695a2
MD5 6b28e150f83f07893d7c28290b3408c9
BLAKE2b-256 7050efa94b37bea96bb7cc5103d8a1ef0460fc96e0b6f238b115b47a77edda35

Provenance

The following attestation bundles were made for pyspark_extension-2.15.0.4.0.tar.gz:

Publisher: publish-release.yml on G-Research/spark-extension

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file pyspark_extension-2.15.0.4.0-py3-none-any.whl.

File hashes

Hashes for pyspark_extension-2.15.0.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 0908ebea95a4e31e98bab46568eab88c78823a7578284349449eea78118137d8
MD5 fcc1fb3ea1058fc1928eb88aff5baa28
BLAKE2b-256 a6fefb3831ddf431f9b7c9c2e9d2d8e8efb457aa5150b3181556426b83cfede6

Provenance

The following attestation bundles were made for pyspark_extension-2.15.0.4.0-py3-none-any.whl:

Publisher: publish-release.yml on G-Research/spark-extension

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
