Skip to main content

Composable, linear pipelines

Project description

linepipe

linepipe sits between ad-hoc Python scripts and full orchestration frameworks. It is best suited for batch pipelines, feature engineering, and reproducible data workflows where simplicity and clarity matter more than scale.

It focuses on:

  • clear data dependencies
  • reproducible execution (linear pipelines)
  • fast iteration and debugging thanks to (local / in-memory) caching
  • simple mental models

Example of a pipeline:

Each node consumes named inputs and produces named outputs, which are stored in an internal registry and made available to downstream nodes.

pipeline = Pipeline(
    nodes=[
        Node(
            func=nodes.create_db_engine,
            inputs=["config.db"],
            outputs=["conn"],
        ),
        Node(
            func=nodes.load_data,
            inputs=["conn"],
            outputs=["df"],
        ),
        Node(
            func=nodes.preprocess_data,
            inputs=["df", "config.preprocessing"],
            outputs=["df_processed"],
        ),
        Node(
            func=nodes.feature_engineering,
            inputs=["df_processed", "config.feature_engineering"],
            outputs=["df_features"],
        ),
        Node(
            func=nodes.train,
            inputs=["df_features", "config.model_params"],
            outputs=["model"],
        ),
    ],
    config=config
)

pipeline.run()

A minimal end-to-end example is available as a Jupyter notebook:

  • examples/basic_usage.ipynb

It walks through:

  • defining nodes
  • building a pipeline
  • running it and inspecting outputs

Installation

pip install linepipe

Optionally:


Why linepipe?

Many pipeline tools are powerful but come with:

  • large configuration surfaces
  • implicit data catalogs
  • steep learning curves

linepipe intentionally keeps things simple:

  • Explicit > implicit
  • Minimal surface area
  • Easy to reason about
  • Easy to debug

Non-goals

  • Distributed execution
  • Dynamic DAG scheduling
  • Dataset catalogs
  • Orchestration / scheduling

linepipe is a building block, not a platform.


Core concepts

Node

A Node wraps a Python callable and declares:

  • which inputs it consumes
  • which outputs it produces
from linepipe.node import Node

def add(a, b):
    return a + b

node = Node(
    func=add,
    inputs=["x", "y"],
    outputs=["sum"],
)

Nodes are executed sequentially inside a Pipeline.


Pipeline

A Pipeline executes a list of Nodes in order, resolving inputs from:

  1. configuration
  2. cached / in-memory outputs
  3. runtime objects
from linepipe.pipeline import Pipeline

pipeline = Pipeline(
    nodes=[node],
    config={},
    x=2,
    y=3,
)

pipeline.run()

Intermediate results can be cached automatically.

Object Registry

During execution, linepipe maintains an internal object registry that:

  • stores intermediate node outputs
    • optionally persists them to disk
  • holds runtime-only objects (e.g. DB connections)
  • injects runtime constants into the pipeline

The registry is created per pipeline run and closed automatically after run is finished. Users can reopen in with pipeline.get_obj_registry().


Inputs and outputs

Resolution rules:

  1. Inputs starting with config. or equal to config are resolved from the pipeline configuration.
  2. All other inputs are resolved from the pipeline's internal object registry (which may contain persisted or in-memory objects)
  3. Runtime constants are injected into the registry at pipeline creation time

Name collisions

Each name in a pipeline must uniquely refer to a single object.

If a name exists both as:

  • a runtime constant, and
  • a persisted cached object

the pipeline will fail fast with an error.

This prevents subtle bugs caused by stale cache values overriding runtime state.

Explicit data flow

Each node declares string-named inputs and outputs:

Node(
    func=transform,
    inputs=["raw_data"],
    outputs=["features"],
)

Config injection

Configuration values can be injected using config.<attr> or you can pass whole config object with just "config" and handle values retrieval "by hand" in a function:

class Config:
    window = 5

Node(
    func=rolling_average,
    inputs=["data", "config.window"],
    outputs=["features"],
)

Or using nested configuration:

class Config:
    rolling_average = {
        "window": 5,
        "min_samples": 3
    }

Node(
    func=rolling_average,
    inputs=["data", "config.rolling_average.window", "config.rolling_average.min_samples"],
    outputs=["features"],
)

Caching

Persistent cache

You can turn on a disk-backed cache (shelve) to store intermediate results with use_persistent_cache=True.

pipeline = Pipeline(
    nodes=nodes,
    config=config,
    cache_storage_path="./.cache/pipeline.db",
    use_persistent_cache=True,
)
  • Cached outputs can be reused across runs, as long as output names do not conflict with runtime constants.
  • When persistent caching is enabled, cached objects from previous runs are automatically restored at startup.
  • The cache is closed automatically at the end of execution (i.e. pipeline.run())

You can reopen it for inspection:

registry = pipeline.get_obj_registry()
# placement of registred objects:
print(registry.placement)
# print persisted output from registry
print(registry["features"])
registry.close()

Side-effect nodes (writers)

Nodes may have no outputs, useful for:

  • database writes
  • file exports
  • logging
def write_to_db(df):
    ...

Node(
    func=write_to_db,
    inputs=["features"],
    outputs=[],
)

Named partial functions

