Skip to main content

RPC over SLIM Library

Project description

SLIM RPC (SLIM Remote Procedure Call)

SLIMRPC, or SLIM Remote Procedure Call, is a library designed to enable Protocol Buffers (protobuf) RPC over SLIM (Secure Low-latency Inter-process Messaging). This is analogous to gRPC, which leverages HTTP/2 as its underlying transport layer for protobuf RPC.

To use SLIM RPC, compile your protobuf file with the SLIM RPC compiler and use the generated code to build your application.

Protobuf example

syntax = "proto3";

package example_service;

// Example service with one RPC for each combination of unary and streaming
// request/response.
service Test {
  // Single request, single response.
  rpc ExampleUnaryUnary(ExampleRequest) returns (ExampleResponse);
  // Single request, stream of responses.
  rpc ExampleUnaryStream(ExampleRequest) returns (stream ExampleResponse);
  // Stream of requests, single response.
  rpc ExampleStreamUnary(stream ExampleRequest) returns (ExampleResponse);
  // Stream of requests, stream of responses.
  rpc ExampleStreamStream(stream ExampleRequest) returns (stream ExampleResponse);
}

// Request message accepted by every RPC of the Test service.
message ExampleRequest {
  // Free-form text payload.
  string example_string = 1;
  // 64-bit signed integer payload.
  int64  example_integer = 2;
}

// Response message returned by every RPC of the Test service.
message ExampleResponse {
  // Free-form text payload.
  string example_string = 1;
  // 64-bit signed integer payload.
  int64  example_integer = 2;
}

The following code is based on autogenerated files produced by compiling the protobuf example using the SLIM RPC compiler. Specifically, the files example_pb2_slimrpc.py and example_pb2.py in the imports were generated automatically.

Server Usage

import asyncio
import logging
from collections.abc import AsyncIterable

from slimrpc.context import Context
from slimrpc.examples.simple.types.example_pb2 import ExampleRequest, ExampleResponse
from slimrpc.examples.simple.types.example_pb2_slimrpc import (
    TestServicer,
    add_TestServicer_to_server,
)
from slimrpc.server import Server

logger = logging.getLogger(__name__)


class TestService(TestServicer):
    """Example servicer implementing the RPCs of the ``Test`` service."""

    async def ExampleUnaryUnary(
        self, request: ExampleRequest, context: Context
    ) -> ExampleResponse:
        """Log the incoming request and answer with one fixed response."""
        logger.info(f"Received unary-unary request: {request}")

        reply = ExampleResponse(example_string="Hello, World!", example_integer=1)
        return reply

    async def ExampleUnaryStream(
        self, request: ExampleRequest, context: Context
    ) -> AsyncIterable[ExampleResponse]:
        """Log the incoming request, then yield five numbered responses."""
        logger.info(f"Received unary-stream request: {request}")

        # Produce the response stream one message at a time.
        for i in range(5):
            logger.info(f"Sending response {i}")
            reply = ExampleResponse(example_string=f"Response {i}", example_integer=i)
            yield reply

    async def ExampleStreamUnary(
        self, request_iterator: AsyncIterable[ExampleRequest], context: Context
    ) -> ExampleResponse:
        """Log every request of the incoming stream, then return one fixed response."""
        logger.info(f"Received stream-unary request: {request_iterator}")

        # Consume the whole request stream before replying.
        async for request in request_iterator:
            logger.info(f"Received stream-unary request: {request}")

        return ExampleResponse(
            example_string="Stream Unary Response", example_integer=1
        )

    async def ExampleStreamStream(
        self, request_iterator: AsyncIterable[ExampleRequest], context: Context
    ) -> AsyncIterable[ExampleResponse]:
        """Stream-stream RPC; left unimplemented in this example."""
        raise NotImplementedError("Method not implemented!")


def create_server(
    local: str,
    slim: dict,
    enable_opentelemetry: bool = False,
    shared_secret: str = "",
) -> Server:
    """
    Build a ``Server`` from the given settings.

    All four arguments are forwarded unchanged, by keyword, to the
    ``Server`` constructor. The new instance is returned as constructed;
    this function does not call ``run()`` on it.
    """
    return Server(
        shared_secret=shared_secret,
        enable_opentelemetry=enable_opentelemetry,
        slim=slim,
        local=local,
    )


async def amain() -> None:
    """Create the example server, register ``TestService`` on it, and run it."""
    slim_config = {
        "endpoint": "http://localhost:46357",
        "tls": {"insecure": True},
    }
    server = create_server(
        local="agntcy/grpc/server",
        slim=slim_config,
        enable_opentelemetry=False,
        shared_secret="my_shared_secret",
    )

    # Register the RPC handlers implemented by TestService on the server.
    add_TestServicer_to_server(TestService(), server)

    await server.run()


def main() -> None:
    """
    Run the example server.

    Sets up DEBUG-level logging, then drives ``amain`` with
    ``asyncio.run``. A ``KeyboardInterrupt`` is caught and reported with a
    short message on stdout.
    """
    logging.basicConfig(level=logging.DEBUG)

    try:
        asyncio.run(amain())
    except KeyboardInterrupt:
        # Report the interruption instead of letting the exception propagate.
        print("Server interrupted by user.")

Client Usage

import asyncio
import logging
from collections.abc import AsyncGenerator

import slimrpc
from slimrpc.examples.simple.types.example_pb2 import ExampleRequest
from slimrpc.examples.simple.types.example_pb2_slimrpc import TestStub

logger = logging.getLogger(__name__)


async def amain() -> None:
    """
    Exercise the example ``Test`` service from a client.

    Builds a channel to ``agntcy/grpc/server`` and calls, in order, the
    unary-unary, unary-stream and stream-unary RPCs, each with a timeout
    of 2, logging every response. An ``asyncio.TimeoutError`` raised by
    any of the calls is logged as an error and ends the sequence.
    """

    async def stream_requests() -> AsyncGenerator[ExampleRequest, None]:
        # Request stream for the stream-unary call: ten numbered requests.
        for i in range(10):
            yield ExampleRequest(example_integer=i, example_string=f"Request {i}")

    client_config = {
        "endpoint": "http://localhost:46357",
        "tls": {"insecure": True},
    }
    app_config = slimrpc.SLIMAppConfig(
        identity="agntcy/grpc/client",
        slim_client_config=client_config,
        enable_opentelemetry=False,
        shared_secret="my_shared_secret",
    )
    factory = slimrpc.ChannelFactory(slim_app_config=app_config)

    # One stub bound to a channel towards the server's name.
    stub = TestStub(factory.new_channel(remote="agntcy/grpc/server"))

    try:
        request = ExampleRequest(example_string="hello", example_integer=1)

        # Unary request, unary response.
        response = await stub.ExampleUnaryUnary(request, timeout=2)
        logger.info(f"Response: {response}")

        # Unary request, streamed responses.
        async for resp in stub.ExampleUnaryStream(request, timeout=2):
            logger.info(f"Stream Response: {resp}")

        # Streamed requests, unary response.
        response = await stub.ExampleStreamUnary(stream_requests(), timeout=2)
        logger.info(f"Stream Unary Response: {response}")
    except asyncio.TimeoutError:
        logger.error("timeout while waiting for response")

    # NOTE(review): presumably a grace period before the event loop closes — confirm.
    await asyncio.sleep(1)


def main() -> None:
    """
    Main entry point for the client.

    Configures INFO-level logging and runs ``amain`` to completion with
    ``asyncio.run``. A ``KeyboardInterrupt`` is caught and reported with a
    short message on stdout.
    """
    logging.basicConfig(level=logging.INFO)
    try:
        asyncio.run(amain())
    except KeyboardInterrupt:
        # This is the client program, so the message must not say "Server".
        print("Client interrupted by user.")

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

slimrpc-0.2.0.tar.gz (17.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

slimrpc-0.2.0-py3-none-any.whl (17.2 kB view details)

Uploaded Python 3

File details

Details for the file slimrpc-0.2.0.tar.gz.

File metadata

  • Download URL: slimrpc-0.2.0.tar.gz
  • Upload date:
  • Size: 17.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for slimrpc-0.2.0.tar.gz
Algorithm Hash digest
SHA256 d4ee0574e53aa02fde0bded6cf3b71479693351c2f367aff2d63cef1f4178c8a
MD5 06d41b365e15daf3a14aaf8b4d3dbcf5
BLAKE2b-256 164298c9a09de198cb3fb009bad0447a52c226f2818c7d34cb921f1d13e4e235

See more details on using hashes here.

File details

Details for the file slimrpc-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: slimrpc-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 17.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for slimrpc-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 712a50ac544b5babaf8622ac20e43b27ab0d5653376654b09393a56c9fe33273
MD5 8877e5bc0662b83624cc65aefa9d624e
BLAKE2b-256 e3b6ff378793beb699338f63bc29525114ef7dfbc749260f3e4772bffb57d9b3

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page