Skip to main content

Structured health and readiness check system for FastAPI

Reason this release was yanked:

HTTP Probe would consume 3rd party API usages

Project description

fastapi-watch

Structured health and readiness check system for FastAPI.

Add /health/live, /health/ready, /health/status, and /health/history endpoints to any FastAPI app with a single registry call. All probes run concurrently, so a slow dependency never blocks the others. Each probe returns rich service-specific details alongside the pass/fail result.

Connect a browser or monitoring tool to the SSE streaming endpoints (/health/ready/stream, /health/status/stream) and receive live updates as long as you stay connected — the background poll loop starts automatically on the first connection and stops when the last client disconnects.


Table of contents


Installation

Install only the extras you actually use. Nothing is pulled in by default beyond FastAPI and Pydantic.

zsh users: wrap the package name in quotes to prevent the shell from interpreting [ and ] as glob patterns.

# Core package — includes the always-passing MemoryProbe, no other deps
pip install fastapi-watch

# Add individual service probes as needed
pip install "fastapi-watch[postgres]"     # PostgreSQL        (asyncpg)
pip install "fastapi-watch[mysql]"        # MySQL / MariaDB   (aiomysql)
pip install "fastapi-watch[sqlalchemy]"   # Any SQLAlchemy 2.x async engine
pip install "fastapi-watch[redis]"        # Redis             (redis)
pip install "fastapi-watch[memcached]"    # Memcached         (aiomcache)
pip install "fastapi-watch[rabbitmq]"     # RabbitMQ          (aio-pika + aiohttp)
pip install "fastapi-watch[kafka]"        # Kafka             (aiokafka)
pip install "fastapi-watch[mongo]"        # MongoDB           (motor)
pip install "fastapi-watch[http]"         # HTTP endpoint     (aiohttp)

# Or pull everything in one shot
pip install "fastapi-watch[all]"

Multiple extras can be combined:

pip install "fastapi-watch[postgres,redis,rabbitmq]"

Quick start

Create a HealthRegistry, attach it to your FastAPI app, and call .add() for each service you want to monitor. The registry mounts all health endpoints automatically.

import logging
from fastapi import FastAPI
from fastapi_watch import HealthRegistry
from fastapi_watch.probes import PostgreSQLProbe, RedisProbe

app = FastAPI()

registry = HealthRegistry(
    app,
    poll_interval_ms=60_000,           # re-run probes every 60 s while streaming
    logger=logging.getLogger(__name__), # optional — omit to silence all logging
    grace_period_ms=10_000,            # hold /ready for 10 s while the app warms up
    history_size=20,                   # keep the last 20 results per probe
)

registry.add(PostgreSQLProbe(url="postgresql://user:pass@localhost/mydb"))
registry.add(RedisProbe(url="redis://localhost:6379"), critical=False)

That's it. The following routes are now live:

GET /health/live          → always 200
GET /health/ready         → 200 / 503
GET /health/status        → 200 / 207
GET /health/ready/stream  → SSE stream
GET /health/status/stream → SSE stream
GET /health/history       → rolling probe history

Endpoints

Endpoint Purpose Healthy Degraded
GET /health/live Liveness — is the process alive? 200 OK never fails
GET /health/ready Readiness — are all critical probes passing? 200 OK 503 Service Unavailable
GET /health/status Status — full detail on every probe 200 OK 207 Multi-Status
GET /health/ready/stream Readiness stream — SSE; polls while connected 200 OK stream of events
GET /health/status/stream Status stream — SSE; polls while connected 200 OK stream of events
GET /health/history History — last N results per probe 200 OK always 200

The prefix defaults to /health and can be changed at construction time:

registry = HealthRegistry(app, prefix="/ops/health")
# → /ops/health/live
# → /ops/health/ready
# → /ops/health/status
# → /ops/health/ready/stream
# → /ops/health/status/stream
# → /ops/health/history

Probe management

Adding probes

Add probes one at a time with add(), or pass a list with add_probes(). Both methods return self for chaining. Adding the same instance twice is a no-op.

# Single probe
registry.add(probe_a)

# Multiple probes in one call
registry.add_probes([probe_a, probe_b, probe_c])

# Chained
registry.add(probe_a).add(probe_b).add(probe_c)

# Duplicate ignored — probe_a is only registered once
registry.add(probe_a)
registry.add(probe_a)

Probes run concurrently on every check — a slow or failing probe never delays the others.

Critical vs non-critical probes

By default every probe is critical — a failing critical probe sets the overall status to "unhealthy" and causes /health/ready to return 503.

Mark a probe as non-critical when its failure should be visible in reports but shouldn't block traffic:

