Composable, linear pipelines
linepipe
linepipe sits between ad-hoc Python scripts and full orchestration frameworks.
It is best suited for batch pipelines, feature engineering, and reproducible data workflows where simplicity and clarity matter more than scale.
It focuses on:
- clear data dependencies
- reproducible execution (linear pipelines)
- fast iteration and debugging thanks to local (disk or in-memory) caching
- simple mental models
Example of a pipeline:
Each node consumes named inputs and produces named outputs, which are stored in an internal registry and made available to downstream nodes.
from linepipe.node import Node
from linepipe.pipeline import Pipeline

# `nodes` is your own module of node functions; `config` is your configuration object.
pipeline = Pipeline(
    nodes=[
        Node(
            func=nodes.create_db_engine,
            inputs=["config.db"],
            outputs=["conn"],
        ),
        Node(
            func=nodes.load_data,
            inputs=["conn"],
            outputs=["df"],
        ),
        Node(
            func=nodes.preprocess_data,
            inputs=["df", "config.preprocessing"],
            outputs=["df_processed"],
        ),
        Node(
            func=nodes.feature_engineering,
            inputs=["df_processed", "config.feature_engineering"],
            outputs=["df_features"],
        ),
        Node(
            func=nodes.train,
            inputs=["df_features", "config.model_params"],
            outputs=["model"],
        ),
    ],
    config=config,
)
pipeline.run()
A minimal end-to-end example is available as a Jupyter notebook:
examples/basic_usage.ipynb
It walks through:
- defining nodes
- building a pipeline
- running it and inspecting outputs
Installation
pip install linepipe
Optionally:
- pip install linepipe[memory]: memory profiling of nodes
- pip install linepipe[plot]: plotly-based pipeline graph instead of the default text graph
Why linepipe?
Many pipeline tools are powerful but come with:
- large configuration surfaces
- implicit data catalogs
- steep learning curves
linepipe intentionally keeps things simple:
- Explicit > implicit
- Minimal surface area
- Easy to reason about
- Easy to debug
Non-goals
- Distributed execution
- Dynamic DAG scheduling
- Dataset catalogs
- Orchestration / scheduling
linepipe is a building block, not a platform.
Core concepts
Node
A Node wraps a Python callable and declares:
- which inputs it consumes
- which outputs it produces
from linepipe.node import Node

def add(a, b):
    return a + b

node = Node(
    func=add,
    inputs=["x", "y"],
    outputs=["sum"],
)
Nodes are executed sequentially inside a Pipeline.
Pipeline
A Pipeline executes a list of Nodes in order, resolving inputs from:
- configuration
- cached / in-memory outputs
- runtime objects
from linepipe.pipeline import Pipeline

pipeline = Pipeline(
    nodes=[node],
    config={},
    x=2,  # runtime constants, resolved by name as the node's inputs
    y=3,
)
pipeline.run()
Intermediate results are kept in the object registry during the run and can optionally be cached to disk (see Caching).
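The sequential model above can be illustrated with a small, self-contained sketch. It is not linepipe's implementation: ToyNode, run_linear and the plain dict used as a registry are hypothetical, configuration inputs are left out, and unpacking a tuple return value into several outputs is an assumption.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class ToyNode:
    """Hypothetical stand-in for linepipe's Node."""
    func: Callable[..., Any]
    inputs: list[str]
    outputs: list[str]


def run_linear(nodes: list[ToyNode], **constants: Any) -> dict[str, Any]:
    # The registry starts out holding the runtime constants (here x and y).
    registry: dict[str, Any] = dict(constants)
    for node in nodes:
        # Inputs are looked up by name; a missing name fails fast with KeyError.
        args = [registry[name] for name in node.inputs]
        result = node.func(*args)
        if len(node.outputs) == 1:
            registry[node.outputs[0]] = result
        elif len(node.outputs) > 1:
            # Assumption: several outputs are returned as a tuple in declared order.
            registry.update(zip(node.outputs, result))
        # Nodes with no outputs (side-effect writers) store nothing.
    return registry


def add(a, b):
    return a + b


registry = run_linear([ToyNode(func=add, inputs=["x", "y"], outputs=["sum"])], x=2, y=3)
print(registry["sum"])  # 5
```

Because nodes run strictly in list order, a node whose input has not been produced yet fails immediately instead of being rescheduled.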
Object Registry
During execution, linepipe maintains an internal object registry that:
- stores intermediate node outputs
- optionally persists them to disk
- holds runtime-only objects (e.g. DB connections)
- injects runtime constants into the pipeline
The registry is created per pipeline run and closed automatically after the run finishes.
Users can reopen it with pipeline.get_obj_registry().
Inputs and outputs
Resolution rules:
- Inputs starting with config. or equal to config are resolved from the pipeline configuration.
- All other inputs are resolved from the pipeline's internal object registry (which may contain persisted or in-memory objects).
- Runtime constants are injected into the registry at pipeline creation time.
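The resolution rules can be expressed as one small function. This is a hypothetical sketch, not linepipe's code; in particular it assumes the configuration exposes values as attributes (config.window) and treats the registry as a plain dict.

```python
from types import SimpleNamespace
from typing import Any


def resolve_input(name: str, config: Any, registry: dict[str, Any]) -> Any:
    """Hypothetical resolver mirroring the rules above."""
    if name == "config":
        # The whole configuration object is passed through.
        return config
    if name.startswith("config."):
        # Assumption: config exposes values as attributes, e.g. config.window.
        return getattr(config, name[len("config."):])
    # Everything else comes from the object registry
    # (runtime constants and node outputs, in memory or restored from disk).
    return registry[name]


config = SimpleNamespace(window=5)
registry = {"data": [1, 2, 3], "threshold": 0.5}

print(resolve_input("config.window", config, registry))  # 5
print(resolve_input("data", config, registry))  # [1, 2, 3]
```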
Name collisions
Each name in a pipeline must uniquely refer to a single object.
If a name exists both as:
- a runtime constant, and
- a persisted cached object
the pipeline will fail fast with an error.
This prevents subtle bugs caused by stale cache values overriding runtime state.
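A fail-fast check of this kind amounts to a set intersection between the runtime constant names and the names restored from the cache. The helper below is a hypothetical sketch; linepipe's actual error type and message may differ.

```python
from typing import Any, Mapping


def check_name_collisions(constants: Mapping[str, Any], cached: Mapping[str, Any]) -> None:
    """Hypothetical check: a name may not be both a runtime constant and a cached object."""
    clashes = sorted(set(constants) & set(cached))
    if clashes:
        raise ValueError(
            f"Names defined both as runtime constants and cached objects: {clashes}"
        )


check_name_collisions({"x": 2}, {"df": "cached frame"})  # no overlap: passes silently

try:
    check_name_collisions({"df": "fresh"}, {"df": "stale"})
except ValueError as err:
    print(err)  # Names defined both as runtime constants and cached objects: ['df']
```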
Explicit data flow
Each node declares string-named inputs and outputs:
Node(
    func=transform,
    inputs=["raw_data"],
    outputs=["features"],
)
Config injection
Configuration values can be injected with "config.<attr>", or the whole config object can be passed with "config":
class Config:
    window = 5

Node(
    func=rolling_average,
    inputs=["data", "config.window"],
    outputs=["features"],
)
Caching
Persistent cache
You can turn on a disk-backed cache (shelve) to store intermediate results with use_persistent_cache=True.
pipeline = Pipeline(
    nodes=nodes,
    config=config,
    cache_storage_path="./.cache/pipeline.db",
    use_persistent_cache=True,
)
- Cached outputs can be reused across runs, as long as their names do not conflict with runtime constants.
- When persistent caching is enabled, cached objects from previous runs are restored automatically at startup.
- The cache is closed automatically at the end of execution (i.e. when pipeline.run() returns).
You can reopen it for inspection:
registry = pipeline.get_obj_registry()
# placement of registered objects:
print(registry.placement)
registry.close()
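Since the persistent cache is built on Python's standard shelve module, the save-and-restore cycle looks roughly like the sketch below. save_outputs and restore_outputs are hypothetical helpers and the temporary path stands in for cache_storage_path; linepipe's own key layout may differ.

```python
import os
import shelve
import tempfile
from typing import Any


def save_outputs(path: str, outputs: dict[str, Any]) -> None:
    # Each output name becomes a shelve key; values are pickled to disk.
    with shelve.open(path) as db:
        for name, value in outputs.items():
            db[name] = value


def restore_outputs(path: str) -> dict[str, Any]:
    # On a later run, previously cached objects are loaded back into memory.
    with shelve.open(path) as db:
        return dict(db)


cache_dir = tempfile.mkdtemp()
cache_path = os.path.join(cache_dir, "pipeline.db")

save_outputs(cache_path, {"df_processed": [1, 2, 3], "model": {"coef": 0.5}})
restored = restore_outputs(cache_path)
print(sorted(restored))  # ['df_processed', 'model']
```

Anything stored this way must be picklable, which is why runtime-only objects such as database connections are better kept out of the persistent cache.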
Side-effect nodes (writers)
Nodes may have no outputs, useful for:
- database writes
- file exports
- logging
def write_to_db(df):
    ...

Node(
    func=write_to_db,
    inputs=["features"],
    outputs=[],
)
Named partial functions
linepipe provides a helper to adapt generic functions into distinct pipeline nodes.
from linepipe.node import Node, create_named_partial_function
from my_package.pipelines.outputs import nodes

write_val_metrics = Node(
    func=create_named_partial_function(
        func=nodes.write_pipeline_output,
        func_name="write_val_metrics",
        table_name="metrics",
        schema="ml",
    ),
    inputs=["val_metrics"],
    outputs=[],
)

write_predictions = Node(
    func=create_named_partial_function(
        func=nodes.write_pipeline_output,
        func_name="write_predictions",
        table_name="predictions",
        schema="ml",
    ),
    inputs=["predictions"],
    outputs=[],
)
This:
- pre-binds keyword arguments
- gives the node a stable, readable name
- improves logging, history, and introspection
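The effect of a named partial can be approximated with the standard library. The sketch below is hypothetical (make_named_partial is not linepipe's create_named_partial_function) and uses a wrapper function because a plain functools.partial object has no __name__ of its own.

```python
import functools
from typing import Any, Callable


def make_named_partial(func: Callable[..., Any], func_name: str, **bound: Any) -> Callable[..., Any]:
    """Hypothetical named partial: pre-binds keyword arguments and renames the callable."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Call-time keyword arguments override the pre-bound ones.
        return func(*args, **{**bound, **kwargs})

    # functools.wraps copied func's name; replace it with the node-specific one.
    wrapper.__name__ = func_name
    wrapper.__qualname__ = func_name
    return wrapper


def write_pipeline_output(df, table_name, schema):
    # Stand-in writer: returns the target instead of touching a database.
    return f"{schema}.{table_name} <- {df}"


write_val_metrics = make_named_partial(
    write_pipeline_output, "write_val_metrics", table_name="metrics", schema="ml"
)
print(write_val_metrics.__name__)  # write_val_metrics
print(write_val_metrics({"auc": 0.9}))  # ml.metrics <- {'auc': 0.9}
```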
Pipeline composition
Pipelines can be combined using +:
full_pipeline = feature_pipeline + writer_pipeline
full_pipeline.run()
Composition:
- preserves execution order
- prevents output name collisions
- merges runtime constants and configuration
- enables reuse, for example running the same feature-generation pipeline in both training and prediction
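A minimal sketch of what + could do, under stated assumptions: ToyPipeline and ToyNode are hypothetical, config and constants are plain dicts, and on a key conflict the right-hand pipeline's value wins (linepipe may handle conflicts differently).

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class ToyNode:
    func: Callable[..., Any]
    inputs: list[str]
    outputs: list[str]


@dataclass
class ToyPipeline:
    """Hypothetical pipeline that supports `+`; not linepipe's Pipeline class."""

    nodes: list[ToyNode]
    config: dict[str, Any] = field(default_factory=dict)
    constants: dict[str, Any] = field(default_factory=dict)

    def __add__(self, other: "ToyPipeline") -> "ToyPipeline":
        left = {name for node in self.nodes for name in node.outputs}
        right = {name for node in other.nodes for name in node.outputs}
        clashes = sorted(left & right)
        if clashes:
            # Two nodes producing the same name would make the data flow ambiguous.
            raise ValueError(f"Duplicate output names: {clashes}")
        return ToyPipeline(
            nodes=self.nodes + other.nodes,  # self's nodes run first
            config={**self.config, **other.config},  # assumption: right side wins
            constants={**self.constants, **other.constants},
        )


features = ToyPipeline(
    [ToyNode(func=list, inputs=["raw_data"], outputs=["features"])],
    config={"window": 5},
)
writer = ToyPipeline([ToyNode(func=print, inputs=["features"], outputs=[])])

full = features + writer
print([node.outputs for node in full.nodes])  # [['features'], []]
```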
Pipeline visualization
linepipe.viz provides a simple, dependency-free ASCII graph drawing function, draw_ascii_pipeline. It draws the nodes in the order they are defined, along with their inputs and outputs, which is good for a quick check.
from linepipe import viz
print(viz.draw_ascii_pipeline(pipeline))
Output can look like this:
Pipeline (5 nodes)
[load_data]
outputs:
- raw_data
|
v
[clean_data]
inputs:
- raw_data
outputs:
- clean_data
|
v
[rolling_goals]
inputs:
- clean_data
- config.window
- config.min_periods
outputs:
- rolling_features
|
v
[merge]
inputs:
- clean_data
- rolling_features
outputs:
- features
|
v
[write_features]
inputs:
- features
(no outputs)
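The text layout above is simple enough to reproduce in a few lines, which also makes the ordering rule explicit: nodes appear exactly in list order, joined by arrows. draw_ascii and NodeSpec are hypothetical; this is not the code behind draw_ascii_pipeline.

```python
from typing import NamedTuple


class NodeSpec(NamedTuple):
    name: str
    inputs: list[str]
    outputs: list[str]


def draw_ascii(nodes: list[NodeSpec]) -> str:
    """Hypothetical re-creation of the text layout shown above."""
    lines = [f"Pipeline ({len(nodes)} nodes)"]
    for i, node in enumerate(nodes):
        if i > 0:
            # Arrow between consecutive nodes: execution order is top to bottom.
            lines += ["|", "v"]
        lines.append(f"[{node.name}]")
        if node.inputs:
            lines.append("inputs:")
            lines += [f"- {name}" for name in node.inputs]
        if node.outputs:
            lines.append("outputs:")
            lines += [f"- {name}" for name in node.outputs]
        else:
            lines.append("(no outputs)")
    return "\n".join(lines)


diagram = draw_ascii([
    NodeSpec("load_data", [], ["raw_data"]),
    NodeSpec("write_features", ["raw_data"], []),
])
print(diagram)
```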
If you need something more visual for documentation, install the optional plotly dependency (pip install linepipe[plot]) and use the plot_pipeline_graph function. Note that this function is experimental. The graph of the pipeline above is drawn with:
viz.plot_pipeline_graph(full_pipeline)
History tracking
Enable execution history for debugging:
pipeline = Pipeline(
    nodes=nodes,
    config=config,
    track_history=True,  # history stays available in memory after run() finishes
    use_persistent_cache=False,  # if True, cached outputs also persist on disk after run() finishes
)
After execution:
pipeline.history
Each entry contains:
- node name
- resolved inputs
- outputs
Inputs and outputs are deep-copied at execution time to preserve snapshots. For large objects, this can significantly increase memory usage.
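Why deep copies matter, and why they cost memory, shows up in a small sketch. record_step is a hypothetical helper, not linepipe's history API: without copy.deepcopy, a later in-place mutation would silently rewrite the recorded snapshot.

```python
import copy
from typing import Any


def record_step(history: list[dict[str, Any]], node_name: str,
                inputs: dict[str, Any], outputs: dict[str, Any]) -> None:
    # Deep copies freeze the values as they were at this step, at the cost
    # of duplicating every recorded object in memory.
    history.append({
        "node": node_name,
        "inputs": copy.deepcopy(inputs),
        "outputs": copy.deepcopy(outputs),
    })


history: list[dict[str, Any]] = []
rows = [1, 2, 3]
record_step(history, "clean_data", {"raw_data": rows}, {"clean_data": rows[:2]})

rows.append(99)  # a later node mutates the shared object in place
print(history[0]["inputs"]["raw_data"])  # [1, 2, 3]
```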
[Optional] Memory profiling
If you want to profile your nodes, install the optional memory_profiler dependency with pip install linepipe[memory]. This lets you log memory usage while a node runs. For example:
pipeline = Pipeline(
    nodes=[
        Node(
            func=nodes.expensive_function,
            inputs=["df"],
            outputs=["df_processed"],
            profile=True,
        )
    ]
)
pipeline.run()
This will produce logs like:
[expensive_function] ΔMem: 0.94 MiB | Peak: 0.94 MiB
The format is [node_name] ΔMem: {float} MiB | Peak: {float} MiB, where:
- ΔMem: difference between the last and the first recorded value
- Peak: difference between the maximum recorded value and the first recorded value
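The two numbers are simple arithmetic over the memory samples taken while the node runs. In the hypothetical helper below the samples are a fixed list; in practice they could come from a sampler such as memory_profiler's memory_usage, which reports values in MiB. The two-decimal formatting is an assumption based on the example log.

```python
def memory_summary(node_name: str, samples_mib: list[float]) -> str:
    """Format ΔMem and Peak from memory samples (in MiB) taken during a node run."""
    if not samples_mib:
        raise ValueError("need at least one memory sample")
    baseline = samples_mib[0]
    delta = samples_mib[-1] - baseline  # ΔMem: last minus first
    peak = max(samples_mib) - baseline  # Peak: maximum minus first
    return f"[{node_name}] ΔMem: {delta:.2f} MiB | Peak: {peak:.2f} MiB"


summary = memory_summary("expensive_function", [100.0, 180.5, 140.25])
print(summary)  # [expensive_function] ΔMem: 40.25 MiB | Peak: 80.50 MiB
```

A large gap between Peak and ΔMem points to a node that allocates heavily but releases most of it before returning.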
License
MIT License.