Data Quality eXcellence

DQX - Data Quality Excellence

Data quality as code. Works with your warehouse, scales with your needs.

Why DQX?

  • Write validation logic as testable Python functions - No more complex SQL scripts scattered across your codebase
  • Execute efficiently on any SQL backend - DuckDB, BigQuery, Snowflake, or your existing data warehouse
  • No clusters or complex infrastructure needed - Runs wherever your data lives
  • Integrates seamlessly with existing workflows - Drop it into your current pipeline

Quick Start

pip install dqlib

Define your data quality checks as Python functions:

import pyarrow as pa
from dqx.api import check, VerificationSuite, MetricProvider, Context
from dqx.common import ResultKey
from dqx.datasource import DuckRelationDataSource
from dqx.orm.repositories import InMemoryMetricDB


# Define your validation rules
@check(name="Revenue integrity")
def validate_revenue(mp: MetricProvider, ctx: Context) -> None:
    # Verify reported revenue is positive
    reported = mp.sum("revenue")
    ctx.assert_that(reported).where(
        name="Revenue is positive", severity="P0"
    ).is_positive()

    # Check average transaction size is reasonable
    avg_revenue = mp.average("revenue")
    ctx.assert_that(avg_revenue).where(
        name="Average transaction size", severity="P1"
    ).is_between(10, 100)


# Your own metric store
db = InMemoryMetricDB()
suite = VerificationSuite([validate_revenue], db, "Daily validation")

# Data normally comes from your warehouse; an in-memory Arrow table stands in here
data = pa.Table.from_pydict(
    {"price": [10.5, 20.0, 15.5], "quantity": [2, 1, 3], "revenue": [21.0, 20.0, 46.5]}
)
datasource = DuckRelationDataSource.from_arrow(data)

# Validate your data
suite.run([datasource], ResultKey())
# ✓ Revenue integrity: OK

Real-World Examples

1. Data Completeness

Ensure critical fields aren't missing

@check(name="Customer data quality")
def check_completeness(mp: MetricProvider, ctx: Context) -> None:
    # Flag if 5% or more of emails are missing
    null_rate = mp.null_count("email") / mp.num_rows()
    ctx.assert_that(null_rate).where(name="Email completeness", severity="P0").is_lt(
        0.05
    )

    # Ensure all orders have customer IDs
    ctx.assert_that(mp.null_count("customer_id")).where(
        name="Customer ID required", severity="P0"
    ).is_eq(0)
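
The null-rate arithmetic above can be checked by hand. The sketch below is plain Python, not DQX code; the sample rows and the `null_rate` helper are hypothetical. It also surfaces an edge case worth planning for: on an empty table the denominator is zero, so the ratio is undefined.

```python
from typing import Optional, Sequence


def null_rate(values: Sequence[Optional[str]]) -> float:
    """Fraction of entries that are None (hypothetical helper, not part of DQX)."""
    if not values:
        raise ValueError("null rate is undefined for an empty column")
    return sum(v is None for v in values) / len(values)


# Hypothetical sample: 1 missing email out of 25 rows.
emails = ["user@example.com"] * 24 + [None]
rate = null_rate(emails)

# Same threshold as the "Email completeness" assertion.
passes = rate < 0.05
```

With 1 missing value in 25 rows the rate is 4%, which stays under the 5% threshold.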

2. Revenue Integrity

Catch calculation errors in financial data

@check(name="Financial accuracy")
def validate_financials(mp: MetricProvider, ctx: Context) -> None:
    # Verify totals match across systems
    total_revenue = mp.sum("revenue")
    total_collected = mp.sum("payments")

    ctx.assert_that(total_collected / total_revenue).where(
        name="Payment collection rate", severity="P1"
    ).is_between(
        0.95, 1.05
    )  # 5% tolerance

    # Check for negative prices
    ctx.assert_that(mp.minimum("price")).where(
        name="No negative prices", severity="P0"
    ).is_geq(0)

3. Trend Monitoring

Alert on unexpected metric changes

@check(name="Business metrics stability")
def monitor_trends(mp: MetricProvider, ctx: Context) -> None:
    # Alert on significant daily changes
    daily_change = mp.sum("revenue") / mp.sum("revenue", lag=1)
    ctx.assert_that(daily_change).where(
        name="Daily revenue stability", severity="P0"
    ).is_between(
        0.8, 1.2
    )  # ±20% change

    # Track week-over-week growth
    wow_change = mp.sum("revenue") / mp.sum("revenue", lag=7)
    ctx.assert_that(wow_change).where(
        name="Weekly revenue trend", severity="P1"
    ).is_geq(
        0.95
    )  # Allow 5% decline
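
To make the ratio thresholds concrete, here is a plain-Python sketch of the day-over-day and week-over-week arithmetic. The revenue figures and the `ratio_vs_lag` helper are hypothetical, and the sketch assumes `lag=n` means "the same metric n days earlier", which the example implies but does not state.

```python
from datetime import date, timedelta
from typing import Dict

# Hypothetical daily revenue totals keyed by date.
revenue_by_day: Dict[date, float] = {
    date(2024, 3, 1): 1000.0,
    date(2024, 3, 7): 1100.0,
    date(2024, 3, 8): 1210.0,
}


def ratio_vs_lag(series: Dict[date, float], day: date, lag: int) -> float:
    """Value on `day` divided by the value `lag` days earlier."""
    return series[day] / series[day - timedelta(days=lag)]


today = date(2024, 3, 8)
daily_change = ratio_vs_lag(revenue_by_day, today, lag=1)  # 1210 / 1100
wow_change = ratio_vs_lag(revenue_by_day, today, lag=7)    # 1210 / 1000

daily_ok = 0.8 <= daily_change <= 1.2  # within the ±20% band
weekly_ok = wow_change >= 0.95         # no more than a 5% decline
```

A ratio is undefined when the earlier value is zero, so a series that can legitimately hit zero may need a different comparison.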

4. Cross-Dataset Validation

Ensure consistency across environments

@check(name="Production vs Staging", datasets=["production", "staging"])
def validate_environments(mp: MetricProvider, ctx: Context) -> None:
    # Compare row counts
    prod_count = mp.num_rows(dataset="production")
    staging_count = mp.num_rows(dataset="staging")

    ctx.assert_that(prod_count - staging_count).where(
        name="Row count match", severity="P1"
    ).is_between(
        -100, 100
    )  # Allow 100 row difference

    # Verify key metrics align
    prod_revenue = mp.sum("revenue", dataset="production")
    staging_revenue = mp.sum("revenue", dataset="staging")

    ctx.assert_that((prod_revenue - staging_revenue) / prod_revenue).where(
        name="Revenue consistency", severity="P0"
    ).is_between(
        -0.01, 0.01
    )  # Within 1% in either direction
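
A signed relative difference only catches drift in one direction if it is compared against an upper bound alone. The plain-Python sketch below uses hypothetical totals to show the effect: staging exceeding production by 10% yields a negative value that is trivially below 0.01, while a two-sided bound flags it. The `relative_diff` helper is illustrative and not part of DQX.

```python
def relative_diff(prod: float, staging: float) -> float:
    """Signed difference between production and staging, relative to production."""
    return (prod - staging) / prod


# Hypothetical totals: staging is 10% higher than production.
diff = relative_diff(prod=1000.0, staging=1100.0)

one_sided_ok = diff < 0.01            # passes, so the drift goes unnoticed
two_sided_ok = -0.01 <= diff <= 0.01  # fails, so the drift is flagged
```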

5. Data Quality SLAs

Track quality metrics with severity levels

@check(name="Data quality SLAs")
def enforce_slas(mp: MetricProvider, ctx: Context) -> None:
    # P0: Critical - No duplicate transactions
    ctx.assert_that(mp.duplicate_count(["transaction_id"])).where(
        name="Transaction uniqueness", severity="P0"
    ).is_eq(0)

    # P1: High - Share of active records
    active_count = mp.count_values("status", "active")
    total_count = mp.num_rows()
    active_rate = active_count / total_count

    ctx.assert_that(active_rate).where(
        name="Active record percentage", severity="P1"
    ).is_gt(
        0.5
    )  # More than 50% active

    # P2: Medium - Cardinality checks
    unique_users = mp.unique_count("user_id")
    ctx.assert_that(unique_users).where(
        name="Active user threshold", severity="P2"
    ).is_gt(1000)

Profiles

Adjust validation behavior during specific periods:

from datetime import date

from dqx.api import VerificationSuite
from dqx.profiles import SeasonalProfile, tag, check

christmas = SeasonalProfile(
    name="Christmas 2024",
    start_date=date(2024, 12, 20),
    end_date=date(2025, 1, 5),
    rules=[
        tag("xmas").set(metric_multiplier=2.0),  # Scale metrics
        tag("non-critical").set(severity="P3"),  # Downgrade severity
        check("Volume Check").disable(),  # Skip checks
    ],
)

suite = VerificationSuite(checks, db, "My Suite", profiles=[christmas])
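
The date window in this profile crosses a year boundary. As a rough illustration of how such a window could be evaluated, the sketch below uses only the standard library. The `in_window` helper is hypothetical, and treating both endpoints as inclusive is an assumption, since the text above does not say how DQX handles the boundaries.

```python
from datetime import date

# Window copied from the "Christmas 2024" profile.
START = date(2024, 12, 20)
END = date(2025, 1, 5)


def in_window(day: date, start: date = START, end: date = END) -> bool:
    """True when `day` lies in [start, end]; inclusive endpoints are assumed."""
    return start <= day <= end


before = in_window(date(2024, 12, 19))  # day before the window opens
new_year = in_window(date(2025, 1, 1))  # inside, after the year rollover
last_day = in_window(date(2025, 1, 5))  # the end date itself
after = in_window(date(2025, 1, 6))     # day after the window closes
```

Comparing full `date` objects, rather than month and day separately, is what keeps the year rollover from needing special handling.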

Quick Reference

Available Metrics

| Metric | Description | Example |
| --- | --- | --- |
| num_rows() | Total row count | mp.num_rows() |
| sum(col) | Sum of values | mp.sum("revenue") |
| average(col) | Mean value | mp.average("price") |
| minimum(col) / maximum(col) | Min/max values | mp.minimum("age") |
| first(col) | First value in column | mp.first("timestamp") |
| variance(col) | Statistical variance | mp.variance("score") |
| null_count(col) | Count of null values | mp.null_count("email") |
| duplicate_count([cols]) | Count of duplicate rows | mp.duplicate_count(["id"]) |
| count_values(col, val) | Count specific values | mp.count_values("status", "active") |
| unique_count(col) | Distinct value count | mp.unique_count("user_id") |
| custom_sql(sql) | Execute custom SQL expression | mp.custom_sql("SUM(CASE WHEN origin = 'NL' THEN 1 ELSE 0 END)") |
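
Because checks execute on a SQL backend, it can help to picture each metric as an aggregate expression. The sketch below uses Python's built-in `sqlite3` module purely for illustration. The SQL is an assumption about what comparable aggregates look like, not the SQL that DQX generates, and the `orders` table is hypothetical. In particular, computing the duplicate count as total rows minus distinct values is only one possible definition.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (id INTEGER, email TEXT, status TEXT, revenue REAL)"
)
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?, ?)",
    [
        (1, "a@example.com", "active", 21.0),
        (2, None, "active", 20.0),
        (2, "b@example.com", "closed", 46.5),  # repeats id 2
    ],
)

# One aggregate per metric; the aliases mirror the metric names in the table.
cursor = conn.execute(
    """
    SELECT
        COUNT(*)                                           AS num_rows,
        SUM(revenue)                                       AS sum_revenue,
        AVG(revenue)                                       AS average_revenue,
        MIN(revenue)                                       AS minimum_revenue,
        SUM(CASE WHEN email IS NULL THEN 1 ELSE 0 END)     AS null_count_email,
        COUNT(*) - COUNT(DISTINCT id)                      AS duplicate_count_id,
        SUM(CASE WHEN status = 'active' THEN 1 ELSE 0 END) AS count_values_active,
        COUNT(DISTINCT id)                                 AS unique_count_id
    FROM orders
    """
)
names = [column[0] for column in cursor.description]
metrics = dict(zip(names, cursor.fetchone()))
conn.close()
```

All eight values come back from a single scan of the table, which is the general appeal of pushing metric computation down to the database.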

Extended Metrics

| Metric | Description | Example |
| --- | --- | --- |
| ext.day_over_day(metric) | Day-over-day change | mp.ext.day_over_day(mp.sum("revenue")) |
| ext.week_over_week(metric) | Week-over-week change | mp.ext.week_over_week(mp.average("price")) |
| ext.stddev(metric, offset, n) | Standard deviation over window | mp.ext.stddev(mp.sum("sales"), offset=0, n=7) |
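
A windowed statistic such as `ext.stddev(metric, offset, n)` can be pictured as follows. This is a plain-Python sketch under two assumptions that the table does not confirm: that the window covers `n` consecutive daily values ending `offset` days before the evaluation date, and that the sample standard deviation is used. The `window_stddev` helper and the sales figures are hypothetical.

```python
import statistics
from datetime import date, timedelta
from typing import Dict


def window_stddev(series: Dict[date, float], as_of: date, offset: int, n: int) -> float:
    """Sample standard deviation of n daily values ending `offset` days before as_of."""
    days = [as_of - timedelta(days=offset + i) for i in range(n)]
    return statistics.stdev([series[d] for d in days])


# Hypothetical daily sales: 10, 12, ..., 22 for 1-7 March 2024.
start = date(2024, 3, 1)
sales = {start + timedelta(days=i): 10.0 + 2 * i for i in range(7)}

weekly_spread = window_stddev(sales, as_of=date(2024, 3, 7), offset=0, n=7)
```

Swapping `statistics.stdev` for `statistics.pstdev` gives the population variant, and the two differ noticeably on windows this short.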

Available Assertions

| Assertion | Description | Example |
| --- | --- | --- |
| is_eq(value, tol) | Equals with tolerance | .is_eq(100, tol=0.01) |
| is_neq(value, tol) | Not equals with tolerance | .is_neq(100, tol=0.01) |
| is_between(min, max) | In range (inclusive) | .is_between(0, 100) |
| is_positive() | Greater than zero | .is_positive() |
| is_zero() | Equals zero | .is_zero() |
| is_negative() | Less than zero | .is_negative() |
| is_gt(val) / is_geq(val) | Greater than (or equal) | .is_gt(0.95) |
| is_lt(val) / is_leq(val) | Less than (or equal) | .is_lt(0.05) |
| noop() | No validation (collect only) | .noop() |
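
The `tol` argument on `is_eq` and `is_neq` suggests a comparison within a tolerance rather than exact floating-point equality. The sketch below shows why that matters, using the standard library's `math.isclose`. Interpreting `tol` as an absolute tolerance is an assumption, and the `approx_eq` helper is hypothetical.

```python
import math


def approx_eq(actual: float, expected: float, tol: float) -> bool:
    """True when |actual - expected| <= tol (absolute tolerance, assumed)."""
    return math.isclose(actual, expected, rel_tol=0.0, abs_tol=tol)


total = 0.1 + 0.2

exact_match = total == 0.3                  # fails due to binary rounding
within_tol = approx_eq(total, 0.3, 1e-9)    # passes with a small tolerance

close_enough = approx_eq(100.005, 100, 0.01)  # difference of 0.005
too_far = approx_eq(100.02, 100, 0.01)        # difference of 0.02
```

Aggregates such as sums and averages over floating-point columns rarely land on an exact value, so a tolerance is usually the safer default for equality checks.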

License

MIT License. See LICENSE for details.