# Database is essential; fail readiness if it's unreachable
registry.add(PostgreSQLProbe(url="postgresql://..."), critical=True)

# Cache is nice-to-have; don't fail readiness if it's down
registry.add(RedisProbe(url="redis://localhost"), critical=False)

Non-critical probes always appear in /health/status with their real result and a "critical": false field. They simply don't affect the overall status or /ready.

add_probes() accepts the same flag, applied to every probe in the list:

registry.add_probes([probe_a, probe_b], critical=False)

Per-probe timeout

Set a timeout (in seconds) on any probe class or instance. If the check doesn't complete within that time, the probe is recorded as unhealthy — all other probes are unaffected and still run concurrently.

On the class:

class MyServiceProbe(BaseProbe):
    name = "my-service"
    timeout = 5.0  # fail after 5 seconds

    async def check(self) -> ProbeResult:
        ...

On an instance:

probe = MyServiceProbe()
probe.timeout = 2.0
registry.add(probe)

timeout = None (the default) means no limit. Timed-out probes produce an unhealthy result with error: "TimeoutError: ".


Live streaming

The two streaming endpoints (/health/ready/stream, /health/status/stream) use Server-Sent Events (SSE) to push probe results to connected clients.

The poll loop is demand-driven — it starts when the first SSE client connects and stops automatically when the last one disconnects. No background work is done when nobody is watching.

Each event is a JSON-encoded health report on a data: line:

data: {"status": "healthy", "checked_at": "2024-06-01T12:00:00.123456+00:00", "probes": [...]}

data: {"status": "unhealthy", "checked_at": "2024-06-01T12:00:05.456789+00:00", "probes": [...]}

Connecting from JavaScript

const es = new EventSource('/health/status/stream');

es.onmessage = (event) => {
    const report = JSON.parse(event.data);
    console.log(report.status, report.probes);
};

es.onerror = () => es.close();

Connecting with curl

curl -N http://localhost:8000/health/status/stream

Configuring the poll interval

# Default — poll every 60 seconds while a client is connected
registry = HealthRegistry(app)

# Custom interval
registry = HealthRegistry(app, poll_interval_ms=10_000)  # every 10 s

# Minimum enforced interval is 1000 ms; lower values are clamped
registry = HealthRegistry(app, poll_interval_ms=500)     # → 1000 ms

# Disable polling — streaming endpoints emit one result then close
registry = HealthRegistry(app, poll_interval_ms=None)

The interval can also be changed at any point after startup:

registry.set_poll_interval(30_000)   # switch to every 30 s
registry.set_poll_interval(0)        # disable — single-fetch mode

Polling and caching

The regular GET /health/ready and GET /health/status endpoints always respond immediately:

  • When SSE clients are connected — the poll loop is running, so these endpoints serve the most recent cached probe results without re-running any probes.
  • When no streaming is active — probes are run on demand. A built-in lock prevents a thundering herd if multiple requests arrive simultaneously before the first result is cached.

This means your GET endpoints are fast under all conditions, regardless of whether anyone is streaming.


State-change callbacks

React to probe status transitions in real time. Register one or more callbacks with on_state_change(); each receives the probe name, old status, and new status whenever a probe's result changes.

import logging

logger = logging.getLogger(__name__)

def on_change(probe_name: str, old_status, new_status):
    logger.warning("Probe %s changed: %s%s", probe_name, old_status, new_status)

registry.on_state_change(on_change)

Async callbacks are also supported:

async def alert(probe_name, old_status, new_status):
    await send_slack_alert(f"{probe_name} is now {new_status}")

registry.on_state_change(alert)

Key behaviours:

  • Callbacks fire after every run_all() for each probe whose status differs from the previous run.
  • The first run seeds the initial state — no callbacks are fired until a subsequent run sees a different result.
  • Multiple callbacks can be registered; all are called in registration order.
  • on_state_change() returns self for chaining.

Startup grace period

Pass grace_period_ms to hold /health/ready in a 503 {"status": "starting"} state for a fixed window after the registry is created. This prevents a load balancer from routing traffic before the application has had time to warm up — without requiring all probes to pass immediately on boot.

registry = HealthRegistry(
    app,
    grace_period_ms=15_000,  # hold readiness for 15 s after startup
)
  • /health/ready returns 503 {"status": "starting"} while the grace period is active.
  • /health/status and /health/live are not affected — they always reflect real probe results.
  • After the grace period expires, /ready resumes normal probe-based behaviour.
  • grace_period_ms=0 (default) disables the grace period entirely.

Pair with Kubernetes' initialDelaySeconds for belt-and-suspenders protection during slow startup:

