A standalone nameko rpc proxy for asyncio and a wrapper for using nameko rpc proxy with Sanic.
Project description
aio-nameko-proxy
A standalone nameko rpc proxy for asyncio and a wrapper for using nameko rpc proxy with Sanic.
This project is based on aio-pika and reference the source code of official nameko project and aio-pika.
examples:
standalone AIOClusterRpcProxy
If you want most of your messages to be persistent(default). Set the delivery mode parameter as DeliveryMode.PERSISTENT, Call sw_dlm_call when you need to send a non-persistent message.
import asyncio
from aio_nameko_proxy import AIOClusterRpcProxy
from aio_pika import DeliveryMode
config = {
"AMQP_URI": "pyamqp://guest:guest@127.0.0.1:5672",
"rpc_exchange": "nameko-rpc",
"time_out": 30,
"con_time_out": 5,
"delivery_mode": DeliveryMode.PERSISTENT
}
async def run():
async with AIOClusterRpcProxy(config) as rpc:
# time_out: the time_out of waitting the remote method result.
# con_time_out: the time_out of connecting to the rabbitmq server or binding the queue, consume and so on.
# persistent msg call
result = await rpc.rpc_demo_service.normal_rpc("demo")
reply_obj = await rpc.rpc_demo_service.normal_rpc.call_async("demo")
result = await reply_obj.result()
# non-persistent msg call
result = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call("demo")
reply_obj = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call_async("demo")
result = await reply_obj.result()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
If you want most of your messages to be non-persistent(persistent is default). Set the delivery mode parameter as DeliveryMode.NOT_PERSISTENT, Call sw_dlm_call when you need to send a persistent message.
import asyncio
from aio_nameko_proxy import AIOClusterRpcProxy
from aio_pika import DeliveryMode
config = {
"AMQP_URI": "pyamqp://guest:guest@127.0.0.1:5672",
"rpc_exchange": "nameko-rpc",
"time_out": 30,
"con_time_out": 5,
"delivery_mode": DeliveryMode.NOT_PERSISTENT
}
async def run():
async with AIOClusterRpcProxy(config) as rpc:
# non-persistent msg call
result = await rpc.rpc_demo_service.normal_rpc("demo")
reply_obj = await rpc.rpc_demo_service.normal_rpc.call_async("demo")
result = await reply_obj.result()
# persistent msg call
result = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call("demo")
reply_obj = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call_async("demo")
result = await reply_obj.result()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
AIOPooledClusterRpcProxy
import asyncio
from aio_nameko_proxy import AIOPooledClusterRpcProxy
from aio_pika import DeliveryMode
config = {
"AMQP_URI": "pyamqp://guest:guest@127.0.0.1:5672",
"rpc_exchange": "nameko-rpc",
"time_out": 30,
"con_time_out": 5,
"pool_size": 10,
"initial_size": 2,
"delivery_mode": DeliveryMode.NOT_PERSISTENT
}
async def run():
async with AIOPooledClusterRpcProxy(config) as proxy_pool:
async with proxy_pool.acquire() as rpc:
result = await rpc.rpc_demo_service.normal_rpc("demo")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
Sanic Wrapper
from sanic import Sanic
from sanic.response import json
from aio_pika import DeliveryMode
from aio_nameko_proxy.wrappers import SanicNamekoClusterRpcProxy
class Config(object):
# AMQP_URI
NAMEKO_AMQP_URI = "pyamqp://guest:guest@127.0.0.1:5672"
# rpc_exchange
NAMEKO_RPC_EXCHANGE = "nameko-rpc"
# pool_size
NAMEKO_POOL_SIZE = 60
# initial_size
NAMEKO_INITIAL_SIZE = 60
# time_out
NAMEKO_TIME_OUT = 30
# con_time_out
NAMEKO_CON_TIME_OUT = 5
# delivery_mode
NAMEKO_DELIVERY_MODE = DeliveryMode.PERSISTENT
# other supported properties of aio-pika.Message, the key name format is "NAMEKO_{}".format(property_name.upper())
# ...
app = Sanic("App Name")
app.config.from_object(Config)
# rpc_cluster = SanicNamekoClusterRpcProxy(app)
# or
rpc_cluster = SanicNamekoClusterRpcProxy()
rpc_cluster.init_app(app)
@app.route("/")
async def test(request):
rpc = await rpc_cluster.get_proxy()
result = await rpc.rpc_demo_service.normal_rpc("demo")
reply_obj = await rpc.rpc_demo_service.normal_rpc.call_async("demo")
result = await reply_obj.result()
result = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call("demo")
reply_obj = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call_async("demo")
result = await reply_obj.result()
return json({"hello": "world"})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8000)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.