Skip to main content

A library that implements the Chain of Responsibility pattern.

Project description

PipelineChain

Tests

A Pipeline / Chain of Responsibility design pattern implementation based on Laravel's Pipeline implementation.

A Pipeline is an object that accepts input data and sends it through a series of handlers (or "pipes") — which are functions, closures, and/or classes — to get a result at the end.

flowchart LR
	input[Input data]
	subgraph Pipeline
		pipe1[Pipe 1]
		pipe2[Pipe 2]
		pipe3[Pipe 3]

		pipe1 --> pipe2
		pipe2 --> pipe3
	end
	output[Output data]

	input --> pipe1
	pipe3 --> output

Table of contents

Installation

Install PyPipeline via pip:

pip install pipelinechain

Getting started

Luckily, there isn't a whole lot to Pipelines from a code perspective, so implementing them is pretty easy. We'll walk through some of the basic concepts.

Important note: The examples provide a string as input data. That is just for simplicity's sake! You can actually pass whatever you want - a scalar, an object, an array, whatever.

Creating a simple pipeline

Let's say you have a string that you want to pass through a series of steps in order to manipulate it. You can create a pipeline to do that like so:

flowchart LR
	input[String]
	subgraph Pipeline
		pipe1[title]
		pipe2[trim]

		pipe1 --> pipe2
	end
	output[Output data]

	input --> pipe1
	pipe2 --> output
from pipelinechain import Pipeline

# Create a new pipeline instance.
pipeline = Pipeline()

# Send a string through the pipeline.
result = pipeline.send('   hello world   ').through([
    str.title,
    str.strip
]).then_return()

print(result)  # Output: 'Hello world'

Building pipelines in parts

You don't need to build the pipeline all at once, you can spread it out over a number of lines.

flowchart LR
	input[String]
	subgraph Pipeline
		pipe1[title]
		pipe2[trim]
		pipe3[reverse_string]

		pipe1 --> pipe2
		pipe2 --> pipe3
	end
	output[Output data]

	input --> pipe1
	pipe3 --> output
from pipelinechain import Pipeline

# Create a new pipeline instance.
pipeline = Pipeline()

# Set the pipes you want to use.
pipeline.through([ str.title, str.strip ])

# Add another transformation.
def replace(s):
    return s.replace('Hello', 'Goodbye')

# Add it to the pipeline.
pipeline.pipe(replace)

# Send data through the pipeline.
result = pipeline.send('   hello world   ').then_return()

print(result)  # Output: 'Goodbye World'

Using custom pipes

If you have a more complicated function that you wish to use as a pipe, you can pass in a callable instead of a string. Your closure will need to accept two parameters, the first being the input data and the second being the next item in the pipeline.

flowchart LR
	input[String]
	subgraph Pipeline
		pipe1[custom_pipe]
		pipe2[captialize]

		pipe1 --> pipe2
	end
	output[Output data]

	input --> pipe1
	pipe2 --> output
from pipelinechain import Pipeline

pipeline = Pipeline()

def custom_pipe(passable, next_pipe):
    passable = passable.replace('hello', 'goodbye')
    return next_pipe(passable)

pipeline.through([custom_pipe, str.title])

result = pipeline.send('hello world').then_return()
print(result)  # Output: 'Goodbye world'

Using classes with the handle method

You can even create your own classes to use as pipes in the pipeline. For a class to be usable in the pipeline, it needs a method that accepts two parameters, the first being the input data and the second being the next item in the pipeline.

By default, the Pipeline expects that the method is called handle. If you want to use that method name, you can optionally implement the StellarWP\Pipeline\Contracts\Pipe interface to enforce that method convention.

Example classes

First class:

class TitlePipe:
    def handle(self, passable, next_pipe):
        return next_pipe(passable.title())

Second class:

class StripPipe:
    def handle(self, passable, next_pipe):
        return next_pipe(passable.strip())

Example pipeline

flowchart LR
	input[String]
	subgraph Pipeline
		pipe1[TitlePipe::handle]
		pipe2[StripPipe::handle]

		pipe1 --> pipe2
	end
	output[Output data]

	input --> pipe1
	pipe2 --> output
from pipelinechain import Pipeline

pipeline = Pipeline().through([TitlePipe(), StripPipe()])
result = pipeline.send('   hello world   ').then_return()
print(result)  # Output: 'Hello world'

Using classes with a custom method

If you want to use classes but want to use a different method than the expected default (handle), you can declare the alternate method name using the via() method.

Example classes

First class:

class TitlePipe:
    def execute(self, passable, next_pipe):
        return next_pipe(passable.title())

Second class:

class StripPipe:
    def execute(self, passable, next_pipe):
        return next_pipe(passable.strip())

Example pipeline

flowchart LR
	input[String]
	subgraph Pipeline
		pipe1[ReversePipe::execute]
		pipe2[StripPipe::execute]

		pipe1 --> pipe2
	end
	output[Output data]

	input --> pipe1
	pipe2 --> output