readinessProbe:
  httpGet:
    path: /health/ready
    port: 8000
  initialDelaySeconds: 5   # k8s waits 5 s before its first check
  periodSeconds: 10
# App-side grace covers the remaining warmup window
registry = HealthRegistry(app, grace_period_ms=20_000)

Probe result history

Every probe result is stored in a rolling per-probe history. Use GET /health/history to inspect past runs — useful for debugging flapping probes or tracking latency over time.

registry = HealthRegistry(
    app,
    history_size=20,  # keep the last 20 results per probe (default: 10)
)

GET /health/history — response format:

{
  "probes": {
    "postgresql": [
      {
        "name": "postgresql",
        "status": "healthy",
        "critical": true,
        "latency_ms": 1.8,
        "error": null,
        "details": { "version": "PostgreSQL 16.2 ...", "active_connections": 5 }
      },
      {
        "name": "postgresql",
        "status": "healthy",
        "critical": true,
        "latency_ms": 2.1,
        "error": null,
        "details": { "version": "PostgreSQL 16.2 ...", "active_connections": 6 }
      }
    ],
    "redis": [
      {
        "name": "redis",
        "status": "unhealthy",
        "critical": false,
        "latency_ms": 5002.0,
        "error": "Connection refused",
        "details": null
      },
      {
        "name": "redis",
        "status": "healthy",
        "critical": false,
        "latency_ms": 0.9,
        "error": null,
        "details": { "version": "7.2.4", "total_keys": 312 }
      }
    ]
  }
}

Results are ordered oldest-first. History is in-memory and resets on process restart.


Response format

Every response from /health/ready, /health/status, and the SSE streams shares the same shape.

Health report

Field Type Description
status "healthy" | "unhealthy" Overall result — determined by critical probes only
checked_at string | null UTC ISO 8601 timestamp of the last probe run; null before the first run
probes array Individual probe results (see below)

Probe result

Field Type Description
name string Probe identifier
status "healthy" | "unhealthy" Pass/fail for this probe
critical boolean true if the probe affects overall status and readiness
latency_ms number How long the check took in milliseconds
error string | null Error message; only present on failure
details object | null Service-specific metadata (see each probe's section)

Example: all healthy — 200

{
  "status": "healthy",
  "checked_at": "2024-06-01T12:00:00.123456+00:00",
  "probes": [
    {
      "name": "postgresql",
      "status": "healthy",
      "critical": true,
      "latency_ms": 1.8,
      "error": null,
      "details": {
        "version": "PostgreSQL 16.2 on aarch64-unknown-linux-gnu",
        "active_connections": 5,
        "max_connections": 100,
        "database_size": "42 MB"
      }
    },
    {
      "name": "redis",
      "status": "healthy",
      "critical": false,
      "latency_ms": 0.6,
      "error": null,
      "details": {
        "version": "7.2.4",
        "uptime_seconds": 86400,
        "used_memory_human": "2.50M",
        "connected_clients": 8,
        "total_keys": 312
      }
    }
  ]
}

Example: one critical probe failing — 503 on /ready, 207 on /status

{
  "status": "unhealthy",
  "checked_at": "2024-06-01T12:00:05.456789+00:00",
  "probes": [
    {
      "name": "postgresql",
      "status": "unhealthy",
      "critical": true,
      "latency_ms": 5002.1,
      "error": "Connection refused",
      "details": null
    },
    {
      "name": "redis",
      "status": "healthy",
      "critical": false,
      "latency_ms": 0.6,
      "error": null,
      "details": { "version": "7.2.4" }
    }
  ]
}

Example: non-critical probe failing — still 200 on /ready

{
  "status": "healthy",
  "checked_at": "2024-06-01T12:00:10.000000+00:00",
  "probes": [
    {
      "name": "postgresql",
      "status": "healthy",
      "critical": true,
      "latency_ms": 1.9,
      "error": null,
      "details": { "active_connections": 5 }
    },
    {
      "name": "redis",
      "status": "unhealthy",
      "critical": false,
      "latency_ms": 5001.3,
      "error": "Connection timed out",
      "details": null
    }
  ]
}

Writing a custom probe

Any class that extends BaseProbe and implements check() works as a probe. This is the right approach for internal services, third-party SDKs, business-logic checks, or composite conditions.

Minimal probe

from fastapi_watch.probes import BaseProbe
from fastapi_watch.models import ProbeResult, ProbeStatus

class MyServiceProbe(BaseProbe):
    name = "my-service"

    async def check(self) -> ProbeResult:
        ok = await call_my_service()
        return ProbeResult(
            name=self.name,
            status=ProbeStatus.HEALTHY if ok else ProbeStatus.UNHEALTHY,
        )

registry.add(MyServiceProbe())

check() must be an async method and must return a ProbeResult. Any unhandled exception raised by check() is caught by the registry, automatically recorded as an unhealthy result, and optionally logged — your probe never needs to worry about crashing the health system.

Recording latency and details

Use time.perf_counter() to measure the check duration and populate latency_ms. The details dict accepts any JSON-serializable data.

import time
from fastapi_watch.probes import BaseProbe
from fastapi_watch.models import ProbeResult, ProbeStatus

class PaymentGatewayProbe(BaseProbe):
    name = "payment-gateway"

    async def check(self) -> ProbeResult:
        start = time.perf_counter()
        try:
            info = await ping_payment_gateway()
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.HEALTHY,
                latency_ms=round(latency, 2),
                details={
                    "region": info.region,
                    "provider_version": info.version,
                    "response_ms": round(latency, 2),
                },
            )
        except Exception as exc:
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.UNHEALTHY,
                latency_ms=round(latency, 2),
                error=str(exc),
            )

