A fast and durable bidirectional JSON RPC channel over websockets and fastapi.
Project description
⚡ fasterpc
rpc over websockets made easy, robust, and production ready
a fast and durable bidirectional json rpc channel over websockets. the easiest way to create a live async channel between two nodes via python (or other clients).
- both server and clients can easily expose python methods that can be called by the other side. method return values are sent back as rpc responses, which the other side can wait on.
- remote methods are easily called via the
.other.method()wrapper - connections are kept alive with a configurable retry mechanism (using tenacity)
- built-in support for binary data transfer and custom serialization
- automatic connection recovery and heartbeat monitoring
- support for multiple concurrent connections and load balancing
supports and tested on python >= 3.9
installation 🛠️
pip install fasterpc
for development with additional features:
pip install fasterpc[websocket-client,dev]
quick start
basic rpc call example:
say the server exposes an "add" method:
class rpc_calculator(rpc_methods_base):
async def add(self, a, b):
return a + b
calling it is as easy as calling the method under the client's "other" property:
response = await client.other.add(a=1, b=2)
print(response.result) # 3
advanced example with custom data types:
from pydantic import BaseModel
from typing import List, Dict, Any
class user_data(BaseModel):
name: str
age: int
preferences: Dict[str, Any]
class data_processor(rpc_methods_base):
async def process_users(self, users: List[user_data]) -> Dict[str, int]:
return {user.name: user.age for user in users}
async def get_statistics(self, data: Dict[str, Any]) -> Dict[str, float]:
values = list(data.values())
return {
"mean": sum(values) / len(values),
"max": max(values),
"min": min(values)
}
usage examples:
server setup:
import uvicorn
from fastapi import fastapi
from fasterpc import rpc_methods_base, websocket_rpc_endpoint
# methods to expose to the clients
class concat_server(rpc_methods_base):
async def concat(self, a="", b=""):
return a + b
async def multiply(self, x: int, y: int) -> int:
return x * y
async def get_server_info(self) -> dict:
return {
"version": "1.0.0",
"status": "running",
"uptime": "2h 15m"
}
# init the fast-api app
app = fastapi()
# create an endpoint and load it with the methods to expose
endpoint = websocket_rpc_endpoint(concat_server())
# add the endpoint to the app
endpoint.register_route(app, "/ws")
# start the server
uvicorn.run(app, host="0.0.0.0", port=9000)
client connection:
import asyncio
from fasterpc import rpc_methods_base, websocket_rpc_client
async def run_client(uri):
async with websocket_rpc_client(uri, rpc_methods_base()) as client:
# call concat on the other side
response = await client.other.concat(a="hello", b=" world")
print(response.result) # will print "hello world"
# call multiply method
result = await client.other.multiply(x=5, y=3)
print(result.result) # will print 15
# get server info
info = await client.other.get_server_info()
print(info.result) # will print server info dict
# run the client
asyncio.get_event_loop().run_until_complete(
run_client("ws://localhost:9000/ws")
)
bidirectional communication example:
class client_methods(rpc_methods_base):
async def notify_event(self, event_type: str, data: dict):
print(f"received event: {event_type} with data: {data}")
return {"status": "acknowledged"}
class server_methods(rpc_methods_base):
async def broadcast_message(self, message: str):
# this will be called by the server to notify all clients
return {"broadcast": message}
# client with bidirectional support
async def bidirectional_client():
async with websocket_rpc_client(
"ws://localhost:9000/ws",
client_methods()
) as client:
# client can call server methods
response = await client.other.get_data()
# server can call client methods via channel
await client.channel.other.notify_event("update", {"version": "2.0"})
see the examples and tests folders for more server and client examples
advanced features
connection management:
from fasterpc import websocket_rpc_client, rpc_methods_base
from tenacity import retry, stop_after_attempt, wait_exponential
# custom retry configuration
retry_config = {
"retry": retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
}
async with websocket_rpc_client(
"ws://localhost:9000/ws",
rpc_methods_base(),
retry_config=retry_config
) as client:
# connection will automatically retry on failure
pass
binary data transfer:
from fasterpc import websocket_frame_type
class binary_handler(rpc_methods_base):
async def send_file(self, filename: str, data: bytes):
# handle binary data
with open(f"received_{filename}", "wb") as f:
f.write(data)
return {"status": "file_saved", "size": len(data)}
async def get_large_data(self) -> bytes:
# return binary data
return b"large_binary_data_here"
custom serialization:
import json
from datetime import datetime
class custom_serializer:
def serialize(self, obj):
if isinstance(obj, datetime):
return {"__datetime__": obj.isoformat()}
return obj
def deserialize(self, obj):
if isinstance(obj, dict) and "__datetime__" in obj:
return datetime.fromisoformat(obj["__datetime__"])
return obj
server calling client example:
- clients can call
client.other.method()- which is a shortcut for
channel.other.method()
- which is a shortcut for
- servers also get the channel object and can call remote methods via
channel.other.method() - see the bidirectional call example for calling client from server and server events (e.g.
on_connect).
what can you do with this?
websockets are ideal to create bi-directional realtime connections over the web:
- push updates and notifications
- remote control mechanism
- pub / sub patterns (see fastapi_websocket_pubsub)
- trigger events and workflows (see "tests/trigger_flow_test.py")
- node negotiations and distributed systems (see "tests/advanced_rpc_test.py :: test_recursive_rpc_calls")
- real-time dashboards and monitoring
- multiplayer game state synchronization
- iot device communication
- microservices inter-service communication
core concepts
rpc channel
- rpc_channel - implements the rpc-protocol over the websocket
- sending rpc_requests per method call
- creating promises to track them (via unique call ids), and allow waiting for responses
- executing methods on the remote side and serializing return values
- receiving rpc_responses and delivering them to waiting callers
- handling connection state and reconnection logic
rpc methods
- rpc_methods - classes passed to both client and server-endpoint inits to expose callable methods to the other side.
- simply derive from rpc_methods_base and add your own async methods
- note currently only key-word arguments are supported
- checkout rpc_utility_methods for example methods, which are also useful debugging utilities
- support for method validation and error handling
connection management
- automatic reconnection with exponential backoff
- heartbeat monitoring and connection health checks
- connection pooling for multiple clients
- graceful shutdown and cleanup
technical foundations
- asyncio-based: built on python's asyncio for high-performance async operations
- fastapi integration: leverages fastapi's asgi platform for web server capabilities
- pydantic serialization: uses pydantic for robust data validation and serialization
- tenacity retry logic: configurable retry mechanisms for network resilience
- websockets library: comprehensive websocket client implementation
- type hints: full type annotation support for better development experience
logging and debugging
fasterpc provides a helper logging module to control how it produces logs for you.
see fasterpc/logger.py.
use logging_config.set_mode or the 'ws_rpc_logging' environment variable to choose the logging method you prefer or override completely via default logging config.
example:
# set rpc to log like uvicorn
from fasterpc.logger import logging_config, logging_modes
logging_config.set_mode(logging_modes.uvicorn)
# custom logging configuration
import logging
logging.basicConfig(level=logging.debug)
logger = logging.getlogger("fasterpc")
http(s) proxy support
by default, fasterpc uses websockets module as websocket client handler. this does not support http(s) proxy, see https://github.com/python-websockets/websockets/issues/364 . if the ability to use a proxy is important to you, another websocket client implementation can be used, e.g. websocket-client (https://websocket-client.readthedocs.io). here is how to use it:
installation:
pip install fasterpc[websocket-client]
usage:
import asyncio
from fasterpc import rpc_methods_base, websocket_rpc_client, proxy_enabled_websocket_client_handler
async def run_client(uri):
async with websocket_rpc_client(
uri,
rpc_methods_base(),
websocket_client_handler_cls=proxy_enabled_websocket_client_handler
) as client:
# your rpc calls here
pass
just set standard environment variables (lowercase and uppercase works): http_proxy, https_proxy, and no_proxy before running python script.
performance considerations
optimization tips:
- use connection pooling for multiple clients
- implement proper error handling and retry logic
- monitor memory usage with large data transfers
- use binary data for large payloads
- implement rate limiting for high-frequency calls
benchmarking:
import time
import asyncio
async def benchmark_rpc_calls(client, iterations=1000):
start_time = time.time()
for i in range(iterations):
await client.other.echo(data=f"test_{i}")
end_time = time.time()
duration = end_time - start_time
calls_per_second = iterations / duration
print(f"completed {iterations} calls in {duration:.2f}s")
print(f"rate: {calls_per_second:.2f} calls/second")
security considerations
authentication:
from fastapi import depends, header
async def verify_token(authorization: str = header(...)):
if not authorization.startswith("bearer "):
raise http_exception(status_code=401, detail="invalid token")
# implement your token verification logic here
return authorization.split(" ")[1]
# use in your endpoint
endpoint = websocket_rpc_endpoint(
concat_server(),
dependencies=[depends(verify_token)]
)
data validation:
from pydantic import validator
class secure_data_processor(rpc_methods_base):
@validator('input_data')
def validate_input(cls, v):
if len(v) > 1000000: # 1mb limit
raise valueerror("input too large")
return v
async def process_data(self, input_data: str):
# your processing logic here
pass
pull requests - welcome!
- please include tests for new features
- follow the existing code style and patterns
- add documentation for new functionality
- ensure backward compatibility when possible
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file fasterpc-0.1.2.tar.gz.
File metadata
- Download URL: fasterpc-0.1.2.tar.gz
- Upload date:
- Size: 23.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.9.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5d8dca900ddeb14917356e79858d5c57d49f88d4b47c7e0e29bdd9c2449eda3f
|
|
| MD5 |
3a40fc7b13103885f490c231c317f1ae
|
|
| BLAKE2b-256 |
81ca17ca33245768893c1f6db317529247d0e49161c971ab6870f20de857d4a9
|
File details
Details for the file fasterpc-0.1.2-py3-none-any.whl.
File metadata
- Download URL: fasterpc-0.1.2-py3-none-any.whl
- Upload date:
- Size: 21.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.9.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ddf614d6cb31feaef42f91a0f5a868af840c24926ef58c108cd8a398bd1109a4
|
|
| MD5 |
155bb2406872599feeb806206ea1e07a
|
|
| BLAKE2b-256 |
64c7fc80b022363b1ed4d3dca6a8a4d5feac85cfbc2d4aefa7059114c003cd23
|