
This project lets you run data quality rules in flight while a Spark job is running.


Spark-Expectations


Spark Expectations is a specialized tool designed to maintain data integrity within your processing pipeline. By identifying malformed or incorrect data and preventing it from reaching the target destination, it ensures that only quality data is passed through. Erroneous records are not simply ignored; they are filtered into a separate error table, allowing for detailed analysis and reporting. Spark Expectations also provides statistics on the filtered content, giving you insight into your data quality.


The documentation for spark-expectations can be found here

Contributors

Thanks to all the contributors who have helped ideate, develop, and bring the project to its current state.

Contributing

We're delighted that you're interested in contributing to our project! To get started, please carefully read and follow the guidelines in our contributing document.

What is Spark Expectations?

Spark Expectations is a data quality framework built in PySpark to address the following problems:

  1. Existing data quality tools validate the data in a table at rest and report success and error metrics. Users must manually check the metrics to identify the error records.
  2. The error data is not quarantined to an error table, and no corrective action is taken to send only the valid data downstream.
  3. Downstream users must either consume the data with its errors or perform additional calculations to eliminate records that don't comply with the data quality rules.
  4. A separate process is required as a corrective action to rectify the errors in the data, and this usually requires a lot of planning.

Spark Expectations solves these issues using the following principles:

  1. All records that fail one or more data quality rules are by default quarantined in an _error table, along with metadata on the rules that failed, job information, etc. This makes it easier for analysts or product teams to view the incorrect data and collaborate with the teams responsible for correcting and reprocessing it.
  2. Aggregated metrics are provided for the raw data and the cleansed data for each run, along with the required metadata, so that they do not need to be recalculated.
  3. Data that doesn't meet the data quality contract or standards is not moved to the next level or iteration unless otherwise specified.
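
The quarantine principle above can be illustrated without Spark. The following is a minimal plain-Python sketch, not the library's implementation: the rule names, the `split_records` helper, and the `failed_rules` metadata field are all hypothetical and exist only to show how failing records might be separated from valid ones while per-run counts are collected.

```python
from typing import Callable, Dict, List, Tuple

Record = Dict[str, object]
Rule = Callable[[Record], bool]

# Hypothetical row-level rules: each returns True when the record passes.
RULES: Dict[str, Rule] = {
    "order_id_not_null": lambda r: r.get("order_id") is not None,
    "quantity_positive": lambda r: isinstance(r.get("quantity"), int) and r["quantity"] > 0,
}


def split_records(
    records: List[Record], rules: Dict[str, Rule]
) -> Tuple[List[Record], List[Record], Dict[str, int]]:
    """Separate passing records from failing ones and count both."""
    valid: List[Record] = []
    errors: List[Record] = []
    for record in records:
        # Collect the names of every rule this record violates.
        failed = [name for name, rule in rules.items() if not rule(record)]
        if failed:
            # Keep the record, annotated with the rules it failed.
            errors.append({**record, "failed_rules": failed})
        else:
            valid.append(record)
    stats = {
        "input_count": len(records),
        "output_count": len(valid),
        "error_count": len(errors),
    }
    return valid, errors, stats


orders = [
    {"order_id": 1, "quantity": 3},
    {"order_id": None, "quantity": 2},
    {"order_id": 3, "quantity": 0},
]
valid_rows, error_rows, run_stats = split_records(orders, RULES)
```

Here `valid_rows` plays the role of the data passed downstream, `error_rows` the role of the _error table, and `run_stats` the role of the aggregated metrics.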

Features Of Spark Expectations

Please find the spark-expectations flow and feature diagrams below

Spark - Expectations Setup

Configurations

To set the global configuration for Spark Expectations, define a variable holding a dictionary and fill in all the required fields.

from spark_expectations.config.user_config import Constants as user_config

se_user_conf = {
    user_config.se_notifications_enable_email: False,
    user_config.se_notifications_email_smtp_host: "mailhost.nike.com",
    user_config.se_notifications_email_smtp_port: 25,
    user_config.se_notifications_email_from: "<sender_email_id>",
    user_config.se_notifications_email_to_other_nike_mail_id: "<receiver_email_id's>",
    user_config.se_notifications_email_subject: "spark expectations - data quality - notifications", 
    user_config.se_notifications_enable_slack: True,
    user_config.se_notifications_slack_webhook_url: "<slack-webhook-url>", 
    user_config.se_notifications_on_start: True, 
    user_config.se_notifications_on_completion: True,
    user_config.se_notifications_on_fail: True,
    user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True, 
    user_config.se_notifications_on_error_drop_threshold: 15,
    # Optional: enable the two parameters below to capture detailed stats
    # in <stats_table_name>_detailed.
    # user_config.enable_query_dq_detailed_result: True,
    # user_config.enable_agg_dq_detailed_result: True,
}
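
As a rough illustration of how a drop threshold such as the value 15 above could be read, the sketch below treats it as the percentage of input rows that were dropped as errors. This interpretation and the `error_drop_percentage` and `exceeds_threshold` helpers are assumptions made for illustration, not the library's code; consult the official documentation for the exact semantics.

```python
def error_drop_percentage(input_count: int, output_count: int) -> float:
    """Percentage of input rows that did not reach the output (assumed definition)."""
    if input_count <= 0:
        # Nothing was read, so nothing can have been dropped.
        return 0.0
    return 100.0 * (input_count - output_count) / input_count


def exceeds_threshold(input_count: int, output_count: int, threshold_percent: float) -> bool:
    """Return True when the dropped share is strictly above the threshold."""
    return error_drop_percentage(input_count, output_count) > threshold_percent


# Hypothetical run: 200 rows read, 160 rows written, threshold of 15 percent.
should_notify = exceeds_threshold(200, 160, 15)
```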

Spark Expectations Initialization

The import and SparkExpectations class instantiation below are required for all of the following examples.

  1. Instantiate the SparkExpectations class, which has all the required functions for running data quality rules
from spark_expectations.core.expectations import SparkExpectations, WrappedDataFrameWriter
from pyspark.sql import SparkSession

spark: SparkSession = SparkSession.builder.getOrCreate()
writer = WrappedDataFrameWriter().mode("append").format("delta")
# writer = WrappedDataFrameWriter().mode("append").format("iceberg")
# product_id should match with the "product_id" in the rules table
se: SparkExpectations = SparkExpectations(
    product_id="your_product",
    rules_df=spark.table("dq_spark_local.dq_rules"),
    stats_table="dq_spark_local.dq_stats",
    stats_table_writer=writer,
    target_and_error_table_writer=writer,
    debugger=False,
    # stats_streaming_options={user_config.se_enable_streaming: False},
)
  2. Decorate the function with the @se.with_expectations decorator
from spark_expectations.config.user_config import *
from pyspark.sql import DataFrame
import os


@se.with_expectations(
    target_table="dq_spark_local.customer_order",
    write_to_table=True,
    user_conf=se_user_conf,
    target_table_view="order",
)
def build_new() -> DataFrame:
    # Return the dataframe on which Spark-Expectations needs to be run
    _df_order: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
    )
    _df_order.createOrReplaceTempView("order")

    return _df_order 
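
To make the decorator pattern above more concrete, here is a minimal plain-Python sketch of how a decorator can intercept the data a function returns and validate it before handing it on. It does not reflect how `with_expectations` is implemented; `with_checks`, its `rules` parameter, and the in-memory `error_store` list are hypothetical stand-ins for the rules table and the error table.

```python
import functools
from typing import Callable, Dict, List

Record = Dict[str, object]
Check = Callable[[Record], bool]


def with_checks(rules: Dict[str, Check], error_store: List[Record]):
    """Build a decorator that validates the rows returned by the wrapped function."""

    def decorator(func: Callable[..., List[Record]]) -> Callable[..., List[Record]]:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> List[Record]:
            rows = func(*args, **kwargs)
            clean: List[Record] = []
            for row in rows:
                failed = [name for name, check in rules.items() if not check(row)]
                if failed:
                    # Divert failing rows instead of returning them.
                    error_store.append({**row, "failed_rules": failed})
                else:
                    clean.append(row)
            return clean

        return wrapper

    return decorator


quarantined: List[Record] = []


@with_checks(
    rules={"amount_non_negative": lambda r: r["amount"] >= 0},
    error_store=quarantined,
)
def build_orders() -> List[Record]:
    # The wrapped function only produces data; validation happens in the wrapper.
    return [
        {"order_id": 1, "amount": 10.0},
        {"order_id": 2, "amount": -5.0},
    ]


clean_orders = build_orders()
```

The point of the pattern is that the data-producing function stays free of validation logic: the checks run only when the decorated function is called.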
