data-plumber
data-plumber is a lightweight but versatile python-framework for multi-stage information processing.
It allows you to construct processing pipelines both from atomic building blocks and via recombination of existing pipelines.
Forks enable more complex (i.e. non-linear) orders of execution.
Pipelines can also be collected into arrays that can be executed at once with the same input data.
Usage example
Consider a scenario where the contents of a dictionary have to be validated and a suitable error message has to be generated. Specifically, a valid input-dictionary is expected to have a key "data" with the respective value being a list of integer numbers. A suitable pipeline might look like this:
>>> from data_plumber import Stage, Pipeline, Previous
>>> pipeline = Pipeline(
... Stage( # validate "data" is passed into run
... primer=lambda **kwargs: "data" in kwargs,
... status=lambda primer, **kwargs: 0 if primer else 1,
... message=lambda primer, **kwargs: "" if primer else "missing argument"
... ),
... Stage( # validate "data" is list
... requires={Previous: 0},
... primer=lambda data, **kwargs: isinstance(data, list),
... status=lambda primer, **kwargs: 0 if primer else 1,
... message=lambda primer, **kwargs: "" if primer else "bad type"
... ),
... Stage( # validate "data" contains only int
... requires={Previous: 0},
... primer=lambda data, **kwargs: all(isinstance(i, int) for i in data),
... status=lambda primer, **kwargs: 0 if primer else 1,
... message=lambda primer, **kwargs: "validation success" if primer else "bad type in data"
... ),
... exit_on_status=1
... )
>>> pipeline.run().last_message
'missing argument'
>>> pipeline.run(data=1).last_message
'bad type'
>>> pipeline.run(data=[1, "2", 3]).last_message
'bad type in data'
>>> pipeline.run(data=[1, 2, 3]).last_message
'validation success'
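For comparison, the same checks can be written as plain Python (an illustration of the logic the pipeline encodes, not part of data-plumber):

```python
# Plain-Python equivalent of the three validation Stages above (illustration
# only): each check corresponds to one Stage, and the first failure wins.
def validate(**kwargs):
    if "data" not in kwargs:
        return "missing argument"
    if not isinstance(kwargs["data"], list):
        return "bad type"
    if not all(isinstance(i, int) for i in kwargs["data"]):
        return "bad type in data"
    return "validation success"

print(validate())                  # missing argument
print(validate(data=1))            # bad type
print(validate(data=[1, "2", 3]))  # bad type in data
print(validate(data=[1, 2, 3]))    # validation success
```

The pipeline version expresses the same chain of checks, but each step becomes an independently reusable, recombinable building block.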
See section "Examples" in Documentation for more explanation.
Install
Install using pip with
pip install data-plumber
Consider installing in a virtual environment.
Documentation
Overview
data_plumber is designed to provide a framework for flexible data-processing based on re-usable building blocks.
At its core stands the class Pipeline which can be understood as both a container for a collection of instructions (PipelineComponent) and an interface for the execution of a process (Pipeline.run(...)).
Previously defined Pipelines can be recombined with other Pipelines or extended by individual PipelineComponents.
Individual Pipeline.runs can be triggered with run-specific input data.
PipelineComponents are either units defining actual data-processing (Stage) or units controlling the flow of a Pipeline execution (Fork).
Until a Fork is encountered, a Pipeline.run iterates a pre-configured list of PipelineComponents.
Any Stage-type component provides an integer status value after execution which is then available for defining conditional execution of Stages or changes in flow (Fork).
A Stage itself consists of multiple (generally optional) highly customizable sub-stages and properties that can be configured at instantiation.
In particular, a versatile referencing system based on (pre-defined) StageRef-type classes can be used to control the requirements for Stages.
Similarly, this system is also used by Forks.
The output of a Pipeline.run is of type PipelineOutput. It contains extensive information on the order of operations, the response from individual Stages, and a data-property (of customizable type).
The latter can be used to store and/or output processed data from the Pipeline's execution context.
Finally, aside from recombination of Pipelines into more complex Pipelines, multiple instances of Pipelines can be pooled into a Pipearray.
This construct allows calling different Pipelines with identical input data.
Pipeline
Building anonymous Pipelines
A Pipeline can be created in an empty, a partially, or a fully assembled state.
For the empty Pipeline a simple expression like
>>> from data_plumber import Pipeline, Stage
>>> Pipeline()
<data_plumber.pipeline.Pipeline object at ...>
suffices. Following up with statements like
>>> p = Pipeline()
>>> p.append(Stage())
>>> p.prepend(Pipeline())
>>> p.insert(Stage(), 1)
or simply by using the +-operator
>>> p = Pipeline()
>>> Stage() + p + Pipeline()
<data_plumber.pipeline.Pipeline object at ...>
Note that when adding to existing Pipelines, the change is made in-place.
>>> p = Pipeline(Stage())
>>> len(p)
1
>>> p + Stage()
<data_plumber.pipeline.Pipeline object at ...>
>>> len(p)
2
Consequently, only properties of the first argument are inherited (refer to python's operator precedence).
Therefore, the use of this operation in combination with Pipelines requires caution.
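The caution can be illustrated with a plain-Python sketch of the underlying pattern, a mutating __add__ that returns its left operand (an assumed illustration of the pattern, not data-plumber's implementation):

```python
# Plain-Python sketch of an in-place "+" (illustrative only): __add__ mutates
# and returns the left operand, so the result keeps the left operand's
# identity and properties.
class Chain:
    def __init__(self, *items):
        self.items = list(items)

    def __add__(self, other):
        # extend the left operand in place, then return it
        self.items.extend(getattr(other, "items", [other]))
        return self

p = Chain("stage-1")
result = p + Chain("stage-2")
print(result is p, len(p.items))  # True 2: the left operand was changed in place
```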
Building named Pipelines
Instead of giving the individual PipelineComponents as positional arguments during instantiation, they can be assigned names by providing components as keyword arguments (kwargs).
In addition to the kwargs, positional arguments are still required to determine the Pipeline's order of operations.
These are then given as the PipelineComponents' names:
>>> Pipeline(
... "a", "b", "a", "c",
... a=Stage(...,),
... b=Stage(...,),
... c=Stage(...,)
... )
<data_plumber.pipeline.Pipeline object at ...>
In the example above, the Pipeline executes the Stages in the order of a > b > a > c (note that the names of Stages can occur multiple times in the positional arguments or via Pipeline-extending methods).
Methods like Pipeline.append also accept string identifiers for PipelineComponents.
If none are provided at instantiation, an internally generated identifier is used.
The two approaches of anonymous and named Pipelines can be combined freely:
>>> Pipeline(
... "a", Stage(...,), "a", "c",
... a=Stage(...,),
... c=Stage(...,)
... )
<data_plumber.pipeline.Pipeline object at ...>
Also note that empty PipelineComponents can be used as well.
This can be helpful to label certain points in a Pipeline without changing its behavior.
Consider, for example, a Fork that needs to point to a specific part of a Pipeline while no other named PipelineComponents are required:
>>> Pipeline(
... Stage(...,), # stage 1
... Fork(
... lambda **kwargs: "target_position"
... ),
... Stage(...,), # stage 2
... "target_position",
... Stage(...,), # stage 3
... ...
... )
<data_plumber.pipeline.Pipeline object at ...>
which, when triggered, executes the Stages as 'stage 1' > 'stage 3' (the associated PipelineOutput does not contain any record of the empty component "target_position").
Unpacking Pipelines
Pipelines support unpacking to be used as, for example, positional or keyword arguments in the constructor of another Pipeline:
>>> p = Pipeline("a", ..., a=Stage(), ...)
>>> Pipeline("b", *p, ..., b=Stage(), **p, ...)
<data_plumber.pipeline.Pipeline object at ...>
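What makes an object unpackable both as a sequence and as a mapping can be sketched in plain Python (an assumed minimal protocol for illustration, not data-plumber's actual classes):

```python
# Sketch of the minimal protocol behind "*p" and "**p" unpacking: __iter__
# drives "*", while keys() plus __getitem__ drive "**" (illustration only).
class Components:
    def __init__(self, order, **named):
        self._order = order  # sequence of component names (order of operations)
        self._named = named  # mapping of name -> component

    def __iter__(self):
        return iter(self._order)

    def keys(self):
        return self._named.keys()

    def __getitem__(self, key):
        return self._named[key]

p = Components(["a", "b", "a"], a="stage-a", b="stage-b")
print([*p])   # ['a', 'b', 'a']
print({**p})  # {'a': 'stage-a', 'b': 'stage-b'}
```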
Running a Pipeline
A Pipeline can be triggered by calling the run-method.
>>> Pipeline(...).run(...)
PipelineOutput(...)
Passing the finalize_output keyword argument into a run allows to modify the persistent Pipeline-data at exit (see section Pipeline settings for details).
Any kwargs passed to this function are forwarded to the PipelineComponents' Callables.
Note that some keywords are reserved (out, primer, status, count, and records).
While Forks are simply evaluated and their returned StageRef is used to find the next target for execution, Stages have themselves multiple sub-stages.
First, the Pipeline checks the Stage's requirements, then executes its primer before running the action-command.
Next, any exported kwargs are updated in the Pipeline.run and, finally, the status and response message is generated (see Stage for details).
Pipeline settings
A Pipeline can be configured with multiple properties at instantiation:
- initialize_output: a Callable that returns an object which is subsequently passed into the PipelineComponents' Callables; this object is referred to as the "persistent data-object" (the default generates an empty dictionary)
- finalize_output: a Callable that is called before (normally) exiting the Pipeline.run, with the run's kwargs as well as the persistent data-object and a list of previous StageRecords called records (can be overridden in a call of Pipeline.run)
- exit_on_status: either an integer value (the Pipeline exits normally if any component returns this status) or a Callable that is called after any component with the component's status (if it evaluates to True, the Pipeline.run is stopped)
- loop: boolean; if False, the Pipeline stops automatically after iterating beyond the last PipelineComponent in its list of operations; if True, execution loops back into the first component
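As a sketch of how the two accepted forms of exit_on_status relate, an integer value can be viewed as shorthand for an equality-predicate (an assumption for illustration, not the library's internals):

```python
# Sketch: unify the two accepted exit_on_status forms into one predicate
# (illustrative assumption, not data-plumber's actual code).
def normalize_exit_on_status(exit_on_status):
    if callable(exit_on_status):
        return exit_on_status
    # an integer means "exit when a component returns exactly this status"
    return lambda status: status == exit_on_status

stop_on_1 = normalize_exit_on_status(1)
stop_above_3 = normalize_exit_on_status(lambda status: status > 3)

print(stop_on_1(1), stop_on_1(0))        # True False
print(stop_above_3(5), stop_above_3(2))  # True False
```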
Running a Pipeline as decorator
A Pipeline can be used to generate kwargs for a function (i.e., based on the content of the persistent data-object).
This requires the data-object to be unpackable like a mapping (e.g. a dictionary).
>>> @Pipeline(...).run_for_kwargs(...)
... def function(arg1, arg2): ...
Arguments that are passed to a call of the decorated function take priority over those generated by the Pipeline.run.
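The priority rule can be sketched with a plain-Python decorator (a minimal stand-in for the idea, not the actual Pipeline.run_for_kwargs):

```python
# Minimal sketch of the decorator idea: default kwargs come from some
# producer, and kwargs passed at call time take priority (illustration only).
def run_for_kwargs(produce_kwargs):
    def decorator(func):
        def wrapper(**kwargs):
            merged = {**produce_kwargs(), **kwargs}  # call-time kwargs win
            return func(**merged)
        return wrapper
    return decorator

@run_for_kwargs(lambda: {"arg1": 1, "arg2": 2})
def function(arg1, arg2):
    return arg1 + arg2

print(function())         # 3: both arguments come from the producer
print(function(arg2=10))  # 11: call-time arg2 overrides the produced value
```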
Stage
A Stage represents a single building block in the processing logic of a Pipeline.
It provides a set of Callables that are used in a Pipeline.run.
Their order of execution is given by primer > action > export > status > message.
The arguments that are passed into those Callables vary by Callable and execution context.
Below a list of all keywords that can occur is given (most Callables receive only a subset of these):
- all kwargs given to Pipeline.run or exported during execution are forwarded (note that this makes the following arguments reserved words in this context)
- out: a persistent data-object that is passed through the entire Pipeline; its initial value is generated by the Pipeline's initialize_output
- primer: output of Stage.primer
- status: output of Stage.status
- count: index of Stage in execution of Pipeline
Stage properties
Stages accept a number of different (optional) arguments that are mostly Callables to be used by a Pipeline during execution.
- requires: requirements for Stage-execution, being either None (always run this Stage) or a dictionary with pairs of references to a Stage and the required status (uses the most recent evaluation); key types are either StageRef, str (identifier of a Stage in the context of a Pipeline), or int (relative index in the Pipeline's stage arrangement); values are either an integer or a Callable taking the status as an argument and returning a bool (if it evaluates to True, the Stage-requirement is met)

>>> from data_plumber import Pipeline, Stage, Previous
>>> Pipeline(
...     Stage(
...         message=lambda **kwargs: "first stage",
...         status=lambda **kwargs: 1
...     ),
...     Stage(
...         requires={Previous: 0},
...         message=lambda **kwargs: "second stage"
...     ),
... ).run().last_message
'first stage'

- primer: Callable for pre-processing data (kwargs: out, count)

>>> Pipeline(
...     Stage(
...         primer=lambda **kwargs: "primer value",
...         message=lambda primer, **kwargs: primer
...     ),
... ).run().last_message
'primer value'

- action: Callable for the main step of processing (kwargs: out, primer, count)

>>> Pipeline(
...     Stage(
...         action=lambda out, **kwargs: out.update({"new_data": 0})
...     ),
... ).run().data
{'new_data': 0}

- export: Callable that returns a dictionary of additional kwargs to be exported to the parent Pipeline; in the following Stages, these kwargs are then available as if they were provided with the Pipeline.run-command (kwargs: out, primer, count)

>>> Pipeline(
...     Stage(
...         export=lambda **kwargs: {"new_arg": 0}
...     ),
...     Stage(
...         message=lambda **kwargs:
...             "export successful" if "new_arg" in kwargs
...             else "missing new_arg"
...     ),
... ).run().last_message
'export successful'

- status: Callable for generation of a Stage's integer exit status (kwargs: out, primer, count)
- message: Callable for generation of a Stage's exit message (kwargs: out, primer, count, status)
Fork
A Fork represents a conditional in the execution of a Pipeline.
It can be used to redirect the next Pipeline-target to a specific, absolutely or relatively positioned PipelineComponent.
Analogous to the Stage, a Fork's eval-method is called with a number of keyword arguments:
- all kwargs given to Pipeline.run are forwarded (note that this makes the following arguments reserved words in this context)
- out: a persistent data-object that is passed through the entire Pipeline; its initial value is generated by the Pipeline's initialize_output
- count: index of Stage in execution of Pipeline
- records: a list of previously generated StageRecords (see PipelineOutput for more information)
Fork properties
A Fork takes a single Callable as argument.
Based on the properties described above, a reference to a target Stage is returned.
This reference can take one of several forms:
- integer: relative index in the Pipeline's list of components
- string: a PipelineComponent's string identifier in the context of a Pipeline.run
- StageRef: a more abstract form of reference, e.g. First or Next (see StageRef for details)
- None: signal to (normally) exit the Pipeline.run
Example
>>> from data_plumber import Pipeline, Stage, Fork, Next
>>> p = Pipeline(
... Stage(
... message=lambda **kwargs: "stage 1 executed"
... ),
... Fork(
... lambda **kwargs: Next if "arg" in kwargs else None
... ),
... Stage(
... message=lambda **kwargs: "stage 2 executed"
... ),
... )
>>> p.run(arg=0).last_message
'stage 2 executed'
>>> p.run().last_message
'stage 1 executed'
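The flow control in this example can be mimicked with a plain-Python stand-in (illustrative only, not data-plumber's engine): stages record a message, while a fork returns a relative jump or None as an exit signal.

```python
# Stand-in for Fork-style flow control (illustration only): components are
# (kind, callable) pairs; a fork's callable returns a relative increment for
# the next position, or None to exit the run.
def run(components, **kwargs):
    messages = []
    pos = 0
    while pos < len(components):
        kind, func = components[pos]
        if kind == "stage":
            messages.append(func(**kwargs))
            pos += 1
        else:  # fork
            target = func(**kwargs)
            if target is None:
                break  # exit signal
            pos += target
    return messages

pipeline = [
    ("stage", lambda **kwargs: "stage 1 executed"),
    ("fork", lambda **kwargs: 1 if "arg" in kwargs else None),
    ("stage", lambda **kwargs: "stage 2 executed"),
]
print(run(pipeline, arg=0)[-1])  # stage 2 executed
print(run(pipeline)[-1])         # stage 1 executed
```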
StageRef
StageRefs can be utilized to formulate requirements for Stages as well as flow control with Forks.
While additional types of StageRefs can be defined, data_plumber already provides rich possibilities natively.
There are two different categories of StageRefs:
- referring to records of previously executed PipelineComponents (a record can uniquely identify a component within the Pipeline's sequence of components)
- referring directly to a component within the list of registered components of a Pipeline
List of predefined StageRefs (by record)
- First: record of first component
- Previous: record of previous component (one step)
- PreviousN(n): record of previous component (n steps)
List of predefined StageRefs (by sequence)
- Last: last component in sequence
- Next: next component in sequence (one step)
- Skip: component after next in sequence (two steps)
- NextN(n): next component (n steps)
- StageById(id): first occurrence of id in sequence
- StageByIndex(index): component at index of sequence
- StageByIncrement(n): component at relative position n in sequence
Example
>>> from data_plumber import Pipeline, Stage, Fork, Previous, NextN
>>> output = Pipeline(
... Stage(
... status=lambda **kwargs: 0
... ),
... Stage(
... requires={Previous: 0},
... status=lambda count, **kwargs: count
... ),
... Fork(
... lambda count, **kwargs: NextN(1)
... ),
... exit_on_status=lambda status: status > 3,
... loop=True
... ).run()
>>> len(output.records)
6
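The record count can be traced with a plain-Python stand-in for the loop above (illustrative only, not data-plumber itself): the first Stage always reports status 0, the second reports its running execution index, and the run exits once the exit predicate is satisfied. Forks are assumed not to generate records.

```python
# Stand-in for the looping Pipeline above (illustration only): records
# collects one status per executed Stage, count is the running index.
def simulate(exit_on_status):
    records = []
    count = 0
    while True:
        records.append(0)   # first Stage: always status 0
        count += 1
        status = count      # second Stage: status equals its count
        records.append(status)
        count += 1
        if exit_on_status(status):
            return records

records = simulate(lambda status: status > 3)
print(records)       # [0, 1, 0, 3, 0, 5]
print(len(records))  # 6, matching len(output.records) above
```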
PipelineOutput
List of properties
The output of a Pipeline.run is an object of type PipelineOutput.
This object has the following properties:
- records: a list of StageRecords corresponding to all Stages executed by the Pipeline; StageRecords themselves are a collection of properties:
  - index: index position in the Pipeline's sequence of PipelineComponents
  - id_: name/id of the Stage
  - message: the message returned by the Stage
  - status: the status returned by the Stage
  (for legacy support (<=1.11), a StageRecord can also be indexed, where message and status are returned for indices 0 and 1, respectively)
- kwargs: a dictionary with the keyword arguments used in the Pipeline.run
- data: the persistent data-object that has been processed through the Pipeline
For convenience, the last StageRecord generated in the Pipeline can be investigated using the shortcuts
- last_record: StageRecord of the last component that generated an output
- last_status: status-part of the last_record
- last_message: message-part of the last_record
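A StageRecord with the legacy index access described above can be sketched as a dataclass (field names follow this documentation; the rest is an assumed illustration):

```python
# Sketch of a StageRecord-like dataclass with legacy index access
# (illustration only): indices 0 and 1 map to message and status.
from dataclasses import dataclass

@dataclass
class StageRecord:
    index: int
    id_: str
    message: str
    status: int

    def __getitem__(self, i):
        # legacy support: indexing returns message (0) or status (1)
        if i == 0:
            return self.message
        if i == 1:
            return self.status
        raise IndexError(i)

record = StageRecord(index=0, id_="stage-1", message="validation success", status=0)
print(record[0], record[1])  # validation success 0
```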
Pipearray
A Pipearray is a convenience class for running multiple Pipelines on the same input data.
Just like the Pipelines themselves, the Pipearray can be either anonymous or named, depending on the use of positional and keyword arguments during setup.
The return type can then be either a list (only positional arguments) or a dictionary with keys being names/ids (at least one named Pipeline). Both contain the PipelineOutput objects of the individual Pipelines.
Example
>>> from data_plumber import Pipeline, Pipearray
>>> Pipearray(Pipeline(...), Pipeline(...)).run(...)
<list[PipelineOutput]>
>>> Pipearray(
... p=Pipeline(...),
... q=Pipeline(...)
... ).run(...)
<dict[str, PipelineOutput]>
Examples
Input Processing Revisited
Reconsider the introductory example of validating dictionary content.
The described scenario, where data is validated against some schema with increasing attention to detail at every Stage while generating helpful response messages, is common in the context of, for example, user input.
The following section goes into more detail regarding the reasoning for the example-Pipeline's setup.
A Pipeline generally iterates the provided sequence of Stages if there is no flow control included (like in this example).
Within a Stage there is again an order of operations for the Callables as defined in the constructor (primer > action > export > status > message).
Starting with the first Stage of the example,
...
... Stage( # validate "data" is passed into run
... primer=lambda **kwargs: "data" in kwargs,
... status=lambda primer, **kwargs: 0 if primer else 1,
... message=lambda primer, **kwargs: "" if primer else "missing argument"
... ),
...
the primer is executed first.
This function validates that an argument with keyword data has been passed into the Pipeline.run, i.e. Pipeline.run(data=...).
Following this evaluation, the result can be used in the next two Callables, status and message, to generate an output from this Stage with appropriate properties for the given situation.
In the next Stage,
...
... Stage( # validate "data" is list
... requires={Previous: 0},
... primer=lambda data, **kwargs: isinstance(data, list),
... status=lambda primer, **kwargs: 0 if primer else 1,
... message=lambda primer, **kwargs: "" if primer else "bad type"
... ),
...
the general structure is similar.
A primer is executed to validate a property of the data-keyword argument. This keyword can now be written explicitly into the lambda's signature (instead of covering it implicitly with **kwargs like before) since a requirement-check precedes this call to primer: the Stage only runs if the first Stage has already confirmed that data exists.
The requirement given here declares only a single requirement, i.e. the previous Stage (denoted by the StageRef named Previous) returned with a status of 0.
Consequently, this second Stage is only executed if the primer of the first Stage returned a value of True.
The third Stage is conceptually the same as the second one but intended to handle another aspect of the validation.
The exit from the Pipeline.run can occur either after completing the third Stage or if a Stage returns a specific status as defined by the kwarg of the Pipeline itself:
>>> pipeline = Pipeline(
... ...
... exit_on_status=1
... )
This is the reason for returning a status of 1 in all Stages if any stage of the validation fails.
In the following, two additional features of data-plumber are demonstrated by extending the original example.
Input Processing - Formatting Output Data
A Pipeline can also handle reformatting of input data into a desired format.
Suppose that, following validation, only the sum of all integers given in the list data is of interest.
In order to make this information readily available right after execution within the PipelineOutput-object that is returned by a Pipeline.run, one may add a suitable action callable in the last Stage of the previous setup:
...
... Stage( # validate "data" contains only int
... ...
... action=lambda out, data, primer, **kwargs:
...     out.update({"sum": sum(data)}) if primer else None,
... ...
... ),
...
With this change, after full validation, a key sum is added to the dictionary in PipelineOutput.data, i.e.
>>> output = pipeline.run(data=[1, 2, 3])
>>> output.data["sum"]
6
Input Processing - Optional Values
Consider that the data that is fed into the Pipeline.run-command is an optional property.
Consequently, it is fine if it is missing but, if it exists, it should satisfy a specific format.
This scenario can, for example, be realized using a Fork.
A Fork takes a single Callable on construction, which returns a reference to another Stage as the next target in the current Pipeline.run or gives an exit-signal (by returning None).
Here, the latter is more interesting.
We can change the initial portion of the Pipeline setup to accommodate the changed validation logic:
>>> from data_plumber import Next
>>> pipeline = Pipeline(
... Fork(
... lambda **kwargs: Next if "data" in kwargs else None
... ),
... Stage( # validate "data" is list
... ...
... ),
... ...
... exit_on_status=1
... )
Now, the validation of the first Stage in the initial example is replaced with a Fork.
Next is another pre-defined StageRef pointing at the next PipelineComponent in the Pipeline's sequence.
Note that for this setup, if data is missing, the returned PipelineOutput does not contain any StageRecords as Fork does not output any status.
In order to have both, the Fork-logic and a StageRecord, the Fork has to be the second component.
Using the records-kwarg of the Fork's Callable makes it possible to use the status from a previous Stage and avoid a duplicate validation.
Changelog
[1.15.0] - 2024-04-16
Changed
- changed finalize_output-callable argument signature, it now also takes records (16623d9)
[1.14.0] - 2024-03-30
Added
- added finalize_output-override option to Pipeline.run (e951f97)
[1.13.0] - 2024-03-23
Changed
- Pipeline now supports empty PipelineComponents as labels within a pipeline (631bc3d)
[1.12.1] - 2024-02-07
Fixed
- update and fix errors in documentation (2318210)
[1.12.0] - 2024-02-05
Changed
- added a list of previous StageRecords as kwarg for the call to a Fork's conditional (2f1cb77)
- changed StageRecord into a proper dataclass (e7eae6d)
[1.11.0] - 2024-02-04
Changed
- added common base class _PipelineComponent for Pipeline components Stage and Fork (f628159)
Added
- added docs to package metadata (061e311)
- names for PipelineComponents can now be declared in extension methods (append, prepend, ...) (8363284)
- Pipeline now supports the in-operator (usable with either a component directly or its name/id) (5701073)
- added requirements for Pipeline to be unpacked as a mapping (b2db8fa)
Fixed
- fixed issue where Fork-objects were internally not registered by their id (b267ca4)
[1.8.0] - 2024-02-03
Changed
- refactored Fork and Stage to transform string/integer-references to Stages into StageRefs (7ba677b)
Added
- added decorator-factory Pipeline.run_for_kwargs to generate kwargs for function calls (fe616b2)
- added optional Stage-callable to export kwargs into Pipeline.run (8eca1bc)
- added even more types of StageRefs: PreviousN, NextN (576820c)
- added py.typed-marker to package (04a2e1d)
- added more types of StageRefs: StageById, StageByIndex, StageByIncrement (92d57ad)
[1.4.0] - 2024-02-01
Changed
- refactored internal modules (cf7045f)
Added
- added StageRefs Next, Last, and Skip (14abaa7)
- added optional finalizer-Callable to Pipeline (d95e5b6)
- added support for a Callable in the Pipeline-argument exit_on_status (154c67b)
Fixed
- PipelineOutput.last_X-methods now return None in case of empty records (b7a6ba1)
[1.0.0] - 2024-01-31
Changed
- Breaking: refactor PipelineOutput and related types (1436ca1)
- Breaking: replaced forwarding kwargs of Pipeline.run as dictionary in_ into Stage/Fork-Callables by forwarding directly (f2710fa, b569bb9)
Added
- added missing information in module- and class-docstrings (7896742)
[0.1.0] - 2024-01-31
initial release