Pickled Pipeline
A Python package for caching repeat runs of pipelines that have expensive operations along the way.
Overview
pickled_pipeline provides a simple and elegant way to cache the outputs of functions within a pipeline, especially when those functions involve expensive computations, such as calls to Large Language Models (LLMs) or other resource-intensive operations. By caching intermediate results, you can save time and computational resources during iterative development and testing.
Features
- Function Caching: Use decorators to cache function outputs based on their inputs.
- Checkpointing: Assign checkpoints to pipeline steps to manage caching and recomputation.
- Cache Truncation: Remove cached results from a specific checkpoint onwards to recompute parts of the pipeline.
- Input Sensitivity: Cache keys are sensitive to function arguments, ensuring that different inputs result in different cache entries (see the sketch after this list).
- Easy Integration: Minimal changes to your existing codebase are needed to integrate caching.
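The input sensitivity above can be pictured as hashing a checkpoint's name together with its pickled arguments; the snippet below is only an illustrative sketch of that idea, not pickled_pipeline's actual key-derivation code.

```python
# Illustrative sketch only; the package's real key derivation may differ.
import hashlib
import pickle

def make_cache_key(checkpoint_name, args, kwargs):
    # Hash the checkpoint name together with the pickled arguments so that
    # different inputs map to different cache entries.
    payload = pickle.dumps((checkpoint_name, args, sorted(kwargs.items())))
    return hashlib.sha256(payload).hexdigest()

key_a = make_cache_key("step1_user_input", ("First input",), {})
key_b = make_cache_key("step1_user_input", ("Second input",), {})
assert key_a != key_b  # distinct inputs produce distinct cache entries
```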
Installation
Using PDM
pickled_pipeline can be installed using PDM:

```bash
pdm add pickled_pipeline
```
Using pip
Alternatively, you can install pickled_pipeline using pip:

```bash
pip install pickled_pipeline
```
Usage
Importing the Cache Class
First, import the Cache class from the pickled_pipeline package and create an instance of it:

```python
from pickled_pipeline import Cache

cache = Cache(cache_dir="my_cache_directory")
```
- cache_dir: Optional parameter specifying the directory where cache files will be stored. Defaults to "pipeline_cache".
Decorating Functions with @cache.checkpoint
Use the @cache.checkpoint() decorator to cache the outputs of your functions:

```python
@cache.checkpoint()
def step1_user_input(user_text):
    # Your code here
    return user_text
```
By default, the checkpoint name is the name of the function being decorated. If you wish to specify a custom name, you can pass it as an argument:
```python
@cache.checkpoint(name="custom_checkpoint_name")
def my_function(...):
    # Your code here
    pass
```
Relying on the default name keeps your code simpler and less redundant whenever the function name suffices as a unique identifier.
Building a Pipeline
Here's an example of how to build a pipeline using cached functions:

```python
def run_pipeline(user_text):
    text = step1_user_input(user_text)
    enhanced_text = step2_enhance_text(text)
    document = step3_produce_document(enhanced_text)
    documents = step4_generate_additional_documents(document)
    summary = step5_summarize_documents(documents)
    return summary
```
Example Functions
@cache.checkpoint(name="step2_enhance_text")
def step2_enhance_text(text):
# Simulate an expensive operation
enhanced_text = text.upper()
return enhanced_text
@cache.checkpoint(name="step3_produce_document")
def step3_produce_document(enhanced_text):
document = f"Document based on: {enhanced_text}"
return document
@cache.checkpoint(name="step4_generate_additional_documents")
def step4_generate_additional_documents(document):
documents = [f"{document} - Version {i}" for i in range(3)]
return documents
@cache.checkpoint(name="step5_summarize_documents")
def step5_summarize_documents(documents):
summary = "Summary of documents: " + ", ".join(documents)
return summary
Running the Pipeline
```python
if __name__ == "__main__":
    user_text = "Initial input from user."
    summary = run_pipeline(user_text)
    print(summary)
```
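After the first run, my_cache_directory holds the pickled results, so repeating the run with the same input reuses them instead of recomputing. To verify that entries were written, you can simply list the directory (the file naming scheme is an internal detail of the package, so don't rely on it):

```python
import os

# Cache files are written here; their names and layout are internal details.
print(os.listdir("my_cache_directory"))
```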
Truncating the Cache
If you need to recompute parts of the pipeline, you can truncate the cache from a specific checkpoint:

```python
cache.truncate_cache("step3_produce_document")
```

This will remove cached results from "step3_produce_document" onwards, forcing the pipeline to recompute those steps the next time it's run.
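For example, running the pipeline, truncating at step 3, and then running again with the same input recomputes only steps 3 through 5; steps 1 and 2 precede the truncation point, so their cached results are reused:

```python
summary = run_pipeline("Initial input from user.")  # all five steps computed and cached

cache.truncate_cache("step3_produce_document")  # drop step 3 onwards

# Steps 1-2 are read from the cache; steps 3-5 are recomputed and re-cached.
summary = run_pipeline("Initial input from user.")
```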
Examples
Full Pipeline Example
```python
from pickled_pipeline import Cache

cache = Cache(cache_dir="my_cache_directory")

@cache.checkpoint(name="step1_user_input")
def step1_user_input(user_text):
    return user_text

@cache.checkpoint(name="step2_enhance_text")
def step2_enhance_text(text):
    # Simulate an expensive operation
    enhanced_text = text.upper()
    return enhanced_text

# ... (other steps)

def run_pipeline(user_text):
    text = step1_user_input(user_text)
    enhanced_text = step2_enhance_text(text)
    # ... (other steps)
    return summary

if __name__ == "__main__":
    user_text = "Initial input from user."
    summary = run_pipeline(user_text)
    print(summary)
```
Handling Different Inputs
The cache system is sensitive to function arguments. Running the pipeline with different inputs will result in new computations and cache entries.
```python
# First run with initial input
summary1 = run_pipeline("First input from user.")

# Second run with different input
summary2 = run_pipeline("Second input from user.")
```
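Conversely, repeating a call with an input that has already been processed is served from the cache, so the expensive steps are skipped:

```python
# Same input as before: all checkpoints hit the cache, nothing is recomputed.
summary1_again = run_pipeline("First input from user.")
assert summary1_again == summary1
```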
Contributing
Contributions are welcome! Please open an issue or submit a pull request.
License
This project is licensed under the MIT License.