
lightweight pipeline for numerical experiments



Overview

lwpipe provides a lightweight pipeline for numerical experiments. It has fewer features than luigi or Kedro, but lets you build and run pipelines quickly.

lwpipe is heavily inspired by Kedro.

Installation

From PyPI:

pip install lwpipe

Usage

Minimal example (for a computation this simple, you don't really need this library):

from lwpipe import Node, Pipeline

nodes = [
    Node(func=lambda x,y: x+y, inputs=(1,2)),
    Node(func=lambda x: x**2),
]

pipe = Pipeline(nodes)
outputs = pipe.run()
assert outputs[0] == 9
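Conceptually, each node's return value becomes the next node's arguments, with a tuple spread into positional arguments (which is why the second node receives 3 and returns 9). The plain-Python sketch below models that chaining. `run_chain` is a hypothetical helper and the tuple-unpacking rule is an assumption for illustration; this is not lwpipe's actual implementation.

```python
from typing import Any, Callable, Sequence


def run_chain(funcs: Sequence[Callable[..., Any]], inputs: tuple) -> Any:
    """Hypothetical model of sequential node chaining (not lwpipe's code)."""
    args = inputs
    result: Any = None
    for func in funcs:
        result = func(*args)
        # Assumption: a tuple return value is spread into the next call's
        # positional arguments; anything else is passed as a single argument.
        args = result if isinstance(result, tuple) else (result,)
    return result


print(run_chain([lambda x, y: x + y, lambda x: x**2], (1, 2)))  # → 9
```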

Example with interim data output:

from lwpipe import Node, Pipeline
from lwpipe.io import dump_pickle, load_pickle

def time_consuming_func(x):
    return x

nodes = [
    Node(
        func=time_consuming_func,
        inputs=100,
        outputs_dumper=dump_pickle,
        outputs_path="interim_data.pickle",
        outputs_loader=load_pickle, # needed to bypass this node
    ),
    Node(func=lambda x: x**2, name="square"),
]

pipe = Pipeline(nodes)
outputs = pipe.run()
assert outputs[0] == 10000

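Once the first node has run, you can skip it on later runs with pipe.run(1) or pipe.run("square"): the pipeline then starts from the second node, and the first node's output is read back from interim_data.pickle through outputs_loader.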
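The dump-then-reload idea behind bypassing a node can be sketched with the standard `pickle` module. In the sketch below, `run_cached` and its `skip_first` flag are hypothetical stand-ins for the roles `outputs_dumper`/`outputs_loader` and `pipe.run(1)` play; they are assumptions for illustration, not lwpipe's API. It only shows that the expensive step runs once and later runs reuse its stored output.

```python
import os
import pickle
import tempfile
from typing import Any, Callable, List


def run_cached(funcs: List[Callable[[Any], Any]], first_input: Any,
               cache_path: str, skip_first: bool = False) -> Any:
    """Hypothetical sketch (not lwpipe): cache the first step's output.

    With skip_first=True the first step is not called; its output is
    loaded from cache_path instead (the role outputs_loader plays).
    """
    if skip_first:
        with open(cache_path, "rb") as f:
            value = pickle.load(f)
    else:
        value = funcs[0](first_input)
        with open(cache_path, "wb") as f:
            pickle.dump(value, f)  # the role outputs_dumper plays
    for func in funcs[1:]:
        value = func(value)
    return value


calls = []


def time_consuming_func(x):
    calls.append(x)  # record each real execution of the expensive step
    return x


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "interim_data.pickle")
    first = run_cached([time_consuming_func, lambda x: x**2], 100, path)
    second = run_cached([time_consuming_func, lambda x: x**2], 100, path,
                        skip_first=True)

assert first == second == 10000
assert len(calls) == 1  # the expensive step ran only once
```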

Multiple outputs with NumPy:

import numpy as np
from lwpipe import Node, Pipeline
from lwpipe.io import dump_npy, load_npy

def split(x):
    return x[:5], x[5:]

nodes = [
    Node(
        func=split,
        inputs=np.arange(10),
        outputs=("former", "latter"),
        outputs_dumper=dump_npy,
        outputs_path=("df1.npy", "df2.npy"),
        outputs_loader=load_npy,
    ),
    Node(
        func=np.mean,
        name="former_mean",
        inputs="former", # calculated at the first node
        outputs="former_mean",
    ),
    Node(
        func=np.mean,
        name="latter_mean",
        inputs="latter", # calculated at the first node
        outputs="latter_mean",
    ),
]

pipe = Pipeline(nodes)
outputs = pipe.run()
assert outputs[0] == 7.0
# Interim results are available in the pipe.results dict
assert pipe.results["former_mean"] == 2.0
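Named inputs and outputs can be thought of as keys into a shared results dictionary: a node stores each return value under its output name, and a later node looks up its inputs by name. The sketch below models that idea with a hypothetical `NamedStep` tuple and `run_named` function, using `statistics.mean` instead of NumPy so it has no dependencies. It is an illustration of the concept, not lwpipe's internals.

```python
import statistics
from typing import Any, Callable, Dict, NamedTuple, Sequence, Tuple


class NamedStep(NamedTuple):
    """Hypothetical step description (not lwpipe's Node)."""
    func: Callable[..., Any]
    inputs: Tuple[str, ...]   # names looked up in the results dict
    outputs: Tuple[str, ...]  # names under which return values are stored


def run_named(steps: Sequence[NamedStep],
              initial: Dict[str, Any]) -> Dict[str, Any]:
    results = dict(initial)
    for step in steps:
        value = step.func(*(results[name] for name in step.inputs))
        # A single output name stores the value as-is; several names
        # unpack a tuple return value, one element per name.
        values = (value,) if len(step.outputs) == 1 else value
        for name, v in zip(step.outputs, values):
            results[name] = v
    return results


steps = [
    NamedStep(lambda x: (x[:5], x[5:]), ("x",), ("former", "latter")),
    NamedStep(statistics.mean, ("former",), ("former_mean",)),
    NamedStep(statistics.mean, ("latter",), ("latter_mean",)),
]
results = run_named(steps, {"x": list(range(10))})
assert results["former_mean"] == 2.0
assert results["latter_mean"] == 7.0
```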

Batch dump example (all return values of a node are dumped to a single file):

import numpy as np
from lwpipe import DumpType, Node, Pipeline
from lwpipe.io import (
    dump_dict_pickle,
    dump_savez_compressed,
    load_dict_pickle,
    load_savez_compressed,
)

def divide(x):
    return x[:, 0], x[:, 1]

nodes = [
    Node(
        func=divide,
        inputs=np.arange(1, 7).reshape((3, 2)),
        outputs=("mean_a", "mean_b"),
        outputs_dumper=dump_dict_pickle,
        outputs_dumper_type=DumpType.BATCH,
        outputs_path="1.pickle",
        outputs_loader=load_dict_pickle,
    ),
    Node(
        func=lambda x, y: (x, y),
        outputs=("a", "b"),
        outputs_dumper=dump_dict_pickle,
        outputs_dumper_type=DumpType.BATCH,
        outputs_path="2.pickle",
        outputs_loader=load_dict_pickle,
    ),
    Node(
        func=lambda x, y: (x.max(), y.max()),
        inputs=("a", "b"),
        outputs=("c", "d"),
        outputs_dumper=dump_savez_compressed,
        outputs_dumper_type=DumpType.BATCH,
        outputs_path="3.npz",
        outputs_loader=load_savez_compressed,
    )
]

pipe = Pipeline(nodes)
outputs = pipe.run()
assert outputs == (5, 6)
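The difference between per-output dumping and `DumpType.BATCH` is how many files a node writes. The sketch below models both with `pickle`. `dump_each`, `dump_batch`, and `load_batch` are hypothetical helpers, and storing a batch as a `{output_name: value}` dict is an assumption based on the names `dump_dict_pickle`/`load_dict_pickle`, not a description of their actual format.

```python
import os
import pickle
import tempfile
from typing import Any, Dict, Sequence


def dump_each(values: Sequence[Any], paths: Sequence[str]) -> None:
    """Hypothetical helper: one file per return value."""
    for value, path in zip(values, paths):
        with open(path, "wb") as f:
            pickle.dump(value, f)


def dump_batch(values: Sequence[Any], names: Sequence[str], path: str) -> None:
    """Hypothetical helper: all return values in one file, keyed by name."""
    with open(path, "wb") as f:
        pickle.dump(dict(zip(names, values)), f)


def load_batch(path: str) -> Dict[str, Any]:
    """Hypothetical helper: read back the {name: value} dict."""
    with open(path, "rb") as f:
        return pickle.load(f)


with tempfile.TemporaryDirectory() as tmp:
    columns = [[1, 3, 5], [2, 4, 6]]
    dump_each(columns, [os.path.join(tmp, "a.pickle"),
                        os.path.join(tmp, "b.pickle")])
    dump_batch(columns, ["a", "b"], os.path.join(tmp, "1.pickle"))
    loaded = load_batch(os.path.join(tmp, "1.pickle"))
    n_files = len(os.listdir(tmp))

assert loaded == {"a": [1, 3, 5], "b": [2, 4, 6]}
assert n_files == 3  # two per-output files plus one batch file
```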

You can pass a config object to a function:

from pathlib import Path

from lwpipe import Node, Pipeline
from lwpipe.io import dump_pickle

def add(a, cfg):
    return a + cfg["hyperparam"]

nodes = [Node(func=add, inputs=5, config={"hyperparam": 10})]
# equivalent to
# nodes = [Node(func=lambda a: add(a, {"hyperparam": 10}), inputs=5)]
pipe = Pipeline(nodes)
outputs = pipe.run()
assert outputs[0] == 15

def dumper(data, filepath, cfg):
    # tag the file name with the hyperparameter, e.g. add.pickle -> add_10.pickle
    filepath = Path(filepath)
    filepath = filepath.with_name(f"{filepath.stem}_{cfg['hyperparam']}{filepath.suffix}")
    return dump_pickle(data, filepath)

# outputs_dumper can also take the config as an argument
nodes = [Node(func=add, inputs=5, config={"hyperparam": 10},
              outputs_dumper=dumper,
              outputs_dumper_take_config=True,
              outputs_path="add.pickle",
)]
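The comment above says that passing `config=` is equivalent to binding the config into the function. In plain Python, `functools.partial` expresses the same binding; the sketch below assumes, as in `add(a, cfg)`, that the config is the function's last parameter, and is only meant to illustrate the equivalence, not how lwpipe forwards the config internally.

```python
from functools import partial


def add(a, cfg):
    return a + cfg["hyperparam"]


cfg = {"hyperparam": 10}
# Bind the config by keyword; the remaining positional argument "a" is
# what the node's inputs (or the previous node's output) would supply.
add_with_cfg = partial(add, cfg=cfg)
assert add_with_cfg(5) == 15
```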

Pipeline also accepts a plain list of functions that take no arguments and return nothing:

from lwpipe import Pipeline

def func():
    return

funcs = [func, func]
pipe = Pipeline(funcs)
pipe.run()
# equivalent to
# for func in funcs:
#   func()

# you can specify names of functions
pipe = Pipeline(funcs, names=["func1", "func2"])
pipe.run()
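One plausible use of step names is labeling each step, for example in timing output. The sketch below runs zero-argument functions in order and records elapsed time per name with `time.perf_counter`. `run_named_funcs` is hypothetical and does not reflect how lwpipe uses `names` internally. Note that with the fallback names, `[func, func]` would collide on the single key `"func"`, which is one reason explicit names are useful.

```python
import time
from typing import Callable, Dict, List, Optional


def run_named_funcs(funcs: List[Callable[[], None]],
                    names: Optional[List[str]] = None) -> Dict[str, float]:
    """Hypothetical helper: call each function, return seconds per name."""
    if names is None:
        names = [f.__name__ for f in funcs]  # fallback: the function's own name
    if len(names) != len(funcs):
        raise ValueError("names must have the same length as funcs")
    timings: Dict[str, float] = {}
    for name, func in zip(names, funcs):
        start = time.perf_counter()
        func()
        timings[name] = time.perf_counter() - start
    return timings


def func():
    return


timings = run_named_funcs([func, func], names=["func1", "func2"])
print(sorted(timings))  # → ['func1', 'func2']
```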

More examples are included in the test cases.


Download files

Source distribution: lwpipe-5.1.1.tar.gz (16.6 kB)

Built distribution: lwpipe-5.1.1-py3-none-any.whl (16.0 kB, Python 3)
