FastWS framework. A WebSocket wrapper around FastAPI with auto-documentation using AsyncAPI.

FastWS

Source Code: https://github.com/endrekrohn/fastws


FastWS is a wrapper around FastAPI for building better WebSocket applications with auto-documentation using AsyncAPI, similar to FastAPI's existing use of OpenAPI.

The currently supported AsyncAPI version is 2.4.0. Once version 3.0.0 is released, the plan is to upgrade to this standard.

Requirements

Python 3.11+

FastWS uses Pydantic v2 and FastAPI.

Installation

$ pip install fastws

You will also need an ASGI server for production, such as Uvicorn or Hypercorn.

$ pip install "uvicorn[standard]"

Example

Create it

  • Create a file main.py with:
from contextlib import asynccontextmanager
from typing import Annotated

from fastapi import Depends, FastAPI
from fastws import Client, FastWS

service = FastWS()


@service.send("ping")
async def send_event_a() -> str:
    return "pong"


@asynccontextmanager
async def lifespan(app: FastAPI):
    service.setup(app)
    yield


app = FastAPI(lifespan=lifespan)


@app.websocket("/")
async def fastws_stream(client: Annotated[Client, Depends(service.manage)]):
    await service.serve(client)

We can look at the generated documentation at http://localhost:<port>/asyncapi.


Example breakdown

First we import and initialize the service.

from fastws import Client, FastWS

service = FastWS()

Define event

Next up we connect an operation (a WebSocket message) to the service, using the decorator @service.send(...). We need to define the operation using a string similar to how we define an HTTP-endpoint using a path.

The operation identifier is in this case "ping", meaning we will use this string to identify what type of message we are receiving.

@service.send("ping")
async def send_event_a() -> str:
    return "pong"
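
To make the idea of an operation identifier concrete, here is a minimal standard-library sketch of dispatching on the "type" field. The `HANDLERS` registry, the `send` decorator and `dispatch` are hypothetical stand-ins written for illustration; they are not FastWS's implementation.

```python
import asyncio
import json
from typing import Awaitable, Callable

# Hypothetical registry: maps an operation identifier to its handler.
HANDLERS: dict[str, Callable[[], Awaitable[str]]] = {}


def send(operation: str):
    """Register a coroutine under an operation identifier (illustrative only)."""

    def decorator(func):
        HANDLERS[operation] = func
        return func

    return decorator


@send("ping")
async def send_event_a() -> str:
    return "pong"


async def dispatch(raw: str) -> str:
    """Find the handler named by the message's "type" field and call it."""
    message = json.loads(raw)
    handler = HANDLERS[message["type"]]
    return await handler()


result = asyncio.run(dispatch('{"type": "ping"}'))
print(result)
```

The string plays the same role as a path in an HTTP router: it is the key used to find the function that processes the message.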

If we want to define a payload for the operation we can extend the example:

from pydantic import BaseModel

class PingPayload(BaseModel):
    foo: str

@service.send("ping")
async def send_event_a(payload: PingPayload) -> str:
    return "pong"

An incoming message should now have the following format. (We will later view this in the generated AsyncAPI-documentation).

{
    "type": "ping",
    "payload": {
        "foo": "bar"
    }
}

Connect service

Next up we connect the service to our running FastAPI application.

@asynccontextmanager
async def lifespan(app: FastAPI):
    service.setup(app)
    yield


app = FastAPI(lifespan=lifespan)


@app.websocket("/")
async def fastws_stream(client: Annotated[Client, Depends(service.manage)]):
    await service.serve(client)

The function service.setup(app) inside FastAPI's lifespan registers two endpoints:

  • /asyncapi.json, to retrieve our API definition
  • /asyncapi, to view the AsyncAPI documentation UI.

You can override both of these URLs when initializing the service, or set them to None to avoid registering the endpoints at all.

Routing

To spread out our service we can use the OperationRouter-class.

# feature_1.py
from fastws import Client, OperationRouter
from pydantic import BaseModel

router = OperationRouter(prefix="user.")


class SubscribePayload(BaseModel):
    topic: str


class SubscribeResponse(BaseModel):
    detail: str
    topics: set[str]


@router.send("subscribe")
async def subscribe_to_topic(
    payload: SubscribePayload,
    client: Client,
) -> SubscribeResponse:
    client.subscribe(payload.topic)
    return SubscribeResponse(
        detail=f"Subscribed to {payload.topic}",
        topics=client.topics,
    )

We can then include the router in our main service.

# main.py
from fastws import Client, FastWS

from feature_1 import router

service = FastWS()
service.include_router(router)
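
The sketch below illustrates one way a prefix could combine with an operation name. It assumes the prefix is simply prepended, so that "subscribe" under prefix "user." becomes "user.subscribe"; `SketchRouter` is a hypothetical class, not FastWS's `OperationRouter`, and the real naming should be checked in the generated AsyncAPI documentation.

```python
class SketchRouter:
    """Illustrative router that prepends a prefix to operation names."""

    def __init__(self, prefix: str = "") -> None:
        self.prefix = prefix
        self.routes: dict[str, object] = {}

    def send(self, operation: str):
        def decorator(func):
            # Assumption: the full identifier is prefix + operation.
            self.routes[f"{self.prefix}{operation}"] = func
            return func

        return decorator

    def include_router(self, other: "SketchRouter") -> None:
        # Merge the other router's operations into this one.
        self.routes.update(other.routes)


router = SketchRouter(prefix="user.")


@router.send("subscribe")
async def subscribe_to_topic() -> str:
    return "subscribed"


service = SketchRouter()
service.include_router(router)
print(sorted(service.routes))
```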

Operations, send and recv

The service enables two types of operations. Let us define these operations clearly:

  • send: An operation where the API user sends a message to the API server.

    Note: Up to AsyncAPI version 2.6.0 this refers to a publish-operation, but is changing to send in version 3.0.0.

  • recv: An operation where the API server sends a message to the API user.

    Note: Up to AsyncAPI version 2.6.0 this refers to a subscribe-operation, but is changing to receive in version 3.0.0.

The send-operation

The above examples have only displayed the use of send-operations.

When we call FastWS.client_send(message, client) or FastWS.serve(client), FastWS implicitly passes some keyword arguments to the function processing the message. These keyword arguments have the following names and types:

  • client with type fastws.application.Client
  • app with type fastws.application.FastWS
  • payload (optional), with the type defined in the function processing the message.

A send-operation can therefore access the following arguments:

from fastws import Client, FastWS
from pydantic import BaseModel

class Something(BaseModel):
    foo: str


class Thing(BaseModel):
    bar: str


# `router` is an OperationRouter, created as shown in the Routing section.
@router.send("foo")
async def some_function(
    payload: Something,
    client: Client,
    app: FastWS,
) -> Thing:
    print(f"{app.connections=}")
    print(f"{client.uid=}")

    return Thing(bar=client.uid)
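
One way to picture "implicitly passed" keyword arguments is a caller that inspects the handler's signature and forwards only the names it declares. This is a sketch of that general technique using `inspect.signature`; `call_with_known_kwargs` is a hypothetical helper and the plain strings stand in for real `Client` and `FastWS` objects. It is not taken from FastWS's source.

```python
import asyncio
import inspect


async def call_with_known_kwargs(func, **available):
    """Call func with only those keyword arguments it actually declares."""
    params = inspect.signature(func).parameters
    wanted = {name: value for name, value in available.items() if name in params}
    return await func(**wanted)


async def only_payload(payload: dict) -> str:
    return payload["foo"]


async def payload_and_client(payload: dict, client: str) -> str:
    return f"{client}:{payload['foo']}"


# Stand-ins for the payload, client and app the framework would supply.
available = {"payload": {"foo": "bar"}, "client": "client-1", "app": "service"}

first = asyncio.run(call_with_known_kwargs(only_payload, **available))
second = asyncio.run(call_with_known_kwargs(payload_and_client, **available))
print(first, second)
```

With this pattern a handler can declare any subset of payload, client and app, in any order.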

The recv-operation

When we call FastWS.server_send(message, topic), FastWS implicitly passes some keyword arguments to the function processing the message. These keyword arguments have the following names and types:

  • app with type fastws.application.FastWS
  • payload (optional), with the type defined in the function processing the message.

A recv-operation can therefore access the following arguments:

from fastws import FastWS
from pydantic import BaseModel

class AlertPayload(BaseModel):
    message: str


@router.recv("alert")
async def recv_client(payload: AlertPayload, app: FastWS) -> str:
    return "hey there!"

If we want to create a message on the server side we can do the following:

from fastapi import FastAPI
# Assumption: Message is importable from the top-level fastws package.
from fastws import FastWS, Message

service = FastWS()
app = FastAPI()

@app.post("/")
async def alert_on_topic_foobar(message: str):
    await service.server_send(
        Message(type="alert", payload={"message": message}),
        topic="foobar",
    )
    return "ok"

In the example above, all connections subscribed to the topic foobar will receive a message with the payload "hey there!".

In this way you can on the server-side choose to publish messages from anywhere to any topic. This is especially useful if you have a persistent connection to Redis or similar that reads messages from some channel and want to propagate these to your users.
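
The propagation pattern can be sketched with the standard library alone. Below, an `asyncio.Queue` stands in for the external channel (for example a Redis subscription), and the `subscriptions` and `outbox` dictionaries are hypothetical stand-ins for connected clients. In a real application the inner loop would instead call `await service.server_send(...)` with the topic.

```python
import asyncio
from collections import defaultdict

# Hypothetical in-memory state: topic -> client ids, client id -> received texts.
subscriptions: dict[str, set[str]] = defaultdict(set)
outbox: dict[str, list[str]] = defaultdict(list)


async def forward(channel: asyncio.Queue) -> None:
    """Read (topic, text) items from the channel and fan them out by topic."""
    while True:
        item = await channel.get()
        if item is None:  # sentinel used here to stop the demo loop
            break
        topic, text = item
        for client_id in subscriptions[topic]:
            outbox[client_id].append(text)


async def main() -> None:
    subscriptions["foobar"].update({"client-a", "client-b"})
    subscriptions["other"].add("client-c")
    channel: asyncio.Queue = asyncio.Queue()
    await channel.put(("foobar", "hey there!"))
    await channel.put(None)
    await forward(channel)


asyncio.run(main())
print(dict(outbox))
```

Only clients subscribed to foobar receive the message; client-c, subscribed to a different topic, receives nothing.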

Authentication

There are two ways to tackle authentication using FastWS.

By defining auth_handler

One way is to provide a custom auth_handler when initializing the service. Below is an example where the API user must provide a secret message within a timeout to authenticate.

import asyncio
import logging
from fastapi import WebSocket
from fastws import FastWS


def custom_auth(to_wait: float = 5):
    async def handler(ws: WebSocket) -> bool:
        await ws.accept()
        try:
            initial_msg = await asyncio.wait_for(
                ws.receive_text(),
                timeout=to_wait,
            )
            return initial_msg == "SECRET_HUSH_HUSH"
        except asyncio.exceptions.TimeoutError:
            logging.info("Took too long to provide authentication")

        return False

    return handler


service = FastWS(auth_handler=custom_auth())
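
The handler logic can be exercised without a network connection by passing it a stand-in object. The sketch below repeats the handler in simplified form (no logging, no FastAPI import) and uses a hypothetical `FakeWebSocket` class that only implements the two methods the handler calls.

```python
import asyncio


class FakeWebSocket:
    """Hypothetical stand-in exposing only accept() and receive_text()."""

    def __init__(self, message: str, delay: float = 0.0) -> None:
        self.message = message
        self.delay = delay
        self.accepted = False

    async def accept(self) -> None:
        self.accepted = True

    async def receive_text(self) -> str:
        await asyncio.sleep(self.delay)
        return self.message


def custom_auth(to_wait: float = 5):
    async def handler(ws) -> bool:
        await ws.accept()
        try:
            initial_msg = await asyncio.wait_for(ws.receive_text(), timeout=to_wait)
            return initial_msg == "SECRET_HUSH_HUSH"
        except asyncio.TimeoutError:
            return False

    return handler


handler = custom_auth(to_wait=0.05)
ok = asyncio.run(handler(FakeWebSocket("SECRET_HUSH_HUSH")))
wrong = asyncio.run(handler(FakeWebSocket("nope")))
slow = asyncio.run(handler(FakeWebSocket("SECRET_HUSH_HUSH", delay=0.5)))
print(ok, wrong, slow)
```

The three calls cover the correct secret, a wrong secret, and a correct secret that arrives after the timeout.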

By using FastAPI dependency

If you want to use your own FastAPI dependency to handle authentication before it enters the FastWS service you will have to set auto_ws_accept to False.

import asyncio
from typing import Annotated

from fastapi import Depends, FastAPI, WebSocket, WebSocketException, status
from fastws import Client, FastWS

service = FastWS(auto_ws_accept=False)

app = FastAPI()


async def custom_dep(ws: WebSocket):
    await ws.accept()
    initial_msg = await asyncio.wait_for(
        ws.receive_text(),
        timeout=5,
    )
    if initial_msg == "SECRET_HUSH_HUSH":
        return
    raise WebSocketException(
        code=status.WS_1008_POLICY_VIOLATION,
        reason="Not authenticated",
    )


@app.websocket("/")
async def fastws_stream(
    client: Annotated[Client, Depends(service.manage)],
    _=Depends(custom_dep),
):
    await service.serve(client)

Heartbeats and connection lifespan

To help manage a WebSocket's lifespan, FastWS uses asyncio.timeout() context managers in its serve(client) function.

You can set both:

  • heartbeat_interval: the time within which a client must send a message.
  • max_connection_lifespan: the time after which any connection is disconnected.

These must be set during initialization:

from fastws import FastWS

service = FastWS(
    heartbeat_interval=10,
    max_connection_lifespan=300,
)

Both heartbeat_interval and max_connection_lifespan can be set to None to disable any restrictions. Note this is the default.

Please note that you can often also set restrictions in your ASGI-server. Applicable settings for uvicorn:

  • --ws-ping-interval INTEGER
  • --ws-ping-timeout INTEGER
  • --ws-max-size INTEGER
