Skip to main content

No project description provided

Project description

RabbitMQ transport for Domain-driven Design Misc

Пакет предоставляет транспортную надстройку на MessageBus пакета ddd-misc для осуществления публикации событий и RPC-вызова команд посредством брокера RabbitMQ.

Классы

Классы объектов

  • AsyncRMQEventTransport - асинхронный класс транспорта выполняющий публикацию и подписку на события
  • SyncRMQEventTransport - синхронный класс транспорта выполняющий публикацию и подписку на события

Сигнатура инициализации классов:

AsyncRMQEventTransport(messagebus: AsyncMessageBus, url: t.Union[str, URL], service_name, *, publish_middlewares: t.Iterable[AbstractPublisherMiddleware] = (), consume_middlewares: t.Iterable[AbstractConsumerMiddleware] = (), **kwargs)

SyncRMQEventTransport(messagebus: MessageBus, url: t.Union[str, URL], service_name, *, publish_middlewares: t.Iterable[AbstractPublisherMiddleware] = (), consume_middlewares: t.Iterable[AbstractConsumerMiddleware] = (), **kwargs)

  • messagebus - инстанс класса шины сообщений используемой в сервисе
  • url - урл подключения к брокеру RabbitMQ формата amqps://user:password@broker-host/vhost
  • service_name - наименование микросервиса, используется:
    • для формирования наименования exchange в который будет осуществляться публикация событий на основании шаблона <service_name>_events
    • используется в качестве наименования очереди для подписки на события по умолчанию
  • prefetch_count - максимальное количество одновременно обрабатываемых событий
  • publish_middlewares - middleware вызываемые при публикации событий
  • consume_middlewares - middleware вызываемые при получении событий по подписке
  • **kwargs - дополнительно возможные расширения параметризации класса транспорта

свойства

  • is_ready - готовность класса принимать/отправлять события
  • service_name - наименование сервиса заданное при ининциализации

методы

  • def register(events: *t.Type[DDDEvent]) - регистрация событий для публикации через брокер
    • events - классы событий
  • def consume_to_service(service_name: str, queue_name: str = None) - метод подписки на все события публикуемые заданным микросервисом
    • service_name - наименование стороннего сервиса, на exchange которого будет осуществлена подписка
    • queue_name - специфичное наименование очереди. При передаче пустой строки будет осуществлена посредством временной очереди
  • def consume_to_domain(service_name: str, domain: str, queue_name: str = None) - метод подписки на все события указанного домена, публикуемые заданным микросервисом
    • service_name - наименование стороннего сервиса, на exchange которого будет осуществлена подписка
    • domain - наименование домена на события которого будет осуществлена подписка
    • queue_name - специфичное наименование очереди. При передаче пустой строки будет осуществлена посредством временной очереди
  • def consume_to_event(service_name: str, event: t.Type[DDDEvent], queue_name: str = None) - метод подписки на конкретное событие, публикуемое данным сервисом
    • service_name - наименование стороннего сервиса, на exchange которого будет осуществлена подписка
    • event - наименование домена на события которого будет осуществлена подписка
    • queue_name - специфичное наименование очереди. При передаче пустой строки будет осуществлена посредством временной очереди

!!! Допускается подписка на события собственного сервиса при этом события полученные через брокер не будут повторно опубликованы в брокер сообщений

Примеры использования

Пример использования для публикации событий

from sample_project.bootstap import messagebus
from sample_project.domain.events import CompleteEvent, SpecificEvent
from dddmisc_rmq import AsyncRMQEventTransport

transport = AsyncRMQEventTransport(messagebus, 'amqps://guest:guest@localhost/vhost', 'sample_project')
transport.register(CompleteEvent, SpecificEvent)

Пример использования для подписки на события

from sample_project.bootstap import messagebus
from other_project.events import CompleteEvent, SpecificEvent
from dddmisc_rmq import AsyncRMQEventTransport

transport = AsyncRMQEventTransport(messagebus, 'amqps://guest:guest@localhost/vhost', 'sample_project')
transport.consume_to_event('other_project', CompleteEvent)  # Подписка на событие CompleteEvent через постоянную очередь sample_project
transport.consume_to_domain('other_project', 'other_domain', '')  # Экслюзивная подписка на события домена через временную очередь
transport.consume_to_service('other_project', 'sample-queue')  # Подписка на все события домена через постоянную очередь sample-queue

