Skip to main content

Production-grade WebSocket connection management for Netrun Systems services

Project description

netrun-websocket

Production-grade WebSocket connection management for Netrun Systems services.

Python Version License

Features

  • Connection Management: Pool-based connection management with per-user limits
  • Session Persistence: Redis-backed session storage for cross-node coordination
  • JWT Authentication: Token-based authentication with expiration handling
  • Protocol Validation: Pydantic-based message type validation
  • Reconnection: Exponential backoff reconnection with configurable max attempts
  • Heartbeat Monitoring: Ping/pong health checks with stale connection cleanup
  • Metrics Tracking: Comprehensive connection and latency metrics
  • Type Safety: Full type hints for IDE support and type checking
  • Async/Await: Built on async/await throughout for high performance

Installation

Basic Installation

pip install netrun-websocket

With Redis Support

pip install netrun-websocket[redis]

With JWT Authentication

pip install netrun-websocket[auth]

With All Optional Dependencies

pip install netrun-websocket[all]

Development Installation

pip install netrun-websocket[dev]

Quick Start

Basic WebSocket Connection Manager

from fastapi import FastAPI, WebSocket
from netrun.websocket import WebSocketConnectionManager

app = FastAPI()
manager = WebSocketConnectionManager(max_connections_per_user=5)

@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    # Connect
    connection_id = await manager.connect(websocket, user_id)

    try:
        while True:
            # Receive message
            data = await manager.receive_message(connection_id)
            if data is None:
                break

            # Broadcast to all users except sender
            await manager.broadcast(
                {"type": "message", "data": data},
                exclude_users={user_id}
            )
    finally:
        # Disconnect
        await manager.disconnect(connection_id)

With JWT Authentication

from fastapi import FastAPI, WebSocket, Query, WebSocketException
from netrun.websocket import WebSocketConnectionManager, JWTAuthService

app = FastAPI()
manager = WebSocketConnectionManager()
auth_service = JWTAuthService(secret_key="your-secret-key")

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, token: str = Query(...)):
    # Validate JWT token
    payload = auth_service.validate_token(token)
    if not payload:
        raise WebSocketException(code=1008, reason="Invalid token")

    user_id = payload["user_id"]

    # Connect
    connection_id = await manager.connect(websocket, user_id)

    try:
        while True:
            data = await manager.receive_message(connection_id)
            if data is None:
                break
            # Handle message
            await manager.send_to_user(user_id, {"echo": data})
    finally:
        await manager.disconnect(connection_id)

With Redis Session Management

from fastapi import FastAPI, WebSocket
from netrun.websocket import WebSocketConnectionManager, WebSocketSessionManager

app = FastAPI()
manager = WebSocketConnectionManager()
session_manager = WebSocketSessionManager(
    redis_url="redis://localhost:6379/0"
)

@app.on_event("startup")
async def startup():
    await session_manager.initialize()

@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    # Create session connection
    connection_id = await session_manager.create_connection(
        session_id=f"session_{user_id}",
        user_id=user_id,
        username=f"User {user_id}"
    )

    # Connect WebSocket
    await manager.connect(websocket, user_id, session_id=connection_id)

    try:
        while True:
            data = await manager.receive_message(connection_id)
            if data is None:
                break

            # Update heartbeat
            await session_manager.update_heartbeat(connection_id)

            # Handle message
            await manager.send_message(connection_id, {"echo": data})
    finally:
        await manager.disconnect(connection_id)
        await session_manager.disconnect(connection_id)

With Heartbeat Monitoring

from fastapi import FastAPI, WebSocket
from netrun.websocket import (
    WebSocketConnectionManager,
    HeartbeatMonitor,
    HeartbeatConfig
)

app = FastAPI()
manager = WebSocketConnectionManager()

# Configure heartbeat
heartbeat_config = HeartbeatConfig(
    interval=30,      # Send ping every 30 seconds
    timeout=90,       # Consider stale after 90 seconds
    max_missed=3      # Disconnect after 3 missed heartbeats
)
heartbeat = HeartbeatMonitor(heartbeat_config)

async def send_ping(connection_id: str) -> bool:
    """Send ping to connection."""
    return await manager.send_message(
        connection_id,
        {"type": "ping", "timestamp": time.time()}
    )

