Mediator pattern implementation

Meator

Python CQRS pattern implementation.

Docs

Installation

pip install meator

Available

  • Dispatchers:
    • Command
    • Query
  • Observers:
    • Event
  • Entities:
    • Command
    • Event
    • Query
  • Middlewares
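The list above separates dispatchers (commands, queries) from observers (events). A common reading in CQRS-style code is that a request goes to exactly one handler, while an event may fan out to several. The sketch below illustrates only that fan-out idea using the standard library; `SketchObserver`, `UserCreated`, and the handler names are hypothetical and are not taken from meator's API.

```python
import asyncio
from dataclasses import dataclass


# Hypothetical stand-in for an event observer; not meator's implementation.
class SketchObserver:
    def __init__(self) -> None:
        # Each event type maps to a list of handlers (fan-out).
        self._handlers = {}

    def register(self, event_type, handler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    async def handle(self, event) -> None:
        # Call every handler registered for this event type, in order.
        for handler in self._handlers.get(type(event), []):
            await handler(event)


@dataclass
class UserCreated:
    name: str


def run_observer_demo() -> list:
    log = []

    async def send_email(event: UserCreated) -> None:
        log.append(f"email:{event.name}")

    async def write_audit(event: UserCreated) -> None:
        log.append(f"audit:{event.name}")

    observer = SketchObserver()
    observer.register(UserCreated, send_email)
    observer.register(UserCreated, write_audit)
    asyncio.run(observer.handle(UserCreated("alice")))
    return log
```

Both handlers run for the single event, which is the behavioural difference from a command or query, where one registered handler produces one result.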

Use cases

Command/Event/Query

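import asyncio
from dataclasses import dataclass

from meator.dispatchers import CommandDispatcherImpl
from meator.entities import Command
from meator.interfaces import CommandHandler


# A command carrying a single integer.
@dataclass
class IntCommand(Command[int]):
    answer: int


# Handler for IntCommand: returns the integer the command carries.
class IntCommandHandler(CommandHandler[IntCommand, int]):
    async def __call__(self, request: IntCommand) -> int:
        return request.answer


async def main():
    c = CommandDispatcherImpl()

    # Bind the command type to a handler instance.
    c.register(IntCommand, IntCommandHandler())

    # Dispatch the command to its registered handler.
    await c.handle(IntCommand(1))


if __name__ == "__main__":
    asyncio.run(main())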
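To make the `register` / `handle` flow above concrete, here is a minimal, standard-library-only sketch of how a dispatcher of this shape could route a request to its handler by type. It is an illustration under stated assumptions, not meator's implementation: `SketchDispatcher`, `AddCommand`, and `add_handler` are hypothetical names, and the error raised for an unregistered type is a choice made for this sketch.

```python
import asyncio
from dataclasses import dataclass


# Hypothetical stand-in for a command dispatcher; not meator's implementation.
class SketchDispatcher:
    def __init__(self) -> None:
        # Exactly one handler per request type.
        self._handlers = {}

    def register(self, request_type, handler) -> None:
        self._handlers[request_type] = handler

    async def handle(self, request):
        # Look up the handler by the concrete type of the request.
        handler = self._handlers.get(type(request))
        if handler is None:
            raise LookupError(
                f"no handler registered for {type(request).__name__}"
            )
        return await handler(request)


@dataclass
class AddCommand:
    left: int
    right: int


async def add_handler(request: AddCommand) -> int:
    return request.left + request.right


def run_dispatch_demo() -> int:
    dispatcher = SketchDispatcher()
    dispatcher.register(AddCommand, add_handler)
    return asyncio.run(dispatcher.handle(AddCommand(2, 3)))
```

The caller only knows the request type; which handler runs is decided by the registry, which is the decoupling the mediator pattern is after.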

Middleware

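import asyncio
from dataclasses import dataclass

from meator.dispatchers import CommandDispatcherImpl
from meator.entities import Command, Request
from meator.interfaces import CommandHandler, Handler, Middleware


# A pass-through middleware: forwards the request to the next handler
# in the chain and returns its result unchanged.
class SimpleMiddleware(Middleware):
    async def __call__(self, call_next: Handler, request: Request):
        return await call_next(request)


# A command carrying a single integer.
@dataclass
class IntCommand(Command[int]):
    answer: int


# Handler for IntCommand: returns the integer the command carries.
class IntCommandHandler(CommandHandler[IntCommand, int]):
    async def __call__(self, request: IntCommand) -> int:
        return request.answer


async def main():
    # Middlewares are passed to the dispatcher at construction time.
    c = CommandDispatcherImpl(middlewares=[SimpleMiddleware()])

    c.register(IntCommand, IntCommandHandler())

    await c.handle(IntCommand(1))


if __name__ == "__main__":
    asyncio.run(main())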
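The middleware above receives `call_next` and the request, so each middleware decides what happens before and after the rest of the chain. The sketch below shows one way such a chain could be composed around a handler, using only the standard library. It assumes the first middleware in the list runs outermost; that ordering, along with the names `wrap_with_middlewares`, `make_tracer`, and `echo_handler`, is an assumption of this sketch and not confirmed behaviour of meator.

```python
import asyncio
from functools import partial


def wrap_with_middlewares(handler, middlewares):
    """Wrap handler so that middlewares[0] runs outermost (sketch assumption)."""
    wrapped = handler
    # Wrap from the innermost middleware outward.
    for middleware in reversed(middlewares):
        # partial(middleware, wrapped)(request) == middleware(wrapped, request)
        wrapped = partial(middleware, wrapped)
    return wrapped


def make_tracer(name, trace):
    # Build a middleware that records when it is entered and left.
    async def tracer(call_next, request):
        trace.append(f"{name}:before")
        result = await call_next(request)
        trace.append(f"{name}:after")
        return result

    return tracer


async def echo_handler(request):
    return request


def run_middleware_demo():
    trace = []
    pipeline = wrap_with_middlewares(
        echo_handler,
        [make_tracer("outer", trace), make_tracer("inner", trace)],
    )
    result = asyncio.run(pipeline(42))
    return result, trace
```

The recorded trace nests like function calls: the outer middleware is entered first and left last, which is why cross-cutting concerns such as logging or transactions fit naturally into this position.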

Tests

  • pytest tests

Additional

Inspired by diator.

Download files

Source Distribution: meator-2.0.1.tar.gz (7.2 kB)

Built Distribution: meator-2.0.1-py3-none-any.whl (14.1 kB, Python 3)
