Utility classes for comfy Spark job authoring.
Introduction
Utility functions and classes for working with DataFrames, provisioning a SparkSession and much more.
Core features:
- Provisioning Spark session with some routine settings set in advance, including Delta Lake configuration. You must have delta-core jars in class path for this to work.
- Spark job argument wrappers that let you specify job inputs for spark.read.format(...).options(...).load(...) and outputs for spark.write.format(...).save(...) in a generic way. Those are exposed as source and target built-in arguments (see example below).
Consider a simple Spark job that reads json data from source and stores it as parquet in target. This job can be defined using spark-utils as below:
from spark_utils.common.spark_job_args import SparkJobArgs
from spark_utils.common.spark_session_provider import SparkSessionProvider
def main(args=None):
    """
    Job entrypoint
    :param args:
    :return:
    """
    spark_args = SparkJobArgs().parse(args)
    source_table = spark_args.source('json_source')
    target_table = spark_args.output('parquet_target')
    # Spark session and hadoop FS
    spark_session = SparkSessionProvider().get_session()
    df = spark_session.read.format(source_table.data_format).load(source_table.data_path)
    df.write.format(target_table.data_format).save(target_table.data_path)
You can also provision a Spark session using the Kubernetes API server as the resource manager. Use the Java options from the example below for Java 17 installations:
from spark_utils.common.spark_session_provider import SparkSessionProvider
from spark_utils.models.k8s_config import SparkKubernetesConfig
# JVM options for Java 17, shared by the driver and the executors
java_options = "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:InitiatingHeapOccupancyPercent=35 -XX:OnOutOfMemoryError='kill -9 %p' -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.util.stream=ALL-UNNAMED"
config = {
    'spark.local.dir': '/tmp',
    'spark.driver.extraJavaOptions': java_options,
    'spark.executor.extraJavaOptions': java_options,
    'spark.executor.instances': '5'
}
spc = SparkKubernetesConfig(
    application_name='test',
    k8s_namespace='my-spark-namespace',
    spark_image='myregistry.io/spark:v3.3.1',
    executor_node_affinity={
        'kubernetes.mycompany.com/sparknodetype': 'worker',
        'kubernetes.azure.com/scalesetpriority': 'spot'
    },
    executor_name_prefix='spark-k8s-test'
)
ssp = SparkSessionProvider(additional_configs=config).configure_for_k8s(
    master_url='https://my-k8s-cluster.mydomain.io',
    spark_config=spc
)
spark_session = ssp.get_session()
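For orientation, the Kubernetes settings above correspond loosely to standard Spark-on-Kubernetes properties. The sketch below builds such a property dictionary by hand. The helper k8s_spark_conf is hypothetical and is not how spark-utils implements configure_for_k8s. It also uses node selectors as a simpler stand-in for node affinity, which is an assumption made for illustration.

```python
from typing import Dict


def k8s_spark_conf(
    master_url: str,
    application_name: str,
    k8s_namespace: str,
    spark_image: str,
    executor_name_prefix: str,
    executor_node_selector: Dict[str, str],
) -> Dict[str, str]:
    """Build plain Spark properties for a Kubernetes-backed session.

    Hypothetical helper for illustration; spark-utils may set different
    or additional properties.
    """
    conf = {
        # Spark expects the API server URL prefixed with k8s://
        "spark.master": f"k8s://{master_url}",
        "spark.app.name": application_name,
        "spark.kubernetes.namespace": k8s_namespace,
        "spark.kubernetes.container.image": spark_image,
        "spark.kubernetes.executor.podNamePrefix": executor_name_prefix,
    }
    # Assumption: node selectors approximate the affinity labels shown above.
    for label, value in executor_node_selector.items():
        conf[f"spark.kubernetes.executor.node.selector.{label}"] = value
    return conf


conf = k8s_spark_conf(
    master_url="https://my-k8s-cluster.mydomain.io",
    application_name="test",
    k8s_namespace="my-spark-namespace",
    spark_image="myregistry.io/spark:v3.3.1",
    executor_name_prefix="spark-k8s-test",
    executor_node_selector={"kubernetes.azure.com/scalesetpriority": "spot"},
)
for key, value in sorted(conf.items()):
    print(f"{key}={value}")
```

Each entry could be passed to SparkSession.builder.config(key, value) when building a session without spark-utils.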
Now we can call this job directly or with spark-submit. Note that you must have spark-utils in PYTHONPATH before running the script:
spark-submit --master local[*] --deploy-mode client --name simpleJob ~/path/to/main.py --source 'json_source|file://tmp/test_json/*|json' --output 'parquet_target|file://tmp/test_parquet/*|parquet'
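The --source and --output values in the command above follow an alias|path|format layout. As a rough illustration of how such a value maps to the alias, data path and data format used in the job, here is a standalone sketch. JobArgument and parse_job_argument are hypothetical names, not part of spark-utils, and the library's own parser may behave differently.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class JobArgument:
    """Hypothetical container mirroring the alias|path|format layout."""
    alias: str
    data_path: str
    data_format: str


def parse_job_argument(raw: str) -> JobArgument:
    """Split 'alias|path|format' into its three parts.

    Illustration only: spark-utils ships its own argument parser.
    """
    parts = raw.split("|")
    if len(parts) != 3:
        raise ValueError(f"Expected 'alias|path|format', got: {raw!r}")
    alias, data_path, data_format = (part.strip() for part in parts)
    return JobArgument(alias, data_path, data_format)


source = parse_job_argument("json_source|file://tmp/test_json/*|json")
print(source.alias, source.data_path, source.data_format)
```

The alias is what the job passes to spark_args.source(...) or spark_args.output(...), so the same script can be pointed at different paths and formats without code changes.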
- Job argument encryption is supported. This functionality requires an encryption key to be present in the cluster environment variable RUNTIME_ENCRYPTION_KEY. The only algorithm currently supported is fernet. You can declare an argument as encrypted using the new_encrypted_arg function. You must then pass an encrypted value to the declared argument. spark-utils decrypts it when the job is executed and passes the decrypted value to the consumer.
For example, you can pass sensitive spark configuration (storage access keys, hive database passwords etc.) encrypted:
import json
from spark_utils.common.spark_job_args import SparkJobArgs
from spark_utils.common.spark_session_provider import SparkSessionProvider
def main(args=None):
    # Parentheses allow the method chain to span several lines
    spark_args = (
        SparkJobArgs()
        .new_encrypted_arg("--custom-config", type=str, default=None,
                           help="Optional spark configuration flags to pass. Will be treated as an encrypted value.")
        .parse(args)
    )
    # The value is already decrypted at this point
    custom_config = spark_args.parsed_args.custom_config
    spark_session = SparkSessionProvider(
        additional_configs=json.loads(custom_config) if custom_config else None
    ).get_session()
    ...
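To produce a value for an encrypted argument such as --custom-config, the configuration can be serialized to JSON and encrypted with Fernet before the job is submitted. The sketch below uses the cryptography package. The helpers encrypt_config and decrypt_config are hypothetical names. It is also an assumption that RUNTIME_ENCRYPTION_KEY should hold the key in the URL-safe base64 form returned by Fernet.generate_key().

```python
import json

from cryptography.fernet import Fernet


def encrypt_config(config: dict, key: bytes) -> str:
    """Serialize a config dict to JSON and encrypt it with Fernet."""
    payload = json.dumps(config).encode("utf-8")
    return Fernet(key).encrypt(payload).decode("utf-8")


def decrypt_config(token: str, key: bytes) -> dict:
    """Reverse of encrypt_config, useful for checking a token locally."""
    payload = Fernet(key).decrypt(token.encode("utf-8"))
    return json.loads(payload.decode("utf-8"))


# Generate once and store securely; assumed to be the value of RUNTIME_ENCRYPTION_KEY.
key = Fernet.generate_key()

# Placeholder setting used purely for illustration.
token = encrypt_config({"spark.sql.shuffle.partitions": "200"}, key)
print(token)  # pass this string as the --custom-config value
```

Fernet tokens include a timestamp and a random IV, so encrypting the same configuration twice yields different strings. Both decrypt to the same JSON.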
- Delta Lake utilities
  - Table publishing to Hive Metastore.
  - Delta OSS compaction with row count / file optimization target.
- Models for common data operations like data copying etc. Note that actual code for those operations will be migrated to this repo a bit later.
- Utility functions for common data operations, for example, flattening a parent-child hierarchy, view concatenation, column name cleanup etc.
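As an illustration of the kind of helper the last item describes, the sketch below normalizes column names in plain Python. clean_column_names is a hypothetical function, not the spark-utils implementation, and the chosen rules (lowercasing, underscores for anything outside letters, digits and underscore) are assumptions.

```python
import re
from typing import List


def clean_column_names(columns: List[str]) -> List[str]:
    """Replace runs of characters outside [0-9A-Za-z_] with one underscore.

    Hypothetical helper; the library's own cleanup rules may differ.
    """
    cleaned = []
    for name in columns:
        # Collapse unsupported characters, then drop leading/trailing underscores
        safe = re.sub(r"[^0-9A-Za-z_]+", "_", name.strip()).strip("_")
        cleaned.append(safe.lower())
    return cleaned


print(clean_column_names(["Order ID", "unit-price (USD)", "customer.name"]))
```

With a PySpark DataFrame, the result can be applied in one call: df.toDF(*clean_column_names(df.columns)).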
There are so many possibilities with this project - please feel free to open an issue / PR adding new capabilities or fixing those nasty bugs!
Getting Started
Spark Utils must be installed on your cluster or in the virtual environment whose Python interpreter Spark uses:
pip install spark-utils
Build and Test
The test pipeline runs Spark in local mode, so everything can be tested against our current runtime. Update the image used in build.yaml if you require a test against a different runtime version.