Production monitoring for ML model drift - detect data drift, concept drift, and performance degradation
Project description
Model Drift Detector
A comprehensive Python library for detecting and monitoring model drift in production ML systems. Supports data drift, concept drift, prediction drift, and performance degradation detection with multiple statistical tests.
Features
- Multiple Drift Types: Data drift, concept drift, prediction drift, feature drift, label drift, and covariate shift
- Statistical Tests: KS test, Chi-square, PSI, JS divergence, Wasserstein distance, and more
- Performance Monitoring: Track accuracy, precision, recall, F1, AUC, MSE, and custom metrics
- Alerting System: Configurable alerts with multiple severity levels
- Baseline Management: Create and manage reference data snapshots
- Zero-Dependency Core: Works in pure Python, with optional NumPy/pandas support
- Async Support: Non-blocking drift detection for high-throughput systems
- Comprehensive Reporting: Detailed reports with recommendations
Installation
pip install model-drift-detector
With optional dependencies:
# With NumPy/SciPy for advanced statistical tests
pip install model-drift-detector[numpy]
# With pandas for DataFrame support
pip install model-drift-detector[pandas]
# Full installation
pip install model-drift-detector[all]
Quick Start
Basic Drift Detection
from model_drift_detector import DriftDetector
# Initialize detector
detector = DriftDetector()
# Reference data (training distribution)
reference_data = {
    "age": [25, 30, 35, 40, 45, 50, 55, 60],
    "income": [30000, 45000, 55000, 65000, 75000, 85000, 95000, 105000],
    "category": ["A", "B", "A", "C", "B", "A", "C", "B"]
}
# Current production data
current_data = {
    "age": [35, 40, 45, 50, 55, 60, 65, 70],  # Shifted older
    "income": [35000, 50000, 60000, 70000, 80000, 90000, 100000, 110000],
    "category": ["B", "C", "C", "C", "B", "C", "C", "C"]  # Category shift
}
# Detect drift
report = detector.detect(reference_data, current_data)
# Check results
print(f"Drift detected: {report.features_with_drift}/{report.total_features} features")
print(f"Overall severity: {report.overall_severity.value}")
for result in report.feature_results:
    if result.is_drift_detected:
        print(f"  - {result.feature_name}: {result.drift_type.value} "
              f"(severity: {result.severity.value})")
Monitor with Baseline
from model_drift_detector import DriftMonitor
# Create monitor with baseline
monitor = DriftMonitor(name="production-model-v1")
# Set baseline from training data
monitor.set_baseline(training_data)
# Monitor incoming data batches
for batch in production_batches:
    report = monitor.check(batch)
    if report.features_with_drift > 0:
        print("⚠️ Drift detected in batch!")
        for feature in report.get_drifted_features():
            print(f"  - {feature}")
Configure Alerts
from model_drift_detector import DriftMonitor, MonitoringConfig, AlertLevel
# Custom configuration
config = MonitoringConfig(
    name="critical-model",
    features=["age", "income", "category"],
    drift_threshold=0.05,
    severity_thresholds={
        "low": 0.05,
        "medium": 0.1,
        "high": 0.2,
        "critical": 0.3
    },
    alert_on_drift=True
)
# Alert callback
def handle_alert(alert):
    if alert.level == AlertLevel.CRITICAL:
        send_pager_duty_alert(alert.message)
    elif alert.level == AlertLevel.ERROR:
        send_slack_notification(alert.message)
    else:
        log_alert(alert)
monitor = DriftMonitor(config=config)
monitor.on_alert(handle_alert)
Performance Drift Detection
from model_drift_detector import PerformanceMonitor
# Track model performance over time
perf_monitor = PerformanceMonitor(
    baseline_metrics={
        "accuracy": 0.95,
        "f1_score": 0.92,
        "auc_roc": 0.98
    },
    degradation_threshold=0.05
)
# Check current performance
current_metrics = {
    "accuracy": 0.89,  # Degraded
    "f1_score": 0.85,  # Degraded
    "auc_roc": 0.96
}
degradation = perf_monitor.check(current_metrics)
if degradation.is_degraded:
    print("Performance degradation detected!")
    for metric, drop in degradation.metrics_dropped.items():
        print(f"  - {metric}: dropped {drop:.2%}")
API Reference
DriftDetector
Main class for drift detection.
from model_drift_detector import DriftDetector, StatisticalTest
detector = DriftDetector(
    default_test=StatisticalTest.KS_TEST,  # Default statistical test
    p_value_threshold=0.05,                # Significance threshold
    auto_detect_types=True                 # Auto-detect feature types
)
# Detect drift between two datasets
report = detector.detect(
    reference_data,  # Reference/baseline data
    current_data,    # Current production data
    features=None,   # Specific features (None = all)
    tests=None       # Override tests per feature
)
DriftMonitor
Continuous monitoring with baseline management.
from model_drift_detector import DriftMonitor
monitor = DriftMonitor(
    name="my-model",
    config=None,    # Optional MonitoringConfig
    baseline=None   # Optional initial baseline
)
# Set baseline
monitor.set_baseline(data, sample_size=10000)
# Check for drift
report = monitor.check(current_data)
# Get baseline info
baseline = monitor.get_baseline()
# Update baseline (e.g., after retraining)
monitor.update_baseline(new_data)
Statistical Tests
Available statistical tests for drift detection:
| Test | Use Case | Data Type |
|---|---|---|
| KS_TEST | Distribution comparison | Numerical |
| CHI_SQUARE | Category frequency | Categorical |
| PSI | Population stability | Both |
| JS_DIVERGENCE | Distribution similarity | Both |
| KL_DIVERGENCE | Information loss | Both |
| WASSERSTEIN | Distribution distance | Numerical |
| T_TEST | Mean comparison | Numerical |
| MANN_WHITNEY | Rank comparison | Numerical |
| CRAMERS_V | Association strength | Categorical |
from model_drift_detector import StatisticalTest
# Use specific tests per feature
detector.detect(
    reference_data,
    current_data,
    tests={
        "age": StatisticalTest.KS_TEST,
        "income": StatisticalTest.WASSERSTEIN,
        "category": StatisticalTest.CHI_SQUARE
    }
)
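For intuition about one of these tests: PSI bins both samples on shared edges and sums (actual% − expected%) × ln(actual% / expected%) over the bins. A self-contained sketch of the statistic itself (an illustration of the formula, independent of the library's implementation; the bin edges and epsilon are assumptions):

import math

def psi(expected, actual, bin_edges, eps=1e-6):
    """Population Stability Index over shared bins (illustrative)."""
    def proportions(values):
        counts = [0] * (len(bin_edges) - 1)
        for v in values:
            for i in range(len(counts)):
                if bin_edges[i] <= v < bin_edges[i + 1]:
                    counts[i] += 1
                    break
        total = max(len(values), 1)
        return [max(c / total, eps) for c in counts]  # eps avoids log(0)

    e, a = proportions(expected), proportions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))

edges = [20, 35, 50, 65, 80]
print(psi([25, 30, 35, 40, 45, 50, 55, 60],
          [35, 40, 45, 50, 55, 60, 65, 70], edges))  # large value: strong shift

A common rule of thumb reads PSI below 0.1 as stable, 0.1–0.25 as moderate shift, and above 0.25 as major shift.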
DriftReport
Comprehensive drift analysis report.
report = detector.detect(reference, current)
# Access report properties
report.report_id # Unique identifier
report.created_at # Timestamp
report.total_features # Number of features analyzed
report.features_with_drift # Number with detected drift
report.drift_rate # Ratio of drifted features
report.overall_drift_score # Combined drift score
report.overall_severity # Overall severity level
report.feature_results # List of DriftResult objects
report.recommendations # Suggested actions
# Filter results
drifted = report.get_drifted_features()
critical = report.get_results_by_severity(DriftSeverity.CRITICAL)
# Export
report_dict = report.to_dict()
report_json = report.to_json()
Severity Levels
from model_drift_detector import DriftSeverity
DriftSeverity.NONE # No drift detected
DriftSeverity.LOW # Minor drift, monitor
DriftSeverity.MEDIUM # Moderate drift, investigate
DriftSeverity.HIGH # Significant drift, action needed
DriftSeverity.CRITICAL # Severe drift, immediate action
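To make these levels concrete, here is one plausible mapping from a drift score to a severity label, using the severity_thresholds shape shown in MonitoringConfig above (the library's actual mapping logic is an assumption here; this only illustrates the thresholds):

# Hypothetical mapping: pick the highest level whose threshold the score meets.
def score_to_severity(score, thresholds):
    severity = "none"
    for level in ("low", "medium", "high", "critical"):
        if score >= thresholds[level]:
            severity = level
    return severity

thresholds = {"low": 0.05, "medium": 0.1, "high": 0.2, "critical": 0.3}
print(score_to_severity(0.12, thresholds))  # medium
print(score_to_severity(0.42, thresholds))  # critical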
Advanced Usage
Custom Statistical Tests
from model_drift_detector import DriftDetector, DriftResult, DriftType, StatisticalTest

class CustomDriftDetector(DriftDetector):
    def custom_test(self, reference, current, feature_name):
        """Implement a custom drift test."""
        # Your custom logic
        statistic = calculate_custom_statistic(reference, current)
        return DriftResult(
            feature_name=feature_name,
            drift_type=DriftType.DATA_DRIFT,
            test_used=StatisticalTest.KS_TEST,  # Or a custom test
            statistic=statistic,
            p_value=None,
            threshold=0.1,
            is_drift_detected=statistic > 0.1,
            severity=self._calculate_severity(statistic)
        )
Streaming Detection
from model_drift_detector import StreamingDetector
# Window-based streaming detection
stream_detector = StreamingDetector(
    window_size=1000,
    slide_size=100,
    baseline_data=training_data
)
# Process streaming data
for record in data_stream:
    stream_detector.add(record)
    if stream_detector.window_ready():
        result = stream_detector.check_window()
        if result.is_drift_detected:
            handle_drift(result)
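The window/slide mechanics amount to a bounded buffer that is re-evaluated every slide_size records. A minimal sketch of that bookkeeping (illustrative only, not StreamingDetector's actual internals):

from collections import deque

class SlidingWindow:
    """Keep the last window_size records; signal a check every slide_size adds."""
    def __init__(self, window_size=1000, slide_size=100):
        self.window = deque(maxlen=window_size)
        self.slide_size = slide_size
        self._since_check = 0

    def add(self, record):
        self.window.append(record)
        self._since_check += 1

    def ready(self):
        # Only check once the buffer is full, and at most every slide_size adds
        if len(self.window) == self.window.maxlen and self._since_check >= self.slide_size:
            self._since_check = 0
            return True
        return False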
Multi-Model Monitoring
from model_drift_detector import DriftMonitorRegistry
# Central registry for multiple models
registry = DriftMonitorRegistry()
# Register monitors
registry.register("model-a", DriftMonitor(name="model-a"))
registry.register("model-b", DriftMonitor(name="model-b"))
# Set baselines
for name, monitor in registry.items():
    monitor.set_baseline(baselines[name])
# Check all models
for name, monitor in registry.items():
    report = monitor.check(current_data[name])
    if report.features_with_drift > 0:
        alert_team(name, report)
Integration with MLflow
import mlflow
from model_drift_detector import DriftMonitor
monitor = DriftMonitor(name="mlflow-tracked-model")
monitor.set_baseline(training_data)
# Log drift metrics to MLflow
with mlflow.start_run():
    report = monitor.check(production_data)
    mlflow.log_metric("drift_score", report.overall_drift_score)
    mlflow.log_metric("features_drifted", report.features_with_drift)
    mlflow.log_metric("drift_rate", report.drift_rate)
    for result in report.feature_results:
        mlflow.log_metric(
            f"drift_{result.feature_name}",
            result.statistic
        )
Database Baseline Storage
from model_drift_detector import DriftMonitor, BaselineStore
# Custom baseline storage
class SQLBaselineStore(BaselineStore):
    def __init__(self, connection_string):
        self.conn = create_connection(connection_string)

    def save(self, snapshot):
        # Save to database
        pass

    def load(self, snapshot_id):
        # Load from database
        pass
# Use custom storage
store = SQLBaselineStore("postgresql://...")
monitor = DriftMonitor(name="db-backed", baseline_store=store)
Comparison with Other Libraries
| Feature | model-drift-detector | evidently | alibi-detect |
|---|---|---|---|
| Zero dependencies | ✅ | ❌ | ❌ |
| Streaming support | ✅ | ❌ | ✅ |
| Custom tests | ✅ | ✅ | ✅ |
| Built-in alerting | ✅ | ❌ | ❌ |
| Async support | ✅ | ❌ | ❌ |
| Pure Python | ✅ | ❌ | ❌ |
Best Practices
- Set Representative Baselines: Use diverse, representative training data
- Monitor Multiple Drift Types: Combine data drift and performance drift
- Tune Thresholds: Adjust based on your tolerance for false positives
- Regular Baseline Updates: Update after model retraining
- Feature-Specific Tests: Use appropriate tests for each feature type
- Alert Thoughtfully: Avoid alert fatigue with proper severity mapping (see the sketch below)
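Putting the threshold and alerting advice together, here is a sketch of a low-noise setup using only the MonitoringConfig fields and on_alert hook shown above (the threshold values and the page_oncall/log_quietly sinks are illustrative assumptions):

from model_drift_detector import DriftMonitor, MonitoringConfig, AlertLevel

config = MonitoringConfig(
    name="churn-model",
    features=["age", "income", "category"],
    drift_threshold=0.1,  # looser than 0.05 to reduce false positives
    severity_thresholds={"low": 0.1, "medium": 0.2, "high": 0.3, "critical": 0.4},
    alert_on_drift=True
)

def route(alert):
    if alert.level == AlertLevel.CRITICAL:
        page_oncall(alert.message)   # hypothetical paging hook
    else:
        log_quietly(alert.message)   # hypothetical low-noise sink

monitor = DriftMonitor(config=config)
monitor.on_alert(route)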
Troubleshooting
High False Positive Rate
- Increase drift_threshold (e.g., 0.05 → 0.1)
- Use larger sample sizes for comparison
- Consider domain-specific thresholds
Missing Drift Detection
- Decrease drift_threshold
- Use more sensitive tests (e.g., Wasserstein)
- Check feature type detection
Performance Issues
- Reduce window_size for streaming
- Use batch processing for large datasets
- Enable sampling for very large datasets (see the sketch below)
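As a sketch of the sampling suggestion: compare fixed-size random samples instead of full columns to keep detection cost bounded (sample_columns, the sample size, and the seed are illustrative assumptions; reference_data/current_data are as in the Quick Start):

import random

from model_drift_detector import DriftDetector

def sample_columns(data, n=10_000, seed=42):
    # Down-sample each feature column to at most n values before comparing
    rng = random.Random(seed)
    return {col: rng.sample(vals, min(n, len(vals))) for col, vals in data.items()}

detector = DriftDetector()
report = detector.detect(sample_columns(reference_data), sample_columns(current_data))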
License
MIT License - Pranay M
Contributing
Contributions welcome! See CONTRIBUTING.md for guidelines.
Changelog
1.0.0
- Initial release
- Core drift detection with 9 statistical tests
- Monitoring and alerting system
- Baseline management
- Streaming support
Download files
Source Distribution: model_drift_detector-0.1.0.tar.gz
Built Distribution: model_drift_detector-0.1.0-py3-none-any.whl
File details
Details for the file model_drift_detector-0.1.0.tar.gz.
File metadata
- Download URL: model_drift_detector-0.1.0.tar.gz
- Upload date:
- Size: 21.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | a281062a46cafd65e12b1498c37ef43b9d503ba619f796f905e97741ee83c8bc |
| MD5 | 3777ba65a1ed093d8e76a9993ea413ea |
| BLAKE2b-256 | 75c8d5d82e6d9bffb7e53fa532c51081df6277e05a1a3577c28f89fc34d13a5c |
File details
Details for the file model_drift_detector-0.1.0-py3-none-any.whl.
File metadata
- Download URL: model_drift_detector-0.1.0-py3-none-any.whl
- Upload date:
- Size: 18.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 6405b56ea61eef2f6fd7e87f20d1c0efd68995f2bd5249a7f196dd96c41fd145 |
| MD5 | 1cbd00bacc38a62a7e82fa22db6cfc5c |
| BLAKE2b-256 | 4c06dd47b7950ce3fc252aee2154b964e16c6ca076272562afeea1732a82c1f5 |