Configurable probe

Pass configuration through __init__ so the same probe class can be reused with different settings.

class S3BucketProbe(BaseProbe):
    def __init__(self, bucket: str, region: str = "us-east-1", name: str = "s3") -> None:
        self.bucket = bucket
        self.region = region
        self.name = name

    async def check(self) -> ProbeResult:
        import time
        import aiobotocore.session

        start = time.perf_counter()
        try:
            session = aiobotocore.session.get_session()
            async with session.create_client("s3", region_name=self.region) as client:
                await client.head_bucket(Bucket=self.bucket)
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.HEALTHY,
                latency_ms=round(latency, 2),
                details={"bucket": self.bucket, "region": self.region},
            )
        except Exception as exc:
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.UNHEALTHY,
                latency_ms=round(latency, 2),
                error=str(exc),
            )

# Register multiple buckets as separate probes
registry.add(S3BucketProbe(bucket="my-app-uploads", region="eu-west-1", name="s3-uploads"))
registry.add(S3BucketProbe(bucket="my-app-backups", region="us-east-1", name="s3-backups"))

Adding a timeout

Set the timeout attribute (in seconds) on the class or instance. The registry will cancel the check and record it as unhealthy if it runs too long.

class SlowExternalProbe(BaseProbe):
    name = "slow-external"
    timeout = 3.0  # class-level default

    async def check(self) -> ProbeResult:
        result = await call_slow_external_api()
        return ProbeResult(name=self.name, status=ProbeStatus.HEALTHY)

# Override on a specific instance
probe = SlowExternalProbe()
probe.timeout = 1.5
registry.add(probe)

Composite probe

Wrap multiple inner probes to build custom aggregation logic — for example, reporting unhealthy only when both a primary and replica are down simultaneously.

import asyncio
from fastapi_watch.probes import BaseProbe, RedisProbe
from fastapi_watch.models import ProbeResult, ProbeStatus

class RedisHAProbe(BaseProbe):
    name = "redis-ha"

    def __init__(self, primary_url: str, replica_url: str) -> None:
        self._primary = RedisProbe(url=primary_url, name="primary")
        self._replica = RedisProbe(url=replica_url, name="replica")

    async def check(self) -> ProbeResult:
        primary, replica = await asyncio.gather(
            self._primary.check(), self._replica.check()
        )
        if primary.is_healthy or replica.is_healthy:
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.HEALTHY,
                details={
                    "primary": primary.status.value,
                    "replica": replica.status.value,
                },
            )
        return ProbeResult(
            name=self.name,
            status=ProbeStatus.UNHEALTHY,
            error=f"both nodes down — primary: {primary.error}, replica: {replica.error}",
        )

registry.add(RedisHAProbe(
    primary_url="redis://primary.internal:6379",
    replica_url="redis://replica.internal:6379",
))

Exception handling

If check() raises an unhandled exception, the registry catches it and returns an unhealthy result automatically — you do not need to wrap your entire probe body in a try/except for this purpose. The auto-generated result looks like:

{
  "name": "my-service",
  "status": "unhealthy",
  "critical": true,
  "latency_ms": 0.0,
  "error": "RuntimeError: connection pool exhausted",
  "details": null
}

If a logger was passed to HealthRegistry, the exception is also logged with full traceback via logger.exception().

You should still catch exceptions yourself inside check() if you want to record partial details, a meaningful latency_ms, or a more specific error message.

Testing a custom probe

Use pytest-asyncio to test check() directly without needing to spin up an HTTP server.

