Skip to main content

Testframework for PySpark DataFrames

Project description

Build Status Version Ruff

pyspark-testframework

The goal of the pyspark-testframework is to provide a simple way to create tests for PySpark DataFrames. The test results are returned in DataFrame format as well.

Tutorial

Let's first create an example pyspark DataFrame

The data will contain the primary keys, street names and house numbers of some addresses.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import functions as F
# Initialize Spark session
spark = SparkSession.builder.appName("PySparkTestFrameworkTutorial").getOrCreate()

# Define the schema
schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("street", StringType(), True),
        StructField("house_number", IntegerType(), True),
    ]
)

# Define the data
data = [
    (1, "Rochussenstraat", 27),
    (2, "Coolsingel", 31),
    (3, "%Witte de Withstraat", 27),
    (4, "Lijnbaan", -3),
    (5, None, 13),
]

df = spark.createDataFrame(data, schema)

df.show(truncate=False)
+---+--------------------+------------+
|id |street              |house_number|
+---+--------------------+------------+
|1  |Rochussenstraat     |27          |
|2  |Coolsingel          |31          |
|3  |%Witte de Withstraat|27          |
|4  |Lijnbaan            |-3          |
|5  |null                |13          |
+---+--------------------+------------+

Import and initialize the DataFrameTester

from testframework.dataquality import DataFrameTester
df_tester = DataFrameTester(
    df=df,
    primary_key="id",
    spark=spark,
)

Import configurable tests

from testframework.dataquality.tests import ValidNumericRange, RegexTest

Initialize the RegexTest to test for valid street names

valid_street_format = RegexTest(
    name="ValidStreetFormat",
    pattern=r"^[A-Z][a-zéèáàëï]*([ -][A-Z]?[a-zéèáàëï]*)*$",
)

Run valid_street_format on the street column using the .test() method of DataFrameTester.

df_tester.test(
    col="street",
    test=valid_street_format,
    nullable=False,  # nullable is False, hence null values are converted to False
    description="Street is in valid Dutch street format.",
).show(truncate=False)
+---+--------------------+-------------------------+
|id |street              |street__ValidStreetFormat|
+---+--------------------+-------------------------+
|1  |Rochussenstraat     |true                     |
|2  |Coolsingel          |true                     |
|3  |%Witte de Withstraat|false                    |
|4  |Lijnbaan            |true                     |
|5  |null                |false                    |
+---+--------------------+-------------------------+

Run the IntegerString test on the number column

By setting the return_failed_rows parameter to True, we can get only the rows that failed the test.

df_tester.test(
    col="house_number",
    test=ValidNumericRange(
        min_value=1,
    ),
    nullable=False,
    # description="House number is in a valid format" # optional, let's not define it for illustration purposes
    return_failed_rows=True,  # only return the failed rows
).show()
+---+------------+-------------------------------+
| id|house_number|house_number__ValidNumericRange|
+---+------------+-------------------------------+
|  4|          -3|                          false|
+---+------------+-------------------------------+

Let's take a look at the test results of the DataFrame using the .results attribute.

df_tester.results.show(truncate=False)
+---+-------------------------+-------------------------------+
|id |street__ValidStreetFormat|house_number__ValidNumericRange|
+---+-------------------------+-------------------------------+
|1  |true                     |true                           |
|2  |true                     |true                           |
|3  |false                    |true                           |
|4  |true                     |false                          |
|5  |false                    |true                           |
+---+-------------------------+-------------------------------+

We can use .descriptions or .descriptions_df to get the descriptions of the tests.


This can be useful for reporting purposes. For example to create reports for the business with more detailed information than just the column name and the test name.
df_tester.descriptions
{'street__ValidStreetFormat': 'Street is in valid Dutch street format.',
 'house_number__ValidNumericRange': 'house_number__ValidNumericRange(min_value=1.0, max_value=inf)'}
df_tester.description_df.show(truncate=False)
+-------------------------------+-------------------------------------------------------------+
|test                           |description                                                  |
+-------------------------------+-------------------------------------------------------------+
|street__ValidStreetFormat      |Street is in valid Dutch street format.                      |
|house_number__ValidNumericRange|house_number__ValidNumericRange(min_value=1.0, max_value=inf)|
+-------------------------------+-------------------------------------------------------------+

Custom tests

Sometimes tests are too specific or complex to be covered by the configurable tests. That's why we can create custom tests and add them to the DataFrameTester object.

Let's do this using a custom test which should tests that every house has a bath room. We'll start by creating a new DataFrame with rooms rather than houses.

rooms = [
    (1,1, "living room"),
    (2,1, "bathroom"),
    (3,1, "kitchen"),
    (4,1, "bed room"),
    (5,2, "living room"),
    (6,2, "bed room"),
    (7,2, "kitchen"),
]

schema_rooms = StructType(
    [   StructField("id", IntegerType(), True),
        StructField("house_id", IntegerType(), True),
        StructField("room", StringType(), True),
    ]
)

room_df = spark.createDataFrame(rooms, schema=schema_rooms)

room_df.show(truncate=False)
+---+--------+-----------+
|id |house_id|room       |
+---+--------+-----------+
|1  |1       |living room|
|2  |1       |bathroom   |
|3  |1       |kitchen    |
|4  |1       |bed room   |
|5  |2       |living room|
|6  |2       |bed room   |
|7  |2       |kitchen    |
+---+--------+-----------+

To create a custom test, we should create a pyspark DataFrame which contains the same primary_key column as the DataFrame to be tested using the DataFrameTester.

