A standalone nameko rpc proxy for asyncio and a wrapper for using nameko rpc proxy with Sanic.
Project description
aio-nameko-proxy
A standalone nameko RPC proxy for asyncio, plus wrappers for using the nameko RPC proxy with asynchronous web frameworks (Sanic, FastAPI).
This project is based on aio-pika and draws on the source code of the official nameko project and of aio-pika.
install
pip install aio-nameko-proxy
examples:
standalone AIOClusterRpcProxy
If you want most of your messages to be persistent (the default), set the delivery_mode parameter to DeliveryMode.PERSISTENT and call sw_dlm_call whenever you need to send a non-persistent message.
import ssl
import asyncio
from aio_nameko_proxy import AIOClusterRpcProxy
from aio_pika import DeliveryMode
# Settings passed to AIOClusterRpcProxy in this example.
config = {
    "AMQP_URI": "amqp://guest:guest@127.0.0.1:5672",  # Required
    "rpc_exchange": "nameko-rpc",
    "time_out": 30,
    "con_time_out": 5,
    "delivery_mode": DeliveryMode.PERSISTENT,
    "serializer": "my_serializer",
    "ACCEPT": ["pickle", "json", "my_serializer"],
    "SERIALIZERS": {
        "my_serializer": {
            "encoder": "my_slizer.dumps",
            "decoder": "my_slizer.loads",
            "content_type": "my-content-type",
            "content_encoding": "utf-8",
        },
    },
    # If SSL is configured, remember to change the URI to the TLS port,
    # e.g. "amqps://guest:guest@127.0.0.1:5671"
    "AMQP_SSL": {
        'ca_certs': 'certs/ca_certificate.pem',  # or 'cafile': 'certs/ca_certificate.pem',
        'certfile': 'certs/client_certificate.pem',
        'keyfile': 'certs/client_key.pem',
        'cert_reqs': ssl.CERT_REQUIRED,
    },
}
async def run():
    """Call the demo service with the default delivery mode and with the switched one."""
    async with AIOClusterRpcProxy(config) as rpc:
        # time_out: the timeout for waiting on the remote method's result.
        # con_time_out: the timeout for connecting to the RabbitMQ server,
        # binding the queue, consuming, and so on.

        # persistent msg call
        result = await rpc.rpc_demo_service.normal_rpc("demo")

        reply_obj = await rpc.rpc_demo_service.normal_rpc.call_async("demo")
        result = await reply_obj.result()

        # non-persistent msg call
        result = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call("demo")

        reply_obj = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call_async("demo")
        result = await reply_obj.result()
if __name__ == '__main__':
    # Drive the example coroutine to completion on the event loop.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
If you want most of your messages to be non-persistent (persistent is the default), set the delivery_mode parameter to DeliveryMode.NOT_PERSISTENT and call sw_dlm_call whenever you need to send a persistent message.
import asyncio
from aio_nameko_proxy import AIOClusterRpcProxy
from aio_pika import DeliveryMode
# Settings passed to AIOClusterRpcProxy; messages are non-persistent by default here.
config = {
    "AMQP_URI": "pyamqp://guest:guest@127.0.0.1:5672",
    "rpc_exchange": "nameko-rpc",
    "time_out": 30,
    "con_time_out": 5,
    "delivery_mode": DeliveryMode.NOT_PERSISTENT,
}
async def run():
    """Call the demo service with the default delivery mode and with the switched one."""
    async with AIOClusterRpcProxy(config) as rpc:
        # non-persistent msg call
        result = await rpc.rpc_demo_service.normal_rpc("demo")

        reply_obj = await rpc.rpc_demo_service.normal_rpc.call_async("demo")
        result = await reply_obj.result()

        # persistent msg call
        result = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call("demo")

        reply_obj = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call_async("demo")
        result = await reply_obj.result()
if __name__ == '__main__':
    # Drive the example coroutine to completion on the event loop.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
