
CQRS-style mediator and pipeline for Python — interface-compatible with martinothamar/Mediator.


mediatorx

CQRS-style mediator and pipeline for Python — designed for clean / onion architecture. Inspired by MediatR and interface-compatible with martinothamar/Mediator for .NET.


mediatorx brings the IRequest / ICommand / IQuery / INotification mediator pattern from .NET to Python, without runtime reflection magic, code generation, or scary metaclasses. Just protocols, async, and an explicit composition root.

If you've used MediatR or martinothamar's Mediator in C# and want the same shape in your FastAPI / clean architecture Python project, this is for you.


Why another mediator library?

If you're coming from .NET, you've almost certainly used MediatR — the de facto mediator pattern library for ASP.NET Core / clean architecture projects. mediatorx exists because the equivalent Python ecosystem is thin: most packages (mediatr, diator, mediatpy, python-mediator) are abandoned, and the handful that are maintained either ship too much (Kafka, outbox, brokers) or too little (no streaming, no command/query separation, no pipeline pre/post split).

mediatorx aims for one thing: mirror the MediatR interface contracts in idiomatic async Python — specifically following the cleaner taxonomy from martinothamar/Mediator (IRequest / ICommand / IQuery separation, stream variants, pre/post/exception processors). No more, no less.

  • ✅ IRequest<T>, ICommand<T>, IQuery<T>: semantic CQRS separation
  • ✅ Streaming variants (IStreamRequest, IStreamCommand, IStreamQuery) via AsyncIterator
  • ✅ Notifications (pub/sub) with pluggable publishers (ForeachAwait, TaskWhenAll)
  • ✅ Pipeline behaviors with IPipelineBehavior, plus split PreProcessor / PostProcessor / ExceptionHandler base classes
  • ✅ Marker-interface constraints (where TMessage : ICommand equivalent)
  • ✅ Async-first — no sync/async dual path, no asyncio.run hacks
  • ❌ No runtime reflection, no source generation, no assembly scanning
  • ❌ No broker integration — bring your own; this is in-process

Installation

pip install mediatorx
# or
uv add mediatorx

Requires Python 3.11+.


Quickstart

import asyncio
from dataclasses import dataclass
from mediatorx import IRequest, IRequestHandler, Mediator

@dataclass
class Ping(IRequest[str]):
    message: str

class PingHandler(IRequestHandler[Ping, str]):
    async def handle(self, request: Ping) -> str:
        return f"pong: {request.message}"

async def main():
    mediator = Mediator()
    mediator.register(Ping, PingHandler())

    response = await mediator.send(Ping("hello"))
    print(response)  # "pong: hello"

asyncio.run(main())
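
To make the register/send flow above less opaque, here is a minimal sketch of type-keyed dispatch using only the standard library. TinyMediator and the Handler protocol are hypothetical stand-ins, not mediatorx's actual implementation; the sketch only illustrates the idea that send looks up the handler registered for type(request) and awaits it.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Protocol


class Handler(Protocol):
    """Anything with an async handle(request) method (hypothetical stand-in)."""

    async def handle(self, request: Any) -> Any: ...


class TinyMediator:
    """Illustrative only: maps a message type to exactly one handler."""

    def __init__(self) -> None:
        self._handlers: dict[type, Handler] = {}

    def register(self, message_type: type, handler: Handler) -> None:
        # One handler per message type, so a second registration is an error.
        if message_type in self._handlers:
            raise ValueError(f"handler already registered for {message_type.__name__}")
        self._handlers[message_type] = handler

    async def send(self, request: Any) -> Any:
        try:
            handler = self._handlers[type(request)]
        except KeyError:
            raise LookupError(f"no handler for {type(request).__name__}") from None
        return await handler.handle(request)


@dataclass
class Ping:
    message: str


class PingHandler:
    async def handle(self, request: Ping) -> str:
        return f"pong: {request.message}"


async def demo() -> str:
    mediator = TinyMediator()
    mediator.register(Ping, PingHandler())
    return await mediator.send(Ping("hello"))


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

An exact type(request) lookup keeps dispatch explicit; whether the real library also matches subclasses is not something this sketch assumes.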

Core concepts

Messages — Request, Command, Query

Three semantically distinct message kinds with identical mechanics. Pick the one that matches your CQRS intent; pipeline behaviors can be constrained by marker:

from mediatorx import IRequest, ICommand, IQuery

@dataclass
class GetUserById(IQuery[User]):       # read — must return data
    user_id: int

@dataclass
class CreateBooking(ICommand[int]):    # write — returns booking id
    user_id: int
    room_id: int

@dataclass
class SendWelcomeEmail(ICommand[None]):  # write — no meaningful return
    user_id: int

Handlers

One handler per message type. Constructor-injected dependencies — works with any DI container:

from mediatorx import IQueryHandler

class GetUserByIdHandler(IQueryHandler[GetUserById, User]):
    def __init__(self, repo: IUserRepository):
        self._repo = repo

    async def handle(self, query: GetUserById) -> User:
        return await self._repo.get(query.user_id)
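
Because dependencies arrive through the constructor, a handler can be exercised without a mediator or a container. The sketch below is self-contained and makes several assumptions: User, InMemoryUserRepository, and the plain GetUserById dataclass are hypothetical stand-ins (no mediatorx base classes are used), and IUserRepository is modelled as a typing.Protocol. It is only meant to show the testing shape.

```python
import asyncio
from dataclasses import dataclass
from typing import Protocol


@dataclass(frozen=True)
class User:
    id: int
    name: str


@dataclass(frozen=True)
class GetUserById:
    user_id: int


class IUserRepository(Protocol):
    async def get(self, user_id: int) -> User: ...


class InMemoryUserRepository:
    """Test double; satisfies IUserRepository structurally."""

    def __init__(self, users: list[User]) -> None:
        self._by_id = {u.id: u for u in users}

    async def get(self, user_id: int) -> User:
        return self._by_id[user_id]


class GetUserByIdHandler:
    def __init__(self, repo: IUserRepository) -> None:
        self._repo = repo

    async def handle(self, query: GetUserById) -> User:
        return await self._repo.get(query.user_id)


async def demo() -> User:
    # Swap in the fake repository exactly where a real one would be injected.
    handler = GetUserByIdHandler(InMemoryUserRepository([User(1, "Ada")]))
    return await handler.handle(GetUserById(user_id=1))
```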

Pipeline behaviors

Wrap every handler in cross-cutting concerns (logging, validation, transactions) without touching the handler itself:

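from logging import Logger
from typing import TypeVar

# IMessage is assumed to be exported alongside the other marker types
from mediatorx import IMessage, IPipelineBehavior, MessageHandlerDelegate

TResponse = TypeVar("TResponse")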

class LoggingBehavior(IPipelineBehavior[IMessage, TResponse]):
    def __init__(self, logger: Logger):
        self._logger = logger

    async def handle(
        self,
        message: IMessage,
        next: MessageHandlerDelegate[TResponse],
    ) -> TResponse:
        self._logger.info(f"handling {type(message).__name__}")
        try:
            response = await next()
            self._logger.info(f"handled {type(message).__name__}")
            return response
        except Exception:
            self._logger.exception(f"failed {type(message).__name__}")
            raise
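
The next delegate in the behavior above is what makes nesting work. The following sketch shows one way such a chain could be composed, using plain callables rather than mediatorx types; build_pipeline, tracing, and the calls list are hypothetical names introduced only for this illustration, and the real library may compose its pipeline differently.

```python
import asyncio
from typing import Any, Awaitable, Callable

Next = Callable[[], Awaitable[Any]]
Behavior = Callable[[Any, Next], Awaitable[Any]]
Handler = Callable[[Any], Awaitable[Any]]


def _wrap(behavior: Behavior, message: Any, nxt: Next) -> Next:
    # Bind one behavior around the rest of the chain.
    async def step() -> Any:
        return await behavior(message, nxt)

    return step


def build_pipeline(message: Any, handler: Handler, behaviors: list[Behavior]) -> Next:
    """Wrap handler so that behaviors[0] ends up outermost."""

    async def terminal() -> Any:
        return await handler(message)

    pipeline: Next = terminal
    # Fold from the innermost behavior outward so the first one runs first.
    for behavior in reversed(behaviors):
        pipeline = _wrap(behavior, message, pipeline)
    return pipeline


calls: list[str] = []


def tracing(name: str) -> Behavior:
    async def behavior(message: Any, nxt: Next) -> Any:
        calls.append(f"{name}:before")
        result = await nxt()
        calls.append(f"{name}:after")
        return result

    return behavior


async def handler(message: str) -> str:
    calls.append("handler")
    return message.upper()


async def demo() -> str:
    calls.clear()
    pipeline = build_pipeline("hi", handler, [tracing("log"), tracing("tx")])
    return await pipeline()
```

After running demo(), calls records the log behavior entering first and leaving last, with the tx behavior and the handler nested inside it.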

For simple cases, use split pre/post/exception processors instead of the omnibus middleware shape:

from mediatorx import MessagePreProcessor, MessageExceptionHandler

class ValidationProcessor(MessagePreProcessor[IValidate, TResponse]):
    async def handle(self, message: IValidate) -> None:
        message.validate()  # raises on invalid input

class RetryableErrorHandler(MessageExceptionHandler[IMessage, TResponse, TransientError]):
    async def handle(self, message, exc):
        return self.NotHandled  # let it bubble; record metric elsewhere
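
One way to read the split processors is as behaviors with the next handling filled in for you. The sketch below adapts a pre-processor to the behavior shape; PreProcessor, RequirePositiveId, and run are hypothetical names, and nothing here is taken from mediatorx internals.

```python
import asyncio
from typing import Any, Awaitable, Callable

Next = Callable[[], Awaitable[Any]]


class PreProcessor:
    """Hypothetical base: subclasses implement process(); __call__ adapts it to the behavior shape."""

    async def process(self, message: Any) -> None:
        raise NotImplementedError

    async def __call__(self, message: Any, nxt: Next) -> Any:
        await self.process(message)  # runs before the handler
        return await nxt()           # then continue the chain unchanged


class RequirePositiveId(PreProcessor):
    async def process(self, message: dict) -> None:
        if message["id"] <= 0:
            raise ValueError("id must be positive")


async def run(message: dict) -> str:
    async def handler() -> str:
        return f"loaded {message['id']}"

    # The pre-processor either raises or hands control to the handler.
    return await RequirePositiveId()(message, handler)
```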

Constraining behaviors to a subset of messages

Use the marker hierarchy (IMessage, IBaseCommand, IBaseQuery) to scope behaviors. A behavior bound to a command marker such as IBaseCommand fires only for commands, never for queries, which mirrors the C# where TMessage : ICommand constraint:

class TransactionBehavior(IPipelineBehavior[IBaseCommand, TResponse]):
    """Wraps commands in a DB transaction. Queries skip this entirely."""
    ...
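
In Python, a marker constraint like this can be approximated with an isinstance check against the marker type. The sketch below defines local stand-in classes that reuse the marker names from the text; applicable and BEHAVIORS are hypothetical, and how mediatorx actually resolves the constraint is not something this example claims to know.

```python
from dataclasses import dataclass


# Local stand-ins for the marker hierarchy (not imported from mediatorx).
class IMessage: ...


class IBaseCommand(IMessage): ...


class IBaseQuery(IMessage): ...


@dataclass
class CreateBooking(IBaseCommand):
    room_id: int


@dataclass
class GetUserById(IBaseQuery):
    user_id: int


def applicable(behaviors: list[tuple[type, str]], message: IMessage) -> list[str]:
    """Keep behaviors whose marker type matches the message (order preserved)."""
    return [name for marker, name in behaviors if isinstance(message, marker)]


# Each entry pairs the marker a behavior is bound to with a label for it.
BEHAVIORS = [(IMessage, "logging"), (IBaseCommand, "transaction")]
```

With these definitions, a CreateBooking message selects both behaviors, while a GetUserById message selects only the one bound to IMessage.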

Notifications (pub/sub)

from mediatorx import INotification, INotificationHandler

@dataclass(frozen=True)
class UserRegistered(INotification):
    user_id: int

class SendWelcomeEmailOnRegister(INotificationHandler[UserRegistered]):
    async def handle(self, n: UserRegistered) -> None: ...

class LogRegistrationOnRegister(INotificationHandler[UserRegistered]):
    async def handle(self, n: UserRegistered) -> None: ...

# Both handlers run on every publish (call this from inside a coroutine)
await mediator.publish(UserRegistered(user_id=42))

Pick a publisher strategy at construction:

from mediatorx import Mediator, ForeachAwaitPublisher, TaskWhenAllPublisher

mediator = Mediator(publisher=TaskWhenAllPublisher())  # parallel
# or
mediator = Mediator(publisher=ForeachAwaitPublisher())  # sequential, default
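
The difference between the two strategies comes down to awaiting handlers one by one versus starting them together. The sketch below illustrates that with asyncio.gather; publish_sequential and publish_concurrent are hypothetical functions, and the error-handling behavior of the real ForeachAwaitPublisher and TaskWhenAllPublisher is not covered here.

```python
import asyncio
from typing import Any, Awaitable, Callable

NotificationHandler = Callable[[Any], Awaitable[None]]


async def publish_sequential(handlers: list[NotificationHandler], notification: Any) -> None:
    """Await handlers one at a time, in registration order."""
    for handle in handlers:
        await handle(notification)


async def publish_concurrent(handlers: list[NotificationHandler], notification: Any) -> None:
    """Start all handlers together and wait until every one has finished."""
    await asyncio.gather(*(handle(notification) for handle in handlers))


log: list[str] = []


async def slow(notification: Any) -> None:
    await asyncio.sleep(0.02)
    log.append("slow")


async def fast(notification: Any) -> None:
    log.append("fast")


async def demo() -> tuple[list[str], list[str]]:
    log.clear()
    await publish_sequential([slow, fast], "evt")
    sequential = list(log)
    log.clear()
    await publish_concurrent([slow, fast], "evt")
    return sequential, list(log)
```

Sequential publishing finishes slow before fast even starts; concurrent publishing lets fast complete while slow is still sleeping, so completion order no longer follows registration order.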

Streaming

from mediatorx import IStreamRequest, IStreamRequestHandler
from typing import AsyncIterator

@dataclass
class TailLogs(IStreamRequest[LogLine]):
    service: str

class TailLogsHandler(IStreamRequestHandler[TailLogs, LogLine]):
    async def handle(self, request: TailLogs) -> AsyncIterator[LogLine]:
        async for line in log_source(request.service):
            yield line

# Run inside a coroutine; create_stream is iterated directly, not awaited
async for line in mediator.create_stream(TailLogs("api")):
    print(line)
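
A stream handler is an async generator, so calling it returns an iterator immediately and items are produced as the consumer asks for them. The standalone sketch below shows that shape without mediatorx; log_source, the limit field, and collect are hypothetical additions so the example terminates.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class TailLogs:
    service: str
    limit: int  # hypothetical field so the demo stream is finite


async def log_source(service: str, limit: int) -> AsyncIterator[str]:
    """Hypothetical source: yields a fixed number of fake lines."""
    for i in range(limit):
        await asyncio.sleep(0)  # yield control to the event loop between lines
        yield f"{service}: line {i}"


class TailLogsHandler:
    async def handle(self, request: TailLogs) -> AsyncIterator[str]:
        async for line in log_source(request.service, request.limit):
            yield line


async def collect() -> list[str]:
    handler = TailLogsHandler()
    # Calling an async generator method returns the iterator; no await is needed.
    return [line async for line in handler.handle(TailLogs("api", 3))]
```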

Clean architecture with FastAPI

mediatorx is designed to live in your application layer. Endpoints become two-liners that hand a DTO to the mediator and return the response:

src/myapp/
├── domain/              # entities, value objects, repository interfaces
├── application/         # commands, queries, handlers, behaviors  ← uses mediatorx
├── infrastructure/      # repository impls, db, external services
└── api/                 # FastAPI routes (composition root)

# api/bookings.py
from fastapi import APIRouter, Depends
from mediatorx import Mediator

router = APIRouter()

@router.post("/bookings")
async def create_booking(
    body: CreateBookingDto,
    mediator: Mediator = Depends(get_mediator),
) -> BookingResponse:
    # model_dump() is the Pydantic v2 spelling; on v1 use body.dict()
    booking_id = await mediator.send(CreateBookingCommand(**body.model_dump()))
    return BookingResponse(id=booking_id)
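
The endpoint above depends on a get_mediator provider that the snippet does not define. One possible shape, sketched here with stand-ins so it runs on its own, is a cached factory that builds the mediator once per process. The Mediator class and the argument-free build_mediator below are placeholders for illustration, not the library's API.

```python
from functools import lru_cache


class Mediator:
    """Stand-in for the real class so this sketch runs on its own."""


def build_mediator() -> Mediator:
    # In a real app this is where handlers and behaviors would be registered.
    return Mediator()


@lru_cache(maxsize=1)
def get_mediator() -> Mediator:
    """Build on first call, then return the same instance every time."""
    return build_mediator()
```

Caching keeps registrations from being rebuilt on every request. If handlers hold per-request state such as a database session, a per-request factory would be the safer choice.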

Wire handlers and behaviors in your composition root (typically main.py or a container.py):

def build_mediator(container: Container) -> Mediator:
    m = Mediator()

    # Behaviors run outermost-first
    m.add_behavior(LoggingBehavior(container.logger()))
    m.add_behavior(TransactionBehavior(container.uow()))
    m.add_behavior(ValidationProcessor())

    # Handlers
    m.register(CreateBookingCommand, CreateBookingHandler(container.booking_repo()))
    m.register(GetUserById, GetUserByIdHandler(container.user_repo()))

    return m

Comparison to .NET libraries

Feature | MediatR | Mediator (martinothamar) | mediatorx
IRequest<T> / IRequestHandler<,> | | |
Command / Query separation | | |
Streaming | | |
Notifications | | |
Pipeline behaviors | | |
Pre/Post/Exception processors | | |
Compile-time handler validation | | ✅ (source gen) | ❌ (runtime)
Native AOT / no reflection | | | N/A (Python)
Async-only | | |

Status

Early. The interface surface is stable and mirrors Mediator.Abstractions; expect the concrete Mediator implementation and DI integration helpers to evolve. Pinning a minor version is recommended.


License

MIT. See LICENSE.


