Skip to main content

A FastAPI-based WebSocket library

Project description

WebSocket 消息处理框架

本框架提供了用于处理 WebSocket 消息的完整解决方案,包括消息处理器(MessageHandler)和 WebSocket 运行器(WebSocketRunner)。

目录

MessageHandler 使用说明

概述

MessageHandler 提供了一套用于处理不同类型 WebSocket 消息的处理器框架。支持两种消息类型:

  • BytesMessageHandler: 处理二进制消息(bytes)
  • TextMessageHandler: 处理文本消息(str)

基本使用

1. 创建自定义处理器

继承 BytesMessageHandlerTextMessageHandler 并实现 handle 方法:

from transcriber.message_handler import BytesMessageHandler, TextMessageHandler

# 处理二进制消息
class MyBytesHandler(BytesMessageHandler):
    def handle(self, data: bytes) -> None:
        # 处理二进制数据
        print(f"收到二进制数据: {data}")
        # 你的处理逻辑...

# 处理文本消息
class MyTextHandler(TextMessageHandler):
    def handle(self, data: str) -> None:
        # 处理文本数据
        print(f"收到文本消息: {data}")
        # 你的处理逻辑...

2. 使用 MessageHandlerExecutor

MessageHandlerExecutor 用于管理和执行消息处理器:

from transcriber.message_handler import MessageHandlerExecutor, BytesMessageHandler, TextMessageHandler

# 创建执行器
executor = MessageHandlerExecutor()

# 注册处理器
bytes_handler = MyBytesHandler()
text_handler = MyTextHandler()

executor.register_bytes_handler(bytes_handler)
executor.register_text_handler(text_handler)

# 执行消息
message_bytes = {"bytes": b"hello"}
message_text = {"text": "world"}

executor.execute(message_bytes)  # 调用 bytes_handler.handle()
executor.execute(message_text)   # 调用 text_handler.handle()

3. 检查处理器状态

# 检查是否已注册处理器
if executor.has_bytes_handler():
    print("已注册 bytes 处理器")
    
if executor.has_text_handler():
    print("已注册 text 处理器")

# 获取已注册的处理器
bytes_handler = executor.get_bytes_handler()
text_handler = executor.get_text_handler()

高级用法

处理多种消息类型

可以同时注册 bytes 和 text 处理器,执行器会根据消息类型自动选择合适的处理器:

executor = MessageHandlerExecutor()

class BytesProcessor(BytesMessageHandler):
    def handle(self, data: bytes) -> None:
        # 处理音频数据、图片等
        process_audio(data)

class TextProcessor(TextMessageHandler):
    def handle(self, data: str) -> None:
        # 处理 JSON、命令等文本
        process_command(data)

executor.register_bytes_handler(BytesProcessor())
executor.register_text_handler(TextProcessor())

# 执行器会自动根据消息类型选择处理器
executor.execute({"bytes": b"audio data"})  # 使用 BytesProcessor
executor.execute({"text": '{"cmd": "start"}'})  # 使用 TextProcessor

替换处理器

可以随时替换已注册的处理器:

executor.register_bytes_handler(Handler1())
executor.execute({"bytes": b"data"})  # 使用 Handler1

executor.register_bytes_handler(Handler2())
executor.execute({"bytes": b"data"})  # 现在使用 Handler2

错误处理

# 如果没有注册处理器就执行消息,会抛出 ValueError
executor = MessageHandlerExecutor()
try:
    executor.execute({"bytes": b"data"})
except ValueError as e:
    print(f"错误: {e}")  # "No bytes handler registered"

# 如果消息格式不正确,也会抛出错误
try:
    executor.execute({"other": "data"})
except ValueError as e:
    print(f"错误: {e}")  # "Message must contain either 'bytes' or 'text' key"

WebSocketRunner 使用说明

概述

WebSocketRunner 是一个用于处理 WebSocket 连接的消息循环运行器。它从 WebSocket 连接接收消息,并通过 MessageHandlerExecutor 处理这些消息。

基本使用

1. 创建并运行 Runner

from starlette.websockets import WebSocket
from transcriber.message_handler import MessageHandlerExecutor, WebSocketRunner, BytesMessageHandler, TextMessageHandler

# 创建执行器和处理器
executor = MessageHandlerExecutor()

class MyBytesHandler(BytesMessageHandler):
    def handle(self, data: bytes) -> None:
        print(f"处理二进制数据: {data}")

class MyTextHandler(TextMessageHandler):
    def handle(self, data: str) -> None:
        print(f"处理文本消息: {data}")

executor.register_bytes_handler(MyBytesHandler())
executor.register_text_handler(MyTextHandler())

# 创建 runner
runner = WebSocketRunner(executor)

# 在 WebSocket 端点中使用
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    await runner.run(websocket)  # 运行消息循环

2. 在 Starlette/FastAPI 中使用

from fastapi import FastAPI, WebSocket
from transcriber.message_handler import MessageHandlerExecutor, WebSocketRunner, BytesMessageHandler

app = FastAPI()

# 创建全局执行器(或在依赖注入中创建)
executor = MessageHandlerExecutor()

class AudioHandler(BytesMessageHandler):
    def handle(self, data: bytes) -> None:
        # 处理音频数据
        process_audio_stream(data)

executor.register_bytes_handler(AudioHandler())

@app.websocket("/ws")
async def websocket_route(websocket: WebSocket):
    await websocket.accept()
    
    # 创建 runner 并运行
    runner = WebSocketRunner(executor)
    await runner.run(websocket)

3. 获取执行器

