Simple pipelining framework in Python

ROR

ROR is a pipelining framework for Python which makes it easier to define complex ML and data-processing stages.

Install it from PyPI:

pip install ror

Usage

To create your first pipeline, you can base it on the following example, which defines a simple Gaussian mixture model (GMM) clustering pipeline on the Iris dataset. First, import the relevant packages:

  import matplotlib.pyplot as plt
  from sklearn import datasets
  from sklearn.mixture import GaussianMixture
  from sklearn.decomposition import PCA
  from sklearn.preprocessing import StandardScaler

  from dataclasses import dataclass
  from typing import Tuple

  from ror.schemas import BaseSchema
  from ror.schemas.fields import field_perishable, field_persistance
  from ror.stages import IInitStage, ITerminalStage, IForwardStage
  from ror.controlers import BaseController

Next, define the schemas, which determine the structure of the data passed between stages:

  @dataclass
  class InitStageInput(BaseSchema):
      data: object = field_perishable()

  @dataclass
  class InitStageOutput(BaseSchema):
      X_pca: object = field_persistance()
      X_std: object = field_perishable()
      model: object = field_persistance()

  @dataclass
  class InferenceStageOutput(BaseSchema):
      X_pca: object = field_perishable()
      model: object = field_perishable()
      labels: object = field_persistance()

  @dataclass
  class VisStageOutput(BaseSchema):
      labels: object = field_persistance()
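Judging from the example, fields declared with `field_persistance()` appear to be carried forward to the next stage, while `field_perishable()` fields are dropped: every stage in the example builds its output by merging new values with `self.input.get_carry()`, and the schemas only line up if `get_carry()` returns just the persistent fields (for instance, `InferenceStageOutput` receives `X_pca` and `model` from `InitStageOutput`, but not `X_std`). The sketch below models that behaviour with standard-library dataclass metadata. `persistent`, `perishable`, `Schema`, and this `get_carry` implementation are hypothetical stand-ins, not ror's actual code.

```python
from dataclasses import dataclass, field, fields
from typing import Any, Dict


def persistent(default: Any = None) -> Any:
    """Hypothetical stand-in for field_persistance(): mark a field to carry forward."""
    return field(default=default, metadata={"persist": True})


def perishable(default: Any = None) -> Any:
    """Hypothetical stand-in for field_perishable(): mark a field to drop after this stage."""
    return field(default=default, metadata={"persist": False})


@dataclass
class Schema:
    """Hypothetical stand-in for BaseSchema."""

    def get_carry(self) -> Dict[str, Any]:
        # Keep only the fields whose metadata marks them as persistent.
        return {
            f.name: getattr(self, f.name)
            for f in fields(self)
            if f.metadata.get("persist")
        }


@dataclass
class InitOutput(Schema):
    X_pca: Any = persistent()
    X_std: Any = perishable()
    model: Any = persistent()


@dataclass
class InferenceOutput(Schema):
    X_pca: Any = perishable()
    model: Any = perishable()
    labels: Any = persistent()


# Simulate the hand-off performed in InferenceStage.compute():
# new values are merged with whatever the previous output carries forward.
init_out = InitOutput(X_pca="pca", X_std="std", model="gmm")
inference_out = InferenceOutput(labels="labels", **init_out.get_carry())
```

Here `init_out.get_carry()` yields `{"X_pca": "pca", "model": "gmm"}`, so `X_std` is dropped at the hand-off, and `inference_out.get_carry()` in turn yields only `{"labels": "labels"}`, mirroring what `VisStageOutput` receives in the example.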

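Then define the stages that use these schemas as their input and output. Each stage does its work in `compute()`, storing the result in `self._output`, and `get_output()` wraps that result in the stage's output schema; init and forward stages also return an instance of the next stage to run.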

  class VisStage(ITerminalStage[InferenceStageOutput, VisStageOutput]):
      def compute(self) -> None:
          # Visualize the clusters
          plt.figure(figsize=(8, 6))
          colors = ['r', 'g', 'b']

          for i in range(3):
              plt.scatter(
                  self.input.X_pca[self.input.labels == i, 0],
                  self.input.X_pca[self.input.labels == i, 1],
                  color=colors[i],
                  label=f'Cluster {i+1}'
              )

          plt.title('Gaussian Mixture Model Clustering')
          plt.xlabel('Principal Component 1')
          plt.ylabel('Principal Component 2')
          plt.legend()
          plt.show()

          self._output = self.input.get_carry()

      def get_output(self) -> VisStageOutput:
          return VisStageOutput(**self._output)

  class InferenceStage(IForwardStage[InitStageOutput, InferenceStageOutput, VisStage]):
      def compute(self) -> None:
          # Fit the Gaussian mixture model to the standardized data
          self.input.model.fit(self.input.X_std)

          # Predict the labels
          labels = self.input.model.predict(self.input.X_std)

          self._output = {
              "labels": labels,
              **self.input.get_carry()
          }

      def get_output(self) -> Tuple[VisStage, InferenceStageOutput]:
          return VisStage(), InferenceStageOutput(**self._output)


  class InitStage(IInitStage[InitStageInput, InitStageOutput, InferenceStage]):
      def compute(self) -> None:
          # Extract the feature matrix from the scikit-learn dataset object
          X = self.input.data.data

          # Standardize the features
          scaler = StandardScaler()
          X_std = scaler.fit_transform(X)

          # Apply PCA to reduce dimensionality for visualization
          pca = PCA(n_components=2)
          X_pca = pca.fit_transform(X_std)

          # Create a Gaussian Mixture Model (it is fitted later, in InferenceStage)
          gmm = GaussianMixture(n_components=3, random_state=42)

          self._output = {
              "X_pca": X_pca,
              "X_std": X_std,
              "model": gmm,
              **self.input.get_carry()
          }

      def get_output(self) -> Tuple[InferenceStage, InitStageOutput]:
          return InferenceStage(), InitStageOutput(**self._output)
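In the example, the stages are defined in reverse order (`VisStage` first, `InitStage` last). This is forced by Python rather than by ror: the expressions in a class's base list, including subscripted generics such as `IForwardStage[InitStageOutput, InferenceStageOutput, VisStage]`, are evaluated when the `class` statement runs, so the next stage must already exist. The sketch below demonstrates this with plain `typing.Generic` classes. `TerminalStage`, `ForwardStage`, and the other class names are hypothetical stand-ins, not ror's base classes, and the idea that a tool like `discover()` could read these type parameters is only an assumption.

```python
from typing import Generic, TypeVar, get_args

InT = TypeVar("InT")
OutT = TypeVar("OutT")
NextT = TypeVar("NextT")


class TerminalStage(Generic[InT, OutT]):
    """Hypothetical stand-in for a terminal stage: input and output schemas only."""


class ForwardStage(Generic[InT, OutT, NextT]):
    """Hypothetical stand-in for a forward stage; NextT is the stage that runs next."""


class LastStage(TerminalStage[dict, dict]):
    pass


# LastStage already exists, so this base-class expression can be evaluated.
class FirstStage(ForwardStage[dict, dict, LastStage]):
    pass


def define_out_of_order() -> None:
    # Naming a successor that does not exist yet fails immediately,
    # because base-class expressions are evaluated at class creation.
    class EarlyStage(ForwardStage[dict, dict, UndefinedStage]):  # noqa: F821
        pass


# The type parameters remain inspectable at runtime via __orig_bases__,
# so a framework could, in principle, walk them to find each successor.
next_stage = get_args(FirstStage.__orig_bases__[0])[2]
```

Calling `define_out_of_order()` raises `NameError`, while `next_stage` is `LastStage`.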

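Finally, create a controller, passing it the init stage class (not an instance) and the input data to send through the pipeline. `discover()` shows the connected stages, and `start()` runs the pipeline and returns its output together with a run ID.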

  iris = datasets.load_iris()

  input_data = InitStageInput(data=iris)
  controller = BaseController(init_data=input_data, init_stage=InitStage)
  controller.discover()  # Shows a table of the connected stages

  output, run_id = controller.start()
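Given the contract the example stages follow (init and forward stages return a `(next_stage, output)` tuple, while the terminal stage returns its output on its own), a controller's run loop can be sketched as below. This is a minimal sketch assuming duck-typed stages with an `input` attribute, `compute()`, and `get_output()`; `run_pipeline`, `DoubleStage`, `SquareStage`, and generating the run ID with `uuid4` are all hypothetical and not ror's implementation.

```python
import uuid
from typing import Any, Dict, List, Tuple


class SquareStage:
    """Hypothetical terminal stage: returns only its output, with no next stage."""

    def compute(self) -> None:
        self._output = {"value": self.input["value"] ** 2}

    def get_output(self) -> Dict[str, Any]:
        return self._output


class DoubleStage:
    """Hypothetical forward stage: returns (next_stage, output)."""

    def compute(self) -> None:
        self._output = {"value": self.input["value"] * 2}

    def get_output(self) -> Tuple[SquareStage, Dict[str, Any]]:
        return SquareStage(), self._output


def run_pipeline(
    init_stage_cls: type, init_data: Dict[str, Any]
) -> Tuple[Dict[str, Any], str, List[str]]:
    """Run stages until one returns a bare output instead of a (stage, output) tuple."""
    run_id = uuid.uuid4().hex  # assumption: any unique identifier would do
    stage, data = init_stage_cls(), init_data
    visited: List[str] = []
    while True:
        stage.input = data
        stage.compute()
        visited.append(type(stage).__name__)
        result = stage.get_output()
        if isinstance(result, tuple):
            stage, data = result  # forward stage: continue with the next one
        else:
            return result, run_id, visited  # terminal stage: done
```

For example, `run_pipeline(DoubleStage, {"value": 3})` visits `DoubleStage` and then `SquareStage` and returns `{"value": 36}`. The `isinstance(result, tuple)` check relies on terminal outputs never being tuples, which holds when outputs are dataclass schemas as in the example above.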

And that's it! With this approach you can define logical processing stages for your ML inference pipelines whilst keeping them cleanly separated from one another.


Download files

Source Distribution

ror-0.1.1.tar.gz (10.0 kB)

Built Distribution

ror-0.1.1-py3-none-any.whl (13.8 kB)

File details

Details for the file ror-0.1.1.tar.gz.

File metadata

  • Download URL: ror-0.1.1.tar.gz
  • Upload date:
  • Size: 10.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.7.1 CPython/3.11.0 Linux/6.2.0-1018-azure

File hashes

Hashes for ror-0.1.1.tar.gz

  • SHA256: de585d0a65be7852a2a1cea69e649c865617bebae014e82bc562d313cdf3bab0
  • MD5: 966fe21805bd59ed9f5d68cdf459f891
  • BLAKE2b-256: 41722f97449fcabe0673b7579d39da23e23d3fbdef578083d345e325c8df1aaf

File details

Details for the file ror-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: ror-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 13.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.7.1 CPython/3.11.0 Linux/6.2.0-1018-azure

File hashes

Hashes for ror-0.1.1-py3-none-any.whl

  • SHA256: 5d295861e1dee53a6be5202db31c96bb5106a44f9cafef9abd35f1f3dbb3f49f
  • MD5: 643cbe005b8605d3d992024f0d969d06
  • BLAKE2b-256: f053ae4bcb977b76eb5c8855f7e322e41d36f67f22d28c3669916dcb013a4e48