from pipelinechain import Pipeline

pipeline = Pipeline().via('execute').through([StripPipe(), ReversePipe()])
result = pipeline.send('     hello      ').then_return()
print(result)  # Output: 'Hello'

Bailing early

Sometimes in the middle of a pipeline, you want to stop processing the rest of the pipes and return a value. Luckily, you can do this with a return statement!

Example pipeline

from pipelinechain import Pipeline

def check_content(passable, next_pipe):
    if 'stop' in passable:
        return 'Early termination'
    return next_pipe(passable)

pipeline = Pipeline().through([check_content, str.upper])
result = pipeline.send('please stop').then_return()
print(result)  # Output: 'Early termination'

Doing more than returning

Sometimes you may want to do more than returning the result when the pipeline completes. You can do that by using the then() (or its alias, run()) method instead of then_return().

Example pipeline

flowchart LR
	input[String]
	pipe3[Closure]
	subgraph Pipeline
		pipe1[strip]
		pipe2[upper]

		pipe1 --> pipe2
	end
	output[Output data]

	input --> pipe1
	pipe2 --> pipe3
	pipe3 --> output
from pipelinechain import Pipeline

pipeline = Pipeline().through([str.strip, str.upper])
result = pipeline.send('   hello world   ').then(lambda x: len(x))

print(result)  # Output: 11

Methods

pipe()

This method is used to add a pipe to the pipeline.

def pipe(pipes: Union[List, Any]): Pipeline

Aliases: add_pipe()

Examples

pipeline.pipe( str.strip )
# or
pipeline.add_pipe( str.strip )
# or
pipeline.pipe( [ str.title, str.strip ] )

send()

This method is used to set the object being passed through the pipeline.

def send( passable: Any ): Pipeline

Examples

# Send a scalar.
pipeline.send( 'Some string' )

# Send an object.
pipeline.send( my_object )

then()

This method is used to run the pipeline and return the result.

def then( destination: Optional[Callable] = None ): Any

Aliases: run()

Examples

pipeline.then()

# Use the alias.
pipeline.run()

# Provide a function to run before returning the result.
pipeline.then( str.strip )

# Provide a closure to run before returning the result.
pipeline.then( lambda passable: passable.strip() )

# Provide an object as a pipe to run before returning the result.
pipeline.then( StripPipe() )

then_return()

This method is used to run the pipeline and return the result.

def then_return(): Any

Aliases: run_and_return(), thenReturn()

Examples

pipeline.then_return()
# Use an alias.
pipeline.thenReturn()
# Use the other alias.
pipeline.run_and_return()

through()

This method is used to set the handlers (or "pipes") that are used to process the data.

def through( pipes: Union[List, Any] ): Pipeline

Aliases: pipes()

Examples

# You can provide any number of pipes.
pipeline.through([ str.title, str.strip ])

# Using the alias.
pipeline.pipes([ str.title, str.strip ])

# Pass closures as pipes.
pipeline.through([ str.title, lambda passable, next: next_pipe(passable.strip)])

# Pass objects as pipes.
pipeline.through([ TitlePipe(), StripPipe() ])

via()

This method is used to set the method to call on all the pipes in the pipeline.

def via( method: str ): Pipeline

Examples

# Set the method used in all classes in the pipeline to process the data as `execute()`.
pipeline.via( 'execute' )

# Set the method used in all classes in the pipeline to process the data as `borkborkbork()`.
pipeline.via( 'borkborkbork' )

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pipelinechain-1.0.1.tar.gz (7.9 kB view details)

Uploaded Source

Built Distribution

pipelinechain-1.0.1-py3-none-any.whl (7.9 kB view details)

Uploaded Python 3

File details

Details for the file pipelinechain-1.0.1.tar.gz.

File metadata

  • Download URL: pipelinechain-1.0.1.tar.gz
  • Upload date:
  • Size: 7.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.0.0 CPython/3.8.10

File hashes

Hashes for pipelinechain-1.0.1.tar.gz
Algorithm Hash digest
SHA256 2128926dc236476e6ce32f4e3f0aded1799590883f9c6d7cd6962d2541c5ca4d
MD5 fe9e71ead85a8c791e10a4837266c2e9
BLAKE2b-256 d00cfe6213884492ed11ca73f70304230a69b306687c35860dd21f817ea6f087

See more details on using hashes here.

File details

Details for the file pipelinechain-1.0.1-py3-none-any.whl.

File metadata

File hashes

Hashes for pipelinechain-1.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 aa9f143cad7ef7de180f96aa0a07400b7cfefc7678e04f2731241298e6bad266
MD5 8134c38057a115ef3447ec292d82c267
BLAKE2b-256 6b7544395d74079221292048d8f35986f6cb057398952e2a698b47be5a2c6884

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page