AIOPooledClusterRpcProxy
import asyncio
from aio_nameko_proxy import AIOPooledClusterRpcProxy
from aio_pika import DeliveryMode
# Settings passed to AIOPooledClusterRpcProxy, including the pool sizing keys.
config = {
    "AMQP_URI": "pyamqp://guest:guest@127.0.0.1:5672",
    "rpc_exchange": "nameko-rpc",
    "time_out": 30,
    "con_time_out": 5,
    "pool_size": 10,
    "initial_size": 2,
    "delivery_mode": DeliveryMode.NOT_PERSISTENT,
}
async def run():
    """Acquire a proxy from the pool and make one RPC call to the demo service."""
    async with AIOPooledClusterRpcProxy(config) as proxy_pool:
        # Each proxy is acquired from the pool for the duration of the inner block.
        async with proxy_pool.acquire() as rpc:
            result = await rpc.rpc_demo_service.normal_rpc("demo")
if __name__ == '__main__':
    # Drive the example coroutine to completion on the event loop.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
Sanic Wrapper
import ssl
from sanic import Sanic
from sanic.response import json
from aio_pika import DeliveryMode
from aio_nameko_proxy.wrappers import SanicNamekoClusterRpcProxy
class Config(object):
    """Sanic config object; each NAMEKO_* attribute maps to the proxy option named in its comment."""

    # AMQP_URI: Required
    NAMEKO_AMQP_URI = "pyamqp://guest:guest@127.0.0.1:5672"
    # rpc_exchange
    NAMEKO_RPC_EXCHANGE = "nameko-rpc"
    # pool_size
    NAMEKO_POOL_SIZE = 60
    # initial_size
    NAMEKO_INITIAL_SIZE = 60
    # time_out
    NAMEKO_TIME_OUT = 30
    # con_time_out
    NAMEKO_CON_TIME_OUT = 5
    # serializer
    NAMEKO_SERIALIZER = "json"
    # ACCEPT
    NAMEKO_ACCEPT = ["pickle", "json"]
    # SERIALIZERS: custom serializers
    NAMEKO_SERIALIZERS = {
        "my_serializer": {
            "encoder": "my_slizer.dumps",
            "decoder": "my_slizer.loads",
            "content_type": "my-content-type",
            "content_encoding": "utf-8",
        },
    }
    # AMQP_SSL: ssl configs
    NAMEKO_AMQP_SSL = {
        'ca_certs': 'certs/ca_certificate.pem',  # or 'cafile': 'certs/ca_certificate.pem',
        'certfile': 'certs/client_certificate.pem',
        'keyfile': 'certs/client_key.pem',
        'cert_reqs': ssl.CERT_REQUIRED,
    }
    # delivery_mode
    NAMEKO_DELIVERY_MODE = DeliveryMode.PERSISTENT
    # Other supported properties of aio-pika.Message can be set as well; the
    # key name format is "NAMEKO_{}".format(property_name.upper())
    # ...
app = Sanic("App Name")
# Load the settings defined on Config into the Sanic app config.
app.config.from_object(Config)

# Three ways to set up the proxy are shown; the first two are commented out.
# Option 1: pass the app to the constructor.
# rpc_cluster = SanicNamekoClusterRpcProxy(app)
# Option 2: import the ready-made rpc_cluster (contextvars required in py36).
# from aio_nameko_proxy.wrappers import rpc_cluster
# SanicNamekoClusterRpcProxy(app)
# Option 3: construct first, then attach the app with init_app().
rpc_cluster = SanicNamekoClusterRpcProxy()
rpc_cluster.init_app(app)
@app.route("/")
async def test(request):
    """Make the four kinds of RPC call shown in this README, then return a JSON body."""
    rpc = await rpc_cluster.get_proxy()

    result = await rpc.rpc_demo_service.normal_rpc("demo")

    reply_obj = await rpc.rpc_demo_service.normal_rpc.call_async("demo")
    result = await reply_obj.result()

    result = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call("demo")

    reply_obj = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call_async("demo")
    result = await reply_obj.result()

    return json({"hello": "world"})
