Utility classes for comfy Spark job authoring.


Introduction


Utility functions and classes for working with DataFrames, provisioning a SparkSession, and much more.

Core features:

  • Provisioning a Spark session with routine settings applied in advance, including Delta Lake configuration. You must have the delta-core JARs on the classpath for this to work.
  • Spark job argument wrappers that let you specify job inputs for spark.read.format(...).options(...).load(...) and outputs for spark.write.format(...).save(...) in a generic way. These are exposed via the built-in source and output arguments (see the example below).

Consider a simple Spark job that reads JSON data from a source and stores it as Parquet in a target. Using spark-utils, this job can be defined as follows:

from spark_utils.common.spark_job_args import SparkJobArgs
from spark_utils.common.spark_session_provider import SparkSessionProvider


def main(args=None):
    """
     Job entrypoint
    :param args:
    :return:
    """
    spark_args = SparkJobArgs().parse(args)

    source_table = spark_args.source('json_source')
    target_table = spark_args.output('parquet_target')

    # Provision a Spark session
    spark_session = SparkSessionProvider().get_session()
    df = spark_session.read.format(source_table.data_format).load(source_table.data_path)
    df.write.format(target_table.data_format).save(target_table.data_path)
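
To submit this script with spark-submit (shown further below), add the usual Python entrypoint guard at module level:

if __name__ == '__main__':
    main()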

You can also provision a Spark session using the Kubernetes API server as a resource manager. Use the Java options from the example below for Java 17 installations:

from spark_utils.common.spark_session_provider import SparkSessionProvider
from spark_utils.models.k8s_config import SparkKubernetesConfig

config = {
    'spark.local.dir': '/tmp',
    'spark.driver.extraJavaOptions': "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:InitiatingHeapOccupancyPercent=35 -XX:OnOutOfMemoryError='kill -9 %p' -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.util.stream=ALL-UNNAMED",
    'spark.executor.extraJavaOptions': "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:InitiatingHeapOccupancyPercent=35 -XX:OnOutOfMemoryError='kill -9 %p' -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.util.stream=ALL-UNNAMED",
    'spark.executor.instances': '5'
}

spc = SparkKubernetesConfig(
  application_name='test',
  k8s_namespace='my-spark-namespace',
  spark_image='myregistry.io/spark:v3.3.1',
  executor_node_affinity={
    'kubernetes.mycompany.com/sparknodetype': 'worker', 
    'kubernetes.azure.com/scalesetpriority': 'spot'
  },
  executor_name_prefix='spark-k8s-test'
)
ssp = SparkSessionProvider(additional_configs=config).configure_for_k8s(
  master_url='https://my-k8s-cluster.mydomain.io',
  spark_config=spc
)

spark_session = ssp.get_session()
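
A quick smoke test for the resulting session (plain PySpark, not a spark-utils API), assuming the driver can reach the Kubernetes API server and pull the executor image:

# Trivial distributed computation to confirm executors are scheduled and respond.
spark_session.range(1_000_000).selectExpr("sum(id)").show()

# Release the Kubernetes executor pods once the job is done.
spark_session.stop()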

Now we can call this job directly or submit it with spark-submit. Note that spark-utils must be on the PYTHONPATH before running the script:

spark-submit --master local[*] --deploy-mode client --name simpleJob ~/path/to/main.py --source 'json_source|file://tmp/test_json/*|json' --output 'parquet_target|file://tmp/test_parquet/*|parquet'
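
Calling the entrypoint directly from Python looks roughly like this; a sketch that assumes SparkJobArgs.parse accepts an argparse-style argument list, as its signature suggests:

main([
    '--source', 'json_source|file://tmp/test_json/*|json',
    '--output', 'parquet_target|file://tmp/test_parquet/*|parquet'
])
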
  • Job argument encryption is supported. This functionality requires an encryption key to be present in the cluster environment variable RUNTIME_ENCRYPTION_KEY. The only algorithm currently supported is Fernet. You can declare an argument as encrypted using the new_encrypted_arg function. You must then pass an encrypted value to the declared argument; spark-utils decrypts it when the job is executed and passes the plain value to the consumer. A sketch of producing an encrypted value follows the example below.

For example, you can pass sensitive Spark configuration (storage access keys, Hive database passwords, etc.) encrypted:

import json

from spark_utils.common.spark_job_args import SparkJobArgs
from spark_utils.common.spark_session_provider import SparkSessionProvider


def main(args=None):
    spark_args = (
        SparkJobArgs()
        .new_encrypted_arg("--custom-config", type=str, default=None,
                           help="Optional spark configuration flags to pass. Will be treated as an encrypted value.")
        .parse(args)
    )

    custom_config = spark_args.parsed_args.custom_config
    spark_session = SparkSessionProvider(
        additional_configs=json.loads(custom_config) if custom_config else None
    ).get_session()

    ...
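
How the encrypted value is produced is up to the caller. Below is a minimal sketch using the cryptography package's Fernet implementation; it assumes the key stored in RUNTIME_ENCRYPTION_KEY is a standard Fernet key, which is an assumption about the exact format spark-utils expects:

import json
import os

from cryptography.fernet import Fernet

# The same key must be present on the cluster as RUNTIME_ENCRYPTION_KEY.
fernet = Fernet(os.environ['RUNTIME_ENCRYPTION_KEY'].encode())

# Placeholder sensitive settings; any Spark configuration flags can go here.
sensitive_config = {'spark.hadoop.fs.s3a.secret.key': '<secret>'}

# The resulting string is what you pass as --custom-config when launching the job.
encrypted_value = fernet.encrypt(json.dumps(sensitive_config).encode()).decode()
print(encrypted_value)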
  • Delta Lake utilities:
    • Table publishing to Hive Metastore.
    • Delta OSS compaction with a row count / file optimization target.
  • Models for common data operations such as data copying. Note that the actual code for those operations will be migrated to this repo a bit later.
  • Utility functions for common data operations, for example flattening a parent-child hierarchy, view concatenation, column name cleanup, etc. (see the sketch after this list).
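
The exact spark-utils helpers for these operations are not shown here, so the sketch below uses plain PySpark only to illustrate what flattening a parent-child hierarchy means: expanding direct parent links into all (node, ancestor) pairs. Table, column, and sample values are made up for the example.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master('local[*]').getOrCreate()

# Toy hierarchy: each row links a node to its direct parent (None marks a root).
edges = spark.createDataFrame(
    [('electronics', None), ('computers', 'electronics'), ('laptops', 'computers')],
    ['node', 'parent'],
)

# Seed with direct parent links, then join repeatedly to climb one level
# per iteration until no new (node, ancestor) pairs appear.
flat = edges.where(F.col('parent').isNotNull()).select('node', F.col('parent').alias('ancestor'))
frontier = flat
for _ in range(20):  # assumed upper bound on hierarchy depth
    frontier = (
        frontier.alias('f')
        .join(edges.alias('e'), F.col('f.ancestor') == F.col('e.node'))
        .where(F.col('e.parent').isNotNull())
        .select(F.col('f.node').alias('node'), F.col('e.parent').alias('ancestor'))
    )
    if frontier.count() == 0:
        break
    flat = flat.union(frontier)

flat.distinct().show()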

There are so many possibilities with this project - please feel free to open an issue / PR adding new capabilities or fixing those nasty bugs!

Getting Started

Spark Utils must be installed on your cluster, or in the virtual environment whose Python interpreter Spark uses:

pip install spark-utils

Build and Test

The test pipeline runs Spark in local mode, so everything can be tested against our current runtime. Update the image used in build.yaml if you need to test against a different runtime version.

