Python-Pipeline
This package provides a pipeline pattern implementation.
It is a port of the PHP League Pipeline package.
Installation
pip install thecodecrate-pipeline
Pipeline Pattern
The pipeline pattern lets you compose sequential operations by chaining stages.
In this particular implementation the interface consists of two parts:
- StageInterface
- PipelineInterface
A pipeline consists of zero, one, or multiple stages. A pipeline can process a payload. During the processing the payload will be passed to the first stage. From that moment on the resulting value is passed on from stage to stage.
In the simplest form, the execution chain can be represented as a for loop:
result = payload
for stage in stages:
    result = stage(result)
return result
Effectively this is the same as:
result = stage3(stage2(stage1(payload)))
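The equivalence between the loop and the nested calls can be checked with a few plain functions. This is a standalone sketch using only the standard library; `stage1`, `stage2`, `stage3`, and `run_stages` are hypothetical names for illustration, not part of this package.

```python
from functools import reduce

# Hypothetical stages for illustration; any one-argument callable works.
def stage1(payload: int) -> int:
    return payload + 1

def stage2(payload: int) -> int:
    return payload * 2

def stage3(payload: int) -> int:
    return payload - 3

def run_stages(stages, payload):
    """Pass the payload through each stage in order (a left fold)."""
    return reduce(lambda result, stage: stage(result), stages, payload)

looped = run_stages([stage1, stage2, stage3], 10)
nested = stage3(stage2(stage1(10)))
print(looped == nested)
```

With zero stages the fold returns the payload unchanged, which matches the "zero, one, or multiple stages" description above.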
Immutability
Pipelines are implemented as immutable stage chains. When you pipe a new stage, a new pipeline will be created with the added stage. This makes pipelines easy to reuse, and minimizes side-effects.
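One way to picture this behaviour is a class whose `pipe` method returns a new object instead of modifying itself. The sketch below is a synchronous, hypothetical stand-in (`SketchPipeline` is not the package's `Pipeline` class and says nothing about how the package is implemented); it only illustrates why a shared base pipeline stays safe to reuse.

```python
from typing import Callable, Tuple

class SketchPipeline:
    """Hypothetical stand-in, not the package's Pipeline class."""

    def __init__(self, stages: Tuple[Callable, ...] = ()) -> None:
        self._stages = stages  # a tuple, so it cannot be mutated in place

    def pipe(self, stage: Callable) -> "SketchPipeline":
        # Return a new object instead of appending to self.
        return SketchPipeline(self._stages + (stage,))

    def process(self, payload):
        for stage in self._stages:
            payload = stage(payload)
        return payload

base = SketchPipeline().pipe(lambda x: x + 1)
doubled = base.pipe(lambda x: x * 2)   # base is left untouched
squared = base.pipe(lambda x: x * x)   # so it can be extended again

print(base.process(3), doubled.process(3), squared.process(3))
```

Both `doubled` and `squared` start from the same `base`, and neither extension leaks into the other.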
Usage
Operations in a pipeline (stages) can be anything that satisfies the Callable
type hint, so closures and any other callable object will work.
pipeline = Pipeline().pipe(lambda payload: payload * 10)
# Returns 100
await pipeline.process(10)
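The `await` above only works inside an async context (a coroutine or an async-aware REPL). From a regular script, the call can be driven with `asyncio.run`. The sketch below is standalone: `process` is a hypothetical stand-in for the pipeline's processing step, and it assumes that results of async stages are awaited while plain callables are used as-is, which the examples in this README suggest but do not state.

```python
import asyncio
import inspect

async def process(stages, payload):
    """Hypothetical stand-in for Pipeline.process, not the package's code."""
    for stage in stages:
        payload = stage(payload)
        # Assumption: results of async stages are awaited before moving on.
        if inspect.isawaitable(payload):
            payload = await payload
    return payload

async def add_one(payload: int) -> int:
    return payload + 1

async def main() -> int:
    # A plain lambda and a coroutine function used side by side.
    return await process([lambda payload: payload * 10, add_one], 10)

result = asyncio.run(main())
print(result)
```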
Type hinting
Use Pipeline[PayloadType] to type hint the payload type.
pipeline = (
    (Pipeline[int]())
    .pipe(lambda payload: payload * 10)
)
# Returns 100
await pipeline.process(10)
Class based stages
Class based stages are also possible. Implementing StageInterface[PayloadType] ensures that the __call__ method has the correct signature.
class TimesTwoStage(StageInterface[int]):
    async def __call__(self, payload: int) -> int:
        return payload * 2

class AddOneStage(StageInterface[int]):
    async def __call__(self, payload: int) -> int:
        return payload + 1

pipeline = (
    (Pipeline[int]())
    .pipe(TimesTwoStage())
    .pipe(AddOneStage())
)
# Returns 21
await pipeline.process(10)
Re-usable Pipelines
Because the PipelineInterface is an extension of the StageInterface, pipelines can be re-used as stages. This makes for a highly composable model: complex execution patterns can be built from small pipelines while keeping the cognitive load low.
For example, if we wanted to compose a pipeline to process API calls, we would create something along these lines:
process_api_request = (
    (Pipeline())
    .pipe(ExecuteHttpRequest())
    .pipe(ParseJsonResponse())
)
pipeline = (
    (Pipeline())
    .pipe(ConvertToPsr7Request())
    .pipe(process_api_request)
    .pipe(ConvertToResponseDto())
)
await pipeline.process(DeleteBlogPost(post_id))
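The stage classes in the example above are placeholders, so here is a smaller nesting example that runs on its own. It uses a synchronous, hypothetical stand-in (`SketchPipeline`, not the package's class) to show the one property that makes nesting work: a pipeline is itself callable with a payload.

```python
class SketchPipeline:
    """Hypothetical stand-in, not the package's Pipeline class."""

    def __init__(self, stages=()):
        self._stages = tuple(stages)

    def pipe(self, stage):
        return SketchPipeline(self._stages + (stage,))

    def process(self, payload):
        for stage in self._stages:
            payload = stage(payload)
        return payload

    # Being callable is what lets a pipeline act as a stage.
    __call__ = process

normalize = SketchPipeline().pipe(str.strip).pipe(str.lower)
slugify = (
    SketchPipeline()
    .pipe(normalize)  # a whole pipeline used as one stage
    .pipe(lambda text: text.replace(" ", "-"))
)

slug = slugify.process("  Hello Pipeline World ")
print(slug)
```

The outer pipeline does not need to know that `normalize` contains two stages of its own; it treats it like any other callable.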
Pipeline Builders
Because pipelines themselves are immutable, pipeline builders are introduced to facilitate distributed composition of a pipeline.
The PipelineBuilder[PayloadType] builder collects stages and allows you to create a pipeline at any given time.
pipeline_builder = (
    (PipelineBuilder())
    .add(LogicalStage())
    .add(AnotherStage())
    .add(LastStage())
)
# Build the pipeline
pipeline = pipeline_builder.build()
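The split between a collecting builder and an immutable pipeline can be sketched as follows. Both classes are hypothetical, synchronous stand-ins rather than the package's code, and the sketch assumes that `add` mutates the builder and returns it for chaining, and that `build` takes a snapshot of the stages collected so far.

```python
class SketchPipeline:
    """Hypothetical stand-in, not the package's Pipeline class."""

    def __init__(self, stages=()):
        self._stages = tuple(stages)

    def process(self, payload):
        for stage in self._stages:
            payload = stage(payload)
        return payload

class SketchBuilder:
    """Hypothetical stand-in, not the package's PipelineBuilder class."""

    def __init__(self):
        self._stages = []  # mutable on purpose: stages are collected over time

    def add(self, stage):
        self._stages.append(stage)
        return self  # assumption: returned for chaining

    def build(self):
        # Copy into a tuple so later add() calls don't affect this pipeline.
        return SketchPipeline(tuple(self._stages))

builder = SketchBuilder().add(lambda x: x + 1)
first = builder.build()

builder.add(lambda x: x * 2)  # e.g. registered later by another module
second = builder.build()

print(first.process(1), second.process(1))
```

Under these assumptions, a pipeline built earlier keeps its original stages even when the builder continues to grow.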
Exception handling
This package is completely transparent when dealing with exceptions. In no case will this package catch an exception or silence an error. Exceptions should be dealt with on a per-case basis, either inside a stage or at the time the pipeline processes a payload.
pipeline = Pipeline().pipe(lambda payload: payload / 0)
try:
    await pipeline.process(10)
except ZeroDivisionError as e:
    # Dividing by zero raises ZeroDivisionError; handle it here.
    pass
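The two places where an exception can be handled, inside a stage or at the call site, can be compared side by side. This is a standalone sketch: `run` is a hypothetical stand-in for processing that, like the package as described above, catches nothing itself, and the stage names are made up for illustration.

```python
def divide_into_100(payload: float) -> float:
    return 100 / payload  # raises ZeroDivisionError when payload is 0

def safe_divide_into_100(payload: float) -> float:
    # Option 1: handle the error inside the stage and return a fallback.
    try:
        return 100 / payload
    except ZeroDivisionError:
        return 0.0

def run(stages, payload):
    """Hypothetical stand-in for processing; it catches nothing."""
    for stage in stages:
        payload = stage(payload)
    return payload

inside = run([safe_divide_into_100], 0)

# Option 2: let the error propagate and handle it at the call site.
try:
    outside = run([divide_into_100], 0)
except ZeroDivisionError:
    outside = None

print(inside, outside)
```

Handling the error inside the stage lets the remaining stages continue with a fallback value, while handling it at the call site aborts the whole run.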