Spark data quality check tool

Data-Quality-Check

DQC

Requirements

  • Python 3.7+
  • Java 8+
  • Apache Spark 3.0+
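
Before installing, it can help to confirm that the environment roughly matches the list above. The following is a minimal sketch, not part of data-quality-check: `check_requirements` is a hypothetical helper name, and it only checks that a `java` executable and the `pyspark` package can be found, not that their versions are Java 8+ or Spark 3.0+.

```python
import importlib.util
import shutil
import sys


def check_requirements():
    """Return a dict mapping each requirement to True/False for this environment.

    Hypothetical helper for illustration; not provided by data-quality-check.
    """
    return {
        # Python 3.7 or newer, as listed in the requirements
        "python>=3.7": sys.version_info >= (3, 7),
        # A `java` executable on PATH (the Java version itself is not verified)
        "java_on_path": shutil.which("java") is not None,
        # PySpark importable (the Spark version itself is not verified)
        "pyspark_installed": importlib.util.find_spec("pyspark") is not None,
    }


if __name__ == "__main__":
    for name, ok in check_requirements().items():
        print(f"{name}: {'OK' if ok else 'MISSING'}")
```

A `MISSING` entry only means the tool was not found from this Python process; Java or Spark may still be provided by the cluster, for example on Databricks.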

Usage

Installation

pip install --upgrade data-quality-check

# Install Spark if needed
pip install pyspark

Quick Start

from data_quality_check.config import Config
from data_quality_check.profiler.combined_profiler import CombinedProfiler
from data_quality_check.report.renders.html.render import render_all

config_dict = {
    'dataset': {'name': 'mydb.my_table'},
    'profiling': {
        'general': {'columns': ['*']}
    }
}
config = Config().parse_obj(config_dict)
profiler = CombinedProfiler(spark, config=config)
result = profiler.run()
html = render_all(all_pr=result)

# Present in Jupyter notebooks
from IPython.display import display, HTML
display(HTML(html))

# Present in Databricks notebooks
displayHTML(html)

# Save to an HTML file
with open("report.html", "w") as f:
    f.write(html)

If you do not have a ready-to-use Spark session, use the code below to create one:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MyApp").enableHiveSupport().getOrCreate()

Development

Dependencies

Filename              Requirements
requirements.txt      Package requirements
requirements-dev.txt  Requirements for development

Test

PYTHONPATH=./src pytest tests/*

Build

python setup.py sdist bdist_wheel && twine check dist/*

Publish

# Upload to TestPyPI
twine upload --repository-url https://test.pypi.org/legacy/ dist/*
# Upload to PyPI
twine upload dist/*

Manual

Profiling Check

There are two profilers: GeneralProfiler and CustomizedProfiler. To run both on your dataset, use CombinedProfiler, which runs them together.

Combined Profiler

The easiest way to run a combined profiler (a mix of the general and customized profilers) on your dataset:

Example of running combined profiling

from data_quality_check.config import Config
from data_quality_check.profiler.combined_profiler import CombinedProfiler
from data_quality_check.report.renders.html.render import render_all

config_dict = {
    'dataset': {'name': 'my_table'},
    'profiling': {
        'general': {'columns': ['*']},
        'customized': {
            'code_check': [
                {'column': 'my_code_col', 'codes': ['A', 'B', 'C', 'D']}
            ]
        }
    }
}
config = Config().parse_obj(config_dict)
profiler = CombinedProfiler(spark, config=config)
result = profiler.run()
html = render_all(all_pr=result)

# Present in Databricks notebooks
displayHTML(html)

General Profiler

from pyspark.sql import SparkSession
from data_quality_check.config import ConfigDataset
from data_quality_check.profiler.general_profiler import GeneralProfiler

spark = SparkSession.builder.appName("SparkProfilingApp").enableHiveSupport().getOrCreate()
data = [{'name': 'Alice', 'age': 1, 'gender': 'female', 'is_new': True},
        {'name': 'Tom', 'age': 10, 'gender': 'male', 'is_new': False}]

# Run general check on spark df
df = spark.createDataFrame(data)
result_df = GeneralProfiler(spark, df=df).run(return_type='dataframe')
result_df.show()

# Run general check on spark/hive table
df.createOrReplaceTempView('my_table')
result_df = GeneralProfiler(spark, dataset_config=ConfigDataset(name='my_table')).run(return_type='dataframe')
result_df.show()
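
To give a feel for what a general profile contains, the sketch below computes a few typical per-column statistics (row count, null count, distinct count, min, max) in plain Python on the same sample records. This is an illustration only: `profile_columns` is a hypothetical function, and the statistics and column names that GeneralProfiler actually returns may differ.

```python
def profile_columns(rows):
    """Compute simple per-column statistics for a list of dict records.

    Hypothetical illustration; not the implementation used by GeneralProfiler.
    Assumes the non-null values within each column are mutually comparable.
    """
    columns = sorted({key for row in rows for key in row})
    stats = {}
    for col in columns:
        values = [row.get(col) for row in rows]
        non_null = [v for v in values if v is not None]
        stats[col] = {
            "count": len(values),
            "null_count": len(values) - len(non_null),
            "distinct_count": len(set(non_null)),
            "min": min(non_null) if non_null else None,
            "max": max(non_null) if non_null else None,
        }
    return stats


data = [{'name': 'Alice', 'age': 1, 'gender': 'female', 'is_new': True},
        {'name': 'Tom', 'age': 10, 'gender': 'male', 'is_new': False}]

for column, column_stats in profile_columns(data).items():
    print(column, column_stats)
```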

Customized Profiler

import json

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType
from data_quality_check.config import Config, ConfigDataset, ConfigProfilingCustomized
from data_quality_check.profiler.customized_profiler import CustomizedProfiler

# Initialize spark
spark = SparkSession.builder.appName("SparkProfilingApp").enableHiveSupport().getOrCreate()
dept = [("Finance", 1),
        ("Marketing", 2),
        ("Sales", 3),
        ("IT", 4)]
deptSchema = StructType([StructField('dept_name', StringType(), True),
                         StructField('dept_id', LongType(), True)])
spark.createDataFrame(data=dept, schema=deptSchema).createOrReplaceTempView('dept')
print('dept table:')
spark.table('dept').show(truncate=False)

employee = [(1, "Amy", 1, 'male', 1000, 'amy@example.com'),
            (2, "Caro", 2, 'male', 1000, 'caro@example.com'),
            (3, "Mark", 3, 'Error', 2000, 'unknown'),
            (4, "Timi", 4, 'female', 2000, None),
            (5, "Tata", 5, 'unknown', 3000, 'bad email address'),
            (6, "Zolo", None, None, 3000, 'my-C0omplicated_EMAIL@A.ddress.xyz')]
employeeSchema = StructType([StructField('uid', LongType(), True),
                             StructField('name', StringType(), True),
                             StructField('dept_id', LongType(), True),
                             StructField('gender', StringType(), True),
                             StructField('income', LongType(), True),
                             StructField('email', StringType(), True)])
spark.createDataFrame(data=employee, schema=employeeSchema).createOrReplaceTempView('employee')
print('employee table:')
spark.table('employee').show(truncate=False)

# Specify the configuration of customized profiler
customized_config_dict = {
    'code_check': [
        {'column': 'gender', 'codes': ['male', 'female', 'unknown']}
    ],
    'key_mapping_check': [
        {'column': 'dept_id', 'target_table': 'dept', 'target_column': 'dept_id'}
    ]
}

customized_config = ConfigProfilingCustomized.parse_obj(customized_config_dict)
dataset_config = ConfigDataset.parse_obj({'name': 'employee'})

# Initialize CustomizedProfiler with configuration
customized_profiler = CustomizedProfiler(spark,
                                         dataset_config=dataset_config,
                                         customized_profiling_config=customized_config)

result = customized_profiler.run(return_type='dict')
print(json.dumps(result, indent=' ', ensure_ascii=False, allow_nan=True))
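
Conceptually, code_check looks for values outside an allowed code list, and key_mapping_check looks for keys with no match in a target column. The sketch below reproduces that idea in plain Python using the gender and dept_id values from the tables above. The function bodies are assumptions for illustration, not the library's implementation; in particular, skipping nulls here is an assumption, and CustomizedProfiler may report them differently.

```python
def code_check(values, codes):
    """Return the non-null values that are not in the allowed code list."""
    allowed = set(codes)
    return [v for v in values if v is not None and v not in allowed]


def key_mapping_check(keys, target_keys):
    """Return the non-null keys that have no match among the target keys."""
    targets = set(target_keys)
    return [k for k in keys if k is not None and k not in targets]


# Column values copied from the employee and dept sample tables
genders = ['male', 'male', 'Error', 'female', 'unknown', None]
employee_dept_ids = [1, 2, 3, 4, 5, None]
dept_ids = [1, 2, 3, 4]

print(code_check(genders, ['male', 'female', 'unknown']))  # ['Error']
print(key_mapping_check(employee_dept_ids, dept_ids))      # [5]
```

On the sample data, the only invalid gender code is 'Error' (Mark), and the only dept_id with no matching department is 5 (Tata).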

Expectation Verification

TBD
