A mediator implementation in Python
Project description
Simple Mediator
Simple Mediator is a lightweight implementation of the Mediator pattern for Python, inspired by the MediatR library. It provides a way to decouple the sending of messages from their handling, supporting requests, notifications, and pipeline behaviors.
Installation
You can install Simple Mediator using pip or uv:
pip install simple-mediator
# or using uv
uv add simple-mediator
Usage
Requests
In the Mediator pattern, a request represents a command or query that you want to execute. It encapsulates all the data needed to perform a specific operation. Requests are typically used for operations that return a result.
Key characteristics of requests:
- They are immutable data structures.
- Each request type corresponds to a single operation.
- They usually expect a response.
The Request interface
The Request interface defines the contract for creating custom requests, it looks like this:
class Request(BaseModel, Generic[TResponse]):
pass
Key points about this interface:
- It inherits from Pydantic's BaseModel, allowing for easy data validation and serialization.
- It's generic, with TResponse representing the expected response type.
- It's an empty base class, serving as a marker for request types.
Implementing a Request
from simple_mediator import Request
class GetUserRequest(Request[dict]):
user_id: int
In this example:
GetUserRequest is a request that expects a User object as a response.
It has one field, user_id, which is required to fetch the user.
The RequestHandler interface
For each request, you need a corresponding handler. The RequestHandler interface is defined as:
class RequestHandler(ABC, Generic[TRequest, TResponse]):
@abstractmethod
async def handle(
self, request: TRequest, cancellation_token: Optional[AbstractToken] = None
) -> TResponse:
pass
Key points:
- It's generic, allowing you to specify the request and response types.
- The handle method is where you implement the logic to process the request.
- It receives the request and an optional cancellation token.
- It should return the response of the type specified in TResponse.
Implementing a RequestHandler
Here's an example of implementing a handler for the GetUserRequest:
class GetUserHandler(RequestHandler[GetUserRequest, User]):
async def handle(
self, request: GetUserRequest, cancellation_token: Optional[AbstractToken] = None
) -> User:
# In a real application, you would fetch this from a database
if request.user_id == 1:
return User(id=1, name="John Doe", email="john@example.com")
else:
raise ValueError("User not found")
Registering and using requests
To use requests with the Mediator:
mediator = Mediator()
mediator.register_request_handler(GetUserRequest, GetUserHandler)
Sending a request
async def get_user(user_id: int) -> User:
request = GetUserRequest(user_id=user_id)
return await mediator.send(request)
# Usage
user = await get_user(1)
print(user) # User(id=1, name="John Doe", email="john@example.com")
Notifications
In the Mediator pattern, a notification represents an event that has occurred in your system. Unlike requests, notifications are used for scenarios where you want to inform multiple handlers about an event without expecting a specific response.
Key characteristics of notifications:
- They represent events that have already occurred.
- Multiple handlers can respond to a single notification.
- They don't expect a return value.
The Notification interface
The Notification interface defines the contract for creating custom notifications, it looks like this:
class Notification(BaseModel):
pass
Key points about this interface:
- It inherits from Pydantic's BaseModel, allowing for easy data validation and serialization.
- It's a simple base class, serving as a marker for notification types.
Implementing a Notification
To create a specific notification, you subclass the Notification class and define the necessary fields:
class UserCreatedNotification(Notification):
user_id: int
username: str
email: str
In this example:
- UserCreatedNotification represents an event where a new user has been created.
- It contains relevant information about the created user.
The NotificationHandler interface
For each notification, you can have multiple handlers. The NotificationHandler interface is defined as:
class NotificationHandler(ABC, Generic[T]):
@abstractmethod
async def handle(
self, notification: T, cancellation_token: Optional[AbstractToken] = None
) -> None:
pass
Key points:
- It's generic, allowing you to specify the notification type.
- The handle method is where you implement the logic to process the notification.
- It receives the notification and an optional cancellation token.
- It doesn't return a value (returns None).
Implementing a NotificationHandler
Here's an example of implementing handlers for the UserCreatedNotification:
class EmailNotificationHandler(NotificationHandler[UserCreatedNotification]):
async def handle(
self, notification: UserCreatedNotification, cancellation_token: Optional[AbstractToken] = None
) -> None:
print(f"Sending welcome email to {notification.email}")
# In a real application, you would send an actual email here
class AnalyticsNotificationHandler(NotificationHandler[UserCreatedNotification]):
async def handle(
self, notification: UserCreatedNotification, cancellation_token: Optional[AbstractToken] = None
) -> None:
print(f"Logging new user creation: User ID {notification.user_id}")
# In a real application, you might log this to an analytics service
Registering and using notifications
To use notifications with the Mediator:
mediator = Mediator()
mediator.register_notification_handler(UserCreatedNotification, EmailNotificationHandler)
mediator.register_notification_handler(UserCreatedNotification, AnalyticsNotificationHandler)
Publish a notification
async def create_user(username: str, email: str) -> None:
# Logic to create user in database
user_id = 123 # Assume this is returned from database
notification = UserCreatedNotification(user_id=user_id, username=username, email=email)
await mediator.publish(notification)
# Usage
await create_user("johndoe", "john@example.com")
Benefits of using notifications
- Decoupling: The code that triggers an event is decoupled from the code that handles its effects.
- Extensibility: You can easily add new handlers for existing notifications without modifying existing code.
- Single Responsibility: Each handler can focus on a specific task in response to an event.
- Scalability: Handlers can be executed asynchronously, allowing for better performance in high-load scenarios.
Best practices for notifications
- Use past tense: Name notifications to represent events that have already occurred (e.g.,
UserCreatedNotification,OrderShippedNotification). - Include relevant data: Ensure the notification contains all necessary information for handlers to process the event.
- Keep handlers focused: Each handler should perform a single, specific task in response to a notification.
- Consider idempotency: Design handlers to be idempotent, as notifications might be delivered more than once in some scenarios.
- Use for side effects: Notifications are great for triggering side effects like sending emails, updating caches, or logging.
Notifications vs Requests
- Use notifications when you want to inform multiple parts of your system about an event without expecting a specific response.
- Use requests when you need to perform a specific operation and expect a result.
Pipeline behaviors
A pipeline in the context of the Mediator pattern is a series of operations that are executed in a specific order when processing a request. The pipeline allows you to add cross-cutting concerns or additional processing steps before and after the actual request handler is invoked.
The main benefits of using a pipeline are:
- Separation of concerns: You can extract common logic from your request handlers.
- Reusability: Pipeline behaviors can be applied to multiple request types.
- Flexibility: You can easily add, remove, or reorder pipeline steps without modifying the request handlers.
The PipelineBehavior interface
The PipelineBehavior interface defines the contract for creating custom pipeline steps. In your implementation, it looks like this:
class PipelineBehavior(ABC, Generic[TRequest, TResponse]):
@abstractmethod
async def handle(
self,
request: TRequest,
next_request: Callable[
[TRequest, Optional[AbstractToken]], Coroutine[Any, Any, TResponse]
],
cancellation_token: Optional[AbstractToken] = None,
) -> TResponse:
pass
Key points about this interface:
- It's generic, allowing you to specify the request and response types.
- The handle method is where you implement the behavior's logic.
- It receives the current request, a next_request callable to invoke the next step in the pipeline, and an optional cancellation token.
- It should return the response of the same type as specified in the generic parameter.
How Pipeline Behaviors Work
When you register pipeline behaviors with the Mediator, they are wrapped around the request handler in the order they are registered. Each behavior has the opportunity to:
- Perform actions before the request is handled (pre-processing)
- Modify the request
- Call the next step in the pipeline
- Perform actions after the request is handled (post-processing)
- Modify the response
- Handle exceptions
- Here's a conceptual view of how the pipeline works:
[Behavior 1] -> [Behavior 2] -> [Behavior 3] -> [Request Handler] -> [Behavior 3] -> [Behavior 2] -> [Behavior 1]
The request flows from left to right, and then the response flows back from right to left.
Implementing a Pipeline Behavior
from cantok import AbstractToken
from simple_mediator import PipelineBehavior
class LoggingBehavior(PipelineBehavior[TRequest, TResponse]):
async def handle(
self,
request: TRequest,
next_request: Callable[
[TRequest, Optional[AbstractToken]], Coroutine[Any, Any, TResponse]
],
cancellation_token: Optional[AbstractToken] = None,
) -> TResponse:
print(f"Handling request: {request}")
try:
response = await next_request(request, cancellation_token)
print(f"Request handled successfully: {response}")
return response
except Exception as e:
print(f"Error handling request: {e}")
raise
This behavior logs the request, calls the next step in the pipeline, logs the response or any error, and then returns the response or re-raises the exception.
Using pipeline behaviors
mediator = Mediator()
mediator.register_pipeline_behavior(LoggingBehavior)
mediator.register_pipeline_behavior(ValidationBehavior)
mediator.register_pipeline_behavior(CachingBehavior)
Cancellation tokens
Simple Mediator supports cancellation tokens using the cantok library:
It supports all the cancellation tokens provided by cantok, such as SimpleToken, TimeoutToken, ConditionToken or CounterToken.
More information about tokens here: cantok
import asyncio
from cantok import SimpleToken
async def main():
token = SimpleToken()
request = GetUserRequest(user_id=1)
# In another coroutine or thread:
# token.cancel()
try:
result = await mediator.send(request, cancellation_token=token)
except Exception as e:
print(f"Request was cancelled: {e}")
asyncio.run(main())
Examples
You can find examples of how to use the mediator in the examples directory.
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
License
This project is licensed under the GPL License.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file simple_mediator-0.1.3.tar.gz.
File metadata
- Download URL: simple_mediator-0.1.3.tar.gz
- Upload date:
- Size: 32.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.5.8
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6dcffd2e3483e88070e9c885b4fa83b8c318eaa6f7adad3c8488948125593c57
|
|
| MD5 |
336e90a0add1fd84ff528ec87da8b7f9
|
|
| BLAKE2b-256 |
309bb022495c1b70caff42af030dddbc96c7b8f1b094fe6c170e08e6ca9bca7c
|
File details
Details for the file simple_mediator-0.1.3-py3-none-any.whl.
File metadata
- Download URL: simple_mediator-0.1.3-py3-none-any.whl
- Upload date:
- Size: 19.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.5.8
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
cec4215f22ef20d3bb8767d15278871c4fa151ac2796e19cba49b3288f7ab00d
|
|
| MD5 |
b78730f97daaab96c3de40dd8eddac56
|
|
| BLAKE2b-256 |
fbb647143275397453d13a4e63efd0f23429821b055e7ed8f456c5cb7814f4cc
|