@app.websocket('/ws')
async def ws(request, ws):
    """Answer three incoming websocket messages with the RPC result, then close."""
    rpc = await rpc_cluster.get_proxy()
    try:
        for _ in range(3):
            await ws.recv()
            result = await rpc.rpc_demo_service.normal_rpc("demo")
            await ws.send(result)
        # close() is a coroutine and must be awaited to take effect.
        await ws.close()
    finally:
        # In websocket handlers you must call remove() yourself at the end;
        # the finally block makes sure it also runs if the client disconnects.
        rpc_cluster.remove()
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)
FastAPI Wrapper
import ssl
from fastapi import FastAPI, WebSocket
from aio_pika import DeliveryMode
from pydantic import BaseSettings
from aio_nameko_proxy.wrappers import FastApiNamekoProxyMiddleware, rpc_cluster # contextvars required in py36
class Settings(BaseSettings):
    """Settings object passed to the middleware; each NAMEKO_* field maps to the proxy option named in its comment."""

    # AMQP_URI: Required
    NAMEKO_AMQP_URI = "pyamqp://guest:guest@127.0.0.1:5672"
    # rpc_exchange
    NAMEKO_RPC_EXCHANGE = "nameko-rpc"
    # pool_size
    NAMEKO_POOL_SIZE = 60
    # initial_size
    NAMEKO_INITIAL_SIZE = 60
    # time_out
    NAMEKO_TIME_OUT = 30
    # con_time_out
    NAMEKO_CON_TIME_OUT = 5
    # serializer
    NAMEKO_SERIALIZER = "json"
    # ACCEPT
    NAMEKO_ACCEPT = ["pickle", "json"]
    # SERIALIZERS: custom serializers
    NAMEKO_SERIALIZERS = {
        "my_serializer": {
            "encoder": "my_slizer.dumps",
            "decoder": "my_slizer.loads",
            "content_type": "my-content-type",
            "content_encoding": "utf-8",
        },
    }
    # AMQP_SSL: ssl configs
    NAMEKO_AMQP_SSL = {
        'ca_certs': 'certs/ca_certificate.pem',  # or 'cafile': 'certs/ca_certificate.pem',
        'certfile': 'certs/client_certificate.pem',
        'keyfile': 'certs/client_key.pem',
        'cert_reqs': ssl.CERT_REQUIRED,
    }
    # delivery_mode
    NAMEKO_DELIVERY_MODE = DeliveryMode.PERSISTENT
    # Other supported properties of aio-pika.Message can be set as well; the
    # key name format is "NAMEKO_{}".format(property_name.upper())
    # ...
# Instantiate the settings once and hand them to the middleware as its config.
settings = Settings()
app = FastAPI()
# Register the nameko proxy middleware on the app.
app.add_middleware(FastApiNamekoProxyMiddleware, config=settings)
@app.get("/")
async def test():
    """Make the four kinds of RPC call shown in this README, then return a JSON body."""
    rpc = await rpc_cluster.get_proxy()

    result = await rpc.rpc_demo_service.normal_rpc("demo")

    reply_obj = await rpc.rpc_demo_service.normal_rpc.call_async("demo")
    result = await reply_obj.result()

    result = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call("demo")

    reply_obj = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call_async("demo")
    result = await reply_obj.result()

    return {"hello": "world"}
@app.websocket("/ws")
async def ws(ws: WebSocket):
    """Answer three incoming websocket messages with the RPC result, then close."""
    await ws.accept()
    rpc = await rpc_cluster.get_proxy()
    try:
        for _ in range(3):
            await ws.receive()
            result = await rpc.rpc_demo_service.normal_rpc("demo")
            # WebSocket.send() expects a raw ASGI message dict; send_text()
            # wraps a string payload. Assumes the RPC result is a str - use
            # send_json() instead if the service returns structured data.
            await ws.send_text(result)
        # close() is a coroutine and must be awaited to take effect.
        await ws.close()
    finally:
        # In websocket handlers you must call remove() yourself at the end;
        # the finally block makes sure it also runs if the client disconnects.
        rpc_cluster.remove()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
File details
Details for the file aio-nameko-proxy-1.3.1.tar.gz
.
File metadata
- Download URL: aio-nameko-proxy-1.3.1.tar.gz
- Upload date:
- Size: 13.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/40.8.0 requests-toolbelt/0.9.1 tqdm/4.45.0 CPython/3.6.6
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 2c4ca532370ce55c92c03bdd1da337e5ac041e8c4d2d73675836e888cbb3f28c |
|
MD5 | 0dde2bc482fbf29dbd68806eaae54209 |
|
BLAKE2b-256 | 2a4091ac94c8e8db5008a2c8ee710ae502870e21c67d49b3f7bddbdf9f5474bd |