
Easily mock a datalake to test your PySpark data application


pyspark-data-mocker

pyspark-data-mocker is a testing tool that eases the burden of setting up a desired datalake, so you can easily test the behavior of your data application. It also configures the Spark session to optimize it for testing.

Install

pip install pyspark-data-mocker

Usage

pyspark-data-mocker searches the directory you provide for files that can be interpreted as tables and loads them into the datalake. The datalake will contain one database per folder inside the root directory. For example, let's take a look at basic_datalake:

$ tree tests/data/basic_datalake -n --charset=ascii  # byexample: +rm=~
tests/data/basic_datalake
|-- bar
|   |-- courses.csv
|   `-- students.csv
`-- foo
    `-- exams.csv
~
2 directories, 3 files

This file hierarchy is preserved in the resulting datalake: each sub-folder is treated as a Spark database, and each file is loaded as a table named after the file.

How can we load them using pyspark-data-mocker? Really simple!

>>> from pyspark_data_mocker import DataLakeBuilder
>>> builder = DataLakeBuilder.load_from_dir("./tests/data/basic_datalake")  # byexample: +timeout=20 +pass

And that's it! You now have, in that execution context, a datalake with the structure defined in the folder basic_datalake. Let's take a closer look by running some queries.

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()
>>> spark.sql("SHOW DATABASES").show()
+---------+
|namespace|
+---------+
|      bar|
|  default|
|      foo|
+---------+

We have the default database (which comes for free when instantiating Spark) and one database for each of the two folders inside tests/data/basic_datalake: bar and foo.

>>> spark.sql("SHOW TABLES IN bar").show()
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|      bar|  courses|      false|
|      bar| students|      false|
+---------+---------+-----------+

>>> spark.sql("SELECT * FROM bar.courses").show()
+---+------------+
| id| course_name|
+---+------------+
|  1|Algorithms 1|
|  2|Algorithms 2|
|  3|  Calculus 1|
+---+------------+


>>> spark.table("bar.students").show()
+---+----------+---------+--------------------+------+
| id|first_name|last_name|               email|gender|
+---+----------+---------+--------------------+------+
|  1|  Shirleen|  Dunford|sdunford0@amazona...|Female|
|  2|      Niko|  Puckrin|npuckrin1@shinyst...|  Male|
|  3|    Sergei|   Barukh|sbarukh2@bizjourn...|  Male|
|  4|       Sal|  Maidens|smaidens3@senate.gov|  Male|
|  5|    Cooper|MacGuffie| cmacguffie4@ibm.com|  Male|
+---+----------+---------+--------------------+------+

Note how each table is already filled with the data from its CSV file! The tool supports several file formats: csv, parquet, and json. The application infers which format to use by looking at the file extension.
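The folder-to-database and file-to-table convention described above can be sketched with the standard library alone. This only illustrates the naming rule and is not the library's actual implementation; the function name `discover_tables` and the `SUPPORTED_FORMATS` set are hypothetical.

```python
from pathlib import Path
from typing import List, Tuple

# Assumption: only the formats named in the text; the real tool may accept others.
SUPPORTED_FORMATS = {"csv", "parquet", "json"}


def discover_tables(root: str) -> List[Tuple[str, str, str]]:
    """Return (database, table, file_format) for each loadable file under root.

    Hypothetical helper: each sub-folder is treated as a database and each
    file as a table named after the file, with the format taken from the
    file extension.
    """
    found = []
    for db_dir in sorted(p for p in Path(root).iterdir() if p.is_dir()):
        for file in sorted(p for p in db_dir.iterdir() if p.is_file()):
            file_format = file.suffix.lstrip(".").lower()
            if file_format in SUPPORTED_FORMATS:
                found.append((db_dir.name, file.stem, file_format))
    return found
```

For the basic_datalake folder shown earlier, this sketch would return `[("bar", "courses", "csv"), ("bar", "students", "csv"), ("foo", "exams", "csv")]`.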

>>> spark.sql("SHOW TABLES IN foo").show()
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|      foo|    exams|      false|
+---------+---------+-----------+

>>> spark.table("foo.exams").show()
+---+----------+---------+----------+----+
| id|student_id|course_id|      date|note|
+---+----------+---------+----------+----+
|  1|         1|        1|2022-05-01|   9|
|  2|         2|        1|2022-05-08|   7|
|  3|         3|        1|2022-06-17|   4|
|  4|         1|        3|2023-05-12|   9|
|  5|         2|        3|2023-05-12|  10|
|  6|         3|        3|2022-12-07|   7|
|  7|         4|        3|2022-12-07|   4|
|  8|         5|        3|2022-12-07|   2|
|  9|         1|        2|2023-05-01|   5|
| 10|         2|        2|2023-05-07|   8|
+---+----------+---------+----------+----+

Cleanup

You can easily clean the datalake by using the cleanup function:

>>> builder.cleanup()
>>> spark.sql("SHOW DATABASES").show()
+---------+
|namespace|
+---------+
|  default|
+---------+
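In a test suite it is usually worth making sure cleanup runs even when a test fails. One possible way to do that is a small context manager; this is a sketch, and `mocked_datalake` is a hypothetical helper, not part of pyspark-data-mocker. It only assumes that the loader returns an object exposing `cleanup()`, as the builder above does.

```python
from contextlib import contextmanager
from typing import Any, Callable, Iterator


@contextmanager
def mocked_datalake(load: Callable[[str], Any], path: str) -> Iterator[Any]:
    """Build a datalake with `load(path)` and always clean it up afterwards.

    Hypothetical helper: `load` is expected to return an object exposing
    `cleanup()`.
    """
    builder = load(path)
    try:
        yield builder
    finally:
        # Runs even when the body raises, so a failing test leaves nothing behind.
        builder.cleanup()
```

It could be used as `with mocked_datalake(DataLakeBuilder.load_from_dir, "./tests/data/basic_datalake"): ...`, with the queries under test placed inside the `with` block.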

Configuration

pyspark-data-mocker ships with a default Spark configuration that optimizes test execution:

>>> spark_conf = spark.conf
>>> spark_conf.get("spark.app.name")
'test'
>>> spark_conf.get("spark.master")
'local[1]'
>>> spark_conf.get("spark.sql.warehouse.dir")
'/tmp/tmp<...>/spark_warehouse'
>>> spark_conf.get("spark.sql.shuffle.partitions")
'1'

>>> spark_conf.get("spark.ui.showConsoleProgress")
'false'

>>> spark_conf.get("spark.ui.enabled")
'false'
>>> spark_conf.get("spark.ui.dagGraph.retainedRootRDDs")
'1'
>>> spark_conf.get("spark.ui.retainedJobs")
'1'
>>> spark_conf.get("spark.ui.retainedStages")
'1'
>>> spark_conf.get("spark.ui.retainedTasks")
'1'
>>> spark_conf.get("spark.sql.ui.retainedExecutions")
'1'
>>> spark_conf.get("spark.worker.ui.retainedExecutors")
'1'
>>> spark_conf.get("spark.worker.ui.retainedDrivers")
'1'

>>> spark_conf.get("spark.sql.catalogImplementation")
'in-memory'

To better understand what these settings mean and why they are configured this way, take a look at Sergey Ivanychev's excellent research on "Faster PySpark Unit Test".

Some of these settings can be overridden by providing a YAML config file. For example:

$ cat /tmp/custom_config.yaml
app_name: test_complete
number_of_cores: 4
enable_hive: True
warehouse_dir: /tmp/full_delta_lake
delta_configuration:
    scala_version: '2.12'
    delta_version: '2.0.2'
    snapshot_partitions: 2
    log_cache_size: 3

Let's digest each value and what it controls:

| config name | type | description | default value |
|---|---|---|---|
| number_of_cores | INTEGER | Number of CPU cores the Spark session will use | 1 |
| enable_hive | BOOL | Enables the usage of Apache Hive's catalog | false |
| warehouse_dir | STRING | If set, creates a persistent directory where the warehouse will live. By default, pyspark_data_mocker uses a TemporaryDirectory that exists as long as the builder instance exists | tempfile.TemporaryDirectory() |
| delta_configuration | DELTA_CONFIG | If set, enables the Delta Lake framework | None |

Among the things you can change when enabling Delta capabilities are:

| config name | type | description |
|---|---|---|
| scala_version | STRING | Version of Scala that the Spark session will use. Keep in mind that the Scala version MUST be compatible with the delta-core version used |
| delta_version | STRING | Version of delta-core used. The right version depends heavily on the pyspark version |
| snapshot_partitions | INTEGER | Number of partitions Delta uses for its snapshots |
| log_cache_size | INTEGER | Limits the Delta log cache |

For the Delta configuration, note that ALL values must be set explicitly; none of them has a default value.
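The defaults and the "all Delta values are required" rule can be pictured as two dataclasses. This is a sketch for illustration only: `DeltaConfig`, `AppConfig`, and `parse_config` are hypothetical names, not the library's own classes, and the `app_name` default is assumed from the default session shown earlier.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class DeltaConfig:
    # No defaults: every Delta value must be provided explicitly.
    scala_version: str
    delta_version: str
    snapshot_partitions: int
    log_cache_size: int


@dataclass(frozen=True)
class AppConfig:
    # Defaults mirror the documented ones; app_name is assumed from the default session.
    app_name: str = "test"
    number_of_cores: int = 1
    enable_hive: bool = False
    warehouse_dir: Optional[str] = None  # None stands for "use a temporary directory"
    delta_configuration: Optional[DeltaConfig] = None


def parse_config(raw: Dict[str, Any]) -> AppConfig:
    """Build an AppConfig from a dict such as the one a YAML parser returns."""
    values = dict(raw)
    delta = values.pop("delta_configuration", None)
    if delta is not None:
        # Raises TypeError if any of the four Delta keys is missing.
        delta = DeltaConfig(**delta)
    return AppConfig(delta_configuration=delta, **values)
```

With this shape, an empty config yields the documented defaults, while a `delta_configuration` missing any key fails immediately instead of silently falling back to a guess.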

To use a custom configuration, pass the path to the config file (a string or a pathlib.Path) as an optional second argument to load_from_dir.

>>> builder = DataLakeBuilder.load_from_dir("./tests/data/basic_datalake", "/tmp/custom_config.yaml")  # byexample: +timeout=20
<...>
>>> spark_conf = SparkSession.builder.getOrCreate().conf
>>> spark_conf.get("spark.app.name")
'test_complete'
>>> spark_conf.get("spark.master")
'local[4]'
>>> spark_conf.get("spark.sql.warehouse.dir")
'/tmp/full_delta_lake/spark_warehouse'

>>> spark_conf.get("spark.jars.packages")
'io.delta:delta-core_2.12:2.0.2'
>>> spark_conf.get("spark.sql.extensions")
'io.delta.sql.DeltaSparkSessionExtension'
>>> spark_conf.get("spark.databricks.delta.snapshotPartitions")
'2'
>>> spark_conf.get("spark.sql.catalog.spark_catalog")
'org.apache.spark.sql.delta.catalog.DeltaCatalog'

>>> spark_conf.get("spark.sql.catalogImplementation")
'hive'
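The relationship between the config values and the Spark settings printed above can be summarized in a small function. This is a sketch inferred only from the outputs shown in this document, not the library's code; `spark_settings` is a hypothetical name, and `log_cache_size` is left out because the Spark key it maps to is not shown here.

```python
from typing import Dict, Optional


def spark_settings(
    app_name: str,
    number_of_cores: int,
    enable_hive: bool,
    warehouse_dir: str,
    delta: Optional[dict] = None,
) -> Dict[str, str]:
    """Translate config values into the Spark settings seen in the examples."""
    settings = {
        "spark.app.name": app_name,
        "spark.master": f"local[{number_of_cores}]",
        "spark.sql.warehouse.dir": f"{warehouse_dir}/spark_warehouse",
        "spark.sql.catalogImplementation": "hive" if enable_hive else "in-memory",
    }
    if delta is not None:
        # The Delta package coordinate combines the Scala and delta-core versions.
        package = f"io.delta:delta-core_{delta['scala_version']}:{delta['delta_version']}"
        settings.update({
            "spark.jars.packages": package,
            "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
            "spark.databricks.delta.snapshotPartitions": str(delta["snapshot_partitions"]),
            "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        })
    return settings
```

Called with the values from /tmp/custom_config.yaml, the sketch reproduces the settings queried above, for example `'local[4]'` for spark.master and `'io.delta:delta-core_2.12:2.0.2'` for spark.jars.packages.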
