Testframework for PySpark DataFrames

pyspark-testframework

Work in progress

The goal of the pyspark-testframework is to provide a simple way to create tests for PySpark DataFrames. The test results are returned in DataFrame format as well.

Tutorial

Let's first create an example PySpark DataFrame

The data will contain the primary keys, street names and house numbers of some addresses.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import functions as F
# Initialize Spark session
spark = SparkSession.builder.appName("PySparkTestFrameworkTutorial").getOrCreate()

# Define the schema
schema = StructType(
    [
        StructField("primary_key", IntegerType(), True),
        StructField("street", StringType(), True),
        StructField("house_number", IntegerType(), True),
    ]
)

# Define the data
data = [
    (1, "Rochussenstraat", 27),
    (2, "Coolsingel", 31),
    (3, "%Witte de Withstraat", 27),
    (4, "Lijnbaan", -3),
    (5, None, 13),
]

df = spark.createDataFrame(data, schema)

df.show(truncate=False)
+-----------+--------------------+------------+
|primary_key|street              |house_number|
+-----------+--------------------+------------+
|1          |Rochussenstraat     |27          |
|2          |Coolsingel          |31          |
|3          |%Witte de Withstraat|27          |
|4          |Lijnbaan            |-3          |
|5          |null                |13          |
+-----------+--------------------+------------+

Import and initialize the DataFrameTester

from testframework.dataquality import DataFrameTester
df_tester = DataFrameTester(
    df=df,
    primary_key="primary_key",
    spark=spark,
)

Import configurable tests

from testframework.dataquality.tests import ValidNumericRange, RegexTest

Initialize the RegexTest to test for valid street names

valid_street_name = RegexTest(
    name="ValidStreetName",
    pattern=r"^[A-Z][a-zéèáàëï]*([ -][A-Z]?[a-zéèáàëï]*)*$",
)

Run valid_street_name on the street column using the .test() method of DataFrameTester.

df_tester.test(
    col="street",
    test=valid_street_name,
    nullable=False,  # not nullable, hence null values are converted to False
    description="street contains valid Dutch street name.",
).show(truncate=False)
+-----------+--------------------+-----------------------+
|primary_key|street              |street__ValidStreetName|
+-----------+--------------------+-----------------------+
|1          |Rochussenstraat     |true                   |
|2          |Coolsingel          |true                   |
|3          |%Witte de Withstraat|false                  |
|4          |Lijnbaan            |true                   |
|5          |null                |false                  |
+-----------+--------------------+-----------------------+
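To see why row 3 fails, the pattern can be tried outside Spark. The following is a minimal plain-Python sketch using the standard `re` module. It is not how RegexTest is implemented; the helper `street_is_valid` is hypothetical, and treating null values via a `nullable` flag is an assumption based on the output above.

```python
import re

# Same pattern as passed to RegexTest above.
PATTERN = re.compile(r"^[A-Z][a-zéèáàëï]*([ -][A-Z]?[a-zéèáàëï]*)*$")


def street_is_valid(street, nullable=False):
    """Hypothetical helper: mimic the pass/fail outcome for a single value."""
    if street is None:
        # A missing value passes only if the column is declared nullable.
        return nullable
    return PATTERN.match(street) is not None


streets = ["Rochussenstraat", "Coolsingel", "%Witte de Withstraat", "Lijnbaan", None]
street_results = [street_is_valid(s) for s in streets]
print(street_results)  # [True, True, False, True, False]
```

The leading `%` is not an uppercase letter, so the match fails on the first character; without it, "Witte de Withstraat" would pass.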

Run the ValidNumericRange test on the house_number column

df_tester.test(
    col="house_number",
    test=ValidNumericRange(
        min_value=0,
    ),
    nullable=True,  # nullable, hence null values are converted to True
    # description is optional, let's not define it for illustration purposes
).show()
+-----------+------------+-------------------------------+
|primary_key|house_number|house_number__ValidNumericRange|
+-----------+------------+-------------------------------+
|          1|          27|                           true|
|          2|          31|                           true|
|          3|          27|                           true|
|          4|          -3|                          false|
|          5|          13|                           true|
+-----------+------------+-------------------------------+
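The effect of `min_value` and `nullable` can be illustrated with a small plain-Python sketch. The function `in_numeric_range` is hypothetical and not part of the library; the infinite defaults follow the generated description shown later in this tutorial (`min_value=0.0, max_value=inf`), while inclusive bounds are an assumption.

```python
import math


def in_numeric_range(value, min_value=-math.inf, max_value=math.inf, nullable=False):
    """Hypothetical helper: mimic the pass/fail outcome for a single value."""
    if value is None:
        # A missing value passes only if the column is declared nullable.
        return nullable
    # Assumption: both bounds are inclusive.
    return min_value <= value <= max_value


house_numbers = [27, 31, 27, -3, 13]
range_results = [in_numeric_range(n, min_value=0, nullable=True) for n in house_numbers]
print(range_results)  # [True, True, True, False, True]
```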

Let's take a look at the test results of the DataFrame using the .results attribute.

df_tester.results.show(truncate=False)
+-----------+-----------------------+-------------------------------+
|primary_key|street__ValidStreetName|house_number__ValidNumericRange|
+-----------+-----------------------+-------------------------------+
|1          |true                   |true                           |
|2          |true                   |true                           |
|3          |false                  |true                           |
|4          |true                   |false                          |
|5          |false                  |true                           |
+-----------+-----------------------+-------------------------------+

We can use .descriptions or .description_df to get the descriptions of the tests.

This can be useful for reporting purposes, for example to create reports for the business with more detailed information than just the column name and the test name.

df_tester.descriptions
{'street__ValidStreetName': 'street contains valid Dutch street name.',
 'house_number__ValidNumericRange': 'house_number__ValidNumericRange(min_value=0.0, max_value=inf)'}
df_tester.description_df.show(truncate=False)
+-------------------------------+-------------------------------------------------------------+
|test                           |description                                                  |
+-------------------------------+-------------------------------------------------------------+
|street__ValidStreetName        |street contains valid Dutch street name.                     |
|house_number__ValidNumericRange|house_number__ValidNumericRange(min_value=0.0, max_value=inf)|
+-------------------------------+-------------------------------------------------------------+

Custom tests

Sometimes tests are too specific or complex to be covered by the configurable tests. That's why we can create custom tests and add them to the DataFrameTester object.

Let's do this using a custom test which checks that every house has a bath room. We'll start by creating a new DataFrame with rooms rather than houses.

rooms = [
    (1, "living room"),
    (1, "bath room"),
    (1, "kitchen"),
    (1, "bed room"),
    (2, "living room"),
    (2, "bed room"),
    (2, "kitchen"),
]

schema_rooms = StructType(
    [
        StructField("primary_key", IntegerType(), True),
        StructField("room", StringType(), True),
    ]
)

room_df = spark.createDataFrame(rooms, schema=schema_rooms)

room_df.show(truncate=False)
+-----------+-----------+
|primary_key|room       |
+-----------+-----------+
|1          |living room|
|1          |bath room  |
|1          |kitchen    |
|1          |bed room   |
|2          |living room|
|2          |bed room   |
|2          |kitchen    |
+-----------+-----------+

To create a custom test, we should create a PySpark DataFrame which contains the same primary_key column as the DataFrame to be tested using the DataFrameTester.

Let's create a boolean column that indicates whether the house has a bath room or not.

house_has_bath_room = room_df.groupBy("primary_key").agg(
    F.max(F.when(F.col("room") == "bath room", True).otherwise(False)).alias("has_bath_room")
)

house_has_bath_room.show(truncate=False)
+-----------+-------------+
|primary_key|has_bath_room|
+-----------+-------------+
|1          |true         |
|2          |false        |
+-----------+-------------+
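Taking the maximum of a boolean column per group amounts to asking whether any row in the group is true. As a rough plain-Python equivalent of the aggregation above (an illustration only, using the same room data):

```python
from collections import defaultdict

rooms = [
    (1, "living room"),
    (1, "bath room"),
    (1, "kitchen"),
    (1, "bed room"),
    (2, "living room"),
    (2, "bed room"),
    (2, "kitchen"),
]

# For each house, remember whether at least one of its rooms is a bath room.
has_bath_room = defaultdict(bool)
for primary_key, room in rooms:
    has_bath_room[primary_key] = has_bath_room[primary_key] or room == "bath room"

print(dict(has_bath_room))  # {1: True, 2: False}
```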

We can add this 'custom test' to the DataFrameTester using add_custom_test_result.

Behind the scenes, DataFrameTester validates the custom result to make sure that it meets the requirements for being added to the other test results.

df_tester.add_custom_test_result(
    result=house_has_bath_room,
    name="has_bath_room",
    description="House has a bath room",
    # fillna_value=0, # optional; by default null.
).show(truncate=False)
+-----------+-------------+
|primary_key|has_bath_room|
+-----------+-------------+
|1          |true         |
|2          |false        |
|3          |null         |
|4          |null         |
|5          |null         |
+-----------+-------------+
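The null values for houses 3 to 5 are what one would expect if the custom result were aligned to the primary keys of the tested DataFrame, as in a left join. The sketch below illustrates that idea in plain Python. It is an assumption about the behaviour, not the library's implementation, and `align_to_primary_keys` is a hypothetical helper whose `fillna_value` parameter stands in for the optional argument shown commented out above.

```python
house_keys = [1, 2, 3, 4, 5]
custom_result = {1: True, 2: False}


def align_to_primary_keys(primary_keys, result, fillna_value=None):
    """Hypothetical helper: keep every primary key, filling keys without a result."""
    return {pk: result.get(pk, fillna_value) for pk in primary_keys}


aligned = align_to_primary_keys(house_keys, custom_result)
print(aligned)  # {1: True, 2: False, 3: None, 4: None, 5: None}
```

Passing a non-null `fillna_value` in this sketch would replace the `None` entries, so houses without room data would count as passed or failed instead of untested.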

Although the house DataFrame contains no information on whether a house has a bath room, we can still add the custom test to the DataFrameTester object.

df_tester.results.show(truncate=False)
+-----------+-----------------------+-------------------------------+-------------+
|primary_key|street__ValidStreetName|house_number__ValidNumericRange|has_bath_room|
+-----------+-----------------------+-------------------------------+-------------+
|1          |true                   |true                           |true         |
|2          |true                   |true                           |false        |
|3          |false                  |true                           |null         |
|4          |true                   |false                          |null         |
|5          |false                  |true                           |null         |
+-----------+-----------------------+-------------------------------+-------------+
df_tester.descriptions
{'street__ValidStreetName': 'street contains valid Dutch street name.',
 'house_number__ValidNumericRange': 'house_number__ValidNumericRange(min_value=0.0, max_value=inf)',
 'has_bath_room': 'House has a bath room'}

We can also get a summary of the test results using the .summary attribute.

df_tester.summary.show(truncate=False)
+-------------------------------+-------------------------------------------------------------+-------+--------+-----------------+--------+-----------------+
|test                           |description                                                  |n_tests|n_passed|percentage_passed|n_failed|percentage_failed|
+-------------------------------+-------------------------------------------------------------+-------+--------+-----------------+--------+-----------------+
|street__ValidStreetName        |street contains valid Dutch street name.                     |5      |3.0     |60.0             |2.0     |40.0             |
|house_number__ValidNumericRange|house_number__ValidNumericRange(min_value=0.0, max_value=inf)|5      |4.0     |80.0             |1.0     |20.0             |
|has_bath_room                  |House has a bath room                                        |2      |1.0     |50.0             |1.0     |50.0             |
+-------------------------------+-------------------------------------------------------------+-------+--------+-----------------+--------+-----------------+
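Note that has_bath_room has n_tests equal to 2 rather than 5, which suggests that null results are not counted. The plain-Python sketch below reproduces the numbers in the table under that assumption; `summarize` is a hypothetical helper, not the library's implementation.

```python
# Test results per column, copied from the results table above (None = null).
results = {
    "street__ValidStreetName": [True, True, False, True, False],
    "house_number__ValidNumericRange": [True, True, True, False, True],
    "has_bath_room": [True, False, None, None, None],
}


def summarize(values):
    """Hypothetical helper: count passed and failed results, ignoring nulls."""
    tested = [v for v in values if v is not None]
    n_tests = len(tested)
    n_passed = sum(tested)  # True counts as 1, False as 0
    n_failed = n_tests - n_passed
    return {
        "n_tests": n_tests,
        "n_passed": n_passed,
        "percentage_passed": 100 * n_passed / n_tests if n_tests else None,
        "n_failed": n_failed,
        "percentage_failed": 100 * n_failed / n_tests if n_tests else None,
    }


summary = {name: summarize(values) for name, values in results.items()}
for name, row in summary.items():
    print(name, row)
```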
