Asynchronous WebSocket connection pool for Python with connection reuse, health monitoring, and automatic cleanup

aio-websocket-pool

A flexible, async-friendly WebSocket connection pool implementation for Python with connection reuse, health monitoring, and automatic cleanup.

Features

  • Connection Reuse: Efficient connection pooling with automatic reuse of healthy connections
  • Health Monitoring: Automatic connection health checks and cleanup of disconnected connections
  • Concurrency-safe: Uses asyncio locks to coordinate concurrent access from coroutines; asyncio locks are not thread-safe, so share a pool only among tasks on one event loop
  • Configurable Limits: Set maximum connection limits and timeouts
  • Retry Logic: Built-in exponential backoff retry mechanism for connection failures
  • Server Data Draining: Optional server data detection and draining during connection release
  • Context Manager Support: Clean, intuitive API with async context managers
  • Idle Connection Cleanup: Automatic cleanup of idle connections to prevent resource leaks
  • Type-safe: Full typing support with comprehensive type hints
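
To make the "Retry Logic" feature concrete, here is a minimal sketch of exponential backoff written with plain asyncio. It is not the library's implementation (the requirements list tenacity, which is not used here); the name `retry_with_backoff` and the parameters `base_delay`, `max_delay`, and `sleep` are assumptions made for illustration only.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    *,
    max_retries: int = 3,
    base_delay: float = 0.1,   # hypothetical parameter, not a pool option
    max_delay: float = 5.0,    # hypothetical parameter, not a pool option
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Call operation(), retrying on OSError with exponentially growing delays."""
    for attempt in range(max_retries + 1):
        try:
            return await operation()
        except OSError:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error
            # The delay doubles on every attempt and is capped at max_delay.
            delay = min(max_delay, base_delay * 2 ** attempt)
            await sleep(delay)
    raise AssertionError("unreachable")
```

With `max_retries=3` the operation is attempted at most four times. The `sleep` argument exists only so the waiting can be replaced in tests.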

Installation

pip install aio-websocket-pool

Or using Poetry:

poetry add aio-websocket-pool

Quick Start

import asyncio
from aio_websocket_pool import WebsocketConnectionPool


async def main():
    # Create a connection pool
    async with WebsocketConnectionPool(uri="ws://localhost:8080") as pool:
        # Get a connection from the pool
        async with pool.get_connection() as conn:
            await conn.send("Hello, World!")
            response = await conn.recv()
            print(response)


asyncio.run(main())

Advanced Usage

Basic Pool Configuration

import asyncio

from aio_websocket_pool import WebsocketConnectionPool


async def main():
    # Create a pool with custom configuration
    pool = WebsocketConnectionPool(
        uri="ws://localhost:8080",
        max_connections=10,
        idle_timeout=60.0,
        connection_timeout=10.0,
        max_retries=3,
        warmup_connections=2,  # Pre-create 2 connections
        headers={"Authorization": "Bearer token123"},
    )

    async with pool:
        # Pool is ready to use
        conn = await pool.acquire()
        try:
            await conn.send("Hello")
            response = await conn.recv()
            print(response)
        finally:
            # Always hand the connection back, even if send/recv fails
            await pool.release(conn)


asyncio.run(main())

Connection Health Monitoring

# Enable server data checking during connection release
pool = WebsocketConnectionPool(
    uri="ws://localhost:8080",
    check_server_data_on_release=True,
    drain_timeout=5.0,
    drain_quiet_period=1.0
)

async with pool:
    async with pool.get_connection() as conn:
        await conn.send("REQUEST")
        response = await conn.recv()
        print(response)
        # Connection will be checked for additional server data
        # before being returned to the pool
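
The README does not spell out how `drain_timeout` and `drain_quiet_period` interact. One plausible reading, sketched below, is: keep reading leftover messages until nothing arrives for `drain_quiet_period` seconds, and give up once `drain_timeout` seconds have passed in total. The `drain` function and its return value are hypothetical and only illustrate that assumed behavior; they are not the library's code.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple, Union

Message = Union[str, bytes]


async def drain(
    recv: Callable[[], Awaitable[Message]],
    *,
    drain_timeout: float = 10.0,
    drain_quiet_period: float = 2.0,
) -> Tuple[List[Message], bool]:
    """Read leftover messages; return (messages, went_quiet)."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + drain_timeout
    drained: List[Message] = []
    while True:
        remaining = deadline - loop.time()
        if remaining <= 0:
            # Overall budget used up while the peer was still sending.
            return drained, False
        wait = min(drain_quiet_period, remaining)
        try:
            message = await asyncio.wait_for(recv(), timeout=wait)
        except asyncio.TimeoutError:
            # Quiet only if a full quiet period passed without data.
            return drained, wait == drain_quiet_period
        drained.append(message)
```

Under this reading, a connection that goes quiet could be returned to the pool, while one that is still sending when the budget runs out would be a candidate for removal.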

Custom Drain Condition

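def custom_drain_condition(message):
    # Only drain messages that look like server notifications.
    # A message may be str or bytes, so normalize to bytes before comparing.
    if isinstance(message, str):
        message = message.encode("utf-8")
    return message.startswith(b'{"type":"notification"')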


pool = WebsocketConnectionPool(
    uri="ws://localhost:8080",
    check_server_data_on_release=True,
    drain_condition=custom_drain_condition
)
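
A prefix check depends on key order and whitespace in the serialized JSON. As a sketch of a more tolerant condition, the function below parses the message and inspects its "type" field. It assumes, as in the example above, that the callback receives one message (str or bytes) and returns a bool; the name `is_notification` is illustrative.

```python
import json
from typing import Union


def is_notification(message: Union[str, bytes]) -> bool:
    """Return True for JSON objects whose "type" field is "notification"."""
    try:
        payload = json.loads(message)  # json.loads accepts both str and bytes
    except ValueError:
        # Covers malformed JSON and undecodable bytes: not a notification.
        return False
    return isinstance(payload, dict) and payload.get("type") == "notification"
```

It would be passed the same way as the function above, for example `drain_condition=is_notification`.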

Force New Connection

async with pool:
    # Force create a new connection instead of reusing existing ones
    async with pool.get_new_connection() as conn:
        await conn.send("Important message")
        response = await conn.recv()
        print(response)

Connection Pool Monitoring

async with pool:
    print(f"Total connections: {pool.total_connections}")
    print(f"Available connections: {pool.available_connections}")
    print(f"Busy connections: {pool.busy_connections}")
    print(f"Pending connections: {pool.pending_connections}")
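
For logging or metrics it can help to read the four counters once and derive a utilization figure. The sketch below relies only on the property names shown above; `PoolSnapshot` and `snapshot` are hypothetical helpers, and a `SimpleNamespace` stands in for a real pool so the example runs on its own.

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any


@dataclass(frozen=True)
class PoolSnapshot:
    total: int
    available: int
    busy: int
    pending: int

    @property
    def utilization(self) -> float:
        """Fraction of connections in use; 0.0 for an empty pool."""
        return self.busy / self.total if self.total else 0.0


def snapshot(pool: Any) -> PoolSnapshot:
    """Copy the pool's counter properties into an immutable record."""
    return PoolSnapshot(
        total=pool.total_connections,
        available=pool.available_connections,
        busy=pool.busy_connections,
        pending=pool.pending_connections,
    )


# Stand-in object for demonstration; a real pool exposes the same properties.
fake_pool = SimpleNamespace(
    total_connections=4,
    available_connections=1,
    busy_connections=2,
    pending_connections=1,
)
snap = snapshot(fake_pool)
print(f"{snap.busy}/{snap.total} busy ({snap.utilization:.0%})")  # prints: 2/4 busy (50%)
```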

Manual Connection Management

async with pool:
    # Manually acquire and release connections
    conn = await pool.acquire()
    try:
        await conn.send("Hello")
        response = await conn.recv()
        print(response)
    finally:
        # Release connection back to pool
        await pool.release(conn)

    # Force remove a connection from the pool
    conn = await pool.acquire()
    try:
        await conn.send("Hello")
        response = await conn.recv()
        print(response)
    finally:
        # Remove connection instead of returning to pool
        await pool.release(conn, force_remove=True)
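
The two patterns above can be combined: return the connection to the pool after a clean exchange, and remove it if the exchange raised. This README does not say whether `get_connection()` already behaves this way, so the wrapper below is only a sketch. It uses `acquire()` and `release(conn, force_remove=...)` as documented here; the name `connection_discarding_on_error` is hypothetical.

```python
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


@asynccontextmanager
async def connection_discarding_on_error(pool: Any) -> AsyncIterator[Any]:
    """Acquire a connection; if the body raises, remove it instead of reusing it."""
    conn = await pool.acquire()
    failed = False
    try:
        yield conn
    except BaseException:
        # Includes cancellation: the connection may be mid-exchange.
        failed = True
        raise
    finally:
        await pool.release(conn, force_remove=failed)
```

Usage would look like `async with connection_discarding_on_error(pool) as conn: ...`, with the exception still propagating to the caller.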

API Reference

WebsocketConnectionPool

Constructor

WebsocketConnectionPool(
    uri: str,
    headers: Dict[str, str] | None = None,
    idle_timeout: float = 60.0,
    max_connections: int = 50,
    max_retries: int = 3,
    cleanup_interval: float = 5.0,
    connection_timeout: float = 10.0,
    warmup_connections: int = 0,
    check_server_data_on_release: bool = False,
    drain_timeout: float = 10.0,
    drain_quiet_period: float = 2.0,
    drain_condition: DrainConditionCallback | None = None,
    **kwargs
)

Creates a new WebSocket connection pool.

Properties

  • total_connections: int - Total number of connections in the pool
  • available_connections: int - Number of available connections
  • busy_connections: int - Number of connections currently in use
  • pending_connections: int - Number of connections being drained
  • is_closed: bool - Whether the pool is closed
  • is_started: bool - Whether the pool has been started

Methods

  • async start() - Start the connection pool
  • async acquire() -> WebsocketConnection - Acquire a connection from the pool
  • async acquire_new() -> WebsocketConnection - Force acquire a new connection
  • async release(connection, *, force_remove=False) - Release a connection back to the pool
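  • async get_connection() - Async context manager that acquires a connection and releases it on exit (used as "async with pool.get_connection() as conn")
  • async get_new_connection() - Async context manager that acquires a newly created connection and releases it on exit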
  • async close_all() - Close all connections and shut down the pool

WebsocketConnection

Properties

  • is_busy: bool - Whether the connection is currently in use
  • is_draining: bool - Whether the connection is being drained
  • is_connected: bool - Whether the connection is active
  • last_activity: float - Timestamp of last activity

Methods

  • async send(message: str | bytes) - Send a message through the connection
  • async recv() -> str | bytes - Receive a message from the connection
  • async ping() - Send a ping to check connection health
  • async connect() - Establish the WebSocket connection
  • async close() - Close the connection

Error Handling

The library provides several specific exception types:

from aio_websocket_pool import (
    WebsocketConnectionPool,
    WebsocketError,
    ConnectionBusyError,
    ConnectionUnavailableError,
    ConnectionClosedError,
    ConnectionPoolExhaustedError,
    ConnectionPoolUnavailableError,
)


# Illustrative helper (the name is not part of the library);
# `pool` is an already started WebsocketConnectionPool.
async def send_hello(pool: WebsocketConnectionPool) -> None:
    try:
        async with pool.get_connection() as conn:
            await conn.send("Hello")
            response = await conn.recv()
    except ConnectionPoolExhaustedError:
        print("No connections available in pool")
    except ConnectionClosedError:
        print("Connection was closed unexpectedly")
    except WebsocketError as e:
        print(f"WebSocket error: {e}")
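
One way to make pool exhaustion less likely is to cap the number of concurrent tasks at the pool's `max_connections`. This README does not state whether `acquire()` waits or raises when the pool is full; the except clause above suggests it can raise, and the sketch below assumes so. `run_bounded` is a hypothetical helper built on `asyncio.Semaphore`, not part of the library.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, TypeVar

T = TypeVar("T")


async def run_bounded(
    jobs: Iterable[Callable[[], Awaitable[T]]],
    *,
    limit: int,
) -> List[T]:
    """Run all jobs concurrently, but never more than `limit` at once."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(job: Callable[[], Awaitable[T]]) -> T:
        async with semaphore:  # waits here while `limit` jobs are running
            return await job()

    # gather preserves the order of `jobs` in the returned list
    return await asyncio.gather(*(guarded(job) for job in jobs))
```

Each job would typically wrap one `async with pool.get_connection()` exchange, with `limit` set to the pool's `max_connections`.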

Configuration Best Practices

For High-Traffic Applications

pool = WebsocketConnectionPool(
    uri="ws://your-server.com/ws",
    max_connections=100,
    warmup_connections=10,
    idle_timeout=120.0,
    connection_timeout=5.0,
    max_retries=5,
    cleanup_interval=10.0
)

For Low-Latency Applications

pool = WebsocketConnectionPool(
    uri="ws://your-server.com/ws",
    max_connections=50,
    warmup_connections=5,
    idle_timeout=30.0,
    connection_timeout=3.0,
    check_server_data_on_release=True,
    drain_timeout=2.0,
    drain_quiet_period=0.5
)

For Resource-Constrained Environments

pool = WebsocketConnectionPool(
    uri="ws://your-server.com/ws",
    max_connections=5,
    warmup_connections=1,
    idle_timeout=30.0,
    connection_timeout=10.0,
    max_retries=2,
    cleanup_interval=5.0
)

Requirements

  • Python 3.11+
  • websockets >= 15.0.1
  • tenacity >= 9.1.2

License

This project is licensed under the MIT License.

Development

Setup

poetry install

Running Tests

poetry run pytest

Code Quality

poetry run black .
poetry run isort .
poetry run flake8 .
poetry run mypy .

Building

poetry build

