Data Contract Validator and Pipeline Guardian — generic, open-source
DCVPG — Data Contract Validator & Pipeline Guardian
DCVPG is an open-source framework for defining, validating, and enforcing data contracts in modern data pipelines. It catches schema drift, quality violations, and freshness SLA breaches before they reach production.
The Problem
Data pipelines break silently. A backend team renames a column, an upstream job starts sending nulls, or an overnight load produces 10× fewer rows than expected — and nobody knows until a BI dashboard breaks or a finance report is wrong.
DCVPG solves this by making data quality a first-class, code-reviewed, automatically enforced contract between teams.
How It Works
- Define — Write a YAML contract that describes exactly what data you expect: field names, types, nullability, value ranges, allowed values, row-count SLAs, and freshness deadlines
- Validate — Run DCVPG before your pipeline writes to production; violations are caught and reported immediately
- Quarantine — Failed batches are isolated with full metadata; downstream jobs never see bad data
- Alert — Slack, PagerDuty, Teams, or webhook notifications with violation details
- Heal — AI Auto-Healer proposes an updated contract and opens a GitHub PR for human review
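The Validate, Quarantine and Alert steps together form a gate in front of the production write. Below is a minimal, stdlib-only sketch of that control flow under stated assumptions. It is not DCVPG's implementation. Batch, run_gate, the check callable and the in-memory quarantine list (standing in for the PostgreSQL audit store) are hypothetical names chosen for illustration.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Callable


@dataclass
class Batch:
    # Hypothetical container for one batch flowing through the pipeline
    contract_name: str
    rows: list[dict]
    batch_id: str = field(default_factory=lambda: uuid.uuid4().hex)


def run_gate(
    batch: Batch,
    check: Callable[[list[dict]], list[str]],  # returns violation messages
    load: Callable[[list[dict]], None],        # the existing production write
    quarantine: list[dict],                    # stand-in for the audit store
    alert: Callable[[str], None],              # stand-in for Slack/PagerDuty
) -> bool:
    """Load the batch only if it passes; otherwise quarantine it and alert."""
    violations = check(batch.rows)
    if violations:
        # Isolate the batch with the metadata needed to replay it later
        quarantine.append({
            "batch_id": batch.batch_id,
            "contract": batch.contract_name,
            "quarantined_at": datetime.now(timezone.utc).isoformat(),
            "violations": violations,
            "rows": batch.rows,
        })
        alert(f"{batch.contract_name}: {len(violations)} violation(s) "
              f"in batch {batch.batch_id}")
        return False
    load(batch.rows)
    return True
```

Because load is called only on the passing branch, a failed batch never reaches production. Replaying it after a contract fix would mean running the gate again on the stored rows.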
Features
| Category | Capability |
|---|---|
| Contracts | Schema, nullability, type, unique, range, allowed values, format (regex), row-count SLA, freshness SLA |
| AI Generation | Claude profiles a live table and drafts the contract YAML automatically |
| AI Auto-Healer | On CRITICAL violations, an LLM proposes a fix and opens a GitHub PR |
| AI Anomaly Detection | Per-field statistical baseline monitoring (volume, null rate, value distribution) |
| AI RCA Agent | Root-cause analysis agent that explains why a pipeline failed |
| Quarantine Engine | Isolates failed batches; replay after a contract fix is merged |
| Schema Drift Detection | Compares live source schema vs contract definition; alerts on changes |
| Connectors | PostgreSQL, MySQL, Snowflake, BigQuery, S3, GCS, REST API, CSV/Parquet file |
| Custom Rules | Write Python validation rules extending the built-in rule set |
| Orchestrator Operators | Native Airflow operator, Prefect task, and Dagster asset check |
| REST API | Full FastAPI backend with auth, pagination, and Prometheus metrics |
| Streamlit Dashboard | 6-page real-time monitoring UI |
| MCP Server | 10 tools for Claude Desktop / Cursor to manage pipelines by chat |
| Alerting | Slack, PagerDuty, Microsoft Teams, generic webhook |
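At its core, schema drift detection as described in the table above is a diff of two field-to-type mappings. The following sketch assumes that both the contract and the live source schema have already been reduced to plain dicts. schema_diff is a hypothetical helper, not part of DCVPG's API. Note that a column rename shows up as one removed field plus one added field.

```python
def schema_diff(contract: dict[str, str], live: dict[str, str]) -> dict[str, list]:
    """Compare contract field types with the live source schema (hypothetical helper)."""
    added = sorted(set(live) - set(contract))      # new columns at the source
    removed = sorted(set(contract) - set(live))    # expected columns that are gone
    type_changed = sorted(
        (name, contract[name], live[name])
        for name in set(contract) & set(live)
        if contract[name] != live[name]
    )
    return {"added": added, "removed": removed, "type_changed": type_changed}


# A backend rename of status -> state, plus amount arriving as text:
print(schema_diff(
    {"id": "integer", "status": "string", "amount": "float"},
    {"id": "integer", "state": "string", "amount": "string"},
))
# {'added': ['state'], 'removed': ['status'], 'type_changed': [('amount', 'float', 'string')]}
```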
Who Is This For?
Already have pipelines? (Most common)
DCVPG wraps around your existing pipelines without requiring rewrites. You add it as a quality gate that runs before data reaches production:
Your existing pipeline:
extract → transform → [DCVPG validates] → load to production
                              ↑
                     stops bad data here
Step 1 — Generate a contract from your live table (no hand-writing needed):
pip install "dcvpg[ai]"
export ANTHROPIC_API_KEY=sk-ant-...
dcvpg init my_project && cd my_project
# Edit dcvpg.config.yaml to point at your database
dcvpg generate --source postgres_main --table orders --output-dir ./contracts
dcvpg register contracts/orders.yaml
Step 2 — Add one operator to your existing Airflow DAG:
from dcvpg.orchestrators.airflow.operators.contract_validator import DataContractValidatorOperator

# Inside your existing DAG definition; extract, transform and load_to_prod
# are tasks you already have
validate = DataContractValidatorOperator(
    task_id="validate_orders",
    contract_name="orders_raw",
    config_path="/opt/airflow/dcvpg.config.yaml",
)
# Gate your existing load task; nothing else changes
extract >> transform >> validate >> load_to_prod
That's it. DCVPG does not replace or rebuild your pipeline — it guards the data flowing through it.
Building a new pipeline?
Write the contract first as the formal agreement between the team that produces the data and the team that consumes it:
# 1. Author the contract (or generate from a staging table)
dcvpg generate --source postgres_staging --table orders --output-dir ./contracts
# 2. Register and validate as part of every PR / CI run
dcvpg register contracts/orders.yaml
dcvpg validate --all
A GitHub Actions workflow is scaffolded automatically by dcvpg init — every push runs dcvpg validate --all so contract regressions are caught in CI before they ever hit production.
Quick Start
pip install dcvpg
dcvpg init my_project
cd my_project
Edit dcvpg.config.yaml to point at your database and connections, then:
# Optional: generate a contract from a live table using AI
export ANTHROPIC_API_KEY=sk-ant-...
dcvpg generate --source postgres_main --table orders --output-dir ./contracts
# Register the contract
dcvpg register contracts/orders.yaml
# Validate
dcvpg validate --all
See the Quick Start Guide for the full walkthrough including Docker Compose setup.
Installation
# Core (PostgreSQL, file connectors, validation engine, CLI)
pip install dcvpg
# + AI features (contract generation, auto-healing, anomaly detection)
pip install "dcvpg[ai]"
# + MCP server (Claude Desktop / Cursor integration)
pip install "dcvpg[mcp]"
# + Airflow operator
pip install "dcvpg[airflow]"
# + Prefect task
pip install "dcvpg[prefect]"
# + Dagster asset check
pip install "dcvpg[dagster]"
# Everything
pip install "dcvpg[all]"
Requirements: Python 3.11+, PostgreSQL 15+ (for quarantine & audit storage)
Contract Format
Contracts are plain YAML files. Here is a complete example:
contract:
  name: orders_raw
  version: "1.2"
  description: "Raw orders table from the e-commerce backend."
  owner_team: data-engineering
  source_owner: backend-team
  pipeline_tags: [crm, revenue]
  source_connection: postgres_main
  source_table: orders

  # Row-count and freshness SLAs
  row_count_min: 1000
  row_count_max: 5000000
  sla_freshness_hours: 6

  schema:
    - field: id
      type: integer
      nullable: false
      unique: true
    - field: status
      type: string
      nullable: false
      allowed_values: ["active", "inactive", "pending"]
    - field: amount
      type: float
      nullable: true
      min: 0.0
      max: 999999.99
    - field: email
      type: string
      nullable: false
      format: email # Regex-backed format validation
    - field: created_at
      type: timestamp
      nullable: false

  # Reference a custom Python validation rule
  custom_rules:
    - rule: no_weekend_orders.NoWeekendOrders
      params:
        date_field: created_at
Full field reference: Contract Authoring Guide
Custom Validation Rules
Extend the built-in rule set with plain Python:
# custom_rules/no_weekend_orders.py
import pandas as pd
from dcvpg.engine.rules.base_rule import BaseRule
from dcvpg.engine.models import ValidationResult


class NoWeekendOrders(BaseRule):
    def validate(self, data: pd.DataFrame, field: str, params: dict) -> ValidationResult:
        # Unparseable dates become NaT and are not counted as weekend dates
        dates = pd.to_datetime(data[params.get("date_field", field)], errors="coerce")
        # dayofweek runs Monday=0 ... Sunday=6, so >= 5 means Saturday or Sunday
        weekend_count = int((dates.dt.dayofweek >= 5).sum())
        if weekend_count > 0:
            return ValidationResult(
                passed=False, field=field,
                violation_type="WEEKEND_ORDER_FOUND",
                rows_affected=weekend_count,
                expected_value="Orders must be placed on weekdays (Mon–Fri)",
            )
        return ValidationResult(passed=True, field=field)
Register in config:
extensions:
  custom_rules_dir: ./custom_rules
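This page does not document how a reference such as no_weekend_orders.NoWeekendOrders is resolved against custom_rules_dir. One plausible mechanism is to split the reference at the last dot into a module file and a class name, then import that file with importlib. The sketch below works under that assumption. load_rule_class is hypothetical, expects one flat .py file per module, and does not handle nested packages.

```python
import importlib.util
from pathlib import Path


def load_rule_class(rules_dir: str, ref: str) -> type:
    """Resolve a 'module.ClassName' reference against custom_rules_dir.

    Hypothetical helper: assumes one flat .py file per rule module.
    """
    module_name, _, class_name = ref.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.ClassName', got {ref!r}")
    path = Path(rules_dir) / f"{module_name}.py"
    spec = importlib.util.spec_from_file_location(module_name, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load rule module from {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the file, defining the rule class
    return getattr(module, class_name)


# Usage, from the project root:
# rule_cls = load_rule_class("./custom_rules", "no_weekend_orders.NoWeekendOrders")
```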
Full guide: Custom Rules
Orchestrator Integration
Airflow
from dcvpg.orchestrators.airflow.operators.contract_validator import DataContractValidatorOperator

validate = DataContractValidatorOperator(
    task_id="validate_orders",
    contract_name="orders_raw",
    config_path="/opt/airflow/dcvpg.config.yaml",
)
Prefect
from prefect import flow
from dcvpg.orchestrators.prefect.tasks import validate_contract

@flow
def my_flow():
    validate_contract(contract_name="orders_raw", config_path="./dcvpg.config.yaml")
Dagster
from dcvpg.orchestrators.dagster.assets import build_contract_asset_check
orders_check = build_contract_asset_check("orders_raw", config_path="./dcvpg.config.yaml")
MCP Server — Chat-Driven Pipeline Management
DCVPG ships a Model Context Protocol server with 10 tools, letting you manage pipelines from Claude Desktop or Cursor by chat.
pip install "dcvpg[mcp]"
dcvpg mcp-server start
Example prompts:
- "What pipelines are currently failing?"
- "Show me the violation details for the orders pipeline."
- "Is there any schema drift in the payments contract?"
- "Generate a contract for the users table in postgres_main."
- "Open a PR to fix the type mismatch in the orders contract."
- "Replay batch abc-123 now that the fix is merged."
| Tool | Description |
|---|---|
| get_pipeline_status | Live health of all pipelines |
| get_violation_detail | Full violation breakdown for a pipeline |
| list_quarantine_batches | All quarantined batches with metadata |
| get_schema_diff | Contract vs live source schema drift report |
| create_fix_pr | Open a GitHub PR to update a broken contract |
| replay_quarantine | Re-validate and release a quarantined batch |
| generate_contract | AI-generate a contract from a live data source |
| get_incident_summary | Incidents in the last N days |
| get_contract_detail | Full spec, rules, ownership, version history |
| approve_contract_update | Merge an approved PR and reload the contract |
Full setup guide: MCP Setup
REST API
uvicorn dcvpg.api.main:app --reload
# → http://localhost:8000/docs
Key endpoints:
| Method | Endpoint | Description |
|---|---|---|
| GET | /health | Health check |
| GET | /api/v1/contracts | List all contracts |
| POST | /api/v1/contracts/generate | AI-generate a contract |
| GET | /api/v1/pipelines | List pipeline run history |
| GET | /api/v1/quarantine | List quarantined batches |
| GET | /metrics | Prometheus metrics |
All endpoints (except /health and /metrics) require an Authorization: <key> header.
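Here is a minimal client sketch using only the standard library. It follows the header format stated above literally, with the raw key as the Authorization value. Several details are assumptions that this page does not confirm: the placeholder API_KEY, a BASE_URL matching uvicorn's default localhost:8000, and the endpoint returning JSON. Pagination parameters are left out because they are not documented here.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # uvicorn's default host/port, as above
API_KEY = "your-api-key"            # placeholder; key provisioning not covered here


def build_request(path: str, api_key: str = API_KEY) -> urllib.request.Request:
    """Build a GET request carrying the Authorization header the API expects."""
    return urllib.request.Request(
        f"{BASE_URL}{path}",
        headers={"Authorization": api_key},
    )


def list_contracts(api_key: str = API_KEY):
    """Fetch /api/v1/contracts and decode the JSON body (response shape assumed)."""
    with urllib.request.urlopen(build_request("/api/v1/contracts", api_key)) as resp:
        return json.load(resp)
```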
Architecture
┌─────────────────────────────────────────────────────────────────┐
│  DCVPG Framework                                                │
│                                                                 │
│  CLI / REST API / MCP Server / Streamlit Dashboard              │
│          │                                                      │
│          ▼                                                      │
│  Validator ◄── Contract Registry ◄── YAML Contracts             │
│          │                                                      │
│    ┌─────┴──────┐                                               │
│    │   Rules    │  schema · type · null · range ·               │
│    │   Engine   │  unique · format · allowed_values ·           │
│    │            │  row-count SLA · freshness SLA ·              │
│    └─────┬──────┘  custom Python rules                          │
│          │                                                      │
│    ┌─────┴──────────────────┐                                   │
│    │  Connectors            │  PostgreSQL · MySQL · Snowflake   │
│    │                        │  BigQuery · S3 · GCS · File       │
│    └─────┬──────────────────┘                                   │
│          │                                                      │
│    ┌─────┴──────────────────┐                                   │
│    │  Quarantine Engine     │──► PostgreSQL audit store         │
│    │  Alert Manager         │──► Slack · PagerDuty · Teams      │
│    └─────┬──────────────────┘                                   │
│          │                                                      │
│    ┌─────┴──────────────────┐                                   │
│    │  AI Agents             │  ContractGenerator                │
│    │  (Anthropic Claude)    │  AutoHealer → GitHub PR           │
│    │                        │  AnomalyDetector                  │
│    │                        │  RCA Agent                        │
│    └────────────────────────┘                                   │
│                                                                 │
│  Orchestrators: Airflow · Prefect · Dagster                     │
└─────────────────────────────────────────────────────────────────┘
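This page does not specify how the Alert Manager formats its messages. For Slack, the standard transport is an incoming webhook that accepts a POSTed JSON body with a text field. The sketch below assumes that transport. The message wording and the slack_alert_request helper are illustrative only.

```python
import json
import urllib.request


def slack_alert_request(webhook_url: str, contract: str,
                        violations: list[str]) -> urllib.request.Request:
    """Build a POST to a Slack incoming webhook summarising violations."""
    lines = [f"Contract {contract} failed with {len(violations)} violation(s):"]
    lines += [f"- {v}" for v in violations]
    body = json.dumps({"text": "\n".join(lines)}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# To send: urllib.request.urlopen(slack_alert_request(url, "orders_raw", violations))
```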
Documentation
| Guide | Description |
|---|---|
| Quick Start | Get up and running in 5 minutes |
| Contract Authoring | Full YAML field reference |
| Connectors | Configure PostgreSQL, Snowflake, S3, and more |
| Custom Rules | Write Python validation extensions |
| MCP Setup | Claude Desktop / Cursor integration |
| API Reference | REST API endpoints |
Contributing
See CONTRIBUTING.md. Pull requests are welcome for new connectors, rule types, and orchestrator integrations.
License
Apache 2.0 — see LICENSE.
Download files
File details
Details for the file dcvpg-1.3.1.tar.gz.
File metadata
- Download URL: dcvpg-1.3.1.tar.gz
- Size: 70.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | ac4e496d8d2cbc24280d7313d92c5a9bf622c2e3154c26e087859096029d8fc7 |
| MD5 | 833241ea461cabe3aa7c495f280b581f |
| BLAKE2b-256 | 93871ba026ba1767944dadc3b3c376d31cb8e3de237d786071832c76cc75ab1a |
Provenance
The following attestation bundles were made for dcvpg-1.3.1.tar.gz:
- Publisher: cd.yml on pasindudilshan1/dcvpg
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: dcvpg-1.3.1.tar.gz
- Subject digest: ac4e496d8d2cbc24280d7313d92c5a9bf622c2e3154c26e087859096029d8fc7
- Sigstore transparency entry: 1066677719
- Permalink: pasindudilshan1/dcvpg@e6c54293e21f85d5ad9e2edc9f5f2dd5ba0fa551
- Branch / Tag: refs/heads/main
- Owner: https://github.com/pasindudilshan1
- Access: private
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: cd.yml@e6c54293e21f85d5ad9e2edc9f5f2dd5ba0fa551
- Trigger Event: workflow_dispatch
File details
Details for the file dcvpg-1.3.1-py3-none-any.whl.
File metadata
- Download URL: dcvpg-1.3.1-py3-none-any.whl
- Size: 105.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 0ac25c56d217e321c09d16fc06bbb41fbf69058c455dde9fc266cc3e1cbdca46 |
| MD5 | 7b3d9c8880eec332585f3885a45c2c27 |
| BLAKE2b-256 | aa637c70f57ac40567e381c83128a4655e1f27207e4bac4bff49f84f13c68823 |
Provenance
The following attestation bundles were made for dcvpg-1.3.1-py3-none-any.whl:
- Publisher: cd.yml on pasindudilshan1/dcvpg
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: dcvpg-1.3.1-py3-none-any.whl
- Subject digest: 0ac25c56d217e321c09d16fc06bbb41fbf69058c455dde9fc266cc3e1cbdca46
- Sigstore transparency entry: 1066677728
- Permalink: pasindudilshan1/dcvpg@e6c54293e21f85d5ad9e2edc9f5f2dd5ba0fa551
- Branch / Tag: refs/heads/main
- Owner: https://github.com/pasindudilshan1
- Access: private
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: cd.yml@e6c54293e21f85d5ad9e2edc9f5f2dd5ba0fa551
- Trigger Event: workflow_dispatch