A production-ready WebSocket stabilization layer for FastAPI applications

FastAPI WebSocket Stabilizer

A production-ready WebSocket stabilization layer for FastAPI applications that provides robust connection lifecycle management, automatic heartbeat/ping-pong detection, graceful shutdown, and reconnection support.

Motivation

Building reliable WebSocket applications in production requires handling many edge cases:

  • Connection timeouts: Proxy servers often have idle connection timeouts (60-90 seconds)
  • Dead connections: Network partitions can leave zombie connections that silently hang
  • Ungraceful restarts: Server restarts need clean connection closure to prevent client reconnection storms
  • Load balancer issues: Behind ALBs/NLBs, stale connections accumulate without proper heartbeat
  • Cloud platform quirks: Managed and serverless platforms such as Azure App Service and AWS Lambda have specific WebSocket constraints
  • Session resumption: Clients may disconnect due to network switching and need to restore state

This library abstracts away these concerns with a simple, production-tested API.

Key Features

  • Connection Management: Automatic tracking of active WebSocket connections
  • Heartbeat/Ping-Pong: Configurable automatic heartbeat detection of dead connections
  • Graceful Shutdown: Clean connection closure with proper async task cleanup
  • Reconnection Tokens: Server-side session token support for seamless reconnection
  • Structured Logging: Cloud-friendly logging for easy monitoring and debugging
  • Broadcast Messaging: Send messages to individual clients, groups, or all clients
  • Connection Limits: Configurable max connections with automatic enforcement
  • Type Safety: Full type hints for all public APIs (Python 3.10+)
  • Minimal Dependencies: Only depends on FastAPI (which provides WebSocket support)

Installation

Install via pip:

pip install fastapi-websocket-stabilizer

Or from source:

git clone https://github.com/fastapi-websocket-stabilizer/fastapi-websocket-stabilizer.git
cd fastapi-websocket-stabilizer
pip install -e ".[dev]"

Quick Start

Basic Chat Application

import json
from contextlib import asynccontextmanager
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi_websocket_stabilizer import WebSocketConnectionManager, WebSocketConfig

# Configure the manager
ws_config = WebSocketConfig(
    heartbeat_interval=30.0,
    heartbeat_timeout=60.0,
    max_connections=100,
)
ws_manager = WebSocketConnectionManager(ws_config)

# Setup lifespan events
@asynccontextmanager
async def lifespan(app: FastAPI):
    await ws_manager.start()
    yield
    await ws_manager.graceful_shutdown()

app = FastAPI(lifespan=lifespan)

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await websocket.accept()

    try:
        # Register connection
        await ws_manager.connect(client_id, websocket)

        # Handle messages
        while True:
            data = await websocket.receive_text()

            # Handle heartbeat responses
            if data == "pong":
                await ws_manager.handle_pong(client_id)
                continue

            # Broadcast to all clients
            await ws_manager.broadcast(json.dumps({
                "user": client_id,
                "message": data
            }))

    except WebSocketDisconnect:
        await ws_manager.disconnect(client_id)

Usage Patterns

Pattern 1: Simple Broadcast

# Send message to all clients
await ws_manager.broadcast("System message")

# Send to all except sender
await ws_manager.broadcast(
    json.dumps({"message": "chat"}),
    exclude_client=sender_id
)
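
As a rough illustration of what a broadcast with exclusion involves, the sketch below fans a message out to several clients concurrently and records which deliveries failed. It is not the library's implementation: the `senders` mapping and the returned dictionary are hypothetical stand-ins for real WebSocket objects and for `BroadcastResult`, whose fields are not documented here.

```python
import asyncio
from typing import Awaitable, Callable, Optional

# Hypothetical stand-in for "something that can send text to one client".
Sender = Callable[[str], Awaitable[None]]


async def broadcast(
    senders: dict[str, Sender],
    message: str,
    exclude_client: Optional[str] = None,
) -> dict[str, bool]:
    """Send `message` to every client except `exclude_client`.

    Returns a mapping of client ID to delivery success.
    """
    targets = [cid for cid in senders if cid != exclude_client]
    # return_exceptions=True keeps one failing socket from aborting the rest.
    results = await asyncio.gather(
        *(senders[cid](message) for cid in targets),
        return_exceptions=True,
    )
    return {
        cid: not isinstance(result, BaseException)
        for cid, result in zip(targets, results)
    }
```

Sending concurrently means one slow or broken client does not delay delivery to the others, which is usually the property a chat-style broadcast needs.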

Pattern 2: Direct Messaging

# Send to specific client
success = await ws_manager.send_to_client(client_id, "Direct message")

if not success:
    print(f"Client {client_id} is no longer connected")

Pattern 3: Reconnection Support

from fastapi_websocket_stabilizer import TokenExpiredError

@app.post("/reconnect-token/{client_id}")
async def get_reconnect_token(client_id: str):
    """Get a token before intentional disconnection."""
    token = ws_manager.generate_reconnect_token(client_id)
    return {"token": token}

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str, token: str | None = None):
    await websocket.accept()

    # Restore session if a token was provided (read from the query string)
    if token:
        try:
            restored_id = await ws_manager.validate_reconnect_token(token)
            # Continue with restored session
        except TokenExpiredError:
            await websocket.close(code=1008, reason="Token expired")
            return

    await ws_manager.connect(client_id, websocket)
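
To make the token lifecycle concrete, here is a minimal in-memory sketch of issuing, redeeming, and expiring reconnection tokens. Everything in it is an assumption for illustration: `ReconnectTokenStore`, its method names, and the single-use behaviour are hypothetical, and the library's own storage and validation may work differently.

```python
import secrets
import time
from datetime import timedelta
from typing import Callable, Optional


class ReconnectTokenStore:
    """In-memory token store (hypothetical; not the library's internals)."""

    def __init__(
        self,
        ttl: timedelta,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._ttl = ttl.total_seconds()
        self._clock = clock
        # token -> (client_id, expires_at)
        self._tokens: dict[str, tuple[str, float]] = {}

    def issue(self, client_id: str) -> str:
        token = secrets.token_urlsafe(32)  # unguessable and URL-safe
        self._tokens[token] = (client_id, self._clock() + self._ttl)
        return token

    def redeem(self, token: str) -> Optional[str]:
        # pop() makes each token single-use (a design choice of this sketch).
        entry = self._tokens.pop(token, None)
        if entry is None:
            return None
        client_id, expires_at = entry
        return client_id if self._clock() < expires_at else None

    def purge_expired(self) -> int:
        """Drop expired tokens and return how many were removed."""
        now = self._clock()
        expired = [t for t, (_, exp) in self._tokens.items() if exp <= now]
        for token in expired:
            del self._tokens[token]
        return len(expired)
```

The injectable `clock` exists only so expiry can be tested without sleeping; a periodic call to `purge_expired()` is what keeps unredeemed tokens from accumulating in memory.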

Pattern 4: Statistics and Monitoring

@app.get("/stats")
async def get_stats():
    return {
        "active_connections": ws_manager.get_connection_count(),
        "client_ids": ws_manager.get_active_connections(),
    }

Configuration Reference

WebSocketConfig

All configuration options with defaults:

from fastapi_websocket_stabilizer import WebSocketConfig
from datetime import timedelta

config = WebSocketConfig(
    # Heartbeat settings
    heartbeat_interval=30.0,      # Seconds between pings
    heartbeat_timeout=60.0,       # Seconds to wait for pong response
    heartbeat_type="ping",        # Type of heartbeat ("ping" or "custom")

    # Connection limits
    max_connections=None,         # Max concurrent connections (None = unlimited)
    max_message_size=1048576,     # Max message size in bytes (1 MiB default)

    # Reconnection
    reconnect_token_ttl=timedelta(hours=1),  # Token lifetime
    enable_reconnect=True,        # Enable token support

    # Compression
    compression="deflate",        # "deflate" or "none"

    # Logging
    log_level="INFO",             # DEBUG, INFO, WARNING, ERROR, CRITICAL
    enable_metrics=False,         # Collect metrics (experimental)
)
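
The relationship between `heartbeat_interval` and `heartbeat_timeout` can be sketched as follows: pings go out every interval, and a client is considered dead once no pong has arrived within the timeout. The `HeartbeatTracker` class below is a hypothetical illustration of that bookkeeping, not the library's actual implementation.

```python
import time
from typing import Callable


class HeartbeatTracker:
    """Tracks the last pong per client (illustrative sketch only)."""

    def __init__(
        self,
        timeout: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.timeout = timeout
        self._clock = clock
        self._last_pong: dict[str, float] = {}

    def register(self, client_id: str) -> None:
        # A new connection counts as alive from the moment it is registered.
        self._last_pong[client_id] = self._clock()

    def record_pong(self, client_id: str) -> None:
        # Ignore pongs from clients that were never registered or were removed.
        if client_id in self._last_pong:
            self._last_pong[client_id] = self._clock()

    def remove(self, client_id: str) -> None:
        self._last_pong.pop(client_id, None)

    def stale_clients(self) -> list[str]:
        # A client is stale when no pong arrived within `timeout` seconds.
        now = self._clock()
        return [
            cid for cid, seen in self._last_pong.items()
            if now - seen > self.timeout
        ]
```

With the defaults above (30 s interval, 60 s timeout), a client has to miss roughly two consecutive pings before it would show up in `stale_clients()`.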

Example Configurations

High-frequency trading/gaming (low latency)

WebSocketConfig(
    heartbeat_interval=5.0,       # More frequent heartbeats
    heartbeat_timeout=10.0,       # Detect dead connections quickly
    max_connections=10000,        # Expect many connections
)

Behind slow proxy (e.g., old reverse proxy)

WebSocketConfig(
    heartbeat_interval=45.0,      # Slower heartbeat (adapt to proxy timeout)
    heartbeat_timeout=90.0,
    compression="none",           # Reduce overhead
)

Serverless (Azure Functions, AWS Lambda)

WebSocketConfig(
    heartbeat_interval=20.0,      # Serverless platforms have stricter timeouts
    max_connections=500,          # Limited resource per instance
)

Deployment Guide

Local Development

# Install dependencies
pip install "fastapi[standard]" uvicorn

# Run example
uvicorn examples.basic_app:app --reload

# Test with curl or WebSocket client
# ws://localhost:8000/ws/user123

Behind Reverse Proxy (nginx, Apache)

nginx configuration:

upstream app {
    server localhost:8000;
}

server {
    listen 80;

    location /ws/ {
        proxy_pass http://app;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_read_timeout 3600s;
        proxy_send_timeout 3600s;
        # Set read timeout higher than heartbeat timeout + margin
    }
}

Key considerations:

  • Ensure proxy_read_timeout and proxy_send_timeout exceed your heartbeat_interval
  • Set them to at least 2x your heartbeat_interval for safety
  • The nginx default for both is 60s, which may be too low; increase to 300s+ for long-lived connections
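
The rule of thumb above can be turned into a small startup check. The helper below is a hypothetical sketch (`check_proxy_timeout` is not part of the library); it compares the heartbeat interval against the proxy's idle timeout and returns warnings when the margin is too small.

```python
def check_proxy_timeout(
    heartbeat_interval: float,
    proxy_idle_timeout: float,
    safety_factor: float = 2.0,
) -> list[str]:
    """Return warnings; an empty list means the values look safe."""
    warnings: list[str] = []
    if proxy_idle_timeout <= heartbeat_interval:
        # The proxy gives up before the next ping is even sent.
        warnings.append(
            f"proxy timeout {proxy_idle_timeout}s expires before the next "
            f"heartbeat ({heartbeat_interval}s); idle connections will drop"
        )
    elif proxy_idle_timeout < safety_factor * heartbeat_interval:
        # Pings arrive in time, but a single delayed ping could be fatal.
        warnings.append(
            f"proxy timeout {proxy_idle_timeout}s is under "
            f"{safety_factor}x the heartbeat interval ({heartbeat_interval}s)"
        )
    return warnings
```

For example, `check_proxy_timeout(30.0, 3600.0)` returns an empty list for the nginx configuration shown earlier, while a 45-second proxy timeout with the same interval produces one warning.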

AWS Application Load Balancer (ALB)

Load Balancer and Target Group Configuration:

  • Deregistration delay (target group attribute): 30-60 seconds
  • Connection idle timeout (load balancer attribute): 60+ seconds (ALB default is 60s)

Application Configuration:

config = WebSocketConfig(
    heartbeat_interval=25.0,      # Less than ALB's 60s default
    heartbeat_timeout=50.0,
)

Azure App Service

Azure App Service has specific WebSocket constraints:

Settings in Azure Portal:

  • Enable WebSocket: ON
  • Connection timeout (Web socket idle timeout): 600 seconds

Application Configuration:

config = WebSocketConfig(
    heartbeat_interval=30.0,      # Well within Azure's limits
    max_connections=1000,         # App Service tier dependent
)

Deployment via Docker:

FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]

Kubernetes

ConfigMap for configuration:

apiVersion: v1
kind: ConfigMap
metadata:
  name: websocket-config
data:
  heartbeat_interval: "30"
  heartbeat_timeout: "60"
  max_connections: "1000"

Deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: websocket-app
spec:
  replicas: 3
  template:
    spec:
      containers:
      - name: app
        image: websocket-app:1.0
        ports:
        - containerPort: 8000
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 10
          periodSeconds: 10
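
The Deployment above does not show how the ConfigMap values reach the application. One possible approach, assuming the ConfigMap is exposed to the container as environment variables with the same key names (for instance via `envFrom`, which is not part of the manifest shown), is to parse them at startup. The helper name `config_kwargs_from_env` is hypothetical.

```python
import os
from typing import Mapping, Optional


def config_kwargs_from_env(
    env: Optional[Mapping[str, str]] = None,
) -> dict[str, object]:
    """Build WebSocketConfig keyword arguments from environment variables.

    Keys that are absent are left out so the library defaults apply.
    """
    env = os.environ if env is None else env
    kwargs: dict[str, object] = {}
    # ConfigMap values are always strings, so convert them explicitly.
    if "heartbeat_interval" in env:
        kwargs["heartbeat_interval"] = float(env["heartbeat_interval"])
    if "heartbeat_timeout" in env:
        kwargs["heartbeat_timeout"] = float(env["heartbeat_timeout"])
    if "max_connections" in env:
        kwargs["max_connections"] = int(env["max_connections"])
    return kwargs
```

The result could then be passed along as `WebSocketConfig(**config_kwargs_from_env())`. A malformed value raises `ValueError` at startup, which is usually preferable to running with a silently wrong timeout.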

Docker Compose

version: '3.8'

services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      LOG_LEVEL: INFO
    command: uvicorn app:app --host 0.0.0.0 --port 8000 --workers 1

Troubleshooting

Connections Disconnecting Unexpectedly

Problem: Clients disconnect after ~60 seconds of inactivity

Solution: Reduce heartbeat_interval below your proxy/firewall timeout

# Adjust based on your environment's idle timeout
WebSocketConfig(heartbeat_interval=25.0)  # < 60s proxy timeout

High CPU Usage

Problem: Manager consuming excessive CPU

Solution: Reduce heartbeat frequency or lower the number of connections per instance

WebSocketConfig(
    heartbeat_interval=60.0,  # Increase from default 30s
    max_connections=500,      # Reduce if possible
)

"Connection Limit Exceeded" Errors

Problem: New connections rejected with limit errors

Solutions:

  1. Increase max_connections:
    WebSocketConfig(max_connections=5000)
    
  2. Implement graceful connection rejection with backoff
  3. Scale horizontally (multiple app instances)
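
One reading of "backoff" in option 2 is that rejected clients wait progressively longer before retrying, so they do not all reconnect at once. The sketch below shows exponential backoff with full jitter on the client side; the function name and default values are assumptions for illustration, not part of this library.

```python
import random
from typing import Iterator, Optional


def backoff_delays(
    base: float = 1.0,
    cap: float = 60.0,
    attempts: int = 6,
    rng: Optional[random.Random] = None,
) -> Iterator[float]:
    """Yield reconnect delays using exponential backoff with full jitter."""
    rng = rng or random.Random()
    for attempt in range(attempts):
        # The ceiling doubles each attempt until it reaches `cap`.
        ceiling = min(cap, base * (2 ** attempt))
        # Full jitter: pick uniformly in [0, ceiling] to spread clients out.
        yield rng.uniform(0.0, ceiling)
```

A client would sleep for each yielded delay before its next connection attempt and stop iterating once it connects. The randomisation matters as much as the growth: without it, clients rejected at the same moment would retry in lockstep.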

Reconnection Tokens Not Working

Problem: Token validation fails repeatedly

Solutions:

  • Check that the token hasn't expired: TokenExpiredError means the TTL has passed
  • Verify that the same WebSocketConnectionManager instance is used for both generation and validation
  • Ensure the client sends the token in a query parameter or auth header

Memory Leaks

Problem: Memory usage increases over time

Solutions:

  1. Ensure graceful_shutdown() is called during app shutdown
  2. Check that disconnected clients are properly cleaned up
  3. Monitor token storage (should be auto-cleaned every 5 minutes)
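
For context on why the shutdown step matters, the sketch below closes a set of connections concurrently under a deadline and cancels whatever is still pending, so no task is left dangling. It is an illustration only: `close_all` and the shape of its report are hypothetical and do not describe how `graceful_shutdown()` or `ShutdownReport` work internally.

```python
import asyncio
from typing import Any, Callable, Coroutine

# Hypothetical stand-in for "close this client's connection".
Closer = Callable[[], Coroutine[Any, Any, None]]


async def close_all(
    closers: dict[str, Closer],
    timeout: float = 30.0,
) -> dict[str, list[str]]:
    """Close every connection, giving up on stragglers after `timeout`."""
    report: dict[str, list[str]] = {"closed": [], "failed": [], "timed_out": []}
    tasks = {asyncio.create_task(close()): cid for cid, close in closers.items()}
    if not tasks:
        return report
    done, pending = await asyncio.wait(list(tasks), timeout=timeout)
    for task in pending:
        task.cancel()
    # Wait for cancelled tasks to unwind so none outlives the shutdown.
    await asyncio.gather(*pending, return_exceptions=True)
    for task in done:
        key = "closed" if task.exception() is None else "failed"
        report[key].append(tasks[task])
    report["timed_out"] = [tasks[task] for task in pending]
    for ids in report.values():
        ids.sort()
    return report
```

Tasks that are never awaited or cancelled keep references to their sockets alive, which is one common way memory grows across restarts and reloads.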

API Reference

WebSocketConnectionManager

class WebSocketConnectionManager:
    def __init__(self, config: Optional[WebSocketConfig] = None) -> None:
        """Initialize manager with optional config."""

    async def start(self) -> None:
        """Start heartbeat and cleanup tasks."""

    async def connect(
        self,
        client_id: str,
        websocket: WebSocket,
        metadata: Optional[dict] = None
    ) -> None:
        """Register a new WebSocket connection."""

    async def disconnect(self, client_id: str) -> None:
        """Gracefully disconnect a client."""

    async def send_to_client(
        self,
        client_id: str,
        message: str | dict
    ) -> bool:
        """Send message to specific client."""

    async def broadcast(
        self,
        message: str | dict,
        exclude_client: Optional[str] = None
    ) -> BroadcastResult:
        """Send message to all clients."""

    def get_active_connections(self) -> list[str]:
        """Get list of connected client IDs."""

    def get_connection_count(self) -> int:
        """Get number of active connections."""

    def generate_reconnect_token(self, client_id: str) -> str:
        """Generate a reconnection token."""

    async def validate_reconnect_token(self, token: str) -> Optional[str]:
        """Validate token and return client_id."""

    async def handle_pong(self, client_id: str) -> None:
        """Handle pong response from client."""

    async def graceful_shutdown(self, timeout: float = 30.0) -> ShutdownReport:
        """Shut down manager and close all connections."""

Exceptions

from fastapi_websocket_stabilizer import (
    WebSocketStabilizerError,          # Base exception
    ConnectionNotFoundError,            # Client not found
    ConnectionLimitExceededError,       # Max connections reached
    ConnectionTimeoutError,             # Heartbeat timeout
    TokenExpiredError,                  # Token TTL exceeded
    InvalidTokenError,                  # Token invalid/tampered
    BroadcastFailedError,               # Broadcast failed
    ShutdownError,                      # Shutdown error
)

Roadmap

Planned Features

  • Metrics collection (Prometheus-compatible)
  • Connection lifecycle hooks (on_connect, on_disconnect callbacks)
  • Message compression strategies
  • Distributed mode (Redis-backed connection registry for multi-instance deployments)
  • Built-in rate limiting
  • Automatic reconnection client library (JavaScript/TypeScript)

Possible Enhancements

  • OpenTelemetry instrumentation
  • Support for WebSocket subprotocols
  • Custom heartbeat message types
  • Connection grouping/rooms functionality
  • Pub/Sub integration

Contributing

We welcome contributions! Please:

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Write tests for new functionality
  4. Ensure all tests pass: pytest tests/
  5. Format code with black: black src/ tests/
  6. Lint with ruff: ruff check src/ tests/
  7. Commit with clear messages
  8. Push and create a Pull Request

Development Setup

# Clone repository
git clone https://github.com/fastapi-websocket-stabilizer/fastapi-websocket-stabilizer.git
cd fastapi-websocket-stabilizer

# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install in development mode
pip install -e ".[dev]"

# Run tests
pytest tests/ -v

# Run specific test
pytest tests/test_manager.py::TestConnectionManager::test_should_track_connection_when_connect_called -v

# Check types
mypy src/

# Format code
black src/ tests/

# Lint
ruff check src/ tests/

License

This project is licensed under the MIT License - see the LICENSE file for details.

Acknowledgments

Inspired by production WebSocket patterns in large-scale FastAPI deployments, with particular consideration for:

  • FastAPI's excellent WebSocket support
  • Real-world challenges in cloud environments
  • Community feedback on WebSocket reliability

Version History

0.1.1

  • Fix GitHub URLs and setuptools compatibility

0.1.0 (Initial Release)

  • Core connection management
  • Heartbeat/ping-pong mechanism
  • Reconnection token support
  • Graceful shutdown
  • Comprehensive tests and documentation
