No project description provided
Project description
RabbitMQ transport for Domain-driven Design Misc
Пакет предоставляет транспортную надстройку на MessageBus
пакета ddd-misc
для осуществления публикации событий и RPC-вызова команд посредством брокера RabbitMQ.
Классы
Классы объектов
AsyncRMQEventTransport
- асинхронный класс транспорта выполняющий публикацию и подписку на событияSyncRMQEventTransport
- синхронный класс транспорта выполняющий публикацию и подписку на события
Сигнатура инициализации классов:
AsyncRMQEventTransport(messagebus: AsyncMessageBus, url: t.Union[str, URL], service_name, *, publish_middlewares: t.Iterable[AbstractPublisherMiddleware] = (), consume_middlewares: t.Iterable[AbstractConsumerMiddleware] = (), **kwargs)
SyncRMQEventTransport(messagebus: MessageBus, url: t.Union[str, URL], service_name, *, publish_middlewares: t.Iterable[AbstractPublisherMiddleware] = (), consume_middlewares: t.Iterable[AbstractConsumerMiddleware] = (), **kwargs)
messagebus
- инстанс класса шины сообщений используемой в сервисеurl
- урл подключения к брокеру RabbitMQ форматаamqps://user:password@broker-host/vhost
service_name
- наименование микросервиса, используется:- для формирования наименования exchange в который будет осуществляться публикация событий на основании шаблона
<service_name>_events
- используется в качестве наименования очереди для подписки на события по умолчанию
- для формирования наименования exchange в который будет осуществляться публикация событий на основании шаблона
prefetch_count
- максимальное количество одновременно обрабатываемых событийpublish_middlewares
- middleware вызываемые при публикации событийconsume_middlewares
- middleware вызываемые при получении событий по подписке**kwargs
- дополнительно возможные расширения параметризации класса транспорта
свойства
is_ready
- готовность класса принимать/отправлять событияservice_name
- наименование сервиса заданное при ининциализации
методы
def register(events: *t.Type[DDDEvent])
- регистрация событий для публикации через брокерevents
- классы событий
def consume_to_service(service_name: str, queue_name: str = None)
- метод подписки на все события публикуемые заданным микросервисомservice_name
- наименование стороннего сервиса, на exchange которого будет осуществлена подпискаqueue_name
- специфичное наименование очереди. При передаче пустой строки будет осуществлена посредством временной очереди
def consume_to_domain(service_name: str, domain: str, queue_name: str = None)
- метод подписки на все события указанного домена, публикуемые заданным микросервисомservice_name
- наименование стороннего сервиса, на exchange которого будет осуществлена подпискаdomain
- наименование домена на события которого будет осуществлена подпискаqueue_name
- специфичное наименование очереди. При передаче пустой строки будет осуществлена посредством временной очереди
def consume_to_event(service_name: str, event: t.Type[DDDEvent], queue_name: str = None)
- метод подписки на конкретное событие, публикуемое данным сервисомservice_name
- наименование стороннего сервиса, на exchange которого будет осуществлена подпискаevent
- наименование домена на события которого будет осуществлена подпискаqueue_name
- специфичное наименование очереди. При передаче пустой строки будет осуществлена посредством временной очереди
!!! Допускается подписка на события собственного сервиса при этом события полученные через брокер не будут повторно опубликованы в брокер сообщений
Примеры использования
Пример использования для публикации событий
from sample_project.bootstap import messagebus
from sample_project.domain.events import CompleteEvent, SpecificEvent
from dddmisc_rmq import AsyncRMQEventTransport
transport = AsyncRMQEventTransport(messagebus, 'amqps://guest:guest@localhost/vhost', 'sample_project')
transport.register(CompleteEvent, SpecificEvent)
Пример использования для подписки на события
from sample_project.bootstap import messagebus
from other_project.events import CompleteEvent, SpecificEvent
from dddmisc_rmq import AsyncRMQEventTransport
transport = AsyncRMQEventTransport(messagebus, 'amqps://guest:guest@localhost/vhost', 'sample_project')
transport.consume_to_event('other_project', CompleteEvent) # Подписка на событие CompleteEvent через постоянную очередь sample_project
transport.consume_to_domain('other_project', 'other_domain', '') # Экслюзивная подписка на события домена через временную очередь
transport.consume_to_service('other_project', 'sample-queue') # Подписка на все события домена через постоянную очередь sample-queue
Пример одновренменной подписки и публикации событий
from sample_project.bootstap import messagebus
from sample_project.events import SuccessEvent
from other_project.events import CompleteEvent, SpecificEvent
from dddmisc_rmq import AsyncRMQEventTransport
transport = AsyncRMQEventTransport(messagebus, 'amqps://guest:guest@localhost/vhost', 'sample_project')
transport.register(SuccessEvent)
transport.consume_to_event('other_project', CompleteEvent) # Подписка на событие CompleteEvent через постоянную очередь sample_project
transport.consume_to_domain('other_project', 'other_domain', '') # Экслюзивная подписка на события домена через временную очередь
transport.consume_to_service('other_project', 'sample-queue') # Подписка на все события домена через постоянную очередь sample-queue
Пример использования middleware
from sample_project.bootstap import messagebus
from sample_project.events import SuccessEvent
from other_project.events import CompleteEvent, SpecificEvent
import typing as t
from uuid import uuid4
import contextvars as cv
from dddmisc_rmq import AsyncRMQEventTransport, AbstractPublisherMiddleware, AbstractConsumerMiddleware
from aio_pika.abc import AbstractMessage, AbstractIncomingMessage
class PublisherMiddleware(AbstractPublisherMiddleware):
async def __call__(self, message: AbstractMessage, routing_key: str,
publisher: t.Callable[[AbstractMessage, str], t.Coroutine]):
message.correlation_id = str(cv.ContextVar('correlation_id').get(uuid4()))
return await publisher(message, routing_key)
class ConsumeMiddleware(AbstractConsumerMiddleware):
async def __call__(self, message: AbstractIncomingMessage,
consumer: t.Callable[[AbstractIncomingMessage], t.Coroutine]):
cv.ContextVar('correlation_id').set(message.correlation_id or uuid4())
return await consumer(message)
transport = AsyncRMQEventTransport(messagebus, 'amqps://guest:guest@localhost/vhost', 'sample_project',
publish_middlewares=[PublisherMiddleware()],
consume_middlewares=[ConsumeMiddleware()])
transport.register(SuccessEvent)
transport.consume_to_event('other_project', CompleteEvent) # Подписка на событие CompleteEvent через постоянную очередь sample_project
transport.consume_to_domain('other_project', 'other_domain','') # Экслюзивная подписка на события домена через временную очередь
transport.consume_to_service('other_project', 'sample-queue') # Подписка на все события домена через постоянную очередь sample-queue
Changelog
0.2.0
- Add support middlewares
0.1.2
- Add support ddd-misc version >=0.8.1 < 0.9.0
0.1.1
- Change exchange type from
Fanout
toTopic
0.1.0
- First release
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file ddd_misc_rabbitmq_transport-0.2.0.tar.gz
.
File metadata
- Download URL: ddd_misc_rabbitmq_transport-0.2.0.tar.gz
- Upload date:
- Size: 8.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.4.2 CPython/3.9.12 Darwin/23.0.0
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | ecdd042a16ea0597b5e9001ab3b71ff678ab284886acb07f89eb120dc90b6448 |
|
MD5 | bc2dbe56fd57f9b11a2aa7c883f27bee |
|
BLAKE2b-256 | 35472bfaf9a192b7298b7c9442ec61296cf5dfa97d090eb8714a5b3aa9753f05 |
File details
Details for the file ddd_misc_rabbitmq_transport-0.2.0-py3-none-any.whl
.
File metadata
- Download URL: ddd_misc_rabbitmq_transport-0.2.0-py3-none-any.whl
- Upload date:
- Size: 8.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.4.2 CPython/3.9.12 Darwin/23.0.0
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 6a17ae5ead8ab0be9204dddb9ff97bc4c7189e84ed9d04dad811883d72c017ae |
|
MD5 | 10d47685f4b30f815706aa45bf0cedd1 |
|
BLAKE2b-256 | c767a4198462906a31cd7f90d454e763b000fe9666517674de635859438fe466 |