Skip to main content

Typed WebSocket action and event dispatch with optional framework adapters.

Project description

Beeflow WebSocket

Beeflow WebSocket is a small Python library for action-driven WebSocket communication. It keeps reusable protocol code in beeflow_websocket.core and optional framework adapters under their own packages.

Name

The proposed name is Beeflow WebSocket:

  • package distribution: beeflow-websocket
  • Python import: beeflow_websocket
  • repository directory: beeflow-websocket

The name is direct because the package owns WebSocket action dispatch, event envelopes, recipient resolution, and optional transport adapters.

Requirements

  • Python 3.12 or newer
  • uv for all project commands
  • no pip workflow

Installation

Core-only usage:

uv add beeflow-websocket

Django Channels usage after publishing the package:

uv add "beeflow-websocket[django]"

FastAPI projects can install the matching optional runtime extra:

uv add "beeflow-websocket[fastapi]"

Flask projects can install the matching optional runtime extra:

uv add "beeflow-websocket[flask]"

Runtime Dependencies

Each install target keeps only the dependencies needed by that adapter. Deployment servers, channel backends, external pub/sub, and application-level auth remain the responsibility of the application using the package.

Install target Installed by this package Not installed Add when needed
beeflow-websocket pydantic Django, Channels, FastAPI, Flask, WebSocket servers Add a framework extra when the project uses a supported adapter.
beeflow-websocket[django] django, channels daphne, channels-redis, Redis server Add daphne when serving Django Channels with Daphne. Add channels-redis and Redis when using a Redis channel layer for groups, multi-process workers, or cross-instance delivery.
beeflow-websocket[fastapi] fastapi ASGI servers such as uvicorn or hypercorn, external pub/sub, connection managers Add an ASGI server if the application does not already provide one. Add Redis or another pub/sub layer only when broadcasting across connections, workers, or instances.
beeflow-websocket[flask] flask, flask-sock Production servers such as gunicorn, external pub/sub, connection managers Add the production server used by the application deployment. Add Redis or another pub/sub layer only when broadcasting across connections, workers, or instances.

Local development from this repository:

uv sync --extra dev
make test-core
make mypy-core

Django adapter development:

uv sync --extra dev --extra django --extra django-dev
make test-django
make mypy-django

FastAPI adapter development:

uv sync --extra dev --extra fastapi --extra fastapi-dev
make test-fastapi
make mypy-fastapi

Flask adapter development:

uv sync --extra dev --extra flask --extra flask-dev
make test-flask
make mypy-flask

Protocol Flow

  1. A client sends a WebSocketActionPayload envelope.
  2. The framework adapter validates the transport-level shape.
  3. ActionRegistryMeta resolves the action class.
  4. The action yields zero, one, or many event objects.
  5. The adapter emitter serialises each event to WebSocketEventPayload.
  6. RecipientRegistryMeta resolves logical recipients into concrete WebSocket identifiers.
  7. The framework adapter sends JSON to each resolved WebSocket target.

Minimal Action

from collections.abc import AsyncIterator

from beeflow_websocket.core.action_registry import ActionContext, ActionRegistryMeta
from beeflow_websocket.core.events.health import HealthEvent
from beeflow_websocket.core.payloads import WebSocketActionPayload


class Health(metaclass=ActionRegistryMeta, name="health"):
    """Handle a health-check WebSocket action."""

    async def execute(
        self,
        message: WebSocketActionPayload,
        context: ActionContext,
    ) -> AsyncIterator[HealthEvent]:
        """Yield a response event for the current WebSocket connection."""
        yield HealthEvent(
            recipient="websocket",
            recipient_id=context.websocket_id,
            req_id=message.req_id,
        )

Plugin Autodiscover

Action, event, and recipient classes are registered when their modules are imported. Autodiscovery imports application-owned plugin modules during startup so their registry metaclasses can run.

Autodiscovery is enabled by default. Django scans every installed Django app. FastAPI and Flask scan the package that called configure_beeflow_websocket and its parent packages. Each adapter imports these conventional plugin modules when they exist:

my_app/actions.py
my_app/events.py
my_app/recipients.py
my_app/ws/actions.py
my_app/ws/events.py
my_app/ws/recipients.py

Missing modules are ignored. Import errors inside existing modules are not hidden; a broken plugin module should fail application startup.

FastAPI and Flask do not need autodiscovery configuration for this conventional layout:

configure_beeflow_websocket(
    app,
    problem_type_base_url="https://example.com/problems/websocket",
)

Set BEEFLOW_WEBSOCKET_AUTODISCOVER = False in Django or autodiscover=False in FastAPI and Flask to disable startup imports.