linepipe provides a helper to adapt generic functions into distinct pipeline nodes. Standard functools.partial objects lack a __name__ attribute, which can degrade logging and history tracking. Our create_named_partial_function solves this by assigning a stable name to the resulting callable. In the following example, node function write_pipeline_output accepts keyword arguments:

  • df: pd.DataFrame
  • table_name: str
  • schema: str

We pre-bind these arguments with create_named_partial_function and the pipeline will only pass the data (e.g., df) at runtime.

from linepipe.node import create_named_partial_function
from my_package.pipelines.outputs import nodes

write_val_metrics = Node(
    func=create_named_partial_function(
        func=nodes.write_pipeline_output,  # accepts kwargs: df, table_name, schema
        func_name="write_val_metrics",  # new function name, it will appear in logs
        table_name="metrics",
        schema="ml"
    ),
    inputs=["val_metrics"],  # Only 'df' remains to be resolved from the registry
    outputs=[],
)

write_predictions = Node(
    func=create_named_partial_function(
        func=nodes.write_pipeline_output,
        func_name="write_predictions",
        table_name="predictions",
        schema="ml"
    ),
    inputs=["predictions"],
    outputs=[],
)

This approach:

  • pre-binds keyword arguments
  • gives a stable, readable name to a partial function
  • makes logging, history, and introspection more readable

Pipeline composition

Pipelines can be combined using +:

full_pipeline = feature_pipeline + writer_pipeline
full_pipeline.run()

Composition:

  • preserves execution order
  • prevents output name collisions
  • merges runtime constants and configuration
  • Gives you pipeline reusability, for example, same feature-generation pipeline in train and predict

Pipeline visualization

linepipe.viz provides simple, no-dependency ascii graph drawing function draw_ascii_pipeline. It draws nodes in order they are defined along with their inputs and outputs - good for quick check.

from linepipe import viz

print(viz.draw_ascii_pipeline(pipeline))

Output can look like this:

Pipeline (5 nodes)

[load_data]
  outputs:
    - raw_data
      |
      v
[clean_data]
  inputs:
    - raw_data
  outputs:
    - clean_data
      |
      v
[rolling_goals]
  inputs:
    - clean_data
    - config.window
    - config.min_periods
  outputs:
    - rolling_features
      |
      v
[merge]
  inputs:
    - clean_data
    - rolling_features
  outputs:
    - features
      |
      v
[write_features]
  inputs:
    - features
  (no outputs)

If you need something more visual for documentation purposes you can install optional dependency - plotly (pip install linepipe[plot]) and use plot_pipeline function - however, this function is experimental. Graph of pipeline above would look like:

viz.plot_pipeline_graph(full_pipeline)

Pipeline plot


History tracking

Enable execution history for debugging:

pipeline = Pipeline(
    nodes=nodes,
    config=config,
    track_history=True,  # in-memory persistent after `run()` is finished, no need to re-open registry
    use_persistent_cache=False,  # (if True) disk persistent after `run()` is finished
)

After execution:

pipeline.history

Each entry contains:

  • node name
  • resolved inputs
  • outputs

Inputs and outputs are deep-copied at execution time to preserve snapshots. For large objects, this can significantly increase memory usage.


[Optional] Memory profiling

If you wish to profile your nodes you can install optional memory_profiler dependency with pip install linepipe[memory]. This gives you ability to log memory usage during run of a node. For example:

pipeline = Pipeline(
    nodes= [
        Node(
            func=nodes.expensive_function,
            inputs=["df"],
            outputs=["df_processed"],
            profile=True
        )
    ]
)
pipeline.run()

This will produce logs like:

[expensive_function] ΔMem: 0.94 MiB | Peak: 0.94 MiB

Structure is [node_name] ΔMem: {float} MiB | Peak: {float} MiB

  • ΔMem: delta between first and last recorded value
  • Peak: delta between maximum recorded value and first recorded value

License

MIT License.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

linepipe-1.0.0.tar.gz (16.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

linepipe-1.0.0-py3-none-any.whl (15.3 kB view details)

Uploaded Python 3

File details

Details for the file linepipe-1.0.0.tar.gz.

File metadata

  • Download URL: linepipe-1.0.0.tar.gz
  • Upload date:
  • Size: 16.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for linepipe-1.0.0.tar.gz
Algorithm Hash digest
SHA256 4cff6fbfd9a1000450773c176b12fc6a78c05a8eea66e9fe456656b7f80e10ae
MD5 b2d5d86274d632df5c97209e43b495ac
BLAKE2b-256 c26c7587f475a7e1d93a71b32373cb0631312b60ac0c5f6ea65babe474e78cef

See more details on using hashes here.

Provenance

The following attestation bundles were made for linepipe-1.0.0.tar.gz:

Publisher: release.yaml on marek-kan/linepipe

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file linepipe-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: linepipe-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 15.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for linepipe-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 9d4cdaa9ddf2a7b632c02e9031e14856747cf7e136dce65bc2ad56d3183ef3a9
MD5 27c346fbf1b98ecbbcf8067ed274f3df
BLAKE2b-256 0583f3a9fc2cbdc552e420cdd5d60521cc4da2a7177d1e5365aff76632fc32b6

See more details on using hashes here.

Provenance

The following attestation bundles were made for linepipe-1.0.0-py3-none-any.whl:

Publisher: release.yaml on marek-kan/linepipe

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page