A FastAPI-based WebSocket library
Project description
WebSocket 消息处理框架
本框架提供了用于处理 WebSocket 消息的完整解决方案,包括消息处理器(MessageHandler)和 WebSocket 运行器(WebSocketRunner)。
目录
MessageHandler 使用说明
概述
MessageHandler 提供了一套用于处理不同类型 WebSocket 消息的处理器框架。支持两种消息类型:
- BytesMessageHandler: 处理二进制消息(bytes)
- TextMessageHandler: 处理文本消息(str)
基本使用
1. 创建自定义处理器
继承 BytesMessageHandler 或 TextMessageHandler 并实现 handle 方法:
from transcriber.message_handler import BytesMessageHandler, TextMessageHandler
# 处理二进制消息
class MyBytesHandler(BytesMessageHandler):
def handle(self, data: bytes) -> None:
# 处理二进制数据
print(f"收到二进制数据: {data}")
# 你的处理逻辑...
# 处理文本消息
class MyTextHandler(TextMessageHandler):
def handle(self, data: str) -> None:
# 处理文本数据
print(f"收到文本消息: {data}")
# 你的处理逻辑...
2. 使用 MessageHandlerExecutor
MessageHandlerExecutor 用于管理和执行消息处理器:
from transcriber.message_handler import MessageHandlerExecutor, BytesMessageHandler, TextMessageHandler
# 创建执行器
executor = MessageHandlerExecutor()
# 注册处理器
bytes_handler = MyBytesHandler()
text_handler = MyTextHandler()
executor.register_bytes_handler(bytes_handler)
executor.register_text_handler(text_handler)
# 执行消息
message_bytes = {"bytes": b"hello"}
message_text = {"text": "world"}
executor.execute(message_bytes) # 调用 bytes_handler.handle()
executor.execute(message_text) # 调用 text_handler.handle()
3. 检查处理器状态
# 检查是否已注册处理器
if executor.has_bytes_handler():
print("已注册 bytes 处理器")
if executor.has_text_handler():
print("已注册 text 处理器")
# 获取已注册的处理器
bytes_handler = executor.get_bytes_handler()
text_handler = executor.get_text_handler()
高级用法
处理多种消息类型
可以同时注册 bytes 和 text 处理器,执行器会根据消息类型自动选择合适的处理器:
executor = MessageHandlerExecutor()
class BytesProcessor(BytesMessageHandler):
def handle(self, data: bytes) -> None:
# 处理音频数据、图片等
process_audio(data)
class TextProcessor(TextMessageHandler):
def handle(self, data: str) -> None:
# 处理 JSON、命令等文本
process_command(data)
executor.register_bytes_handler(BytesProcessor())
executor.register_text_handler(TextProcessor())
# 执行器会自动根据消息类型选择处理器
executor.execute({"bytes": b"audio data"}) # 使用 BytesProcessor
executor.execute({"text": '{"cmd": "start"}'}) # 使用 TextProcessor
替换处理器
可以随时替换已注册的处理器:
executor.register_bytes_handler(Handler1())
executor.execute({"bytes": b"data"}) # 使用 Handler1
executor.register_bytes_handler(Handler2())
executor.execute({"bytes": b"data"}) # 现在使用 Handler2
错误处理
# 如果没有注册处理器就执行消息,会抛出 ValueError
executor = MessageHandlerExecutor()
try:
executor.execute({"bytes": b"data"})
except ValueError as e:
print(f"错误: {e}") # "No bytes handler registered"
# 如果消息格式不正确,也会抛出错误
try:
executor.execute({"other": "data"})
except ValueError as e:
print(f"错误: {e}") # "Message must contain either 'bytes' or 'text' key"
WebSocketRunner 使用说明
概述
WebSocketRunner 是一个用于处理 WebSocket 连接的消息循环运行器。它从 WebSocket 连接接收消息,并通过 MessageHandlerExecutor 处理这些消息。
基本使用
1. 创建并运行 Runner
from starlette.websockets import WebSocket
from transcriber.message_handler import MessageHandlerExecutor, WebSocketRunner, BytesMessageHandler, TextMessageHandler
# 创建执行器和处理器
executor = MessageHandlerExecutor()
class MyBytesHandler(BytesMessageHandler):
def handle(self, data: bytes) -> None:
print(f"处理二进制数据: {data}")
class MyTextHandler(TextMessageHandler):
def handle(self, data: str) -> None:
print(f"处理文本消息: {data}")
executor.register_bytes_handler(MyBytesHandler())
executor.register_text_handler(MyTextHandler())
# 创建 runner
runner = WebSocketRunner(executor)
# 在 WebSocket 端点中使用
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
await runner.run(websocket) # 运行消息循环
2. 在 Starlette/FastAPI 中使用
from fastapi import FastAPI, WebSocket
from transcriber.message_handler import MessageHandlerExecutor, WebSocketRunner, BytesMessageHandler
app = FastAPI()
# 创建全局执行器(或在依赖注入中创建)
executor = MessageHandlerExecutor()
class AudioHandler(BytesMessageHandler):
def handle(self, data: bytes) -> None:
# 处理音频数据
process_audio_stream(data)
executor.register_bytes_handler(AudioHandler())
@app.websocket("/ws")
async def websocket_route(websocket: WebSocket):
await websocket.accept()
# 创建 runner 并运行
runner = WebSocketRunner(executor)
await runner.run(websocket)
3. 获取执行器
runner = WebSocketRunner(executor)
retrieved_executor = runner.get_executor()
assert retrieved_executor == executor
工作原理
- 消息接收:
run()方法持续从 WebSocket 接收消息 - 消息过滤: 只处理包含
bytes或text键的消息,其他消息会被跳过 - 消息执行: 将消息传递给
MessageHandlerExecutor执行 - 连接关闭: 当
WebSocketDisconnect异常发生时,循环正常退出 - 错误传播: 其他异常会被重新抛出,由调用者处理
注意事项
- 消息格式: Runner 只处理包含
bytes或text键的消息 - 连接断开:
WebSocketDisconnect会被正常处理,不会抛出异常 - 异常处理: 执行器中的异常会向上传播,需要在业务逻辑中处理
- 异步运行:
run()方法是异步的,必须在async函数中调用
错误处理
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
runner = WebSocketRunner(executor)
await runner.run(websocket)
except ValueError as e:
# 处理执行器错误(如未注册处理器)
print(f"执行错误: {e}")
except Exception as e:
# 处理其他错误
print(f"未知错误: {e}")
完整示例
示例 1: 简单的文本消息处理
from fastapi import FastAPI, WebSocket
from transcriber.message_handler import (
MessageHandlerExecutor,
WebSocketRunner,
TextMessageHandler
)
app = FastAPI()
executor = MessageHandlerExecutor()
class EchoHandler(TextMessageHandler):
def handle(self, data: str) -> None:
print(f"收到消息: {data}")
executor.register_text_handler(EchoHandler())
@app.websocket("/ws")
async def websocket_route(websocket: WebSocket):
await websocket.accept()
runner = WebSocketRunner(executor)
await runner.run(websocket)
示例 2: 同时处理文本和二进制消息
from fastapi import FastAPI, WebSocket
from transcriber.message_handler import (
MessageHandlerExecutor,
WebSocketRunner,
BytesMessageHandler,
TextMessageHandler
)
app = FastAPI()
executor = MessageHandlerExecutor()
class CommandHandler(TextMessageHandler):
def handle(self, data: str) -> None:
# 处理 JSON 命令
import json
cmd = json.loads(data)
print(f"执行命令: {cmd}")
class AudioHandler(BytesMessageHandler):
def handle(self, data: bytes) -> None:
# 处理音频流
print(f"收到音频数据: {len(data)} 字节")
executor.register_text_handler(CommandHandler())
executor.register_bytes_handler(AudioHandler())
@app.websocket("/ws")
async def websocket_route(websocket: WebSocket):
await websocket.accept()
runner = WebSocketRunner(executor)
await runner.run(websocket)
示例 3: 动态处理器管理
from transcriber.message_handler import MessageHandlerExecutor, WebSocketRunner
executor = MessageHandlerExecutor()
# 根据业务需求动态注册处理器
def setup_handlers(mode: str):
if mode == "audio":
executor.register_bytes_handler(AudioHandler())
elif mode == "text":
executor.register_text_handler(TextHandler())
else:
executor.register_bytes_handler(AudioHandler())
executor.register_text_handler(TextHandler())
# 使用
setup_handlers("audio")
runner = WebSocketRunner(executor)
API 参考
MessageHandlerExecutor
register_bytes_handler(handler: BytesMessageHandler): 注册二进制消息处理器register_text_handler(handler: TextMessageHandler): 注册文本消息处理器execute(message: Message): 执行消息处理has_bytes_handler() -> bool: 检查是否已注册二进制处理器has_text_handler() -> bool: 检查是否已注册文本处理器get_bytes_handler() -> Optional[BytesMessageHandler]: 获取二进制处理器get_text_handler() -> Optional[TextMessageHandler]: 获取文本处理器
WebSocketRunner
__init__(executor: MessageHandlerExecutor): 初始化 runnerrun(websocket: WebSocket) -> None: 运行消息处理循环(异步)get_executor() -> MessageHandlerExecutor: 获取关联的执行器
BytesMessageHandler / TextMessageHandler
handle(data: bytes | str) -> None: 处理消息数据(需要子类实现)handle_message(message: Message) -> None: 从消息字典中提取数据并处理
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file smooth_websocket_daemon-0.1.0.tar.gz.
File metadata
- Download URL: smooth_websocket_daemon-0.1.0.tar.gz
- Upload date:
- Size: 11.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2288233dda50be460e4c6aba98751f8e8bc5c97d17fff913e35dfe4354df16af
|
|
| MD5 |
f3d65a7e9d2c0ec6ba221a4a90fe603d
|
|
| BLAKE2b-256 |
26c9946be284df972c9567f96570cc239a1624a6399b04a744551f11ff94df33
|
File details
Details for the file smooth_websocket_daemon-0.1.0-py3-none-any.whl.
File metadata
- Download URL: smooth_websocket_daemon-0.1.0-py3-none-any.whl
- Upload date:
- Size: 8.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
38e62b6b5189e842d194647e8e1bec47bfd148239e7cb292432142628f9c95cb
|
|
| MD5 |
43c4cc6e90536072e1fa14e86efb718a
|
|
| BLAKE2b-256 |
1b3274246ee5613be47215a7ce329cb92a999aeef3e72cde29d1350ecf7f86bb
|