Skip to main content

No project description provided

Project description

VoidRail

VoidRail 的名称来自于古老的修仙界,是虚空传送阵的意思。

VoidRail 是一个基于 ZeroMQ 的轻量级微服务通信框架,采用 ROUTER-DEALER 模式实现服务发现、负载均衡和高可用性。它使用纯 JSON 作为数据交换格式,非常容易在分布式环境中部署和扩展。

VoidRail 的主要目标是实现 CPU/GPU 密集型计算的分布式部署,尤其是与主服务的 fastapi 搭配使用,因此推荐使用默认的 FIFO 模式。

安装

使用 pip 安装:

pip install voidrail

或使用 poetry 安装:

poetry add voidrail

核心组件

VoidRail 由三个主要组件构成:

  • ServiceRouter:中央路由模块,管理服务注册、请求分发和健康监控
  • ServiceDealer:服务实现模块,处理业务逻辑,支持同步/异步方法和流式响应
  • ClientDealer:客户端访问,负责发现服务并发送请求

使用 ROUTER-DEALER 这些名词是为了与 ZMQ 的概念保持一致,建议用户使用时了解一些 ZMQ 的基本知识,这对于重度使用的用户来说非常必要。但 ZMQ 也是一个相当成熟的底层消息队列框架,如果你没有遇到阻碍,确实可以完全忽略这些专业名词。

工作流程

ClientDealer --请求--> ServiceRouter --转发--> ServiceDealer
            <--响应-- ServiceRouter <--返回--

特性

  • 支持 FIFO 和负载均衡两种分发模式(前者适合CPU密集型计算,后者适合IO密集型计算)
  • 支持自动服务发现和注册(这有利于你运维时手动下线或上线 DEALER 服务)
  • 支持服务监控、健康检查和心跳机制
  • 支持同步/异步方法和流式响应
  • 使用 JSON 做数据交换(如果有必要,实际上你可以用其他语言来实现队列访问或提供服务)
  • 支持 API 密钥认证,提升服务安全性

快速入门

1. 创建 Router(核心交换服务)

from voidrail import ServiceRouter
from voidrail import RouterMode

# 创建并启动路由器
router = ServiceRouter(
    address="tcp://127.0.0.1:5555",  # 监听地址
    router_mode=RouterMode.FIFO,     # 或 RouterMode.LOAD_BALANCE
    heartbeat_timeout=5.0            # 心跳超时时间,这涉及判定对DELAER服务掉线,如果你的服务处理时间很长,最简单的办法是加大这个数值
)
await router.start()

2. 实现 Dealer(服务端)

from voidrail import ServiceDealer, service_method

class EchoService(ServiceDealer):
    # 这些方法不需要全部定义,而是根据需要选择一个即可
    # 但你可以定义定义多个服务方法
    #
    # 同步方法
    @service_method
    def echo(self, message: str) -> str:
        return message
        
    # 异步方法
    @service_method
    async def async_echo(self, message: str) -> str:
        await asyncio.sleep(0.1)
        return message
        
    # 流式响应
    @service_method
    async def stream_numbers(self, start: int, end: int):
        for i in range(start, end):
            yield i
            await asyncio.sleep(0.1)
            
# 创建并启动服务
service = EchoService(router_address="tcp://127.0.0.1:5555")
await service.start()

3. 使用 Client(客户端)

from voidrail import ClientDealer

# 创建客户端
client = ClientDealer(router_address="tcp://127.0.0.1:5555")

# 发现可用服务
available_methods = await client.discover_services()
print(f"可用方法: {available_methods}")

# 调用普通方法
result = await client.invoke("EchoService.echo", "Hello World")
print(f"结果: {result}")

# 使用流式响应
async for number in client.stream("EchoService.stream_numbers", 0, 5):
    print(f"收到数字: {number}")

认证

VoidRail 提供了 API 密钥认证机制,以提高服务的安全性。

1. 启用认证

在 Router 中启用认证:

from voidrail import ServiceRouter, ApiKeyManager

# 生成密钥
dealer_key = ApiKeyManager.generate_key(prefix="dealer")
client_key = ApiKeyManager.generate_key(prefix="client")

# 创建带认证的 Router
router = ServiceRouter(
    address="tcp://127.0.0.1:5555",
    require_auth=True,
    dealer_api_keys=[dealer_key],  # 服务端密钥
    client_api_keys=[client_key]   # 客户端密钥
)
await router.start()

你也可以自己手工设定密钥,服务端不会对密钥做格式检查,但生成的密钥格式可能更符合最佳实践。

2. 配置服务认证

# 创建带认证的服务
service = EchoService(
    router_address="tcp://127.0.0.1:5555",
    api_key=dealer_key  # 服务必须提供有效的 dealer_key
)
await service.start()

3. 配置客户端认证

# 创建带认证的客户端
client = ClientDealer(
    router_address="tcp://127.0.0.1:5555",
    api_key=client_key  # 客户端必须提供有效的 client_key
)
await client.connect()

