loopws

A standalone WebSocket event bus for Python — real-time event push over persistent connections. Pure Python + asyncio, zero required dependencies, pluggable Redis backend for multi-server deployments.

What it is

loopws is a library, not a service. You bring your own WebSocket framework (FastAPI, Starlette, aiohttp, anything that satisfies its structural protocol) and your own presence store (in-memory for dev, Redis for prod). loopws provides:

  • A per-connection handler with built-in dispatch, ping/pong keepalive, subscribe/unsubscribe, and lifecycle hooks.
  • A local connection registry that tracks which devices are on this server.
  • A subscription matcher with service:event patterns (loopbooks:*, *:invoice.paid, etc.).
  • An in-process push_event helper for same-server delivery.
  • A cross-server pub/sub manager with exponential-backoff-with-jitter reconnect.
  • A frozen-dataclass LoopWSConfig that bundles every timing tunable.
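
As a rough illustration of how service:event patterns such as loopbooks:* and *:invoice.paid could be evaluated, here is a minimal sketch built on the standard library's fnmatch. It is not loopws's matcher: the names matches and is_subscribed are hypothetical, and treating a pattern without a colon as matching nothing is an assumption. Note that fnmatch also gives ? and [...] special meaning, which the real matcher may not.

```python
from fnmatch import fnmatchcase


def matches(pattern: str, service: str, event: str) -> bool:
    """Return True if (service, event) satisfies a "service:event" pattern."""
    pat_service, sep, pat_event = pattern.partition(":")
    if not sep:
        # Assumption: a pattern with no ":" separator matches nothing.
        return False
    # Each half is matched independently, so "*" can stand in for either side.
    return fnmatchcase(service, pat_service) and fnmatchcase(event, pat_event)


def is_subscribed(patterns: set[str], service: str, event: str) -> bool:
    """Return True if any pattern in the set matches the event."""
    return any(matches(p, service, event) for p in patterns)
```

With the patterns {"loopbooks:*", "*:invoice.paid"}, an invoice.paid event from any service passes, as does any event from loopbooks, while myapp:invoice.created does not.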

Install

pip install loopws                 # core only, zero dependencies
pip install "loopws[redis]"        # with Redis adapter (quoted so shells like zsh don't expand the brackets)
pip install "loopws[dev]"          # with test suite extras

Requires Python 3.11+.

Minimal example (FastAPI)

from uuid import UUID
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from loopws import BaseWebSocketHandler, LocalConnectionManager

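class InMemoryStore:
    """Dev-only presence store: maps (account_id, device_id) to a server id."""
    def __init__(self):
        self._d = {}
    async def register(self, account_id, device_id, server_id):
        self._d[(account_id, device_id)] = server_id
    async def unregister(self, account_id, device_id):
        self._d.pop((account_id, device_id), None)
    async def get_server(self, account_id, device_id):
        return self._d.get((account_id, device_id))
    async def refresh(self, account_id, device_id):
        pass  # nothing to refresh: in-memory entries do not expire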

store = InMemoryStore()
connection_mgr = LocalConnectionManager(store, server_id="pod-1")
app = FastAPI()

@app.websocket("/ws")
async def endpoint(ws: WebSocket, account_id: UUID, device_id: int):
    await ws.accept()
    handler = BaseWebSocketHandler(
        ws=ws, account_id=account_id, device_id=device_id,
        connection_mgr=connection_mgr,
        services={"myapp"},
        default_subscriptions={"myapp:*"},
    )
    await connection_mgr.register(account_id, device_id, ws, handler=handler)
    await handler.on_connect()
    handler.start_keepalive()
    try:
        while True:
            await handler.handle_message(await ws.receive_text())
    except WebSocketDisconnect:
        pass
    finally:
        handler.stop_keepalive()
        await handler.on_disconnect()
        await connection_mgr.unregister(account_id, device_id)
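
The store passed to LocalConnectionManager in the example above exposes four async methods. One way to make that contract explicit in your own code is a typing.Protocol, sketched below. The name PresenceStore and the type annotations are assumptions for illustration, not a loopws export; only the method names come from the example. DictStore is a hypothetical store used to exercise the protocol.

```python
import asyncio
from typing import Optional, Protocol, runtime_checkable
from uuid import UUID, uuid4


@runtime_checkable
class PresenceStore(Protocol):
    """Hypothetical protocol mirroring the four methods the example store defines."""

    async def register(self, account_id: UUID, device_id: int, server_id: str) -> None: ...
    async def unregister(self, account_id: UUID, device_id: int) -> None: ...
    async def get_server(self, account_id: UUID, device_id: int) -> Optional[str]: ...
    async def refresh(self, account_id: UUID, device_id: int) -> None: ...


class DictStore:
    """Dict-backed store that satisfies PresenceStore without inheriting from it."""

    def __init__(self) -> None:
        self._servers: dict[tuple[UUID, int], str] = {}

    async def register(self, account_id: UUID, device_id: int, server_id: str) -> None:
        self._servers[(account_id, device_id)] = server_id

    async def unregister(self, account_id: UUID, device_id: int) -> None:
        self._servers.pop((account_id, device_id), None)

    async def get_server(self, account_id: UUID, device_id: int) -> Optional[str]:
        return self._servers.get((account_id, device_id))

    async def refresh(self, account_id: UUID, device_id: int) -> None:
        pass  # entries never expire in this sketch


async def demo() -> Optional[str]:
    """Register a device, look it up, then remove it again."""
    store = DictStore()
    account = uuid4()
    await store.register(account, 42, "pod-1")
    found = await store.get_server(account, 42)
    await store.unregister(account, 42)
    return found
```

Because the check is structural, isinstance(DictStore(), PresenceStore) holds even though DictStore never subclasses the protocol. The runtime check only confirms the methods exist, not their signatures.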

Push events from anywhere in the process:

from loopws import push_event

# Call from inside a coroutine; user_id is the account ID the connection was registered under.
result = await push_event(
    connection_mgr,
    account_id=user_id,
    device_id=42,
    service="myapp",
    event="invoice.paid",
    data={"invoice_id": 9001},
)
# result: "delivered" | "filtered" | "not_connected" | "send_failed"

Multi-server with Redis

import redis.asyncio as aioredis
from loopws import PubSubManager
from loopws.redis import RedisConnectionManager, RedisPubSubBackend

redis_client = aioredis.from_url("redis://localhost:6379")
store   = RedisConnectionManager(redis_client)          # presence with TTL
backend = RedisPubSubBackend(redis_client)              # publish + subscribe

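pubsub = PubSubManager(backend, server_id="pod-1")
# on_cross_server_message is your own callback (not shown here).
pubsub.set_message_handler(on_cross_server_message)
await pubsub.start()    # call from inside a coroutine, e.g. at application startup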

PubSubManager runs a subscriber loop that reconnects after Redis failures, using exponential backoff with full jitter.

License

MIT — see LICENSE.
