# Data Quality Validation

SHACL-based data quality validation for Cognite Data Fusion (CDF) using Cognite Functions and Workflows.
## Overview
This repository provides automated data quality validation for both DMS instances and time series data using SHACL (Shapes Constraint Language) rules. Validation results are posted to the CDF Records API for tracking and analysis.
**Key Features:**

- **Instance Validation**: Validates DMS instances on data changes with automatic reference loading
- **Time Series Validation**: Quality checks using CDF SDK and InDSL functions (outliers, gaps, density)
- **Multi-Environment**: Separate configurations for different environments (e.g., `cog-ai`, `abp-dev`)
- **Automated Deployment**: CI/CD pipeline with intelligent change detection
- **Records API Integration**: All validation results stored for analysis and monitoring
The `cognite-data-quality` Python package exposes:

- `run_validation`: Run validation directly from DMS (no workflows). Supports TTL, JSON, or YAML view config. Ideal for development.
- `deploy_validation_infrastructure`: Unified deployment of functions, workflows, and triggers.
- `deploy_incremental`: Smart incremental deployment with hash-based change detection.
- `call_validation`: Invoke the unified validation function with type-based dispatch (`instance`, `timeseries`, `orchestrator`, etc.).
## Architecture

### Instance Validation Flow
```mermaid
graph LR
    A[DMS Instance Change] --> B[Workflow Trigger]
    B --> C[Cognite Function:<br/>data-quality-validation<br/>type: instance]
    C --> D[Load SHACL Rules<br/>from CDF Files]
    C --> E[Fetch Instances<br/>from DMS]
    E --> F[Auto-load References<br/>configurable depth]
    D --> G[NEAT Validation]
    F --> G
    G --> H{Conforms?}
    H -->|Yes| I[Records API:<br/>PASSED]
    H -->|No| J[Records API:<br/>FAILED + Violations]
    I --> K[DataQualityValidationRecord]
    J --> K
```
### Time Series Validation Flow
```mermaid
graph LR
    A[Scheduled Trigger<br/>cron] --> B[Cognite Function:<br/>data-quality-validation<br/>type: timeseries]
    B --> C[Load SHACL Rules<br/>from CDF Files]
    B --> D{Selection Method}
    D -->|DMS Filter| E[Filter Time Series<br/>by prefix/properties]
    D -->|Instance IDs| F[Explicit List<br/>of Time Series]
    E --> G[Fetch Time Series<br/>from DMS]
    F --> G
    C --> H[NEAT Validation<br/>with CDF/INDSL Functions]
    G --> H
    H --> I{Backfill Mode?}
    I -->|Yes| J[Process Historic Data<br/>in Chunks]
    I -->|No| K[Validate Last<br/>Complete Hour]
    J --> L[Save Cursor State<br/>for Resume]
    K --> M[Records API:<br/>Per Time Series Result]
    L --> M
    M --> N[DataQualityValidationRecord]
```
## Repository Structure
```
.
├── cognite_data_quality/              # Python package
│   ├── _function_code/                # Unified SHACL validation function code
│   │   ├── handler.py                 # Main dispatcher (routes by validation_type)
│   │   ├── handlers/                  # Type-specific handlers
│   │   │   ├── instance_validation.py    # Real-time instance validation
│   │   │   ├── partitioned_validation.py # Historic partition validation
│   │   │   ├── timeseries_validation.py  # Time series validation with backfill
│   │   │   ├── orchestrator.py           # Historic validation orchestrator
│   │   │   └── test_rule.py              # Test SHACL rules without writing
│   │   └── common/                    # Shared utilities
│   │       ├── cursor_state.py        # Cursor state management
│   │       ├── records_api.py         # Records API helpers
│   │       ├── shacl_utils.py         # SHACL utilities
│   │       └── time_window.py         # Time window parsing
│   ├── deploy.py                      # Unified deployment API
│   ├── invoke.py                      # Function invocation API
│   ├── runner.py                      # Direct validation runner (no workflows)
│   └── template_generator.py          # Generate config templates
│
├── config/                            # Configuration by environment
│   └── environments/
│       ├── cog-ai/                    # Example environment
│       │   ├── settings.yaml          # Environment settings
│       │   ├── shacl_rules/           # SHACL rules (TTL files)
│       │   ├── views/                 # Instance validation configs
│       │   └── timeseries/            # Time series validation configs
│       └── abp-dev/                   # Another environment
│           ├── settings.yaml
│           ├── shacl_rules/
│           └── views/
│
├── generate_demo_data/                # Demo data generator
│   ├── handler.py
│   └── requirements.txt
│
├── scripts/                           # Deployment & utility scripts
│   ├── shared.py                      # Common utilities
│   ├── deploy_infrastructure.py       # Unified deployment script (replaces 5 scripts)
│   └── validate_config.py             # Validate YAML/SHACL syntax
│
└── test_and_deploy/                   # Notebooks & dashboard
    ├── historic_data_validation.ipynb # Historic validation notebook
    ├── validation_dashboard.py        # Streamlit dashboard
    └── batch_dashboard.py             # Batch validation dashboard
```
## Configuration Structure

### Environment Settings (`settings.yaml`)

Each environment has its own `settings.yaml` defining:
```yaml
function:                        # Unified validation function
  external_id: "data-quality-validation"
  name: "Data Quality Validation"
  description: "Unified function for all SHACL validation types (instance, partitioned, timeseries, orchestrator)"
  runtime: "py311"
  source_dir: "function"
  include_files:
    - "handler.py"
    - "handlers/**/*.py"
    - "common/**/*.py"
    - "requirements.txt"

workflow:                        # Instance workflow settings
  external_id_prefix: "dq-shacl"
  version: "1"
  task:
    retries: 3
    timeout: 600

timeseries_workflow:             # Time series workflow settings
  external_id_prefix: "dq-ts-shacl"
  version: "1"
  task:
    retries: 3
    timeout: 1800

trigger:                         # Instance trigger settings
  external_id_prefix: "dq-shacl-trigger"
  batch_size: 10
  batch_timeout: 60

records:                         # Records API settings
  space: "dataQuality"
  container: "DataQualityValidationRecord"
  stream_id: "dq_validation_stream"

shacl_rules:
  source_dir: "config/environments/cog-ai/shacl_rules"
  mime_type: "text/turtle"

timeseries:
  config_dir: "config/environments/cog-ai/timeseries"
```
### Instance Validation Config (`views/*.yaml`)
```yaml
view:
  space: "sp_enterprise_process_industry"
  external_id: "Equipment"
  version: "v1"
  instance_space: "sp_enterprise_process_industry"

shacl_rules:
  file: "equipment_shacl_rules.ttl"
  external_id: "equipment_shacl_rules"

datamodel:
  space: "sp_enterprise_process_industry"
  external_id: "EnterpriseProcessIndustry"
  version: "v1"

validation:
  auto_load_depth: 2             # Auto-load referenced instances
  verbose: true

records:
  rule_set_id: "EquipmentSHACLv1"
  rule_set_version: "1.0"
```
### Time Series Validation Config (`timeseries/*.yaml`)
```yaml
name: "demo-timeseries-quality"
description: "Quality checks for demo time series"

# Selection method 1: DMS filter (server-side filtering)
filter:
  instance_space: "ts_dq_test"
  view:
    space: "cdf_cdm"
    external_id: "CogniteTimeSeries"
    version: "v1"
  expression:
    type: "prefix"
    property: ["node", "externalId"]
    value: "dq_test_"
  limit: 1000

# OR selection method 2: explicit instance IDs
# instance_ids:
#   - space: "ts_dq_test"
#     external_id: "sensor_001"
#   - space: "ts_dq_test"
#     external_id: "sensor_002"

shacl_rules:
  file: "timeseries_quality_rules.ttl"
  external_id: "demo_timeseries_shacl_rules"

datamodel:
  space: "cdf_cdm"
  external_id: "CogniteCore"
  version: "v1"

validation:
  auto_load_depth: 0
  verbose: true

records:
  rule_set_id: "DemoTimeSeriesQuality"
  rule_set_version: "1.0"

schedule:
  cron: "0 * * * *"              # Every hour

# Optional: backfill historic data
backfill:
  enabled: false
  start_time: "30d-ago"
  end_time: "now"
  window_minutes: 60
```
## Deployment

### CI/CD Deployment (Recommended)

The repository uses GitHub Actions for automated deployment:
- **Push to `main`**: Automatically deploys all changes
- **Pull Request**: Validates configurations without deploying
- **Manual Trigger**: Supports dry-run, force redeploy, and specific view/config selection
### Deployment Steps

The CI/CD pipeline uses a unified deployment script:

```shell
python scripts/deploy_infrastructure.py --env cog-ai
```
This single script handles:
- Records Container: Ensures Records API container exists
- Functions: Deploys all Cognite Functions with thisisneat 0.2.3
- Instance Workflows: Deploys workflows & triggers for real-time validation
- Time Series Workflows: Deploys scheduled workflows for time series validation
- Data Models (optional): Can deploy data models if needed
Smart Deployment: Only redeploys when:
- Function code or dependencies change (tracked via hash in function metadata)
- View/timeseries configuration changes (tracked via hash in workflow description)
- SHACL rules content changes
- Function secrets change
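The change detection above can be pictured as a content hash over the deployable files, stored in function metadata and compared on the next run. The following is a simplified sketch of the idea; the function name and glob patterns are illustrative assumptions, not the script's actual API:

```python
import hashlib
from pathlib import Path


def function_code_hash(
    source_dir: str,
    patterns: tuple[str, ...] = ("**/*.py", "requirements.txt"),
) -> str:
    """Hypothetical sketch: hash all deployable files so a redeploy only
    happens when the digest stored in function metadata changes."""
    digest = hashlib.sha256()
    root = Path(source_dir)
    # Sort for a deterministic digest regardless of filesystem order.
    files = sorted(p for pattern in patterns for p in root.glob(pattern) if p.is_file())
    for path in files:
        digest.update(str(path.relative_to(root)).encode())  # renames count as changes too
        digest.update(path.read_bytes())
    return digest.hexdigest()
```

Comparing the stored digest with a freshly computed one decides whether the function is redeployed or skipped.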
### Manual Deployment

#### Option 1: Unified Script (Recommended)
```shell
# Set environment variables
export COGNITE_CLUSTER=api
export COGNITE_PROJECT=my-project
export COGNITE_CLIENT_ID=your-client-id
export COGNITE_CLIENT_SECRET=your-client-secret
export AZURE_TENANT_ID=your-tenant-id

# Install the package
pip install cognite-data-quality

# Deploy everything
python scripts/deploy_infrastructure.py --env cog-ai

# Deployment options
python scripts/deploy_infrastructure.py --env cog-ai --dry-run           # Preview changes
python scripts/deploy_infrastructure.py --env cog-ai --force             # Force redeployment
python scripts/deploy_infrastructure.py --env cog-ai --views equipment   # Deploy specific views
```
#### Option 2: Python API
```python
from cognite_data_quality import (
    deploy_validation_infrastructure,
    deploy_incremental,
    load_cognite_client_from_toml,
)

client = load_cognite_client_from_toml()

# Full deployment
deploy_validation_infrastructure(
    client=client,
    env_name="cog-ai",
    config_dir="config/environments/cog-ai",
    force=False,
)

# Or incremental deployment (only changed resources)
deploy_incremental(
    client=client,
    env_name="cog-ai",
    config_dir="config/environments/cog-ai",
)
```
### GitHub Environment Setup

Create a GitHub environment (e.g., `cog-ai-arn`) with:

**Secrets:**

- `COGNITE_CLIENT_ID`: OAuth client ID
- `COGNITE_CLIENT_SECRET`: OAuth client secret

**Variables:**

- `COGNITE_CLUSTER`: CDF cluster (e.g., `api`, `westeurope-1`)
- `COGNITE_PROJECT`: CDF project name
- `AZURE_TENANT_ID`: Azure AD tenant ID
## Adding New Validations

### Adding Instance Validation

1. Create SHACL rules in `config/environments/{env}/shacl_rules/my_view_shacl_rules.ttl`:
```turtle
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix : <http://example.org/shapes#> .

:MyViewShape a sh:NodeShape ;
    sh:targetClass <http://example.org/MyView> ;
    sh:property [
        sh:path <http://example.org/name> ;
        sh:minCount 1 ;
        sh:datatype xsd:string ;
        sh:message "Name is required and must be a string" ;
    ] .
```
2. Create a view config in `config/environments/{env}/views/my_view.yaml`:
```yaml
view:
  space: "my_space"
  external_id: "MyView"
  version: "v1"
  instance_space: "my_instances"

shacl_rules:
  file: "my_view_shacl_rules.ttl"
  external_id: "my_view_shacl_rules"

datamodel:
  space: "my_space"
  external_id: "MyDataModel"
  version: "v1"

validation:
  auto_load_depth: 2
  verbose: true

records:
  rule_set_id: "MyViewSHACLv1"
  rule_set_version: "1.0"
```
3. Deploy: commit and push to `main`, or run manually:

```shell
python scripts/deploy_infrastructure.py --env my_env --views my_view
```
### Adding Time Series Validation

1. Create SHACL rules in `config/environments/{env}/shacl_rules/my_timeseries_quality_rules.ttl`:
```turtle
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix cdf_sdk: <http://purl.org/cognite/cdf_sdk#> .
@prefix : <http://example.org/shapes#> .

:DataDensityShape a sh:NodeShape ;
    sh:targetClass <http://purl.org/cognite/cdf_cdm/v1/CogniteTimeSeries> ;
    sh:sparql [
        sh:message "Time series must have at least 50 datapoints in the last hour" ;
        sh:select """
            SELECT $this ?count
            WHERE {
                BIND(cdf_sdk:datapoints_count($this, "60m-ago", "now") AS ?count)
                FILTER (?count < 50)
            }
        """ ;
    ] .
```
2. Create a config in `config/environments/{env}/timeseries/my_sensors_quality.yaml`:
```yaml
name: "my-sensors-quality"
description: "Quality checks for my sensors"

filter:
  instance_space: "my_space"
  view:
    space: "cdf_cdm"
    external_id: "CogniteTimeSeries"
    version: "v1"
  expression:
    type: "prefix"
    property: ["node", "externalId"]
    value: "sensor_"
  limit: 500

shacl_rules:
  file: "my_timeseries_quality_rules.ttl"
  external_id: "my_sensors_shacl_rules"

datamodel:
  space: "cdf_cdm"
  external_id: "CogniteCore"
  version: "v1"

validation:
  auto_load_depth: 0
  verbose: true

records:
  rule_set_id: "MySensorsQuality"
  rule_set_version: "1.0"

schedule:
  cron: "0 6 * * *"              # Daily at 06:00 UTC
```
3. Deploy: commit and push to `main`, or run manually:

```shell
python scripts/deploy_infrastructure.py --env my_env --timeseries my_sensors_quality
```
## Time Series Quality Functions

Time series validation supports these CDF SPARQL functions:
### CDF SDK Functions

| Function | Description |
|---|---|
| `cdf_sdk:datapoints_count(?ts, start, end)` | Count datapoints in time range |
| `cdf_sdk:datapoints_average(?ts, start, end)` | Average value in time range |
| `cdf_sdk:datapoints_min(?ts, start, end)` | Minimum value in time range |
| `cdf_sdk:datapoints_max(?ts, start, end)` | Maximum value in time range |
| `cdf_sdk:timeseries_exists(?ts)` | Check if time series exists |
### InDSL Functions

| Function | Description |
|---|---|
| `cdf_indsl:extreme_outliers(?ts, alpha)` | Detect extreme outliers using modified Z-score |
| `cdf_indsl:gaps_identification(?ts, threshold)` | Detect gaps exceeding threshold |
| `cdf_indsl:value_decrease_check(?ts)` | Check for value decreases (for counters) |
| `cdf_indsl:out_of_range(?ts)` | Detect out-of-range values |
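As an illustration of the outlier check: a modified Z-score flags points that sit far from the median, measured in units of the median absolute deviation (MAD). This pure-Python sketch shows the technique only; InDSL's actual implementation differs:

```python
from statistics import median


def extreme_outliers(values: list[float], threshold: float = 3.5) -> list[int]:
    """Return indices of extreme outliers via the modified Z-score
    0.6745 * (x - median) / MAD. A sketch of the technique only."""
    med = median(values)
    mad = median(abs(v - med) for v in values)
    if mad == 0:
        # All points identical (or near-identical): no outliers detectable.
        return []
    return [i for i, v in enumerate(values) if abs(0.6745 * (v - med) / mad) > threshold]
```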
### Time Windows in SHACL Rules

Use placeholders that get replaced at runtime:

- `"60m-ago"`, `"90m-ago"`, `"24h-ago"`, `"7d-ago"`: relative time (replaced with an actual timestamp)
- `"now"`: current time (replaced with an actual timestamp)

Example:

```sparql
BIND(cdf_sdk:datapoints_count($this, "60m-ago", "now") AS ?count)
```

For hourly schedules, use a 90-minute window to ensure a 30-minute overlap for edge cases.
## Backfill Mode

Time series validation supports backfilling historic data:

```yaml
backfill:
  enabled: true
  start_time: "30d-ago"   # Or an ISO timestamp: "2024-01-01T00:00:00Z"
  end_time: "now"         # Or an ISO timestamp: "2024-01-31T23:59:59Z"
  window_minutes: 60      # Chunk size (should match the SHACL rule window)
```
Features:
- Processes data in chunks aligned to hour boundaries
- Saves cursor state for resume on failure
- Automatically switches to normal mode when complete
- Each chunk posts separate records to Records API
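The chunking behaviour above can be sketched as splitting the backfill range into hour-aligned windows, so the cursor state can record the last completed chunk (the helper name is hypothetical; the deployed logic lives in the time series handler and `common/cursor_state.py`):

```python
HOUR_MS = 3_600_000


def backfill_chunks(start_ms: int, end_ms: int, window_minutes: int = 60) -> list[tuple[int, int]]:
    """Split [start_ms, end_ms) into windows aligned to hour boundaries.
    A sketch of the backfill chunking, not the deployed implementation."""
    window_ms = window_minutes * 60_000
    aligned_start = (start_ms // HOUR_MS) * HOUR_MS  # snap down to the hour
    chunks = []
    cursor = aligned_start
    while cursor < end_ms:
        chunks.append((cursor, min(cursor + window_ms, end_ms)))
        cursor += window_ms
    return chunks
```

On failure, resuming from the saved cursor simply skips the chunks already processed.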
## Validation Results

All validation results are posted to the CDF Records API:
```json
{
  "space": "dataQuality",
  "externalId": "dq_{ruleSetId}_{instanceId}_{jobRunId}",
  "sources": [{
    "source": {
      "type": "container",
      "space": "dataQuality",
      "externalId": "DataQualityValidationRecord"
    },
    "properties": {
      "ruleSetId": "EquipmentSHACLv1",
      "ruleSetVersion": "1.0",
      "jobRunId": "validation_1706234567",
      "passedValidation": false,
      "resultSeverity": ["Violation"],
      "failedConstraints": ["MinCountConstraintComponent::name"],
      "focusNode": "http://purl.org/cognite/sp_enterprise/equipment_001",
      "focusNodeInstance": {
        "space": "sp_enterprise",
        "externalId": "equipment_001"
      },
      "validationReport": {
        "violationCount": 1,
        "violations": [{
          "sourceConstraintComponent": "MinCountConstraintComponent",
          "resultMessage": "Name is required",
          "resultSeverity": "Violation",
          "resultPath": "name",
          "value": null
        }],
        "summary": "1 violation(s)"
      },
      "dataDomainExternalId": "Equipment"
    }
  }]
}
```
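A small sketch of assembling such a record, shown only to make the payload shape and the `dq_{ruleSetId}_{instanceId}_{jobRunId}` externalId pattern explicit (the helper name is hypothetical; the real helpers live in `common/records_api.py`):

```python
def build_validation_record(
    rule_set_id: str,
    rule_set_version: str,
    job_run_id: str,
    instance: dict,
    violations: list[dict],
) -> dict:
    """Assemble a Records API payload like the example above.
    Hypothetical helper mirroring the role of common/records_api.py."""
    return {
        "space": "dataQuality",
        "externalId": f"dq_{rule_set_id}_{instance['externalId']}_{job_run_id}",
        "sources": [{
            "source": {
                "type": "container",
                "space": "dataQuality",
                "externalId": "DataQualityValidationRecord",
            },
            "properties": {
                "ruleSetId": rule_set_id,
                "ruleSetVersion": rule_set_version,
                "jobRunId": job_run_id,
                "passedValidation": not violations,  # pass iff no violations
                "focusNodeInstance": instance,
                "validationReport": {
                    "violationCount": len(violations),
                    "violations": violations,
                    "summary": f"{len(violations)} violation(s)",
                },
            },
        }],
    }
```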
## Validation Dashboard

Visualize validation results with the Streamlit dashboard:

```shell
cd test_and_deploy
pip install -r dashboard_requirements.txt
streamlit run validation_dashboard.py
```
The dashboard shows:
- Validation pass/fail rates over time
- Failed constraints breakdown
- Per-instance validation history
- Severity distribution
## Demo Data Generator

For testing time series validation, enable the demo data generator. It creates 25 test time series with various quality patterns:
| Pattern | Count | Description | Expected Validation |
|---|---|---|---|
| `good_*` | 5 | Regular sensor data | ✅ Pass all checks |
| `outlier_*` | 5 | Data with extreme outliers | ❌ Fail outlier detection |
| `gap_*` | 5 | Random gaps (2-6 hours) | ❌ Fail gap detection |
| `decrease_*` | 3 | Counter resets | ❌ Fail decrease checks |
| `sparse_*` | 3 | Low data density (10%) | ❌ Fail density checks |
| `noisy_*` | 2 | High variance (50% std dev) | ❌ Fail range checks |
| `empty_*` | 2 | No datapoints | ❌ Fail presence checks |
### Enable Demo Data

1. Set `demo_data.enabled: true` in your environment's `settings.yaml`
2. Deploy:

```shell
python scripts/deploy_demo_data.py
```
The generator runs every minute, creating realistic test data patterns for validation testing.
## Local Development

### Setup

1. Create a `.env` file with CDF credentials:
```shell
COGNITE_CLUSTER=api
COGNITE_PROJECT=my-project
COGNITE_CLIENT_ID=your-client-id
COGNITE_CLIENT_SECRET=your-client-secret
AZURE_TENANT_ID=your-tenant-id
```
2. Install dependencies:

```shell
pip install cognite-data-quality
# Or install with development dependencies
pip install cognite-data-quality[dev]
```

3. Use notebooks in `test_and_deploy/`:
   - `testing_and_exploration.ipynb`: Test validation logic
   - `deploy.ipynb`: Manual deployment
   - `validation_dashboard.py`: Visualize results
### Validate Configurations

Before deploying, validate your configs:

```shell
python scripts/validate_config.py
```
This checks:
- YAML syntax
- SHACL Turtle syntax
- Required fields
- File references
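A minimal version of the required-field check can be sketched in plain Python once the YAML is parsed with `yaml.safe_load` (the field list below is an assumption inferred from the example configs, not the script's exact rules):

```python
# Assumed required fields per config section, based on the example view configs.
REQUIRED_FIELDS = {
    "view": ("space", "external_id", "version"),
    "shacl_rules": ("file", "external_id"),
    "records": ("rule_set_id", "rule_set_version"),
}


def check_view_config(config: dict) -> list[str]:
    """Return a list of problems in an already-parsed view config
    (empty list means OK). Simplified sketch of validate_config.py's checks."""
    problems = []
    for section, keys in REQUIRED_FIELDS.items():
        block = config.get(section)
        if not isinstance(block, dict):
            problems.append(f"Missing section: {section}")
            continue
        for key in keys:
            if key not in block:
                problems.append(f"Missing field: {section}.{key}")
    return problems
```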
## Troubleshooting

### Function Logs

View function execution logs:
```python
from cognite.client import CogniteClient

client = CogniteClient()

# Get recent function calls
calls = client.functions.calls.list(
    function_external_id="data-quality-validation",
    limit=10,
)

# Get logs for a specific call
logs = client.functions.calls.get_logs(call_id=calls[0].id)
print(logs)
```
### Workflow Executions

Check workflow execution status:
```python
executions = client.workflows.executions.list(
    workflow_external_id="dq-shacl-equipment",
    limit=10,
)
for execution in executions:
    print(f"Status: {execution.status}, Started: {execution.started_at}")
```
### Common Issues

**Issue: Workflow not triggering on data changes**

- Check trigger status: `client.workflows.triggers.retrieve(external_id="...")`
- Verify the instance space matches the trigger filter
- Check workflow version matches trigger version
**Issue: "Failed to load SHACL rules"**
- Ensure SHACL file exists in CDF Files
- Check file external_id matches config
- Verify file content is valid Turtle syntax
**Issue: "No instances to validate"**
- Check DMS filter in time series config
- Verify time series exist in specified space
- Ensure view and version match
## Prerequisites
- Python 3.10+
- Cognite Python SDK
- thisisneat 0.2.3 (automatically installed with the package)
- Access to a CDF project with:
  - Cognite Functions
  - Workflows
  - Records API (alpha)
  - Data Modeling Services (DMS)
## License
See LICENSE file for details.