
Data profiling and basic data quality rule checks

Project description

dq-module

dq-module is a tool that can be used to perform validations and profiling on datasets. It is compatible with two run engines: pyspark and polars.
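The package is published on PyPI and can be installed with pip:

pip install dq-module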

Features

1. Data Validation

This library contains a SingleDatasetQualityCheck() class which can be used to validate a dataset against a defined set of rules. The class supports the following rules:

  • null_check
  • schema_check
  • range_min_check
  • range_max_check
  • length_check
  • unique_records_check
  • unique_keys_check
  • allowed_values_check
  • min_length_check
  • column_count_check
  • not_allowed_values_check
  • date_range_check
  • regex_check
  • row_count_check

How To Use

import dataqualitycheck as dq
from datetime import date
import time
Step 1: Add a datasource

There are four classes that can be used to connect to a datasource:

  1. AzureBlobDF() - This class can be used to interact with datasources on Azure Blob Storage.
  2. DatabricksFileSystemDF() - This class can be used to interact with datasources on the Databricks filesystem.
  3. DatabricksSqlDF() - This class can be used to interact with datasources in Databricks databases.
  4. LocalFileSystemDF() - This class can be used to interact with datasources on your local filesystem.

Each of the above classes provides the functionality to read from and write to the respective datasource.

Example: Pass the configuration of the blob connector in blob_connector_config and add a datasource by defining a data_read_ob and a data_write_ob.

blob_connector_config = {"storage_account_name" : "{storage-account-name}", "container_name" : "cooler-images", "sas_token" : "{valid-sas-token}"}
data_read_ob = dq.AzureBlobDF(storage_name=blob_connector_config["storage_account_name"], sas_token=blob_connector_config["sas_token"])
data_write_ob = dq.DatabricksFileSystemDF()
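For purely local development, both objects could instead point at the local filesystem. A minimal sketch, assuming LocalFileSystemDF (like DatabricksFileSystemDF above) takes no constructor arguments:

# Illustrative only; constructor arguments for LocalFileSystemDF are assumed here.
data_read_ob = dq.LocalFileSystemDF()
data_write_ob = dq.LocalFileSystemDF()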

tables_list is a dictionary that contains the list of sources along with the container_name, source_type, layer, source_name, filename, read_connector_method and latest_file_path for the tables on which the validations have to be applied.

# This is optional. It is required when you are calling individual rules.
tables_list = {}
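For illustration only, a populated tables_list might look like the sketch below. The field names are taken from the description above, but the exact nesting is an assumption, not part of the documented API, so check it against your own setup.

# Hypothetical structure, keyed by table name (all values are placeholders):
tables_list = {
    "{source-name}_{filename}": {
        "container_name": "{blob-storage-container-name}",
        "source_type": "{source-type}",
        "layer": "processed",
        "source_name": "{data-source}",
        "filename": "{data-filename}",
        "read_connector_method": "blob",
        "latest_file_path": "{path-to-file}"
    }
}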
Step 2: Add a DataContext

Instantiate a DataContext by passing tables_list, interaction_between_tables, data_read_ob, data_write_ob, data_right_structure, job_id, time_zone, no_of_partition and output_db_name. You can also pass the run_engine with which you want to apply the quality checks. By default, the run_engine is pyspark.

dq_ob = dq.SingleDatasetQualityCheck(tables_list={}, 
                                     interaction_between_tables=[],  
                                     data_read_ob=data_read_ob, 
                                     data_write_ob=data_write_ob, 
                                     data_right_structure='file',
                                     job_id={pipeline_run_id},
                                     time_zone=None,
                                     output_db_name={database_name_where_report_has_to_be_written},
                                     no_of_partition=4)
Step 3: Pass a rules_diagnosys_summery_folder_path and a config_df as inputs and apply validations on the columns of the respective tables defined in the config_df.

Here is a sample of the config_df:

| container_name | source_type | layer | source_name | filename | rule_name | column_to_be_checked | value | date_column_config | date_format_dictionary | ruletype | active | read_connector_method | latest_file_path | output_folder_structure | failed_schema_source_list |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| {blob-storage-container-name} | {source-type} | processed | {data-source} | {data-filename} | null_check | {column-name} | null | null | null | Mandatory | 1 | blob | path-to-file | processed/ | |
| {blob-storage-container-name} | {source-type} | processed | {data-source} | {data-filename} | range_min_check | {column-name} | 10 | null | null | Not Mandatory | 1 | blob | path-to-file | processed/ | |
| {blob-storage-container-name} | {source-type} | processed | {data-source} | {data-filename} | range_max_check | {column-name} | 1000 | null | null | Not Mandatory | 1 | blob | path-to-file | processed/ | |

Example:

rules_diagnosys_summery_folder_path = {folder-path-for-the-output-report}
 
config_df = spark.read.option("header",True).csv({path-to-the-config})

dq_ob.apply_validation(config_df, write_summary_on_database=True, failed_schema_source_list=[], output_summary_folder_path=rules_diagnosys_summery_folder_path)
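The rules config does not have to be read from a CSV. As with the profiling config shown in the next section, it could presumably also be built in memory; below is a minimal sketch using the same column names as the sample table above. All values are placeholders, and the exact schema expected by apply_validation should be checked against your version of the library.

# Illustrative sketch: one null_check rule built directly as a Spark DataFrame.
config_df = spark.createDataFrame([{"container_name": "{blob-storage-container-name}",
                                    "source_type": "{source-type}",
                                    "layer": "processed",
                                    "source_name": "{data-source}",
                                    "filename": "{data-filename}",
                                    "rule_name": "null_check",
                                    "column_to_be_checked": "{column-name}",
                                    "value": "null",
                                    "date_column_config": "null",
                                    "date_format_dictionary": "null",
                                    "ruletype": "Mandatory",
                                    "active": "1",
                                    "read_connector_method": "blob",
                                    "latest_file_path": "{path-to-file}",
                                    "output_folder_structure": "processed/",
                                    "failed_schema_source_list": ""}])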

2. Data Profiling

  • We can generate detailed summary statistics, such as mean, median, mode, list of unique values, missing count, etc., for a dataset using the DataProfile() class.
  • This class can also be used to recommend some data quality rules based on the profiling report generated on the dataset.

How To Use

Step 1: Add a Datasource

data_read_ob = dq.DatabricksSqlDF()
data_write_ob = dq.DatabricksSqlDF()

Step 2: Add a DataContext

import pytz
time_zone = pytz.timezone('US/Central')
dq_ob = dq.DataProfile(tables_list=tables_list,
                       interaction_between_tables=[],
                       data_read_ob=data_read_ob,
                       data_write_ob=data_write_ob,
                       data_right_structure='table',
                       job_id={pipeline_run_id},
                       time_zone=time_zone,
                       no_of_partition=4,
                       output_db_name={database_name_where_report_has_to_be_written},
                       run_engine='polars')

Step 3: Pass config_df as an input and apply data profiling on the columns of the respective tables defined in the config_df.

# You can also create the config_df directly in the pyspark/polars run_engine rather than reading it from a csv.
config_df = spark.createDataFrame([{"container_name" : "{blob-storage-container-name}",
                                    "source_type" : "{source-type}",
                                    "layer" : "raw",
                                    "source_name" : "{data-source}",
                                    "filename" : "{filename}",
                                    "latest_file_path" : "{new-filename}",  
                                    "read_connector_method" : "databricks sql",
                                    "output_folder_structure" : "{directory-path-to-store-result}"}])
# Generating a data profiling report.
dq_ob.apply_data_profiling(source_config_df=config_df, write_consolidated_report=True)
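# list_of_columns_to_be_ignored is assumed here to be a plain Python list of
# column names to exclude from rule recommendation (placeholder value):
list_of_columns_to_be_ignored = ["{column-to-be-ignored}"]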
# Generating a data profiling report as well as recommending the quality rules based on the profiling report.
rules_config = dq_ob.data_profiling_based_quality_rules(config_df, list_of_columns_to_be_ignored)

3. Consistency Check

You can check the consistency of common columns between two tables using the ConsistencyCheck() class.

How To Use

Step 1: Add a datasource.

data_read_ob = dq.DatabricksFileSystemDF()
data_write_ob = dq.AzureBlobDF(storage_name=blob_connector_config["storage_account_name"], sas_token=blob_connector_config["sas_token"])

Step 2: Add a DataContext

dq_ob = dq.ConsistencyCheck(tables_list={}, 
                            interaction_between_tables=[],
                            data_read_ob=data_read_ob,
                            data_write_ob=data_write_ob, 
                            data_right_structure='file',
                            job_id={pipeline_run_id},
                            time_zone=None,
                            no_of_partition=4,
                            output_db_name={database_name_where_report_has_to_be_written})

Step 3: Pass config_df and output_report_folder_path as inputs and apply the consistency check. Here is a sample consistency check config:

| container_name | base_table_source_type | base_table_layer | base_table_source_name | base_table_filename | base_table_col_name | base_table_file_path | mapped_table_source_type | mapped_table_layer | mapped_table_source_name | mapped_table_filename | mapped_table_col_name | mapped_table_file_path | read_connector_method | output_folder_structure |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| {blob-storage-container-name} | {source-type} | processed | {source} | {tablename} | {column-name} | {absolute-filepath} | {source-type} | processed | {source} | {tablename} | {column-name} | {absolute-filepath} | dbfs | processed/data_consistenct_test/ |
| {blob-storage-container-name} | {source-type} | processed | {source} | {tablename} | {column-name} | {absolute-filepath} | {source-type} | processed | {source} | {tablename} | {column-name} | {absolute-filepath} | dbfs | processed/data_consistenct_test/ |
config_df = spark.read.option("header",True).csv({path-to-the-consistency-check-config})

output_report_folder_path = {folder-path-for-the-output-report}

dq_ob.apply_consistency_check(config_df, output_report_folder_path)



Download files

Download the file for your platform.

Source Distribution

dq_module-1.1.8.tar.gz (28.9 kB)

Uploaded Source

File details

Details for the file dq_module-1.1.8.tar.gz.

File metadata

  • Download URL: dq_module-1.1.8.tar.gz
  • Upload date:
  • Size: 28.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.10.14

File hashes

Hashes for dq_module-1.1.8.tar.gz

  • SHA256: 73cd31803effcc23583323ddfdcdd6ebccf1919348c5232d66fbf66fe386350e
  • MD5: def360844952f90ca79d4a922ee8aaf6
  • BLAKE2b-256: 80f5643be09fd5063b5a29c7a05e339fabdc82fbe9e27e9d739e61a9adb49880

