WebSocket event bus — real-time event push over persistent connections

loopws

A standalone WebSocket event bus for Python — real-time event push over persistent connections. Pure Python + asyncio, zero required dependencies, pluggable Redis backend for multi-server deployments.

What it is

loopws is a library, not a service. You bring your own WebSocket framework (FastAPI, Starlette, aiohttp, anything that satisfies its structural protocol) and your own presence store (in-memory for dev, Redis for prod). loopws provides:

  • A per-connection handler with built-in dispatch, ping/pong keepalive, subscribe/unsubscribe, and lifecycle hooks.
  • A local connection registry that tracks which connections are on this server.
  • A subscription matcher with service:event patterns (loopbooks:*, *:invoice.paid, etc.).
  • In-process push helpers — push_to_connection, push_to_account, push_to_channel, plus the back-compat push_event / push_event_strict aliases (0.3.1+).
  • A cross-server pub/sub manager with exponential-backoff-with-jitter reconnect.
  • A frozen-dataclass LoopWSConfig that bundles every timing tunable.
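
To make the service:event pattern syntax concrete, here is a minimal sketch of how such a matcher could behave. It is not loopws's implementation: the `matches` helper is hypothetical, and it assumes `*` stands for an entire service or event segment, which is the only wildcard usage shown above.

```python
def matches(pattern: str, service: str, event: str) -> bool:
    """Return True if a 'service:event' pattern accepts the given pair.

    Assumption: '*' replaces a whole segment; partial globs such as
    'invoice.*' are not handled by this sketch.
    """
    pat_service, sep, pat_event = pattern.partition(":")
    if not sep:
        return False  # malformed pattern: no ':' separator
    return pat_service in ("*", service) and pat_event in ("*", event)


subscriptions = {"loopbooks:*", "*:invoice.paid"}


def is_subscribed(service: str, event: str) -> bool:
    # A connection receives the event if any of its patterns matches.
    return any(matches(p, service, event) for p in subscriptions)
```

With these two patterns, any loopbooks event and any service's invoice.paid event would pass the filter; everything else would be dropped.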

Install

pip install loopws                 # core only, zero dependencies
pip install loopws[redis]          # with Redis adapter
pip install loopws[dev]            # with test suite extras

Requires Python 3.11+.

Connection identity

Every connection is identified by (account_id, connection_id), both strings chosen by the consumer:

Field          Purpose
account_id     Tenant / user / customer identity (UUID, session token, customer ID, or any other string)
connection_id  Unique-within-account handle (device serial, browser tab UUID, sensor MAC, etc.)

This works for multi-device authenticated apps (a user with phone + laptop), anonymous browser sessions (one ID per tab), IoT (one ID per sensor), and server-to-server use (one ID per worker instance).
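
As an illustration of the two-part identity, the sketch below keeps connections in a nested dict keyed first by account_id and then by connection_id. `TinyRegistry` is a hypothetical stand-in written for this example, not the library's LocalConnectionManager, and the stored "connection" is just a placeholder object.

```python
from collections import defaultdict
from typing import Any


class TinyRegistry:
    """Hypothetical registry keyed by (account_id, connection_id)."""

    def __init__(self) -> None:
        self._by_account: dict[str, dict[str, Any]] = defaultdict(dict)

    def add(self, account_id: str, connection_id: str, conn: Any) -> None:
        self._by_account[account_id][connection_id] = conn

    def remove(self, account_id: str, connection_id: str) -> None:
        conns = self._by_account.get(account_id)
        if conns is None:
            return
        conns.pop(connection_id, None)
        if not conns:
            # Drop empty accounts so the mapping does not grow forever.
            del self._by_account[account_id]

    def get(self, account_id: str, connection_id: str) -> Any:
        return self._by_account.get(account_id, {}).get(connection_id)

    def for_account(self, account_id: str) -> list[Any]:
        return list(self._by_account.get(account_id, {}).values())
```

Keying by account first makes "every device of this user" a single lookup, while the connection_id still addresses one phone, tab, or sensor.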

Minimal example (FastAPI)

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from loopws import BaseWebSocketHandler, LocalConnectionManager

class InMemoryStore:
    def __init__(self): self._d = {}
    async def register(self, a, c, s): self._d[(a, c)] = s
    async def unregister(self, a, c):  self._d.pop((a, c), None)
    async def get_server(self, a, c):  return self._d.get((a, c))
    async def refresh(self, a, c):     pass

store = InMemoryStore()
connection_mgr = LocalConnectionManager(store, server_id="pod-1")
app = FastAPI()

@app.websocket("/ws")
async def endpoint(ws: WebSocket, account_id: str, connection_id: str):
    await ws.accept()
    handler = BaseWebSocketHandler(
        ws=ws, account_id=account_id, connection_id=connection_id,
        connection_mgr=connection_mgr,
        services={"myapp"},
        default_subscriptions={"myapp:*"},
    )
    await connection_mgr.register(account_id, connection_id, ws, handler=handler)
    await handler.on_connect()
    handler.start_keepalive()
    try:
        while True:
            await handler.handle_message(await ws.receive_text())
    except WebSocketDisconnect:
        pass
    finally:
        handler.stop_keepalive()
        await handler.on_disconnect()
        await connection_mgr.unregister(account_id, connection_id)
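
The InMemoryStore above hints at the shape a presence store needs. As a sketch only, that shape can be written down with typing.Protocol so a type checker can verify a custom store. The name `PresenceStore`, the parameter names, and the reading of the third `register` argument as a server ID are assumptions inferred from the example, not the library's published interface.

```python
import asyncio
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable
class PresenceStore(Protocol):
    """Hypothetical structural type mirroring InMemoryStore's four methods."""

    async def register(self, account_id: str, connection_id: str, server_id: str) -> None: ...
    async def unregister(self, account_id: str, connection_id: str) -> None: ...
    async def get_server(self, account_id: str, connection_id: str) -> Optional[str]: ...
    async def refresh(self, account_id: str, connection_id: str) -> None: ...


class DictStore:
    """Satisfies PresenceStore structurally; no inheritance required."""

    def __init__(self) -> None:
        self._servers: dict[tuple[str, str], str] = {}

    async def register(self, account_id: str, connection_id: str, server_id: str) -> None:
        self._servers[(account_id, connection_id)] = server_id

    async def unregister(self, account_id: str, connection_id: str) -> None:
        self._servers.pop((account_id, connection_id), None)

    async def get_server(self, account_id: str, connection_id: str) -> Optional[str]:
        return self._servers.get((account_id, connection_id))

    async def refresh(self, account_id: str, connection_id: str) -> None:
        pass  # nothing to refresh without a TTL


async def demo() -> Optional[str]:
    store: PresenceStore = DictStore()
    await store.register("user-42", "laptop-1", "pod-1")
    return await store.get_server("user-42", "laptop-1")
```

Note that `isinstance(obj, PresenceStore)` only checks that the four method names exist; signatures are verified by a static type checker, not at runtime.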

Push events from anywhere in the process:

from loopws import push_to_connection, push_to_account, push_to_channel

# To a single connection
result = await push_to_connection(
    connection_mgr,
    account_id="user-42",
    connection_id="laptop-1",
    service="myapp",
    event="invoice.paid",
    data={"invoice_id": 9001},
)
# result: "delivered" | "filtered" | "not_connected" | "send_failed"

# Fan-out across all of an account's local connections
await push_to_account(connection_mgr, "user-42", "myapp", "invoice.paid", {"invoice_id": 9001})

# Fan-out to all locally subscribed connections (any account)
await push_to_channel(connection_mgr, "myapp", "global.announcement", {"msg": "Maintenance at 2am"})
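
The four result strings returned by push_to_connection lend themselves to a small dispatch step at the call site. The sketch below is one possible policy, not something loopws prescribes: `next_step` and its return values are hypothetical, and the interpretation of each result in the comments is an assumption based on the names alone.

```python
from typing import Literal

PushResult = Literal["delivered", "filtered", "not_connected", "send_failed"]


def next_step(result: str) -> str:
    """Map a push result to a follow-up action (hypothetical policy)."""
    if result == "delivered":
        return "done"
    if result == "filtered":
        # Assumed meaning: the connection exists but is not subscribed.
        return "done"
    if result == "not_connected":
        # Assumed meaning: no such connection on this server; in a
        # multi-server setup the event could be published cross-server.
        return "publish_cross_server"
    if result == "send_failed":
        return "log_and_drop"
    raise ValueError(f"unexpected push result: {result!r}")
```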

push_event — back-compat alias (0.3.1+)

If your call sites still pass a UUID for account_id or an int for the connection identifier (the pre-0.3.0 shape), use push_event — it coerces both to str for you so you can adopt 0.3.x without rewriting call sites.

from uuid import UUID
from loopws import push_event

# Accepts UUID + int — coerces internally before dispatching.
result = await push_event(
    connection_mgr,
    account_id=UUID("…"),
    device_id=42,           # int OK; coerced to "42"
    service="myapp",
    event="invoice.paid",
    data={"invoice_id": 9001},
)
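
The coercion described above amounts to calling str() on both identifiers before dispatch. A minimal sketch, assuming nothing beyond that: `coerce_identity` is a hypothetical helper written for illustration, not a loopws function.

```python
from uuid import UUID


def coerce_identity(account_id: object, device_id: object) -> tuple[str, str]:
    """Turn pre-0.3.0 style identifiers (UUID, int) into the (str, str) pair."""
    return str(account_id), str(device_id)


# str(UUID) yields the canonical lowercase hyphenated form.
example = coerce_identity(UUID("12345678-1234-5678-1234-567812345678"), 42)
```

One consequence worth checking during migration: the string form must match whatever the connection registered with, so a connection registered as "42" is reachable with device_id=42 but not with device_id="042".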

For a strict-typed counterpart with no coercion — useful when you've fully migrated to the (str, str) contract and want a type-checker to flag any regression — use push_event_strict. It's behaviourally equivalent to push_to_connection; the distinct name lets a codebase standardize on the push_event* vocabulary.

from loopws import push_event_strict

await push_event_strict(connection_mgr, "user-42", "laptop-1",
                        "myapp", "invoice.paid", {"invoice_id": 9001})

Multi-server with Redis

import redis.asyncio as aioredis
from loopws import PubSubManager
from loopws.redis import RedisConnectionManager, RedisPubSubBackend

redis_client = aioredis.from_url("redis://localhost:6379")
store   = RedisConnectionManager(redis_client)          # presence with TTL
backend = RedisPubSubBackend(redis_client)              # publish + subscribe

pubsub = PubSubManager(backend, server_id="pod-1")
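# on_cross_server_message is a callback you define yourself; it is not shown here.
pubsub.set_message_handler(on_cross_server_message)
await pubsub.start()  # call from inside a running event loop, e.g. at app startup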

PubSubManager runs a real subscriber loop that reconnects on Redis failures with exponential backoff + full jitter.
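
For readers unfamiliar with the term, "exponential backoff + full jitter" means the delay ceiling doubles per failed attempt (up to a cap) and the actual sleep is drawn uniformly between zero and that ceiling. The sketch below illustrates the technique in general; the function names and the base/cap values are assumptions for this example, not PubSubManager's internals or LoopWSConfig defaults.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


def full_jitter_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Delay for a zero-based attempt: uniform in [0, min(cap, base * 2**attempt)]."""
    ceiling = min(cap, base * (2 ** attempt))
    return random.uniform(0.0, ceiling)


async def connect_with_backoff(
    connect: Callable[[], Awaitable[T]],
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Retry `connect` until it succeeds, sleeping with full jitter between tries."""
    attempt = 0
    while True:
        try:
            return await connect()
        except ConnectionError:
            await sleep(full_jitter_delay(attempt))
            attempt += 1
```

Randomizing the whole interval, rather than adding a small jitter to a fixed delay, spreads reconnect attempts out when many servers lose the same Redis instance at once.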

License

MIT — see LICENSE.
