
Data profiling and basic data quality rules check


dq-module

dq-module is a tool which can be used to perform validations and profiling on datasets. It is compatible with two run engines: pyspark and polars.
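
A quick start sketch, assuming the distribution name on PyPI is dq-module and the import name is dataqualitycheck, as used in the examples below:

# pip install dq-module
import dataqualitycheck as dq

# The run_engine is chosen when the quality-check classes are instantiated
# (see Step 2 below): 'pyspark' is the default and 'polars' is the alternative.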

Features

1. Data Validation

This library contains a SingleDatasetQualityCheck() class which can be used to validate a dataset against a defined set of rules. The class provides the following rules, which are selected through rows of the validation config (see the sketch after this list):

  • null_check
  • schema_check
  • range_min_check
  • range_max_check
  • length_check
  • unique_records_check
  • unique_keys_check
  • allowed_values_check
  • min_length_check
  • column_count_check
  • not_allowed_values_check
  • date_range_check
  • regex_check
  • row_count_check
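
For orientation, each rule corresponds to one row of the validation config described under Step 3 below. Here is a minimal sketch of such a row as a Python dict; the keys are the config columns shown later, while every value is a hypothetical placeholder:

# Hypothetical config row: apply null_check to a customer_id column.
# All values are illustrative placeholders, not real defaults.
null_check_row = {
    "container_name": "<blob-storage-container-name>",
    "source_type": "<source-type>",
    "layer": "processed",
    "source_name": "<data-source>",
    "filename": "<filename>",
    "rule_name": "null_check",               # one of the rules listed above
    "column_to_be_checked": "customer_id",   # column the rule is applied to
    "value": "null",                         # rule parameter (e.g. 10 for range_min_check)
    "date_column_config": "null",
    "date_format_dictionary": "null",
    "ruletype": "Mandatory",                 # Mandatory / Not Mandatory
    "active": "1",                           # 1 = rule is enabled
    "read_connector_method": "blob",
    "latest_file_path": "<path-to-file>",
    "output_folder_structure": "processed/",
}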

How To Use

import dataqualitycheck as dq
from datetime import date
import time

Step 1: Adding a datasource

We have 4 classes which can be used to connect to a datasource:

  1. AzureBlobDF() - This class can be used to interact with datasources on Azure Blob Storage.
  2. DatabricksFileSystemDF() - This class can be used to interact with datasources on the Databricks filesystem.
  3. DatabricksSqlDF() - This class can be used to interact with datasources in Databricks databases.
  4. LocalFileSystemDF() - This class can be used to interact with datasources on your local filesystem.

Each of the above classes provides the functionality to read from and write to the respective datasource.

Example: Pass the configuration of the blob connector in blob_connector_config and add a datasource by defining a data_read_ob and a data_write_ob.

blob_connector_config = {"storage_account_name" : "<storage-account-name>", "container_name" : "cooler-images", "sas_token" : <valid-sas-token>}
data_read_ob = dq.AzureBlobDF(storage_name=blob_connector_config["storage_account_name"], sas_token=blob_connector_config["sas_token"])
data_write_ob = dq.DatabricksFileSystemDF()

tables_list is a dictionary that contains the list of sources along with the container_name, source_type, layer, source_name, filename, read_connector_method and latest_file_path for the tables on which the validations have to be applied.

# This is optional. It is required when you are calling individual rules.
tables_list = {}
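
For illustration, a populated tables_list might look like the sketch below. The keys come from the description above; the nesting (one entry per table, keyed by a name) and all values are assumptions shown only as placeholders:

# Hypothetical tables_list; the exact nesting expected by the library may differ.
tables_list = {
    "customers": {                                  # hypothetical table name
        "container_name": "<blob-storage-container-name>",
        "source_type": "<source-type>",
        "layer": "processed",
        "source_name": "<data-source>",
        "filename": "<filename>",
        "read_connector_method": "blob",
        "latest_file_path": "<path-to-latest-file>",
    }
}
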
Step 2: Add a DataContext

Instantiate a DataContext by passing tables_list, interaction_between_tables, data_read_ob, data_write_ob, data_right_structure, job_id, time_zone, no_of_partition and output_db_name. You can also pass the run_engine with which you want to apply the quality checks. By default, the run_engine is pyspark.

dq_ob = dq.SingleDatasetQualityCheck(tables_list={}, 
                                     interaction_between_tables=[],  
                                     data_read_ob=data_read_ob, 
                                     data_write_ob=data_write_ob, 
                                     data_right_structure='file',
                                     job_id=<pipeline_run_id>,
                                     time_zone=None,
                                     output_db_name=<database_name_where_report_has_to_be_written>,
                                     no_of_partition=4)

Step 3: Pass a rules_diagnosys_summery_folder_path and a config_df as input and apply validations on various columns of the respective tables defined in the config_df.

Here is a sample of the config_df.

| container_name | source_type | layer | source_name | filename | rule_name | column_to_be_checked | value | date_column_config | date_format_dictionary | ruletype | active | read_connector_method | latest_file_path | output_folder_structure | failed_schema_source_list |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|  |  | processed |  |  | null_check |  | null | null | null | Mandatory | 1 | blob | path-to-file | processed/ |  |
|  |  | processed |  |  | range_min_check |  | 10 | null | null | Not Mandatory | 1 | blob | path-to-file | processed/ |  |
|  |  | processed |  |  | range_max_check |  | 1000 | null | null | Not Mandatory | 1 | blob | path-to-file | processed/ |  |

Example:

rules_diagnosys_summery_folder_path = <folder-path-for-the-output-report>
 
config_df = spark.read.option("header",True).csv(<path-to-the-config>)

dq_ob.apply_validation(config_df, write_summary_on_database=True, failed_schema_source_list=[], output_summary_folder_path=rules_diagnosys_summery_folder_path)

2. Data Profiling

  • We can generate detailed summary statistics of a dataset, such as mean, median, mode, list of unique values, and missing count, using the DataProfile() class.
  • This class can also be used to recommend some data quality rules based on the profiling report generated on the dataset.

How To Use

Step 1: Add a Datasource

data_read_ob = dq.DatabricksSqlDF()
data_write_ob = dq.DatabricksSqlDF()

Step 2: Add a DataContext

import pytz
time_zone = pytz.timezone('US/Central')
dq_ob = dq.DataProfile(tables_list=tables_list,
                       interaction_between_tables=[],
                       data_read_ob=data_read_ob,
                       data_write_ob=data_write_ob,
                       data_right_structure='table',
                       job_id=<pipeline_run_id>,
                       time_zone=time_zone,
                       no_of_partition=4,
                       output_db_name=<database_name_where_report_has_to_be_written>,
                       run_engine='polars')

Step 3: Pass config_df as an input and apply data profiling on various columns of the respective tables defined in the config_df.

# You can also create the config_df directly in the pyspark/polars run_engine rather than reading it from a csv.
config_df = spark.createDataFrame([{"container_name" : "<blob-storage-container-name>",
                                    "source_type" : "<source-type>",
                                    "layer" : "raw",
                                    "source_name" : "<data-source>",
                                    "filename" : "<filename>",
                                    "latest_file_path" : "<new-filename>",  
                                    "read_connector_method" : "databricks sql",
                                    "output_folder_structure" : "<directory-path-to-store-result>"}])
# Generating a data profiling report.
dq_ob.apply_data_profiling(source_config_df=config_df, write_consolidated_report=True)
# Generating a data profiling report as well as recommending quality rules based on the profiling report
# (list_of_columns_to_be_ignored is a user-supplied list of column names to ignore).
rules_config = dq_ob.data_profiling_based_quality_rules(config_df, list_of_columns_to_be_ignored)
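
If you are using the polars run_engine, the same config can be built directly as a polars DataFrame instead of a Spark one; a minimal sketch, assuming polars is installed and reusing the placeholder values from above:

import polars as pl

# Same config row as above, built as a polars DataFrame for the polars run_engine.
config_df = pl.DataFrame([{"container_name" : "<blob-storage-container-name>",
                           "source_type" : "<source-type>",
                           "layer" : "raw",
                           "source_name" : "<data-source>",
                           "filename" : "<filename>",
                           "latest_file_path" : "<new-filename>",
                           "read_connector_method" : "databricks sql",
                           "output_folder_structure" : "<directory-path-to-store-result>"}])

The returned rules_config holds the recommended rules; assuming it follows the same schema as the validation config_df (an assumption, not stated above), it could be reviewed and then applied with SingleDatasetQualityCheck's apply_validation.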

3. Consistency Check

You can check the consistency of common columns between two tables using the ConsistencyCheck() class.

How To Use

Step 1: Add a datasource.

data_read_ob = dq.DatabricksFileSystemDF()
data_write_ob = dq.AzureBlobDF(storage_name=blob_connector_config["storage_account_name"], sas_token=blob_connector_config["sas_token"])

Step 2: Add a DataContext

dq_ob = dq.ConsistencyCheck(tables_list={}, 
                            interaction_between_tables=[],
                            data_read_ob=data_read_ob,
                            data_write_ob=data_write_ob, 
                            data_right_structure='file',
                            job_id=<pipeline_run_id>,
                            time_zone=None,
                            no_of_partition=4,
                            output_db_name=<database_name_where_report_has_to_be_written>)

Step 3: Pass config_df and output_report_folder_path as inputs and apply the consistency check. Here is a sample consistency check config.

| container_name | base_table_source_type | base_table_layer | base_table_source_name | base_table_filename | base_table_col_name | base_table_file_path | mapped_table_source_type | mapped_table_layer | mapped_table_source_name | mapped_table_filename | mapped_table_col_name | mapped_table_file_path | read_connector_method | output_folder_structure |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|  |  | processed |  |  |  |  |  | processed |  |  |  |  | dbfs | processed/data_consistenct_test/ |
|  |  | processed |  |  |  |  |  | processed |  |  |  |  | dbfs | processed/data_consistenct_test/ |

config_df = spark.read.option("header",True).csv(<path-to-the-consistency-check-config>)

output_report_folder_path = <folder-path-for-the-output-report>

dq_ob.apply_consistency_check(config_df, output_report_folder_path)
