BrokRPC
BrokRPC (Broker Remote Procedure Call) is a framework for gRPC-like server-client communication over message brokers.
key features
- same protobuf structures as in gRPC
- similar calls as in gRPC
  - unary-unary
  - (TODO) unary-stream
  - (TODO) stream-unary
  - (TODO) stream-stream
- declarative style, abstract from broker commands (such as declare_exchange / queue_bind)
- publisher & consumer middlewares
- message serializers
codegen
You can generate Python code for the server and client from .proto files.
The pyprotostuben project provides the protoc plugin protoc-gen-brokrpc. See the
pyprotostuben project example for more details.
You may configure the codegen output using protobuf extensions from the Buf Schema Registry.
supported brokers & protocols
usage
Install with your favorite Python package manager:
pip install BrokRPC[aiormq]
Broker
Use Broker as a high-level API to create consumers and publishers.

from brokrpc.broker import Broker

# create & connect to broker with specified params
async with Broker(...) as broker:
    assert broker.is_connected
    # work with broker
    ...
Consumer
import asyncio

from brokrpc.broker import Broker
from brokrpc.message import Message
from brokrpc.options import BindingOptions, QueueOptions, ExchangeOptions
from brokrpc.serializer.json import JSONSerializer


async def register_consumer(broker: Broker) -> None:
    # define consumer function (you also can use async function & `Consumer` interface).
    def consume_binary_message(message: Message[bytes]) -> None:
        print(message)

    # consumer is not attached yet

    async with broker.consumer(consume_binary_message, BindingOptions(binding_keys=["my-consumer"])):
        # in this code block consumer is attached to broker and can receive messages
        ...

    # outside CM consumer is detached from broker and cannot receive messages

    async def consume_json(message: Message[object]) -> bool:
        obj = message.body
        if not isinstance(obj, dict):
            return False

        username = obj.get("username")
        if not username:
            return False

        print(f"Hello, {username}")
        await asyncio.sleep(1.0)  # simulate long processing

        return True

    async with broker.consumer(
        consume_json,
        BindingOptions(
            exchange=ExchangeOptions(name="json"),
            binding_keys=["my-json-consumer"],
            queue=QueueOptions(name="jsons", auto_delete=True),
        ),
        serializer=JSONSerializer(),
    ):
        ...
Consumer Middlewares
...
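The general idea of a consumer middleware can be sketched without BrokRPC: a middleware wraps the consumer function and can run code before it, after it, or instead of it. This is a minimal standard-library sketch only. The names `apply_middlewares`, `logging_middleware` and `reject_empty` are hypothetical and are not BrokRPC's API, and plain objects stand in for `Message`.

```python
import asyncio
from typing import Awaitable, Callable, Sequence

# Hypothetical shapes, not BrokRPC types: a consumer takes a message and
# reports success as a bool, and a middleware receives the next consumer
# in the chain plus the message.
ConsumerFunc = Callable[[object], Awaitable[bool]]
Middleware = Callable[[ConsumerFunc, object], Awaitable[bool]]

calls: list[str] = []  # records the order of execution for the demo


def _bind(middleware: Middleware, inner: ConsumerFunc) -> ConsumerFunc:
    async def wrapped(message: object) -> bool:
        return await middleware(inner, message)

    return wrapped


def apply_middlewares(consumer: ConsumerFunc, middlewares: Sequence[Middleware]) -> ConsumerFunc:
    """Wrap `consumer` so that the first middleware in the list runs outermost."""
    wrapped = consumer
    for middleware in reversed(middlewares):
        wrapped = _bind(middleware, wrapped)
    return wrapped


async def logging_middleware(inner: ConsumerFunc, message: object) -> bool:
    calls.append(f"before {message}")
    result = await inner(message)
    calls.append(f"after {result}")
    return result


async def reject_empty(inner: ConsumerFunc, message: object) -> bool:
    # Short-circuit: the wrapped consumer is never called for empty messages.
    if not message:
        return False
    return await inner(message)


async def consume(message: object) -> bool:
    calls.append(f"consume {message}")
    return True


pipeline = apply_middlewares(consume, [logging_middleware, reject_empty])
```

Calling `asyncio.run(pipeline("hello"))` runs the logging middleware first, then the empty-message check, then the consumer itself.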
Publisher
from brokrpc.broker import Broker
from brokrpc.message import AppMessage
from brokrpc.serializer.json import JSONSerializer


async def publish_messages(broker: Broker) -> None:
    async with broker.publisher() as pub:
        # in this code block publisher is attached to broker and can send messages
        await pub.publish(AppMessage(body=b"this is a binary message", routing_key="test-consumer"))

    async with broker.publisher(serializer=JSONSerializer()) as json_pub:
        await json_pub.publish(AppMessage(body={"username": "John Smith"}, routing_key="my-json-consumer"))
Publisher Middlewares
...
Message
- Message
- AppMessage
- PackedMessage
- UnpackedMessage
Serializer
- JSONSerializer
- ProtobufSerializer
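Conceptually, a serializer turns an application-level body into the bytes that travel through the broker, and back again. The sketch below shows that round trip with the standard library only. `encode_json` and `decode_json` are hypothetical helper names chosen for illustration and do not reflect the interface of BrokRPC's `JSONSerializer`.

```python
import json


def encode_json(body: object) -> bytes:
    """Turn a JSON-compatible Python object into compact UTF-8 bytes."""
    return json.dumps(body, separators=(",", ":")).encode("utf-8")


def decode_json(payload: bytes) -> object:
    """Turn UTF-8 encoded JSON bytes back into a Python object."""
    return json.loads(payload.decode("utf-8"))


payload = encode_json({"username": "John Smith"})
body = decode_json(payload)
```

Here `payload` is what a publisher would put on the wire and `body` is what a consumer function would see after decoding.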
RPC Server
Server
RPC Handler
...
RPC Client
Client
RPC Caller
...
RPC example
RPC server
Run the server process with the following code:
import asyncio

from brokrpc.broker import Broker
from brokrpc.options import ExchangeOptions
from brokrpc.rpc.model import Request
from brokrpc.rpc.server import Server
from brokrpc.serializer.json import JSONSerializer


# define app RPC handler
async def handle_request(request: Request[object]) -> str:
    print(f"{request=!s}")
    print(f"{request.body=}")
    return f"I greet you, {request.body}"


async def main() -> None:
    # create broker & RPC server
    broker = Broker(
        options="amqp://guest:guest@localhost:5672/",
        default_exchange=ExchangeOptions(name="simple-test-app"),
    )
    server = Server(broker)

    # register RPC handler
    server.register_unary_unary_handler(
        func=handle_request,
        routing_key="test-greeting",
        serializer=JSONSerializer(),
    )

    # connect to broker
    async with broker:
        # start RPC server until SIGINT or SIGTERM
        await server.run_until_terminated()


if __name__ == "__main__":
    asyncio.run(main())
RPC client
Make a call to the RPC server via the RPC client with the following code:
import asyncio

from brokrpc.broker import Broker
from brokrpc.options import ExchangeOptions
from brokrpc.rpc.client import Client
from brokrpc.serializer.json import JSONSerializer


async def main() -> None:
    async with (
        # create & connect to broker
        Broker(
            options="amqp://guest:guest@localhost:5672/",
            default_exchange=ExchangeOptions(name="simple-test-app"),
        ) as broker,
        # create RPC client & get RPC caller
        Client(broker).unary_unary_caller(
            routing_key="test-greeting",
            serializer=JSONSerializer(),
        ) as caller,
    ):
        # publish app message & receive RPC response
        response = await caller.invoke("John")
        print(response)
        print(f"{response.body=}")


if __name__ == "__main__":
    asyncio.run(main())
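To make the unary-unary flow above easier to picture, here is a minimal in-memory sketch of request/reply over queues. It assumes the common broker RPC pattern of a per-call reply queue plus a correlation id; whether BrokRPC works exactly this way internally is an assumption. `InMemoryBroker`, `serve`, `call` and `demo` are hypothetical names, and `asyncio.Queue` stands in for a real broker.

```python
import asyncio
import itertools
from typing import Awaitable, Callable


class InMemoryBroker:
    """Toy stand-in for a message broker: one asyncio.Queue per routing key."""

    def __init__(self) -> None:
        self._queues: dict[str, asyncio.Queue] = {}

    def queue(self, routing_key: str) -> asyncio.Queue:
        return self._queues.setdefault(routing_key, asyncio.Queue())

    async def publish(self, routing_key: str, message: dict) -> None:
        await self.queue(routing_key).put(message)


async def serve(
    broker: InMemoryBroker,
    routing_key: str,
    handler: Callable[[object], Awaitable[object]],
) -> None:
    """Consume requests forever and publish each response to its reply queue."""
    requests = broker.queue(routing_key)
    while True:
        request = await requests.get()
        response = {
            "correlation_id": request["correlation_id"],
            "body": await handler(request["body"]),
        }
        await broker.publish(request["reply_to"], response)


_call_ids = itertools.count(1)


async def call(broker: InMemoryBroker, routing_key: str, body: object, timeout: float = 1.0) -> object:
    """Publish one request and wait for the matching response."""
    correlation_id = str(next(_call_ids))
    reply_to = f"reply.{correlation_id}"
    request = {"correlation_id": correlation_id, "reply_to": reply_to, "body": body}
    await broker.publish(routing_key, request)
    response = await asyncio.wait_for(broker.queue(reply_to).get(), timeout)
    if response["correlation_id"] != correlation_id:
        raise RuntimeError("response does not match the request")
    return response["body"]


async def greet(name: object) -> str:
    return f"I greet you, {name}"


async def demo() -> object:
    broker = InMemoryBroker()
    server = asyncio.create_task(serve(broker, "test-greeting", greet))
    try:
        return await call(broker, "test-greeting", "John")
    finally:
        server.cancel()
```

Running `asyncio.run(demo())` performs one call against the toy server. With a real broker the two sides live in separate processes, as in the server and client examples above.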