DQX - Data Quality X

Data quality as code. Works with your warehouse, scales with your needs.

Why DQX?

  • Write validation logic as testable Python functions - No more complex SQL scripts scattered across your codebase
  • Execute efficiently on any SQL backend - DuckDB, BigQuery, Snowflake, or your existing data warehouse
  • No clusters or complex infrastructure needed - Runs wherever your data lives
  • Integrates seamlessly with existing workflows - Drop it into your current pipeline

Quick Start

pip install dqlib

Define your data quality checks as Python functions:

import pyarrow as pa
from dqx.api import check, VerificationSuite, MetricProvider, Context
from dqx.common import ResultKey
from dqx.datasource import DuckRelationDataSource
from dqx.orm.repositories import InMemoryMetricDB


# Define your validation rules
@check(name="Revenue integrity")
def validate_revenue(mp: MetricProvider, ctx: Context) -> None:
    # Verify reported revenue is positive
    reported = mp.sum("revenue")
    ctx.assert_that(reported).config(
        name="Revenue is positive", severity="P0"
    ).is_positive()

    # Check average transaction size is reasonable
    avg_revenue = mp.average("revenue")
    ctx.assert_that(avg_revenue).config(
        name="Average transaction size", severity="P1"
    ).is_between(10, 100)


# Your own metric store
db = InMemoryMetricDB()
suite = VerificationSuite([validate_revenue], db, "Daily validation")

# Data comes from your warehouse
data = pa.Table.from_pydict(
    {"price": [10.5, 20.0, 15.5], "quantity": [2, 1, 3], "revenue": [21.0, 20.0, 46.5]}
)
datasource = DuckRelationDataSource.from_arrow(data)

# Validate your data
suite.run([datasource], ResultKey())
# ✓ Revenue integrity: OK

Real-World Examples

1. Data Completeness

Ensure critical fields aren't missing

@check(name="Customer data quality")
def check_completeness(mp: MetricProvider, ctx: Context) -> None:
    # Flag if more than 5% of emails are missing
    null_rate = mp.null_count("email") / mp.num_rows()
    ctx.assert_that(null_rate).config(name="Email completeness", severity="P0").is_lt(
        0.05
    )

    # Ensure all orders have customer IDs
    ctx.assert_that(mp.null_count("customer_id")).config(
        name="Customer ID required", severity="P0"
    ).is_eq(0)

2. Revenue Integrity

Catch calculation errors in financial data

@check(name="Financial accuracy")
def validate_financials(mp: MetricProvider, ctx: Context) -> None:
    # Verify totals match across systems
    total_revenue = mp.sum("revenue")
    total_collected = mp.sum("payments")

    ctx.assert_that(total_collected / total_revenue).config(
        name="Payment collection rate", severity="P1"
    ).is_between(
        0.95, 1.05
    )  # 5% tolerance

    # Check for negative prices
    ctx.assert_that(mp.minimum("price")).config(
        name="No negative prices", severity="P0"
    ).is_geq(0)

3. Trend Monitoring

Alert on unexpected metric changes

@check(name="Business metrics stability")
def monitor_trends(mp: MetricProvider, ctx: Context) -> None:
    # Alert on significant daily changes
    daily_change = mp.sum("revenue") / mp.sum("revenue", lag=1)
    ctx.assert_that(daily_change).config(
        name="Daily revenue stability", severity="P0"
    ).is_between(
        0.8, 1.2
    )  # ±20% change

    # Track week-over-week growth
    wow_change = mp.sum("revenue") / mp.sum("revenue", lag=7)
    ctx.assert_that(wow_change).config(
        name="Weekly revenue trend", severity="P1"
    ).is_geq(
        0.95
    )  # Allow 5% decline

4. Cross-Dataset Validation

Ensure consistency across environments

@check(name="Production vs Staging", datasets=["production", "staging"])
def validate_environments(mp: MetricProvider, ctx: Context) -> None:
    # Compare row counts
    prod_count = mp.num_rows(dataset="production")
    staging_count = mp.num_rows(dataset="staging")

    ctx.assert_that(prod_count).config(
        name="Row count match", severity="P1"
    ).is_between(
        staging_count - 100, staging_count + 100
    )  # Allow 100 row difference

    # Verify key metrics align
    prod_revenue = mp.sum("revenue", dataset="production")
    staging_revenue = mp.sum("revenue", dataset="staging")

    ctx.assert_that((prod_revenue - staging_revenue) / prod_revenue).config(
        name="Revenue consistency", severity="P0"
    ).is_between(
        -0.01, 0.01
    )  # Within 1% in either direction

5. Data Quality SLAs

Track quality metrics with severity levels

@check(name="Data quality SLAs")
def enforce_slas(mp: MetricProvider, ctx: Context) -> None:
    # P0: Critical - No duplicate transactions
    ctx.assert_that(mp.duplicate_count(["transaction_id"])).config(
        name="Transaction uniqueness", severity="P0"
    ).is_eq(0)

    # P1: High - Majority of records active
    active_count = mp.count_values("status", "active")
    total_count = mp.num_rows()
    active_rate = active_count / total_count

    ctx.assert_that(active_rate).config(
        name="Active record percentage", severity="P1"
    ).is_gt(
        0.5
    )  # At least 50% active

    # P2: Medium - Cardinality checks
    unique_users = mp.unique_count("user_id")
    ctx.assert_that(unique_users).config(
        name="Active user threshold", severity="P2"
    ).is_gt(1000)

Profiles

Adjust validation behavior during specific periods:

from datetime import date

from dqx.profiles import SeasonalProfile, tag, check

christmas = SeasonalProfile(
    name="Christmas 2024",
    start_date=date(2024, 12, 20),
    end_date=date(2025, 1, 5),
    rules=[
        tag("xmas").set(metric_multiplier=2.0),  # Scale metrics
        tag("non-critical").set(severity="P3"),  # Downgrade severity
        check("Volume Check").disable(),  # Skip checks
    ],
)

suite = VerificationSuite(checks, db, "My Suite", profiles=[christmas])

Quick Reference

Available Metrics

| Metric | Description | Example |
| --- | --- | --- |
| num_rows() | Total row count | mp.num_rows() |
| sum(col) | Sum of values | mp.sum("revenue") |
| average(col) | Mean value | mp.average("price") |
| minimum(col) / maximum(col) | Min/max values | mp.minimum("age") |
| first(col) | First value in column | mp.first("timestamp") |
| variance(col) | Statistical variance | mp.variance("score") |
| null_count(col) | Count of null values | mp.null_count("email") |
| duplicate_count([cols]) | Count of duplicate rows | mp.duplicate_count(["id"]) |
| count_values(col, val) | Count specific values | mp.count_values("status", "active") |
| unique_count(col) | Distinct value count | mp.unique_count("user_id") |
| custom_sql(sql) | Execute custom SQL expression | mp.custom_sql("SUM(CASE WHEN origin = 'NL' THEN 1 ELSE 0 END)") |
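
For illustration, several of these metrics compose inside a single check. The sketch below assumes a hypothetical orders dataset with origin and id columns; it uses only calls from the table above:

@check(name="Order sanity")
def check_orders(mp: MetricProvider, ctx: Context) -> None:
    # Share of orders originating from NL, via a custom SQL expression
    nl_orders = mp.custom_sql("SUM(CASE WHEN origin = 'NL' THEN 1 ELSE 0 END)")
    ctx.assert_that(nl_orders / mp.num_rows()).config(
        name="NL order share", severity="P2"
    ).is_between(0.1, 0.9)

    # Order IDs must be unique (id column is illustrative)
    ctx.assert_that(mp.duplicate_count(["id"])).config(
        name="Order ID uniqueness", severity="P0"
    ).is_eq(0)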

Extended Metrics

| Metric | Description | Example |
| --- | --- | --- |
| ext.day_over_day(metric) | Day-over-day change | mp.ext.day_over_day(mp.sum("revenue")) |
| ext.week_over_week(metric) | Week-over-week change | mp.ext.week_over_week(mp.average("price")) |
| ext.stddev(metric, offset, n) | Standard deviation over window | mp.ext.stddev(mp.sum("sales"), offset=0, n=7) |
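
Extended metrics wrap another metric rather than a raw column. A minimal sketch, assuming the suite runs once per day so the day and week offsets can resolve against earlier runs:

@check(name="Revenue trend")
def monitor_revenue_trend(mp: MetricProvider, ctx: Context) -> None:
    # Day-over-day ratio of total revenue
    dod = mp.ext.day_over_day(mp.sum("revenue"))
    ctx.assert_that(dod).config(
        name="Day-over-day revenue", severity="P1"
    ).is_between(0.8, 1.2)

    # 7-run standard deviation of daily revenue, collected without validation
    volatility = mp.ext.stddev(mp.sum("revenue"), offset=0, n=7)
    ctx.assert_that(volatility).config(
        name="Revenue volatility (7 runs)", severity="P2"
    ).noop()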

Available Assertions

| Assertion | Description | Example |
| --- | --- | --- |
| is_eq(value, tol) | Equals with tolerance | .is_eq(100, tol=0.01) |
| is_neq(value, tol) | Not equals with tolerance | .is_neq(100, tol=0.01) |
| is_between(min, max) | In range (inclusive) | .is_between(0, 100) |
| is_positive() | Greater than zero | .is_positive() |
| is_zero() | Equals zero | .is_zero() |
| is_negative() | Less than zero | .is_negative() |
| is_gt(val) / is_geq(val) | Greater than (or equal) | .is_gt(0.95) |
| is_lt(val) / is_leq(val) | Less than (or equal) | .is_lt(0.05) |
| noop() | No validation (collect only) | .noop() |
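
A short sketch combining these variants (column names and thresholds are illustrative only):

@check(name="Assertion styles")
def assertion_styles(mp: MetricProvider, ctx: Context) -> None:
    # Equality within a tolerance band
    ctx.assert_that(mp.sum("revenue")).config(
        name="Expected daily total", severity="P1"
    ).is_eq(100_000, tol=0.01)

    # Inclusive range check
    ctx.assert_that(mp.average("price")).config(
        name="Average price in range", severity="P2"
    ).is_between(0, 100)

    # Record the metric without validating it
    ctx.assert_that(mp.unique_count("user_id")).config(
        name="Distinct users", severity="P2"
    ).noop()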

License

MIT License. See LICENSE for details.
