Skip to main content

No project description provided

Project description

DAGnostics 🔍

DAGnostics is an intelligent ETL monitoring system that leverages LLMs to analyze, categorize, and report DAG failures in data pipelines. It provides automated parsing of DAG errors and is designed to generate comprehensive statistics for better observability.

🌟 Features (Current Implementation)

  • Intelligent Error Analysis: Automated DAG error log parsing and categorization using multiple LLM providers (Ollama, OpenAI, Anthropic, Gemini)
  • Smart Baseline System: Advanced error pattern recognition using Drain3 log clustering with baseline creation from successful task runs
  • Few-Shot Learning: Configurable prompts with curated examples for improved error extraction accuracy
  • Multi-Provider LLM Support: Seamless switching between local (Ollama) and cloud LLM providers
  • Airflow Integration: Direct integration with Airflow API and database for real-time log collection
  • Configurable Prompts: Customize LLM prompts without code deployment via configuration files
  • Anomaly Detection: Identify new error patterns by comparing against successful task baselines
  • Web Dashboard UI: Modern dashboard for monitoring (backend API may be incomplete)
  • CLI Interface: Comprehensive command-line tools for analysis and monitoring
  • Smart Alerting: SMS/Email notifications with concise error summaries
  • Daemon Service: Background monitoring service for continuous error detection

Planned / Not Yet Implemented:

  • Report generation and export (HTML, JSON, etc.)
  • Full integration with existing ETL monitoring systems
  • Advanced analytics and trend analysis

System Architecture

System Architecture

🛠 Tech Stack

Core Framework

  • Python 3.10+ with modern async/await patterns
  • uv for lightning-fast dependency management
  • Pydantic for type-safe configuration and data validation
  • SQLAlchemy for database operations

LLM & AI Components

  • Ollama for local LLM deployment (privacy-focused, cost-effective)
  • OpenAI API (GPT-3.5, GPT-4) for cloud-based analysis
  • Anthropic Claude for advanced reasoning capabilities
  • Google Gemini for multimodal analysis
  • Drain3 for intelligent log clustering and pattern recognition

Web & API

  • FastAPI for high-performance REST API endpoints
  • Typer for intuitive CLI interface
  • Jinja2 for web dashboard templating

Data Processing

  • Pandas for log data analysis
  • PyYAML for configuration management
  • Requests for HTTP integrations (Airflow API, SMS gateways)

📋 Prerequisites

  • Python 3.10 or higher
  • uv installed on your system (pip install uv)
  • Ollama installed and running locally (for default LLM usage)
  • Access to your ETL system's logs

🚀 Quick Start

  1. Navigate to the project and install dependencies:
cd dagnostics
uv sync
  1. Set up pre-commit hooks:
uv run pre-commit install
  1. Set up Ollama with your preferred model:
ollama pull mistral
  1. Configure your environment:
cp config/config.yaml.example config/config.yaml
# Edit config.yaml with your Airflow credentials and LLM provider settings
  1. Test the system with built-in few-shot learning:
# Analyze a specific task failure (replace with actual values)
uv run dagnostics analyze my_dag my_task 2025-08-13T10:00:00 1 --llm ollama

# Start background monitoring daemon
uv run dagnostics daemon start

# Check daemon status
uv run dagnostics daemon status

📁 Project Structure

dagnostics/
├── data/
│   ├── clusters/              # Drain3 cluster persistence & baselines
│   ├── raw/                   # Raw log files
│   ├── processed/             # Processed analysis results
│   └── training_data.jsonl    # Generated training datasets
├── src/dagnostics/
│   ├── api/                   # FastAPI REST API
│   ├── cli/                   # Command-line interface
│   ├── core/                  # Models, config, database
│   ├── daemon/                # Background monitoring service
│   ├── llm/                   # LLM providers & configurable prompts
│   ├── clustering/            # Drain3 log clustering & baselines
│   ├── heuristics/            # Pattern filtering engines
│   ├── monitoring/            # Airflow integration & collectors
│   ├── reporting/             # Report generation (stub)
│   ├── web/                   # Web dashboard UI
│   └── utils/                 # Helpers, logging, SMS
├── config/
│   ├── config.yaml            # Main configuration
│   ├── drain3.ini            # Drain3 clustering settings
│   ├── filter_patterns.yaml  # Heuristic filtering patterns
│   └── logging.yaml          # Logging configuration
├── tests/                     # Test suites
├── scripts/                   # Development & deployment scripts
└── docs/                     # Documentation