Django Channels Setup

Install the optional Django extra:

uv add "beeflow-websocket[django]"

Add the package and Channels settings in settings.py:

INSTALLED_APPS = [
    "channels",
    "beeflow_websocket.django",
    # ...
]

ASGI_APPLICATION = "config.asgi.application"

BEEFLOW_WEBSOCKET_PROBLEM_TYPE_BASE_URL = "https://example.com/problems/websocket"

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels.layers.InMemoryChannelLayer",
    }
}

If BEEFLOW_WEBSOCKET_PROBLEM_TYPE_BASE_URL is not configured, error payloads use about:blank as their Problem Details type. Use the in-memory channel layer only for local development and single-process testing. For production or multi-process delivery, install channels-redis and use a Redis channel layer:

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("127.0.0.1", 6379)],
        },
    }
}

WebSocket routing belongs in asgi.py, not in urls.py. Include the bundled route with Channels auth middleware when the project uses standard Django session/cookie authentication:

import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from django.core.asgi import get_asgi_application

from beeflow_websocket.django.routing import websocket_urlpatterns

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")

django_asgi_app = get_asgi_application()

application = ProtocolTypeRouter(
    {
        "http": django_asgi_app,
        "websocket": AllowedHostsOriginValidator(
            AuthMiddlewareStack(
                URLRouter(websocket_urlpatterns),
            )
        ),
    }
)

The bundled route is ws/, so the client connects to /ws/. The default consumer accepts only authenticated users. AuthMiddlewareStack sets scope["user"] for standard Django session/cookie authentication.

To use a different path, define your own WebSocket URL patterns and still mount them in asgi.py:

from django.urls import path

from beeflow_websocket.django.consumer import WebSocketConsumer

websocket_urlpatterns = [
    path("api/ws/", WebSocketConsumer.as_asgi()),
]

Then mount that local websocket_urlpatterns in URLRouter(...) instead of importing beeflow_websocket.django.routing.websocket_urlpatterns.

Do not send access tokens in the WebSocket query string. Query-string tokens can leak through logs, browser history, proxy logs, and monitoring tools. If browser clients authenticate with an access token, send the public marker and then the secret token as WebSocket subprotocols:

const socket = new WebSocket("wss://example.com/ws/", ["access-token", accessToken]);

Use the bundled AccessTokenAuthMiddleware to read that token and populate scope["user"]. The library extracts the token and selects only the non-secret access-token marker during the handshake. The application provides only the token validation function because signing keys, JWT claims, user models, and revocation rules are project-owned:

import os

from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from django.core.asgi import get_asgi_application

from beeflow_websocket.django.authentication import AccessTokenAuthMiddleware
from beeflow_websocket.django.routing import websocket_urlpatterns

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")

django_asgi_app = get_asgi_application()


async def get_user_for_access_token(token: str) -> object:
    """Validate the project access token and return a Django user."""
    ...


application = ProtocolTypeRouter(
    {
        "http": django_asgi_app,
        "websocket": AllowedHostsOriginValidator(
            AccessTokenAuthMiddleware(
                URLRouter(websocket_urlpatterns),
                user_resolver=get_user_for_access_token,
            )
        ),
    }
)

AccessTokenAuthMiddleware accepts sync and async resolvers. Synchronous resolvers run through Channels database_sync_to_async, so regular Django ORM lookups do not run on the ASGI event loop. The resolver must return an object compatible with the consumer contract: user.is_authenticated must be true and user.id must contain the application user id. When the client does not send an access token, the middleware sets an anonymous user and the default consumer closes the connection.

FastAPI Setup

Install the optional FastAPI extra:

uv add "beeflow-websocket[fastapi]"

FastAPI routing stays in the application. Configure the adapter once, add a WebSocket route, authenticate the connection in your own dependency, and pass the authenticated user id to the handler:

from fastapi import Depends, FastAPI, WebSocket

from beeflow_websocket.fastapi import configure_beeflow_websocket, handle_beeflow_websocket

app = FastAPI()
configure_beeflow_websocket(
    app,
    problem_type_base_url="https://example.com/problems/websocket",
)


async def get_current_user_id(websocket: WebSocket) -> int:
    """Authenticate the WebSocket connection and return the project user id."""
    ...


@app.websocket("/ws/")
async def websocket_endpoint(websocket: WebSocket, user_id: int = Depends(get_current_user_id)) -> None:
    await handle_beeflow_websocket(websocket, user_id=user_id)

