Data Quality framework for PySpark jobs
Owl Data Sanitizer: A lightweight Spark data validation framework
This is a small framework for data quality validation. This first version reads Spark DataFrames from data sources such as the local file system, S3, or Hive, and delivers Hive tables containing quality reports.
Let's follow this example:
Input data from a Hive table:
+----------+--------------+--------+---------+------------------+---------+
|GENERAL_ID| NAME| CODE|ADDR_DESC|ULTIMATE_PARENT_ID|PARENT_ID|
+----------+--------------+--------+---------+------------------+---------+
| 1|Dummy 1 Entity|12000123| null| null| null|
| 2| null| null| null| 2| 2|
| 3| null|12000123| null| 3| 3|
| 4| 1| 1| null| 4| 4|
| 5| 1|12000123| null| 5| 5|
| 6| null| 3| null| 6| 6|
| null| null|12000123| null| 11| 7|
| 7| 2| null| null| 8| 8|
+----------+--------------+--------+---------+------------------+---------+
The table is validated using the following config, which has five sections:
- source_table: the table metadata.
- correctness_validations: correctness validations per column; each rule must be a valid Spark SQL expression.
- completeness_validations: completeness validations, e.g. a check on the overall row count.
- parent_children_constraints: parent-child constraints, meaning that every parent id must itself be a valid id.
- compare_related_tables_list: comparisons with other tables, or with the same table in other environments.
{
"source_table": {
"name": "test.data_test",
"id_column": "GENERAL_ID",
"unique_per_cols": [GENERAL_ID, ULTIMATE_PARENT_ID],
"fuzzy_desuplication_distance": 1,
"output_correctness_table": "test.data_test_correctness",
"output_completeness_table": "test.data_test_completeness",
"output_comparison_table": "test.data_test_comparison"
},
"correctness_validations": [
{
"column": "CODE",
"rule": "CODE is not null and CODE != '' and CODE != 'null'"
},
{
"column": "NAME",
"rule": "NAME is not null and NAME != '' and NAME != 'null'"
},
{
"column": "GENERAL_ID",
"rule": "GENERAL_ID is not null and GENERAL_ID != '' and GENERAL_ID != 'null' and CHAR_LENGTH(GENERAL_ID) < 4"
}
],
"completeness_validations": [
{
"column": "OVER_ALL_COUNT",
"rule": "OVER_ALL_COUNT <= 7"
}
],
"parent_children_constraints": [
{
"column": "GENERAL_ID",
"parent": "ULTIMATE_PARENT_ID"
},
{
"column": "GENERAL_ID",
"parent": "PARENT_ID"
}
],
"compare_related_tables_list": ["test.diff_df", "test.diff_df_2"]
}
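For intuition, here is a minimal PySpark sketch of how a correctness rule and a parent-child constraint from this config can be evaluated. This is an illustration only, not the library's internal implementation; the table and column names come from the example config above.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
df = spark.table("test.data_test")

# A correctness rule is any valid Spark SQL boolean expression;
# the flag is 1 when the rule is violated and 0 when the value is clean.
code_rule = "CODE is not null and CODE != '' and CODE != 'null'"
report = df.withColumn("IS_ERROR_CODE", F.when(F.expr(code_rule), 0).otherwise(1))

# Parent-child constraint: every non-null PARENT_ID must also exist
# as a GENERAL_ID somewhere in the table.
valid_ids = [
    row[0]
    for row in df.select("GENERAL_ID").distinct().collect()
    if row[0] is not None
]
report = report.withColumn(
    "IS_ERROR_GENERAL_ID_PARENT_ID",
    F.when(
        F.col("PARENT_ID").isNull() | F.col("PARENT_ID").isin(valid_ids), 0
    ).otherwise(1),
)
For large tables an anti-join would replace the collect/isin step, but the flag semantics are the same.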
These results are delivered in three output Hive tables:
a) Correctness Report, containing:
- An output column per validated column, showing 1 when there is an error and 0 when the value is clean.
- The sum of errors per column.
+----------+-------------+-------------+-------------------+--------------------------------------+-----------------------------+-------------+--------------------------+-----------------+-----------------+-----------------------+------------------------------------------+---------------------------------+-----------------+
|GENERAL_ID|IS_ERROR_CODE|IS_ERROR_NAME|IS_ERROR_GENERAL_ID|IS_ERROR_GENERAL_ID_ULTIMATE_PARENT_ID|IS_ERROR_GENERAL_ID_PARENT_ID|IS_ERROR__ROW|dt |IS_ERROR_CODE_SUM|IS_ERROR_NAME_SUM|IS_ERROR_GENERAL_ID_SUM|IS_ERROR_GENERAL_ID_ULTIMATE_PARENT_ID_SUM|IS_ERROR_GENERAL_ID_PARENT_ID_SUM|IS_ERROR__ROW_SUM|
+----------+-------------+-------------+-------------------+--------------------------------------+-----------------------------+-------------+--------------------------+-----------------+-----------------+-----------------------+------------------------------------------+---------------------------------+-----------------+
|null |0 |1 |1 |1 |0 |1 |2020-04-17 09:39:04.783505|2 |4 |1 |2 |1 |5 |
|3 |0 |1 |0 |0 |0 |1 |2020-04-17 09:39:04.783505|2 |4 |1 |2 |1 |5 |
|7 |1 |0 |0 |1 |1 |1 |2020-04-17 09:39:04.783505|2 |4 |1 |2 |1 |5 |
|5 |0 |0 |0 |0 |0 |0 |2020-04-17 09:39:04.783505|2 |4 |1 |2 |1 |5 |
|6 |0 |1 |0 |0 |0 |1 |2020-04-17 09:39:04.783505|2 |4 |1 |2 |1 |5 |
|4 |0 |0 |0 |0 |0 |0 |2020-04-17 09:39:04.783505|2 |4 |1 |2 |1 |5 |
|2 |1 |1 |0 |0 |0 |1 |2020-04-17 09:39:04.783505|2 |4 |1 |2 |1 |5 |
|1 |0 |0 |0 |0 |0 |0 |2020-04-17 09:39:04.783505|2 |4 |1 |2 |1 |5 |
+----------+-------------+-------------+-------------------+--------------------------------------+-----------------------------+-------------+--------------------------+-----------------+-----------------+-----------------------+------------------------------------------+---------------------------------+-----------------+
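Because the report is written to the output_correctness_table defined in the config, failing rows can be pulled out with an ordinary query; a usage sketch:
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Rows where any validation failed (IS_ERROR__ROW = 1).
failing_rows = spark.sql(
    "SELECT * FROM test.data_test_correctness WHERE IS_ERROR__ROW = 1"
)
failing_rows.show()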
b) Completeness Report, containing:
- The overall count of the DataFrame.
- A column checking whether the overall count is complete, e.g. IS_ERROR_OVER_ALL_COUNT.
+--------------+-----------------------+--------------------------+
|OVER_ALL_COUNT|IS_ERROR_OVER_ALL_COUNT|dt |
+--------------+-----------------------+--------------------------+
|8 |1 |2020-04-17 09:39:04.783505|
+--------------+-----------------------+--------------------------+
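Conceptually, the completeness check reduces to counting the source rows and applying the configured rule; a minimal sketch (again, not the library's own code):
from pyspark.sql import Row, SparkSession, functions as F

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# OVER_ALL_COUNT is 8 for the example table, so the rule
# "OVER_ALL_COUNT <= 7" fails and the error flag is set to 1.
over_all_count = spark.table("test.data_test").count()
completeness = spark.createDataFrame([Row(OVER_ALL_COUNT=over_all_count)])
completeness = completeness.withColumn(
    "IS_ERROR_OVER_ALL_COUNT",
    F.when(F.expr("OVER_ALL_COUNT <= 7"), 0).otherwise(1),
)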
c) Comparison of schema and values with related DataFrames.
NOTE: for now the result includes only the ids that differ; to see the actual differences, a further join with the source data is needed (see the sketch after the table below).
+--------------+----------------------------------+-----------------+------------------+-----------------+--------------------------+
|df |missing_cols_right |missing_cols_left|missing_vals_right|missing_vals_left|dt |
+--------------+----------------------------------+-----------------+------------------+-----------------+--------------------------+
|test.diff_df_2|GENERAL_ID:string,ADDR_DESC:string|GENERAL_ID:int | | |2020-04-17 09:39:07.572483|
|test.diff_df | | |6,7 | |2020-04-17 09:39:07.572483|
+--------------+----------------------------------+-----------------+------------------+-----------------+--------------------------+
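For instance, the missing ids reported for test.diff_df (6 and 7 above) can be pulled from the source table with an anti-join; a sketch assuming GENERAL_ID is the join key:
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
source_df = spark.table("test.data_test")
diff_df = spark.table("test.diff_df")

# Rows whose GENERAL_ID exists in the source but not in test.diff_df.
missing_in_diff = source_df.join(diff_df, on="GENERAL_ID", how="left_anti")
missing_in_diff.show()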
Installation
Install Owl Sanitizer from PyPI:
pip install owl-sanitizer-data-quality
Then call the library:
from pyspark.sql import SparkSession

from spark_validation.common.config import Config
from spark_validation.dataframe_validation.dataframe_validator import CreateHiveValidationDF

# A Hive-enabled session is required to write the report tables.
spark_session = SparkSession.builder.enableHiveSupport().getOrCreate()

# Parse the JSON validation config and run all configured validations.
with open(PATH_TO_CONFIG_FILE) as f:
    config = Config.parse(f)
CreateHiveValidationDF.validate(spark_session, config)
To use it in your spark-submit command or Airflow DAG, set:
- py_files: https://pypi.org/project/owl-sanitizer-data-quality/latest/
- application: owl-sanitizer-data-quality/latest/src/spark_validation/dataframe_validation/hive_validator.py
- application_package: https://pypi.org/project/owl-sanitizer-data-quality/latest/owl-sanitizer-data-quality-latest.tar.gz
- application_params: URL_TO_YOUR_REMOTE_CONFIG_FILE
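As an illustration, these parameters map roughly onto Airflow's SparkSubmitOperator as follows. This is a sketch: the task id is a placeholder, the operator must live inside a DAG definition, and application_package has no direct operator argument (the package would typically be shipped via py_files or installed on the cluster).
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Sketch only: values mirror the parameter list above.
validate_data_quality = SparkSubmitOperator(
    task_id="validate_data_quality",
    application=(
        "owl-sanitizer-data-quality/latest/src/spark_validation/"
        "dataframe_validation/hive_validator.py"
    ),
    py_files="https://pypi.org/project/owl-sanitizer-data-quality/latest/",
    application_args=["URL_TO_YOUR_REMOTE_CONFIG_FILE"],
)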
Contact
Please ask questions about technical issues here on GitHub.