Skip to main content

Utility classes for comfy Spark job authoriing.

Project description

Introduction

Code style: black

Utility functions and classes for working with Dataframes, provisioning SparkSession and much more.

Core features:

  • Provisioning Spark session with some routine settings set in advance, including Delta Lake configuration. You must have delta-core jars in class path for this to work.
  • Spark job argument wrappers, allowing to specify job inputs for spark.read.format(...).options(...).load(...) and outputs for spark.write.format(...).save(...) in a generic way. Those are exposed as source and target built-in arguments (see example below).

Consider a simple Spark Job that reads json data from source and stores it as parquet in target. This job can be defined using spark-utils as below:

from spark_utils.common.spark_job_args import SparkJobArgs
from spark_utils.common.spark_session_provider import SparkSessionProvider


def main(args=None):
    """
     Job entrypoint
    :param args:
    :return:
    """
    spark_args = SparkJobArgs().parse(args)

    source_table = spark_args.source('json_source')
    target_table = spark_args.output('parquet_target')

    # Spark session and hadoop FS
    spark_session = SparkSessionProvider().get_session()
    df = spark_session.read.format(source_table.data_format).load(source_table.data_path)
    df.write.format(target_table.data_format).save(target_table.data_path)

You can also provision Spark Session using Kubernetes API server as a resource manager. Use Java options from the example below for Java 17 installations:

from spark_utils.common.spark_session_provider import SparkSessionProvider
from spark_utils.models.k8s_config import SparkKubernetesConfig

config = {
    'spark.local.dir': '/tmp',
    'spark.driver.extraJavaOptions': "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:InitiatingHeapOccupancyPercent=35 -XX:OnOutOfMemoryError='kill -9 %p' -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.util.stream=ALL-UNNAMED",
    'spark.executor.extraJavaOptions': "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:InitiatingHeapOccupancyPercent=35 -XX:OnOutOfMemoryError='kill -9 %p' -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.util.stream=ALL-UNNAMED",
    'spark.executor.instances': '5'
}

spc = SparkKubernetesConfig(
  application_name='test',
  k8s_namespace='my-spark-namespace',
  spark_image='myregistry.io/spark:v3.3.1',
  executor_node_affinity={
    'kubernetes.mycompany.com/sparknodetype': 'worker', 
    'kubernetes.azure.com/scalesetpriority': 'spot'
  },
  executor_name_prefix='spark-k8s-test'
)
ssp = SparkSessionProvider(additional_configs=config).configure_for_k8s(
  master_url='https://my-k8s-cluster.mydomain.io',
  spark_config=spc
)

spark_session = ssp.get_session()

Now we can call this job directly or with spark-submit. Note that you must have spark-utils in PYTHONPATH before running the script:

spark-submit --master local[*] --deploy-mode client --name simpleJob ~/path/to/main.py --source 'json_source|file://tmp/test_json/*|json' --output 'parquet_target|file://tmp/test_parquet/*|parquet'
  • Job argument encryption is supported. This functionality requires an encryption key to be present in a cluster environment variable RUNTIME_ENCRYPTION_KEY. The only supported algorithm now is fernet. You can declare an argument as encrypted using new_encrypted_arg function. You then must pass an encrypted value to the declared argument, which will be decrypted by spark-utils when a job is executed and passed to the consumer.

For example, you can pass sensitive spark configuration (storage access keys, hive database passwords etc.) encrypted:

import json

from spark_utils.common.spark_job_args import SparkJobArgs
from spark_utils.common.spark_session_provider import SparkSessionProvider


def main(args=None):
    spark_args = SparkJobArgs()
        .new_encrypted_arg("--custom-config", type=str, default=None,
                           help="Optional spark configuration flags to pass. Will be treated as an encrypted value.")
        .parse(args)

    spark_session = SparkSessionProvider(
        additional_configs=json.loads(
            spark_args.parsed_args.custom_config) if spark_args.parsed_args.custom_config else None).get_session()

    ...
  • Delta Lake utilities
    • Table publishing to Hive Metastore.
    • Delta OSS compaction with row count / file optimization target.
  • Models for common data operations like data copying etc. Note that actual code for those operations will be migrated to this repo a bit later.
  • Utility functions for common data operations, for example, flattening parent-child hierarchy, view concatenation, column name clear etc.

There are so many possibilities with this project - please feel free to open an issue / PR adding new capabilities or fixing those nasty bugs!

Getting Started

Spark Utils must be installed on your cluster or virtual env that Spark is using Python interpreter from:

pip install spark-utils

Build and Test

Test pipeline runs Spark in local mode, so everything can be tested against our current runtime. Update the image used in build.yaml if you require a test against a different runtime version.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

spark_utils-1.3.2.tar.gz (18.8 kB view details)

Uploaded Source

Built Distribution

spark_utils-1.3.2-py3-none-any.whl (33.9 kB view details)

Uploaded Python 3

File details

Details for the file spark_utils-1.3.2.tar.gz.

File metadata

  • Download URL: spark_utils-1.3.2.tar.gz
  • Upload date:
  • Size: 18.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.1 CPython/3.11.11 Linux/6.8.0-1021-azure

File hashes

Hashes for spark_utils-1.3.2.tar.gz
Algorithm Hash digest
SHA256 1c2967996567f4bb7ce2566f724644de1d3f85ff55fd3bc5979ff6b0dd59bc96
MD5 9af27b96d6f191c2306c28bb40e6aa26
BLAKE2b-256 e16eb6d95618b9fc16ba99d4eccefdbfa2adabe658c66b703a58c09abb1472f2

See more details on using hashes here.

File details

Details for the file spark_utils-1.3.2-py3-none-any.whl.

File metadata

  • Download URL: spark_utils-1.3.2-py3-none-any.whl
  • Upload date:
  • Size: 33.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.1 CPython/3.11.11 Linux/6.8.0-1021-azure

File hashes

Hashes for spark_utils-1.3.2-py3-none-any.whl
Algorithm Hash digest
SHA256 ab6446a139e52b52f4f2353e1aa419ca27110a08e22e43e5012c65cdf7c4eceb
MD5 6e47dd49321bf4aed423a9365212a24d
BLAKE2b-256 f06a9fce00f717ce6efd57cfa7cc8fae226b95941ed3dcda15f3f7513a701db0

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page