Flexible Pipeline Processing Framework

This module implements a flexible pipeline processing framework for tracking, executing, and managing data transformations. It supports step-by-step execution, lazy execution, caching, and straightforward output management.

Install

pip install flexpipe

Example

The following will create a tracker.json file in the output directory and run the MyTransform step:

from flexpipe import Pipeline, Transformation

class MyTransform(Transformation):
    def process(self, *args, **kwargs):
        # Your code here
        pass

with Pipeline("./out_dir") as pipe:
    pipe.start(
        MyTransform("my_arg"),
    )

Table of Contents

  1. Pipeline Class
  2. Tracker Class
  3. Transformation Class
  4. Logging

Pipeline Class

The Pipeline class coordinates a series of transformation steps, tracking the progress and managing the results of each step.

Attributes:

  • out (Path): Directory where pipeline outputs will be stored.
  • steps (list[Transformation]): List of Transformation objects representing the pipeline steps.
  • tracker (Tracker): A Tracker instance that manages tracking of pipeline steps.

Methods:

  • __init__(self, out: Path, steps: list["Transformation"]): Initializes the pipeline, creating the output directory and associating the list of transformation steps.
      • out: Path where the pipeline outputs will be saved.
      • steps: List of transformations to be executed.
  • start(self): Starts the pipeline, running each transformation step sequentially.
  • clean(self): Cleans up files created by the pipeline, depending on the tracker settings (e.g., cached files or step outputs).
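
For illustration, here is a minimal sketch of chaining two steps. It assumes, based on the attribute lists in this document, that a step publishes its result by assigning self.df, reads its predecessor's result from self.prev_df, and that start() accepts the steps to run (as in the example above); check the source for the exact contract.

import pandas as pd
from flexpipe import Pipeline, Transformation

class Extract(Transformation):
    def process(self, *args, **kwargs):
        # Build a small DataFrame; per the Transformation attribute list,
        # self.df holds the step's result (assumed to be what gets saved).
        self.df = pd.DataFrame({"x": [1, 2, 3]})

class Double(Transformation):
    def process(self, *args, **kwargs):
        # self.prev_df is the previous step's output DataFrame.
        self.df = self.prev_df.assign(x=lambda d: d.x * 2)

with Pipeline("./out_dir") as pipe:
    pipe.start(
        Extract(),
        Double(lazy=True),  # lazy: skipped on re-runs once completed
    )

On a second run, Double(lazy=True) would be skipped if tracker.json already marks it as completed.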

Tracker Class

The Tracker class manages the state and progress of each step in the pipeline. It handles saving intermediate results, managing cache, and storing metadata.

Attributes:

  • out (Path): Directory where pipeline outputs will be stored.
  • fname (Path): Path to the tracker JSON file that stores the progress of each step.
  • steps (dict): Dictionary of steps with metadata tracking their start and end.
  • _meta (dict): Metadata about each step, including caching and deletion options.

Methods:

  • __init__(self, parent: Pipeline): Initializes the tracker, loading previous progress from tracker.json if available.
      • parent: The parent Pipeline instance.
  • clean_all(self): Cleans up all tracked outputs based on step metadata, such as deleting step outputs or cached CSV files.
  • start_step(self, tr): Starts tracking a step. If the step has already been completed and the lazy flag is set, the step is skipped.
      • tr: The transformation step to be started.
      • Returns: True if the step should be skipped (lazy and already completed), otherwise None.
  • get_filename(self, fname, _class=None): Generates and returns the output file path for a specific step and filename, and records the file in the step's metadata.
      • fname: Name of the file to track.
      • _class: The transformation class instance associated with the step.
  • end_step(self, tr, df=None, meta=None): Marks the step as complete, optionally saving a DataFrame and metadata.
      • tr: The transformation step that just finished.
      • df: DataFrame to be saved (optional).
      • meta: Metadata to be saved (optional).
  • save(self): Saves the current progress and step information to the tracker.json file.
  • get_last_df(self) -> str: Retrieves the filename of the last output DataFrame from the most recent completed step.
      • Returns: Filename (string) of the most recent DataFrame output.
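
To make the start_step()/end_step() contract concrete, here is a standalone sketch (not the library's actual code) of the documented skip rule: a lazy step that is already marked complete in tracker.json is skipped. The tracker.json layout used here, a "steps" dictionary with start/end markers per step, is an assumption based on the attribute list above.

import json
from pathlib import Path

def should_skip(tracker_file: Path, step_name: str, lazy: bool) -> bool:
    # Mirrors the documented start_step() rule: skip only when the
    # step is lazy AND a previous run already recorded its end.
    if not lazy or not tracker_file.exists():
        return False
    steps = json.loads(tracker_file.read_text()).get("steps", {})
    return steps.get(step_name, {}).get("end") is not None

# Example: check whether "MyTransform" could be skipped on a re-run.
print(should_skip(Path("./out_dir/tracker.json"), "MyTransform", lazy=True))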

Transformation Class

The Transformation class defines individual steps in the pipeline. Each transformation can process data and optionally read from the output of the previous step.

Attributes:

  • args: Arguments passed to the transformation during initialization.
  • kwargs: Keyword arguments passed to the transformation during initialization.
  • lazy (bool): If True, the transformation is skipped if it has already been completed.
  • df (pd.DataFrame): The result DataFrame after processing the transformation.
  • meta (dict): Metadata associated with the transformation.
  • prev_df_fname (str): Filename of the previous step’s output DataFrame.
  • prev_df (pd.DataFrame): DataFrame from the previous step.
  • delete_step (bool): If True, the step’s output will be deleted during cleanup.
  • df_delete_cache (bool): If True, cached DataFrame files will be deleted during cleanup.

Methods:

  • __init__(self, *args, lazy=False, **kwargs): Initializes the transformation with optional positional and keyword arguments. The lazy flag can be used to skip transformations that have already been completed.
      • args: Positional arguments for the transformation.
      • lazy: Whether to skip this step if it has already been completed.
      • kwargs: Additional keyword arguments.
  • _run(self, ctx: Pipeline): Runs the transformation step, retrieving the output from the previous step (if applicable), calling the transformation's process method, and saving the output.
      • ctx: The parent Pipeline instance.
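
As a sketch of how constructor arguments reach a step, the example below assumes (per the attribute list above) that self.args and self.kwargs mirror the constructor call and that self.meta is what end_step() persists; the FilterRows class and its parameters are illustrative only.

from flexpipe import Transformation

class FilterRows(Transformation):
    def process(self, *args, **kwargs):
        # self.args / self.kwargs mirror the constructor call,
        # e.g. FilterRows("x", threshold=2) -> args=("x",), kwargs={"threshold": 2}.
        column = self.args[0]
        threshold = self.kwargs.get("threshold", 0)
        # Keep only rows above the threshold in the chosen column.
        self.df = self.prev_df[self.prev_df[column] > threshold]
        # Assumed: self.meta is saved alongside the step by end_step().
        self.meta = {"rows_kept": len(self.df)}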

Logging

The module uses Python’s standard logging library for debugging and tracking events during the pipeline execution.

The logger is instantiated as log and is used to record information such as when steps are skipped, when files are deleted, and general debugging information.
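
Because the module logs through the standard library, its messages can be surfaced with an ordinary logging configuration. The logger name below assumes the usual convention of naming loggers after the module:

import logging

# Show flexpipe's debug output (skipped steps, deleted files, ...).
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
logging.getLogger("flexpipe").setLevel(logging.DEBUG)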

Conclusion

This concludes the documentation for the pipeline processing framework. Together, these classes provide a flexible, lazy-execution pipeline with automatic tracking and caching.

Docker

docker build -t flexpipe . --no-cache
docker run -it --rm flexpipe bash  # debug

docker run -d --name flexpipe_ct flexpipe:latest
docker stop flexpipe_ct
docker rm flexpipe_ct
docker rmi flexpipe

PyPI

rm -rf build dist
python setup.py sdist bdist_wheel
python -m twine upload dist/*