🔧 Configuration

DAGnostics is highly configurable through config/config.yaml. Key configuration areas include:

Core Configuration Sections

  • Airflow: Connection settings, database URL, authentication
  • LLM Providers: Configure multiple LLM providers (Ollama, OpenAI, Anthropic, Gemini)
  • Prompts: Customize prompts and add few-shot examples for better accuracy
  • Monitoring: Baseline settings, check intervals, log processing limits
  • Drain3: Log clustering parameters for pattern recognition
  • Alerts: SMS/Email notification settings
  • Database: DAGnostics internal database configuration

Configurable Prompts System

DAGnostics now supports configurable prompts with few-shot learning:

prompts:
  # Few-shot examples for better error extraction
  few_shot_examples:
    error_extraction:
      - log_context: |
          [2025-08-13 10:15:25] ERROR: psycopg2.OperationalError: FATAL: database "analytics_db" does not exist
        extracted_response: |
          {
            "error_message": "psycopg2.OperationalError: FATAL: database \"analytics_db\" does not exist",
            "confidence": 0.95,
            "category": "configuration_error",
            "severity": "high",
            "reasoning": "Database connection error due to missing database"
          }

  # Custom prompt templates (override defaults)
  templates:
    error_extraction: |
      You are an expert ETL engineer analyzing Airflow task failure logs...
      {few_shot_examples}

      Now analyze this log:
      {log_context}

LLM Provider Configuration

llm:
  default_provider: "ollama"  # ollama, openai, anthropic, gemini
  providers:
    ollama:
      base_url: "http://localhost:11434"
      model: "mistral"
      temperature: 0.1
    gemini:
      api_key: "YOUR_API_KEY"
      model: "gemini-2.5-flash"
      temperature: 0.0

Customizing Prompts and Examples

Adding Your Own Few-Shot Examples

Edit config/config.yaml to add domain-specific examples:

prompts:
  few_shot_examples:
    error_extraction:
      - log_context: |
          [2025-08-13 15:30:25] ERROR: Your custom error pattern here
          [2025-08-13 15:30:25] ERROR: Additional context
        extracted_response: |
          {
            "error_message": "Extracted error message",
            "confidence": 0.90,
            "category": "configuration_error",
            "severity": "high",
            "reasoning": "Why this is the root cause"
          }

Creating Custom Prompt Templates

Override any default prompt by adding to config.yaml:

prompts:
  templates:
    sms_error_extraction: |
      Custom SMS prompt template here.
      Extract concise error for: {dag_id}.{task_id}
      Log: {log_context}

Best Practices for Prompt Customization

  1. Include Diverse Examples: Cover different error types, severity levels, and log formats
  2. Be Specific: Include actual log snippets and exact expected outputs
  3. Test Iteratively: Use the CLI to test prompt changes before deployment
  4. Keep Examples Current: Update examples as your systems evolve
  5. Limit Example Count: 3-5 examples per prompt type for optimal performance

🧠 How It Works

Smart Baseline System

DAGnostics uses an intelligent baseline approach for error detection:

  1. Baseline Creation: For each DAG task, DAGnostics analyzes successful runs to create a "normal behavior" baseline using Drain3 log clustering
  2. Anomaly Detection: When tasks fail, logs are compared against baselines to identify truly anomalous patterns vs. known issues
  3. Adaptive Learning: Baselines are automatically refreshed based on configurable intervals to adapt to evolving systems

