Flexible Pipeline Processing Framework
Pipeline Processing Framework
This module implements a flexible pipeline processing framework, designed for tracking, executing, and managing data transformations. It supports step-by-step execution, lazy execution, caching, and easy output management.
Install
pip install flexpipe
Example
The following will create a tracker.json file, as well as the step MyTransform:
from flexpipe import Pipeline, Transformation


class MyTransform(Transformation):
    def process(self, *args, **kwargs):
        # Your code here
        pass


with Pipeline("./out_dir") as pipe:
    pipe.start(
        MyTransform("my_arg"),
    )
Table of Contents
- Pipeline Class
- Tracker Class
- Transformation Class
- Logging
Pipeline Class
The Pipeline class coordinates a series of transformation steps, tracking the progress and managing the results of each step.
Attributes:
- out (Path): Directory where pipeline outputs will be stored.
- steps (list[Transformation]): List of Transformation objects representing the pipeline steps.
- tracker (Tracker): A Tracker instance that manages tracking of pipeline steps.
Methods:
- __init__(self, out: Path, steps: list["Transformation"]): Initializes the pipeline, creating an output directory and associating a list of transformation steps.
- out: Path where the pipeline outputs will be saved.
- steps: List of transformations to be executed.
- start(self): Starts the pipeline, running each transformation step sequentially.
- clean(self): Cleans up files created by the pipeline, depending on the tracker settings (e.g., cached files or step outputs).
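The coordination described above can be pictured with a small, self-contained sketch. It does not use flexpipe itself: MiniPipeline and MiniStep are hypothetical stand-ins, and writing one text file per step is an assumption made only to show the output directory, sequential execution, and cleanup working together.

```python
import tempfile
from pathlib import Path


class MiniStep:
    """Hypothetical stand-in for a transformation step."""

    def __init__(self, name, func):
        self.name = name
        self.func = func

    def run(self, value):
        return self.func(value)


class MiniPipeline:
    """Hypothetical stand-in for a pipeline: an output directory plus ordered steps."""

    def __init__(self, out, steps):
        self.out = Path(out)
        self.out.mkdir(parents=True, exist_ok=True)
        self.steps = list(steps)

    def start(self, value=None):
        # Run each step in order, feeding each result to the next step.
        for step in self.steps:
            value = step.run(value)
            # Record the intermediate result in the output directory.
            (self.out / f"{step.name}.txt").write_text(repr(value))
        return value

    def clean(self):
        # Delete the files this pipeline wrote.
        for step in self.steps:
            (self.out / f"{step.name}.txt").unlink(missing_ok=True)


with tempfile.TemporaryDirectory() as tmp:
    pipe = MiniPipeline(
        Path(tmp) / "out_dir",
        [
            MiniStep("load", lambda _: [1, 2, 3]),
            MiniStep("double", lambda xs: [2 * x for x in xs]),
        ],
    )
    result = pipe.start()
    written = sorted(p.name for p in pipe.out.iterdir())
    pipe.clean()
    remaining = list(pipe.out.iterdir())
```

In this sketch each step receives the previous step's return value directly; the real framework passes data between steps through tracked output files instead.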
Tracker Class
The Tracker class manages the state and progress of each step in the pipeline. It handles saving intermediate results, managing cache, and storing metadata.
Attributes:
- out (Path): Directory where pipeline outputs will be stored.
- fname (Path): Path to the tracker JSON file that stores the progress of each step.
- steps (dict): Dictionary of steps with metadata tracking their start and end.
- _meta (dict): Metadata about each step, including caching and deletion options.
Methods:
- __init__(self, parent: Pipeline): Initializes the tracker, loading previous progress if available from tracker.json.
- parent: The parent Pipeline instance.
- clean_all(self): Cleans up all tracked outputs based on step metadata, such as deleting step outputs or cached CSV files.
- start_step(self, tr): Starts tracking a step. If the step has already been completed and the lazy flag is set, the step is skipped.
- tr: The transformation step to be started.
- Returns: True if the step should be skipped (due to laziness and completion), otherwise None.
- get_filename(self, fname, _class=None): Generates and returns the output file path for a specific step and filename. Tracks the output file in the step’s metadata.
- fname: Name of the file to track.
- _class: The transformation class instance associated with the step.
- end_step(self, tr, df=None, meta=None): Marks the step as complete, optionally saving a DataFrame and metadata.
- tr: The transformation step that just finished.
- df: DataFrame to be saved (optional).
- meta: Metadata to be saved (optional).
- save(self): Saves the current progress and step information into the tracker.json file.
- get_last_df(self) -> str: Retrieves the filename of the last output DataFrame from the most recent completed step.
- Returns: Filename (string) of the most recent DataFrame output.
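To make the lazy-skip behaviour of start_step more concrete, here is a minimal sketch of a JSON-backed tracker. MiniTracker is hypothetical and much simpler than the real Tracker: the layout of its tracker.json (a "start" and "end" timestamp per step name) is an assumption for illustration, not the format flexpipe writes.

```python
import json
import tempfile
import time
from pathlib import Path


class MiniTracker:
    """Hypothetical, simplified tracker; not flexpipe's implementation."""

    def __init__(self, out):
        self.fname = Path(out) / "tracker.json"
        # Reload earlier progress if a tracker file already exists.
        if self.fname.exists():
            self.steps = json.loads(self.fname.read_text())
        else:
            self.steps = {}

    def start_step(self, name, lazy=False):
        done = "end" in self.steps.get(name, {})
        if lazy and done:
            return True  # the caller should skip this step
        self.steps[name] = {"start": time.time()}
        return None

    def end_step(self, name):
        self.steps[name]["end"] = time.time()
        self.save()

    def save(self):
        self.fname.write_text(json.dumps(self.steps, indent=2))


with tempfile.TemporaryDirectory() as tmp:
    first = MiniTracker(tmp)
    skipped_first = first.start_step("MyTransform", lazy=True)
    first.end_step("MyTransform")

    # A second run reloads tracker.json and sees the completed step.
    second = MiniTracker(tmp)
    skipped_second = second.start_step("MyTransform", lazy=True)
```

The first run executes the step because nothing has been recorded yet; the second run finds an "end" entry in the reloaded file and reports that the step can be skipped.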
Transformation Class
The Transformation class defines individual steps in the pipeline. Each transformation can process data and optionally read from the output of the previous step.
Attributes:
- args: Arguments passed to the transformation during initialization.
- kwargs: Keyword arguments passed to the transformation during initialization.
- lazy (bool): If True, the transformation is skipped if it has already been completed.
- df (pd.DataFrame): The result DataFrame after processing the transformation.
- meta (dict): Metadata associated with the transformation.
- prev_df_fname (str): Filename of the previous step’s output DataFrame.
- prev_df (pd.DataFrame): DataFrame from the previous step.
- delete_step (bool): If True, the step’s output will be deleted during cleanup.
- df_delete_cache (bool): If True, cached DataFrame files will be deleted during cleanup.
Methods:
- __init__(self, *args, lazy=False, **kwargs): Initializes the transformation with optional arguments and keyword arguments. The lazy flag can be used to skip already completed transformations.
- args: Positional arguments for the transformation.
- lazy: Whether to skip this step if it has already been completed.
- kwargs: Additional keyword arguments.
- _run(self, ctx: Pipeline): Runs the transformation step, retrieving the output from the previous step (if applicable), calling the transformation’s process method, and saving the output.
- ctx: The parent Pipeline instance.
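The relationship between __init__, _run, and process can be sketched as a template-method pattern. The code below is a hypothetical illustration, not flexpipe's source: MiniTransformation, AddColumn, and the already_done parameter are invented names, a list of dicts stands in for a DataFrame, and forwarding the constructor arguments to process is an assumption.

```python
class MiniTransformation:
    """Hypothetical base class mirroring the attributes listed above."""

    def __init__(self, *args, lazy=False, **kwargs):
        self.args = args
        self.kwargs = kwargs
        self.lazy = lazy
        self.prev_df = None
        self.df = None

    def process(self, *args, **kwargs):
        raise NotImplementedError

    def _run(self, prev=None, already_done=False):
        if self.lazy and already_done:
            return None  # skipped: lazy step that was completed earlier
        self.prev_df = prev
        # Assumption: constructor arguments are forwarded to the subclass hook.
        self.process(*self.args, **self.kwargs)
        return self.df


class AddColumn(MiniTransformation):
    def process(self, key, value=0):
        # A plain list of dicts stands in for a DataFrame.
        rows = self.prev_df or [{}]
        self.df = [{**row, key: value} for row in rows]


step = AddColumn("score", value=10, lazy=True)
out = step._run(prev=[{"id": 1}, {"id": 2}])
skipped = step._run(prev=[{"id": 1}], already_done=True)
```

Subclasses only override process; the base class decides whether the step runs at all and where its input comes from.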
Logging
The module uses Python’s standard logging library for debugging and tracking events during the pipeline execution.
The logger is instantiated as log and is used to record information such as when steps are skipped, when files are deleted, and general debugging information.
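Because the module relies on the standard logging library, its debug messages should become visible once the root logger is configured at DEBUG level. The snippet below is a sketch using only standard-library calls; the logger name pipeline_demo is hypothetical and is there only to show the output format, since the name flexpipe registers its logger under is not stated here.

```python
import logging

# Configure the root logger; module-level loggers propagate to it by default.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    force=True,  # replace handlers installed earlier (Python 3.8+)
)

# Hypothetical logger name, used only to demonstrate the configuration.
demo_log = logging.getLogger("pipeline_demo")
demo_log.debug("step skipped: already completed")
```

Run this before creating the pipeline so that messages emitted during execution are not filtered out.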
Conclusion
This concludes the documentation for the pipeline processing framework. The Pipeline, Tracker, and Transformation classes work together to provide a flexible, lazy-execution pipeline with automatic tracking and caching capabilities.
Docker
docker build -t flexpipe . --no-cache
docker run -it --rm flexpipe bash # debug
docker run -d --name flexpipe_ct flexpipe:latest
docker stop flexpipe_ct
docker rm flexpipe_ct
docker rmi flexpipe
PYPI
rm -rf build dist
python setup.py sdist bdist_wheel
python -m twine upload dist/*