Skip to main content

mediator and CQRS pattern implementation with pipline behaviors for Python 3.6+. Mediatr py

Project description

mediatr_py

PyPI Python Downloads

Buy Me A Coffee

This is an async implementation of Mediator pattern with pipline behaviors.

It is a port of Mediatr from .Net C#

Requirements:

  • Python >= 3.6

Usage:

install mediatr:

pip install mediatr

Define your request class

class GetArrayQuery():
    def __init__(self,items_count:int):
        self.items_count = items_count

Define your handler class or function

import Mediator from mediatr

@Mediator.handler
async def get_array_handler(request:GetArrayQuery):
    items = list()
    for i in range(0, request.items_count):
        items.append(i)
    return items
    
# or just Mediator.register_handler(get_array_handler)
    

or class:

@Mediator.handler
class GetArrayQueryHandler():
    def handle(self,request:GetArrayQuery):
        items = list()
        for i in range(0, request.items_count):
            items.append(i)
        return items
        
# or just Mediator.register_handler(GetArrayQueryHandler)

Run mediator

import Mediator from mediatr

mediator = Mediator()

request = GetArrayQuery(5)

result = await mediator.send_async(request)

# result = mediator.send(request) in synchronous mode

print(result) // [0,1,2,3,4]

If you are using synchronous mediator.send(request) method, try to define synchronous handlers and behaviors

In another case use asyncio module for manual manage of event loop in synchronous code

Run mediator statically, without instance

import Mediator from mediatr

request = GetArrayQuery(5)

result = await Mediator.send_async(request)
# or:
result = Mediator.send(request) #in synchronous mode. Async handlers and behaviors will not blocking!

print(result) // [0,1,2,3,4]

Note that instantiation of Mediator(handler_class_manager = my_manager_func) is useful if you have custom handlers creation. For example using an injector. By default class handlers are instantiated with simple init: SomeRequestHandler(). handlers or behaviors as functions are executed directly.

Using behaviors

You can define behavior class with method 'handle' or function:

@Mediator.behavior
async def get_array_query_behavior(request:GetArrayQuery, next): #behavior only for GetArrayQuery or derived classes
    array1 = await next()
    array1.append(5)
    return array1

@Mediator.behavior
def common_behavior(request:object, next): #behavior for all requests because issubclass(GetArrayQuery,object)==True
    request.timestamp = '123'
    return next()

# ...

mediator = Mediator()
request = GetArrayQuery(5)
result = await mediator.send_async(request)
print(result) // [0,1,2,3,4,5]
print(request.timestamp) // '123'

Using custom handler (behavior) factory for handlers (behaviors) as classes

If your handlers or behaviors registered as functions, it just executes them.

In case with handlers or behaviors, declared as classes with method handle Mediator uses function, that instantiates handlers or behaviors:

def default_handler_class_manager(HandlerCls:type,is_behavior:bool=False):
    return HandlerCls()

For example, if you want to instantiate them with dependency injector or custom, pass your own factory function to Mediator:

def my_class_handler_manager(handler_class, is_behavior=False):
    
    if is_behavior:
        # custom logic
        pass

    return injector.get(handler_class)

mediator = Mediator(handler_class_manager=my_class_handler_manager)

PS:

The next function in behavior is async, so if you want to take results or if your behavior is async, use middle_results = await next()

Handler may be async too, if you need.

Using with generic typing support (version >= 1.2):

from mediatr import Mediator, GenericQuery


class UserModel(BaseModel): # For example sqlalchemy ORM entity
    id = Column(String,primary_key=True)
    name = Column(String)


class FetchUserQuery(GenericQuery[UserModel])
    def __init__(self,user_id:str):
        self.user_id = user_id


mediator = Mediator()

request = FetchUserQuery(user_id = "123456")

user = mediator.send(request) # type of response will be a UserModel


# -------------------------------------------------------------


class FetchUserQueryHandler():

    def handle(self, request:FetchUserQuery):
        db_session = Session() #sqlalchemy session
        return db_session.query(UserModel).filter(UserModel.id == request.user_id).one()

# or handler as simple function:

def fetch_user_query_handler(request:FetchUserQuery):
    db_session = Session() #sqlalchemy session
    return db_session.query(UserModel).filter(UserModel.id == request.user_id).one()

Please give a star if the library is useful for you :smiley:

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mediatr-1.3.2.tar.gz (10.4 kB view details)

Uploaded Source

File details

Details for the file mediatr-1.3.2.tar.gz.

File metadata

  • Download URL: mediatr-1.3.2.tar.gz
  • Upload date:
  • Size: 10.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.15.0 pkginfo/1.7.0 requests/2.25.1 setuptools/56.0.0 requests-toolbelt/0.9.1 tqdm/4.60.0 CPython/3.7.9

File hashes

Hashes for mediatr-1.3.2.tar.gz
Algorithm Hash digest
SHA256 6a463c53fe35982cfff3e273a661cf317c46a850d3f0385493c0d7571c06b89f
MD5 d8522b584876e78e3edf0b05c2e813d9
BLAKE2b-256 a00fddba08ed784ced1fce3ac895151b377d71f768ee70214f4cff479a754f58

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page