import pytest
from fastapi_watch.models import ProbeStatus
from myapp.probes import MyServiceProbe

@pytest.mark.asyncio
async def test_healthy_when_service_responds():
    probe = MyServiceProbe()
    result = await probe.check()
    assert result.status == ProbeStatus.HEALTHY
    assert result.name == "my-service"

@pytest.mark.asyncio
async def test_unhealthy_when_service_raises(monkeypatch):
    async def fail():
        raise ConnectionError("refused")

    monkeypatch.setattr("myapp.probes.call_my_service", fail)
    probe = MyServiceProbe()
    result = await probe.check()
    assert result.status == ProbeStatus.UNHEALTHY
    assert "refused" in result.error

You can also run the full registry against a real or mock dependency:

from fastapi import FastAPI
from fastapi_watch import HealthRegistry

@pytest.mark.asyncio
async def test_registry_run_all():
    app = FastAPI()
    registry = HealthRegistry(app, poll_interval_ms=None)
    registry.add(MyServiceProbe())

    results = await registry.run_all()
    assert results[0].status == ProbeStatus.HEALTHY

Probe implementation checklist

  • name must be set — either as a class attribute or in __init__ via self.name.
  • check() must be async and return a ProbeResult.
  • Set latency_ms for probes where response time matters.
  • Populate details with any data useful for diagnosis.
  • Set timeout if the underlying call can hang indefinitely.
  • Do not call registry.run_all() or other registry methods from inside check().

Built-in probes

Probe details

Every built-in probe populates the details field with service-specific metadata. Details are always best-effort — if the metadata query fails after a successful connectivity check, details will contain whatever was collected up to that point. The probe status reflects connectivity only, not the completeness of details.


Watching PostgreSQL

pip install "fastapi-watch[postgres]"

PostgreSQLProbe uses asyncpg directly — no SQLAlchemy required. It opens a connection, runs SELECT version() and a set of metadata queries concurrently, then closes the connection.

from fastapi_watch.probes import PostgreSQLProbe

registry.add(
    PostgreSQLProbe(
        url="postgresql://app_user:secret@localhost:5432/mydb",
        name="primary-db",  # default: "postgresql"
    )
)

Details returned:

{
  "version": "PostgreSQL 16.2 on aarch64-unknown-linux-gnu, compiled by gcc 12.2.0",
  "active_connections": 5,
  "max_connections": 100,
  "database_size": "42 MB"
}

Checking a read replica separately:

registry.add(PostgreSQLProbe(url="postgresql://reader:secret@replica.host/mydb", name="replica-db"))

With a connection timeout (default 5 seconds):

registry.add(PostgreSQLProbe(url="postgresql://...", timeout=2.0))

If you are already using SQLAlchemy, see SQLAlchemy engine probe to reuse your existing engine instead.


Watching MySQL / MariaDB

pip install "fastapi-watch[mysql]"

MySQLProbe accepts either a URL or explicit connection kwargs.

from fastapi_watch.probes import MySQLProbe

# URL form
registry.add(MySQLProbe(url="mysql://app_user:secret@localhost:3306/mydb"))

# Keyword form
registry.add(MySQLProbe(host="localhost", port=3306, user="app_user", password="secret", db="mydb"))

Details returned:

{
  "version": "8.0.36",
  "connected_threads": 4,
  "uptime_seconds": 172800,
  "max_used_connections": 12
}

Constructor arguments:

Argument Default Description
url None Full DSN — overrides all other kwargs when set
host "localhost"
port 3306
user "root"
password ""
db ""
name "mysql" Probe label
connect_timeout 5 Seconds

Watching Redis

pip install "fastapi-watch[redis]"

RedisProbe sends PING, then collects server info and scans key prefixes to build a cluster breakdown.

from fastapi_watch.probes import RedisProbe

registry.add(RedisProbe(url="redis://localhost:6379"))

Details returned:

{
  "version": "7.2.4",
  "uptime_seconds": 86400,
  "used_memory_human": "2.50M",
  "connected_clients": 8,
  "role": "master",
  "total_keys": 312,
  "clusters": {
    "session": { "keys": 150, "ttl_seconds": 3600 },
    "cache":   { "keys": 162, "ttl_seconds": 900  }
  }
}

clusters groups keys by the segment before the first :. For example, a key named session:abc123 falls into the session cluster. ttl_seconds is sampled from one key in the group; null means the key has no expiry.

Common URL forms:

# Password-protected
RedisProbe(url="redis://:mypassword@localhost:6379")

# Specific database index
RedisProbe(url="redis://localhost:6379/2", name="task-queue")

