data-plumber
data-plumber is a lightweight but versatile Python framework for multi-stage information processing.
It allows you to construct processing pipelines both from atomic building blocks and by recombining existing pipelines.
Forks enable more complex (i.e. non-linear) orders of execution.
Pipelines can also be collected into arrays that can be executed at once with the same input data.
Usage example
Consider a scenario where the contents of a dictionary have to be validated and a suitable error message has to be generated. Specifically, a valid input-dictionary is expected to have a key "data" with the respective value being a list of integer numbers. A suitable pipeline might look like this
>>> from data_plumber import Stage, Pipeline, Previous
>>> pipeline = Pipeline(
... Stage( # validate "data" is passed into run
... primer=lambda **kwargs: "data" in kwargs,
... status=lambda primer, **kwargs: 0 if primer else 1,
... message=lambda primer, **kwargs: "" if primer else "missing argument"
... ),
... Stage( # validate "data" is list
... requires={Previous: 0},
... primer=lambda data, **kwargs: isinstance(data, list),
... status=lambda primer, **kwargs: 0 if primer else 1,
... message=lambda primer, **kwargs: "" if primer else "bad type"
... ),
... Stage( # validate "data" contains only int
... requires={Previous: 0},
... primer=lambda data, **kwargs: all(isinstance(i, int) for i in data),
... status=lambda primer, **kwargs: 0 if primer else 1,
... message=lambda primer, **kwargs: "validation success" if primer else "bad type in data"
... ),
... exit_on_status=1
... )
>>> pipeline.run().last_message
'missing argument'
>>> pipeline.run(data=1).last_message
'bad type'
>>> pipeline.run(data=[1, "2", 3]).last_message
'bad type in data'
>>> pipeline.run(data=[1, 2, 3]).last_message
'validation success'
See section "Examples" in Documentation for more explanation.
Install
Install using pip:
pip install data-plumber
Consider installing in a virtual environment.
Documentation
Overview
data_plumber is designed to provide a framework for flexible data-processing based on re-usable building blocks.
At its core stands the class Pipeline, which can be understood both as a container for a collection of instructions (PipelineComponents) and as an interface for the execution of a process (Pipeline.run(...)).
Previously defined Pipelines can be recombined with other Pipelines or extended by individual PipelineComponents.
Individual calls to Pipeline.run can be made with run-specific input data.
PipelineComponents are either units that define actual data-processing (Stage) or that control the flow of a Pipeline execution (Fork).
Until a Fork is encountered, a Pipeline.run iterates a pre-configured list of PipelineComponents.
Any Stage-type component provides an integer status value after execution, which is then available for defining conditional execution of Stages or changes in flow (Fork).
A Stage itself consists of multiple (generally optional) highly customizable sub-stages and properties that can be configured at instantiation.
In particular, a versatile referencing system based on (pre-defined) StageRef-type classes can be used to control the requirements for Stages.
Similarly, this system is also used by Forks.
The output of a Pipeline.run is of type PipelineOutput. It contains extensive information on the order of operations, the responses from individual Stages, and a data-property (of customizable type).
The latter can be used to store and/or output processed data from the Pipeline's execution context.
Finally, aside from the recombination of Pipelines into more complex Pipelines, multiple instances of Pipelines can be pooled into a Pipearray.
This construct allows calling different Pipelines with identical input data.
Pipeline
Building anonymous Pipelines
A Pipeline can be created in an empty, a partially, or a fully assembled state.
For an empty Pipeline, a simple expression like
>>> from data_plumber import Pipeline, Stage
>>> Pipeline()
<data_plumber.pipeline.Pipeline object at ...>
suffices. Following up with statements like
>>> p = Pipeline()
>>> p.append(Stage())
>>> p.prepend(Pipeline())
>>> p.insert(Stage(), 1)
or simply by using the +-operator:
>>> p = Pipeline()
>>> Stage() + p + Pipeline()
<data_plumber.pipeline.Pipeline object at ...>
Note that when adding to existing Pipelines, the change is made in-place.
>>> p = Pipeline(Stage())
>>> len(p)
1
>>> p + Stage()
<data_plumber.pipeline.Pipeline object at ...>
>>> len(p)
2
Consequently, only the properties of the first argument are inherited (refer to Python's operator precedence).
Therefore, using this operation in combination with Pipelines requires caution.
Building named Pipelines
Instead of giving the individual PipelineComponents as positional arguments during instantiation, they can be assigned names by providing the components as keyword arguments.
In addition to these keyword arguments, positional arguments are still required to determine the Pipeline's order of operations.
These are then given by the PipelineComponents' names:
>>> Pipeline(
... "a", "b", "a", "c",
... a=Stage(...,),
... b=Stage(...,),
... c=Stage(...,)
... )
<data_plumber.pipeline.Pipeline object at ...>
In the example above, the Pipeline executes the Stages in the order a > b > a > c (note that the names of Stages can occur multiple times in the positional arguments or via Pipeline-extending methods).
Methods like Pipeline.append also accept string identifiers for PipelineComponents.
If no name is provided at instantiation, an internally generated identifier is used.
The two approaches of anonymous and named Pipelines can be combined freely:
>>> Pipeline(
... "a", Stage(...,), "a", "c",
... a=Stage(...,),
... c=Stage(...,)
... )
<data_plumber.pipeline.Pipeline object at ...>
Also note that empty PipelineComponents can be used as well.
This can be helpful to label certain points in a Pipeline without changing its behavior.
Consider, for example, a Fork that needs to point to a specific part of a Pipeline while otherwise no named PipelineComponents are required:
>>> Pipeline(
... Stage(...,), # stage 1
... Fork(
... lambda **kwargs: "target_position"
... ),
... Stage(...,), # stage 2
... "target_position",
... Stage(...,), # stage 3
... ...
... )
<data_plumber.pipeline.Pipeline object at ...>
which, when triggered, executes the Stages as 'stage 1' > 'stage 3' (the associated PipelineOutput does not contain any record of the empty component "target_position").
Unpacking Pipelines
Pipelines support unpacking and can be used, for example, as positional or keyword arguments in the constructor of another Pipeline:
>>> p = Pipeline("a", ..., a=Stage(), ...)
>>> Pipeline("b", *p, ..., b=Stage(), **p, ...)
<data_plumber.pipeline.Pipeline object at ...>
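A more concrete sketch of this mechanism is given below; it assumes that sequence-unpacking a named Pipeline yields its positional component names and that mapping-unpacking yields its name-component pairs (the names sub and combined are only illustrative):
>>> from data_plumber import Pipeline, Stage
>>> sub = Pipeline(
...     "a", "b",
...     a=Stage(message=lambda **kwargs: "a"),
...     b=Stage(message=lambda **kwargs: "b")
... )
>>> combined = Pipeline(
...     "c", *sub,  # order of operations: c > a > b
...     c=Stage(message=lambda **kwargs: "c"),
...     **sub  # named components of sub
... )
>>> combined.run().last_message
'b'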
Running a Pipeline
A Pipeline can be triggered by calling its run-method.
>>> Pipeline(...).run(...)
PipelineOutput(...)
Passing the finalize_output keyword argument into a run allows modifying the persistent Pipeline-data at exit (see section "Pipeline settings" for details).
Any kwargs passed to this method are forwarded to the PipelineComponents' Callables.
Note that some keywords are reserved (out, primer, status, count, and records).
While Forks are simply evaluated and their returned StageRef is used to find the next target for execution, Stages themselves have multiple sub-stages.
First, the Pipeline checks the Stage's requirements, then it executes the Stage's primer before running the action-command.
Next, any exported kwargs are updated in the Pipeline.run and, finally, the status and response message are generated (see Stage for details).
Pipeline settings
A Pipeline can be configured with multiple properties at instantiation (a combined sketch follows after this list):
- initialize_output: a Callable that returns an object which is subsequently passed into the PipelineComponents' Callables; this object is referred to as the "persistent data-object" (the default generates an empty dictionary)
- finalize_output: a Callable that is called before (normally) exiting the Pipeline.run, with the run's kwargs as well as the persistent data-object (can be overridden in a call to Pipeline.run)
- exit_on_status: either an integer value (the Pipeline exits normally if any component returns this status) or a Callable that is called after any component with that component's status (if it evaluates to True, the Pipeline.run is stopped)
- loop: boolean; if False, the Pipeline stops automatically after iterating beyond the last PipelineComponent in its list of operations; if True, the execution loops back to the first component
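A combined sketch of these settings is given below. It assumes that initialize_output can be called without positional arguments, that finalize_output receives the persistent data-object under the out keyword (like the component-Callables do), and that count starts at zero:
>>> from data_plumber import Pipeline, Stage
>>> p = Pipeline(
...     Stage(  # append the current count to the data-object
...         action=lambda out, count, **kwargs: out.append(count),
...         status=lambda count, **kwargs: count
...     ),
...     initialize_output=lambda **kwargs: [],  # use a list instead of the default dict
...     finalize_output=lambda out, **kwargs: out.reverse(),  # post-process data at exit
...     exit_on_status=lambda status: status >= 2,  # stop once a status of 2 is returned
...     loop=True  # keep looping over the single Stage
... )
>>> p.run().data
[2, 1, 0]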
Running a Pipeline as decorator
A Pipeline can be used to generate kwargs for a function (based on the content of the persistent data-object).
This requires the data-object to be unpackable like a mapping (e.g. a dictionary).
>>> @Pipeline(...).run_for_kwargs(...)
... def function(arg1, arg2): ...
Arguments that are passed to a call of the decorated function take priority over those generated by the Pipeline.run.
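A minimal sketch of this decorator-mechanism (assuming run_for_kwargs can be called without additional arguments) might look like:
>>> from data_plumber import Pipeline, Stage
>>> pipeline = Pipeline(
...     Stage(  # write default kwargs into the persistent data-object
...         action=lambda out, **kwargs: out.update({"arg1": 1, "arg2": 2})
...     )
... )
>>> @pipeline.run_for_kwargs()
... def function(arg1, arg2):
...     return arg1 + arg2
>>> function()
3
>>> function(arg1=10)  # explicitly passed arguments take priority
12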
Stage
A Stage represents a single building block in the processing logic of a Pipeline.
It provides a set of Callables that are used in a Pipeline.run.
Their order of execution is primer > action > export > status > message.
The arguments that are passed into those Callables vary by Callable and execution context.
Below is a list of all keywords that can occur (most Callables receive only a subset of these):
- all kwargs given to Pipeline.run or exported during execution are forwarded (note that this makes the following arguments reserved words in this context)
- out: a persistent data-object that is passed through the entire Pipeline; its initial value is generated by the Pipeline's initialize_output
- primer: output of Stage.primer
- status: output of Stage.status
- count: index of the Stage in the execution of the Pipeline
Stage properties
Stages accept a number of different (optional) arguments, mostly Callables, that are used by a Pipeline during execution (a combined sketch for status and message follows after this list).
- requires: requirements for Stage-execution, either None (always run this Stage) or a dictionary with pairs of a reference to a Stage and the required status (uses the most recent evaluation); key types are either StageRef, str (identifier of a Stage in the context of a Pipeline), or int (relative index in the Pipeline's stage arrangement); values are either an integer or a Callable taking the status as argument and returning a bool (if it evaluates to True, the Stage-requirement is met)
>>> from data_plumber import Pipeline, Stage, Previous
>>> Pipeline(
...     Stage(
...         message=lambda **kwargs: "first stage",
...         status=lambda **kwargs: 1
...     ),
...     Stage(
...         requires={Previous: 0},
...         message=lambda **kwargs: "second stage"
...     ),
... ).run().last_message
'first stage'
- primer: Callable for pre-processing data (kwargs: out, count)
>>> Pipeline(
...     Stage(
...         primer=lambda **kwargs: "primer value",
...         message=lambda primer, **kwargs: primer
...     ),
... ).run().last_message
'primer value'
- action: Callable for the main step of processing (kwargs: out, primer, count)
>>> Pipeline(
...     Stage(
...         action=lambda out, **kwargs: out.update({"new_data": 0})
...     ),
... ).run().data
{'new_data': 0}
- export: Callable that returns a dictionary of additional kwargs to be exported to the parent Pipeline; in the following Stages, these kwargs are then available as if they had been provided with the Pipeline.run-command (kwargs: out, primer, count)
>>> Pipeline(
...     Stage(
...         export=lambda **kwargs: {"new_arg": 0}
...     ),
...     Stage(
...         message=lambda **kwargs:
...             "export successful" if "new_arg" in kwargs
...             else "missing new_arg"
...     ),
... ).run().last_message
'export successful'
- status: Callable for generating a Stage's integer exit status (kwargs: out, primer, count)
- message: Callable for generating a Stage's exit message (kwargs: out, primer, count, status)
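A brief sketch illustrating status and message in combination (following the pattern of the examples above):
>>> from data_plumber import Pipeline, Stage
>>> output = Pipeline(
...     Stage(
...         primer=lambda **kwargs: 42,
...         status=lambda primer, **kwargs: 0 if primer == 42 else 1,
...         message=lambda status, **kwargs:
...             "all good" if status == 0 else "something went wrong"
...     ),
... ).run()
>>> output.last_status
0
>>> output.last_message
'all good'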
Fork
A Fork represents a conditional in the execution of a Pipeline.
It can be used to redirect the next Pipeline-target to a specific, absolutely or relatively positioned PipelineComponent.
Analogous to a Stage, a Fork's eval-method is called with a number of keyword arguments:
- all kwargs given to Pipeline.run are forwarded (note that this makes the following arguments reserved words in this context)
- out: a persistent data-object that is passed through the entire Pipeline; its initial value is generated by the Pipeline's initialize_output
- count: index of the Stage in the execution of the Pipeline
- records: a list of previously generated StageRecords (see PipelineOutput for more information)
Fork properties
A Fork takes a single Callable as argument.
Based on the properties described above, it returns a reference to a target Stage.
This reference can be given in one of several ways:
- integer: relative index in the Pipeline's list of components
- string: a PipelineComponent's string identifier in the context of a Pipeline.run
- StageRef: a more abstract form of reference, e.g. First or Next (see StageRef for details)
- None: signal to (normally) exit the Pipeline.run
Example
>>> from data_plumber import Pipeline, Stage, Fork, Next
>>> p = Pipeline(
... Stage(
... message=lambda **kwargs: "stage 1 executed"
... ),
... Fork(
... lambda **kwargs: Next if "arg" in kwargs else None
... ),
... Stage(
... message=lambda **kwargs: "stage 2 executed"
... ),
... )
>>> p.run(arg=0).last_message
'stage 2 executed'
>>> p.run().last_message
'stage 1 executed'
StageRef
StageRefs can be utilized to formulate requirements for Stages as well as flow control with Forks.
While additional types of StageRefs can be defined, data_plumber already provides rich possibilities natively.
There are two different categories of StageRefs:
- referring to records of previously executed PipelineComponents (a record can uniquely identify a component within the Pipeline's sequence of components)
- referring directly to a component within the list of registered components of a Pipeline
List of predefined StageRefs (by record)
- First: record of first component
- Previous: record of previous component (one step)
- PreviousN(n): record of a previous component (n steps)
List of predefined StageRefs (by sequence)
- Last: last component in sequence
- Next: next component in sequence (one step)
- Skip: component after next in sequence (two steps)
- NextN(n): next component (n steps)
- StageById(id): first occurrence of id in sequence
- StageByIndex(index): component at index of sequence
- StageByIncrement(n): component at relative position n in sequence
Example
>>> from data_plumber import Pipeline, Stage, Fork, Previous, NextN
>>> output = Pipeline(
... Stage(
... status=lambda **kwargs: 0
... ),
... Stage(
... requires={Previous: 0},
... status=lambda count, **kwargs: count
... ),
... Fork(
... lambda count, **kwargs: NextN(1)
... ),
... exit_on_status=lambda status: status > 3,
... loop=True
... ).run()
>>> len(output.records)
6
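The by-sequence references can also be used to jump to a labeled position; the following sketch uses StageById (assuming it is importable from the package root like the other StageRefs shown above):
>>> from data_plumber import Pipeline, Stage, Fork, StageById
>>> Pipeline(
...     "start",
...     Fork(lambda **kwargs: StageById("end")),  # jump over the middle Stage
...     Stage(message=lambda **kwargs: "skipped"),
...     "end",
...     start=Stage(message=lambda **kwargs: "first"),
...     end=Stage(message=lambda **kwargs: "last")
... ).run().last_message
'last'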
PipelineOutput
List of properties
The output of a Pipeline.run is an object of type PipelineOutput.
This object has the following properties (a short sketch follows after these lists):
- records: a list of StageRecords corresponding to all Stages executed by the Pipeline; StageRecords themselves are a collection of properties
  - index: index position in the Pipeline's sequence of PipelineComponents
  - id_: name/id of the Stage
  - message: the message returned by the Stage
  - status: the status returned by the Stage
  (for legacy support (<=1.11.), a StageRecord can also be indexed, where message and status are returned for indices 0 and 1, respectively)
- kwargs: a dictionary with the keyword arguments used in the Pipeline.run
- data: the persistent data-object that has been processed through the Pipeline
For convenience, the last StageRecord generated in the Pipeline can be inspected using the shortcuts
- last_record: StageRecord of the last component that generated an output
- last_status: status-part of the last_record
- last_message: message-part of the last_record
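A short sketch of inspecting a PipelineOutput (assuming the run-kwargs appear unchanged in PipelineOutput.kwargs):
>>> from data_plumber import Pipeline, Stage
>>> output = Pipeline(
...     Stage(
...         action=lambda out, **kwargs: out.update({"processed": True}),
...         status=lambda **kwargs: 0,
...         message=lambda **kwargs: "done"
...     ),
... ).run(some_arg="value")
>>> output.last_status
0
>>> output.last_message
'done'
>>> output.data
{'processed': True}
>>> output.kwargs["some_arg"]
'value'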
Pipearray
A Pipearray is a convenience class for running multiple Pipelines with the same input data.
Just like the Pipelines themselves, a Pipearray can be either anonymous or named, depending on the use of positional and keyword arguments during setup.
The return type is then either a list (only positional arguments) or a dictionary with keys being names/ids (at least one named Pipeline). Both contain the PipelineOutput objects of the individual Pipelines.
Example
>>> from data_plumber import Pipeline, Pipearray
>>> Pipearray(Pipeline(...), Pipeline(...)).run(...)
<list[PipelineOutput]>
>>> Pipearray(
... p=Pipeline(...),
... q=Pipeline(...)
... ).run(...)
<dict[str, PipelineOutput]>
Examples
Input Processing Revisited
Reconsider the introductory example of validating dictionary content.
The described scenario, where data is validated against some schema with increasing attention to detail in every Stage while generating helpful response messages, is common in the context of, for example, user input.
The following section goes into more detail regarding the reasoning behind the example Pipeline's setup.
A Pipeline generally iterates the provided sequence of Stages if no flow control is included (as in this example).
Within a Stage, there is again an order of operations for the Callables defined in the constructor (primer > action > export > status > message).
Starting with the first Stage of the example,
...
... Stage( # validate "data" is passed into run
... primer=lambda **kwargs: "data" in kwargs,
... status=lambda primer, **kwargs: 0 if primer else 1,
... message=lambda primer, **kwargs: "" if primer else "missing argument"
... ),
...
the primer is executed first.
This function validates that an argument with keyword data has been passed to the Pipeline.run, i.e. Pipeline.run(data=...).
Following this evaluation, the result can be used in the next two Callables, status and message, to generate an output from this Stage with appropriate properties for the given situation.
In the next Stage,
...
... Stage( # validate "data" is list
... requires={Previous: 0},
... primer=lambda data, **kwargs: isinstance(data, list),
... status=lambda primer, **kwargs: 0 if primer else 1,
... message=lambda primer, **kwargs: "" if primer else "bad type"
... ),
...
the general structure is similar.
A primer is executed to validate a property of the data-keyword argument. This keyword can now be written explicitly into the lambda's signature (instead of covering it implicitly with **kwargs like before) since this call to primer is preceded by a requirement-check.
The requirement given here declares only a single condition, namely that the previous Stage (denoted by the StageRef named Previous) returned with a status of 0.
Consequently, this second Stage is only executed if the primer of the first Stage returned a value of True.
The third Stage is conceptually the same as the second one but intended to handle another aspect of the validation.
The exit from the Pipeline.run can occur either after completing the third Stage or if a Stage returns the specific status defined by a kwarg of the Pipeline itself:
>>> pipeline = Pipeline(
... ...
... exit_on_status=1
... )
This is the reason for returning a status of 1 in all Stages if any part of the validation fails.
In the following, two additional features of data-plumber are demonstrated by extending the original example.
Input Processing - Formatting Output Data
A Pipeline can also handle reformatting of input data into a desired format.
Suppose that, following validation, only the sum of all integers given in the list data is of interest.
In order to make this information readily available right after execution within the PipelineOutput-object that is returned by a Pipeline.run, one may add a suitable action callable to the last Stage of the previous setup:
...
... Stage( # validate "data" contains only int
... ...
... action=lambda data, out, primer, **kwargs:
... out.update({"sum": sum(data)}) if primer else None
... ...
... ),
...
With this change, after full validation, a key sum is added to the dictionary in PipelineOutput.data, i.e.
>>> output = pipeline.run(data=[1, 2, 3])
>>> output.data["sum"]
6
Input Processing - Optional Values
Consider the case where the data that is fed into the Pipeline.run-command is an optional property.
Consequently, it is fine if it is missing but, if it exists, it should satisfy a specific format.
This scenario can, for example, be realized using a Fork.
A Fork takes a single Callable on construction which either returns a reference to another Stage as the next target in the current Pipeline.run or gives an exit-signal (by returning None).
Here, the latter is more interesting.
We can change the initial portion of the Pipeline setup to accommodate the changes in validation logic:
>>> from data_plumber import Next
>>> pipeline = Pipeline(
... Fork(
... lambda **kwargs: Next if "data" in kwargs else None
... ),
... Stage( # validate "data" is list
... ...
... ),
... ...
... exit_on_status=1
... )
Now, the validation done by the first Stage of the initial example is replaced with a Fork.
Next is another pre-defined StageRef pointing at the next PipelineComponent in the Pipeline's sequence.
Note that with this setup, if data is missing, the returned PipelineOutput does not contain any StageRecords since a Fork does not output any status.
In order to have both the Fork-logic and a StageRecord, the Fork has to be the second component.
Using the records-kwarg of the Fork's Callable then makes it possible to use the status from a previous Stage and avoid a duplicate validation (a sketch of this variant follows below).
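A sketch of this variant, using the records-kwarg and the StageRecord.status property described above (the exact messages are only illustrative):
>>> from data_plumber import Pipeline, Stage, Fork, Next
>>> pipeline = Pipeline(
...     Stage(  # only record whether "data" has been passed
...         status=lambda **kwargs: 0 if "data" in kwargs else 1,
...         message=lambda status, **kwargs:
...             "" if status == 0 else "optional 'data' omitted"
...     ),
...     Fork(  # continue with the validation only if "data" exists
...         lambda records, **kwargs: Next if records[-1].status == 0 else None
...     ),
...     Stage(  # validate "data" is list
...         primer=lambda data, **kwargs: isinstance(data, list),
...         status=lambda primer, **kwargs: 0 if primer else 1,
...         message=lambda primer, **kwargs: "" if primer else "bad type"
...     ),
... )
>>> pipeline.run().last_message
"optional 'data' omitted"
>>> pipeline.run(data=[1, 2, 3]).last_message
''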
Changelog
[1.14.0] - 2024-03-30
Added
- added finalize_output-override option to Pipeline.run (e951f97)
[1.13.0] - 2024-03-23
Changed
- Pipeline now supports empty PipelineComponents as labels within a pipeline (631bc3d)
[1.12.1] - 2024-02-07
Fixed
- update and fix errors in documentation (2318210)
[1.12.0] - 2024-02-05
Changed
- added a list of previous StageRecords as kwarg for the call to a Fork's conditional (2f1cb77)
- changed StageRecord into a proper dataclass (e7eae6d)
[1.11.0] - 2024-02-04
Changed
- added common base class _PipelineComponent for Pipeline components Stage and Fork (f628159)
Added
- added docs to package metadata (061e311)
- names for PipelineComponents can now be declared in extension methods (append, prepend, ...) (8363284)
- Pipeline now supports the in-operator (usable with either a component directly or its name/id) (5701073)
- added requirements for Pipeline to be unpacked as a mapping (b2db8fa)
Fixed
- fixed issue where Fork-objects were internally not registered by their id (b267ca4)
[1.8.0] - 2024-02-03
Changed
- refactored Fork and Stage to transform string/integer-references to Stages into StageRefs (7ba677b)
Added
- added decorator-factory Pipeline.run_for_kwargs to generate kwargs for function calls (fe616b2)
- added optional Stage-callable to export kwargs into Pipeline.run (8eca1bc)
- added even more types of StageRefs: PreviousN, NextN (576820c)
- added py.typed-marker to package (04a2e1d)
- added more types of StageRefs: StageById, StageByIndex, StageByIncrement (92d57ad)
[1.4.0] - 2024-02-01
Changed
- refactored internal modules (cf7045f)
Added
- added StageRefs Next, Last, and Skip (14abaa7)
- added optional finalizer-Callable to Pipeline (d95e5b6)
- added support for Callable in Pipeline-argument exit_on_status (154c67b)
Fixed
- PipelineOutput.last_X-methods now return None in case of empty records (b7a6ba1)
[1.0.0] - 2024-01-31
Changed
- Breaking: refactored PipelineOutput and related types (1436ca1)
- Breaking: replaced forwarding kwargs of Pipeline.run as dictionary in_ into Stage/Fork-Callables by forwarding them directly (f2710fa, b569bb9)
Added
- added missing information in module- and class-docstrings (7896742)
[0.1.0] - 2024-01-31
initial release