runner = WebSocketRunner(executor)
retrieved_executor = runner.get_executor()
assert retrieved_executor == executor

工作原理

  1. 消息接收: run() 方法持续从 WebSocket 接收消息
  2. 消息过滤: 只处理包含 bytestext 键的消息,其他消息会被跳过
  3. 消息执行: 将消息传递给 MessageHandlerExecutor 执行
  4. 连接关闭: 当 WebSocketDisconnect 异常发生时,循环正常退出
  5. 错误传播: 其他异常会被重新抛出,由调用者处理

注意事项

  • 消息格式: Runner 只处理包含 bytestext 键的消息
  • 连接断开: WebSocketDisconnect 会被正常处理,不会抛出异常
  • 异常处理: 执行器中的异常会向上传播,需要在业务逻辑中处理
  • 异步运行: run() 方法是异步的,必须在 async 函数中调用

错误处理

async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    
    try:
        runner = WebSocketRunner(executor)
        await runner.run(websocket)
    except ValueError as e:
        # 处理执行器错误(如未注册处理器)
        print(f"执行错误: {e}")
    except Exception as e:
        # 处理其他错误
        print(f"未知错误: {e}")

完整示例

示例 1: 简单的文本消息处理

from fastapi import FastAPI, WebSocket
from transcriber.message_handler import (
    MessageHandlerExecutor,
    WebSocketRunner,
    TextMessageHandler
)

app = FastAPI()
executor = MessageHandlerExecutor()

class EchoHandler(TextMessageHandler):
    def handle(self, data: str) -> None:
        print(f"收到消息: {data}")

executor.register_text_handler(EchoHandler())

@app.websocket("/ws")
async def websocket_route(websocket: WebSocket):
    await websocket.accept()
    runner = WebSocketRunner(executor)
    await runner.run(websocket)

示例 2: 同时处理文本和二进制消息

from fastapi import FastAPI, WebSocket
from transcriber.message_handler import (
    MessageHandlerExecutor,
    WebSocketRunner,
    BytesMessageHandler,
    TextMessageHandler
)

app = FastAPI()
executor = MessageHandlerExecutor()

class CommandHandler(TextMessageHandler):
    def handle(self, data: str) -> None:
        # 处理 JSON 命令
        import json
        cmd = json.loads(data)
        print(f"执行命令: {cmd}")

class AudioHandler(BytesMessageHandler):
    def handle(self, data: bytes) -> None:
        # 处理音频流
        print(f"收到音频数据: {len(data)} 字节")

executor.register_text_handler(CommandHandler())
executor.register_bytes_handler(AudioHandler())

@app.websocket("/ws")
async def websocket_route(websocket: WebSocket):
    await websocket.accept()
    runner = WebSocketRunner(executor)
    await runner.run(websocket)

示例 3: 动态处理器管理

from transcriber.message_handler import MessageHandlerExecutor, WebSocketRunner

executor = MessageHandlerExecutor()

# 根据业务需求动态注册处理器
def setup_handlers(mode: str):
    if mode == "audio":
        executor.register_bytes_handler(AudioHandler())
    elif mode == "text":
        executor.register_text_handler(TextHandler())
    else:
        executor.register_bytes_handler(AudioHandler())
        executor.register_text_handler(TextHandler())

# 使用
setup_handlers("audio")
runner = WebSocketRunner(executor)

API 参考

MessageHandlerExecutor

  • register_bytes_handler(handler: BytesMessageHandler): 注册二进制消息处理器
  • register_text_handler(handler: TextMessageHandler): 注册文本消息处理器
  • execute(message: Message): 执行消息处理
  • has_bytes_handler() -> bool: 检查是否已注册二进制处理器
  • has_text_handler() -> bool: 检查是否已注册文本处理器
  • get_bytes_handler() -> Optional[BytesMessageHandler]: 获取二进制处理器
  • get_text_handler() -> Optional[TextMessageHandler]: 获取文本处理器

WebSocketRunner

  • __init__(executor: MessageHandlerExecutor): 初始化 runner
  • run(websocket: WebSocket) -> None: 运行消息处理循环(异步)
  • get_executor() -> MessageHandlerExecutor: 获取关联的执行器

BytesMessageHandler / TextMessageHandler

  • handle(data: bytes | str) -> None: 处理消息数据(需要子类实现)
  • handle_message(message: Message) -> None: 从消息字典中提取数据并处理

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

smooth_websocket_daemon-0.1.0.tar.gz (11.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

smooth_websocket_daemon-0.1.0-py3-none-any.whl (8.9 kB view details)

Uploaded Python 3

File details

Details for the file smooth_websocket_daemon-0.1.0.tar.gz.

File metadata

  • Download URL: smooth_websocket_daemon-0.1.0.tar.gz
  • Upload date:
  • Size: 11.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.7

File hashes

Hashes for smooth_websocket_daemon-0.1.0.tar.gz
Algorithm Hash digest
SHA256 2288233dda50be460e4c6aba98751f8e8bc5c97d17fff913e35dfe4354df16af
MD5 f3d65a7e9d2c0ec6ba221a4a90fe603d
BLAKE2b-256 26c9946be284df972c9567f96570cc239a1624a6399b04a744551f11ff94df33

See more details on using hashes here.

File details

Details for the file smooth_websocket_daemon-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for smooth_websocket_daemon-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 38e62b6b5189e842d194647e8e1bec47bfd148239e7cb292432142628f9c95cb
MD5 43c4cc6e90536072e1fa14e86efb718a
BLAKE2b-256 1b3274246ee5613be47215a7ce329cb92a999aeef3e72cde29d1350ecf7f86bb

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page