# TLS
RedisProbe(url="rediss://redis.internal:6380")

# Watching Redis as both a cache and a queue
registry.add(RedisProbe(url="redis://localhost:6379/0", name="cache"))
registry.add(RedisProbe(url="redis://localhost:6379/1", name="task-queue"))

Watching Memcached

pip install "fastapi-watch[memcached]"

MemcachedProbe calls stats() to verify the server is reachable and responding.

from fastapi_watch.probes import MemcachedProbe

registry.add(MemcachedProbe(host="localhost", port=11211))

Constructor arguments:

Argument Default Description
host "localhost"
port 11211
name "memcached" Probe label
pool_size 1 aiomcache connection pool size

Watching RabbitMQ

pip install "fastapi-watch[rabbitmq]"

RabbitMQProbe has two modes:

  • Connectivity only (default) — opens and closes an AMQP connection. No channels or queues are touched.
  • Rich mode — when management_url is set, the probe also calls the RabbitMQ Management HTTP API and returns per-queue stats, message rates, and cluster metadata.

Connectivity only

from fastapi_watch.probes import RabbitMQProbe

registry.add(
    RabbitMQProbe(
        url="amqp://guest:guest@localhost:5672/",
        name="rabbitmq",  # default
    )
)

Details returned (connectivity only):

{ "connected": true }

Rich mode — with Management API

Pass management_url pointing at the RabbitMQ Management plugin (default port 15672). Credentials are taken from the AMQP URL automatically.

registry.add(
    RabbitMQProbe(
        url="amqp://guest:guest@localhost:5672/",
        management_url="http://localhost:15672",
    )
)

Details returned (rich mode):

{
  "connected": true,
  "server": {
    "rabbitmq_version": "3.12.0",
    "erlang_version": "26.0",
    "cluster_name": "rabbit@my-node",
    "node": "rabbit@my-node",
    "connections": 4,
    "channels": 8,
    "exchanges": 14,
    "queues": 3,
    "consumers": 6
  },
  "totals": {
    "messages": 142,
    "messages_ready": 140,
    "messages_unacknowledged": 2,
    "publish_rate": 12.5,
    "deliver_rate": 11.8,
    "ack_rate": 11.8
  },
  "queues": {
    "tasks": {
      "state": "running",
      "messages": 120,
      "messages_ready": 118,
      "messages_unacknowledged": 2,
      "consumers": 4,
      "memory_bytes": 32768,
      "publish_rate": 10.0,
      "deliver_rate": 9.5,
      "ack_rate": 9.5,
      "durable": true,
      "auto_delete": false,
      "idle_since": null
    }
  }
}

If the Management API is unreachable, a management_api_error key is added to details and the probe still reports the AMQP connection status.

Other connection forms:

# Dedicated monitoring vhost
RabbitMQProbe(url="amqp://monitor:secret@rabbitmq.internal/monitoring", management_url="http://rabbitmq.internal:15672")

# TLS / AMQPS
RabbitMQProbe(url="amqps://user:secret@rabbitmq.internal/", name="rabbitmq-tls")

# Multiple cluster nodes — one probe per node
for i, host in enumerate(["rmq-1.internal", "rmq-2.internal", "rmq-3.internal"], start=1):
    registry.add(RabbitMQProbe(url=f"amqp://guest:guest@{host}/", name=f"rabbitmq-node-{i}"))

Constructor arguments:

Argument Default Description
url "amqp://guest:guest@localhost/" AMQP(S) connection URL
name "rabbitmq" Probe label
management_url None Base URL of the Management HTTP API. When set, enables rich queue-level details. Credentials are taken from url.

Watching Kafka

pip install "fastapi-watch[kafka]"

KafkaProbe starts an AIOKafkaAdminClient to verify broker reachability, then lists topics and describes the cluster.

from fastapi_watch.probes import KafkaProbe

# Single broker
registry.add(KafkaProbe(bootstrap_servers="localhost:9092"))

# Multiple brokers
registry.add(KafkaProbe(bootstrap_servers=["b1:9092", "b2:9092", "b3:9092"]))

Details returned:

{
  "broker_count": 3,
  "controller_id": 1,
  "topics": ["orders", "payments", "notifications"],
  "internal_topics": ["__consumer_offsets"]
}

topics contains user-defined topics only. internal_topics lists Kafka-managed topics (those prefixed with __).

Constructor arguments:

Argument Default Description
bootstrap_servers "localhost:9092" String or list of host:port entries
name "kafka" Probe label
request_timeout_ms 5000 Admin client metadata request timeout

Watching MongoDB

pip install "fastapi-watch[mongo]"

