RPC брокер сообщений
Project description
RabbitMQ Broker
Не зависящий от фреймворков пакет для общения между микросервисами. Пакет предоставляет интерфейс для работы с брокером сообщений RabbitMQ
и базовый класс цепочки обработчиков.
Пакет реализует паттерн цепочка обязанностей в синхронном и асинхронном виде.
Конфигурация
Переменная окружения | Описание | Значение по умолчанию |
---|---|---|
MICROSERVICE_SETTINGS | Путь к модулю настроек проекта отно- | "settings" |
-сительно корня проекта. (Разделение | ||
через точку: module1.module2.settings) |
Использование
Цепочка обработчиков
Для использования цепочки обязанностей нужно импортировать базовый класс звена и унаследоваться от него. Необходимо установить параметр request_type
. Обязательно должен быть переопределен абстрактный метод BaseChain.get_response_body(self, data)
:
from rmq_broker.chains.base import BaseChain
class LoginChain(BaseChain):
request_type = "login"
def get_response_body(self, data: dict) -> dict:
...
Также, можно унаследоваться от асинхронного варианта реализации:
from rmq_broker.async_chains.base import BaseChain
class LoginChain(BaseChain):
request_type = "login"
async def get_response_body(self, data: dict) -> dict:
...
Метод должен содержать логику обработки запроса и вызывать родительский метод
BaseChain.form_response(data, body, code, message)
class LoginChain(BaseChain):
request_type = "login"
async def get_response_body(self, data):
# Обработка запроса
...
return super().form_response(data, response, status.HTTP_200_OK, "OK")
data
: Изначальное сообщение.
body
: Обработанное тело запроса.
code
: HTTP код ответа.
message
: Сообщение к ответу.
Модели
Доработанные pydantic модели - могут быть использованы как для валидации, так и для генерации сообщения.
Валидация сообщения происходит так же, как и в обычной pydantic модели:
ProcessedMessage(**message_dict)
Для генерации сообщения необходимо создать объект модели, не передавая аргументы при инициализации, и вызвать метод generate. В аргументы метода generate можно передавать любой из ключей структуры сообщения, в том числе code, message, dst и src. Вложенная структура(header, status) формируется сама:
>>> ProcessedMessage().generate(dst="destination", code=201, request_type="creation")
>>> {"header": {"src": "", "dst": "destination"}, "request_type": "creation"...}
RPC брокер
Запуск прослушки и автоматического ответа на сообщения.
В файле consumer.py
:
from app.chains import your_chain
from rmq_broker.queues.rabbitmq import AsyncRabbitMessageQueue
async def broker_runner():
async with AsyncRabbitMessageQueue() as provider:
await provider.register_tasks("rpc_queue_name", your_chain.handle)
await provider.consume()
asyncio.run(broker_runner())
Далее:
python3 consumer.py
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distribution
Hashes for rabbitmq_broker-1.2.5-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 7073e3935bf8fb074eef1f7f7b2d675b620b2ab214e60a70ea438817ef4ada96 |
|
MD5 | 4f4e44d47930545d2957260731c32cac |
|
BLAKE2b-256 | 45f2a42639176cd5944acfbc7d0b186e8aff50424e6732e086f7381c5ac26c1d |