Skip to main content

Python reactive gRPC support

Project description

build status

reactive-grpc

A simple gRPC bridge to reactive streams.

Example: Given the following Protocol buffers definition:

syntax = "proto3";

package rxgrpc.test;

service TestService {
  rpc GetOneToOne(TestRequest) returns (TestResponse) {}
  rpc GetOneToStream(TestRequest) returns (stream TestResponse) {}
  rpc GetStreamToOne(stream TestRequest) returns (TestResponse) {}
  rpc GetStreamToStream(stream TestRequest) returns (stream TestResponse) {}
}

message TestRequest {
  string message = 1;
}

message TestResponse {
  string message = 1;
}

and a simple Servicer class:

from test.proto.test_pb2_grpc import TestServiceServicer
from test.proto import test_pb2


class _Servicer(TestServiceServicer):
    def GetOneToOne(self, request: test_pb2.TestRequest, context):
        return test_pb2.TestResponse(message='response: {}'.format(request.message))

    def GetOneToStream(self, request, context):
        for i in range(3):
            yield test_pb2.TestResponse(message='response {}: {}'.format(i, request.message))

    def GetStreamToOne(self, request_iterator, context):
        return test_pb2.TestResponse(
            message='response: {}'.format(
                ', '.join(map(lambda d: d.message, request_iterator))
            )
        )

    def GetStreamToStream(self, request_iterator, context):
        yield from map(
            lambda d: test_pb2.TestResponse(message='response: {}'.format(d.message)),
            request_iterator
        )

A simple gRPC reactive server where request messages are transformed can be created as follows:

from test.proto import test_pb2_grpc, test_pb2
from rxgrpc import server, mappers
from rx import operators
from test.proto.test_pb2_grpc import TestServiceServicer


class _Servicer(TestServiceServicer):
    # ...
    pass


workers = 3
rx_server = server.create_server(test_pb2, workers)
test_pb2_grpc.add_TestServiceServicer_to_server(_Servicer(), rx_server)
rx_server.add_insecure_port('[::]:50051')

def _transform_message(m: test_pb2.TestRequest) -> test_pb2.TestRequest:
    return test_pb2.TestRequest(message='TRANSFORMED {}'.format(m.message))

rx_server.set_grpc_observable(
    rx_server.grpc_pipe(
        operators.map(mappers.grpc_invocation_map(_transform_message)),
        method_name='/rxgrpc.test.TestService/GetOneToOne'),
    method_name='/rxgrpc.test.TestService/GetOneToOne'
)

rx_server.start()

Here it is an example of a filter for a streaming input:

from rxgrpc import operators
from test.proto import test_pb2


def _filter_message(m: test_pb2.TestRequest) -> test_pb2.TestRequest:
    return bool(int(m.message[-1]) % 2)

server = ...
server.set_grpc_observable(
    server.grpc_pipe(
        operators.filter(_filter_message),
        method_name='/rxgrpc.test.TestService/GetStreamToOne'),
    method_name='/rxgrpc.test.TestService/GetStreamToOne'
)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

reactive-grpc-0.0.2.tar.gz (5.5 kB view details)

Uploaded Source

File details

Details for the file reactive-grpc-0.0.2.tar.gz.

File metadata

  • Download URL: reactive-grpc-0.0.2.tar.gz
  • Upload date:
  • Size: 5.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.8.1 pkginfo/1.4.1 requests/2.19.1 setuptools/35.0.2 requests-toolbelt/0.7.1 clint/0.5.1 CPython/2.7.15rc1 Linux/4.15.0-45-generic

File hashes

Hashes for reactive-grpc-0.0.2.tar.gz
Algorithm Hash digest
SHA256 88585707c4996894987deff715f24173ce83448019ae40a6d93a5ace10125bf7
MD5 d790412d5d052eb8bab80f96d5e195b5
BLAKE2b-256 22d16f356fb5c1911f151c2cfe80ce1579f70b63bbc13826f55ede1c8130370d

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page