WebSocket event bus — real-time event push over persistent connections
loopws
A standalone WebSocket event bus for Python — real-time event push over persistent connections. Pure Python + asyncio, zero required dependencies, pluggable Redis backend for multi-server deployments.
What it is
loopws is a library, not a service. You bring your own WebSocket framework (FastAPI, Starlette, aiohttp, anything that satisfies its structural protocol) and your own presence store (in-memory for dev, Redis for prod). loopws provides:
- A per-connection handler with built-in dispatch, ping/pong keepalive, subscribe/unsubscribe, and lifecycle hooks.
- A local connection registry that tracks which connections are on this server.
- A subscription matcher with service:event patterns (loopbooks:*, *:invoice.paid, etc.).
- In-process push helpers: push_to_connection, push_to_account, push_to_channel, plus the back-compat push_event / push_event_strict aliases (0.3.1+).
- A cross-server pub/sub manager with exponential-backoff-with-jitter reconnect.
- A frozen-dataclass LoopWSConfig that bundles every timing tunable.
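To make the service:event pattern idea concrete, here is a minimal sketch of how such a matcher could behave. It is not loopws's actual implementation: the function names matches and is_subscribed are hypothetical, and it assumes that * in either half of the pattern matches anything while any other value must match exactly.

```python
def matches(pattern: str, service: str, event: str) -> bool:
    """Return True if a 'service:event' pattern covers the given pair.

    Assumption: '*' in either half is a wildcard; anything else must be equal.
    """
    pat_service, sep, pat_event = pattern.partition(":")
    if not sep:
        return False  # malformed pattern: no ':' separator
    service_ok = pat_service == "*" or pat_service == service
    event_ok = pat_event == "*" or pat_event == event
    return service_ok and event_ok


def is_subscribed(subscriptions: set[str], service: str, event: str) -> bool:
    """True if any pattern in the subscription set covers the event."""
    return any(matches(p, service, event) for p in subscriptions)


if __name__ == "__main__":
    print(matches("loopbooks:*", "loopbooks", "invoice.paid"))
    print(is_subscribed({"*:invoice.paid"}, "myapp", "invoice.paid"))
```

Under these assumptions, a connection subscribed to myapp:* receives every myapp event, and one subscribed to *:invoice.paid receives that event from any service.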
Install
pip install loopws            # core only, zero dependencies
pip install "loopws[redis]"   # with Redis adapter (quoted so shells like zsh don't expand the brackets)
pip install "loopws[dev]"     # with test suite extras
Requires Python 3.11+.
Connection identity
Every connection is identified by (account_id, connection_id), both strings chosen by the consumer:
| Field | Purpose |
|---|---|
| account_id | Tenant / user / customer identity (UUID, session token, customer ID, or any other string) |
| connection_id | Unique-within-account handle (device serial, browser tab UUID, sensor MAC, etc.) |
This works for multi-device authenticated apps (a user with phone + laptop), anonymous browser sessions (one ID per tab), IoT (one ID per sensor), and server-to-server use (one ID per worker instance).
Minimal example (FastAPI)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from loopws import BaseWebSocketHandler, LocalConnectionManager


class InMemoryStore:
    # In-memory presence store, suitable for development only.
    def __init__(self):
        self._d = {}

    async def register(self, a, c, s):
        self._d[(a, c)] = s

    async def unregister(self, a, c):
        self._d.pop((a, c), None)

    async def get_server(self, a, c):
        return self._d.get((a, c))

    async def refresh(self, a, c):
        pass


store = InMemoryStore()
connection_mgr = LocalConnectionManager(store, server_id="pod-1")
app = FastAPI()


@app.websocket("/ws")
async def endpoint(ws: WebSocket, account_id: str, connection_id: str):
    await ws.accept()
    handler = BaseWebSocketHandler(
        ws=ws, account_id=account_id, connection_id=connection_id,
        connection_mgr=connection_mgr,
        services={"myapp"},
        default_subscriptions={"myapp:*"},
    )
    await connection_mgr.register(account_id, connection_id, ws, handler=handler)
    await handler.on_connect()
    handler.start_keepalive()
    try:
        # Dispatch every incoming text frame until the client disconnects.
        while True:
            await handler.handle_message(await ws.receive_text())
    except WebSocketDisconnect:
        pass
    finally:
        # Always stop keepalive and deregister, even on unexpected errors.
        handler.stop_keepalive()
        await handler.on_disconnect()
        await connection_mgr.unregister(account_id, connection_id)
Push events from anywhere in the process:
from loopws import push_to_connection, push_to_account, push_to_channel

# To a single connection
result = await push_to_connection(
    connection_mgr,
    account_id="user-42",
    connection_id="laptop-1",
    service="myapp",
    event="invoice.paid",
    data={"invoice_id": 9001},
)
# result: "delivered" | "filtered" | "not_connected" | "send_failed"

# Fan-out across all of an account's local connections
await push_to_account(connection_mgr, "user-42", "myapp", "invoice.paid", {"invoice_id": 9001})

# Fan-out to all locally subscribed connections (any account)
await push_to_channel(connection_mgr, "myapp", "global.announcement", {"msg": "Maintenance at 2am"})
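Because a single-connection push reports one of four outcome strings, a caller that pushes to several connections may want to tally them. The sketch below does that with asyncio.gather and a Counter. push_many and stub_push are hypothetical helpers, and stub_push stands in for a real push call so the example runs without loopws installed.

```python
import asyncio
from collections import Counter
from collections.abc import Awaitable, Callable, Iterable

# Any coroutine function that takes a connection ID and returns an outcome string.
PushFn = Callable[[str], Awaitable[str]]


async def push_many(push: PushFn, connection_ids: Iterable[str]) -> Counter[str]:
    """Push concurrently to several connections and count the outcomes."""
    results = await asyncio.gather(*(push(cid) for cid in connection_ids))
    return Counter(results)


async def stub_push(connection_id: str) -> str:
    """Stand-in for a real push; returns canned outcomes for the demo."""
    outcomes = {"laptop-1": "delivered", "phone": "filtered"}
    return outcomes.get(connection_id, "not_connected")


if __name__ == "__main__":
    tally = asyncio.run(push_many(stub_push, ["laptop-1", "phone", "tablet"]))
    print(dict(tally))
```

In an application, push would typically be a small wrapper that calls push_to_connection with the fixed service, event, and data arguments and varies only the connection ID.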
push_event — back-compat alias (0.3.1+)
If your call sites still pass a UUID for account_id or an int for the
connection identifier (the pre-0.3.0 shape), use push_event — it coerces
both to str for you so you can adopt 0.3.x without rewriting call sites.
from uuid import UUID
from loopws import push_event

# Accepts UUID + int; coerces internally before dispatching.
result = await push_event(
    connection_mgr,
    account_id=UUID("…"),
    device_id=42,  # int OK; coerced to "42"
    service="myapp",
    event="invoice.paid",
    data={"invoice_id": 9001},
)
For a strict-typed counterpart with no coercion — useful when you've fully
migrated to the (str, str) contract and want a type-checker to flag any
regression — use push_event_strict. It's behaviourally equivalent to
push_to_connection; the distinct name lets a codebase standardize on the
push_event* vocabulary.
from loopws import push_event_strict

await push_event_strict(
    connection_mgr, "user-42", "laptop-1",
    "myapp", "invoice.paid", {"invoice_id": 9001},
)
Multi-server with Redis
import redis.asyncio as aioredis
from loopws import PubSubManager
from loopws.redis import RedisConnectionManager, RedisPubSubBackend

redis_client = aioredis.from_url("redis://localhost:6379")
store = RedisConnectionManager(redis_client)   # presence with TTL
backend = RedisPubSubBackend(redis_client)     # publish + subscribe

pubsub = PubSubManager(backend, server_id="pod-1")
# on_cross_server_message is your own callback, defined elsewhere in your app.
pubsub.set_message_handler(on_cross_server_message)
await pubsub.start()
PubSubManager runs a real subscriber loop that reconnects on Redis failures with exponential backoff + full jitter.
Documentation
- INTEGRATION_GUIDE.md — step-by-step integration for new consumers
- SPECIFICATION.md — full technical & functional spec
- CHANGELOG.md — release history
- UPGRADING.md — migration notes per release
- OPERATIONS_RUNBOOK.md — deploy and ops guide
License
MIT — see LICENSE.