Utility classes for comfy Spark job authoring.

Utility functions and classes for working with DataFrames, provisioning a SparkSession, and much more.

Core features:

  • Provisioning a Spark session with routine settings preconfigured, including Delta Lake configuration. The delta-core jars must be on the classpath for this to work.
  • Spark job argument wrappers that let you specify job inputs, and job outputs for spark.write.format(...).save(...), in a generic way. These are exposed as the built-in source and output arguments (see the example below).

Consider a simple Spark job that reads JSON data from source and stores it as Parquet in target. Using spark-utils, this job can be defined as follows:

from spark_utils.common.spark_job_args import SparkJobArgs
from spark_utils.common.spark_session_provider import SparkSessionProvider

def main(args=None):
    """
    Job entrypoint
    :param args:
    """
    spark_args = SparkJobArgs().parse(args)

    source_table = spark_args.source('json_source')
    target_table = spark_args.output('parquet_target')

    # Spark session and hadoop FS
    spark_session = SparkSessionProvider().get_session()
    df =

You can also provision a Spark session using the Kubernetes API server as the resource manager. For Java 17 installations, use the Java options from the example below:

from spark_utils.common.spark_session_provider import SparkSessionProvider
from spark_utils.models.k8s_config import SparkKubernetesConfig

config = {
    'spark.local.dir': '/tmp',
    'spark.driver.extraJavaOptions': "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:InitiatingHeapOccupancyPercent=35 -XX:OnOutOfMemoryError='kill -9 %p' -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/ --add-opens=java.base/ --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/ --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/ --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.base/",
    'spark.executor.extraJavaOptions': "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:InitiatingHeapOccupancyPercent=35 -XX:OnOutOfMemoryError='kill -9 %p' -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/ --add-opens=java.base/ --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/ --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/ --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.base/",
    'spark.executor.instances': '5'
}

spc = SparkKubernetesConfig(
    '': 'worker', 
    '': 'spot'
ssp = SparkSessionProvider(additional_configs=config).configure_for_k8s(

spark_session = ssp.get_session()
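
The driver and executor option strings in the config dictionary above are identical, so they can be built once and reused. The following is a minimal sketch of that idea; build_java_options, BASE_FLAGS and OPENED_PACKAGES are hypothetical names, not part of spark-utils. Only the --add-opens packages that are fully legible in the example are listed, so extend the list to match your own installation.

```python
# JVM flags copied from the example above.
BASE_FLAGS = [
    "-XX:+UseG1GC",
    "-XX:+UnlockDiagnosticVMOptions",
    "-XX:InitiatingHeapOccupancyPercent=35",
    "-XX:OnOutOfMemoryError='kill -9 %p'",
    "-XX:+IgnoreUnrecognizedVMOptions",
]

# Assumption: only the java.base packages that appear in full in the example;
# incomplete entries are deliberately not guessed.
OPENED_PACKAGES = [
    "java.lang",
    "java.lang.invoke",
    "java.lang.reflect",
    "java.nio",
    "java.util",
    "java.util.concurrent",
    "java.util.concurrent.atomic",
    "sun.nio.cs",
    "sun.util.calendar",
]


def build_java_options(packages, base_flags=BASE_FLAGS):
    """Join the JVM flags with one --add-opens entry per java.base package."""
    add_opens = [f"--add-opens=java.base/{pkg}=ALL-UNNAMED" for pkg in packages]
    return " ".join(list(base_flags) + add_opens)


java_options = build_java_options(OPENED_PACKAGES)

# Same keys as in the example; both JVMs share one option string.
config = {
    "spark.local.dir": "/tmp",
    "spark.driver.extraJavaOptions": java_options,
    "spark.executor.extraJavaOptions": java_options,
    "spark.executor.instances": "5",
}
```

Keeping the package list in one place means a new --add-opens entry only has to be added once for both the driver and the executors.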

Now we can call this job directly or with spark-submit. Note that spark-utils must be on PYTHONPATH before you run the script:

spark-submit --master local[*] --deploy-mode client --name simpleJob ~/path/to/ --source 'json_source|file://tmp/test_json/*|json' --output 'parquet_target|file://tmp/test_parquet/*|parquet'
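
The --source and --output values in the command above appear to follow an alias|path|format layout. As an illustration only, the sketch below splits such a value into its parts. SocketSpec and parse_socket are hypothetical names and this is not how spark-utils parses arguments internally; it assumes exactly three pipe-separated fields, as in the command.

```python
from typing import NamedTuple


class SocketSpec(NamedTuple):
    """Hypothetical container for one alias|path|format argument value."""
    alias: str
    path: str
    data_format: str


def parse_socket(value: str) -> SocketSpec:
    """Split an 'alias|path|format' string into its three fields."""
    parts = value.split("|")
    if len(parts) != 3:
        raise ValueError(f"expected 'alias|path|format', got {value!r}")
    return SocketSpec(*parts)


source = parse_socket("json_source|file://tmp/test_json/*|json")
print(source.alias, source.path, source.data_format)
```

The alias is what the job code passes to spark_args.source(...) or spark_args.output(...), so it must match the first field of the corresponding command-line value.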

  • Job argument encryption is supported. This requires an encryption key to be present in the RUNTIME_ENCRYPTION_KEY cluster environment variable. Fernet is currently the only supported algorithm. Declare an argument as encrypted with the new_encrypted_arg function, then pass an encrypted value to it. spark-utils decrypts the value when the job is executed and passes it to the consumer.

For example, you can pass sensitive Spark configuration (storage access keys, Hive database passwords, etc.) in encrypted form:

import json

from spark_utils.common.spark_job_args import SparkJobArgs
from spark_utils.common.spark_session_provider import SparkSessionProvider

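def main(args=None):
    # Declare --custom-config as an encrypted argument, then parse the job arguments
    spark_args = (
        SparkJobArgs()
        .new_encrypted_arg("--custom-config", type=str, default=None,
                           help="Optional spark configuration flags to pass. Will be treated as an encrypted value.")
        .parse(args)
    )

    # The decrypted value is expected to be a JSON object of Spark settings
    custom_config = spark_args.parsed_args.custom_config
    spark_session = SparkSessionProvider(
        additional_configs=json.loads(custom_config) if custom_config else None
    ).get_session()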
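
To make the encrypted argument more concrete, here is a minimal sketch of how a value for --custom-config could be produced with Fernet from the cryptography package. It assumes the key is a standard Fernet key and that the plaintext is a JSON object of Spark settings. The setting name is hypothetical, and the decryption step only mirrors what a consumer would see; it is not the spark-utils implementation.

```python
import json

from cryptography.fernet import Fernet

# Assumption: a standard Fernet key. On a real cluster this would be the value
# stored in RUNTIME_ENCRYPTION_KEY, not a freshly generated key.
key = Fernet.generate_key()
fernet = Fernet(key)

# Hypothetical sensitive setting, serialized as JSON before encryption.
custom_config = {"spark.example.secret": "s3cr3t"}
plaintext = json.dumps(custom_config)

# The token is what would be passed on the command line as --custom-config.
token = fernet.encrypt(plaintext.encode("utf-8")).decode("utf-8")

# Round trip: decrypting with the same key yields the original settings.
restored = json.loads(fernet.decrypt(token.encode("utf-8")).decode("utf-8"))
```

Because Fernet is symmetric, the same key must be available both where the value is encrypted and in the cluster environment where the job runs.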

  • Delta Lake utilities
    • Table publishing to Hive Metastore.
    • Delta OSS compaction with row count / file optimization target.
  • Models for common data operations such as data copying. Note that the actual code for those operations will be migrated to this repo a bit later.
  • Utility functions for common data operations, for example flattening a parent-child hierarchy, view concatenation, column name cleanup, etc.
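
As an illustration of what flattening a parent-child hierarchy means, the sketch below resolves each node to its full path from the root using plain Python dictionaries. flatten_hierarchy is a hypothetical helper written for this example; it is not the spark-utils function and does not operate on DataFrames.

```python
from typing import Dict, List, Optional


def flatten_hierarchy(parent_of: Dict[str, Optional[str]]) -> Dict[str, List[str]]:
    """Return, for every node, the path from its root down to the node itself."""
    paths: Dict[str, List[str]] = {}
    for node in parent_of:
        path: List[str] = []
        seen = set()
        current: Optional[str] = node
        # Walk upwards until a node without a parent is reached.
        while current is not None:
            if current in seen:
                raise ValueError(f"cycle detected at {current!r}")
            seen.add(current)
            path.append(current)
            current = parent_of.get(current)
        # The walk collected child-to-root order; reverse it to root-to-child.
        paths[node] = list(reversed(path))
    return paths


# Each key is a child, each value its parent (None marks a root).
hierarchy = {"root": None, "a": "root", "b": "a", "c": "root"}
flat = flatten_hierarchy(hierarchy)
```

In a Spark job the same result is typically reached with repeated self-joins or an iterative expansion of the child and parent columns, since the data does not fit in a driver-side dictionary.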

There are so many possibilities with this project - please feel free to open an issue / PR adding new capabilities or fixing those nasty bugs!

Getting Started

Spark Utils must be installed on your cluster, or in the virtual environment whose Python interpreter Spark uses:

pip install spark-utils
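
To confirm that the package is visible to the interpreter Spark will use, you can query the installed distribution metadata. This is a small sketch using only the standard library; installed_version is a hypothetical helper name.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


# Prints the version string if spark-utils is installed, otherwise None.
print(installed_version("spark-utils"))
```

Run it with the same interpreter that Spark is configured to use, otherwise the result says nothing about the job's environment.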

Build and Test

The test pipeline runs Spark in local mode, so everything can be tested against our current runtime. If you need to test against a different runtime version, update the image used in build.yaml.

