Skip to main content

lightweight pipeline for numerical experiments

Project description

PyPI version license python

Overview

lwpipe provides a lightweight pipeline for numerical experiments. For example, this module can be used in preprocessing steps of machine learning. Preprocessing consists of several steps, some of which take time to execute. In this case, it is common in the trial-and-error stage, such as numerical experiments, to dump the calculation results of the computationally-intensive steps and load them in the later programs to reduce the time required when the later steps are changed. This module reduces boilerplate code for file IO in the use cases above.

Note that

  • pipelines in this module do not have the concept of dependency between nodes (tasks), and nodes are executed sequentially.
  • lwpipe is highly inspired by Kedro.

Installation

from pypi:

pip install lwpipe

Usage

Minimal example (of course, no need to use this library..):

from lwpipe import Node, Pipeline

nodes = [
    Node(func=lambda x,y: x+y, inputs=(1,2)),
    Node(func=lambda x: x**2),
]

pipe = Pipeline(nodes)
outputs = pipe.run()
assert outputs[0] == 9

Example with interim data output:

from lwpipe import Node, Pipeline
from lwpipe.io import dump_pickle, load_pickle

def time_consuming_func(x):
    return x

nodes = [
    Node(
        func=time_consuming_func,
        inputs=100,
        outputs_dumper=dump_pickle,
        outputs_path="interim_data.pickle",
        outputs_loader=load_pickle, # needed to bypass this node
    ),
    Node(func=lambda x: x**2, name="square"),
]

pipe = Pipeline(nodes)
outputs = pipe.run()
assert outputs[0] == 10000

Once the first node is executed, you can bypass the node by pipe.run(1) or pipe.run("square").

Multiple outputs with numpy:

import numpy as np
from lwpipe import Node, Pipeline
from lwpipe.io import dump_npy, load_npy

def split(x):
    return x[:5], x[5:]

nodes = [
    Node(
        func=split,
        inputs=np.arange(10),
        outputs=("former", "latter"),
        outputs_dumper=dump_npy,
        outputs_path=("df1.npy", "df2.npy"),
        outputs_loader=load_npy,
    ),
    Node(
        func=np.mean,
        name="former_mean",
        inputs="former", # calculated at the first node
        outputs="former_mean",
    ),
    Node(
        func=np.mean,
        name="latter_mean",
        inputs="latter", # calculated at the first node
        outputs="latter_mean",
    ),
]

pipe = Pipeline(nodes)
outputs = pipe.run()
assert outputs[0] == 7.0
# You can access interim results by "results" dict
assert pipe.results["former_mean"] == 2.0

batch dump example (return values are dumped to one file):

import numpy as np
from lwpipe import DumpType, Node, Pipeline
from lwpipe.io import (
    dump_dict_pickle,
    dump_savez_compressed,
    load_dict_pickle,
    load_savez_compressed,
)

def divide(x):
    return x[:, 0], x[:, 1]

nodes = [
    Node(
        func=divide,
        inputs=np.arange(1, 7).reshape((3, 2)),
        outputs=("mean_a", "mean_b"),
        outputs_dumper=dump_dict_pickle,
        outputs_dumper_type=DumpType.BATCH,
        outputs_path="1.pickle",
        outputs_loader=load_dict_pickle,
    ),
    Node(
        func=lambda x, y: (x, y),
        outputs=("a", "b"),
        outputs_dumper=dump_dict_pickle,
        outputs_dumper_type=DumpType.BATCH,
        outputs_path="2.pickle",
        outputs_loader=load_dict_pickle,
    ),
    Node(
        func=lambda x, y: (x.max(), y.max()),
        inputs=("a", "b"),
        outputs=("c", "d"),
        outputs_dumper=dump_savez_compressed,
        outputs_dumper_type=DumpType.BATCH,
        outputs_path="3.npz",
        outputs_loader=load_savez_compressed,
    )
]

pipe = Pipeline(nodes)
outputs = pipe.run()
assert outputs == (5, 6)

Func with config example:

from lwpipe import Node, Pipeline
def add(a, cfg):
    return a + cfg["hyperparam"]

nodes = [Node(func=add, inputs=5, config={"hyperparam": 10})]
# equivalent to
# nodes = [Node(func=lambda a: add(a, {"hyperparam": 10}), inputs=5)]
pipe = Pipeline(nodes)
outputs = pipe.run()
assert outputs[0] == 15

This module also provides TrivialPipeline which just sequentially calls functions:

from lwpipe import TrivialPipeline

def func():
    return

funcs = [func, func]
pipe = TrivialPipeline(funcs)
pipe.run()
# equivalent to
# for func in funcs:
#   func()

# you can specify names of functions
pipe = TrivialPipeline(funcs, names=["func1", "func2"])
pipe.run()

More examples are included in the test cases.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

lwpipe-4.0.0.tar.gz (15.7 kB view hashes)

Uploaded Source

Built Distribution

lwpipe-4.0.0-py3-none-any.whl (15.0 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page