Composable, linear pipelines

linepipe

linepipe sits between ad-hoc Python scripts and full orchestration frameworks. It is best suited for batch pipelines, feature engineering, and reproducible data workflows where simplicity and clarity matter more than scale.

It focuses on:

  • clear data dependencies
  • reproducible execution (linear pipelines)
  • fast iteration and debugging thanks to (local / in-memory) caching
  • simple mental models

Example of a pipeline:

pipeline = Pipeline(
    nodes=[
        Node(
            func=nodes.create_db_engine,
            inputs=["config.db"],
            outputs=["conn"],
        ),
        Node(
            func=nodes.load_data,
            inputs=["conn"],
            outputs=["df"],
        ),
        Node(
            func=nodes.preprocess_data,
            inputs=["df", "config.preprocessing"],
            outputs=["df_processed"],
        ),
        Node(
            func=nodes.feature_engineering,
            inputs=["df_processed", "config.feature_engineering"],
            outputs=["df_features"],
        ),
        Node(
            func=nodes.train,
            inputs=["df_features", "config.model_params"],
            outputs=["model"],
        ),
    ],
    config=config
)

pipeline.run()

A minimal end-to-end example is available as a Jupyter notebook:

  • examples/basic_usage.ipynb

It walks through:

  • defining nodes
  • building a pipeline
  • running it and inspecting outputs

Installation

pip install linepipe

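Optionally, install extras for plotting (pip install linepipe[plot]) or memory profiling (pip install linepipe[memory]).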


Why linepipe?

Many pipeline tools are powerful but come with:

  • large configuration surfaces
  • implicit data catalogs
  • steep learning curves

linepipe intentionally keeps things simple:

  • Explicit > implicit
  • Minimal surface area
  • Easy to reason about
  • Easy to debug

Non-goals

  • Distributed execution
  • Dynamic DAG scheduling
  • Dataset catalogs
  • Orchestration / scheduling

linepipe is a building block, not a platform.


Core concepts

Node

A Node wraps a Python callable and declares:

  • which inputs it consumes
  • which outputs it produces

from linepipe.node import Node

def add(a, b):
    return a + b

node = Node(
    func=add,
    inputs=["x", "y"],
    outputs=["sum"],
)

Nodes are executed sequentially inside a Pipeline.


Pipeline

A Pipeline executes a list of Nodes in order, resolving inputs from:

  1. configuration
  2. cached / in-memory outputs
  3. runtime objects

from linepipe.pipeline import Pipeline

pipeline = Pipeline(
    nodes=[node],
    config={},
    x=2,
    y=3,
)

pipeline.run()

Intermediate results can be cached automatically.
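
To make the execution model concrete, here is a minimal sketch of linear execution in plain Python. It is not linepipe's implementation: SketchNode and run_linear are hypothetical names, and the handling of multiple outputs (a tuple in declared order) is an assumption.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class SketchNode:
    """Hypothetical stand-in for a node: a callable plus named inputs/outputs."""
    func: Callable[..., Any]
    inputs: list[str]
    outputs: list[str]


def run_linear(nodes: list[SketchNode], **runtime: Any) -> dict[str, Any]:
    """Run nodes in order, storing every named output in one shared dict."""
    store: dict[str, Any] = dict(runtime)
    for node in nodes:
        # Look up each declared input by name, then call the function.
        args = [store[name] for name in node.inputs]
        result = node.func(*args)
        if len(node.outputs) == 1:
            store[node.outputs[0]] = result
        elif len(node.outputs) > 1:
            # Assumption: multiple outputs come back as a tuple, in declared order.
            store.update(zip(node.outputs, result))
    return store


def add(a, b):
    return a + b


def double(value):
    return value * 2


store = run_linear(
    [
        SketchNode(func=add, inputs=["x", "y"], outputs=["sum"]),
        SketchNode(func=double, inputs=["sum"], outputs=["doubled"]),
    ],
    x=2,
    y=3,
)
print(store["sum"], store["doubled"])
```

Because each node only sees names produced earlier in the list, the order of the list is the whole execution plan.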


Inputs and outputs

Inputs are resolved in this order:

  1. config
  2. data cache
  3. runtime constants (i.e. keyword arguments passed to the Pipeline object)
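
The lookup order above could be sketched as follows. This is an illustration only: resolve is a hypothetical function, and the treatment of "config" and "config.<attr>" names is an assumption based on the description in this document, not linepipe's actual code.

```python
from types import SimpleNamespace

_MISSING = object()


def resolve(name, config, cache, runtime):
    """Resolve an input name: config first, then the data cache, then runtime kwargs."""
    if name == "config":
        return config
    if name.startswith("config."):
        # "config.window" -> getattr(config, "window")
        return getattr(config, name.split(".", 1)[1])
    for source in (cache, runtime):
        value = source.get(name, _MISSING)
        if value is not _MISSING:
            return value
    raise KeyError(f"cannot resolve input {name!r}")


config = SimpleNamespace(window=5)
cache = {"data": [1, 2, 3]}
runtime = {"data": "runtime value", "conn": "db-connection"}

print(resolve("config.window", config, cache, runtime))
print(resolve("data", config, cache, runtime))   # the cached value wins over runtime
print(resolve("conn", config, cache, runtime))   # only present in runtime
```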

Explicit data flow

Each node declares string-named inputs and outputs:

Node(
    func=transform,
    inputs=["raw_data"],
    outputs=["features"],
)

Config injection

Individual configuration values can be injected with "config.<attr>", or you can pass the whole config object with "config":

class Config:
    window = 5

Node(
    func=rolling_average,
    inputs=["data", "config.window"],
    outputs=["features"],
)

Caching

Persistent cache

Set use_persistent_cache=True to store intermediate results in a disk-backed cache (shelve).

pipeline = Pipeline(
    nodes=nodes,
    config=config,
    cache_storage_path="./.cache/pipeline.db",
    use_persistent_cache=True,
)

  • Cached outputs can be reused across runs. This is ideal for debugging and development: if the pipeline fails at some point, you can load the cached inputs and debug the failing function directly.
  • The cache is closed automatically at the end of execution.

You can reopen it for inspection:

ds = pipeline.open_cache()
print(ds.keys())
ds.close()
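
For readers unfamiliar with shelve, the sketch below shows the standard-library behaviour this kind of cache builds on: values stored under string keys survive closing and reopening the file. It uses only the shelve module directly; the key name and the temporary path are made up for the example, and nothing here calls linepipe itself.

```python
import shelve
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    cache_path = str(Path(tmp) / "pipeline_cache")

    # First "run": store an intermediate result under its output name.
    with shelve.open(cache_path) as cache:
        cache["df_processed"] = [1, 2, 3]

    # Later: reopen the same file and read the stored value back.
    with shelve.open(cache_path) as cache:
        cached_keys = sorted(cache.keys())
        restored = cache["df_processed"]

print(cached_keys, restored)
```

Values are pickled on write, so anything stored this way has to be picklable.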

Side-effect nodes (writers)

Nodes may declare no outputs, which is useful for:

  • database writes
  • file exports
  • logging

def write_to_db(df):
    ...

Node(
    func=write_to_db,
    inputs=["features"],
    outputs=[],
)

Named partial functions

linepipe provides a helper to adapt generic functions into distinct pipeline nodes.

from linepipe.node import create_named_partial_function
from my_package.pipelines.outputs import nodes

write_val_metrics = Node(
    func=create_named_partial_function(
        func=nodes.write_pipeline_output,
        func_name="write_val_metrics",
        table_name="metrics",
        schema="ml"
    ),
    inputs=["val_metrics"],
    outputs=[],
)

write_predictions = Node(
    func=create_named_partial_function(
        func=nodes.write_pipeline_output,
        func_name="write_predictions",
        table_name="predictions",
        schema="ml"
    ),
    inputs=["predictions"],
    outputs=[],
)

This:

  • pre-binds keyword arguments
  • gives the node a stable, readable name
  • improves logging, history, and introspection
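
As an illustration of the idea, a helper of this kind could be built on functools.partial. The named_partial function below is a hypothetical stand-in, not the source of create_named_partial_function, and write_pipeline_output is a toy writer invented for the example.

```python
import functools


def named_partial(func, func_name, **bound_kwargs):
    """Hypothetical helper: pre-bind keyword arguments and give the result a name."""
    bound = functools.partial(func, **bound_kwargs)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return bound(*args, **kwargs)

    # Override the name copied by functools.wraps so each variant is distinguishable.
    wrapper.__name__ = func_name
    wrapper.__qualname__ = func_name
    return wrapper


def write_pipeline_output(data, table_name, schema):
    """Toy writer that only reports where the rows would go."""
    return f"{len(data)} rows -> {schema}.{table_name}"


write_val_metrics = named_partial(
    write_pipeline_output, "write_val_metrics", table_name="metrics", schema="ml"
)
write_predictions = named_partial(
    write_pipeline_output, "write_predictions", table_name="predictions", schema="ml"
)

print(write_val_metrics.__name__, write_val_metrics([0.9, 0.8]))
print(write_predictions.__name__, write_predictions([1, 0, 1]))
```

Without the explicit name, both variants would report the name of the shared generic function, which makes logs and history entries hard to tell apart.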

Pipeline composition

Pipelines can be combined using +:

full_pipeline = feature_pipeline + writer_pipeline
full_pipeline.run()

Composition:

  • preserves execution order
  • prevents output name collisions
  • merges runtime objects and configuration
  • makes pipelines reusable, for example by sharing the same feature-generation pipeline between training and prediction
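
A rough sketch of how + composition with a collision check might work is shown below. SketchPipeline is a hypothetical class, and raising ValueError on duplicate output names is an assumption; this document does not say how linepipe reports collisions.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class SketchPipeline:
    """Hypothetical pipeline: (node name, output names) pairs plus runtime objects."""
    nodes: list[tuple[str, list[str]]]
    runtime: dict[str, Any] = field(default_factory=dict)

    def output_names(self) -> set[str]:
        return {name for _, outputs in self.nodes for name in outputs}

    def __add__(self, other: "SketchPipeline") -> "SketchPipeline":
        clashes = self.output_names() & other.output_names()
        if clashes:
            # Assumption: a collision is reported as an error rather than overwritten.
            raise ValueError(f"duplicate output names: {sorted(clashes)}")
        return SketchPipeline(
            nodes=self.nodes + other.nodes,  # left-hand nodes run first
            runtime={**self.runtime, **other.runtime},
        )


feature_pipeline = SketchPipeline(
    nodes=[("load_data", ["raw_data"]), ("clean_data", ["features"])],
    runtime={"x": 1},
)
writer_pipeline = SketchPipeline(
    nodes=[("write_features", [])],
    runtime={"table": "features"},
)

full = feature_pipeline + writer_pipeline
print([name for name, _ in full.nodes])
```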

Pipeline visualization

linepipe.viz provides a simple, dependency-free ASCII drawing function, draw_ascii_pipeline. It draws nodes in the order they are defined, along with their inputs and outputs, which is handy for a quick check.

from linepipe import viz

print(viz.draw_ascii_pipeline(pipeline))

Output can look like this:

Pipeline (5 nodes)

[load_data]
  outputs:
    - raw_data
      |
      v
[clean_data]
  inputs:
    - raw_data
  outputs:
    - clean_data
      |
      v
[rolling_goals]
  inputs:
    - clean_data
    - config.window
    - config.min_periods
  outputs:
    - rolling_features
      |
      v
[merge]
  inputs:
    - clean_data
    - rolling_features
  outputs:
    - features
      |
      v
[write_features]
  inputs:
    - features
  (no outputs)

If you need something more visual for documentation purposes, you can install the optional plotly dependency (pip install linepipe[plot]) and use the plot_pipeline_graph function. Note that this function is experimental. A graph of the pipeline above would look like this:

viz.plot_pipeline_graph(full_pipeline)

[Figure: pipeline plot]


History tracking

Enable execution history for debugging:

pipeline = Pipeline(
    nodes=nodes,
    config=config,
    track_history=True,  # history is kept in memory after `run()` finishes
    use_persistent_cache=False,  # if True, results are also kept on disk after `run()` finishes
)

After execution:

pipeline.history

Each entry contains:

  • node name
  • resolved inputs
  • outputs
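
The shape of such an entry can be illustrated with a small sketch. The dictionary keys ("node", "inputs", "outputs") and the run_tracked helper are hypothetical; the actual structure of pipeline.history entries may differ.

```python
def run_tracked(func, inputs, output_names, history):
    """Call func with named inputs and append one history record."""
    result = func(**inputs)
    # Assumption: a single output name maps to the return value as-is.
    outputs = {output_names[0]: result} if output_names else {}
    history.append({"node": func.__name__, "inputs": dict(inputs), "outputs": outputs})
    return result


def add(a, b):
    return a + b


history = []
run_tracked(add, {"a": 2, "b": 3}, ["sum"], history)
print(history)
```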

[Optional] Memory profiling

If you want to profile your nodes, install the optional memory_profiler dependency with pip install linepipe[memory]. This lets you log memory usage while a node runs. For example:

pipeline = Pipeline(
    nodes=[
        Node(
            func=nodes.expensive_function,
            inputs=["df"],
            outputs=["df_processed"],
            profile=True
        )
    ]
)
pipeline.run()

This will produce logs like:

[expensive_function] ΔMem: 0.94 MiB | Peak: 0.94 MiB

The log format is [node_name] ΔMem: {float} MiB | Peak: {float} MiB, where:

  • ΔMem: delta between first and last recorded value
  • Peak: delta between maximum recorded value and first recorded value
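
As a worked example of the two definitions above, the sketch below computes both values from a list of memory samples. The sample values and the summarize_memory function are made up for illustration; in practice the samples would come from the profiler.

```python
def summarize_memory(node_name, samples_mib):
    """Format ΔMem (last - first) and Peak (max - first) for a list of samples in MiB."""
    first = samples_mib[0]
    delta = samples_mib[-1] - first
    peak = max(samples_mib) - first
    return f"[{node_name}] ΔMem: {delta:.2f} MiB | Peak: {peak:.2f} MiB"


line = summarize_memory("expensive_function", [50.0, 50.5, 51.0, 50.75])
print(line)  # [expensive_function] ΔMem: 0.75 MiB | Peak: 1.00 MiB
```

Peak can exceed ΔMem when a node allocates memory temporarily and releases part of it before returning.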

License

MIT License.
