Python client for WSE (WebSocket Engine) -- real-time event streaming
wse-client
Python client for WSE (WebSocket Engine) - real-time event streaming with auto-reconnect, compression, encryption, and connection resilience.
Feature parity with the TypeScript client. Pure Python, no Rust dependency.
Installation
pip install wse-client
Optional extras:
pip install wse-client[crypto] # E2E encryption (ECDH + AES-GCM)
pip install wse-client[msgpack] # MessagePack binary encoding
pip install wse-client[orjson] # Faster JSON (de)serialization
pip install wse-client[all] # Everything
Quick Start
Async (recommended)
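import asyncio
from wse_client import connect

async def main():
    async with connect("ws://localhost:5007/wse", token="<jwt>") as client:
        await client.subscribe(["notifications", "trades"])
        # Iterate over incoming events until the connection closes
        async for event in client:
            print(event.type, event.payload)

asyncio.run(main())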
Sync
from wse_client import SyncWSEClient
client = SyncWSEClient("ws://localhost:5007/wse", token="<jwt>")
client.connect()
client.subscribe(["notifications"])
event = client.recv(timeout=5.0)
print(event.type, event.payload)
client.close()
Callbacks
from wse_client import SyncWSEClient
client = SyncWSEClient("ws://localhost:5007/wse", token="<jwt>")
@client.on("notifications")
def handle(event):
    print(event.payload)

@client.on_any
def catch_all(event):
    print(f"[{event.type}] {event.payload}")

client.connect()
client.run_forever()
Constructor Parameters
AsyncWSEClient(url, **kwargs):
| Parameter | Type | Default | Description |
|---|---|---|---|
| url | str | required | WebSocket server URL |
| token | str | None | JWT token for authentication |
| topics | list[str] | None | Topics to auto-subscribe after connecting |
| reconnect | ReconnectConfig | default config | Reconnection strategy |
| extra_headers | dict[str, str] | None | Additional HTTP headers for the handshake |
| queue_size | int | 1000 | Max events buffered for the async iterator |
The connect(url, **kwargs) factory returns an AsyncWSEClient configured as an async context manager.
API Reference
AsyncWSEClient
| Method | Description |
|---|---|
| connect() | Open WebSocket connection |
| disconnect() | Close connection gracefully |
| send(type, payload, priority=NORMAL, correlation_id=None) | Send a structured message |
| send_with_retry(type, payload, priority=NORMAL, correlation_id=None, max_retries=5) | Send with exponential backoff retries |
| send_batch(messages) | Send multiple messages in a single frame |
| subscribe(topics, recover=False) | Subscribe to event topics, optionally recovering missed messages |
| unsubscribe(topics) | Unsubscribe from topics |
| request_snapshot(topics) | Request current state snapshot for topics |
| on(event_type) | Register event handler (decorator) |
| on_any(handler) | Register a wildcard handler that receives all events |
| off(event_type, handler) | Remove a specific event handler |
| recv(timeout=None) | Receive next event (blocks until available) |
| force_reconnect() | Force an immediate reconnection |
| change_endpoint(url) | Switch to a different server URL |
| get_stats() | Connection statistics, message counts, network quality |
Properties:
| Property | Type | Description |
|---|---|---|
| is_connected | bool | WebSocket connection is open |
| is_ready | bool | server_ready handshake completed |
| is_fully_ready | bool | Connected, server_ready received, and client_hello sent |
| state | ConnectionState | Current connection state enum |
| connection_quality | ConnectionQuality | Network quality assessment |
| subscribed_topics | set[str] | Currently subscribed topics |
| queue_size | int | Events waiting in the receive queue |
| recovery_enabled | bool | Whether server supports message recovery |
| recovery_state | dict | Per-topic recovery state: {topic: (epoch, offset)} |
Context manager and async iterator:
async with AsyncWSEClient(url) as client:  # auto connect/disconnect
    async for event in client:             # yields WSEEvent objects
        ...
SyncWSEClient
Same API as AsyncWSEClient but blocking. Runs the async client in a background daemon thread.
| Method | Description |
|---|---|
| run_forever() | Block and dispatch events to registered callbacks |
| recv(timeout=None) | Block until next event (raises WSETimeoutError on timeout) |
| close() | Disconnect and shut down the background thread |
WSEEvent
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True, slots=True)
class WSEEvent:
    type: str                          # Event type ("t" field)
    payload: dict[str, Any]            # Event data ("p" field)
    id: str | None = None              # Message ID
    sequence: int | None = None        # Sequence number
    timestamp: str | None = None       # ISO 8601 timestamp
    version: int = 1                   # Protocol version
    category: str | None = None        # Message category (system/snapshot/update)
    priority: int | None = None        # Message priority (see MessagePriority)
    correlation_id: str | None = None  # Request correlation ID
    signature: str | None = None       # Message signature (if signed)
Features
Auto-Reconnection
Four reconnection strategies with configurable parameters:
from wse_client import AsyncWSEClient, ReconnectConfig, ReconnectMode

client = AsyncWSEClient(
    "ws://localhost:5007/wse",
    reconnect=ReconnectConfig(
        mode=ReconnectMode.EXPONENTIAL,  # EXPONENTIAL, LINEAR, FIBONACCI, ADAPTIVE
        base_delay=1.0,                  # Initial delay in seconds
        max_delay=30.0,                  # Maximum delay cap
        factor=1.5,                      # Backoff multiplier
        jitter=True,                     # Add randomness to prevent thundering herd
    ),
)
On reconnect, the client automatically re-subscribes to all previously subscribed topics.
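To make the strategy names concrete, here is a minimal sketch of how reconnect delays could be derived from the parameters above. It is not the library's implementation: the function name `backoff_delays`, the string mode names, the linear and Fibonacci formulas, and the jitter range are all assumptions for illustration, and the ADAPTIVE mode is left out because its behavior is not described here.

```python
import random
from itertools import islice


def backoff_delays(mode="exponential", base_delay=1.0, max_delay=30.0,
                   factor=1.5, jitter=False, rng=random.random):
    """Yield successive reconnect delays in seconds (hypothetical helper)."""
    attempt = 0
    fib_prev, fib_curr = 1, 1
    while True:
        if mode == "exponential":
            delay = base_delay * factor ** attempt
        elif mode == "linear":
            delay = base_delay * (attempt + 1)
        elif mode == "fibonacci":
            delay = base_delay * fib_prev
            fib_prev, fib_curr = fib_curr, fib_prev + fib_curr
        else:
            raise ValueError(f"unsupported mode: {mode}")
        delay = min(delay, max_delay)  # never exceed the configured cap
        if jitter:
            # Assumed jitter: scale into [0.5, 1.0) of the delay so that many
            # clients reconnecting at once do not all retry at the same moment.
            delay *= 0.5 + 0.5 * rng()
        yield delay
        attempt += 1


if __name__ == "__main__":
    for mode in ("exponential", "linear", "fibonacci"):
        print(mode, list(islice(backoff_delays(mode), 6)))
```

With the default values shown in the configuration above, the exponential sequence grows by a factor of 1.5 per attempt until it reaches the 30-second cap.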
Message Priority
Five priority levels for message ordering:
from wse_client import MessagePriority
await client.send("alert", {"msg": "critical"}, priority=MessagePriority.CRITICAL) # 10
await client.send("update", {"data": "..."}, priority=MessagePriority.HIGH) # 8
await client.send("status", {"ok": True}, priority=MessagePriority.NORMAL) # 5
await client.send("metric", {"cpu": 42}, priority=MessagePriority.LOW) # 3
await client.send("log", {"msg": "debug"}, priority=MessagePriority.BACKGROUND) # 1
Compression
Built-in zlib compression for messages over 1 KB. Applied automatically on send - no configuration needed. The server decompresses transparently.
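The threshold rule can be sketched with the standard library alone. This is an illustration under assumptions, not the client's code: the helper names `encode_frame` and `decode_frame` are hypothetical, the `C:` prefix is taken from the Wire Protocol section, and sending small messages as plain JSON bytes is a simplification.

```python
import json
import zlib

COMPRESSION_THRESHOLD = 1024  # bytes; "over 1 KB" per the text above
ZLIB_PREFIX = b"C:"           # codec prefix listed under Wire Protocol


def encode_frame(message: dict) -> bytes:
    """Serialize a message, compressing it only when it exceeds the threshold."""
    raw = json.dumps(message, separators=(",", ":")).encode("utf-8")
    if len(raw) > COMPRESSION_THRESHOLD:
        return ZLIB_PREFIX + zlib.compress(raw)
    return raw


def decode_frame(frame: bytes) -> dict:
    """Reverse encode_frame: strip the prefix and decompress if present."""
    if frame.startswith(ZLIB_PREFIX):
        frame = zlib.decompress(frame[len(ZLIB_PREFIX):])
    return json.loads(frame)


if __name__ == "__main__":
    big = {"t": "blob", "p": {"data": "x" * 5000}}
    frame = encode_frame(big)
    print(len(frame), decode_frame(frame) == big)
```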
E2E Encryption
ECDH P-256 key exchange with AES-GCM-256 encryption. Requires the cryptography package:
pip install wse-client[crypto]
Key exchange happens automatically during the WebSocket handshake. All messages are encrypted with per-connection session keys derived via HKDF-SHA256. Wire format is the E: prefix + 12-byte IV + ciphertext + 16-byte auth tag, compatible with the TypeScript client and Rust server.
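The frame layout described above can be illustrated by splitting an encrypted frame into its parts. This sketch only handles the framing, not the cryptography; the names `EncryptedFrame` and `split_encrypted_frame` are hypothetical and not part of the wse-client API.

```python
from typing import NamedTuple

PREFIX = b"E:"
IV_LEN = 12   # bytes, per the wire format above
TAG_LEN = 16  # bytes, AES-GCM authentication tag


class EncryptedFrame(NamedTuple):
    iv: bytes
    ciphertext: bytes
    tag: bytes


def split_encrypted_frame(frame: bytes) -> EncryptedFrame:
    """Split 'E:' + IV + ciphertext + tag into its three components."""
    if not frame.startswith(PREFIX):
        raise ValueError("not an encrypted frame")
    body = frame[len(PREFIX):]
    if len(body) < IV_LEN + TAG_LEN:
        raise ValueError("frame too short")
    iv = body[:IV_LEN]
    ciphertext = body[IV_LEN:len(body) - TAG_LEN]
    tag = body[len(body) - TAG_LEN:]
    return EncryptedFrame(iv, ciphertext, tag)
```

With the cryptography package, `AESGCM(key).decrypt(iv, ciphertext + tag, None)` takes the ciphertext with the tag appended, which matches this layout. Whether the client passes associated data is not stated here, so `None` is an assumption.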
Circuit Breaker
Prevents connection storms during outages. Opens after 5 consecutive failures, rejects further attempts for 60s, then enters half-open state for recovery probing.
States: CLOSED (normal) -> OPEN (blocking) -> HALF_OPEN (probing) -> CLOSED (recovered)
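The state transitions can be sketched as a small class. This is a simplified model under assumptions, not the client's internal breaker: the class and method names are hypothetical, and the injectable `clock` exists only to make the behavior easy to test.

```python
import time
from enum import Enum


class BreakerState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=60.0,
                 clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at = None
        self.state = BreakerState.CLOSED

    def allow_attempt(self) -> bool:
        """Return True if a connection attempt may proceed right now."""
        if self.state is BreakerState.OPEN:
            if self._clock() - self._opened_at >= self.reset_timeout:
                self.state = BreakerState.HALF_OPEN  # allow one probe
                return True
            return False
        return True

    def record_success(self) -> None:
        self._failures = 0
        self.state = BreakerState.CLOSED

    def record_failure(self) -> None:
        self._failures += 1
        # A failed probe in HALF_OPEN reopens the breaker immediately.
        if (self.state is BreakerState.HALF_OPEN
                or self._failures >= self.failure_threshold):
            self.state = BreakerState.OPEN
            self._opened_at = self._clock()
```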
Rate Limiting
Client-side token bucket rate limiter (1000 tokens, 100/sec refill). Prevents message flooding and coordinates with the server's rate limit feedback (rate_limit_warning at 20% capacity).
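For readers unfamiliar with token buckets, the following sketch shows the general technique with the capacity and refill rate quoted above. The class name, method names, and injectable `clock` are assumptions for illustration; the server feedback handling is not modeled.

```python
import time


class TokenBucket:
    def __init__(self, capacity=1000.0, refill_rate=100.0,
                 clock=time.monotonic):
        self.capacity = capacity        # maximum burst size
        self.refill_rate = refill_rate  # tokens added per second
        self._clock = clock
        self._tokens = capacity         # start with a full bucket
        self._last = clock()

    def _refill(self) -> None:
        now = self._clock()
        elapsed = now - self._last
        self._tokens = min(self.capacity,
                           self._tokens + elapsed * self.refill_rate)
        self._last = now

    def try_acquire(self, cost=1.0) -> bool:
        """Consume `cost` tokens if available; return False otherwise."""
        self._refill()
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False

    @property
    def available(self) -> float:
        self._refill()
        return self._tokens
```

A sender would call `try_acquire()` before each message and either queue or drop the message when it returns False.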
Event Sequencing
Automatic duplicate detection via sliding window (10,000-entry dedup cache). Out-of-order event buffering with configurable gap tolerance (up to 100 sequence gap). Missed sequences trigger automatic gap recovery.
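A rough sketch of the two mechanisms, a bounded dedup cache and an in-order delivery buffer, is shown below. The `Sequencer` class and its behavior on oversized gaps (flush and skip ahead) are assumptions; the actual client triggers gap recovery, which is not modeled here.

```python
from collections import OrderedDict


class Sequencer:
    def __init__(self, dedup_size=10_000, max_gap=100):
        self._seen = OrderedDict()  # message id -> None, oldest first
        self._dedup_size = dedup_size
        self._max_gap = max_gap
        self._expected = None       # next sequence number to deliver
        self._pending = {}          # sequence -> event, held until in order

    def is_duplicate(self, message_id) -> bool:
        """Record the id and report whether it was already in the window."""
        if message_id in self._seen:
            return True
        self._seen[message_id] = None
        if len(self._seen) > self._dedup_size:
            self._seen.popitem(last=False)  # evict the oldest id
        return False

    def push(self, sequence, event) -> list:
        """Return the events that are now deliverable, in sequence order."""
        if self._expected is None:
            self._expected = sequence
        if sequence < self._expected:
            return []  # already delivered or stale
        if sequence - self._expected > self._max_gap:
            # Gap too large to wait for: flush what is buffered and skip ahead.
            ready = [self._pending.pop(s) for s in sorted(self._pending)]
            self._expected = sequence + 1
            return ready + [event]
        self._pending[sequence] = event
        ready = []
        while self._expected in self._pending:
            ready.append(self._pending.pop(self._expected))
            self._expected += 1
        return ready
```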
Network Quality Monitoring
Real-time network quality assessment based on PING/PONG round-trip measurements:
stats = client.get_stats()
print(stats["network"]["quality"]) # EXCELLENT / GOOD / FAIR / POOR
print(stats["network"]["latency_ms"]) # Average round-trip time
print(stats["network"]["jitter_ms"]) # Latency variance
print(stats["network"]["packet_loss"]) # Estimated packet loss ratio
Connection Pool
Multi-endpoint support with health scoring and automatic failover:
from wse_client import ConnectionPool, LoadBalancingStrategy
pool = ConnectionPool(
    ["ws://server1:5007/wse", "ws://server2:5007/wse", "ws://server3:5007/wse"],
    strategy=LoadBalancingStrategy.WEIGHTED_RANDOM,  # ROUND_ROBIN, WEIGHTED_RANDOM, LEAST_CONNECTIONS
)
url = pool.select_endpoint()
Endpoints are scored based on connection success rate, latency, and recent failures. Unhealthy endpoints are deprioritized automatically.
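As an illustration of weighted-random selection by health, consider the sketch below. The scoring formula, the `EndpointHealth` dataclass, and the standalone `select_endpoint` function are hypothetical; the README does not specify how ConnectionPool computes its scores.

```python
import random
from dataclasses import dataclass


@dataclass
class EndpointHealth:
    url: str
    successes: int = 0
    failures: int = 0
    latency_ms: float = 0.0

    def score(self) -> float:
        """Assumed formula: success rate discounted by latency."""
        attempts = self.successes + self.failures
        success_rate = self.successes / attempts if attempts else 1.0
        latency_penalty = 1.0 / (1.0 + self.latency_ms / 100.0)
        # Keep a small floor so unhealthy endpoints are still probed sometimes.
        return max(success_rate * latency_penalty, 0.01)


def select_endpoint(endpoints, rng=random):
    """Pick one endpoint with probability proportional to its score."""
    weights = [e.score() for e in endpoints]
    return rng.choices(endpoints, weights=weights, k=1)[0]


if __name__ == "__main__":
    pool = [
        EndpointHealth("ws://server1:5007/wse", successes=50, latency_ms=20),
        EndpointHealth("ws://server2:5007/wse", successes=5, failures=45),
    ]
    print(select_endpoint(pool).url)
```

The score floor means a failing endpoint is deprioritized rather than excluded, so it can recover once it starts succeeding again.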
Error Handling
from wse_client.errors import (
    WSEError,                # Base exception
    WSEConnectionError,      # Connection failures
    WSETimeoutError,         # Operation timeouts
    WSEAuthError,            # Authentication failures
    WSERateLimitError,       # Rate limit exceeded
    WSEProtocolError,        # Wire protocol violations
    WSECircuitBreakerError,  # Circuit breaker open
    WSEEncryptionError,      # Encryption/decryption failures
)
Wire Protocol
The client speaks WSE wire protocol v1:
- Text frames: Category prefix (WSE{, S{, U{) + JSON envelope
- Binary frames: Codec prefix (C: zlib, M: msgpack, E: AES-GCM) + payload
- Heartbeat: JSON PING/PONG with latency tracking
Full protocol specification: PROTOCOL.md
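To show what a text frame looks like in practice, here is a small parsing sketch. It assumes the prefixes WSE, S, and U correspond to the system, snapshot, and update categories named in WSEEvent, and that the envelope uses the "t" and "p" fields. The function `parse_text_frame` is hypothetical, and PROTOCOL.md remains the authoritative reference.

```python
import json

# Assumed mapping from category prefix to WSEEvent.category value.
CATEGORY_PREFIXES = {"WSE": "system", "S": "snapshot", "U": "update"}


def parse_text_frame(frame: str):
    """Split '<prefix>{...json...}' into (category, event type, payload)."""
    brace = frame.find("{")
    if brace == -1:
        raise ValueError("no JSON envelope found")
    prefix, body = frame[:brace], frame[brace:]
    if prefix not in CATEGORY_PREFIXES:
        raise ValueError(f"unknown category prefix: {prefix!r}")
    envelope = json.loads(body)
    return CATEGORY_PREFIXES[prefix], envelope["t"], envelope.get("p", {})


if __name__ == "__main__":
    print(parse_text_frame('U{"t":"trade","p":{"px":101.5}}'))
```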
Requirements
- Python 3.11+
- websockets >= 13.0
Optional:
- cryptography >= 43.0 (E2E encryption)
- msgpack >= 1.0 (MessagePack binary encoding)
- orjson >= 3.10 (faster JSON serialization)
License
MIT