The route path is fully controlled by the application. Use @app.websocket("/api/ws/") or any other path when the project does not want /ws/. The FastAPI adapter handles the current WebSocket connection directly and accepts it inside handle_beeflow_websocket. Cross-connection broadcast, connection registries, and external pub/sub belong in the application layer.

If the FastAPI app is not configured with a problem type base URL, error payloads use about:blank as their Problem Details type. Run the app with the ASGI server already used by the project, for example uvicorn my_app.main:app.

Flask Setup

Install the optional Flask extra:

uv add "beeflow-websocket[flask]"

Flask routing stays in the application. Configure the adapter once, add a Flask-Sock WebSocket route, authenticate the connection in your own Flask auth layer, and pass the authenticated user id to the handler:

from flask import Flask, g
from flask_sock import Sock

from beeflow_websocket.flask import configure_beeflow_websocket, handle_beeflow_websocket

app = Flask(__name__)
sock = Sock(app)
configure_beeflow_websocket(
    app,
    problem_type_base_url="https://example.com/problems/websocket",
)


@sock.route("/ws/")
def websocket_endpoint(websocket) -> None:
    user_id = g.user.id

    handle_beeflow_websocket(websocket, user_id=user_id)

The route path is fully controlled by the application. Use @sock.route("/api/ws/") or any other path when the project does not want /ws/. The Flask adapter handles the current WebSocket connection directly. Cross-connection broadcast, connection registries, and external pub/sub belong in the application layer.

If the Flask app is not configured with a problem type base URL, error payloads use about:blank as their Problem Details type. Use the production server already chosen by the Flask project and make sure it supports WebSocket traffic for Flask-Sock.

Documentation

English documentation starts at docs/en/README.md. Polish documentation starts at docs/pl/README.md.

Development

Use uv only:

uv sync --extra dev --extra django --extra django-dev --extra fastapi --extra fastapi-dev --extra flask --extra flask-dev
make test
make mypy
uv build

Release Automation

Every push to master runs .github/workflows/publish.yml. The workflow tests all adapters, runs mypy, builds the source distribution and wheel, then publishes the built distributions to PyPI through Trusted Publishing.

Pull requests targeting master run .github/workflows/ci.yml, which checks the lockfile, runs tests, runs mypy, and verifies that distributions build. The workflow also runs pre-commit hooks.

Configure PyPI Trusted Publishing for:

  • owner: beeflow
  • repository: beeflow-websocket
  • workflow: publish.yml
  • environment: pypi

PyPI package versions are immutable. Bump project.version in pyproject.toml before merging changes that should be published.

The master branch is protected on GitHub. Code changes must go through a pull request and review before they can be merged. Required pull request checks are Pre-commit, Test, Mypy, and Build.

Manual patch release bump example:

[project]
version = "0.1.1"

Move the relevant changelog entries from [Unreleased] into a dated release section:

## [0.1.1] - 2026-05-03

### Added

- Added FastAPI and Flask adapters.

Verify before merging:

make test
make mypy
uv build

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

beeflow_websocket-0.3.0.tar.gz (104.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

beeflow_websocket-0.3.0-py3-none-any.whl (32.2 kB view details)

Uploaded Python 3

File details

Details for the file beeflow_websocket-0.3.0.tar.gz.

File metadata

  • Download URL: beeflow_websocket-0.3.0.tar.gz
  • Upload date:
  • Size: 104.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.13

File hashes

Hashes for beeflow_websocket-0.3.0.tar.gz
Algorithm Hash digest
SHA256 f4c1184b05de1dca81d48cb9793db060fd718fea166c94134265deb6415aa457
MD5 4e210a26f6080b314402efad0a8a9e90
BLAKE2b-256 ff7a42ea846207320782f0e39420bf47af053d8e6a8cfecd2253101216e58bb5

See more details on using hashes here.

Provenance

The following attestation bundles were made for beeflow_websocket-0.3.0.tar.gz:

Publisher: publish.yml on beeflow/beeflow-websocket

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file beeflow_websocket-0.3.0-py3-none-any.whl.

File metadata

File hashes

Hashes for beeflow_websocket-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 ca4f2eb662c17c2a93f7275546378d37072ffd78c07e8a7a50cfa144432e3fe0
MD5 38f716999818b551d53303d6a14cde17
BLAKE2b-256 6550bf37ccc2e5419317c67ef701f106882b3804d381146fa0d02a9da425c947

See more details on using hashes here.

Provenance

The following attestation bundles were made for beeflow_websocket-0.3.0-py3-none-any.whl:

Publisher: publish.yml on beeflow/beeflow-websocket

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page