TheCodeCrate's Pipeline

This package provides a pipeline pattern implementation.

The implementation is based on the excellent PHP League Pipeline package.

Installation

pip install thecodecrate-pipeline

Pipeline Pattern

The pipeline pattern allows you to compose sequential operations by chaining stages.

In this particular implementation, the interface consists of two parts:

  • StageInterface
  • PipelineInterface

A pipeline consists of zero, one, or multiple stages, and it can process a payload. During processing, the payload is passed to the first stage; from then on, each stage's return value is passed to the next stage.

In the simplest form, the execution chain can be represented as a for loop:

def process(payload, stages):
    result = payload
    for stage in stages:
        # Each stage receives the previous stage's result
        result = stage(result)
    return result

For three stages, this is equivalent to:

result = stage3(stage2(stage1(payload)))
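The loop and the nested calls are the same left-to-right fold. A minimal standalone sketch, using functools.reduce and three hypothetical stage functions (not part of the package), makes the equivalence checkable, including the zero-stage case:

```python
from functools import reduce
from typing import Any, Callable, Sequence

# Three hypothetical stages, used only for this illustration
def stage1(x: int) -> int:
    return x + 1

def stage2(x: int) -> int:
    return x * 10

def stage3(x: int) -> int:
    return x - 3

def process(payload: Any, stages: Sequence[Callable[[Any], Any]]) -> Any:
    # Fold left: feed the running result into each stage in turn;
    # with no stages, the payload is returned unchanged
    return reduce(lambda result, stage: stage(result), stages, payload)

looped = process(4, [stage1, stage2, stage3])
nested = stage3(stage2(stage1(4)))
```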

Immutability

Pipelines are implemented as immutable stage chains. When you pipe a new stage, a new pipeline is created with the added stage, and the original pipeline is left unchanged. This makes pipelines easy to reuse and minimizes side effects.
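A minimal sketch of the idea, using a hypothetical MiniPipeline frozen dataclass rather than the package's Pipeline: pipe returns a new instance holding an extended tuple of stages, so the original pipeline can keep being reused.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable

@dataclass(frozen=True)
class MiniPipeline:
    """Hypothetical stand-in for Pipeline: stages live in an immutable tuple."""

    stages: tuple[Callable[[Any], Any], ...] = ()

    def pipe(self, stage: Callable[[Any], Any]) -> MiniPipeline:
        # Build a new pipeline with the extra stage; self is never modified
        return MiniPipeline(self.stages + (stage,))

    def process(self, payload: Any) -> Any:
        result = payload
        for stage in self.stages:
            result = stage(result)
        return result

base = MiniPipeline().pipe(lambda x: x * 10)
extended = base.pipe(lambda x: x + 1)  # reuses base without changing it
```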

Usage

Stages, the operations in a pipeline, can be anything that satisfies the Callable type hint, so functions, lambdas, and any other callable objects are accepted.

pipeline = Pipeline().pipe(lambda payload: payload * 10)

# Returns 100
await pipeline.process(10)
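Because process is awaited, a runner has to cope with both plain callables and coroutine functions. The sketch below is one way to do that, assuming a hypothetical run_stages helper (not the package's code) that awaits a stage's result only when it is awaitable. It accepts a lambda, a functools.partial, an instance with __call__, and an async def function:

```python
import asyncio
import inspect
from functools import partial
from typing import Any, Callable, Iterable

async def run_stages(payload: Any, stages: Iterable[Callable[[Any], Any]]) -> Any:
    """Hypothetical helper: apply stages in order, awaiting results only when needed."""
    result = payload
    for stage in stages:
        result = stage(result)
        if inspect.isawaitable(result):
            result = await result
    return result

def add(x: int, y: int) -> int:
    return x + y

class Doubler:
    # Any object that defines __call__ is a callable
    def __call__(self, x: int) -> int:
        return x * 2

async def async_square(x: int) -> int:
    return x * x

stages = [
    lambda x: x * 10,   # lambda
    partial(add, y=5),  # functools.partial
    Doubler(),          # callable instance
    async_square,       # coroutine function
]
result = asyncio.run(run_stages(10, stages))
```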

Class-Based Stages

Class-based stages are also possible. You can implement the StageInterface[InputType, OutputType] interface, which ensures that your __call__ method has the correct signature.

class TimesTwoStage(StageInterface[int, int]):
    async def __call__(self, payload: int) -> int:
        return payload * 2

class AddOneStage(StageInterface[int, int]):
    async def __call__(self, payload: int) -> int:
        return payload + 1

pipeline = (
    Pipeline[int, int]()
    .pipe(TimesTwoStage())
    .pipe(AddOneStage())
)

# Returns 21
await pipeline.process(10)
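To illustrate what implementing such an interface buys you, here is a standalone sketch of a comparable generic abstract base class. SketchStage is a hypothetical stand-in, not the package's StageInterface: it declares an abstract async __call__, so a subclass that forgets to implement it cannot be instantiated.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

InputType = TypeVar("InputType")
OutputType = TypeVar("OutputType")

class SketchStage(ABC, Generic[InputType, OutputType]):
    """Hypothetical stand-in for StageInterface."""

    @abstractmethod
    async def __call__(self, payload: InputType) -> OutputType: ...

class TimesTwoStage(SketchStage[int, int]):
    async def __call__(self, payload: int) -> int:
        return payload * 2

class AddOneStage(SketchStage[int, int]):
    async def __call__(self, payload: int) -> int:
        return payload + 1

async def main() -> int:
    # Chain the two stages by hand, awaiting each one
    result = 10
    for stage in (TimesTwoStage(), AddOneStage()):
        result = await stage(result)
    return result

result = asyncio.run(main())
```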

Reusable Pipelines

Because the PipelineInterface is an extension of the StageInterface, pipelines can be reused as stages. This creates a highly composable model to create complex execution patterns while keeping the cognitive load low.

For example, to compose a pipeline that processes API calls, we would create something along these lines:

process_api_request = (
    Pipeline()
    .pipe(ExecuteHttpRequest())
    .pipe(ParseJsonResponse())
)

pipeline = (
    Pipeline()
    .pipe(ConvertToPsr7Request())
    .pipe(process_api_request)
    .pipe(ConvertToResponseDto())
)

await pipeline.process(DeleteBlogPost(post_id))
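The key ingredient is that a pipeline is itself callable. The sketch below uses a hypothetical NestablePipeline class and a hypothetical fake_http_request function with a canned JSON body in place of ExecuteHttpRequest, so it runs without the package or a network. It mirrors the structure of the example above, not the package's internals.

```python
import asyncio
import inspect
import json
from typing import Any, Callable

class NestablePipeline:
    """Hypothetical stand-in for Pipeline: a pipeline that is also a stage."""

    def __init__(self, stages: tuple = ()) -> None:
        self._stages = stages

    def pipe(self, stage: Callable[[Any], Any]) -> "NestablePipeline":
        # Immutable: return a new pipeline with the stage appended
        return NestablePipeline(self._stages + (stage,))

    async def process(self, payload: Any) -> Any:
        result = payload
        for stage in self._stages:
            result = stage(result)
            if inspect.isawaitable(result):
                result = await result
        return result

    async def __call__(self, payload: Any) -> Any:
        # Calling a pipeline runs it, which is what lets it be piped as a stage
        return await self.process(payload)

def fake_http_request(request: dict) -> str:
    # Hypothetical stand-in for ExecuteHttpRequest: no network, canned JSON body
    return json.dumps({"status": "deleted", "id": request["post_id"]})

process_api_request = NestablePipeline().pipe(fake_http_request).pipe(json.loads)

pipeline = (
    NestablePipeline()
    .pipe(lambda post_id: {"method": "DELETE", "post_id": post_id})
    .pipe(process_api_request)  # the inner pipeline used as a single stage
    .pipe(lambda data: f"{data['status']}:{data['id']}")
)

result = asyncio.run(pipeline.process(42))
```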

Type Hinting

You can specify the input and output types for pipelines and stages using type variables T_in and T_out. This allows you to handle varying types between stages, enhancing type safety and code clarity.

The T_out type variable is optional and defaults to T_in. Similarly, T_in is also optional and defaults to Any.

pipeline = Pipeline[int]().pipe(lambda payload: payload * 2)

# Returns 20
await pipeline.process(10)

You can also handle varying types between stages:

pipeline = Pipeline[int, str]().pipe(lambda payload: f"Number: {payload}")

# Returns "Number: 10"
await pipeline.process(10)

This lets a pipeline transform the payload from one type to another as it passes between stages.
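One way to express changing types statically is to have each pipe call return a pipeline whose output type parameter comes from the new stage. This is a hypothetical TypedPipeline sketch, not necessarily how the package types Pipeline, and it does not reproduce the T_out and T_in defaults described above.

```python
from __future__ import annotations

from typing import Any, Callable, Generic, TypeVar

T_in = TypeVar("T_in")
T_out = TypeVar("T_out")
U = TypeVar("U")

class TypedPipeline(Generic[T_in, T_out]):
    """Hypothetical sketch: pipe() threads the output type from stage to stage."""

    def __init__(self, stages: tuple[Callable[[Any], Any], ...] = ()) -> None:
        self._stages = stages

    def pipe(self, stage: Callable[[T_out], U]) -> TypedPipeline[T_in, U]:
        # A type checker infers U from the stage, so the output type changes
        return TypedPipeline(self._stages + (stage,))

    def process(self, payload: T_in) -> T_out:
        result: Any = payload
        for stage in self._stages:
            result = stage(result)
        return result

def start(input_type: type[T_in]) -> TypedPipeline[T_in, T_in]:
    # input_type is only used by the type checker to bind T_in
    return TypedPipeline()

# A type checker such as mypy should infer TypedPipeline[int, str] here
pipeline = start(int).pipe(lambda n: n * 2).pipe(lambda n: f"Number: {n}")
result = pipeline.process(10)
```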

Custom Processors

You can create your own processors to customize how the pipeline processes stages. This allows you to implement different execution strategies, such as handling exceptions, processing resources, or implementing middleware patterns.

For example, you can define a custom processor:

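import inspect
from typing import TypeVar

T_in = TypeVar("T_in")
T_out = TypeVar("T_out")

class MyCustomProcessor(Processor[T_in, T_out]):
    async def process(
        self,
        payload: T_in,
        stages: list[StageInterface[T_in, T_out]],
    ) -> T_out:
        # Custom processing logic: run each stage in order
        result = payload
        for stage in stages:
            result = stage(result)
            # Plain functions and lambdas return values; async stages return awaitables
            if inspect.isawaitable(result):
                result = await result
        return result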

And use it in your pipeline:

pipeline = Pipeline[int, int](processor=MyCustomProcessor()).pipe(lambda x: x * 2)
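The value of a processor is that the stages stay the same while the execution strategy changes. A standalone sketch with two hypothetical processors (ChainedProcessor runs every stage, StopWhenProcessor stops once a check passes) and a hypothetical call_stage helper that awaits only awaitable results:

```python
import asyncio
import inspect
from typing import Any, Callable, Sequence

Stage = Callable[[Any], Any]

async def call_stage(stage: Stage, payload: Any) -> Any:
    # Await the result only if the stage returned an awaitable
    result = stage(payload)
    return await result if inspect.isawaitable(result) else result

class ChainedProcessor:
    """Hypothetical default strategy: run every stage in order."""

    async def process(self, payload: Any, stages: Sequence[Stage]) -> Any:
        for stage in stages:
            payload = await call_stage(stage, payload)
        return payload

class StopWhenProcessor:
    """Hypothetical strategy: stop as soon as check(payload) returns True."""

    def __init__(self, check: Callable[[Any], bool]) -> None:
        self.check = check

    async def process(self, payload: Any, stages: Sequence[Stage]) -> Any:
        for stage in stages:
            payload = await call_stage(stage, payload)
            if self.check(payload):
                break
        return payload

stages = [lambda x: x * 2, lambda x: x * 2, lambda x: x * 2]
full = asyncio.run(ChainedProcessor().process(10, stages))
early = asyncio.run(StopWhenProcessor(lambda x: x >= 30).process(10, stages))
```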

Command-Based Processors

In addition to the standard processors, this package supports command-based processors, which use the Command Pattern to encapsulate processing logic within a command object. This approach provides better encapsulation and isolates the state of each processing request.

Using Command-Based Processors

You can create a command-based processor by specifying a command_class in your processor. The command class should inherit from Command[T_in, T_out] and implement the execute method.

class StatefulChainedCommand(Command[T_in, T_out]):
    async def execute(self) -> T_out:
        # The command holds this request's payload and stages
        for stage in self.stages:
            self.payload = await self._call_stage(
                stage=stage,
                payload=self.payload,
            )
        return self.payload

# Defined after the command class, so the name exists when it is referenced
class StatefulChainedProcessor(Processor[T_in, T_out]):
    command_class = StatefulChainedCommand

You can then use this processor in your pipeline:

pipeline = (
    Pipeline[int, int](processor=StatefulChainedProcessor())
    .pipe(TimesTwoStage())
    .pipe(AddFiveStage())  # AddFiveStage: assumed to be like AddOneStage, but adds 5
)

# Returns 25 ((10 * 2) + 5)
await pipeline.process(10)

When to Use Command-Based Processors

Command-based processors are useful when you need to maintain state within the processing of each payload or prefer an object-oriented approach that aligns with the Command Pattern.
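The isolation comes from creating a new command object for every processing request. A minimal sketch, assuming hypothetical SketchCommand and SketchCommandProcessor classes rather than the package's Command and Processor, runs two requests concurrently, each with its own payload state:

```python
import asyncio
import inspect
from typing import Any, Callable, Sequence

Stage = Callable[[Any], Any]

class SketchCommand:
    """Hypothetical command: owns the state of one processing request."""

    def __init__(self, payload: Any, stages: Sequence[Stage]) -> None:
        self.payload = payload
        self.stages = stages

    async def execute(self) -> Any:
        for stage in self.stages:
            result = stage(self.payload)
            # Await async stages; plain callables already returned a value
            self.payload = await result if inspect.isawaitable(result) else result
        return self.payload

class SketchCommandProcessor:
    """Hypothetical processor: creates a fresh command for every process() call."""

    command_class = SketchCommand

    async def process(self, payload: Any, stages: Sequence[Stage]) -> Any:
        command = self.command_class(payload, stages)
        return await command.execute()

async def times_two(x: int) -> int:
    await asyncio.sleep(0)  # yield to the event loop so the two requests interleave
    return x * 2

async def main() -> list:
    processor = SketchCommandProcessor()
    stages = [times_two, lambda x: x + 5]
    # Two concurrent requests share the processor but not any payload state
    return await asyncio.gather(
        processor.process(10, stages),
        processor.process(1, stages),
    )

results = asyncio.run(main())
```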

Declarative Stages

Instead of using pipe to add stages at runtime, you can define stages declaratively by specifying them as class-level attributes. This makes pipelines easier to set up and reuse with predefined stages.

class MyPipeline(Pipeline[int, int]):
    stages = [
        TimesTwoStage(),
        TimesThreeStage(),
    ]

# Process the payload through the pipeline with the declared stages
result = await MyPipeline().process(5)

# Returns 30
print(result)

In this example, MyPipeline declares its stages directly in the class definition, making the pipeline setup more readable and maintainable.
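A class-level stages attribute can be read when an instance is created and copied into per-instance state. The sketch below assumes a hypothetical DeclarativePipeline base class; it is not the package's Pipeline, but MyPipeline is declared the same way as above, with lambdas standing in for the stage classes.

```python
from __future__ import annotations

from typing import Any, Callable, ClassVar, Sequence

Stage = Callable[[Any], Any]

class DeclarativePipeline:
    """Hypothetical sketch: subclasses may declare their stages as a class attribute."""

    stages: ClassVar[Sequence[Stage]] = ()

    def __init__(self, stages: Sequence[Stage] | None = None) -> None:
        # Fall back to the class-level declaration; copy it into a per-instance tuple
        self._stages = tuple(type(self).stages if stages is None else stages)

    def pipe(self, stage: Stage) -> DeclarativePipeline:
        # Keep immutability: build a new instance of the same subclass
        return type(self)(self._stages + (stage,))

    def process(self, payload: Any) -> Any:
        for stage in self._stages:
            payload = stage(payload)
        return payload

class MyPipeline(DeclarativePipeline):
    stages = [
        lambda x: x * 2,  # plays the role of TimesTwoStage
        lambda x: x * 3,  # plays the role of TimesThreeStage
    ]

result = MyPipeline().process(5)
extended = MyPipeline().pipe(lambda x: x + 1).process(5)
```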

Declarative Processor

You can also specify the processor in a declarative way by setting the processor_class attribute in your pipeline class.

class MyPipeline(Pipeline[T_in, T_out]):
    processor_class = MyCustomProcessor

This allows you to customize the processing behavior of your pipeline while keeping the definition clean and declarative.
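A minimal sketch of how a processor_class attribute could be resolved, assuming a hypothetical SketchPipeline base class and hypothetical processors. Letting an explicit processor argument override the declared class is an assumption of this sketch, not documented behavior of the package.

```python
from typing import Any, Callable, Optional, Sequence

Stage = Callable[[Any], Any]

class ChainedProcessor:
    """Hypothetical default processor: run stages first to last."""

    def process(self, payload: Any, stages: Sequence[Stage]) -> Any:
        for stage in stages:
            payload = stage(payload)
        return payload

class ReversedProcessor:
    """Hypothetical alternative processor: run stages last to first."""

    def process(self, payload: Any, stages: Sequence[Stage]) -> Any:
        return ChainedProcessor().process(payload, list(reversed(stages)))

class SketchPipeline:
    processor_class: type = ChainedProcessor  # declarative default

    def __init__(self, stages: Sequence[Stage] = (), processor: Optional[Any] = None) -> None:
        self._stages = tuple(stages)
        # Assumption of this sketch: an explicit processor wins over processor_class
        self._processor = processor if processor is not None else self.processor_class()

    def process(self, payload: Any) -> Any:
        return self._processor.process(payload, self._stages)

class MyPipeline(SketchPipeline):
    processor_class = ReversedProcessor

stages = [lambda x: x + 1, lambda x: x * 10]
default_result = SketchPipeline(stages).process(2)
declared_result = MyPipeline(stages).process(2)
```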

Processing Streams

The pipeline can also process streams in real time, allowing you to handle asynchronous iterators and process data as it becomes available.

from typing import AsyncIterator
import asyncio

async def input_stream() -> AsyncIterator[int]:
    for i in range(5):
        yield i

async def stage1(stream: AsyncIterator[int]) -> AsyncIterator[int]:
    async for item in stream:
        yield item * 2
        await asyncio.sleep(1)  # Simulate processing delay

async def stage2(stream: AsyncIterator[int]) -> AsyncIterator[str]:
    async for item in stream:
        yield f"Number: {item}"


async def main():
    pipeline = (
        Pipeline[AsyncIterator[int], AsyncIterator[str]]()
        .pipe(stage1)
        .pipe(stage2)
    )

    stream = await pipeline.process(input_stream())

    async for result in stream:
        print(result)

# Run the async main function from synchronous code
asyncio.run(main())

This allows you to process data in a streaming fashion, where each stage can yield results that are immediately consumed by the next stage.
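The streaming behavior comes from chaining async generators, each wrapping the previous stream. This standalone sketch (source, double, and label are hypothetical functions, chained by hand rather than through the package) records an event log to show that each item travels through every stage before the next item is produced:

```python
import asyncio
from collections.abc import AsyncIterator

events: list[str] = []

async def source() -> AsyncIterator[int]:
    for i in range(3):
        events.append(f"source {i}")
        yield i

async def double(stream: AsyncIterator[int]) -> AsyncIterator[int]:
    async for item in stream:
        events.append(f"double {item}")
        yield item * 2

async def label(stream: AsyncIterator[int]) -> AsyncIterator[str]:
    async for item in stream:
        events.append(f"label {item}")
        yield f"Number: {item}"

async def main() -> list[str]:
    stream = source()
    # Chain the stages the way a pipeline would: each wraps the previous stream
    for stage in (double, label):
        stream = stage(stream)
    # Nothing runs until the final stream is consumed here
    return [item async for item in stream]

results = asyncio.run(main())
```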

Pipeline Builders

Because pipelines themselves are immutable, pipeline builders are provided to facilitate distributed composition of a pipeline, where stages are collected from different places before the pipeline is built.

The PipelineBuilder[InputType, OutputType] collects stages and allows you to create a pipeline at any given time.

pipeline_builder = (
    PipelineBuilder()
    .add(LogicalStage())
    .add(AnotherStage())
    .add(LastStage())
)

# Build the pipeline
pipeline = pipeline_builder.build()
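The builder is the mutable counterpart to the immutable pipeline. A minimal sketch, assuming hypothetical SketchPipelineBuilder and SketchPipeline classes (not the package's PipelineBuilder), where build takes a snapshot so that later add calls do not change pipelines already built:

```python
from __future__ import annotations

from typing import Any, Callable

Stage = Callable[[Any], Any]

class SketchPipeline:
    """Hypothetical immutable pipeline produced by the builder below."""

    def __init__(self, stages: tuple[Stage, ...]) -> None:
        self._stages = stages

    def process(self, payload: Any) -> Any:
        for stage in self._stages:
            payload = stage(payload)
        return payload

class SketchPipelineBuilder:
    """Hypothetical builder: collects stages in a mutable list."""

    def __init__(self) -> None:
        self._stages: list[Stage] = []

    def add(self, stage: Stage) -> SketchPipelineBuilder:
        self._stages.append(stage)
        return self  # returning self allows chained add() calls

    def build(self) -> SketchPipeline:
        # Snapshot the stages collected so far into an immutable pipeline
        return SketchPipeline(tuple(self._stages))

builder = SketchPipelineBuilder().add(lambda x: x + 1)
first = builder.build()
builder.add(lambda x: x * 100)  # e.g. another module contributes a stage later
second = builder.build()
```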

Exception Handling

This package is completely transparent when dealing with exceptions. In no case will this package catch an exception or silence an error. Exceptions should be dealt with on a per-case basis, either inside a stage or at the time the pipeline processes a payload.

pipeline = Pipeline().pipe(lambda payload: payload / 0)

try:
    await pipeline.process(10)
except ZeroDivisionError as e:
    # Handle the exception.
    pass
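Both places mentioned above can be shown with a standalone sketch. The process function here is a hypothetical runner with no try/except, so an error raised by a stage reaches the caller unchanged; safe_divide is a hypothetical stage that handles its own error.

```python
import asyncio
from typing import Any, Callable, Sequence

async def process(payload: Any, stages: Sequence[Callable[[Any], Any]]) -> Any:
    # Hypothetical runner: no try/except, so stage errors propagate unchanged
    for stage in stages:
        payload = stage(payload)
    return payload

def safe_divide(payload: float) -> float:
    # Option 1: the stage handles its own error
    try:
        return 100 / payload
    except ZeroDivisionError:
        return float("inf")

async def main():
    handled_in_stage = await process(0, [safe_divide])
    # Option 2: let the error propagate and handle it where process is awaited
    caught = None
    try:
        await process(10, [lambda payload: payload / 0])
    except ZeroDivisionError as exc:
        caught = type(exc).__name__
    return handled_in_stage, caught

handled_in_stage, caught = asyncio.run(main())
```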

Download files

Source Distribution

thecodecrate_pipeline-1.13.0.tar.gz (19.1 kB)

Uploaded Source

Built Distribution

thecodecrate_pipeline-1.13.0-py3-none-any.whl (33.7 kB)

Uploaded Python 3

File details

Details for the file thecodecrate_pipeline-1.13.0.tar.gz.

File hashes

Hashes for thecodecrate_pipeline-1.13.0.tar.gz
Algorithm Hash digest
SHA256 bb2672018179ae19d79ce27f11f01befc7868ba996c725e81dc96c25a89b60c1
MD5 7f42bc7f8ebdf1b419cbacc344ad1f2e
BLAKE2b-256 a596c61d844f72492e40870abf5e2870eecb27ad14ce4e1e82c2099c35e42c5d

File details

Details for the file thecodecrate_pipeline-1.13.0-py3-none-any.whl.

File hashes

Hashes for thecodecrate_pipeline-1.13.0-py3-none-any.whl
Algorithm Hash digest
SHA256 4b1cad2962f4533ca0548b2f0daf717bfee44e2797c5f3cf9135edb84e9f53d7
MD5 f2a98e31bfded7d90a27adf9ffa2e331
BLAKE2b-256 d785586c03a8f1c6d234129449931b3531e2d38d414f818a7495ff94a699a5d4
