Official orchestration integrations for the Truthound data quality framework
Alpha Stage — This project is currently in alpha. APIs may change without notice. Not recommended for production use yet.
Truthound Orchestration
Truthound Orchestration provides the official orchestration integrations for Truthound 3.x across major workflow orchestration platforms. It is the first-party compatibility line for running Truthound in orchestration environments, while still exposing Protocol-based extension points for advanced teams that need alternative or custom engines.
Documentation: https://truthound.netlify.app
Quick Start
# Recommended: Truthound 3.x with a platform integration
pip install truthound-orchestration[airflow] "truthound>=3.0,<4.0"
pip install truthound-orchestration[dagster] "truthound>=3.0,<4.0"
pip install truthound-orchestration[prefect] "truthound>=3.0,<4.0"
# Core package + Truthound 3.x
pip install truthound-orchestration "truthound>=3.0,<4.0"
# All supported platform integrations
pip install truthound-orchestration[all] "truthound>=3.0,<4.0"
from common.engines import TruthoundEngine
import polars as pl
engine = TruthoundEngine()
df = pl.read_csv("data.csv")
# Baseline and current snapshots for drift detection (placeholder file names)
baseline_df = pl.read_parquet("baseline.parquet")
current_df = pl.read_parquet("current.parquet")
with engine:
    # Data validation
    result = engine.check(df, auto_schema=True)
    print(f"Status: {result.status.name}")
    # Drift detection (14 statistical methods)
    drift = engine.detect_drift(baseline_df, current_df, method="ks")
    print(f"Drifted: {drift.is_drifted}, Rate: {drift.drift_rate:.2%}")
    # Anomaly detection (ML-based)
    anomalies = engine.detect_anomalies(df, detector="isolation_forest")
    print(f"Anomalies: {anomalies.has_anomalies}, Rate: {anomalies.anomaly_rate:.2%}")
Truthound 3.x Compatibility
truthound-orchestration 3.x supports Truthound 3.x only.
- Supported Truthound versions: >=3.0,<4.0
- Unsupported Truthound versions: 1.x and 2.x
- This policy applies to the root package and the platform integration packages in this repository
- If you need an older Truthound engine line, stay on an older truthound-orchestration release line
This release line exists to provide a clear compatibility boundary for the Truthound 3 runtime and result contracts.
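The compatibility boundary above can be checked at process start. The following is a minimal sketch using only the standard library; `parse_release`, `is_supported_truthound`, and `installed_truthound_supported` are hypothetical helpers, not part of truthound-orchestration, and the parsing ignores PEP 440 pre-release ordering.

```python
from importlib import metadata


def parse_release(version: str) -> tuple[int, ...]:
    """Return the leading numeric release segment, e.g. '3.1.0rc1' -> (3, 1, 0)."""
    parts: list[int] = []
    for piece in version.split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        parts.append(int(digits))
        if digits != piece:
            break  # stop at a suffix such as "0rc1"
    return tuple(parts)


def is_supported_truthound(version: str) -> bool:
    """Hypothetical check mirroring the documented range >=3.0,<4.0."""
    release = parse_release(version)
    if not release:
        return False
    # Pad "3" to (3, 0) so bare major versions compare as expected.
    padded = release + (0,) * (2 - len(release))
    return (3, 0) <= padded < (4, 0)


def installed_truthound_supported() -> bool:
    """Check the installed distribution; False when truthound is absent."""
    try:
        return is_supported_truthound(metadata.version("truthound"))
    except metadata.PackageNotFoundError:
        return False
```

A caller could run `installed_truthound_supported()` before constructing an engine and fail early with a clear message when it returns False.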
Table of Contents
- Quick Start
- Truthound 3.x Compatibility
- Overview
- Architecture
- Implementation Status
- Supported Platforms
- Installation
- Usage Examples
- Advanced Engine Support
- Development
- Contributing
- License
- Common Module Documentation
- Enterprise Features
- Related Projects
- Support
Overview
Motivation
Modern data ecosystems need data quality checks that fit naturally inside orchestration systems without losing runtime semantics. Truthound Orchestration exists to make Truthound 3.x a first-class citizen in Airflow, Dagster, Prefect, dbt, Mage, and Kestra through official adapters, shared result semantics, and a clearly documented compatibility line.
Design Principles
| Principle | Description |
|---|---|
| Truthound-First Experience | Official orchestration integrations for Truthound 3.x with Truthound as the primary documented path |
| Platform-Native Patterns | Adheres to the idiomatic conventions of each target platform |
| Protocol-Based Architecture | Employs Python Protocols internally for loose coupling, maintainability, and advanced extensibility |
| Clear Compatibility Line | truthound-orchestration 3.x targets Truthound 3.x only |
| Advanced Extensibility | Alternative and custom engines remain available for advanced integrations |
Core Capabilities
- Truthound 3.x Integration: Official orchestration adapters for validation, profiling, schema learning, drift, anomaly detection, and streaming workflows
- Data Validation: Execute comprehensive validation rules across multiple data quality dimensions
- Data Profiling: Perform automated statistical analysis and pattern detection
- Schema Learning: Automatically infer validation rules from data characteristics
- Data Drift Detection: Detect distribution changes between baseline and current data using 14 statistical methods (KS, PSI, Chi2, KL, JS, Wasserstein, etc.)
- Anomaly Detection: ML-based anomaly detection with Isolation Forest, Z-Score, LOF, and Ensemble detectors
- Streaming Validation: Memory-efficient batch-by-batch validation of streaming data via Iterator/Generator patterns
- Cross-Platform Consistency: Maintain uniform validation semantics across all supported platforms
- Advanced Extensibility: Protocol-based hooks for alternative engines and custom integrations when needed
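To make the drift detection capability listed above more concrete, here is a minimal sketch of one of the named methods, the Population Stability Index (PSI), in plain Python. It is an illustration under stated assumptions (equal-width bins taken from the baseline range, a small floor `eps` for empty bins) and is not Truthound's implementation; the function name `psi` is hypothetical.

```python
import math
from collections.abc import Sequence


def psi(
    baseline: Sequence[float],
    current: Sequence[float],
    bins: int = 10,
    eps: float = 1e-6,
) -> float:
    """Population Stability Index between two numeric samples."""
    lo, hi = min(baseline), max(baseline)
    # Fall back to a unit width when the baseline is constant.
    width = (hi - lo) / bins or 1.0

    def proportions(values: Sequence[float]) -> list[float]:
        counts = [0] * bins
        for value in values:
            index = int((value - lo) / width)
            # Values outside the baseline range go into the edge bins.
            counts[min(max(index, 0), bins - 1)] += 1
        total = len(values)
        # Floor each proportion so the logarithm below is defined.
        return [max(count / total, eps) for count in counts]

    expected = proportions(baseline)
    actual = proportions(current)
    return sum((a - e) * math.log(a / e) for a, e in zip(actual, expected))
```

Identical samples give a PSI of zero, and the value grows as the current distribution moves away from the baseline. Any threshold for declaring drift is a policy choice and is not taken from this document.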
Architecture
The system architecture keeps a protocol-based core so the first-party Truthound integration remains maintainable and testable. For the 3.x release line, Truthound is the primary supported runtime, while extended protocols preserve room for advanced custom integrations.
┌─────────────────────────────────────────────────────────────────────────────┐
│                        Workflow Orchestration Layer                         │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────┐  ┌──────┐  ┌────────┐       │
│  │ Airflow │  │ Dagster │  │ Prefect │  │ dbt │  │ Mage │  │ Kestra │       │
│  └────┬────┘  └────┬────┘  └────┬────┘  └──┬──┘  └──┬───┘  └───┬────┘       │
└───────┼────────────┼────────────┼──────────┼────────┼──────────┼────────────┘
        └────────────┴────────────┴────┬─────┴────────┴──────────┘
                                       │
                                       ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                                Common Module                                │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐     │
│  │  Protocols   │  │    Config    │  │ Serializers  │  │  Exceptions  │     │
│  └──────────────┘  └──────────────┘  └──────────────┘  └──────────────┘     │
│                                                                             │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                   DataQualityEngine Protocol (Core)                   │  │
│  │    check(data, rules) -> CheckResult                                  │  │
│  │    profile(data)      -> ProfileResult                                │  │
│  │    learn(data)        -> LearnResult                                  │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                      Extended Protocols (Opt-in)                      │  │
│  │    DriftDetectionEngine   -> detect_drift()      (14 methods)         │  │
│  │    AnomalyDetectionEngine -> detect_anomalies()  (4 detectors)        │  │
│  │    StreamingEngine        -> check_stream()      (Iterator pattern)   │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────────┘
                                       │
           ┌───────────────────────────┼───────────────────────────┐
           │                           │                           │
           ▼                           ▼                           ▼
  ┌─────────────────┐     ┌─────────────────────────┐     ┌─────────────────┐
  │    Truthound    │     │   Great Expectations    │     │  Custom Engine  │
  │    (Default)    │     │       (Optional)        │     │   (Optional)    │
  │ Drift/Anomaly/  │     └─────────────────────────┘     └─────────────────┘
  │    Streaming    │
  └─────────────────┘
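The protocol-based core in the diagram means an engine is accepted by structure rather than by inheritance. The sketch below illustrates that idea with `typing.Protocol`. `SketchEngine` and `SketchCheckResult` are hypothetical, reduced stand-ins for `DataQualityEngine` and `CheckResult`; the real signatures and result fields may differ.

```python
from dataclasses import dataclass, field
from typing import Any, Protocol, runtime_checkable


@dataclass
class SketchCheckResult:
    """Hypothetical stand-in for CheckResult."""

    passed: bool
    failures: list[str] = field(default_factory=list)


@runtime_checkable
class SketchEngine(Protocol):
    """Hypothetical, reduced version of the DataQualityEngine protocol."""

    def check(self, data: Any, rules: list[dict[str, Any]]) -> SketchCheckResult: ...


class NotNullEngine:
    """Custom engine that satisfies the protocol without inheriting from it."""

    def check(
        self, data: list[dict[str, Any]], rules: list[dict[str, Any]]
    ) -> SketchCheckResult:
        failures: list[str] = []
        for rule in rules:
            if rule["type"] != "not_null":
                continue  # this sketch only understands one rule type
            column = rule["column"]
            if any(row.get(column) is None for row in data):
                failures.append(f"{column}: null values found")
        return SketchCheckResult(passed=not failures, failures=failures)


def run_check(
    engine: SketchEngine, data: Any, rules: list[dict[str, Any]]
) -> SketchCheckResult:
    """Orchestration-side code depends on the protocol only."""
    return engine.check(data, rules)
```

Because the orchestration side depends only on the protocol, swapping the default engine for an adapter or a custom engine does not require changes to the calling code.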
Implementation Status
Common Module
| Status | Complete |
|---|---|
The Common Module provides the foundational types and utilities shared across all platform integrations, including the core DataQualityEngine Protocol.
| Component | Description | Status |
|---|---|---|
| base.py | Protocols (including DataQualityEngine), enumerations, configuration, and result types | Complete |
| config.py | Environment and file-based configuration management | Complete |
| exceptions.py | Hierarchical exception system | Complete |
| logging.py | Structured logging with context propagation and sensitive data masking | Complete |
| retry.py | Retry decorator with configurable backoff strategies | Complete |
| circuit_breaker.py | Circuit breaker pattern for fault tolerance | Complete |
| health.py | Health check system with composite checks and aggregation strategies | Complete |
| metrics.py | Metrics collection and distributed tracing | Complete |
| rate_limiter.py | Rate limiting with multiple algorithms (Token Bucket, Sliding Window, etc.) | Complete |
| cache.py | Caching infrastructure with configurable eviction policies and backend abstraction | Complete |
| serializers.py | Platform-specific serialization utilities | Complete |
| testing.py | Mock objects, fixtures, and assertion helpers | Complete |
| rule_validation.py | Rule validation with schema definitions and engine-specific validators | Complete |
| engines/ | Engine implementations (Truthound default, adapter for other engines) | Complete |
| engines/batch.py | Batch operations with chunking, parallel execution, and result aggregation | Complete |
| engines/config.py | Engine configuration system (Builder, Loader, Validator, Registry) | Complete |
| engines/metrics.py | Engine metrics integration with hooks for logging, metrics, and tracing | Complete |
| engines/aggregation.py | Multi-engine result aggregation with strategies, comparison, and weighted scoring | Complete |
| engines/version.py | Semantic versioning, version constraints, compatibility checking, and version registry | Complete |
| exporters/prometheus.py | Prometheus metrics export with Push Gateway, HTTP server, and multi-tenant support | Complete |
Key Components:
- Engine Protocol: DataQualityEngine, AsyncDataQualityEngine - core abstraction for any data quality engine
- Extended Protocols: DriftDetectionEngine, AnomalyDetectionEngine, StreamingEngine, AsyncStreamingEngine - opt-in protocols for advanced capabilities
- Engine Lifecycle: ManagedEngine, AsyncManagedEngine, EngineLifecycleManager, AsyncEngineLifecycleManager - lifecycle management with start/stop/health_check
- Engine Implementations: TruthoundEngine (default, supports drift/anomaly/streaming), GreatExpectationsAdapter, PanderaAdapter
- Engine Configuration: BaseEngineConfig, ConfigBuilder, ConfigLoader, ConfigValidator, ConfigRegistry, EnvironmentConfig - flexible configuration with builder pattern, multi-source loading, and validation
- Protocols: WorkflowIntegration, AsyncWorkflowIntegration, ExtendedWorkflowIntegration
- Configuration Types: CheckConfig, ProfileConfig, LearnConfig, DriftConfig, AnomalyConfig, StreamConfig, RetryConfig, CircuitBreakerConfig, HealthCheckConfig, MetricsConfig, TracingConfig, RateLimitConfig
- Result Types: CheckResult, ProfileResult, LearnResult, DriftResult, AnomalyResult, HealthCheckResult
- Drift/Anomaly Types: DriftStatus, AnomalyStatus, DriftMethod, ColumnDrift, AnomalyScore
- Feature Detection: supports_drift(), supports_anomaly(), supports_streaming() - runtime engine capability checking
- Serializers: AirflowXComSerializer, DagsterOutputSerializer, PrefectArtifactSerializer
- Logging: TruthoundLogger, LogContext, PerformanceLogger, SensitiveDataMasker
- Retry: retry, RetryConfig, RetryStrategy, RetryExecutor, LoggingRetryHook
- Circuit Breaker: circuit_breaker, CircuitBreaker, CircuitBreakerConfig, CircuitState
- Health Check: health_check, HealthCheckConfig, HealthStatus, CompositeHealthChecker, AggregationStrategy
- Metrics: Counter, Gauge, Histogram, Summary, MetricsRegistry, timed, counted
- Tracing: Span, trace, TracingRegistry, TraceContext, SpanKind, SpanStatus
- Rate Limiting: rate_limit, RateLimitConfig, RateLimitAlgorithm, RateLimiterRegistry, TokenBucketRateLimiter, SlidingWindowRateLimiter
- Caching: cached, CacheConfig, EvictionPolicy, CacheBackend, LRUCache, LFUCache, TTLCache, CacheRegistry, CacheExecutor
- Engine Lifecycle: ManagedEngine, AsyncManagedEngine, EngineState, EngineConfig, EngineLifecycleManager, AsyncEngineLifecycleManager, EngineHealthChecker, AsyncEngineHealthChecker, ManagedEngineMixin, AsyncManagedEngineMixin
- Async Adapters: SyncEngineAsyncAdapter, SyncToAsyncLifecycleHookAdapter - wrap sync engines/hooks for async contexts
- Async Lifecycle Hooks: AsyncLifecycleHook, AsyncLoggingLifecycleHook, AsyncMetricsLifecycleHook, AsyncCompositeLifecycleHook
- Batch Operations: BatchExecutor, AsyncBatchExecutor, BatchConfig, ExecutionStrategy, AggregationStrategy, ChunkingStrategy - large dataset processing with chunking and parallel execution
- Engine Metrics: InstrumentedEngine, AsyncInstrumentedEngine, EngineMetricsHook, MetricsEngineHook, LoggingEngineHook, TracingEngineHook, StatsCollectorHook - automatic metrics collection for engine operations
- Result Aggregation: MultiEngineAggregator, AggregationConfig, ResultAggregationStrategy, CheckResultMergeAggregator, CheckResultWeightedAggregator, AggregatorRegistry - multi-engine result combination with strategies
- Engine Versioning: SemanticVersion, VersionConstraint, VersionRange, VersionCompatibilityChecker, VersionRegistry, parse_version, parse_constraint, require_version - SemVer 2.0.0 support with compatibility checking
- Testing Utilities: MockDataQualityEngine, MockDriftDetectionEngine, MockAnomalyDetectionEngine, MockStreamingEngine, MockFullEngine, AsyncMockDataQualityEngine, AsyncMockManagedEngine, MockCacheBackend, DataQualityTestContext
- Prometheus Export: PrometheusExporter, PrometheusPushGatewayClient, PrometheusHttpServer, TenantAwarePrometheusExporter, AsyncPrometheusExporter
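The feature detection entry above describes checking engine capabilities at runtime. One way such a check can work is structural: test whether the engine object exposes the opt-in method. The sketch below assumes that approach; `DriftCapable`, `StreamCapable`, and `capabilities` are hypothetical names, and the package's own `supports_drift()` family may be implemented differently.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class DriftCapable(Protocol):
    """Hypothetical marker protocol: the engine exposes detect_drift()."""

    def detect_drift(self, baseline: Any, current: Any) -> Any: ...


@runtime_checkable
class StreamCapable(Protocol):
    """Hypothetical marker protocol: the engine exposes check_stream()."""

    def check_stream(self, stream: Any) -> Any: ...


def capabilities(engine: object) -> dict[str, bool]:
    """Report which opt-in capabilities an engine object provides."""
    return {
        "drift": isinstance(engine, DriftCapable),
        "streaming": isinstance(engine, StreamCapable),
    }


class BasicEngine:
    """Core-only engine: validation, no extended protocols."""

    def check(self, data: Any, rules: Any) -> bool:
        return True


class DriftEngine(BasicEngine):
    """Adds the drift capability by defining the extra method."""

    def detect_drift(self, baseline: Any, current: Any) -> bool:
        return baseline != current
```

Note that `isinstance` against a runtime-checkable protocol only verifies that the method exists, not that its signature matches, so callers should still treat the first real call as the true test.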
Platform Integrations
| Status | Complete |
|---|---|
The platform integration layer provides native adapters for major workflow orchestration systems, enabling seamless incorporation of data quality validation into existing pipeline architectures.
| Platform | Package | Description | Status |
|---|---|---|---|
| Apache Airflow | packages/airflow/ | Operators, Sensors, Hooks with SLA integration | Complete |
| Dagster | packages/dagster/ | Resources, Assets, Ops with native type support | Complete |
| Prefect | packages/prefect/ | Blocks, Tasks, Flows with async support | Complete |
| dbt | packages/dbt/ | Generic Tests, Jinja macros, cross-adapter support | Complete |
| Mage AI | packages/mage/ | Transformer, Sensor, Condition blocks with SLA monitoring | Complete |
| Kestra | packages/kestra/ | Python scripts, YAML flow generators, output handlers | Complete |
Enterprise Extensions
| Status | Complete |
|---|---|
The enterprise module extends the core framework with production-grade capabilities for large-scale deployments.
| Component | Location | Description | Status |
|---|---|---|---|
| Enterprise Engines | packages/enterprise/engines/ | Informatica, Talend, IBM InfoSphere, SAP Data Services adapters | Complete |
| Notifications | packages/enterprise/notifications/ | Multi-channel notification system (Slack, Email, Webhook, PagerDuty, Opsgenie) | Complete |
| Multi-Tenant | packages/enterprise/multi_tenant/ | Tenant isolation, quota management, context propagation | Complete |
| Secrets | packages/enterprise/secrets/ | Secret management with multiple backends and audit logging | Complete |
Supported Platforms
truthound-airflow
| Status | Complete |
|---|---|
Apache Airflow Provider package implementing native Operators, Sensors, and Hooks for Truthound-first data quality workflows.
| Component | Description |
|---|---|
| DataQualityCheckOperator | Execute Truthound-backed data quality validation with optional advanced engine injection |
| DataQualityProfileOperator | Perform statistical profiling of datasets |
| DataQualityLearnOperator | Automatically infer validation schemas |
| DataQualityDriftOperator | Detect data drift between baseline and current datasets |
| DataQualityAnomalyOperator | Detect anomalies using ML-based detectors |
| DataQualitySensor | Monitor data quality conditions |
| DataQualityHook | Manage connections and data source interactions |
Key Features:
- Native Airflow Provider architecture with XCom serialization
- SLA monitoring integration with configurable alerting thresholds
- Connection management via Airflow Hooks
- Support for multiple data sources (S3, GCS, BigQuery, Snowflake)
truthound-dagster
| Status | Complete |
|---|---|
Dagster integration utilizing ConfigurableResource and Software-Defined Assets with Truthound-first defaults.
| Component | Description |
|---|---|
| DataQualityResource | Configurable resource with Truthound-first defaults and optional custom engine injection |
| create_quality_check_asset | Factory function for quality validation assets |
| data_quality_check_op | Op implementation for graph-based workflows |
| data_quality_drift_op | Drift detection op with dual-input (baseline + current) |
| data_quality_anomaly_op | Anomaly detection op with ML detectors |
| DataQualitySensor | Event-driven quality monitoring |
Key Features:
- Software-Defined Assets with automatic lineage tracking
- Type-safe configuration via Pydantic integration
- Native IOManager support for result persistence
- Freshness policies for data quality SLAs
truthound-prefect
| Status | Complete |
|---|---|
Prefect integration providing Blocks, Tasks, and Flow templates for Truthound-first orchestration workflows.
| Component | Description |
|---|---|
| DataQualityBlock | Persistent configuration storage with Truthound-first defaults and optional advanced engine selection |
| data_quality_check | Task decorator for validation operations |
| data_quality_profile | Task decorator for profiling operations |
| data_quality_drift_task | Async drift detection task with table artifact visualization |
| data_quality_anomaly_task | Async anomaly detection task with table artifact visualization |
| validation_flow | Reusable flow template for quality pipelines |
Key Features:
- Native async/await support for concurrent execution
- Block-based configuration with versioned storage
- Artifact generation for validation result visualization
- Integration with Prefect Cloud for centralized monitoring
truthound-dbt
| Status | Complete |
|---|---|
dbt package providing Generic Tests, Jinja macros, and Python utilities for SQL-based data quality validation with cross-adapter support.
| Component | Description |
|---|---|
| test_truthound_check | Generic test for declarative rule specification |
| truthound_check.sql | Main validation macro |
| truthound_rules.sql | Rule-specific SQL generators |
| truthound_utils.sql | Cross-adapter utility macros |
| adapters/ | Database-specific optimizations (Snowflake, BigQuery, Redshift, Databricks, PostgreSQL) |
| Python Package | Adapters, converters, generators, parsers, and hooks |
| Drift SQL Handlers | SQL-based drift detection (mean, stddev, null rate, distinct count, row count) |
| Anomaly SQL Handlers | SQL-based anomaly detection (Z-Score, IQR, range) |
Supported Databases:
- PostgreSQL (default)
- Snowflake
- BigQuery
- Redshift
- Databricks
truthound-mage
| Status | Complete |
|---|---|
Mage AI integration providing custom block implementations for data quality operations.
| Component | Description |
|---|---|
| CheckTransformer | Transformer block for data quality validation |
| ProfileTransformer | Transformer block for statistical profiling |
| LearnTransformer | Transformer block for schema inference |
| DriftTransformer | Transformer block for drift detection (dual-input) |
| AnomalyTransformer | Transformer block for ML anomaly detection |
| DataQualitySensor | Sensor block for quality condition monitoring |
| DataQualityCondition | Condition block for pipeline branching |
| SLAMonitor | SLA monitoring with violation tracking |
Key Features:
- Native Mage block architecture with execution context
- SLA monitoring with configurable thresholds and hooks
- Thread-safe consecutive failure tracking
- Builder pattern for immutable configuration
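The thread-safe consecutive failure tracking mentioned above can be pictured as a counter guarded by a lock. This is a minimal sketch of that technique only; `ConsecutiveFailureTracker` is a hypothetical class and does not describe the actual `SLAMonitor` internals.

```python
import threading


class ConsecutiveFailureTracker:
    """Hypothetical tracker: counts failures in a row, reset by any success."""

    def __init__(self, threshold: int) -> None:
        self._threshold = threshold
        self._count = 0
        self._lock = threading.Lock()

    @property
    def count(self) -> int:
        with self._lock:
            return self._count

    def record(self, success: bool) -> bool:
        """Record one outcome; return True once the streak reaches the threshold."""
        with self._lock:
            # Read-modify-write happens under the lock, so concurrent
            # callers cannot lose updates.
            self._count = 0 if success else self._count + 1
            return self._count >= self._threshold
```

A pipeline step could call `record()` after each validation run and raise an SLA violation when it returns True.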
truthound-kestra
| Status | Complete |
|---|---|
Kestra integration providing Python script executors and YAML flow generators.
| Component | Description |
|---|---|
| check_quality_script | Script executor for data validation |
| profile_data_script | Script executor for statistical profiling |
| learn_schema_script | Script executor for schema inference |
| drift_detection_script | Script executor for drift detection |
| anomaly_detection_script | Script executor for anomaly detection |
| FlowGenerator | YAML flow generation from configuration |
| KestraOutputHandler | Native Kestra output integration |
| SLAMonitor | SLA monitoring with evaluation results |
Key Features:
- YAML flow generation for check, profile, learn, and pipeline flows
- Script executors with Kestra-native output handling
- Support for schedule, flow, and webhook triggers
- Retry configuration with exponential backoff
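For readers unfamiliar with exponential backoff, the retry behaviour named above can be sketched as follows. This is a generic illustration, not the package's `retry` decorator or its `RetryConfig`; `backoff_delays` and `retry_with_backoff` are hypothetical names, and the `sleep` parameter exists only so the wait can be replaced in tests.

```python
import functools
import time
from typing import Any, Callable


def backoff_delays(
    attempts: int, base: float = 1.0, factor: float = 2.0, cap: float = 30.0
) -> list[float]:
    """Delays between attempts: base, base*factor, base*factor**2, ... capped."""
    return [min(base * factor**i, cap) for i in range(attempts - 1)]


def retry_with_backoff(
    attempts: int = 3,
    base: float = 1.0,
    factor: float = 2.0,
    cap: float = 30.0,
    sleep: Callable[[float], Any] = time.sleep,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Retry the wrapped callable, waiting longer after each failure."""

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            delays = backoff_delays(attempts, base, factor, cap)
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == attempts - 1:
                        raise  # out of attempts: surface the last error
                    sleep(delays[attempt])

        return wrapper

    return decorator
```

The cap keeps long retry sequences from waiting unboundedly; production setups often also add random jitter, which is omitted here for brevity.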
Installation
Requirements
| Requirement | Version |
|---|---|
| Python | >= 3.11 |
| Truthound (primary engine) | >= 3.0, < 4.0 |
Recommended Truthound 3.x Installation
This project is published as a single package with optional dependencies. For the primary 3.x support path, install truthound-orchestration together with Truthound 3.x.
# Core package + Truthound 3.x
pip install truthound-orchestration "truthound>=3.0,<4.0"
# With specific platform integration + Truthound 3.x
pip install truthound-orchestration[airflow] "truthound>=3.0,<4.0"
pip install truthound-orchestration[dagster] "truthound>=3.0,<4.0"
pip install truthound-orchestration[prefect] "truthound>=3.0,<4.0"
pip install truthound-orchestration[mage] "truthound>=3.0,<4.0"
pip install truthound-orchestration[kestra] "truthound>=3.0,<4.0"
# Multiple platforms + Truthound 3.x
pip install truthound-orchestration[airflow,dagster] "truthound>=3.0,<4.0"
# All platforms + Truthound 3.x
pip install truthound-orchestration[all] "truthound>=3.0,<4.0"
Truthound Engine Installation
Install Truthound explicitly for the supported 3.x runtime:
# Truthound 3.x (recommended)
pip install "truthound>=3.0,<4.0"
Complete Truthound Examples
# Airflow user with Truthound 3.x
pip install truthound-orchestration[airflow] "truthound>=3.0,<4.0"
# Dagster user with Truthound 3.x
pip install truthound-orchestration[dagster] "truthound>=3.0,<4.0"
# Prefect user with Truthound 3.x
pip install truthound-orchestration[prefect] "truthound>=3.0,<4.0"
Why Single Package?
| Aspect | Single Package (truthound-orchestration[airflow]) | Multi Package (truthound-airflow) |
|---|---|---|
| Developer Experience | Simple, consistent | Multiple package names to remember |
| Versioning | Unified Truthound 3.x compatibility line | Separate version per package |
| Size | ~50KB core + platform deps | Same total size |
| Maintenance | Single first-party release line | 5 separate packages |
The common/ module is lightweight (approximately 50KB of pure Python). Platform-specific dependencies (Airflow approximately 200MB, Dagster approximately 150MB) are installed only when the corresponding extra is specified.
dbt Integration
For dbt integration, add the following to your packages.yml:
packages:
  - package: truthound/truthound
    version: ">=0.1.0"
Version Compatibility
| Extra | Default Engine | Platform Requirement |
|---|---|---|
| [airflow] | Truthound 3.x only | Apache Airflow >= 2.6.0 |
| [dagster] | Truthound 3.x only | Dagster >= 1.5.0 |
| [prefect] | Truthound 3.x only | Prefect >= 2.14.0 |
| [opentelemetry] | - | OpenTelemetry SDK >= 1.20.0 |
Advanced: Alternative and Custom Engines
Truthound 3.x is the primary documented and tested runtime for this release line. Alternative engines remain available for advanced integrations through the shared protocol and adapter boundaries.
# Great Expectations
pip install truthound-orchestration[dagster] great-expectations
# Pandera
pip install truthound-orchestration[airflow,prefect] pandera
Advanced engine options beyond Truthound:
- Great Expectations (via adapter)
- Pandera (via adapter)
- Custom engines (implement the DataQualityEngine Protocol)
Usage Examples
The following examples show the primary Truthound 3.x workflow across supported platforms.
Apache Airflow
from datetime import datetime
from airflow import DAG
from truthound_airflow import DataQualityCheckOperator
with DAG(
    dag_id="data_quality_pipeline",
    # Static start date (illustrative); airflow.utils.dates.days_ago is deprecated
    start_date=datetime(2024, 1, 1),
    # `schedule` replaces the deprecated `schedule_interval` (Airflow >= 2.4)
    schedule="@daily",
    catchup=False,  # do not backfill runs since start_date
) as dag:
    # Using default Truthound engine
    validate_data = DataQualityCheckOperator(
        task_id="validate_user_data",
        rules=[
            {"column": "user_id", "type": "not_null"},
            {"column": "user_id", "type": "unique"},
            {"column": "email", "type": "regex", "pattern": r"^[\w\.-]+@[\w\.-]+\.\w+$"},
        ],
        data_path="s3://data-lake/users/{{ ds }}/data.parquet",
        fail_on_error=True,
    )
Dagster
from dagster import asset, Definitions
from truthound_dagster import DataQualityResource, create_quality_check_asset
import polars as pl
@asset(group_name="raw")
def raw_users() -> pl.DataFrame:
    return pl.read_parquet("s3://bucket/users.parquet")
validated_users = create_quality_check_asset(
    name="validated_users",
    upstream_asset="raw_users",
    rules=[
        {"column": "user_id", "type": "not_null"},
        {"column": "email", "type": "regex", "pattern": r"^[\w\.-]+@[\w\.-]+\.\w+$"},
    ],
)
# Default: uses Truthound engine
defs = Definitions(
    assets=[raw_users, validated_users],
    resources={"data_quality": DataQualityResource()},
)
Prefect
from prefect import flow, task
from truthound_prefect import data_quality_check, DataQualityBlock
import polars as pl
@task
def load_data() -> pl.DataFrame:
    return pl.read_parquet("s3://bucket/data.parquet")
@flow(name="quality_validation_pipeline")
async def validation_pipeline():
    data = load_data()
    # Using default Truthound engine
    result = await data_quality_check(
        data=data,
        rules=[
            {"column": "id", "type": "not_null"},
            {"column": "amount", "type": "in_range", "min": 0},
        ],
    )
    # Or using a configured Truthound block
    block = await DataQualityBlock.load("my-truthound-config")
    result = await block.check(data, rules=[...])
    return result
dbt
# models/staging/schema.yml
version: 2
models:
  - name: stg_customers
    tests:
      - data_quality_check:
          rules:
            - column: customer_id
              type: not_null
            - column: customer_id
              type: unique
            - column: email
              type: regex
              pattern: "^[\\w\\.-]+@[\\w\\.-]+\\.\\w+$"
Drift Detection
from common.engines import TruthoundEngine
import polars as pl
engine = TruthoundEngine()
baseline = pl.read_parquet("baseline.parquet")
current = pl.read_parquet("current.parquet")
# Detect drift with auto method selection
result = engine.detect_drift(baseline, current)
if result.is_drifted:
    print(f"Drift detected! {result.drifted_count}/{result.total_columns} columns")
    for col in result.drifted_columns:
        print(f"  {col.column}: {col.method.name} stat={col.statistic:.4f}")
# Specify method and columns
result = engine.detect_drift(
    baseline, current,
    method="ks",
    columns=["revenue", "user_count"],
    threshold=0.05,
)
Anomaly Detection
from common.engines import TruthoundEngine
import polars as pl
engine = TruthoundEngine()
data = pl.read_parquet("data.parquet")
# Detect anomalies with Isolation Forest (default)
result = engine.detect_anomalies(data)
if result.has_anomalies:
    print(f"Anomalies found! rate={result.anomaly_rate:.2%}")
    for score in result.anomalies:
        print(f"  {score.column}: score={score.score:.4f}, anomaly={score.is_anomaly}")
# Use Z-Score detector on specific columns
result = engine.detect_anomalies(
    data,
    detector="z_score",
    columns=["transaction_amount", "login_count"],
    contamination=0.03,
)
Streaming Validation
from common.engines import TruthoundEngine
engine = TruthoundEngine()
def data_stream():
    # read_large_file is a placeholder for your own chunked reader
    for chunk in read_large_file("data.csv", chunk_size=10000):
        yield chunk
# Memory-efficient batch-by-batch validation
for batch_result in engine.check_stream(data_stream(), batch_size=5000):
    print(f"Batch: {batch_result.status.name}")
    if batch_result.status.name == "FAILED":
        break  # fail-fast
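The streaming example calls `read_large_file` without defining it. A minimal sketch of such a chunked reader, using only the standard library, could look like the following. The name and signature simply mirror the call in the example and are assumptions; the sketch yields lists of row dictionaries, and converting each chunk into whatever input type `check_stream` expects is left open.

```python
import csv
from collections.abc import Iterator
from itertools import islice


def read_large_file(
    path: str, chunk_size: int = 10_000
) -> Iterator[list[dict[str, str]]]:
    """Yield a CSV file as lists of at most `chunk_size` row dictionaries."""
    with open(path, newline="", encoding="utf-8") as handle:
        reader = csv.DictReader(handle)
        while True:
            # islice pulls the next chunk lazily, so only one chunk
            # is held in memory at a time.
            chunk = list(islice(reader, chunk_size))
            if not chunk:
                return
            yield chunk
```

Because the function is a generator, the file stays open only while chunks are being consumed and is closed when iteration finishes or the generator is discarded.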
Advanced Engine Support
Alternative and custom engines remain available for advanced use cases, but they are not the primary compatibility story for the 3.x release line.
Airflow With a Custom Engine
from datetime import datetime
from airflow import DAG
from truthound_airflow import DataQualityCheckOperator
from my_project import CustomEngine
with DAG(
    dag_id="custom_engine_quality_pipeline",
    # Static start date (illustrative); airflow.utils.dates.days_ago is deprecated
    start_date=datetime(2024, 1, 1),
    # `schedule` replaces the deprecated `schedule_interval` (Airflow >= 2.4)
    schedule="@daily",
    catchup=False,  # do not backfill runs since start_date
) as dag:
    validate_with_custom = DataQualityCheckOperator(
        task_id="validate_with_custom",
        engine=CustomEngine(),
        rules=[...],
        data_path="...",
    )
Dagster With Great Expectations
from dagster import Definitions
from common.engines import GreatExpectationsAdapter
from truthound_dagster import DataQualityResource
defs = Definitions(
    assets=[...],
    resources={"data_quality": DataQualityResource(engine=GreatExpectationsAdapter())},
)
Prefect With a Custom Block Configuration
from prefect import flow
from truthound_prefect import DataQualityBlock
@flow
async def validation_pipeline(data):
    block = await DataQualityBlock.load("my-custom-engine-config")
    return await block.check(data, rules=[...])
Development
Environment Setup
# Clone the repository
git clone https://github.com/seadonggyun4/truthound-orchestration.git
cd truthound-orchestration
# Create virtual environment and install dependencies
uv venv
source .venv/bin/activate
uv sync --all-extras
# Install pre-commit hooks
pre-commit install
Code Quality
| Tool | Purpose | Configuration |
|---|---|---|
| Ruff | Linting and formatting | ruff.toml |
| MyPy | Static type checking | mypy.ini |
| pytest | Testing framework | pyproject.toml |
| pre-commit | Git hooks | .pre-commit-config.yaml |
Commands
# Run linter
ruff check .
# Run type checker
mypy common/
# Run the monorepo tests with importlib mode
PYTHONPATH=. pytest --import-mode=importlib
# Run the common test suite only
PYTHONPATH=. pytest --import-mode=importlib tests/common
# Run platform package tests with explicit src paths
PYTHONPATH=.:packages/prefect/src pytest --import-mode=importlib packages/prefect/tests
PYTHONPATH=.:packages/dagster/src pytest --import-mode=importlib packages/dagster/tests
# Run all pre-commit checks
pre-commit run --all-files
Repository Structure
truthound-orchestration/
├── common/ # Shared module (Complete)
│ ├── base.py # Protocols (DataQualityEngine), Config, Result types
│ ├── config.py # Environment/file configuration
│ ├── exceptions.py # Exception hierarchy
│ ├── logging.py # Structured logging, masking
│ ├── retry.py # Retry decorator, backoff strategies
│ ├── circuit_breaker.py # Circuit breaker pattern
│ ├── health.py # Health check system
│ ├── metrics.py # Metrics and distributed tracing
│ ├── rate_limiter.py # Rate limiting algorithms
│ ├── cache.py # Caching infrastructure
│ ├── serializers.py # Platform serialization
│ ├── testing.py # Mock objects, fixtures
│ ├── exporters/ # Metric exporters
│ │ └── prometheus.py # Prometheus export (Push Gateway, HTTP Server)
│ └── engines/ # Engine implementations
│ ├── base.py # DataQualityEngine Protocol
│ ├── batch.py # Batch operations (BatchExecutor, chunking)
│ ├── chain.py # Engine chain/fallback (EngineChain, strategies)
│ ├── config.py # Engine configuration system (Builder, Loader)
│ ├── context.py # Context managers (EngineContext, EngineSession)
│ ├── lifecycle.py # Lifecycle management (ManagedEngine, EngineState)
│ ├── metrics.py # Engine metrics (InstrumentedEngine, hooks)
│ ├── aggregation.py # Result aggregation (MultiEngineAggregator)
│ ├── version.py # Semantic versioning, compatibility checking
│ ├── plugin.py # Plugin discovery (Entry Point based)
│ ├── registry.py # Engine registry
│ ├── truthound.py # Truthound engine (default)
│ ├── great_expectations.py # Great Expectations adapter
│ └── pandera.py # Pandera adapter
├── packages/ # Platform integrations (Complete)
│ ├── airflow/ # Airflow Operators, Sensors, Hooks, SLA (Complete)
│ │ ├── operators/ # DataQualityCheckOperator, ProfileOperator
│ │ ├── sensors/ # DataQualitySensor
│ │ ├── hooks/ # DataQualityHook
│ │ └── sla/ # SLA monitoring integration
│ ├── dagster/ # Dagster Resources, Assets, Ops (Complete)
│ │ ├── resources/ # DataQualityResource
│ │ ├── assets/ # Quality check asset factories
│ │ ├── ops/ # data_quality_check_op
│ │ └── sensors/ # DataQualitySensor
│ ├── prefect/ # Prefect Blocks, Tasks, Flows (Complete)
│ │ ├── blocks/ # DataQualityBlock
│ │ ├── tasks/ # data_quality_check, data_quality_profile
│ │ └── flows/ # validation_flow templates
│ ├── dbt/ # dbt Tests (Complete)
│ │ ├── src/truthound_dbt/ # Python package
│ │ │ ├── adapters/ # Database adapters (Postgres, Snowflake, BigQuery, Redshift, Databricks)
│ │ │ ├── converters/ # Rule converters
│ │ │ ├── generators/ # SQL, schema, and test generators
│ │ │ ├── parsers/ # Manifest and results parsers
│ │ │ └── hooks/ # dbt hook system
│ │ ├── macros/ # SQL macros (truthound_check, truthound_rules, truthound_utils)
│ │ ├── tests/generic/ # Generic test implementations
│ │ └── integration_tests/ # Integration test suite
│ ├── mage/ # Mage AI Blocks (Complete)
│ │ ├── blocks/ # Transformer, Sensor, Condition blocks
│ │ ├── io/ # IO configuration
│ │ ├── sla/ # SLA monitoring
│ │ └── utils/ # Utilities and exceptions
│ ├── kestra/ # Kestra Scripts and Flows (Complete)
│ │ ├── scripts/ # Python script executors
│ │ ├── flows/ # YAML flow generators
│ │ ├── outputs/ # Output handlers
│ │ ├── sla/ # SLA monitoring
│ │ └── utils/ # Utilities and exceptions
│ └── enterprise/ # Enterprise extensions (Complete)
│   ├── engines/ # Enterprise engine adapters
│   │ ├── base.py # EnterpriseEngineAdapter, protocols
│   │ ├── informatica.py # Informatica Data Quality adapter
│   │ ├── talend.py # Talend Data Quality adapter
│   │ ├── ibm_infosphere.py # IBM InfoSphere adapter
│   │ ├── sap_data_services.py # SAP Data Services adapter
│   │ └── registry.py # Enterprise engine registry
│   ├── notifications/ # Multi-channel notification system
│   │ ├── types.py # NotificationChannel, NotificationLevel
│   │ ├── handlers/ # Slack, Email, Webhook, PagerDuty, Opsgenie
│   │ ├── formatters/ # Message formatting (Markdown, HTML, Plain)
│   │ ├── routing.py # NotificationRouter, routing rules
│   │ └── registry.py # NotificationRegistry
│   ├── multi_tenant/ # Multi-tenant support
│   │ ├── types.py # TenantStatus, IsolationLevel, QuotaType
│   │ ├── context.py # TenantContext, context propagation
│   │ ├── registry.py # TenantRegistry
│   │ ├── isolation.py # IsolationEnforcer, TenantIsolator
│   │ ├── storage/ # TenantStorage backends
│   │ └── middleware.py # TenantMiddleware for web frameworks
│   └── secrets/ # Secret management
│     ├── base.py # SecretProvider protocol, types
│     ├── config.py # Configuration classes
│     ├── registry.py # Provider registry
│     ├── cache.py # Secret caching (TTL, tiered)
│     ├── encryption.py # Client-side encryption
│     ├── rotation.py # Automatic secret rotation
│     ├── hooks.py # Audit logging hooks
│     └── backends/ # Storage backends (Vault, AWS, GCP, Azure)
├── docs/ # User documentation
│ ├── index.md # Documentation index
│ ├── getting-started.md # Installation and quick start
│ ├── common/ # Common module guides
│ ├── engines/ # Engine documentation
│ ├── airflow/ # Airflow integration guide
│ ├── dagster/ # Dagster integration guide
│ ├── prefect/ # Prefect integration guide
│ ├── dbt/ # dbt integration guide
│ ├── mage/ # Mage AI integration guide
│ ├── kestra/ # Kestra integration guide
│ ├── enterprise/ # Enterprise features guide
│ └── api-reference/ # API reference documentation
└── tests/ # Test suites
  ├── common/ # Common module tests
  ├── dbt/ # dbt module tests
  ├── mage/ # Mage module tests
  ├── kestra/ # Kestra module tests
  └── enterprise/ # Enterprise module tests
Contributing
Contributions are welcome.
This project follows Conventional Commits for commit messages:
<type>(<scope>): <description>
feat(airflow): add TruthoundSensor for quality monitoring
fix(dagster): resolve resource initialization error
docs(common): update configuration examples
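As a rough illustration of the format above, a commit subject can be checked with a regular expression. This is only a sketch: the allowed type list, the optional scope, and the `is_conventional` helper are assumptions made for illustration, not tooling shipped with this repository.

```python
import re

# Hypothetical allow-list; the project may accept other commit types.
ALLOWED_TYPES = ("feat", "fix", "docs", "refactor", "test", "chore")

# <type>(<scope>): <description>, with the scope treated as optional here.
COMMIT_RE = re.compile(
    r"^(?P<type>[a-z]+)(\((?P<scope>[a-z0-9_\-]+)\))?: (?P<description>\S.*)$"
)


def is_conventional(subject: str) -> bool:
    """Return True when the subject line matches the commit format."""
    match = COMMIT_RE.match(subject)
    return bool(match) and match.group("type") in ALLOWED_TYPES
```

A check like this could run in a local commit hook, so that malformed subjects are caught before review.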
License
This project is licensed under the Apache License 2.0. See LICENSE for details.
Common Module Documentation
The common/ module provides foundational components shared across all workflow orchestration integrations.
Basic Usage
from common.engines import TruthoundEngine
import polars as pl
# Load the data to validate (same input as the Quick Start)
data = pl.read_csv("data.csv")
# Context manager usage (recommended)
with TruthoundEngine() as engine:
    result = engine.check(data, auto_schema=True)
    print(f"Status: {result.status.name}")
    print(f"Passed: {result.passed_count}, Failed: {result.failed_count}")
Using Different Engines
from common.engines import get_engine
# Get engine by name
engine = get_engine("truthound") # Default
engine = get_engine("great_expectations")
engine = get_engine("pandera")
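Name-based lookup of this kind is commonly backed by a registry that maps names to factories. The sketch below shows that general pattern under stated assumptions: `EngineRegistry`, `StubTruthound`, and `StubPandera` are hypothetical names, and this is not the implementation in `registry.py`.

```python
from typing import Callable, Dict, Optional


class EngineRegistry:
    """Maps engine names to zero-argument factories (illustrative only)."""

    def __init__(self, default: str) -> None:
        self._factories: Dict[str, Callable[[], object]] = {}
        self._default = default

    def register(self, name: str, factory: Callable[[], object]) -> None:
        self._factories[name] = factory

    def get(self, name: Optional[str] = None) -> object:
        # Fall back to the default engine when no name is given.
        key = name or self._default
        try:
            factory = self._factories[key]
        except KeyError:
            known = ", ".join(sorted(self._factories))
            raise KeyError(f"unknown engine {key!r}; registered: {known}") from None
        return factory()


# Stand-in engine classes so the sketch runs without the real package.
class StubTruthound:
    name = "truthound"


class StubPandera:
    name = "pandera"


registry = EngineRegistry(default="truthound")
registry.register("truthound", StubTruthound)
registry.register("pandera", StubPandera)
```

Registering factories rather than instances means each lookup returns a fresh engine, which avoids sharing state between callers.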
Feature Detection
from common.engines import get_engine
from common.engines.base import supports_drift, supports_anomaly, supports_streaming
engine = get_engine("truthound")
print(supports_drift(engine)) # True
print(supports_anomaly(engine)) # True
print(supports_streaming(engine)) # True
ge = get_engine("great_expectations")
print(supports_drift(ge)) # False
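One way capability checks like these could work is structural typing with `typing.Protocol` and `runtime_checkable`. The sketch below is an assumption about the approach, not the library's actual definitions: `DriftCapable`, `FullEngine`, `BasicEngine`, and `supports_drift_sketch` are hypothetical names.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class DriftCapable(Protocol):
    """Anything exposing a detect_drift method satisfies this protocol."""

    def detect_drift(self, baseline: Any, current: Any, method: str = "ks") -> Any:
        ...


class FullEngine:
    def check(self, data: Any, **kwargs: Any) -> str:
        return "checked"

    def detect_drift(self, baseline: Any, current: Any, method: str = "ks") -> str:
        return "drift-result"


class BasicEngine:
    def check(self, data: Any, **kwargs: Any) -> str:
        return "checked"


def supports_drift_sketch(engine: object) -> bool:
    # isinstance() against a runtime_checkable protocol only verifies that
    # the method exists; it does not validate the signature.
    return isinstance(engine, DriftCapable)
```

With this design an engine does not need to inherit from a base class to be detected, which fits custom or third-party engines.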
Enterprise Features
The enterprise module (packages/enterprise/) provides production-grade capabilities designed for large-scale, multi-tenant deployments requiring integration with commercial data quality platforms and sophisticated operational workflows.
Enterprise Engine Adapters
The framework provides adapters for commercial data quality platforms, so teams can keep an existing platform and run it behind the unified orchestration layer.
| Engine | Module | Description |
|---|---|---|
| Informatica Data Quality | informatica.py | IDQ integration with scorecard support |
| Talend Data Quality | talend.py | TMC integration with profiling capabilities |
| IBM InfoSphere Information Analyzer | ibm_infosphere.py | Analysis and rule management |
| SAP Data Services | sap_data_services.py | Address cleansing and validation |
from packages.enterprise.engines import (
    get_enterprise_engine,
    create_informatica_adapter,
    create_ibm_infosphere_adapter,
)
# Retrieve engine from registry
engine = get_enterprise_engine("informatica")
# Create with explicit configuration
adapter = create_informatica_adapter(
    api_endpoint="https://idq.example.com/api/v2",
    api_key="your-api-key",
    domain="Production",
)
# Usage follows standard DataQualityEngine protocol
with adapter:
    result = adapter.check(data, rules)
Multi-Channel Notification System
The notification subsystem enables automated alerting across multiple communication channels with configurable routing, formatting, and retry logic.
| Channel | Handler | Description |
|---|---|---|
| Slack | SlackNotificationHandler | Webhook-based Slack integration |
| Email | EmailNotificationHandler | SMTP-based email delivery |
| Webhook | WebhookNotificationHandler | Generic HTTP endpoint integration |
| PagerDuty | PagerDutyNotificationHandler | Incident management integration |
| Opsgenie | OpsgenieNotificationHandler | Alert management integration |
import asyncio
from packages.enterprise.notifications import (
    NotificationManager,
    NotificationPayload,
    NotificationLevel,
    create_slack_handler,
)
# Configure notification handler
slack = create_slack_handler(
    webhook_url="https://hooks.slack.com/services/...",
    default_channel="#data-quality-alerts",
)
# Create notification manager
manager = NotificationManager(handlers=[slack])
# Build the notification payload
payload = NotificationPayload(
    message="Data quality check failed: 15 records with null values",
    level=NotificationLevel.ERROR,
    title="Validation Failure Alert",
)
# notify() is awaited, so it has to run inside a coroutine
async def main():
    return await manager.notify(payload)
result = asyncio.run(main())
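The subsystem is described as supporting configurable routing and retry logic. The sketch below illustrates one plausible shape for both, using only the standard library: handlers are selected by minimum severity and each delivery is retried with exponential backoff. `Level`, `Route`, `send_with_retry`, and `route` are hypothetical names and do not reflect the actual `NotificationRouter` API.

```python
import asyncio
from dataclasses import dataclass
from enum import IntEnum
from typing import Awaitable, Callable, List

Handler = Callable[[str], Awaitable[None]]


class Level(IntEnum):
    INFO = 10
    WARNING = 20
    ERROR = 30


@dataclass
class Route:
    min_level: Level  # deliver when the message level is at least this
    handler: Handler


async def send_with_retry(
    handler: Handler, message: str, attempts: int = 3, base_delay: float = 0.01
) -> bool:
    """Try a handler up to `attempts` times, doubling the delay each time."""
    for attempt in range(attempts):
        try:
            await handler(message)
            return True
        except Exception:
            if attempt < attempts - 1:
                await asyncio.sleep(base_delay * 2 ** attempt)
    return False


async def route(routes: List[Route], level: Level, message: str) -> List[bool]:
    """Send the message to every handler whose threshold is met."""
    selected = [r.handler for r in routes if level >= r.min_level]
    results = await asyncio.gather(*(send_with_retry(h, message) for h in selected))
    return list(results)
```

Returning one boolean per selected handler lets the caller decide whether a partial delivery failure should itself be escalated.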
Multi-Tenant Architecture
The multi-tenant module provides comprehensive isolation, quota management, and context propagation for organizations serving multiple tenants from a shared infrastructure.
| Component | Description |
|---|---|
| TenantContext | Thread-local tenant context propagation |
| TenantRegistry | Tenant lifecycle and metadata management |
| IsolationEnforcer | Resource access control and isolation |
| TenantStorage | Backend abstraction (Memory, Redis, Database) |
| TenantMiddleware | Web framework integration (ASGI/WSGI) |
import asyncio
from packages.enterprise.multi_tenant import (
    TenantContext,
    TenantRegistry,
    TenantTier,  # used below; assumed to be exported alongside IsolationLevel
    IsolationLevel,
    create_memory_storage,
)
# Initialize tenant registry
storage = create_memory_storage()
registry = TenantRegistry(storage=storage)
async def main():
    # Register tenant (register_tenant is awaited, so it runs inside a coroutine)
    await registry.register_tenant(
        tenant_id="acme-corp",
        name="ACME Corporation",
        tier=TenantTier.ENTERPRISE,
        isolation_level=IsolationLevel.DEDICATED,
    )
    # Set tenant context for current execution
    with TenantContext(tenant_id="acme-corp"):
        # All operations within this context are tenant-scoped
        # (engine, data, and rules are assumed to be defined elsewhere)
        result = engine.check(data, rules)
    return result
asyncio.run(main())
Isolation Levels:
| Level | Description | Use Case |
|---|---|---|
| SHARED | Resources shared between tenants | Cost-optimized multi-tenant |
| LOGICAL | Logical separation with shared infrastructure | Standard multi-tenant |
| PHYSICAL | Physical separation (e.g., separate databases) | Compliance requirements |
| DEDICATED | Fully dedicated resources per tenant | Enterprise isolation |
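To make the idea of context propagation concrete, here is a minimal sketch built on the standard library's `contextvars`, which keeps a separate value per thread and per asyncio task. It is an illustration of the technique, not the `TenantContext` implementation; `tenant_scope`, `current_tenant`, and `scoped_key` are hypothetical names.

```python
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator, Optional

# Holds the active tenant for the current thread or asyncio task.
_current_tenant: ContextVar[Optional[str]] = ContextVar("current_tenant", default=None)


@contextmanager
def tenant_scope(tenant_id: str) -> Iterator[None]:
    """Activate a tenant for the duration of the with-block."""
    token = _current_tenant.set(tenant_id)
    try:
        yield
    finally:
        # Restore whatever tenant (or None) was active before.
        _current_tenant.reset(token)


def current_tenant() -> Optional[str]:
    return _current_tenant.get()


def scoped_key(key: str) -> str:
    """Prefix a resource key with the active tenant (logical isolation)."""
    tenant = current_tenant()
    if tenant is None:
        raise RuntimeError("no tenant context is active")
    return f"{tenant}:{key}"
```

Key prefixing of this kind corresponds most closely to the LOGICAL level in the table; physical or dedicated isolation would need separate backends rather than a shared keyspace.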
Secret Management
The secrets module provides a unified interface for secret storage and retrieval across multiple backend systems, with support for caching, encryption, rotation, and audit logging.
| Backend | Description |
|---|---|
| HashiCorp Vault | KV v1/v2 secret engine integration |
| AWS Secrets Manager | AWS-native secret storage |
| GCP Secret Manager | Google Cloud secret storage |
| Azure Key Vault | Azure-native secret storage |
| Environment Variables | Environment-based secret injection |
| Encrypted Files | Local encrypted file storage |
from packages.enterprise.secrets import (
    get_secret_registry,
    get_secret,
    set_secret,
)
from packages.enterprise.secrets.backends import InMemorySecretProvider
# Initialize registry and register provider
registry = get_secret_registry()
registry.register("memory", InMemorySecretProvider())
# Store and retrieve secrets
set_secret("database/password", "secret-value")
secret = get_secret("database/password")
Security Features:
| Feature | Description |
|---|---|
| Client-side Encryption | Fernet, AES-GCM, ChaCha20-Poly1305 algorithms |
| Secret Caching | TTL-based caching with tiered cache support |
| Automatic Rotation | Configurable rotation schedules with generators |
| Audit Logging | Comprehensive audit hooks (values never logged) |
| Multi-Tenant Isolation | Tenant-scoped secret namespacing |
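The TTL-based caching listed above can be pictured with a small sketch: a value is served from memory until its expiry time passes, then fetched again from the backend. `TTLSecretCache` and its parameters are hypothetical and are not the classes in `cache.py`; the injectable clock exists only to make the behavior easy to test.

```python
import time
from typing import Callable, Dict, Tuple


class TTLSecretCache:
    """Caches fetched secrets for a fixed number of seconds (illustrative)."""

    def __init__(
        self,
        fetch: Callable[[str], str],
        ttl_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        # path -> (expiry timestamp, secret value)
        self._entries: Dict[str, Tuple[float, str]] = {}

    def get(self, path: str) -> str:
        now = self._clock()
        entry = self._entries.get(path)
        if entry is not None and entry[0] > now:
            return entry[1]  # still fresh: serve from cache
        value = self._fetch(path)  # missing or expired: go to the backend
        self._entries[path] = (now + self._ttl, value)
        return value
```

A short TTL limits how long a rotated secret can remain stale in memory, at the cost of more backend calls.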
Observability
The framework provides Prometheus-compatible metrics export for monitoring and alerting integration.
from common.exporters import (
    create_prometheus_exporter,
    create_pushgateway_exporter,
    create_prometheus_http_server,
)
# Create Prometheus exporter
exporter = create_prometheus_exporter(
    namespace="truthound",
    job_name="data_quality",
)
# Export metrics to Push Gateway
pushgateway = create_pushgateway_exporter(
    gateway_url="http://pushgateway:9091",
    job_name="batch_job",
)
# Expose HTTP endpoint for scraping
server = create_prometheus_http_server(port=9090)
server.start()
Export Capabilities:
| Feature | Description |
|---|---|
| Push Gateway | Batch job metrics via HTTP POST |
| HTTP Server | Scrape endpoint exposure |
| Multi-Tenant | Tenant-aware metric isolation |
| Async Support | Non-blocking export operations |
Related Projects
| Project | Description |
|---|---|
| Truthound | Core data quality validation framework |
| Apache Airflow | Workflow orchestration platform |
| Dagster | Data orchestration platform |
| Prefect | Workflow automation platform |
| dbt | Data transformation tool |
Support
- Issues: GitHub Issues
- Documentation: https://truthound.netlify.app