A production-ready WebSocket stabilization layer for FastAPI applications
Project description
FastAPI WebSocket Stabilizer
A production-ready WebSocket stabilization layer for FastAPI applications that provides robust connection lifecycle management, automatic heartbeat/ping-pong detection, graceful shutdown, and reconnection support.
Motivation
Building reliable WebSocket applications in production requires handling many edge cases:
- Connection timeouts: Proxy servers often have idle connection timeouts (60-90 seconds)
- Dead connections: Network partitions can leave zombie connections that silently hang
- Ungraceful restarts: Server restarts need clean connection closure to prevent client reconnection storms
- Load balancer issues: Behind ALBs/NLBs, stale connections accumulate without proper heartbeat
- Cloud platform quirks: Azure App Service, AWS Lambda, and other serverless platforms have specific WebSocket constraints
- Session resumption: Clients may disconnect due to network switching and need to restore state
This library abstracts away these concerns with a simple, production-tested API.
Key Features
- ✅ Connection Management: Automatic tracking of active WebSocket connections
- ✅ Heartbeat/Ping-Pong: Configurable automatic heartbeat detection of dead connections
- ✅ Graceful Shutdown: Clean connection closure with proper async task cleanup
- ✅ Reconnection Tokens: Server-side session token support for seamless reconnection
- ✅ Structured Logging: Cloud-friendly logging for easy monitoring and debugging
- ✅ Broadcast Messaging: Send messages to individual clients, groups, or all clients
- ✅ Connection Limits: Configurable max connections with automatic enforcement
- ✅ Type Safety: Full type hints for all public APIs (Python 3.10+)
- ✅ Zero Dependencies: Only depends on FastAPI (which provides WebSocket support)
Installation
Install via pip:
pip install fastapi-websocket-stabilizer
Or from source:
git clone https://github.com/fastapi-websocket-stabilizer/fastapi-websocket-stabilizer.git
cd fastapi-websocket-stabilizer
pip install -e ".[dev]"
Quick Start
Basic Chat Application
import json
from contextlib import asynccontextmanager
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi_websocket_stabilizer import WebSocketConnectionManager, WebSocketConfig
# Configure the manager
ws_config = WebSocketConfig(
heartbeat_interval=30.0,
heartbeat_timeout=60.0,
max_connections=100,
)
ws_manager = WebSocketConnectionManager(ws_config)
# Setup lifespan events
@asynccontextmanager
async def lifespan(app: FastAPI):
await ws_manager.start()
yield
await ws_manager.graceful_shutdown()
app = FastAPI(lifespan=lifespan)
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await websocket.accept()
try:
# Register connection
await ws_manager.connect(client_id, websocket)
# Handle messages
while True:
data = await websocket.receive_text()
# Handle heartbeat responses
if data == "pong":
await ws_manager.handle_pong(client_id)
continue
# Broadcast to all clients
await ws_manager.broadcast(json.dumps({
"user": client_id,
"message": data
}))
except WebSocketDisconnect:
await ws_manager.disconnect(client_id)
Usage Patterns
Pattern 1: Simple Broadcast
# Send message to all clients
await ws_manager.broadcast("System message")
# Send to all except sender
await ws_manager.broadcast(
json.dumps({"message": "chat"}),
exclude_client=sender_id
)
Pattern 2: Direct Messaging
# Send to specific client
success = await ws_manager.send_to_client(client_id, "Direct message")
if not success:
print(f"Client {client_id} is no longer connected")
Pattern 3: Reconnection Support
@app.post("/reconnect-token/{client_id}")
async def get_reconnect_token(client_id: str):
"""Get a token before intentional disconnection."""
token = ws_manager.generate_reconnect_token(client_id)
return {"token": token}
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str, token: str = None):
await websocket.accept()
# Restore session if token provided
if token:
try:
restored_id = await ws_manager.validate_reconnect_token(token)
# Continue with restored session
except TokenExpiredError:
await websocket.close(code=1008, reason="Token expired")
return
await ws_manager.connect(client_id, websocket)
Pattern 4: Statistics and Monitoring
@app.get("/stats")
async def get_stats():
return {
"active_connections": ws_manager.get_connection_count(),
"client_ids": ws_manager.get_active_connections(),
}
Configuration Reference
WebSocketConfig
All configuration options with defaults:
from fastapi_websocket_stabilizer import WebSocketConfig
from datetime import timedelta
config = WebSocketConfig(
# Heartbeat settings
heartbeat_interval=30.0, # Seconds between pings
heartbeat_timeout=60.0, # Seconds to wait for pong response
heartbeat_type="ping", # Type of heartbeat ("ping" or "custom")
# Connection limits
max_connections=None, # Max concurrent connections (None = unlimited)
max_message_size=1048576, # Max message size in bytes (1MB default)
# Reconnection
reconnect_token_ttl=timedelta(hours=1), # Token lifetime
enable_reconnect=True, # Enable token support
# Compression
compression="deflate", # "deflate" or "none"
# Logging
log_level="INFO", # DEBUG, INFO, WARNING, ERROR, CRITICAL
enable_metrics=False, # Collect metrics (experimental)
)
Example Configurations
High-frequency trading/gaming (low latency)
WebSocketConfig(
heartbeat_interval=5.0, # More frequent heartbeats
heartbeat_timeout=10.0, # Detect dead connections quickly
max_connections=10000, # Expect many connections
)
Behind slow proxy (e.g., old reverse proxy)
WebSocketConfig(
heartbeat_interval=45.0, # Slower heartbeat (adapt to proxy timeout)
heartbeat_timeout=90.0,
compression="none", # Reduce overhead
)
Serverless (Azure Functions, AWS Lambda)
WebSocketConfig(
heartbeat_interval=20.0, # Serverless platforms have stricter timeouts
max_connections=500, # Limited resource per instance
)
Deployment Guide
Local Development
# Install dependencies
pip install "fastapi[standard]" uvicorn
# Run example
uvicorn examples.basic_app:app --reload
# Test with curl or WebSocket client
# ws://localhost:8000/ws/user123
Behind Reverse Proxy (nginx, Apache)
nginx configuration:
upstream app {
server localhost:8000;
}
server {
listen 80;
location /ws/ {
proxy_pass http://app;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
# Set read timeout higher than heartbeat timeout + margin
}
}
Key considerations:
- Ensure proxy
read_timeoutandsend_timeoutexceed yourheartbeat_interval - Set to at least 2x your
heartbeat_intervalfor safety - Default is often 60s, which may be too low; increase to 300s+ for long-lived connections
AWS Application Load Balancer (ALB)
Target Group Configuration:
- Deregistration delay: 30-60 seconds
- Connection idle timeout: 60+ seconds (ALB default is 60s)
Application Configuration:
config = WebSocketConfig(
heartbeat_interval=25.0, # Less than ALB's 60s default
heartbeat_timeout=50.0,
)
Azure App Service
Azure App Service has specific WebSocket constraints:
Settings in Azure Portal:
- Enable WebSocket: ON
- Connection timeout (Web socket idle timeout): 600 seconds
Application Configuration:
config = WebSocketConfig(
heartbeat_interval=30.0, # Well within Azure's limits
max_connections=1000, # App Service tier dependent
)
Deployment via Docker:
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
Kubernetes
ConfigMap for configuration:
apiVersion: v1
kind: ConfigMap
metadata:
name: websocket-config
data:
heartbeat_interval: "30"
heartbeat_timeout: "60"
max_connections: "1000"
Deployment:
apiVersion: apps/v1
kind: Deployment
metadata:
name: websocket-app
spec:
replicas: 3
template:
spec:
containers:
- name: app
image: websocket-app:1.0
ports:
- containerPort: 8000
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 10
periodSeconds: 10
Docker Compose
version: '3.8'
services:
app:
build: .
ports:
- "8000:8000"
environment:
LOG_LEVEL: INFO
command: uvicorn app:app --host 0.0.0.0 --port 8000 --workers 1
Troubleshooting
Connections Disconnecting Unexpectedly
Problem: Clients disconnect after ~60 seconds of inactivity
Solution: Reduce heartbeat_interval below your proxy/firewall timeout
# Adjust based on your environment's idle timeout
WebSocketConfig(heartbeat_interval=25.0) # < 60s proxy timeout
High CPU Usage
Problem: Manager consuming excessive CPU
Solution: Reduce heartbeat frequency or use larger batches
WebSocketConfig(
heartbeat_interval=60.0, # Increase from default 30s
max_connections=500, # Reduce if possible
)
"Connection Limit Exceeded" Errors
Problem: New connections rejected with limit errors
Solutions:
- Increase
max_connections:WebSocketConfig(max_connections=5000)
- Implement graceful connection rejection with backoff
- Scale horizontally (multiple app instances)
Reconnection Tokens Not Working
Problem: Token validation fails repeatedly
Solutions:
- Check token hasn't expired:
tokenExpiredErrormeans TTL passed - Verify same
WebSocketConnectionManagerinstance used for both generation and validation - Ensure client sends token in query parameter or auth header
Memory Leaks
Problem: Memory usage increases over time
Solutions:
- Ensure
graceful_shutdown()is called during app shutdown - Check that disconnected clients are properly cleaned up
- Monitor token storage (should be auto-cleaned every 5 minutes)
API Reference
WebSocketConnectionManager
class WebSocketConnectionManager:
def __init__(self, config: Optional[WebSocketConfig] = None) -> None:
"""Initialize manager with optional config."""
async def start(self) -> None:
"""Start heartbeat and cleanup tasks."""
async def connect(
self,
client_id: str,
websocket: WebSocket,
metadata: Optional[dict] = None
) -> None:
"""Register a new WebSocket connection."""
async def disconnect(self, client_id: str) -> None:
"""Gracefully disconnect a client."""
async def send_to_client(
self,
client_id: str,
message: str | dict
) -> bool:
"""Send message to specific client."""
async def broadcast(
self,
message: str | dict,
exclude_client: Optional[str] = None
) -> BroadcastResult:
"""Send message to all clients."""
def get_active_connections(self) -> list[str]:
"""Get list of connected client IDs."""
def get_connection_count(self) -> int:
"""Get number of active connections."""
def generate_reconnect_token(self, client_id: str) -> str:
"""Generate a reconnection token."""
async def validate_reconnect_token(self, token: str) -> Optional[str]:
"""Validate token and return client_id."""
async def handle_pong(self, client_id: str) -> None:
"""Handle pong response from client."""
async def graceful_shutdown(self, timeout: float = 30.0) -> ShutdownReport:
"""Shut down manager and close all connections."""
Exceptions
from fastapi_websocket_stabilizer import (
WebSocketStabilizerError, # Base exception
ConnectionNotFoundError, # Client not found
ConnectionLimitExceededError, # Max connections reached
ConnectionTimeoutError, # Heartbeat timeout
TokenExpiredError, # Token TTL exceeded
InvalidTokenError, # Token invalid/tampered
BroadcastFailedError, # Broadcast failed
ShutdownError, # Shutdown error
)
Roadmap
Planned Features
- Metrics collection (Prometheus-compatible)
- Connection lifecycle hooks (on_connect, on_disconnect callbacks)
- Message compression strategies
- Distributed mode (Redis-backed connection registry for multi-instance deployments)
- Built-in rate limiting
- Automatic reconnection client library (JavaScript/TypeScript)
Possible Enhancements
- OpenTelemetry instrumentation
- Support for WebSocket subprotocols
- Custom heartbeat message types
- Connection grouping/rooms functionality
- Pub/Sub integration
Contributing
We welcome contributions! Please:
- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-feature) - Write tests for new functionality
- Ensure all tests pass:
pytest tests/ - Format code with black:
black src/ tests/ - Lint with ruff:
ruff check src/ tests/ - Commit with clear messages
- Push and create a Pull Request
Development Setup
# Clone repository
git clone https://github.com/fastapi-websocket-stabilizer/fastapi-websocket-stabilizer.git
cd fastapi-websocket-stabilizer
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install in development mode
pip install -e ".[dev]"
# Run tests
pytest tests/ -v
# Run specific test
pytest tests/test_manager.py::TestConnectionManager::test_should_track_connection_when_connect_called -v
# Check types
mypy src/
# Format code
black src/ tests/
# Lint
ruff check src/ tests/
License
This project is licensed under the MIT License - see the LICENSE file for details.
Support
Acknowledgments
Inspired by production WebSocket patterns in large-scale FastAPI deployments, with particular consideration for:
- FastAPI's excellent WebSocket support
- Real-world challenges in cloud environments
- Community feedback on WebSocket reliability
Version History
0.1.1
- Fix GitHub URLs and setuptools compatibility
0.1.0 (Initial Release)
- Core connection management
- Heartbeat/ping-pong mechanism
- Reconnection token support
- Graceful shutdown
- Comprehensive tests and documentation
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file fastapi_websocket_stabilizer-1.0.0.tar.gz.
File metadata
- Download URL: fastapi_websocket_stabilizer-1.0.0.tar.gz
- Upload date:
- Size: 22.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
31d80f7b8a6475bad125d1de19701e6144726cd1877c5d00837817465b1a0517
|
|
| MD5 |
6d3e031863e4223befd5160699a6c7cf
|
|
| BLAKE2b-256 |
c4ef7f054b4175d472d3be99ea21c374a344581fba24f8f0f8e0e41393b4962a
|
File details
Details for the file fastapi_websocket_stabilizer-1.0.0-py3-none-any.whl.
File metadata
- Download URL: fastapi_websocket_stabilizer-1.0.0-py3-none-any.whl
- Upload date:
- Size: 17.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
93df62dca6c0504ae24e8fefe11d2b27ad36c7b5b526b84515bdad2bdfeb201a
|
|
| MD5 |
3990f439400419a04a6b6c23ebc2a625
|
|
| BLAKE2b-256 |
90ad2ef94f71db2add0ee98a206d7b2e3f405b5a40fdb4b311e4e2aaa3e66ef4
|