
fpipeline — Functional Pipeline

Simple but flexible Python pipelines based on function composition.

Build your workflow step by step, then flexibly combine the steps to make a bigger step. Conditional execution and branching are also supported.

Organizing a data application as a pipeline brings many benefits:

  • Naming each top-level operation provides a degree of natural self-documentation.
  • Each step can easily be tested independently.
  • The pipeline focuses on the overall sequence of operations and the dataflow.
  • A clear boundary separates levels of abstraction: details live in the steps and conditions.

The pipeline orchestrates these pieces, while remaining entirely agnostic about what the operations do.

Steps

A step is a function of one argument, which we will call the context. The context can be any object, including a dict, according to the needs of the application.

The same context is passed to every step in a given run of the pipeline, but each invocation receives its own.

Often, it will be an object representing the data the pipeline is to operate on. It can also be contained in an object or dict along with various metadata.

type Step[D,V]

A Step type hint is defined to be:

Step = Callable[[D],V]

where D is the type of the context argument and V is the type of the step's return value.
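
For example, any one-argument callable already satisfies this protocol. A minimal sketch, assuming a dict context with a 'text' entry:

# a hypothetical Step[dict, dict]
def normalize(ctx: dict) -> dict:
    ctx['text'] = ctx['text'].strip().lower()
    return ctx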

Steps taking only a single argument would seem very limiting. But we have a solution!

decorator @stepfn

To define a step function, we use the @stepfn decorator. The decorated function's first positional argument is interpreted as the context argument. @stepfn replaces the function with one that accepts all of the remaining arguments and returns a new function taking only the context (thus, a Step); that Step, when invoked, calls the original function with the context and the previously supplied arguments.

We transform this:

@stepfn
def my_step(ctx: CTX, a: A, b: B, c: C) -> V:

into:

def my_step(a: A, b: B, c: C) -> Step[CTX,V]:

or more informally

my_step(A, B, C) -> (CTX) -> V

That is, we supply everything except the first argument, then apply the context parameter for each data value processed by the pipeline.
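
A minimal sketch of this in use; the step, its arguments, and the context here are hypothetical:

@stepfn
def scale(ctx: dict, factor: float) -> float:
    return ctx['value'] * factor

double = scale(2.0)      # a Step[dict, float]; factor is bound now
double({'value': 21.0})  # the context is supplied later; returns 42.0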

It might seem that this limits us to constant values. However, pipeline variables allow different values to be injected at each execution: pipeline variables are re-evaluated on every run.

Using a simple protocol based on single-argument functions allows us to use them as building blocks, to combine them into entire pipelines, and to combine pipelines into larger pipelines, all following the same protocol.

Pipeline variables

To allow passing values between pipeline Steps in a flexible way, we provide two forms of pipeline variables, which allow capturing the return value of a Step and supplying it as an argument to a later step function, all handled behind the scenes.

A pipeline variable can hold any value.

A pipeline variable is also callable as a Step, allowing it to appear in a pipeline to supply the pipeline's return value.

function variables(context)

variables returns a VariableContext, which is a Python context manager, used in a with ... as ... statement.

The resulting VariableContext gives out pipeline variables and manages their scope.

class VariableContext

A context manager that manages pipeline variables.

Usage goes like this:

# Dummy StepFns to illustrate
@stepfn
def readAssetStep(data: Data, path: str) -> Asset:
    return path

@stepfn
def mergeAssetsStep(data: Data, asset1: Asset, asset2: Asset) -> Asset:
    return f"{asset1}+{asset2}"

@stepfn
def writeAssetStep(data: Data, asset: Asset, path: str) -> Asset:
    print(f"{path}: {asset}")
    return asset

# a `StepFn` (a pipeline is a `Step`) that takes two src paths to assets,
# merges them, stores the result in data.result, and writes it to a file.
# The asset paths are provided per-invocation in the context
# The output directory is configured as an argument
# when creating the pipeline.
@stepfn
def merge(data: Data, outdir: str) -> Asset:
    with variables() as vars:
        # declare the pipeline variables that we need
        src1, src2 = vars.attribute('src1', 'src2')
        asset1, asset2 = vars.variable('asset1', 'asset2')
        result = vars.attribute('result') # stored in data.result
        return vars.pipeline(
            store(asset1, readAssetStep(src1)),
            store(asset2, readAssetStep(src2)),
            store(result, mergeAssetsStep(asset1, asset2)),
            writeAssetStep(result, outdir),
            result
        )(data)

Since merge is itself a step function, it can be invoked with just the outdir argument, giving a function of one argument (data).

pair1 = {
    'src1': '/data/assets/src1',
    'src2': '/data/assets/src2'
}
merge_and_store = merge(outdir='/data/assets/merged')

# Perform the operation
merged = merge_and_store(pair1)

Our new Step (merge_and_store) can then be called for each merge to be performed.

If we have two directories of files to be merged, this will take them pairwise and feed each pair through the pipeline.

import glob

def get_assets(src1, src2):
    list1 = glob.glob(src1)
    list2 = glob.glob(src2)
    paired = zip(list1, list2)
    return ({'src1': a1, 'src2': a2}
            for (a1, a2) in paired)
left = '/data/assets1/*.asset'
right = '/data/assets2/*.asset'
results = list(map(merge_and_store, get_assets(left, right)))

method VariableContext.variable(*names)

Returns a Variable, or a tuple of Variables if more than one name is given.

This allows assignment of multiple variables in a single statement:

    a, b = vars.variable('a', 'b')

method VariableContext.attribute(*names)

Returns a type of pipeline variable called an Attribute, or a tuple of Attributes if more than one name is given.

This allows assignment of multiple attribute variables in a single statement:

    a, b = vars.attribute('a', 'b')

method VariableContext.pipeline(*steps)

Creates and runs a pipeline in this context.

class Variable

Represents a place to store and retrieve values between steps.

attribute Variable.value

The value of a Variable. Not usually referenced directly; rather the variable is passed to step functions, or assigned with the store step function.

attribute Variable.name

The name of the variable. It must be unique within a VariableContext. Multiple uses of the same name will yield the same variable.

class Attribute

A pipeline variable that accesses the context. The name identifies the field or key to access.

attribute Attribute.value

The value of an Attribute. Not usually referenced directly; rather, the variable is passed to step functions, or assigned with the store step function.

attribute Attribute.name

The name of the variable. It must be unique within a VariableContext. Multiple uses of the same name will yield the same variable.

It is also the name of the field or key in the context.

function store(variable, step)

Store the result of step into the variable.

function eval_vars(var, depth=10)

[Advanced]

If var is any type of pipeline variable, its value is returned.

If var is a container type (list, tuple, namedtuple, dict, or set), a copy is returned with the variables replaced with their values. This is performed to depth levels.

In most cases, this will be called for you at appropriate points.
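
A sketch, assuming a Variable a (obtained inside a variables() context) currently holding 1:

a = vars.variable('a')
a.value = 1
eval_vars([a, {'key': a}])  # -> [1, {'key': 1}]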

Conditional execution

A pipeline that executes every step on every input would severely limit flexibility.

fpipeline provides for branching, allowing steps to be skipped where they don't apply, or entirely different flows to be selected.

The primary means is via the if_ step function.

These functions have a '_' suffix to avoid conflicts while maintaining readability. They are not in any sense private; they are a fully public part of the interface.

@stepfn if_(cond, then, else_)

cond is a Condition, which is like a Step except the return value is a bool. It should be defined using the @conditionfn decorator in the same way as @stepfn is used for step functions.

then and else_ are steps (or pipelines), executed according to the value of cond. Either may be omitted or supplied as None.

If then or else are lists, they will be treated implicitly as a pipeline.
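
A sketch of conditional execution, using a hypothetical condition and step:

@conditionfn
def missing(ctx: dict, key: str) -> bool:
    return key not in ctx

@stepfn
def set_default(ctx: dict, key: str, value) -> dict:
    ctx[key] = value
    return ctx

# supply a default title only when none is present; else_ is omitted
fill_title = if_(missing('title'), set_default('title', 'Untitled'))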

@conditionfn not_(cond)

not_ returns a new Condition with the opposite sense.

@conditionfn and_(*conds)

and_ returns a new Condition that returns False if any of its arguments return False, and True otherwise.

@conditionfn or_(*conds)

or_ returns a new Condition that returns True if any of its arguments return True, and False otherwise.
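
These compose like ordinary boolean expressions. Continuing the hypothetical missing condition above, with hypothetical publish_step and log_skip_step step functions:

ready = not_(or_(missing('title'), missing('body')))
publish_if_ready = if_(ready, publish_step(), log_skip_step())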

Utility Steps

@stepfn pipeline(*steps)

Create a pipeline: a step that executes the given steps in order and returns the value returned by the last.
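
Because a pipeline is itself a Step, pipelines nest. A sketch with hypothetical steps:

preprocess = pipeline(strip_whitespace(), lowercase())
full = pipeline(preprocess, tokenize())  # a pipeline used as a step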

@stepfn apply(fn, *args, **kwargs)

Calls an arbitrary function on the context, plus any additional arguments supplied. Variables and steps among the arguments will be replaced by their values.
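
A sketch, assuming a result variable and a hypothetical compute step, inside a variables() context:

vars.pipeline(
    store(result, compute()),
    apply(print, 'result =', result),  # calls print(ctx, 'result =', <value of result>)
)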

@stepfn list_(*values)

Return a list of values. Steps and variables will be evaluated.

@stepfn tuple_(*values)

Return a tuple of values. Steps and variables will be evaluated.

@stepfn dict_(**values)

Return a dict from the supplied keyword arguments. Steps and variables will be evaluated.

@stepfn set_(*values)

Return a set of the supplied values. Steps and variables will be evaluated.
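
A sketch combining these with pipeline variables, reusing names from the merge example above; each returns a Step that builds the collection when the pipeline runs:

summary = dict_(source=src1, merged=result)
pair = tuple_(asset1, asset2)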

Development

Pull requests are welcome, as are [bug reports, feature requests, documentation requests or fixes](https://github.com/BobKerns/fpipeline/issues).

Pull requests do not need to be rebased or linear, but you should at least pull from the latest main branch and resolve conflicts.

You should pass all tests in the test notebook, and add new tests there to cover your changes.
