Skip to main content

Asynchronous RPC client/server with streaming support

Project description

RPCx

CI

Asynchronous RPC server/client for Python 3.10+ with streaming support.

  • Async backend implemented by anyio providing support for asyncio and trio.
  • Generic stream support for transport includes Websockets.
  • Messages are serialized with msgpack.
import math

import anyio
from anyio.streams.stapled import StapledObjectStream
from rpcx import RPCClient, RPCManager, RPCServer, Stream


async def add(a: int, b: int) -> int:
    """Add two numbers"""
    return a + b


async def fibonacci(n: int, stream: Stream) -> None:
    """Stream each number as computed in the Fibonacci sequence for a given starting number"""
    a, b = 0, 1

    for i in range(n):
        await stream.send(i)
        a, b = b, a + b


async def sum(stream: Stream) -> int:
    """Stream numbers from client and return the sum"""
    total = 0

    async for num in stream:
        total += num

    return total


manager = RPCManager()
manager.register("add", add)
manager.register("fibonacci", fibonacci)
manager.register("sum", sum)


async def main() -> None:
    # Create two connected stapled streams to simulate a network connection
    server_send, server_receive = anyio.create_memory_object_stream[bytes](math.inf)
    client_send, client_receive = anyio.create_memory_object_stream[bytes](math.inf)
    server_stream = StapledObjectStream(client_send, server_receive)
    client_stream = StapledObjectStream(server_send, client_receive)

    server = RPCServer(server_stream, manager)

    async with anyio.create_task_group() as task_group:
        task_group.start_soon(server.serve)

        async with RPCClient(client_stream) as client:
            # Simple method call
            assert await client.request("add", 1, 2) == 3

            # Streaming (server to client) example
            async with client.request_stream("fibonacci", 6) as stream:
                async for num in stream:
                    print(num)  # 1, 1, 2, 3, 5, 8

            # Streaming (client to server) example
            async with client.request_stream("sum") as stream:
                for num in range(10):
                    await stream.send(num)

            assert await stream == 45


anyio.run(main)

Development

Installation

Run poetry install to install the project.

Execute tests

Execute poetry run tox to run tests for all python environments.

Pre-commit hooks

Execute poetry run pre-commit run -a to run all linters, formatters, and checks.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rpcx-0.4.0.tar.gz (7.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

rpcx-0.4.0-py3-none-any.whl (9.4 kB view details)

Uploaded Python 3

File details

Details for the file rpcx-0.4.0.tar.gz.

File metadata

  • Download URL: rpcx-0.4.0.tar.gz
  • Upload date:
  • Size: 7.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.13.1 Linux/6.11.0-1018-azure

File hashes

Hashes for rpcx-0.4.0.tar.gz
Algorithm Hash digest
SHA256 dadefd25580c12a455b87a2cf544c5df8d001f24ae6e67a4bfdd1803d311fde4
MD5 ab0fd1b6e27a10a075e94c1c76a7c1c8
BLAKE2b-256 1076e6daa0df3f73a1af97c884a444c5beccaa0eed98325319f8a3888f483d8e

See more details on using hashes here.

File details

Details for the file rpcx-0.4.0-py3-none-any.whl.

File metadata

  • Download URL: rpcx-0.4.0-py3-none-any.whl
  • Upload date:
  • Size: 9.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.13.1 Linux/6.11.0-1018-azure

File hashes

Hashes for rpcx-0.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 90dcd47a537dd176f05e97fb89554151e4444610844dfa493036cdbe56b4b9eb
MD5 38c40bbdc24ec2a0018a59266444d356
BLAKE2b-256 5e6e6592318f919e742c7b44139f4bef1c897ab07102e5d9134a9852a8780d5c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page