Spark data quality check tool

Data-Quality-Check

DQC

Requirements

  • Python 3.7+
  • Java 8+
  • Apache Spark 3.0+

Usage

Installation

pip install --upgrade data-quality-check

# Install Spark if needed
pip install pyspark

Quick Start

from data_quality_check.config import Config
from data_quality_check.profiler.combined_profiler import CombinedProfiler
from data_quality_check.report.renders.html.render import render_all

config_dict = {
    'dataset': {'name': 'mydb.my_table'},
    'profiling': {
        'general': {'columns': ['*']}
    }
}
config = Config.parse_obj(config_dict)
profiler = CombinedProfiler(spark, config=config)
result = profiler.run()
html = render_all(all_pr=result)

# Present in Jupyter notebooks
from IPython.display import display, HTML
display(HTML(html))

# Present in Databricks notebooks
displayHTML(html)

# Save to an HTML file
with open("report.html", "w", encoding="utf-8") as f:
    f.write(html)

If you do not have a ready-to-use Spark session, use the code below to create one:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MyApp").enableHiveSupport().getOrCreate()

Development

Dependencies

| Filename | Requirements |
| --- | --- |
| requirements.txt | Package requirements |
| requirements-dev.txt | Requirements for development |

Test

PYTHONPATH=./src pytest tests/*

Build

python setup.py sdist bdist_wheel && twine check dist/*

Publish

# Upload to TestPyPI
twine upload --repository-url https://test.pypi.org/legacy/ dist/*
# Upload to PyPI
twine upload dist/*

Manual

Profiling Check

There are two types of profilers: GeneralProfiler and CustomizedProfiler. To run both on your dataset, use CombinedProfiler, which runs both of them.

Combined Profiler

The easiest way to run a combined profiler (a mix of the general and customized profilers) on your dataset:

Example of running combined profiling

from data_quality_check.config import Config
from data_quality_check.profiler.combined_profiler import CombinedProfiler
from data_quality_check.report.renders.html.render import render_all

config_dict = {
    'dataset': {'name': 'my_table'},
    'profiling': {
        'general': {'columns': ['*']},
        'customized': {
            'code_check': [
                {'column': 'my_code_col', 'codes': ['A', 'B', 'C', 'D']}
            ]
        }
    }
}
config = Config.parse_obj(config_dict)
profiler = CombinedProfiler(spark, config=config)
result = profiler.run()
html = render_all(all_pr=result)

# Present in Databricks notebooks
displayHTML(html)

General Profiler

from pyspark.sql import SparkSession
from data_quality_check.config import ConfigDataset
from data_quality_check.profiler.general_profiler import GeneralProfiler

spark = SparkSession.builder.appName("SparkProfilingApp").enableHiveSupport().getOrCreate()
data = [{'name': 'Alice', 'age': 1, 'gender': 'female', 'is_new': True},
        {'name': 'Tom', 'age': 10, 'gender': 'male', 'is_new': False}]

# Run general check on spark df
df = spark.createDataFrame(data)
result_df = GeneralProfiler(spark, df=df).run(return_type='dataframe')
result_df.show()

# Run general check on spark/hive table
df.createOrReplaceTempView('my_table')
result_df = GeneralProfiler(spark, dataset_config=ConfigDataset(name='my_table')).run(return_type='dataframe')
result_df.show()

Customized Profiler

import json

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType
from data_quality_check.config import ConfigDataset, ConfigProfilingCustomized
from data_quality_check.profiler.customized_profiler import CustomizedProfiler

# Initialize spark
spark = SparkSession.builder.appName("SparkProfilingApp").enableHiveSupport().getOrCreate()
dept = [("Finance", 1),
        ("Marketing", 2),
        ("Sales", 3),
        ("IT", 4)]
deptSchema = StructType([StructField('dept_name', StringType(), True),
                         StructField('dept_id', LongType(), True)])
spark.createDataFrame(data=dept, schema=deptSchema).createOrReplaceTempView('dept')
print('dept table:')
spark.table('dept').show(truncate=False)

employee = [(1, "Amy", 1, 'male', 1000, 'amy@example.com'),
            (2, "Caro", 2, 'male', 1000, 'caro@example.com'),
            (3, "Mark", 3, 'Error', 2000, 'unknown'),
            (4, "Timi", 4, 'female', 2000, None),
            (5, "Tata", 5, 'unknown', 3000, 'bad email address'),
            (6, "Zolo", None, None, 3000, 'my-C0omplicated_EMAIL@A.ddress.xyz')]
employeeSchema = StructType([StructField('uid', LongType(), True),
                             StructField('name', StringType(), True),
                             StructField('dept_id', LongType(), True),
                             StructField('gender', StringType(), True),
                             StructField('income', LongType(), True),
                             StructField('email', StringType(), True)])
spark.createDataFrame(data=employee, schema=employeeSchema).createOrReplaceTempView('employee')
print('employee table:')
spark.table('employee').show(truncate=False)

# Specify the configuration of customized profiler
customized_config_dict = {
    'code_check': [
        {'column': 'gender', 'codes': ['male', 'female', 'unknown']}
    ],
    'key_mapping_check': [
        {'column': 'dept_id', 'target_table': 'dept', 'target_column': 'dept_id'}
    ]
}

customized_config = ConfigProfilingCustomized.parse_obj(customized_config_dict)
dataset_config = ConfigDataset.parse_obj({'name': 'employee'})

# Initialize CustomizedProfiler with configuration
customized_profiler = CustomizedProfiler(spark,
                                         dataset_config=dataset_config,
                                         customized_profiling_config=customized_config)

result = customized_profiler.run(return_type='dict')
print(json.dumps(result, indent=' ', ensure_ascii=False, allow_nan=True))
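
To make the two customized checks concrete, here is a minimal plain-Python sketch of what they look for, using the gender and dept_id values from the example above. It only illustrates the idea and is not the library's implementation. The function names `find_unexpected_codes` and `find_unmapped_keys` are hypothetical, and skipping null values is an assumption, since nothing here states how CustomizedProfiler treats them.

```python
from collections import Counter


def find_unexpected_codes(values, codes):
    """Count values that are not in the expected code list (nulls skipped by assumption)."""
    allowed = set(codes)
    return Counter(v for v in values if v is not None and v not in allowed)


def find_unmapped_keys(values, target_values):
    """Return the distinct non-null keys that have no match in the target column."""
    target = set(target_values)
    return sorted({v for v in values if v is not None and v not in target})


# Column values copied from the employee and dept tables above
gender = ['male', 'male', 'Error', 'female', 'unknown', None]
employee_dept_id = [1, 2, 3, 4, 5, None]
dept_id = [1, 2, 3, 4]

unexpected = find_unexpected_codes(gender, ['male', 'female', 'unknown'])
unmapped = find_unmapped_keys(employee_dept_id, dept_id)
print(dict(unexpected))  # {'Error': 1}
print(unmapped)          # [5]
```

With this data, the code check flags the 'Error' gender value and the key mapping check flags dept_id 5, which has no row in the dept table.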

Expectation Verification

To be done.

Supported Checks and Expectation

| Profiler Type | Check Type | Render result as HTML? | Support Expectation? | Description |
| --- | --- | --- | --- | --- |
| General | Distinct Values Count | YES | Will DO | Number of unique values in a given column. Equal to Unique Row Count |
| General | Null Row Count | YES | Will DO | Null row count in a given column |
| General | Empty Row Count | YES | Will DO | Empty/blank text row count in a given column |
| General | Zero Row Count | YES | Will DO | 0-valued row count in a given column |
| General | Valued Row Count | YES | Will DO | Number of rows that are not null in a given column |
| General | Total Row Count | YES | Will DO | Total number of rows |
| General | Unique Row Count | YES | Will DO | Number of rows that have a unique value |
| General | Duplicated Valued Row Count | YES | Will DO | Number of rows that have duplicated values |
| General | Minimum Value | YES | Will DO | Minimum value |
| General | Maximum Value | YES | Will DO | Maximum value |
| General | Mean Value | YES | Will DO | Mean/average value |
| General | Standard Deviation Value | YES | Will DO | Standard deviation of a column |
| General | Values Count | YES | Will DO | Number of values in a given column |
| Customized | Code Check | YES | Will DO | Check whether values in the column are in the given (expected) code list |
| Customized | Key Mapping Check | Will DO | Will DO | Find values in this table's column that do not exist in the target (another) table's column. Hint: the target table is usually a dimension table |

Will DO = scheduled for development, but not implemented yet.
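
As a rough illustration of how several of the general checks are defined, the following plain-Python sketch computes them for one small column. It is not the GeneralProfiler implementation. The helper `profile_column` and the sample data are hypothetical, and three choices are assumptions: whitespace-only strings count as empty, nulls are excluded from the distinct count, and the standard deviation is the sample standard deviation.

```python
import statistics


def profile_column(values):
    """Compute a few general-check style metrics for one column held in a list."""
    valued = [v for v in values if v is not None]
    # bool is excluded so that True/False are not counted as numbers
    numbers = [v for v in valued
               if isinstance(v, (int, float)) and not isinstance(v, bool)]
    return {
        'total_row_count': len(values),
        'null_row_count': len(values) - len(valued),
        'valued_row_count': len(valued),
        'empty_row_count': sum(1 for v in valued
                               if isinstance(v, str) and v.strip() == ''),
        'zero_row_count': sum(1 for v in numbers if v == 0),
        'distinct_values_count': len(set(valued)),
        'min': min(numbers) if numbers else None,
        'max': max(numbers) if numbers else None,
        'mean': statistics.mean(numbers) if numbers else None,
        # Sample standard deviation needs at least two values
        'stddev': statistics.stdev(numbers) if len(numbers) > 1 else None,
    }


# Made-up sample column with one zero and one null
income = [1000, 1000, 2000, 2000, 3000, 3000, 0, None]
metrics = profile_column(income)
print(metrics['null_row_count'],
      metrics['zero_row_count'],
      metrics['distinct_values_count'])  # 1 1 4
```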

| Expectation Type | Scope | Description |
| --- | --- | --- |
| ExpectColumnToExist | --- | --- |
| ... | ... | ... |

