Mediator pattern impl
Project description
Meator
Python CQRS pattern implementation.
Installation
pip install meator
Available
- Dispatchers:
- Command
- Query
- Observers:
- Event
- Entities:
- Command
- Event
- Query
- Middlewares
Usecases
Command/Event/Query
from dataclasses import dataclass
from meator.dispatchers import CommandDispatcherImpl
from meator.entities import Command
from meator.interfaces import CommandHandler
@dataclass
class IntCommand(Command[int]):
answer: int
class IntCommandHandler(CommandHandler[IntCommand, int]):
async def __call__(self, request: IntCommand) -> int:
return request.answer
async def main():
c = CommandDispatcherImpl()
c.register(IntCommand, IntCommandHandler())
await c.handle(IntCommand(1))
Middleware
from dataclasses import dataclass
from meator.dispatchers import CommandDispatcherImpl
from meator.entities import Command, Request
from meator.interfaces import CommandHandler, Handler, Middleware
class SimpleMiddleware(Middleware):
async def __call__(self, call_next: Handler, request: Request):
return await call_next(request)
@dataclass
class IntCommand(Command[int]):
answer: int
class IntCommandHandler(CommandHandler[IntCommand, int]):
async def __call__(self, request: IntCommand) -> int:
return request.answer
async def main():
c = CommandDispatcherImpl(middlewares=[SimpleMiddleware()])
c.register(IntCommand, IntCommandHandler())
await c.handle(IntCommand(1))
Tests
pytest tests
Additional
Inspired by didator
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
meator-2.0.1.tar.gz
(7.2 kB
view hashes)
Built Distribution
meator-2.0.1-py3-none-any.whl
(14.1 kB
view hashes)