
A Python-native data quality tool with AI superpowers, built on DuckDB for speed


DuckGuard

Data Quality That Just Works

3 lines of code · 10x faster · 20x less memory

Stop wrestling with 50+ lines of boilerplate. Start validating data in seconds.

Python 3.10+ · Apache License 2.0


From Zero to Validated in 30 Seconds

pip install duckguard
from duckguard import connect

orders = connect("orders.csv")                               # CSV, Parquet, JSON, S3, databases...
assert orders.customer_id.is_not_null()                      # Just like pytest!
assert orders.amount.between(0, 10000)                       # Readable validations
assert orders.status.isin(["pending", "shipped", "delivered"])

quality = orders.score()
print(f"Grade: {quality.grade}")  # A, B, C, D, or F

That's it. No context. No datasource. No validator. No expectation suite. Just data quality.



Why DuckGuard?

Most data quality tools require dozens of lines of boilerplate before you can validate a single column. DuckGuard gives you a pytest-like API powered by DuckDB's speed.

Great Expectations

# 50+ lines of setup required
from great_expectations import get_context

context = get_context()
datasource = context.sources.add_pandas("my_ds")
asset = datasource.add_dataframe_asset(
    name="orders", dataframe=df
)
batch_request = asset.build_batch_request()
expectation_suite = context.add_expectation_suite(
    "orders_suite"
)
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="orders_suite"
)
validator.expect_column_values_to_not_be_null(
    "customer_id"
)
validator.expect_column_values_to_be_between(
    "amount", min_value=0, max_value=10000
)
# ... and more configuration

45 seconds | 4GB RAM | 20+ dependencies

DuckGuard

from duckguard import connect

orders = connect("orders.csv")

assert orders.customer_id.is_not_null()
assert orders.amount.between(0, 10000)

4 seconds | 200MB RAM | 7 dependencies

| Feature | DuckGuard | Great Expectations | Soda Core | Pandera |
|---|---|---|---|---|
| Lines of code to start | 3 | 50+ | 10+ | 5+ |
| Time for 1GB CSV* | ~4 sec | ~45 sec | ~20 sec | ~15 sec |
| Memory for 1GB CSV* | ~200 MB | ~4 GB | ~1.5 GB | ~1.5 GB |
| Learning curve | Minutes | Days | Hours | Minutes |
| Pytest-like API | Yes | - | - | - |
| DuckDB-powered | Yes | - | Partial | - |
| Cloud storage (S3/GCS/Azure) | Yes | Yes | Yes | - |
| Database connectors | 11+ | Yes | Yes | - |
| PII detection | Built-in | - | - | - |
| Anomaly detection (7 methods) | Built-in | - | Partial | - |
| Schema evolution tracking | Built-in | - | Yes | - |
| Freshness monitoring | Built-in | - | Yes | - |
| Data contracts | Yes | - | Yes | Yes |
| Row-level error details | Yes | Yes | - | Yes |
| Cross-dataset & FK checks | Built-in | Partial | Yes | - |
| Reconciliation | Built-in | - | - | - |
| Distribution drift | Built-in | - | - | - |
| Conditional checks | Built-in | - | - | - |
| Query-based checks | Built-in | - | Yes | - |
| YAML rules | Yes | Yes | Yes | - |
| dbt integration | Yes | Yes | Yes | - |
| Slack/Teams/Email alerts | Yes | Yes | Yes | - |
| HTML/PDF reports | Yes | Yes | Yes | - |

*Performance varies by hardware and data characteristics. Based on typical usage patterns with DuckDB's columnar engine.


Installation

pip install duckguard

# With optional features
pip install duckguard[reports]     # HTML/PDF reports
pip install duckguard[snowflake]   # Snowflake connector
pip install duckguard[databricks]  # Databricks connector
pip install duckguard[airflow]     # Airflow integration
pip install duckguard[all]         # Everything

Feature Overview

  • 🎯 Quality Scoring: A-F grades with 4 quality dimensions
  • 🔒 PII Detection: Auto-detect emails, SSNs, phones
  • 📈 Anomaly Detection: Z-score, IQR, KS-test, ML baselines
  • 🔔 Alerts: Slack, Teams, Email
  • Freshness Monitoring: Detect stale data automatically
  • 📐 Schema Evolution: Track and detect breaking changes
  • 📜 Data Contracts: Schema + SLA enforcement
  • 🔎 Row-Level Errors: See exactly which rows failed
  • 📄 HTML/PDF Reports: Beautiful shareable reports
  • 📈 Historical Tracking: Quality trends over time
  • 🔗 Cross-Dataset Checks: FK, reconciliation, drift
  • 🚀 CI/CD Ready: dbt, Airflow, GitHub Actions
  • 📋 YAML Rules: Declarative validation rules
  • 🔍 Auto-Profiling: Semantic types & rule suggestions
  • Conditional Checks: Validate when conditions are met
  • 📊 Group-By Validation: Segmented per-group checks

Connect to Anything

from duckguard import connect

# Files
orders = connect("orders.csv")
orders = connect("orders.parquet")
orders = connect("orders.json")

# Cloud Storage
orders = connect("s3://bucket/orders.parquet")
orders = connect("gs://bucket/orders.parquet")
orders = connect("az://container/orders.parquet")

# Databases
orders = connect("postgres://localhost/db", table="orders")
orders = connect("mysql://localhost/db", table="orders")
orders = connect("snowflake://account/db", table="orders")
orders = connect("bigquery://project/dataset", table="orders")
orders = connect("databricks://workspace/catalog/schema", table="orders")
orders = connect("redshift://cluster/db", table="orders")

# Modern Formats
orders = connect("delta://path/to/delta_table")
orders = connect("iceberg://path/to/iceberg_table")

# pandas DataFrame
import pandas as pd
orders = connect(pd.read_csv("orders.csv"))

Supported: CSV, Parquet, JSON, Excel | S3, GCS, Azure Blob | PostgreSQL, MySQL, SQLite, Snowflake, BigQuery, Redshift, Databricks, SQL Server, Oracle, MongoDB | Delta Lake, Apache Iceberg | pandas DataFrames


Cookbook

Column Validation

orders = connect("orders.csv")

# Null & uniqueness
orders.order_id.is_not_null()          # No nulls allowed
orders.order_id.is_unique()            # All values distinct
orders.order_id.has_no_duplicates()    # Alias for is_unique

# Range & comparison
orders.amount.between(0, 10000)        # Inclusive range
orders.amount.greater_than(0)          # Minimum (exclusive)
orders.amount.less_than(100000)        # Maximum (exclusive)

# Pattern & enum
orders.email.matches(r'^[\w.+-]+@[\w-]+\.[\w.]+$')
orders.status.isin(["pending", "shipped", "delivered"])

# String length
orders.order_id.value_lengths_between(5, 10)

Every validation returns a ValidationResult with .passed, .message, .summary(), and .failed_rows.

Row-Level Error Debugging

result = orders.quantity.between(1, 100)

if not result.passed:
    print(result.summary())
    # Column 'quantity' has 3 values outside [1, 100]
    #
    # Sample of 3 failing rows (total: 3):
    #   Row 5: quantity=500 - Value outside range [1, 100]
    #   Row 23: quantity=-2 - Value outside range [1, 100]
    #   Row 29: quantity=0 - Value outside range [1, 100]

    for row in result.failed_rows:
        print(f"Row {row.row_number}: {row.value} ({row.reason})")

    print(result.get_failed_values())        # [500, -2, 0]
    print(result.get_failed_row_indices())   # [5, 23, 29]

Quality Scoring

score = orders.score()

print(score.grade)          # A, B, C, D, or F
print(score.overall)        # 0-100 composite score
print(score.completeness)   # % non-null across all columns
print(score.uniqueness)     # % unique across key columns
print(score.validity)       # % values passing type/range checks
print(score.consistency)    # % consistent formatting

Cross-Dataset Validation

orders = connect("orders.csv")
customers = connect("customers.csv")

# Foreign key check
result = orders.customer_id.exists_in(customers.customer_id)

# FK with null handling
result = orders.customer_id.references(customers.customer_id, allow_nulls=True)

# Get orphan values for debugging
orphans = orders.customer_id.find_orphans(customers.customer_id)
print(f"Invalid IDs: {orphans}")

# Compare value sets against a reference dataset
lookup = connect("status_codes.csv")     # illustrative lookup dataset
result = orders.status.matches_values(lookup.code)

# Compare row counts (with tolerance) against another dataset
backup = connect("orders_backup.csv")    # illustrative comparison dataset
result = orders.row_count_matches(backup, tolerance=10)

Reconciliation

source = connect("orders_source.parquet")
target = connect("orders_migrated.parquet")

recon = source.reconcile(
    target,
    key_columns=["order_id"],
    compare_columns=["amount", "status", "customer_id"],
)

print(recon.match_percentage)    # 95.5
print(recon.missing_in_target)   # 3
print(recon.extra_in_target)     # 1
print(recon.value_mismatches)    # {'amount': 5, 'status': 2}
print(recon.summary())

Distribution Drift Detection

baseline = connect("orders_jan.parquet")
current  = connect("orders_feb.parquet")

drift = current.amount.detect_drift(baseline.amount)

print(drift.is_drifted)    # True/False
print(drift.p_value)       # 0.0023
print(drift.statistic)     # KS statistic
print(drift.message)       # Human-readable summary

Group-By Validation

grouped = orders.group_by("region")

print(grouped.groups)        # [{'region': 'North'}, ...]
print(grouped.group_count)   # 4

for stat in grouped.stats():
    print(stat)              # {'region': 'North', 'row_count': 150}

# Ensure every group has at least 10 rows
result = grouped.row_count_greater_than(10)
for g in result.get_failed_groups():
    print(f"{g.key_string}: only {g.row_count} rows")

🆕 What's New in 3.2

DuckGuard 3.2 adds AI-powered data quality — the first data quality library with native LLM integration.

AI Features (NEW)

# Explain quality issues in plain English
duckguard explain orders.csv

# AI suggests validation rules based on your data
duckguard suggest orders.csv

# Get AI-powered fix suggestions for quality issues
duckguard fix orders.csv

from duckguard.ai import explainer, rules_generator, fixer

# Natural language quality explanation
summary = explainer.explain(dataset)

# AI-generated validation rules
rules = rules_generator.suggest_rules(dataset)

# Suggest fixes for data quality issues
fixes = fixer.suggest_fixes(dataset, results)

Supports OpenAI, Anthropic, and Ollama (local models). Configure via environment variables or AIConfig.
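
For example, a minimal sketch of the environment-variable route (the exact variable name DuckGuard reads is an assumption here; check the documentation for your provider):

import os
from duckguard import connect
from duckguard.ai import explainer

# Assumption: the OpenAI backend picks up the standard OPENAI_API_KEY variable.
os.environ["OPENAI_API_KEY"] = "sk-..."

orders = connect("orders.csv")
print(explainer.explain(orders))   # natural-language quality summary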

Also in 3.2

  • 🔍 Improved semantic type detection — smarter column classification, fewer false positives
  • 📄 Apache 2.0 license — OSI-approved, enterprise-friendly
  • 🛡️ SQL injection prevention — multi-layer escaping in all string-based checks
  • 📖 Full documentation site: xdatahubai.github.io/duckguard
  • 🔒 PEP 561 typed: ships a py.typed marker for mypy/pyright

What's New in 3.0

DuckGuard 3.0 introduces conditional checks, multi-column validation, query-based expectations, distributional tests, and 7 anomaly detection methods.

Conditional Checks

Apply validation rules only when a SQL condition is met:

# Email required only for shipped orders
orders.email.not_null_when("status = 'shipped'")

# Quantity must be 1-100 for US orders
orders.quantity.between_when(1, 100, "country = 'US'")

# Status must be shipped or delivered for UK
orders.status.isin_when(["shipped", "delivered"], "country = 'UK'")

# Also: unique_when(), matches_when()

Multi-Column Checks

Validate relationships across columns:

# Ship date must come after created date
orders.expect_column_pair_satisfy(
    column_a="ship_date",
    column_b="created_at",
    expression="ship_date >= created_at",
)

# Composite key uniqueness
orders.expect_columns_unique(columns=["order_id", "customer_id"])

# Multi-column sum check
orders.expect_multicolumn_sum_to_equal(
    columns=["subtotal", "tax", "shipping"],
    expected_sum=59.50,
)

Query-Based Checks

Run custom SQL for unlimited flexibility:

# No rows should have negative quantities
orders.expect_query_to_return_no_rows(
    "SELECT * FROM table WHERE quantity < 0"
)

# Verify data exists
orders.expect_query_to_return_rows(
    "SELECT * FROM table WHERE status = 'shipped'"
)

# Exact value check on aggregate
orders.expect_query_result_to_equal(
    "SELECT COUNT(*) FROM table", expected=1000
)

# Range check on aggregate
orders.expect_query_result_to_be_between(
    "SELECT AVG(amount) FROM table", min_value=50, max_value=500
)

Distributional Checks

Statistical tests for distribution shape (requires scipy):

# Test for normal distribution
orders.amount.expect_distribution_normal(significance_level=0.05)

# Kolmogorov-Smirnov test
orders.quantity.expect_ks_test(distribution="norm")

# Chi-square goodness of fit
orders.status.expect_chi_square_test()

Anomaly Detection (7 Methods)

from duckguard import detect_anomalies, AnomalyDetector
from duckguard.anomaly import BaselineMethod, KSTestMethod, SeasonalMethod

# High-level API: detect anomalies across columns
report = detect_anomalies(orders, method="zscore", columns=["quantity", "amount"])
print(report.has_anomalies, report.anomaly_count)
for a in report.anomalies:
    print(f"{a.column}: score={a.score:.2f}, anomaly={a.is_anomaly}")

# AnomalyDetector with IQR
detector = AnomalyDetector(method="iqr", threshold=1.5)
report = detector.detect(orders, columns=["quantity"])

# ML Baseline: fit on historical data, score new values
baseline = BaselineMethod(sensitivity=2.0)
baseline.fit([100, 102, 98, 105, 97, 103])
print(baseline.baseline_mean, baseline.baseline_std)

score = baseline.score(250)  # Single value
print(score.is_anomaly, score.score)

scores = baseline.score(orders.amount)  # Entire column
print(max(scores))

# KS-Test: detect distribution drift
ks = KSTestMethod(p_value_threshold=0.05)
ks.fit([1, 2, 3, 4, 5])
comparison = ks.compare_distributions([10, 11, 12, 13, 14])
print(comparison.is_drift, comparison.p_value, comparison.message)

# Seasonal: time-aware anomaly detection
seasonal = SeasonalMethod(period="daily", sensitivity=2.0)
seasonal.fit([10, 12, 11, 13, 9, 14])

Available methods: zscore, iqr, modified_zscore, percent_change, baseline, ks_test, seasonal


YAML Rules & Data Contracts

Declarative Rules

# duckguard.yaml
name: orders_validation
description: Quality checks for the orders dataset

checks:
  order_id:
    - not_null
    - unique
  quantity:
    - between: [1, 1000]
  status:
    - allowed_values: [pending, shipped, delivered, cancelled, returned]
  email:
    - not_null:
        severity: warning

from duckguard import load_rules, execute_rules

rules = load_rules("duckguard.yaml")
result = execute_rules(rules, "orders.csv")

print(f"Passed: {result.passed_count}/{result.total_checks}")
for r in result.results:
    tag = "PASS" if r.passed else "FAIL"
    print(f"  [{tag}] {r.message}")

Auto-Discover Rules

from duckguard import connect, generate_rules

orders = connect("orders.csv")
yaml_rules = generate_rules(orders, dataset_name="orders")
print(yaml_rules)  # Ready-to-use YAML

Data Contracts

from duckguard import generate_contract, validate_contract, diff_contracts
from duckguard.contracts import contract_to_yaml

# Generate a contract from existing data
contract = generate_contract(orders, name="orders_v1", owner="data-team")
print(contract.name, contract.version, len(contract.schema))

# Validate data against a contract
validation = validate_contract(contract, "orders.csv")
print(validation.passed)

# Export to YAML
print(contract_to_yaml(contract))

# Detect breaking changes between versions
diff = diff_contracts(contract_v1, contract_v2)
if diff.has_breaking_changes:
    for change in diff.changes:
        print(change)

Auto-Profiling & Semantic Analysis

from duckguard import AutoProfiler, SemanticAnalyzer, detect_type, detect_types_for_dataset

# Profile entire dataset — quality scores, pattern detection, and rule suggestions included
profiler = AutoProfiler()
profile = profiler.profile(orders)
print(f"Columns: {profile.column_count}, Rows: {profile.row_count}")
print(f"Quality: {profile.overall_quality_grade} ({profile.overall_quality_score:.1f}/100)")

# Per-column quality grades and percentiles
for col in profile.columns:
    print(f"  {col.name}: grade={col.quality_grade}, nulls={col.null_percent:.1f}%")
    if col.median_value is not None:
        print(f"    p25={col.p25_value}, median={col.median_value}, p75={col.p75_value}")

# Suggested rules (25+ pattern types: email, SSN, UUID, credit card, etc.)
print(f"Suggested rules: {len(profile.suggested_rules)}")
for rule in profile.suggested_rules[:5]:
    print(f"  {rule}")

# Deep profiling — distribution analysis + outlier detection (numeric columns)
deep_profiler = AutoProfiler(deep=True)
deep_profile = deep_profiler.profile(orders)
for col in deep_profile.columns:
    if col.distribution_type:
        print(f"  {col.name}: {col.distribution_type}, skew={col.skewness:.2f}")
    if col.outlier_count is not None:
        print(f"    outliers: {col.outlier_count} ({col.outlier_percentage:.1f}%)")

# Configurable thresholds
strict = AutoProfiler(null_threshold=0.0, unique_threshold=100.0, pattern_min_confidence=95.0)
strict_profile = strict.profile(orders)
# Detect semantic type for a single column
print(detect_type(orders, "email"))     # SemanticType.EMAIL
print(detect_type(orders, "country"))   # SemanticType.COUNTRY_CODE

# Detect types for all columns at once
type_map = detect_types_for_dataset(orders)
for col, stype in type_map.items():
    print(f"  {col}: {stype}")

# Full PII analysis
analysis = SemanticAnalyzer().analyze(orders)
print(f"PII columns: {analysis.pii_columns}")  # ['email', 'phone']
for col in analysis.columns:
    if col.is_pii:
        print(f"  {col.name}: {col.semantic_type.value} (confidence: {col.confidence:.0%})")

Supported semantic types: email, phone, url, ip_address, ssn, credit_card, person_name, address, country, state, city, zipcode, latitude, longitude, date, datetime, currency, percentage, boolean, uuid, identifier, and more.


Freshness, Schema & History

Freshness Monitoring

from datetime import timedelta
from duckguard.freshness import FreshnessMonitor

# Quick check
print(orders.freshness.last_modified)   # 2024-01-30 14:22:01
print(orders.freshness.age_human)       # "2 hours ago"
print(orders.freshness.is_fresh)        # True

# Custom threshold
print(orders.is_fresh(timedelta(hours=6)))

# Structured monitoring
monitor = FreshnessMonitor(threshold=timedelta(hours=1))
result = monitor.check(orders)
print(result.is_fresh, result.age_human)

Schema Evolution

from duckguard.schema_history import SchemaTracker, SchemaChangeAnalyzer

# Capture a snapshot
tracker = SchemaTracker()
snapshot = tracker.capture(orders)
for col in snapshot.columns[:5]:
    print(f"  {col.name}: {col.dtype}")

# View history
history = tracker.get_history(orders.source)
print(f"Snapshots: {len(history)}")

# Detect breaking changes
analyzer = SchemaChangeAnalyzer()
report = analyzer.detect_changes(orders)
print(report.has_breaking_changes, len(report.changes))

Historical Tracking & Trends

from duckguard.history import HistoryStorage, TrendAnalyzer

# Store validation results
storage = HistoryStorage()
storage.store(exec_result)

# Query past runs
runs = storage.get_runs("orders.csv", limit=10)
for run in runs:
    print(f"  {run.run_id}: passed={run.passed}, checks={run.total_checks}")

# Analyze quality trends
trends = TrendAnalyzer(storage).analyze("orders.csv", days=30)
print(trends.summary())

Reports & Notifications

DuckGuard generates self-contained HTML reports with dark mode, trend charts, collapsible sections, sortable tables, and search — all in a single file with zero JavaScript dependencies.

Live demos: Light / Auto Theme · Dark Theme

from duckguard.reports import HTMLReporter, ReportConfig, generate_html_report

# Quick one-liner
generate_html_report(exec_result, "report.html")

# Full-featured report with trends and metadata
config = ReportConfig(
    title="Orders Quality Report",
    dark_mode="auto",          # "auto", "light", or "dark"
    include_trends=True,
    include_metadata=True,
)
reporter = HTMLReporter(config=config)
reporter.generate(
    exec_result,
    "report.html",
    trend_data=trend_data,     # from HistoryStorage.get_trend()
    row_count=dataset.row_count,
    column_count=dataset.column_count,
)

# PDF export (requires weasyprint)
from duckguard.reports import generate_pdf_report
generate_pdf_report(exec_result, "report.pdf")

Notifications

from duckguard.notifications import (
    SlackNotifier, TeamsNotifier, EmailNotifier,
    format_results_text, format_results_markdown,
)

slack = SlackNotifier(webhook_url="https://hooks.slack.com/services/XXX")
teams = TeamsNotifier(webhook_url="https://outlook.office.com/webhook/XXX")
email = EmailNotifier(
    smtp_host="smtp.example.com", smtp_port=587,
    smtp_user="user", smtp_password="pass",
    to_addresses=["team@example.com"],
)

# Format for custom integrations
print(format_results_text(exec_result))
print(format_results_markdown(exec_result))

Integrations

dbt

from duckguard import load_rules
from duckguard.integrations.dbt import rules_to_dbt_tests

rules = load_rules("duckguard.yaml")
dbt_tests = rules_to_dbt_tests(rules)

Airflow

from airflow import DAG
from airflow.operators.python import PythonOperator

def validate_orders():
    from duckguard import connect, load_rules, execute_rules
    rules = load_rules("duckguard.yaml")
    result = execute_rules(rules, "s3://bucket/orders.parquet")
    if not result.passed:
        raise Exception(f"Quality check failed: {result.failed_count} failures")

dag = DAG("data_quality", schedule_interval="@daily", ...)
PythonOperator(task_id="validate", python_callable=validate_orders, dag=dag)

GitHub Actions

name: Data Quality
on: [push]
jobs:
  quality-check:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: { python-version: "3.11" }
      - run: pip install duckguard
      - run: duckguard check data/orders.csv --rules duckguard.yaml

pytest

# tests/test_data_quality.py
from duckguard import connect

def test_orders_quality():
    orders = connect("data/orders.csv")
    assert orders.row_count > 0
    assert orders.order_id.is_not_null()
    assert orders.order_id.is_unique()
    assert orders.quantity.between(0, 10000)
    assert orders.status.isin(["pending", "shipped", "delivered", "cancelled"])

CLI

# Validate data against rules
duckguard check orders.csv --config duckguard.yaml

# Auto-discover rules from data
duckguard discover orders.csv > duckguard.yaml

# Generate reports (with dark mode and trend charts)
duckguard report orders.csv --output report.html --dark-mode auto --trends

# Anomaly detection
duckguard anomaly orders.csv --method zscore

# Freshness check
duckguard freshness orders.csv --max-age 6h

# Schema tracking
duckguard schema orders.csv --action capture
duckguard schema orders.csv --action changes

# Data contracts
duckguard contract generate orders.csv
duckguard contract validate orders.csv

# Dataset info
duckguard info orders.csv

# Profile dataset with quality scoring
duckguard profile orders.csv
duckguard profile orders.csv --deep --format json

Performance

Built on DuckDB for fast, memory-efficient validation:

| Dataset | Great Expectations | DuckGuard | Speedup |
|---|---|---|---|
| 1GB CSV | 45 sec, 4GB RAM | 4 sec, 200MB RAM | 10x faster |
| 10GB Parquet | 8 min, 32GB RAM | 45 sec, 2GB RAM | 10x faster |
| 100M rows | Minutes | Seconds | 10x faster |

Why So Fast?

  • DuckDB engine: Columnar, vectorized, SIMD-optimized
  • Zero copy: Direct file access, no DataFrame conversion
  • Lazy evaluation: Only compute what's needed
  • Memory efficient: Stream large files without loading them entirely (see the DuckDB sketch below)
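
The engine doing the work is plain DuckDB; as a standalone illustration (not DuckGuard code), a query can aggregate a CSV straight from disk without ever building a pandas DataFrame:

import duckdb

# DuckDB scans the file in place; nothing is materialized as a DataFrame.
result = duckdb.sql("SELECT count(*) AS rows, avg(amount) AS avg_amount FROM 'orders.csv'")
print(result)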

Scaling Guide

| Data Size | Recommendation |
|---|---|
| < 10M rows | DuckGuard directly |
| 10-100M rows | Use Parquet, configure memory_limit |
| 100GB+ | Use database connectors (Snowflake, BigQuery, Databricks) |

from duckguard import DuckGuardEngine, connect

engine = DuckGuardEngine(memory_limit="8GB")
dataset = connect("large_data.parquet", engine=engine)

API Quick Reference

Column Properties

col.null_count        # Number of null values
col.null_percent      # Percentage of null values
col.unique_count      # Number of distinct values
col.min, col.max      # Min/max values (numeric)
col.mean, col.median  # Mean and median (numeric)
col.stddev            # Standard deviation (numeric)

Column Validation Methods

| Method | Description |
|---|---|
| col.is_not_null() | No nulls allowed |
| col.is_unique() | All values distinct |
| col.between(min, max) | Range check (inclusive) |
| col.greater_than(val) | Minimum (exclusive) |
| col.less_than(val) | Maximum (exclusive) |
| col.matches(regex) | Regex pattern check |
| col.isin(values) | Allowed values |
| col.has_no_duplicates() | No duplicate values |
| col.value_lengths_between(min, max) | String length range |
| col.exists_in(ref_col) | FK: values exist in reference |
| col.references(ref_col, allow_nulls) | FK with null handling |
| col.find_orphans(ref_col) | List orphan values |
| col.matches_values(other_col) | Compare value sets |
| col.detect_drift(ref_col) | KS-test drift detection |
| col.not_null_when(condition) | Conditional not-null |
| col.unique_when(condition) | Conditional uniqueness |
| col.between_when(min, max, condition) | Conditional range |
| col.isin_when(values, condition) | Conditional enum |
| col.matches_when(pattern, condition) | Conditional pattern |
| col.expect_distribution_normal() | Normality test |
| col.expect_ks_test(distribution) | KS distribution test |
| col.expect_chi_square_test() | Chi-square test |

Dataset Methods

| Method | Description |
|---|---|
| ds.score() | Quality score (completeness, uniqueness, validity, consistency) |
| ds.reconcile(target, key_columns, compare_columns) | Full reconciliation |
| ds.row_count_matches(other, tolerance) | Row count comparison |
| ds.group_by(columns) | Group-level validation |
| ds.expect_column_pair_satisfy(a, b, expr) | Column pair check |
| ds.expect_columns_unique(columns) | Composite key uniqueness |
| ds.expect_multicolumn_sum_to_equal(columns, sum) | Multi-column sum |
| ds.expect_query_to_return_no_rows(sql) | Custom SQL: no violations |
| ds.expect_query_to_return_rows(sql) | Custom SQL: data exists |
| ds.expect_query_result_to_equal(sql, val) | Custom SQL: exact value |
| ds.expect_query_result_to_be_between(sql, min, max) | Custom SQL: range |
| ds.is_fresh(max_age) | Data freshness check |
| ds.head(n) | Preview first n rows |

Enhanced Error Messages

DuckGuard provides helpful, actionable error messages with suggestions:

try:
    orders.nonexistent_column
except ColumnNotFoundError as e:
    print(e)
    # Column 'nonexistent_column' not found.
    # Available columns: order_id, customer_id, product_name, ...

try:
    connect("ftp://data.example.com/file.xyz")
except UnsupportedConnectorError as e:
    print(e)
    # No connector found for: ftp://data.example.com/file.xyz
    # Supported formats: CSV, Parquet, JSON, PostgreSQL, MySQL, ...

Community

We'd love to hear from you! Whether you have a question, an idea, or want to share how you're using DuckGuard, reach out via GitHub Discussions.


Contributing

We welcome contributions! See CONTRIBUTING.md for guidelines.

git clone https://github.com/XDataHubAI/duckguard.git
cd duckguard
pip install -e ".[dev]"

pytest                    # Run tests
black src tests           # Format code
ruff check src tests      # Lint

License

Apache License 2.0 - see LICENSE


Built with ❤️ by the DuckGuard Team

Discussions · Report Bug · Request Feature · Contribute
