
Pipeline error handler: capture, deduplicate, diagnose, and auto-fix failures.


Flow Doctor


Pipeline error handler for Python. Captures exceptions, deduplicates failure signatures, optionally diagnoses root causes with LLMs, routes alerts (Telegram / Slack / email / GitHub / S3 / custom), and can generate fix PRs.

Typed, IDE-discoverable configuration. Pydantic v2 models + a fluent FlowDoctor.builder() mean you don't need a yaml file — and when you have one, the schema is enforced at load time.

Fail-loud by default. Configuration errors — missing tokens, unresolved ${VAR} references, misconfigured notifiers — raise ConfigError at construction time instead of silently degrading. A silently-degraded error monitor defeats the purpose.

from flow_doctor import FlowDoctor, TelegramNotifierConfig

fd = (
    FlowDoctor.builder("morning-signal")
    .add_notifier(TelegramNotifierConfig())  # creds from FLOW_DOCTOR_TELEGRAM_*
    .with_dedup(cooldown_minutes=60)
    .build()
)

with fd.guard():
    run_pipeline()  # exceptions captured, deduplicated, routed, re-raised

How It Works

Exception → Capture → Dedup → Diagnose (LLM, opt) → Notify (Telegram/...) → Fix PR (opt)
  1. Capture — exception, traceback, logs, contextvars (flow_name, stage)
  2. Dedup — same error signature within cooldown window is suppressed (normalized to ignore reqIds, UUIDs, contract symbols, etc.)
  3. Cascade — if a declared upstream dependency also failed, tag it and skip diagnosis
  4. Diagnose (opt) — check the knowledge base (free), then call Claude if rate limit allows
  5. Notify — Telegram / Slack / email / GitHub issue / S3 changelog (rate-limited with daily digest fallback)
  6. Fix (opt) — human adds flow-doctor:fix label on a filed issue, triggering automated fix PR generation
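Step 3 (cascade) amounts to checking whether a declared upstream flow failed recently. The sketch below is a hypothetical illustration, not flow-doctor's implementation: `Incident`, `tag_cascade`, the `recent_failures` mapping, and the 24-hour lookback window are all assumptions made for the example.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timedelta


@dataclass
class Incident:
    # Hypothetical incident record; flow-doctor's own model may differ.
    flow_name: str
    signature: str
    tags: list[str] = field(default_factory=list)
    needs_diagnosis: bool = True


def tag_cascade(
    incident: Incident,
    dependencies: list[str],
    recent_failures: dict[str, datetime],
    now: datetime,
    window: timedelta = timedelta(hours=24),  # assumed lookback window
) -> Incident:
    """Mark the incident as a cascade if a declared upstream flow failed recently."""
    for upstream in dependencies:
        failed_at = recent_failures.get(upstream)
        if failed_at is not None and now - failed_at <= window:
            incident.tags.append(f"cascade:{upstream}")
            # The upstream outage is the likely root cause, so skip LLM diagnosis.
            incident.needs_diagnosis = False
    return incident
```

Skipping diagnosis for cascades keeps the daily LLM budget for failures that actually originate in this flow.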

Installation

While 0.5.0 is in the rc cycle:

pip install --pre flow-doctor                          # core only
pip install --pre "flow-doctor[diagnosis]"             # + LLM diagnosis (anthropic SDK)
pip install --pre "flow-doctor[diagnosis,remediation]" # + auto-remediation (boto3)
pip install --pre "flow-doctor[all]"                   # everything

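The --pre flag is required while the version tag has an rc suffix; drop it once 0.5.0 final ships. Pinning an exact release candidate such as flow-doctor==0.5.0rc3 works without --pre, because pip accepts a pre-release that the requirement names explicitly.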

Quick Start — FlowDoctor.builder() (recommended)

The builder is typed, IDE-discoverable, and works without a yaml file. Notifier credentials fall through to FLOW_DOCTOR_* env vars when not passed inline.

from flow_doctor import FlowDoctor, TelegramNotifierConfig

fd = (
    FlowDoctor.builder("morning-signal")
    .add_notifier(TelegramNotifierConfig())  # bot_token + chat_id from env
    .with_dedup(cooldown_minutes=60)
    .build()
)

Three idiomatic ways to use the resulting FlowDoctor in your pipeline:

# 1. Context manager — exception is captured + re-raised
with fd.guard():
    run_pipeline()

# 2. Decorator
@fd.monitor
def lambda_handler(event, context):
    run_pipeline()

# 3. Direct reporting — never crashes the caller
try:
    run_pipeline()
except Exception as e:
    fd.report(e, context={"date": "2026-05-13"})
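The three styles differ only in who handles the exception. The `MiniDoctor` class below is a hypothetical stand-in that mimics the documented behavior (`guard()` and `monitor` report and then re-raise; `report()` swallows its own failures); it is not flow-doctor's code, and the `sink` callable is an assumption standing in for dedup, notifiers, and storage.

```python
from __future__ import annotations

import functools
from contextlib import contextmanager
from typing import Any, Callable, Iterator, TypeVar

F = TypeVar("F", bound=Callable[..., Any])


class MiniDoctor:
    """Hypothetical stand-in for FlowDoctor's three entry points."""

    def __init__(self, sink: Callable[[BaseException, dict], None]) -> None:
        self._sink = sink  # where reports go (notifiers, store, ...)

    def report(self, exc: BaseException, context: dict | None = None) -> None:
        # A broken notifier must never crash the pipeline that called report().
        try:
            self._sink(exc, context or {})
        except Exception:
            pass

    @contextmanager
    def guard(self) -> Iterator[None]:
        try:
            yield
        except Exception as exc:
            self.report(exc)
            raise  # re-raise so the pipeline still fails loudly

    def monitor(self, func: F) -> F:
        # Decorator form: wraps the call in guard().
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            with self.guard():
                return func(*args, **kwargs)

        return wrapper  # type: ignore[return-value]
```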

Async pipelines:

async def run():
    try:
        await pipeline()
    except Exception as exc:
        await fd.report_async(exc)
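When the reporting path does blocking I/O (an HTTP call to Telegram, a SQLite write), a common way to expose an async variant is to hand the blocking call to a worker thread so the event loop keeps running. A minimal sketch of that pattern, assuming a synchronous reporter; `report`, `report_async`, and `run` here are hypothetical names, not flow-doctor internals.

```python
import asyncio
import time


def report(exc: BaseException) -> str:
    # Hypothetical blocking reporter, e.g. an HTTP call to a notifier.
    time.sleep(0.01)
    return f"reported {type(exc).__name__}"


async def report_async(exc: BaseException) -> str:
    # Run the blocking reporter in the default thread pool (Python 3.9+).
    return await asyncio.to_thread(report, exc)


async def run() -> str:
    try:
        raise TimeoutError("upstream slow")
    except Exception as exc:
        return await report_async(exc)
```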

Contextvars propagate automatically. Stamp flow_name / stage once and any fd.report() inside picks them up — no need to thread context=... through every layer:

import flow_doctor

with flow_doctor.context(flow_name="morning-signal", stage="rank"):
    run_rank()  # any fd.report() inside auto-records flow_name + stage
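This kind of scoping is what Python's `contextvars` module provides: a value set inside the `with` block is visible to everything it calls (including across `await`) and is restored on exit. A minimal sketch of the pattern; `_ambient`, `context`, and `current_context` are hypothetical names, not flow-doctor's implementation.

```python
from __future__ import annotations

from contextlib import contextmanager
from contextvars import ContextVar
from typing import Any, Iterator

# Holds the merged ambient context for the current thread / asyncio task.
_ambient: ContextVar[dict[str, Any]] = ContextVar("ambient", default={})


@contextmanager
def context(**values: Any) -> Iterator[None]:
    # Merge with the outer scope so nested blocks add keys instead of replacing them.
    token = _ambient.set({**_ambient.get(), **values})
    try:
        yield
    finally:
        _ambient.reset(token)  # restore the outer scope


def current_context() -> dict[str, Any]:
    # What a report() call would snapshot onto the incident.
    return dict(_ambient.get())
```

Each `set` stores a fresh dict rather than mutating the old one, so an outer scope is never changed by an inner block.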

Notifier configs (typed)

Five first-class notifiers ship today, each with its own Pydantic config exposed via the discriminated union NotifierConfig:

| Config | Channel | Setup |
|---|---|---|
| TelegramNotifierConfig | Telegram Bot API | @BotFather → /newbot → bot token + chat_id. Recommended default. |
| SlackNotifierConfig | Slack incoming webhook | Slack app → incoming webhook URL |
| EmailNotifierConfig | SMTP (Gmail / any) | Sender + recipients + SMTP password (a Gmail App Password works) |
| GitHubNotifierConfig | GitHub issues | PAT with Issues: write on the target repo |
| S3NotifierConfig | System-wide changelog corpus | Bucket + subsystem; IAM allows s3:PutObject on the prefix |

Mix freely:

from flow_doctor import (
    EmailNotifierConfig,
    FlowDoctor,
    GitHubNotifierConfig,
    TelegramNotifierConfig,
)

fd = (
    FlowDoctor.builder("alpha-engine-predictor")
    .add_notifier(TelegramNotifierConfig(message_thread_id=42))   # forum topic
    .add_notifier(GitHubNotifierConfig(repo="me/alpha-engine"))   # token from env
    .add_notifier(EmailNotifierConfig(sender="me@x.com",
                                       recipients=["me@x.com"]))
    .build()
)

Telegram — recommended default

Why Telegram leads the examples:

  • Two-minute setup. Message @BotFather, send /newbot, and save the token. Then DM your bot, call GET https://api.telegram.org/bot<TOKEN>/getUpdates, and grab result[0].message.chat.id.
  • Per-flow routing for free. One bot, N channels via chat_id, or N forum-topic threads via message_thread_id in a supergroup.
  • Mobile push is automatic. No "did the email go to spam" mystery.
  • Token rotation is one @BotFather call. No app password / SES verified identity / Slack workspace admin.

export FLOW_DOCTOR_TELEGRAM_BOT_TOKEN=1234567890:ABC...
export FLOW_DOCTOR_TELEGRAM_CHAT_ID=-1001234567890  # negative for supergroups/channels

from flow_doctor import FlowDoctor, TelegramNotifierConfig

FlowDoctor.builder("pipeline").add_notifier(TelegramNotifierConfig()).build()
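For reference, the two Telegram Bot API calls involved are plain HTTPS requests. The stdlib-only sketch below extracts the chat id from a `getUpdates` response and builds (but does not send) a `sendMessage` request, including the optional `message_thread_id` used for forum topics. `chat_id_from_updates` and `build_send_message` are hypothetical helper names; how flow-doctor formats its alert text is not shown.

```python
from __future__ import annotations

import json
import urllib.parse
import urllib.request


def chat_id_from_updates(payload: str) -> int:
    # getUpdates returns {"ok": true, "result": [{"message": {"chat": {"id": ...}}}, ...]}
    data = json.loads(payload)
    return data["result"][0]["message"]["chat"]["id"]


def build_send_message(token: str, chat_id: int, text: str,
                       message_thread_id: int | None = None) -> urllib.request.Request:
    params: dict[str, object] = {"chat_id": chat_id, "text": text}
    if message_thread_id is not None:
        params["message_thread_id"] = message_thread_id  # forum-topic routing
    body = urllib.parse.urlencode(params).encode()
    # Supplying data= makes this a POST; pass it to urllib.request.urlopen() to deliver.
    return urllib.request.Request(
        f"https://api.telegram.org/bot{token}/sendMessage", data=body
    )
```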

Testing — flow_doctor.testing pytest plugin

The plugin is auto-discovered (registered via [project.entry-points.pytest11]). Downstream tests get a flow_doctor_recorder fixture with no imports required.

def test_pipeline_reports_db_errors(flow_doctor_recorder):
    run_pipeline_that_should_fail(flow_doctor_recorder)
    assert len(flow_doctor_recorder.reports) == 1
    assert flow_doctor_recorder.last.exc_type == "DBError"
    assert flow_doctor_recorder.last.ambient_context["stage"] == "ingest"

flow_doctor_recorder is a RecordingFlowDoctor — it implements FlowDoctorProtocol, so wherever production code expects a FlowDoctorProtocol you can swap it in directly. It also snapshots any active flow_doctor.context() scope onto each captured incident's ambient_context field.

Helpers on the recorder: .clear(), .last, .of_type(exc_name), plus full async support via await recorder.report_async(...).
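To see what a recorder of this kind does, or to use the same idea outside pytest, a hand-rolled equivalent is small. This is a hypothetical stand-in, not `RecordingFlowDoctor`: `FakeRecorder` and `Recorded` are invented names, the `exc_type` field mirrors the attribute used in the test above, and the contextvars snapshot into `ambient_context` is deliberately left out.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class Recorded:
    exc_type: str
    message: str
    context: dict[str, Any] = field(default_factory=dict)


class FakeRecorder:
    """Hypothetical stand-in: stores reports in memory instead of notifying anyone."""

    def __init__(self) -> None:
        self.reports: list[Recorded] = []

    def report(self, exc: BaseException, context: dict[str, Any] | None = None) -> None:
        self.reports.append(Recorded(type(exc).__name__, str(exc), dict(context or {})))

    async def report_async(self, exc: BaseException, context: dict[str, Any] | None = None) -> None:
        self.report(exc, context)  # nothing to await; async so callers can `await` it

    @property
    def last(self) -> Recorded | None:
        return self.reports[-1] if self.reports else None

    def of_type(self, exc_name: str) -> list[Recorded]:
        return [r for r in self.reports if r.exc_type == exc_name]

    def clear(self) -> None:
        self.reports.clear()
```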

Type-checked contract — FlowDoctorProtocol

from flow_doctor import FlowDoctorProtocol

def make_pipeline(fd: FlowDoctorProtocol):
    with fd.guard():
        ...

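FlowDoctorProtocol is decorated with @runtime_checkable, so isinstance(fd, FlowDoctorProtocol) works at runtime as well as at type-check time (the runtime check confirms only that the required members exist, not their signatures). Combined with the shipped py.typed marker, mypy --strict and pyright treat flow-doctor's annotations as authoritative.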
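What `@runtime_checkable` buys you, shown with a hypothetical two-method protocol rather than the real `FlowDoctorProtocol`: any class with matching member names passes `isinstance` without inheriting from anything. `ReporterProtocol`, `NullReporter`, and this `make_pipeline` are illustrative names.

```python
from __future__ import annotations

from contextlib import AbstractContextManager, nullcontext
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class ReporterProtocol(Protocol):
    # Hypothetical subset of a FlowDoctor-like contract.
    def report(self, exc: BaseException, context: dict[str, Any] | None = None) -> None: ...

    def guard(self) -> AbstractContextManager[None]: ...


class NullReporter:
    """Does nothing; satisfies ReporterProtocol structurally, without subclassing it."""

    def report(self, exc: BaseException, context: dict[str, Any] | None = None) -> None:
        pass

    def guard(self) -> AbstractContextManager[None]:
        return nullcontext()


def make_pipeline(fd: ReporterProtocol) -> str:
    with fd.guard():
        return "ran"
```

A null implementation like this is handy for scripts and notebooks where you want the pipeline code unchanged but no alerts sent.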

OpenTelemetry — flow_doctor.otel

Pure-Python adapter that serializes a Report into an OTel SpanEvent-shaped dict — ready to ship to a collector via your existing OTLP exporter today. No opentelemetry-* dependency in this release; the bundled OTLP exporter notifier is on the 0.6.0 roadmap.

from flow_doctor.otel import report_to_otel_span_event

span_event = report_to_otel_span_event(report)
# {
#   "resource": {"service.name": "<flow_name>"},
#   "name": "<stage>",
#   "time_unix_nano": ...,
#   "severity_text": "ERROR", "severity_number": 17,
#   "attributes": {
#     "exception.type": "ValueError",
#     "exception.message": "...",
#     "exception.stacktrace": "...",
#     "flow_doctor.error_signature": "...",
#     "context.run_id": "...",
#   },
# }
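To see roughly how such a mapping works, the stdlib-only sketch below builds the same shape from a live exception. It is not `report_to_otel_span_event`; `exception_to_span_event` and its arguments are hypothetical. The `exception.type`, `exception.message`, and `exception.stacktrace` keys follow OpenTelemetry's exception semantic conventions, and 17 is the first severity number in the OTel ERROR range.

```python
from __future__ import annotations

import time
import traceback
from typing import Any


def exception_to_span_event(exc: BaseException, flow_name: str, stage: str,
                            signature: str, run_id: str) -> dict[str, Any]:
    return {
        "resource": {"service.name": flow_name},
        "name": stage,
        "time_unix_nano": time.time_ns(),
        "severity_text": "ERROR",
        "severity_number": 17,
        "attributes": {
            "exception.type": type(exc).__name__,
            "exception.message": str(exc),
            # The (type, value, tb) form works on every supported Python 3 version.
            "exception.stacktrace": "".join(
                traceback.format_exception(type(exc), exc, exc.__traceback__)
            ),
            "flow_doctor.error_signature": signature,
            "context.run_id": run_id,
        },
    }
```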

Configuration

Inline kwargs / builder (recommended)

See the Quick Start. No yaml required.

YAML file (legacy / multi-environment)

flow_name: my-pipeline
repo: owner/repo

notify:
  - type: telegram
    bot_token: ${FLOW_DOCTOR_TELEGRAM_BOT_TOKEN}
    chat_id: -1001234567890
    message_thread_id: 42      # optional: forum-topic routing
  - type: github
    repo: owner/repo
  - type: s3
    bucket: my-changelog-bucket
    subsystem: predictor       # one of the documented vocab values

store:
  type: sqlite
  path: flow_doctor.db

diagnosis:
  enabled: true
  model: claude-sonnet-4-6-20250514
  api_key: ${ANTHROPIC_API_KEY}
  timeout_seconds: 30
  max_daily_cost_usd: 1.00

github:
  token: ${GITHUB_TOKEN}
  labels: [flow-doctor]

rate_limits:
  max_diagnosed_per_day: 3
  max_issues_per_day: 3
  dedup_cooldown_minutes: 60

dependencies:
  - upstream-pipeline

remediation:
  enabled: true
  dry_run: true
  auto_remediate_min_confidence: 0.9

auto_fix:
  enabled: true
  confidence_threshold: 0.90
  test_command: "python -m pytest tests/ -x -q"
  scope:
    allow: ["src/", "lib/"]
    deny: ["*.yaml", "*.yml"]

Load it with the legacy entry point:

import flow_doctor

# Deprecated since 0.5.0; will be removed in 0.6.0. Use FlowDoctor.builder() instead.
fd = flow_doctor.init(config_path="flow-doctor.yaml")

${VAR} references resolve from the process environment at load time. Unresolved references raise ConfigError — no silent passthrough.
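The resolution rule is easy to state precisely. A minimal sketch, assuming a plain `${NAME}` syntax with no defaults or escaping (flow-doctor's exact grammar may differ); `resolve_env_refs` is a hypothetical helper and `ConfigError` here is a local stand-in class.

```python
from __future__ import annotations

import os
import re
from typing import Mapping

_REF = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


class ConfigError(Exception):
    """Stand-in for flow-doctor's ConfigError."""


def resolve_env_refs(value: str, env: Mapping[str, str] | None = None) -> str:
    env = os.environ if env is None else env
    missing = [name for name in _REF.findall(value) if name not in env]
    if missing:
        # Fail loudly instead of passing the literal "${VAR}" through.
        raise ConfigError(f"unresolved environment reference(s): {', '.join(missing)}")
    return _REF.sub(lambda m: env[m.group(1)], value)
```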

Environment Variables

flow-doctor reads credentials from environment variables as its primary configuration mechanism. Every notifier has a documented fallback chain: explicit value → FLOW_DOCTOR_* canonical name → common conventions.

Canonical contract

| Variable | Used by | Fallback chain | Required when |
|---|---|---|---|
| FLOW_DOCTOR_TELEGRAM_BOT_TOKEN | Telegram notifier | FLOW_DOCTOR_TELEGRAM_BOT_TOKEN → TELEGRAM_BOT_TOKEN | Telegram notifier config has no explicit bot_token field |
| FLOW_DOCTOR_TELEGRAM_CHAT_ID | Telegram notifier | FLOW_DOCTOR_TELEGRAM_CHAT_ID → TELEGRAM_CHAT_ID | Telegram notifier config has no explicit chat_id field |
| FLOW_DOCTOR_GITHUB_TOKEN | GitHub notifier, auto-fix PR creator | FLOW_DOCTOR_GITHUB_TOKEN → GH_TOKEN → GITHUB_TOKEN | Any GitHub notifier or auto-fix is configured |
| FLOW_DOCTOR_GITHUB_REPO | GitHub notifier | FLOW_DOCTOR_GITHUB_REPO | GitHub notifier config has no explicit repo field |
| FLOW_DOCTOR_SMTP_PASSWORD | Email notifier | FLOW_DOCTOR_SMTP_PASSWORD → GMAIL_APP_PASSWORD | SMTP requires auth |
| FLOW_DOCTOR_SMTP_SENDER | Email notifier | FLOW_DOCTOR_SMTP_SENDER → EMAIL_SENDER | Email notifier config has no explicit sender field |
| FLOW_DOCTOR_SMTP_RECIPIENTS | Email notifier | FLOW_DOCTOR_SMTP_RECIPIENTS → EMAIL_RECIPIENTS | Email notifier config has no explicit recipients field |
| FLOW_DOCTOR_SLACK_WEBHOOK | Slack notifier | FLOW_DOCTOR_SLACK_WEBHOOK → SLACK_WEBHOOK_URL | Slack notifier config has no explicit webhook_url field |
| FLOW_DOCTOR_S3_BUCKET | S3 notifier | FLOW_DOCTOR_S3_BUCKET → CHANGELOG_BUCKET | S3 notifier config has no explicit bucket field |
| FLOW_DOCTOR_ANTHROPIC_API_KEY | LLM diagnosis, auto-fix generator | FLOW_DOCTOR_ANTHROPIC_API_KEY → ANTHROPIC_API_KEY | diagnosis.enabled: true or auto-fix is on |
| FLOW_DOCTOR_SKIP_PREFLIGHT | All notifiers' validate() | (literal, no fallback) | Set to 1 in tests / offline boot to bypass token/preflight network calls |

Precedence for every field is: explicit value in kwargs/yaml → canonical FLOW_DOCTOR_* env var → convention fallbacks in the order listed. The first non-empty value wins. Missing values raise ConfigError at construction time naming the specific field and the env vars that would satisfy it.
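Expressed as code, the precedence rule is a first-non-empty search. A sketch with hypothetical names (`resolve_field`, and a local stand-in `ConfigError`); the env var names in the usage come from the table above, and flow-doctor's actual error message wording may differ.

```python
from __future__ import annotations

import os
from typing import Mapping, Sequence


class ConfigError(Exception):
    """Stand-in for flow-doctor's ConfigError."""


def resolve_field(field: str, explicit: str | None, env_names: Sequence[str],
                  env: Mapping[str, str] | None = None) -> str:
    env = os.environ if env is None else env
    if explicit:  # 1. explicit kwarg / yaml value
        return explicit
    for name in env_names:  # 2. canonical FLOW_DOCTOR_* name, then 3. conventions, in order
        value = env.get(name, "")
        if value:  # empty strings do not count
            return value
    raise ConfigError(f"missing {field}: pass it explicitly or set one of {', '.join(env_names)}")
```

For example, with `GH_TOKEN=gh` and `GITHUB_TOKEN=g` both set and no explicit token, `resolve_field("token", None, ["FLOW_DOCTOR_GITHUB_TOKEN", "GH_TOKEN", "GITHUB_TOKEN"])` picks `gh`.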

Env-var-only quickstart — Telegram

Two env vars, two lines of Python, working alerts on the next exception:

export FLOW_DOCTOR_TELEGRAM_BOT_TOKEN=1234567890:ABC...
export FLOW_DOCTOR_TELEGRAM_CHAT_ID=-1001234567890

from flow_doctor import FlowDoctor, TelegramNotifierConfig

fd = FlowDoctor.builder("pipeline").add_notifier(TelegramNotifierConfig()).build()

Strict mode and degraded mode

FlowDoctor.builder().build() and flow_doctor.init() both default to strict=True. Any configuration error (missing required field, unresolved ${VAR}, unknown notifier type) raises ConfigError and prevents startup. This is the recommended default — a non-running flow-doctor is a loud failure; a silently-degraded flow-doctor is a silent one.

If you genuinely want best-effort init that logs errors but keeps running with no notifiers, opt in explicitly:

fd = FlowDoctor.builder("pipeline").build(strict=False)
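The difference between the two modes comes down to what happens when a notifier fails to build. A hypothetical sketch (the `build_notifiers` helper, the factory-callable shape, and the local `ConfigError` stand-in are assumptions, not flow-doctor internals) that follows the behavior described above: strict mode raises, degraded mode logs and continues with no notifiers.

```python
from __future__ import annotations

import logging
from typing import Callable, Sequence

log = logging.getLogger("flow_doctor_sketch")


class ConfigError(Exception):
    """Stand-in for flow-doctor's ConfigError."""


def build_notifiers(factories: Sequence[Callable[[], object]], strict: bool = True) -> list[object]:
    try:
        return [make() for make in factories]
    except ConfigError:
        if strict:
            raise  # loud failure at startup (the default)
        log.exception("flow-doctor config invalid; continuing with no notifiers")
        return []
```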

Logging-handler integration

Attach to Python's logging system if you want every WARNING+ log to flow through dedup + diagnosis + notify without touching call sites:

import logging
import flow_doctor

fd = flow_doctor.FlowDoctor.builder("pipeline").add_notifier(
    flow_doctor.TelegramNotifierConfig()
).build()

handler = fd.get_handler(level=logging.WARNING)
logging.getLogger().addHandler(handler)

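logger = logging.getLogger(__name__)

logger.warning("Upstream data is 48h stale")  # → captured + routed
logger.error("S3 backup failed: AccessDenied")
try:
    run_pipeline()
except Exception:
    logger.exception("Pipeline crashed")  # includes the active traceback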

The handler is non-blocking: emit() enqueues work and returns immediately; a background thread calls fd.report() asynchronously.
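The standard library already has the pieces for this shape of handler: `logging.handlers.QueueHandler` turns `emit()` into a cheap enqueue, and `QueueListener` drains the queue on a background thread. The sketch below shows that pattern with a hypothetical `ReportingHandler` that forwards each record to a `report` callable; it illustrates the design, not flow-doctor's actual handler.

```python
from __future__ import annotations

import logging
import logging.handlers
import queue
from typing import Callable


class ReportingHandler(logging.Handler):
    """Runs on the listener thread; forwards each record to a reporter."""

    def __init__(self, report: Callable[[str, str], None]) -> None:
        super().__init__()
        self._report = report

    def emit(self, record: logging.LogRecord) -> None:
        self._report(record.levelname, record.getMessage())


def attach(logger: logging.Logger, report: Callable[[str, str], None],
           level: int = logging.WARNING) -> logging.handlers.QueueListener:
    q: queue.Queue = queue.Queue()
    front = logging.handlers.QueueHandler(q)  # emit() only enqueues
    front.setLevel(level)
    logger.addHandler(front)
    listener = logging.handlers.QueueListener(q, ReportingHandler(report))
    listener.start()  # background thread calls ReportingHandler.emit
    return listener
```

Call `listener.stop()` at shutdown; it processes everything already queued before joining the thread.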

Log capture

Attach recent logs to the next error report for richer diagnosis context:

with fd.capture_logs(level=logging.INFO):
    logger.info("Starting scan with 900 tickers...")
    run_pipeline()
    # All captured logs are attached to the next fd.report() call
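One way to implement this kind of capture with the standard library is a temporary buffering handler on the root logger. This is a hypothetical sketch: this `capture_logs` yields the buffer instead of attaching it to the next report (the part flow-doctor handles), and the log line format is an arbitrary choice.

```python
from __future__ import annotations

import logging
from contextlib import contextmanager
from typing import Iterator


class _BufferHandler(logging.Handler):
    def __init__(self, level: int) -> None:
        super().__init__(level)
        self.records: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(self.format(record))


@contextmanager
def capture_logs(level: int = logging.INFO) -> Iterator[list[str]]:
    root = logging.getLogger()
    handler = _BufferHandler(level)
    handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
    old_level = root.level
    root.addHandler(handler)
    if root.level > level:  # let lower-level records reach the buffer
        root.setLevel(level)
    try:
        yield handler.records
    finally:
        root.removeHandler(handler)
        root.setLevel(old_level)
```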

Features

Error capture and dedup

  • Traceback extraction with frame-based signature hashing
  • Configurable cooldown window (default 60 min) — same error captured once, not spammed
  • Variable-token normalization: reqIds, conIds, contract symbols, UUIDs, AWS request IDs are stripped before hashing, so a library logging the same error against N objects collapses to one signature
  • Cascade detection tags downstream failures caused by upstream dependency outages
  • Automatic secret scrubbing (AWS keys, Bearer tokens, passwords in URLs)
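A compact sketch of normalization, signature hashing, and a cooldown window. The regexes, and hashing the exception type plus normalized message plus frame locations (file and function, not line numbers) with SHA-256, are illustrative assumptions; flow-doctor's exact normalization rules and hash inputs are not documented here.

```python
from __future__ import annotations

import hashlib
import re
import traceback
from datetime import datetime, timedelta

# Variable tokens replaced before hashing (illustrative, not exhaustive).
_PATTERNS = [
    (re.compile(r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b", re.I), "<uuid>"),
    (re.compile(r"\b(reqId|conId)[=: ]\s*\d+", re.I), r"\1=<n>"),
]


def normalize(message: str) -> str:
    for pattern, repl in _PATTERNS:
        message = pattern.sub(repl, message)
    return message


def signature(exc: BaseException) -> str:
    frames = traceback.extract_tb(exc.__traceback__)
    where = "|".join(f"{f.filename}:{f.name}" for f in frames)
    raw = f"{type(exc).__name__}|{normalize(str(exc))}|{where}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]


class Deduper:
    def __init__(self, cooldown: timedelta = timedelta(minutes=60)) -> None:
        self.cooldown = cooldown
        self._last_seen: dict[str, datetime] = {}

    def should_emit(self, sig: str, now: datetime) -> bool:
        last = self._last_seen.get(sig)
        if last is not None and now - last < self.cooldown:
            return False  # same signature inside the window: suppress
        self._last_seen[sig] = now
        return True
```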

LLM diagnosis

  • Structured root cause analysis via Claude: category, confidence, affected files, remediation
  • Six categories: TRANSIENT, DATA, CODE, CONFIG, EXTERNAL, INFRA
  • Knowledge base caching — known patterns matched for free before calling the LLM
  • Git context assembly (recent commits, changed files) for better diagnosis accuracy
  • Daily cost cap (default $1.00) and rate limiting (default 3 diagnoses/day)
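The ordering in these bullets (free knowledge-base lookup first, paid LLM call only while within budget) can be sketched as a small gate. Everything here is hypothetical: `DiagnosisGate`, the dict-based knowledge base keyed by signature, caching new diagnoses into it, and the per-call cost estimate are assumptions for illustration; only the default limits (3 per day, $1.00 per day) come from the text above.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import date
from typing import Callable


@dataclass
class DiagnosisGate:
    knowledge_base: dict[str, str]
    max_per_day: int = 3
    max_daily_cost_usd: float = 1.00
    _day: date | None = None
    _count: int = 0
    _spent: float = 0.0

    def diagnose(self, signature: str, today: date, est_cost_usd: float,
                 call_llm: Callable[[str], str]) -> str | None:
        if signature in self.knowledge_base:  # known pattern: free
            return self.knowledge_base[signature]
        if today != self._day:  # reset counters at the day boundary
            self._day, self._count, self._spent = today, 0, 0.0
        if self._count >= self.max_per_day or self._spent + est_cost_usd > self.max_daily_cost_usd:
            return None  # over budget: skip diagnosis, still notify
        self._count += 1
        self._spent += est_cost_usd
        diagnosis = call_llm(signature)
        self.knowledge_base[signature] = diagnosis  # cache for next time
        return diagnosis
```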

Notifications

  • Telegram — Bot API, per-chat / per-thread routing, mobile push (recommended default)
  • Slack — webhook-based alerts with severity emoji + diagnosis snippet
  • Email — SMTP (Gmail/any) with detailed body
  • GitHub issues — auto-filed with diagnosis, traceback, captured logs, machine-readable metadata
  • S3 — writes schema-1.0.0 entries to a system-wide changelog corpus
  • Daily digest — summarizes rate-limited / suppressed errors at end of day
  • Custom notifiers — subclass flow_doctor.notify.base.Notifier; the abstract base is a public extension point
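flow-doctor's `Notifier` base class defines its own abstract interface, which is not reproduced here. The sketch below uses a hypothetical stand-in ABC (`NotifierSketch`, with a `send` method taking plain text) to show two ideas from this list together: a custom notifier, and a per-day limit whose overflow is held for a daily digest instead of being dropped. `ListNotifier` and `RateLimited` are likewise invented names.

```python
from __future__ import annotations

from abc import ABC, abstractmethod


class NotifierSketch(ABC):
    """Hypothetical stand-in; not flow_doctor.notify.base.Notifier."""

    @abstractmethod
    def send(self, message: str) -> None: ...


class ListNotifier(NotifierSketch):
    # A trivial custom notifier that just records what it was asked to send.
    def __init__(self) -> None:
        self.sent: list[str] = []

    def send(self, message: str) -> None:
        self.sent.append(message)


class RateLimited:
    def __init__(self, inner: NotifierSketch, max_per_day: int) -> None:
        self.inner, self.max_per_day = inner, max_per_day
        self.sent_today = 0
        self.pending_digest: list[str] = []

    def notify(self, message: str) -> None:
        if self.sent_today < self.max_per_day:
            self.sent_today += 1
            self.inner.send(message)
        else:
            self.pending_digest.append(message)  # summarized later, not lost

    def flush_digest(self) -> None:
        # Intended to run once at end of day.
        if self.pending_digest:
            self.inner.send(f"Daily digest: {len(self.pending_digest)} suppressed alert(s)\n"
                            + "\n".join(f"- {m}" for m in self.pending_digest))
        self.pending_digest.clear()
        self.sent_today = 0
```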

Auto-Fix PRs

Human-in-the-loop: a human reviews a filed issue's diagnosis, adds a flow-doctor:fix label, and a GitHub Actions workflow generates a validated fix PR.

  1. An error occurs and Flow Doctor creates a GitHub issue with structured diagnosis
  2. A human reviews the diagnosis and adds the flow-doctor:fix label
  3. GitHub Actions triggers flow-doctor generate-fix
  4. The CLI generates a diff via LLM, validates against scope rules, runs tests
  5. If tests pass, a PR is opened. If tests fail, a comment explains what went wrong.

Safety gates — fix generation is skipped when:

  • Confidence below threshold (default 90%)
  • Category is EXTERNAL or INFRA (nothing to fix in code)
  • Config issue involves credentials/secrets
  • Generated diff touches files outside configured scope
  • Tests fail after applying the fix
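The gates read naturally as a single predicate. A sketch under stated assumptions: the `Diagnosis` fields, the keyword-based credential check, and the scope semantics (allow entries are path prefixes, deny entries are `fnmatch` globs on the file name or path, mirroring the yaml example's `allow: ["src/", "lib/"]` and `deny: ["*.yaml", "*.yml"]`) are guesses for illustration, not flow-doctor's scope guard.

```python
from __future__ import annotations

import fnmatch
import posixpath
from dataclasses import dataclass


@dataclass
class Diagnosis:
    category: str  # TRANSIENT, DATA, CODE, CONFIG, EXTERNAL, INFRA
    confidence: float
    summary: str


def in_scope(path: str, allow: list[str], deny: list[str]) -> bool:
    name = posixpath.basename(path)
    if any(fnmatch.fnmatch(name, pat) or fnmatch.fnmatch(path, pat) for pat in deny):
        return False
    return any(path.startswith(prefix) for prefix in allow)


def fix_allowed(d: Diagnosis, touched: list[str], tests_passed: bool,
                allow: list[str], deny: list[str], threshold: float = 0.90) -> tuple[bool, str]:
    if d.confidence < threshold:
        return False, "confidence below threshold"
    if d.category in {"EXTERNAL", "INFRA"}:
        return False, "nothing to fix in code"
    secret_words = ("credential", "secret", "token", "password")
    if d.category == "CONFIG" and any(w in d.summary.lower() for w in secret_words):
        return False, "config issue involves credentials"
    if not all(in_scope(p, allow, deny) for p in touched):
        return False, "diff touches files outside scope"
    if not tests_passed:
        return False, "tests failed"
    return True, "ok"
```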

Remediation playbooks

Define patterns that map failure signatures to automated actions:

from flow_doctor.remediation.playbook import (
    Playbook, PlaybookPattern, RemediationAction, RemediationType,
)

my_playbook = Playbook(patterns=[
    PlaybookPattern(
        name="service_down",
        description="App service not responding",
        category="INFRA",
        message_pattern=r"(connection refused|service unavailable)",
        action=RemediationAction(
            action_type=RemediationType.RESTART_SERVICE,
            description="Restart the app service",
            commands=["sudo systemctl restart myapp"],
            ssm_target="app-server",
        ),
    ),
])
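Conceptually, a playbook is an ordered list of (category, regex) → action rules. The sketch below uses a simplified, hypothetical `Rule` dataclass rather than the library's `Playbook` / `PlaybookPattern` types, matches case-insensitively by choice, and only selects an action. Executing it is out of scope here; in the yaml example that step sits behind the remediation block's `dry_run` and `auto_remediate_min_confidence` settings.

```python
from __future__ import annotations

import re
from dataclasses import dataclass


@dataclass
class Rule:
    name: str
    category: str
    message_pattern: str
    commands: list[str]


def match_rule(rules: list[Rule], category: str, message: str) -> Rule | None:
    for rule in rules:  # first matching rule wins
        if rule.category == category and re.search(rule.message_pattern, message, re.IGNORECASE):
            return rule
    return None
```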

Auto-Fix CLI

flow-doctor generate-fix \
  --issue-number 42 \
  --repo owner/repo \
  --token $GITHUB_TOKEN \
  --config flow-doctor.yaml \
  --dry-run

GitHub Actions workflow (copy to your repo at .github/workflows/flow-doctor-fix.yml):

name: Flow Doctor Fix
on:
  issues:
    types: [labeled]
jobs:
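  generate-fix:
    if: github.event.label.name == 'flow-doctor:fix'
    runs-on: ubuntu-latest
    permissions:
      contents: write        # push the fix branch
      pull-requests: write   # open the PR
      issues: write          # comment on the issue if tests fail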
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - run: pip install --pre "flow-doctor[diagnosis]"
      - run: |
          python -m flow_doctor.fix.cli generate-fix \
            --issue-number ${{ github.event.issue.number }} \
            --repo ${{ github.repository }} \
            --token $GITHUB_TOKEN
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}

Migrating from 0.4.x

flow_doctor.init(config_path=...) and direct construction of NotifyChannelConfig are both @deprecated (PEP 702) in 0.5.0. Both still work — they'll be removed in 0.6.0. Migration:

# 0.4.x — still works in 0.5.x, but mypy/pyright will flag it
fd = flow_doctor.init(config_path="flow-doctor.yaml")

# 0.5.x — typed, IDE-discoverable, no yaml
fd = (
    FlowDoctor.builder("pipeline")
    .add_notifier(TelegramNotifierConfig())
    .build()
)

The 0.5.x yaml shim is API-compatible with 0.4.x configs; existing yaml files keep working through the deprecation window.

Architecture

flow_doctor/
  core/           # Client, builder, config (Pydantic v2), models, dedup,
                  # rate limiting, scrubber, logging handler, contextvars
  _protocol.py    # FlowDoctorProtocol public contract
  notify/         # Telegram, Slack, Email, GitHub, S3 — concrete notifiers
                  # + typed Pydantic config models (discriminated union)
  diagnosis/      # LLM provider, context assembly, knowledge base, git context
  digest/         # Daily digest generator
  fix/            # Auto-fix: LLM generator, scope guard, test validator, PR creator, CLI
  remediation/    # Decision gate, executor, playbook patterns
  storage/        # SQLite backend (thread-safe, per-thread connections)
  testing/        # RecordingFlowDoctor + pytest plugin (auto-discovered)
  otel.py         # Report → OTel SpanEvent serialization adapter
  py.typed        # PEP 561 marker — annotations are authoritative for mypy/pyright

Development

git clone https://github.com/cipher813/flow-doctor.git
cd flow-doctor
python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev,diagnosis]"

python -m pytest tests/ -x -q             # 376 tests
python -m pytest tests/ --cov=flow_doctor # coverage report
python examples/smoke_test.py              # end-to-end smoke test

License

MIT


Download files


Source Distribution

flow_doctor-0.5.0rc3.tar.gz (138.1 kB)

Uploaded Source

Built Distribution


flow_doctor-0.5.0rc3-py3-none-any.whl (102.5 kB)

Uploaded Python 3

File details

Details for the file flow_doctor-0.5.0rc3.tar.gz.

File metadata

  • Download URL: flow_doctor-0.5.0rc3.tar.gz
  • Size: 138.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.6

File hashes

Hashes for flow_doctor-0.5.0rc3.tar.gz
Algorithm Hash digest
SHA256 3dc71126fa9b862ad7f4970d13cd9ac9a265651b24afab1f164a4a40da85705a
MD5 a6f8a98a9814e0f4a55d6398e4097b87
BLAKE2b-256 f167b0fbb4daaa2ed123fc2c34ce0905d9bdd364df3c458091c0f9c33efefd6f


File details

Details for the file flow_doctor-0.5.0rc3-py3-none-any.whl.

File metadata

  • Download URL: flow_doctor-0.5.0rc3-py3-none-any.whl
  • Size: 102.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.6

File hashes

Hashes for flow_doctor-0.5.0rc3-py3-none-any.whl
Algorithm Hash digest
SHA256 a2edf22fc5aff4d91fcc6f3b0d8b91be96a2da6096408063ebaac46b0fb75bd5
MD5 b41c6b1693f610aaa8efa5373770bffd
BLAKE2b-256 928889ac9a4334239921a283b6986035ca4852e15f3b4af2ab40ec614f5493e9

