qualink

Blazing fast data quality framework for Python, built on Apache DataFusion.

Features

  • High Performance: Leverages Apache DataFusion for fast data processing and validation.
  • Flexible Constraints: Supports various data quality constraints including completeness, uniqueness, and custom assertions.
  • YAML Configuration: Define validation suites declaratively using YAML files.
  • CLI – qualinkctl: Run YAML-driven validations from the terminal — no Python script required.
  • Cloud Object Stores: Read data directly from Amazon S3 (and S3-compatible services).
  • Multiple Output Formats: Results can be formatted as human-readable text, JSON, or Markdown.
  • Async Support: Built with asyncio for non-blocking operations.
  • Analyzers: Compute reusable dataset and column metrics independent of pass/fail checks.
  • Metrics Repository: Persist analyzer outputs over time using tagged result keys.
  • Anomaly Detection: Detect unexpected metric shifts from historical baselines.
  • Intelligent Rule Suggestions: Generate candidate validation rules from column profiles.
  • Easy Integration: Simple API for defining and running validation suites.

Installation

Install qualink using uv:

uv add qualink

Or using pip:

pip install qualink

Quick Start

Here's a basic example of using qualink to validate a CSV file:

import asyncio
from datafusion import SessionContext
from qualink.checks import Check, Level
from qualink.constraints import Assertion
from qualink.core import ValidationSuite
from qualink.formatters import MarkdownFormatter


async def main() -> None:
    ctx = SessionContext()
    ctx.register_csv("users", "examples/users.csv")

    result = await (
        ValidationSuite()
        .on_data(ctx, "users")
        .with_name("User Data Quality")
        .add_check(Check.builder("Critical Checks").with_level(Level.ERROR).is_complete("user_id").build())
        .add_check(
            Check.builder("Data Quality")
            .with_level(Level.WARNING)
            .has_completeness("name", Assertion.greater_than_or_equal(0.95))
            .build()
        )
        .run()
    )

    print(MarkdownFormatter().format(result))


if __name__ == "__main__":
    asyncio.run(main())

YAML Configuration

You can also define validation suites using YAML files for a declarative approach:

suite:
  name: "User Data Quality"

data_sources:
  - name: users_source
    format: csv
    path: "examples/users.csv"
    table_name: users

checks:
  - name: "Critical Checks"
    level: error
    rules:
      - is_complete: user_id
      - is_unique: email
      - has_size:
          gt: 0
  - name: "Data Quality"
    level: warning
    rules:
      - has_completeness:
          column: name
          gte: 0.95
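
The threshold keys in the rules above (gt, gte) read as comparisons against a computed metric. A minimal sketch of that idea in plain Python follows. The COMPARATORS table and the satisfies() helper are hypothetical names used for illustration only, they are not qualink's API, and only the two keys that appear in this README are covered:

```python
import operator

# Only the threshold keys shown in this README; a real rule set may support more.
COMPARATORS = {
    "gt": operator.gt,
    "gte": operator.ge,
}


def satisfies(metric: float, rule: dict) -> bool:
    """Return True when `metric` meets every threshold key present in `rule`.

    Hypothetical helper: keys that are not thresholds (such as `column`) are ignored.
    """
    return all(
        COMPARATORS[key](metric, threshold)
        for key, threshold in rule.items()
        if key in COMPARATORS
    )


# Mirrors `has_size: {gt: 0}` and `has_completeness: {column: name, gte: 0.95}`.
size_ok = satisfies(1200, {"gt": 0})
completeness_ok = satisfies(0.93, {"column": "name", "gte": 0.95})
```

Here size_ok is True and completeness_ok is False, since 0.93 is below the 0.95 threshold.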

Run the YAML configuration:

import asyncio
from qualink.config import run_yaml
from qualink.formatters import HumanFormatter


async def main() -> None:
    result = await run_yaml("path/to/your/config.yaml")
    print(HumanFormatter().format(result))


if __name__ == "__main__":
    asyncio.run(main())

run_yaml() also accepts filesystem URIs such as s3://my-bucket/checks.yaml or file:///absolute/path/to/checks.yaml, in addition to local file paths and inline YAML strings.
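
Since one argument can carry a URI, a local path, or inline YAML, it can help to see how the three forms differ. The sketch below uses only the standard library. classify_config_source() is a hypothetical helper written for this illustration, and qualink's own detection logic may work differently:

```python
from urllib.parse import urlparse


def classify_config_source(source: str) -> str:
    """Return "inline", "uri", or "path" for a config argument.

    Hypothetical helper for illustration; not qualink's actual logic.
    """
    # Inline YAML documents span several lines; paths and URIs do not.
    if "\n" in source:
        return "inline"
    # A scheme followed by "://" (s3://, file://) marks a filesystem URI.
    if urlparse(source).scheme and "://" in source:
        return "uri"
    # Everything else is treated as a local file path.
    return "path"
```

Under these assumptions, "s3://my-bucket/checks.yaml" is classified as a URI, "path/to/your/config.yaml" as a path, and a multi-line string starting with "suite:" as inline YAML.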

CLI – qualinkctl

The simplest way to run a YAML validation is with qualinkctl:

# Human-readable output (default)
uv run qualinkctl checks.yaml

# JSON output
uv run qualinkctl checks.yaml -f json

# Markdown report saved to file
uv run qualinkctl checks.yaml -f markdown -o report.md

# JSON report written to object storage
uv run qualinkctl checks.yaml -f json -o s3://my-bucket/qualink/results.json

# Show all constraints (including passed) with debug logging
uv run qualinkctl checks.yaml --show-passed -v

qualinkctl exits with code 0 on success and 1 on failure, making it easy to use in CI/CD pipelines:

uv run qualinkctl checks.yaml -f json -o results.json || echo "Validation failed!"
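
The same exit-code contract can be consumed from a Python build script. The following is a minimal sketch that assumes uv and qualinkctl are on the PATH and that checks.yaml exists. validation_passed() and main() are hypothetical names introduced here, and the command line is copied from the shell example above:

```python
import subprocess
import sys
from collections.abc import Sequence

# Command copied from the shell example; adjust paths for your project.
QUALINKCTL_COMMAND = [
    "uv", "run", "qualinkctl", "checks.yaml", "-f", "json", "-o", "results.json",
]


def validation_passed(command: Sequence[str]) -> bool:
    """Run `command` and report whether it exited with code 0."""
    completed = subprocess.run(command, check=False)
    return completed.returncode == 0


def main() -> int:
    """Return 0 when validation succeeds and 1 otherwise, for use with sys.exit()."""
    if validation_passed(QUALINKCTL_COMMAND):
        return 0
    print("Validation failed!", file=sys.stderr)
    return 1
```

Calling sys.exit(main()) at the end of a pipeline step propagates the failure to the CI runner.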

Run uv run qualinkctl --help for a full list of options.

Advanced Features

Runnable end-to-end examples are available in:

  • examples/adbc_sqlite_example.py
  • examples/analyzers_example.py
  • examples/metrics_repository_example.py
  • examples/anomaly_detection_example.py
  • examples/intelligent_rule_suggestions_example.py
  • examples/output_results_example.py
  • examples/file_uri_validation.py
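
For orientation before opening those examples, the idea of comparing a new metric value against a historical baseline can be sketched in plain Python. This is not qualink's implementation. The z-score rule, the is_anomalous() name, and the sample values are all assumptions chosen for illustration:

```python
from statistics import mean, stdev


def is_anomalous(history: list[float], current: float, max_z: float = 3.0) -> bool:
    """Flag `current` when it lies more than `max_z` standard deviations from the mean of `history`."""
    if len(history) < 2:
        return False  # not enough history to estimate the spread
    spread = stdev(history)
    if spread == 0:
        # A perfectly flat history: any change at all counts as a shift.
        return current != history[0]
    return abs(current - mean(history)) / spread > max_z


# Hypothetical completeness values recorded on five earlier runs.
completeness_history = [0.98, 0.97, 0.99, 0.98, 0.98]

sudden_drop = is_anomalous(completeness_history, 0.60)
normal_run = is_anomalous(completeness_history, 0.975)
```

With this history, a drop to 0.60 is flagged while 0.975 is not. The bundled example scripts show how qualink itself stores and evaluates metrics.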

ADBC Datasources

qualink can also register database-backed sources through ADBC and materialize them into DataFusion tables before running checks.

Example configuration for SQLite:

connections:
  sqlite_local:
    uri: sqlite:///tmp/users.db

data_sources:
  - name: users_source
    connection: sqlite_local
    table: users
    table_name: users

To run the SQLite example after installing the optional ADBC packages:

uv sync --group adbc
uv run python examples/adbc_sqlite_example.py

Secret-backed Connections

Sensitive connection values can be resolved inline from environment variables, AWS Systems Manager Parameter Store, AWS Secrets Manager, or GCP Secret Manager.

Environment variable example:

connections:
  sqlite_local:
    uri:
      from: env
      key: QUALINK_SQLITE_URI

data_sources:
  - name: users_source
    connection: sqlite_local
    table: users
    table_name: users

AWS SSM example:

connections:
  postgres_prod:
    uri:
      from: aws_ssm
      key: /qualink/prod/postgres/uri
      region: us-east-1

AWS Secrets Manager JSON field extraction:

connections:
  snowflake_prod:
    uri:
      from: aws_secretsmanager
      key: qualink/prod/snowflake
      field: uri
      region: eu-west-1

The checked-in reference config is examples/secret_backed_connections.yaml.
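
To make the from/key shape concrete, here is a minimal sketch of how a `from: env` reference could be resolved. resolve_value() is a hypothetical helper, not qualink's resolver, and the cloud-backed sources (aws_ssm, aws_secretsmanager, GCP Secret Manager) are deliberately left out because they need provider SDKs:

```python
import os
from typing import Union

# A connection value is either a literal string or a {"from": ..., "key": ...} reference.
SecretRef = Union[str, dict]


def resolve_value(value: SecretRef) -> str:
    """Return a literal string unchanged, or look up a `from: env` reference."""
    if isinstance(value, str):
        return value
    source = value.get("from")
    if source == "env":
        key = value["key"]
        if key not in os.environ:
            raise KeyError(f"environment variable {key!r} is not set")
        return os.environ[key]
    # Other sources are out of scope for this sketch.
    raise ValueError(f"unsupported secret source: {source!r}")
```

With QUALINK_SQLITE_URI exported in the environment, resolve_value({"from": "env", "key": "QUALINK_SQLITE_URI"}) returns its value, so the URI never has to appear in the YAML file.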

Result Outputs to Filesystems

Validation results can be written to local paths or filesystem URIs backed by PyArrow filesystems such as S3, GCS, and Azure Blob/Data Lake.

CLI example:

uv run qualinkctl checks.yaml -f json -o s3://my-bucket/qualink/results.json
uv run qualinkctl checks.yaml -f markdown -o gs://my-bucket/qualink/report.md

YAML-driven outputs:

outputs:
  - path: reports/results.json
    format: json
    show_passed: true
  - uri: s3://my-bucket/qualink/results.md
    format: markdown

Python API example:

import asyncio
from qualink.config import run_yaml
from qualink.config.parser import load_yaml
from qualink.output import OutputService, normalize_output_specs


async def main() -> None:
    config = load_yaml("examples/output_results.yaml")
    result = await run_yaml("examples/output_results.yaml")
    # Write the result to every destination listed under `outputs` in the config.
    OutputService().emit_many(result, normalize_output_specs(config))


asyncio.run(main())

S3 Object Store Sources

qualink can read data directly from Amazon S3 using DataFusion's built-in AmazonS3 object store:

suite:
  name: "Cloud Data Quality"

data_sources:
  - name: users_source
    format: parquet
    path: s3://my-data-lake/data/users.parquet
    table_name: users

checks:
  - name: "Completeness"
    level: error
    rules:
      - is_complete: user_id
      - is_unique: email

Use the standard AWS credential chain. On Glue, ECS, EKS, or EC2 with an attached role, explicit keys are usually not required.

Constraints

qualink supports the following constraint types:

  • Completeness: Ensures a column has no null values or meets a minimum completeness ratio.
  • Uniqueness: Checks for duplicate values in a column.
  • Assertion: Custom assertions using SQL expressions.
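
The two ratio-style metrics can be illustrated without qualink. In the sketch below, completeness is the fraction of non-null values. Uniqueness is assumed to be the fraction of non-null values that occur exactly once, which is one common definition and may not match qualink's exactly. The function names and sample data are hypothetical:

```python
from collections import Counter
from collections.abc import Sequence
from typing import Optional


def completeness(values: Sequence[Optional[str]]) -> float:
    """Fraction of values that are not None (1.0 for an empty column)."""
    if not values:
        return 1.0
    return sum(v is not None for v in values) / len(values)


def uniqueness(values: Sequence[Optional[str]]) -> float:
    """Fraction of non-null values that occur exactly once (assumed definition)."""
    present = [v for v in values if v is not None]
    if not present:
        return 1.0
    counts = Counter(present)
    return sum(1 for v in present if counts[v] == 1) / len(present)


# Hypothetical sample columns.
names = ["Ada", None, "Grace", "Linus"]
emails = ["a@x.io", "b@x.io", "a@x.io", "c@x.io"]

name_completeness = completeness(names)  # 3 of 4 values present
email_uniqueness = uniqueness(emails)    # 2 of 4 values occur once
```

Here name_completeness is 0.75, which would fail a `gte: 0.95` rule, and email_uniqueness is 0.5 because "a@x.io" appears twice.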

Formatters

Results can be formatted using:

  • HumanFormatter: Human-readable text output.
  • JsonFormatter: JSON format for programmatic processing.
  • MarkdownFormatter: Markdown tables for documentation.

Benchmarks

qualink ships with a real-world benchmark suite that validates ~42 million NYC Yellow Taxi trip records (654 MB of Parquet data) through 12 check groups and 92 constraints — in under 1.5 seconds.

========================================================================
  qualink Benchmark — NYC Taxi Trips
========================================================================
  Parquet files : 3
  Total size    : 654.3 MB
  Data dir      : benchmarks/data
  YAML config   : benchmarks/nyc_taxi_validation.yaml

    • data-200901.parquet  (211.9 MB)
    • data-201206.parquet  (231.1 MB)
    • data-201501.parquet  (211.3 MB)
========================================================================

⏱  Running benchmark with 'human' formatter …

Verification PASSED: NYC Taxi Trips – qualink Benchmark Suite

Checks          12
Constraints     92
Passed          91
Failed          1
Skipped         0
Pass rate       98.9%
Execution time  1440 ms

Status    Check       Message
--------  ----------  ---------------------------------------------
[FAIL]    Uniqueness  Uniqueness of (id) is 0.0000, expected >= 1.0

========================================================================
  Status         : ✅ PASSED
  Total records  : 41.94M
  Wall-clock     : 1.455s
  Checks         : 12
  Constraints    : 92
  Passed         : 91
  Failed         : 1
  Pass rate      : 98.9%
  Engine time    : 0.02m
========================================================================
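
The summary figures can be cross-checked with a few lines of arithmetic. The sketch below uses only numbers from the output above. Reading "Engine time" as the execution time expressed in minutes is an assumption, although it is consistent with the 1440 ms reported earlier:

```python
# Values copied from the benchmark output above.
passed, constraints = 91, 92
records = 41.94e6       # "Total records"
wall_clock_s = 1.455    # "Wall-clock"
execution_ms = 1440     # "Execution time"

pass_rate = passed / constraints * 100   # percentage of constraints that passed
throughput = records / wall_clock_s      # records validated per second
engine_minutes = execution_ms / 60_000   # milliseconds converted to minutes

print(f"Pass rate   : {pass_rate:.1f}%")
print(f"Throughput  : {throughput / 1e6:.1f}M records/s")
print(f"Engine time : {engine_minutes:.2f}m")
```

This prints a pass rate of 98.9%, a throughput of about 28.8M records per second, and an engine time of 0.02m.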

Run it yourself

# 1. Download data (parquet files from public S3)
./benchmarks/download_data.sh 3

# 2. Run the benchmark
uv run python benchmarks/run_benchmark.py

# Other output formats
uv run python benchmarks/run_benchmark.py --format markdown
uv run python benchmarks/run_benchmark.py --format json

See benchmarks/README.md for full dataset details and configuration.

Development

To set up the development environment:

git clone https://github.com/gopidesupavan/qualink.git
cd qualink
uv sync

Run tests:

uv run pytest

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
