
python-pipeline

This library allows users to easily wrap functions with a series of decorators to form an execution pipeline. This is useful in scenarios where input needs to be cleaned and output needs to be processed in a systematic way.

Installation

pip install execution-pipeline

Usage

A pipeline consists of four optional segments: pre, post, error, and cache.

Pre

The pre execution segment lets you modify the input parameters passed to the decorated function. Functions passed to the pre segment always run first, in the order they are listed.

from pipeline import execution_pipeline

def do_thing_before(params):
    params['arg1'] = 'okay'
    return params


@execution_pipeline(pre=[do_thing_before])
def do_thing(arg1=5):
    return arg1*10
 


do_thing()
# okayokayokayokayokayokayokayokayokayokay
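The library's internals are not shown here, but the mechanics of the pre segment can be sketched with a hypothetical stand-in decorator (`with_pre` is illustrative only, not part of execution-pipeline):

```python
from functools import wraps
import inspect

def with_pre(pre_funcs):
    """Illustrative stand-in for execution_pipeline(pre=...): bind the call
    arguments to the function signature, thread the resulting dict through
    each pre function in order, then call the target with the modified args."""
    def decorator(func):
        sig = inspect.signature(func)
        @wraps(func)
        def wrapper(*args, **kwargs):
            bound = sig.bind_partial(*args, **kwargs)
            bound.apply_defaults()
            params = dict(bound.arguments)
            for pre in pre_funcs:  # pre functions run first, in order
                params = pre(params)
            return func(**params)
        return wrapper
    return decorator

def do_thing_before(params):
    params['arg1'] = 'okay'
    return params

@with_pre([do_thing_before])
def do_thing(arg1=5):
    return arg1 * 10

print(do_thing())  # 'okay' * 10 -> 'okayokayokayokayokayokayokayokayokayokay'
```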

Post

The post execution segment allows you to modify or handle the result after the decorated function has already run.

def do_thing_after(response):
    response['added'] = 'yup'
    return response
        
@execution_pipeline(post=[do_thing_after])
def do_thing(**kwargs):
    return {**kwargs}  # just make a new dictionary using the passed keyword arguments
    
    
do_thing(apples=2, oranges=3, bananas=0)
 # {'apples': 2, 'oranges': 3, 'bananas': 0, 'added': 'yup'}

Just like the other segments, you can use as many post processing functions as you need; they will be executed in the order that they are passed.

def do_another_thing_after(response):
    assert response['added'] == 'yup'  # post functions run in the order they were passed
    response['also_added'] = 'also yup'
    return response
    
    
@execution_pipeline(post=[do_thing_after, do_another_thing_after])
def do_thing(**kwargs):
    return {**kwargs}
    
 
do_thing(apples=2, oranges=3, bananas=0)
# {'apples': 2, 'oranges': 3, 'bananas': 0, 'added': 'yup', 'also_added': 'also yup'}
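The chaining behavior amounts to threading the raw result through each post function in turn, which can be sketched with an illustrative stand-in (`with_post` is hypothetical, not the library's implementation):

```python
from functools import reduce, wraps

def with_post(post_funcs):
    # Illustrative stand-in for execution_pipeline(post=...): the raw return
    # value is passed through each post function, in the order given.
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            return reduce(lambda acc, post: post(acc), post_funcs, result)
        return wrapper
    return decorator

def add_flag(response):
    response['added'] = 'yup'
    return response

def add_other_flag(response):
    response['also_added'] = 'also yup'
    return response

@with_post([add_flag, add_other_flag])
def do_thing(**kwargs):
    return {**kwargs}

print(do_thing(apples=2))
# {'apples': 2, 'added': 'yup', 'also_added': 'also yup'}
```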

Error

The error execution segment lets you pass error handlers to log, modify, or absorb any exceptions raised by the wrapped function.

class MyException(BaseException):
    pass
    

def handle_this_error(e, response=None):
    print(f"oh no, Bob! {e}")
    return "Don't worry, we handled a TypeError."


def handle_that_error(e, response=None):
    print(f"oh no, Bob! {e}")
    return "Don't worry, we handled MyException."
    
def handle_other_errors(e, response=None):
    print(f"? {e}")
    return "Other errors?"
    
error_handlers = [
    {"exception_class": TypeError, "handler": handle_this_error},
    {"exception_class": MyException, "handler": handle_that_error},
    {"exception_classes": (Exception, BaseException), "handler": handle_other_errors},
]


@execution_pipeline(pre=[do_thing_before], post=[do_thing_after], error=error_handlers)
def fun_boys(arg1, arg4, arg2, arg3, thing=None):
    raise MyException('Something went wrong!')
    

result = fun_boys(1, 2, 3, 4, 5)
# oh no, Bob! Something went wrong!

print(result) 
# Don't worry, we handled MyException.
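The dispatch logic behind this can be sketched as follows; `with_error_handlers` is a hypothetical stand-in that matches the raised exception against each handler entry first-to-last, not the library's actual code:

```python
from functools import wraps

def with_error_handlers(handlers):
    # Illustrative stand-in for execution_pipeline(error=...): each entry
    # maps an exception class (or tuple of classes) to a handler callable.
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except BaseException as e:
                for h in handlers:
                    classes = h.get("exception_class") or h.get("exception_classes")
                    if isinstance(e, classes):
                        # the first matching handler wins
                        return h["handler"](e, response=None)
                raise  # no handler matched; re-raise
        return wrapper
    return decorator

class MyException(BaseException):
    pass

def handle_that_error(e, response=None):
    return f"Handled: {e}"

@with_error_handlers([{"exception_class": MyException, "handler": handle_that_error}])
def fun_boys():
    raise MyException('Something went wrong!')

print(fun_boys())  # Handled: Something went wrong!
```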

You can also use class instances instead of dictionaries to define your error handlers if you prefer.

class ErrorHandler:
    def __init__(self, exception_class, handler):
        self.exception_class = exception_class
        self.handler = handler
        

error_handlers = [
    ErrorHandler(TypeError, handle_this_error),
    ErrorHandler(MyException, handle_that_error),
]

Cache

The cache execution segment records the call arguments (both before and after the pre segment runs) and stores the result (after the post and error segments run), so repeated calls with the same arguments return the cached value.

from pipeline.cache.mock import MockCache
# MockCache is basically just a dict() with some expiration convenience methods.
mock_cache = MockCache()

changing_value = 0

@execution_pipeline(cache=mock_cache)
def fun_boys(arg1, arg4, arg2, arg3, thing=None):
    return changing_value
    

    
fun_boys(1, 2, 3, 4, 5)
# 0

changing_value = 100


fun_boys(1, 2, 3, 4, 5)
# 0 # ignores the changes ( ¯\_(ツ)_/¯ that's caching! )
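The behavior above is ordinary memoization keyed on the call arguments. A minimal self-contained sketch (`with_cache` is illustrative, not the library's implementation) shows why the changed global is ignored:

```python
from functools import wraps

def with_cache(cache):
    # Illustrative stand-in for execution_pipeline(cache=...): results are
    # memoized on the call arguments, so later calls with the same arguments
    # return the stored value even if the underlying state has changed.
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            key = (args, tuple(sorted(kwargs.items())))
            if key not in cache:
                cache[key] = func(*args, **kwargs)
            return cache[key]
        return wrapper
    return decorator

changing_value = 0

@with_cache({})
def fun_boys(a, b):
    return changing_value

assert fun_boys(1, 2) == 0
changing_value = 100
assert fun_boys(1, 2) == 0    # cached: the change is ignored
assert fun_boys(3, 4) == 100  # new arguments miss the cache
```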

Supported Cache Backends

Note: if the appropriate backend is not installed, the cache will be replaced with a MockCache instance at runtime. This is intended to improve the portability of pipeline code.

Redis
pip install redis

Then proceed the same as above, but with:

from pipeline.cache.redis import RedisCache
redis = RedisCache(host='localhost', port=6379) # defaults

Memcached
pip install memcached

Then proceed the same as above, but with:

from pipeline.cache.mem_cache import MemCache 
mem_cache = MemCache(host='localhost', port=11211) # defaults
