Project description

TPCDS_PySpark is a TPC-DS workload generator written in Python and designed to run at scale using Apache Spark.
A key feature of this tool is that it collects and reports performance metrics using sparkMeasure, a performance monitoring library for Apache Spark.
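
As an illustration of the instrumentation involved, here is a minimal sketch of using sparkMeasure directly, independently of this tool, to measure a Spark SQL query. This is a sketch, not this tool's API; it assumes the sparkMeasure package is pulled in via spark.jars.packages:

from pyspark.sql import SparkSession
from sparkmeasure import StageMetrics

# Assumption for this sketch: fetch the sparkMeasure jar via its Maven coordinates
spark = (SparkSession.builder
         .appName("sparkmeasure_demo")
         .config("spark.jars.packages", "ch.cern.sparkmeasure:spark-measure_2.12:0.23")
         .getOrCreate())

stagemetrics = StageMetrics(spark)
stagemetrics.begin()
spark.sql("select count(*) from range(1000) cross join range(1000)").show()
stagemetrics.end()
stagemetrics.print_report()  # stage-level timing and metrics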

Motivations and goals

  • Set up a Spark performance lab
    • Run TPC-DS workloads at scale and study Spark performance
    • Learn about collecting and analyzing Spark performance data, including timing and metrics measurements
    • Learn about Spark performance and optimization
  • Compare performance across different Spark configurations and system configurations

Author and contact: Luca.Canali@cern.ch

Getting started

You can start using TPCDS_PySpark by running the tool as a standalone Python script from the command line, or by using the TPCDS class to run TPC-DS workloads from your Python code. The tool also runs on notebooks, for example on Colab.

  1. Get started Python script: getstarted.py
  2. Get started Notebooks:
    TPCDS_PySpark get-started on Colab
    TPCDS_PySpark get-started

Installation and requirements:

pip install tpcds_pyspark
pip install pyspark
pip install sparkmeasure
pip install pandas

Command line:

# Download the test data
wget https://sparkdltrigger.web.cern.ch/sparkdltrigger/TPCDS/tpcds_10.zip
unzip -q tpcds_10.zip

# 1. Run the tool for a minimal test
tpcds_pyspark_run.py -d tpcds_10 -n 1 -r 1 --queries q1,q2

# 2. Run all queries with default options
./tpcds_pyspark_run.py -d tpcds_10

# 3. A more complex example: run all queries on a YARN cluster and save the metrics to a file
spark-submit --master yarn --conf spark.log.level=error \
             --conf spark.executor.cores=8 --conf spark.executor.memory=32g \
             --conf spark.driver.memory=4g \
             --conf spark.driver.extraClassPath=tpcds_pyspark/spark-measure_2.12-0.23.jar \
             --conf spark.dynamicAllocation.enabled=false --conf spark.executor.instances=4 \
             tpcds_pyspark_run.py -d HDFS_PATH/tpcds_100 -o ./tpcds_100_out.csv

API mode:

# Get the tool
# pip install tpcds_pyspark 

# Download the test data
wget https://sparkdltrigger.web.cern.ch/sparkdltrigger/TPCDS/tpcds_10.zip
unzip -q tpcds_10.zip

from tpcds_pyspark import TPCDS

# Configure a short run: 1 run, each query executed once, only q1 and q2
tpcds = TPCDS(num_runs=1, queries_repeat_times=1, queries=['q1','q2'])
tpcds.map_tables()  # required step: map the TPC-DS tables before running

tpcds.run_TPCDS()           # run the workload and collect metrics
tpcds.print_test_results()  # print metadata and aggregated metrics to stdout
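
To control the Spark configuration when running in API mode, a common PySpark pattern is to build the SparkSession before instantiating the class. Whether TPCDS picks up an already-active session this way is an assumption of this sketch, so verify it against your setup:

from pyspark.sql import SparkSession
from tpcds_pyspark import TPCDS

# Assumption: TPCDS reuses the active SparkSession created here
spark = (SparkSession.builder
         .appName("tpcds_api_run")
         .config("spark.jars.packages", "ch.cern.sparkmeasure:spark-measure_2.12:0.23")
         .getOrCreate())

tpcds = TPCDS(num_runs=1, queries_repeat_times=1, queries=['q1'])
tpcds.map_tables()
tpcds.run_TPCDS()
tpcds.print_test_results()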

Usage:

Use it as a script or as an API from Python. Script:

tpcds_pyspark_run.py --help

options:
  -h, --help            show this help message and exit
  --data_path DATA_PATH, -d DATA_PATH
                        Path to the data folder with TPCDS data used for testing. Default: tpcds_10
  --data_format DATA_FORMAT
                        Data format of the data used for testing. Default: parquet
  --num_runs NUM_RUNS, -n NUM_RUNS
                        Number of runs, the TPC-DS workload will be run this number of times. Default: 2
  --queries_repeat_times QUERIES_REPEAT_TIMES, -r QUERIES_REPEAT_TIMES
                        Number of repetitions, each query will be run this number of times for each run. Default: 3
  --sleep_time SLEEP_TIME, -s SLEEP_TIME
                        Time in seconds to sleep before each query execution. Default: 1
  --queries QUERIES, -q QUERIES
                        List of TPCDS queries to run. Default: all
  --queries_exclude QUERIES_EXCLUDE, -x QUERIES_EXCLUDE
                        List of queries to exclude from the running loop. Default: None
  --output_file OUTPUT_FILE, -o OUTPUT_FILE
                        Optional output file, this will contain the collected metrics details in csv format
  --cluster_output_file CLUSTER_OUTPUT_FILE, -c CLUSTER_OUTPUT_FILE
                        Optional, save the collected metrics to a csv file using Spark, use this to save to HDFS or S3
  --run_using_metastore
                        Run TPC-DS using tables defined in the metastore instead of temporary views. See also --create_metastore_tables to define the tables.
                        Default: False
  --create_metastore_tables
                        Create metastore tables instead of using temporary views. Default: False
  --create_metastore_tables_and_compute_statistics
                        Create metastore tables and compute table statistics to use with Spark CBO. Default: False

Use TPCDS PySpark as an API from Python:

  • Use the TPCDS class to run TPCDS workloads from your Python code:
    • pip install tpcds_pyspark
    • from tpcds_pyspark import TPCDS

API description: TPCDS

  • TPCDS(data_path, data_format, num_runs, queries_repeat_times, queries, queries_exclude, sleep_time)
    • Defaults: data_path="./tpcds_10", data_format="parquet", num_runs=2, queries_repeat_times=3, queries=tpcds_queries, queries_exclude=[], sleep_time=1
  • data_path: path to the Parquet folder with TPCDS data used for testing
  • data_format: format of the TPCDS data, default: "parquet"
  • num_runs: number of runs, the TPC-DS workload will be run this number of times. Default: 2
  • queries_repeat_times: number of repetitions, each query will be run this number of times for each run. Default: 3
  • queries: list of TPCDS queries to run
  • queries_exclude: list of queries to exclude from the running loop
  • sleep_time: time in seconds to sleep before each query execution. Default: 1
  • Example: tpcds = TPCDS(data_path="tpcds_10", queries=['q1', 'q2'])
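
For reference, a constructor call that spells out every documented parameter (the values are illustrative, the defaults are listed above):

from tpcds_pyspark import TPCDS

tpcds = TPCDS(data_path="./tpcds_10", data_format="parquet",
              num_runs=1, queries_repeat_times=1,
              queries=['q1', 'q2'], queries_exclude=[],
              sleep_time=1)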

TPCDS main class methods:

  • map_tables: map the TPCDS tables to the Spark catalog
    • map_tables(define_temporary_views=True, define_catalog_tables=False)
    • this is a required step before running the TPCDS workload
    • Example: tpcds.map_tables()
  • run_TPCDS: run the TPCDS workload
    • as side effect it populates the following class attributes: self.metadata, self.grouped, self.aggregated
    • Example: results = tpcds.run_TPCDS()
  • print_test_results(output_file=None): print the collected and aggregated metrics to stdout, or to a file on the local filesystem, containing the metadata, the metrics grouped by query name, and the aggregated metrics
  • save_with_spark: save the collected metrics to a cluster filesystem (HDFS, S3) using Spark
    • save_with_spark(file_path)
    • Example: tpcds.save_with_spark("HDFS_or_S3_path/my_test_metrics.csv")
  • compute_table_statistics: compute table statistics for the TPCDS tables (optional)
    • compute_table_statistics(collect_column_statistics=True)
    • use only when mapping tables to the Spark catalog (metastore) and when the statistics are not available
    • Example: tpcds.compute_table_statistics()
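
Putting the methods together, a minimal end-to-end run using only the calls documented above looks like this:

from tpcds_pyspark import TPCDS

tpcds = TPCDS(data_path="./tpcds_10", queries=['q1', 'q2'])
tpcds.map_tables()           # required: map the TPC-DS tables first
tpcds.run_TPCDS()            # populates self.metadata, self.grouped, self.aggregated
tpcds.print_test_results()   # print metadata and metrics to stdout
tpcds.save_with_spark("HDFS_or_S3_path/my_test_metrics.csv")  # optional, cluster filesystem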

Output

  • The tool prints the collected metrics to stdout, including timing and metrics measurements.
  • It also prints metadata, metrics grouped by query name, and aggregated metrics.
  • Save the collected metrics to local CSV files: -o my_test_metrics.csv
    • this saves 4 files: the raw metrics, metadata, metrics grouped by query name, and aggregated metrics
  • Optionally, save the collected metrics to a cluster filesystem (HDFS, S3) using Spark: --cluster_output_file PATH/my_test_metrics.csv
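
Since the metrics are saved in CSV format, they can be explored with pandas. A minimal sketch follows; the exact names of the four files written with -o are tool-specific, so here only the file named by the -o argument is assumed:

import pandas as pd

# Assumption: the metrics were saved with -o my_test_metrics.csv
df = pd.read_csv("my_test_metrics.csv")
print(df.head())      # inspect the collected metric columns
print(df.describe())  # quick summary statistics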

Download TPCDS Data

The tool requires TPC-DS benchmark data in Parquet or another supported format. For convenience, pre-generated TPC-DS benchmark data at scale 10G and 100G can be downloaded:

# TPCDS scale 10G
wget https://sparkdltrigger.web.cern.ch/sparkdltrigger/TPCDS/tpcds_10.zip
unzip -q tpcds_10.zip

# TPCDS scale 100G
wget https://sparkdltrigger.web.cern.ch/sparkdltrigger/TPCDS/tpcds_100.zip
unzip tpcds_100.zip

Generate TPCDS data with a configurable scale factor

Source, labs and examples

Download files


Source Distribution

TPCDS_PySpark-1.0.5.tar.gz (186.0 kB)

Built Distribution

TPCDS_PySpark-1.0.5-py2.py3-none-any.whl (331.5 kB)

File details

Details for the file TPCDS_PySpark-1.0.5.tar.gz.

File metadata

  • Download URL: TPCDS_PySpark-1.0.5.tar.gz
  • Upload date:
  • Size: 186.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.0.0 CPython/3.11.5

File hashes

Hashes for TPCDS_PySpark-1.0.5.tar.gz:
  • SHA256: 91f12e4e2c5b612f06e61dbe1c417876f901ed463b410bbbccad5e61f07a327d
  • MD5: 771f70d90e6b9c2d73770c614a8c97fd
  • BLAKE2b-256: e7ae04dffe26c0915ca3cbf716edec8a2de46e055f82503bf2306bf86dc5358d


File details

Details for the file TPCDS_PySpark-1.0.5-py2.py3-none-any.whl.

File hashes

Hashes for TPCDS_PySpark-1.0.5-py2.py3-none-any.whl:
  • SHA256: f861744ba8f7a61ca9908c713000d2157e14a3e31b171a740ac05cf8c7bf4839
  • MD5: 2ff1a58d17c473d60571ef843c5402a7
  • BLAKE2b-256: 17d4b9e6ed0b504b6b18de0367522d9cc36d57cec955f62b06508a2ef0f6dde6

