fastapi-watch

Structured health and readiness check system for FastAPI.

Add /health/live, /health/ready, and /health/status endpoints to any FastAPI app with a single registry call. All probes run concurrently, so a slow dependency never blocks the others. Each probe returns rich service-specific details alongside the pass/fail result.


Installation

Install only the extras you actually use. Nothing is pulled in by default beyond FastAPI and Pydantic.

# Core package — includes the always-passing MemoryProbe, no other deps
pip install fastapi-watch

# Add individual service probes as needed
pip install fastapi-watch[postgres]     # PostgreSQL        (asyncpg)
pip install fastapi-watch[mysql]        # MySQL / MariaDB   (aiomysql)
pip install fastapi-watch[sqlalchemy]   # Any SQLAlchemy 2.x async engine
pip install fastapi-watch[redis]        # Redis             (aioredis)
pip install fastapi-watch[memcached]    # Memcached         (aiomcache)
pip install fastapi-watch[rabbitmq]     # RabbitMQ          (aio-pika + aiohttp)
pip install fastapi-watch[kafka]        # Kafka             (aiokafka)
pip install fastapi-watch[mongo]        # MongoDB           (motor)
pip install fastapi-watch[http]         # HTTP endpoint     (aiohttp)

# Or pull everything in one shot
pip install fastapi-watch[all]

Multiple extras can be combined:

pip install fastapi-watch[postgres,redis,rabbitmq]

How it works

Create a HealthRegistry, attach it to your FastAPI app, and call .add() for each service you want to monitor. The registry mounts the three health endpoints automatically and runs every probe concurrently on each request.

from fastapi import FastAPI
from fastapi_watch import HealthRegistry

app = FastAPI()
registry = HealthRegistry(app)

Add probes individually, as a list, or chain calls. Adding the same instance twice is a no-op.

# Single probe
registry.add(probe_a)

# Multiple probes in one call
registry.add([probe_a, probe_b, probe_c])

# Chained
registry.add(probe_a).add(probe_b).add(probe_c)

# Duplicate ignored — probe_a is only registered once
registry.add(probe_a)
registry.add(probe_a)
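
The behaviour described above (single or list input, chaining, duplicate instances ignored, concurrent execution) can be illustrated with a small stand-in. This is a sketch only: SketchRegistry and SleepProbe are hypothetical names invented for this example and are not fastapi-watch's implementation.

```python
import asyncio
import time


class SketchRegistry:
    """Toy stand-in for a probe registry; NOT the library's own code."""

    def __init__(self) -> None:
        self._probes = []

    def add(self, probe_or_list):
        # Accept either a single probe or a list of probes.
        probes = probe_or_list if isinstance(probe_or_list, list) else [probe_or_list]
        for probe in probes:
            # Identity check: the same instance is registered only once.
            if not any(probe is existing for existing in self._probes):
                self._probes.append(probe)
        return self  # returning self is what makes chaining possible

    async def run_all(self):
        # gather() starts every check at once and returns the results
        # in registration order.
        return await asyncio.gather(*(p.check() for p in self._probes))


class SleepProbe:
    """Hypothetical probe that simulates a dependency with a fixed delay."""

    def __init__(self, name: str, delay: float) -> None:
        self.name = name
        self.delay = delay

    async def check(self) -> str:
        await asyncio.sleep(self.delay)
        return self.name


async def demo():
    slow_a, slow_b = SleepProbe("slow-a", 0.2), SleepProbe("slow-b", 0.2)
    # slow_a is passed twice; the second registration is ignored.
    registry = SketchRegistry().add(slow_a).add([slow_b, slow_a])
    start = time.perf_counter()
    results = await registry.run_all()
    return results, time.perf_counter() - start
```

Running asyncio.run(demo()) should take roughly as long as the slowest probe rather than the sum of both delays, which is the practical benefit of running probes concurrently.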

Endpoints

| Endpoint | Purpose | Healthy | Degraded |
| --- | --- | --- | --- |
| GET /health/live | Liveness: is the process alive? | 200 OK | never fails |
| GET /health/ready | Readiness: are all probes passing? | 200 OK | 503 Service Unavailable |
| GET /health/status | Status: full detail on every probe | 200 OK | 207 Multi-Status |

The prefix defaults to /health and can be changed:

registry = HealthRegistry(app, prefix="/ops/health")
# → /ops/health/live, /ops/health/ready, /ops/health/status
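
The status codes in the table follow a simple rule, sketched below. endpoint_status_codes is a hypothetical helper written for illustration, and treating an empty probe list as healthy is an assumption rather than documented behaviour.

```python
from http import HTTPStatus


def endpoint_status_codes(probe_statuses: list[str]) -> dict[str, int]:
    """Map a list of probe statuses to the code each endpoint would return."""
    # Assumption: with no probes registered, everything counts as healthy.
    all_healthy = all(status == "healthy" for status in probe_statuses)
    return {
        # Liveness never fails: it only says the process can answer.
        "live": HTTPStatus.OK.value,
        # Readiness fails as soon as any probe is unhealthy.
        "ready": (HTTPStatus.OK if all_healthy else HTTPStatus.SERVICE_UNAVAILABLE).value,
        # Status still returns a body, flagged as 207 when results are mixed.
        "status": (HTTPStatus.OK if all_healthy else HTTPStatus.MULTI_STATUS).value,
    }
```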

Response format

Every probe result has the same base shape:

| Field | Type | Description |
| --- | --- | --- |
| name | string | Probe identifier |
| status | "healthy" \| "unhealthy" | Pass/fail result |
| latency_ms | number | How long the check took, in milliseconds |
| error | string \| null | Error message, only present on failure |
| details | object \| null | Service-specific metadata (see Probe details) |

All healthy — 200

{
  "status": "healthy",
  "probes": [
    {
      "name": "postgresql",
      "status": "healthy",
      "latency_ms": 1.8,
      "details": {
        "version": "PostgreSQL 16.2 on aarch64-unknown-linux-gnu",
        "active_connections": 5,
        "max_connections": 100,
        "database_size": "42 MB"
      }
    },
    {
      "name": "redis",
      "status": "healthy",
      "latency_ms": 0.6,
      "details": {
        "version": "7.2.4",
        "uptime_seconds": 86400,
        "used_memory_human": "2.50M",
        "connected_clients": 8,
        "total_keys": 312,
        "clusters": {
          "session": { "keys": 150, "ttl_seconds": 3600 },
          "cache":   { "keys": 162, "ttl_seconds": 900  }
        }
      }
    }
  ]
}

One probe failing: 503 on /health/ready, 207 on /health/status

{
  "status": "unhealthy",
  "probes": [
    { "name": "postgresql", "status": "healthy",   "latency_ms": 1.8, "details": { ... } },
    { "name": "redis",      "status": "unhealthy", "latency_ms": 5002.1, "error": "Connection refused" }
  ]
}
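
As a rough illustration of how the two payloads above could be assembled, the sketch below derives the top-level status from the individual results and leaves out fields that are unset. SketchResult and build_payload are hypothetical names, and dropping None fields is an assumption based on the example payloads, not a statement about the library's serializer.

```python
from dataclasses import asdict, dataclass
from typing import Any, Optional


@dataclass
class SketchResult:
    """Stand-in with the same fields as the table above."""
    name: str
    status: str
    latency_ms: float
    error: Optional[str] = None
    details: Optional[dict[str, Any]] = None


def build_payload(results: list[SketchResult]) -> dict[str, Any]:
    # The overall status is healthy only if every probe is healthy.
    overall = "healthy" if all(r.status == "healthy" for r in results) else "unhealthy"
    # Assumption: unset fields (error on success, details on failure) are omitted.
    probes = [
        {key: value for key, value in asdict(r).items() if value is not None}
        for r in results
    ]
    return {"status": overall, "probes": probes}
```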

Probe details

Every built-in probe populates the details field with service-specific metadata. Details are always best-effort — if the metadata query fails after a successful connectivity check, details will contain whatever was collected up to that point. The probe status reflects connectivity only, not the completeness of details.
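
The best-effort rule can be pictured as two separate phases: a connectivity check that decides the status, followed by metadata collection whose failures are swallowed. The sketch below assumes nothing about the built-in probes' internals; check_with_best_effort_details and the demo callables are hypothetical.

```python
import asyncio


async def check_with_best_effort_details(connect, collectors):
    """connect: async callable that raises on failure.
    collectors: mapping of detail key -> async callable returning the value."""
    # Phase 1: connectivity decides the status.
    try:
        await connect()
    except Exception as exc:
        return {"status": "unhealthy", "error": str(exc), "details": None}

    # Phase 2: metadata is best-effort and never changes the status.
    details = {}
    try:
        for key, collect in collectors.items():
            details[key] = await collect()
    except Exception:
        pass  # keep whatever was collected before the failure
    return {"status": "healthy", "details": details}


# Hypothetical callables standing in for real service queries.
async def connect_ok():
    return None


async def connect_refused():
    raise ConnectionRefusedError("Connection refused")


async def get_version():
    return "16.2"


async def get_size_denied():
    raise PermissionError("permission denied")
```

With collectors {"version": get_version, "database_size": get_size_denied}, the result stays healthy and details holds only the version, since collection stopped at the failing query.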


Watching PostgreSQL

pip install fastapi-watch[postgres]

PostgreSQLProbe uses asyncpg directly — no SQLAlchemy required. It opens a connection, runs SELECT 1 to verify the server is responsive, collects metadata, then closes the connection.

from fastapi_watch.probes import PostgreSQLProbe

registry.add(
    PostgreSQLProbe(
        url="postgresql://app_user:secret@localhost:5432/mydb",
        name="primary-db",  # default: "postgresql"
    )
)

Details returned:

{
  "version": "PostgreSQL 16.2 on aarch64-unknown-linux-gnu, compiled by gcc 12.2.0",
  "active_connections": 5,
  "max_connections": 100,
  "database_size": "42 MB"
}

Checking a read replica separately:

registry.add(PostgreSQLProbe(url="postgresql://reader:secret@replica.host/mydb", name="replica-db"))

With a connection timeout (default 5 seconds):

registry.add(PostgreSQLProbe(url="postgresql://...", timeout=2.0))

If you are already using SQLAlchemy, see SQLAlchemy engine probe to reuse your existing engine instead.


Watching MySQL / MariaDB

pip install fastapi-watch[mysql]

MySQLProbe accepts either a URL or explicit connection kwargs.

from fastapi_watch.probes import MySQLProbe

# URL form
registry.add(MySQLProbe(url="mysql://app_user:secret@localhost:3306/mydb"))

# Keyword form
registry.add(MySQLProbe(host="localhost", port=3306, user="app_user", password="secret", db="mydb"))

Details returned:

{
  "version": "8.0.36",
  "connected_threads": 4,
  "uptime_seconds": 172800,
  "max_used_connections": 12
}

Constructor arguments:

| Argument | Default | Description |
| --- | --- | --- |
| url | None | Full DSN; overrides all other kwargs when set |
| host | "localhost" | |
| port | 3306 | |
| user | "root" | |
| password | "" | |
| db | "" | |
| name | "mysql" | Probe label |
| connect_timeout | 5 | Seconds |
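
To make the "url overrides all other kwargs" rule concrete, here is a minimal sketch of how a DSN might be broken down into the keyword form. resolve_mysql_kwargs is a hypothetical helper, and the fallback values simply mirror the defaults in the table; the probe's own parsing may differ.

```python
from urllib.parse import unquote, urlsplit


def resolve_mysql_kwargs(url=None, host="localhost", port=3306,
                         user="root", password="", db=""):
    """Return connection kwargs; a URL, when given, wins over the keywords."""
    if url is None:
        return {"host": host, "port": port, "user": user,
                "password": password, "db": db}
    parts = urlsplit(url)
    return {
        "host": parts.hostname or "localhost",
        "port": parts.port or 3306,
        # Credentials may be percent-encoded inside a URL.
        "user": unquote(parts.username or "root"),
        "password": unquote(parts.password or ""),
        "db": parts.path.lstrip("/"),
    }
```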

Watching Redis

pip install fastapi-watch[redis]

RedisProbe sends PING, then collects server info and scans key prefixes to build a cluster breakdown.

from fastapi_watch.probes import RedisProbe

registry.add(RedisProbe(url="redis://localhost:6379"))

Details returned:

{
  "version": "7.2.4",
  "uptime_seconds": 86400,
  "used_memory_human": "2.50M",
  "connected_clients": 8,
  "role": "master",
  "total_keys": 312,
  "clusters": {
    "session": { "keys": 150, "ttl_seconds": 3600 },
    "cache":   { "keys": 162, "ttl_seconds": 900  }
  }
}

clusters groups keys by the segment before the first :. For example, a key named session:abc123 falls into the session cluster. ttl_seconds is sampled from one key in the group; null means the key has no expiry.
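
The grouping rule can be sketched in a few lines. group_keys is a hypothetical helper that takes a mapping of key name to TTL (None for no expiry); skipping keys that contain no ":" is an assumption, since the text above does not say how such keys are handled.

```python
from typing import Optional


def group_keys(keys_with_ttl: dict[str, Optional[int]]) -> dict[str, dict]:
    """Group keys by the segment before the first ':'."""
    clusters: dict[str, dict] = {}
    for key, ttl in keys_with_ttl.items():
        if ":" not in key:
            continue  # assumption: un-prefixed keys are left out of clusters
        prefix = key.split(":", 1)[0]
        # The TTL is sampled from the first key seen in each group.
        group = clusters.setdefault(prefix, {"keys": 0, "ttl_seconds": ttl})
        group["keys"] += 1
    return clusters
```

Because the TTL comes from a single sampled key, it is an indication of the group's typical expiry rather than a guarantee for every key in it.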

Common URL forms:

# Password-protected
RedisProbe(url="redis://:mypassword@localhost:6379")

# Specific database index
RedisProbe(url="redis://localhost:6379/2", name="task-queue")

# TLS
RedisProbe(url="rediss://redis.internal:6380")

# Watching Redis as both a cache and a queue
registry.add(RedisProbe(url="redis://localhost:6379/0", name="cache"))
registry.add(RedisProbe(url="redis://localhost:6379/1", name="task-queue"))

Watching Memcached

pip install fastapi-watch[memcached]

MemcachedProbe calls stats() to verify the server is reachable and responding.

from fastapi_watch.probes import MemcachedProbe

registry.add(MemcachedProbe(host="localhost", port=11211))

Constructor arguments:

| Argument | Default | Description |
| --- | --- | --- |
| host | "localhost" | |
| port | 11211 | |
| name | "memcached" | Probe label |
| pool_size | 1 | aiomcache connection pool size |

Watching RabbitMQ

pip install fastapi-watch[rabbitmq]

RabbitMQProbe has two modes:

  • Connectivity only (default) — opens and closes an AMQP connection. No channels or queues are touched.
  • Rich mode — when management_url is set, the probe also calls the RabbitMQ Management HTTP API and returns per-queue stats, message rates, and cluster metadata.

Connectivity only

from fastapi_watch.probes import RabbitMQProbe

registry.add(
    RabbitMQProbe(
        url="amqp://guest:guest@localhost:5672/",
        name="rabbitmq",  # default
    )
)

Details returned (connectivity only):

{ "connected": true }

Rich mode — with Management API

Pass management_url pointing at the RabbitMQ Management plugin (default port 15672). Credentials are taken from the AMQP URL automatically.

registry.add(
    RabbitMQProbe(
        url="amqp://guest:guest@localhost:5672/",
        management_url="http://localhost:15672",
    )
)

Details returned (rich mode):

{
  "connected": true,
  "server": {
    "rabbitmq_version": "3.12.0",
    "erlang_version": "26.0",
    "cluster_name": "rabbit@my-node",
    "node": "rabbit@my-node",
    "connections": 4,
    "channels": 8,
    "exchanges": 14,
    "queues": 3,
    "consumers": 6
  },
  "totals": {
    "messages": 142,
    "messages_ready": 140,
    "messages_unacknowledged": 2,
    "publish_rate": 12.5,
    "deliver_rate": 11.8,
    "ack_rate": 11.8
  },
  "queues": {
    "tasks": {
      "state": "running",
      "messages": 120,
      "messages_ready": 118,
      "messages_unacknowledged": 2,
      "consumers": 4,
      "memory_bytes": 32768,
      "publish_rate": 10.0,
      "deliver_rate": 9.5,
      "ack_rate": 9.5,
      "durable": true,
      "auto_delete": false,
      "idle_since": null
    },
    "dead-letter": {
      "state": "running",
      "messages": 22,
      "consumers": 0,
      ...
    }
  }
}

If the Management API is unreachable, a management_api_error key is added to details and the probe still reports the AMQP connection status.
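
Reusing the AMQP credentials for the Management API could look roughly like the sketch below. management_request is a hypothetical helper; /api/overview is a real Management API path, while the guest/guest fallback is an assumption based on RabbitMQ's default account.

```python
from urllib.parse import unquote, urlsplit


def management_request(amqp_url: str, management_url: str,
                       path: str = "/api/overview"):
    """Return (request_url, (user, password)) for a Management API call."""
    parts = urlsplit(amqp_url)
    # Credentials in a URL may be percent-encoded, so decode them first.
    user = unquote(parts.username or "guest")
    password = unquote(parts.password or "guest")
    # Avoid a double slash if management_url ends with "/".
    return management_url.rstrip("/") + path, (user, password)
```

The returned tuple is in the shape most HTTP clients accept for basic authentication.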

Other connection forms:

# Dedicated monitoring vhost
RabbitMQProbe(url="amqp://monitor:secret@rabbitmq.internal/monitoring", management_url="http://rabbitmq.internal:15672")

# TLS / AMQPS
RabbitMQProbe(url="amqps://user:secret@rabbitmq.internal/", name="rabbitmq-tls")

# Multiple cluster nodes — one probe per node
for i, host in enumerate(["rmq-1.internal", "rmq-2.internal", "rmq-3.internal"], start=1):
    registry.add(RabbitMQProbe(url=f"amqp://guest:guest@{host}/", name=f"rabbitmq-node-{i}"))

Constructor arguments:

| Argument | Default | Description |
| --- | --- | --- |
| url | "amqp://guest:guest@localhost/" | AMQP(S) connection URL |
| name | "rabbitmq" | Probe label |
| management_url | None | Base URL of the Management HTTP API. When set, enables rich queue-level details. Credentials are taken from url. |

Watching Kafka

pip install fastapi-watch[kafka]

KafkaProbe starts an AIOKafkaAdminClient to verify broker reachability, then lists topics and describes the cluster.

from fastapi_watch.probes import KafkaProbe

# Single broker
registry.add(KafkaProbe(bootstrap_servers="localhost:9092"))

# Multiple brokers
registry.add(KafkaProbe(bootstrap_servers=["b1:9092", "b2:9092", "b3:9092"]))

Details returned:

{
  "broker_count": 3,
  "controller_id": 1,
  "topics": ["orders", "payments", "notifications"],
  "internal_topics": ["__consumer_offsets"]
}

topics contains user-defined topics only. internal_topics lists Kafka-managed topics (those prefixed with __).
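
The split between the two lists comes down to a prefix test. split_topics below is a hypothetical helper, and sorting the output is an assumption made only to keep the result stable.

```python
def split_topics(all_topics):
    """Separate user-defined topics from Kafka-managed ones (prefixed '__')."""
    topics = sorted(t for t in all_topics if not t.startswith("__"))
    internal = sorted(t for t in all_topics if t.startswith("__"))
    return {"topics": topics, "internal_topics": internal}
```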

Constructor arguments:

| Argument | Default | Description |
| --- | --- | --- |
| bootstrap_servers | "localhost:9092" | String or list of host:port entries |
| name | "kafka" | Probe label |
| request_timeout_ms | 5000 | Admin client metadata request timeout |

Watching MongoDB

pip install fastapi-watch[mongo]

MongoProbe runs serverStatus on the admin database to collect version, connection pool stats, memory, and storage engine.

from fastapi_watch.probes import MongoProbe

registry.add(MongoProbe(url="mongodb://localhost:27017"))

Details returned:

{
  "version": "7.0.5",
  "uptime_seconds": 172800,
  "connections": {
    "current": 12,
    "available": 838,
    "total_created": 150
  },
  "memory_mb": {
    "resident": 128,
    "virtual": 1024
  },
  "storage_engine": "wiredTiger"
}

Constructor arguments:

| Argument | Default | Description |
| --- | --- | --- |
| url | "mongodb://localhost:27017" | MongoDB connection URI |
| name | "mongodb" | Probe label |
| server_selection_timeout_ms | 2000 | How long to wait for a server before giving up |

Watching an HTTP endpoint

pip install fastapi-watch[http]

HttpProbe performs an HTTP GET and checks the response status code.

from fastapi_watch.probes import HttpProbe

registry.add(HttpProbe(url="https://api.upstream.com/health"))

Details returned:

{
  "status_code": 200,
  "content_type": "application/json",
  "response_bytes": 43
}

details is populated for both healthy and unhealthy responses so you can see what status code an upstream actually returned.

Constructor arguments:

| Argument | Default | Description |
| --- | --- | --- |
| url | required | URL to GET |
| timeout | 5.0 | Request timeout in seconds |
| name | URL host | Probe label |
| expected_status | 200 | HTTP status code considered healthy |

# Expect a 204 instead of 200
registry.add(HttpProbe(url="https://api.example.com/ping", expected_status=204))

# Shorter timeout, explicit name
registry.add(HttpProbe(url="https://api.payments.com/health", timeout=2.0, name="payments-api"))
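
The two rules in the table (the probe name defaults to the URL host, and only expected_status counts as healthy) can be sketched without making a network call. default_probe_name and evaluate_response are hypothetical helpers, and the wording of the error message is invented for this example.

```python
from typing import Optional
from urllib.parse import urlsplit


def default_probe_name(url: str) -> str:
    """Use the URL's host as the probe label, falling back to the URL itself."""
    return urlsplit(url).hostname or url


def evaluate_response(status_code: int, expected_status: int = 200,
                      content_type: Optional[str] = None, body: bytes = b""):
    healthy = status_code == expected_status
    result = {
        "status": "healthy" if healthy else "unhealthy",
        # Details are filled in either way, so a failing upstream's
        # actual status code remains visible.
        "details": {
            "status_code": status_code,
            "content_type": content_type,
            "response_bytes": len(body),
        },
    }
    if not healthy:
        # Hypothetical message format, not the library's wording.
        result["error"] = f"expected HTTP {expected_status}, got {status_code}"
    return result
```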

SQLAlchemy engine probe

pip install fastapi-watch[sqlalchemy]

SqlAlchemyProbe reuses your existing AsyncEngine so no extra connections are opened. Works with any database SQLAlchemy supports (PostgreSQL, MySQL, SQLite, etc.).

from sqlalchemy.ext.asyncio import create_async_engine
from fastapi_watch.probes import SqlAlchemyProbe

engine = create_async_engine("postgresql+asyncpg://app_user:secret@localhost/mydb")

registry.add(SqlAlchemyProbe(engine=engine, name="primary-db"))

Details returned:

{
  "dialect": "postgresql",
  "driver": "asyncpg",
  "server_version": "16.2.0"
}

Constructor arguments:

| Argument | Default | Description |
| --- | --- | --- |
| engine | required | A SQLAlchemy 2.x AsyncEngine instance |
| name | "database" | Probe label |

Writing a custom probe

Any class that extends BaseProbe and implements check() works as a probe. This is the right approach for internal services, third-party SDKs, business-logic checks, or composite conditions.

Minimal example

from fastapi_watch.probes import BaseProbe
from fastapi_watch.models import ProbeResult, ProbeStatus

class MyServiceProbe(BaseProbe):
    name = "my-service"

    async def check(self) -> ProbeResult:
        # call_my_service() is a placeholder for your own async check
        ok = await call_my_service()
        return ProbeResult(
            name=self.name,
            status=ProbeStatus.HEALTHY if ok else ProbeStatus.UNHEALTHY,
        )

registry.add(MyServiceProbe())

With latency and details

import time
from fastapi_watch.probes import BaseProbe
from fastapi_watch.models import ProbeResult, ProbeStatus

class PaymentGatewayProbe(BaseProbe):
    name = "payment-gateway"

    async def check(self) -> ProbeResult:
        start = time.perf_counter()
        try:
            # ping_payment_gateway() is a placeholder for your own async client call
            info = await ping_payment_gateway()
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.HEALTHY,
                latency_ms=round(latency, 2),
                details={
                    "region": info.region,
                    "provider_version": info.version,
                },
            )
        except Exception as exc:
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.UNHEALTHY,
                latency_ms=round(latency, 2),
                error=str(exc),
            )
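
The timing and error handling in the example above tends to repeat across custom probes, so it can be factored into a helper. The sketch below is one possible shape, not part of fastapi-watch: timed_check is a hypothetical function, it returns a plain dict so that it runs without the package installed, and the timeout via asyncio.wait_for is an addition that the example above does not include.

```python
import asyncio
import time


async def timed_check(name, operation, timeout=5.0):
    """Run an async callable, measuring latency and capturing any error."""
    start = time.perf_counter()
    status, error, details = "healthy", None, None
    try:
        # wait_for cancels the operation if it exceeds the timeout.
        details = await asyncio.wait_for(operation(), timeout=timeout)
    except asyncio.TimeoutError:
        status, error = "unhealthy", f"timed out after {timeout}s"
    except Exception as exc:
        status, error = "unhealthy", str(exc)
    latency_ms = round((time.perf_counter() - start) * 1000, 2)
    return {
        "name": name,
        "status": status,
        "latency_ms": latency_ms,
        "error": error,
        "details": details,
    }
```

Inside a real probe, the fields of the returned dict would be passed on to ProbeResult in check().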

Configurable probe

import time

import aiobotocore.session  # separate dependency: pip install aiobotocore

from fastapi_watch.models import ProbeResult, ProbeStatus
from fastapi_watch.probes import BaseProbe


class S3BucketProbe(BaseProbe):
    """Checks that an S3 bucket is reachable and the credentials are valid."""

    def __init__(self, bucket: str, region: str = "us-east-1", name: str = "s3") -> None:
        self.bucket = bucket
        self.region = region
        self.name = name

    async def check(self) -> ProbeResult:
        start = time.perf_counter()
        try:
            session = aiobotocore.session.get_session()
            async with session.create_client("s3", region_name=self.region) as client:
                # head_bucket raises if the bucket is missing or access is denied
                await client.head_bucket(Bucket=self.bucket)
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.HEALTHY,
                latency_ms=round(latency, 2),
                details={"bucket": self.bucket, "region": self.region},
            )
        except Exception as exc:
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.UNHEALTHY,
                latency_ms=round(latency, 2),
                error=str(exc),
            )

registry.add(S3BucketProbe(bucket="my-app-uploads", region="eu-west-1"))

Composite probe

Report unhealthy only when both Redis nodes are down simultaneously:

import asyncio

from fastapi_watch.models import ProbeResult, ProbeStatus
from fastapi_watch.probes import BaseProbe, RedisProbe


class CompositeRedisProbe(BaseProbe):
    name = "redis-composite"

    def __init__(self, primary_url: str, replica_url: str) -> None:
        self._primary = RedisProbe(url=primary_url, name="primary")
        self._replica = RedisProbe(url=replica_url, name="replica")

    async def check(self) -> ProbeResult:
        # Check both nodes concurrently
        primary, replica = await asyncio.gather(
            self._primary.check(), self._replica.check()
        )
        # Healthy as long as at least one node responds
        if primary.is_healthy or replica.is_healthy:
            return ProbeResult(name=self.name, status=ProbeStatus.HEALTHY)
        return ProbeResult(
            name=self.name,
            status=ProbeStatus.UNHEALTHY,
            error=f"both nodes down (primary: {primary.error}, replica: {replica.error})",
        )

All built-in probes

Databases

| Probe | Extra | Key constructor args | Details fields |
| --- | --- | --- | --- |
| PostgreSQLProbe | postgres | url, name, timeout | version, active_connections, max_connections, database_size |
| MySQLProbe | mysql | url or host/port/user/password/db, name, connect_timeout | version, connected_threads, uptime_seconds, max_used_connections |
| SqlAlchemyProbe | sqlalchemy | engine, name | dialect, driver, server_version |

Caches

| Probe | Extra | Key constructor args | Details fields |
| --- | --- | --- | --- |
| RedisProbe | redis | url, name | version, uptime_seconds, used_memory_human, connected_clients, role, total_keys, clusters |
| MemcachedProbe | memcached | host, port, name, pool_size | |

Queues / messaging

| Probe | Extra | Key constructor args | Details fields |
| --- | --- | --- | --- |
| RabbitMQProbe | rabbitmq | url, name, management_url | connected; + server, totals, queues when management_url is set |
| KafkaProbe | kafka | bootstrap_servers, name, request_timeout_ms | broker_count, controller_id, topics, internal_topics |

Document stores

| Probe | Extra | Key constructor args | Details fields |
| --- | --- | --- | --- |
| MongoProbe | mongo | url, name, server_selection_timeout_ms | version, uptime_seconds, connections, memory_mb, storage_engine |

HTTP

| Probe | Extra | Key constructor args | Details fields |
| --- | --- | --- | --- |
| HttpProbe | http | url, timeout, name, expected_status | status_code, content_type, response_bytes |

Testing / placeholder

| Probe | Extra | Key constructor args | Details fields |
| --- | --- | --- | --- |
| MemoryProbe | built-in | name | |

Configuration reference

HealthRegistry

| Argument | Type | Default | Description |
| --- | --- | --- | --- |
| app | FastAPI | required | The FastAPI application instance |
| prefix | str | "/health" | URL prefix for all three endpoints |
| tags | list[str] | ["health"] | OpenAPI tags applied to the health routes |

HealthRegistry.add(probe_or_list)

Accepts a single BaseProbe instance or a list of them. Returns self for chaining. Adding the same instance more than once is a no-op. Probes are started in the order they were added and run concurrently on every request.

HealthRegistry.run_all()

Async method — runs every registered probe and returns list[ProbeResult]. Useful for testing or building custom aggregation logic outside of the mounted routes.

results = await registry.run_all()
for r in results:
    print(r.name, r.status, r.details)
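
As one example of custom aggregation, the sketch below treats only a chosen set of probes as critical for readiness and reports the slowest probe. summarize is a hypothetical helper; it relies only on the name, is_healthy, and latency_ms attributes listed for ProbeResult, so it can be exercised with simple stand-in objects.

```python
from types import SimpleNamespace


def summarize(results, critical):
    """Aggregate probe results; only probes named in `critical` gate readiness."""
    failing = [r.name for r in results if not r.is_healthy]
    slowest = max(results, key=lambda r: r.latency_ms, default=None)
    return {
        "ready": not any(name in critical for name in failing),
        "failing": failing,
        "slowest": slowest.name if slowest is not None else None,
    }


# Stand-ins for ProbeResult objects, for demonstration only.
sample = [
    SimpleNamespace(name="primary-db", is_healthy=True, latency_ms=1.8),
    SimpleNamespace(name="redis", is_healthy=False, latency_ms=5002.1),
]
```

In an application, the list passed in would be the return value of await registry.run_all().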

ProbeResult

| Field | Type | Description |
| --- | --- | --- |
| name | str | Probe identifier |
| status | ProbeStatus | "healthy" or "unhealthy" |
| latency_ms | float | Duration of the check in milliseconds |
| error | str \| None | Error message; only present on failure |
| details | dict \| None | Service-specific metadata; see each probe's section above |
| is_healthy | bool (property) | True when status == "healthy" |

Kubernetes integration

livenessProbe:
  httpGet:
    path: /health/live
    port: 8000
  initialDelaySeconds: 5
  periodSeconds: 10

readinessProbe:
  httpGet:
    path: /health/ready
    port: 8000
  initialDelaySeconds: 10
  periodSeconds: 15
  failureThreshold: 3

Use /health/ready for the readiness probe: Kubernetes stops routing traffic to a pod once the probe has failed failureThreshold consecutive times (three in the example above), which happens when any dependency stays unreachable. Use /health/live for liveness so the process is only restarted when it is genuinely stuck, not because an external service is temporarily down.
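
For a rough sense of how quickly a failing dependency takes a pod out of rotation, the readiness settings above can be turned into a time window. This is back-of-the-envelope arithmetic that ignores timeoutSeconds and the probe's own latency; readiness_removal_window is a hypothetical helper.

```python
def readiness_removal_window(period_seconds: int, failure_threshold: int) -> tuple[int, int]:
    """Approximate (best, worst) seconds from outage start to the pod being marked unready."""
    # Best case: the outage starts just before a check, so the first
    # failure is immediate and the remaining ones follow one period apart.
    best = (failure_threshold - 1) * period_seconds
    # Worst case: the outage starts just after a successful check, so a
    # full period passes before the first failure is even observed.
    worst = failure_threshold * period_seconds
    return best, worst
```

With periodSeconds: 15 and failureThreshold: 3, this gives a window of roughly 30 to 45 seconds.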


License

MIT