async def cleanup_connection(connection_id: str):
    """Cleanup stale connection."""
    await manager.disconnect(connection_id, code=1001, reason="Heartbeat timeout")

@app.on_event("startup")
async def startup():
    # Start heartbeat monitoring
    await heartbeat.start(
        ping_callback=send_ping,
        cleanup_callback=cleanup_connection
    )

@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    connection_id = await manager.connect(websocket, user_id)

    # Register for heartbeat monitoring
    heartbeat.register_connection(connection_id)

    try:
        while True:
            data = await manager.receive_message(connection_id)
            if data is None:
                break

            # Update heartbeat on activity
            if data.get("type") == "pong":
                heartbeat.update_heartbeat(connection_id)

            await manager.send_message(connection_id, {"echo": data})
    finally:
        heartbeat.unregister_connection(connection_id)
        await manager.disconnect(connection_id)

With Metrics Collection

from fastapi import FastAPI, WebSocket
from netrun.websocket import WebSocketConnectionManager, MetricsCollector

app = FastAPI()
manager = WebSocketConnectionManager()
metrics = MetricsCollector()

@app.get("/metrics")
async def get_metrics():
    """Get WebSocket metrics."""
    return metrics.get_stats()

@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    connection_id = await manager.connect(websocket, user_id)

    # Register for metrics
    metrics.register_connection(connection_id, user_id)

    try:
        while True:
            start_time = time.time()
            data = await manager.receive_message(connection_id)
            if data is None:
                break

            # Record metrics
            metrics.record_message_received(connection_id, len(str(data)))

            # Send response
            await manager.send_message(connection_id, {"echo": data})
            metrics.record_message_sent(connection_id, len(str(data)))

            # Record latency
            latency_ms = (time.time() - start_time) * 1000
            metrics.record_latency(latency_ms)
    finally:
        metrics.unregister_connection(connection_id)
        await manager.disconnect(connection_id)

Protocol Message Types

The package includes Pydantic models for type-safe message handling:

from netrun.websocket import (
    MessageType,
    WebSocketMessage,
    PingMessage,
    PongMessage,
    UserMessage,
    ErrorMessage,
    parse_message
)

# Create a user message
message = UserMessage(
    user_id="user123",
    username="John Doe",
    content="Hello, world!"
)

# Parse incoming message
data = {"type": "user_message", "user_id": "user123", "username": "John", "content": "Hi"}
parsed = parse_message(data)  # Returns UserMessage instance

Reconnection with Exponential Backoff

from netrun.websocket import ReconnectionManager, ReconnectionConfig

# Configure reconnection
config = ReconnectionConfig(
    initial_delay=1.0,        # Start with 1 second
    max_delay=60.0,           # Max 60 seconds between attempts
    max_attempts=10,          # Try 10 times (0 = infinite)
    backoff_multiplier=2.0,   # Double delay each time
    jitter=True               # Add random jitter
)

reconnection = ReconnectionManager(config)

async def connect():
    """Your connection logic here."""
    try:
        # Establish connection
        return True
    except Exception:
        return False

# Attempt reconnection
success = await reconnection.reconnect(
    connect_callback=connect,
    on_success=lambda: print("Connected!"),
    on_failure=lambda attempt: print(f"Attempt {attempt} failed"),
    on_max_attempts=lambda: print("Max attempts reached")
)

API Reference

WebSocketConnectionManager

Main connection manager class.

Constructor:

WebSocketConnectionManager(
    max_connections_per_user: int = 5,
    heartbeat_interval: int = 30,
    connection_timeout: int = 300
)

Methods:

  • connect(websocket, user_id, session_id, ...) - Connect WebSocket
  • disconnect(connection_id, code, reason) - Disconnect WebSocket
  • send_message(connection_id, message, binary) - Send to connection
  • send_to_user(user_id, message, binary) - Send to all user connections
  • broadcast(message, exclude_users, binary) - Broadcast to all
  • receive_message(connection_id) - Receive message
  • get_connection_info(connection_id) - Get connection metadata
  • get_user_connections(user_id) - Get user's connections
  • get_stats() - Get connection statistics

