Structured health and readiness check system for FastAPI

Reason this release was yanked:

The HTTP probe would consume third-party API usage.

Project description

fastapi-watch

Structured health and readiness check system for FastAPI.

Add /health/live, /health/ready, and /health/status endpoints to any FastAPI app with a single registry call. All probes run concurrently, so a slow dependency never blocks the others. Each probe returns rich service-specific details alongside the pass/fail result.


Installation

Install only the extras you actually use. Nothing is pulled in by default beyond FastAPI and Pydantic.

# Core package — includes the always-passing MemoryProbe, no other deps
pip install fastapi-watch

# Add individual service probes as needed
pip install fastapi-watch[postgres]     # PostgreSQL        (asyncpg)
pip install fastapi-watch[mysql]        # MySQL / MariaDB   (aiomysql)
pip install fastapi-watch[sqlalchemy]   # Any SQLAlchemy 2.x async engine
pip install fastapi-watch[redis]        # Redis             (aioredis)
pip install fastapi-watch[memcached]    # Memcached         (aiomcache)
pip install fastapi-watch[rabbitmq]     # RabbitMQ          (aio-pika + aiohttp)
pip install fastapi-watch[kafka]        # Kafka             (aiokafka)
pip install fastapi-watch[mongo]        # MongoDB           (motor)
pip install fastapi-watch[http]         # HTTP endpoint     (aiohttp)

# Or pull everything in one shot
pip install fastapi-watch[all]

Multiple extras can be combined:

pip install fastapi-watch[postgres,redis,rabbitmq]

How it works

Create a HealthRegistry, attach it to your FastAPI app, and call .add() for each service you want to monitor. The registry mounts the three health endpoints automatically and runs every probe concurrently on each request.

from fastapi import FastAPI
from fastapi_watch import HealthRegistry

app = FastAPI()
registry = HealthRegistry(app)

Add probes one at a time or chain them:

registry.add(probe_a)
registry.add(probe_b)

# chained
registry.add(probe_a).add(probe_b).add(probe_c)
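
The effect of running probes concurrently can be illustrated with a small standalone sketch. It does not use fastapi-watch: fake_probe and run_concurrently are hypothetical stand-ins, and the use of asyncio.gather is an assumption about how this kind of concurrency is typically achieved, not a statement about the registry's internals.

```python
import asyncio
import time


async def fake_probe(name: str, delay: float) -> tuple[str, float]:
    """Hypothetical stand-in for a probe: waits `delay` seconds, then reports."""
    await asyncio.sleep(delay)
    return name, delay


async def run_concurrently() -> tuple[list[tuple[str, float]], float]:
    start = time.perf_counter()
    # gather() schedules every coroutine at once and returns the results
    # in the order the awaitables were passed in, not in completion order.
    results = await asyncio.gather(
        fake_probe("fast", 0.05),
        fake_probe("slow", 0.2),
        fake_probe("medium", 0.1),
    )
    elapsed = time.perf_counter() - start
    return list(results), elapsed


results, elapsed = asyncio.run(run_concurrently())
print([name for name, _ in results], round(elapsed, 2))
```

The total wall time tracks the slowest probe (about 0.2 s here) rather than the sum of all three delays (0.35 s).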

Endpoints

| Endpoint | Purpose | Healthy | Degraded |
| --- | --- | --- | --- |
| GET /health/live | Liveness: is the process alive? | 200 OK | never fails |
| GET /health/ready | Readiness: are all probes passing? | 200 OK | 503 Service Unavailable |
| GET /health/status | Status: full detail on every probe | 200 OK | 207 Multi-Status |

The prefix defaults to /health and can be changed:

registry = HealthRegistry(app, prefix="/ops/health")
# → /ops/health/live, /ops/health/ready, /ops/health/status
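
The status codes in the endpoint table can be summarised as a small mapping. This is a sketch only: endpoint_status_codes is a hypothetical helper, not part of fastapi-watch, and the behaviour with zero registered probes (treated as healthy here) is an assumption.

```python
from typing import Iterable


def endpoint_status_codes(probe_healthy: Iterable[bool]) -> dict[str, int]:
    """Map per-probe pass/fail flags to the HTTP status of each endpoint."""
    flags = list(probe_healthy)
    all_ok = all(flags)  # assumption: no probes registered counts as healthy
    return {
        "live": 200,                         # liveness never fails
        "ready": 200 if all_ok else 503,     # Service Unavailable
        "status": 200 if all_ok else 207,    # Multi-Status
    }


print(endpoint_status_codes([True, True]))
print(endpoint_status_codes([True, False]))
```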

Response format

Every probe result has the same base shape:

| Field | Type | Description |
| --- | --- | --- |
| name | string | Probe identifier |
| status | "healthy" or "unhealthy" | Pass/fail result |
| latency_ms | number | How long the check took in milliseconds |
| error | string or null | Error message, only present on failure |
| details | object or null | Service-specific metadata (see Probe details) |

All healthy — 200

{
  "status": "healthy",
  "probes": [
    {
      "name": "postgresql",
      "status": "healthy",
      "latency_ms": 1.8,
      "details": {
        "version": "PostgreSQL 16.2 on aarch64-unknown-linux-gnu",
        "active_connections": 5,
        "max_connections": 100,
        "database_size": "42 MB"
      }
    },
    {
      "name": "redis",
      "status": "healthy",
      "latency_ms": 0.6,
      "details": {
        "version": "7.2.4",
        "uptime_seconds": 86400,
        "used_memory_human": "2.50M",
        "connected_clients": 8,
        "total_keys": 312,
        "clusters": {
          "session": { "keys": 150, "ttl_seconds": 3600 },
          "cache":   { "keys": 162, "ttl_seconds": 900  }
        }
      }
    }
  ]
}

One probe failing: 503 on /health/ready, 207 on /health/status

{
  "status": "unhealthy",
  "probes": [
    { "name": "postgresql", "status": "healthy",   "latency_ms": 1.8, "details": { ... } },
    { "name": "redis",      "status": "unhealthy", "latency_ms": 5002.1, "error": "Connection refused" }
  ]
}

Probe details

Every built-in probe populates the details field with service-specific metadata. Details are always best-effort — if the metadata query fails after a successful connectivity check, details will contain whatever was collected up to that point. The probe status reflects connectivity only, not the completeness of details.
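
The best-effort rule can be sketched as two separate phases: a connectivity check that decides the status, and a metadata phase whose failures are swallowed. All function names below are hypothetical stand-ins, and the result is a plain dict rather than the library's ProbeResult; this is not the built-in probes' actual code.

```python
import asyncio


async def check_connectivity() -> None:
    """Stand-in for e.g. SELECT 1 or PING; succeeds silently."""


async def fetch_version() -> str:
    return "1.2.3"


async def fetch_connection_count() -> int:
    # Simulates a metadata query that fails after connectivity succeeded.
    raise RuntimeError("permission denied")


async def check() -> dict:
    # Phase 1: connectivity alone decides healthy vs unhealthy.
    try:
        await check_connectivity()
    except Exception as exc:
        return {"status": "unhealthy", "error": str(exc), "details": None}

    # Phase 2: collect metadata; keep whatever was gathered before a failure.
    details: dict = {}
    try:
        details["version"] = await fetch_version()
        details["active_connections"] = await fetch_connection_count()
    except Exception:
        pass  # details stays partial, status is unaffected
    return {"status": "healthy", "details": details}


result = asyncio.run(check())
print(result)
```

Here the probe still reports healthy, and details holds only the version because the second metadata query failed.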


Watching PostgreSQL

pip install fastapi-watch[postgres]

PostgreSQLProbe uses asyncpg directly — no SQLAlchemy required. It opens a connection, runs SELECT 1 to verify the server is responsive, collects metadata, then closes the connection.

from fastapi_watch.probes import PostgreSQLProbe

registry.add(
    PostgreSQLProbe(
        url="postgresql://app_user:secret@localhost:5432/mydb",
        name="primary-db",  # default: "postgresql"
    )
)

Details returned:

{
  "version": "PostgreSQL 16.2 on aarch64-unknown-linux-gnu, compiled by gcc 12.2.0",
  "active_connections": 5,
  "max_connections": 100,
  "database_size": "42 MB"
}

Checking a read replica separately:

registry.add(PostgreSQLProbe(url="postgresql://reader:secret@replica.host/mydb", name="replica-db"))

With a connection timeout (default 5 seconds):

registry.add(PostgreSQLProbe(url="postgresql://...", timeout=2.0))

If you are already using SQLAlchemy, see SQLAlchemy engine probe to reuse your existing engine instead.


Watching MySQL / MariaDB

pip install fastapi-watch[mysql]

MySQLProbe accepts either a URL or explicit connection kwargs.

from fastapi_watch.probes import MySQLProbe

# URL form
registry.add(MySQLProbe(url="mysql://app_user:secret@localhost:3306/mydb"))

# Keyword form
registry.add(MySQLProbe(host="localhost", port=3306, user="app_user", password="secret", db="mydb"))

Details returned:

{
  "version": "8.0.36",
  "connected_threads": 4,
  "uptime_seconds": 172800,
  "max_used_connections": 12
}

Constructor arguments:

| Argument | Default | Description |
| --- | --- | --- |
| url | None | Full DSN; overrides all other kwargs when set |
| host | "localhost" | |
| port | 3306 | |
| user | "root" | |
| password | "" | |
| db | "" | |
| name | "mysql" | Probe label |
| connect_timeout | 5 | Seconds |

Watching Redis

pip install fastapi-watch[redis]

RedisProbe sends PING, then collects server info and scans key prefixes to build a cluster breakdown.

from fastapi_watch.probes import RedisProbe

registry.add(RedisProbe(url="redis://localhost:6379"))

Details returned:

{
  "version": "7.2.4",
  "uptime_seconds": 86400,
  "used_memory_human": "2.50M",
  "connected_clients": 8,
  "role": "master",
  "total_keys": 312,
  "clusters": {
    "session": { "keys": 150, "ttl_seconds": 3600 },
    "cache":   { "keys": 162, "ttl_seconds": 900  }
  }
}

clusters groups keys by the segment before the first :. For example, a key named session:abc123 falls into the session cluster. ttl_seconds is sampled from one key in the group; null means the key has no expiry.
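
The grouping rule can be sketched in a few lines. group_keys_by_prefix is a hypothetical helper, not the probe's implementation, and how keys without a : are handled (grouped under their full name here) is an assumption the text above does not cover.

```python
from collections import Counter


def group_keys_by_prefix(keys: list[str]) -> dict[str, int]:
    """Count keys per group, where the group is the text before the first ':'."""
    # split(":", 1)[0] returns the whole key when it contains no ':'.
    return dict(Counter(key.split(":", 1)[0] for key in keys))


keys = ["session:abc123", "session:def456", "cache:user:42", "standalone"]
groups = group_keys_by_prefix(keys)
print(groups)
```

Note that cache:user:42 lands in cache, since only the first segment is considered.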

Common URL forms:

# Password-protected
RedisProbe(url="redis://:mypassword@localhost:6379")

# Specific database index
RedisProbe(url="redis://localhost:6379/2", name="task-queue")

# TLS
RedisProbe(url="rediss://redis.internal:6380")

# Watching Redis as both a cache and a queue
registry.add(RedisProbe(url="redis://localhost:6379/0", name="cache"))
registry.add(RedisProbe(url="redis://localhost:6379/1", name="task-queue"))

Watching Memcached

pip install fastapi-watch[memcached]

MemcachedProbe calls stats() to verify the server is reachable and responding.

from fastapi_watch.probes import MemcachedProbe

registry.add(MemcachedProbe(host="localhost", port=11211))

Constructor arguments:

| Argument | Default | Description |
| --- | --- | --- |
| host | "localhost" | |
| port | 11211 | |
| name | "memcached" | Probe label |
| pool_size | 1 | aiomcache connection pool size |

Watching RabbitMQ

pip install fastapi-watch[rabbitmq]

RabbitMQProbe has two modes:

  • Connectivity only (default) — opens and closes an AMQP connection. No channels or queues are touched.
  • Rich mode — when management_url is set, the probe also calls the RabbitMQ Management HTTP API and returns per-queue stats, message rates, and cluster metadata.

Connectivity only

from fastapi_watch.probes import RabbitMQProbe

registry.add(
    RabbitMQProbe(
        url="amqp://guest:guest@localhost:5672/",
        name="rabbitmq",  # default
    )
)

Details returned (connectivity only):

{ "connected": true }

Rich mode — with Management API

Pass management_url pointing at the RabbitMQ Management plugin (default port 15672). Credentials are taken from the AMQP URL automatically.

registry.add(
    RabbitMQProbe(
        url="amqp://guest:guest@localhost:5672/",
        management_url="http://localhost:15672",
    )
)

Details returned (rich mode):

{
  "connected": true,
  "server": {
    "rabbitmq_version": "3.12.0",
    "erlang_version": "26.0",
    "cluster_name": "rabbit@my-node",
    "node": "rabbit@my-node",
    "connections": 4,
    "channels": 8,
    "exchanges": 14,
    "queues": 3,
    "consumers": 6
  },
  "totals": {
    "messages": 142,
    "messages_ready": 140,
    "messages_unacknowledged": 2,
    "publish_rate": 12.5,
    "deliver_rate": 11.8,
    "ack_rate": 11.8
  },
  "queues": {
    "tasks": {
      "state": "running",
      "messages": 120,
      "messages_ready": 118,
      "messages_unacknowledged": 2,
      "consumers": 4,
      "memory_bytes": 32768,
      "publish_rate": 10.0,
      "deliver_rate": 9.5,
      "ack_rate": 9.5,
      "durable": true,
      "auto_delete": false,
      "idle_since": null
    },
    "dead-letter": {
      "state": "running",
      "messages": 22,
      "consumers": 0,
      ...
    }
  }
}

If the Management API is unreachable, a management_api_error key is added to details and the probe still reports the AMQP connection status.
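
Reusing the AMQP credentials for the Management API amounts to parsing the user and password out of the connection URL. The sketch below uses only the standard library; management_auth is a hypothetical helper, and the fallback to guest/guest when the URL carries no credentials is an assumption, not documented behaviour of RabbitMQProbe.

```python
from urllib.parse import unquote, urlsplit


def management_auth(amqp_url: str) -> tuple[str, str]:
    """Extract (user, password) from an AMQP(S) URL for HTTP basic auth."""
    parts = urlsplit(amqp_url)
    # urlsplit leaves percent-encoding in place, so decode explicitly.
    user = unquote(parts.username) if parts.username else "guest"
    password = unquote(parts.password) if parts.password else "guest"
    return user, password


print(management_auth("amqp://guest:guest@localhost:5672/"))
print(management_auth("amqps://monitor:s%40cret@rabbitmq.internal/monitoring"))
```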

Other connection forms:

# Dedicated monitoring vhost
RabbitMQProbe(url="amqp://monitor:secret@rabbitmq.internal/monitoring", management_url="http://rabbitmq.internal:15672")

# TLS / AMQPS
RabbitMQProbe(url="amqps://user:secret@rabbitmq.internal/", name="rabbitmq-tls")

# Multiple cluster nodes — one probe per node
for i, host in enumerate(["rmq-1.internal", "rmq-2.internal", "rmq-3.internal"], start=1):
    registry.add(RabbitMQProbe(url=f"amqp://guest:guest@{host}/", name=f"rabbitmq-node-{i}"))

Constructor arguments:

| Argument | Default | Description |
| --- | --- | --- |
| url | "amqp://guest:guest@localhost/" | AMQP(S) connection URL |
| name | "rabbitmq" | Probe label |
| management_url | None | Base URL of the Management HTTP API. When set, enables rich queue-level details. Credentials are taken from url. |

Watching Kafka

pip install fastapi-watch[kafka]

KafkaProbe starts an AIOKafkaAdminClient to verify broker reachability, then lists topics and describes the cluster.

from fastapi_watch.probes import KafkaProbe

# Single broker
registry.add(KafkaProbe(bootstrap_servers="localhost:9092"))

# Multiple brokers
registry.add(KafkaProbe(bootstrap_servers=["b1:9092", "b2:9092", "b3:9092"]))

Details returned:

{
  "broker_count": 3,
  "controller_id": 1,
  "topics": ["orders", "payments", "notifications"],
  "internal_topics": ["__consumer_offsets"]
}

topics contains user-defined topics only. internal_topics lists Kafka-managed topics (those prefixed with __).
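
The split between the two lists can be sketched as a filter on the __ prefix. split_topics is a hypothetical helper, and sorting the output is an assumption made only to keep the result deterministic.

```python
def split_topics(all_topics: list[str]) -> dict[str, list[str]]:
    """Separate user-defined topics from Kafka-managed ones (prefixed '__')."""
    return {
        "topics": sorted(t for t in all_topics if not t.startswith("__")),
        "internal_topics": sorted(t for t in all_topics if t.startswith("__")),
    }


split = split_topics(["orders", "__consumer_offsets", "payments"])
print(split)
```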

Constructor arguments:

| Argument | Default | Description |
| --- | --- | --- |
| bootstrap_servers | "localhost:9092" | String or list of host:port entries |
| name | "kafka" | Probe label |
| request_timeout_ms | 5000 | Admin client metadata request timeout |

Watching MongoDB

pip install fastapi-watch[mongo]

MongoProbe runs serverStatus on the admin database to collect version, connection pool stats, memory, and storage engine.

from fastapi_watch.probes import MongoProbe

registry.add(MongoProbe(url="mongodb://localhost:27017"))

Details returned:

{
  "version": "7.0.5",
  "uptime_seconds": 172800,
  "connections": {
    "current": 12,
    "available": 838,
    "total_created": 150
  },
  "memory_mb": {
    "resident": 128,
    "virtual": 1024
  },
  "storage_engine": "wiredTiger"
}

Constructor arguments:

| Argument | Default | Description |
| --- | --- | --- |
| url | "mongodb://localhost:27017" | MongoDB connection URI |
| name | "mongodb" | Probe label |
| server_selection_timeout_ms | 2000 | How long to wait for a server before giving up |

Watching an HTTP endpoint

pip install fastapi-watch[http]

HttpProbe performs an HTTP GET and checks the response status code.

from fastapi_watch.probes import HttpProbe

registry.add(HttpProbe(url="https://api.upstream.com/health"))

Details returned:

{
  "status_code": 200,
  "content_type": "application/json",
  "response_bytes": 43
}

details is populated for both healthy and unhealthy responses so you can see what status code an upstream actually returned.

Constructor arguments:

| Argument | Default | Description |
| --- | --- | --- |
| url | required | URL to GET |
| timeout | 5.0 | Request timeout in seconds |
| name | URL host | Probe label |
| expected_status | 200 | HTTP status code considered healthy |

# Expect a 204 instead of 200
registry.add(HttpProbe(url="https://api.example.com/ping", expected_status=204))

# Shorter timeout, explicit name
registry.add(HttpProbe(url="https://api.payments.com/health", timeout=2.0, name="payments-api"))

SQLAlchemy engine probe

pip install fastapi-watch[sqlalchemy]

SqlAlchemyProbe reuses your existing AsyncEngine so no extra connections are opened. Works with any database SQLAlchemy supports (PostgreSQL, MySQL, SQLite, etc.).

from sqlalchemy.ext.asyncio import create_async_engine
from fastapi_watch.probes import SqlAlchemyProbe

engine = create_async_engine("postgresql+asyncpg://app_user:secret@localhost/mydb")

registry.add(SqlAlchemyProbe(engine=engine, name="primary-db"))

Details returned:

{
  "dialect": "postgresql",
  "driver": "asyncpg",
  "server_version": "16.2.0"
}

Constructor arguments:

| Argument | Default | Description |
| --- | --- | --- |
| engine | required | A SQLAlchemy 2.x AsyncEngine instance |
| name | "database" | Probe label |

Writing a custom probe

Any class that extends BaseProbe and implements check() works as a probe. This is the right approach for internal services, third-party SDKs, business-logic checks, or composite conditions.

Minimal example

from fastapi_watch.probes import BaseProbe
from fastapi_watch.models import ProbeResult, ProbeStatus

class MyServiceProbe(BaseProbe):
    name = "my-service"

    async def check(self) -> ProbeResult:
        ok = await call_my_service()
        return ProbeResult(
            name=self.name,
            status=ProbeStatus.HEALTHY if ok else ProbeStatus.UNHEALTHY,
        )

registry.add(MyServiceProbe())

With latency and details

import time
from fastapi_watch.probes import BaseProbe
from fastapi_watch.models import ProbeResult, ProbeStatus

class PaymentGatewayProbe(BaseProbe):
    name = "payment-gateway"

    async def check(self) -> ProbeResult:
        start = time.perf_counter()
        try:
            info = await ping_payment_gateway()
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.HEALTHY,
                latency_ms=round(latency, 2),
                details={
                    "region": info.region,
                    "provider_version": info.version,
                },
            )
        except Exception as exc:
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.UNHEALTHY,
                latency_ms=round(latency, 2),
                error=str(exc),
            )
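
The timing and error handling above repeat in every custom probe, so they can be factored into one helper. The sketch below runs without fastapi-watch: Result is a stand-in dataclass that mirrors the documented ProbeResult fields, and timed_check, ok, and boom are hypothetical names introduced for illustration.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class Result:
    """Stand-in for ProbeResult so the sketch runs without the library."""
    name: str
    status: str
    latency_ms: float
    error: Optional[str] = None
    details: Optional[dict] = None


async def timed_check(
    name: str, fn: Callable[[], Awaitable[Optional[dict]]]
) -> Result:
    """Await fn(), measure its duration, and turn exceptions into a failure."""
    start = time.perf_counter()
    try:
        details = await fn()
        status, error = "healthy", None
    except Exception as exc:
        details, status, error = None, "unhealthy", str(exc)
    latency = round((time.perf_counter() - start) * 1000, 2)
    return Result(name, status, latency, error, details)


async def ok() -> dict:
    return {"region": "eu-west-1"}


async def boom() -> dict:
    raise ConnectionError("gateway timeout")


good = asyncio.run(timed_check("payment-gateway", ok))
bad = asyncio.run(timed_check("payment-gateway", boom))
print(good.status, bad.status, bad.error)
```

In a real probe, check() would call such a helper and copy the fields into a ProbeResult.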

Configurable probe

import time

import aiobotocore.session  # third-party: pip install aiobotocore
from fastapi_watch.probes import BaseProbe
from fastapi_watch.models import ProbeResult, ProbeStatus

class S3BucketProbe(BaseProbe):
    """Checks that an S3 bucket is reachable and the credentials are valid."""

    def __init__(self, bucket: str, region: str = "us-east-1", name: str = "s3") -> None:
        self.bucket = bucket
        self.region = region
        self.name = name

    async def check(self) -> ProbeResult:
        start = time.perf_counter()
        try:
            session = aiobotocore.session.get_session()
            async with session.create_client("s3", region_name=self.region) as client:
                await client.head_bucket(Bucket=self.bucket)
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.HEALTHY,
                latency_ms=round(latency, 2),
                details={"bucket": self.bucket, "region": self.region},
            )
        except Exception as exc:
            latency = (time.perf_counter() - start) * 1000
            return ProbeResult(
                name=self.name,
                status=ProbeStatus.UNHEALTHY,
                latency_ms=round(latency, 2),
                error=str(exc),
            )

registry.add(S3BucketProbe(bucket="my-app-uploads", region="eu-west-1"))

Composite probe

Report unhealthy only when both Redis nodes are down simultaneously:

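import asyncio

from fastapi_watch.probes import BaseProbe, RedisProbe
from fastapi_watch.models import ProbeResult, ProbeStatus

class CompositeRedisProbe(BaseProbe):
    name = "redis-composite"

    def __init__(self, primary_url: str, replica_url: str) -> None:
        self._primary = RedisProbe(url=primary_url, name="primary")
        self._replica = RedisProbe(url=replica_url, name="replica")

    async def check(self) -> ProbeResult:
        # Check both nodes concurrently; healthy if at least one responds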
        primary, replica = await asyncio.gather(
            self._primary.check(), self._replica.check()
        )
        if primary.is_healthy or replica.is_healthy:
            return ProbeResult(name=self.name, status=ProbeStatus.HEALTHY)
        return ProbeResult(
            name=self.name,
            status=ProbeStatus.UNHEALTHY,
            error=f"both nodes down — primary: {primary.error}, replica: {replica.error}",
        )

All built-in probes

Databases

| Probe | Extra | Key constructor args | Details fields |
| --- | --- | --- | --- |
| PostgreSQLProbe | postgres | url, name, timeout | version, active_connections, max_connections, database_size |
| MySQLProbe | mysql | url or host/port/user/password/db, name, connect_timeout | version, connected_threads, uptime_seconds, max_used_connections |
| SqlAlchemyProbe | sqlalchemy | engine, name | dialect, driver, server_version |

Caches

| Probe | Extra | Key constructor args | Details fields |
| --- | --- | --- | --- |
| RedisProbe | redis | url, name | version, uptime_seconds, used_memory_human, connected_clients, role, total_keys, clusters |
| MemcachedProbe | memcached | host, port, name, pool_size | |

Queues / messaging

| Probe | Extra | Key constructor args | Details fields |
| --- | --- | --- | --- |
| RabbitMQProbe | rabbitmq | url, name, management_url | connected; plus server, totals, queues when management_url is set |
| KafkaProbe | kafka | bootstrap_servers, name, request_timeout_ms | broker_count, controller_id, topics, internal_topics |

Document stores

| Probe | Extra | Key constructor args | Details fields |
| --- | --- | --- | --- |
| MongoProbe | mongo | url, name, server_selection_timeout_ms | version, uptime_seconds, connections, memory_mb, storage_engine |

HTTP

| Probe | Extra | Key constructor args | Details fields |
| --- | --- | --- | --- |
| HttpProbe | http | url, timeout, name, expected_status | status_code, content_type, response_bytes |

Testing / placeholder

| Probe | Extra | Key constructor args | Details fields |
| --- | --- | --- | --- |
| MemoryProbe | built-in | name | |

Configuration reference

HealthRegistry

| Argument | Type | Default | Description |
| --- | --- | --- | --- |
| app | FastAPI | required | The FastAPI application instance |
| prefix | str | "/health" | URL prefix for all three endpoints |
| tags | list[str] | ["health"] | OpenAPI tags applied to the health routes |

HealthRegistry.add(probe)

Returns self for chaining. Probes are started in the order they were added and run concurrently on every request.

HealthRegistry.run_all()

Async method — runs every registered probe and returns list[ProbeResult]. Useful for testing or building custom aggregation logic outside of the mounted routes.

results = await registry.run_all()
for r in results:
    print(r.name, r.status, r.details)
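
As one example of custom aggregation, the results could be reduced to a single verdict that only fails on critical probes. This is a sketch under stated assumptions: FakeResult stands in for ProbeResult (only name and is_healthy are used), overall_status is a hypothetical helper, and the "degraded" verdict is not one of the library's status values.

```python
from dataclasses import dataclass


@dataclass
class FakeResult:
    """Stand-in exposing the two ProbeResult fields this sketch relies on."""
    name: str
    is_healthy: bool


def overall_status(results: list[FakeResult], critical: set[str]) -> str:
    """Return 'unhealthy' only when a critical probe fails."""
    failed = [r.name for r in results if not r.is_healthy]
    if any(name in critical for name in failed):
        return "unhealthy"
    # Non-critical failures are reported but do not fail the verdict.
    return "degraded" if failed else "healthy"


sample = [FakeResult("primary-db", True), FakeResult("cache", False)]
print(overall_status(sample, critical={"primary-db"}))
```

With real results, the list returned by await registry.run_all() would take the place of sample.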

ProbeResult

| Field | Type | Description |
| --- | --- | --- |
| name | str | Probe identifier |
| status | ProbeStatus | "healthy" or "unhealthy" |
| latency_ms | float | Duration of the check in milliseconds |
| error | str or None | Error message; only present on failure |
| details | dict or None | Service-specific metadata; see each probe's section above |
| is_healthy | bool (property) | True when status == "healthy" |

Kubernetes integration

livenessProbe:
  httpGet:
    path: /health/live
    port: 8000
  initialDelaySeconds: 5
  periodSeconds: 10

readinessProbe:
  httpGet:
    path: /health/ready
    port: 8000
  initialDelaySeconds: 10
  periodSeconds: 15
  failureThreshold: 3

Use /health/ready for the readiness probe: Kubernetes stops routing traffic to a pod once the check has failed failureThreshold times in a row (three checks at 15-second intervals in the manifest above) and resumes once it passes again. Use /health/live for liveness so the process is only restarted when it is genuinely stuck, not because an external service is temporarily down.


License

MIT
