Python library for data validation on DataFrame APIs including Snowflake/Snowpark, Apache/PySpark and Pandas/DataFrame.
Project description
cuallee
Meaning good in Aztec (Nahuatl), pronounced: QUAL-E
This library provides an intuitive API to describe data quality checks initially just for PySpark dataframes v3.3.0. And extended to pandas, snowpark, duckdb, daft and more.
It is a replacement written in pure python of the pydeequ framework.
I gave up in deequ as after extensive use, the API is not user-friendly, the Python Callback servers produce additional costs in our compute clusters, and the lack of support to the newest version of PySpark.
As result cuallee was born
This implementation goes in hand with the latest API from PySpark and uses the Observation API to collect metrics
at the lower cost of computation.
When benchmarking against pydeequ, cuallee uses circa <3k java classes underneath and remarkably less memory.
Support
cuallee is the data quality framework truly dataframe agnostic.
| Provider | API | Versions |
|---|---|---|
snowpark |
1.11.1, 1.4.0 |
|
pyspark & spark-connect |
4.0.1,3.5.x, 3.4.0, 3.3.x, 3.2.x |
|
bigquery |
3.4.1 |
|
pandas |
2.0.2, 1.5.x, 1.4.x |
|
duckdb |
1.4.0,1.0.00.10.20.9.20.8.0 |
|
polars |
1.34.0, 1.0.00.19.6 |
|
daft |
0.2.24, 0.2.19 |
Logos are trademarks of their own brands.
Install
pip install cuallee
Checks
The most common checks for data integrity validations are completeness and uniqueness an example of this dimensions shown below:
from cuallee import Check, CheckLevel # WARN:0, ERR: 1
# Nulls on column Id
check = Check(CheckLevel.WARNING, "Completeness")
(
check
.is_complete("id")
.is_unique("id")
.validate(df)
).show() # Returns a pyspark.sql.DataFrame
[!IMPORTANT] A new version of the
validateoutput is currently under construction.
Dates
Perhaps one of the most useful features of cuallee is its extensive number of checks for Date and Timestamp values. Including, validation of ranges, set operations like inclusion, or even a verification that confirms continuity on dates using the is_daily check function.
# Unique values on id
check = Check(CheckLevel.WARNING, "CheckIsBetweenDates")
df = spark.sql(
"""
SELECT
explode(
sequence(
to_date('2022-01-01'),
to_date('2022-01-10'),
interval 1 day)) as date
""")
assert (
check.is_between("date", ("2022-01-01", "2022-01-10"))
.validate(df)
.first()
.status == "PASS"
)
Membership
Other common test is the validation of list of values as part of the multiple integrity checks required for better quality data.
df = spark.createDataFrame([[1, 10], [2, 15], [3, 17]], ["ID", "value"])
check = Check(CheckLevel.WARNING, "is_contained_in_number_test")
check.is_contained_in("value", (10, 15, 20, 25)).validate(df)
Regular Expressions
When it comes to the flexibility of matching, regular expressions are always to the rescue. cuallee makes use of the regular expressions to validate that fields of type String conform to specific patterns.
df = spark.createDataFrame([[1, "is_blue"], [2, "has_hat"], [3, "is_smart"]], ["ID", "desc"])
check = Check(CheckLevel.WARNING, "has_pattern_test")
check.has_pattern("desc", r"^is.*t$") # only match is_smart 33% of rows.
check.validate(df).first().status == "FAIL"
Anomalies
Statistical tests are a great aid for verifying anomalies on data. Here an example that shows that will PASS only when 40% of data is inside the interquartile range
df = spark.range(10)
check = Check(CheckLevel.WARNING, "IQR_Test")
check.is_inside_interquartile_range("id", pct=0.4)
check.validate(df).first().status == "PASS"
+---+-------------------+-----+-------+------+-----------------------------+-----+----+----------+---------+--------------+------+
|id |timestamp |check|level |column|rule |value|rows|violations|pass_rate|pass_threshold|status|
+---+-------------------+-----+-------+------+-----------------------------+-----+----+----------+---------+--------------+------+
|1 |2022-10-19 00:09:39|IQR |WARNING|id |is_inside_interquartile_range|10000|10 |4 |0.6 |0.4 |PASS |
+---+-------------------+-----+-------+------+-----------------------------+-----+----+----------+---------+--------------+------+
Workflows (Process Mining)
Besides the common citizen-like checks, cuallee offers out-of-the-box real-life checks. For example, suppose that you are working SalesForce or SAP environment. Very likely your business processes will be driven by a lifecycle:
Order-To-CashRequest-To-PayInventory-Logistics-Delivery- Others.
In this scenario,
cualleeoffers the ability that the sequence of events registered over time, are according to a sequence of events, like the example below:
import pyspark.sql.functions as F
from cuallee import Check, CheckLevel
data = pd.DataFrame({
"name":["herminio", "herminio", "virginie", "virginie"],
"event":["new","active", "new", "active"],
"date": ["2022-01-01", "2022-01-02", "2022-01-03", "2022-02-04"]}
)
df = spark.createDataFrame(data).withColumn("date", F.to_date("date"))
# Cuallee Process Mining
# Testing that all edges on workflows
check = Check(CheckLevel.WARNING, "WorkflowViolations")
# Validate that 50% of data goes from new => active
check.has_workflow("name", "event", "date", [("new", "active")], pct=0.5)
check.validate(df).show(truncate=False)
+---+-------------------+------------------+-------+-------------------------+------------+--------------------+----+----------+---------+--------------+------+
|id |timestamp |check |level |column |rule |value |rows|violations|pass_rate|pass_threshold|status|
+---+-------------------+------------------+-------+-------------------------+------------+--------------------+----+----------+---------+--------------+------+
|1 |2022-11-07 23:08:50|WorkflowViolations|WARNING|('name', 'event', 'date')|has_workflow|(('new', 'active'),)|4 |2.0 |0.5 |0.5 |PASS |
+---+-------------------+------------------+-------+-------------------------+------------+--------------------+----+----------+---------+--------------+------+
Assertions
[2024-09-28] ✨ New feature! Return a simple true|false as a unified result for your check
import pandas as pd
from cuallee import Check
df = pd.DataFrame({"X":[1,2,3]})
# .ok(dataframe) method of a check will call validate and then verify that all rules are PASS
assert Check().is_complete("X").ok(df)
Controls
Simplify the entire validation of a dataframe in a particular dimension.
import pandas as pd
from cuallee import Control
df = pd.DataFrame({"X":[1,2,3], "Y": [10,20,30]})
# Checks all columns in dataframe for using is_complete check
Control.completeness(df)
cuallee VS pydeequ
In the test folder there are docker containers with the requirements to match the tests. Also a perftest.py available at the root folder for interests.
# 1000 rules / # of seconds
cuallee: ▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇ 162.00
pydeequ: ▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇ 322.00
Catalogue
| Check | Description | DataType |
|---|---|---|
is_complete |
Zero nulls |
agnostic |
is_unique |
Zero duplicates |
agnostic |
is_primary_key |
Zero duplicates |
agnostic |
are_complete |
Zero nulls on group of columns |
agnostic |
are_unique |
Composite primary key check | agnostic |
is_composite_key |
Zero duplicates on multiple columns | agnostic |
is_greater_than |
col > x |
numeric |
is_positive |
col > 0 |
numeric |
is_negative |
col < 0 |
numeric |
is_greater_or_equal_than |
col >= x |
numeric |
is_less_than |
col < x |
numeric |
is_less_or_equal_than |
col <= x |
numeric |
is_equal_than |
col == x |
numeric |
is_contained_in |
col in [a, b, c, ...] |
agnostic |
is_in |
Alias of is_contained_in |
agnostic |
not_contained_in |
col not in [a, b, c, ...] |
agnostic |
not_in |
Alias of not_contained_in |
agnostic |
is_between |
a <= col <= b |
numeric, date |
has_pattern |
Matching a pattern defined as a regex |
string |
is_legit |
String not null & not empty ^\S$ |
string |
has_min |
min(col) == x |
numeric |
has_max |
max(col) == x |
numeric |
has_std |
σ(col) == x |
numeric |
has_mean |
μ(col) == x |
numeric |
has_sum |
Σ(col) == x |
numeric |
has_percentile |
%(col) == x |
numeric |
has_cardinality |
count(distinct(col)) == x |
agnostic |
has_infogain |
count(distinct(col)) > 1 |
agnostic |
has_max_by |
A utilitary predicate for max(col_a) == x for max(col_b) |
agnostic |
has_min_by |
A utilitary predicate for min(col_a) == x for min(col_b) |
agnostic |
has_correlation |
Finds correlation between 0..1 on corr(col_a, col_b) |
numeric |
has_entropy |
Calculates the entropy of a column entropy(col) == x for classification problems |
numeric |
is_inside_interquartile_range |
Verifies column values reside inside limits of interquartile range Q1 <= col <= Q3 used on anomalies. |
numeric |
is_in_millions |
col >= 1e6 |
numeric |
is_in_billions |
col >= 1e9 |
numeric |
is_t_minus_1 |
For date fields confirms 1 day ago t-1 |
date |
is_t_minus_2 |
For date fields confirms 2 days ago t-2 |
date |
is_t_minus_3 |
For date fields confirms 3 days ago t-3 |
date |
is_t_minus_n |
For date fields confirms n days ago t-n |
date |
is_today |
For date fields confirms day is current date t-0 |
date |
is_yesterday |
For date fields confirms 1 day ago t-1 |
date |
is_on_weekday |
For date fields confirms day is between Mon-Fri |
date |
is_on_weekend |
For date fields confirms day is between Sat-Sun |
date |
is_on_monday |
For date fields confirms day is Mon |
date |
is_on_tuesday |
For date fields confirms day is Tue |
date |
is_on_wednesday |
For date fields confirms day is Wed |
date |
is_on_thursday |
For date fields confirms day is Thu |
date |
is_on_friday |
For date fields confirms day is Fri |
date |
is_on_saturday |
For date fields confirms day is Sat |
date |
is_on_sunday |
For date fields confirms day is Sun |
date |
is_on_schedule |
For date fields confirms time windows i.e. 9:00 - 17:00 |
timestamp |
is_daily |
Can verify daily continuity on date fields by default. [2,3,4,5,6] which represents Mon-Fri in PySpark. However new schedules can be used for custom date continuity |
date |
has_workflow |
Adjacency matrix validation on 3-column graph, based on group, event, order columns. |
agnostic |
is_custom |
User-defined custom function applied to dataframe for row-based validation. |
agnostic |
satisfies |
An open SQL expression builder to construct custom checks |
agnostic |
validate |
The ultimate transformation of a check with a dataframe input for validation |
agnostic |
Controls pyspark
| Check | Description | DataType |
|---|---|---|
completeness |
Zero nulls |
agnostic |
information |
Zero nulls and cardinality > 1 |
agnostic |
intelligence |
Zero nulls, zero empty strings and cardinality > 1 | agnostic |
percentage_fill |
% rows not empty |
agnostic |
percentage_empty |
% rows empty |
agnostic |
ISO Standard
A new module has been incorporated in cuallee==0.4.0 which allows the verification of International Standard Organization columns in data frames. Simply access the check.iso interface to add the set of checks as shown below.
| Check | Description | DataType |
|---|---|---|
iso_4217 |
currency compliant ccy |
string |
iso_3166 |
country compliant country |
string |
df = spark.createDataFrame([[1, "USD"], [2, "MXN"], [3, "CAD"], [4, "EUR"], [5, "CHF"]], ["id", "ccy"])
check = Check(CheckLevel.WARNING, "ISO Compliant")
check.iso.iso_4217("ccy")
check.validate(df).show()
+---+-------------------+-------------+-------+------+---------------+--------------------+----+----------+---------+--------------+------+
| id| timestamp| check| level|column| rule| value|rows|violations|pass_rate|pass_threshold|status|
+---+-------------------+-------------+-------+------+---------------+--------------------+----+----------+---------+--------------+------+
| 1|2023-05-14 18:28:02|ISO Compliant|WARNING| ccy|is_contained_in|{'BHD', 'CRC', 'M...| 5| 0.0| 1.0| 1.0| PASS|
+---+-------------------+-------------+-------+------+---------------+--------------------+----+----------+---------+--------------+------+
Snowflake Connection
In order to establish a connection to your SnowFlake account cuallee relies in the following environment variables to be avaialble in your environment:
SF_ACCOUNTSF_USERSF_PASSWORDSF_ROLESF_WAREHOUSESF_DATABASESF_SCHEMA
Spark Connect
Just add the environment variable SPARK_REMOTE to your remote session, then cuallee will connect using
spark_connect = SparkSession.builder.remote(os.getenv("SPARK_REMOTE")).getOrCreate()
and convert all checks to select as opposed to Observation API compute instructions.
Databricks Connection
By default cuallee will search for a SparkSession available in the globals so there is literally no need to . When working in a local environment it will automatically search for an available session, or start one.SparkSession.builder
DuckDB
For testing on duckdb simply pass your table name to your check et voilà
import duckdb
conn = duckdb.connect(":memory:")
check = Check(CheckLevel.WARNING, "DuckDB", table_name="temp/taxi/*.parquet")
check.is_complete("VendorID")
check.is_complete("tpep_pickup_datetime")
check.validate(conn)
id timestamp check level column rule value rows violations pass_rate pass_threshold status
0 1 2022-10-31 23:15:06 test WARNING VendorID is_complete N/A 19817583 0.0 1.0 1.0 PASS
1 2 2022-10-31 23:15:06 test WARNING tpep_pickup_datetime is_complete N/A 19817583 0.0 1.0 1.0 PASS
Roadmap
100% data frame agnostic implementation of data quality checks.
Define once, run everywhere
[x] PySpark 3.5.0[x] PySpark 3.4.0[x] PySpark 3.3.0[x] PySpark 3.2.x[x] Snowpark DataFrame[x] Pandas DataFrame[x] DuckDB Tables[x] BigQuery Client[x] Polars DataFrame[*] Dagster Integration[x] Spark Connect[x] Daft- [-] PDF Report
- Metadata check
- Help us in a discussion?
Whilst expanding the functionality feels a bit as an overkill because you most likely can connect spark via its drivers to whatever DBMS of your choice.
In the desire to make it even more user-friendly we are aiming to make cuallee portable to all the providers above.
Authors
- canimus / Herminio Vazquez / 🇲🇽
- vestalisvirginis / Virginie Grosboillot / 🇫🇷
Contributors
Guidelines
Documentation
Paper
cuallee has been published in the Journal of Open Source Software
Vazquez et al., (2024). cuallee: A Python package for data quality checks across multiple DataFrame APIs. Journal of Open Source Software, 9(98), 6684, https://doi.org/10.21105/joss.06684
If you use cuallee please consider citing this work. Citation
License
Apache License 2.0 Free for commercial use, modification, distribution, patent use, private use. Just preserve the copyright and license.
Made with ❤️ in Utrecht 🇳🇱
Maintained over ⌛ from Ljubljana 🇸🇮
Extended 🚀 by contributions all over the 🌎
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file cuallee-0.15.4.tar.gz.
File metadata
- Download URL: cuallee-0.15.4.tar.gz
- Upload date:
- Size: 59.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c8c9040a8dd1d7a63385e31829fba3eab501bf0371e2c834edc06c717b084781
|
|
| MD5 |
6cd1097701b83a8fa3d31df7a6c74329
|
|
| BLAKE2b-256 |
cea0dff65fb90b43a2c0d10eded76e21b84425355cf59b5a916fa67006cdcc20
|
File details
Details for the file cuallee-0.15.4-py3-none-any.whl.
File metadata
- Download URL: cuallee-0.15.4-py3-none-any.whl
- Upload date:
- Size: 58.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
87e78547a1f8503f24f168f5164ea14f63d35739282ca6632ce690b0fae465e8
|
|
| MD5 |
339ddc2e84bdc33844637c4175513e11
|
|
| BLAKE2b-256 |
1ad06736dc164d52267b2439cab1db727ab9544d1cd763a07c25ed5f1ec9b106
|