Skip to main content

A simple JSON-RPC for aiohttp

Project description

aiohttp-rpc

PyPI PyPI - Python Version Scrutinizer Code Quality GitHub Issues License

A library for simple integration of the JSON-RPC 2.0 protocol into a Python application using aiohttp.
The goal is to provide a simple, fast, and reliable way to add JSON-RPC 2.0 to your app on the server and/or client side.

The library has only one dependency:

  • aiohttp — async HTTP client/server framework

Table Of Contents

Installation

pip

pip install aiohttp-rpc

Usage

HTTP Server Example

from aiohttp import web
import aiohttp_rpc


def echo(*args, **kwargs):
    return {'args': args, 'kwargs': kwargs}

# If a method accepts a parameter named "rpc_request",
# add it with pass_extra_kwargs=True and use inject_request_middleware
# (included in DEFAULT_MIDDLEWARES).
async def ping(rpc_request):
    return 'pong'


if __name__ == '__main__':
    # Pre-configured server with default middlewares.
    aiohttp_rpc.rpc_server.add_methods([
        aiohttp_rpc.JSONRPCMethod(ping, pass_extra_kwargs=True),
        echo,
    ])

    app = web.Application()
    app.router.add_routes([
        web.post('/rpc', aiohttp_rpc.rpc_server.handle_http_request),
    ])
    web.run_app(app, host='0.0.0.0', port=8080)

HTTP Client Example

import asyncio
import aiohttp_rpc


async def run():
  async with aiohttp_rpc.JSONRPCClient('http://0.0.0.0:8080/rpc') as rpc:
    # Idiomatic calls:
    print('#1', await rpc.methods.ping())  # No args
    print('#2', await rpc.methods.echo('one', 'two'))  # Positional args
    print('#3', await rpc.methods.echo(three='3'))  # Keyword args

    # Lower-level calls:
    print('#4', await rpc.call('echo', three='3'))
    await rpc.notify('echo', 123)  # Notification

    # Direct call returns a JSONRPCResponse object:
    resp = await rpc.direct_call(aiohttp_rpc.JSONRPCRequest(id=123, method='ping'))
    print('#5', resp)

    # Batch calls (order preserved by default):
    print('#6', await rpc.batch(
      rpc.methods.ping.request(),
      rpc.methods.echo.request('one', 'two'),
      rpc.methods.echo.request(three='3'),
    ))

    # Fire-and-forget batch notifications:
    await rpc.batch_notify(
      rpc.methods.ping.notification(),
      rpc.methods.echo.notification('one', 'two'),
      rpc.methods.echo.notification(three='3'),
    )


asyncio.run(run())

This prints:

#1 pong
#2 {'args': ['one', 'two'], 'kwargs': {}}
#3 {'args': [], 'kwargs': {'three': '3'}}
#4 {'args': [], 'kwargs': {'three': '3'}}
#5 JSONRPCResponse(id=123, jsonrpc='2.0', result='pong', error=None, context={'http_response': ...})
#6 ('pong', {'args': ['one', 'two'], 'kwargs': {}}, {'args': [], 'kwargs': {'three': '3'}})

back to top


Integration

Need to serialize non-JSON types? Provide a custom serializer:

from aiohttp import web
import aiohttp_rpc
import uuid
import json
from dataclasses import dataclass
from functools import partial


@dataclass
class User:  # Not JSON-serializable by default.
    uuid: uuid.UUID
    username: str = 'mike'
    email: str = 'some@mail.com'


async def get_user_by_uuid(user_uuid) -> User:
    # For example, data may come from a database.
    return User(uuid=uuid.UUID(user_uuid))


def json_serialize_unknown_value(value):
    if isinstance(value, User):
        return {'uuid': str(value.uuid), 'username': value.username, 'email': value.email}
    return repr(value)


if __name__ == '__main__':
    rpc_server = aiohttp_rpc.JSONRPCServer(
        json_serialize=partial(json.dumps, default=json_serialize_unknown_value),
    )
    rpc_server.add_method(get_user_by_uuid)

    app = web.Application()
    app.router.add_routes([
        web.post('/rpc', rpc_server.handle_http_request),
    ])
    web.run_app(app, host='0.0.0.0', port=8080)

Convert incoming custom types with middleware:

# RPC method that takes a custom type.
def generate_user_token(user: User):
    return f'token-{str(user.uuid).split("-")[0]}'


async def replace_type(data):
    if not isinstance(data, dict) or '__type__' not in data:
        return data
    if data['__type__'] == 'user':
        return await get_user_by_uuid(data['uuid'])
    raise aiohttp_rpc.errors.InvalidParams


