No project description provided
Project description
RabbitMQ transport for Domain-driven Design Misc
Пакет предоставляет транспортную надстройку на MessageBus
пакета ddd-misc
для осуществления публикации событий и RPC-вызова команд посредством брокера RabbitMQ.
Классы
Классы объектов
AsyncRMQEventTransport
- асинхронный класс транспорта выполняющий публикацию и подписку на событияSyncRMQEventTransport
- синхронный класс транспорта выполняющий публикацию и подписку на события
Сигнатура инициализации классов:
AsyncRMQEventTransport(messagebus: AsyncMessageBus, url: t.Union[str, URL], service_name, **kwargs)
SyncRMQEventTransport(messagebus: MessageBus, url: t.Union[str, URL], service_name, **kwargs)
messagebus
- инстанс класса шины сообщений используемой в сервисеurl
- урл подключения к брокеру RabbitMQ форматаamqps://user:password@broker-host/vhost
service_name
- наименование микросервиса, используется:- для формирования наименования exchange в который будет осуществляться публикация событий на основании шаблона
<service_name>_events
- используется в качестве наименования очереди для подписки на события по умолчанию
- для формирования наименования exchange в который будет осуществляться публикация событий на основании шаблона
prefetch_count
- максимальное количество одновременно обрабатываемых событий**kwargs
- дополнительно возможные расширения параметризации класса транспорта
свойства
is_ready
- готовность класса принимать/отправлять событияservice_name
- наименование сервиса заданное при ининциализации
методы
def register(events: *t.Type[DDDEvent])
- регистрация событий для публикации через брокерevents
- классы событий
def consume_to_service(service_name: str, queue_name: str = None)
- метод подписки на все события публикуемые заданным микросервисомservice_name
- наименование стороннего сервиса, на exchange которого будет осуществлена подпискаqueue_name
- специфичное наименование очереди. При передаче пустой строки будет осуществлена посредством временной очереди
def consume_to_domain(service_name: str, domain: str, queue_name: str = None)
- метод подписки на все события указанного домена, публикуемые заданным микросервисомservice_name
- наименование стороннего сервиса, на exchange которого будет осуществлена подпискаdomain
- наименование домена на события которого будет осуществлена подпискаqueue_name
- специфичное наименование очереди. При передаче пустой строки будет осуществлена посредством временной очереди
def consume_to_event(service_name: str, event: t.Type[DDDEvent], queue_name: str = None)
- метод подписки на конкретное событие, публикуемое данным сервисомservice_name
- наименование стороннего сервиса, на exchange которого будет осуществлена подпискаevent
- наименование домена на события которого будет осуществлена подпискаqueue_name
- специфичное наименование очереди. При передаче пустой строки будет осуществлена посредством временной очереди
!!! Допускается подписка на события собственного сервиса при этом события полученные через брокер не будут повторно опубликованы в брокер сообщений
Примеры использования
Пример использования для публикации событий
from sample_project.bootstap import messagebus
from sample_project.domain.events import CompleteEvent, SpecificEvent
from dddmisc_rmq import AsyncRMQEventTransport
transport = AsyncRMQEventTransport(messagebus, 'amqps://guest:guest@localhost/vhost', 'sample_project')
transport.register(CompleteEvent, SpecificEvent)
Пример использования для подписки на события
from sample_project.bootstap import messagebus
from other_project.events import CompleteEvent, SpecificEvent
from dddmisc_rmq import AsyncRMQEventTransport
transport = AsyncRMQEventTransport(messagebus, 'amqps://guest:guest@localhost/vhost', 'sample_project')
transport.consume_to_event('other_project', CompleteEvent) # Подписка на событие CompleteEvent через постоянную очередь sample_project
transport.consume_to_domain('other_project', 'other_domain', '') # Экслюзивная подписка на события домена через временную очередь
transport.consume_to_service('other_project', 'sample-queue') # Подписка на все события домена через постоянную очередь sample-queue
Пример одновренменной подписки и публикации событий
from sample_project.bootstap import messagebus
from sample_project.events import SuccessEvent
from other_project.events import CompleteEvent, SpecificEvent
from dddmisc_rmq import AsyncRMQEventTransport
transport = AsyncRMQEventTransport(messagebus, 'amqps://guest:guest@localhost/vhost', 'sample_project')
transport.register(SuccessEvent)
transport.consume_to_event('other_project', CompleteEvent) # Подписка на событие CompleteEvent через постоянную очередь sample_project
transport.consume_to_domain('other_project', 'other_domain', '') # Экслюзивная подписка на события домена через временную очередь
transport.consume_to_service('other_project', 'sample-queue') # Подписка на все события домена через постоянную очередь sample-queue
Changelog
0.1.1
- Change exchange type from
Fanout
toTopic
0.1.0
- First release
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for ddd-misc-rabbitmq-transport-0.1.1.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | 9698486ebe278ab10592edda960e6ea0470cd9b6ab9947c70f9940dc65bb2d9a |
|
MD5 | 62b5d026f388667bd8be4759786f6a6a |
|
BLAKE2b-256 | 47f93eeeda184af697ed21259e18281d6951053c8ec2680f603882fbb2a54c2b |
Hashes for ddd_misc_rabbitmq_transport-0.1.1-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | b3097db9cdc7fde40d23262fa664821103782f4928bd26a2ecf00ea972686f64 |
|
MD5 | 6e6909a6b7aaff19530a161dc52c48c7 |
|
BLAKE2b-256 | 62b7d401fc552128a10a03eb45fda757f7ae3c4bad7522d9e7240609c6598797 |