基于 Stdio 的 JSON RPC 父子进程通信模块
Project description
OkStdio
基于 Stdio 的 JSON-RPC 父子进程通信框架
OkStdio 是一个轻量级的 Python 框架,通过标准输入输出(stdin/stdout)实现父子进程之间的 JSON-RPC 2.0 通信。提供优雅的 API 设计、强类型支持、链式调用、流式响应、中间件机制和自动文档生成。
特性
- ✅ JSON-RPC 2.0 标准:完整实现 JSON-RPC 2.0 协议规范
- ✅ 基于 Stdio:通过标准输入输出通信,轻量且跨平台
- ✅ 强类型支持:基于 Pydantic 的参数验证和序列化
- ✅ 异步优先:完整的 asyncio 支持
- ✅ 链式调用:
call().then().error()的 Promise 风格链式 API - ✅ 流式响应:
async for流式接收服务器推送(StreamListener) - ✅ 中间件机制:灵活的请求/响应拦截处理
- ✅ 路由系统:支持方法前缀和嵌套路由
- ✅ 依赖注入:支持单例/非单例、运行时动态注册
- ✅ 批量管理:ClientManager 管理多客户端,支持广播请求
- ✅ 自动文档:自动生成 Markdown 格式的 API 文档
- ✅ 服务器方法树:通过
__system__系统方法获取完整方法树 - ✅ TUI 调试工具:内置文本用户界面调试工具
rcptui - ✅ 跨平台:Windows/Linux/macOS 全平台支持,自动处理编码
安装
pip install okstdio
# 包含 TUI 调试工具
pip install "okstdio[tui]"
或使用 uv(推荐):
uv add okstdio
uv add "okstdio[tui]"
快速开始
服务器端
from okstdio.server import RPCServer
from pydantic import BaseModel, Field
app = RPCServer("my_server", label="我的服务器")
class User(BaseModel):
name: str = Field(..., description="用户名")
age: int = Field(..., ge=0, le=120, description="年龄")
@app.add_method(name="get_user", label="获取用户")
def get_user(user_id: int) -> User:
"""根据 ID 获取用户信息"""
return User(name="张三", age=25)
if __name__ == "__main__":
app.runserver()
客户端
import asyncio
from okstdio.client import RPCClient
async def main():
async with RPCClient("my_client") as client:
await client.start("my_server") # 启动服务器进程
# 方式一:send() + await(传统风格)
future = await client.send("get_user", {"user_id": 1})
response = await future
print(response.result)
# 方式二:call().then()(链式风格)
await client.call("get_user", {"user_id": 1}).then(
lambda result: print(result)
)
if __name__ == "__main__":
asyncio.run(main())
核心概念
1. 服务器 (RPCServer)
from okstdio.server import RPCServer, RPCRouter
app = RPCServer("example_server", label="示例服务器", version="v1.0.0")
# 注册方法
@app.add_method(name="hello", label="问候")
def hello(name: str) -> str:
return f"Hello, {name}!"
# 路由分组
user_router = RPCRouter(prefix="user", label="用户管理")
@user_router.add_method(name="create")
def create_user(username: str) -> dict:
return {"id": 1, "username": username}
app.include_router(user_router)
2. 客户端 (RPCClient)
from okstdio.client import RPCClient
async with RPCClient("client_name") as client:
await client.start("example.server") # 模块方式
# await client.start("path/to/server.py") # 脚本方式
# 传统方式
future = await client.send("user.create", {"username": "alice"})
response = await future
# 链式调用
await client.call("user.create", {"username": "alice"}).then(
lambda result: print(result)
)
3. 链式调用 (RPCFuture)
call() 返回 RPCFuture,支持 .then() 和 .error() 链式处理:
from pydantic import BaseModel
class UserResult(BaseModel):
id: int
username: str
# 自动注入 Pydantic 模型
await client.call("user.create", {"username": "alice"}).then(
lambda user: print(f"创建成功: {user.username}"), # user 是 UserResult 实例
extra_params={"user": UserResult} # 类型提示
)
# 使用参数名约定
await client.call("user.create", {"username": "alice"}).then(
lambda result: print(result) # result → response.result
).error(
lambda err: print(f"出错: {err}")
)
# 后台任务(不阻塞)
client.call("long_task", {}).then(on_done, create_task=True)
.then() 参数注入规则:
- 参数类型为
BaseModel子类 → 用response.result实例化该模型 extra_params中有同名 key → 注入对应值- 参数名为
result→ 注入response.result - 参数名为
response→ 注入完整JSONRPCResponse - 其余 → 注入
response.result
4. 流式响应 (IOWrite + StreamListener)
服务器端 通过 IOWrite 推送:
from okstdio.server import IOWrite
@app.add_method(name="long_task")
async def long_task(io_write: IOWrite) -> dict:
for i in range(10):
await io_write.write({"progress": i * 10, "status": "running"})
await asyncio.sleep(1)
return {"status": "completed"}
客户端 通过 stream() 上下文管理器接收:
# 推荐:async for 迭代(自动结束)
async with client.stream("progress") as listener:
future = await client.send("long_task", {})
async for message in listener:
print(f"进度: {message}")
# 手动:监听队列
queue = client.add_listen_queue("progress")
future = await client.send("long_task", {})
while True:
msg = await queue.get()
if msg is None:
break
print(msg)
5. 中间件
@app.add_middleware(label="日志中间件")
async def log_middleware(request, call_next):
print(f"收到请求: {request.method}")
response = await call_next(request)
print(f"返回响应: {response}")
return response
6. 依赖注入
import uiautomator2 as u2
# 注册单例依赖
app.register_dependency(
u2.Device,
lambda: u2.connect("192.168.1.100:5555"),
singleton=True
)
@app.add_method()
def click_device(device: u2.Device) -> dict:
device.click(0.5, 0.5)
return {"status": "clicked"}
# 运行时动态注册
@app.add_method()
def init_device(device_ip: str) -> dict:
device = u2.connect(device_ip)
app.register_dependency(u2.Device, lambda: device, singleton=True)
return {"status": "device registered"}
# 检查依赖
if app.has_dependency(u2.Device):
device = app.get_dependency(u2.Device)
7. 批量客户端管理 (ClientManager)
from okstdio import ClientManager
async with ClientManager() as manager:
# 添加多个客户端
await manager.add("server1", "module.server1")
await manager.add("server2", "module.server2")
# 并发启动所有
await manager.start_all()
# 单独调用
result = await manager.send_to("server1", "method", {"param": "value"})
await manager.call_to("server2", "method", {"param": "value"}).then(
lambda result: print(result)
)
# 广播请求(不抛异常,独立封装结果)
results = await manager.broadcast("health_check", {})
for r in results:
if r.error:
print(f"{r.client_name} 失败: {r.error}")
else:
print(f"{r.client_name} 结果: {r.result}")
8. 服务器方法树
# 客户端获取服务器方法树
method_tree = await client.get_server_methods()
print("服务器名称:", method_tree["server_name"])
print("服务器版本:", method_tree["version"])
for method in method_tree["methods"]:
print(f"- {method['name']} ({method['path']}): {method['doc']}")
方法树结构:
server_name: 服务器名称version: 服务器版本label: 服务器标签methods: 方法列表(name, label, path, doc, params, results)middlewares: 中间件列表routers: 路由器字典
9. TUI 调试工具
# 使用模块路径启动
rcptui --server tests.test_server
# 使用脚本路径
rcptui --server path/to/server.py
# 使用 uv
uv run rcptui --server tests.test_server
界面布局:
┌──────────────────────────────────────────────────────────────────┐
│ Header │
├───────────────────┬──────────────────────────────────────────────┤
│ Method Tree │ Method: 方法名 [标签] │
│ ▼ Server ├──────────────────────────────────────────────┤
│ ├── 方法1 │ Params (JSON): │
│ ├── 方法2 │ { │
│ └──▶ 子路由 │ "user_id": 123 │
│ ├── 方法1 │ } │
│ └── 方法2 ├──────────────────────────────────────────────┤
│ │ Response | Log │
│ ├──────────────────────────────────────────────┤
│ │ # 格式化的 JSON 响应 │
│ │ # 或请求/响应日志 │
├───────────────────┴──────────────────────────────────────────────┤
│ Footer: Ctrl+j 发送 | Ctrl+r 重启 | Ctrl+l 清日志 | Ctrl+q 退出 │
└──────────────────────────────────────────────────────────────────┘
| 快捷键 | 功能 |
|---|---|
Ctrl+j |
发送请求 |
Ctrl+r |
重启服务器 |
Ctrl+l |
清理日志 |
Ctrl+q |
退出 |
自动文档生成
if __name__ == "__main__":
app.docs_markdown() # 生成 {server_name}.md
app.runserver()
错误处理
from okstdio.general.errors import (
RPCParseError, # -32700: 语法解析错误
RPCInvalidRequestError, # -32600: 无效请求
RPCMethodNotFoundError, # -32601: 找不到方法
RPCInvalidParamsError, # -32602: 无效参数
RPCInternalError, # -32603: 内部错误
RPCServerError, # -32000~-32099: 服务器错误
)
# 自定义错误响应
from okstdio.general.jsonrpc_model import JSONRPCServerErrorDetail
@app.add_method(name="restricted")
def restricted_method() -> dict | JSONRPCServerErrorDetail:
return JSONRPCServerErrorDetail(code=-32001, message="权限不足")
最佳实践
日志配置
确保日志只写入文件,不输出到 stdout(避免干扰 JSON-RPC 通信):
import logging
from logging.handlers import RotatingFileHandler
handler = RotatingFileHandler("app.log", maxBytes=2*1024*1024, encoding="utf-8")
logging.getLogger().handlers.clear()
logging.getLogger().addHandler(handler)
编码问题
在 Windows 上,框架会自动将 stdin/stdout 重新包装为 UTF-8,无需手动配置。
Pydantic 模型设计
class CreateUserParams(BaseModel):
username: str = Field(..., min_length=3)
email: str = Field(..., description="邮箱")
age: int = Field(..., ge=0, le=120)
@app.add_method(name="create_user")
def create_user(params: CreateUserParams) -> dict:
return {"id": 1, **params.model_dump()}
技术栈
- Python 3.10+:利用现代 Python 特性
- Pydantic 2.x:数据验证和序列化
- asyncio:异步 I/O 支持
- Textual(可选):TUI 界面框架
完整示例
查看 example/ 目录了解完整示例,包括英雄管理系统服务器/客户端实现。
python -m example.client
许可证
MIT License
贡献
欢迎提交 Issue 和 Pull Request!
相关链接
作者: jianjian 邮箱: jianjian2048@gmail.com
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file okstdio-1.0.1.tar.gz.
File metadata
- Download URL: okstdio-1.0.1.tar.gz
- Upload date:
- Size: 38.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.10 {"installer":{"name":"uv","version":"0.9.10"},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
98bd82287a0e7d5525d32d4d6232621d608ed32559f5d9d39fccc41442cb37ae
|
|
| MD5 |
a026301176bb19ae8f1e419718ea1981
|
|
| BLAKE2b-256 |
f370e1eeadd84e4680bfd1f9b4684ba1b3e91e7fc5869e4d29f382e41b83a240
|
File details
Details for the file okstdio-1.0.1-py3-none-any.whl.
File metadata
- Download URL: okstdio-1.0.1-py3-none-any.whl
- Upload date:
- Size: 52.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.10 {"installer":{"name":"uv","version":"0.9.10"},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0a7ce936b113694ec085b66ff0a26ca3a992242a5c23df3d0794b82c41d2d020
|
|
| MD5 |
fb7c96dd957b77462b1f26544cb5d106
|
|
| BLAKE2b-256 |
1dfe4398dcfce1f107d0835a078ab42e1a12ffcf7330aaa007471a5caae1f163
|