Few-Shot Learning for Error Extraction

The system includes curated examples covering common Airflow error patterns:

  • Database Connection Errors: PostgreSQL, MySQL connection failures
  • Data Quality Issues: Empty files, schema mismatches, validation failures
  • Dependency Failures: Upstream task failures, service unavailability
  • Timeout Errors: Query timeouts, connection timeouts, deadlocks
  • Permission Errors: S3 access denied, database permission issues
  • Resource Errors: Memory limits, disk space, connection pools

These examples help LLMs provide more accurate error categorization and confidence scores.

Multi-Provider LLM Support

  • Local Models (Ollama): Privacy-focused, no external API calls, cost-effective
  • Cloud Models (OpenAI, Anthropic, Gemini): Higher accuracy, latest models, requires API keys
  • Provider-Specific Optimizations: Customized prompts and parameters per provider
  • Fallback Mechanisms: Heuristic error extraction when LLM fails

📊 Usage

Command-Line Interface (CLI)

DAGnostics provides a CLI for managing the monitoring and reporting system. Use the following commands:

Start the System (Stub)

uv run dagnostics start

Note: The monitoring daemon is not yet implemented. This command is a placeholder.

Analyze a Specific Task Failure

uv run dagnostics analyze <dag-id> <task-id> <run-id> <try-number>
  • Options:
    • --llm/-l: LLM provider (ollama, openai, anthropic, gemini)
    • --format/-f: Output format (json, yaml, text)
    • --verbose/-v: Verbose output
    • --baseline: Use baseline comparison for anomaly detection

Monitor DAGs (Daemon Service)

# Start the monitoring daemon
uv run dagnostics daemon start

# Stop the daemon
uv run dagnostics daemon stop

# Check daemon status
uv run dagnostics daemon status

Baseline Management

# Create baseline for a specific DAG task
uv run dagnostics baseline create <dag-id> <task-id>

# List existing baselines
uv run dagnostics baseline list

# Refresh stale baselines
uv run dagnostics baseline refresh

Generate a Report (Not Yet Implemented)

uv run dagnostics report
uv run dagnostics report --daily

Note: Report generation and export are not yet implemented. These commands are placeholders.

Python API

# LLM Engine Usage
from dagnostics.llm.engine import LLMEngine, OllamaProvider
from dagnostics.core.config import load_config
from dagnostics.core.models import LogEntry

# Load configuration with custom prompts
config = load_config()

# Initialize LLM engine with config
provider = OllamaProvider()
engine = LLMEngine(provider, config=config)

# Analyze log entries (few-shot learning applied automatically)
log_entries = [LogEntry(...)]
analysis = engine.extract_error_message(log_entries)
print(f"Error: {analysis.error_message}")
print(f"Category: {analysis.category}")
print(f"Confidence: {analysis.confidence}")

# Baseline Management
from dagnostics.clustering.log_clusterer import LogClusterer

clusterer = LogClusterer(config)
baseline_clusters = clusterer.build_baseline_clusters(successful_logs, dag_id, task_id)
anomalous_logs = clusterer.identify_anomalous_patterns(failed_logs, dag_id, task_id)

🛠 Development Tasks

The tasks/ folder contains utility scripts for common development tasks, such as setting up the environment, linting, formatting, and running tests. These tasks are powered by Invoke.

Available Tasks

Run the following commands from the root of the project:

Command                  Description                                     
invoke dev.setup        Set up the development environment.             
invoke dev.clean        Clean build artifacts and temporary files.     
invoke dev.format      Format the code using black and isort.     
invoke dev.lint        Lint the code using flake8 and mypy.       
invoke dev.test        Run all tests with pytest.                   

🧪 Testing

# Run all tests
uv run pytest

# Run with coverage
uv run pytest --cov=dagnostics

# Run specific test file
uv run pytest tests/llm/test_parser.py

