Skip to main content

This library allows users to easily wrap functions with a series of decorators.

Project description

python-pipeline

This library allows users to easily wrap functions with a series of decorators to form an execution pipeline. This is useful in scenarios where input needs to be cleaned and output needs to be processed in a systematic way.

Installation

pip install execution-pipeline

Usage

A pipeline consists of four optional segments

Pre

The pre execution segment allows you to modify any input parameters passed to the decorated function. Any functions passed to the pre segment will always be executed first.

from pipeline import execution_pipeline

def do_thing_before(params):
    params['arg1'] = 'okay'
    return params


@execution_pipeline(pre=[do_thing_before])
def do_thing(arg1=5):
    return arg1*10
 


do_thing()
# okayokayokayokayokayokayokayokayokayokay

Post

The post execution segment allows you to modify or handle the result after the decorated function has already run.

def do_thing_after(response):
    response['added'] = 'yup'
    return response
        
@execution_pipeline(post=[do_thing_after])
def do_thing(**kwargs):
    return {**kwargs}  # just make a new dictionary using the passed keyword arguments
    
    
do_thing(apples=2, oranges=3, bananas=0)
 # {'apples': 2, 'oranges': 3, 'bananas': 0, 'added': 'yup'}

Just like the other segments, you can use as many post processing functions as you need; they will be executed in the order that they are passed.

def do_another_thing_after(response):
    assert response['added'] == 'yup' # the one that is first in the pipeline happens first.
    response['also_added'] = 'also yup'
    return response
    
    
@execution_pipeline(post=[do_thing_after, do_another_thing_after])
def do_thing(**kwargs):
    return {**kwargs}
    
 
do_thing()
# {'apples': 2, 'oranges': 3, 'bananas': 0, 'added': 'yup', 'also_added': 'also yup'}

Error

The error execution segment allows you to pass error handling to log, modify, absorb, etc. any exceptions that are raised by the wrapped function.

class MyException(BaseException):
    pass
    

def handle_this_error(e, response=None):
    print(f"oh no, Bob! {e}")
    return "Don't worry, we handled a TypeError."


def handle_that_error(e, response=None):
    print(f"oh no, Bob! {e}")
    return "Don't worry, we handled MyException."
    
def handle_other_errors(e, response=None):
    print(f"? {e}")
    return "Other errors?"
    
error_handlers = [
    {"exception_class": TypeError, "handler": handle_this_error},
    {"exception_class": MyException, "handler": handle_that_error},
    {"exception_classes": (Exception, BaseException), "handler": handle_other_errors},
]


@execution_pipeline(pre=[do_thing_before], post=[do_thing_after], error=error_handlers)
def fun_boys(arg1, arg4, arg2, arg3, thing=None):
    raise MyException('Something went wrong!')
    

result = fun_boys(1, 2, 3, 4, 5)
# oh no, Bob! Something went wrong!

print(result) 
# Don't worry, we handled MyException.

You can also use class instances instead of dictionaries to define your error handlers if you prefer.

class ErrorHandler:
    def __init__(self, exception_class, handler):
        self.exception_class = exception_class
        self.handler = handler
        

error_handlers = [
    ErrorHandler(TypeError, handle_this_error),
    ErrorHandler(MyException, handle_that_error),
]

Cache

The cache execution segment will record all arguments (before and after the pre segment) and store the result (after the post and error segments).

from pipeline.cache.mock import MockCache
# MockCache is basically just a dict() with some expiration convenience methods.
mock_cache = MockCache()

changing_value = 0

@execution_pipeline(cache=mock_cache)
def fun_boys(arg1, arg4, arg2, arg3, thing=None):
    return changing_value
    

    
fun_boys(1, 2, 3, 4, 5)
# 0

changing_value = 100


fun_boys(1, 2, 3, 4, 5)
# 0 # ignores the changes ( ¯\_(ツ)_/¯ that's caching! )

Supported Cache Backends

Note: if the appropriate backend is not installed, they will be replaced with a MockCache instance at runtime. This is intended to improve portability of pipeline code.

Redis
pip install redis

And then same as above except with

from pipeline.cache.redis import RedisCache
redis = RedisCache(host='localhost', port=6379) # defaults
MemCached
pip install memcached

And then the same as above except with

from pipeline.cache.mem_cache import MemCache 
mem_cache = MemCache(host='localhost', port=11211) # defaults

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

execution-pipeline-0.5.0.tar.gz (10.1 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page