MongoProbe runs serverStatus on the admin database to collect version, connection pool stats, memory, and storage engine.

from fastapi_watch.probes import MongoProbe

registry.add(MongoProbe(url="mongodb://localhost:27017"))

Details returned:

{
  "version": "7.0.5",
  "uptime_seconds": 172800,
  "connections": {
    "current": 12,
    "available": 838,
    "total_created": 150
  },
  "memory_mb": {
    "resident": 128,
    "virtual": 1024
  },
  "storage_engine": "wiredTiger"
}

Constructor arguments:

Argument Default Description
url "mongodb://localhost:27017" MongoDB connection URI
name "mongodb" Probe label
server_selection_timeout_ms 2000 How long to wait for a server before giving up

Watching an HTTP endpoint

pip install "fastapi-watch[http]"

HttpProbe performs an HTTP GET and checks the response status code.

from fastapi_watch.probes import HttpProbe

registry.add(HttpProbe(url="https://api.upstream.com/health"))

Details returned:

{
  "status_code": 200,
  "content_type": "application/json",
  "response_bytes": 43
}

details is populated for both healthy and unhealthy responses so you can see what status code an upstream actually returned.

Constructor arguments:

Argument Default Description
url required URL to GET
timeout 5.0 Request timeout in seconds
name URL host Probe label
expected_status 200 HTTP status code considered healthy
# Expect a 204 instead of 200
registry.add(HttpProbe(url="https://api.example.com/ping", expected_status=204))

# Shorter timeout, explicit name
registry.add(HttpProbe(url="https://api.payments.com/health", timeout=2.0, name="payments-api"))

SQLAlchemy engine probe

pip install "fastapi-watch[sqlalchemy]"

SqlAlchemyProbe reuses your existing AsyncEngine so no extra connections are opened. Works with any database SQLAlchemy supports (PostgreSQL, MySQL, SQLite, etc.).

from sqlalchemy.ext.asyncio import create_async_engine
from fastapi_watch.probes import SqlAlchemyProbe

engine = create_async_engine("postgresql+asyncpg://app_user:secret@localhost/mydb")

registry.add(SqlAlchemyProbe(engine=engine, name="primary-db"))

Details returned:

{
  "dialect": "postgresql",
  "driver": "asyncpg",
  "server_version": "16.2.0"
}

Constructor arguments:

Argument Default Description
engine required A SQLAlchemy 2.x AsyncEngine instance
name "database" Probe label

All built-in probes

Databases

Probe Extra Key constructor args Details fields
PostgreSQLProbe postgres url, name, timeout version, active_connections, max_connections, database_size
MySQLProbe mysql url or host/port/user/password/db, name, connect_timeout version, connected_threads, uptime_seconds, max_used_connections
SqlAlchemyProbe sqlalchemy engine, name dialect, driver, server_version

Caches

Probe Extra Key constructor args Details fields
RedisProbe redis url, name version, uptime_seconds, used_memory_human, connected_clients, role, total_keys, clusters
MemcachedProbe memcached host, port, name, pool_size

Queues / messaging

Probe Extra Key constructor args Details fields
RabbitMQProbe rabbitmq url, name, management_url connected; + server, totals, queues when management_url is set
KafkaProbe kafka bootstrap_servers, name, request_timeout_ms broker_count, controller_id, topics, internal_topics

Document stores

Probe Extra Key constructor args Details fields
MongoProbe mongo url, name, server_selection_timeout_ms version, uptime_seconds, connections, memory_mb, storage_engine

HTTP

Probe Extra Key constructor args Details fields
HttpProbe http url, timeout, name, expected_status status_code, content_type, response_bytes

Testing / placeholder

Probe Extra Key constructor args Details fields
MemoryProbe built-in name

Configuration reference

HealthRegistry

Argument Type Default Description
app FastAPI required The FastAPI application instance
prefix str "/health" URL prefix for all health endpoints
tags list[str] ["health"] OpenAPI tags applied to all health routes
poll_interval_ms int | None 60000 How often (ms) to re-run probes while an SSE client is connected. 0 or None disables polling — each request or stream event runs probes on demand. Values below 1000 are clamped to 1000.
logger logging.Logger | None None Logger for warnings (e.g. clamped interval) and probe exception messages. Pass None to emit no logs.
grace_period_ms int 0 How long (ms) after startup to return 503 {"status": "starting"} from /ready. 0 disables the grace period.
history_size int 10 Number of past probe results to retain per probe. Retrieved via GET /health/history. Minimum 1.

HealthRegistry.add(probe, critical=True)

