Aduib RPC - A Python library for Aduib AI RPC framework.
Aduib RPC
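Project Overview
Aduib RPC is a Python-based remote procedure call (RPC) framework with a built-in multi-protocol adapter layer for gRPC, JSON-RPC, REST, and Thrift, plus a unified request/response envelope (AduibRpcRequest / AduibRpcResponse).
It is well suited to exposing AI capabilities as services: with unified service registration/discovery, load balancing, retries, and authentication middleware, calls arriving over different protocols converge on a single Service / RequestExecutor model.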
Core Features
1) Multiple protocols and transports
- gRPC: unary + streaming
- REST: HTTP POST + SSE streaming
- JSON-RPC 2.0: HTTP JSON-RPC + SSE streaming
- Thrift: unary only (streaming is not yet supported)
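The REST and JSON-RPC transports stream over SSE. As a rough illustration of what a consumer of such a stream does, the sketch below splits a Server-Sent Events text stream into `data` payloads following the standard SSE framing (events end at a blank line, lines starting with `:` are comments). The function name `iter_sse_data` is hypothetical, and the assumption that each event carries one JSON-encoded response is ours; the README does not specify the wire format.

```python
import json
from typing import Iterable, Iterator


def iter_sse_data(lines: Iterable[str]) -> Iterator[str]:
    """Yield the data payload of each complete SSE event in a line stream."""
    buffer = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if buffer:
                yield "\n".join(buffer)
                buffer = []
        elif line.startswith(":"):
            # Comment line, often used as a keep-alive; ignore it.
            continue
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # A single leading space after the colon is not part of the value.
            buffer.append(value[1:] if value.startswith(" ") else value)
        # Other fields (event:, id:, retry:) are ignored in this sketch.


# Assumed payload shape: one JSON object per event.
sample = ['data: {"result": 1}', "", ": keep-alive", 'data: {"result": 2}', ""]
payloads = [json.loads(chunk) for chunk in iter_sse_data(sample)]
```

An event that is not followed by a blank line is never yielded, which matches how SSE clients discard an incomplete trailing event.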
2) Unified, protocol-agnostic message model
- AduibRpcRequest(method, data, meta, id)
- AduibRpcResponse(result, error, status, id)
- Unified error structure: {code, message, data}
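To make the shape of this envelope concrete, here is a minimal stand-in built with dataclasses. These are not the library's classes: `RpcRequest`, `RpcResponse`, `RpcError`, the helper functions, the field types, and the `"success"` / `"error"` status values are all assumptions chosen for illustration. Only the field names come from the list above.

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Optional


@dataclass
class RpcError:
    """Stand-in for the unified error structure {code, message, data}."""
    code: int
    message: str
    data: Any = None


@dataclass
class RpcRequest:
    """Stand-in for AduibRpcRequest(method, data, meta, id)."""
    method: str
    data: Any = None
    meta: dict = field(default_factory=dict)
    id: Optional[str] = None


@dataclass
class RpcResponse:
    """Stand-in for AduibRpcResponse(result, error, status, id)."""
    result: Any = None
    error: Optional[RpcError] = None
    status: str = "success"  # assumed value, not taken from the library
    id: Optional[str] = None


def ok(request: RpcRequest, result: Any) -> RpcResponse:
    """Build a success response that echoes the request id."""
    return RpcResponse(result=result, status="success", id=request.id)


def fail(request: RpcRequest, code: int, message: str, data: Any = None) -> RpcResponse:
    """Build an error response that echoes the request id."""
    return RpcResponse(error=RpcError(code, message, data), status="error", id=request.id)


request = RpcRequest(method="CaculService.add", data={"x": 1, "y": 2}, id="1")
success = asdict(ok(request, 3))
failure = asdict(fail(request, 500, "Oops!"))
```

Because the envelope is plain data, the same object can be serialized for any of the transports; only the framing around it differs.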
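3) Service definition and invocation (server and client)
- Server: @service(service_name=...) registers methods
- Client: @client(app_name) generates a strongly typed service stub (both synchronous and asynchronous methods are supported)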
4) Server-side executor (RequestExecutor)
- Bind a custom executor with @request_execution(method=...)
- Streaming scenarios are supported (context.stream)
5) Service discovery / registration
- Built-in registry abstraction and factory
- Supports In-Memory and Nacos registries (provided as extras)
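The idea of a registry abstraction with an in-memory backend can be sketched as follows. This is an illustration of the pattern only, not the library's registry API: `Registry`, `InMemoryRegistry`, `ServiceInstance`, and the round-robin `pick` method are hypothetical names, and round-robin is just one possible load-balancing strategy.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class ServiceInstance:
    """One reachable instance of a named service (hypothetical type)."""
    name: str
    host: str
    port: int

    @property
    def url(self) -> str:
        return f"{self.host}:{self.port}"


class Registry(ABC):
    """Minimal registry interface: register instances, discover them by name."""

    @abstractmethod
    def register(self, instance: ServiceInstance) -> None: ...

    @abstractmethod
    def discover(self, name: str) -> List[ServiceInstance]: ...


class InMemoryRegistry(Registry):
    """Keeps instances in a dict; suitable for tests and single-process demos."""

    def __init__(self) -> None:
        self._instances: Dict[str, List[ServiceInstance]] = {}
        self._cursor: Dict[str, int] = {}

    def register(self, instance: ServiceInstance) -> None:
        self._instances.setdefault(instance.name, []).append(instance)

    def discover(self, name: str) -> List[ServiceInstance]:
        return list(self._instances.get(name, []))

    def pick(self, name: str) -> ServiceInstance:
        """Round-robin selection across the registered instances."""
        instances = self.discover(name)
        if not instances:
            raise LookupError(f"no instances registered for {name!r}")
        index = self._cursor.get(name, 0)
        self._cursor[name] = index + 1
        return instances[index % len(instances)]


registry = InMemoryRegistry()
registry.register(ServiceInstance("CaculServiceApp", "127.0.0.1", 50051))
registry.register(ServiceInstance("CaculServiceApp", "127.0.0.1", 50052))
picked = [registry.pick("CaculServiceApp").url for _ in range(3)]
```

Keeping discovery behind an interface is what lets a Nacos-backed implementation replace the in-memory one without touching client code.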
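6) Load balancing, retries, and timeouts
- The client provides load-balancing strategies and connection-pool reuse
- Retries are supported (requires the retry parameters / idempotent flag in meta)
- Timeouts are supported (timeout_ms / timeout_s, or the transport/config default)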
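The interplay between the idempotent flag, retries, and timeouts can be sketched generically. The code below is not the library's retry implementation: the `max_retries` key, the precedence of `timeout_ms` over `timeout_s`, the exponential backoff, and the set of retryable exceptions are all assumptions. Only the `timeout_ms`, `timeout_s`, and idempotent concepts come from the list above.

```python
import asyncio
from typing import Any, Awaitable, Callable, Mapping


def resolve_timeout_s(meta: Mapping[str, Any], default_s: float = 30.0) -> float:
    """Pick a timeout in seconds: timeout_ms, then timeout_s, then the default."""
    if "timeout_ms" in meta:
        return meta["timeout_ms"] / 1000.0
    if "timeout_s" in meta:
        return float(meta["timeout_s"])
    return default_s


async def call_with_retry(
    call: Callable[[], Awaitable[Any]],
    meta: Mapping[str, Any],
    *,
    default_timeout_s: float = 30.0,
    backoff_s: float = 0.1,
) -> Any:
    """Run call() under a timeout, retrying only when meta marks it idempotent."""
    timeout_s = resolve_timeout_s(meta, default_timeout_s)
    # "max_retries" is a hypothetical key; non-idempotent calls get one attempt.
    retries = int(meta.get("max_retries", 0)) if meta.get("idempotent") else 0
    for attempt in range(retries + 1):
        try:
            return await asyncio.wait_for(call(), timeout=timeout_s)
        except (asyncio.TimeoutError, ConnectionError):
            if attempt == retries:
                raise
            # Exponential backoff between attempts.
            await asyncio.sleep(backoff_s * 2 ** attempt)
```

Gating retries on an explicit idempotent marker avoids replaying a call whose side effects may already have been applied on the server.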
7) Authentication and middleware
- ClientRequestInterceptor / ServerInterceptor
- Useful for authentication, auditing, and injecting trace data, headers, routing information, and so on
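The interceptor idea can be illustrated with a small chain that rewrites request metadata before a call is sent. This sketch does not use the library's ClientRequestInterceptor interface, whose exact signature is not shown in this README; the `intercept(method, data, meta)` shape, the class names, and the `headers` / `trace_id` keys are assumptions.

```python
import uuid
from typing import Any, Callable, Dict, List, Tuple

Meta = Dict[str, Any]


class BearerAuthInterceptor:
    """Adds an Authorization header using a caller-supplied token provider."""

    def __init__(self, token_provider: Callable[[], str]) -> None:
        self._token_provider = token_provider

    def intercept(self, method: str, data: Any, meta: Meta) -> Tuple[Any, Meta]:
        headers = dict(meta.get("headers", {}))
        headers["Authorization"] = f"Bearer {self._token_provider()}"
        return data, {**meta, "headers": headers}


class TraceInterceptor:
    """Ensures every request carries a trace id, generating one if absent."""

    def intercept(self, method: str, data: Any, meta: Meta) -> Tuple[Any, Meta]:
        return data, {**meta, "trace_id": meta.get("trace_id") or uuid.uuid4().hex}


def apply_interceptors(interceptors: List[Any], method: str, data: Any, meta: Meta) -> Tuple[Any, Meta]:
    """Run the interceptors in order, threading data and meta through each one."""
    for interceptor in interceptors:
        data, meta = interceptor.intercept(method, data, meta)
    return data, meta


chain = [BearerAuthInterceptor(lambda: "example-token"), TraceInterceptor()]
data, meta = apply_interceptors(chain, "chat.completions", {"prompt": "Hello!"}, {})
```

Each interceptor returns new dictionaries instead of mutating its input, so the caller's original meta stays unchanged and interceptors can be reordered safely.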
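8) Long-running tasks + completion notification (new)
Suited to long-running AI inference or batch-processing workloads: submitting a task returns a task_id immediately and the work runs in the background. The client can poll for the result, or subscribe to a stream and be notified when the task completes.
- task/submit: submit a long-running task; returns task_id immediately
- task/status: query the task status
- task/result: query the task result (returns value on success)
- task/subscribe (stream): subscribe to task events; the stream ends automatically once the task completes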
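The submit-then-query flow described above can be sketched with a tiny in-process task manager built on asyncio. This is not the library's task manager: the class and method names are hypothetical, and the `"running"` status is an assumption (the README's own example only shows `"succeeded"` and `"failed"`).

```python
import asyncio
import uuid
from typing import Any, Awaitable, Callable, Dict


class InMemoryTaskManager:
    """Runs coroutines in the background and records their outcome by task id."""

    def __init__(self) -> None:
        self._tasks: Dict[str, asyncio.Task] = {}
        self._records: Dict[str, Dict[str, Any]] = {}

    def submit(self, func: Callable[..., Awaitable[Any]], **params: Any) -> str:
        """Schedule func(**params) and return a task id immediately."""
        task_id = uuid.uuid4().hex
        self._records[task_id] = {"status": "running"}  # assumed status name
        self._tasks[task_id] = asyncio.create_task(self._run(task_id, func, params))
        return task_id

    async def _run(self, task_id: str, func: Callable[..., Awaitable[Any]], params: Dict[str, Any]) -> None:
        try:
            value = await func(**params)
            self._records[task_id] = {"status": "succeeded", "value": value}
        except Exception as exc:
            self._records[task_id] = {"status": "failed", "error": str(exc)}

    def result(self, task_id: str) -> Dict[str, Any]:
        """Return a snapshot of the task record (status plus value or error)."""
        return dict(self._records[task_id])

    async def wait(self, task_id: str) -> Dict[str, Any]:
        """Block until the task finishes, then return its final record."""
        await self._tasks[task_id]
        return self.result(task_id)


async def async_mul(x: int, y: int) -> int:
    await asyncio.sleep(0.01)
    return x * y


async def demo():
    manager = InMemoryTaskManager()
    task_id = manager.submit(async_mul, x=3, y=5)
    status_right_after_submit = manager.result(task_id)["status"]
    final_record = await manager.wait(task_id)
    return status_right_after_submit, final_record
```

The `submit` call must run inside an event loop because it uses `asyncio.create_task`; `asyncio.run(demo())` drives the whole example.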
Directory Structure
Note: src/aduib_rpc is the authoritative directory name.
aduib_rpc/
├── src/aduib_rpc/
│   ├── client/                 # Client implementation (transport/middleware/pool/retry, etc.)
│   │   ├── auth/               # Authentication
│   │   └── transports/         # Transport implementations (grpc/jsonrpc/rest)
│   ├── discover/               # Service discovery/registration (registry/load_balance, etc.)
│   ├── grpc/                   # Generated gRPC code
│   ├── proto/                  # proto / thrift definitions
│   ├── rpc/                    # Method-name parsing, etc.
│   ├── server/                 # Server implementation
│   │   ├── protocols/          # Protocol server implementations (rest/jsonrpc/...)
│   │   ├── request_handlers/   # Request handling (DefaultRequestHandler, etc.)
│   │   ├── rpc_execution/      # service_call / request_executor / context
│   │   └── tasks/              # Long-running task management (task manager)
│   ├── thrift/                 # Generated thrift code
│   ├── types.py                # Core types (Request/Response/Error/JSONRPC types)
│   └── utils/                  # Utility functions
├── scripts/                    # Helper scripts (proto compilation, etc.)
└── tests/                      # Test cases
Installation
- pip:
pip install aduib_rpc aduib_rpc[nacos]
- uv (recommended):
uv add aduib_rpc aduib_rpc[nacos]
Usage Examples
Client example
Note: client.completion(...) returns an AsyncIterator; even for a non-streaming call it yields once.
import asyncio
import logging
import os

import grpc
from pydantic import BaseModel

from aduib_rpc.client.auth import InMemoryCredentialsProvider
from aduib_rpc.client.auth.interceptor import AuthInterceptor
from aduib_rpc.client.base_client import ClientConfig, AduibRpcClient
from aduib_rpc.client.client_factory import AduibRpcClientFactory
from aduib_rpc.discover.registry.nacos.nacos import NacosServiceRegistry
from aduib_rpc.discover.registry.registry_factory import ServiceRegistryFactory
from aduib_rpc.server.rpc_execution.service_call import client, FuncCallContext
from aduib_rpc.utils.constant import TransportSchemes

logging.basicConfig(level=logging.DEBUG)

# SECURITY NOTE:
# Never hardcode real service addresses, namespaces, usernames, or passwords in code or README files.
# Use environment variables or a secrets manager instead.


async def main():
    registry = NacosServiceRegistry(
        server_addresses=os.getenv('NACOS_SERVER_ADDRESSES', '127.0.0.1:8848'),
        namespace=os.getenv('NACOS_NAMESPACE', 'your-namespace'),
        group_name=os.getenv('NACOS_GROUP_NAME', 'DEFAULT_GROUP'),
        username=os.getenv('NACOS_USERNAME', 'nacos'),
        password=os.getenv('NACOS_PASSWORD', 'nacos'),
    )
    service_name = 'test_grpc_app'
    discover_service = await registry.discover_service(service_name)
    logging.debug(f'Service: {discover_service}')
    logging.debug(f'Service URL: {discover_service.url}')

    def create_channel(url):
        logging.debug(f'Channel URL: {url}')
        # For production, prefer TLS (secure_channel) instead of insecure_channel.
        return grpc.aio.insecure_channel(url)

    client_factory = AduibRpcClientFactory(
        config=ClientConfig(
            streaming=True,
            grpc_channel_factory=create_channel,
            supported_transports=[TransportSchemes.GRPC],
        )
    )
    aduib_rpc_client: AduibRpcClient = client_factory.create(
        discover_service.url,
        server_preferred=TransportSchemes.GRPC,
        interceptors=[AuthInterceptor(credentialProvider=InMemoryCredentialsProvider())],
    )
    # completion() returns an async iterator, even for a single response.
    resp = aduib_rpc_client.completion(
        method="chat.completions",
        data={"model": "gpt-3.5-turbo", "messages": [{"role": "user", "content": "Hello!"}]},
        meta={
            "model": "gpt-3.5-turbo",
            "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...",
        } | discover_service.get_service_info(),
    )
    async for r in resp:
        logging.debug(f'Response: {r}')


class test_add(BaseModel):
    x: int = 1
    y: int = 2


@client("CaculServiceApp")
class CaculService:
    def add(self, x, y):
        """Synchronous addition."""
        ...

    def add2(self, data: test_add):
        """Synchronous addition taking a pydantic model."""
        ...

    async def async_mul(self, x, y):
        """Asynchronous multiplication."""
        ...

    def fail(self, x):
        """A function that always fails."""
        ...


async def client_call():
    registry_config = {
        "server_addresses": os.getenv('NACOS_SERVER_ADDRESSES', '127.0.0.1:8848'),
        "namespace": os.getenv('NACOS_NAMESPACE', 'your-namespace'),
        "group_name": os.getenv('NACOS_GROUP_NAME', 'DEFAULT_GROUP'),
        "username": os.getenv('NACOS_USERNAME', 'nacos'),
        "password": os.getenv('NACOS_PASSWORD', 'nacos'),
        "max_retry": 3,
        "DISCOVERY_SERVICE_ENABLED": True,
        "DISCOVERY_SERVICE_TYPE": "nacos",
    }
    ServiceRegistryFactory.start_service_discovery(registry_config)
    FuncCallContext.enable_auth()
    caculService = CaculService()
    result = caculService.add(1, 2)
    logging.debug(f'1 + 2 = {result}')
    result = caculService.add2(test_add(x=3, y=4))
    logging.debug(f'3 + 4 = {result}')
    result = await caculService.async_mul(3, 5)
    logging.debug(f'3 * 5 = {result}')


if __name__ == '__main__':
    # Runs the @client stub example; call main() instead to run the low-level completion example.
    asyncio.run(client_call())
Server example
import asyncio
import logging
import os
from typing import Any

from pydantic import BaseModel

from aduib_rpc.discover.registry.registry_factory import ServiceRegistryFactory
from aduib_rpc.discover.service import AduibServiceFactory
from aduib_rpc.server.rpc_execution import RequestExecutor, RequestContext
from aduib_rpc.server.rpc_execution.request_executor import request_execution
from aduib_rpc.server.rpc_execution.service_call import service

logging.basicConfig(level=logging.DEBUG)


@request_execution(method="chat.completions")
class TestRequestExecutor(RequestExecutor):
    def execute(self, context: RequestContext) -> Any:
        print(f"Received prompt: {context}")
        # The return value can be any serializable object (dict / pydantic model / primitive types, etc.).
        response = {
            "id": "chatcmpl-123",
            "object": "chat.completion",
            "created": 1677652288,
            "model": "gpt-3.5-turbo-0301",
            "choices": [
                {
                    "index": 0,
                    "message": {"role": "assistant", "content": "Hello! How can I assist you today?"},
                    "finish_reason": "stop",
                }
            ],
            "usage": {"prompt_tokens": 9, "completion_tokens": 12, "total_tokens": 21},
        }
        if context.stream:
            # For streaming requests, return an async generator that yields chunks.
            async def stream_response():
                for _ in range(1, 4):
                    yield response

            return stream_response()
        else:
            return response


class test_add(BaseModel):
    x: int = 1
    y: int = 2


@service(service_name='CaculService')
class CaculService:
    def add(self, x, y):
        return x + y

    def add2(self, data: test_add):
        return data.x + data.y

    async def async_mul(self, x, y):
        await asyncio.sleep(0.1)
        return x * y

    def fail(self, x):
        raise RuntimeError("Oops!")


async def main():
    registry_config = {
        "server_addresses": os.getenv('NACOS_SERVER_ADDRESSES', '127.0.0.1:8848'),
        "namespace": os.getenv('NACOS_NAMESPACE', 'your-namespace'),
        "group_name": os.getenv('NACOS_GROUP_NAME', 'DEFAULT_GROUP'),
        "username": os.getenv('NACOS_USERNAME', 'nacos'),
        "password": os.getenv('NACOS_PASSWORD', 'nacos'),
        "max_retry": 3,
        "DISCOVERY_SERVICE_ENABLED": True,
        "DISCOVERY_SERVICE_TYPE": "nacos",
        "APP_NAME": "CaculServiceApp",
    }
    service_instance = await ServiceRegistryFactory.start_service_registry(registry_config)
    factory = AduibServiceFactory(service_instance=service_instance)
    await factory.run_server()


if __name__ == '__main__':
    asyncio.run(main())
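Long-Running Tasks and Notifications
API (method) overview
- task/submit: submit a task (returns task_id)
- task/status: query the status
- task/result: query the result
- task/subscribe: subscribe to task events (stream; ends once the task completes)
Example: submit + poll + subscribe
Note: the example below assumes you have already obtained aduib_rpc_client as shown above (any transport works).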
import asyncio


async def run_long_task(aduib_rpc_client):
    # 1) Submit an RPC call to run in the background
    submit_req = {
        "target_method": "CaculService.async_mul",
        "params": {"x": 3, "y": 5},
        "options": {"ttl_seconds": 3600},
    }
    task_id = None
    async for r in aduib_rpc_client.completion(method="task/submit", data=submit_req):
        task_id = r.result["task_id"]
    assert task_id is not None

    # 2) Poll for the result (works with every transport)
    while True:
        async for r in aduib_rpc_client.completion(method="task/result", data={"task_id": task_id}):
            status = r.result["status"]
            if status == "succeeded":
                print("value=", r.result["value"])
                return
            if status == "failed":
                print("error=", r.result["error"])
                return
        await asyncio.sleep(0.2)


async def subscribe_until_done(aduib_rpc_client, task_id: str):
    # 3) Subscribe to notifications (stream)
    # - REST/JSON-RPC: via SSE
    # - gRPC: via stream_completion
    async for r in aduib_rpc_client.completion(method="task/subscribe", data={"task_id": task_id}):
        print(r.result)
Tip: Thrift currently has no streaming support, so polling (task/result) is the recommended approach there.
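When polling is the only option, it usually helps to bound the loop with a deadline and to back off between requests. The helper below is a generic sketch and not part of the library: `poll_until_done`, its parameters, and the doubling backoff are assumptions. The `"succeeded"` and `"failed"` status values are the ones used in the example above.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict


async def poll_until_done(
    fetch_result: Callable[[], Awaitable[Dict[str, Any]]],
    *,
    interval_s: float = 0.2,
    max_interval_s: float = 5.0,
    deadline_s: float = 60.0,
) -> Dict[str, Any]:
    """Call fetch_result() until the task reaches a terminal status.

    Raises TimeoutError if the next wait would exceed deadline_s.
    """
    started = time.monotonic()
    delay = interval_s
    while True:
        result = await fetch_result()
        if result["status"] in ("succeeded", "failed"):
            return result
        if time.monotonic() - started + delay > deadline_s:
            raise TimeoutError("task did not finish before the deadline")
        await asyncio.sleep(delay)
        # Double the wait each round, capped at max_interval_s.
        delay = min(delay * 2, max_interval_s)
```

Here `fetch_result` would be a small async function of your own that issues one `task/result` call and returns `r.result`, which keeps the polling policy independent of the transport in use.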
Development
- Clone the repository:
git clone https://github.com/chaorenex1/aduib_rpc.git
cd aduib_rpc
- Install the development dependencies:
uv sync --all-extras --dev
- Run the tests:
pytest tests/
- Compile the proto files (only needed when they change):
python scripts/compile_protos.py
Protocol Support
- gRPC (Protocol Buffers)
- JSON-RPC 2.0
- REST API (+ SSE streaming)
- Thrift
License
Apache License 2.0
Rust SDK (new)
The repository includes a Rust SDK (preview) under rust-sdk/:
- Documentation: rust-sdk/README.md
- Crate: rust-sdk/crates/aduib-rpc