Computation pipelines with non-linear dependency graphs

GRPipe: A Flexible Pipeline Framework for Python

GRPipe is a minimalistic and flexible computational pipeline framework for Python that allows you to create complex data processing workflows. It provides a clean and intuitive API for defining, connecting, and executing pipeline steps, with built-in support for caching, argument binding, and parameter management.

Features

  • Easy-to-use decorator-based API for defining pipeline steps
  • Automatic dependency resolution between steps
  • Built-in caching mechanism for improved performance
  • Flexible argument binding and parameter management
  • Support for both linear and branching pipelines
  • Verbose logging option for debugging and monitoring

Installation

You can install GRPipe using pip:

pip install grpipe

Quickstart

Arguments vs. Parameters

In GRPipe, there's an important distinction between arguments and parameters:

  • Arguments are either external inputs passed to the final pipeline or outputs from other internal steps. They are defined using the Argument class and connected between steps.
  • Parameters are tunable settings for individual steps. They can be adjusted globally using the set_params method of the pipeline.

Here's an example illustrating the difference:

import numpy as np

from grpipe import Argument, step

# Define arguments
data = Argument("data")
threshold = Argument("threshold")


# `operation` is a parameter of the step that can be adjusted
@step(args={"input_data": data, "cutoff": threshold})
def process_data(input_data, cutoff, operation="mean"):
    if operation == "mean":
        result = input_data[input_data > cutoff].mean()
    elif operation == "std":
        result = input_data[input_data > cutoff].std()
    else:
        raise ValueError("Invalid operation")
    return result


@step(args={"processed": process_data})
def format_result(processed):
    return f"The result is: {processed:.2f}"


# Create the pipeline
pipeline = format_result

# Run the pipeline with different arguments
result1 = pipeline(data=np.arange(100), threshold=2)
print(result1)

# Adjust a parameter
pipeline.set_params(process_data__operation="std")

# Run again with the same arguments but different parameters
result2 = pipeline(data=np.arange(100), threshold=2)
print(result2)

# Bind the value of threshold:
pipeline.bind(threshold=50)

result3 = pipeline(data=np.arange(100))
print(result3)

In this example, data and threshold are arguments that can be passed when running the pipeline, while operation is a parameter that can be adjusted using set_params.
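For reference, the two runs above compute the mean and then the standard deviation of the values in `np.arange(100)` that exceed the threshold. The expected numbers can be checked with plain NumPy, independent of GRPipe:

```python
import numpy as np

data = np.arange(100)
kept = data[data > 2]  # values 3..99 survive the threshold

print(f"The result is: {kept.mean():.2f}")  # The result is: 51.00
print(f"The result is: {kept.std():.2f}")   # The result is: 28.00
```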

Branching Pipelines

GRPipe supports creating complex branching pipelines. Here's an example of a more advanced pipeline for data analysis:

import polars as pl
from grpipe import Argument, step

# Define arguments
data_source = Argument("data_source")
analysis_type = Argument("analysis_type")

@step(args={"source": data_source})
def load_data(source):
    return pl.read_csv(source)

@step(args={"data": load_data})
def preprocess(data, fill_value=0):
    return data.drop_nulls().fill_nan(fill_value)

@step(args={"data": preprocess})
def calculate_statistics(data):
    return {
        "mean": data.mean().to_dict(),
        "median": data.median().to_dict(),
        "std_dev": data.std().to_dict()
    }

@step(args={"data": preprocess})
def perform_clustering(data, n_clusters=3):
    # Implement your clustering logic here
    return {"clusters": [f"Cluster {i}" for i in range(n_clusters)]}

@step(args={"stats": calculate_statistics, "clusters": perform_clustering, "analysis": analysis_type})
def generate_report(stats, clusters, analysis):
    report = f"Analysis Type: {analysis}\n\n"
    report += "Statistics:\n"
    for key, value in stats.items():
        report += f"  {key}: {value}\n"
    report += f"\nNumber of clusters: {len(clusters['clusters'])}"
    return report

# Create the pipeline
analysis_pipeline = generate_report

# Run the pipeline
result = analysis_pipeline(
    data_source="path/to/your/data.csv",
    analysis_type="Exploratory Data Analysis"
)
print(result)

This example demonstrates a branching pipeline: both calculate_statistics and perform_clustering consume the output of preprocess, and generate_report combines their results. It also shows how arguments (data_source, analysis_type) and parameters (fill_value, n_clusters) can be mixed in a single pipeline.
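The core idea behind this style of pipeline — a step's arguments are resolved recursively, whether they come from external inputs or from other steps — can be sketched in a few lines of plain Python. This is an illustrative stand-in, not GRPipe's actual implementation:

```python
# Minimal sketch of decorator-based dependency resolution.
# Illustrative only; GRPipe's real internals differ.
class Argument:
    def __init__(self, name):
        self.name = name

    def resolve(self, inputs):
        # External arguments are looked up in the caller's keyword inputs.
        return inputs[self.name]


def step(args=None):
    args = args or {}

    def decorator(func):
        def pipeline(**inputs):
            # Each dependency is either an external Argument or another
            # decorated step; resolve both recursively.
            resolved = {
                key: dep.resolve(inputs) if isinstance(dep, Argument)
                else dep(**inputs)
                for key, dep in args.items()
            }
            return func(**resolved)

        return pipeline

    return decorator


base = Argument("x")


@step(args={"x": base})
def double(x):
    return 2 * x


@step(args={"d": double})
def describe(d):
    return f"doubled: {d}"


print(describe(x=21))  # doubled: 42
```

Calling the final step pulls values through the whole graph, which is why assigning `pipeline = generate_report` above is enough to run everything upstream of it.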

Caching and Performance

GRPipe automatically caches the results of each step, improving performance for repeated runs with the same inputs. You can control caching behavior for each argument:

frequently_changing_data = Argument("data", cachable=False)
stable_parameter = Argument("parameter", cachable=True)

@step(args={"data": frequently_changing_data, "param": stable_parameter})
def process_data(data, param):
    # Your processing logic here
    pass

Verbose Logging

Enable verbose logging for debugging and monitoring:

@step(verbose=True, args={"data": input_data})
def noisy_step(data):
    # This step will log detailed information about its execution
    pass
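Per-step verbose output of this kind is typically a wrapper that logs entry and exit around the step body. A minimal sketch with the standard logging module (illustrative only, not GRPipe's logging format):

```python
import logging

logging.basicConfig(level=logging.DEBUG)


def logged_step(func):
    # Log before and after each call, similar in spirit to verbose=True.
    def wrapper(*args, **kwargs):
        logging.debug("entering %s", func.__name__)
        result = func(*args, **kwargs)
        logging.debug("%s returned %r", func.__name__, result)
        return result

    return wrapper


@logged_step
def add_one(x):
    return x + 1
```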

License

GRPipe is released under the MIT License. See the LICENSE file for details.
