Skip to main content

Data Quality eXcellence

Project description

DQX - Data Quality Excellence

Data quality as code. Works with your warehouse, scales with your needs.

Tests codecov Documentation Status CodeRabbit Pull Request Reviews Python License

Why DQX?

  • Write validation logic as testable Python functions - No more complex SQL scripts scattered across your codebase
  • Execute efficiently on any SQL backend - DuckDB, BigQuery, Snowflake, or your existing data warehouse
  • No clusters or complex infrastructure needed - Runs wherever your data lives
  • Integrates seamlessly with existing workflows - Drop it into your current pipeline

Quick Start

pip install dqlib

Define your data quality checks as Python functions:

import pyarrow as pa
from dqx.api import check, VerificationSuite, MetricProvider, Context
from dqx.common import ResultKey
from dqx.datasource import DuckRelationDataSource
from dqx.orm.repositories import InMemoryMetricDB


# Define your validation rules
@check(name="Revenue integrity")
def validate_revenue(mp: MetricProvider, ctx: Context) -> None:
    # Verify reported revenue is positive
    reported = mp.sum("revenue")
    ctx.assert_that(reported).where(
        name="Revenue is positive", severity="P0"
    ).is_positive()

    # Check average transaction size is reasonable
    avg_revenue = mp.average("revenue")
    ctx.assert_that(avg_revenue).where(
        name="Average transaction size", severity="P1"
    ).is_between(10, 100)


# Your own metric store
db = InMemoryMetricDB()
suite = VerificationSuite([validate_revenue], db, "Daily validation")

# Data comes from your warehouse
data = pa.Table.from_pydict(
    {"price": [10.5, 20.0, 15.5], "quantity": [2, 1, 3], "revenue": [21.0, 20.0, 46.5]}
)
datasource = DuckRelationDataSource.from_arrow(data)

# Validate your data
suite.run([datasource], ResultKey())
# ✓ Revenue integrity: OK

Real-World Examples

1. Data Completeness

Monitor critical fields aren't missing

@check(name="Customer data quality")
def check_completeness(mp: MetricProvider, ctx: Context) -> None:
    # Flag if more than 5% of emails are missing
    null_rate = mp.null_count("email") / mp.num_rows()
    ctx.assert_that(null_rate).where(name="Email completeness", severity="P0").is_lt(
        0.05
    )

    # Ensure all orders have customer IDs
    ctx.assert_that(mp.null_count("customer_id")).where(
        name="Customer ID required", severity="P0"
    ).is_eq(0)

2. Revenue Integrity

Catch calculation errors in financial data

@check(name="Financial accuracy")
def validate_financials(mp: MetricProvider, ctx: Context) -> None:
    # Verify totals match across systems
    total_revenue = mp.sum("revenue")
    total_collected = mp.sum("payments")

    ctx.assert_that(total_collected / total_revenue).where(
        name="Payment collection rate", severity="P1"
    ).is_between(
        0.95, 1.05
    )  # 5% tolerance

    # Check for negative prices
    ctx.assert_that(mp.minimum("price")).where(
        name="No negative prices", severity="P0"
    ).is_geq(0)

3. Trend Monitoring

Alert on unexpected metric changes

@check(name="Business metrics stability")
def monitor_trends(mp: MetricProvider, ctx: Context) -> None:
    # Alert on significant daily changes
    daily_change = mp.sum("revenue") / mp.sum("revenue", lag=1)
    ctx.assert_that(daily_change).where(
        name="Daily revenue stability", severity="P0"
    ).is_between(
        0.8, 1.2
    )  # ±20% change

    # Track week-over-week growth
    wow_change = mp.sum("revenue") / mp.sum("revenue", lag=7)
    ctx.assert_that(wow_change).where(
        name="Weekly revenue trend", severity="P1"
    ).is_geq(
        0.95
    )  # Allow 5% decline

4. Cross-Dataset Validation

Ensure consistency across environments

@check(name="Production vs Staging", datasets=["production", "staging"])
def validate_environments(mp: MetricProvider, ctx: Context) -> None:
    # Compare row counts
    prod_count = mp.num_rows(dataset="production")
    staging_count = mp.num_rows(dataset="staging")

    ctx.assert_that(prod_count).where(name="Row count match", severity="P1").is_between(
        staging_count - 100, staging_count + 100
    )  # Allow 100 row difference

    # Verify key metrics align
    prod_revenue = mp.sum("revenue", dataset="production")
    staging_revenue = mp.sum("revenue", dataset="staging")

    ctx.assert_that((prod_revenue - staging_revenue) / prod_revenue).where(
        name="Revenue consistency", severity="P0"
    ).is_lt(
        0.01
    )  # Less than 1% difference

