Flexible Pipeline Processing Framework

This module implements a flexible pipeline processing framework designed for tracking, executing, and managing data transformations. It supports step-by-step execution, lazy execution (skipping steps that have already completed), caching, and simple output management.

Install

pip install flexpipe
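Quick example (a minimal sketch: the flexpipe import path and the exact signature of the process hook are assumptions based on the API described below):

from pathlib import Path

import pandas as pd

from flexpipe import Pipeline, Transformation  # import path assumed


class LoadNumbers(Transformation):
    # Hypothetical step: produce the initial DataFrame.
    def process(self, tracker):  # hook name per the docs below; signature assumed
        self.df = pd.DataFrame({"x": [1, 2, 3]})


class DoubleNumbers(Transformation):
    # Hypothetical step: read the previous step's output and transform it.
    def process(self, tracker):
        self.df = self.prev_df.assign(x=self.prev_df["x"] * 2)


pipe = Pipeline(out=Path("out"), steps=[LoadNumbers(), DoubleNumbers(lazy=True)])
pipe.start()  # runs each step in order; lazy steps are skipped if already done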

Table of Contents

  1. Pipeline Class
  2. Tracker Class
  3. Transformation Class
  4. Logging

Pipeline Class

The Pipeline class coordinates a series of transformation steps, tracking the progress and managing the results of each step.

Attributes:

  • out (Path): Directory where pipeline outputs will be stored.
  • steps (list[Transformation]): List of Transformation objects representing the pipeline steps.
  • tracker (Tracker): A Tracker instance that manages tracking of pipeline steps.

Methods:

  • __init__(self, out: Path, steps: list["Transformation"]): Initializes the pipeline, creating the output directory and attaching the list of transformation steps.
      ◦ out: Path where the pipeline outputs will be saved.
      ◦ steps: List of transformations to be executed.
  • start(self): Starts the pipeline, running each transformation step sequentially.
  • clean(self): Cleans up files created by the pipeline, depending on the tracker settings (e.g., cached files or step outputs).
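
Running and then cleaning a pipeline might look like this (a sketch under the same assumptions as the quick example above; setting delete_step directly on a step is an assumption about how that flag is meant to be toggled):

from pathlib import Path

import pandas as pd

from flexpipe import Pipeline, Transformation  # import path assumed


class MakeData(Transformation):
    def process(self, tracker):  # hook signature assumed
        self.df = pd.DataFrame({"x": [1, 2, 3]})


step = MakeData()
step.delete_step = True  # ask clean() to remove this step's outputs

pipe = Pipeline(out=Path("out"), steps=[step])
pipe.start()
pipe.clean()  # removes files according to each step's delete flags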

Tracker Class

The Tracker class manages the state and progress of each step in the pipeline. It handles saving intermediate results, managing cache, and storing metadata.

Attributes:

  • out (Path): Directory where pipeline outputs will be stored.
  • fname (Path): Path to the tracker JSON file that stores the progress of each step.
  • steps (dict): Dictionary of steps with metadata tracking their start and end.
  • _meta (dict): Metadata about each step, including caching and deletion options.

Methods:

  • __init__(self, parent: Pipeline): Initializes the tracker, loading previous progress from tracker.json if available.
      ◦ parent: The parent Pipeline instance.
  • clean_all(self): Cleans up all tracked outputs based on step metadata, such as deleting step outputs or cached CSV files.
  • start_step(self, tr): Starts tracking a step. If the step has already been completed and the lazy flag is set, the step is skipped.
      ◦ tr: The transformation step to be started.
      ◦ Returns: True if the step should be skipped (due to laziness and completion), otherwise None.
  • get_filename(self, fname, _class=None): Generates and returns the output file path for a given step and filename, and records that file in the step's metadata.
      ◦ fname: Name of the file to track.
      ◦ _class: The transformation class instance associated with the step.
  • end_step(self, tr, df=None, meta=None): Marks the step as complete, optionally saving a DataFrame and metadata.
      ◦ tr: The transformation step that just finished.
      ◦ df: DataFrame to be saved (optional).
      ◦ meta: Metadata to be saved (optional).
  • save(self): Saves the current progress and step information to the tracker.json file.
  • get_last_df(self) -> str: Returns the filename of the most recent completed step's output DataFrame.
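
For illustration, a step could register an auxiliary output file through the tracker. This is a hypothetical sketch: it assumes the tracker is handed to the process hook, which the docs do not guarantee (only _run is documented to receive the Pipeline):

import pandas as pd

from flexpipe import Transformation  # import path assumed


class ExportReport(Transformation):
    def process(self, tracker):  # tracker argument is an assumption
        # get_filename returns a path under the pipeline's output directory
        # and records the file in this step's metadata.
        fpath = tracker.get_filename("report.csv", _class=self)
        self.df = self.prev_df
        self.df.to_csv(fpath, index=False)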

Transformation Class

The Transformation class defines individual steps in the pipeline. Each transformation can process data and optionally read from the output of the previous step.

Attributes:

  • args: Arguments passed to the transformation during initialization.
  • kwargs: Keyword arguments passed to the transformation during initialization.
  • lazy (bool): If True, the transformation is skipped if it has already been completed.
  • df (pd.DataFrame): The result DataFrame after processing the transformation.
  • meta (dict): Metadata associated with the transformation.
  • prev_df_fname (str): Filename of the previous step’s output DataFrame.
  • prev_df (pd.DataFrame): DataFrame from the previous step.
  • delete_step (bool): If True, the step’s output will be deleted during cleanup.
  • df_delete_cache (bool): If True, cached DataFrame files will be deleted during cleanup.

Methods:

  • __init__(self, *args, lazy=False, **kwargs): Initializes the transformation with optional positional and keyword arguments. The lazy flag can be used to skip transformations that have already been completed.
      ◦ args: Positional arguments for the transformation.
      ◦ lazy: Whether to skip this step if it has already been completed.
      ◦ kwargs: Additional keyword arguments.
  • _run(self, ctx: Pipeline): Runs the transformation step: retrieves the output of the previous step (if applicable), calls the transformation's process method, and saves the output.
      ◦ ctx: The parent Pipeline instance.
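
Putting the pieces together, a custom step typically subclasses Transformation and implements process (a sketch; the hook signature, the "x" column, and direct attribute assignment are assumptions):

from flexpipe import Transformation  # import path assumed


class Enrich(Transformation):
    def process(self, tracker):  # hook signature assumed
        out = self.prev_df.copy()       # output of the previous step, loaded by _run
        out["x_squared"] = out["x"] ** 2
        self.df = out                   # saved as this step's output
        self.meta = {"rows": len(out)}  # metadata stored alongside the DataFrame


step = Enrich(lazy=True)     # skip on re-runs once completed
step.df_delete_cache = True  # delete the cached DataFrame file during cleanup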

Logging

The module uses Python’s standard logging library for debugging and tracking events during the pipeline execution.

The logger is instantiated as log and is used to record information such as when steps are skipped, when files are deleted, and general debugging information.
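
To surface these messages during development, configure the standard logging module; the "flexpipe" logger name below assumes the module follows the usual __name__ convention:

import logging

logging.basicConfig(level=logging.DEBUG)                # show all debug output
logging.getLogger("flexpipe").setLevel(logging.DEBUG)   # or target flexpipe only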

Conclusion

This concludes the documentation for the pipeline processing framework. Each class and function works together to provide a flexible, lazy-execution pipeline with automatic tracking and caching capabilities.

Docker

docker build -t flexpipe . --no-cache
docker run -it --rm flexpipe bash  # debug

docker run -d --name flexpipe_ct flexpipe:latest
docker stop flexpipe_ct
docker rm flexpipe_ct
docker rmi flexpipe

PyPI

rm -rf build dist
python setup.py sdist bdist_wheel
python -m twine upload dist/*