# Middleware that converts arguments before the method call.
async def type_conversion_middleware(request, handler):
    request.set_args_and_kwargs(
        args=[await replace_type(arg) for arg in request.args],
        kwargs={key: await replace_type(value) for key, value in request.kwargs.items()},
    )
    return await handler(request)


rpc_server = aiohttp_rpc.JSONRPCServer(middlewares=[
    aiohttp_rpc.middlewares.exception_middleware,
    aiohttp_rpc.middlewares.inject_request_middleware,
    type_conversion_middleware,
])

back to top


Middleware

Middleware has an interface similar to aiohttp’s web middleware:

import aiohttp_rpc
import typing


async def simple_middleware(request: aiohttp_rpc.JSONRPCRequest,
                            handler: typing.Callable) -> aiohttp_rpc.JSONRPCResponse:
    # Before the method (and downstream middleware)
    response = await handler(request)
    # After the method
    return response


rpc_server = aiohttp_rpc.JSONRPCServer(middlewares=[
    aiohttp_rpc.middlewares.exception_middleware,
    simple_middleware,
])

Included middlewares:

  • exception_middleware — catches exceptions, converts to JSON-RPC errors (logging included).
  • inject_request_middleware — stores the request object in extra kwargs as "rpc_request". Methods receive it only if added with pass_extra_kwargs=True.
  • logging_middleware — logs raw JSON-RPC requests and responses.
  • check_origins(allowed_origins) — factory returning middleware that permits only the listed HTTP Origin values (for HTTP endpoints).

DEFAULT_MIDDLEWARES:

DEFAULT_MIDDLEWARES = (
    exception_middleware,
    inject_request_middleware,
)

You can also use aiohttp web middlewares for web.Request/web.Response processing.

back to top


WebSockets

WS Server Example

from aiohttp import web
import aiohttp_rpc


def echo(*args, **kwargs):
    return {'args': args, 'kwargs': kwargs}


async def ping(rpc_request):
    return 'pong'


if __name__ == '__main__':
    rpc_server = aiohttp_rpc.WSJSONRPCServer(
        middlewares=aiohttp_rpc.middlewares.DEFAULT_MIDDLEWARES,
        # allowed_origins={'https://example.com'},  # optional Origin check
    )
    rpc_server.add_methods([
        aiohttp_rpc.JSONRPCMethod(ping, pass_extra_kwargs=True),
        echo,
    ])

    app = web.Application()
    app.router.add_routes([
        web.get('/rpc', rpc_server.handle_http_request),
    ])
    app.on_shutdown.append(rpc_server.on_shutdown)
    web.run_app(app, host='0.0.0.0', port=8080)

Options:

  • allowed_origins: an optional container of allowed Origin values. Requests with other origins get HTTP 403.
  • json_response_handler: optional callback invoked if the server receives a response-shaped message (useful if the server also acts as a client over the same connection).
  • ws_response_cls / ws_response_kwargs: customize the WebSocketResponse class and options (default max_msg_size is 1_048_576 bytes).

WS Client Example

import asyncio
import aiohttp_rpc


async def run():
    async with aiohttp_rpc.WSJSONRPCClient('ws://0.0.0.0:8080/rpc') as rpc:
        print(await rpc.methods.ping())
        print(await rpc.methods.echo('request'))          # args
        await rpc.methods.echo.notify('notification')     # notification (no response)
        print(rpc.methods.echo.request('some request'))   # JSONRPCRequest for batching
        print(rpc.methods.echo.notification(a=1))         # JSONRPCRequest without id
        await rpc.notify('ping')                          # returns None
        print(await rpc.batch(
            rpc.methods.echo.request('test'),
            rpc.methods.echo.notification(a=1, b=2),
            rpc.methods.ping.request(),
        ))


asyncio.run(run())

back to top


API Reference

server

  • class JSONRPCServer(BaseJSONRPCServer)

    • def init(self, *, json_serialize=json_serialize, json_deserialize=json_deserialize, middlewares=(), methods=None, max_batch=None)
    • def add_method(self, method, *, replace=False) -> JSONRPCMethod
    • def add_methods(self, methods, *, replace=False) -> Tuple[JSONRPCMethod, ...]
    • async def handle_http_request(self, http_request: web.Request) -> web.Response
  • class WSJSONRPCServer(BaseJSONRPCServer)

    • def init(..., allowed_origins: Optional[Container[str]] = None, json_response_handler: Optional[Callable] = None, ws_response_cls=WebSocketResponse, ws_response_kwargs=None)
    • async def handle_http_request(self, http_request: web.Request) -> web.StreamResponse
    • async def on_shutdown(self, app: web.Application) -> None
  • rpc_server: JSONRPCServer (pre-configured with DEFAULT_MIDDLEWARES)