4. 通过环境变量配置认证

也可以通过环境变量设置认证:

# Router 环境变量
export VOIDRAIL_REQUIRE_AUTH=true
export VOIDRAIL_DEALER_API_KEYS=dealer_key1,dealer_key2
export VOIDRAIL_CLIENT_API_KEYS=client_key1,client_key2

# 客户端或服务的环境变量
export VOIDRAIL_API_KEY=your_api_key

分布式部署

VoidRail 天然支持分布式部署,实现方式如下:

1. 部署 Router

Router 需要在一个固定的、所有服务和客户端都能访问到的地址:

# 监听所有网络接口
router = ServiceRouter(address="tcp://0.0.0.0:5555")
await router.start()

2. 部署多个 Dealer 服务

服务可以在不同机器上启动,只需连接到同一个 Router:

# 服务器 A
service_a = EchoService(router_address="tcp://router_ip:5555")
await service_a.start()

# 服务器 B
service_b = EchoService(router_address="tcp://router_ip:5555")
await service_b.start()

3. 客户端连接

客户端只需要知道 Router 的地址:

client = ClientDealer(router_address="tcp://router_ip:5555")
await client.connect()

服务监控

VoidRail 提供了内置的监控功能:

查看路由器信息

router_info = await client.get_router_info()
print(f"路由器模式: {router_info['mode']}")
print(f"活跃服务数: {router_info['active_services']}")

查看队列状态

queues = await client.get_queue_status()
for method, status in queues.items():
    print(f"方法 {method}: 队列长度 {status['queue_length']}, 空闲服务数 {status['available_services']}")

关键概念详解

ROUTER-DEALER 通信架构

  • ROUTER ZMQ 的核心概念之一,可以简单理解为路由服务总线,一般可以将其放在你的主服务中启动
  • DEALER ZMQ 的核心概念之一,可以简单理解为需要你自定义的服务处理端,在分布式架构中可以根据CPU核心情况独立启动
  • CLIENT 在 VoidRail 框架中 CLIENT 是一种特殊的 DEALER,一般也是在主服务中启动

实际上该框架使用了典型的 ROUTER-DEALER 通信架构来完成跨服异步双向通信。

该架构需要启动一个 ROUTER 端作为路由中心,由至少一个 DEALER 端负责处理,然后就可以由 CLIENT 端调用,组成完整服务了。

服务派发模式

路由策略,也就是服务派发策略,由 router_mode 参数决定。

  • FIFO 模式:保证同一个方法的请求按顺序处理,适合需要严格顺序的应用
  • 负载均衡模式:基于服务当前负载分配请求,适合追求最高吞吐量的应用

服务方法类型

你至少应该对 Python 的异步行为有一定了解,不过实现时你也可以将服务方法定义为同步的。

  • 同步方法:直接返回结果,适合简单计算
  • 异步方法:使用 async/await,适合 I/O 密集型操作
  • 流式响应:通过 yield 产生多个结果,适合大数据传输或实时数据流

健康检查

  • 服务通过定期心跳保持活跃状态
  • Router 自动检测失效服务并停止向其转发请求
  • 服务实例自动尝试重新连接 Router

API 参考

ServiceRouter

ServiceRouter(
    address: str,                          # 监听地址
    heartbeat_timeout: float = 30.0,       # 心跳超时(秒)
    router_mode: RouterMode = RouterMode.FIFO,  # 路由模式
    require_auth: bool = None,           # 是否要求认证
    dealer_api_keys: List[str] = None,   # 允许的 DEALER 端 API 密钥列表
    client_api_keys: List[str] = None,   # 允许的 CLIENT 端 API 密钥列表
)

主要方法:

  • start(): 启动路由器
  • stop(): 停止路由器

ServiceDealer

ServiceDealer(
    router_address: str,                   # 路由器地址
    context: Optional[zmq.Context] = None, # ZMQ 上下文
    max_concurrent: int = 100,             # 最大并发请求数
    heartbeat_interval: float = 0.5,       # 心跳间隔
    service_name: str = None,              # 服务名称
)

主要方法:

  • start(): 启动服务
  • stop(): 停止服务
  • @service_method 装饰器:标记服务方法

ClientDealer

ClientDealer(
    router_address: str,                   # 路由器地址
    context: Optional[zmq.Context] = None, # ZMQ 上下文
    timeout: Optional[float] = None,       # 请求超时
)

主要方法:

  • connect(): 连接到 Router
  • close(): 关闭连接
  • discover_services(): 发现可用服务
  • invoke(method, *args, **kwargs): 调用方法并返回结果
  • stream(method, *args, **kwargs): 流式调用方法
  • get_router_info(): 获取路由器信息
  • get_queue_status(): 获取队列状态

命令行工具

VoidRail提供了便捷的命令行工具,无需编写代码即可启动和管理组件。

启动Router服务

# 基本用法
voidrail router --host 0.0.0.0 --port 5555

