Python reactive gRPC support
Project description
reactive-grpc
A simple gRPC bridge to reactive streams.
Example: given the following Protocol Buffers definition:
// Test service definition used by the examples below.
syntax = "proto3";

package rxgrpc.test;

// One RPC for each combination of unary/streaming request and response.
service TestService {
  // Unary request, unary response.
  rpc GetOneToOne(TestRequest) returns (TestResponse) {}
  // Unary request, streaming response.
  rpc GetOneToStream(TestRequest) returns (stream TestResponse) {}
  // Streaming request, unary response.
  rpc GetStreamToOne(stream TestRequest) returns (TestResponse) {}
  // Streaming request, streaming response.
  rpc GetStreamToStream(stream TestRequest) returns (stream TestResponse) {}
}

// Request payload: a single string field.
message TestRequest {
  string message = 1;
}

// Response payload: a single string field.
message TestResponse {
  string message = 1;
}
and a simple Servicer class:
from test.proto.test_pb2_grpc import TestServiceServicer
from test.proto import test_pb2
class _Servicer(TestServiceServicer):
    """Example implementation of the four ``TestService`` RPCs.

    Each handler echoes the incoming ``message`` text back inside a
    ``TestResponse``, prefixed with ``response``.
    """

    def GetOneToOne(self, request: test_pb2.TestRequest, context):
        """Answer a single request with a single response."""
        reply_text = 'response: {}'.format(request.message)
        return test_pb2.TestResponse(message=reply_text)

    def GetOneToStream(self, request, context):
        """Answer a single request with three numbered responses (0, 1, 2)."""
        yield from (
            test_pb2.TestResponse(
                message='response {}: {}'.format(index, request.message)
            )
            for index in range(3)
        )

    def GetStreamToOne(self, request_iterator, context):
        """Consume the whole request stream and answer with one response.

        The response text is the request messages joined with ``', '``.
        """
        joined_messages = ', '.join(item.message for item in request_iterator)
        return test_pb2.TestResponse(
            message='response: {}'.format(joined_messages)
        )

    def GetStreamToStream(self, request_iterator, context):
        """Yield one response per incoming request, in arrival order."""
        for item in request_iterator:
            yield test_pb2.TestResponse(
                message='response: {}'.format(item.message)
            )
A simple reactive gRPC server that transforms incoming request messages can be created as follows:
from test.proto import test_pb2_grpc, test_pb2
from rxgrpc import server, mappers
from rx import operators
from test.proto.test_pb2_grpc import TestServiceServicer
class _Servicer(TestServiceServicer):
    # Placeholder for this example: the handler methods are elided here.
    # ...
    pass
# Worker count handed to create_server below.
# NOTE(review): presumably the size of the server's worker pool - confirm against rxgrpc.
workers = 3
# Create the reactive server from the generated test_pb2 module and the worker count.
rx_server = server.create_server(test_pb2, workers)
# Register the servicer on the server through the generated helper.
test_pb2_grpc.add_TestServiceServicer_to_server(_Servicer(), rx_server)
# Add an insecure port at address '[::]:50051'.
rx_server.add_insecure_port('[::]:50051')
def _transform_message(m: test_pb2.TestRequest) -> test_pb2.TestRequest:
    """Return a new ``TestRequest`` whose text is ``m.message`` prefixed with ``TRANSFORMED ``."""
    transformed_text = 'TRANSFORMED {}'.format(m.message)
    return test_pb2.TestRequest(message=transformed_text)
# Full method path of the RPC whose requests are transformed; it is passed to
# both grpc_pipe and set_grpc_observable.
one_to_one_method = '/rxgrpc.test.TestService/GetOneToOne'

# Map every invocation of that method through _transform_message.
transform_operator = operators.map(
    mappers.grpc_invocation_map(_transform_message)
)
transformed_invocations = rx_server.grpc_pipe(
    transform_operator,
    method_name=one_to_one_method,
)

# Install the piped result as the observable for the same method.
rx_server.set_grpc_observable(
    transformed_invocations,
    method_name=one_to_one_method,
)

rx_server.start()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
reactive-grpc-0.0.1.tar.gz
(4.9 kB
view hashes)