Project description
TPCDS_PySpark is a TPC-DS workload generator written in Python and designed to run at scale using Apache Spark.
A key feature of this tool is that it collects and reports performance metrics using sparkMeasure,
a performance monitoring library for Apache Spark.
Motivations and goals
- Set up a Spark performance lab
- Run TPC-DS workloads at scale and study Spark performance
- Learn about collecting and analyzing Spark performance data, including timing and metrics measurements
- Learn about Spark performance and optimization
- Compare performance across different Spark configurations and system configurations
Author and contact: Luca.Canali@cern.ch
Getting started
You can start using TPCDS_PySpark by running the tool as a standalone Python script from the command line, or by using the TPCDS class to run TPC-DS workloads from your Python code. The tool also runs in notebooks, for example on Colab.
- Get started Python script: getstarted.py
- Get started Notebooks:
  - TPCDS_PySpark get-started on Colab
  - TPCDS_PySpark get-started
Installation and requirements:
pip install tpcds_pyspark
pip install pyspark
pip install sparkmeasure
pip install pandas
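To verify the installation, you can run a minimal import check (a hypothetical snippet, not part of the tool):
# check_install.py - sanity-check that the required packages are importable (hypothetical)
import tpcds_pyspark
import pyspark
import sparkmeasure
import pandas
print("All required packages import OK")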
Command line:
# Download the test data
wget https://sparkdltrigger.web.cern.ch/sparkdltrigger/TPCDS/tpcds_10.zip
unzip -q tpcds_10.zip
# 1. Run the tool for a minimal test
tpcds_pyspark_run.py -d tpcds_10 -n 1 -r 1 --queries q1,q2
# 2. Run all queries with default options
./tpcds_pyspark_run.py -d tpcds_10
# 3. A more complex example, run all queries on a YARN cluster and save the metrics to a file
spark-submit --master yarn --conf spark.log.level=error --conf spark.executor.cores=8 --conf spark.executor.memory=32g --conf spark.driver.memory=4g --conf spark.driver.extraClassPath=tpcds_pyspark/spark-measure_2.12-0.23.jar \
--conf spark.dynamicAllocation.enabled=false --conf spark.executor.instances=4 tpcds_pyspark_run.py -d HDFS_PATH/tpcds_100 -o ./tpcds_100_out.csv
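As an alternative to pointing spark.driver.extraClassPath at the bundled jar, sparkMeasure can be resolved from Maven Central with --packages (a sketch, assuming the standard sparkMeasure coordinates for Scala 2.12):
# 4. As example 3, but running locally and resolving sparkMeasure from Maven Central instead of a local jar
spark-submit --master local[*] --packages ch.cern.sparkmeasure:spark-measure_2.12:0.23 \
  tpcds_pyspark_run.py -d tpcds_10 -o ./tpcds_10_out.csv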
API mode:
# Get the tool
# pip install tpcds_pyspark
# Download the test data
wget https://sparkdltrigger.web.cern.ch/sparkdltrigger/TPCDS/tpcds_10.zip
unzip -q tpcds_10.zip
from tpcds_pyspark import TPCDS

# data_path defaults to ./tpcds_10, matching the data downloaded above
tpcds = TPCDS(num_runs=1, queries_repeat_times=1, queries=['q1','q2'])
tpcds.map_tables()          # required step: map the TPC-DS tables to the Spark catalog
tpcds.run_TPCDS()           # run the workload
tpcds.print_test_results()  # print metadata, grouped and aggregated metrics
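After run_TPCDS() completes, the collected metrics are also available as class attributes (see the API description below). A minimal sketch, assuming the attributes hold pandas DataFrames given the tool's pandas dependency:
# Inspect the metrics populated by run_TPCDS() (sketch)
print(tpcds.metadata)    # test metadata
print(tpcds.grouped)     # metrics grouped by query name
print(tpcds.aggregated)  # aggregated metrics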
Usage:
Use the tool as a command-line script or as a Python API. Script usage:
tpcds_pyspark_run.py --help
options:
-h, --help show this help message and exit
--data_path DATA_PATH, -d DATA_PATH
Path to the data folder with TPCDS data used for testing. Default: tpcds_10
--data_format DATA_FORMAT
Data format of the data used for testing. Default: parquet
--num_runs NUM_RUNS, -n NUM_RUNS
Number of runs; the TPC-DS workload will be run this number of times. Default: 2
--queries_repeat_times QUERIES_REPEAT_TIMES, -r QUERIES_REPEAT_TIMES
Number of repetitions, each query will be run this number of times for each run. Default: 3
--sleep_time SLEEP_TIME, -s SLEEP_TIME
Time in seconds to sleep before each query execution. Default: 1
--queries QUERIES, -q QUERIES
List of TPCDS queries to run. Default: all
--queries_exclude QUERIES_EXCLUDE, -x QUERIES_EXCLUDE
List of queries to exclude from the running loop. Default: None
--output_file OUTPUT_FILE, -o OUTPUT_FILE
Optional output file, this will contain the collected metrics details in csv format
--cluster_output_file CLUSTER_OUTPUT_FILE, -c CLUSTER_OUTPUT_FILE
Optional, save the collected metrics to a csv file using Spark, use this to save to HDFS or S3
--run_using_metastore
Run TPC-DS using tables defined in the metastore instead of temporary views. See also --create_metastore_tables to define the tables.
Default: False
--create_metastore_tables
Create metastore tables instead of using temporary views. Default: False
--create_metastore_tables_and_compute_statistics
Create metastore tables and compute table statistics to use with Spark CBO. Default: False
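For example, a two-step metastore workflow based on the options above (a sketch; it assumes the tables are defined once and reused by later runs):
# One-off step: define the metastore tables and compute statistics for the Spark CBO
tpcds_pyspark_run.py -d tpcds_10 --create_metastore_tables_and_compute_statistics
# Subsequent runs read the metastore tables instead of temporary views
tpcds_pyspark_run.py -d tpcds_10 --run_using_metastore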
Use TPCDS PySpark as an API from Python:
- Use the TPCDS class to run TPCDS workloads from your Python code:
pip install tpcds_pyspark
from tpcds_pyspark import TPCDS
API description: TPCDS
- TPCDS(data_path, data_format, num_runs, queries_repeat_times, queries, queries_exclude, sleep_time)
- Defaults: data_path="./tpcds_10", data_format="parquet", num_runs=2, queries_repeat_times=3, queries=tpcds_queries, queries_exclude=[], sleep_time=1
- data_path: path to the Parquet folder with TPCDS data used for testing
- data_format: format of the TPCDS data, default: "parquet"
- num_runs: number of runs; the TPC-DS workload will be run this number of times. Default: 2
- queries_repeat_times: number of repetitions, each query will be run this number of times for each run. Default: 3
- queries: list of TPCDS queries to run
- queries_exclude: list of queries to exclude from the running loop
- sleep_time: time in seconds to sleep before each query execution. Default: 1
- Example: tpcds = TPCDS(data_path="tpcds_10", queries=['q1', 'q2'])
TPCDS main class methods:
- map_tables: map the TPCDS tables to the Spark catalog
- map_tables(define_temporary_views=True, define_catalog_tables=False)
- this is a required step before running the TPCDS workload
- Example: tpcds.map_tables()
- run_TPCDS: run the TPCDS workload
- as side effect it populates the following class attributes: self.metadata, self.grouped, self.aggregated
- Example: results = tpcds.run_TPCDS()
- print_test_results(output_file=None): print the collected and aggregated metrics to stdout or to a file on the local filesystem, including the metadata, metrics grouped by query name, and aggregated metrics
- save_with_spark: save the collected metrics to a cluster filesystem (HDFS, S3) using Spark
- save_with_spark(file_path):
- Example: tpcds.save_with_spark("HDFS_or_S3_path/my_test_metrics.csv")
- compute_table_statistics: compute table statistics for the TPCDS tables (optional)
- compute_table_statistics(collect_column_statistics=True)
- use only when mapping tables to the Spark catalog (metastore) and when the statistics are not available
- Example: tpcds.compute_table_statistics()
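Putting the methods together, an end-to-end example using the API described above (the output path is a placeholder):
from tpcds_pyspark import TPCDS

# Configure a short workload: 1 run, each query executed once
tpcds = TPCDS(data_path="tpcds_10", num_runs=1, queries_repeat_times=1, queries=['q1', 'q2'])
tpcds.map_tables()          # required: map the TPC-DS tables to the Spark catalog
tpcds.run_TPCDS()           # run the workload; populates metadata, grouped and aggregated
tpcds.print_test_results()  # print the results to stdout
tpcds.save_with_spark("HDFS_or_S3_path/my_test_metrics.csv")  # optional: save to HDFS or S3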
Output
- The tool prints the collected metrics to stdout, including timing and workload metrics measurements.
- It also prints metadata, metrics grouped by query name, and aggregated metrics.
- Save the collected metrics to local csv files:
-o my_test_metrics.csv
- this will save 4 files: the raw metrics, metadata, metrics grouped by query name, and aggregated metrics
- Optionally, save the collected metrics to a cluster filesystem (HDFS, S3) using Spark:
--cluster_output_file PATH/my_test_metrics.csv
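The CSV output lends itself to further analysis with pandas. A minimal sketch, assuming the raw-metrics file keeps the name passed to -o (the naming of the companion metadata, grouped and aggregated files may differ):
import pandas as pd

# Load the raw metrics saved with -o for further analysis (sketch)
raw_metrics = pd.read_csv("my_test_metrics.csv")
print(raw_metrics.head())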
Download TPCDS Data
The tool requires TPC-DS benchmark data in Parquet or another supported format. For convenience, pre-generated TPC-DS benchmark data at scale factors 10G and 100G can be downloaded:
# TPCDS scale 10G
wget https://sparkdltrigger.web.cern.ch/sparkdltrigger/TPCDS/tpcds_10.zip
unzip -q tpcds_10.zip
# TPCDS scale 100G
wget https://sparkdltrigger.web.cern.ch/sparkdltrigger/TPCDS/tpcds_100.zip
unzip tpcds_100.zip
Generate TPCDS data with a configurable scale factor
- You can generate TPC-DS benchmark data for Spark at any scale factor using the following steps:
- Download and build the spark-sql-perf package from https://github.com/databricks/spark-sql-perf
- Download and build tpcds-kit for generating the data from https://github.com/databricks/tpcds-kit