A battle-tested Python JSON-RPC 2.0 library supporting client and server code in both sync and async styles.

Overview

A JSON-RPC 2.0 implementation.

Features:

  • Experimental support for request canceling with rpc.cancel(id) in async servers
  • Experimental support for response streaming in the ASGI server
  • Experimental support for MQTT Transport
  • Supports calling JSON-RPC 2.0 servers
  • Build a JSON-RPC server using ASGI, WSGI or CGI
  • Optionally supports async client and server code
  • Use JSON-RPC 2.0 with any transport layer by using the building blocks
  • Battle-tested code: it was already used for ~10 years in various closed source projects
  • Unit tests are written for new code, and for older parts whenever a bug is fixed
  • Abstracts handling of datetime.datetime() over the RPC
  • Because this is a serious library, a GPG signature is uploaded to PyPI so the integrity of updates can be easily verified
  • Plays nicely with OpenRPC (https://open-rpc.org/)

Drawbacks:

  • Depends on libraries to support async and to correctly convert Python's datetime.datetime() to JSON so that timezone information is not lost
  • This library is committed to not breaking existing code, so it can get bloated over time
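
To illustrate why datetime handling needs care, here is a minimal standard-library sketch of a round trip that keeps the timezone offset. It is not how jsonrpcx does the conversion internally (the library delegates that to its dependencies); the helper name encode_datetime is made up for this example.

```python
import json
from datetime import datetime, timedelta, timezone


def encode_datetime(value):
    """`default` hook for json.dumps: serialize datetimes as ISO 8601 strings."""
    if isinstance(value, datetime):
        return value.isoformat()
    raise TypeError(f"Cannot serialize {type(value).__name__}")


# A timezone-aware timestamp with a UTC+02:00 offset
sent = datetime(2024, 5, 1, 12, 30, tzinfo=timezone(timedelta(hours=2)))

# Serialize for the wire, then parse it back on the receiving side
wire = json.dumps({"when": sent}, default=encode_datetime)
received = datetime.fromisoformat(json.loads(wire)["when"])

print(wire)                  # {"when": "2024-05-01T12:30:00+02:00"}
print(received == sent)      # True
print(received.utcoffset())  # 2:00:00
```

A naive conversion such as str(sent.replace(tzinfo=None)) would drop the offset, and the receiver could no longer tell which instant was meant.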

Features under construction:

Note: For features under construction it is still open whether they will arrive as a separate package or be merged into this one.

  • Experimental code for OpenRPC is already included and will mature over time. Several closed source applications already rely on it.
  • JSON-RPC 2.0 over MQTT is under consideration

Install

pip install jsonrpcx

A GPG signature is available on PyPI and can be checked as well.

Build clients

Async

Note: acall does not support a session parameter because it could lead to hard-to-debug code. For example, suppose you have two parallel requests: one logs you in to the server and sets a cookie, and the other calls a method that requires you to be logged in already. Sometimes this works. Other times the request to the protected resource reaches the server before the login does and is therefore handled without a session. In that case the server returns some kind of error message and the developer is left with a headache. If you want to use async requests, please only do so with stateless RPC servers, or use the sync version instead.

Note: There is currently code to support sessions for acall, but it will be removed in a future version unless a way around the problem described above is found.
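
The ordering problem described in the note can be reproduced without any network. The sketch below is a simulation only: FakeServer, login, get_secret and the delays are all hypothetical stand-ins, and no jsonrpcx code is involved.

```python
import asyncio


class FakeServer:
    """Stand-in for a session-based RPC server (hypothetical, no network)."""

    def __init__(self):
        self.logged_in = False

    async def login(self, delay):
        await asyncio.sleep(delay)  # simulated network latency
        self.logged_in = True
        return "ok"

    async def get_secret(self, delay):
        await asyncio.sleep(delay)  # simulated network latency
        if not self.logged_in:
            return "error: not logged in"
        return "secret"


async def run(login_delay, call_delay):
    server = FakeServer()
    # Both requests start together, like two parallel async calls
    _, result = await asyncio.gather(
        server.login(login_delay),
        server.get_secret(call_delay),
    )
    return result


async def main():
    print(await run(0.01, 0.05))  # login arrives first
    print(await run(0.05, 0.01))  # protected call arrives first


if __name__ == "__main__":
    asyncio.run(main())
```

The same two requests succeed or fail depending only on which one reaches the server first, which is why the async client is best used with stateless servers.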

from jsonrpcx import acall
import asyncio

async def main():
    result1 = await acall("https://yourjsonrpc.online", method="add", params=[1, 2])
    print(result1)

    result2 = await acall("https://yourjsonrpc.online", method="add", params={"a": 1, "b": 2})
    print(result2)

if __name__ == "__main__":
    asyncio.run(main())

If you don't want to write the URL over and over again, you can use this trick:

from jsonrpcx import acall
import asyncio
from functools import partial

async def main():
    rpc = partial(acall, "https://yourjsonrpc.online")
    result = await rpc("add", [2, 2])
    print(result)

if __name__ == "__main__":
    asyncio.run(main())

Sync

Call RPC via string

from jsonrpcx import call
import httpx

result1 = call("https://yourjsonrpc.online", method="add", params=[1, 2])
print(result1)

result2 = call("https://yourjsonrpc.online", method="add", params={"a": 1, "b": 2})
print(result2)

# Generally you should keep JSON-RPC 2.0 servers stateless, that is, not rely on sessions for things like authentication
# Sessions can be used here but should be treated as an implementation detail and only used if absolutely necessary
# because they will only work as long as httpx is used under the hood
# If you do need a session you can do the following
session = httpx.Client()
result3 = call("https://yourjsonrpc.online", method="add", params=[1, 2], session=session)
print(result3)

If you don't want to write the URL over and over again, you can use this trick:

from jsonrpcx import call
from functools import partial

rpc = partial(call, "https://yourjsonrpc.online")
result = rpc("add", [2, 2])
print(result)

Call RPC via proxy object

For legacy reasons ServiceProxy exists and automatically translates a method call on the ServiceProxy object into a method call on the RPC. This looks like normal Python code, but static code analysis and autocompletion cannot support the developer here, so this way of calling an RPC has not been carried over to the newer async version.

import jsonrpcx

# The service proxy keeps track of HTTP sessions
rpc = jsonrpcx.ServiceProxy("https://yourjsonrpc.online")
rpc.add(1, 2)
rpc.add(a=1, b=2)

# JSON-RPC 2.0 does not allow mixing positional and named parameters in one call, so this is not possible.
# It would usually throw an exception, but that depends on the implementation details of the server
rpc.add(1, b=2)
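
The reason for this limitation is that the JSON-RPC 2.0 "params" member is either an array (by position) or an object (by name), never both. The following minimal sketch shows how a proxy-style call could be turned into a request object. build_request is a hypothetical helper for illustration, not part of jsonrpcx.

```python
import itertools

# Simple incrementing request ids for this sketch
_ids = itertools.count(1)


def build_request(method, *args, **kwargs):
    """Build a JSON-RPC 2.0 request dict from a Python-style call."""
    if args and kwargs:
        # "params" must be a single array or a single object
        raise ValueError("JSON-RPC 2.0 params must be positional or named, not both")
    request = {"jsonrpc": "2.0", "method": method, "id": next(_ids)}
    if args:
        request["params"] = list(args)
    elif kwargs:
        request["params"] = kwargs
    # With no arguments the optional "params" member is simply omitted
    return request


print(build_request("add", 1, 2))
print(build_request("add", a=1, b=2))
```

In this sketch the mixed call is rejected on the client side. A proxy that sends it anyway leaves the outcome to the server.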

Build Servers

ASGI

import logging
from typing import *
from jsonrpcx import *

logging.basicConfig(level=logging.DEBUG)

class Service(ASGIServer):
    async def ping(self) -> str:
        return "pong"

    async def add(self, val1: float, val2: float) -> float:
        return val1 + val2

    async def echo(self, var: str) -> str:
        return var

    async def testReturnNone(self):
        return None

# You only need this if you want to overwrite the HTTP headers sent to the client
class Delegate(ASGIServerDelegate):
    def HTMLHeaders(self) -> List[Tuple[str, str]]:
        return [("Content-Type", "application/json")]

async def app(scope, receive, send):
    rpcServer = Service(delegate=Delegate())
    return await rpcServer.parseRequest(scope, receive, send)

WSGI

Note: WSGI does not support async. This is a limitation of WSGI itself, not of this library.

from typing import *
from jsonrpcx import *

class Service(WSGIServer):
    def ping(self) -> str:
        return "pong"

    def add(self, val1: int, val2: int) -> int:
        return val1 + val2

    def echo(self, var):
        return var

    def testReturnNone(self):
        return None

# You only need this if you want to overwrite the HTTP headers sent to the client
class Delegate(WSGIServerDelegate):
    def HTMLHeaders(self) -> List[Tuple[str, str]]:
        return [("Content-Type", "application/json")]

def app(environment, start_response):
    wsgiServer = Service(delegate=Delegate())
    return wsgiServer.parseRequest(environment, start_response)

CGI

CGI is the perfect choice for hosting providers that do not support WSGI or ASGI.

Recommended configuration, using an Apache .htaccess file as an example. A similar configuration should work for other web servers.

    # If the web server returns an error, your JSON-RPC 2.0 clients won't break even if they don't understand HTTP error codes (some don't)
    ErrorDocument 500 '{"jsonrpc": "2.0", "error": {"code": -32603, "message": "Internal error issued by webserver"}, "id": null}'

    # Security precaution
    Options -Indexes

    DirectoryIndex index.py

    # Don't give hackers access to configuration secrets
    <Files ~ "\.pyc$">
        Order allow,deny
        Deny from all
    </Files>

    # Required to make .py scripts executable
    Options +ExecCGI
    AddHandler cgi-script .py

    # Only if you intend to use the RPC from a browser.
    # Please check if you can use stricter CORS rules.
    # Note: browsers reject the wildcard origin "*" for requests sent with credentials
    Header set Access-Control-Allow-Origin "*"
    Header set Access-Control-Allow-Headers "Accept,Content-Type,Cookie,Origin,Referer,User-Agent"
    Header set Access-Control-Allow-Methods "*"
    Header set Access-Control-Allow-Credentials true
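
The ErrorDocument line above means that a client receives a well-formed JSON-RPC 2.0 error object even when the web server itself fails. A client that only inspects the response body could handle it as in the sketch below. RPCError and parse_response are hypothetical names for this example and are not the jsonrpcx client API.

```python
import json


class RPCError(Exception):
    """Raised when a JSON-RPC 2.0 response carries an error object."""

    def __init__(self, code, message):
        super().__init__(f"{code}: {message}")
        self.code = code
        self.message = message


def parse_response(body):
    """Return the result of a JSON-RPC 2.0 response or raise RPCError."""
    response = json.loads(body)
    if "error" in response:
        error = response["error"]
        raise RPCError(error["code"], error["message"])
    return response["result"]


# The fallback body configured via ErrorDocument 500 above
fallback = '{"jsonrpc": "2.0", "error": {"code": -32603, "message": "Internal error issued by webserver"}, "id": null}'

try:
    parse_response(fallback)
except RPCError as exc:
    print(exc.code, exc.message)  # -32603 Internal error issued by webserver
```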

Async

from jsonrpcx import *
import asyncio

class Service(AsyncCGIServer):
    # Will work because it's marked with the `async` keyword
    async def ping(self) -> str:
        return "pong"
    
    # This will not work because it is missing the `async` keyword
    def ping2(self) -> str:
        return "pong"


# You only need this if you want to overwrite the HTTP headers sent to the client
class Delegate(CGIServerDelegate):
    def HTTPHeader(self):
        return "Content-Type: application/json"

if __name__ == "__main__":
    service = Service(delegate=Delegate())
    asyncio.run(service.processRequests())
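
A missing async keyword is easy to overlook. One possible way to catch it early, for example in a unit test, is to inspect the service class with the standard library. This is a sketch under assumptions: ExampleService is a plain stand-in class (a real service would subclass AsyncCGIServer), and non_async_methods is a hypothetical helper, not something jsonrpcx provides.

```python
import inspect


class ExampleService:
    """Stand-in class; a real service would subclass AsyncCGIServer."""

    async def ping(self):
        return "pong"

    def ping2(self):  # missing `async`
        return "pong"


def non_async_methods(cls):
    """Return names of public methods defined on cls that are not coroutine functions."""
    return sorted(
        name
        for name, member in vars(cls).items()
        if not name.startswith("_")
        and inspect.isfunction(member)
        and not inspect.iscoroutinefunction(member)
    )


print(non_async_methods(ExampleService))  # ['ping2']
```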

Sync

from jsonrpcx import *

class Service(CGIServer):
    def ping(self) -> str:
        return "pong"

# You only need this if you want to overwrite the HTTP headers sent to the client
class Delegate(CGIServerDelegate):
    def HTTPHeader(self):
        return "Content-Type: application/json"

if __name__ == "__main__":
    Service(delegate=Delegate())

OpenRPC documentation

Note: If you want to use this with web-based tooling, you need to configure CORS headers to allow that.

This library already provides experimental support for https://open-rpc.org/, which means you can auto-generate documentation. It uses Python's typing support to figure out what input types a function has.

If you point a tool that can generate documentation from the OpenRPC spec at your JSON-RPC 2.0 server, it will already be able to generate documentation for it. The method that generates the OpenRPC JSON document is rpc.discover. If you want to change its behaviour, override def rpcDiscover(self): or async def rpcDiscover(self):, depending on whether you chose a sync or async JSON-RPC 2.0 server.
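
To give an idea of how type hints can be turned into an OpenRPC method description, here is a minimal sketch using only the standard library. It is not the library's actual implementation: JSON_TYPES, schema_for and describe_method are hypothetical names, and only a handful of simple types are mapped.

```python
import inspect
from typing import get_type_hints

# Hypothetical mapping from Python types to JSON Schema type names
JSON_TYPES = {
    str: "string",
    int: "integer",
    float: "number",
    bool: "boolean",
    list: "array",
    dict: "object",
}


def schema_for(hint):
    """Return a JSON Schema fragment for a type hint, or {} if unknown."""
    name = JSON_TYPES.get(hint)
    return {"type": name} if name else {}


def describe_method(func):
    """Build an OpenRPC-style method object from a function's type hints."""
    hints = get_type_hints(func)
    params = [
        {"name": name, "schema": schema_for(hints.get(name))}
        for name in inspect.signature(func).parameters
        if name != "self"
    ]
    return {
        "name": func.__name__,
        "params": params,
        "result": {"name": "result", "schema": schema_for(hints.get("return"))},
    }


async def add(self, val1: float, val2: float) -> float:
    return val1 + val2


print(describe_method(add))
```

Methods without annotations end up with empty schemas in this sketch, which is one reason to annotate every RPC method.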

The info and servers parts of the documentation return example information by default. If you want to add your own information you can do it like so:

from typing import *
from jsonrpcx import *

# Make sure to inherit from the correct class here depending on how you deploy the server
class Delegate(CGIServerDelegate):
    def experimentalOpenRPCInfo(self):
        return {
            "version": "1.0.0",
            "title": "Example Title",
            "termsOfService": "https://example.eu/tos",
            "contact": {
                "name": "Support Contact Name",
                "email": "support@example.eu",
                "url": "https://example.eu"
            },
            "description": "This API uses JSON-RPC 2.0. For simplicity it does not support batch method calls."
        }

    def experimentalOpenRPCServers(self) -> Optional[List[Dict]]:
        return [{
            "name": "Example endpoint",
            "url": "http://localhost:8000"
        }]

Experimental support for request canceling

There is currently no definition of how request canceling should be implemented in JSON-RPC 2.0, but Microsoft has something similar in LSP (the Language Server Protocol), and this works in a similar way.

WARNING: WHEN EXPERIMENTAL REQUEST CANCELING IS ENABLED, ANYONE CAN CANCEL REQUESTS, EVEN REQUESTS FROM OTHER CLIENTS, AS LONG AS THEY CORRECTLY GUESS THE MESSAGEID.

The server supports a method rpc.cancel(id) where id is the messageId the client sent to the server.

from typing import Any
from jsonrpcx import *
import asyncio

someVar = []

async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Wait for 1 hour
        someVar.append("example")
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')


class Service(ASGIServer, ExperimentalCancelRequestMixin):
    async def getLongRunningProcess2(self, bla: str):   
        return await cancel_me()


class Delegate(ASGIServerDelegate):
    # You have these options to overwrite default behaviour (method signatures are subject to change):

    async def experimentalWaitOnCancelRequest(self) -> None:
        # Assumes the base class implementations are coroutines as well
        await super().experimentalWaitOnCancelRequest()

    async def experimentalAddRunningRequest(self, messageId: Any, task: asyncio.Task) -> None:
        await super().experimentalAddRunningRequest(messageId, task)

    async def experimentalRemoveRunningRequest(self, messageId: Any) -> None:
        await super().experimentalRemoveRunningRequest(messageId)
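
The delegate hooks above suggest a registry that maps each messageId to its running asyncio.Task. The sketch below shows that general idea with plain asyncio. It is an assumption about the mechanism, not the library's code, and running, handle_request and cancel are hypothetical names.

```python
import asyncio

# Hypothetical registry: messageId -> running asyncio.Task
running = {}


async def long_running():
    await asyncio.sleep(3600)
    return "done"


async def handle_request(message_id):
    """Run the RPC method as a task so that it can be cancelled by id."""
    task = asyncio.create_task(long_running())
    running[message_id] = task
    try:
        return await task
    except asyncio.CancelledError:
        return "cancelled"
    finally:
        running.pop(message_id, None)


def cancel(message_id):
    """What an rpc.cancel(id) handler could do; True if a task was found."""
    task = running.get(message_id)
    if task is None:
        return False
    task.cancel()
    return True


async def main():
    request = asyncio.create_task(handle_request(42))
    await asyncio.sleep(0.01)  # give the request time to start
    print(cancel(42))          # True
    print(await request)       # cancelled
    print(cancel(42))          # False, the request is already gone


if __name__ == "__main__":
    asyncio.run(main())
```

Because the lookup key is just the messageId, any caller who knows or guesses it can cancel the task, which is exactly the risk the warning above describes.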

Experimental support for response streaming

There is currently no definition of how response streaming should be implemented in JSON-RPC 2.0.

On the Server:

from jsonrpcx import *
import asyncio

class Service(ASGIServer, ExperimentalCancelRequestMixin):
    async def streamedResponse(self):
        # Any dict or list (json serializable) that is returned with the `yield` keyword will be streamed to the client
        for i in range(10):
            yield {"res": f"test {i=}"}
            await asyncio.sleep(5)

On the client:

import httpx
import asyncio

async def main():
    # The context manager closes the client and its connections when done
    async with httpx.AsyncClient() as client:
        async with client.stream('POST', 'http://127.0.0.1:8000', timeout=5000, json={
            "jsonrpc": "2.0",
            "method": "streamedResponse",
            "params": [],
            "id": 1
        }) as response:
            async for chunk in response.aiter_bytes():
                print(chunk)


if __name__ == '__main__':
    asyncio.run(main())
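
The client above prints raw byte chunks, and chunk boundaries do not have to line up with JSON documents. If the server writes its JSON objects back to back (an assumption; the framing is not documented here and may differ), they could be reassembled as in the following sketch. JSONStreamDecoder is a hypothetical helper, not part of jsonrpcx or httpx.

```python
import codecs
import json


class JSONStreamDecoder:
    """Incrementally split a byte stream into JSON objects or arrays.

    Assumes documents are written back to back, optionally separated
    by whitespace. Intended for dicts and lists, not bare numbers.
    """

    def __init__(self):
        self._buffer = ""
        self._json = json.JSONDecoder()
        # Handles multi-byte UTF-8 characters split across chunks
        self._utf8 = codecs.getincrementaldecoder("utf-8")()

    def feed(self, chunk):
        """Add a chunk of bytes and return all documents completed so far."""
        self._buffer += self._utf8.decode(chunk)
        documents = []
        while True:
            self._buffer = self._buffer.lstrip()
            if not self._buffer:
                break
            try:
                document, end = self._json.raw_decode(self._buffer)
            except json.JSONDecodeError:
                break  # incomplete document, wait for more data
            documents.append(document)
            self._buffer = self._buffer[end:]
        return documents


decoder = JSONStreamDecoder()
print(decoder.feed(b'{"res": "test i=0"}{"res"'))  # [{'res': 'test i=0'}]
print(decoder.feed(b': "test i=1"}\n'))            # [{'res': 'test i=1'}]
```

In the client, each chunk from response.aiter_bytes() would be passed to decoder.feed() instead of being printed directly.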

MQTT Transport

WARNING: MQTT Transport is currently EXPERIMENTAL and its interface is subject to change.

NOTE: To use MQTT Transport you need to install the package asyncio_mqtt.

Server:

from jsonrpcx import *
import asyncio

class API(MqttJSONRPCServer):
    async def ping(self):
        return "pong"
    
    async def add(self, a, b):
        return a + b


async def main():
    server = API("localhost", "user", "password", "mqttChannel")
    await server.processRequests()


if __name__ == "__main__":
    asyncio.run(main())

Client:

from jsonrpcx import *
import asyncio

async def main():
    res = await mqttRPCCall("add", [5, 5], mqttHost="host", mqttUser="user", mqttPassword="password", mqttPort=1883, mqttChannel="mqttChannel")
    print(res)

if __name__ == "__main__":
    asyncio.run(main())

Setup development

Run pipenv install

Testing

Run pipenv run pytest

Running the Example server

uvicorn exampleserver:app --reload

Publish

  • python3 setup.py bdist_wheel
  • twine upload dist/* --skip-existing

Improvement ideas

  • Instead of throwing a "Circular reference detected" error when the return value of a function cannot be serialized to JSON, send a JSON serialization exception.
  • Add middleware support
  • Add authentication middleware
  • Add stream support for websockets
  • Improve the security problem with rpc.cancel (currently anyone can cancel any request as long as they know the messageId)
    • Force the client to use a cryptographically secure messageId.
    • Let the server provide a cryptographically secure cancelToken or JWT so only the client that made the request can cancel it, without the need to generate a cryptographically secure messageId on the client, which some clients might not be capable of.
    • Keep a dict of which client requested which messageId and check against it before canceling a request.
    • Let the middleware take care of this.
    • Expose all required data into the delegate and let the developer handle this.
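
As an illustration of the first idea in the list above, a client could generate hard-to-guess message ids with the standard library. This is a minimal sketch: new_message_id and build_cancel_request are hypothetical helpers, and passing the id to rpc.cancel as a single positional parameter is an assumption based on the rpc.cancel(id) description.

```python
import secrets


def new_message_id():
    """Return an unguessable id: 32 random bytes, URL-safe base64 encoded."""
    return secrets.token_urlsafe(32)


def build_cancel_request(target_id):
    """Build a JSON-RPC 2.0 request that asks the server to cancel target_id."""
    return {
        "jsonrpc": "2.0",
        "method": "rpc.cancel",
        "params": [target_id],  # assumption: id passed by position
        "id": new_message_id(),
    }


request_id = new_message_id()
print(len(request_id))  # 43
print(build_cancel_request(request_id)["method"])  # rpc.cancel
```

JSON-RPC 2.0 allows string ids, so such tokens can be used wherever a numeric id would otherwise appear.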

Built Distribution

jsonrpcx-4.1.0-py3-none-any.whl (16.0 kB, Python 3)