5. Data Quality SLAs

Track quality metrics with severity levels

@check(name="Data quality SLAs")
def enforce_slas(mp: MetricProvider, ctx: Context) -> None:
    # P0: Critical - No duplicate transactions
    ctx.assert_that(mp.duplicate_count(["transaction_id"])).where(
        name="Transaction uniqueness", severity="P0"
    ).is_eq(0)

    # P1: High - Recent activity
    recent_count = mp.count_values("status", "active")
    total_count = mp.num_rows()
    active_rate = recent_count / total_count

    ctx.assert_that(active_rate).where(
        name="Active record percentage", severity="P1"
    ).is_gt(
        0.5
    )  # At least 50% active

    # P2: Medium - Cardinality checks
    unique_users = mp.unique_count("user_id")
    ctx.assert_that(unique_users).where(
        name="Active user threshold", severity="P2"
    ).is_gt(1000)

Profiles

Adjust validation behavior during specific periods:

from dqx.profiles import HolidayProfile, tag, check

christmas = HolidayProfile(
    name="Christmas 2024",
    start_date=date(2024, 12, 20),
    end_date=date(2025, 1, 5),
    rules=[
        tag("xmas").set(metric_multiplier=2.0),  # Scale metrics
        tag("non-critical").set(severity="P3"),  # Downgrade severity
        check("Volume Check").disable(),  # Skip checks
    ],
)

suite = VerificationSuite(checks, db, "My Suite", profiles=[christmas])

Quick Reference

Available Metrics

Metric Description Example
num_rows() Total row count mp.num_rows()
sum(col) Sum of values mp.sum("revenue")
average(col) Mean value mp.average("price")
minimum(col) / maximum(col) Min/max values mp.minimum("age")
first(col) First value in column mp.first("timestamp")
variance(col) Statistical variance mp.variance("score")
null_count(col) Count of null values mp.null_count("email")
duplicate_count([cols]) Count of duplicate rows mp.duplicate_count(["id"])
count_values(col, val) Count specific values mp.count_values("status", "active")
unique_count(col) Distinct value count mp.unique_count("user_id")
custom_sql(sql) Execute custom SQL expression mp.custom_sql("SUM(CASE WHEN origin = 'NL' THEN 1 ELSE 0 END)")

Extended Metrics

Metric Description Example
ext.day_over_day(metric) Day-over-day change mp.ext.day_over_day(mp.sum("revenue"))
ext.week_over_week(metric) Week-over-week change mp.ext.week_over_week(mp.average("price"))
ext.stddev(metric, offset, n) Standard deviation over window mp.ext.stddev(mp.sum("sales"), offset=0, n=7)

Available Assertions

Assertion Description Example
is_eq(value, tol) Equals with tolerance .is_eq(100, tol=0.01)
is_neq(value, tol) Not equals with tolerance .is_neq(100, tol=0.01)
is_between(min, max) In range (inclusive) .is_between(0, 100)
is_positive() Greater than zero .is_positive()
is_zero() Equals zero .is_zero()
is_negative() Less than zero .is_negative()
is_gt(val) / is_geq(val) Greater than (or equal) .is_gt(0.95)
is_lt(val) / is_leq(val) Less than (or equal) .is_lt(0.05)
noop() No validation (collect only) .noop()

License

MIT License. See LICENSE for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dqlib-0.5.12.tar.gz (114.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dqlib-0.5.12-py3-none-any.whl (130.5 kB view details)

Uploaded Python 3

File details

Details for the file dqlib-0.5.12.tar.gz.

File metadata

  • Download URL: dqlib-0.5.12.tar.gz
  • Upload date:
  • Size: 114.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for dqlib-0.5.12.tar.gz
Algorithm Hash digest
SHA256 6c3736975e43376a366c8e88378c2790e25be42a55ffa7d44f0682ca52288083
MD5 88d2d3f950e05de383ff48da2a8c0193
BLAKE2b-256 5fa8c8ff0f2e23904ae3bde59725f7a507afb3c9b57dd1cfbfcb96b346ca7be5

See more details on using hashes here.

File details

Details for the file dqlib-0.5.12-py3-none-any.whl.

File metadata

  • Download URL: dqlib-0.5.12-py3-none-any.whl
  • Upload date:
  • Size: 130.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for dqlib-0.5.12-py3-none-any.whl
Algorithm Hash digest
SHA256 b0d55e6092ee6e06c2bbfa5ba9f3e2bed08115a515f6872ba969b07a97b641fb
MD5 5d4b1aac1f417e80f0fd28f2336a155b
BLAKE2b-256 7ea9a97296de2ad875b9da60221d9d1b36fb5cdf9197cde620c8eae1ef73f3cf

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page