Skip to main content

A RPC software based on ZeroMQ with built-in Majordomo.

Project description

使用 ZeroMQ 和 asyncio 开发的“gate-rpc”。

  • 使用msgpack序列化消息

  • 支持在线程池和进程池里运行普通函数

  • 当函数返回的是 Generator 或 AsyncGenerator 时,会换成流式传输,接收端通过遍历 StreamReply 实例即可获得

  • 当函数返回的值过大时,会转换成 HugeData 对象,压缩返回值并分块流式传输,默认块大小通过 Settings 的 HUGE_DATA_SIZEOF 来设置

  • 使用队列传输和记录日志,避免因为日志导致事件循环被阻塞

  • 使用可以异步设置键值和异步获取键值的 BoundedDict 来简化超时等待获取

  • 在每次实例化 Worker、Server、AMajordomo、Client 各类之前,通过修改 Settings 的属性来修改运行配置

安装

可以直接使用pip安装,或直接下载源码随意放置。

pip install gaterpc

配置

在实例化 Worker、Service、AMajordomo、Client 各类之前,需要运行 Settings.setup 函数来配置全局配置 [1] , 特殊返回值的序列化通过 MessagePack 的全局实例来定制 [2]

# 可能会修改的几个主要配置
Settings.MESSAGE_MAX: int = Worker 和 Client 实例里等待处理的消息最大数量
Settings.HUGE_DATA_SIZEOF: int = 每次传输的结果值的最大大小,超过该值的将会被压缩并分片传输
Settings.HUGE_DATA_COMPRESS_MODULE: str = 使用的压缩模块的名称 [#f1]_
Settings.SERVICE_DEFAULT_NAME: str = 默认的服务名,当在实例化 Service 时如果不提供 name 参数则会以这个为服务名
Settings.MDP_INTERNAL_SERVICE_PREFIX: bytes = MDP 内部服务的前缀
Settings.MDP_HEARTBEAT_INTERVAL: int = 服务端和客户端相对于中间代理的心跳间隔时间,默认1500毫秒
Settings.MDP_HEARTBEAT_LIVENESS: int = 判定掉线的丢失心跳次数,即当超过该次数*心跳时间没有收到心跳则认为已经掉线,默认3次
Settings.REPLY_TIMEOUT: float = 客户端调用远程方法时,等待回复的超时时间,应设置的远远大于心跳时间,默认是一分钟
Settings.STREAM_REPLY_MAXSIZE: int = 流式数据使用的缓存队列的最大长度(使用的 asyncio.Queue)
Settings.REPLY_TIMEOUT: float = 获取回复的超时时间,也是流式传输的每一个子回复的超时时间
Settings.ZAP_PLAIN_DEFAULT_USER: str = ZAP 的 PLAIN 机制的默认用户名
Settings.ZAP_PLAIN_DEFAULT_PASSWORD: str = ZAP 的 PLAIN 机制的默认密码
Settings.ZAP_ADDR: str = ZAP 服务绑定的地址
Settings.ZAP_REPLY_TIMEOUT: float = 等待 ZAP 服务的回复的超时时间
Settings.setup()

# 特殊返回值的序列化配置 [#f2]_
from gaterpc.utils.message_pack
message_pack.prepare_pack = 在使用 msgpack.packb 时,传递给 default 参数的可执行对象
message_pack.unpack_object_hook = 在使用 msgpack.unpackb 时,传递给 object_hook 的可执行对象
message_pack.unpack_object_pairs_hook = 在使用 msgpack.unpackb 时,传递给 object_pairs_hook 的可执行对象
message_pack.unpack_object_list_hook = 在使用 msgpack.unpackb 时,传递给 list_hook 的可执行对象

Footnotes

测试示范

实例化 ZAP 服务后,需要配置校验策略

zap = AsyncZAPService()
zap.configure_plain(
    Settings.ZAP_DEFAULT_DOMAIN,
    {
        Settings.ZAP_PLAIN_DEFAULT_USER: Settings.ZAP_PLAIN_DEFAULT_PASSWORD
    }
)
zap.start()

继承Worker类,用interface装饰希望被远程调用的方法,然后实例化一个Server来创建Worker的实例,这个worker实例的描述信息由server实例提供。

# Worker
class GRWorker(Worker):
    @interface
    async def atest(self, *args, **kwargs):
        loop = self._get_loop()
        return {
            "name": "async atest",
            "args": args,
            "kwargs": kwargs,
            "loop_time": loop.time()
        }

    @interface
    def test(self, *args, **kwargs):
        return {
            "name": "test",
            "args": args,
            "kwargs": kwargs,
            "loop_time": time()
        }

    @interface
    def test_generator(self, maximum: int):
        i = 0
        while i < maximum:
            yield i
            i += 1

    @interface
    async def test_agenerator(self, maximum: int):
        i = 0
        while i < maximum:
            await asyncio.sleep(0.1)
            yield i
            i += 1

Settings.setup()
gr = Service(name="SRkv")
gr_worker = gr.create_worker(
    GRWorker, "tcp://127.0.0.1:5555",
    zap_mechanism=Settings.ZAP_MECHANISM_PLAIN.decode("utf-8"),
    zap_credentials=(
        Settings.ZAP_PLAIN_DEFAULT_USER,
        Settings.ZAP_PLAIN_DEFAULT_PASSWORD
    )
)
gr_worker.run()

当要执行 IO 密集或 CPU 密集型操作时,可以在方法内使用执行器来执行,可以使用自带的两个执行器,也可以使用自定义的; 另,所有同步的函数都会使用默认执行器执行,默认执行器是 ThreadPoolExecutor 实例,可以修改。

@interface
async def test_io():
    result = await self.run_in_executor(self.thread_executor, func, *args, **kwargs)
    return result

@interface
async def test_cpu():
    # 如果需要和 CPU 密集型执行器里的方法交换数据,可以使用 utils.SyncManager 来创建代理对象使用。
    queue = SyncManager.Queue()
    result = await self.run_in_executor(self.process_executor, func, queue, *args, **kwargs)
    return result

实例化代理时会绑定两个地址,一个用于给后端服务连接上来,一个给前端客户端连接上来,bind 方法是绑定的给客户端访问的地址也就是前端地址。

# Majordomo
class GRMajordomo(AMajordomo):
# 可以新增内部处理程序,用于扩展分布式应用,所有内部处理程序只能接收kwargs关键词参数
# kwargs 的结构是固定的
# kwargs = {
#    "client_id": client_id,
#    "client_addr": client_addr,
#    "request_id": request_id,
#    "body": body
# }
    @interface
    def internal_x_process(**kwargs):
        return stat_code

    @interface
    async def internal_y_process(**kwargs):
        return stat_code

Settings.setup()
gr_majordomo = GRMajordomo(
    backend_addr="tcp://127.0.0.1:5555",
    zap_mechanism=Settings.ZAP_MECHANISM_PLAIN.decode("utf-8"),
    zap_addr=Settings.ZAP_ADDR
)
gr_majordomo.bind("tcp://127.0.0.1:777")
gr_majordomo.run()

客户端直接连接代理地址,使用点语法调用远程方法,一般格式是 client.服务名.方法名,当直接使用 client.方法名时,会使用默认服务名调用。

# Client
Settings.setup()
gr_cli = Client(
    broker_addr="tcp://127.0.0.1:777"
    zap_mechanism=Settings.ZAP_MECHANISM_PLAIN.decode("utf-8"),
    zap_credentials=(
        Settings.ZAP_PLAIN_DEFAULT_USER,
        Settings.ZAP_PLAIN_DEFAULT_PASSWORD
    )
)
await gr_cli.SRkv.test("a", "b", "c", time=time())
await gr_cli.SRkv.atest("a", "b", "c", time=time())
async for i in await gr_cli.SRkv.test_agenerator(10):
    print(i)

客户端调用的远程方法后,会创建一个延迟回调用来删掉缓存的已经执行完毕的请求,包括超时没拿到回复的请求, 而流式回复会每次回调时都检查一次该 StreamReply 实例是否已经结束,没结束就再创建一个延迟回调后续再检查

注意点

客户端和服务端对请求和回复的异步处理是使用的 utils.BoundedDict 异步字典来处理

# 请求远程方法
request_id = await request(service_name, body)
response = await requests.aget(request_id, timeout=reply_timeout)
# 接收回复
response = await socket.recv_multipart()
await requests.aset(request_id, response)

如果自定义方法的返回对象的大小无法使用 sys.getsizeof 准确获取,建议用 HugeData 包装后再返回

# data 必须要是 bytes 或 bytearray,简言之能用 memoryview 包装的
hd = HugeData(data=data, compress_module="gzip", compress_level=9)
# 或者不提供 data ,HugeData 初始化时会创建一个 Queue 的跨进程代理对象,往这个跨进程队列里传输数据即可
hd = HugeData(compress_module="gzip", compress_level=9)
d = process_data()
hd.data.put(d)

HugeData 的 compress 和 decompress 方法都会在进程池里执行增量压缩和增量解压缩, 返回的生成器每次获取的字节数大小不会超过 Settings.HUGE_DATA_SIZEOF , compress 方法对每一块返回的大小的限制是 HugeData 内部实现, decompress 方法对每一块返回的大小限制则是由压缩模块来实现,会在调用解压缩器实例的 decompress 方法时传递一个 max_length 位置参数。

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

gaterpc-0.1.10.tar.gz (33.9 kB view hashes)

Uploaded Source

Built Distribution

gaterpc-0.1.10-py3-none-any.whl (33.5 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page