A lightweight, zero-dependency Airflow plugin that monitors DAG/task health by querying the metadata DB — runtime anomalies, failure spikes, missed deadlines, stuck tasks, and schedule anomalies.

airflow-plugin-watchdog

A lightweight, zero-dependency Airflow plugin that monitors DAG and task health by querying the metadata database.

No Prometheus. No Grafana. No Datadog. No DAG to deploy. Just pip install and go.

What it detects

| Detector | What it catches | How it works |
| --- | --- | --- |
| Runtime anomaly | Tasks running unusually slow or fast | IQR-based outlier detection on task durations |
| Failure spike | Sudden increase in DAG failure rate | Compares recent failure rate vs historical baseline |
| Missed deadline | DAG runs taking too long | Flags running DAGs exceeding N× their median duration |
| Stuck task | Zombie or hung tasks | Flags tasks in running state beyond N× their historical max |
| Schedule anomaly | Tasks starting or ending at unusual times | IQR-based outlier detection on time-of-day (handles midnight wraparound) |

Quick start

pip install airflow-plugin-watchdog
  1. Install (above), then restart the Airflow API server so the plugin and its background scheduler load.
  2. Open the dashboard — click Watchdog in the Airflow navbar (under Browse), or go to /watchdog/.
  3. Tune it (optional) — click Configuration on the dashboard to toggle detectors, adjust thresholds, and add alert destinations.

That's it — no DAG to deploy, no extra services. Detection runs every 30 minutes by default and works out of the box.

Requirements

  • Apache Airflow >= 3.0.0
  • Python >= 3.10
  • A metadata database: PostgreSQL (recommended, integration-tested) or SQLite (tested). MySQL is not tested in CI but should work, as the code is backend-agnostic.

Installation

pip install airflow-plugin-watchdog

This registers the /watchdog/ dashboard (accessible from the Airflow UI under Browse → Watchdog) and the detection scheduler. There is no DAG to deploy — detection runs on a background scheduler inside the Airflow API-server process, which is started automatically by the plugin.

Why no DAG? Airflow 3 isolates task execution from the metadata database (AIP-72): a task may not access it via the ORM. Watchdog's detectors need that data, so detection runs on the API server — where direct metadata-DB access is sanctioned — instead of inside a worker task. See How it works.

After installing, restart the API server so the plugin and its scheduler are picked up. By default detection runs every 30 minutes (configurable — see below).

Configuration

All settings live in a single Airflow Variable, watchdog_config (a JSON object). All fields are optional — sensible defaults apply. There are two equivalent ways to edit it:

  • Watchdog Configuration page (recommended) — open the dashboard and click Configuration. It's a structured, validated editor for the same Variable, organized into three tabs: Detectors (enable/disable per DAG, plus excluded DAGs), Thresholds (numeric tuning), and Alerts (emails and webhook URLs).
  • Admin → Variables → watchdog_config — edit the raw JSON by hand. Same effect, no validation.

Either way, the value is stored in Airflow's metadata DB (the variable table), read fresh each detection cycle, and shared across API-server replicas. Edits made through either route take effect on the next cycle.

Note: Watchdog also writes a second Variable, watchdog_last_results — this is output, not config: the scheduler overwrites it every cycle with the latest alert summary that the dashboard reads. It shows up under Admin → Variables too, but don't edit it (your changes are clobbered on the next run).

The full set of watchdog_config fields:

{
    "schedule_interval_minutes": 30,
    "lookback_runs": 20,
    "runtime_iqr_multiplier": 1.5,
    "runtime_min_deviation_secs": 5.0,
    "failure_window_runs": 10,
    "failure_baseline_runs": 50,
    "failure_spike_ratio": 2.0,
    "deadline_multiplier": 2.0,
    "stuck_multiplier": 2.0,
    "schedule_iqr_multiplier": 1.5,
    "schedule_min_deviation_minutes": 5.0,
    "exclude_dags": [],
    "disable_detectors": [],
    "dag_overrides": {
        "my_dag": {
            "disable_detectors": ["schedule_anomaly"]
        }
    },
    "alert_emails": ["team@example.com"],
    "alert_slack_webhook": "https://hooks.slack.com/services/...",
    "alert_teams_webhook": "https://outlook.office.com/webhook/...",
    "alert_discord_webhook": "https://discord.com/api/webhooks/..."
}

Configuration reference

| Field | Default | Description |
| --- | --- | --- |
| schedule_interval_minutes | 30 | How often detection runs (read each cycle, so changes apply without a restart) |
| lookback_runs | 20 | Number of recent runs used for statistical baselines |
| runtime_iqr_multiplier | 1.5 | IQR multiplier for runtime anomaly fences |
| runtime_min_deviation_secs | 5.0 | Minimum absolute duration change before a runtime anomaly fires (suppresses noise from steady/very short tasks) |
| failure_window_runs | 10 | Recent window size for failure rate calculation |
| failure_baseline_runs | 50 | Historical baseline size for failure rate comparison |
| failure_spike_ratio | 2.0 | Alert when recent rate exceeds this × baseline rate |
| deadline_multiplier | 2.0 | Alert when DAG run exceeds this × median duration |
| stuck_multiplier | 2.0 | Alert when task exceeds this × historical max duration |
| schedule_iqr_multiplier | 1.5 | IQR multiplier for start/end time-of-day fences |
| schedule_min_deviation_minutes | 5.0 | Minimum deviation from the median time-of-day before a schedule anomaly fires (suppresses sub-minute jitter) |
| exclude_dags | [] | DAG IDs to skip during detection |
| disable_detectors | [] | Detector names to disable globally (e.g. ["schedule_anomaly"]) |
| dag_overrides | {} | Per-DAG overrides: {"dag_id": {"disable_detectors": [...]}} |
| alert_emails | [] | Email addresses for alert notifications |
| alert_slack_webhook | null | Slack incoming webhook URL |
| alert_teams_webhook | null | MS Teams incoming webhook URL |
| alert_discord_webhook | null | Discord incoming webhook URL |
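
To make the interaction between exclude_dags, disable_detectors, and dag_overrides concrete, here is a minimal sketch of how the three fields could be resolved for one DAG. The helper names load_config and is_detector_enabled are hypothetical and not part of the plugin's API, and the merge order is an assumption; only the field names and defaults come from the table above.

```python
import json

# Subset of the documented defaults; the plugin's real merge logic may differ.
DEFAULTS = {
    "schedule_interval_minutes": 30,
    "lookback_runs": 20,
    "exclude_dags": [],
    "disable_detectors": [],
    "dag_overrides": {},
}


def load_config(raw_json):
    """Overlay the user's JSON on the defaults (hypothetical helper)."""
    user = json.loads(raw_json) if raw_json else {}
    return {**DEFAULTS, **user}


def is_detector_enabled(config, dag_id, detector):
    """Return False if the DAG is excluded or the detector is disabled for it."""
    if dag_id in config["exclude_dags"]:
        return False
    if detector in config["disable_detectors"]:
        return False
    override = config["dag_overrides"].get(dag_id, {})
    return detector not in override.get("disable_detectors", [])


config = load_config(
    '{"dag_overrides": {"my_dag": {"disable_detectors": ["schedule_anomaly"]}}}'
)
print(is_detector_enabled(config, "my_dag", "schedule_anomaly"))     # False
print(is_detector_enabled(config, "other_dag", "schedule_anomaly"))  # True
```

In this sketch an excluded DAG skips every detector, a globally disabled detector is off for every DAG, and a per-DAG override can only switch detectors off, not back on.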

How it works

Architecture

Detection runs entirely inside the Airflow API-server process, started by the plugin's FastAPI lifespan. A background scheduler fires every schedule_interval_minutes; across multiple API-server replicas a database advisory lock plus a last-run check ensures only one cycle runs per interval. Both the detectors and the dashboard read the metadata DB directly here — the sanctioned place for it in Airflow 3 — so nothing ever runs in a worker task.

flowchart TD
    subgraph api["Airflow API server (FastAPI plugin)"]
        direction TB
        timer["Background scheduler<br/><i>every N min · advisory-locked</i>"]

        subgraph det["Detectors"]
            runtime["Runtime"]
            failure["Failure"]
            deadline["Deadline"]
            stuck["Stuck"]
            schedule["Schedule"]
        end

        timer --> det
        det --> alerting["Alerting<br/><i>Log / Email / Slack / Teams / Discord</i>"]
        det --> var[("Variable<br/>watchdog_last_results")]
        var --> dashboard["/watchdog/ Dashboard"]
    end

    db[("Metadata DB<br/><i>dag_run · task_instance</i>")] -.read.-> det
    db -.read.-> dashboard

Detection methods

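Runtime anomaly (IQR): For each (dag_id, task_id), the detector computes Q1, Q3, and the IQR from the last N successful runs (N = lookback_runs). If the most recent duration falls outside [Q1 - k×IQR, Q3 + k×IQR], where k = runtime_iqr_multiplier (1.5 by default), it's flagged, provided the change also exceeds runtime_min_deviation_secs. This is more robust than a z-score because a few extreme durations barely move the quartiles, so outliers don't skew the baseline.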
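
The fence calculation can be sketched in a few lines of Python. This is an illustration only: the quartile method (statistics.quantiles with method="inclusive") and measuring the minimum deviation from the median are assumptions, and the function names are hypothetical rather than the plugin's own.

```python
import statistics


def iqr_fences(durations, multiplier=1.5):
    """Return (low, high) fences from the quartiles of past durations."""
    q1, _, q3 = statistics.quantiles(durations, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - multiplier * iqr, q3 + multiplier * iqr


def is_runtime_anomaly(history, latest, multiplier=1.5, min_deviation_secs=5.0):
    """Flag `latest` if it is outside the fences and far enough from the median."""
    low, high = iqr_fences(history, multiplier)
    outside = latest < low or latest > high
    # Assumption: the minimum deviation is measured against the median duration.
    deviation = abs(latest - statistics.median(history))
    return outside and deviation >= min_deviation_secs


steady = [100, 102, 98, 101, 99, 103, 97, 100]     # seconds
print(is_runtime_anomaly(steady, 180))              # True: far above the upper fence
print(is_runtime_anomaly(steady, 101))              # False: inside the fences

short = [2.0, 2.1, 1.9, 2.0, 2.0, 2.1, 1.9, 2.0]
print(is_runtime_anomaly(short, 3.0))               # False: outside fences, but only 1 s off
```

The last call shows why a minimum-deviation floor matters: very short, very steady tasks have a tiny IQR, so without the floor a one-second wobble would count as an outlier.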

Failure spike: Compares the failure rate over the most recent failure_window_runs runs (10 by default) against the rate over the preceding failure_baseline_runs runs (50 by default). The baseline excludes the recent window, so a fresh spike doesn't dilute its own reference point. If the recent rate exceeds failure_spike_ratio × the baseline rate (2× by default), it fires. It also catches DAGs that suddenly start failing when they historically never did.
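
A rough sketch of that comparison follows. The input shape (a list of run states, newest first) and the function names are hypothetical. Treating any recent failure as a spike when the baseline has none is an assumption about how the "never failed before" case is handled.

```python
def failure_rate(states):
    """Fraction of runs in `states` that failed."""
    if not states:
        return 0.0
    return sum(1 for s in states if s == "failed") / len(states)


def is_failure_spike(states, window=10, baseline=50, ratio=2.0):
    """`states` holds DAG run states ordered newest first."""
    recent = states[:window]
    base = states[window:window + baseline]  # baseline excludes the recent window
    if not recent or not base:
        return False  # not enough history to compare
    recent_rate = failure_rate(recent)
    base_rate = failure_rate(base)
    if base_rate == 0.0:
        # Assumption: a DAG that never failed before fires on any recent failure.
        return recent_rate > 0.0
    return recent_rate > ratio * base_rate


usual = (["failed"] + ["success"] * 9) * 5               # 10% failures in the baseline
spike = ["failed"] * 4 + ["success"] * 6 + usual         # 40% in the recent window
print(is_failure_spike(spike))                           # True: 0.4 > 2 x 0.1
print(is_failure_spike((["failed"] + ["success"] * 9) * 6))  # False: 0.1 vs 0.1
print(is_failure_spike(["failed"] + ["success"] * 59))   # True: baseline had no failures
```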

Missed deadline: Checks currently running DAG runs and compares each run's elapsed time against deadline_multiplier × its median historical duration (2× by default). Catches DAGs that are silently hanging.
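
As a small illustration, the check reduces to one comparison per running DAG run. The function name and argument shapes below are hypothetical, and returning False when there is no history is an assumption.

```python
import statistics
from datetime import datetime, timezone


def is_missed_deadline(started_at, now, past_durations_secs, multiplier=2.0):
    """True if a still-running DAG run has exceeded multiplier x median duration."""
    if not past_durations_secs:
        return False  # assumption: no history means nothing to compare against
    elapsed = (now - started_at).total_seconds()
    return elapsed > multiplier * statistics.median(past_durations_secs)


history = [600, 620, 580, 610, 590]  # past run durations in seconds, median 600
start = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
print(is_missed_deadline(start, start.replace(minute=15), history))  # False: 900 s
print(is_missed_deadline(start, start.replace(minute=25), history))  # True: 1500 s > 1200 s
```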

Stuck task: Checks currently running task instances against stuck_multiplier × the historical maximum duration for that specific task (2× by default). Catches zombie tasks, hung queries, and unresponsive external calls.
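
The same idea applied per task, using the historical maximum instead of the median, might look like the sketch below. The dictionaries keyed by (dag_id, task_id) and the name find_stuck_tasks are illustrative assumptions, not the plugin's data model.

```python
from datetime import datetime, timedelta, timezone


def find_stuck_tasks(running, historical_max_secs, now, multiplier=2.0):
    """Return (dag_id, task_id) keys running longer than multiplier x their max."""
    stuck = []
    for key, started_at in running.items():
        max_secs = historical_max_secs.get(key)
        if max_secs is None:
            continue  # assumption: tasks without history are skipped
        if (now - started_at).total_seconds() > multiplier * max_secs:
            stuck.append(key)
    return stuck


now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
running = {
    ("etl", "load"): now - timedelta(minutes=50),     # 3000 s elapsed
    ("etl", "extract"): now - timedelta(minutes=5),   # 300 s elapsed
    ("new_dag", "first"): now - timedelta(hours=3),   # no history yet
}
historical_max = {("etl", "load"): 900, ("etl", "extract"): 400}
print(find_stuck_tasks(running, historical_max, now))  # [('etl', 'load')]
```

Comparing against the maximum rather than the median makes this check deliberately conservative: a task has to run well beyond its slowest known run before it is reported.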

Schedule anomaly (IQR): For each (dag_id, task_id), converts start and end times to minutes-since-midnight and computes IQR fences. Flags tasks that started or ended at an unusual time-of-day. Handles midnight wraparound (e.g. tasks normally running between 23:30–00:30).
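
The source does not spell out how the wraparound is handled, so the following is one possible approach rather than the plugin's method: centre the times on their circular mean, express each time as a signed offset in [-720, 720) minutes, and apply ordinary IQR fences to the offsets. All function names are hypothetical.

```python
import math
import statistics

DAY = 1440  # minutes in a day


def circular_center(minutes):
    """Circular mean of times-of-day given as minutes since midnight."""
    angles = [2 * math.pi * m / DAY for m in minutes]
    x = sum(math.cos(a) for a in angles)
    y = sum(math.sin(a) for a in angles)
    return (math.atan2(y, x) * DAY / (2 * math.pi)) % DAY


def signed_offset(minute, center):
    """Offset from `center`, wrapped into [-720, 720) so 23:55 and 00:05 are close."""
    return (minute - center + DAY / 2) % DAY - DAY / 2


def is_schedule_anomaly(history, latest, multiplier=1.5, min_deviation_minutes=5.0):
    center = circular_center(history)
    offsets = [signed_offset(m, center) for m in history]
    q1, _, q3 = statistics.quantiles(offsets, n=4, method="inclusive")
    iqr = q3 - q1
    low, high = q1 - multiplier * iqr, q3 + multiplier * iqr
    latest_offset = signed_offset(latest, center)
    deviation = abs(latest_offset - statistics.median(offsets))
    outside = latest_offset < low or latest_offset > high
    return outside and deviation >= min_deviation_minutes


# Start times clustered around midnight: 23:30, 23:45, 23:55, 00:05, 00:10, ...
history = [1410, 1425, 1435, 5, 10, 20, 1430, 0]
print(is_schedule_anomaly(history, 15))   # False: 00:15 fits the usual window
print(is_schedule_anomaly(history, 720))  # True: 12:00 is far from the cluster
```

Without the wrapping step, the raw values 1410 to 1435 and 0 to 20 would look like two distant groups, and the fences would be too wide to catch anything.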

Dashboard

The dashboard is served at /watchdog/ by the Airflow API server. It shows:

  • Summary cards: total DAGs, healthy, warning, critical counts
  • DAG health table: sorted with problems at the top
  • Per-DAG alerts with severity indicators
  • Auto-refreshes every 60 seconds

Access it via Browse → Watchdog in the Airflow UI navbar.

The dashboard and its API require an authenticated Airflow user. Reading the dashboard needs website (view) access; saving configuration changes requires permission to edit Airflow Variables — enforced through Airflow's auth manager.

Alerting

Alerts are dispatched through five channels. Only the API-server log is on by default; every other channel stays silent until you set the matching field in watchdog_config:

| Channel | Config field | Default |
| --- | --- | --- |
| API-server logs | (none) | always on; alerts are logged by the scheduler in the Airflow API-server logs |
| Email | alert_emails | off; also requires Airflow SMTP (see below) |
| Slack | alert_slack_webhook | off |
| MS Teams (Adaptive Card) | alert_teams_webhook | off |
| Discord | alert_discord_webhook | off |

Webhook channels need no extra setup — paste the incoming-webhook URL into the corresponding field and you're done.
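
For readers curious what a webhook delivery involves, here is a hedged sketch of posting a plain-text message to a Slack incoming webhook using only the standard library. Slack incoming webhooks accept a JSON body with a "text" field; the alert dictionary shape, the message layout, and the function names are assumptions and do not describe the plugin's actual payload.

```python
import json
import urllib.request


def build_slack_request(webhook_url, alerts):
    """Build a POST request carrying one line of text per alert."""
    text = "\n".join(
        f"[{a['severity']}] {a['dag_id']}: {a['message']}" for a in alerts
    )
    body = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request, timeout=10):
    """Send the request and return the HTTP status code."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


# Hypothetical alert shape and placeholder URL; nothing is sent here.
alerts = [{"severity": "critical", "dag_id": "etl", "message": "task load looks stuck"}]
request = build_slack_request("https://example.invalid/webhook", alerts)
print(json.loads(request.data)["text"])  # [critical] etl: task load looks stuck
```

Calling send(request) with a real webhook URL would perform the delivery. The sketch keeps building and sending separate so the payload can be inspected without network access.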

Email requires Airflow SMTP

Email is two-part: Watchdog only decides who to notify (alert_emails); the actual sending goes through Airflow's own airflow.utils.email.send_email, which reads Airflow's [smtp] settings. So filling in alert_emails alone is not enough — you must also configure SMTP on the Airflow side.

  1. Tell Watchdog who to email — in the watchdog_config Variable:

    { "alert_emails": ["team@example.com"] }
    
  2. Configure SMTP in Airflow — via airflow.cfg:

    [smtp]
    smtp_host = smtp.example.com
    smtp_starttls = True
    smtp_port = 587
    smtp_user = alerts@example.com
    smtp_password = <app-password>
    smtp_mail_from = alerts@example.com
    

    …or the equivalent environment variables (AIRFLOW__SMTP__SMTP_HOST, etc.). Some Airflow 3 deployments use an SMTP Connection (smtp_default) instead of airflow.cfg — configure it under Admin → Connections in that case.

Fail-soft: a delivery failure never breaks the detection cycle. If a channel is misconfigured (e.g. SMTP not set up, or a bad webhook URL), the cycle still completes and the error is logged in the API-server logs (Failed to send watchdog email / … Slack notification, etc.). If you configured a channel but see nothing, check that log first. Also note alerts are only dispatched when there's something to report — a clean run sends nothing.
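
The fail-soft behaviour described above can be sketched as a loop that isolates each channel in its own try/except. The dispatch function, the channel mapping, and the logger name are hypothetical; the log message only mirrors the wording quoted above.

```python
import logging

log = logging.getLogger("watchdog.sketch")  # hypothetical logger name


def dispatch(alerts, channels):
    """Send alerts to every channel; log failures and keep going.

    `channels` maps a channel name to a callable that takes the alert list.
    Returns the names of the channels that failed.
    """
    if not alerts:
        return []  # a clean run sends nothing
    failed = []
    for name, send in channels.items():
        try:
            send(alerts)
        except Exception:
            # One broken channel must not stop the others or the cycle.
            log.exception("Failed to send watchdog %s notification", name)
            failed.append(name)
    return failed


def broken_slack(alerts):
    raise ConnectionError("bad webhook URL")


delivered = []
channels = {"slack": broken_slack, "email": delivered.extend}
print(dispatch(["dag etl: stuck task"], channels))  # ['slack']
print(delivered)                                     # ['dag etl: stuck task']
```

Even though the Slack callable raises, the email callable still receives the alert, and the failure appears in the log with a traceback.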

Development

git clone https://github.com/Redevil10/airflow-plugin-watchdog.git
cd airflow-plugin-watchdog
uv sync --extra dev
uv run pytest tests/unit   # fast unit tests (Airflow mocked)

Integration tests

The unit suite mocks Airflow. A separate integration suite runs the detector and dashboard SQL, the results-Variable round trip, and the auth dependencies against a real Airflow metadata database. PostgreSQL is the supported production backend, so it is the primary target; SQLite is also exercised because timestamps and JSON are read via raw SQL and differ by driver.

# PostgreSQL (recommended — matches production)
docker run -d --rm --name wd_pg -e POSTGRES_USER=airflow \
    -e POSTGRES_PASSWORD=airflow -e POSTGRES_DB=airflow -p 5432:5432 postgres:16
WATCHDOG_IT_DB_URL="postgresql+psycopg2://airflow:airflow@localhost:5432/airflow" \
    uv run --extra dev pytest tests/integration -m integration

# SQLite (no service needed)
WATCHDOG_IT_DB_URL="sqlite:////tmp/watchdog_it.db" \
    uv run --extra dev pytest tests/integration -m integration

Integration tests are marked integration and skipped by default; CI runs them against a PostgreSQL service container (see .github/workflows/integration_test.yml).

Known limitations

  • Latest-run only — the dashboard shows the most recent detection cycle (stored in the watchdog_last_results Variable); there is no alert history. A future version may store results in a dedicated table for historical trending.
  • Detection is not a DAG run — because detection runs on the API server rather than as a task, it does not appear in Airflow's DAG/run list; its activity is visible in the dashboard and the API-server logs instead.

Roadmap

  • Historical alert storage (dedicated table) for trend analysis
  • Sparkline charts in the dashboard showing duration trends
  • Per-DAG detector enable/disable via dag_overrides config (done; see Configuration)
  • Multi-database support: PostgreSQL (primary, integration-tested) and SQLite (tested); MySQL should work but is not covered in CI (done)
  • GitHub Actions CI (lint, unit, integration, publish) (done)
  • Contribution to the Airflow ecosystem page

License

Apache License 2.0 — see LICENSE.
