data-plumber
data-plumber is a lightweight but versatile Python framework for multi-stage information processing. It allows constructing processing pipelines both from atomic building blocks and by recombining existing pipelines. Forks enable more complex (i.e. non-linear) orders of execution. Pipelines can also be collected into arrays that can be executed at once with the same input data.
Usage example
Consider a scenario where the contents of a dictionary have to be validated and a suitable error message has to be generated. Specifically, a valid input dictionary is expected to have a key "data" whose value is a list of integer numbers. A suitable pipeline might look like this:
>>> from data_plumber import Stage, Pipeline, Previous
>>> pipeline = Pipeline(
Stage(
primer=lambda **kwargs: "data" in kwargs,
status=lambda primer, **kwargs: 0 if primer else 1,
message=lambda primer, **kwargs: "" if primer else "missing argument"
),
Stage(
requires={Previous: 0},
primer=lambda data, **kwargs: isinstance(data, list),
status=lambda primer, **kwargs: 0 if primer else 1,
message=lambda primer, **kwargs: "" if primer else "bad type"
),
Stage(
requires={Previous: 0},
primer=lambda data, **kwargs: all(isinstance(i, int) for i in data),
status=lambda primer, **kwargs: 0 if primer else 1,
message=lambda primer, **kwargs: "validation success" if primer else "bad type in data"
),
exit_on_status=1
)
>>> pipeline.run().last_message
'missing argument'
>>> pipeline.run(data=1).last_message
'bad type'
>>> pipeline.run(data=[1, "2", 3]).last_message
'bad type in data'
>>> pipeline.run(data=[1, 2, 3]).last_message
'validation success'
Install
Install using pip with
pip install data-plumber
Consider installing in a virtual environment.
Documentation
Overview
data_plumber is designed to provide a framework for flexible data-processing based on re-usable building blocks. At its core stands the class Pipeline, which can be understood as both a container for a collection of instructions (PipelineComponents) and an interface for the execution of a process (Pipeline.run(...)). Previously defined Pipelines can be recombined with other Pipelines or extended by individual PipelineComponents. Individual Pipeline.runs can be triggered with run-specific arguments.
PipelineComponents are either units defining actual data-processing (Stage) or units controlling the flow of a Pipeline execution (Fork). Until a Fork is encountered, a Pipeline.run iterates a pre-configured list of PipelineComponents. Any Stage-type component provides an integer status value after execution, which is then available for defining conditional execution of Stages or changes in flow (Fork).
A Stage itself consists of multiple (generally optional) highly customizable sub-stages and properties that can be configured at instantiation. In particular, a versatile referencing system based on (pre-defined) StageRef-type classes can be used to define the requirements for Stages. Similarly, this system is also used by Forks.
The output of a Pipeline.run is of type PipelineOutput. It contains extensive information on the order of operations, the response from individual Stages, and a data-property (of customizable type). The latter can be used to store and/or output processed data from the Pipeline's execution context.
Finally, aside from the recombination of Pipelines into more complex Pipelines, multiple instances of Pipelines can be pooled into a Pipearray. This construct allows calling different Pipelines with identical input data.
Pipeline
Building anonymous Pipelines
A Pipeline can be created in an empty, a partially, or a fully assembled state. For an empty Pipeline, a simple expression like
>>> from data_plumber import Pipeline, Stage
>>> Pipeline()
<data_plumber.pipeline.Pipeline object at ...>
suffices. The Pipeline can then be assembled further with statements like
>>> p = Pipeline()
>>> p.append(Stage())
>>> p.prepend(Pipeline())
>>> p.insert(Stage(), 1)
or simply by using the +-operator:
>>> p = Pipeline()
>>> Stage() + p + Pipeline()
<data_plumber.pipeline.Pipeline object at ...>
Note that when adding to existing Pipelines, the change is made in-place.
>>> p = Pipeline(Stage())
>>> len(p)
1
>>> p + Stage()
<data_plumber.pipeline.Pipeline object at ...>
>>> len(p)
2
Consequently, only the properties of the first argument are inherited (refer to Python's operator precedence), so this operation should be used with caution when combining Pipelines.
Building named Pipelines
Instead of simply providing the individual PipelineComponents as positional arguments during instantiation, they can be assigned names by passing the components as keyword arguments (kwargs). In addition to the kwargs, positional arguments are still required to determine the Pipeline's order of operations. These are then given by the PipelineComponents' names:
>>> Pipeline(
"a", "b", "a", "c",
a=Stage(...,),
b=Stage(...,),
c=Stage(...,)
)
<data_plumber.pipeline.Pipeline object at ...>
In the example above, the Pipeline executes the Stages in the order a > b > a > c (note that the names of Stages can occur multiple times in the positional arguments or via Pipeline-extending methods).
Methods like Pipeline.append also accept string identifiers for PipelineComponents. If no name is provided at instantiation, an internally generated identifier is used.
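As a brief sketch of appending by identifier (re-using the registered name "a" here is an illustrative assumption, not behavior spelled out above):
>>> p = Pipeline("a", a=Stage(message=lambda **kwargs: "stage a"))
>>> p.append("a")  # schedule the Stage registered under "a" a second time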
The two approaches of anonymous and named Pipelines can be combined freely:
>>> Pipeline(
"a", Stage(...,), "a", "c",
a=Stage(...,),
c=Stage(...,)
)
<data_plumber.pipeline.Pipeline object at ...>
Unpacking Pipelines
Pipelines support unpacking, for example to be used as positional or keyword arguments in the constructor of another Pipeline:
>>> p = Pipeline("a", ..., a=Stage(), ...)
>>> Pipeline("b", *p, ..., b=Stage(), **p, ...)
<data_plumber.pipeline.Pipeline object at ...>
Running a Pipeline
A Pipeline can be triggered by calling its run-method.
>>> Pipeline(...).run(...)
PipelineOutput(...)
Any kwargs passed to this method are forwarded to the PipelineComponents' Callables. Note that some keywords are reserved (out, primer, status, and count).
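As a minimal sketch of this forwarding (assuming the message-Callable simply picks the forwarded keyword out of its kwargs):
>>> Pipeline(
        Stage(message=lambda name, **kwargs: f"hello {name}"),
    ).run(name="data_plumber").last_message
'hello data_plumber'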
While Forks are simply evaluated and their returned StageRef is used to find the next target for execution, Stages themselves have multiple sub-stages. First, the Pipeline checks the Stage's requirements, then executes its primer before running the action-command. Next, any exported kwargs are updated in the Pipeline.run and, finally, the status and response message are generated (see Stage for details).
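The following sketch illustrates this order within a single Stage (the concrete values are only for illustration; the kwargs used are those documented in the Stage-section below):
>>> Pipeline(
        Stage(
            primer=lambda **kwargs: 21,  # pre-processing step
            action=lambda out, primer, **kwargs: out.update({"value": 2 * primer}),  # main step
            status=lambda out, **kwargs: 0 if "value" in out else 1,  # exit status
            message=lambda status, **kwargs: "ok" if status == 0 else "error"
        ),
    ).run().last_message
'ok'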
Pipeline settings
A Pipeline can be configured with multiple properties at instantiation (a combined sketch follows the list below):
- initialize_output: a Callable that returns an object which is subsequently passed into the PipelineComponents' Callables; this object is referred to as the "persistent data-object" (the default generates an empty dictionary)
- finalize_output: a Callable that is called before (normally) exiting the Pipeline.run, with the run's kwargs as well as the persistent data-object
- exit_on_status: either an integer value (the Pipeline exits normally if any component returns this status) or a Callable that is called after any component with that component's status (if it evaluates to True, the Pipeline.run is stopped)
- loop: boolean; if False, the Pipeline stops automatically after iterating beyond the last PipelineComponent in its list of operations; if True, execution loops back to the first component
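The following is a combined sketch of these settings; that initialize_output and finalize_output receive the persistent data-object via the out-keyword is an assumption based on the other Callables documented here:
>>> Pipeline(
        Stage(
            action=lambda out, count, **kwargs: out.append(count),
            status=lambda count, **kwargs: count
        ),
        initialize_output=lambda **kwargs: [],  # persistent data-object is a list here
        finalize_output=lambda out, **kwargs: out.append("done"),
        exit_on_status=lambda status: status >= 2,  # stop once a Stage reports status >= 2
        loop=True  # keep looping over the single Stage
    ).run().data
[0, 1, 2, 'done']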
Running a Pipeline as decorator
A Pipeline can be used to generate kwargs for a function (i.e., based on the persistent data-object). This requires the data-object to be unpackable like a mapping (e.g. a dictionary).
>>> @Pipeline(...).run_for_kwargs(...)
def function(arg1, arg2): ...
Arguments that are passed to a call of the decorated function take priority over those generated by the Pipeline.run.
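A slightly fuller sketch (the argument-free call of run_for_kwargs and the shown return values are assumptions for illustration):
>>> pipeline = Pipeline(
        Stage(action=lambda out, **kwargs: out.update({"arg1": 1, "arg2": 2}))
    )
>>> @pipeline.run_for_kwargs()
    def add(arg1, arg2):
        return arg1 + arg2
>>> add()        # kwargs generated by the Pipeline: arg1=1, arg2=2
3
>>> add(arg1=10) # explicitly passed arguments take priority
12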
Stage
A Stage represents a single building block in the processing logic of a Pipeline. It provides a number of Callables that are used in a Pipeline.run. The arguments that are passed into those Callables vary. Below is a list of all keywords that can occur (most Callables receive only a subset of these):
- all kwargs given to Pipeline.run are forwarded (note that this makes the following arguments reserved words in this context),
- out (a persistent data-object that is passed through the entire Pipeline; its initial value is generated by the Pipeline's initialize_output),
- primer (output of Stage.primer),
- status (output of Stage.status),
- count (index of the Stage in the execution of the Pipeline)
Stage properties
Stages accept a number of different (optional) arguments, mostly Callables to be used by a Pipeline during execution.
- requires: requirements for Stage-execution; either None (always run this Stage) or a dictionary with pairs of references to a Stage and the required status (uses the most recent evaluation); key types are either StageRef, str (identifier of a Stage in the context of a Pipeline), or int (relative index in the Pipeline's stage arrangement); values are either an integer or a Callable taking the status as argument and returning a bool (if it evaluates to True, the Stage-requirement is met)
  >>> from data_plumber import Pipeline, Stage, Previous
  >>> Pipeline(
          Stage(
              message=lambda **kwargs: "first stage",
              status=lambda **kwargs: 1
          ),
          Stage(
              requires={Previous: 0},
              message=lambda **kwargs: "second stage"
          ),
      ).run().last_message
  'first stage'
- primer: Callable for pre-processing data (kwargs: out, count)
  >>> Pipeline(
          Stage(
              primer=lambda **kwargs: "primer value",
              message=lambda primer, **kwargs: primer
          ),
      ).run().last_message
  'primer value'
- action: Callable for the main step of processing (kwargs: out, primer, count)
  >>> Pipeline(
          Stage(
              action=lambda out, **kwargs: out.update({"new_data": 0})
          ),
      ).run().data
  {'new_data': 0}
- export: Callable that returns a dictionary of additional kwargs to be exported to the parent Pipeline; in the following Stages, these kwargs are then available as if they were provided with the Pipeline.run-command (kwargs: out, primer, count)
  >>> Pipeline(
          Stage(
              export=lambda **kwargs: {"new_arg": 0}
          ),
          Stage(
              message=lambda **kwargs: "export successful" if "new_arg" in kwargs else "missing new_arg"
          ),
      ).run().last_message
  'export successful'
- status: Callable for the generation of a Stage's integer exit status (kwargs: out, primer, count)
- message: Callable for the generation of a Stage's exit message (kwargs: out, primer, count, status); see the sketch after this list
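A minimal sketch for status and message:
>>> Pipeline(
        Stage(
            status=lambda **kwargs: 42,
            message=lambda status, **kwargs: f"stage exited with status {status}"
        ),
    ).run().last_message
'stage exited with status 42'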
Fork
A Fork represents a conditional in the execution of a Pipeline. It can be used to redirect the next Pipeline-target to a specific absolutely or relatively positioned PipelineComponent. Analogous to a Stage, a Fork's eval-method is called with a number of keyword arguments:
- all kwargs given to Pipeline.run are forwarded (note that this makes the following arguments reserved words in this context),
- out (a persistent data-object that is passed through the entire Pipeline; its initial value is generated by the Pipeline's initialize_output),
- count (index of the Stage in the execution of the Pipeline)
Fork properties
A Fork takes a single Callable as argument. Based on the properties described above, this Callable returns a reference to a target Stage. This reference can be made in one of several ways:
- integer; relative index in the Pipeline's list of components
- string; a PipelineComponent's string identifier in the context of a Pipeline.run
- StageRef; a more abstract form of reference, e.g. First, Next (see StageRef for details)
- None; signal to (normally) exit Pipeline.run
Example
>>> from data_plumber import Pipeline, Stage, Fork, Next
>>> p = Pipeline(
Stage(
message=lambda **kwargs: "stage 1 executed"
),
Fork(
lambda **kwargs: Next if "arg" in kwargs else None
),
Stage(
message=lambda **kwargs: "stage 2 executed"
),
)
>>> p.run(arg=0).last_message
'stage 2 executed'
>>> p.run().last_message
'stage 1 executed'
StageRef
StageRefs can be utilized in the context of requirements of Stages as well as for flow control with Forks. While additional types of StageRefs can be defined, data_plumber already provides rich possibilities natively. There are two different categories of StageRefs:
- referring to records of previously executed PipelineComponents (a record then provides information on the component's position in the Pipeline's sequence of components)
- referring to a component within the list of registered components of a Pipeline
List of predefined StageRefs (by record)
- First: record of first component
- Previous: record of previous component (one step)
- PreviousN(n): record of previous component (n steps)
List of predefined StageRefs (by sequence)
- Last: last component in sequence
- Next: next component in sequence (one step)
- Skip: component after next in sequence (two steps)
- NextN(n): next component (n steps)
- StageById(id): first occurrence of id in sequence
- StageByIndex(index): component at index of sequence
- StageByIncrement(n): component with relative position n in sequence
Example
>>> from data_plumber import Pipeline, Stage, Fork, Previous, NextN
>>> output = Pipeline(
Stage(
status=lambda **kwargs: 0
),
Stage(
requires={Previous: 0},
status=lambda count, **kwargs: count
),
Fork(
lambda count, **kwargs: NextN(1)
),
exit_on_status=lambda status: status > 3,
loop=True
).run()
>>> len(output.records)
6
PipelineOutput
List of properties
The output of a Pipeline.run is an object of type PipelineOutput. This object has the following properties:
- records: a list of StageRecords corresponding to all Stages executed by the Pipeline; StageRecords themselves are an alias for a tuple of the message and status generated by the corresponding component
- kwargs: a dictionary with the keyword arguments used in the Pipeline.run
- data: the persistent data-object that has been processed through the Pipeline
For convenience, the last StageRecord generated in the Pipeline can be inspected using the following properties (see the sketch after this list):
- last_record: StageRecord of the last component that generated an output
- last_status: status-part of the last_record
- last_message: message-part of the last_record
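A brief sketch of inspecting a PipelineOutput (the values follow from the Stage-properties described above):
>>> output = Pipeline(
        Stage(
            action=lambda out, **kwargs: out.update({"processed": True}),
            status=lambda **kwargs: 0,
            message=lambda **kwargs: "done"
        ),
    ).run()
>>> output.data
{'processed': True}
>>> output.last_status
0
>>> output.last_message
'done'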
Pipearray
A Pipearray is a convenience class that allows running multiple Pipelines on the same input data. Just like the Pipelines themselves, a Pipearray can be either anonymous or named, depending on the use of positional and keyword arguments during initialization. The return type is then either a list (only positional arguments) or a dictionary with keys being names/ids (at least one named Pipeline). Both contain the PipelineOutput objects of the individual Pipelines.
Example
>>> from data_plumber import Pipeline, Pipearray
>>> Pipearray(Pipeline(...), Pipeline(...)).run(...)
<list[PipelineOutput]>
>>> Pipearray(
p=Pipeline(...),
q=Pipeline(...)
).run(...)
<dict[str, PipelineOutput]>
Changelog
[1.11.0] - 2024-02-04
Changed
- added common base class _PipelineComponent for Pipeline components Stage and Fork (f628159)
Added
- added docs to package metadata (061e311)
- names for PipelineComponents can now be declared in extension methods (append, prepend, ...) (8363284)
- Pipeline now supports the in-operator (usable with either a component directly or its name/id) (5701073)
- added requirements for Pipeline to be unpacked as mapping (b2db8fa)
Fixed
- fixed issue where Fork-objects were internally not registered by their id (b267ca4)
[1.8.0] - 2024-02-03
Changed
- refactored Fork and Stage to transform string/integer-references to Stages into StageRefs (7ba677b)
Added
- added decorator-factory Pipeline.run_for_kwargs to generate kwargs for function calls (fe616b2)
- added optional Stage-callable to export kwargs into Pipeline.run (8eca1bc)
- added even more types of StageRefs: PreviousN, NextN (576820c)
- added py.typed-marker to package (04a2e1d)
- added more types of StageRefs: StageById, StageByIndex, StageByIncrement (92d57ad)
[1.4.0] - 2024-02-01
Changed
- refactored internal modules (cf7045f)
Added
- added StageRefs Next, Last, and Skip (14abaa7)
- added optional finalizer-Callable to Pipeline (d95e5b6)
- added support for Callable in Pipeline-argument exit_on_status (154c67b)
Fixed
- PipelineOutput.last_X-methods now return None in case of empty records (b7a6ba1)
[1.0.0] - 2024-01-31
Changed
- Breaking: refactored PipelineOutput and related types (1436ca1)
- Breaking: replaced forwarding kwargs of Pipeline.run as dictionary in_ into Stage/Fork-Callables by forwarding directly (f2710fa, b569bb9)
Added
- added missing information in module- and class-docstrings (7896742)
[0.1.0] - 2024-01-31
initial release
File details
Details for the file data-plumber-1.11.0.tar.gz.
File metadata
- Download URL: data-plumber-1.11.0.tar.gz
- Upload date:
- Size: 22.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.10.12
File hashes
Algorithm | Hash digest
---|---
SHA256 | 44f8179540e38d826a3632340202bc664e131dfd5612dd09a196894ca25e7900
MD5 | 4f1bfb080e87abff05046947d9d7cd95
BLAKE2b-256 | f5e22ce8cfdf2aca98ec0eb6a2ff85895606d9aa3edade9f349ee11ef55d60b5
File details
Details for the file data_plumber-1.11.0-py3-none-any.whl.
File metadata
- Download URL: data_plumber-1.11.0-py3-none-any.whl
- Upload date:
- Size: 20.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.10.12
File hashes
Algorithm | Hash digest
---|---
SHA256 | 8ffa20f75359d520f9e4ad9c33a0ce1bc11473cbb5fe792d5c5b4d82bce8d482
MD5 | f6e4a856ee7d278abae1a0d9e763ea08
BLAKE2b-256 | 0e40850528974e620165481b3efd5211573cc4a3bee29ba8984ece34e0decd87