WebSocket event bus — real-time event push over persistent connections

loopws

A standalone WebSocket event bus for Python — real-time event push over persistent connections. Pure Python + asyncio, zero required dependencies, pluggable Redis backend for multi-server deployments.

What it is

loopws is a library, not a service. You bring your own WebSocket framework (FastAPI, Starlette, aiohttp, anything that satisfies its structural protocol) and your own presence store (in-memory for dev, Redis for prod). loopws provides:

  • A per-connection handler with built-in dispatch, ping/pong keepalive, subscribe/unsubscribe, and lifecycle hooks.
  • A local connection registry that tracks which connections are on this server.
  • A subscription matcher with service:event patterns (loopbooks:*, *:invoice.paid, etc.).
  • In-process push helpers — push_to_connection, push_to_account, push_to_channel.
  • A cross-server pub/sub manager with exponential-backoff-with-jitter reconnect.
  • A frozen-dataclass LoopWSConfig that bundles every timing tunable.
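
To make the service:event patterns concrete, here is a minimal sketch of how such a matcher could behave. It is not loopws's implementation: the function name `matches` is hypothetical, and glob semantics via the standard library's `fnmatchcase` are an assumption (the examples above only show a bare `*` on one side of the colon).

```python
from fnmatch import fnmatchcase


def matches(pattern: str, service: str, event: str) -> bool:
    """Return True if a 'service:event' pattern covers the given service and event.

    Hypothetical helper: glob-style matching is an assumption, not loopws's
    documented behaviour.
    """
    pat_service, sep, pat_event = pattern.partition(":")
    if not sep:
        return False  # malformed pattern: no ':' separator
    # Match each half independently so '*' never crosses the ':' boundary.
    return fnmatchcase(service, pat_service) and fnmatchcase(event, pat_event)


print(matches("loopbooks:*", "loopbooks", "invoice.paid"))   # service wildcard on the event side
print(matches("*:invoice.paid", "myapp", "invoice.paid"))    # any service, one specific event
```

Splitting on the first colon keeps the two halves independent, so a pattern such as `loopbooks:*` cannot accidentally match events from another service.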

Install

pip install loopws                 # core only, zero dependencies
pip install "loopws[redis]"        # with Redis adapter (quotes stop zsh from globbing the brackets)
pip install "loopws[dev]"          # with test suite extras

Requires Python 3.11+.

Connection identity

Every connection is identified by (account_id, connection_id), both strings chosen by the consumer:

Field          Purpose
account_id     Tenant / user / customer identity (UUID, session token, customer ID; any string)
connection_id  Unique-within-account handle (device serial, browser tab UUID, sensor MAC, etc.)

This works for multi-device authenticated apps (a user with phone + laptop), anonymous browser sessions (one ID per tab), IoT (one ID per sensor), and server-to-server use (one ID per worker instance).
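
The two-part key can be pictured as a nested mapping: account first, then connection. The sketch below is illustrative only. `TinyRegistry` and its methods are hypothetical names, not part of loopws, and the library's own registry may be organized differently.

```python
from collections import defaultdict
from typing import Any


class TinyRegistry:
    """Illustrative only: maps (account_id, connection_id) to a connection object."""

    def __init__(self) -> None:
        # account_id -> {connection_id -> connection}
        self._by_account: dict[str, dict[str, Any]] = defaultdict(dict)

    def add(self, account_id: str, connection_id: str, conn: Any) -> None:
        self._by_account[account_id][connection_id] = conn

    def remove(self, account_id: str, connection_id: str) -> None:
        conns = self._by_account.get(account_id)
        if conns is None:
            return
        conns.pop(connection_id, None)
        if not conns:
            # Drop empty accounts so the mapping does not grow without bound.
            del self._by_account[account_id]

    def get(self, account_id: str, connection_id: str) -> Any:
        return self._by_account.get(account_id, {}).get(connection_id)

    def for_account(self, account_id: str) -> list[Any]:
        """All connections of one account, e.g. a user's phone and laptop."""
        return list(self._by_account.get(account_id, {}).values())


reg = TinyRegistry()
reg.add("user-42", "phone", "ws-A")     # authenticated user, two devices
reg.add("user-42", "laptop", "ws-B")
reg.add("anon-7", "tab-1", "ws-C")      # anonymous browser tab
print(reg.for_account("user-42"))
```

Keying by account first makes per-account fan-out a single lookup, while the connection ID still addresses one device or tab.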

Minimal example (FastAPI)

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from loopws import BaseWebSocketHandler, LocalConnectionManager

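class InMemoryStore:
    """Dev-only presence store: maps (account_id, connection_id) to a server ID."""
    def __init__(self):
        self._d = {}
    async def register(self, account_id, connection_id, server_id):
        self._d[(account_id, connection_id)] = server_id
    async def unregister(self, account_id, connection_id):
        self._d.pop((account_id, connection_id), None)
    async def get_server(self, account_id, connection_id):
        return self._d.get((account_id, connection_id))
    async def refresh(self, account_id, connection_id):
        pass  # nothing to refresh in memory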

store = InMemoryStore()
connection_mgr = LocalConnectionManager(store, server_id="pod-1")
app = FastAPI()

@app.websocket("/ws")
async def endpoint(ws: WebSocket, account_id: str, connection_id: str):
    await ws.accept()
    handler = BaseWebSocketHandler(
        ws=ws, account_id=account_id, connection_id=connection_id,
        connection_mgr=connection_mgr,
        services={"myapp"},
        default_subscriptions={"myapp:*"},
    )
    await connection_mgr.register(account_id, connection_id, ws, handler=handler)
    await handler.on_connect()
    handler.start_keepalive()
    try:
        while True:
            await handler.handle_message(await ws.receive_text())
    except WebSocketDisconnect:
        pass
    finally:
        handler.stop_keepalive()
        await handler.on_disconnect()
        await connection_mgr.unregister(account_id, connection_id)
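
The InMemoryStore in the example above is duck-typed: it inherits from nothing and only needs the four async methods shown. One way to write that contract down is a `typing.Protocol`, as in the sketch below. The name `PresenceStore`, the parameter names, and the type hints are assumptions inferred from the example, not loopws's actual definitions.

```python
import asyncio
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable
class PresenceStore(Protocol):  # hypothetical name, not taken from loopws
    """Methods inferred from the InMemoryStore example; signatures are assumptions."""

    async def register(self, account_id: str, connection_id: str, server_id: str) -> None: ...
    async def unregister(self, account_id: str, connection_id: str) -> None: ...
    async def get_server(self, account_id: str, connection_id: str) -> Optional[str]: ...
    async def refresh(self, account_id: str, connection_id: str) -> None: ...


class DictStore:
    """Satisfies PresenceStore structurally, without inheriting from it."""

    def __init__(self) -> None:
        self._servers: dict[tuple[str, str], str] = {}

    async def register(self, account_id: str, connection_id: str, server_id: str) -> None:
        self._servers[(account_id, connection_id)] = server_id

    async def unregister(self, account_id: str, connection_id: str) -> None:
        self._servers.pop((account_id, connection_id), None)

    async def get_server(self, account_id: str, connection_id: str) -> Optional[str]:
        return self._servers.get((account_id, connection_id))

    async def refresh(self, account_id: str, connection_id: str) -> None:
        return None  # an in-memory dict has no expiry to extend


async def demo() -> Optional[str]:
    store: PresenceStore = DictStore()
    await store.register("user-42", "laptop-1", "pod-1")
    return await store.get_server("user-42", "laptop-1")


print(isinstance(DictStore(), PresenceStore))  # runtime check only verifies method names
print(asyncio.run(demo()))
```

A static type checker can then verify a custom store against the protocol; the `isinstance` check at runtime only confirms that the methods exist, not that their signatures match.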

Push events from anywhere in the process:

from loopws import push_to_connection, push_to_account, push_to_channel

# To a single connection
result = await push_to_connection(
    connection_mgr,
    account_id="user-42",
    connection_id="laptop-1",
    service="myapp",
    event="invoice.paid",
    data={"invoice_id": 9001},
)
# result: "delivered" | "filtered" | "not_connected" | "send_failed"

# Fan-out across all of an account's local connections
await push_to_account(connection_mgr, "user-42", "myapp", "invoice.paid", {"invoice_id": 9001})

# Fan-out to all locally subscribed connections (any account)
await push_to_channel(connection_mgr, "myapp", "global.announcement", {"msg": "Maintenance at 2am"})
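
The four result values listed in the comment above fall into two groups: the event either reached the client or was deliberately filtered, or it did not arrive at all. A caller might branch on that as sketched below. The `PushResult` alias and the `needs_fallback` helper are hypothetical, and the sketch assumes the result compares equal to those plain strings.

```python
from typing import Literal

# Assumption: results compare equal to these strings, as the comment above suggests.
PushResult = Literal["delivered", "filtered", "not_connected", "send_failed"]

_UNDELIVERED = frozenset({"not_connected", "send_failed"})


def needs_fallback(result: str) -> bool:
    """True when the event did not reach the client and was not intentionally filtered.

    Hypothetical helper: a caller could use it to decide whether to queue the
    event, send an email, or try another channel.
    """
    return result in _UNDELIVERED


for outcome in ("delivered", "filtered", "not_connected", "send_failed"):
    print(outcome, needs_fallback(outcome))
```

Treating "filtered" as a non-failure reflects that the connection is up but not subscribed to the event, so retrying would not change the outcome.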

Multi-server with Redis

import redis.asyncio as aioredis
from loopws import PubSubManager
from loopws.redis import RedisConnectionManager, RedisPubSubBackend

redis_client = aioredis.from_url("redis://localhost:6379")
store   = RedisConnectionManager(redis_client)          # presence with TTL
backend = RedisPubSubBackend(redis_client)              # publish + subscribe

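pubsub = PubSubManager(backend, server_id="pod-1")
# on_cross_server_message is your own callback for events published by other servers (not shown here)
pubsub.set_message_handler(on_cross_server_message)
await pubsub.start()                                    # call from inside a running event loop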

PubSubManager runs its own subscriber loop and, on Redis failures, reconnects with exponential backoff and full jitter.
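
For readers unfamiliar with the term, "full jitter" means each retry waits a random time between zero and an exponentially growing ceiling. The sketch below shows the general technique only. The function names, the 0.5 s base, the 30 s cap, and the choice of `ConnectionError` are assumptions, not loopws's actual values or code (its tunables live in LoopWSConfig).

```python
import asyncio
import random
from typing import Awaitable, Callable


def full_jitter_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Delay before retry number `attempt` (0-based): uniform in [0, min(cap, base * 2**attempt)]."""
    exponent = min(attempt, 32)  # bound the exponent so huge attempt counts cannot overflow
    ceiling = min(cap, base * (2 ** exponent))
    return random.uniform(0.0, ceiling)


async def run_with_reconnect(
    listen: Callable[[], Awaitable[None]],
    *,
    max_attempts: int,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> bool:
    """Call `listen` until it returns cleanly, backing off after each ConnectionError.

    Returns True on a clean return, False once max_attempts is exhausted.
    """
    for attempt in range(max_attempts):
        try:
            await listen()
            return True
        except ConnectionError:
            await sleep(full_jitter_delay(attempt))
    return False
```

Randomizing over the whole interval, rather than adding a small random offset to a fixed delay, spreads reconnect attempts out so that many servers losing Redis at once do not all retry at the same instant.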

License

MIT — see LICENSE.
