A lightweight WebSocket router for FastAPI, fully compatible with the ObservableSocket (JS/TS) client.
Project description
fastapi_observable_socket
A tiny, ergonomic WebSocket router for FastAPI — designed as the backend counterpart of the
ObservableSocket TypeScript client.
Together, they offer a minimal, predictable request/response layer for real‑time apps without the complexity of RPC frameworks.
✨ Features
- Route by
route(akapath) withuuidcorrelation - Per‑route hooks:
data_check,access,hydrate,dehydrate - Typed message envelopes (
Request,Response) - Graceful error mapping (400/403/404/500)
- Optional PING→PONG heartbeat
- Fully compatible with ObservableSocket (JS/TS)
🚀 Installation
pip install fastapi-observable-socket
Requires FastAPI ≥ 0.115 and Python ≥ 3.11.
🧭 Message Schema
// Request
{
"uuid": 123,
"route": "math/sum",
"headers": { "x-user": "42" },
"payload": [1, 2, 3, 4]
}
// Response
{
"uuid": 123,
"status": 200,
"headers": {"unit":"test"},
"payload": {"sum":10}
}
🧩 Quickstart
from fastapi import FastAPI
from fastapi_observable_socket import SocketRouter, Status
app = FastAPI()
router = SocketRouter()
@router.route("math/sum")
async def sum_handler(ws, headers, payload):
return {"status": Status.OK, "payload": {"sum": sum(payload or [])}}
app.add_websocket_route("/ws", router)
⚙️ Route Options
Each route can define:
-
data_check(message[, local]) → bool
Early validation. May mutatelocal(a dict) to store lightweight cached values. -
access(ws, message[, local]) → bool
Access logic. May fetch data (DB, external source) and store it inlocal. -
hydrate(ws, message) → Any
Builds handler arguments. If defined, it overrides any cached value inlocal. -
dehydrate(payload) → Any
Final output transformation (serialization, shaping, redaction, etc.).
🧩 Handler Argument Behavior
Depending on hooks:
| Situation | Handler receives |
|---|---|
No hydrate, no cached data |
(ws, headers, payload) |
Cached value stored in local["value"] |
handler receives that cached value |
hydrate is defined |
handler receives hydrate's return value (highest priority) |
General rule:
hydrate> cachedlocal>(ws, headers, payload).
🔥 Example — Fetch Once, Use Twice
This example shows how to validate, authorize, fetch an article once, and reuse it later.
from fastapi import FastAPI
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.websockets import WebSocket
from fastapi_observable_socket import (
MessageData, SocketRouter, HandlerResult, Status
)
from .models import Article
from .db import get_async_db
app = FastAPI()
router = SocketRouter()
async def data_check_article(message: MessageData, local: dict) -> bool:
try:
local["article_id"] = int(message.headers.get("article"))
except (TypeError, ValueError):
return False
return True
async def access_article(ws: WebSocket, message: MessageData, local: dict) -> bool:
db: AsyncSession = ws.scope["db"]
article_id = local["article_id"]
result = await db.execute(select(Article).where(Article.id == article_id))
article = result.scalar_one_or_none()
if article is None:
local["value"] = None
return True
local["value"] = article # cached for later
user = ws.scope.get("user")
if article.access == "PUBLIC":
return True
if article.access == "MEMBERS":
return user is not None
return article.author == user
async def article_to_json(article:Article) -> dict:
return {
"title": article.title,
"author": article.author.name,
"body": article.content,
}
# hydrate is omitted intentionally — cached article will be used
@router.route("get-article", options={
"data_check": data_check_article,
"access": access_article,
"dehydrate": article_to_json
})
async def get_article(local: dict) -> HandlerResult:
if local["value"] is None:
return {"status": Status.NOT_FOUND, "payload": {"message": "not found"}}
return {
"status": Status.OK,
"payload": local["value"]
}
@app.websocket("/ws")
async def websocket_router(ws: WebSocket):
# Attach DB, user, etc. into ws.scope beforehand
router(ws)
What happens:
data_check_articlevalidates header & storesarticle_idaccess_articleloads the article once, caches it inlocal["value"], checks access- Because no
hydrateis defined, handler receives the cached article - if article is fetched when checking access,
get_article()returns status 200 and that article, else it returns status 404 and not found message - if
get_article()returns status 200 (200 <= status < 300), then the payload part which is the actual article will be sent toarticle_to_json()and serialized there. the result will replace output payload.
🔢 Status Codes
| Code | Meaning |
|---|---|
| 200 | OK |
| 400 | BAD_REQUEST |
| 403 | FORBIDDEN |
| 404 | NOT_FOUND |
| 500 | INTERNAL_SERVER_ERROR |
| 402 | PENDING |
🧰 Python Compatibility
| Python | Behavior |
|---|---|
| 3.13+ | Uses native asyncio.Queue.shutdown() |
| 3.11–3.12 | Uses internal compat shim with sentinel shutdown |
Zero API differences.
🧠 Why This Package?
| Package | Model | Feature | Client Story |
|---|---|---|---|
| fastapi_observable_socket | Route + UUID | Lightweight, hookable, ObservableSocket support | JS/TS client |
| fastapi-websocket-rpc | JSON-RPC | Full RPC | Python client |
| fastapi-websocket-pubsub | PubSub | Multicast topics | Python client |
| fastapi-ws-router | Typed events | Pydantic unions | No official client |
A middle ground between raw Starlette WebSockets and heavy RPC systems.
📦 License
MIT
Related
- Frontend: https://www.npmjs.com/package/@djanext/observable-socket
- Backend: this repository
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file fastapi_observable_socket-1.1.1.tar.gz.
File metadata
- Download URL: fastapi_observable_socket-1.1.1.tar.gz
- Upload date:
- Size: 11.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d9556d051e73ca16089b2fbdf4eb98739f7163393674659203051aa6a6644631
|
|
| MD5 |
ba474150d0db879c33b4b32b71c80436
|
|
| BLAKE2b-256 |
2b6a6edc2d8835d220b169b54213aab8218b7fdd8a23cfc19bac6fdd8cf43b85
|
File details
Details for the file fastapi_observable_socket-1.1.1-py3-none-any.whl.
File metadata
- Download URL: fastapi_observable_socket-1.1.1-py3-none-any.whl
- Upload date:
- Size: 9.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
4fbc3719077cef51637084cbdc3f5a1ab1adfd4f2fcfcf7149854e342f550a01
|
|
| MD5 |
4f94ea51cbfb2f8233b1ee86b8f4d4a5
|
|
| BLAKE2b-256 |
439330507446af4eecf160ceea5954604469531a6fe5873cadd740d751d8455d
|