Python reactive gRPC support
Project description
reactive-grpc
A simple gRPC bridge to reactive streams.
Example: Given the following Protocol buffers definition:
syntax = "proto3"; package rxgrpc.test; service TestService { rpc GetOneToOne(TestRequest) returns (TestResponse) {} rpc GetOneToStream(TestRequest) returns (stream TestResponse) {} rpc GetStreamToOne(stream TestRequest) returns (TestResponse) {} rpc GetStreamToStream(stream TestRequest) returns (stream TestResponse) {} } message TestRequest { string message = 1; } message TestResponse { string message = 1; }
and a simple Servicer class:
from test.proto.test_pb2_grpc import TestServiceServicer
from test.proto import test_pb2
class _Servicer(TestServiceServicer):
def GetOneToOne(self, request: test_pb2.TestRequest, context):
return test_pb2.TestResponse(message='response: {}'.format(request.message))
def GetOneToStream(self, request, context):
for i in range(3):
yield test_pb2.TestResponse(message='response {}: {}'.format(i, request.message))
def GetStreamToOne(self, request_iterator, context):
return test_pb2.TestResponse(
message='response: {}'.format(
', '.join(map(lambda d: d.message, request_iterator))
)
)
def GetStreamToStream(self, request_iterator, context):
yield from map(
lambda d: test_pb2.TestResponse(message='response: {}'.format(d.message)),
request_iterator
)
A simple gRPC reactive server where request messages are transformed can be created as follows:
from test.proto import test_pb2_grpc, test_pb2
from rxgrpc import server, mappers
from rx import operators
from test.proto.test_pb2_grpc import TestServiceServicer
class _Servicer(TestServiceServicer):
# ...
pass
workers = 3
rx_server = server.create_server(test_pb2, workers)
test_pb2_grpc.add_TestServiceServicer_to_server(_Servicer(), rx_server)
rx_server.add_insecure_port('[::]:50051')
def _transform_message(m: test_pb2.TestRequest) -> test_pb2.TestRequest:
return test_pb2.TestRequest(message='TRANSFORMED {}'.format(m.message))
rx_server.set_grpc_observable(
rx_server.grpc_pipe(
operators.map(mappers.grpc_invocation_map(_transform_message)),
method_name='/rxgrpc.test.TestService/GetOneToOne'),
method_name='/rxgrpc.test.TestService/GetOneToOne'
)
rx_server.start()
Here it is an example of a filter for a streaming input:
from rxgrpc import operators
from test.proto import test_pb2
def _filter_message(m: test_pb2.TestRequest) -> test_pb2.TestRequest:
return bool(int(m.message[-1]) % 2)
server = ...
server.set_grpc_observable(
server.grpc_pipe(
operators.filter(_filter_message),
method_name='/rxgrpc.test.TestService/GetStreamToOne'),
method_name='/rxgrpc.test.TestService/GetStreamToOne'
)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
reactive-grpc-0.0.2.tar.gz
(5.5 kB
view details)
File details
Details for the file reactive-grpc-0.0.2.tar.gz
.
File metadata
- Download URL: reactive-grpc-0.0.2.tar.gz
- Upload date:
- Size: 5.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/1.8.1 pkginfo/1.4.1 requests/2.19.1 setuptools/35.0.2 requests-toolbelt/0.7.1 clint/0.5.1 CPython/2.7.15rc1 Linux/4.15.0-45-generic
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 88585707c4996894987deff715f24173ce83448019ae40a6d93a5ace10125bf7 |
|
MD5 | d790412d5d052eb8bab80f96d5e195b5 |
|
BLAKE2b-256 | 22d16f356fb5c1911f151c2cfe80ce1579f70b63bbc13826f55ede1c8130370d |