fastapi-watch
Structured health and readiness check system for FastAPI.
Add /health/live, /health/ready, /health/status, /health/history, /health/metrics, and /health/dashboard endpoints to any FastAPI app with a single registry call. All probes run concurrently, so a slow dependency never blocks the others. Each probe returns rich service-specific details alongside the pass/fail result.
Probes report one of three states: healthy (all clear), degraded (under stress but still serving traffic), or unhealthy (critical failure — stop routing traffic). A degraded critical probe returns 200 on /health/ready so load balancers keep sending requests while you investigate.
Instrument individual FastAPI route handlers with FastAPIRouteProbe to collect real-traffic metrics — latency percentiles, error rate, throughput, and consecutive failure counts — or attach RequestMetricsMiddleware to capture the same metrics for every route in your app without touching individual handlers.
Organize probes across your codebase using ProbeGroup, the same file-splitting pattern FastAPI uses for routes. Declare probes in any module, compose them into groups, and pass them to HealthRegistry in one line at startup.
Connect a browser or monitoring tool to the Server-Sent Events (SSE) streaming endpoints (/health/ready/stream, /health/status/stream) and receive live updates as long as you stay connected — the background poll loop starts automatically on the first connection and stops when the last client disconnects.
Open /health/dashboard for a live HTML page that shows all probe results, updates in real time over SSE, and requires no extra dependencies. Scrape /health/metrics for a Prometheus-compatible text export of every probe's health, latency, and circuit breaker state.
Table of contents
- Installation
- Quick start
- Endpoints
- Health Dashboard
- Probe management
- ProbeGroup — organizing probes across files
- Live streaming
- Polling and caching
- Per-probe poll frequency
- Three-state health (DEGRADED)
- Circuit breaker
- Circuit breaker metrics
- Maintenance mode
- Prometheus metrics
- App-wide request metrics
- Webhook on state change
- Authentication
- Startup probe
- State-change callbacks
- Startup grace period
- Probe result history
- Alert history
- Custom storage backend
- Response format
- Writing a custom probe
- Built-in probes
- App-wide request metrics probe
- Watching a FastAPI route
- Watching a WebSocket handler
- Event loop lag
- Disk space
- TCP / DNS reachability
- SMTP
- Threshold wrapper
- Watching PostgreSQL
- Watching MySQL / MariaDB
- Watching Redis
- Watching Memcached
- Watching RabbitMQ
- Watching Kafka
- Watching MongoDB
- Watching an HTTP endpoint
- Watching Celery workers
- SQLAlchemy engine probe
- All built-in probes
- Configuration reference
- Kubernetes integration
- License
Installation
Install only the extras you actually use. Nothing is pulled in by default beyond FastAPI and Pydantic.
zsh users: wrap the package name in quotes to prevent the shell from interpreting [ and ] as glob patterns.
# Core package — includes the always-passing NoOpProbe, no other deps
pip install fastapi-watch
# Add individual service probes as needed
pip install "fastapi-watch[postgres]" # PostgreSQL (asyncpg)
pip install "fastapi-watch[mysql]" # MySQL / MariaDB (aiomysql)
pip install "fastapi-watch[sqlalchemy]" # Any SQLAlchemy 2.x async engine
pip install "fastapi-watch[redis]" # Redis (redis)
pip install "fastapi-watch[memcached]" # Memcached (aiomcache)
pip install "fastapi-watch[rabbitmq]" # RabbitMQ (aio-pika + aiohttp)
pip install "fastapi-watch[kafka]" # Kafka (aiokafka)
pip install "fastapi-watch[mongo]" # MongoDB (motor)
pip install "fastapi-watch[celery]" # Celery workers (celery)
# Or pull everything in one shot
pip install "fastapi-watch[all]"
Multiple extras can be combined:
pip install "fastapi-watch[postgres,redis,rabbitmq]"
Quick start
Create a HealthRegistry, attach it to your FastAPI app, and call .add() for each service you want to monitor. The registry mounts all health endpoints automatically.
import logging
from fastapi import FastAPI
from fastapi_watch import HealthRegistry
from fastapi_watch.probes import PostgreSQLProbe, RedisProbe
app = FastAPI()
registry = HealthRegistry(
app,
poll_interval_ms=60_000, # re-run probes every 60 s while streaming
logger=logging.getLogger(__name__), # optional — omit to silence all logging
grace_period_ms=10_000, # hold /ready for 10 s while the app warms up
history_size=20, # keep the last 20 results per probe (default: 120)
)
registry.add(PostgreSQLProbe(url="postgresql://user:pass@localhost/mydb"))
registry.add(RedisProbe(url="redis://localhost:6379"), critical=False)
That's it. The following routes are now live:
GET /health/live → always 200
GET /health/ready → 200 (healthy/degraded) / 503 (unhealthy)
GET /health/status → 200 / 207
GET /health/history → rolling probe history (TTL: 2 hours)
GET /health/alerts → probe state-change alert log (TTL: 72 hours)
GET /health/metrics → Prometheus text format
GET /health/startup → 200 after set_started(); 503 before
GET /health/dashboard → HTML dashboard with live SSE updates
GET /health/ready/stream → SSE stream
GET /health/status/stream → SSE stream
Quick start with ProbeGroup
For larger applications, define probes in each feature module and collect them all in main.py via ProbeGroup:
# features/database/probes.py
from fastapi_watch import ProbeGroup
from fastapi_watch.probes import PostgreSQLProbe, RedisProbe
router = ProbeGroup()
router.add(PostgreSQLProbe(url="postgresql://..."))
router.add(RedisProbe(url="redis://..."), critical=False)
# features/users/probes.py
from fastapi_watch import ProbeGroup, FastAPIRouteProbe
router = ProbeGroup()
users_probe = FastAPIRouteProbe(name="users-api", max_error_rate=0.05)
router.add(users_probe)
# main.py
from fastapi import FastAPI
from fastapi_watch import HealthRegistry
from features.database.probes import router as db_router
from features.users.probes import router as users_router, users_probe
app = FastAPI()
registry = HealthRegistry(app, groups=[db_router, users_router])
@app.get("/users")
@users_probe.watch
async def list_users():
...
Endpoints
| Endpoint | Purpose | Healthy | Degraded | Unhealthy |
|---|---|---|---|---|
| GET /health/live | Liveness: is the process alive? | 200 OK | 200 OK | 200 OK |
| GET /health/ready | Readiness: are all critical probes passing or degraded? | 200 OK | 200 OK | 503 Service Unavailable |
| GET /health/status | Status: full detail on every probe | 200 OK | 207 Multi-Status | 207 Multi-Status |
| GET /health/history | History: last N results per probe (within TTL window) | 200 OK | 200 OK | 200 OK |
| GET /health/alerts | Alerts: probe state-change log (within alert TTL window) | 200 OK | 200 OK | 200 OK |
| GET /health/metrics | Prometheus metrics: text format 0.0.4 | 200 OK | 200 OK | 200 OK |
| GET /health/startup | Startup: has set_started() been called and do startup probes pass? | 200 OK | n/a | 503 Service Unavailable |
| GET /health/dashboard | Dashboard: live HTML page | 200 OK | 200 OK (amber banner) | 200 OK (red header) |
| GET /health/ready/stream | Readiness stream: SSE; polls while connected | 200 OK | stream of events | stream of events |
| GET /health/status/stream | Status stream: SSE; polls while connected | 200 OK | stream of events | stream of events |
The prefix defaults to /health and can be changed at construction time:
registry = HealthRegistry(app, prefix="/ops/health")
# → /ops/health/live
# → /ops/health/ready
# → /ops/health/status
# → /ops/health/history
# → /ops/health/alerts
# → /ops/health/metrics
# → /ops/health/startup
# → /ops/health/dashboard
# → /ops/health/ready/stream
# → /ops/health/status/stream
Health Dashboard
GET /health/dashboard returns a server-rendered HTML page that shows all probe results in a card grid and updates live over SSE. No extra Python dependencies are required — the page is generated inline and all CSS and JavaScript are embedded.
The dashboard is registered by default. Disable it with dashboard=False if you don't want to expose a human-readable view:
registry = HealthRegistry(app) # dashboard on — GET /health/dashboard
registry = HealthRegistry(app, dashboard=False) # dashboard off
What the dashboard shows
Header bar — the overall status is displayed prominently at the top. The bar is green when all critical probes are healthy, amber when any critical probe is degraded, and red when any critical probe is unhealthy. A pulsing animation signals active degradation or failure. The "Last checked" timestamp and timezone are shown in the top right alongside a live connection indicator. When maintenance mode is active, an amber banner appears below the header.
Probe cards — one card per registered probe, arranged in a responsive grid. Each card contains:
- A colored left border (green = healthy, amber = degraded, red = unhealthy) that updates live
- The probe name and an "optional" badge for non-critical probes
- The probe's average latency in milliseconds
- A status pill (Healthy / Degraded / Unhealthy)
- The error message, if the probe is failing
- A details table with all service-specific metadata — connection counts, memory usage, error rates, latency percentiles, and so on
Footer links — quick links to the raw JSON endpoints (/health/status, /health/history, /health/ready).
Live updates
When the page loads, a small embedded JavaScript snippet opens an EventSource connection to /health/status/stream. Each SSE event surgically updates the DOM — colors, text, and detail table rows — without a full page reload or any visible flash. The live indicator in the header glows green when connected and goes grey on disconnect.
No external JavaScript frameworks or CDN resources are used. The page is fully self-contained.
Probe management
Adding probes
Add probes one at a time with add(), or pass a list with add_probes(). Both methods return self for chaining. Adding the same instance twice is a no-op.
# Single probe
registry.add(probe_a)
# Multiple probes in one call
registry.add_probes([probe_a, probe_b, probe_c])
# Chained
registry.add(probe_a).add(probe_b).add(probe_c)
# Duplicate ignored — probe_a is only registered once
registry.add(probe_a)
registry.add(probe_a)
Probes run concurrently on every check — a slow or failing probe never delays the others.
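The effect of concurrent execution can be illustrated with a minimal sketch built on asyncio.gather. This is not the library's own code: fake_probe and run_all are hypothetical names, and the sleep stands in for a real dependency check.

```python
import asyncio
import time
from typing import List, Tuple


async def fake_probe(name: str, delay_s: float) -> dict:
    """Hypothetical stand-in for a probe's check(): waits, then reports healthy."""
    await asyncio.sleep(delay_s)
    return {"name": name, "status": "healthy"}


async def run_all(probes: List[Tuple[str, float]]) -> Tuple[List[dict], float]:
    """Run every probe concurrently and return (results, elapsed seconds)."""
    started = time.perf_counter()
    # gather() schedules all coroutines at once and preserves input order.
    results = await asyncio.gather(*(fake_probe(n, d) for n, d in probes))
    return list(results), time.perf_counter() - started
```

With three probes that each take 0.2 s, the total elapsed time stays close to 0.2 s rather than 0.6 s, because the waits overlap.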
Critical vs non-critical probes
By default every probe is critical — a failing critical probe sets the overall status to "unhealthy" and causes /health/ready to return 503.
Mark a probe as non-critical when its failure should be visible in reports but shouldn't block traffic:
# Database is essential; fail readiness if it's unreachable
registry.add(PostgreSQLProbe(url="postgresql://..."), critical=True)
# Cache is nice-to-have; don't fail readiness if it's down
registry.add(RedisProbe(url="redis://localhost"), critical=False)
Non-critical probes always appear in /health/status with their real result and a "critical": false field. They simply don't affect the overall status or /ready.
add_probes() accepts the same flag, applied to every probe in the list:
registry.add_probes([probe_a, probe_b], critical=False)
Per-probe timeout
Set a timeout (in seconds) on any probe class or instance. If the check doesn't complete within that time, the probe is recorded as unhealthy — all other probes are unaffected and still run concurrently.
On the class:
class MyServiceProbe(BaseProbe):
name = "my-service"
timeout = 5.0 # fail after 5 seconds
async def check(self) -> ProbeResult:
...
On an instance:
probe = MyServiceProbe()
probe.timeout = 2.0
registry.add(probe)
timeout = None (the default) means no limit. Timed-out probes produce an unhealthy result with error: "TimeoutError: ".
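One plausible way to obtain this behaviour is to wrap each check in asyncio.wait_for. The sketch below assumes that approach; check_with_timeout is a hypothetical helper and plain dicts stand in for ProbeResult.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def check_with_timeout(
    check: Callable[[], Awaitable[dict]],
    name: str,
    timeout: Optional[float],
) -> dict:
    """Await check(); report unhealthy if it runs longer than `timeout` seconds."""
    try:
        # wait_for(timeout=None) waits indefinitely, matching "no limit".
        return await asyncio.wait_for(check(), timeout=timeout)
    except asyncio.TimeoutError as exc:
        # The exception carries no message, so the error text ends after the colon.
        return {
            "name": name,
            "status": "unhealthy",
            "error": f"{type(exc).__name__}: {exc}",
        }
```

Only the probe that overruns is marked unhealthy; the helper returns normally, so other probes awaited alongside it are not cancelled.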
ProbeGroup — organizing probes across files
As an application grows, defining every probe in main.py becomes unwieldy. ProbeGroup mirrors the pattern FastAPI uses for APIRouter: declare probes in the modules that own them, and include all of them in the registry at startup.
Basic usage
# features/database/probes.py
from fastapi_watch import ProbeGroup
from fastapi_watch.probes import PostgreSQLProbe, RedisProbe
router = ProbeGroup()
router.add(PostgreSQLProbe(url="postgresql://user:pass@db.internal/app"))
router.add(RedisProbe(url="redis://cache.internal:6379"), critical=False)
# features/payments/probes.py
from fastapi_watch import ProbeGroup
from fastapi_watch.probes import HttpProbe
router = ProbeGroup()
router.add(HttpProbe(url="https://api.stripe.com/v1/health", name="stripe"))
# main.py
from fastapi import FastAPI
from fastapi_watch import HealthRegistry
from features.database.probes import router as db_router
from features.payments.probes import router as payments_router
app = FastAPI()
registry = HealthRegistry(app, groups=[db_router, payments_router])
The groups= parameter accepts a list of ProbeGroup instances. All probes from every group are registered in the order they were added, preserving each probe's critical setting.
If you need to add individual probes alongside groups, include() is also available after construction and returns self for chaining:
registry = HealthRegistry(app)
registry.include(db_router).include(payments_router).add(some_extra_probe)
Composing groups
Groups can include other groups, letting you build a single top-level aggregator that collects probes from every submodule:
# probes/__init__.py
from fastapi_watch import ProbeGroup
from .database import router as db_router
from .payments import router as payments_router
from .messaging import router as messaging_router
router = ProbeGroup()
router.include(db_router)
router.include(payments_router)
router.include(messaging_router)
# main.py — one import, one line
from fastapi_watch import HealthRegistry
from probes import router
registry = HealthRegistry(app, groups=[router])
ProbeGroup API
ProbeGroup exposes the same fluent interface as HealthRegistry. All methods return self for chaining. Duplicate probe instances (same object, identity check) are silently skipped.
router = ProbeGroup()
router.add(probe) # single probe, critical by default
router.add(probe, critical=False) # mark as non-critical
router.add_probes([probe_a, probe_b]) # multiple probes, same criticality
router.add_probes([probe_c], critical=False)
router.include(another_router) # merge another group's probes
Live streaming
The two streaming endpoints (/health/ready/stream, /health/status/stream) use Server-Sent Events (SSE) to push probe results to connected clients.
The poll loop is demand-driven — it starts when the first SSE client connects and stops automatically when the last one disconnects. No background work is done when nobody is watching.
Each event is a JSON-encoded health report on a data: line:
data: {"status": "healthy", "checked_at": "2024-06-01T12:00:00.123456+00:00", "probes": [...]}
data: {"status": "unhealthy", "checked_at": "2024-06-01T12:00:05.456789+00:00", "probes": [...]}
Connecting from JavaScript
const es = new EventSource('/health/status/stream');
es.onmessage = (event) => {
const report = JSON.parse(event.data);
console.log(report.status, report.probes);
};
es.onerror = () => es.close();
Connecting with curl
curl -N http://localhost:8000/health/status/stream
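The same stream can be consumed from Python. The sketch below only handles the parsing step and assumes each event is a single data: line, as in the examples above. iter_sse_reports is a hypothetical helper; feed it the line iterator of whichever streaming HTTP client you use.

```python
import json
from typing import Iterable, Iterator


def iter_sse_reports(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one decoded health report per `data:` line.

    Blank separator lines and SSE comments (lines starting with ':')
    are skipped. Multi-line data payloads are not handled.
    """
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line.startswith("data:"):
            yield json.loads(line[len("data:"):].strip())
```

Each yielded dict has the same shape as a /health/status response, so report["status"] and report["probes"] can be read directly.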
Configuring the poll interval
# Default — poll every 60 seconds while a client is connected
registry = HealthRegistry(app)
# Custom interval
registry = HealthRegistry(app, poll_interval_ms=10_000) # every 10 s
# Minimum enforced interval is 1000 ms; lower values are clamped
registry = HealthRegistry(app, poll_interval_ms=500) # → 1000 ms
# Disable polling — streaming endpoints emit one result then close
registry = HealthRegistry(app, poll_interval_ms=None)
The interval can also be changed at any point after startup:
registry.set_poll_interval(30_000) # switch to every 30 s
registry.set_poll_interval(0) # disable — single-fetch mode
Polling and caching
The regular GET /health/ready and GET /health/status endpoints always respond immediately:
- When SSE clients are connected — the poll loop is running, so these endpoints serve the most recent cached probe results without re-running any probes.
- When no streaming is active — probes are run on demand. A built-in lock prevents a thundering herd if multiple requests arrive simultaneously before the first result is cached.
This means your GET endpoints are fast under all conditions, regardless of whether anyone is streaming.
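The thundering-herd protection can be pictured as a lock around a cached result. The sketch below is an illustration under assumptions, not the registry's implementation: CachedRunner and its max_age_s parameter are hypothetical.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional


class CachedRunner:
    """Hypothetical helper: concurrent callers share a single probe run."""

    def __init__(self, run: Callable[[], Awaitable[dict]], max_age_s: float = 1.0) -> None:
        self._run = run
        self._max_age_s = max_age_s  # assumed freshness window for the cache
        self._lock = asyncio.Lock()
        self._cached: Optional[dict] = None
        self._cached_at = 0.0

    async def get(self) -> dict:
        # Only one caller runs the probes; the rest wait on the lock
        # and then find a fresh cached result.
        async with self._lock:
            age = time.monotonic() - self._cached_at
            if self._cached is None or age >= self._max_age_s:
                self._cached = await self._run()
                self._cached_at = time.monotonic()
            return self._cached
```

Ten simultaneous calls to get() trigger one probe run; the other nine reuse its result.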
Per-probe poll frequency
By default every probe uses the registry's poll_interval_ms. Set poll_interval_ms on any active probe to override this for that probe only. Probes with their own interval run on their own schedule — a slow probe doesn't delay fast ones.
Passive probes (HttpProbe, SMTPProbe, RedisProbe, PostgreSQLProbe, MySQLProbe, SqlAlchemyProbe, MongoProbe, FastAPIRouteProbe, FastAPIWebSocketProbe) do not accept poll_interval_ms — their check() only reads in-memory stats so they always use the registry default, which is fine since no external calls are made.
registry = HealthRegistry(app, poll_interval_ms=60_000) # global: every 60 s
registry.add(TCPProbe("db.internal", 5432, poll_interval_ms=30_000)) # every 30 s
registry.add(KafkaProbe("broker:9092", poll_interval_ms=10_000)) # every 10 s
registry.add(NoOpProbe()) # uses global: 60 s
The minimum enforced interval is 1000 ms — lower values are clamped. Pass poll_interval_ms=None on the probe to explicitly use the registry default. Probes without a custom interval are always in sync with the registry-level setting.
Active probes expose poll_interval_ms as a constructor argument. Custom probes inherit the attribute from BaseProbe:
class MyServiceProbe(BaseProbe):
name = "my-service"
poll_interval_ms = 5_000 # class-level default
async def check(self) -> ProbeResult:
...
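The fallback and clamping rules described in this section reduce to a few lines. The function below is a hypothetical illustration of those rules, assuming both intervals are given in milliseconds and the registry interval is set.

```python
from typing import Optional

MIN_POLL_INTERVAL_MS = 1000  # minimum interval stated in this section


def effective_interval_ms(probe_ms: Optional[int], registry_ms: int) -> int:
    """Resolve the interval a probe actually runs at.

    None on the probe means "use the registry default"; any value below
    the minimum is clamped up to it.
    """
    chosen = registry_ms if probe_ms is None else probe_ms
    return max(chosen, MIN_POLL_INTERVAL_MS)
```

For example, a probe with poll_interval_ms=500 under a 60 s registry default resolves to 1000 ms, and a probe with None resolves to 60000 ms.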
Three-state health (DEGRADED)
Every probe result and the overall health report can be in one of three states:
| State | Meaning | /health/ready | /health/status |
|---|---|---|---|
| "healthy" | All clear | 200 OK | 200 OK |
| "degraded" | Under stress, still serving traffic | 200 OK | 207 Multi-Status |
| "unhealthy" | Critical failure: stop routing traffic | 503 Service Unavailable | 207 Multi-Status |
DEGRADED is a first-class signal. It lets probes communicate "something is wrong but the service is still responding" without triggering an emergency. Load balancers keep routing traffic (200), the dashboard shows an amber card, and Prometheus scrapes surface a probe_degraded gauge.
Built-in probes that emit DEGRADED: EventLoopProbe, ThresholdProbe. Custom probes can return it at any time:
from fastapi_watch.models import ProbeResult, ProbeStatus
return ProbeResult(
name=self.name,
status=ProbeStatus.DEGRADED,
details={"queue_depth": 950, "threshold": 800},
)
Overall status rules (critical probes only):
- Any UNHEALTHY critical probe → overall "unhealthy"
- Any DEGRADED critical probe (no UNHEALTHY) → overall "degraded"
- All healthy → overall "healthy"
- Non-critical probes never affect the overall status.
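The rules above can be expressed as a small pure function. This is a sketch for illustration: the Status enum is a stand-in for the library's ProbeStatus, and overall_status and ready_http_code are hypothetical names.

```python
from enum import Enum
from typing import Iterable, Tuple


class Status(str, Enum):
    """Stand-in for ProbeStatus, for illustration only."""
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


def overall_status(results: Iterable[Tuple[Status, bool]]) -> Status:
    """Combine (status, critical) pairs; non-critical probes are ignored."""
    critical = [status for status, is_critical in results if is_critical]
    if Status.UNHEALTHY in critical:
        return Status.UNHEALTHY
    if Status.DEGRADED in critical:
        return Status.DEGRADED
    return Status.HEALTHY


def ready_http_code(status: Status) -> int:
    """Readiness returns 503 only when the overall status is unhealthy."""
    return 503 if status is Status.UNHEALTHY else 200
```

A failing non-critical cache next to a degraded critical database therefore yields "degraded" overall and a 200 from readiness.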
Circuit breaker interaction: DEGRADED counts as a passing result for the circuit breaker (is_passing = True). A probe oscillating between healthy and degraded never trips its own circuit.
ProbeResult properties:
| Property | Returns True when |
|---|---|
| is_healthy | status == "healthy" (strict) |
| is_degraded | status == "degraded" |
| is_passing | status != "unhealthy" (healthy or degraded) |
Circuit breaker
The circuit breaker prevents a broken dependency from being called repeatedly when it is clearly failing. After a probe fails a configurable number of consecutive times, the circuit opens and the probe is suspended — it returns the last known result with a "circuit_breaker_open": true flag in details until the cooldown period expires.
# Defaults: opens after 5 consecutive failures, stays open 10 minutes
registry = HealthRegistry(app)
# Custom threshold and cooldown
registry = HealthRegistry(
app,
circuit_breaker_threshold=3, # open after 3 consecutive failures
circuit_breaker_cooldown_ms=120_000, # try again after 2 minutes
)
# Disable entirely — probes always run regardless of failure history
registry = HealthRegistry(app, circuit_breaker=False)
Per-probe overrides let you tune the behaviour for individual dependencies without changing the global defaults:
from fastapi_watch.probes import PostgreSQLProbe
probe = PostgreSQLProbe(url="postgresql://...")
probe.circuit_breaker_threshold = 2 # stricter — open after 2 failures
probe.circuit_breaker_cooldown_ms = 60_000 # shorter cooldown — retry after 1 minute
registry.add(probe)
While the circuit is open, all other fields (status, error, latency_ms) reflect the last result before it opened. Once the cooldown expires the probe runs again; if it succeeds, the circuit closes and the consecutive-failure counter resets.
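The open, cooldown, and retry cycle can be sketched as a small state machine. The class below is an illustration, not the library's implementation; CircuitBreaker, allow_call, and record are hypothetical names, and treating a failed retry as a cooldown restart rather than a new trip is an assumption.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Hypothetical sketch: opens after N consecutive failures, retries after a cooldown."""

    def __init__(self, threshold: int = 5, cooldown_ms: int = 600_000,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.threshold = threshold
        self.cooldown_s = cooldown_ms / 1000
        self.clock = clock
        self.consecutive_failures = 0
        self.trips_total = 0
        self.opened_at: Optional[float] = None  # None means the circuit is closed

    def allow_call(self) -> bool:
        """True if the probe should run now (closed, or cooldown expired)."""
        if self.opened_at is None:
            return True
        return self.clock() - self.opened_at >= self.cooldown_s

    def record(self, passing: bool) -> None:
        """Feed in a probe outcome; healthy and degraded both count as passing."""
        if passing:
            self.consecutive_failures = 0
            self.opened_at = None
            return
        self.consecutive_failures += 1
        if self.consecutive_failures >= self.threshold:
            if self.opened_at is None:
                self.trips_total += 1  # count only closed-to-open transitions
            self.opened_at = self.clock()  # (re)start the cooldown
```

The injectable clock keeps the sketch testable without real waiting; the defaults mirror the 5-failure, 10-minute settings described above.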
Circuit breaker metrics
When the circuit breaker is enabled (the default), a circuit_breaker dict is injected into every probe result's details on every check — whether the circuit is open or closed. This gives you live visibility into failure accumulation before a trip occurs.
{
"circuit_breaker": {
"open": false,
"consecutive_failures": 3,
"trips_total": 1
}
}
| Field | Description |
|---|---|
| open | true when the circuit is open and the probe is suspended |
| consecutive_failures | Unbroken run of failures since the last success; resets to 0 on any passing result |
| trips_total | Lifetime count of times this probe's circuit has tripped |
When the circuit is open, the probe is not called — the dict reflects the state at the time the circuit tripped:
{
"circuit_breaker": {
"open": true,
"consecutive_failures": 5,
"trips_total": 2
}
}
These fields are also exported via /health/metrics as Prometheus gauges (probe_circuit_open, probe_circuit_consecutive_failures) and a counter (probe_circuit_trips_total).
Disable circuit breaker metrics injection with circuit_breaker=False:
registry = HealthRegistry(app, circuit_breaker=False)
# → no "circuit_breaker" key in any probe result's details
Maintenance mode
Signal to the health system that your application is undergoing planned maintenance. While active:
- /health/ready returns 200 {"status": "maintenance"} regardless of probe results, so load balancers keep routing traffic and alerts stay quiet.
- State-change webhooks are suppressed, so probe flaps during maintenance don't trigger pages.
- The dashboard shows an amber maintenance banner.
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
# Maintenance until a specific time
registry.set_maintenance(until=datetime.now(ZoneInfo("UTC")) + timedelta(hours=2))
# Clear maintenance early
registry.clear_maintenance()
Both set_maintenance() and clear_maintenance() return self for chaining.
The until datetime must be timezone-aware. Once the until time passes, maintenance mode deactivates automatically — no explicit clear_maintenance() call is needed.
# Check programmatically
if registry._in_maintenance():
...
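The timezone-aware requirement and the automatic expiry can be modelled in a few lines. The class below is a hypothetical sketch of that behaviour, not the registry's code; raising ValueError for naive datetimes is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


class MaintenanceWindow:
    """Hypothetical sketch of a self-expiring maintenance flag."""

    def __init__(self) -> None:
        self._until: Optional[datetime] = None

    def set(self, until: datetime) -> "MaintenanceWindow":
        # A naive datetime cannot be compared safely with an aware "now".
        if until.tzinfo is None or until.utcoffset() is None:
            raise ValueError("until must be timezone-aware")
        self._until = until
        return self

    def clear(self) -> "MaintenanceWindow":
        self._until = None
        return self

    def active(self, now: Optional[datetime] = None) -> bool:
        """True while `now` is before the deadline; expires without a clear() call."""
        if now is None:
            now = datetime.now(timezone.utc)
        return self._until is not None and now < self._until
```

Because active() compares against the current time on every call, no timer or background task is needed to end the window.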
Prometheus metrics
GET /health/metrics returns a Prometheus text format 0.0.4 export of every probe's current state. Scrape it from your Prometheus instance like any other target — no extra dependencies are required.
# HELP probe_healthy 1 if the probe is healthy, 0 otherwise
# TYPE probe_healthy gauge
probe_healthy{name="postgresql",critical="true"} 1
probe_healthy{name="redis",critical="false"} 0
# HELP probe_degraded 1 if the probe is degraded, 0 otherwise
# TYPE probe_degraded gauge
probe_degraded{name="postgresql",critical="true"} 0
# HELP probe_latency_ms Last probe latency in milliseconds
# TYPE probe_latency_ms gauge
probe_latency_ms{name="postgresql",critical="true"} 1.83
# HELP probe_circuit_open 1 if the circuit breaker is open
# TYPE probe_circuit_open gauge
probe_circuit_open{name="redis",critical="false"} 1
# HELP probe_circuit_consecutive_failures Consecutive failure count
# TYPE probe_circuit_consecutive_failures gauge
probe_circuit_consecutive_failures{name="redis",critical="false"} 5
# HELP probe_circuit_trips_total Total circuit breaker trips
# TYPE probe_circuit_trips_total counter
probe_circuit_trips_total{name="redis",critical="false"} 2
The endpoint always returns 200 OK with Content-Type: text/plain; version=0.0.4; charset=utf-8.
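For readers unfamiliar with the text format, the sketch below shows how lines like the ones above are assembled. render_gauge is a hypothetical helper, not part of the library, and it assumes probe names contain no quotes, backslashes, or newlines (which would need escaping).

```python
from typing import Iterable, Tuple, Union

Number = Union[int, float]


def render_gauge(metric: str, help_text: str,
                 samples: Iterable[Tuple[str, bool, Number]]) -> str:
    """Render (probe name, critical, value) samples as Prometheus text-format lines."""
    lines = [f"# HELP {metric} {help_text}", f"# TYPE {metric} gauge"]
    for name, critical, value in samples:
        crit = "true" if critical else "false"
        # Doubled braces emit literal { and } around the label set.
        lines.append(f'{metric}{{name="{name}",critical="{crit}"}} {value}')
    return "\n".join(lines) + "\n"
```

Calling it with the probe_healthy metric and the postgresql and redis samples reproduces the first block of the export shown above.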
Prometheus scrape config:
scrape_configs:
- job_name: myapp_health
static_configs:
- targets: ["localhost:8000"]
metrics_path: /health/metrics
App-wide request metrics
RequestMetricsMiddleware wraps your entire FastAPI app at the ASGI layer and records aggregate request statistics across all routes — no decorator needed on individual handlers. Pair it with RequestMetricsProbe to surface those statistics as a health probe.
from fastapi import FastAPI
from fastapi_watch import HealthRegistry, RequestMetricsMiddleware, RequestMetricsProbe
app = FastAPI()
# ... define your routes ...
registry = HealthRegistry(app, poll_interval_ms=None)
# Create the middleware manually so the probe shares the same instance
middleware = RequestMetricsMiddleware(app, per_route=True)
probe = RequestMetricsProbe(
middleware,
max_error_rate=0.05, # fail if >5% of requests error
max_avg_rtt_ms=500, # fail if average RTT exceeds 500 ms
)
registry.add(probe)
# Use the middleware as the ASGI app so TestClient / uvicorn see the same instance
# In production: uvicorn myapp:middleware
Important: Create the middleware yourself rather than using app.add_middleware(). add_middleware() creates a new internal instance that the probe cannot see. Pass the same middleware object to both RequestMetricsProbe and your ASGI server.
Per-route breakdown
Set per_route=True (the default) to track stats broken down by route template. Path parameters are normalized so /users/1 and /users/2 both count under /users/{user_id}:
{
"request_count": 1500,
"error_count": 12,
"error_rate": 0.008,
"avg_rtt_ms": 45.2,
"consecutive_errors": 0,
"routes": {
"GET /users": { "request_count": 900, "error_count": 2, "error_rate": 0.0022, "avg_rtt_ms": 38.1 },
"GET /users/{user_id}": { "request_count": 550, "error_count": 8, "error_rate": 0.0145, "avg_rtt_ms": 61.4 },
"POST /users": { "request_count": 50, "error_count": 2, "error_rate": 0.04, "avg_rtt_ms": 120.7 }
}
}
Set per_route=False to collect only the aggregate totals (lower memory overhead for apps with many routes).
Constructor arguments — RequestMetricsMiddleware:
| Argument | Default | Description |
|---|---|---|
| app | required | The FastAPI (or any ASGI) app to wrap |
| per_route | True | Track per-route-template breakdown in addition to aggregate stats |
| window_size | 200 | Sliding window size for RTT tracking |
| ema_alpha | 0.1 | EMA smoothing factor for avg_rtt_ms |
Constructor arguments — RequestMetricsProbe:
| Argument | Default | Description |
|---|---|---|
| middleware | required | The RequestMetricsMiddleware instance to read from |
| name | "request_metrics" | Probe label |
| max_error_rate | 0.1 | Error-rate threshold (0–1) above which the probe is UNHEALTHY |
| max_avg_rtt_ms | None | Average-RTT threshold in milliseconds. None disables the threshold |
| poll_interval_ms | None | Per-probe poll interval; None uses the registry default |
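The ema_alpha argument refers to an exponential moving average. The standard update rule is sketched below; ema_update is a hypothetical name, and seeding the average with the first sample is an assumption about how the middleware starts out.

```python
from typing import Optional


def ema_update(previous: Optional[float], sample_ms: float, alpha: float = 0.1) -> float:
    """Blend a new RTT sample into the running average.

    A larger alpha weights recent requests more heavily; a smaller one
    smooths out spikes.
    """
    if previous is None:
        return sample_ms  # first sample seeds the average (assumed)
    return alpha * sample_ms + (1 - alpha) * previous
```

With alpha=0.1, a single 200 ms request after a steady 100 ms average moves the average to 110 ms.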
Webhook on state change
Pass one or more alerters to HealthRegistry to receive notifications when a probe's status changes. Alerts are fire-and-forget — they never block health-check execution and failures are logged silently.
from fastapi_watch.alerts import SlackAlerter, TeamsAlerter, PagerDutyAlerter, WebhookAlerter
registry = HealthRegistry(
app,
alerters=[
SlackAlerter(
webhook_url="https://hooks.slack.com/services/T.../B.../...",
channel="#ops-alerts",
),
TeamsAlerter(webhook_url="https://outlook.office.com/webhook/..."),
PagerDutyAlerter(routing_key="your-32-char-routing-key"),
],
)
| Alerter | Description |
|---|---|
| WebhookAlerter(url, headers) | Generic JSON POST to any HTTP endpoint |
| SlackAlerter(webhook_url, channel, username) | Slack Incoming Webhook with color-coded attachments |
| TeamsAlerter(webhook_url) | Microsoft Teams MessageCard |
| PagerDutyAlerter(routing_key, source) | PagerDuty Events API v2; auto-resolves on HEALTHY |
webhook_url on HealthRegistry is still accepted for backwards compatibility and wraps into a WebhookAlerter internally.
Custom alerter:
from fastapi_watch.alerts import BaseAlerter
from fastapi_watch.models import AlertRecord
class SMSAlerter(BaseAlerter):
async def notify(self, alert: AlertRecord) -> None:
message = f"[health] {alert.probe}: {alert.old_status.value} → {alert.new_status.value}"
# send via your SMS provider ...
Authentication
Protect all health endpoints with optional authentication. The auth parameter accepts three forms.
No authentication (default)
registry = HealthRegistry(app) # all endpoints are open
HTTP Basic auth
registry = HealthRegistry(
app,
auth={"type": "basic", "username": "admin", "password": "s3cr3t"},
)
Requests without a valid Authorization: Basic ... header receive 401 Unauthorized with a WWW-Authenticate challenge.
API key header
registry = HealthRegistry(
app,
auth={"type": "apikey", "key": "my-secret-key"},
)
By default the key is read from the X-API-Key header. Use "header" to specify a different header name:
auth={"type": "apikey", "key": "my-secret-key", "header": "Authorization"}
Requests without the correct key receive 403 Forbidden.
Custom callable
For anything more complex — JWT validation, IP allowlists, multi-factor logic — pass a callable that returns True to allow or False to reject:
from fastapi import Request
def my_auth(request: Request) -> bool:
token = request.headers.get("X-Internal-Token", "")
return token == "expected-value"
registry = HealthRegistry(app, auth=my_auth)
Async callables are also supported:
async def my_auth(request: Request) -> bool:
return await verify_token(request.headers.get("Authorization", ""))
registry = HealthRegistry(app, auth=my_auth)
Returning False results in a 403 Forbidden response.
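When a custom callable compares a shared secret, a constant-time comparison avoids leaking information through response timing. The sketch below uses hmac.compare_digest from the standard library; make_token_auth and the HasHeaders protocol are hypothetical, and the callable accepts any object with a headers mapping, such as a FastAPI Request.

```python
import hmac
from typing import Callable, Mapping, Protocol


class HasHeaders(Protocol):
    """Anything exposing a `headers` mapping, e.g. a FastAPI Request."""
    headers: Mapping[str, str]


def make_token_auth(expected: str,
                    header: str = "X-Internal-Token") -> Callable[[HasHeaders], bool]:
    """Build an auth callable that checks one header against a shared secret."""
    def auth(request: HasHeaders) -> bool:
        supplied = request.headers.get(header, "")
        # compare_digest takes time independent of where the strings differ.
        return hmac.compare_digest(supplied.encode(), expected.encode())
    return auth
```

The returned function has the same shape as my_auth above, so it could be passed as the auth argument in the same way.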
Startup probe
GET /health/startup returns 503 until set_started() is called. Use it as a Kubernetes startupProbe target to hold traffic away while the application initialises.
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi_watch import HealthRegistry
@asynccontextmanager
async def lifespan(app: FastAPI):
registry.set_started() # signal that startup is complete
yield
app = FastAPI(lifespan=lifespan)
registry = HealthRegistry(app)
Before set_started() is called:
{"status": "starting"} → HTTP 503
After set_started() is called:
{"status": "started"} → HTTP 200
Startup probes
Pass startup_probes to run additional checks as part of the startup gate. The /health/startup endpoint stays at 503 until both set_started() has been called and every startup probe passes.
from fastapi_watch.probes import PostgreSQLProbe
db_probe = PostgreSQLProbe(url="postgresql://...")
registry = HealthRegistry(
app,
startup_probes=[db_probe],
)
Startup probes are separate from the main probe registry — they do not appear in /health/status and are not subject to the circuit breaker. They are evaluated on every /health/startup request.
# Kubernetes — hold traffic until app is fully started and DB is reachable
startupProbe:
httpGet:
path: /health/startup
port: 8000
failureThreshold: 30
periodSeconds: 5
State-change callbacks
React to probe status transitions in real time. Register one or more callbacks with on_state_change(); each receives the probe name, old status, and new status whenever a probe's result changes.
import logging
logger = logging.getLogger(__name__)
def on_change(probe_name: str, old_status, new_status):
logger.warning("Probe %s changed: %s → %s", probe_name, old_status, new_status)
registry.on_state_change(on_change)
Async callbacks are also supported:
async def alert(probe_name, old_status, new_status):
await send_slack_alert(f"{probe_name} is now {new_status}")
registry.on_state_change(alert)
Key behaviours:
- Callbacks fire after every run_all() for each probe whose status differs from the previous run.
- The first run seeds the initial state; no callbacks fire until a later run sees a different result.
- Multiple callbacks can be registered; all are called in registration order.
- on_state_change() returns self for chaining.
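The seeding and ordering rules above can be sketched in plain Python. This is an illustrative model only: StateChangeTracker and record() are hypothetical names, not part of fastapi-watch, and the library's internals may differ.

```python
import asyncio
import inspect
from typing import Awaitable, Callable, Dict, List, Union

Callback = Callable[[str, str, str], Union[None, Awaitable[None]]]


class StateChangeTracker:
    """Hypothetical helper that mimics the documented callback semantics."""

    def __init__(self) -> None:
        self._last: Dict[str, str] = {}
        self._callbacks: List[Callback] = []

    def on_state_change(self, callback: Callback) -> "StateChangeTracker":
        self._callbacks.append(callback)
        return self  # returning self allows chaining

    async def record(self, probe_name: str, status: str) -> None:
        previous = self._last.get(probe_name)
        self._last[probe_name] = status
        # The first observation only seeds the state; unchanged results are ignored.
        if previous is None or previous == status:
            return
        for callback in self._callbacks:  # registration order
            outcome = callback(probe_name, previous, status)
            if inspect.isawaitable(outcome):  # supports async callbacks too
                await outcome


async def demo() -> List[tuple]:
    seen: List[tuple] = []
    tracker = StateChangeTracker().on_state_change(
        lambda name, old, new: seen.append((name, old, new))
    )
    for status in ["healthy", "healthy", "unhealthy", "healthy"]:
        await tracker.record("redis", status)
    return seen


events = asyncio.run(demo())
```

Four runs produce two events here: the first run seeds the state and the second repeats it, so only the two real transitions reach the callback.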
Startup grace period
Pass grace_period_ms to hold /health/ready in a 503 {"status": "starting"} state for a fixed window after the registry is created. This prevents a load balancer from routing traffic before the application has had time to warm up — without requiring all probes to pass immediately on boot.
registry = HealthRegistry(
    app,
    grace_period_ms=15_000,  # hold readiness for 15 s after startup
)
- /health/ready returns 503 {"status": "starting"} while the grace period is active.
- /health/status and /health/live are not affected; they always reflect real probe results.
- After the grace period expires, /health/ready resumes normal probe-based behaviour.
- grace_period_ms=0 (default) disables the grace period entirely.
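As a rough mental model of the rules above, the gate can be thought of as a deadline fixed when the registry is created. ReadinessGate and its clock parameter are hypothetical names used for illustration; the response bodies after the grace period are simplified.

```python
import time
from typing import Callable, Tuple


class ReadinessGate:
    """Hypothetical stand-in for the grace-period check; not library code."""

    def __init__(self, grace_period_ms: int = 0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._enabled = grace_period_ms > 0  # 0 disables the grace period
        self._deadline = clock() + grace_period_ms / 1000.0

    def in_grace_period(self) -> bool:
        return self._enabled and self._clock() < self._deadline

    def ready_response(self, probes_ok: bool) -> Tuple[int, dict]:
        if self.in_grace_period():
            return 503, {"status": "starting"}
        if probes_ok:
            return 200, {"status": "healthy"}
        return 503, {"status": "unhealthy"}


# A fake clock makes the behaviour easy to follow without sleeping.
now = [100.0]
gate = ReadinessGate(grace_period_ms=15_000, clock=lambda: now[0])
during = gate.ready_response(probes_ok=True)
now[0] = 116.0  # 16 s later, past the 15 s window
after = gate.ready_response(probes_ok=True)
```

A monotonic clock is used in the sketch so that wall-clock adjustments cannot shorten or extend the window.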
Pair with Kubernetes' initialDelaySeconds for belt-and-suspenders protection during slow startup:
readinessProbe:
  httpGet:
    path: /health/ready
    port: 8000
  initialDelaySeconds: 5  # k8s waits 5 s before its first check
  periodSeconds: 10

# App-side grace covers the remaining warmup window
registry = HealthRegistry(app, grace_period_ms=20_000)
Probe result history
Every probe result is stored in a rolling per-probe history. Use GET /health/history to inspect past runs — useful for debugging flapping probes or tracking latency over time.
registry = HealthRegistry(
    app,
    history_size=20,          # max results kept per probe (default: 120)
    result_ttl_seconds=3600,  # drop results older than 1 hour (default: 7200 = 2 hours)
)
Results older than result_ttl_seconds are excluded from /health/history responses. When the per-probe history_size cap is reached, the oldest entry is dropped regardless of TTL.
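The interaction between the size cap and the TTL can be illustrated with a bounded deque plus a read-time filter. This is a sketch under assumptions: RollingHistory is a hypothetical class, and the explicit now argument exists only to make the example deterministic.

```python
import time
from collections import deque
from typing import Deque, Dict, List, Optional, Tuple


class RollingHistory:
    """Hypothetical per-probe history with a size cap and a TTL filter."""

    def __init__(self, history_size: int = 120, result_ttl_seconds: float = 7200) -> None:
        self._size = history_size
        self._ttl = result_ttl_seconds
        self._entries: Dict[str, Deque[Tuple[float, dict]]] = {}

    def append(self, name: str, result: dict, now: Optional[float] = None) -> None:
        stamp = time.time() if now is None else now
        # deque(maxlen=...) silently drops the oldest entry once the cap is reached.
        self._entries.setdefault(name, deque(maxlen=self._size)).append((stamp, result))

    def get(self, name: str, now: Optional[float] = None) -> List[dict]:
        current = time.time() if now is None else now
        cutoff = current - self._ttl
        # Entries older than the TTL are excluded when reading; order stays oldest-first.
        return [r for stamp, r in self._entries.get(name, ()) if stamp >= cutoff]


history = RollingHistory(history_size=2, result_ttl_seconds=10)
history.append("db", {"n": 1}, now=0)
history.append("db", {"n": 2}, now=5)
history.append("db", {"n": 3}, now=8)   # cap reached: {"n": 1} is dropped
fresh = history.get("db", now=9)
later = history.get("db", now=16)       # {"n": 2} is now older than 10 s
```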
GET /health/history — response format:
{
"probes": {
"postgresql": [
{
"name": "postgresql",
"status": "healthy",
"critical": true,
"latency_ms": 1.8,
"error": null,
"details": { "version": "PostgreSQL 16.2 ...", "active_connections": 5 }
},
{
"name": "postgresql",
"status": "healthy",
"critical": true,
"latency_ms": 2.1,
"error": null,
"details": { "version": "PostgreSQL 16.2 ...", "active_connections": 6 }
}
],
"redis": [
{
"name": "redis",
"status": "unhealthy",
"critical": false,
"latency_ms": 5002.0,
"error": "Connection refused",
"details": null
},
{
"name": "redis",
"status": "healthy",
"critical": false,
"latency_ms": 0.9,
"error": null,
"details": { "version": "7.2.4", "total_keys": 312 }
}
]
}
}
Results are ordered oldest-first. History is in-memory by default and resets on process restart. See Custom storage backend to persist across restarts.
Alert history
Every probe state change is recorded as an alert. Use GET /health/alerts to retrieve them — useful for auditing when and how often services flapped.
registry = HealthRegistry(
    app,
    alert_ttl_seconds=86400,  # keep alerts for 24 hours (default: 259200 = 72 hours)
    max_alerts=500,           # hard cap on stored alerts (default: 120)
)
Alerts are retained for up to alert_ttl_seconds. When max_alerts is reached the oldest alert is dropped immediately regardless of TTL. Alerts are recorded for every state transition including maintenance-suppressed ones.
GET /health/alerts — response format:
{
"alerts": [
{
"probe": "redis",
"old_status": "healthy",
"new_status": "unhealthy",
"timestamp": "2026-03-29T14:22:01.843+00:00"
},
{
"probe": "redis",
"old_status": "unhealthy",
"new_status": "healthy",
"timestamp": "2026-03-29T14:25:17.112+00:00"
}
]
}
Alerts are ordered oldest-first.
Custom storage backend
By default probe results and alerts are held in memory (InMemoryProbeStorage). Pass a custom storage to persist across restarts or share state across multiple instances.
from fastapi_watch import HealthRegistry, ProbeStorage

class MyRedisStorage:
    """Minimal example; see the ProbeStorage docstring for a full Redis sketch."""

    async def get_latest(self, name): ...
    async def get_all_latest(self): ...
    async def set_latest(self, result): ...
    def clear_latest(self): ...
    async def append_history(self, result): ...
    async def get_history(self): ...
    async def append_alert(self, alert): ...
    async def get_alerts(self): ...

registry = HealthRegistry(app, storage=MyRedisStorage())
Any class that implements all eight methods satisfies the ProbeStorage protocol — no inheritance required. The ProbeStorage docstring in storage.py contains a complete annotated Redis implementation sketch.
When storage is supplied, result_ttl_seconds, alert_ttl_seconds, max_alerts, and history_size are not passed to the custom backend — configure those limits inside your own implementation.
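To make the eight-method shape concrete, here is a dict-backed sketch that does not import fastapi-watch. The method names come from the example above; the return types, the use of result.name as the key, and the SimpleNamespace stand-in for a probe result are all assumptions. Check the ProbeStorage docstring for the real contract.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Dict, List, Optional


class DictProbeStorage:
    """Illustrative only: return types and result shape are assumptions."""

    def __init__(self) -> None:
        self._latest: Dict[str, Any] = {}
        self._history: Dict[str, List[Any]] = {}
        self._alerts: List[Any] = []

    async def get_latest(self, name: str) -> Optional[Any]:
        return self._latest.get(name)

    async def get_all_latest(self) -> Dict[str, Any]:
        return dict(self._latest)

    async def set_latest(self, result: Any) -> None:
        self._latest[result.name] = result

    def clear_latest(self) -> None:
        self._latest.clear()

    async def append_history(self, result: Any) -> None:
        # No size cap or TTL here; a real backend must enforce its own limits.
        self._history.setdefault(result.name, []).append(result)

    async def get_history(self) -> Dict[str, List[Any]]:
        return {name: list(items) for name, items in self._history.items()}

    async def append_alert(self, alert: Any) -> None:
        self._alerts.append(alert)

    async def get_alerts(self) -> List[Any]:
        return list(self._alerts)


async def demo() -> dict:
    storage = DictProbeStorage()
    result = SimpleNamespace(name="redis", status="healthy")  # stand-in result
    await storage.set_latest(result)
    await storage.append_history(result)
    return await storage.get_history()


stored = asyncio.run(demo())
```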
Response format
Every response from /health/ready, /health/status, and the SSE streams shares the same shape.
Health report
| Field | Type | Description |
|---|---|---|
| status | "healthy" \| "degraded" \| "unhealthy" | Overall result, determined by critical probes only |
| checked_at | string \| null | UTC ISO 8601 timestamp of the last probe run; null before the first run |
| timezone | string \| null | IANA timezone name used for checked_at |
| probes | array | Individual probe results (see below) |
Probe result
| Field | Type | Description |
|---|---|---|
| name | string | Probe identifier |
| status | "healthy" \| "degraded" \| "unhealthy" | State for this probe |
| critical | boolean | true if the probe affects overall status and readiness |
| latency_ms | number | How long the check took in milliseconds |
| error | string \| null | Error message on failure; null otherwise |
| details | object \| null | Service-specific metadata (see each probe's section) |
Example: all healthy — 200
{
"status": "healthy",
"checked_at": "2024-06-01T12:00:00.123456+00:00",
"timezone": "UTC",
"probes": [
{
"name": "postgresql",
"status": "healthy",
"critical": true,
"latency_ms": 1.8,
"error": null,
"details": {
"version": "PostgreSQL 16.2 on aarch64-unknown-linux-gnu",
"active_connections": 5,
"max_connections": 100,
"database_size": "42 MB"
}
},
{
"name": "redis",
"status": "healthy",
"critical": false,
"latency_ms": 0.6,
"error": null,
"details": {
"version": "7.2.4",
"uptime_seconds": 86400,
"used_memory_human": "2.50M",
"connected_clients": 8,
"total_keys": 312
}
}
]
}
Example: one critical probe failing — 503 on /ready, 207 on /status
{
"status": "unhealthy",
"checked_at": "2024-06-01T12:00:05.456789+00:00",
"timezone": "UTC",
"probes": [
{
"name": "postgresql",
"status": "unhealthy",
"critical": true,
"latency_ms": 5002.1,
"error": "Connection refused",
"details": null
},
{
"name": "redis",
"status": "healthy",
"critical": false,
"latency_ms": 0.6,
"error": null,
"details": { "version": "7.2.4" }
}
]
}
Example: non-critical probe failing — still 200 on /ready
{
"status": "healthy",
"checked_at": "2024-06-01T12:00:10.000000+00:00",
"timezone": "UTC",
"probes": [
{
"name": "postgresql",
"status": "healthy",
"critical": true,
"latency_ms": 1.9,
"error": null,
"details": { "active_connections": 5 }
},
{
"name": "redis",
"status": "unhealthy",
"critical": false,
"latency_ms": 5001.3,
"error": "Connection timed out",
"details": null
}
]
}
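The three examples above follow one rule: only critical probes feed the overall status, and only an unhealthy overall status turns /health/ready into a 503. A small sketch of that rule follows. The function names are hypothetical, and taking the worst status among critical probes is an assumption about how degraded results are combined.

```python
from typing import Dict, List

SEVERITY = {"healthy": 0, "degraded": 1, "unhealthy": 2}


def overall_status(probes: List[Dict]) -> str:
    """Worst status among critical probes; non-critical probes are ignored."""
    critical = [p["status"] for p in probes if p["critical"]]
    if not critical:
        return "healthy"  # assumption: no critical probes means nothing can fail
    return max(critical, key=SEVERITY.__getitem__)


def ready_status_code(status: str) -> int:
    # Degraded still serves traffic, so only "unhealthy" maps to 503.
    return 503 if status == "unhealthy" else 200


probes = [
    {"name": "postgresql", "status": "healthy", "critical": True},
    {"name": "redis", "status": "unhealthy", "critical": False},
]
non_critical_failure = overall_status(probes)

probes[0]["status"] = "unhealthy"
critical_failure = overall_status(probes)
```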
Writing a custom probe
Any class that extends BaseProbe and implements check() works as a probe. This is the right approach for internal services, third-party SDKs, business-logic checks, or composite conditions.
Minimal probe
from fastapi_watch.probes import BaseProbe
from fastapi_watch.models import ProbeResult, ProbeStatus

class MyServiceProbe(BaseProbe):
    name = "my-service"

    async def check(self) -> ProbeResult:
        ok = await call_my_service()
        return ProbeResult(
            name=self.name,
            status=ProbeStatus.HEALTHY if ok else ProbeStatus.UNHEALTHY,
        )

registry.add(MyServiceProbe())
check() must be an async method and must return a ProbeResult. Any unhandled exception raised by check() is caught by the registry, automatically recorded as an unhealthy result, and optionally logged — your probe never needs to worry about crashing the health system.
Recording latency and details
Use time.perf_counter() to measure the check duration and populate latency_ms. The details dict accepts any JSON-serializable data.
import time

from fastapi_watch.probes import BaseProbe
from fastapi_watch.models import ProbeResult, ProbeStatus

class PaymentGatewayProbe(BaseProbe):
    name = "payment-gateway"

    async def check(self) -> ProbeResult:
        start = time.perf_counter()
        try:
            info = await ping_payment_gateway()
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.HEALTHY,
                latency_ms=round(latency, 2),
                details={
                    "region": info.region,
                    "provider_version": info.version,
                    "response_ms": round(latency, 2),
                },
            )
        except Exception as exc:
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.UNHEALTHY,
                latency_ms=round(latency, 2),
                error=str(exc),
            )
Configurable probe
Pass configuration through __init__ so the same probe class can be reused with different settings.
import time

import aiobotocore.session
from fastapi_watch.probes import BaseProbe
from fastapi_watch.models import ProbeResult, ProbeStatus

class S3BucketProbe(BaseProbe):
    def __init__(self, bucket: str, region: str = "us-east-1", name: str = "s3") -> None:
        self.bucket = bucket
        self.region = region
        self.name = name

    async def check(self) -> ProbeResult:
        start = time.perf_counter()
        try:
            session = aiobotocore.session.get_session()
            async with session.create_client("s3", region_name=self.region) as client:
                # head_bucket succeeds only if the bucket exists and is accessible
                await client.head_bucket(Bucket=self.bucket)
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.HEALTHY,
                latency_ms=round(latency, 2),
                details={"bucket": self.bucket, "region": self.region},
            )
        except Exception as exc:
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.UNHEALTHY,
                latency_ms=round(latency, 2),
                error=str(exc),
            )

# Register multiple buckets as separate probes
registry.add(S3BucketProbe(bucket="my-app-uploads", region="eu-west-1", name="s3-uploads"))
registry.add(S3BucketProbe(bucket="my-app-backups", region="us-east-1", name="s3-backups"))
Adding a timeout
Set the timeout attribute (in seconds) on the class or instance. The registry will cancel the check and record it as unhealthy if it runs too long.
class SlowExternalProbe(BaseProbe):
    name = "slow-external"
    timeout = 3.0  # class-level default

    async def check(self) -> ProbeResult:
        result = await call_slow_external_api()
        return ProbeResult(name=self.name, status=ProbeStatus.HEALTHY)

# Override on a specific instance
probe = SlowExternalProbe()
probe.timeout = 1.5
registry.add(probe)
Composite probe
Wrap multiple inner probes to build custom aggregation logic — for example, reporting unhealthy only when both a primary and replica are down simultaneously.
import asyncio

from fastapi_watch.probes import BaseProbe, RedisProbe
from fastapi_watch.models import ProbeResult, ProbeStatus

class RedisHAProbe(BaseProbe):
    name = "redis-ha"

    def __init__(self, primary_url: str, replica_url: str) -> None:
        self._primary = RedisProbe(url=primary_url, name="primary")
        self._replica = RedisProbe(url=replica_url, name="replica")

    async def check(self) -> ProbeResult:
        # Run both inner checks concurrently
        primary, replica = await asyncio.gather(
            self._primary.check(), self._replica.check()
        )
        if primary.is_healthy or replica.is_healthy:
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.HEALTHY,
                details={
                    "primary": primary.status.value,
                    "replica": replica.status.value,
                },
            )
        return ProbeResult(
            name=self.name,
            status=ProbeStatus.UNHEALTHY,
            error=f"both nodes down. primary: {primary.error}, replica: {replica.error}",
        )

registry.add(RedisHAProbe(
    primary_url="redis://primary.internal:6379",
    replica_url="redis://replica.internal:6379",
))
Exception handling
If check() raises an unhandled exception, the registry catches it and returns an unhealthy result automatically — you do not need to wrap your entire probe body in a try/except for this purpose. The auto-generated result looks like:
{
"name": "my-service",
"status": "unhealthy",
"critical": true,
"latency_ms": 0.0,
"error": "RuntimeError: connection pool exhausted",
"details": null
}
If a logger was passed to HealthRegistry, the exception is also logged with full traceback via logger.exception().
You should still catch exceptions yourself inside check() if you want to record partial details, a meaningful latency_ms, or a more specific error message.
Testing a custom probe
Use pytest-asyncio to test check() directly without needing to spin up an HTTP server.
import pytest
from fastapi_watch.models import ProbeStatus
from myapp.probes import MyServiceProbe

@pytest.mark.asyncio
async def test_healthy_when_service_responds():
    probe = MyServiceProbe()
    result = await probe.check()
    assert result.status == ProbeStatus.HEALTHY
    assert result.name == "my-service"

@pytest.mark.asyncio
async def test_unhealthy_when_service_raises(monkeypatch):
    async def fail():
        raise ConnectionError("refused")

    monkeypatch.setattr("myapp.probes.call_my_service", fail)
    probe = MyServiceProbe()
    # Assumes check() catches the exception and reports it in error, as
    # PaymentGatewayProbe does; otherwise the exception propagates to the test.
    result = await probe.check()
    assert result.status == ProbeStatus.UNHEALTHY
    assert "refused" in result.error
You can also run the full registry against a real or mock dependency:
import pytest
from fastapi import FastAPI
from fastapi_watch import HealthRegistry
from fastapi_watch.models import ProbeStatus
from myapp.probes import MyServiceProbe

@pytest.mark.asyncio
async def test_registry_run_all():
    app = FastAPI()
    registry = HealthRegistry(app, poll_interval_ms=None)
    registry.add(MyServiceProbe())
    results = await registry.run_all()
    assert results[0].status == ProbeStatus.HEALTHY
Probe implementation checklist
- name must be set, either as a class attribute or in __init__ via self.name.
- check() must be async and return a ProbeResult.
- Set latency_ms for probes where response time matters.
- Populate details with any data useful for diagnosis.
- Set timeout if the underlying call can hang indefinitely.
- Do not call registry.run_all() or other registry methods from inside check().
Built-in probes
Probe details
Every built-in probe populates the details field with service-specific metadata. Details are always best-effort — if the metadata query fails after a successful connectivity check, details will contain whatever was collected up to that point. The probe status reflects connectivity only, not the completeness of details.
App-wide request metrics probe
See App-wide request metrics above for full documentation. Quick reference:
from fastapi_watch import RequestMetricsMiddleware, RequestMetricsProbe
middleware = RequestMetricsMiddleware(app, per_route=True)
probe = RequestMetricsProbe(middleware, max_error_rate=0.05)
registry.add(probe)
No extra install required. RequestMetricsMiddleware and RequestMetricsProbe are included in the base package.
Watching a FastAPI route
FastAPIRouteProbe is a passive observer — it instruments an existing route handler using the @probe.watch decorator and collects real-traffic metrics on every request. No test requests are made; the probe reports on what your actual users are hitting.
No extra install is required. FastAPIRouteProbe is included in the base package.
from fastapi import FastAPI
from fastapi_watch import HealthRegistry, FastAPIRouteProbe

app = FastAPI()
users_probe = FastAPIRouteProbe(name="users-api")

@app.get("/users")
@users_probe.watch
async def list_users():
    return {"users": [...]}

registry = HealthRegistry(app)
registry.add(users_probe)
The @watch decorator wraps the handler function and preserves its signature — FastAPI's dependency injection continues to work exactly as before.
Metrics collected
Every time the decorated handler is called, FastAPIRouteProbe records:
| Metric | Description |
|---|---|
| last_rtt_ms | Handler execution time for the most recent request |
| avg_rtt_ms | Exponential moving average RTT across all requests; also used as the probe's latency_ms |
| p95_rtt_ms | 95th-percentile RTT calculated over the last window_size requests |
| min_rtt_ms / max_rtt_ms | All-time RTT bounds |
| last_status_code | HTTP status code of the most recent response |
| request_count | Total requests observed since the probe was created |
| error_count | Requests that returned a 4xx or 5xx status code |
| error_rate | error_count / request_count |
| consecutive_errors | Unbroken run of failing requests; resets to 0 on any success |
| requests_per_minute | Throughput derived from the sliding request timestamp window; null until at least 2 requests have been observed |
HTTPException is caught, its status code recorded, and then it is re-raised so FastAPI's normal exception handling is unaffected. Any other unhandled exception is recorded as a 500 and re-raised.
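To show how metrics of this kind are typically derived, here is a standalone sketch. RttStats is a hypothetical class; seeding the moving average with the first sample and using the nearest-rank method for the percentile are assumptions, not confirmed details of FastAPIRouteProbe.

```python
import math
from collections import deque
from typing import Deque, Optional


class RttStats:
    """Hypothetical accumulator for per-request latency and error metrics."""

    def __init__(self, window_size: int = 100, ema_alpha: float = 0.1) -> None:
        self._window: Deque[float] = deque(maxlen=window_size)
        self._alpha = ema_alpha
        self.avg_rtt_ms: Optional[float] = None
        self.request_count = 0
        self.error_count = 0
        self.consecutive_errors = 0

    def record(self, rtt_ms: float, status_code: int) -> None:
        self.request_count += 1
        self._window.append(rtt_ms)
        if self.avg_rtt_ms is None:
            self.avg_rtt_ms = rtt_ms  # seed the EMA with the first sample
        else:
            self.avg_rtt_ms = self._alpha * rtt_ms + (1 - self._alpha) * self.avg_rtt_ms
        if status_code >= 400:  # 4xx and 5xx count as errors
            self.error_count += 1
            self.consecutive_errors += 1
        else:
            self.consecutive_errors = 0

    @property
    def error_rate(self) -> float:
        return self.error_count / self.request_count if self.request_count else 0.0

    @property
    def p95_rtt_ms(self) -> Optional[float]:
        if not self._window:
            return None
        ordered = sorted(self._window)
        rank = math.ceil(0.95 * len(ordered))  # nearest-rank percentile
        return ordered[rank - 1]


stats = RttStats(window_size=100, ema_alpha=0.5)
stats.record(10.0, 200)
stats.record(20.0, 500)
```

A higher ema_alpha weights recent requests more heavily, which is why the constructor documents it as reacting faster to changes.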
Health thresholds
FastAPIRouteProbe declares itself UNHEALTHY when either configured threshold is exceeded:
- max_error_rate (default 0.1): if more than 10 % of observed requests result in a 4xx/5xx, the probe fails.
- max_avg_rtt_ms (default None): if the exponential moving average latency exceeds this value in milliseconds, the probe fails.
# Tighter thresholds for a latency-sensitive endpoint
payments_probe = FastAPIRouteProbe(
    name="checkout",
    max_error_rate=0.01,  # fail if >1% of requests error
    max_avg_rtt_ms=200,   # fail if average latency exceeds 200 ms
)

@app.post("/checkout")
@payments_probe.watch
async def checkout(body: CheckoutRequest):
    ...
Example probe result
{
"name": "users-api",
"status": "healthy",
"critical": true,
"latency_ms": 45.23,
"error": null,
"details": {
"request_count": 1042,
"error_count": 3,
"error_rate": 0.0029,
"consecutive_errors": 0,
"last_status_code": 200,
"last_rtt_ms": 38.12,
"avg_rtt_ms": 45.23,
"p95_rtt_ms": 120.10,
"min_rtt_ms": 12.04,
"max_rtt_ms": 843.21,
"requests_per_minute": 142.7
}
}
Before the first request
Until at least one request has been handled, FastAPIRouteProbe.check() returns HEALTHY with a "no requests observed yet" message in details. This prevents a fresh deployment from immediately showing as unhealthy simply because traffic hasn't arrived yet.
Watching sync handlers
@watch supports both async def and def route handlers — the wrapper detects the function type automatically.
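A common way to support both handler types is to branch on inspect.iscoroutinefunction and wrap with functools.wraps so the original signature stays visible. The decorator below is a hypothetical illustration of that pattern, not the library's implementation; it records durations into a plain list.

```python
import asyncio
import functools
import inspect
import time
from typing import Any, Callable, List


def watch(durations_ms: List[float]) -> Callable:
    """Hypothetical decorator factory; records call durations into a list."""

    def decorator(func: Callable) -> Callable:
        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                start = time.perf_counter()
                try:
                    return await func(*args, **kwargs)
                finally:
                    # Runs on success and on exceptions, which still propagate.
                    durations_ms.append((time.perf_counter() - start) * 1000)
            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                durations_ms.append((time.perf_counter() - start) * 1000)
        return sync_wrapper

    return decorator


timings: List[float] = []


@watch(timings)
async def list_users() -> dict:
    return {"users": []}


@watch(timings)
def ping() -> str:
    return "pong"


async_result = asyncio.run(list_users())
sync_result = ping()
```

Because functools.wraps sets __wrapped__, inspect.signature still reports the original parameters, which is what signature-based dependency injection relies on.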
Constructor arguments
| Argument | Default | Description |
|---|---|---|
| name | "route" | Probe label |
| max_error_rate | 0.1 | Error-rate threshold above which the probe is UNHEALTHY (0–1) |
| max_avg_rtt_ms | None | Average-RTT threshold in milliseconds. None disables this threshold |
| window_size | 100 | Number of recent requests used for percentile and throughput calculations |
| ema_alpha | 0.1 | Smoothing factor for the exponential moving average (0–1). Higher = reacts faster to changes |
| timeout | None | Passed to the registry for the check() call; not used internally |
Using FastAPIRouteProbe with ProbeGroup
Because FastAPIRouteProbe is both a probe and a decorator, the instance needs to be accessible in both the module that owns the route and the module that owns the group. The most ergonomic pattern is to declare the probe in a probes.py alongside the routes and import it in both places:
# features/users/probes.py
from fastapi_watch import ProbeGroup, FastAPIRouteProbe

router = ProbeGroup()
users_list_probe = FastAPIRouteProbe(name="users-list", max_error_rate=0.05)
users_detail_probe = FastAPIRouteProbe(name="users-detail", max_avg_rtt_ms=150)
router.add(users_list_probe)
router.add(users_detail_probe)

# features/users/routes.py
from fastapi import APIRouter
from .probes import users_list_probe, users_detail_probe

router = APIRouter(prefix="/users")

@router.get("/")
@users_list_probe.watch
async def list_users():
    ...

@router.get("/{user_id}")
@users_detail_probe.watch
async def get_user(user_id: int):
    ...

# main.py
from fastapi import FastAPI
from fastapi_watch import HealthRegistry
from features.users.probes import router as users_health_router
from features.users.routes import router as users_api_router

app = FastAPI()
app.include_router(users_api_router)
registry = HealthRegistry(app, groups=[users_health_router])
Watching a WebSocket handler
FastAPIWebSocketProbe is a passive observer — it instruments a WebSocket handler using the @probe.watch decorator and collects real-traffic statistics on every connection without making synthetic connections.
No extra install required. FastAPIWebSocketProbe is included in the base package.
from fastapi import FastAPI, WebSocket
from fastapi_watch import HealthRegistry, FastAPIWebSocketProbe

app = FastAPI()
chat_probe = FastAPIWebSocketProbe(name="chat", max_error_rate=0.05)

@app.websocket("/ws/chat")
@chat_probe.watch
async def chat(websocket: WebSocket):
    await websocket.accept()
    while True:
        msg = await websocket.receive_text()
        await websocket.send_text(msg)

registry = HealthRegistry(app)
registry.add(chat_probe)
The @watch decorator injects a transparent proxy around the real WebSocket object. The proxy counts every receive_* and send_* call; all other WebSocket behaviour (accept, close, headers, state, etc.) is forwarded to the underlying socket unchanged.
WebSocketDisconnect is treated as a normal close and is never counted as an error. Any other unhandled exception increments error_count.
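The transparent-proxy idea can be sketched with __getattr__ forwarding. CountingProxy and FakeSocket are hypothetical names used for illustration; the real proxy may be implemented differently.

```python
import asyncio
from typing import Any


class CountingProxy:
    """Hypothetical proxy that counts receive_* and send_* calls."""

    def __init__(self, target: Any) -> None:
        self._target = target
        self.messages_received = 0
        self.messages_sent = 0

    def __getattr__(self, name: str) -> Any:
        # Only invoked for attributes not found on the proxy itself,
        # so everything else is forwarded to the wrapped object unchanged.
        attr = getattr(self._target, name)
        if not callable(attr) or not name.startswith(("receive_", "send_")):
            return attr

        async def counted(*args: Any, **kwargs: Any) -> Any:
            result = await attr(*args, **kwargs)
            if name.startswith("receive_"):
                self.messages_received += 1
            else:
                self.messages_sent += 1
            return result

        return counted


class FakeSocket:
    """Test double standing in for a WebSocket."""

    async def accept(self) -> None:
        return None

    async def receive_text(self) -> str:
        return "hi"

    async def send_text(self, data: str) -> None:
        return None


async def demo() -> CountingProxy:
    ws = CountingProxy(FakeSocket())
    await ws.accept()                            # forwarded, not counted
    await ws.send_text(await ws.receive_text())  # one receive, one send
    return ws


proxy = asyncio.run(demo())
```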
Metrics collected
| Metric | Description |
|---|---|
| active_connections | Sockets currently open |
| total_connections | All connections since the probe was created |
| messages_received | Total messages received across all connections |
| messages_sent | Total messages sent across all connections |
| error_count | Connections that closed due to an unhandled exception |
| error_rate | error_count / total_connections |
| consecutive_errors | Unbroken run of error closes; resets to 0 on any clean close |
| avg_duration_ms | Exponential moving average of connection lifetimes |
| min_duration_ms / max_duration_ms | All-time connection duration bounds |
Health thresholds
- max_error_rate (default 0.1): UNHEALTHY if more than 10 % of connections close with an error.
- min_active_connections (default 0, disabled): UNHEALTHY if fewer than N sockets are open at check time. Useful for services that maintain persistent connections (live dashboards, data feeds).
FastAPIWebSocketProbe(
    name="feed",
    max_error_rate=0.01,       # fail if >1% of connections error
    min_active_connections=5,  # fail if fewer than 5 sockets are open
)
Example probe result
{
"name": "chat",
"status": "healthy",
"critical": true,
"latency_ms": 312.5,
"error": null,
"details": {
"active_connections": 14,
"total_connections": 502,
"messages_received": 18340,
"messages_sent": 18290,
"error_count": 2,
"error_rate": 0.004,
"consecutive_errors": 0,
"avg_duration_ms": 312.5,
"min_duration_ms": 0.8,
"max_duration_ms": 95412.1
}
}
Before the first connection
Until at least one connection has been handled, FastAPIWebSocketProbe.check() returns HEALTHY with a "no connections observed yet" message in details.
Constructor arguments
| Argument | Default | Description |
|---|---|---|
| name | "websocket" | Probe label |
| max_error_rate | 0.1 | Error-rate threshold (0–1) above which the probe is UNHEALTHY |
| min_active_connections | 0 | Minimum open sockets required. 0 disables the check |
| window_size | 100 | Number of recent connection durations kept for EMA calculations |
| ema_alpha | 0.1 | EMA smoothing factor (0–1). Higher = reacts faster to changes |
| timeout | None | Passed to the registry for the check() call; not used internally |
Event loop lag
EventLoopProbe measures event loop lag: it schedules a zero-delay coroutine (asyncio.sleep(0)) and times how long the coroutine actually takes to resume. A lag spike means CPU-bound work or slow synchronous calls are blocking the loop.
No extra install required.
from fastapi_watch.probes import EventLoopProbe

registry.add(EventLoopProbe(
    name="event_loop",  # default
    warn_ms=5.0,        # DEGRADED if lag >= 5 ms (default)
    fail_ms=20.0,       # UNHEALTHY if lag >= 20 ms (default)
))
Details returned:
{ "lag_ms": 2.4, "warn_ms": 5.0, "fail_ms": 20.0 }
Constructor arguments:
| Argument | Default | Description |
|---|---|---|
| name | "event_loop" | Probe label |
| warn_ms | 5.0 | Lag threshold for DEGRADED in milliseconds |
| fail_ms | 20.0 | Lag threshold for UNHEALTHY in milliseconds |
| poll_interval_ms | None | Uses registry default |
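The measurement technique described for EventLoopProbe can be reproduced in a few lines. This is a sketch, not the probe's source: measure_loop_lag_ms and classify are hypothetical helpers, and the demo deliberately blocks the loop with time.sleep to provoke a lag spike.

```python
import asyncio
import time


async def measure_loop_lag_ms() -> float:
    start = time.perf_counter()
    await asyncio.sleep(0)  # yield once; resume time grows when the loop is busy
    return (time.perf_counter() - start) * 1000


def classify(lag_ms: float, warn_ms: float = 5.0, fail_ms: float = 20.0) -> str:
    if lag_ms >= fail_ms:
        return "unhealthy"
    if lag_ms >= warn_ms:
        return "degraded"
    return "healthy"


async def demo() -> float:
    async def blocker() -> None:
        time.sleep(0.03)  # synchronous sleep: blocks the loop for about 30 ms

    task = asyncio.create_task(blocker())  # scheduled ahead of our resume
    lag = await measure_loop_lag_ms()
    await task
    return lag


lag_ms = asyncio.run(demo())
status = classify(lag_ms)
```

The blocking task runs before the measuring coroutine resumes, so the measured lag includes the whole synchronous sleep.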
TCP / DNS reachability
TCPProbe resolves a hostname and opens a TCP connection to verify that a host and port are reachable. Both DNS resolution and the TCP connect run in an executor so they never block the event loop. No extra install required — uses only the standard library.
from fastapi_watch.probes import TCPProbe
registry.add(TCPProbe(host="db.internal", port=5432))
registry.add(TCPProbe(host="redis.internal", port=6379, name="redis-tcp", timeout=2.0))
The default probe name is "tcp:{host}:{port}".
Details returned:
{
"host": "db.internal",
"port": 5432,
"resolved_ips": ["10.0.1.5"],
"connect_ms": 1.23
}
Constructor arguments:
| Argument | Default | Description |
|---|---|---|
| host | required | Hostname or IP address |
| port | required | TCP port |
| timeout | 5.0 | Connection timeout in seconds |
| name | "tcp:{host}:{port}" | Probe label |
| poll_interval_ms | None | Uses registry default |
SMTP
SMTPProbe passively observes outgoing email calls in your code via the @probe.watch decorator. Rather than repeatedly authenticating against a third-party mail service on a timer (which risks rate limits and security alerts), it instruments the functions that actually send mail and records latency and errors from real sends only.
import os

import aiosmtplib
from fastapi_watch.probes import SMTPProbe

smtp_probe = SMTPProbe(name="sendgrid", max_error_rate=0.05)

@smtp_probe.watch
async def send_welcome_email(to: str) -> None:
    # FROM and message are assumed to be defined elsewhere in your module
    async with aiosmtplib.SMTP("smtp.sendgrid.net", port=587) as smtp:
        await smtp.login("apikey", os.environ["SENDGRID_API_KEY"])
        await smtp.sendmail(FROM, to, message.as_string())

registry.add(smtp_probe)
Every call to send_welcome_email is silently timed. Exceptions are counted as errors and re-raised normally. Works with any SMTP library and both async def and def functions.
Details returned:
{
"call_count": 54,
"error_count": 1,
"error_rate": 0.0185,
"consecutive_errors": 0,
"last_rtt_ms": 312.4,
"avg_rtt_ms": 287.1,
"p95_rtt_ms": 498.2,
"min_rtt_ms": 201.3,
"max_rtt_ms": 612.0
}
Constructor arguments:
| Argument | Default | Description |
|---|---|---|
| name | "smtp" | Probe label |
| max_error_rate | 0.1 | Error-rate threshold above which the probe is UNHEALTHY (0–1) |
| max_avg_rtt_ms | None | Average-RTT threshold in milliseconds. None disables it |
| window_size | 100 | Number of recent calls used for percentile calculations |
| ema_alpha | 0.1 | Smoothing factor for the exponential moving average (0–1) |
Threshold wrapper
ThresholdProbe wraps any existing probe and promotes or overrides its result based on values in the probe's details dict. This is the right tool when you want a probe to go DEGRADED or UNHEALTHY based on a metric it already reports — without modifying the probe itself.
No extra install required.
from fastapi_watch.probes import ThresholdProbe, RedisProbe

redis = RedisProbe(url="redis://localhost:6379")
registry.add(ThresholdProbe(
    probe=redis,
    name="redis-keys",
    warn_if=lambda d: d.get("total_keys", 0) > 500_000,    # DEGRADED
    fail_if=lambda d: d.get("total_keys", 0) > 1_000_000,  # UNHEALTHY
))
Semantics:
- If the inner probe returns UNHEALTHY, the result passes through unchanged; fail_if and warn_if are not evaluated.
- If fail_if(details) returns True, the result is UNHEALTHY.
- If warn_if(details) returns True (and fail_if was False), the result is DEGRADED.
- If both return False, the inner probe's status is preserved.
- If either callable raises an exception, it is swallowed and treated as False.
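These rules amount to a short decision function. The sketch below restates them in plain Python; apply_thresholds is a hypothetical helper that works on status strings rather than real ProbeResult objects.

```python
from typing import Callable, Optional

Predicate = Optional[Callable[[dict], bool]]


def _safe(predicate: Predicate, details: dict) -> bool:
    if predicate is None:
        return False
    try:
        return bool(predicate(details))
    except Exception:
        return False  # a raising predicate counts as False


def apply_thresholds(inner_status: str, details: Optional[dict],
                     warn_if: Predicate = None, fail_if: Predicate = None) -> str:
    if inner_status == "unhealthy":
        return inner_status  # passes through; predicates are not evaluated
    data = details or {}
    if _safe(fail_if, data):  # fail_if takes precedence over warn_if
        return "unhealthy"
    if _safe(warn_if, data):
        return "degraded"
    return inner_status


warn = lambda d: d.get("total_keys", 0) > 500_000
fail = lambda d: d.get("total_keys", 0) > 1_000_000

small = apply_thresholds("healthy", {"total_keys": 312}, warn, fail)
medium = apply_thresholds("healthy", {"total_keys": 600_000}, warn, fail)
large = apply_thresholds("healthy", {"total_keys": 2_000_000}, warn, fail)
broken = apply_thresholds("healthy", {}, warn_if=lambda d: d["missing"] > 1)
```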
# Monitor Celery queue depth
from fastapi_watch.probes import CeleryProbe, ThresholdProbe

celery_probe = CeleryProbe(celery_app)
registry.add(ThresholdProbe(
    probe=celery_probe,
    name="celery-queue",
    warn_if=lambda d: sum(
        w.get("active_tasks", 0) + w.get("reserved_tasks", 0)
        for w in d.get("workers", {}).values()
    ) > 100,
    fail_if=lambda d: d.get("workers_online", 1) == 0,
))
Constructor arguments:
| Argument | Default | Description |
|---|---|---|
| probe | required | Any BaseProbe instance to wrap |
| name | inner probe's name | Probe label for the wrapper |
| warn_if | None | Callable (details: dict) -> bool; True → DEGRADED |
| fail_if | None | Callable (details: dict) -> bool; True → UNHEALTHY |
| poll_interval_ms | None | Uses registry default |
Watching PostgreSQL
pip install "fastapi-watch[postgres]"
PostgreSQLProbe passively observes outgoing PostgreSQL calls via @probe.watch. It instruments the functions in your code that query PostgreSQL, recording latency and errors from real traffic rather than opening a synthetic connection on a poll timer.
pg_probe = PostgreSQLProbe(name="primary-db", max_error_rate=0.01)

@pg_probe.watch
async def get_user(user_id: int) -> dict | None:
    async with pool.acquire() as conn:
        return await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)

registry.add(pg_probe)
Watching MySQL / MariaDB
pip install "fastapi-watch[mysql]"
MySQLProbe passively observes outgoing MySQL / MariaDB calls via @probe.watch. It records latency and errors from real traffic rather than opening a synthetic connection on a poll timer.
mysql_probe = MySQLProbe(name="mysql", max_error_rate=0.01)

@mysql_probe.watch
async def get_product(product_id: int) -> dict | None:
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT * FROM products WHERE id = %s", (product_id,))
            return await cur.fetchone()

registry.add(mysql_probe)
Watching Redis
pip install "fastapi-watch[redis]"
RedisProbe passively observes outgoing Redis calls via @probe.watch. It records latency and errors from real traffic rather than sending synthetic PING commands on a poll timer.
redis_probe = RedisProbe(name="cache", max_error_rate=0.05)

@redis_probe.watch
async def get_session(session_id: str) -> dict | None:
    return await redis.hgetall(f"session:{session_id}")

registry.add(redis_probe)
Watching Memcached
pip install "fastapi-watch[memcached]"
MemcachedProbe calls stats() to verify the server is reachable and responding.
from fastapi_watch.probes import MemcachedProbe
registry.add(MemcachedProbe(host="localhost", port=11211))
Constructor arguments:
| Argument | Default | Description |
|---|---|---|
| host | "localhost" | Memcached server hostname |
| port | 11211 | Memcached server port |
| name | "memcached" | Probe label |
| pool_size | 1 | aiomcache connection pool size |
Watching RabbitMQ
pip install "fastapi-watch[rabbitmq]"
RabbitMQProbe has two modes:
- Connectivity only (default): opens and closes an AMQP connection. No channels or queues are touched.
- Rich mode: when management_url is set, the probe also calls the RabbitMQ Management HTTP API and returns per-queue stats, message rates, and cluster metadata.
Connectivity only
from fastapi_watch.probes import RabbitMQProbe

registry.add(
    RabbitMQProbe(
        url="amqp://guest:guest@localhost:5672/",
        name="rabbitmq",  # default
    )
)
Details returned (connectivity only):
{ "connected": true }
Rich mode — with Management API
Pass management_url pointing at the RabbitMQ Management plugin (default port 15672). Credentials are taken from the AMQP URL automatically.
registry.add(
    RabbitMQProbe(
        url="amqp://guest:guest@localhost:5672/",
        management_url="http://localhost:15672",
    )
)
Details returned (rich mode):
{
"connected": true,
"server": {
"rabbitmq_version": "3.12.0",
"erlang_version": "26.0",
"cluster_name": "rabbit@my-node",
"node": "rabbit@my-node",
"connections": 4,
"channels": 8,
"exchanges": 14,
"queues": 3,
"consumers": 6
},
"totals": {
"messages": 142,
"messages_ready": 140,
"messages_unacknowledged": 2,
"publish_rate": 12.5,
"deliver_rate": 11.8,
"ack_rate": 11.8
},
"queues": {
"tasks": {
"state": "running",
"messages": 120,
"messages_ready": 118,
"messages_unacknowledged": 2,
"consumers": 4,
"memory_bytes": 32768,
"publish_rate": 10.0,
"deliver_rate": 9.5,
"ack_rate": 9.5,
"durable": true,
"auto_delete": false,
"idle_since": null
}
}
}
If the Management API is unreachable, a management_api_error key is added to details and the probe still reports the AMQP connection status.
Other connection forms:
# Dedicated monitoring vhost
RabbitMQProbe(url="amqp://monitor:secret@rabbitmq.internal/monitoring", management_url="http://rabbitmq.internal:15672")

# TLS / AMQPS
RabbitMQProbe(url="amqps://user:secret@rabbitmq.internal/", name="rabbitmq-tls")

# Multiple cluster nodes: one probe per node
for i, host in enumerate(["rmq-1.internal", "rmq-2.internal", "rmq-3.internal"], start=1):
    registry.add(RabbitMQProbe(url=f"amqp://guest:guest@{host}/", name=f"rabbitmq-node-{i}"))
Constructor arguments:
| Argument | Default | Description |
|---|---|---|
| url | "amqp://guest:guest@localhost/" | AMQP(S) connection URL |
| name | "rabbitmq" | Probe label |
| management_url | None | Base URL of the Management HTTP API. When set, enables rich queue-level details. Credentials are taken from url. |
Watching Kafka
pip install "fastapi-watch[kafka]"
KafkaProbe starts an AIOKafkaAdminClient to verify broker reachability, then lists topics and describes the cluster.
from fastapi_watch.probes import KafkaProbe
# Single broker
registry.add(KafkaProbe(bootstrap_servers="localhost:9092"))
# Multiple brokers
registry.add(KafkaProbe(bootstrap_servers=["b1:9092", "b2:9092", "b3:9092"]))
Details returned:
{
"broker_count": 3,
"controller_id": 1,
"topics": ["orders", "payments", "notifications"],
"internal_topics": ["__consumer_offsets"]
}
topics contains user-defined topics only. internal_topics lists Kafka-managed topics (those prefixed with __).
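The split between topics and internal_topics follows the double-underscore prefix rule stated above. A minimal sketch of that partitioning, using a hypothetical split_topics helper rather than the probe's real internals:

```python
def split_topics(all_topics):
    """Partition topic names into (user_topics, internal_topics).

    Hypothetical helper: treats any name starting with "__" as
    Kafka-managed, matching the rule described in the text.
    """
    user_topics = sorted(t for t in all_topics if not t.startswith("__"))
    internal_topics = sorted(t for t in all_topics if t.startswith("__"))
    return user_topics, internal_topics


topics, internal_topics = split_topics(
    ["orders", "__consumer_offsets", "payments", "notifications"]
)
print(topics)
print(internal_topics)
```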
Constructor arguments:
| Argument | Default | Description |
|---|---|---|
| bootstrap_servers | "localhost:9092" | String or list of host:port entries |
| name | "kafka" | Probe label |
| request_timeout_ms | 5000 | Admin client metadata request timeout |
Watching MongoDB
pip install "fastapi-watch[mongo]"
MongoProbe passively observes outgoing MongoDB calls via @probe.watch. It records latency and errors from real traffic rather than issuing a synthetic serverStatus command on a poll timer.
from fastapi_watch.probes import MongoProbe

mongo_probe = MongoProbe(name="mongodb", max_error_rate=0.02)

@mongo_probe.watch
async def get_document(doc_id: str) -> dict | None:
    # db is your application's existing async MongoDB database handle
    return await db.documents.find_one({"_id": doc_id})

registry.add(mongo_probe)
Watching outgoing HTTP calls
HttpProbe passively observes outgoing HTTP calls your app makes to external services. Rather than making its own synthetic requests (which would burn API credits or trip rate limits), it instruments the functions in your code via the @probe.watch decorator — recording latency and errors from real traffic only.
import aiohttp

from fastapi_watch.probes import HttpProbe

stripe_probe = HttpProbe(name="stripe", max_error_rate=0.05, max_avg_rtt_ms=500)

@stripe_probe.watch
async def charge_customer(amount: int, currency: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.stripe.com/v1/charges",
            json={"amount": amount, "currency": currency},
        ) as response:
            response.raise_for_status()
            return await response.json()

registry.add(stripe_probe)
Every call to charge_customer is silently timed. If it raises an exception it is counted as an error. The exception still propagates normally — the probe never interferes with your code's behaviour.
Works with both async def and def functions, and any HTTP library.
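To make the passive-observation idea concrete, here is a minimal sketch of how a watch-style decorator could time calls, count errors, and still let exceptions propagate, for both def and async def functions. CallStats and its attributes are hypothetical names chosen for illustration; this is not HttpProbe's implementation.

```python
import asyncio
import functools
import inspect
import time


class CallStats:
    """Hypothetical stand-in for a passive probe: counts calls and errors."""

    def __init__(self):
        self.call_count = 0
        self.error_count = 0
        self.consecutive_errors = 0
        self.last_rtt_ms = None

    def _record(self, started, failed):
        self.call_count += 1
        self.last_rtt_ms = (time.perf_counter() - started) * 1000
        if failed:
            self.error_count += 1
            self.consecutive_errors += 1
        else:
            self.consecutive_errors = 0

    def watch(self, func):
        """Wrap func so every call is timed; exceptions are re-raised."""
        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                started = time.perf_counter()
                try:
                    result = await func(*args, **kwargs)
                except Exception:
                    self._record(started, failed=True)
                    raise
                self._record(started, failed=False)
                return result
            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            started = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception:
                self._record(started, failed=True)
                raise
            self._record(started, failed=False)
            return result
        return sync_wrapper


stats = CallStats()


@stats.watch
def flaky(fail: bool) -> str:
    if fail:
        raise RuntimeError("upstream error")
    return "ok"


@stats.watch
async def fetch() -> str:
    await asyncio.sleep(0)
    return "ok"


flaky(False)
try:
    flaky(True)
except RuntimeError:
    pass  # the caller still sees the original exception
asyncio.run(fetch())
print(stats.call_count, stats.error_count, stats.consecutive_errors)
```

Using functools.wraps keeps the wrapped function's name and signature metadata intact, which is what lets framework introspection keep working on decorated functions.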
Details returned:
{
"call_count": 142,
"error_count": 3,
"error_rate": 0.0211,
"consecutive_errors": 0,
"last_rtt_ms": 87.4,
"avg_rtt_ms": 91.2,
"p95_rtt_ms": 143.7,
"min_rtt_ms": 61.0,
"max_rtt_ms": 312.5
}
Constructor arguments:
| Argument | Default | Description |
|---|---|---|
| name | "http" | Probe label |
| max_error_rate | 0.1 | Error-rate threshold above which the probe is UNHEALTHY (0–1) |
| max_avg_rtt_ms | None | Average-RTT threshold in milliseconds. None disables it |
| window_size | 100 | Number of recent calls used for percentile calculations |
| ema_alpha | 0.1 | Smoothing factor for the exponential moving average (0–1) |
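The roles of window_size and ema_alpha can be illustrated with a small sketch: a bounded window feeds the percentile, while the average is an exponential moving average updated on every call. RttWindow is a hypothetical class, and the nearest-rank percentile method is an assumption; the library may compute p95 differently.

```python
import math
from collections import deque


class RttWindow:
    """Hypothetical tracker: EMA for the average, bounded window for p95."""

    def __init__(self, window_size=100, ema_alpha=0.1):
        self.samples = deque(maxlen=window_size)  # oldest sample drops out
        self.ema_alpha = ema_alpha
        self.avg_rtt_ms = None

    def add(self, rtt_ms):
        self.samples.append(rtt_ms)
        if self.avg_rtt_ms is None:
            self.avg_rtt_ms = rtt_ms  # seed the EMA with the first sample
        else:
            a = self.ema_alpha
            self.avg_rtt_ms = a * rtt_ms + (1 - a) * self.avg_rtt_ms

    def p95(self):
        """Nearest-rank 95th percentile over the current window (assumed method)."""
        if not self.samples:
            return None
        ordered = sorted(self.samples)
        rank = math.ceil(0.95 * len(ordered))
        return ordered[rank - 1]


window = RttWindow(window_size=3, ema_alpha=0.5)
for rtt in (100.0, 200.0, 300.0, 400.0):
    window.add(rtt)
print(window.avg_rtt_ms, window.p95(), list(window.samples))
```

A higher ema_alpha weights the latest call more heavily, so the average reacts faster to a latency change but is also noisier.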
Watching Celery workers
pip install "fastapi-watch[celery]"
CeleryProbe uses Celery's control broadcast API to check worker liveness — no extra infrastructure required beyond the broker your app already uses.
By default only a single ping broadcast is issued per poll, confirming workers are alive without generating additional broker traffic. Set detailed=True for full per-worker inspection, which is useful in development and test environments.
from celery_app import celery
from fastapi_watch.probes import CeleryProbe
# Production — ping only (1 broadcast per poll)
registry.add(CeleryProbe(celery, min_workers=2))
# Development / test — full worker inspection
registry.add(CeleryProbe(celery, min_workers=1, detailed=True))
Details returned — ping only (default):
{
"workers_online": 2,
"workers": ["celery@host1", "celery@host2"]
}
Details returned — detailed=True:
{
  "workers_online": 1,
  "workers": {
    "celery@host1": {
      "status": "online",
      "queues": ["celery", "high_priority"],
      "active_tasks": 1,
      "reserved_tasks": 3,
      "scheduled_tasks": 0,
      "pool": {
        "implementation": "celery.concurrency.prefork:TaskPool",
        "max_concurrency": 4,
        "processes": [101, 102, 103, 104]
      },
      "total_tasks_executed": { "myapp.tasks.send_email": 99 },
      "registered_tasks": ["myapp.tasks.cleanup", "myapp.tasks.send_email"],
      "active": [
        { "id": "abc-123", "name": "myapp.tasks.send_email", "args": ["user@example.com"], "kwargs": {}, "time_start": 1700000000.0, "worker_pid": 101 }
      ],
      "reserved": [],
      "scheduled": []
    }
  }
}
Details returned (no workers online):
{
"workers_online": 0,
"reason": "no workers online — they may be scaled down because there are no pending tasks"
}
Scale-to-zero and min_workers
min_workers controls whether the probe considers having zero (or too few) online workers a failure.
| min_workers | No workers online | Fewer than min_workers |
|---|---|---|
| 0 (default) | Healthy; details note that workers may be scaled down | N/A |
| ≥ 1 | Unhealthy; error lists how many are expected | Unhealthy; error lists count found vs. expected |
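The decision table above can be expressed as a short function. This is a sketch under stated assumptions: evaluate_workers is a hypothetical helper, and the message strings are illustrative rather than the probe's exact wording.

```python
from typing import Optional


def evaluate_workers(workers_online: int, min_workers: int = 0) -> tuple[str, Optional[str]]:
    """Return (status, message) following the min_workers table.

    Hypothetical helper; message text is illustrative only.
    """
    if min_workers == 0:
        if workers_online == 0:
            # Scale-to-zero is acceptable, so this is informational only.
            return "healthy", "no workers online; they may be scaled down"
        return "healthy", None
    if workers_online < min_workers:
        return "unhealthy", (
            f"{workers_online} worker(s) online, expected at least {min_workers}"
        )
    return "healthy", None


print(evaluate_workers(0))
print(evaluate_workers(0, min_workers=2))
print(evaluate_workers(1, min_workers=2))
print(evaluate_workers(2, min_workers=2))
```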
Constructor arguments:
| Argument | Default | Description |
|---|---|---|
| app | required | Your Celery application instance |
| name | "celery" | Probe label |
| timeout | 1.0 | Seconds to wait for each inspector broadcast reply |
| min_workers | 0 | Minimum number of workers that must be online. 0 means scale-to-zero is acceptable |
| detailed | False | Collect full per-worker stats (active/reserved/scheduled tasks, pool, queues). Enable in dev/test only |
SQLAlchemy engine probe
pip install "fastapi-watch[sqlalchemy]"
SqlAlchemyProbe passively observes outgoing SQLAlchemy calls via @probe.watch. It works with any database SQLAlchemy supports and records latency and errors from real traffic rather than running a synthetic query on a poll timer.
from fastapi_watch.probes import SqlAlchemyProbe

db_probe = SqlAlchemyProbe(name="postgres", max_error_rate=0.01)

@db_probe.watch
async def get_user(user_id: int) -> User | None:
    # async_session and User come from your application's SQLAlchemy setup
    async with async_session() as session:
        return await session.get(User, user_id)

registry.add(db_probe)
All built-in probes
Application / infrastructure
| Probe | Extra | Key constructor args | Details fields |
|---|---|---|---|
| RequestMetricsMiddleware + RequestMetricsProbe | built-in | per_route, max_error_rate, max_avg_rtt_ms | request_count, error_count, error_rate, avg_rtt_ms, consecutive_errors; + routes dict when per_route=True |
| FastAPIRouteProbe | built-in | name, max_error_rate, max_avg_rtt_ms, window_size, ema_alpha | request_count, error_count, error_rate, consecutive_errors, last_status_code, last_rtt_ms, avg_rtt_ms, p95_rtt_ms, min_rtt_ms, max_rtt_ms, requests_per_minute |
| FastAPIWebSocketProbe | built-in | name, max_error_rate, min_active_connections, window_size, ema_alpha | active_connections, total_connections, messages_received, messages_sent, error_count, error_rate, consecutive_errors, avg_duration_ms, min_duration_ms, max_duration_ms |
| EventLoopProbe | built-in | name, warn_ms, fail_ms | lag_ms |
| TCPProbe | built-in | host, port, timeout, name | host, port, resolved_ips, connect_ms |
| SMTPProbe | built-in | name, max_error_rate, max_avg_rtt_ms, window_size, ema_alpha | call_count, error_count, error_rate, consecutive_errors, last_rtt_ms, avg_rtt_ms, p95_rtt_ms, min_rtt_ms, max_rtt_ms |
| ThresholdProbe | built-in | probe, name, warn_if, fail_if | (delegates to inner probe) |
Databases
| Probe | Extra | Key constructor args | Details fields |
|---|---|---|---|
| PostgreSQLProbe | postgres | name, max_error_rate, max_avg_rtt_ms, window_size, ema_alpha | call_count, error_count, error_rate, consecutive_errors, last_rtt_ms, avg_rtt_ms, p95_rtt_ms, min_rtt_ms, max_rtt_ms |
| MySQLProbe | mysql | name, max_error_rate, max_avg_rtt_ms, window_size, ema_alpha | call_count, error_count, error_rate, consecutive_errors, last_rtt_ms, avg_rtt_ms, p95_rtt_ms, min_rtt_ms, max_rtt_ms |
| SqlAlchemyProbe | sqlalchemy | name, max_error_rate, max_avg_rtt_ms, window_size, ema_alpha | call_count, error_count, error_rate, consecutive_errors, last_rtt_ms, avg_rtt_ms, p95_rtt_ms, min_rtt_ms, max_rtt_ms |
Caches
| Probe | Extra | Key constructor args | Details fields |
|---|---|---|---|
| RedisProbe | redis | name, max_error_rate, max_avg_rtt_ms, window_size, ema_alpha | call_count, error_count, error_rate, consecutive_errors, last_rtt_ms, avg_rtt_ms, p95_rtt_ms, min_rtt_ms, max_rtt_ms |
| MemcachedProbe | memcached | host, port, name, pool_size | - |
Queues / messaging
| Probe | Extra | Key constructor args | Details fields |
|---|---|---|---|
| RabbitMQProbe | rabbitmq | url, name, management_url | connected; + server, totals, queues when management_url is set |
| KafkaProbe | kafka | bootstrap_servers, name, request_timeout_ms | broker_count, controller_id, topics, internal_topics |
| CeleryProbe | celery | app, name, timeout, min_workers, detailed | workers_online, workers (list when ping-only, dict when detailed=True) |
Document stores
| Probe | Extra | Key constructor args | Details fields |
|---|---|---|---|
| MongoProbe | mongo | name, max_error_rate, max_avg_rtt_ms, window_size, ema_alpha | call_count, error_count, error_rate, consecutive_errors, last_rtt_ms, avg_rtt_ms, p95_rtt_ms, min_rtt_ms, max_rtt_ms |
HTTP
| Probe | Extra | Key constructor args | Details fields |
|---|---|---|---|
| HttpProbe | built-in | name, max_error_rate, max_avg_rtt_ms, window_size, ema_alpha | call_count, error_count, error_rate, consecutive_errors, last_rtt_ms, avg_rtt_ms, p95_rtt_ms, min_rtt_ms, max_rtt_ms |
Testing / placeholder
| Probe | Extra | Key constructor args | Details fields |
|---|---|---|---|
| NoOpProbe | built-in | name | - |
Configuration reference
HealthRegistry
| Argument | Type | Default | Description |
|---|---|---|---|
| app | FastAPI | required | The FastAPI application instance |
| prefix | str | "/health" | URL prefix for all health endpoints |
| tags | list[str] | ["health"] | OpenAPI tags applied to all health routes |
| poll_interval_ms | int \| None | 60000 | How often (ms) to re-run probes while an SSE client is connected. 0 or None disables polling, so each request or stream event runs probes on demand. Values below 1000 are clamped to 1000. |
| logger | logging.Logger \| None | None | Logger for warnings (e.g. clamped interval) and probe exception messages. Pass None to emit no logs. |
| grace_period_ms | int | 0 | How long (ms) after startup to return 503 {"status": "starting"} from /ready. 0 disables the grace period. |
| history_size | int | 120 | Maximum number of past probe results retained per probe. Oldest entry dropped when full. Retrieved via GET /health/history. Minimum 1. |
| result_ttl_seconds | float | 7200.0 | How long (seconds) probe results are retained. Results older than this are excluded from /health/history. Set to 0 to disable time-based expiry. |
| alert_ttl_seconds | float | 259200.0 | How long (seconds) alert records (state-change events) are retained. Set to 0 to keep until evicted by max_alerts. |
| max_alerts | int | 120 | Hard cap on stored alert records. When full, the oldest alert is dropped before the new one is appended. |
| storage | ProbeStorage \| None | None | Custom storage backend. None uses InMemoryProbeStorage. When supplied, history_size, result_ttl_seconds, alert_ttl_seconds, and max_alerts are ignored; configure limits inside the backend. |
| timezone | str | "UTC" | IANA timezone name for all checked_at timestamps. Reflected in the timezone field of every response. |
| groups | list[ProbeGroup] \| None | None | One or more ProbeGroup instances to include at startup. Equivalent to calling include() for each. |
| dashboard | bool \| Callable | True | True: built-in HTML dashboard at GET /health/dashboard. False: omit the route. Callable (report: HealthReport) -> str: custom renderer. |
| circuit_breaker | bool | True | Enable the circuit breaker. When a probe fails circuit_breaker_threshold consecutive times it is suspended for circuit_breaker_cooldown_ms ms. |
| circuit_breaker_threshold | int | 5 | Consecutive failures before the circuit opens. |
| circuit_breaker_cooldown_ms | int | 600000 | How long (ms) the circuit stays open before the probe is retried (default 10 minutes). |
| webhook_url | str \| None | None | HTTP(S) URL that receives a JSON POST whenever a probe changes state. Fire-and-forget; never blocks health checks. Suppressed during maintenance mode. |
| auth | dict \| Callable \| None | None | Authentication for all health endpoints. None = open. See Authentication for accepted forms. |
| startup_probes | list[BaseProbe] \| None | None | Probes that must pass for /health/startup to return 200. Evaluated separately from the main registry. |
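The interaction between circuit_breaker_threshold and circuit_breaker_cooldown_ms can be sketched as a small state machine. The CircuitBreaker class below is hypothetical and only illustrates the documented behaviour (suspend after N consecutive failures, retry after the cooldown); resetting the failure counter when the cooldown ends is an assumption, not something the library documents.

```python
import time


class CircuitBreaker:
    """Hypothetical sketch of a consecutive-failure circuit breaker."""

    def __init__(self, threshold=5, cooldown_ms=600_000, clock=time.monotonic):
        self.threshold = threshold
        self.cooldown_s = cooldown_ms / 1000
        self.clock = clock  # injectable so the cooldown can be tested
        self.consecutive_failures = 0
        self.opened_at = None  # None means the circuit is closed

    def allow(self) -> bool:
        """Return True if the probe may run now."""
        if self.opened_at is None:
            return True
        if self.clock() - self.opened_at >= self.cooldown_s:
            # Cooldown elapsed: close the circuit and retry the probe.
            self.opened_at = None
            self.consecutive_failures = 0  # assumption: counter restarts
            return True
        return False

    def record(self, passed: bool) -> None:
        """Record one probe outcome and open the circuit if needed."""
        if passed:
            self.consecutive_failures = 0
            return
        self.consecutive_failures += 1
        if self.consecutive_failures >= self.threshold:
            self.opened_at = self.clock()


now = [0.0]  # fake clock, in seconds
breaker = CircuitBreaker(threshold=2, cooldown_ms=1000, clock=lambda: now[0])
breaker.record(False)
first = breaker.allow()   # one failure: still closed
breaker.record(False)
second = breaker.allow()  # two failures: open, probe suspended
now[0] = 1.5
third = breaker.allow()   # cooldown elapsed: probe retried
print(first, second, third)
```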
HealthRegistry.set_maintenance(until=None)
Activates maintenance mode until the given timezone-aware datetime. While active, /health/ready returns 200 {"status": "maintenance"} and state-change webhooks are suppressed. Pass until=None (default) to set no expiry — use clear_maintenance() to exit. Returns self.
HealthRegistry.clear_maintenance()
Deactivates maintenance mode immediately. Returns self.
HealthRegistry.add(probe, critical=True)
Adds a single probe. Returns self for chaining. Adding the same instance more than once is a no-op.
critical=True (default) — a failing probe causes the overall status to be "unhealthy" and /ready to return 503. Set critical=False to include the probe in reports without affecting readiness.
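As a rough illustration of how criticality feeds into readiness, the sketch below maps a set of probe outcomes to the HTTP status /ready would return: only an unhealthy critical probe fails readiness, while degraded or non-critical results do not. The function name and the (status, critical) tuple shape are hypothetical simplifications, and grace period and maintenance mode are ignored here.

```python
def readiness_status_code(results):
    """Return 503 if any critical probe is unhealthy, else 200.

    results: iterable of (status, critical) pairs, a simplified
    stand-in for the library's probe results.
    """
    for status, critical in results:
        if critical and status == "unhealthy":
            return 503
    return 200


print(readiness_status_code([("healthy", True), ("degraded", True)]))
print(readiness_status_code([("healthy", True), ("unhealthy", False)]))
print(readiness_status_code([("healthy", True), ("unhealthy", True)]))
```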
HealthRegistry.add_probes(probes, critical=True)
Adds a list of probes. The critical flag applies to every probe in the list. Returns self for chaining. Duplicate instances are silently skipped.
HealthRegistry.include(router)
Includes all probes from a ProbeGroup, preserving each probe's criticality setting. Returns self for chaining. Duplicate instances are silently skipped.
registry.include(db_router).include(payments_router)
HealthRegistry.on_state_change(callback)
Registers a callback invoked whenever a probe's status changes between runs. The callback receives (probe_name: str, old_status: ProbeStatus, new_status: ProbeStatus) and may be a plain function or an async coroutine. Returns self for chaining.
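Accepting both plain functions and coroutines usually comes down to awaiting the return value only when it is awaitable. The following sketch shows one way such a dispatcher could look; notify_state_change is a hypothetical name, statuses are plain strings for simplicity, and this is not the registry's actual code.

```python
import asyncio
import inspect


async def notify_state_change(callbacks, probe_name, old_status, new_status):
    """Invoke each callback, awaiting it if it returned an awaitable."""
    for callback in callbacks:
        outcome = callback(probe_name, old_status, new_status)
        if inspect.isawaitable(outcome):
            await outcome


events = []


def log_change(probe_name, old_status, new_status):
    events.append(("sync", probe_name, old_status, new_status))


async def page_on_call(probe_name, old_status, new_status):
    await asyncio.sleep(0)  # stands in for real async I/O
    events.append(("async", probe_name, old_status, new_status))


asyncio.run(
    notify_state_change([log_change, page_on_call], "redis", "healthy", "unhealthy")
)
print(events)
```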
HealthRegistry.set_poll_interval(ms)
Updates the poll interval at runtime. Pass 0 or None to switch to single-fetch mode. If SSE clients are currently connected the poll task is restarted immediately with the new interval.
registry.set_poll_interval(30_000) # every 30 s
registry.set_poll_interval(0) # disable — each event runs probes on demand
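The interval rules described here and in the poll_interval_ms row (0 or None disables polling, values below 1000 are clamped to 1000) can be summarised in a few lines. normalize_poll_interval is a hypothetical helper written for illustration only.

```python
from typing import Optional


def normalize_poll_interval(ms: Optional[int]) -> Optional[int]:
    """Return the effective interval in ms, or None when polling is disabled."""
    if not ms:  # covers both 0 and None
        return None
    return max(ms, 1000)  # values below 1000 are clamped to 1000


print(normalize_poll_interval(30_000))
print(normalize_poll_interval(250))
print(normalize_poll_interval(0))
```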
HealthRegistry.set_started()
Signals that application startup is complete. After this is called, GET /health/startup returns 200 (provided all startup probes pass). Call this at the end of your lifespan startup block or application boot sequence.
HealthRegistry.run_all()
Async method — runs every registered probe concurrently and returns list[ProbeResult]. Probe exceptions are caught and converted to unhealthy results. Useful for testing or building custom aggregation outside of the mounted routes.
results = await registry.run_all()
for r in results:
    print(r.name, r.status, r.latency_ms, r.details)
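The concurrency and exception handling described for run_all() can be approximated with asyncio.gather. This is a minimal sketch, not the library's implementation: run_concurrently is a hypothetical function, probes are modelled as zero-argument coroutine functions that return a status string, and results are plain dicts rather than ProbeResult objects.

```python
import asyncio


async def run_concurrently(probes):
    """Run all probes at once; turn raised exceptions into unhealthy results.

    probes: mapping of name -> zero-argument coroutine function that
    returns a status string (a simplification for this sketch).
    """
    names = list(probes)
    outcomes = await asyncio.gather(
        *(probes[name]() for name in names), return_exceptions=True
    )
    results = []
    for name, outcome in zip(names, outcomes):
        if isinstance(outcome, Exception):
            results.append({"name": name, "status": "unhealthy", "error": str(outcome)})
        else:
            results.append({"name": name, "status": outcome, "error": None})
    return results


async def cache_check():
    await asyncio.sleep(0.01)
    return "healthy"


async def broker_check():
    raise ConnectionError("broker unreachable")


results = asyncio.run(run_concurrently({"cache": cache_check, "broker": broker_check}))
print(results)
```

Because gather schedules every coroutine before awaiting any of them, total wall time is bounded by the slowest probe rather than the sum of all probes.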
ProbeGroup
Collects probe registrations defined across multiple modules so they can be passed to HealthRegistry at startup. Mirrors the APIRouter pattern from FastAPI.
| Method | Description |
|---|---|
| add(probe, critical=True) | Add a single probe. Returns self. Duplicate instances are silently skipped. |
| add_probes(probes, critical=True) | Add multiple probes with the same criticality. Returns self. |
| include(router) | Merge another ProbeGroup's probes into this one, preserving each probe's criticality. Returns self. |
FastAPIRouteProbe
Instruments a FastAPI route handler via the @probe.watch decorator and reports real-traffic metrics as a ProbeResult.
| Argument | Type | Default | Description |
|---|---|---|---|
| name | str | "route" | Probe label shown in health reports |
| max_error_rate | float | 0.1 | Error-rate threshold (0–1). The probe becomes UNHEALTHY when exceeded. |
| max_avg_rtt_ms | float \| None | None | Average-RTT threshold in milliseconds. None disables this threshold. |
| window_size | int | 100 | Number of recent requests kept for percentile and throughput calculations |
| ema_alpha | float | 0.1 | EMA smoothing factor (0–1). Higher values make avg_rtt_ms react faster to changes. |
| timeout | float \| None | None | Passed to the registry for the check() call; not used internally |
FastAPIRouteProbe.watch(func) — decorator that wraps an async def or def route handler. Preserves the function's signature so FastAPI dependency injection continues to work. HTTPException is caught, its status code recorded, and then re-raised. Any other exception is recorded as a 500 and re-raised.
BaseProbe
| Attribute | Type | Default | Description |
|---|---|---|---|
| name | str | "unnamed" | Label used in health reports. Override as a class attribute or set in __init__. |
| timeout | float \| None | None | Per-probe timeout in seconds. The check is cancelled and recorded as unhealthy if it exceeds this value. None means no limit. |
| poll_interval_ms | int \| None | None | Per-probe poll interval override. When set, this probe runs on its own schedule independent of the registry default. None uses the registry poll_interval_ms. |
| circuit_breaker_threshold | int \| None | None | Per-probe consecutive-failure threshold before opening the circuit. None uses the registry default. |
| circuit_breaker_cooldown_ms | int \| None | None | Per-probe circuit-open cooldown in milliseconds. None uses the registry default. |
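The timeout behaviour (cancel the check and record it as unhealthy) maps naturally onto asyncio.wait_for. The sketch below assumes a check is a zero-argument coroutine function returning a status string; check_with_timeout and the result dict shape are hypothetical and chosen only to illustrate the mechanism.

```python
import asyncio


async def check_with_timeout(check, timeout=None):
    """Run check(); if it exceeds timeout seconds, cancel it and report unhealthy.

    timeout=None means no limit, mirroring the attribute's documented default.
    """
    try:
        status = await asyncio.wait_for(check(), timeout=timeout)
    except asyncio.TimeoutError:
        return {"status": "unhealthy", "error": f"timed out after {timeout} s"}
    return {"status": status, "error": None}


async def slow_check():
    await asyncio.sleep(1)
    return "healthy"


async def fast_check():
    return "healthy"


timed_out = asyncio.run(check_with_timeout(slow_check, timeout=0.01))
completed = asyncio.run(check_with_timeout(fast_check, timeout=0.5))
print(timed_out, completed)
```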
ProbeResult
| Field | Type | Description |
|---|---|---|
| name | str | Probe identifier |
| status | ProbeStatus | "healthy", "degraded", or "unhealthy" |
| critical | bool | True if the probe was registered as critical; affects overall status and readiness |
| latency_ms | float | Duration of the check in milliseconds |
| error | str \| None | Error message; only present on failure |
| details | dict \| None | Service-specific metadata |
| is_healthy | bool (property) | True when status == "healthy" (strict) |
| is_degraded | bool (property) | True when status == "degraded" |
| is_passing | bool (property) | True when status != "unhealthy" (healthy or degraded) |
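The three boolean properties differ only in how they treat the degraded state. A minimal dataclass sketch makes the distinction explicit; ResultSketch is a hypothetical stand-in that uses plain strings for status, not the library's ProbeResult class.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ResultSketch:
    """Hypothetical stand-in mirroring the fields in the table above."""

    name: str
    status: str  # "healthy", "degraded", or "unhealthy"
    critical: bool = True
    latency_ms: float = 0.0
    error: Optional[str] = None
    details: Optional[dict] = None

    @property
    def is_healthy(self) -> bool:
        return self.status == "healthy"  # strict: degraded does not count

    @property
    def is_degraded(self) -> bool:
        return self.status == "degraded"

    @property
    def is_passing(self) -> bool:
        return self.status != "unhealthy"  # healthy or degraded


degraded = ResultSketch(name="redis", status="degraded", latency_ms=12.5)
print(degraded.is_healthy, degraded.is_degraded, degraded.is_passing)
```

In practice is_passing is the natural check for routing decisions, while is_healthy suits alerting that should fire as soon as a dependency is under stress.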
Kubernetes integration
livenessProbe:
  httpGet:
    path: /health/live
    port: 8000
  initialDelaySeconds: 5
  periodSeconds: 10
readinessProbe:
  httpGet:
    path: /health/ready
    port: 8000
  initialDelaySeconds: 10
  periodSeconds: 15
  failureThreshold: 3
Use /health/ready for the readiness probe: Kubernetes stops routing traffic to a pod once the endpoint has failed failureThreshold consecutive checks, which happens when a critical dependency stays unhealthy. Use /health/live for liveness so the process is only restarted when it is genuinely stuck, not because an external service is temporarily down.
For applications that need time to warm up (loading models, seeding caches, running migrations), combine grace_period_ms with a short initialDelaySeconds:
readinessProbe:
  httpGet:
    path: /health/ready
    port: 8000
  initialDelaySeconds: 5
  periodSeconds: 10
  failureThreshold: 6  # allow up to 60 s of failures before marking unready

# App holds /ready as "starting" for 30 s regardless of probe results
registry = HealthRegistry(app, grace_period_ms=30_000)
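To show how the grace period interacts with readiness, here is a simplified sketch of the gate: during the grace window the endpoint answers 503 with a "starting" status regardless of probe results, and afterwards the probe outcome decides. ready_response is a hypothetical function, and the response bodies other than {"status": "starting"} are placeholders rather than the library's real payloads.

```python
def ready_response(elapsed_ms, grace_period_ms, critical_probes_passing):
    """Return (http_status, body) for a readiness request.

    elapsed_ms: time since application startup.
    grace_period_ms: 0 disables the grace period.
    """
    if grace_period_ms > 0 and elapsed_ms < grace_period_ms:
        return 503, {"status": "starting"}
    if critical_probes_passing:
        return 200, {"status": "healthy"}  # placeholder body
    return 503, {"status": "unhealthy"}  # placeholder body


print(ready_response(5_000, 30_000, True))
print(ready_response(45_000, 30_000, True))
print(ready_response(45_000, 30_000, False))
```

With periodSeconds: 10 and failureThreshold: 6, Kubernetes tolerates roughly 60 s of 503 responses, which comfortably covers a 30 s grace period.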
License
MIT
Claude was used to write the README, annotate the code, help with test case coverage, and clean up my messy thoughts into readable code.