
Data Quality eXcellence


DQX - Data Quality X

Data quality as code. Works with your warehouse, scales with your needs.


Why DQX?

  • Write validation logic as testable Python functions - No more complex SQL scripts scattered across your codebase
  • Execute efficiently on any SQL backend - DuckDB, BigQuery, Snowflake, or your existing data warehouse
  • No clusters or complex infrastructure needed - Runs wherever your data lives
  • Integrates seamlessly with existing workflows - Drop it into your current pipeline

Quick Start

pip install dqlib

Define your data quality checks as Python functions:

import pyarrow as pa
from dqx.api import check, VerificationSuite, MetricProvider, Context
from dqx.common import ResultKey
from dqx.datasource import DuckRelationDataSource
from dqx.orm.repositories import InMemoryMetricDB


# Define your validation rules
@check(name="Revenue integrity")
def validate_revenue(mp: MetricProvider, ctx: Context) -> None:
    # Verify reported revenue is positive
    reported = mp.sum("revenue")
    ctx.assert_that(reported).config(
        name="Revenue is positive", severity="P0"
    ).is_positive()

    # Check average transaction size is reasonable
    avg_revenue = mp.average("revenue")
    ctx.assert_that(avg_revenue).config(
        name="Average transaction size", severity="P1"
    ).is_between(10, 100)


# Your own metric store
db = InMemoryMetricDB()
suite = VerificationSuite([validate_revenue], db, "Daily validation")

# Data comes from your warehouse
data = pa.Table.from_pydict(
    {"price": [10.5, 20.0, 15.5], "quantity": [2, 1, 3], "revenue": [21.0, 20.0, 46.5]}
)
datasource = DuckRelationDataSource.from_arrow(data)

# Validate your data
suite.run([datasource], ResultKey())
# ✓ Revenue integrity: OK

Real-World Examples

1. Data Completeness

Monitor that critical fields aren't missing

@check(name="Customer data quality")
def check_completeness(mp: MetricProvider, ctx: Context) -> None:
    # Flag if more than 5% of emails are missing
    null_rate = mp.null_count("email") / mp.num_rows()
    ctx.assert_that(null_rate).config(
        name="Email completeness", severity="P0"
    ).is_lt(0.05)

    # Ensure all orders have customer IDs
    ctx.assert_that(mp.null_count("customer_id")).config(
        name="Customer ID required", severity="P0"
    ).is_eq(0)
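The arithmetic behind this check is worth seeing in isolation. Here is a minimal pure-Python sketch of the same null-rate logic over made-up sample data (no DQX involved):

```python
# Illustrative only: mirrors the email-completeness rule without DQX.
emails = ["a@x.com", None, "b@x.com", "c@x.com", None, "d@x.com"]

null_count = sum(1 for e in emails if e is None)
null_rate = null_count / len(emails)

print(null_count)          # 2
print(null_rate < 0.05)    # False: ~33% missing, so this batch fails the P0 rule
```

In the real check, `mp.null_count("email")` and `mp.num_rows()` are evaluated by the SQL backend rather than in Python.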

2. Revenue Integrity

Catch calculation errors in financial data

@check(name="Financial accuracy")
def validate_financials(mp: MetricProvider, ctx: Context) -> None:
    # Verify totals match across systems
    total_revenue = mp.sum("revenue")
    total_collected = mp.sum("payments")

    ctx.assert_that(total_collected / total_revenue).config(
        name="Payment collection rate", severity="P1"
    ).is_between(0.95, 1.05)  # 5% tolerance

    # Check for negative prices
    ctx.assert_that(mp.minimum("price")).config(
        name="No negative prices", severity="P0"
    ).is_geq(0)

3. Trend Monitoring

Alert on unexpected metric changes

@check(name="Business metrics stability")
def monitor_trends(mp: MetricProvider, ctx: Context) -> None:
    # Alert on significant daily changes
    daily_change = mp.sum("revenue") / mp.sum("revenue", lag=1)
    ctx.assert_that(daily_change).config(
        name="Daily revenue stability", severity="P0"
    ).is_between(0.8, 1.2)  # ±20% change

    # Track week-over-week growth
    wow_change = mp.sum("revenue") / mp.sum("revenue", lag=7)
    ctx.assert_that(wow_change).config(
        name="Weekly revenue trend", severity="P1"
    ).is_geq(0.95)  # Allow 5% decline
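As the comments above suggest, the `lag` argument compares the current run's metric against the value recorded for an earlier run, so each trend test reduces to plain division. A sketch with made-up daily totals:

```python
# Illustrative only: the ratio logic behind the lag-based checks.
revenue_by_day = {"2024-01-07": 1000.0, "2024-01-08": 1100.0}

daily_change = revenue_by_day["2024-01-08"] / revenue_by_day["2024-01-07"]
print(daily_change)                  # 1.1
print(0.8 <= daily_change <= 1.2)    # True: within the ±20% band
```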

4. Cross-Dataset Validation

Ensure consistency across environments

@check(name="Production vs Staging", datasets=["production", "staging"])
def validate_environments(mp: MetricProvider, ctx: Context) -> None:
    # Compare row counts
    prod_count = mp.num_rows(dataset="production")
    staging_count = mp.num_rows(dataset="staging")

    ctx.assert_that(prod_count).config(
        name="Row count match", severity="P1"
    ).is_between(staging_count - 100, staging_count + 100)  # Allow 100 row difference

    # Verify key metrics align
    prod_revenue = mp.sum("revenue", dataset="production")
    staging_revenue = mp.sum("revenue", dataset="staging")

    ctx.assert_that((prod_revenue - staging_revenue) / prod_revenue).config(
        name="Revenue consistency", severity="P0"
    ).is_lt(0.01)  # Less than 1% difference

5. Data Quality SLAs

Track quality metrics with severity levels

@check(name="Data quality SLAs")
def enforce_slas(mp: MetricProvider, ctx: Context) -> None:
    # P0: Critical - No duplicate transactions
    ctx.assert_that(mp.duplicate_count(["transaction_id"])).config(
        name="Transaction uniqueness", severity="P0"
    ).is_eq(0)

    # P1: High - Share of active records
    active_count = mp.count_values("status", "active")
    total_count = mp.num_rows()
    active_rate = active_count / total_count

    ctx.assert_that(active_rate).config(
        name="Active record percentage", severity="P1"
    ).is_gt(0.5)  # At least 50% active

    # P2: Medium - Cardinality checks
    unique_users = mp.unique_count("user_id")
    ctx.assert_that(unique_users).config(
        name="Active user threshold", severity="P2"
    ).is_gt(1000)
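The counting metrics in this check have simple semantics. The sketch below mirrors them in pure Python; note that "duplicate" is *assumed* to mean rows beyond the first occurrence of each key, which the reference table does not spell out:

```python
from collections import Counter

# Illustrative only; "duplicate" is assumed to mean rows beyond the
# first occurrence of each transaction_id.
transaction_ids = ["t1", "t2", "t2", "t3"]
duplicate_count = sum(c - 1 for c in Counter(transaction_ids).values())

statuses = ["active", "active", "inactive", "active"]
active_rate = statuses.count("active") / len(statuses)

print(duplicate_count)      # 1: "t2" appears twice
print(active_rate > 0.5)    # True: 3 of 4 records are active
```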

Profiles

Adjust validation behavior during specific periods:

from datetime import date

from dqx.profiles import SeasonalProfile, tag, check

christmas = SeasonalProfile(
    name="Christmas 2024",
    start_date=date(2024, 12, 20),
    end_date=date(2025, 1, 5),
    rules=[
        tag("xmas").set(metric_multiplier=2.0),  # Scale metrics
        tag("non-critical").set(severity="P3"),  # Downgrade severity
        check("Volume Check").disable(),  # Skip checks
    ],
)

suite = VerificationSuite(checks, db, "My Suite", profiles=[christmas])
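Conceptually, a profile's rules only apply while its date window is open. The sketch below shows that activation test in isolation; `is_active` is a hypothetical helper for illustration, not part of the dqx API:

```python
from datetime import date

# Hypothetical helper: does a seasonal window cover a given run date?
def is_active(start_date: date, end_date: date, run_date: date) -> bool:
    return start_date <= run_date <= end_date

xmas_start, xmas_end = date(2024, 12, 20), date(2025, 1, 5)
print(is_active(xmas_start, xmas_end, date(2024, 12, 25)))  # True
print(is_active(xmas_start, xmas_end, date(2025, 2, 1)))    # False
```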

Quick Reference

Available Metrics

| Metric | Description | Example |
| --- | --- | --- |
| `num_rows()` | Total row count | `mp.num_rows()` |
| `sum(col)` | Sum of values | `mp.sum("revenue")` |
| `average(col)` | Mean value | `mp.average("price")` |
| `minimum(col)` / `maximum(col)` | Min/max values | `mp.minimum("age")` |
| `first(col)` | First value in column | `mp.first("timestamp")` |
| `variance(col)` | Statistical variance | `mp.variance("score")` |
| `null_count(col)` | Count of null values | `mp.null_count("email")` |
| `duplicate_count([cols])` | Count of duplicate rows | `mp.duplicate_count(["id"])` |
| `count_values(col, val)` | Count of specific values | `mp.count_values("status", "active")` |
| `unique_count(col)` | Distinct value count | `mp.unique_count("user_id")` |
| `custom_sql(sql)` | Execute custom SQL expression | `mp.custom_sql("SUM(CASE WHEN origin = 'NL' THEN 1 ELSE 0 END)")` |

Extended Metrics

| Metric | Description | Example |
| --- | --- | --- |
| `ext.day_over_day(metric)` | Day-over-day change | `mp.ext.day_over_day(mp.sum("revenue"))` |
| `ext.week_over_week(metric)` | Week-over-week change | `mp.ext.week_over_week(mp.average("price"))` |
| `ext.stddev(metric, offset, n)` | Standard deviation over window | `mp.ext.stddev(mp.sum("sales"), offset=0, n=7)` |
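As a rough mental model, `ext.stddev(metric, offset, n)` summarizes the metric across a trailing window of runs. The sketch below assumes a population standard deviation over the `n` most recent values (an assumption — the window and estimator details are not spelled out above):

```python
import statistics

# Illustrative only: trailing-window stddev with assumed semantics
# (population stddev over the n=7 most recent daily sums).
daily_sales = [100.0, 102.0, 98.0, 101.0, 99.0, 103.0, 97.0]

window = daily_sales[-7:]  # offset=0, n=7 -> the latest seven values
sales_stddev = statistics.pstdev(window)
print(sales_stddev)  # 2.0
```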

Available Assertions

| Assertion | Description | Example |
| --- | --- | --- |
| `is_eq(value, tol)` | Equals with tolerance | `.is_eq(100, tol=0.01)` |
| `is_neq(value, tol)` | Not equals with tolerance | `.is_neq(100, tol=0.01)` |
| `is_between(min, max)` | In range (inclusive) | `.is_between(0, 100)` |
| `is_positive()` | Greater than zero | `.is_positive()` |
| `is_zero()` | Equals zero | `.is_zero()` |
| `is_negative()` | Less than zero | `.is_negative()` |
| `is_gt(val)` / `is_geq(val)` | Greater than (or equal) | `.is_gt(0.95)` |
| `is_lt(val)` / `is_leq(val)` | Less than (or equal) | `.is_lt(0.05)` |
| `noop()` | No validation (collect only) | `.noop()` |
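The equality assertions take a `tol` parameter. Assuming it is an absolute tolerance (an assumption — the table does not specify absolute vs. relative), the pass condition for `is_eq` reduces to a one-liner. `passes_is_eq` is a hypothetical stand-in, not the dqx API:

```python
# Illustrative only: assumed absolute-tolerance semantics for is_eq.
def passes_is_eq(actual: float, expected: float, tol: float = 0.0) -> bool:
    return abs(actual - expected) <= tol

print(passes_is_eq(100.004, 100, tol=0.01))  # True
print(passes_is_eq(100.02, 100, tol=0.01))   # False
```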

License

MIT License. See LICENSE for details.
