data-plumber

data-plumber is a lightweight but versatile Python framework for multi-stage information processing. It allows processing pipelines to be constructed both from atomic building blocks and by recombining existing pipelines. Forks enable more complex (i.e. non-linear) orders of execution. Pipelines can also be collected into arrays that can be executed at once with the same input data.

Contents

  1. Usage Example
  2. Install
  3. Documentation
  4. Changelog

Usage example

Consider a scenario where the contents of a dictionary have to be validated and a suitable error message has to be generated. Specifically, a valid input-dictionary is expected to have a key "data" whose value is a list of integers. A suitable pipeline might look like this:

>>> from data_plumber import Stage, Pipeline, Previous
>>> pipeline = Pipeline(
...   Stage(  # validate "data" is passed into run
...     primer=lambda **kwargs: "data" in kwargs,
...     status=lambda primer, **kwargs: 0 if primer else 1,
...     message=lambda primer, **kwargs: "" if primer else "missing argument"
...   ),
...   Stage(  # validate "data" is list
...     requires={Previous: 0},
...     primer=lambda data, **kwargs: isinstance(data, list),
...     status=lambda primer, **kwargs: 0 if primer else 1,
...     message=lambda primer, **kwargs: "" if primer else "bad type"
...   ),
...   Stage(  # validate "data" contains only int
...     requires={Previous: 0},
...     primer=lambda data, **kwargs: all(isinstance(i, int) for i in data),
...     status=lambda primer, **kwargs: 0 if primer else 1,
...     message=lambda primer, **kwargs: "validation success" if primer else "bad type in data"
...   ),
...   exit_on_status=1
... )
>>> pipeline.run().last_message
'missing argument'
>>> pipeline.run(data=1).last_message
'bad type'
>>> pipeline.run(data=[1, "2", 3]).last_message
'bad type in data'
>>> pipeline.run(data=[1, 2, 3]).last_message
'validation success'

See section "Examples" in Documentation for more explanation.

Install

Install using pip with

pip install data-plumber

Consider installing in a virtual environment.

Documentation

Overview

data_plumber is designed to provide a framework for flexible data-processing based on re-usable building blocks.

At its core stands the class Pipeline which can be understood as both a container for a collection of instructions (PipelineComponent) and an interface for the execution of a process (Pipeline.run(...)). Previously defined Pipelines can be recombined with other Pipelines or extended by individual PipelineComponents. Individual Pipeline.runs can be triggered with run-specific input data.

PipelineComponents either define actual data-processing (Stage) or control the flow of a Pipeline execution (Fork). Until a Fork is encountered, a Pipeline.run iterates a pre-configured list of PipelineComponents. Any Stage-type component provides an integer status value after execution which is then available for defining conditional execution of Stages or changes in flow (Fork).

A Stage itself consists of multiple (generally optional) highly customizable sub-stages and properties that can be configured at instantiation. In particular, a versatile referencing system based on (pre-defined) StageRef-type classes can be used to control the requirements for Stages. Similarly, this system is also used by Forks.

The output of a Pipeline.run is of type PipelineOutput. It contains extensive information on the order of operations, the response from individual Stages, and a data-property (of customizable type). The latter can be used to store and/or output processed data from the Pipeline's execution context.

Finally, aside from the recombination of Pipelines into more complex Pipelines, multiple instances of Pipelines can be pooled into a Pipearray. This construct allows different Pipelines to be called with identical input data.

Pipeline

Building anonymous Pipelines

A Pipeline can be created in an empty, a partially, or a fully assembled state.

For the empty Pipeline a simple expression like

>>> from data_plumber import Pipeline, Stage
>>> Pipeline()
<data_plumber.pipeline.Pipeline object at ...>

suffices. Following up with statements like

>>> p = Pipeline()
>>> p.append(Stage())
>>> p.prepend(Pipeline())
>>> p.insert(Stage(), 1)

or simply by using the +-operator

>>> p = Pipeline()
>>> Stage() + p + Pipeline()
<data_plumber.pipeline.Pipeline object at ...>

Note that when adding to existing Pipelines, the change is made in-place.

>>> p = Pipeline(Stage())
>>> len(p)
1
>>> p + Stage()
<data_plumber.pipeline.Pipeline object at ...>
>>> len(p)
2

Consequently, only properties of the first argument are inherited (refer to Python's operator precedence). The use of this operator in combination with Pipelines therefore requires caution.

Building named Pipelines

Instead of giving the individual PipelineComponents as positional arguments during instantiation, they can be assigned names by providing them as keyword arguments (kwargs). In addition to these kwargs, positional arguments are still required to determine the Pipeline's order of operations. Named components are then referenced by their name:

>>> Pipeline(
...   "a", "b", "a", "c",
...   a=Stage(...,),
...   b=Stage(...,),
...   c=Stage(...,)
... )
<data_plumber.pipeline.Pipeline object at ...>

In the example above, the Pipeline executes the Stages in the order of a > b > a > c (note that the names of Stages can occur multiple times in the positional arguments or via Pipeline-extending methods). Methods like Pipeline.append also accept string identifiers for PipelineComponents. If none are provided at instantiation, an internally generated identifier is used.
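
For instance, a named Stage can be re-used by appending its identifier (a minimal sketch, assuming Pipeline.append accepts such a string identifier and run behaves as in the introductory example):

>>> p = Pipeline("a", a=Stage(message=lambda **kwargs: "stage a"))
>>> p.append("a")  # re-use the named Stage via its identifier
>>> p.run().last_message
'stage a'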

The two approaches of anonymous and named Pipelines can be combined freely:

>>> Pipeline(
...   "a", Stage(...,), "a", "c",
...   a=Stage(...,),
...   c=Stage(...,)
... )
<data_plumber.pipeline.Pipeline object at ...>

Note that empty PipelineComponents can be used as well. This can be helpful for labeling certain points in a Pipeline without changing its behavior. Consider, for example, a Fork that needs to point to a specific part of a Pipeline while otherwise no named PipelineComponents are required:

>>> Pipeline(
...   Stage(...,),  # stage 1
...   Fork(
...     lambda **kwargs: "target_position"
...   ),
...   Stage(...,),  # stage 2
...   "target_position",
...   Stage(...,),  # stage 3
...   ...
... )
<data_plumber.pipeline.Pipeline object at ...>

which, when triggered, executes the Stages in the order 'stage 1' > 'stage 3' (the associated PipelineOutput does not contain any record of the empty component "target_position").

Unpacking Pipelines

Pipelines support unpacking to be used as, for example, positional or keyword arguments in the constructor of another Pipeline:

>>> p = Pipeline("a", ..., a=Stage(), ...)
>>> Pipeline("b", *p, ..., b=Stage(), **p, ...)
<data_plumber.pipeline.Pipeline object at ...>
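
As a more concrete sketch (assuming that unpacking a named Pipeline yields its component identifiers and its name-component mapping, respectively):

>>> p = Pipeline("a", a=Stage(message=lambda **kwargs: "stage a"))
>>> q = Pipeline("b", *p, b=Stage(message=lambda **kwargs: "stage b"), **p)
>>> q.run().last_message
'stage a'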

Running a Pipeline

A Pipeline can be triggered by calling the run-method.

>>> Pipeline(...).run(...)
PipelineOutput(...)

Passing the finalize_output keyword argument into a run allows modifying the persistent Pipeline-data at exit (see section Pipeline settings for details). Any kwargs passed to run are forwarded to the PipelineComponents' Callables. Note that some keywords are reserved (out, primer, status, count, and records).

While Forks are simply evaluated and their returned StageRef is used to find the next target for execution, Stages themselves have multiple sub-stages. First, the Pipeline checks the Stage's requirements, then executes its primer before running the action-command. Next, any exported kwargs are updated in the Pipeline.run and, finally, the status and response message are generated (see Stage for details).

Pipeline settings

A Pipeline can be configured with multiple properties at instantiation:

  • initialize_output: a Callable that returns an object which is then passed into the PipelineComponents' Callables; this object is referred to as the "persistent data-object" (the default generates an empty dictionary)
  • finalize_output: a Callable that is called before (normally) exiting Pipeline.run; it receives the run's kwargs as well as the persistent data-object and a list of previous StageRecords named records (can be overridden per call of Pipeline.run)
  • exit_on_status: either an integer value (the Pipeline exits normally if any component returns this status) or a Callable that is called after any component with that component's status (if it evaluates to True, the Pipeline.run is stopped)
  • loop: boolean; if False, the Pipeline stops automatically after iterating beyond the last PipelineComponent in its list of operations; if True, the execution loops back into the first component
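
A short sketch combining some of these settings (assuming, as described above, that the persistent data-object reaches finalize_output under the out keyword alongside records):

>>> from data_plumber import Pipeline, Stage
>>> output = Pipeline(
...   Stage(
...     action=lambda out, **kwargs: out.append("processed"),
...     status=lambda **kwargs: 0
...   ),
...   initialize_output=lambda **kwargs: [],  # use a list as persistent data-object
...   finalize_output=lambda out, records, **kwargs:
...     out.append(f"{len(records)} stage(s) executed"),
...   exit_on_status=lambda status: status != 0
... ).run()
>>> output.data
['processed', '1 stage(s) executed']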

Running a Pipeline as decorator

A Pipeline can be used to generate kwargs for a function (i.e., based on the content of the persistent data-object). This requires the data-object to be unpackable like a mapping (e.g. a dictionary).

>>> @Pipeline(...).run_for_kwargs(...)
... def function(arg1, arg2): ...

Arguments that are passed to a call of the decorated function take priority over those generated by the Pipeline.run.
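
A minimal sketch (assuming run_for_kwargs can be used without arguments and that the default dictionary data-object is unpacked into keyword arguments):

>>> from data_plumber import Pipeline, Stage
>>> @Pipeline(
...   Stage(action=lambda out, **kwargs: out.update({"arg2": 42}))
... ).run_for_kwargs()
... def function(arg1, arg2):
...   return arg1 + arg2
>>> function(arg1=1)
43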

Stage

A Stage represents a single building block in the processing logic of a Pipeline. It provides a set of Callables that are used in a Pipeline.run. Their order of execution is primer > action > export > status > message. The arguments that are passed into these Callables vary by Callable and execution context. Below is a list of all keywords that can occur (most Callables receive only a subset of these):

  • all kwargs given to Pipeline.run or exported during execution are forwarded (note that this makes the following arguments reserved words in this context)
  • out: a persistent data-object that is passed through the entire Pipeline; its initial value is generated by the Pipeline's initialize_output
  • primer: output of Stage.primer
  • status: output of Stage.status
  • count: index of Stage in execution of Pipeline
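
A short sketch illustrating the count keyword (assuming count starts at 0 for the first executed Stage, consistent with the StageRef example further below):

>>> from data_plumber import Pipeline, Stage
>>> Pipeline(
...   Stage(message=lambda count, **kwargs: f"stage with count {count}"),
...   Stage(message=lambda count, **kwargs: f"stage with count {count}")
... ).run().last_message
'stage with count 1'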

Stage properties

Stages accept a number of different (optional) arguments that are mostly Callables to be used by a Pipeline during execution.

  • requires: requirements for Stage-execution being either None (always run this Stage) or a dictionary with pairs of references to a Stage and the required status (uses most recent evaluation);

    key types are either StageRef, str (identifier of a Stage in the context of a Pipeline), or int (relative index in Pipeline stage arrangement);

    values are either an integer or a Callable taking the status as an argument and returning a bool (if it evaluates to True, the Stage-requirement is met)

    >>> from data_plumber import Pipeline, Stage, Previous
    >>> Pipeline(
    ...   Stage(
    ...     message=lambda **kwargs: "first stage",
    ...     status=lambda **kwargs: 1
    ...   ),
    ...   Stage(
    ...     requires={Previous: 0},
    ...     message=lambda **kwargs: "second stage"
    ...   ),
    ... ).run().last_message
    'first stage'
    
  • primer: Callable for pre-processing data

    (kwargs: out, count)

    >>> Pipeline(
    ...   Stage(
    ...     primer=lambda **kwargs: "primer value",
    ...     message=lambda primer, **kwargs: primer
    ...   ),
    ... ).run().last_message
    'primer value'
    
  • action: Callable for main-step of processing

    (kwargs: out, primer, count)

    >>> Pipeline(
    ...   Stage(
    ...     action=lambda out, **kwargs: out.update({"new_data": 0})
    ...   ),
    ... ).run().data
    {'new_data': 0}
    
  • export: Callable that returns a dictionary of additional kwargs to be exported to the parent Pipeline; in the following Stages, these kwargs are then available as if they were provided with the Pipeline.run-command

    (kwargs: out, primer, count)

    >>> Pipeline(
    ...   Stage(
    ...     export=lambda **kwargs: {"new_arg": 0}
    ...   ),
    ...   Stage(
    ...     message=lambda **kwargs:
    ...       "export successful" if "new_arg" in kwargs
    ...       else "missing new_arg"
    ...   ),
    ... ).run().last_message
    'export successful'
    
  • status: Callable for generation of a Stage's integer exit status

    (kwargs: out, primer, count)

  • message: Callable for generation of a Stage's exit message

    (kwargs: out, primer, count, status)
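
As status and message have no dedicated examples above, a combined sketch (note that message additionally receives the status):

>>> from data_plumber import Pipeline, Stage
>>> Pipeline(
...   Stage(
...     primer=lambda **kwargs: 42,
...     status=lambda primer, **kwargs: 0 if primer == 42 else 1,
...     message=lambda status, **kwargs: "ok" if status == 0 else "not ok"
...   ),
... ).run().last_message
'ok'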

Fork

A Fork represents a conditional in the execution of a Pipeline. It can be used to redirect the next Pipeline-target to a specific, absolutely or relatively positioned PipelineComponent. Analogous to a Stage, a Fork's eval-method is called with a number of keyword arguments:

  • all kwargs given to Pipeline.run are forwarded (note that this makes the following arguments reserved words in this context)
  • out: a persistent data-object that is passed through the entire Pipeline; its initial value is generated by the Pipeline's initialize_output
  • count: index of Stage in execution of Pipeline
  • records: a list of previously generated StageRecords (see PipelineOutput for more information)

Fork properties

A Fork takes a single Callable as argument. Based on the keyword arguments described above, this Callable returns a reference to a target component. The reference can be given in one of several ways:

  • integer; relative index in the Pipeline's list of components
  • string; a PipelineComponent's string identifier in the context of a Pipeline.run
  • StageRef; a more abstract form of reference, e.g. First, Next (see StageRef for details)
  • None; signal to (normally) exit Pipeline.run

Example

>>> from data_plumber import Pipeline, Stage, Fork, Next
>>> p = Pipeline(
...   Stage(
...     message=lambda **kwargs: "stage 1 executed"
...   ),
...   Fork(
...     lambda **kwargs: Next if "arg" in kwargs else None
...   ),
...   Stage(
...     message=lambda **kwargs: "stage 2 executed"
...   ),
... )
>>> p.run(arg=0).last_message
'stage 2 executed'
>>> p.run().last_message
'stage 1 executed'

StageRef

StageRefs can be utilized to formulate requirements for Stages as well as flow control with Forks. While additional types of StageRefs can be defined, data_plumber already provides a rich set of pre-defined options.

There are two different categories of StageRefs:

  1. referring to records of previously executed PipelineComponents (a record can uniquely identify a component within the Pipeline's sequence of components)
  2. referring to a component within the list of registered components of a Pipeline directly

List of predefined StageRefs (by record)

  • First: record of first component
  • Previous: record of previous component (one step)
  • PreviousN(n): record of previous component (n steps)

List of predefined StageRefs (by sequence)

  • Last: last component in sequence
  • Next: next component in sequence (one step)
  • Skip: component after next in sequence (two steps)
  • NextN(n): next component (n steps)
  • StageById(id): first occurrence of id in sequence
  • StageByIndex(index): component at index of sequence
  • StageByIncrement(n): component with relative position n in sequence

Example

>>> from data_plumber import Pipeline, Stage, Fork, Previous, NextN
>>> output = Pipeline(
...   Stage(
...     status=lambda **kwargs: 0
...   ),
...   Stage(
...     requires={Previous: 0},
...     status=lambda count, **kwargs: count
...   ),
...   Fork(
...     lambda count, **kwargs: NextN(1)
...   ),
...   exit_on_status=lambda status: status > 3,
...   loop=True
... ).run()
>>> len(output.records)
6
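
A sketch referencing a component by identifier instead (assuming StageById can be imported from the package root like the other StageRefs shown here):

>>> from data_plumber import Pipeline, Stage, Fork, StageById
>>> Pipeline(
...   Fork(lambda **kwargs: StageById("final")),
...   Stage(message=lambda **kwargs: "intermediate stage"),
...   "final",
...   final=Stage(message=lambda **kwargs: "final stage")
... ).run().last_message
'final stage'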

PipelineOutput

List of properties

The output of a Pipeline.run is an object of type PipelineOutput. This object has the following properties:

  • records: a list of StageRecords corresponding to all Stages executed by the Pipeline; StageRecords themselves are a collection of properties

    • index: index position in Pipeline's sequence of PipelineComponents
    • id_: name/id of Stage
    • message: the message returned by the Stage
    • status: the integer status returned by the Stage

    (for legacy support (<=1.11.) a StageRecord can also be indexed, where message and status are returned for indices 0 and 1, respectively)

  • kwargs: a dictionary with the keyword arguments used in the Pipeline.run

  • data: the persistent data-object that has been processed through the Pipeline

For convenience, the last StageRecord generated in the Pipeline can be investigated using the shortcuts

  • last_record: StageRecord of last component that generated an output
  • last_status: status-part of the last_record
  • last_message: message-part of the last_record
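
A brief sketch of accessing these properties (the values shown are what one would expect with the default settings):

>>> from data_plumber import Pipeline, Stage
>>> output = Pipeline(
...   Stage(
...     status=lambda **kwargs: 0,
...     message=lambda **kwargs: "done"
...   )
... ).run(arg=1)
>>> output.last_status
0
>>> output.last_message
'done'
>>> output.records[0].message
'done'
>>> output.kwargs
{'arg': 1}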

Pipearray

A Pipearray is a convenience class for running multiple Pipelines on the same input data. Just like Pipelines themselves, a Pipearray can be either anonymous or named, depending on the use of positional and keyword arguments during setup. The return type of its run-method is either a list (only positional arguments) or a dictionary keyed by name/id (at least one named Pipeline). Both contain the PipelineOutput objects of the individual Pipelines.

Example

>>> from data_plumber import Pipeline, Pipearray
>>> Pipearray(Pipeline(...), Pipeline(...)).run(...)
<list[PipelineOutput]>
>>> Pipearray(
...   p=Pipeline(...),
...   q=Pipeline(...)
... ).run(...)
<dict[str, PipelineOutput]>
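
A more concrete sketch with named Pipelines (assuming the returned dictionary is keyed by the given names):

>>> from data_plumber import Pipeline, Pipearray, Stage
>>> result = Pipearray(
...   double=Pipeline(
...     Stage(action=lambda out, n, **kwargs: out.update({"result": 2 * n}))
...   ),
...   square=Pipeline(
...     Stage(action=lambda out, n, **kwargs: out.update({"result": n * n}))
...   )
... ).run(n=3)
>>> result["double"].data
{'result': 6}
>>> result["square"].data
{'result': 9}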

Examples

Input Processing Revisited

Reconsider the introductory example of validating dictionary content. The described scenario, in which data is validated against some schema with increasing attention to detail in every Stage while helpful response messages are generated, is common in the context of, for example, user input.

The following section goes into more detail regarding the reasoning for the example-Pipeline's setup. A Pipeline generally iterates the provided sequence of Stages if there is no flow control included (like in this example). Within a Stage there is again an order of operations for the Callables as defined in the constructor (primer > action > export > status > message). Starting with the first Stage of the example,

    ...
...   Stage(  # validate "data" is passed into run
...     primer=lambda **kwargs: "data" in kwargs,
...     status=lambda primer, **kwargs: 0 if primer else 1,
...     message=lambda primer, **kwargs: "" if primer else "missing argument"
...   ),
    ...

the primer is executed first. This function validates that an argument with keyword data has been passed into the Pipeline.run, i.e. Pipeline.run(data=...). Following this evaluation, the result can be used in the next two Callables, status and message, to generate an output from this Stage with appropriate properties for the given situation.

In the next Stage,

    ...
...   Stage(  # validate "data" is list
...     requires={Previous: 0},
...     primer=lambda data, **kwargs: isinstance(data, list),
...     status=lambda primer, **kwargs: 0 if primer else 1,
...     message=lambda primer, **kwargs: "" if primer else "bad type"
...   ),
    ...

the general structure is similar. A primer is executed to validate a property of the data-keyword argument. This keyword can now be written explicitly into the lambda's signature (instead of covering this argument implicitly with **kwargs like before) since a requirement-check precedes the call to primer. The requirement given here declares only a single condition, i.e. that the previous Stage (denoted by the StageRef named Previous) returned with a status of 0. Consequently, this second Stage is only executed if the primer of the first Stage returned a value of True.

The third Stage is conceptually the same as the second one but intended to handle another aspect of the validation.

The exit from the Pipeline.run can occur either after completing the third Stage or if a Stage returns a specific status as defined by the exit_on_status-kwarg of the Pipeline itself:

>>> pipeline = Pipeline(
...  ...
...   exit_on_status=1
... )

This is the reason all Stages return a status of 1 if their part of the validation fails.

In the following, two additional features of data-plumber are demonstrated by extending the original example.

Input Processing - Formatting Output Data

A Pipeline can also handle reformatting of input data into a desired format. Suppose that, following validation, only the sum of all integers given in the list data is of interest. In order to make this information readily available right after execution within the PipelineOutput-object that is returned by a Pipeline.run, one may add a suitable action callable to the last Stage of the previous setup:

    ...
...   Stage(  # validate "data" contains only int
...    ...
...     action=lambda out, data, primer, **kwargs:
...       out.update({"sum": sum(data)}) if primer else None
...    ...
...   ),
    ...

With this change, after full validation, a key sum is added to the dictionary in PipelineOutput.data, i.e.

>>> output = pipeline.run(data=[1, 2, 3])
>>> output.data["sum"]
6

Input Processing - Optional Values

Consider that the data that is fed into the Pipeline.run-command is an optional property. Consequently, it is fine if it is missing but, if it exists, it should satisfy a specific format.

This scenario can, for example, be realized using a Fork. A Fork takes a single Callable on construction which returns a reference to another Stage as the next target in the current Pipeline.run or gives an exit-signal (by returning None). Here, the latter is more interesting. We can change the initial portion of the Pipeline setup to accommodate the changes in validation logic:

>>> from data_plumber import Next
>>> pipeline = Pipeline(
...   Fork(
...     lambda **kwargs: Next if "data" in kwargs else None
...   ),
...   Stage(  # validate "data" is list
...     ...
...   ),
...  ...
...   exit_on_status=1
... )

Now, the validation of the first Stage in the initial example is replaced with a Fork. Next is another pre-defined StageRef pointing at the next PipelineComponent in the Pipeline's sequence.

Note that for this setup, if data is missing, the returned PipelineOutput does not contain any StageRecords as a Fork does not output any status. In order to have both the Fork-logic and a StageRecord, the Fork has to be the second component. Using the records-kwarg in the Fork's Callable makes it possible to use the status from a previous Stage and circumvent a duplicate validation, as sketched below.
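
A sketch of such a setup (the status value 2 for an omitted data is chosen arbitrarily here so that it does not trigger exit_on_status=1; for brevity, only the list-validation Stage is repeated):

>>> from data_plumber import Pipeline, Stage, Fork, Next
>>> pipeline = Pipeline(
...   Stage(  # record whether the optional "data" has been passed
...     primer=lambda **kwargs: "data" in kwargs,
...     status=lambda primer, **kwargs: 0 if primer else 2,
...     message=lambda primer, **kwargs: "" if primer else "optional argument omitted"
...   ),
...   Fork(  # exit normally, but with a record, if "data" is absent
...     lambda records, **kwargs: Next if records[-1].status == 0 else None
...   ),
...   Stage(  # validate "data" is list
...     primer=lambda data, **kwargs: isinstance(data, list),
...     status=lambda primer, **kwargs: 0 if primer else 1,
...     message=lambda primer, **kwargs: "" if primer else "bad type"
...   ),
...   exit_on_status=1
... )
>>> pipeline.run().last_message
'optional argument omitted'
>>> pipeline.run(data="not a list").last_message
'bad type'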

Changelog

[1.15.0] - 2024-04-16

Changed

  • changed finalize_output-callable argument signature, it now also takes records (16623d9)

[1.14.0] - 2024-03-30

Added

  • added finalize_output-override option to Pipeline.run (e951f97)

[1.13.0] - 2024-03-23

Changed

  • Pipeline now supports empty PipelineComponents as labels within pipeline (631bc3d)

[1.12.1] - 2024-02-07

Fixed

  • update and fix errors in documentation (2318210)

[1.12.0] - 2024-02-05

Changed

  • added a list of previous StageRecords as kwarg for the call to a Fork's conditional (2f1cb77)
  • changed StageRecord into a proper dataclass (e7eae6d)

[1.11.0] - 2024-02-04

Changed

  • added common base class _PipelineComponent for Pipeline components Stage and Fork (f628159)

Added

  • added docs to package metadata (061e311)
  • names for PipelineComponents can now be declared in extension methods (append, prepend, ...) (8363284)
  • Pipeline now supports in-operator (usable with either component directly or its name/id) (5701073)
  • added requirements for Pipeline to be unpacked as mapping (b2db8fa)

Fixed

  • fixed issue where Fork-objects were internally not registered by their id (b267ca4)

[1.8.0] - 2024-02-03

Changed

  • refactored Fork and Stage to transform string/integer-references to Stages into StageRefs (7ba677b)

Added

  • added decorator-factory Pipeline.run_for_kwargs to generate kwargs for function calls (fe616b2)
  • added optional Stage-callable to export kwargs into Pipeline.run (8eca1bc)
  • added even more types of StageRefs: PreviousN, NextN (576820c)
  • added py.typed-marker to package (04a2e1d)
  • added more types of StageRefs: StageById, StageByIndex, StageByIncrement (92d57ad)

[1.4.0] - 2024-02-01

Changed

  • refactored internal modules (cf7045f)

Added

  • added StageRefs Next, Last, and Skip (14abaa7)
  • added optional finalizer-Callable to Pipeline (d95e5b6)
  • added support for Callable in Pipeline-argument exit_on_status (154c67b)

Fixed

  • PipelineOutput.last_X-methods now return None in case of empty records (b7a6ba1)

[1.0.0] - 2024-01-31

Changed

  • Breaking: refactor PipelineOutput and related types (1436ca1)
  • Breaking: replaced forwarding kwargs of Pipeline.run as dictionary in_ into Stage/Fork-Callables by forwarding directly (f2710fa, b569bb9)

Added

  • added missing information in module- and class-docstrings (7896742)

[0.1.0] - 2024-01-31

initial release
