data-plumber

data-plumber is a lightweight but versatile Python framework for multi-stage information processing. It lets you construct processing pipelines both from atomic building blocks and by recombining existing pipelines. Forks enable more complex (i.e. non-linear) orders of execution. Pipelines can also be collected into arrays that can be executed at once with the same input data.

Contents

  1. Usage Example
  2. Install
  3. Documentation
  4. Changelog

Usage example

Consider a scenario where the contents of a dictionary have to be validated and a suitable error message has to be generated. Specifically, a valid input dictionary is expected to have a key "data" whose value is a list of integer numbers. A suitable pipeline might look like this:

>>> from data_plumber import Stage, Pipeline, Previous
>>> pipeline = Pipeline(
        Stage(
            primer=lambda **kwargs: "data" in kwargs,
            status=lambda primer, **kwargs: 0 if primer else 1,
            message=lambda primer, **kwargs: "" if primer else "missing argument"
        ),
        Stage(
            requires={Previous: 0},
            primer=lambda data, **kwargs: isinstance(data, list),
            status=lambda primer, **kwargs: 0 if primer else 1,
            message=lambda primer, **kwargs: "" if primer else "bad type"
        ),
        Stage(
            requires={Previous: 0},
            primer=lambda data, **kwargs: all(isinstance(i, int) for i in data),
            status=lambda primer, **kwargs: 0 if primer else 1,
            message=lambda primer, **kwargs: "validation success" if primer else "bad type in data"
        ),
        exit_on_status=1
    )
>>> pipeline.run().last_message
'missing argument'
>>> pipeline.run(data=1).last_message
'bad type'
>>> pipeline.run(data=[1, "2", 3]).last_message
'bad type in data'
>>> pipeline.run(data=[1, 2, 3]).last_message
'validation success'

Install

Install using pip with

pip install data-plumber

Consider installing in a virtual environment.

Documentation

Overview

data_plumber is designed to provide a framework for flexible data-processing based on re-usable building blocks.

At its core stands the class Pipeline which can be understood as both a container for a collection of instructions (PipelineComponent) and an interface for the execution of a process (Pipeline.run(...)). Previously defined Pipelines can be recombined with other Pipelines or extended by individual PipelineComponents. Individual Pipeline.runs can be triggered with run-specific arguments.

PipelineComponents are either units that define actual data-processing (Stage) or units that control the flow of a Pipeline's execution (Fork). Until a Fork is encountered, a Pipeline.run iterates a pre-configured list of PipelineComponents. Any Stage-type component provides an integer status value after execution, which is then available for defining conditional execution of Stages or changes in flow (Fork).

A Stage itself consists of multiple (generally optional) highly customizable sub-stages and properties that can be configured at instantiation. In particular, a versatile referencing system based on (pre-defined) StageRef-type classes can be used to define the requirements for Stages. Similarly, this system is also used by Forks.

The output of a Pipeline.run is of type PipelineOutput. It contains extensive information on the order of operations, the response from individual Stages, and a data-property (of customizable type). The latter can be used to store and/or output processed data from the Pipeline's execution context.

Finally, aside from recombining Pipelines into more complex Pipelines, multiple Pipeline instances can be pooled into a Pipearray. This construct allows calling different Pipelines with identical input data.

Pipeline

Building anonymous Pipelines

A Pipeline can be created in an empty, a partially, or a fully assembled state.

For the empty Pipeline a simple expression like

>>> from data_plumber import Pipeline, Stage
>>> Pipeline()
<data_plumber.pipeline.Pipeline object at ...>

suffices. It can then be populated with statements like

>>> p = Pipeline()
>>> p.append(Stage())
>>> p.prepend(Pipeline())
>>> p.insert(Stage(), 1)

or simply by using the +-operator

>>> p = Pipeline()
>>> Stage() + p + Pipeline()
<data_plumber.pipeline.Pipeline object at ...>

Note that when adding to existing Pipelines, the change is made in-place.

>>> p = Pipeline(Stage())
>>> len(p)
1
>>> p + Stage()
<data_plumber.pipeline.Pipeline object at ...>
>>> len(p)
2

Consequently, only the properties of the first argument are inherited (refer to Python's operator precedence). The use of this operation in combination with Pipelines therefore requires caution.

Building named Pipelines

Instead of simply providing the individual PipelineComponents as positional arguments at instantiation, they can be assigned names by passing the components as keyword arguments (kwargs). In addition to these kwargs, positional arguments are still required to determine the Pipeline's order of operations. These are then given as the PipelineComponents' names:

>>> Pipeline(
    "a", "b", "a", "c",
    a=Stage(...,),
    b=Stage(...,),
    c=Stage(...,)
)
<data_plumber.pipeline.Pipeline object at ...>

In the example above, the Pipeline executes the Stages in the order a > b > a > c (note that Stage names can occur multiple times in the positional arguments or via Pipeline-extending methods). Methods like Pipeline.append also accept string identifiers for PipelineComponents (see the sketch below). If no names are provided at instantiation, internally generated identifiers are used.
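
For instance, a named component can be re-used by appending its identifier (a minimal sketch, inferred from the description above):

>>> p = Pipeline("a", a=Stage())
>>> p.append("a")
>>> len(p)
2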

The two approaches of anonymous and named Pipelines can be combined freely:

>>> Pipeline(
    "a", Stage(...,), "a", "c",
    a=Stage(...,),
    c=Stage(...,)
)
<data_plumber.pipeline.Pipeline object at ...>

Unpacking Pipelines

Pipelines support unpacking to be used as, for example, positional or keyword arguments in the constructor of another Pipeline:

>>> p = Pipeline("a", ..., a=Stage(), ...)
>>> Pipeline("b", *p, ..., b=Stage(), **p, ...)
<data_plumber.pipeline.Pipeline object at ...>
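
A more concrete sketch of the same idea (assuming len counts the components in the Pipeline's sequence of operations, as above):

>>> p = Pipeline("a", "b", a=Stage(), b=Stage())
>>> q = Pipeline("c", *p, c=Stage(), **p)
>>> len(q)
3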

Running a Pipeline

A Pipeline can be triggered by calling the run-method.

>>> Pipeline(...).run(...)
PipelineOutput(...)

Any kwargs passed to this function are forwarded to its PipelineComponents' Callables. Note that some keywords are reserved (out, primer, status, and count).

While Forks are simply evaluated and their returned StageRef is used to find the next target for execution, Stages themselves have multiple sub-stages. First, the Pipeline checks the Stage's requirements, then executes its primer before running the action-command. Next, any exported kwargs are updated in the Pipeline.run and, finally, the status and response message are generated (see Stage for details).
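
As a brief illustration of kwargs-forwarding (a sketch, assuming the imports from the earlier examples; the keyword string is arbitrary):

>>> Pipeline(
        Stage(
            message=lambda string, **kwargs: string.upper()
        ),
    ).run(string="hello").last_message
'HELLO'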

Pipeline settings

A Pipeline can be configured with multiple properties at instantiation (a short sketch combining several of these follows the list):

  • initialize_output: a Callable that returns an object which is subsequently passed into the PipelineComponents' Callables; this object is referred to as the "persistent data-object" (the default generates an empty dictionary)
  • finalize_output: a Callable that is called before (normally) exiting the Pipeline.run with the run's kwargs as well as the persistent data-object
  • exit_on_status: either an integer value (the Pipeline exits normally if any component returns this status) or a Callable that is called after any component with that component's status (if it evaluates to True, the Pipeline.run is stopped)
  • loop: boolean; if False, the Pipeline stops automatically after iterating beyond the last PipelineComponent in its list of operations; if True, the execution loops back into the first component
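
A minimal sketch combining several of these settings (assuming the imports from the earlier examples, and assuming count starts at 0 for the first executed Stage, consistent with the StageRef example further below); the single Stage is looped until its status reaches 2, collecting values in a list-typed persistent data-object:

>>> output = Pipeline(
        Stage(
            action=lambda out, count, **kwargs: out.append(count),
            status=lambda count, **kwargs: count
        ),
        initialize_output=lambda: [],
        exit_on_status=lambda status: status >= 2,
        loop=True
    ).run()
>>> output.data
[0, 1, 2]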

Running a Pipeline as decorator

A Pipeline can be used to generate kwargs for a function (i.e., based on the persistent data-object). This requires the data-object to be unpackable as a mapping (e.g. a dictionary).

>>> @Pipeline(...).run_for_kwargs(...)
    def function(arg1, arg2): ...

Arguments that are passed to a call of the decorated function take priority over those generated by the Pipeline.run.
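
A minimal sketch (assuming run_for_kwargs accepts the same optional kwargs as run and can be called without arguments; the function and its parameters are hypothetical):

>>> @Pipeline(
        Stage(
            action=lambda out, **kwargs: out.update({"arg1": 1, "arg2": 2})
        )
    ).run_for_kwargs()
    def function(arg1, arg2):
        return arg1 + arg2
>>> function()
3
>>> function(arg2=10)
11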

Stage

A Stage represents a single building block in the processing logic of a Pipeline. It provides a number of Callables that are used in a Pipeline.run. The arguments that are passed into these Callables vary. Below is a list of all keywords that can occur (most Callables receive only a subset of these):

  • all kwargs given to Pipeline.run are forwarded (note that this makes the following arguments reserved words in this context),
  • out (a persistent data-object that is passed through the entire Pipeline; its initial value is generated by the Pipeline's initialize_output),
  • primer (output of Stage.primer),
  • status (output of Stage.status),
  • count (index of Stage in execution of Pipeline)

Stage properties

Stages accept a number of different (optional) arguments that are mostly Callables to be used by a Pipeline during execution.

  • requires -- requirements for Stage-execution; either None (always run this Stage) or a dictionary with pairs of a reference to a Stage and the required status (the most recent evaluation is used);

    key types are either StageRef, str (identifier of a Stage in the context of a Pipeline), or int (relative index in Pipeline stage arrangement);

    values are either an integer or a Callable taking the status as an argument and returning a bool (if it evaluates to True, the Stage-requirement is met)

    >>> from data_plumber import Pipeline, Stage, Previous
    >>> Pipeline(
          Stage(
            message=lambda **kwargs: "first stage",
            status=lambda **kwargs: 1
          ),
          Stage(
            requires={Previous: 0},
            message=lambda **kwargs: "second stage"
          ),
        ).run().last_message
    'first stage'
    
  • primer Callable for pre-processing data

    (kwargs: out, count)

    >>> Pipeline(
          Stage(
            primer=lambda **kwargs: "primer value",
            message=lambda primer, **kwargs: primer
          ),
        ).run().last_message
    'primer value'
    
  • action Callable for main-step of processing

    (kwargs: out, primer, count)

    >>> Pipeline(
          Stage(
            action=lambda out, **kwargs: out.update({"new_data": 0})
          ),
        ).run().data
    {'new_data': 0}
    
  • export Callable that returns a dictionary of additional kwargs to be exported to the parent Pipeline; in the following Stages, these kwargs are then available as if they were provided with the Pipeline.run-command

    (kwargs: out, primer, count)

    >>> Pipeline(
          Stage(
            export=lambda **kwargs: {"new_arg": 0}
          ),
          Stage(
            message=lambda **kwargs:
              "export successful" if "new_arg" in kwargs
              else "missing new_arg"
          ),
        ).run().last_message
    'export successful'
    
  • status Callable for generation of a Stage's integer exit status

    (kwargs: out, primer, count)

  • message Callable for generation of a Stage's exit message

    (kwargs: out, primer, count, status)
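
    A minimal sketch of how a Stage's status feeds into its message (assuming the imports from the earlier examples):

    >>> Pipeline(
          Stage(
            status=lambda **kwargs: 1,
            message=lambda status, **kwargs: f"exited with status {status}"
          ),
        ).run().last_message
    'exited with status 1'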

Fork

A Fork represents a conditional in the execution of a Pipeline. It can be used to redirect execution to a specific PipelineComponent, referenced either absolutely or relatively. Analogous to a Stage, a Fork's eval-method is called with a number of keyword arguments:

  • all kwargs given to Pipeline.run are forwarded (note that this makes the following arguments reserved words in this context),
  • out (a persistent data-object that is passed through the entire Pipeline; its initial value is generated by the Pipeline's initialize_output),
  • count (index of Stage in execution of Pipeline)

Fork properties

A Fork takes a single Callable as argument. Based on the keyword arguments described above, this Callable returns a reference to a target Stage. The reference can be given in one of several ways:

  • integer; relative index in the Pipeline's list of components
  • string; a PipelineComponent's string identifier in the context of a Pipeline.run
  • StageRef; a more abstract form of reference, e.g. First, Next (see StageRef for details)
  • None; signal to (normally) exit Pipeline.run

Example

>>> from data_plumber import Pipeline, Stage, Fork, Next
>>> p = Pipeline(
      Stage(
        message=lambda **kwargs: "stage 1 executed"
      ),
      Fork(
        lambda **kwargs: Next if "arg" in kwargs else None
      ),
      Stage(
        message=lambda **kwargs: "stage 2 executed"
      ),
    )
>>> p.run(arg=0).last_message
'stage 2 executed'
>>> p.run().last_message
'stage 1 executed'

StageRef

StageRefs can be utilized in the context of requirements of Stages as well as flow control with Forks. While additional types of StageRefs can be defined, data_plumber already provides rich possibilities natively.

There are two different categories of StageRefs:

  1. referring to records of previously executed PipelineComponents (a record then provides information on the component's position in the Pipeline's sequence of components)
  2. referring to a component within the list of registered components of a Pipeline

List of predefined StageRefs (by record)

  • First: record of first component
  • Previous: record of previous component (one step)
  • PreviousN(n): record of previous component (n steps)

List of predefined StageRefs (by sequence)

  • Last: last component in sequence
  • Next: next component in sequence (one step)
  • Skip: component after next in sequence (two steps)
  • NextN(n): next component (n steps)
  • StageById(id): first occurrence of id in sequence
  • StageByIndex(index): component at index of sequence
  • StageByIncrement(n): component with relative position n in sequence

Example

>>> from data_plumber import Pipeline, Stage, Fork, Previous, NextN
>>> output = Pipeline(
        Stage(
            status=lambda **kwargs: 0
        ),
        Stage(
            requires={Previous: 0},
            status=lambda count, **kwargs: count
        ),
        Fork(
            lambda count, **kwargs: NextN(1)
        ),
        exit_on_status=lambda status: status > 3,
        loop=True
    ).run()
>>> len(output.records)
6

PipelineOutput

List of properties

The output of a Pipeline.run is an object of type PipelineOutput. This object has the following properties:

  • records: a list of StageRecords corresponding to all stages executed by the Pipeline; StageRecords themselves are an alias for a tuple of the message and status generated by the corresponding component
  • kwargs: a dictionary with the keyword arguments used in the Pipeline.run
  • data: the persistent data-object that has been processed through the Pipeline

For convenience, the last StageRecord generated in the Pipeline can be inspected using the following properties (a usage sketch follows the list):

  • last_record: StageRecord of last component that generated an output
  • last_status: status-part of the last_record
  • last_message: message-part of the last_record
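
A brief usage sketch (assuming the imports from the earlier examples; the run-keyword input_value is arbitrary):

>>> output = Pipeline(
        Stage(
            action=lambda out, **kwargs: out.update({"processed": True}),
            status=lambda **kwargs: 0,
            message=lambda **kwargs: "done"
        ),
    ).run(input_value=42)
>>> output.last_status
0
>>> output.last_message
'done'
>>> output.data
{'processed': True}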

Pipearray

A Pipearray is a convenience class for running multiple Pipelines on the same input data. Just like Pipelines themselves, a Pipearray can be either anonymous or named, depending on the use of positional and keyword arguments at initialization. The return type of its run-method is then either a list (only positional arguments) or a dictionary with names/ids as keys (at least one named Pipeline). Both contain the PipelineOutput objects of the individual Pipelines.

Example

>>> from data_plumber import Pipeline, Pipearray
>>> Pipearray(Pipeline(...), Pipeline(...)).run(...)
<list[PipelineOutput]>
>>> Pipearray(
        p=Pipeline(...),
        q=Pipeline(...)
    ).run(...)
<dict[str, PipelineOutput]>

Changelog

[1.11.0] - 2024-02-04

Changed

  • added common base class _PipelineComponent for Pipeline components Stage and Fork (f628159)

Added

  • added docs to package metadata (061e311)
  • names for PipelineComponents can now be declared in extension methods (append, prepend, ...) (8363284)
  • Pipeline now supports in-operator (usable with either component directly or its name/id) (5701073)
  • added requirements for Pipeline to be unpacked as mapping (b2db8fa)

Fixed

  • fixed issue where Fork-objects were internally not registered by their id (b267ca4)

[1.8.0] - 2024-02-03

Changed

  • refactored Fork and Stage to transform string/integer-references to Stages into StageRefs (7ba677b)

Added

  • added decorator-factory Pipeline.run_for_kwargs to generate kwargs for function calls (fe616b2)
  • added optional Stage-callable to export kwargs into Pipeline.run (8eca1bc)
  • added even more types of StageRefs: PreviousN, NextN (576820c)
  • added py.typed-marker to package (04a2e1d)
  • added more types of StageRefs: StageById, StageByIndex, StageByIncrement (92d57ad)

[1.4.0] - 2024-02-01

Changed

  • refactored internal modules (cf7045f)

Added

  • added StageRefs Next, Last, and Skip (14abaa7)
  • added optional finalizer-Callable to Pipeline (d95e5b6)
  • added support for Callable in Pipeline-argument exit_on_status (154c67b)

Fixed

  • PipelineOutput.last_X-methods now return None in case of empty records (b7a6ba1)

[1.0.0] - 2024-01-31

Changed

  • Breaking: refactor PipelineOutput and related types (1436ca1)
  • Breaking: replaced forwarding kwargs of Pipeline.run as dictionary in_ into Stage/Fork-Callables by forwarding directly (f2710fa, b569bb9)

Added

  • added missing information in module- and class-docstrings (7896742)

[0.1.0] - 2024-01-31

initial release
