fpipeline — Functional Pipeline
Simple but flexible python pipelines based on function composition.
Build your workflow step by step, then flexibly combine the steps to make a bigger step. Conditional execution and branching are also supported.
- Author: Bob Kerns
- License: MIT
- Bugs: https://github.com/BobKerns/fpipeline/issues
Organizing a data application as a pipeline brings many benefits:
- By naming each top-level operation, a degree of natural self-documentation is provided.
- Each step can easily be independently tested.
- The pipeline focuses on the overall sequence of operations and the dataflow.
- It enforces a clear boundary between levels of abstraction: details live in steps and conditions.
The pipeline orchestrates these pieces, while remaining entirely agnostic about what the operations do.
Steps
A step is a function of one argument, which we will call the context. The context can be any object, including a dict, according to the needs of the application.
This single argument will be the same for all steps in an application, but different for each invocation.
Often, it will be an object representing the data the pipeline is to operate on. It can also be contained in an object or dict along with various metadata.
type Step[D,V]
A Step type hint is defined to be:
Step = Callable[[D],V]
where D and V are type variables.
Steps taking only a single argument would seem very limiting. But we have a solution!
decorator @stepfn
To define a step function, we use the decorator @stepfn. The function's first positional argument is interpreted as the context argument. The function is replaced with a new function that accepts all the other arguments and returns a new function that accepts the context argument (thus, a Step), and invokes the original function with all its arguments.
We transform this:
@stepfn
def my_step(ctx: CTX, a: A, b: B, c: C) -> V:
into:
def my_step(a: A, b: B, c: C) -> Step[CTX,V]:
or more informally
my_step(A, B, C) -> (CTX) -> V
That is, we supply everything except the first argument, then apply the context parameter for each data value processed by the pipeline.
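As an illustration, a decorator with this shape can be sketched in a few lines. This is a hedged sketch, not fpipeline's actual implementation; the names here are invented for the example:

```python
from functools import wraps

# Illustrative sketch of a @stepfn-style decorator: calling the decorated
# function with everything *except* the context returns a Step (a function
# of the context alone).
def stepfn(fn):
    @wraps(fn)
    def curried(*args, **kwargs):
        def step(ctx):
            return fn(ctx, *args, **kwargs)
        return step
    return curried

@stepfn
def add_suffix(ctx: str, suffix: str) -> str:
    return ctx + suffix

exclaim = add_suffix('!')   # a Step: takes only the context
print(exclaim('hello'))     # hello!
```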
It might seem that this limits us to constant values. However, pipeline variables allow different values to be injected at each execution; they are evaluated each time the pipeline runs.
Using a simple protocol based on single-argument functions allows us to use them as building blocks, to combine them into entire pipelines, and to combine pipelines into larger pipelines, all following the same protocol.
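The composition property can be sketched directly: a pipeline built from one-argument steps is itself just another one-argument function. This is an illustrative sketch, not the package's own code:

```python
# Sketch: a pipeline over steps that follow the one-argument protocol.
# Each step is called with the same context; the last step's value is
# the pipeline's value.
def pipeline(*steps):
    def run(ctx):
        result = None
        for step in steps:
            result = step(ctx)
        return result
    return run

double = lambda ctx: ctx * 2
plus_one = lambda ctx: ctx + 1
p = pipeline(double, plus_one)
print(p(10))   # 11 -- each step receives the original context, 10
```

Because `p` is itself a function of one argument, it can be used as a step inside a larger pipeline, which is the whole point of the protocol.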
Pipeline variables
To allow passing values between pipeline Steps in a flexible way, we provide two forms of pipeline variables, which allow capture of the return value of a Step and then supply it as an argument to a later step function, all handled behind the scenes.
Pipeline variables will hold any value.
A pipeline variable is also callable as a Step, allowing it to be used in a pipeline to provide a return value for the pipeline.
function variables(context)
variables returns a VariableContext, which is a python context manager. This is used in a with ... as ... statement.
The resulting VariableContext gives out pipeline variables and manages their scope.
class VariableContext
A context manager that manages pipeline variables.
Usage goes like this:
# Dummy StepFns to illustrate
@stepfn
def readAssetStep(data: Data, path: str) -> Asset:
    return path

@stepfn
def mergeAssetsStep(data: Data, asset1: Asset, asset2: Asset) -> Asset:
    return f"{asset1}+{asset2}"

@stepfn
def writeAssetStep(data: Data, asset: Asset, path: str) -> Asset:
    print(f"{path}: {asset}")
    return asset

# a `StepFn` (a pipeline is a `Step`) that takes two src paths to assets,
# merges them, stores the result in data.result, and writes it to a file.
# The asset paths are provided per-invocation in the context.
# The output directory is configured as an argument
# when creating the pipeline.
@stepfn
def merge(data: Data, outdir: str) -> Asset:
    with variables() as vars:
        # declare the pipeline variables that we need
        src1, src2 = vars.attribute('src1', 'src2')
        asset1, asset2 = vars.variable('asset1', 'asset2')
        result = vars.variable('result')  # stored in data.result
        return vars.pipeline(
            store(asset2, readAssetStep(src2)),
            store(asset1, readAssetStep(src1)),
            store(result, mergeAssetsStep(asset1, asset2)),
            writeAssetStep(result, outdir),
            result
        )(data)
merge can now be invoked by omitting the data argument, giving a function of one argument (data).
pair1 = {
'asset1': '/data/assets/src1',
'asset2': '/data/assets/src2'
}
merge_and_store = merge(outdir='/data/assets/merged')
# Perform the operation
merged = merge_and_store(pair1)
Our new Step (merge_and_store) can then be called for each merger to be performed.
If we have two directories of files to be merged, this will take them pairwise and feed each pair through the pipeline.
import glob

def get_assets(asset1, asset2):
    list1 = glob.glob(asset1)
    list2 = glob.glob(asset2)
    paired = zip(list1, list2)
    return ({'asset1': a1, 'asset2': a2}
            for (a1, a2) in paired)
left = '/data/assets1/*.asset'
right = '/data/assets2/*.asset'
results = list(map(merge_and_store, get_assets(left, right)))
method VariableContext.variable(*names)
Returns a Variable, or a tuple of Variables if more than one name is given.
This allows assignment of multiple variables in a single statement:
a, b = vars.variable('a', 'b')
method VariableContext.attribute(*names)
Returns a type of pipeline variable called Attribute, or a tuple of Attributes if more than one name is given.
This allows assignment of multiple attribute variables in a single statement:
a, b = vars.attribute('a', 'b')
method VariableContext.pipeline(*steps)
Creates and runs a pipeline in this context.
class Variable
Represents a place to store and retrieve values between steps.
attribute Variable.value
The value of a Variable. Not usually referenced directly; rather, the variable is passed to step functions, or assigned with the store step function.
attribute Variable.name
The name of the variable. It must be unique within a VariableContext. Multiple uses of the same name will yield the same variable.
class Attribute
A pipeline variable that accesses the context. The name names the field or key to access.
attribute Attribute.value
The value of an Attribute. Not usually referenced directly; rather, the variable is passed to step functions, or assigned with the store step function.
attribute Attribute.name
The name of the variable. It must be unique within a VariableContext. Multiple uses of the same name will yield the same variable.
It is also the name of the field or key in the context.
function store(variable, step)
Stores the result of step into the variable.
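The store/variable interaction can be sketched as follows. The class and attribute names here are illustrative stand-ins, not fpipeline's actual internals:

```python
# Illustrative sketch of how store and a pipeline variable could interact.
class Variable:
    def __init__(self, name):
        self.name = name
        self.value = None

    def __call__(self, ctx):
        # A pipeline variable is callable as a Step: it yields its value.
        return self.value

def store(variable, step):
    def run(ctx):
        variable.value = step(ctx)  # run the step, capture its result
        return variable.value
    return run

v = Variable('v')
s = store(v, lambda ctx: ctx.upper())
print(s('hi'))   # HI
print(v('hi'))   # HI -- the variable now yields the stored value
```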
function eval_vars(var, depth=10)
[Advanced]
If var is any type of pipeline variable, its value is returned.
If var is a container type (list, tuple, namedtuple, dict, or set), a copy is returned with the variables replaced with their values. This is performed to depth levels.
In most cases, this will be called for you at appropriate points.
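The substitution described above can be sketched like this, using a minimal stand-in for a pipeline variable (fpipeline's real eval_vars is more general, handling namedtuples among other cases):

```python
class Variable:
    # Minimal stand-in for a pipeline variable holding a value.
    def __init__(self, value):
        self.value = value

def eval_vars(var, depth=10):
    # Replace Variable-like objects with their values, recursing into
    # common container types up to `depth` levels.
    if depth <= 0:
        return var
    if isinstance(var, Variable):
        return var.value
    if isinstance(var, list):
        return [eval_vars(v, depth - 1) for v in var]
    if isinstance(var, tuple):
        return tuple(eval_vars(v, depth - 1) for v in var)
    if isinstance(var, dict):
        return {k: eval_vars(v, depth - 1) for k, v in var.items()}
    if isinstance(var, set):
        return {eval_vars(v, depth - 1) for v in var}
    return var

v = Variable(42)
print(eval_vars({'a': v, 'b': [v, 1]}))   # {'a': 42, 'b': [42, 1]}
```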
Conditional execution
A pipeline that executes every step on every input would severely limit flexibility.
fpipeline provides for branching, allowing steps to be skipped where they don't apply, or entirely different flows to be selected.
The primary means is the if_ step function.
These functions have a '_' suffix to avoid conflicts while maintaining readability. They are not in any sense private; they are a fully public part of the interface.
@stepfn
if_(cond, then, else_)
cond is a Condition, which is like a Step except that the return value is a bool. It should be defined using the @conditionfn decorator in the same way as @stepfn is used for step functions.
then and else_ are steps (or pipelines), executed according to the value of cond. They may be omitted or supplied as None.
If then or else_ is a list, it will be treated implicitly as a pipeline.
@conditionfn
not_(cond)
not_ returns a new Condition with the opposite sense.
@conditionfn
and_(*conds)
and_ returns a new Condition that returns False if any of its arguments returns False, and True otherwise.
@conditionfn
or_(*conds)
or_ returns a new Condition that returns True if any of its arguments returns True, and False otherwise.
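The semantics of these combinators over one-argument conditions can be sketched as:

```python
# Sketch only: each combinator turns conditions into a new condition,
# still a function of the single context argument.
def not_(cond):
    return lambda ctx: not cond(ctx)

def and_(*conds):
    return lambda ctx: all(c(ctx) for c in conds)

def or_(*conds):
    return lambda ctx: any(c(ctx) for c in conds)

positive = lambda ctx: ctx > 0
even = lambda ctx: ctx % 2 == 0
print(and_(positive, even)(4))    # True
print(or_(positive, even)(-3))    # False
print(not_(positive)(-3))         # True
```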
Utility Steps
@stepfn
pipeline(*steps)
Create a pipeline: a step that executes steps in order, and returns the value returned by the last.
@stepfn
apply(fn, *args, **kwargs)
Calls an arbitrary function on the context, plus any additional arguments supplied. Variables and steps will be replaced by their values.
@stepfn
list_(*values)
Return a list of the values. Steps and variables will be evaluated.
@stepfn
tuple_(*values)
Return a tuple of the values. Steps and variables will be evaluated.
@stepfn
dict_(**values)
Return a dict from the supplied keyword arguments. Steps and variables will be evaluated.
@stepfn
set_(*values)
Return a set of the supplied values. Steps and variables will be evaluated.
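The evaluation behavior shared by these collection builders can be sketched for dict_ as follows. This is an illustrative sketch, not fpipeline's implementation, which uses its own variable-substitution machinery rather than a bare callable check:

```python
# Sketch: build a dict where any callable value is treated as a Step
# and applied to the context; plain values pass through unchanged.
def dict_(**values):
    def step(ctx):
        return {k: (v(ctx) if callable(v) else v)
                for k, v in values.items()}
    return step

summary = dict_(raw=lambda ctx: ctx,
                doubled=lambda ctx: ctx * 2,
                note='ok')
print(summary(3))   # {'raw': 3, 'doubled': 6, 'note': 'ok'}
```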
Development
Pull requests are welcome, as are bug reports, feature requests, documentation requests or fixes: https://github.com/BobKerns/fpipeline/issues
Pull requests do not need to be rebased or linear, but you should at least pull from the latest main branch and resolve conflicts.
You should pass all tests in the test notebook, and add any new ones to the test notebook.
File details
Details for the file fpipeline-1.3.1.tar.gz.
File metadata
- Download URL: fpipeline-1.3.1.tar.gz
- Size: 40.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: python-requests/2.32.3
File hashes
Algorithm | Hash digest
---|---
SHA256 | b39f46d6e4052f07c3bb061be3d20a3cb2dd6790e575e5a1489818e3f293fe68
MD5 | 54ed7a2102ac90e37b0464199b758197
BLAKE2b-256 | e830533fb5bd56b8ba7f959c3a93e2d2f62f378c9c7ca49fde69342766358f93
File details
Details for the file fpipeline-1.3.1-py3-none-any.whl.
File metadata
- Download URL: fpipeline-1.3.1-py3-none-any.whl
- Size: 10.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: python-requests/2.32.3
File hashes
Algorithm | Hash digest
---|---
SHA256 | 771883531248d82e3c44cb8a02538105f75ced46cbc9169f30dc52b7f22befb9
MD5 | d30419b3b2545291256d90b70212a1ad
BLAKE2b-256 | f4d324cacdbe6b3a37ee822fa5a9ad43e8336b24712465625e72cd069db859bc