Пример одновренменной подписки и публикации событий

from sample_project.bootstap import messagebus
from sample_project.events import SuccessEvent
from other_project.events import CompleteEvent, SpecificEvent
from dddmisc_rmq import AsyncRMQEventTransport

transport = AsyncRMQEventTransport(messagebus, 'amqps://guest:guest@localhost/vhost', 'sample_project')
transport.register(SuccessEvent)
transport.consume_to_event('other_project', CompleteEvent)  # Подписка на событие CompleteEvent через постоянную очередь sample_project
transport.consume_to_domain('other_project', 'other_domain', '')  # Экслюзивная подписка на события домена через временную очередь
transport.consume_to_service('other_project', 'sample-queue')  # Подписка на все события домена через постоянную очередь sample-queue

Пример использования middleware

from sample_project.bootstap import messagebus
from sample_project.events import SuccessEvent
from other_project.events import CompleteEvent, SpecificEvent
import typing as t
from uuid import uuid4
import contextvars as cv
from dddmisc_rmq import AsyncRMQEventTransport, AbstractPublisherMiddleware, AbstractConsumerMiddleware
from aio_pika.abc import AbstractMessage, AbstractIncomingMessage


class PublisherMiddleware(AbstractPublisherMiddleware):

  async def __call__(self, message: AbstractMessage, routing_key: str,
                     publisher: t.Callable[[AbstractMessage, str], t.Coroutine]):
    message.correlation_id = str(cv.ContextVar('correlation_id').get(uuid4()))
    return await publisher(message, routing_key)


class ConsumeMiddleware(AbstractConsumerMiddleware):

  async def __call__(self, message: AbstractIncomingMessage,
                     consumer: t.Callable[[AbstractIncomingMessage], t.Coroutine]):
    cv.ContextVar('correlation_id').set(message.correlation_id or uuid4())
    return await consumer(message)


transport = AsyncRMQEventTransport(messagebus, 'amqps://guest:guest@localhost/vhost', 'sample_project',
                                   publish_middlewares=[PublisherMiddleware()],
                                   consume_middlewares=[ConsumeMiddleware()])
transport.register(SuccessEvent)
transport.consume_to_event('other_project', CompleteEvent)  # Подписка на событие CompleteEvent через постоянную очередь sample_project
transport.consume_to_domain('other_project', 'other_domain','')  # Экслюзивная подписка на события домена через временную очередь
transport.consume_to_service('other_project', 'sample-queue')  # Подписка на все события домена через постоянную очередь sample-queue

Changelog

0.2.0

  • Add support middlewares

0.1.2

  • Add support ddd-misc version >=0.8.1 < 0.9.0

0.1.1

  • Change exchange type from Fanout to Topic

0.1.0

  • First release

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ddd_misc_rabbitmq_transport-0.2.0.tar.gz (8.8 kB view details)

Uploaded Source

Built Distribution

File details

Details for the file ddd_misc_rabbitmq_transport-0.2.0.tar.gz.

File metadata

File hashes

Hashes for ddd_misc_rabbitmq_transport-0.2.0.tar.gz
Algorithm Hash digest
SHA256 ecdd042a16ea0597b5e9001ab3b71ff678ab284886acb07f89eb120dc90b6448
MD5 bc2dbe56fd57f9b11a2aa7c883f27bee
BLAKE2b-256 35472bfaf9a192b7298b7c9442ec61296cf5dfa97d090eb8714a5b3aa9753f05

See more details on using hashes here.

File details

Details for the file ddd_misc_rabbitmq_transport-0.2.0-py3-none-any.whl.

File metadata

File hashes

Hashes for ddd_misc_rabbitmq_transport-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 6a17ae5ead8ab0be9204dddb9ff97bc4c7189e84ed9d04dad811883d72c017ae
MD5 10d47685f4b30f815706aa45bf0cedd1
BLAKE2b-256 c767a4198462906a31cd7f90d454e763b000fe9666517674de635859438fe466

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page