Let's create a boolean column that indicates whether the house has a bath room or not.

house_has_bathroom = room_df.groupBy("house_id").agg(
    F.max(F.when(F.col("room") == "bathroom", True).otherwise(False)).alias(
        "has_bathroom"
    )
)

house_has_bathroom.show(truncate=False)
+--------+------------+
|house_id|has_bathroom|
+--------+------------+
|1       |true        |
|2       |false       |
+--------+------------+

We can add this 'custom test' to the DataFrameTester using add_custom_test_result.

In the background, all kinds of data validation checks are done by DataFrameTester to make sure that it fits the requirements to be added to the other test results.

df_tester.add_custom_test_result(
    result=house_has_bathroom.withColumnRenamed("house_id", "id"),
    name="has_bathroom",
    description="House has a bathroom",
    # fillna_value=0, # optional; by default null.
).show(truncate=False)
+---+------------+
|id |has_bathroom|
+---+------------+
|1  |true        |
|2  |false       |
|3  |null        |
|4  |null        |
|5  |null        |
+---+------------+

Despite that the data whether a house has a bath room is not available in the house DataFrame; we can still add the custom test to the DataFrameTester object.

df_tester.results.show(truncate=False)
+---+-------------------------+-------------------------------+------------+
|id |street__ValidStreetFormat|house_number__ValidNumericRange|has_bathroom|
+---+-------------------------+-------------------------------+------------+
|1  |true                     |true                           |true        |
|2  |true                     |true                           |false       |
|3  |false                    |true                           |null        |
|4  |true                     |false                          |null        |
|5  |false                    |true                           |null        |
+---+-------------------------+-------------------------------+------------+
df_tester.descriptions
{'street__ValidStreetFormat': 'Street is in valid Dutch street format.',
 'house_number__ValidNumericRange': 'house_number__ValidNumericRange(min_value=1.0, max_value=inf)',
 'has_bathroom': 'House has a bathroom'}

We can also get a summary of the test results using the .summary attribute.

df_tester.summary.show(truncate=False)
+-------------------------------+-------------------------------------------------------------+-------+--------+-----------------+--------+-----------------+
|test                           |description                                                  |n_tests|n_passed|percentage_passed|n_failed|percentage_failed|
+-------------------------------+-------------------------------------------------------------+-------+--------+-----------------+--------+-----------------+
|street__ValidStreetFormat      |Street is in valid Dutch street format.                      |5      |3       |60.0             |2       |40.0             |
|house_number__ValidNumericRange|house_number__ValidNumericRange(min_value=1.0, max_value=inf)|5      |4       |80.0             |1       |20.0             |
|has_bathroom                   |House has a bathroom                                         |2      |1       |50.0             |1       |50.0             |
+-------------------------------+-------------------------------------------------------------+-------+--------+-----------------+--------+-----------------+

If you want to see all rows that failed any of the tests, you can use the .failed_tests attribute.

df_tester.failed_tests.show(truncate=False)
+---+-------------------------+-------------------------------+------------+
|id |street__ValidStreetFormat|house_number__ValidNumericRange|has_bathroom|
+---+-------------------------+-------------------------------+------------+
|2  |true                     |true                           |false       |
|3  |false                    |true                           |null        |
|4  |true                     |false                          |null        |
|5  |false                    |true                           |null        |
+---+-------------------------+-------------------------------+------------+

Of course, you can also see all rows that passed all tests using the .passed_tests attribute.

df_tester.passed_tests.show(truncate=False)
+---+-------------------------+-------------------------------+------------+
|id |street__ValidStreetFormat|house_number__ValidNumericRange|has_bathroom|
+---+-------------------------+-------------------------------+------------+
|1  |true                     |true                           |true        |
+---+-------------------------+-------------------------------+------------+

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyspark_testframework-2.8.0.tar.gz (31.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pyspark_testframework-2.8.0-py3-none-any.whl (21.3 kB view details)

Uploaded Python 3

File details

Details for the file pyspark_testframework-2.8.0.tar.gz.

File metadata

  • Download URL: pyspark_testframework-2.8.0.tar.gz
  • Upload date:
  • Size: 31.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for pyspark_testframework-2.8.0.tar.gz
Algorithm Hash digest
SHA256 83d93c0752135a3551a55e0055ce66228f0a0759aa865f2d1c1beb75ecbba33f
MD5 5775212d04c4293811b11fdb5f6a9c83
BLAKE2b-256 c28dd889b98b40a2cbd6375478b90e5698dcb240b5c2c523c937dcbfb02e8f28

See more details on using hashes here.

Provenance

The following attestation bundles were made for pyspark_testframework-2.8.0.tar.gz:

Publisher: publish-to-pypi.yml on woonstadrotterdam/pyspark-testframework

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file pyspark_testframework-2.8.0-py3-none-any.whl.

File metadata

File hashes

Hashes for pyspark_testframework-2.8.0-py3-none-any.whl
Algorithm Hash digest
SHA256 d0b123f49ed1d765b617a6ee2a3343d51cec17813ff7ee55857a1a8455acd5bd
MD5 3116ddbe3edb7bbd51729bc6ddc6ccd0
BLAKE2b-256 720562b4c25e0efb79f3c449ed578347043888be8f025fb2a41b8c37f5feae8e

See more details on using hashes here.

Provenance

The following attestation bundles were made for pyspark_testframework-2.8.0-py3-none-any.whl:

Publisher: publish-to-pypi.yml on woonstadrotterdam/pyspark-testframework

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page