Production-grade WebSocket connection management for Netrun Systems services
Project description
netrun-websocket
Production-grade WebSocket connection management for Netrun Systems services.
Features
- Connection Management: Pool-based connection management with per-user limits
- Session Persistence: Redis-backed session storage for cross-node coordination
- JWT Authentication: Token-based authentication with expiration handling
- Protocol Validation: Pydantic-based message type validation
- Reconnection: Exponential backoff reconnection with configurable max attempts
- Heartbeat Monitoring: Ping/pong health checks with stale connection cleanup
- Metrics Tracking: Comprehensive connection and latency metrics
- Type Safety: Full type hints for IDE support and type checking
- Async/Await: Built on async/await throughout for high performance
Installation
Basic Installation
pip install netrun-websocket
With Redis Support
pip install netrun-websocket[redis]
With JWT Authentication
pip install netrun-websocket[auth]
With All Optional Dependencies
pip install netrun-websocket[all]
Development Installation
pip install netrun-websocket[dev]
Quick Start
Basic WebSocket Connection Manager
from fastapi import FastAPI, WebSocket
from netrun.websocket import WebSocketConnectionManager
app = FastAPI()
manager = WebSocketConnectionManager(max_connections_per_user=5)
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
# Connect
connection_id = await manager.connect(websocket, user_id)
try:
while True:
# Receive message
data = await manager.receive_message(connection_id)
if data is None:
break
# Broadcast to all users except sender
await manager.broadcast(
{"type": "message", "data": data},
exclude_users={user_id}
)
finally:
# Disconnect
await manager.disconnect(connection_id)
With JWT Authentication
from fastapi import FastAPI, WebSocket, Query, WebSocketException
from netrun.websocket import WebSocketConnectionManager, JWTAuthService
app = FastAPI()
manager = WebSocketConnectionManager()
auth_service = JWTAuthService(secret_key="your-secret-key")
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, token: str = Query(...)):
# Validate JWT token
payload = auth_service.validate_token(token)
if not payload:
raise WebSocketException(code=1008, reason="Invalid token")
user_id = payload["user_id"]
# Connect
connection_id = await manager.connect(websocket, user_id)
try:
while True:
data = await manager.receive_message(connection_id)
if data is None:
break
# Handle message
await manager.send_to_user(user_id, {"echo": data})
finally:
await manager.disconnect(connection_id)
With Redis Session Management
from fastapi import FastAPI, WebSocket
from netrun.websocket import WebSocketConnectionManager, WebSocketSessionManager
app = FastAPI()
manager = WebSocketConnectionManager()
session_manager = WebSocketSessionManager(
redis_url="redis://localhost:6379/0"
)
@app.on_event("startup")
async def startup():
await session_manager.initialize()
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
# Create session connection
connection_id = await session_manager.create_connection(
session_id=f"session_{user_id}",
user_id=user_id,
username=f"User {user_id}"
)
# Connect WebSocket
await manager.connect(websocket, user_id, session_id=connection_id)
try:
while True:
data = await manager.receive_message(connection_id)
if data is None:
break
# Update heartbeat
await session_manager.update_heartbeat(connection_id)
# Handle message
await manager.send_message(connection_id, {"echo": data})
finally:
await manager.disconnect(connection_id)
await session_manager.disconnect(connection_id)
With Heartbeat Monitoring
from fastapi import FastAPI, WebSocket
from netrun.websocket import (
WebSocketConnectionManager,
HeartbeatMonitor,
HeartbeatConfig
)
app = FastAPI()
manager = WebSocketConnectionManager()
# Configure heartbeat
heartbeat_config = HeartbeatConfig(
interval=30, # Send ping every 30 seconds
timeout=90, # Consider stale after 90 seconds
max_missed=3 # Disconnect after 3 missed heartbeats
)
heartbeat = HeartbeatMonitor(heartbeat_config)
async def send_ping(connection_id: str) -> bool:
"""Send ping to connection."""
return await manager.send_message(
connection_id,
{"type": "ping", "timestamp": time.time()}
)
async def cleanup_connection(connection_id: str):
"""Cleanup stale connection."""
await manager.disconnect(connection_id, code=1001, reason="Heartbeat timeout")
@app.on_event("startup")
async def startup():
# Start heartbeat monitoring
await heartbeat.start(
ping_callback=send_ping,
cleanup_callback=cleanup_connection
)
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
connection_id = await manager.connect(websocket, user_id)
# Register for heartbeat monitoring
heartbeat.register_connection(connection_id)
try:
while True:
data = await manager.receive_message(connection_id)
if data is None:
break
# Update heartbeat on activity
if data.get("type") == "pong":
heartbeat.update_heartbeat(connection_id)
await manager.send_message(connection_id, {"echo": data})
finally:
heartbeat.unregister_connection(connection_id)
await manager.disconnect(connection_id)
With Metrics Collection
from fastapi import FastAPI, WebSocket
from netrun.websocket import WebSocketConnectionManager, MetricsCollector
app = FastAPI()
manager = WebSocketConnectionManager()
metrics = MetricsCollector()
@app.get("/metrics")
async def get_metrics():
"""Get WebSocket metrics."""
return metrics.get_stats()
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
connection_id = await manager.connect(websocket, user_id)
# Register for metrics
metrics.register_connection(connection_id, user_id)
try:
while True:
start_time = time.time()
data = await manager.receive_message(connection_id)
if data is None:
break
# Record metrics
metrics.record_message_received(connection_id, len(str(data)))
# Send response
await manager.send_message(connection_id, {"echo": data})
metrics.record_message_sent(connection_id, len(str(data)))
# Record latency
latency_ms = (time.time() - start_time) * 1000
metrics.record_latency(latency_ms)
finally:
metrics.unregister_connection(connection_id)
await manager.disconnect(connection_id)
Protocol Message Types
The package includes Pydantic models for type-safe message handling:
from netrun.websocket import (
MessageType,
WebSocketMessage,
PingMessage,
PongMessage,
UserMessage,
ErrorMessage,
parse_message
)
# Create a user message
message = UserMessage(
user_id="user123",
username="John Doe",
content="Hello, world!"
)
# Parse incoming message
data = {"type": "user_message", "user_id": "user123", "username": "John", "content": "Hi"}
parsed = parse_message(data) # Returns UserMessage instance
Reconnection with Exponential Backoff
from netrun.websocket import ReconnectionManager, ReconnectionConfig
# Configure reconnection
config = ReconnectionConfig(
initial_delay=1.0, # Start with 1 second
max_delay=60.0, # Max 60 seconds between attempts
max_attempts=10, # Try 10 times (0 = infinite)
backoff_multiplier=2.0, # Double delay each time
jitter=True # Add random jitter
)
reconnection = ReconnectionManager(config)
async def connect():
"""Your connection logic here."""
try:
# Establish connection
return True
except Exception:
return False
# Attempt reconnection
success = await reconnection.reconnect(
connect_callback=connect,
on_success=lambda: print("Connected!"),
on_failure=lambda attempt: print(f"Attempt {attempt} failed"),
on_max_attempts=lambda: print("Max attempts reached")
)
API Reference
WebSocketConnectionManager
Main connection manager class.
Constructor:
WebSocketConnectionManager(
max_connections_per_user: int = 5,
heartbeat_interval: int = 30,
connection_timeout: int = 300
)
Methods:
connect(websocket, user_id, session_id, ...)- Connect WebSocketdisconnect(connection_id, code, reason)- Disconnect WebSocketsend_message(connection_id, message, binary)- Send to connectionsend_to_user(user_id, message, binary)- Send to all user connectionsbroadcast(message, exclude_users, binary)- Broadcast to allreceive_message(connection_id)- Receive messageget_connection_info(connection_id)- Get connection metadataget_user_connections(user_id)- Get user's connectionsget_stats()- Get connection statistics
JWTAuthService
JWT authentication service.
Constructor:
JWTAuthService(
secret_key: str,
algorithm: str = "HS256",
token_expiry_seconds: int = 7200
)
Methods:
generate_token(user_id, additional_claims, expiry_seconds)- Generate JWTvalidate_token(token)- Validate and decode JWTget_user_id(token)- Extract user ID from tokenis_token_expired(token)- Check if token expired
WebSocketSessionManager
Redis-backed session manager (requires redis extra).
Constructor:
WebSocketSessionManager(
redis_client: Optional[Redis] = None,
redis_url: Optional[str] = None,
connection_ttl: int = 3600,
heartbeat_interval: int = 30,
heartbeat_timeout: int = 90
)
Methods:
initialize()- Initialize Redis connectioncreate_connection(session_id, user_id, username, ...)- Create connectionget_connection(connection_id)- Get connection metadataupdate_heartbeat(connection_id)- Update heartbeatsave_connection_state(connection_id, state)- Save staterestore_connection_state(connection_id)- Restore statedisconnect(connection_id, save_state)- Disconnectcleanup_stale_connections()- Cleanup stale connectionsget_connection_stats()- Get statistics
HeartbeatMonitor
Heartbeat monitoring for connection health.
Constructor:
HeartbeatMonitor(config: Optional[HeartbeatConfig] = None)
Methods:
register_connection(connection_id)- Register for monitoringunregister_connection(connection_id)- Unregisterupdate_heartbeat(connection_id)- Update heartbeat timestampis_stale(connection_id)- Check if stalestart(ping_callback, cleanup_callback, miss_callback)- Start monitoringstop()- Stop monitoringget_stats()- Get statistics
MetricsCollector
Connection metrics collection.
Constructor:
MetricsCollector()
Methods:
register_connection(connection_id, user_id)- Register connectionunregister_connection(connection_id)- Unregister connectionrecord_message_sent(connection_id, size_bytes)- Record sent messagerecord_message_received(connection_id, size_bytes)- Record received messagerecord_latency(latency_ms)- Record message latencyrecord_error(connection_id)- Record errorget_connection_metrics(connection_id)- Get connection metricsget_user_metrics(user_id)- Get user metricsget_stats()- Get aggregated statistics
Testing
Run the test suite:
pytest
With coverage:
pytest --cov=netrun.websocket --cov-report=html
License
MIT License. See LICENSE for details.
Support
For issues, questions, or contributions, please visit:
Related Packages
netrun-auth- Authentication and authorization utilitiesnetrun-db-pool- Database connection poolingnetrun-config- Configuration managementnetrun-logging- Structured logging
Netrun Systems - Production-Grade Infrastructure for Modern Applications
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file netrun_websocket-1.0.0.tar.gz.
File metadata
- Download URL: netrun_websocket-1.0.0.tar.gz
- Upload date:
- Size: 35.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
fd49a0f5e7691b81c9382c2e825a642a51a1a7fe49b543582d8f071e266e804a
|
|
| MD5 |
0cd643fcb8d5ef60be19dbe8bb29b8e9
|
|
| BLAKE2b-256 |
10fe5446092478f28b33f25c5eb872fa15759d705676707b06daaf7545ee3678
|
File details
Details for the file netrun_websocket-1.0.0-py3-none-any.whl.
File metadata
- Download URL: netrun_websocket-1.0.0-py3-none-any.whl
- Upload date:
- Size: 26.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
70b986f3ec062bec844004c3a2ed2df5d5c65b5f7b58c50840299dd045fa1fcf
|
|
| MD5 |
734290e9b6a59f4661628fdfd2f6c308
|
|
| BLAKE2b-256 |
6b7d847db79395ce4bd9752e22cdd01dcf05351a7dfbad391fb45455470dd2e9
|