# 启用认证
voidrail router --require-auth --dealer-keys dealer_key1 --client-keys client_key1

# 生成API密钥
voidrail router --generate-keys

使用客户端工具

# 列出所有可用服务
voidrail client --list

# 查看路由器信息
voidrail client --router-info

# 查看队列状态
voidrail client --queue-status

# 传递参数的不同方式:

# 1. 使用字符串参数(注意引号嵌套)
voidrail client --call EchoService.hello --args '"Hello World"'

# 2. 使用数字参数
voidrail client --call MathService.square --args '42'

# 3. 使用位置参数列表
voidrail client --call MathService.add --args '[5, 3]'

# 4. 使用关键字参数(最灵活的方式)
voidrail client --call EchoService.greet --args '{"name":"John", "title":"Dr."}'

# 5. 使用复杂的嵌套结构
voidrail client --call DataService.process --args '{"config":{"max_items":100}, "filters":["active", "recent"]}'

# 使用API密钥认证
voidrail client --api-key your_client_key --list

常见参数传递问题和解决方法

  1. 如果使用单引号包裹JSON(推荐):

    voidrail client --call EchoService.hello --args '{"name":"John"}'
    
  2. 如果使用双引号包裹JSON,需要转义内部引号:

    voidrail client --call EchoService.hello --args "{\"name\":\"John\"}"
    
  3. 传递单个字符串时,需要额外的引号层:

    voidrail client --call EchoService.hello --args '"John"'  # 正确
    voidrail client --call EchoService.hello --args 'John'    # 错误
    

启动Dealer服务

假设您有一个自定义服务类在 myapp/services.py 文件中:

from voidrail import ServiceDealer, service_method

class MyService(ServiceDealer):
    @service_method
    async def hello(self, name: str) -> str:
        return f"Hello, {name}!"

启动单个服务实例

# 基本用法
voidrail dealer --module myapp.services --class MyService

# 指定连接参数
voidrail dealer --host 192.168.1.100 --port 5555 --module myapp.services --class MyService

# 使用API密钥认证
voidrail dealer --api-key your_dealer_key --module myapp.services --class MyService

启动多进程服务实例(优化CPU利用率)

# 为一个服务类启动多个实例(每个实例在独立进程中运行)
voidrail dealer --module myapp.services --class MyService --instances 4

# 启动多个不同服务类(每个类在独立进程中运行)
voidrail dealer --module myapp.services --class MyService --class OtherService

# 启动多个不同服务类,每个类多个实例
voidrail dealer --module myapp.services --class MyService --class OtherService --instances 2

# 结合其他参数
voidrail dealer --host 192.168.1.100 --port 5555 --module myapp.services \
  --class MyService --class OtherService --instances 4 \
  --max-concurrent 50 --api-key your_dealer_key

多进程启动功能允许您:

  • 充分利用多核CPU资源
  • 在单台机器上轻松管理多个服务实例
  • 实现更高的服务吞吐量

最佳实践

  1. 错误处理:主动在服务方法中捕获异常,否则处理错误和异常信息会作为结果发送到客户端,这可能不够优雅
  2. 资源管理:使用异步上下文管理器(async with)自动处理连接生命周期
  3. 监控:在生产环境中定期检查队列状态和服务健康
  4. 超时设置:为客户端请求设置合适的超时时间,尤其是CPU密集型服务
  5. 消息大小:避免在单个请求中传递过大的数据

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

voidrail-0.1.5.tar.gz (30.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

voidrail-0.1.5-py3-none-any.whl (31.5 kB view details)

Uploaded Python 3

File details

Details for the file voidrail-0.1.5.tar.gz.

File metadata

  • Download URL: voidrail-0.1.5.tar.gz
  • Upload date:
  • Size: 30.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.0.0 CPython/3.11.7 Darwin/24.3.0

File hashes

Hashes for voidrail-0.1.5.tar.gz
Algorithm Hash digest
SHA256 8d8a0c3390b43f5e159ecdfed3416bbed6deb42bfa8fb91c8c264e3cc2f7088c
MD5 0c5256bead1c89f1cf6bdc481df88923
BLAKE2b-256 1d2b45bf0d77e92761d8fd12ad2468e363c7fb23f09dec001981510ae3a7354b

See more details on using hashes here.

File details

Details for the file voidrail-0.1.5-py3-none-any.whl.

File metadata

  • Download URL: voidrail-0.1.5-py3-none-any.whl
  • Upload date:
  • Size: 31.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.0.0 CPython/3.11.7 Darwin/24.3.0

File hashes

Hashes for voidrail-0.1.5-py3-none-any.whl
Algorithm Hash digest
SHA256 5883ba86e8e718a0b3c1aa44de76013f402106762a1df867e3452b7170df929e
MD5 ef3b1081544d3814dbbc693b2465ed3a
BLAKE2b-256 f87a0956aaf47745d8f1ae7d00dc5554d2221d897667ec7cbf9ab8fdea4ec796

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page