Data quality validation runner for SHACL rules in Cognite Data Fusion.

Data Quality Validation

SHACL-based data quality validation for Cognite Data Fusion (CDF) using Cognite Functions and Workflows.

Overview

This repository provides automated data quality validation for both DMS instances and time series data using SHACL (Shapes Constraint Language) rules. Validation results are posted to the CDF Records API for tracking and analysis.

Key Features:

  • 🔍 Instance Validation: Validates DMS instances on data changes with automatic reference loading
  • 📊 Time Series Validation: Quality checks using CDF SDK and INDSL functions (outliers, gaps, density)
  • 🎯 Multi-Environment: Separate configurations for different environments (e.g., cog-ai, abp-dev)
  • 🔄 Automated Deployment: CI/CD pipeline with intelligent change detection
  • 📈 Records API Integration: All validation results stored for analysis and monitoring

Python package (cognite-data-quality):

  • run_validation: Run validation directly from DMS (no workflows). Supports TTL, JSON, or YAML view config. Ideal for development.
  • deploy_validation_infrastructure: Unified deployment of functions, workflows, and triggers.
  • deploy_incremental: Smart incremental deployment with hash-based change detection.
  • call_validation: Invoke the unified validation function with type-based dispatch (instance, timeseries, orchestrator, etc.).

Architecture

Instance Validation Flow

graph LR
    A[DMS Instance Change] --> B[Workflow Trigger]
    B --> C[Cognite Function:<br/>data-quality-validation<br/>type: instance]
    C --> D[Load SHACL Rules<br/>from CDF Files]
    C --> E[Fetch Instances<br/>from DMS]
    E --> F[Auto-load References<br/>configurable depth]
    D --> G[NEAT Validation]
    F --> G
    G --> H{Conforms?}
    H -->|Yes| I[Records API:<br/>PASSED]
    H -->|No| J[Records API:<br/>FAILED + Violations]
    I --> K[DataQualityValidationRecord]
    J --> K

Time Series Validation Flow

graph LR
    A[Scheduled Trigger<br/>cron] --> B[Cognite Function:<br/>data-quality-validation<br/>type: timeseries]
    B --> C[Load SHACL Rules<br/>from CDF Files]
    B --> D{Selection Method}
    D -->|DMS Filter| E[Filter Time Series<br/>by prefix/properties]
    D -->|Instance IDs| F[Explicit List<br/>of Time Series]
    E --> G[Fetch Time Series<br/>from DMS]
    F --> G
    C --> H[NEAT Validation<br/>with CDF/INDSL Functions]
    G --> H
    H --> I{Backfill Mode?}
    I -->|Yes| J[Process Historic Data<br/>in Chunks]
    I -->|No| K[Validate Last<br/>Complete Hour]
    J --> L[Save Cursor State<br/>for Resume]
    K --> M[Records API:<br/>Per Time Series Result]
    L --> M
    M --> N[DataQualityValidationRecord]

Repository Structure

.
├── cognite_data_quality/                # Python package
│   ├── _function_code/                  # Unified SHACL validation function code
│   │   ├── handler.py                   # Main dispatcher (routes by validation_type)
│   │   ├── handlers/                    # Type-specific handlers
│   │   │   ├── instance_validation.py    # Real-time instance validation
│   │   │   ├── partitioned_validation.py # Historic partition validation
│   │   │   ├── timeseries_validation.py  # Time series validation with backfill
│   │   │   ├── orchestrator.py           # Historic validation orchestrator
│   │   │   └── test_rule.py              # Test SHACL rules without writing
│   │   └── common/                      # Shared utilities
│   │       ├── cursor_state.py          # Cursor state management
│   │       ├── records_api.py           # Records API helpers
│   │       ├── shacl_utils.py           # SHACL utilities
│   │       └── time_window.py           # Time window parsing
│   ├── deploy.py                        # Unified deployment API
│   ├── invoke.py                        # Function invocation API
│   ├── runner.py                        # Direct validation runner (no workflows)
│   └── template_generator.py            # Generate config templates
│
├── config/                              # Configuration by environment
│   └── environments/
│       ├── cog-ai/                      # Example environment
│       │   ├── settings.yaml            # Environment settings
│       │   ├── shacl_rules/             # SHACL rules (TTL files)
│       │   ├── views/                   # Instance validation configs
│       │   └── timeseries/              # Time series validation configs
│       └── abp-dev/                     # Another environment
│           ├── settings.yaml
│           ├── shacl_rules/
│           └── views/
│
├── generate_demo_data/                  # Demo data generator
│   ├── handler.py
│   └── requirements.txt
│
├── scripts/                             # Deployment & utility scripts
│   ├── shared.py                        # Common utilities
│   ├── deploy_infrastructure.py         # Unified deployment script (replaces 5 scripts)
│   └── validate_config.py               # Validate YAML/SHACL syntax
│
└── test_and_deploy/                     # Notebooks & dashboard
    ├── historic_data_validation.ipynb   # Historic validation notebook
    ├── validation_dashboard.py          # Streamlit dashboard
    └── batch_dashboard.py               # Batch validation dashboard
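The handler.py dispatcher described above routes each call by its validation_type. A minimal sketch of that dispatch pattern, with placeholder handlers standing in for the real modules in handlers/:

```python
# Hypothetical sketch of type-based dispatch as in handler.py; the actual
# handler functions and their signatures in _function_code/ may differ.
def handle(data: dict) -> dict:
    # Map each validation_type to its handler (placeholders here).
    handlers = {
        "instance": lambda d: {"status": "validated-instances"},
        "timeseries": lambda d: {"status": "validated-timeseries"},
        "orchestrator": lambda d: {"status": "orchestrated"},
    }
    validation_type = data.get("validation_type", "instance")
    if validation_type not in handlers:
        raise ValueError(f"Unknown validation_type: {validation_type}")
    return handlers[validation_type](data)
```

With this shape, one deployed function serves every validation type, and adding a new type is a new entry in the mapping rather than a new function deployment.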

Configuration Structure

Environment Settings (settings.yaml)

Each environment has its own settings.yaml defining:

function:                              # Unified validation function
  external_id: "data-quality-validation"
  name: "Data Quality Validation"
  description: "Unified function for all SHACL validation types (instance, partitioned, timeseries, orchestrator)"
  runtime: "py311"
  source_dir: "function"
  include_files:
    - "handler.py"
    - "handlers/**/*.py"
    - "common/**/*.py"
    - "requirements.txt"

workflow:                              # Instance workflow settings
  external_id_prefix: "dq-shacl"
  version: "1"
  task:
    retries: 3
    timeout: 600

timeseries_workflow:                   # Time series workflow settings
  external_id_prefix: "dq-ts-shacl"
  version: "1"
  task:
    retries: 3
    timeout: 1800

trigger:                               # Instance trigger settings
  external_id_prefix: "dq-shacl-trigger"
  batch_size: 10
  batch_timeout: 60

records:                               # Records API settings
  space: "dataQuality"
  container: "DataQualityValidationRecord"
  stream_id: "dq_validation_stream"

shacl_rules:
  source_dir: "config/environments/cog-ai/shacl_rules"
  mime_type: "text/turtle"

timeseries:
  config_dir: "config/environments/cog-ai/timeseries"

Instance Validation Config (views/*.yaml)

view:
  space: "sp_enterprise_process_industry"
  external_id: "Equipment"
  version: "v1"

instance_space: "sp_enterprise_process_industry"

shacl_rules:
  file: "equipment_shacl_rules.ttl"
  external_id: "equipment_shacl_rules"

datamodel:
  space: "sp_enterprise_process_industry"
  external_id: "EnterpriseProcessIndustry"
  version: "v1"

validation:
  auto_load_depth: 2                   # Auto-load referenced instances
  verbose: true

records:
  rule_set_id: "EquipmentSHACLv1"
  rule_set_version: "1.0"

Time Series Validation Config (timeseries/*.yaml)

name: "demo-timeseries-quality"
description: "Quality checks for demo time series"

# Selection method 1: DMS Filter (server-side filtering)
filter:
  instance_space: "ts_dq_test"
  view:
    space: "cdf_cdm"
    external_id: "CogniteTimeSeries"
    version: "v1"
  expression:
    type: "prefix"
    property: ["node", "externalId"]
    value: "dq_test_"
  limit: 1000

# OR Selection method 2: Explicit instance IDs
# instance_ids:
#   - space: "ts_dq_test"
#     external_id: "sensor_001"
#   - space: "ts_dq_test"
#     external_id: "sensor_002"

shacl_rules:
  file: "timeseries_quality_rules.ttl"
  external_id: "demo_timeseries_shacl_rules"

datamodel:
  space: "cdf_cdm"
  external_id: "CogniteCore"
  version: "v1"

validation:
  auto_load_depth: 0
  verbose: true

records:
  rule_set_id: "DemoTimeSeriesQuality"
  rule_set_version: "1.0"

schedule:
  cron: "0 * * * *"                    # Every hour

# Optional: Backfill historic data
backfill:
  enabled: false
  start_time: "30d-ago"
  end_time: "now"
  window_minutes: 60

Deployment

CI/CD Deployment (Recommended)

The repository uses GitHub Actions for automated deployment:

  1. Push to main: Automatically deploys all changes
  2. Pull Request: Validates configurations without deploying
  3. Manual Trigger: Supports dry-run, force redeploy, and specific view/config selection

Deployment Steps

The CI/CD pipeline uses a unified deployment script:

python scripts/deploy_infrastructure.py --env cog-ai

This single script handles:

  1. Records Container: Ensures Records API container exists
  2. Functions: Deploys all Cognite Functions with thisisneat 0.2.3
  3. Instance Workflows: Deploys workflows & triggers for real-time validation
  4. Time Series Workflows: Deploys scheduled workflows for time series validation
  5. Data Models (optional): Can deploy data models if needed

Smart Deployment: Only redeploys when:

  • Function code or dependencies change (tracked via hash in function metadata)
  • View/timeseries configuration changes (tracked via hash in workflow description)
  • SHACL rules content changes
  • Function secrets change

Manual Deployment

Option 1: Unified Script (Recommended)

# Set environment variables
export COGNITE_CLUSTER=api
export COGNITE_PROJECT=my-project
export COGNITE_CLIENT_ID=your-client-id
export COGNITE_CLIENT_SECRET=your-client-secret
export AZURE_TENANT_ID=your-tenant-id

# Install package
pip install cognite-data-quality

# Deploy everything
python scripts/deploy_infrastructure.py --env cog-ai

# Deployment options
python scripts/deploy_infrastructure.py --env cog-ai --dry-run         # Preview changes
python scripts/deploy_infrastructure.py --env cog-ai --force           # Force redeployment
python scripts/deploy_infrastructure.py --env cog-ai --views equipment # Deploy specific views

Option 2: Python API

from cognite_data_quality import (
    deploy_validation_infrastructure,
    deploy_incremental,
    load_cognite_client_from_toml,
)

client = load_cognite_client_from_toml()

# Full deployment
deploy_validation_infrastructure(
    client=client,
    env_name="cog-ai",
    config_dir="config/environments/cog-ai",
    force=False,
)

# Or incremental deployment (only changed resources)
deploy_incremental(
    client=client,
    env_name="cog-ai",
    config_dir="config/environments/cog-ai",
)

GitHub Environment Setup

Create a GitHub environment (e.g., cog-ai-arn) with:

Secrets:

  • COGNITE_CLIENT_ID: OAuth client ID
  • COGNITE_CLIENT_SECRET: OAuth client secret

Variables:

  • COGNITE_CLUSTER: CDF cluster (e.g., api, westeurope-1)
  • COGNITE_PROJECT: CDF project name
  • AZURE_TENANT_ID: Azure AD tenant ID

Adding New Validations

Adding Instance Validation

  1. Create SHACL rules in config/environments/{env}/shacl_rules/my_view_shacl_rules.ttl:
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix : <http://example.org/shapes#> .

:MyViewShape a sh:NodeShape ;
    sh:targetClass <http://example.org/MyView> ;
    sh:property [
        sh:path <http://example.org/name> ;
        sh:minCount 1 ;
        sh:datatype xsd:string ;
        sh:message "Name is required and must be a string" ;
    ] .
  2. Create view config in config/environments/{env}/views/my_view.yaml:
view:
  space: "my_space"
  external_id: "MyView"
  version: "v1"

instance_space: "my_instances"

shacl_rules:
  file: "my_view_shacl_rules.ttl"
  external_id: "my_view_shacl_rules"

datamodel:
  space: "my_space"
  external_id: "MyDataModel"
  version: "v1"

validation:
  auto_load_depth: 2
  verbose: true

records:
  rule_set_id: "MyViewSHACLv1"
  rule_set_version: "1.0"
  3. Deploy: Commit and push to main, or run manually:
python scripts/deploy_infrastructure.py --env my_env --views my_view

Adding Time Series Validation

  1. Create SHACL rules in config/environments/{env}/shacl_rules/my_timeseries_quality_rules.ttl:
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix cdf_sdk: <http://purl.org/cognite/cdf_sdk#> .
@prefix : <http://example.org/shapes#> .

:DataDensityShape a sh:NodeShape ;
    sh:targetClass <http://purl.org/cognite/cdf_cdm/v1/CogniteTimeSeries> ;
    sh:sparql [
        sh:message "Time series must have at least 50 datapoints in the last hour" ;
        sh:select """
            SELECT $this ?count
            WHERE {
                BIND(cdf_sdk:datapoints_count($this, "60m-ago", "now") AS ?count)
                FILTER (?count < 50)
            }
        """ ;
    ] .
  2. Create config in config/environments/{env}/timeseries/my_sensors_quality.yaml:
name: "my-sensors-quality"
description: "Quality checks for my sensors"

filter:
  instance_space: "my_space"
  view:
    space: "cdf_cdm"
    external_id: "CogniteTimeSeries"
    version: "v1"
  expression:
    type: "prefix"
    property: ["node", "externalId"]
    value: "sensor_"
  limit: 500

shacl_rules:
  file: "my_timeseries_quality_rules.ttl"
  external_id: "my_sensors_shacl_rules"

datamodel:
  space: "cdf_cdm"
  external_id: "CogniteCore"
  version: "v1"

validation:
  auto_load_depth: 0
  verbose: true

records:
  rule_set_id: "MySensorsQuality"
  rule_set_version: "1.0"

schedule:
  cron: "0 6 * * *"  # Daily at 6 AM UTC
  3. Deploy: Commit and push to main, or run manually:
python scripts/deploy_infrastructure.py --env my_env --timeseries my_sensors_quality

Time Series Quality Functions

Time series validation rules can call the following custom SPARQL functions:

CDF SDK Functions

| Function | Description |
|----------|-------------|
| cdf_sdk:datapoints_count(?ts, start, end) | Count datapoints in time range |
| cdf_sdk:datapoints_average(?ts, start, end) | Average value in time range |
| cdf_sdk:datapoints_min(?ts, start, end) | Minimum value in time range |
| cdf_sdk:datapoints_max(?ts, start, end) | Maximum value in time range |
| cdf_sdk:timeseries_exists(?ts) | Check if time series exists |

INDSL Functions

| Function | Description |
|----------|-------------|
| cdf_indsl:extreme_outliers(?ts, alpha) | Detect extreme outliers using modified Z-score |
| cdf_indsl:gaps_identification(?ts, threshold) | Detect gaps exceeding threshold |
| cdf_indsl:value_decrease_check(?ts) | Check for value decreases (for counters) |
| cdf_indsl:out_of_range(?ts) | Detect out-of-range values |

Time Windows in SHACL Rules

Use placeholders that are replaced with concrete timestamps at runtime:

  • "60m-ago", "90m-ago", "24h-ago", "7d-ago": Relative time (replaced with actual timestamp)
  • "now": Current time (replaced with actual timestamp)

Example:

BIND(cdf_sdk:datapoints_count($this, "60m-ago", "now") AS ?count)

For hourly schedules, use a 90-minute window to ensure 30-minute overlap for edge cases.

Backfill Mode

Time series validation supports backfilling historic data:

backfill:
  enabled: true
  start_time: "30d-ago"         # Or ISO timestamp: "2024-01-01T00:00:00Z"
  end_time: "now"               # Or ISO timestamp: "2024-01-31T23:59:59Z"
  window_minutes: 60            # Chunk size (should match SHACL rule window)

Features:

  • Processes data in chunks aligned to hour boundaries
  • Saves cursor state for resume on failure
  • Automatically switches to normal mode when complete
  • Each chunk posts separate records to Records API
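The chunking behaviour can be sketched as a generator over hour-aligned windows. This is a simplified illustration under the assumption that alignment snaps to the hour boundary at or before the start time; the real handler also persists a cursor between runs:

```python
from datetime import datetime, timedelta, timezone

def hour_aligned_windows(start: datetime, end: datetime, window_minutes: int = 60):
    """Split [start, end) into hour-aligned chunks for backfill processing.
    Illustrative sketch; the deployed handler also saves cursor state."""
    # Snap the first window to the hour boundary at or before `start`.
    cursor = start.replace(minute=0, second=0, microsecond=0)
    step = timedelta(minutes=window_minutes)
    while cursor < end:
        yield cursor, min(cursor + step, end)
        cursor += step
```

On failure, resuming from the saved cursor simply restarts this loop at the last incomplete window instead of the configured start_time.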

Validation Results

All validation results are posted to the CDF Records API:

{
  "space": "dataQuality",
  "externalId": "dq_{ruleSetId}_{instanceId}_{jobRunId}",
  "sources": [{
    "source": {
      "type": "container",
      "space": "dataQuality",
      "externalId": "DataQualityValidationRecord"
    },
    "properties": {
      "ruleSetId": "EquipmentSHACLv1",
      "ruleSetVersion": "1.0",
      "jobRunId": "validation_1706234567",
      "passedValidation": false,
      "resultSeverity": ["Violation"],
      "failedConstraints": ["MinCountConstraintComponent::name"],
      "focusNode": "http://purl.org/cognite/sp_enterprise/equipment_001",
      "focusNodeInstance": {
        "space": "sp_enterprise",
        "externalId": "equipment_001"
      },
      "validationReport": {
        "violationCount": 1,
        "violations": [{
          "sourceConstraintComponent": "MinCountConstraintComponent",
          "resultMessage": "Name is required",
          "resultSeverity": "Violation",
          "resultPath": "name",
          "value": null
        }],
        "summary": "1 violation(s)"
      },
      "dataDomainExternalId": "Equipment"
    }
  }]
}
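A record payload in this shape can be assembled with a small helper. The field names below are taken from the example above; treat the helper itself as an illustration rather than the package's actual records_api.py:

```python
def build_validation_record(rule_set_id, instance, job_run_id, passed, violations):
    """Assemble a Records API payload matching the example record shape.
    Illustrative; field names follow the sample above, not a verified schema."""
    return {
        "space": "dataQuality",
        # externalId follows the dq_{ruleSetId}_{instanceId}_{jobRunId} pattern.
        "externalId": f"dq_{rule_set_id}_{instance['externalId']}_{job_run_id}",
        "sources": [{
            "source": {
                "type": "container",
                "space": "dataQuality",
                "externalId": "DataQualityValidationRecord",
            },
            "properties": {
                "ruleSetId": rule_set_id,
                "jobRunId": job_run_id,
                "passedValidation": passed,
                "focusNodeInstance": instance,
                "validationReport": {
                    "violationCount": len(violations),
                    "violations": violations,
                    "summary": f"{len(violations)} violation(s)",
                },
            },
        }],
    }
```

Because the externalId encodes rule set, instance, and job run, repeated runs produce distinct records and the full history per instance remains queryable.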

Validation Dashboard

Visualize validation results with the Streamlit dashboard:

cd test_and_deploy
pip install -r dashboard_requirements.txt
streamlit run validation_dashboard.py

The dashboard shows:

  • Validation pass/fail rates over time
  • Failed constraints breakdown
  • Per-instance validation history
  • Severity distribution

Demo Data Generator

For testing time series validation, enable the demo data generator. It creates 25 test time series with various quality patterns:

| Pattern | Count | Description | Expected Validation |
|---------|-------|-------------|---------------------|
| good_* | 5 | Regular sensor data | ✅ Pass all checks |
| outlier_* | 5 | Data with extreme outliers | ❌ Fail outlier detection |
| gap_* | 5 | Random gaps (2-6 hours) | ❌ Fail gap detection |
| decrease_* | 3 | Counter resets | ❌ Fail decrease checks |
| sparse_* | 3 | Low data density (10%) | ❌ Fail density checks |
| noisy_* | 2 | High variance (50% std dev) | ❌ Fail range checks |
| empty_* | 2 | No datapoints | ❌ Fail presence checks |

Enable Demo Data

  1. Set demo_data.enabled: true in your environment's settings.yaml
  2. Deploy:
python scripts/deploy_demo_data.py

The generator runs every minute, creating realistic test data patterns for validation testing.

Local Development

Setup

  1. Create .env file with CDF credentials:
COGNITE_CLUSTER=api
COGNITE_PROJECT=my-project
COGNITE_CLIENT_ID=your-client-id
COGNITE_CLIENT_SECRET=your-client-secret
AZURE_TENANT_ID=your-tenant-id
  2. Install dependencies:
pip install cognite-data-quality
# Or install with development dependencies
pip install cognite-data-quality[dev]
  3. Use notebooks in test_and_deploy/:
    • testing_and_exploration.ipynb: Test validation logic
    • deploy.ipynb: Manual deployment
    • validation_dashboard.py: Visualize results

Validate Configurations

Before deploying, validate your configs:

python scripts/validate_config.py

This checks:

  • YAML syntax
  • SHACL Turtle syntax
  • Required fields
  • File references
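The required-fields check can be as simple as a set difference against the top-level keys a view config must define. A sketch under the assumption that the required keys are the ones shown in the view config examples above (validate_config.py also checks YAML syntax, Turtle syntax, and file references):

```python
# Top-level keys every view config appears to need, per the examples above
# (an assumption, not a verified schema).
REQUIRED_VIEW_FIELDS = {"view", "instance_space", "shacl_rules", "datamodel", "records"}

def check_required_fields(config: dict, required=frozenset(REQUIRED_VIEW_FIELDS)) -> list[str]:
    """Return the sorted list of missing top-level keys for a view config."""
    return sorted(required - config.keys())
```

An empty return value means the config passes this check; anything else lists exactly which sections to add before deploying.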

Troubleshooting

Function Logs

View function execution logs:

from cognite.client import CogniteClient
client = CogniteClient()

# Get recent function calls
calls = client.functions.calls.list(
    function_external_id="data-quality-validation",
    limit=10
)

# Get logs for a specific call
logs = client.functions.calls.get_logs(call_id=calls[0].id)
print(logs)

Workflow Executions

Check workflow execution status:

executions = client.workflows.executions.list(
    workflow_external_id="dq-shacl-equipment",
    limit=10
)

for execution in executions:
    print(f"Status: {execution.status}, Started: {execution.started_at}")

Common Issues

Issue: Workflow not triggering on data changes

  • Check trigger status: client.workflows.triggers.retrieve(external_id="...")
  • Verify instance space matches trigger filter
  • Check workflow version matches trigger version

Issue: "Failed to load SHACL rules"

  • Ensure SHACL file exists in CDF Files
  • Check file external_id matches config
  • Verify file content is valid Turtle syntax

Issue: "No instances to validate"

  • Check DMS filter in time series config
  • Verify time series exist in specified space
  • Ensure view and version match

Prerequisites

  • Python 3.10+
  • Cognite Python SDK
  • thisisneat 0.2.3 (automatically installed with package)
  • Access to CDF project with:
    • Cognite Functions
    • Workflows
    • Records API (alpha)
    • Data Modeling Services (DMS)

License

See LICENSE file for details.
