
Computation pipelines with non-linear dependency graphs


GRPipe: A Flexible Pipeline Framework for Python

GRPipe is a minimalistic and flexible computational pipeline framework for Python that allows you to create complex data processing workflows. It provides a clean and intuitive API for defining, connecting, and executing pipeline steps, with built-in support for caching, argument binding, and parameter management.



Features

  • Easy-to-use decorator-based API for defining pipeline steps
  • Automatic dependency resolution between steps
  • Built-in caching mechanism for improved performance
  • Flexible argument binding and parameter management
  • Support for both linear and branching pipelines
  • Verbose logging option for debugging and monitoring
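
To give a feel for how decorator-based dependency resolution can work in general, here is a self-contained sketch. This is a simplified stand-in, not GRPipe's actual implementation; the real Argument and step classes are shown in the Quickstart below.

```python
# Conceptual sketch of decorator-based dependency resolution --
# NOT GRPipe's implementation. Argument and step here are simplified
# stand-ins illustrating how a step decorator can wire each named
# argument to an external input or to an upstream step.

class Argument:
    """A named external input to the pipeline."""

    def __init__(self, name):
        self.name = name


def step(args):
    """Wrap a function so its declared args are resolved recursively:
    an Argument is looked up in the caller's inputs; anything else is
    assumed to be another wrapped step and is called with the inputs."""

    def decorator(func):
        def run(**inputs):
            resolved = {}
            for param, dep in args.items():
                if isinstance(dep, Argument):
                    resolved[param] = inputs[dep.name]
                else:
                    resolved[param] = dep(**inputs)
            return func(**resolved)

        return run

    return decorator


data = Argument("data")


@step(args={"xs": data})
def total(xs):
    return sum(xs)


@step(args={"t": total})
def describe(t):
    return f"total={t}"


print(describe(data=[1, 2, 3]))  # total=6
```

Calling the final step pulls its dependencies in automatically, which is the same pattern GRPipe's pipelines build on.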

Installation

You can install GRPipe using pip:

pip install grpipe

Quickstart

Arguments vs. Parameters

In GRPipe, there's an important distinction between arguments and parameters:

  • Arguments are either external inputs passed to the final pipeline or outputs from other internal steps. They are defined using the Argument class and connected between steps.
  • Parameters are tunable settings for individual steps. They can be adjusted globally using the set_params method of the pipeline.

Here's an example illustrating the difference:

import numpy as np

from grpipe import Argument, step

# Define arguments
data = Argument("data")
threshold = Argument("threshold")


# `operation` is a parameter of the step that can be adjusted
@step(args={"input_data": data, "cutoff": threshold})
def process_data(input_data, cutoff, operation="mean"):
    if operation == "mean":
        result = input_data[input_data > cutoff].mean()
    elif operation == "std":
        result = input_data[input_data > cutoff].std()
    else:
        raise ValueError("Invalid operation")
    return result


@step(args={"processed": process_data})
def format_result(processed):
    return f"The result is: {processed:.2f}"


# Create the pipeline
pipeline = format_result

# Run the pipeline with different arguments
result1 = pipeline(data=np.arange(100), threshold=2)
print(result1)

# Adjust a parameter
pipeline.set_params(process_data__operation="std")

# Run again with the same arguments but different parameters
result2 = pipeline(data=np.arange(100), threshold=2)
print(result2)

# Bind the value of threshold:
pipeline.bind(threshold=50)

result3 = pipeline(data=np.arange(100))
print(result3)

In this example, data and threshold are arguments that can be passed when running the pipeline, while operation is a parameter that can be adjusted using set_params.
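
The process_data__operation key above follows the familiar double-underscore convention (step name, two underscores, parameter name), as popularized by scikit-learn. Here is a hypothetical sketch of how such keys can be split; parse_param_keys is an illustration of the naming convention only, not GRPipe's internal code.

```python
# Illustrative parsing of "step__parameter" keys, as passed to
# set_params(process_data__operation="std"). A sketch of the naming
# convention only -- not GRPipe's internals.

def parse_param_keys(**kwargs):
    """Split each 'step__param' key into {step: {param: value}}."""
    parsed = {}
    for key, value in kwargs.items():
        step_name, _, param_name = key.partition("__")
        parsed.setdefault(step_name, {})[param_name] = value
    return parsed


print(parse_param_keys(process_data__operation="std"))
# {'process_data': {'operation': 'std'}}
```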

Branching Pipelines

GRPipe supports creating complex branching pipelines. Here's an example of a more advanced pipeline for data analysis:

import polars as pl
from grpipe import Argument, step

# Define arguments
data_source = Argument("data_source")
analysis_type = Argument("analysis_type")

@step(args={"source": data_source})
def load_data(source):
    return pl.read_csv(source)

@step(args={"data": load_data})
def preprocess(data, fill_value=0):
    return data.drop_nulls().fill_nan(fill_value)

@step(args={"data": preprocess})
def calculate_statistics(data):
    return {
        "mean": data.mean().to_dict(),
        "median": data.median().to_dict(),
        "std_dev": data.std().to_dict()
    }

@step(args={"data": preprocess})
def perform_clustering(data, n_clusters=3):
    # Implement your clustering logic here
    return {"clusters": [f"Cluster {i}" for i in range(n_clusters)]}

@step(args={"stats": calculate_statistics, "clusters": perform_clustering, "analysis": analysis_type})
def generate_report(stats, clusters, analysis):
    report = f"Analysis Type: {analysis}\n\n"
    report += "Statistics:\n"
    for key, value in stats.items():
        report += f"  {key}: {value}\n"
    report += f"\nNumber of clusters: {len(clusters['clusters'])}"
    return report

# Create the pipeline
analysis_pipeline = generate_report

# Run the pipeline
result = analysis_pipeline(
    data_source="path/to/your/data.csv",
    analysis_type="Exploratory Data Analysis"
)
print(result)

This example demonstrates how you can create a more complex pipeline that includes data loading, preprocessing, parallel analysis steps (statistics calculation and clustering), and report generation. It also shows how to use both arguments and parameters in a pipeline.

Caching and Performance

GRPipe automatically caches the results of each step, improving performance for repeated runs with the same inputs. You can control caching behavior for each argument:

from grpipe import Argument, step

frequently_changing_data = Argument("data", cachable=False)
stable_parameter = Argument("parameter", cachable=True)

@step(args={"data": frequently_changing_data, "param": stable_parameter})
def process_data(data, param):
    # Your processing logic here
    pass
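
As a rough mental model, per-argument caching amounts to memoizing a step only on its cachable inputs. The sketch below illustrates that idea with a plain function wrapper; cached_step is a hypothetical helper, not GRPipe's caching code.

```python
# Illustrative per-argument memoization: a step is cached only when
# every argument name is declared cachable; otherwise each call
# recomputes. NOT GRPipe's implementation -- a conceptual sketch.

def cached_step(func, cachable=()):
    """Memoize func on its keyword arguments, but only when every
    argument name is in `cachable`; otherwise recompute each call."""
    cache = {}
    stats = {"calls": 0}

    def run(**kwargs):
        if set(kwargs) <= set(cachable):
            key = tuple(sorted(kwargs.items()))
            if key not in cache:
                stats["calls"] += 1
                cache[key] = func(**kwargs)
            return cache[key]
        stats["calls"] += 1
        return func(**kwargs)

    run.stats = stats
    return run


square = cached_step(lambda x=0: x * x, cachable=("x",))
square(x=4)
square(x=4)
print(square.stats["calls"])  # 1 -- the second call hit the cache
```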

Verbose Logging

Enable verbose logging for debugging and monitoring:

from grpipe import Argument, step

input_data = Argument("data")

@step(verbose=True, args={"data": input_data})
def noisy_step(data):
    # This step will log detailed information about its execution
    pass

Generating Flowcharts

GRPipe can generate a flowchart of your pipeline with the pipeline.draw() method, which returns the chart as mermaid markdown ready to be rendered by any mermaid-compatible viewer.

Here's an example of how to generate a flowchart for a simple pipeline:

from grpipe import Argument, step, Pipeline

# Define arguments
data = Argument("data")
threshold = Argument("threshold")

@step(args={"input_data": data, "cutoff": threshold})
def process_data(input_data, cutoff, operation="mean"):
    if operation == "mean":
        result = input_data[input_data > cutoff].mean()
    elif operation == "std":
        result = input_data[input_data > cutoff].std()
    else:
        raise ValueError("Invalid operation")
    return result

@step(args={"processed": process_data})
def format_result(processed):
    return f"The result is: {processed:.2f}"

# Create the pipeline
pipeline = Pipeline(format_result)

# Generate the flowchart
flowchart = pipeline.draw(params=True)
print(flowchart)

This prints the flowchart as mermaid markdown. Passing params=True includes each step's parameters in the chart.
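
To give a feel for the mermaid format itself, here is an illustrative generator that emits a flowchart from a simple dependency mapping. The node names mirror the pipeline above, but this is a sketch of mermaid syntax only; GRPipe's actual draw() output may be formatted differently.

```python
# Illustrative generation of a mermaid flowchart from a dependency
# mapping {step: [dependencies]}. A sketch of mermaid syntax, not a
# reproduction of GRPipe's draw() output.

def to_mermaid(deps):
    """Render each dependency edge as a 'parent --> child' line."""
    lines = ["flowchart TD"]
    for node, parents in deps.items():
        for parent in parents:
            lines.append(f"    {parent} --> {node}")
    return "\n".join(lines)


print(to_mermaid({
    "process_data": ["data", "threshold"],
    "format_result": ["process_data"],
}))
# flowchart TD
#     data --> process_data
#     threshold --> process_data
#     process_data --> format_result
```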

License

GRPipe is released under the MIT License. See the LICENSE file for details.