client

  • class JSONRPCClient(BaseJSONRPCClient)

    • def init(self, url, *, session: Optional[aiohttp.ClientSession] = None, json_serialize=json_serialize, json_deserialize=json_deserialize, **request_kwargs)
      • request_kwargs are passed to ClientSession(...)
    • async def connect() -> None
    • async def disconnect() -> None
    • async def call(self, method: str, *args, **kwargs) -> Any
    • async def notify(self, method: str, *args, **kwargs) -> None
    • async def batch(self, *requests, save_order: bool = True) -> Sequence
    • async def batch_notify(self, *requests) -> None
    • async def direct_call(self, request: JSONRPCRequest, **request_kwargs) -> Optional[JSONRPCResponse]
    • async def direct_batch(self, batch_request: JSONRPCBatchRequest, **request_kwargs) -> Optional[JSONRPCBatchResponse]
      • request_kwargs go to aiohttp.ClientSession.post(...)
      • On success, response.context contains {'http_response': aiohttp.ClientResponse}
  • class WSJSONRPCClient(BaseJSONRPCClient)

    • def init(self, url: Optional[str] = None, *, session: Optional[aiohttp.ClientSession] = None, ws_connect: Optional[WSConnectType] = None, timeout: Optional[float] = 60, timeout_for_data_receiving: Optional[float] = 60, connection_check_interval: Optional[float] = 5, json_requests_handler: Optional[WSJSONRequestsHandler] = None, unprocessed_json_responses_handler: Optional[UnprocessedWSJSONResponsesHandler] = None, json_serialize=json_serialize, json_deserialize=json_deserialize, **ws_connect_kwargs)
    • async def connect() -> None
    • async def disconnect() -> None
    • Same high-level API as HTTP client; errors include RequestTimeoutError, TransportError, ServerError.
  • Common to both clients

    • constructor arg error_map: Mapping[int, Type[JSONRPCError]] (default: DEFAULT_KNOWN_ERRORS_MAP) for mapping server error codes to custom exception types.
    • methods: JSONRPCClientMethods — dynamic attribute access for remote methods with helpers:
      • await rpc.methods.method_name(...)
      • await rpc.methods.method_name.notify(...)
      • rpc.methods.method_name.request(...) -> JSONRPCRequest
      • rpc.methods.method_name.notification(...) -> JSONRPCRequest (no id)

protocol

  • class JSONRPCRequest

    • id: Union[int, str, None]; method: str; jsonrpc: str; extra_kwargs: MutableMapping; context: MutableMapping
    • params: Any; args: Optional[Sequence]; kwargs: Optional[Mapping]
    • is_notification: bool
    • methods: set_params(...), set_args_and_kwargs(...), dump(), load(...)
  • class JSONRPCResponse

    • id: Union[int, str, None]; jsonrpc: str; result: Any; error: Optional[JSONRPCError]; context: MutableMapping
    • dump(), load(...)
  • class JSONRPCBatchResponse

    • responses: Tuple[JSONRPCResponse, ...]; dump(), load(...)
  • class JSONRPCMethod(BaseJSONRPCMethod)

    • def init(self, func, *, name=None, pass_extra_kwargs=False, prepare_result=None)
      • prepare_result can be sync or async; if provided, it post-processes the method result.
  • class JSONRPCUnlinkedResults / JSONRPCDuplicatedResults

    • Utilities used by collect_batch_result.

decorators

  • def rpc_method(name: Optional[str] = None, *, rpc_server=default_rpc_server, pass_extra_kwargs=False, prepare_result=None)
    • Registers the function on the default HTTP rpc_server at import time.

errors

  • class JSONRPCError(RuntimeError)
  • class ServerError(JSONRPCError)
  • class ParseError(JSONRPCError)
  • class InvalidRequest(JSONRPCError)
  • class MethodNotFound(JSONRPCError)
  • class InvalidParams(JSONRPCError)
  • class InternalError(JSONRPCError)
  • Client-side errors:
    • EmptyResponse
    • RequestTimeoutError
    • TransportError
    • HTTPStatusError
  • DEFAULT_KNOWN_ERRORS and DEFAULT_KNOWN_ERRORS_MAP

middlewares

  • exception_middleware(request, handler) -> JSONRPCResponse
  • inject_request_middleware(request, handler) -> JSONRPCResponse
  • logging_middleware(request, handler) -> JSONRPCResponse
  • check_origins(allowed_origins) -> middleware
  • DEFAULT_MIDDLEWARES

