Structured health and readiness check system for FastAPI
Reason this release was yanked:
HTTP Probe would consume 3rd party API usages
fastapi-watch
Structured health and readiness check system for FastAPI.
Add /health/live, /health/ready, /health/status, and /health/history endpoints to any FastAPI app with a single registry call. All probes run concurrently, so a slow dependency never blocks the others. Each probe returns rich service-specific details alongside the pass/fail result.
Connect a browser or monitoring tool to the SSE streaming endpoints (/health/ready/stream, /health/status/stream) and receive live updates as long as you stay connected — the background poll loop starts automatically on the first connection and stops when the last client disconnects.
Table of contents
- Installation
- Quick start
- Endpoints
- Probe management
- Live streaming
- Polling and caching
- State-change callbacks
- Startup grace period
- Probe result history
- Response format
- Writing a custom probe
- Built-in probes
- Configuration reference
- Kubernetes integration
- License
Installation
Install only the extras you actually use. Nothing is pulled in by default beyond FastAPI and Pydantic.
zsh users: wrap the package name in quotes to prevent the shell from interpreting [ and ] as glob patterns.
# Core package — includes the always-passing MemoryProbe, no other deps
pip install fastapi-watch
# Add individual service probes as needed
pip install "fastapi-watch[postgres]" # PostgreSQL (asyncpg)
pip install "fastapi-watch[mysql]" # MySQL / MariaDB (aiomysql)
pip install "fastapi-watch[sqlalchemy]" # Any SQLAlchemy 2.x async engine
pip install "fastapi-watch[redis]" # Redis (redis)
pip install "fastapi-watch[memcached]" # Memcached (aiomcache)
pip install "fastapi-watch[rabbitmq]" # RabbitMQ (aio-pika + aiohttp)
pip install "fastapi-watch[kafka]" # Kafka (aiokafka)
pip install "fastapi-watch[mongo]" # MongoDB (motor)
pip install "fastapi-watch[http]" # HTTP endpoint (aiohttp)
# Or pull everything in one shot
pip install "fastapi-watch[all]"
Multiple extras can be combined:
pip install "fastapi-watch[postgres,redis,rabbitmq]"
Quick start
Create a HealthRegistry, attach it to your FastAPI app, and call .add() for each service you want to monitor. The registry mounts all health endpoints automatically.
import logging
from fastapi import FastAPI
from fastapi_watch import HealthRegistry
from fastapi_watch.probes import PostgreSQLProbe, RedisProbe
app = FastAPI()
registry = HealthRegistry(
app,
poll_interval_ms=60_000, # re-run probes every 60 s while streaming
logger=logging.getLogger(__name__), # optional — omit to silence all logging
grace_period_ms=10_000, # hold /ready for 10 s while the app warms up
history_size=20, # keep the last 20 results per probe
)
registry.add(PostgreSQLProbe(url="postgresql://user:pass@localhost/mydb"))
registry.add(RedisProbe(url="redis://localhost:6379"), critical=False)
That's it. The following routes are now live:
GET /health/live → always 200
GET /health/ready → 200 / 503
GET /health/status → 200 / 207
GET /health/ready/stream → SSE stream
GET /health/status/stream → SSE stream
GET /health/history → rolling probe history
Endpoints
| Endpoint | Purpose | Healthy | Degraded |
|---|---|---|---|
| GET /health/live | Liveness: is the process alive? | 200 OK | never fails |
| GET /health/ready | Readiness: are all critical probes passing? | 200 OK | 503 Service Unavailable |
| GET /health/status | Status: full detail on every probe | 200 OK | 207 Multi-Status |
| GET /health/ready/stream | Readiness stream: SSE; polls while connected | 200 OK | stream of events |
| GET /health/status/stream | Status stream: SSE; polls while connected | 200 OK | stream of events |
| GET /health/history | History: last N results per probe | 200 OK | always 200 |
The prefix defaults to /health and can be changed at construction time:
registry = HealthRegistry(app, prefix="/ops/health")
# → /ops/health/live
# → /ops/health/ready
# → /ops/health/status
# → /ops/health/ready/stream
# → /ops/health/status/stream
# → /ops/health/history
Probe management
Adding probes
Add probes one at a time with add(), or pass a list with add_probes(). Both methods return self for chaining. Adding the same instance twice is a no-op.
# Single probe
registry.add(probe_a)
# Multiple probes in one call
registry.add_probes([probe_a, probe_b, probe_c])
# Chained
registry.add(probe_a).add(probe_b).add(probe_c)
# Duplicate ignored — probe_a is only registered once
registry.add(probe_a)
registry.add(probe_a)
Probes run concurrently on every check — a slow or failing probe never delays the others.
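To illustrate why concurrent execution keeps a slow dependency from delaying the rest, here is a minimal standard-library sketch. It does not use fastapi-watch; `fake_probe` and `run_concurrently` are hypothetical names, and `asyncio.gather` is an assumption about how such fan-out is typically done, not a statement about the library's internals.

```python
import asyncio
import time
from typing import Dict, List, Tuple


async def fake_probe(name: str, delay_s: float) -> Tuple[str, float]:
    """Stand-in for a probe's check(): sleeps, then reports its own duration."""
    start = time.perf_counter()
    await asyncio.sleep(delay_s)
    return name, time.perf_counter() - start


async def run_concurrently(delays: Dict[str, float]) -> Tuple[List[Tuple[str, float]], float]:
    """Run all fake probes at once; return per-probe results and total wall time."""
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_probe(n, d) for n, d in delays.items()))
    return list(results), time.perf_counter() - start
```

Calling `asyncio.run(run_concurrently({"fast": 0.01, "slow": 0.2}))` should take roughly as long as the slowest probe, not the sum of all delays.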
Critical vs non-critical probes
By default every probe is critical — a failing critical probe sets the overall status to "unhealthy" and causes /health/ready to return 503.
Mark a probe as non-critical when its failure should be visible in reports but shouldn't block traffic:
# Database is essential; fail readiness if it's unreachable
registry.add(PostgreSQLProbe(url="postgresql://..."), critical=True)
# Cache is nice-to-have; don't fail readiness if it's down
registry.add(RedisProbe(url="redis://localhost"), critical=False)
Non-critical probes always appear in /health/status with their real result and a "critical": false field. They simply don't affect the overall status or /ready.
add_probes() accepts the same flag, applied to every probe in the list:
registry.add_probes([probe_a, probe_b], critical=False)
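The aggregation rule described above can be sketched in a few lines. `Result`, `overall_status`, and `ready_status_code` are hypothetical names used only for illustration; they are not the library's `ProbeResult` or its internal logic.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Result:
    """Hypothetical stand-in for a probe result; not the library's ProbeResult."""
    name: str
    healthy: bool
    critical: bool = True


def overall_status(results: List[Result]) -> str:
    """Only a failing critical probe makes the overall status unhealthy."""
    failed_critical = any(r.critical and not r.healthy for r in results)
    return "unhealthy" if failed_critical else "healthy"


def ready_status_code(results: List[Result]) -> int:
    """Readiness maps the overall status to 200 or 503."""
    return 200 if overall_status(results) == "healthy" else 503
```

With a healthy critical database and a failing non-critical cache, `ready_status_code` returns 200 while the failing result remains available for reporting.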
Per-probe timeout
Set a timeout (in seconds) on any probe class or instance. If the check doesn't complete within that time, the probe is recorded as unhealthy — all other probes are unaffected and still run concurrently.
On the class:
class MyServiceProbe(BaseProbe):
    name = "my-service"
    timeout = 5.0  # fail after 5 seconds

    async def check(self) -> ProbeResult:
        ...
On an instance:
probe = MyServiceProbe()
probe.timeout = 2.0
registry.add(probe)
timeout = None (the default) means no limit. Timed-out probes produce an unhealthy result with error: "TimeoutError: ".
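One plausible way to get this behaviour is `asyncio.wait_for`, sketched below with plain dicts in place of `ProbeResult`. The helper name `run_with_timeout` and the exact error string are assumptions for illustration; the library's own implementation may differ.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def run_with_timeout(
    check: Callable[[], Awaitable[dict]], timeout: Optional[float]
) -> dict:
    """Await check(); turn a timeout into an unhealthy result instead of raising."""
    try:
        # wait_for cancels the inner coroutine once the deadline passes;
        # timeout=None waits without a limit.
        return await asyncio.wait_for(check(), timeout=timeout)
    except asyncio.TimeoutError:
        # Illustrative error text only; not the library's exact message.
        return {"status": "unhealthy", "error": "TimeoutError"}


async def slow_check() -> dict:
    await asyncio.sleep(1.0)
    return {"status": "healthy", "error": None}


async def fast_check() -> dict:
    return {"status": "healthy", "error": None}
```

Because each check is wrapped individually, one probe hitting its deadline does not cancel or delay the others.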
Live streaming
The two streaming endpoints (/health/ready/stream, /health/status/stream) use Server-Sent Events (SSE) to push probe results to connected clients.
The poll loop is demand-driven — it starts when the first SSE client connects and stops automatically when the last one disconnects. No background work is done when nobody is watching.
Each event is a JSON-encoded health report on a data: line:
data: {"status": "healthy", "checked_at": "2024-06-01T12:00:00.123456+00:00", "probes": [...]}
data: {"status": "unhealthy", "checked_at": "2024-06-01T12:00:05.456789+00:00", "probes": [...]}
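For consumers that read the raw stream themselves, the event format above can be decoded with the standard library alone. This sketch assumes each report fits on a single `data:` line, as in the examples; `iter_reports` is a hypothetical helper, and the sample events use empty probe lists for brevity.

```python
import json
from typing import Iterable, Iterator


def iter_reports(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one decoded report per single-line data: field in an SSE stream."""
    for line in lines:
        if line.startswith("data:"):
            yield json.loads(line[len("data:"):].strip())


# Blank lines separate events in SSE; they are skipped by the prefix check.
sample = [
    'data: {"status": "healthy", "checked_at": "2024-06-01T12:00:00.123456+00:00", "probes": []}',
    "",
    'data: {"status": "unhealthy", "checked_at": "2024-06-01T12:00:05.456789+00:00", "probes": []}',
    "",
]
statuses = [report["status"] for report in iter_reports(sample)]
```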
Connecting from JavaScript
const es = new EventSource('/health/status/stream');
es.onmessage = (event) => {
const report = JSON.parse(event.data);
console.log(report.status, report.probes);
};
es.onerror = () => es.close();
Connecting with curl
curl -N http://localhost:8000/health/status/stream
Configuring the poll interval
# Default — poll every 60 seconds while a client is connected
registry = HealthRegistry(app)
# Custom interval
registry = HealthRegistry(app, poll_interval_ms=10_000) # every 10 s
# Minimum enforced interval is 1000 ms; lower values are clamped
registry = HealthRegistry(app, poll_interval_ms=500) # → 1000 ms
# Disable polling — streaming endpoints emit one result then close
registry = HealthRegistry(app, poll_interval_ms=None)
The interval can also be changed at any point after startup:
registry.set_poll_interval(30_000) # switch to every 30 s
registry.set_poll_interval(0) # disable — single-fetch mode
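The interval rules above (0 or None disables polling, anything else below 1000 ms is clamped) can be summarised as a small function. `normalize_poll_interval` is a hypothetical name for illustration, not part of the library's API.

```python
from typing import Optional

MIN_INTERVAL_MS = 1000  # minimum stated in the text above


def normalize_poll_interval(ms: Optional[int]) -> Optional[int]:
    """Return the effective interval in ms, or None when polling is disabled."""
    if not ms:  # None and 0 both mean single-fetch mode
        return None
    return max(ms, MIN_INTERVAL_MS)
```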
Polling and caching
The regular GET /health/ready and GET /health/status endpoints always respond immediately:
- When SSE clients are connected — the poll loop is running, so these endpoints serve the most recent cached probe results without re-running any probes.
- When no streaming is active — probes are run on demand. A built-in lock prevents a thundering herd if multiple requests arrive simultaneously before the first result is cached.
This means your GET endpoints are fast under all conditions, regardless of whether anyone is streaming.
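The thundering-herd protection can be pictured as a lock with a second cache check inside it. The sketch below is an assumption about the general technique, not the library's code; `CachedRunner` is a hypothetical helper and it never expires its cache, which a real implementation would need to handle.

```python
import asyncio
from typing import Optional


class CachedRunner:
    """Hypothetical helper: concurrent callers share a single probe run."""

    def __init__(self, run_probes):
        self._run_probes = run_probes
        self._lock = asyncio.Lock()
        self._cached: Optional[dict] = None
        self.runs = 0

    async def get(self) -> dict:
        if self._cached is not None:  # fast path: serve the cached result
            return self._cached
        async with self._lock:
            if self._cached is None:  # re-check: another caller may have filled it
                self.runs += 1
                self._cached = await self._run_probes()
            return self._cached


async def demo() -> int:
    async def run_probes() -> dict:
        await asyncio.sleep(0.01)  # pretend the probes take a moment
        return {"status": "healthy"}

    runner = CachedRunner(run_probes)
    await asyncio.gather(*(runner.get() for _ in range(10)))
    return runner.runs
```

Ten simultaneous callers wait on the same lock, and only the first one actually runs the probes.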
State-change callbacks
React to probe status transitions in real time. Register one or more callbacks with on_state_change(); each receives the probe name, old status, and new status whenever a probe's result changes.
import logging
logger = logging.getLogger(__name__)
def on_change(probe_name: str, old_status, new_status):
    logger.warning("Probe %s changed: %s → %s", probe_name, old_status, new_status)
registry.on_state_change(on_change)
Async callbacks are also supported:
async def alert(probe_name, old_status, new_status):
    await send_slack_alert(f"{probe_name} is now {new_status}")
registry.on_state_change(alert)
Key behaviours:
- Callbacks fire after every run_all() for each probe whose status differs from the previous run.
- The first run seeds the initial state; no callbacks are fired until a subsequent run sees a different result.
- Multiple callbacks can be registered; all are called in registration order.
- on_state_change() returns self for chaining.
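The behaviours listed above can be modelled with a small tracker. This is a sketch under stated assumptions: `StateTracker` and `observe` are hypothetical names, statuses are plain strings, and the real registry may order or schedule callbacks differently.

```python
import asyncio
import inspect


class StateTracker:
    """Hypothetical tracker mirroring the listed behaviours; not the library's code."""

    def __init__(self):
        self._last = {}       # probe name -> last seen status
        self._callbacks = []

    def on_state_change(self, callback):
        self._callbacks.append(callback)
        return self           # allow chaining

    async def observe(self, statuses: dict) -> None:
        """Compare one run's statuses with the previous run and notify on changes."""
        for name, new in statuses.items():
            old = self._last.get(name)
            self._last[name] = new
            if old is None or old == new:
                continue      # first run seeds state; unchanged status is silent
            for cb in self._callbacks:         # registration order
                outcome = cb(name, old, new)
                if inspect.isawaitable(outcome):
                    await outcome              # async callbacks are awaited


async def demo() -> list:
    seen = []

    async def async_cb(name, old, new):
        seen.append(("async", name, old, new))

    tracker = StateTracker()
    tracker.on_state_change(
        lambda n, o, s: seen.append(("sync", n, o, s))
    ).on_state_change(async_cb)
    await tracker.observe({"redis": "healthy"})    # seeds state, no callbacks
    await tracker.observe({"redis": "unhealthy"})  # fires both callbacks
    return seen
```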
Startup grace period
Pass grace_period_ms to hold /health/ready in a 503 {"status": "starting"} state for a fixed window after the registry is created. This prevents a load balancer from routing traffic before the application has had time to warm up — without requiring all probes to pass immediately on boot.
registry = HealthRegistry(
app,
grace_period_ms=15_000, # hold readiness for 15 s after startup
)
- /health/ready returns 503 {"status": "starting"} while the grace period is active.
- /health/status and /health/live are not affected; they always reflect real probe results.
- After the grace period expires, /ready resumes normal probe-based behaviour.
- grace_period_ms=0 (default) disables the grace period entirely.
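A grace window of this kind is usually a deadline on a monotonic clock. The following sketch shows the decision order for a readiness request; `GracePeriod` and `ready_response` are hypothetical names, and the injectable `clock` exists only to make the example easy to test.

```python
import time


class GracePeriod:
    """Hypothetical helper: reports whether the startup grace window is still open."""

    def __init__(self, grace_period_ms: int, clock=time.monotonic):
        self._clock = clock
        self._deadline = clock() + grace_period_ms / 1000

    def active(self) -> bool:
        return self._clock() < self._deadline


def ready_response(grace: GracePeriod, probes_healthy: bool):
    """Return (status_code, body) for a readiness request."""
    if grace.active():
        return 503, {"status": "starting"}  # probes are not consulted yet
    if probes_healthy:
        return 200, {"status": "healthy"}
    return 503, {"status": "unhealthy"}
```

With `grace_period_ms=0` the deadline equals the creation time, so `active()` is false immediately and readiness depends on the probes alone.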
Pair with Kubernetes' initialDelaySeconds for belt-and-suspenders protection during slow startup:
readinessProbe:
  httpGet:
    path: /health/ready
    port: 8000
  initialDelaySeconds: 5  # k8s waits 5 s before its first check
  periodSeconds: 10
# App-side grace covers the remaining warmup window
registry = HealthRegistry(app, grace_period_ms=20_000)
Probe result history
Every probe result is stored in a rolling per-probe history. Use GET /health/history to inspect past runs — useful for debugging flapping probes or tracking latency over time.
registry = HealthRegistry(
app,
history_size=20, # keep the last 20 results per probe (default: 10)
)
GET /health/history — response format:
{
"probes": {
"postgresql": [
{
"name": "postgresql",
"status": "healthy",
"critical": true,
"latency_ms": 1.8,
"error": null,
"details": { "version": "PostgreSQL 16.2 ...", "active_connections": 5 }
},
{
"name": "postgresql",
"status": "healthy",
"critical": true,
"latency_ms": 2.1,
"error": null,
"details": { "version": "PostgreSQL 16.2 ...", "active_connections": 6 }
}
],
"redis": [
{
"name": "redis",
"status": "unhealthy",
"critical": false,
"latency_ms": 5002.0,
"error": "Connection refused",
"details": null
},
{
"name": "redis",
"status": "healthy",
"critical": false,
"latency_ms": 0.9,
"error": null,
"details": { "version": "7.2.4", "total_keys": 312 }
}
]
}
}
Results are ordered oldest-first. History is in-memory and resets on process restart.
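A rolling, oldest-first history maps naturally onto `collections.deque` with `maxlen`. The sketch below is one way to model it; `History`, `record`, and `snapshot` are hypothetical names and the stored results are plain dicts, not the library's result objects.

```python
from collections import defaultdict, deque


class History:
    """Hypothetical rolling store: keeps the last `size` results per probe."""

    def __init__(self, size: int = 10):
        size = max(size, 1)  # the configuration reference states a minimum of 1
        self._items = defaultdict(lambda: deque(maxlen=size))

    def record(self, probe_name: str, result: dict) -> None:
        self._items[probe_name].append(result)  # oldest entry drops off when full

    def snapshot(self) -> dict:
        """Return {"probes": {name: [oldest, ..., newest]}}."""
        return {"probes": {name: list(items) for name, items in self._items.items()}}
```

Since the data lives only in process memory, a restart starts every probe with an empty list, matching the note above.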
Response format
Every response from /health/ready, /health/status, and the SSE streams shares the same shape.
Health report
| Field | Type | Description |
|---|---|---|
| status | "healthy" \| "unhealthy" | Overall result, determined by critical probes only |
| checked_at | string \| null | UTC ISO 8601 timestamp of the last probe run; null before the first run |
| probes | array | Individual probe results (see below) |
Probe result
| Field | Type | Description |
|---|---|---|
| name | string | Probe identifier |
| status | "healthy" \| "unhealthy" | Pass/fail for this probe |
| critical | boolean | true if the probe affects overall status and readiness |
| latency_ms | number | How long the check took in milliseconds |
| error | string \| null | Error message on failure; null when the probe passes |
| details | object \| null | Service-specific metadata (see each probe's section) |
Example: all healthy — 200
{
"status": "healthy",
"checked_at": "2024-06-01T12:00:00.123456+00:00",
"probes": [
{
"name": "postgresql",
"status": "healthy",
"critical": true,
"latency_ms": 1.8,
"error": null,
"details": {
"version": "PostgreSQL 16.2 on aarch64-unknown-linux-gnu",
"active_connections": 5,
"max_connections": 100,
"database_size": "42 MB"
}
},
{
"name": "redis",
"status": "healthy",
"critical": false,
"latency_ms": 0.6,
"error": null,
"details": {
"version": "7.2.4",
"uptime_seconds": 86400,
"used_memory_human": "2.50M",
"connected_clients": 8,
"total_keys": 312
}
}
]
}
Example: one critical probe failing — 503 on /ready, 207 on /status
{
"status": "unhealthy",
"checked_at": "2024-06-01T12:00:05.456789+00:00",
"probes": [
{
"name": "postgresql",
"status": "unhealthy",
"critical": true,
"latency_ms": 5002.1,
"error": "Connection refused",
"details": null
},
{
"name": "redis",
"status": "healthy",
"critical": false,
"latency_ms": 0.6,
"error": null,
"details": { "version": "7.2.4" }
}
]
}
Example: non-critical probe failing — still 200 on /ready
{
"status": "healthy",
"checked_at": "2024-06-01T12:00:10.000000+00:00",
"probes": [
{
"name": "postgresql",
"status": "healthy",
"critical": true,
"latency_ms": 1.9,
"error": null,
"details": { "active_connections": 5 }
},
{
"name": "redis",
"status": "unhealthy",
"critical": false,
"latency_ms": 5001.3,
"error": "Connection timed out",
"details": null
}
]
}
Writing a custom probe
Any class that extends BaseProbe and implements check() works as a probe. This is the right approach for internal services, third-party SDKs, business-logic checks, or composite conditions.
Minimal probe
from fastapi_watch.probes import BaseProbe
from fastapi_watch.models import ProbeResult, ProbeStatus
class MyServiceProbe(BaseProbe):
    name = "my-service"

    async def check(self) -> ProbeResult:
        ok = await call_my_service()
        return ProbeResult(
            name=self.name,
            status=ProbeStatus.HEALTHY if ok else ProbeStatus.UNHEALTHY,
        )
registry.add(MyServiceProbe())
check() must be an async method and must return a ProbeResult. Any unhandled exception raised by check() is caught by the registry, automatically recorded as an unhealthy result, and optionally logged — your probe never needs to worry about crashing the health system.
Recording latency and details
Use time.perf_counter() to measure the check duration and populate latency_ms. The details dict accepts any JSON-serializable data.
import time
from fastapi_watch.probes import BaseProbe
from fastapi_watch.models import ProbeResult, ProbeStatus
class PaymentGatewayProbe(BaseProbe):
    name = "payment-gateway"

    async def check(self) -> ProbeResult:
        start = time.perf_counter()
        try:
            info = await ping_payment_gateway()
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.HEALTHY,
                latency_ms=round(latency, 2),
                details={
                    "region": info.region,
                    "provider_version": info.version,
                    "response_ms": round(latency, 2),
                },
            )
        except Exception as exc:
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.UNHEALTHY,
                latency_ms=round(latency, 2),
                error=str(exc),
            )
Configurable probe
Pass configuration through __init__ so the same probe class can be reused with different settings.
class S3BucketProbe(BaseProbe):
    def __init__(self, bucket: str, region: str = "us-east-1", name: str = "s3") -> None:
        self.bucket = bucket
        self.region = region
        self.name = name

    async def check(self) -> ProbeResult:
        import time
        import aiobotocore.session

        start = time.perf_counter()
        try:
            session = aiobotocore.session.get_session()
            async with session.create_client("s3", region_name=self.region) as client:
                await client.head_bucket(Bucket=self.bucket)
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.HEALTHY,
                latency_ms=round(latency, 2),
                details={"bucket": self.bucket, "region": self.region},
            )
        except Exception as exc:
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.UNHEALTHY,
                latency_ms=round(latency, 2),
                error=str(exc),
            )
# Register multiple buckets as separate probes
registry.add(S3BucketProbe(bucket="my-app-uploads", region="eu-west-1", name="s3-uploads"))
registry.add(S3BucketProbe(bucket="my-app-backups", region="us-east-1", name="s3-backups"))
Adding a timeout
Set the timeout attribute (in seconds) on the class or instance. The registry will cancel the check and record it as unhealthy if it runs too long.
class SlowExternalProbe(BaseProbe):
    name = "slow-external"
    timeout = 3.0  # class-level default

    async def check(self) -> ProbeResult:
        result = await call_slow_external_api()
        return ProbeResult(name=self.name, status=ProbeStatus.HEALTHY)
# Override on a specific instance
probe = SlowExternalProbe()
probe.timeout = 1.5
registry.add(probe)
Composite probe
Wrap multiple inner probes to build custom aggregation logic — for example, reporting unhealthy only when both a primary and replica are down simultaneously.
import asyncio
from fastapi_watch.probes import BaseProbe, RedisProbe
from fastapi_watch.models import ProbeResult, ProbeStatus
class RedisHAProbe(BaseProbe):
    name = "redis-ha"

    def __init__(self, primary_url: str, replica_url: str) -> None:
        self._primary = RedisProbe(url=primary_url, name="primary")
        self._replica = RedisProbe(url=replica_url, name="replica")

    async def check(self) -> ProbeResult:
        # Check both nodes at the same time
        primary, replica = await asyncio.gather(
            self._primary.check(), self._replica.check()
        )
        # Healthy as long as at least one node responds
        if primary.is_healthy or replica.is_healthy:
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.HEALTHY,
                details={
                    "primary": primary.status.value,
                    "replica": replica.status.value,
                },
            )
        return ProbeResult(
            name=self.name,
            status=ProbeStatus.UNHEALTHY,
            error=f"both nodes down - primary: {primary.error}, replica: {replica.error}",
        )
registry.add(RedisHAProbe(
primary_url="redis://primary.internal:6379",
replica_url="redis://replica.internal:6379",
))
Exception handling
If check() raises an unhandled exception, the registry catches it and returns an unhealthy result automatically — you do not need to wrap your entire probe body in a try/except for this purpose. The auto-generated result looks like:
{
"name": "my-service",
"status": "unhealthy",
"critical": true,
"latency_ms": 0.0,
"error": "RuntimeError: connection pool exhausted",
"details": null
}
If a logger was passed to HealthRegistry, the exception is also logged with full traceback via logger.exception().
You should still catch exceptions yourself inside check() if you want to record partial details, a meaningful latency_ms, or a more specific error message.
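The automatic conversion described above amounts to a try/except around the awaited check. The sketch below reproduces the documented error format ("ExceptionType: message") with plain dicts; `safe_check` is a hypothetical helper, not the registry's actual wrapper.

```python
import asyncio
import logging
from typing import Optional


async def safe_check(name: str, check, logger: Optional[logging.Logger] = None) -> dict:
    """Run check(); convert any exception into an unhealthy result dict."""
    try:
        return await check()
    except Exception as exc:
        if logger is not None:
            logger.exception("probe %s failed", name)  # includes the traceback
        return {
            "name": name,
            "status": "unhealthy",
            "latency_ms": 0.0,  # nothing was measured before the failure
            "error": f"{type(exc).__name__}: {exc}",
            "details": None,
        }


async def broken_check() -> dict:
    raise RuntimeError("connection pool exhausted")


result = asyncio.run(safe_check("my-service", broken_check))
```

The zero latency in the fallback result is why catching exceptions inside check() is still worthwhile when timing matters.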
Testing a custom probe
Use pytest-asyncio to test check() directly without needing to spin up an HTTP server.
import pytest
from fastapi_watch.models import ProbeStatus
from myapp.probes import MyServiceProbe
@pytest.mark.asyncio
async def test_healthy_when_service_responds():
    probe = MyServiceProbe()
    result = await probe.check()
    assert result.status == ProbeStatus.HEALTHY
    assert result.name == "my-service"


@pytest.mark.asyncio
async def test_unhealthy_when_service_raises(monkeypatch):
    async def fail():
        raise ConnectionError("refused")

    monkeypatch.setattr("myapp.probes.call_my_service", fail)
    probe = MyServiceProbe()
    result = await probe.check()
    assert result.status == ProbeStatus.UNHEALTHY
    assert "refused" in result.error
You can also run the full registry against a real or mock dependency:
import pytest
from fastapi import FastAPI
from fastapi_watch import HealthRegistry
from fastapi_watch.models import ProbeStatus
from myapp.probes import MyServiceProbe


@pytest.mark.asyncio
async def test_registry_run_all():
    app = FastAPI()
    registry = HealthRegistry(app, poll_interval_ms=None)
    registry.add(MyServiceProbe())
    results = await registry.run_all()
    assert results[0].status == ProbeStatus.HEALTHY
Probe implementation checklist
- name must be set, either as a class attribute or in __init__ via self.name.
- check() must be async and return a ProbeResult.
- Set latency_ms for probes where response time matters.
- Populate details with any data useful for diagnosis.
- Set timeout if the underlying call can hang indefinitely.
- Do not call registry.run_all() or other registry methods from inside check().
Built-in probes
Probe details
Every built-in probe populates the details field with service-specific metadata. Details are always best-effort — if the metadata query fails after a successful connectivity check, details will contain whatever was collected up to that point. The probe status reflects connectivity only, not the completeness of details.
Watching PostgreSQL
pip install "fastapi-watch[postgres]"
PostgreSQLProbe uses asyncpg directly — no SQLAlchemy required. It opens a connection, runs SELECT version() and a set of metadata queries concurrently, then closes the connection.
from fastapi_watch.probes import PostgreSQLProbe
registry.add(
PostgreSQLProbe(
url="postgresql://app_user:secret@localhost:5432/mydb",
name="primary-db", # default: "postgresql"
)
)
Details returned:
{
"version": "PostgreSQL 16.2 on aarch64-unknown-linux-gnu, compiled by gcc 12.2.0",
"active_connections": 5,
"max_connections": 100,
"database_size": "42 MB"
}
Checking a read replica separately:
registry.add(PostgreSQLProbe(url="postgresql://reader:secret@replica.host/mydb", name="replica-db"))
With a connection timeout (default 5 seconds):
registry.add(PostgreSQLProbe(url="postgresql://...", timeout=2.0))
If you are already using SQLAlchemy, see SQLAlchemy engine probe to reuse your existing engine instead.
Watching MySQL / MariaDB
pip install "fastapi-watch[mysql]"
MySQLProbe accepts either a URL or explicit connection kwargs.
from fastapi_watch.probes import MySQLProbe
# URL form
registry.add(MySQLProbe(url="mysql://app_user:secret@localhost:3306/mydb"))
# Keyword form
registry.add(MySQLProbe(host="localhost", port=3306, user="app_user", password="secret", db="mydb"))
Details returned:
{
"version": "8.0.36",
"connected_threads": 4,
"uptime_seconds": 172800,
"max_used_connections": 12
}
Constructor arguments:
| Argument | Default | Description |
|---|---|---|
| url | None | Full DSN; overrides all other kwargs when set |
| host | "localhost" | |
| port | 3306 | |
| user | "root" | |
| password | "" | |
| db | "" | |
| name | "mysql" | Probe label |
| connect_timeout | 5 | Seconds |
Watching Redis
pip install "fastapi-watch[redis]"
RedisProbe sends PING, then collects server info and scans key prefixes to build a cluster breakdown.
from fastapi_watch.probes import RedisProbe
registry.add(RedisProbe(url="redis://localhost:6379"))
Details returned:
{
"version": "7.2.4",
"uptime_seconds": 86400,
"used_memory_human": "2.50M",
"connected_clients": 8,
"role": "master",
"total_keys": 312,
"clusters": {
"session": { "keys": 150, "ttl_seconds": 3600 },
"cache": { "keys": 162, "ttl_seconds": 900 }
}
}
clusters groups keys by the segment before the first :. For example, a key named session:abc123 falls into the session cluster. ttl_seconds is sampled from one key in the group; null means the key has no expiry.
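The grouping rule can be sketched as a prefix count. This example works on a plain list of key names and does not talk to Redis; `group_by_prefix` is a hypothetical helper, and placing keys without a colon under their full name is an assumption, since the text does not say how the probe treats them.

```python
from collections import Counter


def group_by_prefix(keys):
    """Count keys per segment before the first ':'."""
    return dict(Counter(key.split(":", 1)[0] for key in keys))


counts = group_by_prefix(["session:abc123", "session:def456", "cache:user:1"])
```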
Common URL forms:
# Password-protected
RedisProbe(url="redis://:mypassword@localhost:6379")
# Specific database index
RedisProbe(url="redis://localhost:6379/2", name="task-queue")
# TLS
RedisProbe(url="rediss://redis.internal:6380")
# Watching Redis as both a cache and a queue
registry.add(RedisProbe(url="redis://localhost:6379/0", name="cache"))
registry.add(RedisProbe(url="redis://localhost:6379/1", name="task-queue"))
Watching Memcached
pip install "fastapi-watch[memcached]"
MemcachedProbe calls stats() to verify the server is reachable and responding.
from fastapi_watch.probes import MemcachedProbe
registry.add(MemcachedProbe(host="localhost", port=11211))
Constructor arguments:
| Argument | Default | Description |
|---|---|---|
| host | "localhost" | |
| port | 11211 | |
| name | "memcached" | Probe label |
| pool_size | 1 | aiomcache connection pool size |
Watching RabbitMQ
pip install "fastapi-watch[rabbitmq]"
RabbitMQProbe has two modes:
- Connectivity only (default): opens and closes an AMQP connection. No channels or queues are touched.
- Rich mode: when management_url is set, the probe also calls the RabbitMQ Management HTTP API and returns per-queue stats, message rates, and cluster metadata.
Connectivity only
from fastapi_watch.probes import RabbitMQProbe
registry.add(
RabbitMQProbe(
url="amqp://guest:guest@localhost:5672/",
name="rabbitmq", # default
)
)
Details returned (connectivity only):
{ "connected": true }
Rich mode — with Management API
Pass management_url pointing at the RabbitMQ Management plugin (default port 15672). Credentials are taken from the AMQP URL automatically.
registry.add(
RabbitMQProbe(
url="amqp://guest:guest@localhost:5672/",
management_url="http://localhost:15672",
)
)
Details returned (rich mode):
{
"connected": true,
"server": {
"rabbitmq_version": "3.12.0",
"erlang_version": "26.0",
"cluster_name": "rabbit@my-node",
"node": "rabbit@my-node",
"connections": 4,
"channels": 8,
"exchanges": 14,
"queues": 3,
"consumers": 6
},
"totals": {
"messages": 142,
"messages_ready": 140,
"messages_unacknowledged": 2,
"publish_rate": 12.5,
"deliver_rate": 11.8,
"ack_rate": 11.8
},
"queues": {
"tasks": {
"state": "running",
"messages": 120,
"messages_ready": 118,
"messages_unacknowledged": 2,
"consumers": 4,
"memory_bytes": 32768,
"publish_rate": 10.0,
"deliver_rate": 9.5,
"ack_rate": 9.5,
"durable": true,
"auto_delete": false,
"idle_since": null
}
}
}
If the Management API is unreachable, a management_api_error key is added to details and the probe still reports the AMQP connection status.
Other connection forms:
# Dedicated monitoring vhost
RabbitMQProbe(url="amqp://monitor:secret@rabbitmq.internal/monitoring", management_url="http://rabbitmq.internal:15672")
# TLS / AMQPS
RabbitMQProbe(url="amqps://user:secret@rabbitmq.internal/", name="rabbitmq-tls")
# Multiple cluster nodes — one probe per node
for i, host in enumerate(["rmq-1.internal", "rmq-2.internal", "rmq-3.internal"], start=1):
    registry.add(RabbitMQProbe(url=f"amqp://guest:guest@{host}/", name=f"rabbitmq-node-{i}"))
Constructor arguments:
| Argument | Default | Description |
|---|---|---|
| url | "amqp://guest:guest@localhost/" | AMQP(S) connection URL |
| name | "rabbitmq" | Probe label |
| management_url | None | Base URL of the Management HTTP API. When set, enables rich queue-level details. Credentials are taken from url. |
Watching Kafka
pip install "fastapi-watch[kafka]"
KafkaProbe starts an AIOKafkaAdminClient to verify broker reachability, then lists topics and describes the cluster.
from fastapi_watch.probes import KafkaProbe
# Single broker
registry.add(KafkaProbe(bootstrap_servers="localhost:9092"))
# Multiple brokers
registry.add(KafkaProbe(bootstrap_servers=["b1:9092", "b2:9092", "b3:9092"]))
Details returned:
{
"broker_count": 3,
"controller_id": 1,
"topics": ["orders", "payments", "notifications"],
"internal_topics": ["__consumer_offsets"]
}
topics contains user-defined topics only. internal_topics lists Kafka-managed topics (those prefixed with __).
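The split between the two lists follows directly from the naming rule. The helper below is a hypothetical illustration operating on a plain list of topic names; sorting the output is a choice made for this example, not documented probe behaviour.

```python
def split_topics(all_topics):
    """Separate user topics from Kafka-managed ones (names starting with '__')."""
    user = sorted(t for t in all_topics if not t.startswith("__"))
    internal = sorted(t for t in all_topics if t.startswith("__"))
    return {"topics": user, "internal_topics": internal}


split = split_topics(["orders", "__consumer_offsets", "payments"])
```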
Constructor arguments:
| Argument | Default | Description |
|---|---|---|
| bootstrap_servers | "localhost:9092" | String or list of host:port entries |
| name | "kafka" | Probe label |
| request_timeout_ms | 5000 | Admin client metadata request timeout |
Watching MongoDB
pip install "fastapi-watch[mongo]"
MongoProbe runs serverStatus on the admin database to collect version, connection pool stats, memory, and storage engine.
from fastapi_watch.probes import MongoProbe
registry.add(MongoProbe(url="mongodb://localhost:27017"))
Details returned:
{
"version": "7.0.5",
"uptime_seconds": 172800,
"connections": {
"current": 12,
"available": 838,
"total_created": 150
},
"memory_mb": {
"resident": 128,
"virtual": 1024
},
"storage_engine": "wiredTiger"
}
Constructor arguments:
| Argument | Default | Description |
|---|---|---|
| url | "mongodb://localhost:27017" | MongoDB connection URI |
| name | "mongodb" | Probe label |
| server_selection_timeout_ms | 2000 | How long to wait for a server before giving up |
Watching an HTTP endpoint
pip install "fastapi-watch[http]"
HttpProbe performs an HTTP GET and checks the response status code.
from fastapi_watch.probes import HttpProbe
registry.add(HttpProbe(url="https://api.upstream.com/health"))
Details returned:
{
"status_code": 200,
"content_type": "application/json",
"response_bytes": 43
}
details is populated for both healthy and unhealthy responses so you can see what status code an upstream actually returned.
Constructor arguments:
| Argument | Default | Description |
|---|---|---|
| url | required | URL to GET |
| timeout | 5.0 | Request timeout in seconds |
| name | URL host | Probe label |
| expected_status | 200 | HTTP status code considered healthy |
# Expect a 204 instead of 200
registry.add(HttpProbe(url="https://api.example.com/ping", expected_status=204))
# Shorter timeout, explicit name
registry.add(HttpProbe(url="https://api.payments.com/health", timeout=2.0, name="payments-api"))
SQLAlchemy engine probe
pip install "fastapi-watch[sqlalchemy]"
SqlAlchemyProbe reuses your existing AsyncEngine so no extra connections are opened. Works with any database SQLAlchemy supports (PostgreSQL, MySQL, SQLite, etc.).
from sqlalchemy.ext.asyncio import create_async_engine
from fastapi_watch.probes import SqlAlchemyProbe
engine = create_async_engine("postgresql+asyncpg://app_user:secret@localhost/mydb")
registry.add(SqlAlchemyProbe(engine=engine, name="primary-db"))
Details returned:
{
"dialect": "postgresql",
"driver": "asyncpg",
"server_version": "16.2.0"
}
Constructor arguments:
| Argument | Default | Description |
|---|---|---|
| engine | required | A SQLAlchemy 2.x AsyncEngine instance |
| name | "database" | Probe label |
All built-in probes
Databases
| Probe | Extra | Key constructor args | Details fields |
|---|---|---|---|
| PostgreSQLProbe | postgres | url, name, timeout | version, active_connections, max_connections, database_size |
| MySQLProbe | mysql | url or host/port/user/password/db, name, connect_timeout | version, connected_threads, uptime_seconds, max_used_connections |
| SqlAlchemyProbe | sqlalchemy | engine, name | dialect, driver, server_version |
Caches
| Probe | Extra | Key constructor args | Details fields |
|---|---|---|---|
| RedisProbe | redis | url, name | version, uptime_seconds, used_memory_human, connected_clients, role, total_keys, clusters |
| MemcachedProbe | memcached | host, port, name, pool_size | (none) |
Queues / messaging
| Probe | Extra | Key constructor args | Details fields |
|---|---|---|---|
| RabbitMQProbe | rabbitmq | url, name, management_url | connected; + server, totals, queues when management_url is set |
| KafkaProbe | kafka | bootstrap_servers, name, request_timeout_ms | broker_count, controller_id, topics, internal_topics |
Document stores
| Probe | Extra | Key constructor args | Details fields |
|---|---|---|---|
| MongoProbe | mongo | url, name, server_selection_timeout_ms | version, uptime_seconds, connections, memory_mb, storage_engine |
HTTP
| Probe | Extra | Key constructor args | Details fields |
|---|---|---|---|
| HttpProbe | http | url, timeout, name, expected_status | status_code, content_type, response_bytes |
Testing / placeholder
| Probe | Extra | Key constructor args | Details fields |
|---|---|---|---|
| MemoryProbe | built-in | name | (none) |
Configuration reference
HealthRegistry
| Argument | Type | Default | Description |
|---|---|---|---|
| app | FastAPI | required | The FastAPI application instance |
| prefix | str | "/health" | URL prefix for all health endpoints |
| tags | list[str] | ["health"] | OpenAPI tags applied to all health routes |
| poll_interval_ms | int \| None | 60000 | How often (ms) to re-run probes while an SSE client is connected. 0 or None disables polling, so each request or stream event runs probes on demand. Other values below 1000 are clamped to 1000. |
| logger | logging.Logger \| None | None | Logger for warnings (e.g. clamped interval) and probe exception messages. Pass None to emit no logs. |
| grace_period_ms | int | 0 | How long (ms) after startup to return 503 {"status": "starting"} from /ready. 0 disables the grace period. |
| history_size | int | 10 | Number of past probe results to retain per probe. Retrieved via GET /health/history. Minimum 1. |
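The clamping rule for poll_interval_ms and the bounded history behind history_size can be illustrated with a small standalone sketch. It is not the library's code: `normalize_poll_interval`, `MIN_POLL_INTERVAL_MS`, and `make_history` are hypothetical names, and treating a history_size below 1 as 1 is an assumption based on the "Minimum 1" note.

```python
from collections import deque
from typing import Optional

MIN_POLL_INTERVAL_MS = 1000  # documented lower bound for a non-zero interval


def normalize_poll_interval(ms: Optional[int]) -> Optional[int]:
    """Return the effective interval, or None when polling is disabled."""
    if not ms:  # 0 and None both disable polling
        return None
    return max(ms, MIN_POLL_INTERVAL_MS)


def make_history(history_size: int) -> deque:
    """Bounded buffer that keeps only the most recent results for one probe."""
    # Assumption: sizes below 1 are raised to 1.
    return deque(maxlen=max(history_size, 1))


history = make_history(3)
for run in range(5):
    history.append(f"result-{run}")  # the two oldest entries are dropped
```

With a size of 3 and five runs, only the three newest entries remain in `history`.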
HealthRegistry.add(probe, critical=True)
Adds a single probe. Returns self for chaining. Adding the same instance more than once is a no-op.
critical=True (default) — a failing probe causes the overall status to be "unhealthy" and /ready to return 503. Set critical=False to include the probe in reports without affecting readiness.
HealthRegistry.add_probes(probes, critical=True)
Adds a list of probes. The critical flag applies to every probe in the list. Returns self for chaining. Duplicate instances are silently skipped.
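The effect of the critical flag on the overall status can be sketched without the library. The `Result` dataclass and both helper functions below are hypothetical stand-ins, and the "healthy" overall label and the 200 status code for a passing readiness check are assumptions.

```python
from dataclasses import dataclass


@dataclass
class Result:
    """Hypothetical stand-in for a probe result; not the library's ProbeResult."""
    name: str
    status: str  # "healthy" or "unhealthy"
    critical: bool = True


def overall_status(results: list[Result]) -> str:
    """Only failing critical probes make the overall status unhealthy."""
    failed_critical = any(r.critical and r.status == "unhealthy" for r in results)
    return "unhealthy" if failed_critical else "healthy"


def ready_status_code(results: list[Result]) -> int:
    """HTTP status a readiness endpoint would return under this rule."""
    return 503 if overall_status(results) == "unhealthy" else 200


results = [
    Result("postgres", "healthy"),
    Result("metrics-exporter", "unhealthy", critical=False),
]
```

Here the failing non-critical probe still appears in the report, but readiness stays at 200 until a critical probe fails.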
HealthRegistry.on_state_change(callback)
Registers a callback invoked whenever a probe's status changes between runs. The callback receives (probe_name: str, old_status: ProbeStatus, new_status: ProbeStatus) and may be a plain function or an async coroutine. Returns self for chaining.
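As a rough illustration of how a dispatcher can accept both plain functions and coroutines, the sketch below calls each callback and awaits the result only when it is awaitable. The `notify` helper and the two example callbacks are hypothetical, and statuses are plain strings here rather than the library's ProbeStatus values.

```python
import asyncio
import inspect
from typing import Any, Callable

Callback = Callable[[str, str, str], Any]


async def notify(callbacks: list[Callback], name: str, old: str, new: str) -> None:
    """Invoke each callback, awaiting the result when it is awaitable."""
    for callback in callbacks:
        outcome = callback(name, old, new)
        if inspect.isawaitable(outcome):
            await outcome


events: list[str] = []


def log_change(probe_name: str, old_status: str, new_status: str) -> None:
    events.append(f"sync: {probe_name} {old_status} -> {new_status}")


async def page_on_call(probe_name: str, old_status: str, new_status: str) -> None:
    await asyncio.sleep(0)  # placeholder for an async notification call
    events.append(f"async: {probe_name} {old_status} -> {new_status}")


asyncio.run(notify([log_change, page_on_call], "redis", "healthy", "unhealthy"))
```

Either callback shape matches the documented (probe_name, old_status, new_status) signature, so both could be passed to on_state_change.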
HealthRegistry.set_poll_interval(ms)
Updates the poll interval at runtime. Pass 0 or None to switch to single-fetch mode. If SSE clients are currently connected the poll task is restarted immediately with the new interval.
registry.set_poll_interval(30_000) # every 30 s
registry.set_poll_interval(0) # disable — each event runs probes on demand
HealthRegistry.run_all()
Async method — runs every registered probe concurrently and returns list[ProbeResult]. Probe exceptions are caught and converted to unhealthy results. Useful for testing or building custom aggregation outside of the mounted routes.
results = await registry.run_all()
for r in results:
    print(r.name, r.status, r.latency_ms, r.details)
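The concurrency and exception handling described for run_all() can be approximated with asyncio.gather. This is a minimal sketch under assumptions, not the library's implementation: `run_checks` is a hypothetical helper, probes are modelled as bare coroutine functions, and results are plain dicts.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def run_checks(probes: dict[str, Callable[[], Awaitable[None]]]) -> list[dict]:
    """Run every check concurrently; an exception becomes an unhealthy result."""

    async def run_one(name: str, check: Callable[[], Awaitable[None]]) -> dict:
        started = time.perf_counter()
        try:
            await check()
            status, error = "healthy", None
        except Exception as exc:  # a failing probe must not break the others
            status, error = "unhealthy", str(exc)
        latency_ms = (time.perf_counter() - started) * 1000
        return {"name": name, "status": status, "latency_ms": latency_ms, "error": error}

    # gather preserves input order and runs the coroutines concurrently
    return await asyncio.gather(*(run_one(n, c) for n, c in probes.items()))


async def ok() -> None:
    await asyncio.sleep(0.05)


async def broken() -> None:
    raise ConnectionError("connection refused")


results = asyncio.run(run_checks({"db": ok, "cache": broken}))
```

Catching the exception inside each task keeps one failing probe from cancelling the rest of the batch.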
BaseProbe
| Attribute | Type | Default | Description |
|---|---|---|---|
| name | str | "unnamed" | Label used in health reports. Override as a class attribute or set in __init__. |
| timeout | float \| None | None | Per-probe timeout in seconds. The check is cancelled and recorded as unhealthy if it exceeds this value. None means no limit. |
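The timeout semantics above (cancel the check, record it as unhealthy, None for no limit) map naturally onto asyncio.wait_for. The sketch below shows one way this could work; `run_with_timeout` and `slow_check` are hypothetical names, and the error message format is an assumption.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def run_with_timeout(
    check: Callable[[], Awaitable[None]], timeout: Optional[float]
) -> dict:
    """Run one check; report unhealthy if it exceeds `timeout` seconds."""
    try:
        # wait_for cancels the check on timeout; timeout=None waits indefinitely
        await asyncio.wait_for(check(), timeout=timeout)
    except asyncio.TimeoutError:
        return {"status": "unhealthy", "error": f"timed out after {timeout}s"}
    return {"status": "healthy", "error": None}


async def slow_check() -> None:
    await asyncio.sleep(0.2)


timed_out = asyncio.run(run_with_timeout(slow_check, timeout=0.01))
unlimited = asyncio.run(run_with_timeout(slow_check, timeout=None))
```

The same slow check fails with a 10 ms limit and passes when no limit is set.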
ProbeResult
| Field | Type | Description |
|---|---|---|
| name | str | Probe identifier |
| status | ProbeStatus | "healthy" or "unhealthy" |
| critical | bool | True if the probe was registered as critical; affects overall status and readiness |
| latency_ms | float | Duration of the check in milliseconds |
| error | str \| None | Error message; only present on failure |
| details | dict \| None | Service-specific metadata |
| is_healthy | bool (property) | True when status == "healthy" |
Kubernetes integration
livenessProbe:
  httpGet:
    path: /health/live
    port: 8000
  initialDelaySeconds: 5
  periodSeconds: 10
readinessProbe:
  httpGet:
    path: /health/ready
    port: 8000
  initialDelaySeconds: 10
  periodSeconds: 15
  failureThreshold: 3
Use /health/ready for the readiness probe: Kubernetes stops routing traffic to a pod once the check has failed failureThreshold times in a row, which happens when a critical dependency stays unreachable. Use /health/live for liveness so the process is restarted only when it is genuinely stuck, not because an external service is temporarily down.
For applications that need time to warm up (loading models, seeding caches, running migrations), combine grace_period_ms with a short initialDelaySeconds:
readinessProbe:
  httpGet:
    path: /health/ready
    port: 8000
  initialDelaySeconds: 5
  periodSeconds: 10
  failureThreshold: 6 # allow up to 60 s of failures before marking unready
# App holds /ready as "starting" for 30 s regardless of probe results
registry = HealthRegistry(app, grace_period_ms=30_000)
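The timing in this configuration can be checked with a little arithmetic. The sketch below assumes an idealized schedule (checks at initialDelaySeconds, then every periodSeconds, ignoring kubelet jitter and probe timeouts) and assumes a check landing exactly at the end of the grace period is no longer held as "starting". Both function names are hypothetical.

```python
import math


def failure_budget_s(period_s: int, failure_threshold: int) -> int:
    """Seconds of consecutive failures tolerated before the pod is marked unready."""
    return period_s * failure_threshold


def first_check_after_grace_s(
    initial_delay_s: int, period_s: int, grace_period_ms: int
) -> int:
    """Time of the first scheduled check at or after the end of the grace period."""
    grace_s = grace_period_ms / 1000
    if grace_s <= initial_delay_s:
        return initial_delay_s
    periods = math.ceil((grace_s - initial_delay_s) / period_s)
    return initial_delay_s + periods * period_s


# Values taken from the readinessProbe and HealthRegistry settings above
budget = failure_budget_s(period_s=10, failure_threshold=6)
first_ok = first_check_after_grace_s(
    initial_delay_s=5, period_s=10, grace_period_ms=30_000
)
```

Under these assumptions the checks at 5 s, 15 s, and 25 s fall inside the 30 s grace period, and the check at 35 s is the first that reflects real probe results.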
License
MIT