Skip to main content

Computation pipelines with non-linear dependency graphs

Project description

GRPipe: A Flexible Pipeline Framework for Python

GRPipe is a minimalistic and flexible computational pipeline framework for Python that allows you to create complex data processing workflows. It provides a clean and intuitive API for defining, connecting, and executing pipeline steps, with built-in support for caching, argument binding, and parameter management.

Release Build status codecov Commit activity License

Computation pipelines with non-linear dependency graphs

Features

  • Easy-to-use decorator-based API for defining pipeline steps
  • Automatic dependency resolution between steps
  • Built-in caching mechanism for improved performance
  • Flexible argument binding and parameter management
  • Support for both linear and branching pipelines
  • Verbose logging option for debugging and monitoring

Installation

You can install GRPipe using pip:

pip install grpipe

Quickstart

Arguments vs. Parameters

In GRPipe, there's an important distinction between arguments and parameters:

  • Arguments are either external inputs passed to the final pipeline or outputs from other internal steps. They are defined using the Argument class and connected between steps.
  • Parameters are tunable settings for individual steps. They can be adjusted globally using the set_params method of the pipeline.

Here's an example illustrating the difference:

# Define arguments
data = Argument("data")
threshold = Argument("threshold")


# `operation` is a parameter of the step that can be adjusted
@step(args={"input_data": data, "cutoff": threshold})
def process_data(input_data, cutoff, operation="mean"):
    if operation == "mean":
        result = input_data[input_data > cutoff].mean()
    elif operation == "std":
        result = input_data[input_data > cutoff].std()
    else:
        raise ValueError("Invalid operation")
    return result


@step(args={"processed": process_data})
def format_result(processed):
    return f"The result is: {processed:.2f}"


# Create the pipeline
pipeline = format_result

# Run the pipeline with different arguments
result1 = pipeline(data=np.arange(100), threshold=2)
print(result1)

# Adjust a parameter
pipeline.set_params(process_data__operation="std")

# Run again with the same arguments but different parameters
result2 = pipeline(data=np.arange(100), threshold=2)
print(result2)

# Bind the value of threshold:
pipeline.bind(threshold=50)

result3 = pipeline(data=np.arange(100))
print(result3)

In this example, data and threshold are arguments that can be passed when running the pipeline, while operation is a parameter that can be adjusted using set_params.

Branching Pipelines

GRPipe supports creating complex branching pipelines. Here's an example of a more advanced pipeline for data analysis:

import polars as pl
from grpipe import Argument, step

# Define arguments
data_source = Argument("data_source")
analysis_type = Argument("analysis_type")

@step(args={"source": data_source})
def load_data(source):
    return pl.read_csv(source)

@step(args={"data": load_data})
def preprocess(data, fill_value=0):
    return data.drop_nulls().fill_nan(fill_value)

@step(args={"data": preprocess})
def calculate_statistics(data):
    return {
        "mean": data.mean().to_dict(),
        "median": data.median().to_dict(),
        "std_dev": data.std().to_dict()
    }

@step(args={"data": preprocess})
def perform_clustering(data, n_clusters = 3):
    # Implement your clustering logic here
    return {"clusters": [f"Cluster {i}" for i in range(n_clusters)]}

@step(args={"stats": calculate_statistics, "clusters": perform_clustering, "analysis": analysis_type})
def generate_report(stats, clusters, analysis):
    report = f"Analysis Type: {analysis}\n\n"
    report += "Statistics:\n"
    for key, value in stats.items():
        report += f"  {key}: {value}\n"
    report += f"\nNumber of clusters: {len(clusters['clusters'])}"
    return report

# Create the pipeline
analysis_pipeline = generate_report

# Run the pipeline
result = analysis_pipeline(
    data_source="path/to/your/data.csv",
    analysis_type="Exploratory Data Analysis"
)
print(result)

This example demonstrates how you can create a more complex pipeline that includes data loading, preprocessing, parallel analysis steps (statistics calculation and clustering), and report generation. It also shows how to use both arguments and parameters in a pipeline.

Caching and Performance

GRPipe automatically caches the results of each step, improving performance for repeated runs with the same inputs. You can control caching behavior for each argument:

frequently_changing_data = Argument("data", cachable=False)
stable_parameter = Argument("parameter", cachable=True)

@step(args={"data": frequently_changing_data, "param": stable_parameter})
def process_data(data, param):
    # Your processing logic here
    pass

Verbose Logging

Enable verbose logging for debugging and monitoring:

@step(verbose=True, args={"data": input_data})
def noisy_step(data):
    # This step will log detailed information about its execution
    pass

License

GRPipe is released under the MIT License. See the LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

grpipe-0.0.2.tar.gz (12.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

grpipe-0.0.2-py3-none-any.whl (9.5 kB view details)

Uploaded Python 3

File details

Details for the file grpipe-0.0.2.tar.gz.

File metadata

  • Download URL: grpipe-0.0.2.tar.gz
  • Upload date:
  • Size: 12.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for grpipe-0.0.2.tar.gz
Algorithm Hash digest
SHA256 d8d5792f1621673d0f620d4df3d3c1bf9c7e60b49229bad2a2f021bb955be85b
MD5 8ff32b9ab114b50d2b40fb338332452f
BLAKE2b-256 669b709445db9902623b292c1931a0c122fdf9311e73170daec7c60fba8cc18d

See more details on using hashes here.

File details

Details for the file grpipe-0.0.2-py3-none-any.whl.

File metadata

  • Download URL: grpipe-0.0.2-py3-none-any.whl
  • Upload date:
  • Size: 9.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for grpipe-0.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 3b970bff039bb6fff484cc6409ab10141c86bfda77e542872ec29397b97fa558
MD5 9961c156e24c80c54645e6ab86244088
BLAKE2b-256 e1638416d754616224961b67678c79b8770ba30e75ac43a78bfbee8539f68453

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page