Skip to main content

framework for gRPC like server-client communication over message brokers

Project description

BrokRPC

Latest Version Python Supported versions MyPy Strict Test Coverage Downloads GitHub stars

BrokRPC (Broker Remote Procedure Call) is a framework for gRPC like server-client communication over message brokers.

key features

  • strict typing (even disallow_any_expr=true)
  • same protobuf structures as in gRPC
  • similar calls as in gRPC
    • unary-unary
    • (TODO) unary-stream
    • (TODO) stream-unary
    • (TODO) stream-stream
  • declarative style, abstract from broker commands (such as declare_exchange / queue_bind)
  • publisher & consumer middlewares
  • message serializers

codegen

You can generate python code for server & client from .proto files. The pyprotostuben project provides protoc plugin protoc-gen-brokrpc. See pyprotostuben project example for more details.

You may configure codegen output using protobuf extensions from buf schema registry.

supported brokers & protocols

  • AMQP
  • (TODO) redis
  • (TODO) kafka
  • (TODO) NATS

usage

pypi package

install with your favorite python package manager

pip install BrokRPC[aiormq]

Broker

use Broker as high level API to create consumers & publishers

from brokrpc.broker import Broker

# create & connect to broker with specified params
async with Broker(...) as broker:
    assert broker.is_connected
    # work with broker
    ...

Consumer

import asyncio
from brokrpc.broker import Broker
from brokrpc.message import Message
from brokrpc.options import BindingOptions, QueueOptions, ExchangeOptions
from brokrpc.serializer.json import JSONSerializer


async def register_consumer(broker: Broker) -> None:
    # define consumer function (you also can use async function & `Consumer` interface).
    def consume_binary_message(message: Message[bytes]) -> None:
        print(message)
  
    # consumer is not attached yet
  
    async with broker.consumer(consume_binary_message, BindingOptions(binding_keys=["my-consumer"])):
        # in this code block consumer is attached to broker and can receive messages
        ...
  
    # outside CM consumer is detached from broker and cannot receive messages
  
    async def consume_json(message: Message[object]) -> bool:
        obj = message.body
        if not isinstance(obj, dict):
            return False
    
        username = obj.get("username")
        if not username:
            return False
        
        print(f"Hello, {username}")
        await asyncio.sleep(1.0)  # simulate long processing
    
        return True
  
    async with broker.consumer(
        consume_json,
        BindingOptions(
            exchange=ExchangeOptions(name="json"),
            binding_keys=["my-json-consumer"],
            queue=QueueOptions(name="jsons", auto_delete=True),
        ),
        serializer=JSONSerializer(),
    ):
        ...

ConsumerMiddlewares

...

Publisher

from brokrpc.broker import Broker
from brokrpc.message import create_message
from brokrpc.serializer.json import JSONSerializer


async def publish_messages(broker: Broker) -> None:
    async with broker.publisher() as pub:
        # in this code block publisher is attached to broker and can send messages
        await pub.publish(create_message(body=b"this is a binary message", routing_key="test-consumer"))

    async with broker.publisher(serializer=JSONSerializer()) as json_pub:
        await json_pub.publish(create_message(body={"username": "John Smith"}, routing_key="my-json-consumer"))

Publisher Middlewares

...

Message

  • Message
  • SomeMessage
  • EncodedMessage
  • DecodedMessage

Serializer

  • JSONSerializer
  • ProtobufSerializer

RPC Server

  • Server

RPC Handler

...

RPC Client

  • Client

RPC Caller

...

RPC example

RPC server

run server process with following code

import asyncio

from brokrpc.broker import Broker
from brokrpc.options import ExchangeOptions
from brokrpc.rpc.model import Request
from brokrpc.rpc.server import Server
from brokrpc.serializer.json import JSONSerializer


# define app RPC handler
async def handle_request(request: Request[object]) -> str:
    print(f"{request=!s}")
    print(f"{request.body=}")
    return f"I greet you, {request.body}"


async def main() -> None:
    # create broker & RPC server
    broker = Broker(
        options="amqp://guest:guest@localhost:5672/",
        default_exchange=ExchangeOptions(name="simple-test-app"),
    )
    server = Server(broker)

    # register RPC handler
    server.register_unary_unary_handler(
        func=handle_request,
        routing_key="test-greeting",
        serializer=JSONSerializer(),
    )

    # connect to broker
    async with broker:
        # start RPC server until SIGINT or SIGTERM
        await server.run_until_terminated()


if __name__ == "__main__":
    asyncio.run(main())

RPC client

make a call to RPC server via RPC client with following code

import asyncio

from brokrpc.broker import Broker
from brokrpc.options import ExchangeOptions
from brokrpc.rpc.client import Client
from brokrpc.serializer.json import JSONSerializer


async def main() -> None:
    async with (
        # create & connect to broker
        Broker(
            options="amqp://guest:guest@localhost:5672/",
            default_exchange=ExchangeOptions(name="simple-test-app"),
        ) as broker,
        # create RPC client & get RPC caller
        Client(broker).unary_unary_caller(
            routing_key="test-greeting",
            serializer=JSONSerializer(),
        ) as caller,
    ):
        # publish app message & receive RPC response
        response = await caller.invoke("John")

        print(response)
        print(f"{response.body=}")


if __name__ == "__main__":
    asyncio.run(main())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

brokrpc-0.3.0.tar.gz (34.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

brokrpc-0.3.0-py3-none-any.whl (47.4 kB view details)

Uploaded Python 3

File details

Details for the file brokrpc-0.3.0.tar.gz.

File metadata

  • Download URL: brokrpc-0.3.0.tar.gz
  • Upload date:
  • Size: 34.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for brokrpc-0.3.0.tar.gz
Algorithm Hash digest
SHA256 5042f44dfd16fc1ae8bdf825e9985b4b4c4e27352db1b618913ce6efaf6b4baf
MD5 842777ef41c59f65fda9567a82427df9
BLAKE2b-256 27d0317e34976c6998735888238154d987c06e3cf3e2f03a54d35890632aee62

See more details on using hashes here.

Provenance

The following attestation bundles were made for brokrpc-0.3.0.tar.gz:

Publisher: publish.yaml on zerlok/BrokRPC

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file brokrpc-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: brokrpc-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 47.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for brokrpc-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 1e259d81d4f238bd5f4d2937f0582441d6355490b3e5e3c02df05435188fe966
MD5 1bac1d90cd7bdae5fb81bf9272fa9b6a
BLAKE2b-256 e6726caf9f3df9d59c2a5b65f438704aef10eaad4512bc4fd637c49727f4c0d9

See more details on using hashes here.

Provenance

The following attestation bundles were made for brokrpc-0.3.0-py3-none-any.whl:

Publisher: publish.yaml on zerlok/BrokRPC

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page