CQRS-style mediator and pipeline for Python - interface-compatible with martinothamar/Mediator.
MediatorX
CQRS-style mediator and pipeline for Python - designed for clean / onion architecture.
Inspired by MediatR and interface-compatible with martinothamar/Mediator for .NET.
[!NOTE] Early-stage release. The public interface mirrors Mediator.Abstractions and is stable; the concrete Mediator implementation and DI helpers will still evolve. Pin a minor version.
⚙️ About The Project
mediatorx brings the IRequest / ICommand / IQuery / INotification mediator pattern from .NET to Python - without runtime reflection magic, code generation, or scary metaclasses. Just protocols, async, and an explicit composition root.
The Python ecosystem is thin here: most existing packages (mediatr, diator, mediatpy, python-mediator) are abandoned, and the maintained ones either ship too much (Kafka, outbox, brokers) or too little (no streaming, no command/query separation, no pipeline pre/post split). mediatorx aims for one thing: mirror the MediatR interface contracts in idiomatic async Python, following the cleaner taxonomy from martinothamar/Mediator - IRequest / ICommand / IQuery separation, stream variants, pre/post/exception processors. No more, no less.
If you've used MediatR or martinothamar's Mediator in C# and want the same shape in your FastAPI / clean architecture Python project, this is for you.
✨ Features
- CQRS separation - IRequest<T>, ICommand<T>, IQuery<T> with marker-interface constraints (where TMessage : ICommand equivalent)
- Streaming - IStreamRequest, IStreamCommand, IStreamQuery via AsyncIterator
- Notifications (pub/sub) - pluggable publishers (ForeachAwait, TaskWhenAll)
- Pipeline behaviors - IPipelineBehavior plus split PreProcessor / PostProcessor / ExceptionHandler base classes
- Async-first - no sync/async dual path, no asyncio.run hacks
- Typed - ships with py.typed, mypy-strict friendly
- Zero runtime deps - no broker integration, no reflection, no source generation; in-process only
📦 Installation
pip install mediatorx
# or
uv add mediatorx
Requires Python 3.11+.
🚀 Quickstart
import asyncio
from dataclasses import dataclass

from mediatorx import IRequest, IRequestHandler, Mediator


@dataclass
class Ping(IRequest[str]):
    message: str


class PingHandler(IRequestHandler[Ping, str]):
    async def handle(self, request: Ping) -> str:
        return f"pong: {request.message}"


async def main():
    mediator = Mediator()
    mediator.register(Ping, PingHandler)
    response = await mediator.send(Ping("hello"))
    print(response)  # "pong: hello"


asyncio.run(main())
🧠 Core Concepts
Messages - Request, Command, Query
Three semantically distinct message kinds with identical mechanics. Pick the one that matches your CQRS intent; pipeline behaviors can be constrained by marker:
from mediatorx import IRequest, ICommand, IQuery


@dataclass
class GetUserById(IQuery[User]):  # read - must return data
    user_id: int


@dataclass
class CreateBooking(ICommand[int]):  # write - returns booking id
    user_id: int
    room_id: int


@dataclass
class SendWelcomeEmail(ICommand[None]):  # write - no meaningful return
    user_id: int
Handlers
One handler per message type. Constructor-injected dependencies - works with any DI container:
from mediatorx import IQueryHandler


class GetUserByIdHandler(IQueryHandler[GetUserById, User]):
    def __init__(self, repo: IUserRepository):
        self._repo = repo

    async def handle(self, query: GetUserById) -> User:
        return await self._repo.get(query.user_id)
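To make the "one handler per message type, constructor-injected" idea concrete, here is a minimal sketch that does not use mediatorx at all. `TinyMediator`, `InMemoryUserRepository`, and `lookup_user` are hypothetical names invented for illustration; the real `Mediator` may resolve handlers differently.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class GetUserById:
    user_id: int


class InMemoryUserRepository:
    """Hypothetical repository standing in for IUserRepository."""

    def __init__(self) -> None:
        self._users = {1: "alice", 2: "bob"}

    async def get(self, user_id: int) -> str:
        return self._users[user_id]


class GetUserByIdHandler:
    def __init__(self, repo: InMemoryUserRepository) -> None:
        self._repo = repo  # dependency arrives through the constructor

    async def handle(self, query: GetUserById) -> str:
        return await self._repo.get(query.user_id)


class TinyMediator:
    """Illustrative only: maps each message type to exactly one handler factory."""

    def __init__(self) -> None:
        self._factories: dict[type, Callable[[], Any]] = {}

    def register(self, message_type: type, factory: Callable[[], Any]) -> None:
        if message_type in self._factories:
            raise ValueError(f"handler already registered for {message_type.__name__}")
        self._factories[message_type] = factory

    async def send(self, message: Any) -> Any:
        handler = self._factories[type(message)]()  # build the handler per call
        return await handler.handle(message)


def lookup_user(user_id: int) -> str:
    repo = InMemoryUserRepository()
    mediator = TinyMediator()
    # The factory is where a DI container would normally plug in.
    mediator.register(GetUserById, lambda: GetUserByIdHandler(repo))
    return asyncio.run(mediator.send(GetUserById(user_id)))
```

The factory lambda plays the role of the container: swapping it for a container lookup changes nothing in the handler.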
Pipeline Behaviors
Wrap every handler in cross-cutting concerns (logging, validation, transactions) without touching the handler itself:
from mediatorx import IPipelineBehavior, MessageHandlerDelegate


class LoggingBehavior(IPipelineBehavior[IMessage, TResponse]):
    def __init__(self, logger: Logger):
        self._logger = logger

    async def handle(
        self,
        message: IMessage,
        next: MessageHandlerDelegate[TResponse],
    ) -> TResponse:
        self._logger.info(f"handling {type(message).__name__}")
        try:
            response = await next()
            self._logger.info(f"handled {type(message).__name__}")
            return response
        except Exception:
            self._logger.exception(f"failed {type(message).__name__}")
            raise
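The way a `next` delegate chains behaviors around a handler can be sketched in plain Python. This is an assumption about the general middleware shape, not mediatorx's actual implementation; `build_pipeline` and `run_demo` are hypothetical helpers.

```python
import asyncio
from typing import Any, Awaitable, Callable

Next = Callable[[], Awaitable[Any]]
Behavior = Callable[[Any, Next], Awaitable[Any]]


def build_pipeline(message: Any, handler: Callable[[Any], Awaitable[Any]],
                   behaviors: list[Behavior]) -> Next:
    """Wrap the handler so that behaviors[0] ends up outermost."""

    async def call_handler() -> Any:
        return await handler(message)

    next_step: Next = call_handler
    # Walk the list backwards so the first behavior wraps everything else.
    for behavior in reversed(behaviors):
        def make_step(b: Behavior = behavior, inner: Next = next_step) -> Next:
            async def step() -> Any:
                return await b(message, inner)
            return step
        next_step = make_step()
    return next_step


def run_demo() -> list[str]:
    events: list[str] = []

    def tracing(name: str) -> Behavior:
        async def behavior(message: Any, call_next: Next) -> Any:
            events.append(f"{name}:before")
            result = await call_next()
            events.append(f"{name}:after")
            return result
        return behavior

    async def handler(message: str) -> str:
        events.append("handler")
        return message.upper()

    pipeline = build_pipeline("ping", handler, [tracing("outer"), tracing("inner")])
    events.append(asyncio.run(pipeline()))
    return events
```

Calling `run_demo()` records `outer:before`, `inner:before`, `handler`, `inner:after`, `outer:after`, and finally the response `PING`, which is the nesting order the logging behavior above relies on.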
For simpler cases, use split pre/post/exception processors instead of the omnibus middleware shape:
from mediatorx import MessagePreProcessor, MessageExceptionHandler


class ValidationProcessor(MessagePreProcessor[IValidate, TResponse]):
    async def handle(self, message: IValidate) -> None:
        message.validate()  # raises on invalid input


class RetryableErrorHandler(MessageExceptionHandler[IMessage, TResponse, TransientError]):
    async def handle(self, message, exc):
        return self.NotHandled  # let it bubble; record metric elsewhere
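One way to read the split processors is as thin adapters over the behavior shape: a pre-processor runs before `next` and never sees the response, while an exception handler either supplies a replacement response or declines. The sketch below assumes that reading; `NOT_HANDLED`, `pre_processor`, `exception_handler`, and `demo` are hypothetical and are not the library's API.

```python
import asyncio
from typing import Any, Awaitable, Callable

NOT_HANDLED = object()  # hypothetical sentinel meaning "let the exception bubble"

Next = Callable[[], Awaitable[Any]]


class TransientError(Exception):
    pass


def pre_processor(check: Callable[[Any], Awaitable[None]]):
    """Adapt a check into a behavior that runs before the rest of the pipeline."""
    async def behavior(message: Any, call_next: Next) -> Any:
        await check(message)  # may raise to short-circuit the pipeline
        return await call_next()
    return behavior


def exception_handler(exc_type: type[BaseException],
                      recover: Callable[[Any, BaseException], Awaitable[Any]]):
    """Adapt a recovery function into a behavior that catches one exception type."""
    async def behavior(message: Any, call_next: Next) -> Any:
        try:
            return await call_next()
        except exc_type as exc:
            outcome = await recover(message, exc)
            if outcome is NOT_HANDLED:
                raise  # re-raise the original exception unchanged
            return outcome
    return behavior


def demo(recovered_value: Any) -> Any:
    async def reject_empty(message: str) -> None:
        if not message:
            raise ValueError("empty message")

    async def recover(message: str, exc: BaseException) -> Any:
        return recovered_value

    async def flaky_handler() -> str:
        raise TransientError("try later")

    guarded = exception_handler(TransientError, recover)
    validated = pre_processor(reject_empty)

    async def run() -> Any:
        # guarded wraps validated, which wraps the handler
        return await guarded("msg", lambda: validated("msg", flaky_handler))

    return asyncio.run(run())
```

With `demo("fallback")` the recovery value replaces the response; with `demo(NOT_HANDLED)` the `TransientError` propagates to the caller.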
Constraining Behaviors
Use the marker hierarchy (IMessage, IBaseCommand, IBaseQuery) to scope behaviors. A behavior bound to IBaseCommand only fires for commands, not queries - exactly like the C# where TMessage : ICommand constraint:
mediator.add_behavior(TransactionBehavior, constraint=IBaseCommand)
# Wraps commands in a DB transaction. Queries skip this entirely.
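Marker-based scoping can be pictured as an `isinstance` check against the constraint class. The following sketch assumes that mechanism and defines its own stand-in marker classes; it is not taken from mediatorx, and `applicable_behaviors` is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Any


# Stand-in markers mirroring the hierarchy named above (assumed shape).
class IMessage:
    pass


class IBaseCommand(IMessage):
    pass


class IBaseQuery(IMessage):
    pass


@dataclass
class CreateBooking(IBaseCommand):
    user_id: int
    room_id: int


@dataclass
class GetUserById(IBaseQuery):
    user_id: int


# Each registration pairs a behavior name with the marker it is bound to.
REGISTRATIONS: list[tuple[str, type]] = [
    ("logging", IMessage),           # applies to every message
    ("transaction", IBaseCommand),   # applies to commands only
]


def applicable_behaviors(message: Any, registrations: list[tuple[str, type]]) -> list[str]:
    """Return behavior names whose constraint the message satisfies, in order."""
    return [name for name, constraint in registrations if isinstance(message, constraint)]
```

Here `applicable_behaviors(CreateBooking(1, 2), REGISTRATIONS)` yields both behaviors, while `GetUserById(1)` only picks up logging.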
Notifications (pub/sub)
from mediatorx import INotification, INotificationHandler


@dataclass(frozen=True)
class UserRegistered(INotification):
    user_id: int


class SendWelcomeEmailOnRegister(INotificationHandler[UserRegistered]):
    async def handle(self, n: UserRegistered) -> None: ...


class LogRegistrationOnRegister(INotificationHandler[UserRegistered]):
    async def handle(self, n: UserRegistered) -> None: ...


# Both handlers run on every publish
await mediator.publish(UserRegistered(user_id=42))
Pick a publisher strategy at construction:
from mediatorx import Mediator, ForeachAwaitPublisher, TaskWhenAllPublisher
mediator = Mediator(publisher=TaskWhenAllPublisher()) # parallel
# or
mediator = Mediator(publisher=ForeachAwaitPublisher()) # sequential, default
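The difference between the two strategies is easiest to see in the order in which handlers start and finish. The sketch below uses plain asyncio and assumes the publishers behave like a sequential loop and `asyncio.gather` respectively; `publish_sequential`, `publish_parallel`, and `demo` are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[None]]


async def publish_sequential(handlers: list[Handler], notification: Any) -> None:
    # Each handler finishes before the next one starts.
    for handler in handlers:
        await handler(notification)


async def publish_parallel(handlers: list[Handler], notification: Any) -> None:
    # All handlers are scheduled together and awaited as a group.
    await asyncio.gather(*(handler(notification) for handler in handlers))


def demo(publish: Callable[[list[Handler], Any], Awaitable[None]]) -> list[str]:
    events: list[str] = []

    async def slow(notification: Any) -> None:
        events.append("slow:start")
        await asyncio.sleep(0.02)
        events.append("slow:end")

    async def fast(notification: Any) -> None:
        events.append("fast:start")
        await asyncio.sleep(0)
        events.append("fast:end")

    asyncio.run(publish([slow, fast], {"user_id": 42}))
    return events
```

With `demo(publish_sequential)` the slow handler completes before the fast one begins; with `demo(publish_parallel)` both start immediately and the fast one finishes first. Note that `asyncio.gather` with default arguments propagates the first exception to the caller while the remaining handlers keep running.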
Streaming
from mediatorx import IStreamRequest, IStreamRequestHandler
from typing import AsyncIterator


@dataclass
class TailLogs(IStreamRequest[LogLine]):
    service: str


class TailLogsHandler(IStreamRequestHandler[TailLogs, LogLine]):
    async def handle(self, request: TailLogs) -> AsyncIterator[LogLine]:
        async for line in log_source(request.service):
            yield line


async for line in mediator.create_stream(TailLogs("api")):
    print(line)
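A stream handler is an async generator, so dispatching a stream request amounts to returning that generator without awaiting it. The sketch below illustrates this with plain asyncio; the `create_stream` function, the `tail_logs` generator, and the fake log lines are assumptions for illustration, not the library's code.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Callable


@dataclass
class TailLogs:
    service: str


async def tail_logs(request: TailLogs) -> AsyncIterator[str]:
    # Hypothetical log source: three synthetic lines per service.
    for index in range(3):
        await asyncio.sleep(0)  # yield control, as real I/O would
        yield f"{request.service}: line {index}"


def create_stream(request: Any,
                  handlers: dict[type, Callable[[Any], AsyncIterator[Any]]]) -> AsyncIterator[Any]:
    # Return the async iterator itself; the caller drives it with `async for`.
    return handlers[type(request)](request)


def collect(service: str) -> list[str]:
    async def run() -> list[str]:
        stream = create_stream(TailLogs(service), {TailLogs: tail_logs})
        return [line async for line in stream]

    return asyncio.run(run())
```

Because nothing is buffered, the consumer can stop iterating early and the generator simply does not produce the remaining items.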
🏗️ Clean Architecture with FastAPI
mediatorx is designed to live in your application layer. Endpoints become two-liners that hand a DTO to the mediator and return the response:
src/myapp/
├── domain/ # entities, value objects, repository interfaces
├── application/ # commands, queries, handlers, behaviors ← uses mediatorx
├── infrastructure/ # repository impls, db, external services
└── api/ # FastAPI routes (composition root)
# api/bookings.py
from fastapi import APIRouter, Depends

router = APIRouter()


@router.post("/bookings")
async def create_booking(
    body: CreateBookingDto,
    mediator: Mediator = Depends(get_mediator),
) -> BookingResponse:
    booking_id = await mediator.send(CreateBookingCommand(**body.dict()))
    return BookingResponse(id=booking_id)
Wire handlers and behaviors in your composition root (typically main.py or a container.py):
def build_mediator(container: Container) -> Mediator:
    m = Mediator(resolver=container.resolver())

    # Behaviors run outermost-first
    m.add_behavior(LoggingBehavior)
    m.add_behavior(TransactionBehavior, constraint=IBaseCommand)
    m.add_behavior(ValidationBehavior)

    # Handlers
    m.register(CreateBookingCommand, CreateBookingHandler)
    m.register(GetUserById, GetUserByIdHandler)
    return m
📊 Comparison
| Feature | MediatR | Mediator (martinothamar) | mediatorx |
|---|---|---|---|
| IRequest<T> / IRequestHandler<,> | ✅ | ✅ | ✅ |
| Command / Query separation | ❌ | ✅ | ✅ |
| Streaming | ✅ | ✅ | ✅ |
| Notifications | ✅ | ✅ | ✅ |
| Pipeline behaviors | ✅ | ✅ | ✅ |
| Pre/Post/Exception processors | ✅ | ✅ | ✅ |
| Compile-time handler validation | ❌ | ✅ (source gen) | ❌ (runtime) |
| Native AOT / no reflection | ❌ | ✅ | N/A (Python) |
| Async-only | ❌ | ❌ | ✅ |
📜 License
This project is licensed under the MIT License. See LICENSE for details.
🙏 Acknowledgements
- Jimmy Bogard for MediatR, which started all of this.
- Martin Othamar for Mediator, whose interface taxonomy this library mirrors.