JWTAuthService

JWT authentication service.

Constructor:

JWTAuthService(
    secret_key: str,
    algorithm: str = "HS256",
    token_expiry_seconds: int = 7200
)

Methods:

  • generate_token(user_id, additional_claims, expiry_seconds) - Generate JWT
  • validate_token(token) - Validate and decode JWT
  • get_user_id(token) - Extract user ID from token
  • is_token_expired(token) - Check if token expired

WebSocketSessionManager

Redis-backed session manager (requires redis extra).

Constructor:

WebSocketSessionManager(
    redis_client: Optional[Redis] = None,
    redis_url: Optional[str] = None,
    connection_ttl: int = 3600,
    heartbeat_interval: int = 30,
    heartbeat_timeout: int = 90
)

Methods:

  • initialize() - Initialize Redis connection
  • create_connection(session_id, user_id, username, ...) - Create connection
  • get_connection(connection_id) - Get connection metadata
  • update_heartbeat(connection_id) - Update heartbeat
  • save_connection_state(connection_id, state) - Save state
  • restore_connection_state(connection_id) - Restore state
  • disconnect(connection_id, save_state) - Disconnect
  • cleanup_stale_connections() - Cleanup stale connections
  • get_connection_stats() - Get statistics

HeartbeatMonitor

Heartbeat monitoring for connection health.

Constructor:

HeartbeatMonitor(config: Optional[HeartbeatConfig] = None)

Methods:

  • register_connection(connection_id) - Register for monitoring
  • unregister_connection(connection_id) - Unregister
  • update_heartbeat(connection_id) - Update heartbeat timestamp
  • is_stale(connection_id) - Check if stale
  • start(ping_callback, cleanup_callback, miss_callback) - Start monitoring
  • stop() - Stop monitoring
  • get_stats() - Get statistics

MetricsCollector

Connection metrics collection.

Constructor:

MetricsCollector()

Methods:

  • register_connection(connection_id, user_id) - Register connection
  • unregister_connection(connection_id) - Unregister connection
  • record_message_sent(connection_id, size_bytes) - Record sent message
  • record_message_received(connection_id, size_bytes) - Record received message
  • record_latency(latency_ms) - Record message latency
  • record_error(connection_id) - Record error
  • get_connection_metrics(connection_id) - Get connection metrics
  • get_user_metrics(user_id) - Get user metrics
  • get_stats() - Get aggregated statistics

Testing

Run the test suite:

pytest

With coverage:

pytest --cov=netrun.websocket --cov-report=html

License

MIT License. See LICENSE for details.

Support

For issues, questions, or contributions, please visit:

Related Packages

  • netrun-auth - Authentication and authorization utilities
  • netrun-db-pool - Database connection pooling
  • netrun-config - Configuration management
  • netrun-logging - Structured logging

Netrun Systems - Production-Grade Infrastructure for Modern Applications

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

netrun_websocket-1.0.0.tar.gz (35.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

netrun_websocket-1.0.0-py3-none-any.whl (26.3 kB view details)

Uploaded Python 3

File details

Details for the file netrun_websocket-1.0.0.tar.gz.

File metadata

  • Download URL: netrun_websocket-1.0.0.tar.gz
  • Upload date:
  • Size: 35.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.12

File hashes

Hashes for netrun_websocket-1.0.0.tar.gz
Algorithm Hash digest
SHA256 fd49a0f5e7691b81c9382c2e825a642a51a1a7fe49b543582d8f071e266e804a
MD5 0cd643fcb8d5ef60be19dbe8bb29b8e9
BLAKE2b-256 10fe5446092478f28b33f25c5eb872fa15759d705676707b06daaf7545ee3678

See more details on using hashes here.

File details

Details for the file netrun_websocket-1.0.0-py3-none-any.whl.

File metadata

File hashes

Hashes for netrun_websocket-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 70b986f3ec062bec844004c3a2ed2df5d5c65b5f7b58c50840299dd045fa1fcf
MD5 734290e9b6a59f4661628fdfd2f6c308
BLAKE2b-256 6b7d847db79395ce4bd9752e22cdd01dcf05351a7dfbad391fb45455470dd2e9

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page