Typed WebSocket action and event dispatch with optional framework adapters.

Beeflow WebSocket

Beeflow WebSocket is a small Python library for action-driven WebSocket communication. It keeps reusable protocol code in beeflow_websocket.core and optional framework adapters under their own packages.

Name

The proposed name is Beeflow WebSocket:

  • package distribution: beeflow-websocket
  • Python import: beeflow_websocket
  • repository directory: beeflow-websocket

The name is direct because the package owns WebSocket action dispatch, event envelopes, recipient resolution, and optional transport adapters.

Requirements

  • Python 3.12 or newer
  • uv for all project commands
  • no pip workflow

Installation

Core-only usage:

uv add beeflow-websocket

Django Channels projects can install the matching optional runtime extra:

uv add "beeflow-websocket[django]"

FastAPI projects can install the matching optional runtime extra:

uv add "beeflow-websocket[fastapi]"

Flask projects can install the matching optional runtime extra:

uv add "beeflow-websocket[flask]"

Runtime Dependencies

Each install target keeps only the dependencies needed by that adapter. Deployment servers, channel backends, external pub/sub, and application-level auth remain the responsibility of the application using the package.

Install target: beeflow-websocket

  • Installed by this package: pydantic
  • Not installed: Django, Channels, FastAPI, Flask, WebSocket servers
  • Add when needed: Add a framework extra when the project uses a supported adapter.

Install target: beeflow-websocket[django]

  • Installed by this package: django, channels
  • Not installed: daphne, channels-redis, Redis server
  • Add when needed: Add daphne when serving Django Channels with Daphne. Add channels-redis and Redis when using a Redis channel layer for groups, multi-process workers, or cross-instance delivery.

Install target: beeflow-websocket[fastapi]

  • Installed by this package: fastapi
  • Not installed: ASGI servers such as uvicorn or hypercorn, external pub/sub, connection managers
  • Add when needed: Add an ASGI server if the application does not already provide one. Add Redis or another pub/sub layer only when broadcasting across connections, workers, or instances.

Install target: beeflow-websocket[flask]

  • Installed by this package: flask, flask-sock
  • Not installed: Production servers such as gunicorn, external pub/sub, connection managers
  • Add when needed: Add the production server used by the application deployment. Add Redis or another pub/sub layer only when broadcasting across connections, workers, or instances.
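
For the Django case, a Redis channel layer is configured in Django settings once channels-redis has been added to the project. The fragment below follows the channels-redis settings format; the host and port are placeholders and are not taken from this package.

```python
# settings.py fragment. Assumes channels-redis is installed separately and
# that Redis listens on 127.0.0.1:6379 (placeholder host and port).
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {"hosts": [("127.0.0.1", 6379)]},
    },
}
```

Without a Redis layer, delivery is limited to what a single process can reach, which is why the channel backend is left to the application.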

Local development from this repository:

uv sync --extra dev
make test-core
make mypy-core

Django adapter development:

uv sync --extra dev --extra django --extra django-dev
make test-django
make mypy-django

FastAPI adapter development:

uv sync --extra dev --extra fastapi --extra fastapi-dev
make test-fastapi
make mypy-fastapi

Flask adapter development:

uv sync --extra dev --extra flask --extra flask-dev
make test-flask
make mypy-flask

Protocol Flow

  1. A client sends a WebSocketActionPayload envelope.
  2. The framework adapter validates the transport-level shape.
  3. ActionRegistryMeta resolves the action class.
  4. The action yields zero, one, or many event objects.
  5. The adapter emitter serialises each event to WebSocketEventPayload.
  6. RecipientRegistryMeta resolves logical recipients into concrete WebSocket identifiers.
  7. The framework adapter sends JSON to each resolved WebSocket target.
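
The seven steps can be modelled in a few lines of plain Python. This is a toy sketch and not the library's implementation: ToyActionRegistry, ToyEvent, resolve_recipients, dispatch, and the envelope field names (action, req_id) are hypothetical stand-ins chosen for illustration.

```python
import asyncio
import json
from collections.abc import AsyncIterator, Callable
from dataclasses import asdict, dataclass


class ToyActionRegistry(type):
    """Hypothetical registry metaclass: records each class under its action name."""

    actions: dict[str, type] = {}

    def __new__(mcls, cls_name, bases, namespace, *, name):
        cls = super().__new__(mcls, cls_name, bases, namespace)
        mcls.actions[name] = cls
        return cls

    def __init__(cls, cls_name, bases, namespace, *, name):
        super().__init__(cls_name, bases, namespace)


@dataclass
class ToyEvent:
    event: str
    recipient: str
    recipient_id: str
    req_id: str


class Health(metaclass=ToyActionRegistry, name="health"):
    async def execute(self, message: dict, websocket_id: str) -> AsyncIterator[ToyEvent]:
        # Step 4: an action may yield zero, one, or many events.
        yield ToyEvent("health", "websocket", websocket_id, message["req_id"])


def resolve_recipients(event: ToyEvent) -> list[str]:
    # Step 6: only the "websocket" recipient kind is modelled in this sketch.
    return [event.recipient_id] if event.recipient == "websocket" else []


async def dispatch(raw: str, websocket_id: str, send: Callable[[str, str], None]) -> None:
    message = json.loads(raw)  # step 1: the client envelope arrives as JSON
    if not isinstance(message, dict) or not {"action", "req_id"} <= message.keys():
        raise ValueError("malformed envelope")  # step 2: transport-level shape
    action_cls = ToyActionRegistry.actions[message["action"]]  # step 3
    async for event in action_cls().execute(message, websocket_id):
        payload = json.dumps(asdict(event))  # step 5: serialise the event
        for target in resolve_recipients(event):
            send(target, payload)  # step 7: one send per resolved target


sent: list[tuple[str, str]] = []
asyncio.run(
    dispatch('{"action": "health", "req_id": "r1"}', "ws-1", lambda t, p: sent.append((t, p)))
)
```

After the run, sent holds one (target, JSON) pair addressed to "ws-1". Registering at class-definition time is what lets step 3 be a dictionary lookup.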

Minimal Action

from collections.abc import AsyncIterator

from beeflow_websocket.core.action_registry import ActionContext, ActionRegistryMeta
from beeflow_websocket.core.events.health import HealthEvent
from beeflow_websocket.core.payloads import WebSocketActionPayload


class Health(metaclass=ActionRegistryMeta, name="health"):
    """Handle a health-check WebSocket action."""

    async def execute(
        self,
        message: WebSocketActionPayload,
        context: ActionContext,
    ) -> AsyncIterator[HealthEvent]:
        """Yield a response event for the current WebSocket connection."""
        yield HealthEvent(
            recipient="websocket",
            recipient_id=context.websocket_id,
            req_id=message.req_id,
        )

Plugin Autodiscovery

Action, event, and recipient classes are registered when their modules are imported. Autodiscovery imports application-owned plugin modules during startup so their registry metaclasses can run.

Autodiscovery is enabled by default. Django scans every installed Django app. FastAPI and Flask scan the package that called configure_beeflow_websocket and its parent packages. Each adapter imports these conventional plugin modules when they exist:

my_app/actions.py
my_app/events.py
my_app/recipients.py
my_app/ws/actions.py
my_app/ws/events.py
my_app/ws/recipients.py

Missing modules are ignored. Import errors inside existing modules are not hidden; a broken plugin module should fail application startup.
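
The "ignore missing, surface broken" behaviour can be sketched with the standard library. This is an illustration of the idea only; the function names and the PLUGIN_MODULES tuple are hypothetical and do not describe how the adapters are implemented.

```python
import importlib
import importlib.util
from types import ModuleType

# Conventional plugin module paths, relative to an application package.
PLUGIN_MODULES = (
    "actions", "events", "recipients",
    "ws.actions", "ws.events", "ws.recipients",
)


def module_exists(name: str) -> bool:
    """Return True when `name` can be located, False when it is simply absent."""
    try:
        return importlib.util.find_spec(name) is not None
    except ModuleNotFoundError as exc:
        # find_spec imports parent packages first. Treat the error as "absent"
        # only when the missing piece is part of the plugin path itself
        # (for example "my_app.ws"); anything else is a real failure.
        if exc.name is not None and (name == exc.name or name.startswith(exc.name + ".")):
            return False
        raise


def autodiscover(package: str) -> list[ModuleType]:
    """Import every conventional plugin module that exists under `package`."""
    imported = []
    for suffix in PLUGIN_MODULES:
        name = f"{package}.{suffix}"
        if module_exists(name):
            # Errors raised while executing an existing module propagate,
            # so a broken plugin module stops startup.
            imported.append(importlib.import_module(name))
    return imported
```

Checking for existence before importing keeps the two failure modes apart: a module that is not there is skipped, while a module that is there but fails to import raises as usual.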

FastAPI and Flask do not need autodiscovery configuration for this conventional layout:

configure_beeflow_websocket(
    app,
    problem_type_base_url="https://example.com/problems/websocket",
)

Set BEEFLOW_WEBSOCKET_AUTODISCOVER = False in Django or autodiscover=False in FastAPI and Flask to disable startup imports.

Django Channels Setup

Add the package to Django settings when using the adapter:

INSTALLED_APPS = [
    "channels",
    "beeflow_websocket.django",
]

BEEFLOW_WEBSOCKET_PROBLEM_TYPE_BASE_URL = "https://example.com/problems/websocket"

If BEEFLOW_WEBSOCKET_PROBLEM_TYPE_BASE_URL is not configured, error payloads use about:blank as their Problem Details type.

Include the bundled WebSocket route in your ASGI routing:

from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application

from beeflow_websocket.django.routing import websocket_urlpatterns

application = ProtocolTypeRouter(
    {
        "http": get_asgi_application(),
        "websocket": URLRouter(websocket_urlpatterns),
    }
)

The bundled route is ws/. The default consumer accepts only authenticated users.

FastAPI Setup

Install the optional FastAPI extra:

uv add "beeflow-websocket[fastapi]"

Add a WebSocket route and pass the authenticated user id from your own FastAPI dependency:

from fastapi import FastAPI, WebSocket

from beeflow_websocket.fastapi import configure_beeflow_websocket, handle_beeflow_websocket

app = FastAPI()
configure_beeflow_websocket(
    app,
    problem_type_base_url="https://example.com/problems/websocket",
)


@app.websocket("/ws/")
async def websocket_endpoint(websocket: WebSocket) -> None:
    # user_id=1 is a placeholder; pass the id resolved by your own auth dependency.
    await handle_beeflow_websocket(websocket, user_id=1)

The FastAPI adapter handles the current WebSocket connection directly. Cross-connection broadcast or external pub/sub belongs in a separate connection manager. If the FastAPI app is not configured with a problem type base URL, error payloads use about:blank as their Problem Details type.
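
A separate connection manager for cross-connection delivery could look like the sketch below. It is not part of the package: ConnectionManager, JsonSocket, and RecordingSocket are hypothetical names. The only assumption about the socket object is an async send_json method, which FastAPI's WebSocket provides. The manager is in-process only, so delivery across workers or instances still needs an external pub/sub layer.

```python
import asyncio
from typing import Any, Protocol


class JsonSocket(Protocol):
    async def send_json(self, data: Any) -> None: ...


class ConnectionManager:
    """Track open sockets per user id within a single process."""

    def __init__(self) -> None:
        self._by_user: dict[int, set[JsonSocket]] = {}

    def connect(self, user_id: int, socket: JsonSocket) -> None:
        self._by_user.setdefault(user_id, set()).add(socket)

    def disconnect(self, user_id: int, socket: JsonSocket) -> None:
        sockets = self._by_user.get(user_id)
        if sockets is None:
            return
        sockets.discard(socket)
        if not sockets:
            del self._by_user[user_id]

    async def send_to_user(self, user_id: int, payload: Any) -> int:
        """Send `payload` to every socket of `user_id`; return the socket count."""
        sockets = list(self._by_user.get(user_id, ()))
        await asyncio.gather(*(socket.send_json(payload) for socket in sockets))
        return len(sockets)


class RecordingSocket:
    """Test double that records payloads instead of writing to a network."""

    def __init__(self) -> None:
        self.sent: list[Any] = []

    async def send_json(self, data: Any) -> None:
        self.sent.append(data)


async def _demo() -> tuple[int, int]:
    manager = ConnectionManager()
    first, second = RecordingSocket(), RecordingSocket()
    manager.connect(7, first)
    manager.connect(7, second)
    delivered = await manager.send_to_user(7, {"event": "health"})
    manager.disconnect(7, first)
    manager.disconnect(7, second)
    after_disconnect = await manager.send_to_user(7, {"event": "health"})
    return delivered, after_disconnect


demo_result = asyncio.run(_demo())
```

In a route, connect would be called after the socket is accepted and disconnect in a finally block, so closed sockets do not linger in the map.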

Flask Setup

Install the optional Flask extra:

uv add "beeflow-websocket[flask]"

Add a Flask-Sock WebSocket route and pass the authenticated user id from your own Flask auth layer:

from flask import Flask
from flask_sock import Sock

from beeflow_websocket.flask import configure_beeflow_websocket, handle_beeflow_websocket

app = Flask(__name__)
sock = Sock(app)
configure_beeflow_websocket(
    app,
    problem_type_base_url="https://example.com/problems/websocket",
)


@sock.route("/ws/")
def websocket_endpoint(websocket) -> None:
    # user_id=1 is a placeholder; pass the id resolved by your own Flask auth layer.
    handle_beeflow_websocket(websocket, user_id=1)

The Flask adapter handles the current WebSocket connection directly. Cross-connection broadcast or external pub/sub belongs in a separate connection manager. If the Flask app is not configured with a problem type base URL, error payloads use about:blank as their Problem Details type.

Documentation

English documentation starts at docs/en/README.md. Polish documentation starts at docs/pl/README.md.

Development

Use uv only:

uv sync --extra dev --extra django --extra django-dev --extra fastapi --extra fastapi-dev --extra flask --extra flask-dev
make test
make mypy
uv build

Release Automation

Every push to master runs .github/workflows/publish.yml. The workflow tests all adapters, runs mypy, builds the source distribution and wheel, then publishes the built distributions to PyPI through Trusted Publishing.

Pull requests targeting master run .github/workflows/ci.yml, which checks the lockfile, runs tests, runs mypy, and verifies that distributions build. The workflow also runs pre-commit hooks.

Configure PyPI Trusted Publishing for:

  • owner: beeflow
  • repository: beeflow-websocket
  • workflow: publish.yml
  • environment: pypi

PyPI package versions are immutable. Bump project.version in pyproject.toml before merging changes that should be published.

The master branch is protected on GitHub. Code changes must go through a pull request and review before they can be merged. Required pull request checks are Pre-commit, Test, Mypy, and Build.

Manual patch release bump example:

[project]
version = "0.1.1"

Move the relevant changelog entries from [Unreleased] into a dated release section:

## [0.1.1] - 2026-05-03

### Added

- Added FastAPI and Flask adapters.

Verify before merging:

make test
make mypy
uv build
