WebSocket utilities for HawkAPI — rooms, broadcasting, Redis pub/sub, auth integration
Project description
hawkapi-websockets
WebSocket utilities for HawkAPI. Connection manager with rooms + broadcasting, optional Redis pub/sub backplane for multi-process fan-out, and a heartbeat monitor.
Install
pip install hawkapi-websockets # local-only (single-process)
pip install 'hawkapi-websockets[redis]' # + Redis backplane
Quickstart
from hawkapi import Depends, HawkAPI, WebSocket
from hawkapi_websockets import ConnectionManager, get_manager, init_websockets
app = HawkAPI()
init_websockets(app)
@app.websocket("/ws/{room}")
async def ws_room(websocket: WebSocket, room: str, m: ConnectionManager = Depends(get_manager)):
await websocket.accept()
conn = await m.connect(websocket, rooms=[room])
try:
await m.broadcast_json({"event": "joined", "id": conn.id}, room=room)
async for msg in websocket.iter_text():
await m.broadcast_text(msg, room=room, exclude=[conn.id])
finally:
await m.disconnect(conn.id)
await m.broadcast_json({"event": "left", "id": conn.id}, room=room)
Authentication & Security
WebSockets bypass CORS, so the browser's same-origin protections do not apply. A page on any site can open a WebSocket to your server and ride the user's cookies (Cross-Site WebSocket Hijacking, CSWSH). This library is secure-by-default where it can be and provides hooks where enforcement has to live in your app.
Validate the Origin (CSWSH)
Pass allowed_origins and call check_origin before accept():
m = ConnectionManager(allowed_origins={"https://app.example.com"})
# or: init_websockets(app, allowed_origins={"https://app.example.com"})
@app.websocket("/ws/{room}")
async def ws_room(websocket, room, m=Depends(get_manager)):
if not m.check_origin(websocket):
await websocket.close(code=4403) # forbidden
return
await websocket.accept()
...
When allowed_origins is None (the default) check_origin returns True and
performs no validation — you are responsible for enforcing it out of band.
Leaving it unset on a cookie-authenticated endpoint exposes you to CSWSH.
Authenticate the connection
Provide an on_connect hook. It runs inside connect() before the connection is
tracked; return False (or raise) to reject — connect() raises PermissionError.
Authenticate with a token sent in a header (e.g. Authorization /
Sec-WebSocket-Protocol), never in the query string — query strings leak into logs,
referrers, and proxies.
async def authenticate(websocket, metadata: dict) -> bool:
token = (getattr(websocket, "headers", {}) or {}).get("authorization", "")
user = await verify_token(token)
if user is None:
return False
metadata["user_id"] = user.id # mutate metadata to attach identity
return True
m = ConnectionManager(on_connect=authenticate)
Authorize rooms
room_validator is enforced for both join() and the initial rooms=[...]
passed to connect(). A denial raises PermissionError, so an unauthenticated
client can never enter a room it isn't allowed into.
DoS limits
-
max_connectionsdefaults to10_000. Set it toNoneonly if you have your own admission control —Nonemeans unbounded and is an A05 DoS risk. -
max_message_bytesdefaults to1_048_576(1 MiB). It is advisory: your receive loop must enforce it. Thereceive_text(conn)/receive_json(conn)helpers wrap the underlying receive, reject oversized frames (closing with code1009), and drop the connection:conn = await m.connect(websocket) while True: msg = await m.receive_text(conn) # raises ValueError if over the limit ...
Tenant isolation
require_room=Truemakes room-lessbroadcast_text/broadcast_jsonraiseValueError, preventing accidental cross-tenant fan-out.- The Redis backplane drops room-less messages by default (they would otherwise
broadcast to every connection). Set
bind_manager(bp, m, allow_global=True)— orinit_websockets(app, allow_global_broadcast=True)— only if you really want server-wide broadcasts.
Logging
Security-relevant events are logged on the hawkapi_websockets logger: info on
connect/disconnect, warning on Origin rejection, on_connect/room_validator
denial, oversized messages, dropped room-less backplane messages, and when
max_connections is hit.
Broadcasting
await m.broadcast_text("hi") # everyone
await m.broadcast_json({"event": "x"}, room="lobby") # one room
await m.broadcast_text("hi", exclude=[conn.id]) # skip the sender
await m.send_to(connection_id, {"private": True}) # direct
Failed sends auto-drop the broken connection, so a misbehaving client never blocks the broadcast.
Redis backplane
When you run multiple worker processes, broadcasts must travel across them. Plug in the Redis backplane and every publish() is fanned out to every replica's ConnectionManager.
from hawkapi_websockets import RedisBackplane, init_websockets
backplane = RedisBackplane(url="redis://localhost:6379/0", channel="hawkapi:ws")
m = init_websockets(app, backplane=backplane)
# Anywhere in your code:
await backplane.publish({"kind": "json", "room": "lobby", "payload": {"event": "tick"}})
The backplane subscribes during app.on_startup and shuts down with the app.
Heartbeat
from hawkapi_websockets import HeartbeatConfig, HeartbeatMonitor
monitor = HeartbeatMonitor(manager=m, config=HeartbeatConfig(interval_seconds=30, timeout_seconds=90))
monitor.start() # background task pings every interval, drops stale connections
@app.websocket("/ws")
async def ws(websocket):
await websocket.accept()
conn = await m.connect(websocket)
monitor.touch(conn.id)
try:
async for msg in websocket.iter_text():
monitor.touch(conn.id) # client liveness signal
...
finally:
await m.disconnect(conn.id)
Testing
The manager works with any object implementing WebSocketLike (send_text/send_bytes/close), so you can test broadcast logic without a real WebSocket connection.
class FakeWS:
def __init__(self): self.sent: list[str] = []
async def send_text(self, data): self.sent.append(data)
async def send_bytes(self, data): ...
async def close(self, code=1000): ...
async def test_chat_broadcast():
m = ConnectionManager()
a, b = FakeWS(), FakeWS()
await m.connect(a, connection_id="alice", rooms=["lobby"])
await m.connect(b, connection_id="bob", rooms=["lobby"])
await m.broadcast_text("hi", room="lobby", exclude=["alice"])
assert b.sent == ["hi"]
Development
git clone https://github.com/Hawk-API/hawkapi-websockets.git
cd hawkapi-websockets
uv sync --extra dev
uv run pytest -q
uv run ruff check . && uv run ruff format --check .
uv run pyright src/
License
MIT.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file hawkapi_websockets-0.3.0.tar.gz.
File metadata
- Download URL: hawkapi_websockets-0.3.0.tar.gz
- Upload date:
- Size: 26.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
faa903fdca7efeb8e676e0372012e2004c09f9972bc77bd8255f5e1aa821c583
|
|
| MD5 |
40793c1fd71ce326273c02213d9a10a1
|
|
| BLAKE2b-256 |
7259d320ba730ad4e3c0558c79960925d8c745bebf11abf454e5c64522bf5610
|
Provenance
The following attestation bundles were made for hawkapi_websockets-0.3.0.tar.gz:
Publisher:
release.yml on Hawk-API/hawkapi-websockets
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
hawkapi_websockets-0.3.0.tar.gz -
Subject digest:
faa903fdca7efeb8e676e0372012e2004c09f9972bc77bd8255f5e1aa821c583 - Sigstore transparency entry: 1778528660
- Sigstore integration time:
-
Permalink:
Hawk-API/hawkapi-websockets@fc370e43af97181076d1705a8f13b820041abfd9 -
Branch / Tag:
refs/tags/v0.3.0 - Owner: https://github.com/Hawk-API
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@fc370e43af97181076d1705a8f13b820041abfd9 -
Trigger Event:
release
-
Statement type:
File details
Details for the file hawkapi_websockets-0.3.0-py3-none-any.whl.
File metadata
- Download URL: hawkapi_websockets-0.3.0-py3-none-any.whl
- Upload date:
- Size: 14.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c4d4b17d34f9cc287a1e3561048cef61707fdd36f695a0f475b67ecb8da03cc2
|
|
| MD5 |
3aa96584ff0499fa9a702302f380d7e1
|
|
| BLAKE2b-256 |
b1c524d010e4d4ad238d29deabfafdd269316fdb2a62303f291b78d2174ee0f4
|
Provenance
The following attestation bundles were made for hawkapi_websockets-0.3.0-py3-none-any.whl:
Publisher:
release.yml on Hawk-API/hawkapi-websockets
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
hawkapi_websockets-0.3.0-py3-none-any.whl -
Subject digest:
c4d4b17d34f9cc287a1e3561048cef61707fdd36f695a0f475b67ecb8da03cc2 - Sigstore transparency entry: 1778528773
- Sigstore integration time:
-
Permalink:
Hawk-API/hawkapi-websockets@fc370e43af97181076d1705a8f13b820041abfd9 -
Branch / Tag:
refs/tags/v0.3.0 - Owner: https://github.com/Hawk-API
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@fc370e43af97181076d1705a8f13b820041abfd9 -
Trigger Event:
release
-
Statement type: