DAGnostics 🔍
DAGnostics is an intelligent ETL monitoring system that leverages LLMs to analyze, categorize, and report DAG failures in data pipelines. It provides automated parsing of DAG errors and is designed to generate comprehensive statistics for better observability.
🌟 Features (Current Implementation)
- Intelligent Error Analysis: Automated DAG error log parsing and categorization using multiple LLM providers (Ollama, OpenAI, Anthropic, Gemini)
- Smart Baseline System: Advanced error pattern recognition using Drain3 log clustering with baseline creation from successful task runs
- Few-Shot Learning: Configurable prompts with curated examples for improved error extraction accuracy
- Multi-Provider LLM Support: Seamless switching between local (Ollama) and cloud LLM providers
- Airflow Integration: Direct integration with Airflow API and database for real-time log collection
- Configurable Prompts: Customize LLM prompts without code deployment via configuration files
- Anomaly Detection: Identify new error patterns by comparing against successful task baselines
- Web Dashboard UI: Modern dashboard for monitoring (backend API may be incomplete)
- CLI Interface: Comprehensive command-line tools for analysis and monitoring
- Smart Alerting: SMS/Email notifications with concise error summaries
- Daemon Service: Background monitoring service for continuous error detection
Planned / Not Yet Implemented:
- Report generation and export (HTML, JSON, etc.)
- Full integration with existing ETL monitoring systems
- Advanced analytics and trend analysis
🛠 Tech Stack
Core Framework
- Python 3.10+ with modern async/await patterns
- uv for lightning-fast dependency management
- Pydantic for type-safe configuration and data validation
- SQLAlchemy for database operations
LLM & AI Components
- Ollama for local LLM deployment (privacy-focused, cost-effective)
- OpenAI API (GPT-3.5, GPT-4) for cloud-based analysis
- Anthropic Claude for advanced reasoning capabilities
- Google Gemini for multimodal analysis
- Drain3 for intelligent log clustering and pattern recognition
Web & API
- FastAPI for high-performance REST API endpoints
- Typer for intuitive CLI interface
- Jinja2 for web dashboard templating
Data Processing
- Pandas for log data analysis
- PyYAML for configuration management
- Requests for HTTP integrations (Airflow API, SMS gateways)
📋 Prerequisites
- Python 3.10 or higher
- uv installed on your system (pip install uv)
- Ollama installed and running locally (for default LLM usage)
- Access to your ETL system's logs
🚀 Quick Start
- Navigate to the project and install dependencies:
cd dagnostics
uv sync
- Set up pre-commit hooks:
uv run pre-commit install
- Set up Ollama with your preferred model:
ollama pull mistral
- Configure your environment:
cp config/config.yaml.example config/config.yaml
# Edit config.yaml with your Airflow credentials and LLM provider settings
- Test the system with built-in few-shot learning:
# Analyze a specific task failure (replace with actual values)
uv run dagnostics analyze my_dag my_task 2025-08-13T10:00:00 1 --llm ollama
# Start background monitoring daemon
uv run dagnostics daemon start
# Check daemon status
uv run dagnostics daemon status
📁 Project Structure
dagnostics/
├── data/
│ ├── clusters/ # Drain3 cluster persistence & baselines
│ ├── raw/ # Raw log files
│ ├── processed/ # Processed analysis results
│ └── training_data.jsonl # Generated training datasets
├── src/dagnostics/
│ ├── api/ # FastAPI REST API
│ ├── cli/ # Command-line interface
│ ├── core/ # Models, config, database
│ ├── daemon/ # Background monitoring service
│ ├── llm/ # LLM providers & configurable prompts
│ ├── clustering/ # Drain3 log clustering & baselines
│ ├── heuristics/ # Pattern filtering engines
│ ├── monitoring/ # Airflow integration & collectors
│ ├── reporting/ # Report generation (stub)
│ ├── web/ # Web dashboard UI
│ └── utils/ # Helpers, logging, SMS
├── config/
│ ├── config.yaml # Main configuration
│ ├── drain3.ini # Drain3 clustering settings
│ ├── filter_patterns.yaml # Heuristic filtering patterns
│ └── logging.yaml # Logging configuration
├── tests/ # Test suites
├── scripts/ # Development & deployment scripts
└── docs/ # Documentation
🔧 Configuration
DAGnostics is highly configurable through config/config.yaml. Key configuration areas include:
Core Configuration Sections
- Airflow: Connection settings, database URL, authentication
- LLM Providers: Configure multiple LLM providers (Ollama, OpenAI, Anthropic, Gemini)
- Prompts: Customize prompts and add few-shot examples for better accuracy
- Monitoring: Baseline settings, check intervals, log processing limits
- Drain3: Log clustering parameters for pattern recognition
- Alerts: SMS/Email notification settings
- Database: DAGnostics internal database configuration
Configurable Prompts System
DAGnostics now supports configurable prompts with few-shot learning:
prompts:
  # Few-shot examples for better error extraction
  few_shot_examples:
    error_extraction:
      - log_context: |
          [2025-08-13 10:15:25] ERROR: psycopg2.OperationalError: FATAL: database "analytics_db" does not exist
        extracted_response: |
          {
            "error_message": "psycopg2.OperationalError: FATAL: database \"analytics_db\" does not exist",
            "confidence": 0.95,
            "category": "configuration_error",
            "severity": "high",
            "reasoning": "Database connection error due to missing database"
          }
  # Custom prompt templates (override defaults)
  templates:
    error_extraction: |
      You are an expert ETL engineer analyzing Airflow task failure logs...
      {few_shot_examples}
      Now analyze this log:
      {log_context}
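To illustrate how the {few_shot_examples} and {log_context} placeholders might be filled in, here is a minimal sketch. It is not DAGnostics' actual rendering code: the helper names format_examples and render_prompt, the "Example N / Log / Response" layout, and the use of str.format are all assumptions made for illustration.

```python
import json

# Hypothetical in-memory copy of the `prompts` section shown above.
PROMPTS = {
    "few_shot_examples": {
        "error_extraction": [
            {
                "log_context": (
                    "[2025-08-13 10:15:25] ERROR: psycopg2.OperationalError: "
                    'FATAL: database "analytics_db" does not exist'
                ),
                "extracted_response": json.dumps(
                    {
                        "error_message": 'psycopg2.OperationalError: FATAL: database "analytics_db" does not exist',
                        "confidence": 0.95,
                        "category": "configuration_error",
                        "severity": "high",
                        "reasoning": "Database connection error due to missing database",
                    },
                    indent=2,
                ),
            }
        ]
    },
    "templates": {
        "error_extraction": (
            "You are an expert ETL engineer analyzing Airflow task failure logs...\n"
            "{few_shot_examples}\n"
            "Now analyze this log:\n"
            "{log_context}\n"
        )
    },
}


def format_examples(examples):
    """Join few-shot examples into one text block (the layout is an assumption)."""
    parts = []
    for i, ex in enumerate(examples, start=1):
        parts.append(
            f"Example {i}\n"
            f"Log:\n{ex['log_context'].strip()}\n"
            f"Response:\n{ex['extracted_response'].strip()}"
        )
    return "\n\n".join(parts)


def render_prompt(prompts, name, log_context):
    """Fill the named template with its few-shot examples and the new log."""
    template = prompts["templates"][name]
    examples = prompts["few_shot_examples"].get(name, [])
    # str.format does not re-parse substituted values, so JSON braces inside
    # the examples are safe; literal braces in the template itself are not.
    return template.format(
        few_shot_examples=format_examples(examples),
        log_context=log_context,
    )


if __name__ == "__main__":
    print(render_prompt(PROMPTS, "error_extraction", "[2025-08-13 11:00:00] ERROR: timeout"))
```

With this approach, a template that needs literal braces (for example an inline JSON schema) would have to escape them as {{ and }}.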
LLM Provider Configuration
llm:
  default_provider: "ollama"  # ollama, openai, anthropic, gemini
  providers:
    ollama:
      base_url: "http://localhost:11434"
      model: "mistral"
      temperature: 0.1
    gemini:
      api_key: "YOUR_API_KEY"
      model: "gemini-2.5-flash"
      temperature: 0.0
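As a rough sketch of how a caller might pick a provider from this section, the snippet below resolves either the default provider or an explicit override (such as the value passed to --llm). The dict stands in for the parsed YAML, and resolve_provider is a hypothetical helper, not part of the DAGnostics API.

```python
# Hypothetical dict mirroring the `llm` section above, e.g. as parsed from YAML.
LLM_CONFIG = {
    "default_provider": "ollama",
    "providers": {
        "ollama": {
            "base_url": "http://localhost:11434",
            "model": "mistral",
            "temperature": 0.1,
        },
        "gemini": {
            "api_key": "YOUR_API_KEY",
            "model": "gemini-2.5-flash",
            "temperature": 0.0,
        },
    },
}


def resolve_provider(llm_config, override=None):
    """Return (name, settings) for the override if given, else the default provider."""
    name = override or llm_config["default_provider"]
    providers = llm_config.get("providers", {})
    if name not in providers:
        raise KeyError(
            f"LLM provider {name!r} is not configured; known providers: {sorted(providers)}"
        )
    return name, providers[name]


if __name__ == "__main__":
    name, settings = resolve_provider(LLM_CONFIG)
    print(name, settings["model"])
```

Failing loudly on an unconfigured provider name keeps a typo in the CLI flag from silently falling back to a different model.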
Customizing Prompts and Examples
Adding Your Own Few-Shot Examples
Edit config/config.yaml to add domain-specific examples:
prompts:
  few_shot_examples:
    error_extraction:
      - log_context: |
          [2025-08-13 15:30:25] ERROR: Your custom error pattern here
          [2025-08-13 15:30:25] ERROR: Additional context
        extracted_response: |
          {
            "error_message": "Extracted error message",
            "confidence": 0.90,
            "category": "configuration_error",
            "severity": "high",
            "reasoning": "Why this is the root cause"
          }
Creating Custom Prompt Templates
Override any default prompt by adding to config.yaml:
prompts:
  templates:
    sms_error_extraction: |
      Custom SMS prompt template here.
      Extract concise error for: {dag_id}.{task_id}
      Log: {log_context}
Best Practices for Prompt Customization
- Include Diverse Examples: Cover different error types, severity levels, and log formats
- Be Specific: Include actual log snippets and exact expected outputs
- Test Iteratively: Use the CLI to test prompt changes before deployment
- Keep Examples Current: Update examples as your systems evolve
- Limit Example Count: 3-5 examples per prompt type for optimal performance
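One way to apply these practices before deployment is a small sanity check over the few-shot examples. The sketch below is an assumption-laden illustration: check_examples is a hypothetical helper, the set of required keys is simply taken from the examples in this README, and the 0 to 1 confidence range is assumed rather than documented.

```python
import json

# Keys seen in the README examples; treating them as required is an assumption.
REQUIRED_KEYS = {"error_message", "confidence", "category", "severity", "reasoning"}
MAX_EXAMPLES = 5  # upper end of the 3-5 guideline above


def check_examples(examples):
    """Return a list of human-readable problems found in few-shot examples."""
    problems = []
    if len(examples) > MAX_EXAMPLES:
        problems.append(
            f"{len(examples)} examples found; consider keeping at most {MAX_EXAMPLES}"
        )
    for i, ex in enumerate(examples):
        if not ex.get("log_context", "").strip():
            problems.append(f"example {i}: log_context is empty")
        try:
            response = json.loads(ex.get("extracted_response", ""))
        except json.JSONDecodeError as exc:
            problems.append(f"example {i}: extracted_response is not valid JSON ({exc.msg})")
            continue
        if not isinstance(response, dict):
            problems.append(f"example {i}: extracted_response is not a JSON object")
            continue
        missing = REQUIRED_KEYS - response.keys()
        if missing:
            problems.append(f"example {i}: missing keys {sorted(missing)}")
        if "confidence" in response:
            confidence = response["confidence"]
            # Assumed range: confidence is a number between 0 and 1.
            if not isinstance(confidence, (int, float)) or not 0.0 <= confidence <= 1.0:
                problems.append(f"example {i}: confidence {confidence!r} is outside 0..1")
    return problems


if __name__ == "__main__":
    example = {
        "log_context": "[2025-08-13 15:30:25] ERROR: Your custom error pattern here",
        "extracted_response": '{"error_message": "Extracted error message"}',
    }
    for problem in check_examples([example]):
        print(problem)
```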
🧠 How It Works
Smart Baseline System
DAGnostics uses an intelligent baseline approach for error detection:
- Baseline Creation: For each DAG task, DAGnostics analyzes successful runs to create a "normal behavior" baseline using Drain3 log clustering
- Anomaly Detection: When tasks fail, logs are compared against baselines to identify truly anomalous patterns vs. known issues
- Adaptive Learning: Baselines are automatically refreshed based on configurable intervals to adapt to evolving systems
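The baseline idea can be sketched without Drain3. The snippet below swaps in plain regex masking as a simplified stand-in for Drain3's template mining: variable parts of each line are replaced with tokens, the templates seen in successful runs form the baseline, and lines from a failed run whose template is not in the baseline are flagged. The function names and masking rules are illustrative assumptions, not the project's implementation.

```python
import re

# Simplified stand-in for Drain3: mask variable tokens to get a coarse template.
_MASKS = [
    (re.compile(r"\b0x[0-9a-fA-F]+\b"), "<HEX>"),
    (re.compile(r"\d+"), "<NUM>"),
]


def to_template(line):
    """Replace numbers and hex values with tokens and normalize whitespace."""
    for pattern, token in _MASKS:
        line = pattern.sub(token, line)
    return " ".join(line.split())


def build_baseline(successful_logs):
    """Collect the set of templates observed in successful task runs."""
    return {to_template(line) for line in successful_logs}


def find_anomalies(failed_logs, baseline):
    """Return lines from a failed run whose template never appeared in the baseline."""
    return [line for line in failed_logs if to_template(line) not in baseline]


if __name__ == "__main__":
    baseline = build_baseline([
        "Loaded 1500 rows in 12s",
        "Connecting to db host 10.0.0.5",
    ])
    failed = [
        "Loaded 20 rows in 3s",
        "psycopg2.OperationalError: connection refused",
    ]
    print(find_anomalies(failed, baseline))
```

Only the anomalous lines would then need to be passed to the LLM, which keeps prompts short and filters out routine log noise.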
Few-Shot Learning for Error Extraction
The system includes curated examples covering common Airflow error patterns:
- Database Connection Errors: PostgreSQL, MySQL connection failures
- Data Quality Issues: Empty files, schema mismatches, validation failures
- Dependency Failures: Upstream task failures, service unavailability
- Timeout Errors: Query timeouts, connection timeouts, deadlocks
- Permission Errors: S3 access denied, database permission issues
- Resource Errors: Memory limits, disk space, connection pools
These examples help LLMs provide more accurate error categorization and confidence scores.
Multi-Provider LLM Support
- Local Models (Ollama): Privacy-focused, no external API calls, cost-effective
- Cloud Models (OpenAI, Anthropic, Gemini): Higher accuracy, latest models, requires API keys
- Provider-Specific Optimizations: Customized prompts and parameters per provider
- Fallback Mechanisms: Heuristic error extraction when LLM fails
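The fallback behavior could look roughly like the sketch below: try the LLM first, and if it raises or returns nothing, fall back to a regex heuristic. Everything here is hypothetical (extract_error, heuristic_extract, the llm_call callable, and the choice of the last error-looking line); it only illustrates the pattern, not the actual DAGnostics code.

```python
import re

# Assumed heuristic: lines containing these markers look like errors.
ERROR_LINE = re.compile(r"(ERROR|CRITICAL|FATAL|Traceback|Exception)", re.IGNORECASE)


def heuristic_extract(log_lines):
    """Return the last error-looking line, else the last non-empty line."""
    candidates = [ln.strip() for ln in log_lines if ERROR_LINE.search(ln)]
    if candidates:
        return candidates[-1]
    non_empty = [ln.strip() for ln in log_lines if ln.strip()]
    return non_empty[-1] if non_empty else ""


def extract_error(log_lines, llm_call):
    """Try the LLM first; use the heuristic if it fails or returns nothing.

    `llm_call` is any callable taking the log lines and returning a string.
    Returns (message, source) where source is "llm" or "heuristic".
    """
    try:
        message = llm_call(log_lines)
        if message and message.strip():
            return message.strip(), "llm"
    except Exception:
        # Provider down, timeout, malformed response: fall through to the heuristic.
        pass
    return heuristic_extract(log_lines), "heuristic"


if __name__ == "__main__":
    logs = [
        "[2025-08-13 10:15:20] INFO: Starting task",
        '[2025-08-13 10:15:25] ERROR: psycopg2.OperationalError: FATAL: database "analytics_db" does not exist',
    ]

    def unavailable_llm(_lines):
        raise TimeoutError("provider unavailable")

    print(extract_error(logs, unavailable_llm))
```

Returning the source alongside the message lets downstream alerting mark heuristic results as lower confidence.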
📊 Usage
Command-Line Interface (CLI)
DAGnostics provides a CLI for managing the monitoring and reporting system. Use the following commands:
Start the System (Stub)
uv run dagnostics start
Note: This command is a placeholder and does not start monitoring yet. For background monitoring, use the daemon commands under "Monitor DAGs (Daemon Service)".
Analyze a Specific Task Failure
uv run dagnostics analyze <dag-id> <task-id> <run-id> <try-number>
- Options:
  - --llm/-l: LLM provider (ollama, openai, anthropic, gemini)
  - --format/-f: Output format (json, yaml, text)
  - --verbose/-v: Verbose output
  - --baseline: Use baseline comparison for anomaly detection
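When scripting the CLI, it can help to assemble the argument list in one place. The sketch below builds the analyze command from the positional arguments and options listed above; build_analyze_command is a hypothetical helper, and treating --verbose and --baseline as boolean flags is an assumption based on their descriptions.

```python
import shlex


def build_analyze_command(dag_id, task_id, run_id, try_number,
                          llm=None, output_format=None,
                          verbose=False, baseline=False):
    """Build the argv list for `dagnostics analyze` using the documented options."""
    cmd = ["uv", "run", "dagnostics", "analyze",
           dag_id, task_id, run_id, str(try_number)]
    if llm:
        cmd += ["--llm", llm]
    if output_format:
        cmd += ["--format", output_format]
    if verbose:
        cmd.append("--verbose")  # assumed to be a boolean flag
    if baseline:
        cmd.append("--baseline")  # assumed to be a boolean flag
    return cmd


if __name__ == "__main__":
    cmd = build_analyze_command(
        "my_dag", "my_task", "2025-08-13T10:00:00", 1,
        llm="ollama", output_format="json", baseline=True,
    )
    # Print a shell-safe rendering of the command for inspection.
    print(shlex.join(cmd))
```

The resulting list can be passed directly to subprocess.run(cmd, check=True), which avoids shell quoting issues with run IDs that contain special characters.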
Monitor DAGs (Daemon Service)
# Start the monitoring daemon
uv run dagnostics daemon start
# Stop the daemon
uv run dagnostics daemon stop
# Check daemon status
uv run dagnostics daemon status
Baseline Management
# Create baseline for a specific DAG task
uv run dagnostics baseline create <dag-id> <task-id>
# List existing baselines
uv run dagnostics baseline list
# Refresh stale baselines
uv run dagnostics baseline refresh
Generate a Report (Not Yet Implemented)
uv run dagnostics report
uv run dagnostics report --daily
Note: Report generation and export are not yet implemented. These commands are placeholders.
Python API
# LLM Engine Usage
from dagnostics.llm.engine import LLMEngine, OllamaProvider
from dagnostics.core.config import load_config
from dagnostics.core.models import LogEntry
# Load configuration with custom prompts
config = load_config()
# Initialize LLM engine with config
provider = OllamaProvider()
engine = LLMEngine(provider, config=config)
# Analyze log entries (few-shot learning applied automatically)
log_entries = [LogEntry(...)]
analysis = engine.extract_error_message(log_entries)
print(f"Error: {analysis.error_message}")
print(f"Category: {analysis.category}")
print(f"Confidence: {analysis.confidence}")
# Baseline Management
from dagnostics.clustering.log_clusterer import LogClusterer
clusterer = LogClusterer(config)
baseline_clusters = clusterer.build_baseline_clusters(successful_logs, dag_id, task_id)
anomalous_logs = clusterer.identify_anomalous_patterns(failed_logs, dag_id, task_id)
🛠 Development Tasks
The tasks/ folder contains utility scripts for common development tasks, such as setting up the environment, linting, formatting, and running tests. These tasks are powered by Invoke.
Available Tasks
Run the following commands from the root of the project:
| Command | Description |
|---|---|
| invoke dev.setup | Set up the development environment. |
| invoke dev.clean | Clean build artifacts and temporary files. |
| invoke dev.format | Format the code using black and isort. |
| invoke dev.lint | Lint the code using flake8 and mypy. |
| invoke dev.test | Run all tests with pytest. |
🧪 Testing
# Run all tests
uv run pytest
# Run with coverage
uv run pytest --cov=dagnostics
# Run specific test file
uv run pytest tests/llm/test_parser.py
📝 Development
- Create a new branch:
git checkout -b feature/amazing-feature
- Make your changes and ensure tests pass:
./scripts/test.sh
- Format and lint your code:
./scripts/lint.sh
- Commit your changes:
git commit -m "Add amazing feature"
🌐 Web Dashboard
A modern web dashboard UI is included in src/dagnostics/web/. It provides:
- Monitor status and statistics (requires backend API)
- Error trends and categories (requires backend API)
- Task analysis form (requires backend API)
Note: The backend API endpoints for the dashboard may be incomplete or stubbed. Some dashboard features may not display real data yet.
🚧 Limitations / Roadmap
✅ Implemented Features
- ✅ LLM Integration: Multi-provider support (Ollama, OpenAI, Anthropic, Gemini) with provider-specific optimizations
- ✅ Smart Baselines: Drain3-based log clustering with anomaly detection
- ✅ Configurable Prompts: Few-shot learning system with customizable templates
- ✅ Daemon Service: Background monitoring with configurable intervals
- ✅ CLI Interface: Comprehensive command-line tools for analysis and management
- ✅ Alerting: SMS/Email notifications with concise error summaries
- ✅ Python API: Core analysis and baseline management APIs
🚧 In Progress / Roadmap
- Report generation and export: HTML, JSON, PDF report formats (stub implementation)
- Advanced Analytics: Trend analysis, error correlation, predictive insights
- Web Dashboard Backend: Complete REST API endpoints for dashboard functionality
- Fine-tuning Support: Custom model fine-tuning for domain-specific error patterns
- Integration Plugins: Connectors for popular monitoring tools (Datadog, Grafana, etc.)
- Advanced Filtering: ML-based log filtering and noise reduction
See CONTRIBUTING.md for how to help!
🤝 Contributing
See CONTRIBUTING.md for detailed guidelines.
📄 License
This project is licensed under the MIT License - see the LICENSE file for details.
🙏 Acknowledgments
- Inspired by the daily L1 support rotation practice and the need for intelligent error analysis
- Built with modern Python ecosystem: uv, FastAPI, Typer, Pydantic
- LLM Integration: Ollama (local), OpenAI, Anthropic Claude, Google Gemini
- Log Analysis: Drain3 for intelligent log clustering and pattern recognition
- Few-Shot Learning: Curated examples for improved error extraction
- Special thanks to the open-source community and enterprise ETL teams who inspired this project
📞 Support
For questions and support, please open an issue in the GitHub repository.