
Bidirectional WAMP RPC and PubSub for FastAPI without a separate router


fastapi-headless-wamp


Full bidirectional WAMP (Web Application Messaging Protocol) RPC and PubSub for FastAPI without a separate WAMP router.

Each WebSocket connection gets its own isolated peer-to-peer session between the server and the client. There is no shared broker or dealer infrastructure. This makes the library lightweight, easy to deploy, and ideal for applications that need structured bidirectional communication over WebSockets.

Features

  • Bidirectional RPC — register server-side procedures that clients can call, and call client-registered procedures from the server.
  • PubSub — subscribe to topics and publish events in both directions (server-to-client and client-to-server).
  • Progressive results — stream intermediate results from long-running RPCs.
  • Progressive call invocations — clients can stream input chunks to the server.
  • Call cancellation — cancel in-flight RPCs from either side.
  • Class-based services — group related RPCs and subscriptions into service classes with a URI prefix.
  • Pluggable serialization — JSON and CBOR built-in, extensible to other formats.
  • Typed — fully typed with strict linting via ruff.
  • wampy.js compatible — designed to work with wampy.js and other WAMP clients.

Architecture

┌──────────────┐  WebSocket  ┌───────────────────────┐
│  WAMP Client │◄───────────►│  FastAPI Application  │
│  (wampy.js)  │  wamp.2.json│                       │
└──────────────┘             │  WampHub              │
                             │  ├─ session 1 (P2P)   │
┌──────────────┐  WebSocket  │  ├─ session 2 (P2P)   │
│  WAMP Client │◄───────────►│  └─ session N (P2P)   │
└──────────────┘             └───────────────────────┘

Each WebSocket connection creates an isolated peer-to-peer WAMP session. There is no shared router — the server and client communicate directly within the session. This means:

  • No external WAMP router (like Crossbar.io) is required.
  • Each session is fully independent: client-registered RPCs and subscriptions are scoped to that session.
  • Server-side RPCs and subscriptions (registered via decorators) are shared across all sessions.
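
The scoping rules above can be pictured with a small stand-alone model. This is only a sketch of the idea, not the library's implementation: `ToyHub`, `ToySession`, and both `can_*` helpers are hypothetical names invented for the illustration.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class ToyHub:
    """Hypothetical stand-in: holds procedures shared by every session."""
    shared: dict[str, Callable[..., object]] = field(default_factory=dict)


@dataclass
class ToySession:
    """Hypothetical stand-in: holds procedures registered by one client."""
    hub: ToyHub
    client_procs: set[str] = field(default_factory=set)

    def can_server_call(self, uri: str) -> bool:
        # Client-registered URIs are visible only inside their own session.
        return uri in self.client_procs

    def can_client_call(self, uri: str) -> bool:
        # Server-side URIs live on the hub, so every session sees them.
        return uri in self.hub.shared


hub = ToyHub(shared={"com.example.add": lambda a, b: a + b})
s1, s2 = ToySession(hub), ToySession(hub)
s1.client_procs.add("com.client.get_status")
```

Here both sessions can reach `com.example.add`, while `com.client.get_status` is reachable only through `s1`.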

Installation

pip install fastapi-headless-wamp

With binary serialization (CBOR and/or MsgPack):

pip install "fastapi-headless-wamp[cbor]"
pip install "fastapi-headless-wamp[msgpack]"

Quick Start

Standalone Decorators

from fastapi import FastAPI
from fastapi_headless_wamp import WampHub, WampSession

app = FastAPI()
wamp = WampHub(realm="realm1")

# Register a server-side RPC
@wamp.register("com.example.add")
async def add(session: WampSession, a: int, b: int) -> int:
    return a + b

# Mount the WAMP WebSocket endpoint
app.include_router(wamp.get_router(path="/ws"))

Run with uvicorn:

uvicorn myapp:app --host 0.0.0.0 --port 8000

Clients connect via WebSocket to ws://localhost:8000/ws using the wamp.2.json subprotocol and can call com.example.add.
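
For readers without a WAMP client at hand, the following sketch shows roughly what calling com.example.add looks like on the wire once the HELLO/WELCOME handshake is done. The message type codes (CALL = 48, RESULT = 50) and array layouts come from the WAMP specification; the helper names and the request ID are illustrative, and the reply frame is built by hand rather than captured from a server.

```python
import json

CALL, RESULT = 48, 50  # message type codes from the WAMP specification


def build_call(request_id: int, procedure: str, *args: object) -> str:
    """Serialize a CALL: [CALL, request_id, options, procedure, args]."""
    return json.dumps([CALL, request_id, {}, procedure, list(args)])


def parse_result(raw: str) -> tuple[int, list]:
    """Decode a RESULT: [RESULT, request_id, details, args?, kwargs?]."""
    msg = json.loads(raw)
    if msg[0] != RESULT:
        raise ValueError(f"expected RESULT, got message type {msg[0]}")
    args = msg[3] if len(msg) > 3 else []
    return msg[1], args


outgoing = build_call(1, "com.example.add", 2, 3)
# A reply the server could send for the call above (constructed by hand here).
request_id, result_args = parse_result(json.dumps([RESULT, 1, {}, [5]]))
```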

Class-Based Services

from fastapi import FastAPI
from fastapi_headless_wamp import WampHub, WampService, WampSession, rpc

app = FastAPI()
wamp = WampHub(realm="realm1")

class MathService(WampService):
    prefix = "com.example.math"

    @rpc()
    async def add(self, session: WampSession, *, a: int, b: int) -> int:
        return a + b

    @rpc("multiply")
    async def mul(self, session: WampSession, *, a: int, b: int) -> int:
        return a * b

wamp.register_service(MathService())
app.include_router(wamp.get_router(path="/ws"))

Clients can now call com.example.math.add and com.example.math.multiply.
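
The example suggests that the final URI is the class prefix joined to either the explicit name or, when `@rpc()` is called without one, the method name. A minimal stand-alone sketch of that naming rule follows; `toy_rpc`, `collect_uris`, and `ToyMath` are hypothetical and do not reflect how the library discovers methods internally.

```python
from typing import Callable, Optional


def toy_rpc(name: Optional[str] = None) -> Callable:
    """Hypothetical marker decorator: records the URI suffix on the function."""
    def mark(func: Callable) -> Callable:
        func._rpc_name = name or func.__name__  # default to the method name
        return func
    return mark


def collect_uris(service: object) -> dict[str, Callable]:
    """Map '<prefix>.<suffix>' to each bound method that carries a marker."""
    prefix = getattr(service, "prefix", "")
    uris = {}
    for attr in dir(service):
        method = getattr(service, attr)
        suffix = getattr(method, "_rpc_name", None)
        if callable(method) and suffix is not None:
            uris[f"{prefix}.{suffix}" if prefix else suffix] = method
    return uris


class ToyMath:
    prefix = "com.example.math"

    @toy_rpc()
    def add(self, a: int, b: int) -> int:
        return a + b

    @toy_rpc("multiply")
    def mul(self, a: int, b: int) -> int:
        return a * b


uris = collect_uris(ToyMath())
```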

Manual WebSocket Handler

If you need more control, use handle_websocket directly:

from fastapi import FastAPI, WebSocket
from fastapi_headless_wamp import WampHub, WampSession

app = FastAPI()
wamp = WampHub(realm="realm1")

@wamp.register("com.example.greet")
async def greet(session: WampSession, name: str) -> str:
    return f"Hello, {name}!"

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket) -> None:
    await wamp.handle_websocket(websocket)

Bidirectional RPC

The server can call RPCs that clients have registered on their session:

from fastapi_headless_wamp import WampHub, WampSession

wamp = WampHub(realm="realm1")

@wamp.on_session_open
async def on_open(session: WampSession) -> None:
    # Call a client-registered procedure
    result = await session.call("com.client.get_status")
    print(f"Client status: {result}")
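
On the client side, a server-initiated call would normally arrive as an INVOCATION and be answered with a YIELD. The sketch below assumes the library follows that standard exchange from the WAMP specification (type codes 68 and 70); the function name, the handler table, and the registration ID are illustrative.

```python
import json
from typing import Callable

INVOCATION, YIELD = 68, 70  # message type codes from the WAMP specification


def answer_invocation(raw: str, handlers: dict[int, Callable]) -> str:
    """Run the handler for an INVOCATION and return the YIELD reply frame."""
    msg = json.loads(raw)
    if msg[0] != INVOCATION:
        raise ValueError(f"expected INVOCATION, got message type {msg[0]}")
    # Layout: [INVOCATION, request_id, registration_id, details, args?, kwargs?]
    request_id, registration_id = msg[1], msg[2]
    args = msg[4] if len(msg) > 4 else []
    kwargs = msg[5] if len(msg) > 5 else {}
    result = handlers[registration_id](*args, **kwargs)
    return json.dumps([YIELD, request_id, {}, [result]])


# Registration ID 9 is illustrative; a real one arrives in the REGISTERED reply.
handlers = {9: lambda: {"status": "ok"}}
reply = answer_invocation(json.dumps([INVOCATION, 3, 9, {}]), handlers)
```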

PubSub

Server publishes to clients

@wamp.on_session_open
async def on_open(session: WampSession) -> None:
    # Publish to this specific session (if subscribed)
    await session.publish("com.example.news", args=["breaking update"])

    # Publish to all connected sessions that are subscribed
    await wamp.publish_to_all("com.example.news", args=["broadcast"])

Server handles client publishes

@wamp.subscribe("com.example.chat")
async def on_chat(session: WampSession, *args, **kwargs) -> None:
    message = args[0] if args else kwargs.get("message", "")
    print(f"Chat from session {session.session_id}: {message}")

Progressive Results

Server RPC handlers can stream intermediate results:

from typing import Any
from collections.abc import Awaitable, Callable

@wamp.register("com.example.long_task")
async def long_task(
    session: WampSession,
    n: int,
    _progress: Callable[[Any], Awaitable[None]] | None = None,
) -> str:
    for i in range(n):
        if _progress is not None:
            await _progress(f"Step {i + 1}/{n}")
    return "done"

The client receives each progressive result as it is produced, then the final result.
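
In WAMP terms, intermediate results are RESULT messages whose details carry `"progress": true`, and the final RESULT omits that flag. The sketch below separates the two on the receiving side. The frames are written by hand, and the assumption that each value passed to `_progress` arrives as a single positional argument is not confirmed by the text above.

```python
import json

RESULT = 50  # message type code from the WAMP specification


def split_progress(raw_messages: list[str], request_id: int) -> tuple[list, list]:
    """Return (progress payloads, final payload) for one call's RESULT stream."""
    progress, final = [], []
    for raw in raw_messages:
        msg = json.loads(raw)
        if msg[0] != RESULT or msg[1] != request_id:
            continue  # not part of this call
        details = msg[2]
        args = msg[3] if len(msg) > 3 else []
        if details.get("progress"):
            progress.append(args)
        else:
            final = args
            break  # the first non-progressive RESULT ends the call
    return progress, final


# Hand-written frames resembling what long_task(n=2) could produce.
frames = [
    json.dumps([RESULT, 7, {"progress": True}, ["Step 1/2"]]),
    json.dumps([RESULT, 7, {"progress": True}, ["Step 2/2"]]),
    json.dumps([RESULT, 7, {}, ["done"]]),
]
progress, final = split_progress(frames, request_id=7)
```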

Call Cancellation

Clients can cancel in-flight RPCs. The server supports three cancel modes:

  • kill (default) — cancel the handler's asyncio task.
  • skip — stop waiting for the result, let the handler keep running.
  • killnowait — cancel the task and respond immediately.
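
The difference between the three modes is easiest to see with plain asyncio tasks. This is a sketch of the semantics only, not the library's code: `apply_cancel` is a hypothetical helper, and it assumes that kill waits for the handler to unwind before responding, as the WAMP specification describes.

```python
import asyncio


async def apply_cancel(task: asyncio.Task, mode: str = "kill") -> bool:
    """Return True if the handler task has finished by the time we respond."""
    if mode == "skip":
        return task.done()          # leave the handler running, respond now
    task.cancel()                   # "kill" and "killnowait" both cancel
    if mode == "kill":
        # Assumption: "kill" waits for the handler to unwind before responding.
        await asyncio.gather(task, return_exceptions=True)
    return task.done()


async def demo() -> dict[str, bool]:
    outcome = {}
    for mode in ("kill", "skip", "killnowait"):
        task = asyncio.create_task(asyncio.sleep(60))
        await asyncio.sleep(0)      # let the handler start
        outcome[mode] = await apply_cancel(task, mode)
        task.cancel()               # clean up whatever is still running
        await asyncio.gather(task, return_exceptions=True)
    return outcome


outcome = asyncio.run(demo())
```

Only kill reports a finished handler at response time; with skip the handler is untouched, and with killnowait the cancellation has been requested but not yet delivered.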

The server can also cancel calls to clients:

import asyncio

@wamp.on_session_open
async def on_open(session: WampSession) -> None:
    # Start a call in the background, then cancel it after one second
    task = asyncio.create_task(session.call("com.client.slow_op"))
    await asyncio.sleep(1)
    # Cancel by request ID; 1 is a placeholder for the pending call's ID
    await session.cancel(1)

Session Lifecycle Callbacks

@wamp.on_session_open
async def on_open(session: WampSession) -> None:
    print(f"Session {session.session_id} connected")

@wamp.on_session_close
async def on_close(session: WampSession) -> None:
    print(f"Session {session.session_id} disconnected")

Serialization

Built-in serializers

Serializer        | Subprotocol    | Format       | Install
------------------|----------------|--------------|-------------------------------------------
JsonSerializer    | wamp.2.json    | Text (UTF-8) | included
CborSerializer    | wamp.2.cbor    | Binary       | pip install fastapi-headless-wamp[cbor]
MsgpackSerializer | wamp.2.msgpack | Binary       | pip install fastapi-headless-wamp[msgpack]

Built-in serializers are registered automatically at import time when their dependencies are available (CBOR requires cbor2). During the WebSocket handshake the client lists the subprotocols it supports, and the server selects one for which a serializer is registered.
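
Negotiation boils down to finding a subprotocol that both sides support. A minimal sketch, assuming the client's preference order decides ties; `select_subprotocol` and `SUPPORTED` are hypothetical names, and the library's actual selection logic is not documented here.

```python
from typing import Optional

# Subprotocols from the table above; which ones are available at runtime
# depends on the installed extras.
SUPPORTED = ("wamp.2.json", "wamp.2.cbor", "wamp.2.msgpack")


def select_subprotocol(offered: list[str], supported=SUPPORTED) -> Optional[str]:
    """Pick the first client-offered subprotocol that the server supports."""
    for name in offered:
        if name in supported:
            return name
    return None  # no overlap: the handshake cannot agree on a serializer
```

A client offering `["wamp.2.cbor", "wamp.2.json"]` to a server without the CBOR extra would therefore end up on `wamp.2.json`.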

The JSON and CBOR serializers handle Pydantic models, dataclasses, datetime, Decimal, UUID, set/frozenset, and bytes out of the box, so RPC handlers can return these types directly without manual serialization.

CBOR preserves native types (e.g. datetime stays a datetime, not a string), while JSON converts them to string representations.
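
To illustrate what "converts them to string representations" can mean for JSON, here is a stand-alone sketch using the standard library's `default` hook. The exact formats the library emits are not stated above, so ISO 8601 for datetime and `str()` for Decimal and UUID are assumptions, and `to_jsonable` is a hypothetical helper.

```python
import json
from datetime import datetime, timezone
from decimal import Decimal
from uuid import UUID


def to_jsonable(value: object) -> object:
    """Fallback for json.dumps: convert types JSON cannot represent natively."""
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, (Decimal, UUID)):
        return str(value)
    if isinstance(value, (set, frozenset)):
        return sorted(value)  # sets become lists; sorting keeps output stable
    raise TypeError(f"cannot serialize {type(value).__name__}")


payload = {
    "at": datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
    "price": Decimal("9.99"),
    "id": UUID("12345678-1234-5678-1234-567812345678"),
    "tags": {"b", "a"},
}
encoded = json.dumps(payload, default=to_jsonable)
```

The receiving side sees plain strings and lists, so it has to know which fields to parse back into richer types.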

Custom serializers

To add another serializer (e.g. MsgPack), implement the Serializer protocol:

import msgpack

from fastapi_headless_wamp import Serializer, register_serializer

class MsgpackSerializer:
    @property
    def protocol(self) -> str:
        # Suffix of the subprotocol name: clients connect with wamp.2.msgpack
        return "msgpack"

    @property
    def is_binary(self) -> bool:
        # MsgPack frames are sent as binary WebSocket messages
        return True

    def encode(self, data: list) -> bytes:
        return msgpack.packb(data)

    def decode(self, data: str | bytes) -> list:
        # raw=False decodes MsgPack strings to str instead of bytes
        return msgpack.unpackb(data, raw=False)

register_serializer(MsgpackSerializer())

Clients can then connect with the wamp.2.msgpack subprotocol.
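
Before registering a custom serializer it can help to check that it round-trips a WAMP message. The sketch below mirrors the four members used in the example above with a `typing.Protocol`; `SerializerLike` and `CompactJsonSerializer` are hypothetical, and the real `Serializer` protocol may declare more than is shown here.

```python
import json
from typing import Protocol, Union, runtime_checkable


@runtime_checkable
class SerializerLike(Protocol):
    """Hypothetical mirror of the four members used in the example above."""

    @property
    def protocol(self) -> str: ...
    @property
    def is_binary(self) -> bool: ...
    def encode(self, data: list) -> Union[str, bytes]: ...
    def decode(self, data: Union[str, bytes]) -> list: ...


class CompactJsonSerializer:
    """Illustrative text serializer that omits whitespace between tokens."""

    @property
    def protocol(self) -> str:
        return "json.compact"  # made-up name, not a registered WAMP subprotocol

    @property
    def is_binary(self) -> bool:
        return False

    def encode(self, data: list) -> str:
        return json.dumps(data, separators=(",", ":"))

    def decode(self, data: Union[str, bytes]) -> list:
        return json.loads(data)


serializer = CompactJsonSerializer()
wire = serializer.encode([48, 1, {}, "com.example.add", [2, 3]])
```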

Client Compatibility

This library is designed to be compatible with wampy.js and other standard WAMP clients. The server advertises the following WAMP role features:

Dealer features:

  • progressive_call_results
  • call_canceling
  • caller_identification
  • call_timeout

Broker features:

  • publisher_identification
  • publisher_exclusion
  • subscriber_blackwhite_listing
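
Role features are announced in the details of the WELCOME message. The sketch below shows how a client could check for one. The nesting follows the WAMP convention (`roles.<role>.features.<feature>`), the feature names are copied from the lists above, and the session ID and the `supports` helper are illustrative.

```python
import json

WELCOME = 2  # message type code from the WAMP specification

# Feature names copied from the lists above; the nesting follows the WAMP
# convention of details.roles.<role>.features.<feature> = true.
welcome = [
    WELCOME,
    1234567,  # illustrative session ID
    {
        "roles": {
            "dealer": {"features": {
                "progressive_call_results": True,
                "call_canceling": True,
                "caller_identification": True,
                "call_timeout": True,
            }},
            "broker": {"features": {
                "publisher_identification": True,
                "publisher_exclusion": True,
                "subscriber_blackwhite_listing": True,
            }},
        }
    },
]


def supports(message: list, role: str, feature: str) -> bool:
    """Check a decoded WELCOME for an advertised role feature."""
    roles = message[2].get("roles", {})
    return bool(roles.get(role, {}).get("features", {}).get(feature, False))


decoded = json.loads(json.dumps(welcome))
```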

API Reference

WampHub(realm="realm1")

The central hub that manages WAMP sessions. Key methods and decorators:

Method / Decorator             | Description
-------------------------------|----------------------------------------------------
@hub.register(uri)             | Register a server-side RPC handler
@hub.subscribe(topic)          | Register a server-side subscription handler
hub.register_service(service)  | Register a WampService instance
@hub.on_session_open           | Callback when a session completes handshake
@hub.on_session_close          | Callback when a session disconnects
hub.handle_websocket(ws)       | Full async WebSocket handler
hub.get_router(path="/ws")     | Get a FastAPI APIRouter with the WAMP endpoint
hub.publish_to_all(topic, ...) | Publish an event to all subscribed sessions
hub.sessions                   | Dict of active sessions (session_id -> WampSession)
hub.session_count              | Number of active sessions

WampSession

Represents a single WAMP session. Key methods:

Method                                   | Description
-----------------------------------------|----------------------------------
session.call(uri, args, kwargs, timeout) | Call a client-registered RPC
session.cancel(request_id, mode)         | Cancel a pending call to a client
session.publish(topic, args, kwargs)     | Publish an event to this client

WampService

Base class for grouping RPCs with a URI prefix. Use @rpc() and @subscribe() to mark methods.

Error Classes

Exception                  | WAMP Error URI
---------------------------|-----------------------------------
WampError                  | (base class)
WampNoSuchProcedure        | wamp.error.no_such_procedure
WampNoSuchSubscription     | wamp.error.no_such_subscription
WampRuntimeError           | wamp.error.runtime_error
WampCallTimeout            | wamp.error.canceled
WampCanceled               | wamp.error.canceled
WampProcedureAlreadyExists | wamp.error.procedure_already_exists
WampProtocolError          | (protocol-level error)
WampInvalidMessage         | (message validation error)
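
On the wire, each of these URIs travels inside an ERROR message. The sketch below turns such a frame back into an exception on the receiving side. The layout and type codes come from the WAMP specification; `ToyWampError` and `raise_for_error` are hypothetical stand-ins, not the library's classes.

```python
import json

ERROR, CALL = 8, 48  # message type codes from the WAMP specification


class ToyWampError(Exception):
    """Hypothetical stand-in for a WAMP error carrying its URI."""

    def __init__(self, uri: str, args: list) -> None:
        super().__init__(f"{uri}: {args}")
        self.uri = uri
        self.wamp_args = args


def raise_for_error(raw: str) -> list:
    """Return the decoded message, or raise if it is an ERROR for a CALL."""
    msg = json.loads(raw)
    if msg[0] == ERROR and msg[1] == CALL:
        # Layout: [ERROR, request_type, request_id, details, error_uri, args?]
        raise ToyWampError(msg[4], msg[5] if len(msg) > 5 else [])
    return msg


frame = json.dumps([ERROR, CALL, 1, {}, "wamp.error.no_such_procedure"])
try:
    raise_for_error(frame)
except ToyWampError as exc:
    caught_uri = exc.uri
```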

License

Apache 2.0 — see LICENSE for details.

