Asynchronous WebSocket connection pool for Python with connection reuse, health monitoring, and automatic cleanup
aio-websocket-pool
A flexible, async-friendly WebSocket connection pool implementation for Python with connection reuse, health monitoring, and automatic cleanup.
Features
- Connection Reuse: Efficient connection pooling with automatic reuse of healthy connections
- Health Monitoring: Automatic connection health checks and cleanup of disconnected connections
- Coroutine-safe: Uses asyncio locks to coordinate concurrent access from tasks on the same event loop (asyncio locks do not provide safety across OS threads)
- Configurable Limits: Set maximum connection limits and timeouts
- Retry Logic: Built-in exponential backoff retry mechanism for connection failures
- Server Data Draining: Optional server data detection and draining during connection release
- Context Manager Support: Clean, intuitive API with async context managers
- Idle Connection Cleanup: Automatic cleanup of idle connections to prevent resource leaks
- Type-safe: Full typing support with comprehensive type hints
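The retry feature above is described only as "exponential backoff". As a rough illustration of that idea (not the library's implementation, which presumably builds on its tenacity dependency), here is a minimal sketch. The names `backoff_delays`, `retry_with_backoff`, `base_delay`, and `max_delay` are hypothetical and do not come from aio-websocket-pool.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


def backoff_delays(max_retries: int, base_delay: float = 0.5, max_delay: float = 8.0) -> list[float]:
    """Delay before each retry: base, 2*base, 4*base, ... capped at max_delay."""
    return [min(base_delay * (2 ** attempt), max_delay) for attempt in range(max_retries)]


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
) -> T:
    """Run operation; on OSError, retry up to max_retries times with growing delays."""
    delays = backoff_delays(max_retries, base_delay, max_delay)
    for attempt in range(max_retries + 1):
        try:
            return await operation()
        except OSError:
            if attempt == max_retries:
                raise  # out of retries: surface the last error
            await asyncio.sleep(delays[attempt])
    raise AssertionError("unreachable")
```

Capping the delay keeps a long outage from pushing the wait between attempts to an unreasonable length.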
Installation
pip install aio-websocket-pool
Or using Poetry:
poetry add aio-websocket-pool
Quick Start
import asyncio
from aio_websocket_pool import WebsocketConnectionPool

async def main():
    # Create a connection pool
    async with WebsocketConnectionPool(uri="ws://localhost:8080") as pool:
        # Get a connection from the pool
        async with pool.get_connection() as conn:
            await conn.send("Hello, World!")
            response = await conn.recv()
            print(response)

asyncio.run(main())
Advanced Usage
Basic Pool Configuration
from aio_websocket_pool import WebsocketConnectionPool

# Create a pool with custom configuration
pool = WebsocketConnectionPool(
    uri="ws://localhost:8080",
    max_connections=10,
    idle_timeout=60.0,
    connection_timeout=10.0,
    max_retries=3,
    warmup_connections=2,  # Pre-create 2 connections
    headers={"Authorization": "Bearer token123"}
)

async with pool:
    # Pool is ready to use
    conn = await pool.acquire()
    try:
        await conn.send("Hello")
        response = await conn.recv()
        print(response)
    finally:
        await pool.release(conn)
Connection Health Monitoring
# Enable server data checking during connection release
pool = WebsocketConnectionPool(
    uri="ws://localhost:8080",
    check_server_data_on_release=True,
    drain_timeout=5.0,
    drain_quiet_period=1.0
)

async with pool:
    async with pool.get_connection() as conn:
        await conn.send("REQUEST")
        response = await conn.recv()
        print(response)
    # Connection will be checked for additional server data
    # before being returned to the pool
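One plausible reading of `drain_timeout` and `drain_quiet_period` is: keep discarding incoming messages until the connection has been silent for the quiet period, and give up once the overall timeout expires. The sketch below illustrates that reading with an `asyncio.Queue` standing in for a connection. It is an assumption about the semantics, not the library's code, and `drain_until_quiet` is a hypothetical name.

```python
import asyncio


async def drain_until_quiet(
    incoming: asyncio.Queue,
    drain_timeout: float,
    drain_quiet_period: float,
) -> tuple[list, bool]:
    """Discard queued messages until the queue stays silent for drain_quiet_period.

    Returns (drained_messages, went_quiet). went_quiet is False when
    drain_timeout ran out before a full quiet period was observed.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + drain_timeout
    drained = []
    while True:
        remaining = deadline - loop.time()
        if remaining <= 0:
            return drained, False  # still noisy: a caller might discard the connection
        wait = min(drain_quiet_period, remaining)
        try:
            message = await asyncio.wait_for(incoming.get(), timeout=wait)
        except asyncio.TimeoutError:
            # Silence for a full quiet period means the stream is clean;
            # a shorter wait means the overall deadline cut the check short.
            return drained, wait == drain_quiet_period
        drained.append(message)
```

Under this reading, a shorter quiet period returns connections to the pool faster but risks missing a late server message.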
Custom Drain Condition
def custom_drain_condition(message):
    # Only drain messages that look like server notifications.
    # Messages may be str or bytes, so normalize before matching.
    if isinstance(message, str):
        message = message.encode()
    return message.startswith(b'{"type":"notification"')

pool = WebsocketConnectionPool(
    uri="ws://localhost:8080",
    check_server_data_on_release=True,
    drain_condition=custom_drain_condition
)
Force New Connection
async with pool:
    # Force create a new connection instead of reusing existing ones
    async with pool.get_new_connection() as conn:
        await conn.send("Important message")
        response = await conn.recv()
        print(response)
Connection Pool Monitoring
async with pool:
    print(f"Total connections: {pool.total_connections}")
    print(f"Available connections: {pool.available_connections}")
    print(f"Busy connections: {pool.busy_connections}")
    print(f"Pending connections: {pool.pending_connections}")
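For logging or metrics, it can be handy to read these counters into a plain dict in one step. The helper below is a small sketch. `pool_stats` is a hypothetical name, and it relies only on the four properties printed above.

```python
from typing import Any

# Property names as used in the monitoring example above.
_STAT_FIELDS = (
    "total_connections",
    "available_connections",
    "busy_connections",
    "pending_connections",
)


def pool_stats(pool: Any) -> dict[str, int]:
    """Read the pool's counters into a plain dict, e.g. for structured logging."""
    return {name: getattr(pool, name) for name in _STAT_FIELDS}
```

The counters are read one after another, so under heavy concurrency the values may not all reflect exactly the same instant.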
Manual Connection Management
async with pool:
    # Manually acquire and release connections
    conn = await pool.acquire()
    try:
        await conn.send("Hello")
        response = await conn.recv()
        print(response)
    finally:
        # Release connection back to pool
        await pool.release(conn)

    # Force remove a connection from the pool
    conn = await pool.acquire()
    try:
        await conn.send("Hello")
        response = await conn.recv()
        print(response)
    finally:
        # Remove connection instead of returning to pool
        await pool.release(conn, force_remove=True)
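The two patterns above can be combined: return the connection to the pool when the work succeeds, and remove it when the work fails, since its state is then uncertain. The wrapper below is a sketch of that policy using only `acquire()` and `release(conn, force_remove=...)` as shown above. The name `connection_discarded_on_error` is hypothetical, and whether the built-in `get_connection()` already behaves this way is not stated here.

```python
from contextlib import asynccontextmanager


@asynccontextmanager
async def connection_discarded_on_error(pool):
    """Yield a pooled connection; remove it from the pool if the body raises."""
    conn = await pool.acquire()
    try:
        yield conn
    except BaseException:
        # After a failure the connection may hold a half-finished exchange,
        # so remove it rather than hand it to the next caller.
        await pool.release(conn, force_remove=True)
        raise
    else:
        await pool.release(conn)
```

Usage mirrors `get_connection()`: `async with connection_discarded_on_error(pool) as conn: ...`.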
API Reference
WebsocketConnectionPool
Constructor
WebsocketConnectionPool(
    uri: str,
    headers: Dict[str, str] | None = None,
    idle_timeout: float = 60.0,
    max_connections: int = 50,
    max_retries: int = 3,
    cleanup_interval: float = 5.0,
    connection_timeout: float = 10.0,
    warmup_connections: int = 0,
    check_server_data_on_release: bool = False,
    drain_timeout: float = 10.0,
    drain_quiet_period: float = 2.0,
    drain_condition: DrainConditionCallback | None = None,
    **kwargs
)
Creates a new WebSocket connection pool.
Properties
- total_connections: int - Total number of connections in the pool
- available_connections: int - Number of available connections
- busy_connections: int - Number of connections currently in use
- pending_connections: int - Number of connections being drained
- is_closed: bool - Whether the pool is closed
- is_started: bool - Whether the pool has been started
Methods
- async start() - Start the connection pool
- async acquire() -> WebsocketConnection - Acquire a connection from the pool
- async acquire_new() -> WebsocketConnection - Force acquire a new connection
- async release(connection, *, force_remove=False) - Release a connection back to the pool
- async get_connection() - Context manager for acquiring/releasing connections
- async get_new_connection() - Context manager for acquiring/releasing new connections
- async close_all() - Close all connections and shut down the pool
WebsocketConnection
Properties
- is_busy: bool - Whether the connection is currently in use
- is_draining: bool - Whether the connection is being drained
- is_connected: bool - Whether the connection is active
- last_activity: float - Timestamp of last activity
Methods
- async send(message: str | bytes) - Send a message through the connection
- async recv() -> str | bytes - Receive a message from the connection
- async ping() - Send a ping to check connection health
- async connect() - Establish the WebSocket connection
- async close() - Close the connection
Error Handling
The library provides several specific exception types:
from aio_websocket_pool import (
    WebsocketError,
    ConnectionBusyError,
    ConnectionUnavailableError,
    ConnectionClosedError,
    ConnectionPoolExhaustedError,
    ConnectionPoolUnavailableError,
)

try:
    async with pool.get_connection() as conn:
        await conn.send("Hello")
        response = await conn.recv()
except ConnectionPoolExhaustedError:
    print("No connections available in pool")
except ConnectionClosedError:
    print("Connection was closed unexpectedly")
except WebsocketError as e:
    print(f"WebSocket error: {e}")
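One way to make `ConnectionPoolExhaustedError` less likely is to keep the number of concurrent callers at or below `max_connections`. The sketch below does that with a plain `asyncio.Semaphore`. It assumes the pool raises when exhausted rather than waiting, which the example above suggests but does not confirm, and `run_bounded` is a hypothetical helper name.

```python
import asyncio
from typing import Awaitable, Callable, Iterable


async def run_bounded(jobs: Iterable[Callable[[], Awaitable]], limit: int) -> list:
    """Run coroutine factories with at most `limit` of them in flight at once."""
    gate = asyncio.Semaphore(limit)

    async def guarded(job):
        async with gate:  # waits here while `limit` jobs are already running
            return await job()

    # gather preserves the order of `jobs` in its result list
    return await asyncio.gather(*(guarded(job) for job in jobs))
```

In practice each job would wrap an `async with pool.get_connection() as conn:` block, with `limit` set to the pool's `max_connections`.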
Configuration Best Practices
For High-Traffic Applications
pool = WebsocketConnectionPool(
    uri="ws://your-server.com/ws",
    max_connections=100,
    warmup_connections=10,
    idle_timeout=120.0,
    connection_timeout=5.0,
    max_retries=5,
    cleanup_interval=10.0
)
For Low-Latency Applications
pool = WebsocketConnectionPool(
    uri="ws://your-server.com/ws",
    max_connections=50,
    warmup_connections=5,
    idle_timeout=30.0,
    connection_timeout=3.0,
    check_server_data_on_release=True,
    drain_timeout=2.0,
    drain_quiet_period=0.5
)
For Resource-Constrained Environments
pool = WebsocketConnectionPool(
    uri="ws://your-server.com/ws",
    max_connections=5,
    warmup_connections=1,
    idle_timeout=30.0,
    connection_timeout=10.0,
    max_retries=2,
    cleanup_interval=5.0
)
Requirements
- Python 3.11+
- websockets >= 15.0.1
- tenacity >= 9.1.2
License
This project is licensed under the MIT License.
Development
Setup
poetry install
Running Tests
poetry run pytest
Code Quality
poetry run black .
poetry run isort .
poetry run flake8 .
poetry run mypy .
Building
poetry build