Python CQRS pattern implementation
Project description
CQRS
Python-библиотека для реализации шаблона CQRS в приложениях Python. Предоставляет набор абстракций и утилит, которые помогут разделить задачи чтения и записи, обеспечивая лучшую масштабируемость, производительность и удобство обслуживания приложения.
Библиотека является форком библиотеки diator (документация) с рядом улучшений:
- поддержка Pydantic v2.*;
- поддержка Kafka в качестве брокера aiokafka;
- добавлен
EventMediator
для обработкиNotification
иECST
событий, приходящих из шины; - переработам механизм
mapping
-а событий и запросов на обработчики; - добавлен
bootstrap
для легкого начала работы; - Добавлена поддержка Transaction Outbox,
дающего гарантию отправки
Notification
иECST
событий в брокера.
Примеры использования
Обработчики событий
from cqrs.events import EventHandler
class UserJoinedEventHandler(EventHandler[UserJoinedEventHandler])
def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
self._meetings_api = meetings_api
async def handle(self, event: UserJoinedEventHandler) -> None:
await self._meetings_api.notify_room(event.meeting_id, "New user joined!")
Обработчик запросов
Обработчик command
from cqrs.requests.request_handler import RequestHandler
from cqrs.events.event import Event
class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None])
def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
self._meetings_api = meetings_api
self.events: list[Event] = []
async def handle(self, request: JoinMeetingCommand) -> None:
await self._meetings_api.join_user(request.user_id, request.meeting_id)
Обработчик query
from cqrs.requests.request_handler import RequestHandler
from cqrs.events.event import Event
class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult])
def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
self._meetings_api = meetings_api
self.events: list[Event] = []
async def handle(self, request: ReadMeetingQuery) -> ReadMeetingQueryResult:
link = await self._meetings_api.get_link(request.meeting_id)
return ReadMeetingQueryResult(link=link, meeting_id=request.meeting_id)
Продюсирование Notification
/ECST
событий
Во время обработки запроса/команды можно породить сообщения с типом cqrs.NotificationEvent
или cqrs.ECSTEvent
,
которое в дальнейшем продюсируется брокером сообщений
class CloseMeetingRoomCommandHandler(requests.RequestHandler[CloseMeetingRoomCommand, None]):
def __init__(self) -> None:
self._events: typing.List[events.Event] = []
@property
def events(self) -> typing.List:
return self._events
async def handle(self, request: CloseMeetingRoomCommand) -> None:
# some process
event = events.NotificationEvent(
event_topic="meeting_room_notifications",
event_name="meeteng_room_closed",
payload=dict(
meeting_room_id=request.meeting_room_id,
),
)
self._events.append(event)
После обработки команды/запроса, при наличии Notification
/ECST
событий, вызывается EventEmitter который
спродюсирует события посредством message_broker'а
Медиатор
from cqrs.events import EventMap, EventEmitter
from cqrs.requests import RequestMap
from cqrs.mediator import RequestMediator
from cqrs.message_brokers.amqp import AMQPMessageBroker
message_broker = AMQPMessageBroker(
dsn=f"amqp://{LOGIN}:{PASSWORD}@{HOSTNAME}/",
queue_name="user_joined_domain",
exchange_name="user_joined",
)
event_map = EventMap()
event_map.bind(UserJoinedDomainEvent, UserJoinedDomainEventHandler)
request_map = RequestMap()
request_map.bind(JoinUserCommand, JoinUserCommandHandler)
event_emitter = EventEmitter(event_map, container, message_broker)
mediator = RequestMediator(
request_map=request_map,
container=container
event_emitter=event_emitter,
)
# Handles command and published events by the command handler.
await mediator.send(join_user_command)
Kafka Брокер
from cqrs.adapters import kafka as kafka_adapter
from cqrs.message_brokers import kafka as kafka_broker
producer = kafka_adapter.kafka_producer_factory(
dsn="localhost:9094",
topics=["test.topic1", "test.topic2"],
)
broker = kafka_broker.KafkaMessageBroker(producer)
await broker.send_message(...)
Transactional Outbox
Пакет имплементирует паттерн Transaction Outbox,
что позволяет гарантировать продюсирование сообщений в брокер согласно семантике at-least-once
.
from sqlalchemy.ext.asyncio import session as sql_session
from cqrs import events
def do_some_logic(meeting_room_id: int, session: sql_session.AsyncSession):
"""
Внесение изменений в БД
"""
session.add(...)
class CloseMeetingRoomCommandHandler(requests.RequestHandler[CloseMeetingRoomCommand, None]):
def __init__(self, repository: cqrs.SqlAlchemyOutboxedEventRepository):
self._repository = repository
self._events: typing.List[events.Event] = []
async def handle(self, request: CloseMeetingRoomCommand) -> None:
async with self._repository as session:
do_some_logic(request.meeting_room_id, session)
self.repository.add(
session,
events.ECSTEvent(
event_name="MeetingRoomCloseв",
payload=dict(message="foo"),
),
)
await self.repository.commit(session)
Продюсирование событий из Outbox в Kafka
В качестве имплементации Transaction Outbox доступен для использования репозиторий доступа к Outbox
хранилищу SqlAlchemyOutboxedEventRepository.
Его можно использовать в связке с KafkaMessageBroker
.
import asyncio
import cqrs
from cqrs.message_brokers import kafka as kafka_broker
session_factory = async_sessionmaker(
create_async_engine(
f"mysql+asyncmy://{USER}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}",
isolation_level="REPEATABLE READ",
)
)
broker = kafka_broker.KafkaMessageBroker(
kafka_adapter.kafka_producer_factory(
dsn="localhost:9094",
topics=["test.topic1", "test.topic2"],
),
"DEBUG"
)
producer = cqrs.EventProducer(cqrs.SqlAlchemyOutboxedEventRepository(session_factory, zlib.ZlibCompressor()), broker)
loop = asyncio.get_event_loop()
loop.run_until_complete(app.periodically_task())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for python_cqrs-0.0.3-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | a1d51c7b27e91d65f95f8f80d4bb6de039048350dc042d71f63b9c52f062bf87 |
|
MD5 | 722c972dc5da1dc6e55734f7f4032287 |
|
BLAKE2b-256 | 3727e4c30b15041e2049e96785d01dc6a39400f215dec6b14df9d8df52e01d8f |