Adds a single probe. Returns self for chaining. Adding the same instance more than once is a no-op.

critical=True (default) — a failing probe causes the overall status to be "unhealthy" and /ready to return 503. Set critical=False to include the probe in reports without affecting readiness.

HealthRegistry.add_probes(probes, critical=True)

Adds a list of probes. The critical flag applies to every probe in the list. Returns self for chaining. Duplicate instances are silently skipped.

HealthRegistry.on_state_change(callback)

Registers a callback invoked whenever a probe's status changes between runs. The callback receives (probe_name: str, old_status: ProbeStatus, new_status: ProbeStatus) and may be a plain function or an async coroutine. Returns self for chaining.

HealthRegistry.set_poll_interval(ms)

Updates the poll interval at runtime. Pass 0 or None to switch to single-fetch mode. If SSE clients are currently connected the poll task is restarted immediately with the new interval.

registry.set_poll_interval(30_000)   # every 30 s
registry.set_poll_interval(0)        # disable — each event runs probes on demand

HealthRegistry.run_all()

Async method — runs every registered probe concurrently and returns list[ProbeResult]. Probe exceptions are caught and converted to unhealthy results. Useful for testing or building custom aggregation outside of the mounted routes.

results = await registry.run_all()
for r in results:
    print(r.name, r.status, r.latency_ms, r.details)

BaseProbe

Attribute Type Default Description
name str "unnamed" Label used in health reports. Override as a class attribute or set in __init__.
timeout float | None None Per-probe timeout in seconds. The check is cancelled and recorded as unhealthy if it exceeds this value. None means no limit.

ProbeResult

Field Type Description
name str Probe identifier
status ProbeStatus "healthy" or "unhealthy"
critical bool True if the probe was registered as critical; affects overall status and readiness
latency_ms float Duration of the check in milliseconds
error str | None Error message; only present on failure
details dict | None Service-specific metadata
is_healthy bool (property) True when status == "healthy"

Kubernetes integration

livenessProbe:
  httpGet:
    path: /health/live
    port: 8000
  initialDelaySeconds: 5
  periodSeconds: 10

readinessProbe:
  httpGet:
    path: /health/ready
    port: 8000
  initialDelaySeconds: 10
  periodSeconds: 15
  failureThreshold: 3

Use /health/ready for the readiness probe — Kubernetes stops routing traffic to a pod the moment any critical dependency becomes unreachable. Use /health/live for liveness so the process is only restarted when it is genuinely stuck, not because an external service is temporarily down.

For applications that need time to warm up (loading models, seeding caches, running migrations), combine grace_period_ms with a short initialDelaySeconds:

readinessProbe:
  httpGet:
    path: /health/ready
    port: 8000
  initialDelaySeconds: 5
  periodSeconds: 10
  failureThreshold: 6   # allow up to 60 s of failures before marking unready
# App holds /ready as "starting" for 30 s regardless of probe results
registry = HealthRegistry(app, grace_period_ms=30_000)

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fastapi_watch-1.0.1.tar.gz (41.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

fastapi_watch-1.0.1-py3-none-any.whl (30.9 kB view details)

Uploaded Python 3

File details

Details for the file fastapi_watch-1.0.1.tar.gz.

File metadata

  • Download URL: fastapi_watch-1.0.1.tar.gz
  • Upload date:
  • Size: 41.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for fastapi_watch-1.0.1.tar.gz
Algorithm Hash digest
SHA256 28371eb999b42d046de73a17fce78c3604a13e5b3640ab321833da19e7457d7b
MD5 eb6b47ed121901b04d21764039e8c71c
BLAKE2b-256 fbe7e22add7cde68f31993a9aa3aa4e276d0b4d488df3b001ce70bcde641a24a

See more details on using hashes here.

Provenance

The following attestation bundles were made for fastapi_watch-1.0.1.tar.gz:

Publisher: publish.yml on rgreen1207/fastapi-watch

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file fastapi_watch-1.0.1-py3-none-any.whl.

File metadata

  • Download URL: fastapi_watch-1.0.1-py3-none-any.whl
  • Upload date:
  • Size: 30.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for fastapi_watch-1.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 0dbe4d7bf711ebf2a8614a5ec30eb1f91637e227200698104ac0f6a458bdd97f
MD5 bd72bab8f8ab6896058512f647a054ae
BLAKE2b-256 cd2977588056a2bd88f982a24accd2faa182df2d86cc9d2488baa5b0c8122b79

See more details on using hashes here.

Provenance

The following attestation bundles were made for fastapi_watch-1.0.1-py3-none-any.whl:

Publisher: publish.yml on rgreen1207/fastapi-watch

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page