utils

  • json_serialize(value) -> str
  • json_deserialize(text) -> Any
  • convert_params_to_args_and_kwargs(params) -> Tuple[Sequence, Mapping]
  • parse_args_and_kwargs(args, kwargs) -> Tuple[Any, Sequence, Mapping]
  • get_random_id() -> str
  • collect_batch_result(batch_request, batch_response) -> Tuple[Any, ...]

constants

  • NOTHING
  • VERSION_2_0

back to top


More examples

The library lets you add methods in several ways:

import aiohttp_rpc

def ping_1(): return 'pong 1'
def ping_2(): return 'pong 2'
def ping_3(): return 'pong 3'

rpc_server = aiohttp_rpc.JSONRPCServer()
rpc_server.add_method(ping_1)                                      # 'ping_1'
rpc_server.add_method(aiohttp_rpc.JSONRPCMethod(ping_2))           # 'ping_2'
rpc_server.add_method(aiohttp_rpc.JSONRPCMethod(ping_3, name='third_ping'))  # 'third_ping'
rpc_server.add_methods([ping_3])                                   # 'ping_3'

# Replace methods:
rpc_server.add_method(ping_1, replace=True)
rpc_server.add_methods([ping_1, ping_2], replace=True)

# Receive "rpc_request" (requires inject_request_middleware):
async def ping_with_request(rpc_request): return 'pong with request'
rpc_server.add_method(aiohttp_rpc.JSONRPCMethod(ping_with_request, pass_extra_kwargs=True))

Built-ins:

# Server
import aiohttp_rpc

rpc_server = aiohttp_rpc.JSONRPCServer(middlewares=[aiohttp_rpc.middlewares.inject_request_middleware])
rpc_server.add_method(sum)
rpc_server.add_method(aiohttp_rpc.JSONRPCMethod(zip, prepare_result=list))

# Client
# async with aiohttp_rpc.JSONRPCClient('/rpc') as rpc:
#     assert await rpc.methods.sum([1, 2, 3]) == 6
#     assert await rpc.methods.zip(['a', 'b'], [1, 2]) == [['a', 1], ['b', 2]]

Decorator:

import aiohttp_rpc
from aiohttp import web

@aiohttp_rpc.rpc_method()  # pass_extra_kwargs=False by default
def echo(*args, **kwargs):
    return {'args': args, 'kwargs': kwargs}

if __name__ == '__main__':
    app = web.Application()
    app.router.add_routes([
        web.post('/rpc', aiohttp_rpc.rpc_server.handle_http_request),
    ])
    web.run_app(app, host='0.0.0.0', port=8080)

Pass extra aiohttp parameters for HTTP requests:

import aiohttp_rpc
from aiohttp import ClientTimeout


jsonrpc_request = aiohttp_rpc.JSONRPCRequest(method='test', params={'test_value': 1})

async with aiohttp_rpc.JSONRPCClient('http://0.0.0.0:8080/rpc') as rpc:
  await rpc.direct_call(
    jsonrpc_request,
    headers={'X-Custom-Header': 'custom value'},
    timeout=ClientTimeout(total=10),  # forwarded to aiohttp.ClientSession.post(...)
  )

back to top


License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aiohttp_rpc-2.0.0.tar.gz (30.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

aiohttp_rpc-2.0.0-py3-none-any.whl (27.8 kB view details)

Uploaded Python 3

File details

Details for the file aiohttp_rpc-2.0.0.tar.gz.

File metadata

  • Download URL: aiohttp_rpc-2.0.0.tar.gz
  • Upload date:
  • Size: 30.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for aiohttp_rpc-2.0.0.tar.gz
Algorithm Hash digest
SHA256 80117b632383e0308e537c2c1b78c427fcd7f865be9cc06fc418a6c37c02989f
MD5 5d88d5ed6a85cfc5cdb996d6a3e37073
BLAKE2b-256 75874f966b0bbc8dceb26d8d58f15318353b9fc6b1acf53fbe58e014cddded79

See more details on using hashes here.

File details

Details for the file aiohttp_rpc-2.0.0-py3-none-any.whl.

File metadata

  • Download URL: aiohttp_rpc-2.0.0-py3-none-any.whl
  • Upload date:
  • Size: 27.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for aiohttp_rpc-2.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 a307481bfd76e95744de40678b8c37afc8897cde6708401a2b1f9b7eeed8d057
MD5 6b15bb023d77ba11c012ec80b94d168e
BLAKE2b-256 9cb3ce2e88b47bf67507b31322ba007b6fcf6700ee19febab8e236e8a2eec2f0

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page