A library that implements the Chain of Responsibility pattern.
PipelineChain
A Pipeline / Chain of Responsibility design pattern implementation based on Laravel's Pipeline implementation.
A Pipeline is an object that accepts input data and sends it through a series of handlers (or "pipes"), which can be functions, closures, and/or classes, to get a result at the end.
flowchart LR
input[Input data]
subgraph Pipeline
pipe1[Pipe 1]
pipe2[Pipe 2]
pipe3[Pipe 3]
pipe1 --> pipe2
pipe2 --> pipe3
end
output[Output data]
input --> pipe1
pipe3 --> output
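To make the flow above concrete, here is a minimal sketch in plain Python that does not use the library at all. The helper name `run_through` is hypothetical and is not part of PipelineChain; it only illustrates the idea of feeding each handler's result into the next one.

```python
from functools import reduce
from typing import Any, Callable, Iterable


def run_through(data: Any, pipes: Iterable[Callable[[Any], Any]]) -> Any:
    """Feed `data` to each pipe in order, passing each result to the next pipe."""
    return reduce(lambda value, pipe: pipe(value), pipes, data)


result = run_through(" hello world ", [str.strip, str.upper])
print(result)  # HELLO WORLD
```

With an empty list of pipes, the input comes back unchanged.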
Table of contents
- Installation
- Getting started
- Methods
  - pipe() (aliases: add_pipe())
  - send()
  - set_container() (aliases: setContainer())
  - then() (aliases: run())
  - then_return() (aliases: run_and_return(), thenReturn())
  - through() (aliases: pipes())
  - via()
Installation
Install PipelineChain via pip:
pip install pipelinechain
Getting started
Luckily, there isn't a whole lot to Pipelines from a code perspective, so implementing them is pretty easy. We'll walk through some of the basic concepts.
Important note: The examples provide a string as input data. That is just for simplicity's sake! You can pass whatever you want: a scalar, an object, a list, or anything else.
Creating a simple pipeline
Let's say you have a string that you want to pass through a series of steps in order to manipulate it. You can create a pipeline to do that like so:
flowchart LR
input[String]
subgraph Pipeline
pipe1[title]
pipe2[strip]
pipe1 --> pipe2
end
output[Output data]
input --> pipe1
pipe2 --> output
from pipelinechain import Pipeline
# Create a new pipeline instance.
pipeline = Pipeline()
# Send a string through the pipeline.
result = pipeline.send(' hello world ').through([
str.title,
str.strip
]).then_return()
print(result) # Output: 'Hello World'
Building pipelines in parts
You don't need to build the pipeline all at once. You can spread it out over a number of lines.
flowchart LR
input[String]
subgraph Pipeline
pipe1[title]
pipe2[strip]
pipe3[replace]
pipe1 --> pipe2
pipe2 --> pipe3
end
output[Output data]
input --> pipe1
pipe3 --> output
from pipelinechain import Pipeline
# Create a new pipeline instance.
pipeline = Pipeline()
# Set the pipes you want to use.
pipeline.through([ str.title, str.strip ])
# Add another transformation.
def replace(s):
    return s.replace('Hello', 'Goodbye')
# Add it to the pipeline.
pipeline.pipe(replace)
# Send data through the pipeline.
result = pipeline.send(' hello world ').then_return()
print(result) # Output: 'Goodbye World'
Using custom pipes
If you have a more complicated function that you wish to use as a pipe, you can pass in your own callable. It needs to accept two parameters: the first is the input data and the second is the next item in the pipeline. Call that second parameter with the (possibly modified) data to continue down the pipeline.
flowchart LR
input[String]
subgraph Pipeline
pipe1[custom_pipe]
pipe2[title]
pipe1 --> pipe2
end
output[Output data]
input --> pipe1
pipe2 --> output
from pipelinechain import Pipeline
pipeline = Pipeline()
def custom_pipe(passable, next_pipe):
    passable = passable.replace('hello', 'goodbye')
    return next_pipe(passable)
pipeline.through([custom_pipe, str.title])
result = pipeline.send('hello world').then_return()
print(result) # Output: 'Goodbye World'
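The two-parameter convention can be illustrated without the library. The sketch below is one common way to wire such pipes together (wrapping them from last to first so each pipe receives its successor); it is an assumption for illustration, not PipelineChain's actual implementation, and `run_chain`, `say_goodbye`, and `title_case` are hypothetical names.

```python
from functools import reduce
from typing import Any, Callable, List

Next = Callable[[Any], Any]
Pipe = Callable[[Any, Next], Any]


def run_chain(passable: Any, pipes: List[Pipe]) -> Any:
    """Wrap the pipes from last to first so each one receives its successor."""

    def wrap(next_pipe: Next, pipe: Pipe) -> Next:
        return lambda value: pipe(value, next_pipe)

    # The innermost "next" simply returns whatever reaches the end of the chain.
    chain = reduce(wrap, reversed(pipes), lambda value: value)
    return chain(passable)


def say_goodbye(passable, next_pipe):
    return next_pipe(passable.replace("hello", "goodbye"))


def title_case(passable, next_pipe):
    return next_pipe(passable.title())


print(run_chain("hello world", [say_goodbye, title_case]))  # Goodbye World
```

Because each pipe decides whether and how to call `next_pipe`, a pipe can modify the data before passing it on, after the rest of the chain returns, or both.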
Using classes with the handle method
You can even create your own classes to use as pipes in the pipeline. For a class to be usable in the pipeline, it needs a method that accepts two parameters, the first being the input data and the second being the next item in the pipeline.
By default, the Pipeline expects that the method is called handle. If you want to use that method name, you can optionally implement the StellarWP\Pipeline\Contracts\Pipe interface to enforce that method convention.
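The interface named above is written in PHP namespace style, and this document does not say what the Python package provides in its place. As a hedged sketch of how the same `handle` convention could be checked in Python, the example below uses `typing.Protocol` from the standard library. `SupportsHandle` and `NotAPipe` are hypothetical names and are not part of PipelineChain.

```python
from typing import Any, Callable, Protocol, runtime_checkable


@runtime_checkable
class SupportsHandle(Protocol):
    """Structural type for any object exposing a handle(passable, next_pipe) method."""

    def handle(self, passable: Any, next_pipe: Callable[[Any], Any]) -> Any:
        ...


class TitlePipe:
    def handle(self, passable, next_pipe):
        return next_pipe(passable.title())


class NotAPipe:
    def execute(self, passable, next_pipe):
        return next_pipe(passable)


print(isinstance(TitlePipe(), SupportsHandle))  # True
print(isinstance(NotAPipe(), SupportsHandle))   # False
```

Note that a runtime `isinstance` check against a protocol only verifies that a `handle` attribute exists. It does not validate the method's parameters, which a static type checker would do.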
Example classes
First class:
class TitlePipe:
    def handle(self, passable, next_pipe):
        return next_pipe(passable.title())
Second class:
class StripPipe:
    def handle(self, passable, next_pipe):
        return next_pipe(passable.strip())
Example pipeline
flowchart LR
input[String]
subgraph Pipeline
pipe1[TitlePipe::handle]
pipe2[StripPipe::handle]
pipe1 --> pipe2
end
output[Output data]
input --> pipe1
pipe2 --> output
from pipelinechain import Pipeline
pipeline = Pipeline().through([TitlePipe(), StripPipe()])
result = pipeline.send(' hello world ').then_return()
print(result) # Output: 'Hello World'
Using classes with a custom method
If you want to use classes but want to use a different method than the expected default (handle), you can declare the alternate method name using the via() method.
Example classes
First class:
class TitlePipe:
    def execute(self, passable, next_pipe):
        return next_pipe(passable.title())
Second class:
class StripPipe:
    def execute(self, passable, next_pipe):
        return next_pipe(passable.strip())
Example pipeline
flowchart LR
input[String]
subgraph Pipeline
pipe1[TitlePipe::execute]
pipe2[StripPipe::execute]
pipe1 --> pipe2
end
output[Output data]
input --> pipe1
pipe2 --> output
from pipelinechain import Pipeline
pipeline = Pipeline().via('execute').through([TitlePipe(), StripPipe()])
result = pipeline.send(' hello ').then_return()
print(result) # Output: 'Hello'
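For intuition, calling a configurable method name on a pipe object can be done with `getattr`. The sketch below is an assumption about how such dispatch could work, not a description of PipelineChain's internals; `call_pipe` and `drop_spaces` are hypothetical names.

```python
from typing import Any, Callable


def call_pipe(pipe: Any, method: str, passable: Any, next_pipe: Callable[[Any], Any]) -> Any:
    """Call `method` on a pipe object, or call the pipe itself if it lacks that method."""
    if hasattr(pipe, method):
        return getattr(pipe, method)(passable, next_pipe)
    return pipe(passable, next_pipe)


class TitlePipe:
    def execute(self, passable, next_pipe):
        return next_pipe(passable.title())


def drop_spaces(passable, next_pipe):
    return next_pipe(passable.strip())


def identity(value):
    return value


print(call_pipe(TitlePipe(), "execute", "hello", identity))  # Hello
print(call_pipe(drop_spaces, "execute", " hello ", identity))  # hello
```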
Bailing early
Sometimes in the middle of a pipeline, you want to stop processing the rest of the pipes and return a value. Luckily, you can do this with a return statement: return a value without calling the next item in the pipeline, and the remaining pipes are skipped.
Example pipeline
from pipelinechain import Pipeline
def check_content(passable, next_pipe):
    if 'stop' in passable:
        return 'Early termination'
    return next_pipe(passable)
pipeline = Pipeline().through([check_content, str.upper])
result = pipeline.send('please stop').then_return()
print(result) # Output: 'Early termination'
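To see why an early return skips the remaining pipes, the sketch below wires two pipes together by hand, without the library, and records which ones actually ran. The `calls` list and the `shout` and `run` names are hypothetical and exist only for this illustration.

```python
calls = []


def check_content(passable, next_pipe):
    calls.append("check_content")
    if "stop" in passable:
        # Returning here without calling next_pipe ends the chain.
        return "Early termination"
    return next_pipe(passable)


def shout(passable, next_pipe):
    calls.append("shout")
    return next_pipe(passable.upper())


def run(passable):
    # Hand-wired chain: check_content -> shout -> return the final value.
    return check_content(passable, lambda value: shout(value, lambda out: out))


print(run("please stop"))  # Early termination
print(calls)               # ['check_content']
```

Since `shout` is only reachable through `next_pipe`, it never runs when `check_content` returns early.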
Doing more than returning
Sometimes you may want to do more than returning the result when the pipeline completes. You can do that by using the then() (or its alias, run()) method instead of then_return().
Example pipeline
flowchart LR
input[String]
pipe3[Closure]
subgraph Pipeline
pipe1[strip]
pipe2[upper]
pipe1 --> pipe2
end
output[Output data]
input --> pipe1
pipe2 --> pipe3
pipe3 --> output
from pipelinechain import Pipeline
pipeline = Pipeline().through([str.strip, str.upper])
result = pipeline.send(' hello world ').then(lambda x: len(x))
print(result) # Output: 11
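Conceptually, the callable passed to then() acts as one last step applied to whatever the pipes produced. The sketch below illustrates that idea in plain Python. The `finish` helper is hypothetical and is not part of PipelineChain, and the way the library actually invokes the destination is an assumption here.

```python
from typing import Any, Callable, Optional


def finish(result: Any, destination: Optional[Callable[[Any], Any]] = None) -> Any:
    """Return the pipeline result, optionally transformed by a final callable."""
    if destination is None:
        return result
    return destination(result)


# Stands in for the output of the strip and upper pipes above.
processed = " hello world ".strip().upper()

print(finish(processed))       # HELLO WORLD
print(finish(processed, len))  # 11
```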
Methods
pipe()
This method is used to add a pipe to the pipeline.
def pipe( pipes: Union[List, Any] ) -> Pipeline
Aliases: add_pipe()
Examples
pipeline.pipe( str.strip )
# or
pipeline.add_pipe( str.strip )
# or
pipeline.pipe( [ str.title, str.strip ] )
send()
This method is used to set the object being passed through the pipeline.
def send( passable: Any ) -> Pipeline
Examples
# Send a scalar.
pipeline.send( 'Some string' )
# Send an object.
pipeline.send( my_object )
then()
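This method is used to run the pipeline and return the result. If you provide a destination callable, the result of the pipes is passed to it and its return value is returned instead.
def then( destination: Optional[Callable] = None ) -> Any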
Aliases: run()
Examples
pipeline.then()
# Use the alias.
pipeline.run()
# Provide a function to run before returning the result.
pipeline.then( str.strip )
# Provide a closure to run before returning the result.
pipeline.then( lambda passable: passable.strip() )
# Provide an object as a pipe to run before returning the result.
pipeline.then( StripPipe() )
then_return()
This method is used to run the pipeline and return the result.
def then_return() -> Any
Aliases: run_and_return(), thenReturn()
Examples
pipeline.then_return()
# Use an alias.
pipeline.thenReturn()
# Use the other alias.
pipeline.run_and_return()
through()
This method is used to set the handlers (or "pipes") that are used to process the data.
def through( pipes: Union[List, Any] ) -> Pipeline
Aliases: pipes()
Examples
# You can provide any number of pipes.
pipeline.through([ str.title, str.strip ])
# Using the alias.
pipeline.pipes([ str.title, str.strip ])
# Pass closures as pipes.
pipeline.through([ str.title, lambda passable, next_pipe: next_pipe(passable.strip()) ])
# Pass objects as pipes.
pipeline.through([ TitlePipe(), StripPipe() ])
via()
This method is used to set the method to call on all the pipes in the pipeline.
def via( method: str ) -> Pipeline
Examples
# Set the method used in all classes in the pipeline to process the data as `execute()`.
pipeline.via( 'execute' )
# Set the method used in all classes in the pipeline to process the data as `borkborkbork()`.
pipeline.via( 'borkborkbork' )