Utility classes for comfy Spark job authoriing.
Project description
Introduction
Utility functions and classes for working with Dataframes, provisioning SparkSession and much more.
Core features:
- Provisioning Spark session with some routine settings set in advance, including Delta Lake configuration. You must have delta-core jars in class path for this to work.
- Spark job argument wrappers, allowing to specify job inputs for
spark.read.format(...).options(...).load(...)
and outputs forspark.write.format(...).save(...)
in a generic way. Those are exposed assource
andtarget
built-in arguments (see example below).
Consider a simple Spark Job that reads json
data from source
and stores it as parquet
in target
. This job can be defined using spark-utils
as below:
from spark_utils.common.spark_job_args import SparkJobArgs
from spark_utils.common.spark_session_provider import SparkSessionProvider
def main(args=None):
"""
Job entrypoint
:param args:
:return:
"""
spark_args = SparkJobArgs()
.parse(args)
source_table = spark_args.source('json_source')
target_table = spark_args.output('parquet_target')
# Spark session and hadoop FS
spark_session = SparkSessionProvider().get_session()
df = spark_session.read.format(source_table.data_format).load(source_table.data_path)
df.write.format(target_table.data_format).save(target_table.data_path)
Now we can call this job directly or with spark-submit
. Note that you must have spark-utils
in PYTHONPATH before running the script:
spark-submit --master local[*] --deploy-mode client --name simpleJob ~/path/to/main.py --source 'json_source|file://tmp/test_json/*|json' --output 'parquet_target|file://tmp/test_parquet/*|parquet'
- Job argument encryption is supported. This functionality requires an encryption key to be present in a cluster environment variable
RUNTIME_ENCRYPTION_KEY
. The only supported algorithm now isfernet
. You can declare an argument as encrypted usingnew_encrypted_arg
function. You then must pass an encrypted value to the declared argument, which will be decrypted byspark-utils
when a job is executed and passed to the consumer.
For example, you can pass sensitive spark configuration (storage access keys, hive database passwords etc.) encrypted:
import json
from spark_utils.common.spark_job_args import SparkJobArgs
from spark_utils.common.spark_session_provider import SparkSessionProvider
def main(args=None):
spark_args = SparkJobArgs()
.new_encrypted_arg("--custom-config", type=str, default=None,
help="Optional spark configuration flags to pass. Will be treated as an encrypted value.")
.parse(args)
spark_session = SparkSessionProvider(
additional_configs=json.loads(
spark_args.parsed_args.custom_config) if spark_args.parsed_args.custom_config else None).get_session()
...
- Delta Lake utilities
- Table publishing to Hive Metastore.
- Delta OSS compaction with row count / file optimization target.
- Models for common data operations like data copying etc. Note that actual code for those operations will be migrated to this repo a bit later.
- Utility functions for common data operations, for example, flattening parent-child hierarchy, view concatenation, column name clear etc.
There are so many possibilities with this project - please feel free to open an issue / PR adding new capabilities or fixing those nasty bugs!
Getting Started
Spark Utils must be installed on your cluster or virtual env that Spark is using Python interpreter from:
pip install spark-utils
Build and Test
Test pipeline runs Spark in local mode, so everything can be tested against our current runtime. Update the image used in build.yaml
if you require a test against a different runtime version.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for spark_utils-0.7.1-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 40a2ac7ccbaefda4dcf85fe750aa83cb9724828c4d23a2a042432e87578aa1eb |
|
MD5 | 205459eeaca6f036b7f6b7287d14bec8 |
|
BLAKE2b-256 | ebfcf2148800fa694ade9f367cf82570b93c5619b1117313f4d1d9fab76d86ed |