Computation pipelines with non-linear dependency graphs
GRPipe: A Flexible Pipeline Framework for Python
GRPipe is a minimalistic and flexible computational pipeline framework for Python that allows you to create complex data processing workflows. It provides a clean and intuitive API for defining, connecting, and executing pipeline steps, with built-in support for caching, argument binding, and parameter management.
- Github repository: https://github.com/wirhabenzeit/grpipe/
- Documentation: https://wirhabenzeit.github.io/grpipe/
Features
- Easy-to-use decorator-based API for defining pipeline steps
- Automatic dependency resolution between steps
- Built-in caching mechanism for improved performance
- Flexible argument binding and parameter management
- Support for both linear and branching pipelines
- Verbose logging option for debugging and monitoring
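The core idea behind the feature list — steps declared as decorated functions whose named inputs are either other steps or external arguments, with dependencies resolved automatically — can be sketched in a few lines of plain Python. This toy is for illustration only and is not grpipe's actual implementation:

```python
# Toy sketch of decorator-based steps with automatic dependency
# resolution. Illustrative only -- not grpipe's real internals.

def step(args=None):
    """Wrap a function as a pipeline step that resolves its inputs."""
    args = args or {}

    def decorate(func):
        def run(**external):
            resolved = {}
            for name, dep in args.items():
                if callable(dep):          # upstream step: recurse into it
                    resolved[name] = dep(**external)
                else:                      # external argument, looked up by key
                    resolved[name] = external[dep]
            return func(**resolved)
        return run
    return decorate

@step(args={"x": "x"})
def double(x):
    return 2 * x

@step(args={"d": double})
def describe(d):
    return f"doubled: {d}"

print(describe(x=21))  # -> doubled: 42
```

Calling the final step pulls its inputs through the whole graph, which is why in grpipe the last step *is* the pipeline.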
Installation
You can install GRPipe using pip:
pip install grpipe
Quickstart
Arguments vs. Parameters
In GRPipe, there's an important distinction between arguments and parameters:
- Arguments are either external inputs passed to the final pipeline or outputs from other internal steps. They are defined using the Argument class and connected between steps.
- Parameters are tunable settings for individual steps. They can be adjusted globally using the pipeline's set_params method.
Here's an example illustrating the difference:
import numpy as np
from grpipe import Argument, step

# Define arguments
data = Argument("data")
threshold = Argument("threshold")
# `operation` is a parameter of the step that can be adjusted
@step(args={"input_data": data, "cutoff": threshold})
def process_data(input_data, cutoff, operation="mean"):
    if operation == "mean":
        result = input_data[input_data > cutoff].mean()
    elif operation == "std":
        result = input_data[input_data > cutoff].std()
    else:
        raise ValueError("Invalid operation")
    return result
@step(args={"processed": process_data})
def format_result(processed):
    return f"The result is: {processed:.2f}"
# Create the pipeline
pipeline = format_result
# Run the pipeline with different arguments
result1 = pipeline(data=np.arange(100), threshold=2)
print(result1)
# Adjust a parameter
pipeline.set_params(process_data__operation="std")
# Run again with the same arguments but different parameters
result2 = pipeline(data=np.arange(100), threshold=2)
print(result2)
# Bind the value of threshold:
pipeline.bind(threshold=50)
result3 = pipeline(data=np.arange(100))
print(result3)
In this example, data and threshold are arguments that can be passed when running the pipeline, while operation is a parameter that can be adjusted using set_params.
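The two mechanisms map onto familiar Python idioms: binding an argument fixes a value once, much like functools.partial, while setting a parameter overrides a keyword default for every subsequent call. A toy sketch of that idea (not grpipe's implementation):

```python
from functools import partial

def process(data, threshold, operation="mean"):
    """Keep values above threshold, then aggregate them."""
    kept = [x for x in data if x > threshold]
    if operation == "mean":
        return sum(kept) / len(kept)
    raise ValueError("Invalid operation")

# bind(threshold=50): fix an argument once, like functools.partial
bound = partial(process, threshold=50)

# set_params(operation=...): override a keyword default for every call
bound = partial(bound, operation="mean")

print(bound(data=range(100)))  # -> 75.0
```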
Branching Pipelines
GRPipe supports creating complex branching pipelines. Here's an example of a more advanced pipeline for data analysis:
import polars as pl
from grpipe import Argument, step
# Define arguments
data_source = Argument("data_source")
analysis_type = Argument("analysis_type")
@step(args={"source": data_source})
def load_data(source):
    return pl.read_csv(source)
@step(args={"data": load_data})
def preprocess(data, fill_value=0):
    return data.drop_nulls().fill_nan(fill_value)
@step(args={"data": preprocess})
def calculate_statistics(data):
    return {
        "mean": data.mean().to_dict(),
        "median": data.median().to_dict(),
        "std_dev": data.std().to_dict(),
    }
@step(args={"data": preprocess})
def perform_clustering(data, n_clusters=3):
    # Implement your clustering logic here
    return {"clusters": [f"Cluster {i}" for i in range(n_clusters)]}
@step(args={"stats": calculate_statistics, "clusters": perform_clustering, "analysis": analysis_type})
def generate_report(stats, clusters, analysis):
    report = f"Analysis Type: {analysis}\n\n"
    report += "Statistics:\n"
    for key, value in stats.items():
        report += f"  {key}: {value}\n"
    report += f"\nNumber of clusters: {len(clusters['clusters'])}"
    return report
# Create the pipeline
analysis_pipeline = generate_report
# Run the pipeline
result = analysis_pipeline(
    data_source="path/to/your/data.csv",
    analysis_type="Exploratory Data Analysis",
)
print(result)
This example demonstrates a more complex pipeline with data loading, preprocessing, two parallel analysis branches (statistics calculation and clustering) that both reuse the preprocessed data, and a final report generation step. It also shows arguments (data_source, analysis_type) and parameters (fill_value, n_clusters) working together.
Caching and Performance
GRPipe automatically caches the results of each step, improving performance for repeated runs with the same inputs. You can control caching behavior for each argument:
frequently_changing_data = Argument("data", cachable=False)
stable_parameter = Argument("parameter", cachable=True)
@step(args={"data": frequently_changing_data, "param": stable_parameter})
def process_data(data, param):
    # Your processing logic here
    pass
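Conceptually, per-argument caching memoizes a step on its cachable inputs only, so any non-cachable input forces recomputation. The following is a plain-Python sketch of that behavior, not grpipe's internals:

```python
# Toy sketch of per-argument caching: memoize a step on its cachable
# inputs only; any non-cachable input forces recomputation.
calls = {"count": 0}

def cached_step(func, cachable=()):
    cache = {}

    def run(**kwargs):
        key = tuple(sorted((k, kwargs[k]) for k in cachable))
        non_cachable = [k for k in kwargs if k not in cachable]
        if non_cachable or key not in cache:
            calls["count"] += 1           # track actual computations
            result = func(**kwargs)
            if not non_cachable:
                cache[key] = result       # safe to cache: all inputs cachable
            return result
        return cache[key]
    return run

def add(a, b):
    return a + b

step_fn = cached_step(add, cachable=("a", "b"))
step_fn(a=1, b=2)      # computed
step_fn(a=1, b=2)      # served from cache
print(calls["count"])  # -> 1
```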
Verbose Logging
Enable verbose logging for debugging and monitoring:
input_data = Argument("data")

@step(verbose=True, args={"data": input_data})
def noisy_step(data):
    # This step will log detailed information about its execution
    pass
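The effect of such a verbose flag can be pictured as a wrapper that logs entry and exit around the step body. Again, this is a plain-Python sketch of the idea, not grpipe's API:

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("pipeline")

def verbose_step(func):
    """Log a step's invocation and its result (illustrative sketch)."""
    def run(**kwargs):
        log.info("running %s with %s", func.__name__, kwargs)
        result = func(**kwargs)
        log.info("%s returned %r", func.__name__, result)
        return result
    return run

@verbose_step
def square(x):
    return x * x

square(x=4)  # logs the call and the result, then returns 16
```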
License
GRPipe is released under the MIT License. See the LICENSE file for details.