qualink
Blazing fast data quality framework for Python, built on Apache DataFusion.
Features
- High Performance: Leverages Apache DataFusion for fast data processing and validation.
- Flexible Constraints: Supports various data quality constraints including completeness, uniqueness, and custom assertions.
- YAML Configuration: Define validation suites declaratively using YAML files.
- CLI (qualinkctl): Run YAML-driven validations from the terminal; no Python script required.
- Cloud Object Stores: Read data directly from Amazon S3 (and S3-compatible services).
- Multiple Output Formats: Results can be formatted as human-readable text, JSON, or Markdown.
- Async Support: Built with asyncio for non-blocking operations.
- Analyzers: Compute reusable dataset and column metrics independent of pass/fail checks.
- Metrics Repository: Persist analyzer outputs over time using tagged result keys.
- Anomaly Detection: Detect unexpected metric shifts from historical baselines.
- Intelligent Rule Suggestions: Generate candidate validation rules from column profiles.
- Easy Integration: Simple API for defining and running validation suites.
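To give intuition for the anomaly-detection feature, here is a minimal, dependency-free sketch of the underlying idea (a concept illustration only, not qualink's actual API): a new metric value is flagged when it deviates from the historical baseline by more than k standard deviations.

```python
from statistics import mean, stdev


def is_anomalous(history: list[float], new_value: float, k: float = 3.0) -> bool:
    """Flag new_value when it sits more than k standard deviations
    from the mean of the historical baseline."""
    if len(history) < 2:
        return False  # not enough history to establish a baseline
    mu = mean(history)
    sigma = stdev(history)
    if sigma == 0:
        return new_value != mu
    return abs(new_value - mu) > k * sigma


# Completeness of a column over the last five runs, then a sudden drop:
baseline = [0.99, 0.98, 0.99, 0.97, 0.99]
print(is_anomalous(baseline, 0.70))  # large shift -> True
print(is_anomalous(baseline, 0.98))  # within baseline -> False
```

qualink's metrics repository persists exactly this kind of history, so checks can compare fresh analyzer output against tagged past runs.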
Installation
Install qualink using uv:
uv add qualink
Or using pip:
pip install qualink
Quick Start
Here's a basic example of using qualink to validate a CSV file:
import asyncio
from datafusion import SessionContext
from qualink.checks import Check, Level
from qualink.constraints import Assertion
from qualink.core import ValidationSuite
from qualink.formatters import MarkdownFormatter
async def main() -> None:
    ctx = SessionContext()
    ctx.register_csv("users", "examples/users.csv")

    result = await (
        ValidationSuite()
        .on_data(ctx, "users")
        .with_name("User Data Quality")
        .add_check(Check.builder("Critical Checks").with_level(Level.ERROR).is_complete("user_id").build())
        .add_check(
            Check.builder("Data Quality")
            .with_level(Level.WARNING)
            .has_completeness("name", Assertion.greater_than_or_equal(0.95))
            .build()
        )
        .run()
    )

    print(MarkdownFormatter().format(result))


if __name__ == "__main__":
    asyncio.run(main())
YAML Configuration
You can also define validation suites using YAML files for a declarative approach:
suite:
  name: "User Data Quality"
  data_sources:
    - name: users_source
      format: csv
      path: "examples/users.csv"
      table_name: users
  checks:
    - name: "Critical Checks"
      level: error
      rules:
        - is_complete: user_id
        - is_unique: email
        - has_size:
            gt: 0
    - name: "Data Quality"
      level: warning
      rules:
        - has_completeness:
            column: name
            gte: 0.95
Run the YAML configuration:
import asyncio
from qualink.config import run_yaml
from qualink.formatters import HumanFormatter
async def main() -> None:
    result = await run_yaml("path/to/your/config.yaml")
    print(HumanFormatter().format(result))


if __name__ == "__main__":
    asyncio.run(main())
run_yaml() also accepts filesystem URIs such as s3://my-bucket/checks.yaml or
file:///absolute/path/to/checks.yaml, in addition to local file paths and inline YAML strings.
CLI – qualinkctl
The simplest way to run a YAML validation is with qualinkctl:
# Human-readable output (default)
uv run qualinkctl checks.yaml
# JSON output
uv run qualinkctl checks.yaml -f json
# Markdown report saved to file
uv run qualinkctl checks.yaml -f markdown -o report.md
# JSON report written to object storage
uv run qualinkctl checks.yaml -f json -o s3://my-bucket/qualink/results.json
# Show all constraints (including passed) with debug logging
uv run qualinkctl checks.yaml --show-passed -v
qualinkctl exits with code 0 on success and 1 on failure, making it easy to use in CI/CD pipelines:
uv run qualinkctl checks.yaml -f json -o results.json || echo "Validation failed!"
Run uv run qualinkctl --help for a full list of options.
Advanced Features
Runnable end-to-end examples are available in:
- examples/adbc_sqlite_example.py
- examples/analyzers_example.py
- examples/metrics_repository_example.py
- examples/anomaly_detection_example.py
- examples/intelligent_rule_suggestions_example.py
- examples/output_results_example.py
- examples/file_uri_validation.py
ADBC Datasources
qualink can also register database-backed sources through ADBC and materialize them into DataFusion tables before running checks.
SQLite example shape:
connections:
  sqlite_local:
    uri: sqlite:///tmp/users.db

data_sources:
  - name: users_source
    connection: sqlite_local
    table: users
    table_name: users
To run the SQLite example after installing the optional ADBC packages:
uv sync --group adbc
uv run python examples/adbc_sqlite_example.py
Secret-backed Connections
Sensitive connection values can be resolved inline from environment variables, AWS Systems Manager Parameter Store, AWS Secrets Manager, or GCP Secret Manager.
Example:
connections:
  sqlite_local:
    uri:
      from: env
      key: QUALINK_SQLITE_URI

data_sources:
  - name: users_source
    connection: sqlite_local
    table: users
    table_name: users
AWS SSM example:
connections:
  postgres_prod:
    uri:
      from: aws_ssm
      key: /qualink/prod/postgres/uri
      region: us-east-1
AWS Secrets Manager JSON field extraction:
connections:
  snowflake_prod:
    uri:
      from: aws_secretsmanager
      key: qualink/prod/snowflake
      field: uri
      region: eu-west-1
The checked-in reference config is examples/secret_backed_connections.yaml.
Result Outputs to Filesystems
Validation results can be written to local paths or filesystem URIs backed by PyArrow filesystems such as S3, GCS, and Azure Blob/Data Lake.
CLI example:
uv run qualinkctl checks.yaml -f json -o s3://my-bucket/qualink/results.json
uv run qualinkctl checks.yaml -f markdown -o gs://my-bucket/qualink/report.md
YAML-driven outputs:
outputs:
  - path: reports/results.json
    format: json
    show_passed: true
  - uri: s3://my-bucket/qualink/results.md
    format: markdown
Python API example:
import asyncio

from qualink.config import run_yaml
from qualink.config.parser import load_yaml
from qualink.output import OutputService, normalize_output_specs


async def main() -> None:
    config = load_yaml("examples/output_results.yaml")
    result = await run_yaml("examples/output_results.yaml")
    OutputService().emit_many(result, normalize_output_specs(config))


asyncio.run(main())
S3 Object Store Sources
qualink can read data directly from Amazon S3 using DataFusion's built-in AmazonS3 object store support:
suite:
  name: "Cloud Data Quality"
  data_sources:
    - name: users_source
      format: parquet
      path: s3://my-data-lake/data/users.parquet
      table_name: users
  checks:
    - name: "Completeness"
      level: error
      rules:
        - is_complete: user_id
        - is_unique: email
Use the standard AWS credential chain. On Glue, ECS, EKS, or EC2 with an attached role, explicit keys are usually not required.
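As one example of the standard chain, credentials can be supplied through the usual AWS environment variables, which are picked up automatically; the values below are placeholders, not real credentials:

```shell
# Standard AWS environment variables; the credential chain checks
# these first, so no qualink-specific configuration is needed.
export AWS_ACCESS_KEY_ID="AKIA-EXAMPLE"         # placeholder
export AWS_SECRET_ACCESS_KEY="example-secret"   # placeholder
export AWS_DEFAULT_REGION="us-east-1"
```

Shared credential files (~/.aws/credentials) and instance/task roles work the same way, which is why explicit keys are usually unnecessary on managed compute.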
Constraints
qualink supports the following constraint types:
- Completeness: Ensures a column has no null values or meets a minimum completeness ratio.
- Uniqueness: Checks for duplicate values in a column.
- Assertion: Custom assertions using SQL expressions.
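For intuition, the first two constraint types reduce to simple column ratios. The sketch below is purely illustrative (qualink computes these via DataFusion, not per-row Python) and uses one common definition of uniqueness: the share of non-null values that occur exactly once.

```python
from collections import Counter


def completeness(values: list) -> float:
    """Fraction of values in a column that are non-null."""
    if not values:
        return 1.0
    return sum(v is not None for v in values) / len(values)


def uniqueness(values: list) -> float:
    """Fraction of non-null values that occur exactly once."""
    non_null = [v for v in values if v is not None]
    if not non_null:
        return 1.0
    counts = Counter(non_null)
    return sum(1 for v in non_null if counts[v] == 1) / len(non_null)


names = ["ada", "bob", None, "ada"]
print(completeness(names))  # 0.75 (3 of 4 values are non-null)
print(uniqueness(names))    # ~0.333 (only "bob" occurs exactly once)
```

A rule like `has_completeness` with `gte: 0.95` then simply asserts that the first ratio meets the threshold.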
Formatters
Results can be formatted using:
- HumanFormatter: Human-readable text output.
- JsonFormatter: JSON format for programmatic processing.
- MarkdownFormatter: Markdown tables for documentation.
Benchmarks
qualink ships with a real-world benchmark suite that validates ~42 million NYC Yellow Taxi trip records (654 MB of Parquet data) through 12 check groups and 92 constraints in under 1.5 seconds.
========================================================================
qualink Benchmark — NYC Taxi Trips
========================================================================
Parquet files : 3
Total size : 654.3 MB
Data dir : benchmarks/data
YAML config : benchmarks/nyc_taxi_validation.yaml
• data-200901.parquet (211.9 MB)
• data-201206.parquet (231.1 MB)
• data-201501.parquet (211.3 MB)
========================================================================
⏱ Running benchmark with 'human' formatter …
Verification PASSED: NYC Taxi Trips – qualink Benchmark Suite
Checks 12
Constraints 92
Passed 91
Failed 1
Skipped 0
Pass rate 98.9%
Execution time 1440 ms
Status Check Message
-------- ---------- ---------------------------------------------
[FAIL] Uniqueness Uniqueness of (id) is 0.0000, expected >= 1.0
========================================================================
Status : ✅ PASSED
Total records : 41.94M
Wall-clock : 1.455s
Checks : 12
Constraints : 92
Passed : 91
Failed : 1
Pass rate : 98.9%
Engine time : 0.02m
========================================================================
Run it yourself
# 1. Download data (parquet files from public S3)
./benchmarks/download_data.sh 3
# 2. Run the benchmark
uv run python benchmarks/run_benchmark.py
# Other output formats
uv run python benchmarks/run_benchmark.py --format markdown
uv run python benchmarks/run_benchmark.py --format json
See benchmarks/README.md for full dataset details and configuration.
Development
To set up the development environment:
git clone https://github.com/gopidesupavan/qualink.git
cd qualink
uv sync
Run tests:
uv run pytest
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
License
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
Acknowledgments
- Apache DataFusion for the query engine
- AWS Deequ for the inspiration
- Term Guard