Data Quality for PySpark Pipelines

Data-Kalite

Kalite means Quality in Turkish.
This is a framework built on top of the open-source Great Expectations PySpark suite, with additional features.

About Great Expectations:

  • It supports Spark 3.0
  • It has built-in Metrics and Validations suites
  • [Optional] It can be hooked up to the GE static HTML pages

On Top of GE:

  • Ability to return a standardized DataFrame for both Metrics and Validations that can be pushed to a DWH for historical analysis
  • Runs all validations before failing the pipeline:
    • Returns a standardized, validation-generated DataFrame
    • Fails the pipeline when any validation has failed
    • Logs validation results through a logger
  • Support for adding metadata columns for tracking purposes (dag_id, etc.)
  • Support for passing optional pipeline args (batch_date, etc.)
  • Derives validations from configuration (YAML configs or a Dict)
  • Easy-to-run default metrics
  • Ability to use the lower-level API for metrics and validations

Usage

Requires: 3.7.0 < Python < 3.11.0
A few ways:

  • Install from PyPI: pip install data-kalite
  • Clone and integrate into your codebase
  • Clone and build your Python package:
    • pip install . for a local install
    • python setup.py bdist_wheel for distribution

Metrics

How to run metrics:
Read: Limitations
To run the default metrics:

from kalite.application.default_metrics import DefaultMetrics

source_data = "<set_dataframe>"
metadata = {
    "dag_id": "<set>",
    "dag_task_id": "<set>",
    "dag_run_id": "<set>",
    "pipeline_args": {"<set>": "<set>"}, # OPTIONAL
}
# run() returns a DataFrame
# Set unique_count_threshold depending on data size.
# Details: https://github.com/MJFND/data-kalite/blob/main/kalite/functions/metrics.py#L27-L29
metrics = DefaultMetrics(source_data=source_data, metadata=metadata, unique_count_threshold=5).run()
metrics.show(10, False)
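
Since the returned DataFrame has a standardized schema, it can be appended to a warehouse table for historical analysis. A minimal sketch, assuming a hypothetical dq.metrics_history table and a Spark session with catalog access:

# "dq.metrics_history" is a hypothetical table name; any Spark-supported sink works
metrics.write.mode("append").saveAsTable("dq.metrics_history")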

Implementing Metrics:

from typing import List
from kalite.data_classes.metrics_data import MetricsData
from kalite.functions.metrics import GeMetrics, Metrics

class TempM(Metrics):
    def metrics(self) -> List[MetricsData]:
        return (
            GeMetrics(self.source_data)
            .get_column_unique_count("column_A")
            .get_column_max("column_B")
            .result()
        )
    
source_data = "<set_dataframe>"
metadata = {
    "dag_id": "<set>",
    "dag_task_id": "<set>",
    "dag_run_id": "<set>",
    "pipeline_args": {"<set>": "<set>"}, # OPTIONAL
}
# run() returns a DataFrame
# Set unique_count_threshold depending on data size.
# Details: https://github.com/MJFND/data-kalite/blob/main/kalite/functions/metrics.py#L27-L29
metrics = TempM(source_data=source_data, metadata=metadata, unique_count_threshold=2).run()
metrics.show(10, False)

Refer to the unit test cases for usage examples.

Metrics sample result:

| id | created_at | pipeline_args | dag_id | dag_run_id | dag_task_id | metrics | results | column_name | column_value |
|----|------------|---------------|--------|------------|-------------|---------|---------|-------------|--------------|
| 53d924311faca5490a94d6f2d51ae7d6 | 2023-05-29 23:12:35.351 | {id -> 123, flag -> True} | mydag | 123456 | mytask | get_column_unique_count | 0.0 | event_date | null |
| 9c209f282a8a91f4a2c07b59406e68cf | 2023-05-29 23:12:35.351 | {id -> 123, flag -> True} | mydag | 123456 | mytask | get_column_value_count | 10.0 | source | unknown |
| 5f57c0104e1fab302b134bac84b7c2d2 | 2023-05-29 23:12:35.351 | {id -> 123, flag -> True} | mydag | 123456 | mytask | get_column_median | 10.0 | device_os_version | null |
| afc8be1f1a2fc296f2133ebc1cca45f7 | 2023-05-29 23:12:35.351 | {id -> 123, flag -> True} | mydag | 123456 | mytask | get_rate | 0.9 | device_os_name | Android |
| 3d10cc1b98a617e54a8af6eed33677d4 | 2023-05-29 23:12:35.351 | {id -> 123, flag -> True} | mydag | 123456 | mytask | get_column_unique_count | 0.0 | type | null |

Validations

How to run validations:
Read: Limitations
To run validations, the validations function must be implemented.

from typing import List
from kalite.data_classes.validations_data import ValidationsData
from kalite.functions.validations import GeValidations, Validations

class TempV(Validations):
    def validations(self) -> List[ValidationsData]:
        return (
            GeValidations(self.source_data)
            .expect_column_to_exist("column_A")
            .expect_column_values_to_be_of_type("column_A", "StringType")
            .expect_column_value_to_exist("column_A", "XYZ")
            .expect_column_values_to_not_have_one_unique_count("column_A")
            .result()
        )

source_data = "<set_dataframe>"
metadata = {
    "dag_id": "<set>",
    "dag_task_id": "<set>",
    "dag_run_id": "<set>",
    "pipeline_args": {"<set>": "<set>"}, # OPTIONAL
}
# run() returns a DataFrame
validations = TempV(source_data=source_data, metadata=metadata).run()
validations.show(10, False)
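
Because all validations run before any failure is raised, the returned DataFrame can be inspected for failing expectations up front. A minimal sketch (assuming non-passing rows carry a value other than the PASS shown in the sample result below):

from pyspark.sql import functions as F

# Show expectations that did not pass; the sample result below shows
# passing rows with results == "PASS", so filter on anything else
validations.filter(F.col("results") != "PASS").show(10, False)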

Refer to the unit test cases for usage examples.

Config Driven Validations

A simple validator service on top of validations that lets you perform validations dynamically via a config. The config should be in the standard Dict format shown in config_driven_validations.py.

Calling the function is easy: just pass the config param as a Dict, which can be derived from YAML or JSON. The benefit of decoupling validations from code is that you can keep them in a centralized Data Contract repository.

YAML Config:

validations:
    <column_name>:
        - validation_1
        - validation_2
        .
        .
        - validation_n
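
For example, a hypothetical contract for column_A using two of the parameterless expectations from the earlier snippet (see the linked unit test resource for the exact format):

validations:
    column_A:
        - expect_column_to_exist
        - expect_column_values_to_not_have_one_unique_count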

Refer to the unit test resource for a full example: link

The function requires the config to be a dictionary; it can be sourced from YAML or JSON as long as the structure is followed.
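
A minimal sketch of sourcing the config from YAML (assuming PyYAML is installed; validations.yaml is a hypothetical path):

import yaml

# Load the YAML contract into the Dict expected by ConfigDrivenValidations
with open("validations.yaml") as f:
    config = yaml.safe_load(f)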

from kalite.application.config_driven_validations import ConfigDrivenValidations

source_data = "<set_dataframe>"
metadata = {
    "dag_id": "<set>",
    "dag_task_id": "<set>",
    "dag_run_id": "<set>",
    "pipeline_args": {"<set>": "<set>"}, # OPTIONAL
}
config = "<set_dict>" # Details: https://github.com/MJFND/data-kalite/blob/main/kalite/application/config_driven_validations.py#L20-L28

# run() returns a DataFrame
validations = ConfigDrivenValidations(source_data=source_data, metadata=metadata, config=config).run()
validations.show(10, False)

Refer to the unit test cases for usage examples.

Validation sample result:

| id | created_at | pipeline_args | dag_id | dag_run_id | dag_task_id | validations | results | column_name | ge_metadata |
|----|------------|---------------|--------|------------|-------------|-------------|---------|-------------|-------------|
| 40a2037a2298c8a86aa9c5c92c3a9db9 | 2023-05-29 16:20:32.975 | {id -> 123, flag -> True} | mydag | 123456 | mytask | expect_column_to_exist | PASS | decision | {"meta": {}, "result": {}, "success": true, "expectation_config": {"meta": {}, "kwargs": {"column": "decision", "result_format": "BASIC"}, "expectation_type": "expect_column_to_exist"}, "exception_info": {"raised_exception": false, "exception_traceback": null, "exception_message": null}} |
| f5af2ed95a5ac827cf7d5020e44fee6e | 2023-05-29 16:20:32.975 | {id -> 123, flag -> True} | mydag | 123456 | mytask | expect_column_values_to_be_of_type | PASS | decision | {"meta": {}, "result": {"observed_value": "StringType"}, "success": true, "expectation_config": {"meta": {}, "kwargs": {"column": "decision", "type_": "StringType", "result_format": "BASIC"}, "expectation_type": "expect_column_values_to_be_of_type"}, "exception_info": {"raised_exception": false, "exception_traceback": null, "exception_message": null}} |

Failing the Task

After generating the validations DataFrame, Validator can be used to throw an exception and fail the pipeline.

from kalite.functions.validator import Validator

validation_data = "<set_dataframe>" # dataframe that was generated through Validations
Validator.validate(validation_data)
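
Putting it together, a pipeline step can log or persist the full result set first and then gate on it, since all validations have already been evaluated. A sketch reusing the TempV class from above:

from kalite.functions.validator import Validator

# run() evaluates every validation and returns the standardized DataFrame,
# which can be logged or pushed to the DWH before the pipeline is failed
validation_data = TempV(source_data=source_data, metadata=metadata).run()
Validator.validate(validation_data)  # raises if any validation failed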

Limitations

Data-Kalite does not support DATE or TIMESTAMP input columns for certain metrics that require column_value to be set.
E.g. the metric get_column_values_count; see the metrics here

  • column_value is cast to STRING during DataFrame generation, and the cast can fail if the input type is DATE or TIMESTAMP.

Since a Spark column can only hold one type, the workaround is to explicitly convert these columns to STRING before running metrics, or to avoid running these checks on such columns.
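
A minimal sketch of the cast workaround, where event_date is a hypothetical DATE column:

from pyspark.sql import functions as F

# Cast DATE/TIMESTAMP columns to STRING up front so the column_value
# cast during DataFrame generation cannot fail
source_data = source_data.withColumn("event_date", F.col("event_date").cast("string"))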

Contributing

Setup Local Environment

A Makefile is used to keep things simple.

  • make install
    • test via:
      • make pre-commit
      • git commit
  • To learn more about the pre-commit tools, check the .pre-commit-config.yaml in the root.

Metrics

How to add new metrics:

  • All metrics must start with get_
  • All metrics must be wrapped with the @Metrics.standardize decorator
  • All metrics must return a MetricsSuiteResult
  • Most of the metrics are abstractions on top of GE, as mentioned here.

Refer to the existing functions in metrics.py
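
A rough skeleton following these rules (the metric name, signature, and class are hypothetical; the existing wrappers in metrics.py show the real pattern):

from kalite.functions.metrics import GeMetrics, Metrics

class MyGeMetrics(GeMetrics):
    @Metrics.standardize  # required wrapper
    def get_column_null_count(self, column_name):  # name must start with get_
        # must return a MetricsSuiteResult; body omitted in this sketch
        ...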

Validations

How to add new validations:

  • All expectations must start with expect_
  • All expectations must be wrapped with the @Validations.standardize decorator
  • All expectations must return an ExpectationSuiteValidationResult
  • Most of the validations are abstractions on top of GE, as mentioned here.

Refer to the existing functions in validations.py
