Skip to main content

This library allows users to easily wrap functions with a series of decorators.

Project description


This library allows users to easily wrap functions with a series of decorators to form an execution pipeline. This is useful in scenarios where input needs to be cleaned and output needs to be processed in a systematic way.


pip install execution-pipeline


A pipeline consists of four optional segments


The pre execution segment allows you to modify any input parameters passed to the decorated function. Any functions passed to the pre segment will always be executed first.

from pipeline import execution_pipeline

def do_thing_before(params):
    params['arg1'] = 'okay'
    return params

def do_thing(arg1=5):
    return arg1*10

# okayokayokayokayokayokayokayokayokayokay


The post execution segment allows you to modify or handle the result after the decorated function has already run.

def do_thing_after(response):
    response['added'] = 'yup'
    return response
def do_thing(**kwargs):
    return {**kwargs}  # just make a new dictionary using the passed keyword arguments
do_thing(apples=2, oranges=3, bananas=0)
 # {'apples': 2, 'oranges': 3, 'bananas': 0, 'added': 'yup'}

Just like the other segments, you can use as many post processing functions as you need; they will be executed in the order that they are passed.

def do_another_thing_after(response):
    assert response['added'] == 'yup' # the one that is first in the pipeline happens first.
    response['also_added'] = 'also yup'
    return response
@execution_pipeline(post=[do_thing_after, do_another_thing_after])
def do_thing(**kwargs):
    return {**kwargs}
# {'apples': 2, 'oranges': 3, 'bananas': 0, 'added': 'yup', 'also_added': 'also yup'}


The error execution segment allows you to pass error handling to log, modify, absorb, etc. any exceptions that are raised by the wrapped function.

class MyException(BaseException):

def handle_this_error(e, response=None):
    print(f"oh no, Bob! {e}")
    return "Don't worry, we handled a TypeError."

def handle_that_error(e, response=None):
    print(f"oh no, Bob! {e}")
    return "Don't worry, we handled MyException."
def handle_other_errors(e, response=None):
    print(f"? {e}")
    return "Other errors?"
error_handlers = [
    {"exception_class": TypeError, "handler": handle_this_error},
    {"exception_class": MyException, "handler": handle_that_error},
    {"exception_classes": (Exception, BaseException), "handler": handle_other_errors},

@execution_pipeline(pre=[do_thing_before], post=[do_thing_after], error=error_handlers)
def fun_boys(arg1, arg4, arg2, arg3, thing=None):
    raise MyException('Something went wrong!')

result = fun_boys(1, 2, 3, 4, 5)
# oh no, Bob! Something went wrong!

# Don't worry, we handled MyException.

You can also use class instances instead of dictionaries to define your error handlers if you prefer.

class ErrorHandler:
    def __init__(self, exception_class, handler):
        self.exception_class = exception_class
        self.handler = handler

error_handlers = [
    ErrorHandler(TypeError, handle_this_error),
    ErrorHandler(MyException, handle_that_error),


The cache execution segment will record all arguments (before and after the pre segment) and store the result (after the post and error segments).

from pipeline.cache.mock import MockCache
# MockCache is basically just a dict() with some expiration convenience methods.
mock_cache = MockCache()

changing_value = 0

def fun_boys(arg1, arg4, arg2, arg3, thing=None):
    return changing_value

fun_boys(1, 2, 3, 4, 5)
# 0

changing_value = 100

fun_boys(1, 2, 3, 4, 5)
# 0 # ignores the changes ( ¯\_(ツ)_/¯ that's caching! )

Supported Cache Backends

Note: if the appropriate backend is not installed, they will be replaced with a MockCache instance at runtime. This is intended to improve portability of pipeline code.

pip install redis

And then same as above except with

from pipeline.cache.redis import RedisCache
redis = RedisCache(host='localhost', port=6379) # defaults
pip install memcached

And then the same as above except with

from pipeline.cache.mem_cache import MemCache 
mem_cache = MemCache(host='localhost', port=11211) # defaults

Project details

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

execution-pipeline-0.5.0.tar.gz (10.1 kB view hashes)

Uploaded source

Supported by

AWS AWS Cloud computing Datadog Datadog Monitoring Fastly Fastly CDN Google Google Object Storage and Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page