📝 Development

  1. Create a new branch:
git checkout -b feature/amazing-feature
  1. Make your changes and ensure tests pass:
./scripts/test.sh
  1. Format and lint your code:
./scripts/lint.sh
  1. Commit your changes:
git commit -m "Add amazing feature"

🌐 Web Dashboard

A modern web dashboard UI is included in src/dagnostics/web/. It provides:

  • Monitor status and statistics (requires backend API)
  • Error trends and categories (requires backend API)
  • Task analysis form (requires backend API)

Note: The backend API endpoints for the dashboard may be incomplete or stubbed. Some dashboard features may not display real data yet.


🚧 Limitations / Roadmap

✅ Implemented Features

  • LLM Integration: Multi-provider support (Ollama, OpenAI, Anthropic, Gemini) with provider-specific optimizations
  • Smart Baselines: Drain3-based log clustering with anomaly detection
  • Configurable Prompts: Few-shot learning system with customizable templates
  • Daemon Service: Background monitoring with configurable intervals
  • CLI Interface: Comprehensive command-line tools for analysis and management
  • Alerting: SMS/Email notifications with concise error summaries
  • Python API: Core analysis and baseline management APIs

🚧 In Progress / Roadmap

  • Report generation and export: HTML, JSON, PDF report formats (stub implementation)
  • Advanced Analytics: Trend analysis, error correlation, predictive insights
  • Web Dashboard Backend: Complete REST API endpoints for dashboard functionality
  • Fine-tuning Support: Custom model fine-tuning for domain-specific error patterns
  • Integration Plugins: Connectors for popular monitoring tools (Datadog, Grafana, etc.)
  • Advanced Filtering: ML-based log filtering and noise reduction

See CONTRIBUTING.md for how to help!


🤝 Contributing

See CONTRIBUTING.md for detailed guidelines.


📄 License

This project is licensed under the MIT License - see the LICENSE file for details.


🙏 Acknowledgments

  • Inspired by the daily L1 support rotation practice and the need for intelligent error analysis
  • Built with modern Python ecosystem: uv, FastAPI, Typer, Pydantic
  • LLM Integration: Ollama (local), OpenAI, Anthropic Claude, Google Gemini
  • Log Analysis: Drain3 for intelligent log clustering and pattern recognition
  • Few-Shot Learning: Curated examples for improved error extraction
  • Special thanks to the open-source community and enterprise ETL teams who inspired this project

📞 Support

For questions and support, please open an issue in the GitHub repository.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dagnostics-0.4.0.tar.gz (1.9 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dagnostics-0.4.0-py3-none-any.whl (99.7 kB view details)

Uploaded Python 3

File details

Details for the file dagnostics-0.4.0.tar.gz.

File metadata

  • Download URL: dagnostics-0.4.0.tar.gz
  • Upload date:
  • Size: 1.9 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.7.16

File hashes

Hashes for dagnostics-0.4.0.tar.gz
Algorithm Hash digest
SHA256 c9245d24ddbd610e20a185f9f6b31bf77a04874e3d7072f9664efb13a0639caa
MD5 cf19ed286fea0cbec152374709b7acaa
BLAKE2b-256 2d1ac750ae89dca7c496c9e312c1cce3333e01b746b4665abe4b8bcbc977d0f8

See more details on using hashes here.

File details

Details for the file dagnostics-0.4.0-py3-none-any.whl.

File metadata

  • Download URL: dagnostics-0.4.0-py3-none-any.whl
  • Upload date:
  • Size: 99.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.7.16

File hashes

Hashes for dagnostics-0.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 34cb49be14d162efae25022dfaccec29a3918310f9ef1df58d2ff07cc23fdc7c
MD5 c076d94478dd42a1453cf8a4fb3a4840
BLAKE2b-256 ad8c8c46cb07e0e4f52084c0b434a00a1703178d72fd7e40f61c6999675211ef

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page