Skip to main content

Computation pipelines with non-linear dependency graphs

Project description

GRPipe: A Flexible Pipeline Framework for Python

GRPipe is a minimalistic and flexible computational pipeline framework for Python that allows you to create complex data processing workflows. It provides a clean and intuitive API for defining, connecting, and executing pipeline steps, with built-in support for caching, argument binding, and parameter management.

Release Build status codecov Commit activity License

Computation pipelines with non-linear dependency graphs

Features

  • Easy-to-use decorator-based API for defining pipeline steps
  • Automatic dependency resolution between steps
  • Built-in caching mechanism for improved performance
  • Flexible argument binding and parameter management
  • Support for both linear and branching pipelines
  • Verbose logging option for debugging and monitoring

Installation

You can install GRPipe using pip:

pip install grpipe

Quickstart

Arguments vs. Parameters

In GRPipe, there's an important distinction between arguments and parameters:

  • Arguments are either external inputs passed to the final pipeline or outputs from other internal steps. They are defined using the Argument class and connected between steps.
  • Parameters are tunable settings for individual steps. They can be adjusted globally using the set_params method of the pipeline.

Here's an example illustrating the difference:

# Define arguments
data = Argument("data")
threshold = Argument("threshold")


# `operation` is a parameter of the step that can be adjusted
@step(args={"input_data": data, "cutoff": threshold})
def process_data(input_data, cutoff, operation="mean"):
    if operation == "mean":
        result = input_data[input_data > cutoff].mean()
    elif operation == "std":
        result = input_data[input_data > cutoff].std()
    else:
        raise ValueError("Invalid operation")
    return result


@step(args={"processed": process_data})
def format_result(processed):
    return f"The result is: {processed:.2f}"


# Create the pipeline
pipeline = format_result

# Run the pipeline with different arguments
result1 = pipeline(data=np.arange(100), threshold=2)
print(result1)

# Adjust a parameter
pipeline.set_params(process_data__operation="std")

# Run again with the same arguments but different parameters
result2 = pipeline(data=np.arange(100), threshold=2)
print(result2)

# Bind the value of threshold:
pipeline.bind(threshold=50)

result3 = pipeline(data=np.arange(100))
print(result3)

In this example, data and threshold are arguments that can be passed when running the pipeline, while operation is a parameter that can be adjusted using set_params.

Branching Pipelines

GRPipe supports creating complex branching pipelines. Here's an example of a more advanced pipeline for data analysis:

import polars as pl
from grpipe import Argument, step

# Define arguments
data_source = Argument("data_source")
analysis_type = Argument("analysis_type")

@step(args={"source": data_source})
def load_data(source):
    return pl.read_csv(source)

@step(args={"data": load_data})
def preprocess(data, fill_value=0):
    return data.drop_nulls().fill_nan(fill_value)

@step(args={"data": preprocess})
def calculate_statistics(data):
    return {
        "mean": data.mean().to_dict(),
        "median": data.median().to_dict(),
        "std_dev": data.std().to_dict()
    }

@step(args={"data": preprocess})
def perform_clustering(data, n_clusters = 3):
    # Implement your clustering logic here
    return {"clusters": [f"Cluster {i}" for i in range(n_clusters)]}

@step(args={"stats": calculate_statistics, "clusters": perform_clustering, "analysis": analysis_type})
def generate_report(stats, clusters, analysis):
    report = f"Analysis Type: {analysis}\n\n"
    report += "Statistics:\n"
    for key, value in stats.items():
        report += f"  {key}: {value}\n"
    report += f"\nNumber of clusters: {len(clusters['clusters'])}"
    return report

# Create the pipeline
analysis_pipeline = generate_report

# Run the pipeline
result = analysis_pipeline(
    data_source="path/to/your/data.csv",
    analysis_type="Exploratory Data Analysis"
)
print(result)

This example demonstrates how you can create a more complex pipeline that includes data loading, preprocessing, parallel analysis steps (statistics calculation and clustering), and report generation. It also shows how to use both arguments and parameters in a pipeline.

Caching and Performance

GRPipe automatically caches the results of each step, improving performance for repeated runs with the same inputs. You can control caching behavior for each argument:

frequently_changing_data = Argument("data", cachable=False)
stable_parameter = Argument("parameter", cachable=True)

@step(args={"data": frequently_changing_data, "param": stable_parameter})
def process_data(data, param):
    # Your processing logic here
    pass

Verbose Logging

Enable verbose logging for debugging and monitoring:

@step(verbose=True, args={"data": input_data})
def noisy_step(data):
    # This step will log detailed information about its execution
    pass

Generating Flowcharts

GRPipe allows you to generate flowcharts of your pipeline using the pipeline.draw() method. This method generates a flowchart in mermaid markdown format, which can be easily visualized.

Here's an example of how to generate a flowchart for a simple pipeline:

from grpipe import Argument, step, Pipeline

# Define arguments
data = Argument("data")
threshold = Argument("threshold")

@step(args={"input_data": data, "cutoff": threshold})
def process_data(input_data, cutoff, operation="mean"):
    if operation == "mean":
        result = input_data[input_data > cutoff].mean()
    elif operation == "std":
        result = input_data[input_data > cutoff].std()
    else:
        raise ValueError("Invalid operation")
    return result

@step(args={"processed": process_data})
def format_result(processed):
    return f"The result is: {processed:.2f}"

# Create the pipeline
pipeline = Pipeline(format_result)

# Generate the flowchart
flowchart = pipeline.draw(params=True)
print(flowchart)

This will generate a flowchart in mermaid markdown format, which can be visualized using mermaid tools. The params parameter allows you to include parameters in the flowchart.

License

GRPipe is released under the MIT License. See the LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

grpipe-0.0.4.tar.gz (14.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

grpipe-0.0.4-py3-none-any.whl (12.6 kB view details)

Uploaded Python 3

File details

Details for the file grpipe-0.0.4.tar.gz.

File metadata

  • Download URL: grpipe-0.0.4.tar.gz
  • Upload date:
  • Size: 14.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.0.1 CPython/3.12.8

File hashes

Hashes for grpipe-0.0.4.tar.gz
Algorithm Hash digest
SHA256 8e017fa6d8270b6cfd19763bdb4875aa43dc814816b22c49017f3be8d6cadbb9
MD5 847754339023606f0476b21606775074
BLAKE2b-256 26ce01e555b75d555d6245b0e68174f266209a3d2571fe944a2a00b4065adf87

See more details on using hashes here.

File details

Details for the file grpipe-0.0.4-py3-none-any.whl.

File metadata

  • Download URL: grpipe-0.0.4-py3-none-any.whl
  • Upload date:
  • Size: 12.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.0.1 CPython/3.12.8

File hashes

Hashes for grpipe-0.0.4-py3-none-any.whl
Algorithm Hash digest
SHA256 25d9d4b9cf9572e2ced43a9018e2019edd0b02d5a7527b8f4687b6732c2bdbf2
MD5 c30c8515f6b3765cf2e429a8ea641e9b
BLAKE2b-256 8fb147a325b45459694844d7214ed2b84422f3147b90f3644753036ec5956cdf

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page