Reusable JSON-RPC 3.0 client, server, streaming, and transport primitives.

jsonrpc3

jsonrpc3 is a reusable JSON-RPC 3.0 package. It owns protocol mechanics so application modules do not repeat transport plumbing.

What It Provides

  • Message envelopes: requests, responses, acknowledgements, errors, and streams.
  • Request parsing and validation.
  • Registry-backed server dispatch.
  • Typed params through Pydantic models.
  • Reusable connection runtime with send locking, backpressure queue, task tracking, cancellation, flushing, and close lifecycle.
  • Streaming runtime with acknowledgements, event sinks, terminal result frames, terminal error frames, delayed ack metadata, and cancellation error data hooks.
  • Client request generation and direct response correlation.
  • Optional FastAPI WebSocket adapter.
  • Extension points for result serializers and exception mappers.
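Send locking matters because several handler tasks can write to one connection at the same time. The following is a minimal sketch of the idea using only asyncio; it is not the package's implementation, and `LockedSender` and `write_chunk` are hypothetical names invented for illustration.

```python
import asyncio
import json
from typing import Awaitable, Callable


class LockedSender:
    """Hypothetical helper: serializes writes so frames never interleave."""

    def __init__(self, write_chunk: Callable[[str], Awaitable[None]]) -> None:
        self._write_chunk = write_chunk
        self._lock = asyncio.Lock()

    async def send(self, frame: dict) -> None:
        text = json.dumps(frame)
        half = len(text) // 2
        # Hold the lock across all partial writes so another task
        # cannot slip its own bytes between them.
        async with self._lock:
            await self._write_chunk(text[:half])
            await self._write_chunk(text[half:])
            await self._write_chunk("\n")


async def demo() -> list[dict]:
    wire: list[str] = []

    async def write_chunk(chunk: str) -> None:
        await asyncio.sleep(0)  # yield to the event loop, inviting interleaving
        wire.append(chunk)

    sender = LockedSender(write_chunk)
    await asyncio.gather(*(sender.send({"n": n}) for n in range(5)))
    # With the lock in place, every line on the wire is one complete frame.
    return [json.loads(line) for line in "".join(wire).splitlines()]
```

Without the lock, the `asyncio.sleep(0)` inside `write_chunk` would let concurrent senders mix their halves, and the parse step would fail.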

What It Does Not Provide

  • Application method names.
  • Domain request/response models.
  • Domain event schemas.
  • Product, domain, workflow, or application-specific behavior.
  • Durable business storage. Applications can plug storage into method handlers or future generic replay/cache contracts.

When To Use

Use jsonrpc3 whenever a module needs a JSON-RPC 3.0 server or client. Application code should register methods and call services; it should not parse JSON-RPC envelopes or manage stream task registries.

Do not use this package for non-JSON-RPC REST endpoints, internal Python function calls, or domain event modeling.

Install

pip install fict-json-rpc
pip install "fict-json-rpc[fastapi]"  # optional FastAPI adapter

Server Example

from pydantic import BaseModel, ConfigDict, Field
from jsonrpc3 import JsonRpcContext, JsonRpcServer


class EchoParams(BaseModel):
    model_config = ConfigDict(extra="forbid")

    text: str = Field(..., min_length=1)


server = JsonRpcServer()


@server.method("example.echo", params_model=EchoParams)
async def echo(ctx: JsonRpcContext, params: EchoParams) -> dict[str, str]:
    return {"text": params.text}

Streaming Example

@server.method("example.run", params_model=EchoParams, stream="optional")
async def run(ctx: JsonRpcContext, params: EchoParams) -> dict[str, str]:
    sink = ctx.optional_event_sink(lambda event: {"type": "event", "data": event})
    if sink is not None:
        await sink.emit({"started": params.text})
    return {"status": "done"}

The framework sends the ack, the event frames, the terminal result frame, and the cancellation error frame, and it handles task cleanup. The method only supplies domain work and event serialization.
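One way to exercise a handler like the one above without a running server is a hand-rolled stand-in for the context. The sketch below mimics only the two calls the example uses (`optional_event_sink` and `emit`). `FakeContext` and `RecordingSink` are hypothetical test doubles, not classes from jsonrpc3, and the assumption that no sink is returned for non-streaming calls is inferred from the `if sink is not None` check.

```python
import asyncio
from typing import Any, Callable, Optional


class RecordingSink:
    """Hypothetical stand-in for an event sink: stores serialized events."""

    def __init__(self, serialize: Callable[[Any], Any]) -> None:
        self._serialize = serialize
        self.frames: list[Any] = []

    async def emit(self, event: Any) -> None:
        self.frames.append(self._serialize(event))


class FakeContext:
    """Hypothetical stand-in for JsonRpcContext; not the real class."""

    def __init__(self, streaming: bool) -> None:
        self._streaming = streaming
        self.sink: Optional[RecordingSink] = None

    def optional_event_sink(self, serialize: Callable[[Any], Any]):
        # Assumption: no sink is handed out when the caller did not ask to stream.
        if self._streaming:
            self.sink = RecordingSink(serialize)
        return self.sink


async def run(ctx: FakeContext, text: str) -> dict[str, str]:
    # Same body as the handler above, with plain `text` instead of a params model.
    sink = ctx.optional_event_sink(lambda event: {"type": "event", "data": event})
    if sink is not None:
        await sink.emit({"started": text})
    return {"status": "done"}


async def demo():
    plain = FakeContext(streaming=False)
    streamed = FakeContext(streaming=True)
    results = (await run(plain, "hi"), await run(streamed, "hi"))
    return results, plain.sink, streamed.sink.frames
```

Both calls return the same result; only the streaming call records an event, which is the behavior `stream="optional"` suggests.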

Cancellation

JsonRpcServer.cancel_method(...) registers a normal JSON-RPC method such as request.cancel, but treats it as a control method rather than a completion method. When cancellation succeeds, the cancel method emits no root-level result; instead, the cancelled stream emits the terminal stream.error frame with code -32800.

Use ctx.on_cancel(...) only to attach application-specific data to that cancellation error. Do not use cancellation data as a successful stream result. Stream lifecycle frames carry identity only in stream.id; they must not include a root-level id.
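The rules above can be sketched with plain asyncio. The frame layout here (a `stream` object holding `id`, `result`, or `error`, plus a `"jsonrpc": "3.0"` member) is an assumption made for illustration, and `on_cancel` is a plain callback standing in for whatever ctx.on_cancel(...) accepts; none of this is the package's code.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

CANCELLED = -32800  # cancellation code named in the text above


def cancellation_frame(stream_id: str, data: Optional[Any] = None) -> dict:
    """Build a terminal error frame. The layout is an assumption."""
    error: dict[str, Any] = {"code": CANCELLED, "message": "cancelled"}
    if data is not None:
        error["data"] = data  # application-specific data, never a result
    # Identity lives in stream.id only; there is deliberately no root-level "id".
    return {"jsonrpc": "3.0", "stream": {"id": stream_id, "error": error}}


async def run_stream(
    stream_id: str,
    work: Callable[[], Awaitable[Any]],
    on_cancel: Optional[Callable[[], Any]] = None,
) -> dict:
    try:
        result = await work()
    except asyncio.CancelledError:
        # Cancellation ends the stream with an error frame, not a result.
        data = on_cancel() if on_cancel else None
        return cancellation_frame(stream_id, data)
    return {"jsonrpc": "3.0", "stream": {"id": stream_id, "result": result}}


async def demo() -> dict:
    async def work() -> dict:
        await asyncio.sleep(3600)
        return {"status": "done"}

    task = asyncio.create_task(
        run_stream("s-1", work, on_cancel=lambda: {"progress": 3})
    )
    await asyncio.sleep(0)  # let the task start before cancelling it
    task.cancel()
    return await task
```

The cancel call itself returns nothing useful here; the caller learns the outcome from the terminal frame of the stream it cancelled.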

Clients should call abort_stream(stream_id) for method-less abort or cancel_stream(stream_id, method=...) for application cancel methods. cancel_stream(...) sends a notification so the client waits on the original stream terminal frame, not on a separate cancel-method result.

Optional FastAPI Adapter

jsonrpc3.fastapi is optional adapter glue. It adapts a FastAPI WebSocket to JsonRpcServer; it is not part of the core runtime contract.

from fastapi import APIRouter, WebSocket
from jsonrpc3.fastapi import serve_fastapi_websocket

router = APIRouter()


@router.websocket("/rpc")
async def rpc(websocket: WebSocket) -> None:
    # `server` is the JsonRpcServer instance from the Server Example.
    await serve_fastapi_websocket(websocket, server)

Client Example

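from jsonrpc3 import JsonRpcClient

# send_json is supplied by your transport and writes one outbound frame.
client = JsonRpcClient(send_json)

# Run inside a coroutine; top-level await only works in an async REPL.
result = await client.request("example.echo", {"text": "hello"})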

The transport still owns how frames are physically sent and received. Pass each inbound response payload to client.receive(payload) so the client can match it to the pending request.
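The split between sending and receiving can be illustrated with a miniature client built on asyncio futures. This is a sketch of the correlation pattern only: `TinyClient` is hypothetical, the request and response shapes (`id`, `method`, `params`, `result`) are assumed, and error responses are left out.

```python
import asyncio
import itertools
from typing import Any, Awaitable, Callable


class TinyClient:
    """Hypothetical miniature of the send/receive split; not JsonRpcClient."""

    def __init__(self, send_json: Callable[[dict], Awaitable[None]]) -> None:
        self._send_json = send_json
        self._ids = itertools.count(1)
        self._pending: dict[int, asyncio.Future] = {}

    async def request(self, method: str, params: dict) -> Any:
        request_id = next(self._ids)
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        await self._send_json({"id": request_id, "method": method, "params": params})
        return await future  # resolved later by receive()

    def receive(self, payload: dict) -> None:
        # Match the inbound payload to the request that is waiting for it.
        future = self._pending.pop(payload["id"], None)
        if future is not None and not future.done():
            future.set_result(payload["result"])


async def demo() -> Any:
    async def send_json(frame: dict) -> None:
        # Loopback "transport": answer echo requests on the next loop tick.
        reply = {"id": frame["id"], "result": frame["params"]}
        asyncio.get_running_loop().call_soon(client.receive, reply)

    client = TinyClient(send_json)
    return await client.request("example.echo", {"text": "hello"})
```

A real transport would call `receive` from its read loop instead of from `send_json`, but the pending-request table works the same way.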

Stream Iterator Example

stream = await client.stream("example.run", {"text": "hello"})

async for event in stream:
    handle_event(event)

result = await stream.result()

client.stream(...) sends the request with options.stream = true, yields each stream.data payload by reference, follows ACK stream-id remaps, and ends only on a terminal stream.result or stream.error frame. Use await stream.abort() for method-less aborts or await stream.cancel(method="request.cancel") for application cancel notifications.
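The iteration contract described above can be sketched as an async iterator over a queue of frames. `TinyStream` and `StreamError` are hypothetical, the frame layout (a `stream` object with `data`, `result`, or `error`) is assumed, and ACK stream-id remaps are not modeled.

```python
import asyncio
from typing import Any


class StreamError(Exception):
    """Raised when the terminal frame is stream.error."""


class TinyStream:
    """Hypothetical miniature of a stream iterator; frame layout is assumed."""

    def __init__(self, frames: "asyncio.Queue[dict]") -> None:
        self._frames = frames
        self._result: Any = None
        self._error: Any = None
        self._done = False

    def __aiter__(self) -> "TinyStream":
        return self

    async def __anext__(self) -> Any:
        if self._done:
            raise StopAsyncIteration
        stream = (await self._frames.get())["stream"]
        if "data" in stream:
            return stream["data"]  # handed over by reference, not copied
        # Anything else is terminal: remember the outcome and stop iterating.
        self._done = True
        self._result, self._error = stream.get("result"), stream.get("error")
        raise StopAsyncIteration

    async def result(self) -> Any:
        async for _ in self:  # drain any events the caller did not consume
            pass
        if self._error is not None:
            raise StreamError(self._error)
        return self._result


async def demo():
    frames: asyncio.Queue = asyncio.Queue()
    for body in ({"data": {"n": 1}}, {"data": {"n": 2}}, {"result": {"status": "done"}}):
        frames.put_nowait({"stream": {"id": "s-1", **body}})
    stream = TinyStream(frames)
    events = [event async for event in stream]
    return events, await stream.result()
```

Iteration stops only when a terminal frame arrives, so `result()` is safe to call either after the loop or instead of it.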

Limitations

  • Stream iterators expose stream.data payloads directly; callers that need immutable snapshots must copy at the application boundary.
  • Core modules are transport-neutral; use optional adapters or a custom send/receive wrapper for specific frameworks.
  • Replay, cache, and idempotency are application-owned today. Add generic interfaces here only when a third application would otherwise repeat the same logic.
  • Stream data, stream result, and stream error frames keep identity in the stream envelope and do not include root-level id.
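The first limitation is easiest to see with a small example. Because payloads are exposed directly, every holder of an event shares the same object; a caller that needs a stable snapshot can copy it with the standard library's `copy.deepcopy`. The `snapshot` helper below is a hypothetical name, not part of the package.

```python
import copy
from typing import Any


def snapshot(event: Any) -> Any:
    """Return an independent copy of a stream payload."""
    return copy.deepcopy(event)


shared = {"progress": {"done": 1}}
kept_by_reference = shared          # same object as the payload
kept_snapshot = snapshot(shared)    # independent copy

shared["progress"]["done"] = 2      # a later mutation elsewhere in the program

# kept_by_reference now sees done == 2; kept_snapshot still holds done == 1.
```

Deep copies cost time and memory on large payloads, so copying only at the application boundary, as the limitation suggests, keeps that cost opt-in.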

Acceptance Bar

A new JSON-RPC server should only create a server, register methods, attach a transport, and optionally provide serializers or exception mappers.

A new JSON-RPC client should only create a client, send requests, and feed received frames back into the client runtime.

Repeated parsers, dispatch loops, send locks, task registries, stream runners, cancellation handlers, or response builders belong in this package.
