
Aduib RPC - A Python library for Aduib AI RPC framework.

Aduib RPC

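Overview

Aduib RPC is a Python-based remote procedure call (RPC) framework that supports the gRPC, JSON-RPC, and REST protocols. It provides complete client and server implementations, supports service discovery, load balancing, and authentication, and is particularly suited to integrating AI services.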

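Core Features

  • Multi-protocol support: gRPC, JSON-RPC, and REST API
  • Service discovery: integrated service registration and discovery
  • Load balancing: multiple load-balancing strategies
  • Authentication: client-side authentication interceptors
  • Middleware: an extensible middleware architecture
  • Error handling: a unified error-handling mechanism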
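
The load-balancing strategies mentioned above are not described further in this README. Purely as an illustration, the sketch below shows two common strategies, round-robin and weighted random, applied to a list of discovered instances. `Instance`, `RoundRobinPicker`, and `WeightedRandomPicker` are hypothetical names made up for this example; they are not part of the Aduib RPC API.

```python
import itertools
import random
from dataclasses import dataclass


@dataclass(frozen=True)
class Instance:
    """Hypothetical stand-in for a discovered service instance."""
    host: str
    port: int
    weight: int = 1


class RoundRobinPicker:
    """Cycles through the instances in order."""

    def __init__(self, instances):
        instances = list(instances)
        if not instances:
            raise ValueError("no instances available")
        self._cycle = itertools.cycle(instances)

    def pick(self) -> Instance:
        return next(self._cycle)


class WeightedRandomPicker:
    """Picks instances at random, in proportion to their weight."""

    def __init__(self, instances, rng=None):
        self._instances = list(instances)
        if not self._instances:
            raise ValueError("no instances available")
        self._weights = [inst.weight for inst in self._instances]
        self._rng = rng or random.Random()

    def pick(self) -> Instance:
        return self._rng.choices(self._instances, weights=self._weights, k=1)[0]


if __name__ == "__main__":
    nodes = [Instance("10.0.0.1", 50051), Instance("10.0.0.2", 50051, weight=3)]
    rr = RoundRobinPicker(nodes)
    print([rr.pick().host for _ in range(4)])
    wr = WeightedRandomPicker(nodes)
    print([wr.pick().host for _ in range(4)])
```

Round-robin spreads calls evenly regardless of capacity, while the weighted variant sends more traffic to instances with a larger `weight`.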

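Directory Structure

aduib_rpc/
├── src/aduib_rpc/
│   ├── client/            # Client implementation
│   │   ├── auth/          # Authentication
│   │   └── transports/    # Transport layer implementations
│   ├── discover/          # Service discovery
│   │   ├── entities/      # Entity definitions
│   │   ├── load_balance/  # Load balancing
│   │   ├── registry/      # Service registry
│   │   └── service/       # Service factory
│   ├── grpc/              # gRPC proto-related code
│   ├── proto/             # Protocol definition files
│   ├── server/            # Server implementation
│   │   ├── protocols/     # Protocol implementations
│   │   ├── rpc_execution/ # RPC execution
│   │   └── request_handler/ # Request handling
│   ├── thrift/            # Thrift-related code
│   ├── types/             # Data type definitions
│   └── utils/             # Utility functions
├── scripts/               # Helper scripts
└── tests/                 # Test cases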

Usage

  • Install with pip:

    pip install aduib_rpc "aduib_rpc[nacos]"

  • Or install with uv (recommended):

    uv add aduib_rpc "aduib_rpc[nacos]"
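
After installing, it can be handy to confirm that the distribution is visible to the interpreter you intend to use. The sketch below relies only on the standard library; the helper name `installed_version` is made up for this example.

```python
from importlib import metadata


def installed_version(dist_name: str = "aduib_rpc"):
    """Return the installed version string, or None if the package is missing."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    version = installed_version()
    if version:
        print(f"aduib_rpc version: {version}")
    else:
        print("aduib_rpc is not installed in this environment")
```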
    

Examples

Client Example

import asyncio
import logging

import grpc
from pydantic import BaseModel

from aduib_rpc.client.auth import InMemoryCredentialsProvider
from aduib_rpc.client.auth.interceptor import AuthInterceptor
from aduib_rpc.client.base_client import ClientConfig, AduibRpcClient
from aduib_rpc.client.client_factory import AduibRpcClientFactory
from aduib_rpc.discover.registry.nacos.nacos import NacosServiceRegistry
from aduib_rpc.discover.registry.registry_factory import ServiceRegistryFactory
from aduib_rpc.server.rpc_execution.service_call import client, FuncCallContext
from aduib_rpc.utils.constant import TransportSchemes

logging.basicConfig(level=logging.DEBUG)

async def main():
    # Low-level example: discover a service in Nacos, then call it over gRPC.
    registry = NacosServiceRegistry(
        server_addresses='10.0.0.96:8848',
        namespace='eeb6433f-d68c-4b3b-a4a7-eeff19110e4d',
        group_name='DEFAULT_GROUP',
        username='nacos',
        password='nacos11.',
    )
    service_name = 'test_grpc_app'
    discover_service = await registry.discover_service(service_name)
    logging.debug(f'Service: {discover_service}')
    logging.debug(f'Service URL: {discover_service.url}')

    def create_channel(url: str) -> grpc.aio.Channel:
        logging.debug(f'Channel URL: {url}')
        return grpc.aio.insecure_channel(url)

    client_factory = AduibRpcClientFactory(
        config=ClientConfig(streaming=True, grpc_channel_factory=create_channel,
                            supported_transports=[TransportSchemes.GRPC]))
    aduib_rpc_client: AduibRpcClient = client_factory.create(
        discover_service.url,
        server_preferred=TransportSchemes.GRPC,
        interceptors=[AuthInterceptor(credentialProvider=InMemoryCredentialsProvider())])
    user_agent = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                  "(KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36 Edg/139.0.0.0")
    resp = aduib_rpc_client.completion(
        method="chat.completions",
        data={"model": "gpt-3.5-turbo", "messages": [{"role": "user", "content": "Hello!"}]},
        meta={"model": "gpt-3.5-turbo", "user_agent": user_agent} | discover_service.get_service_info())
    # The streaming response is consumed as an async iterator.
    async for r in resp:
        logging.debug(f'Response: {r}')


class test_add(BaseModel):
    x: int = 1
    y: int = 2

@client("CaculServiceApp")
class CaculService:
    def add(self, x, y):
        """Synchronous addition."""
        ...

    def add2(self, data: test_add):
        """Synchronous addition that takes a Pydantic model."""
        ...

    async def async_mul(self, x, y):
        """Asynchronous multiplication."""
        ...

    def fail(self, x):
        """A method that always fails."""
        ...

async def client_call():
    registry_config = {
        "server_addresses": "10.0.0.96:8848",
        "namespace": "eeb6433f-d68c-4b3b-a4a7-eeff19110e4d",
        "group_name": "DEFAULT_GROUP",
        "username": "nacos",
        "password": "nacos11.",
        "max_retry": 3,
        "DISCOVERY_SERVICE_ENABLED": True,
        "DISCOVERY_SERVICE_TYPE": "nacos"
    }
    ServiceRegistryFactory.start_service_discovery(registry_config)
    FuncCallContext.enable_auth()
    caculService = CaculService()
    result = caculService.add(1, 2)
    logging.debug(f'1 + 2 = {result}')
    result = caculService.add2(test_add(x=3, y=4))
    logging.debug(f'3 + 4 = {result}')
    result = await caculService.async_mul(3, 5)
    logging.debug(f'3 * 5 = {result}')
    # client_caller = ClientCaller.from_client_caller("caculService")
    # res1 = await client_caller.call("add", 1, 2)
    # res3 = await client_caller.call("add2", test_add())
    # res2 = await client_caller.call("async_mul", 3, 4)
    # res4 = await client_caller.call("fail", 123)

    # print("add:", res1)
    # print("add2:", res3)
    # print("async_mul:", res2)
    # print("fail:", res4)


if __name__ == '__main__':
    # Runs the decorator-based example; swap in main() to try the low-level gRPC client.
    asyncio.run(client_call())
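
In the client example above, the `@client` decorator lets stub methods whose bodies are just `...` return remote results. This README does not explain how that works. One plausible mechanism is a class decorator that replaces each public method with a wrapper forwarding the method name and arguments to a transport. The sketch below illustrates that idea with a hypothetical `remote_stub` decorator and an in-process `fake_transport`; it is not the library's implementation.

```python
import asyncio
import inspect


def fake_transport(service: str, method: str, args: tuple, kwargs: dict):
    """Hypothetical transport: answers locally instead of going over the network."""
    handlers = {
        ("CalcApp", "add"): lambda x, y: x + y,
        ("CalcApp", "async_mul"): lambda x, y: x * y,
    }
    return handlers[(service, method)](*args, **kwargs)


def _make_wrapper(service, name, func, transport):
    """Build a forwarding wrapper that keeps the sync/async nature of the stub."""
    if inspect.iscoroutinefunction(func):
        async def async_wrapper(self, *args, **kwargs):
            # Run the blocking transport call off the event loop.
            return await asyncio.to_thread(transport, service, name, args, kwargs)
        return async_wrapper

    def sync_wrapper(self, *args, **kwargs):
        return transport(service, name, args, kwargs)
    return sync_wrapper


def remote_stub(service: str, transport=fake_transport):
    """Class decorator that replaces every public method with a forwarding wrapper."""

    def decorate(cls):
        for name, func in list(vars(cls).items()):
            if name.startswith("_") or not inspect.isfunction(func):
                continue
            setattr(cls, name, _make_wrapper(service, name, func, transport))
        return cls

    return decorate


@remote_stub("CalcApp")
class Calc:
    def add(self, x, y): ...

    async def async_mul(self, x, y): ...


if __name__ == "__main__":
    calc = Calc()
    print(calc.add(1, 2))
    print(asyncio.run(calc.async_mul(3, 5)))
```

Keeping the stub bodies empty means the class serves only as a typed description of the remote interface, which is why the server example defines the same method names with real bodies.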

Server Example

import asyncio
import logging
from typing import Any

from pydantic import BaseModel

from aduib_rpc.discover.registry.registry_factory import ServiceRegistryFactory
from aduib_rpc.discover.service import AduibServiceFactory
from aduib_rpc.server.rpc_execution import RequestExecutor, RequestContext
from aduib_rpc.server.rpc_execution.request_executor import request_execution
from aduib_rpc.server.rpc_execution.service_call import service
from aduib_rpc.types import ChatCompletionResponse

logging.basicConfig(level=logging.DEBUG)

@request_execution(method="chat.completions")
class TestRequestExecutor(RequestExecutor):
    def execute(self, context: RequestContext) -> Any:
        print(f"Received prompt: {context}")
        # Canned response used for both the streaming and non-streaming paths.
        response = ChatCompletionResponse(
            id="chatcmpl-123",
            object="chat.completion",
            created=1677652288,
            model="gpt-3.5-turbo-0301",
            choices=[{
                "index": 0,
                "message": {"role": "assistant", "content": "Hello! How can I assist you today?"},
                "finish_reason": "stop",
            }],
            usage={"prompt_tokens": 9, "completion_tokens": 12, "total_tokens": 21},
        )
        if context.stream:
            async def stream_response():
                # Emit the same response three times to simulate a stream.
                for _ in range(1, 4):
                    yield response
            return stream_response()
        else:
            return response

class test_add(BaseModel):
    x: int = 1
    y: int = 2

@service(service_name='CaculService')
class CaculService:
    def add(self, x, y):
        """Synchronous addition."""
        return x + y

    def add2(self, data: test_add):
        """Synchronous addition that takes a Pydantic model."""
        return data.x + data.y

    async def async_mul(self, x, y):
        """Asynchronous multiplication."""
        await asyncio.sleep(0.1)
        return x * y

    def fail(self, x):
        """A method that always fails."""
        raise RuntimeError("Oops!")

async def main():
    registry_config = {
        "server_addresses": "10.0.0.96:8848",
        "namespace": "eeb6433f-d68c-4b3b-a4a7-eeff19110e4d",
        "group_name": "DEFAULT_GROUP",
        "username": "nacos",
        "password": "nacos11.",
        "max_retry": 3,
        "DISCOVERY_SERVICE_ENABLED": True,
        "DISCOVERY_SERVICE_TYPE": "nacos",
        "APP_NAME": "CaculServiceApp"
    }
    service = await ServiceRegistryFactory.start_service_registry(registry_config)
    # ip,port = NetUtils.get_ip_and_free_port()
    # service = ServiceInstance(service_name='test_grpc', host=ip, port=port,
    #                                protocol=AIProtocols.AduibRpc, weight=1, scheme=TransportSchemes.GRPC)
    # registry = NacosServiceRegistry(server_addresses='10.0.0.96:8848',
    #                                      namespace='eeb6433f-d68c-4b3b-a4a7-eeff19110e4d', group_name='DEFAULT_GROUP',
    #                                      username='nacos', password='nacos11.')
    factory = AduibServiceFactory(service_instance=service)
    # await registry.register_service(service)
    await factory.run_server()

if __name__ == '__main__':
    asyncio.run(main())
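
The server example exposes a class through `@service`, mixes synchronous and asynchronous methods, and includes a `fail` method that raises. To make the moving parts concrete, here is a minimal sketch of how such a dispatcher could look: a name-to-instance registry, a lookup by method name, awaiting only when needed, and a uniform error envelope. `REGISTRY`, `expose`, and `dispatch` are hypothetical names for this illustration, and the envelope shape is an assumption rather than the framework's actual response format.

```python
import asyncio
import inspect

# Hypothetical registry: service name -> instance of the decorated class.
REGISTRY = {}


def expose(service_name: str):
    """Class decorator that records one instance of the class under a name."""

    def decorate(cls):
        REGISTRY[service_name] = cls()
        return cls

    return decorate


async def dispatch(service_name: str, method: str, *args, **kwargs) -> dict:
    """Call a registered method and wrap the outcome in a uniform envelope."""
    try:
        target = getattr(REGISTRY[service_name], method)
        result = target(*args, **kwargs)
        if inspect.isawaitable(result):
            result = await result  # async methods return a coroutine
        return {"ok": True, "result": result}
    except Exception as exc:  # report the failure instead of crashing the server
        return {"ok": False, "error": f"{type(exc).__name__}: {exc}"}


@expose("Calc")
class Calc:
    def add(self, x, y):
        return x + y

    async def async_mul(self, x, y):
        await asyncio.sleep(0)
        return x * y

    def fail(self, x):
        raise RuntimeError("Oops!")


async def demo():
    print(await dispatch("Calc", "add", 1, 2))
    print(await dispatch("Calc", "async_mul", 3, 5))
    print(await dispatch("Calc", "fail", 123))


if __name__ == "__main__":
    asyncio.run(demo())
```

Catching the exception at the dispatch boundary keeps one failing call from taking down the server and gives every caller the same error shape to handle.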

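Development

  1. Clone the repository:

    git clone https://github.com/chaorenex1/aduib_rpc.git
    cd aduib_rpc

  2. Install the development dependencies:

    uv sync --all-extras --dev

  3. Run the tests:

    pytest tests/

  4. Compile the proto files (only needed when they change):

    python scripts/compile_protos.py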
    

Protocol Support

The framework supports the following protocols and data formats:

  • gRPC (Protocol Buffers)
  • JSON-RPC
  • REST API
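
This README does not show what a request looks like on the wire. For orientation only, the sketch below builds and parses messages in the standard JSON-RPC 2.0 envelope, using the `chat.completions` method name from the examples. Whether Aduib RPC uses exactly this envelope is an assumption, and `build_request` and `parse_response` are hypothetical helpers, not library functions.

```python
import json
from itertools import count

# Simple monotonically increasing request ids.
_ids = count(1)


def build_request(method: str, params: dict) -> str:
    """Serialize a JSON-RPC 2.0 request object."""
    return json.dumps(
        {"jsonrpc": "2.0", "method": method, "params": params, "id": next(_ids)}
    )


def parse_response(raw: str):
    """Return the result of a JSON-RPC 2.0 response, or raise on an error object."""
    message = json.loads(raw)
    if "error" in message:
        err = message["error"]
        raise RuntimeError(f"JSON-RPC error {err['code']}: {err['message']}")
    return message["result"]


if __name__ == "__main__":
    request = build_request(
        "chat.completions",
        {"model": "gpt-3.5-turbo", "messages": [{"role": "user", "content": "Hello!"}]},
    )
    print(request)
    print(parse_response('{"jsonrpc": "2.0", "result": {"ok": true}, "id": 1}'))
```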

License

Apache License 2.0
