Tools for making Prefect work better for typical data science workflows
prefect DS
Install
$ pip install prefect-ds
Usage
prefect_ds is a lightweight wrapper around Prefect, designed to make it easier to run workflows
I typically encounter when doing data science, especially tasks related to analyzing large
datasets and building models. Specifically, it implements the following:
PandasResultHandler
A result handler that reads files into, and writes files from, Pandas DataFrames. It should be
able to handle any file type Pandas supports and, unlike built-in handlers like
LocalResultHandler, requires the full specification of the file path. This makes it easy to
inspect task results or to use those results in other analysis. It also supports templating, so
task arguments can be injected into the filenames (useful for things like map).
>>> import os
>>> os.environ["PREFECT__LOGGING__LEVEL"] = "ERROR"
>>> import pandas as pd
>>> import time
>>> from prefect import task
>>> from prefect_ds.pandas_result_handler import PandasResultHandler
>>> # Note the use of the task argument {id} as a template in the filename
>>> @task(result_handler=PandasResultHandler("data_{id}.csv", "csv"))
... def demo_task(id):
...     time.sleep(5)
...     return pd.DataFrame({"one": [1, 2, 3], "two": [4, 5, 6]})
Note that in order to use the templating functionality of PandasResultHandler, you will need
to run your flow using the DSTaskRunner (see below for more details).
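As a rough illustration of the templating idea (not the actual prefect_ds implementation), a filename template can be filled in by binding the call's arguments to the task function's parameter names. The helper `render_filename` and the `suffix` parameter below are hypothetical names introduced only for this sketch.

```python
import inspect


def render_filename(template, func, *args, **kwargs):
    """Fill a filename template with the arguments a function would receive.

    Hypothetical helper for illustration; not part of prefect_ds.
    """
    # Bind positional and keyword arguments to the function's parameter names
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()
    # str.format looks up each {name} placeholder among the bound arguments
    return template.format(**bound.arguments)


def demo_task(id, suffix="raw"):  # stand-in for a task function
    return None


print(render_filename("data_{id}.csv", demo_task, 1))            # data_1.csv
print(render_filename("data_{id}_{suffix}.csv", demo_task, id=2))  # data_2_raw.csv
```

Binding through the signature means the template sees the same names whether an argument was passed positionally, by keyword, or left at its default.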
checkpoint_handler and DSTaskRunner
A state handler that implements filename-based checkpointing, in concert with the specialty
result handlers in prefect_ds. It intercepts the state change from Pending to Running and runs
the read method of the result handler. If the read succeeds, it loads the returned value as the
result of the task and sets the task to the Success state. Conversely, if the read method
fails, the task is run as normal and the checkpoint_handler runs the write method of the result
handler afterwards. Using the checkpoint_handler makes it much easier to cache data across
Prefect runs: you don't have to explicitly persist the final flow state between runs, and you
don't have to have the cache expire after a certain amount of time.
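The read-then-fall-back-to-write behavior described above can be sketched without Prefect at all. This is only a minimal illustration of the pattern, assuming JSON files in place of Pandas-readable ones; `run_with_checkpoint` is a hypothetical helper, not the prefect_ds API.

```python
import json
import os
import tempfile


def run_with_checkpoint(path, compute):
    """Return (result, was_cached). Hypothetical helper, not the prefect_ds API."""
    try:
        # Mirrors the "read" step: a readable file short-circuits the task
        with open(path) as f:
            return json.load(f), True
    except (FileNotFoundError, json.JSONDecodeError):
        # The read failed, so run the task as normal and then "write" the result
        result = compute()
        with open(path, "w") as f:
            json.dump(result, f)
        return result, False


calls = []


def expensive():
    calls.append(1)  # record each real execution
    return {"one": [1, 2, 3]}


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "data_1.json")
    first = run_with_checkpoint(target, expensive)
    second = run_with_checkpoint(target, expensive)

print(first[1], second[1], len(calls))  # False True 1
```

Because the file's existence is the only cache key, deleting the file is enough to force a recomputation on the next run.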
This handler combines with DSTaskRunner, an extension to Prefect's TaskRunner that implements
the necessary hacks to allow for the templating of task arguments. This templating is required
to handle cases like map, where without the templating the checkpoint_handler would read from
and write to the same file for every iteration of the map.
>>> import contextlib
>>> from prefect import Flow
>>> from prefect.engine import FlowRunner
>>> from prefect_ds.checkpoint_handler import checkpoint_handler
>>> from prefect_ds.task_runner import DSTaskRunner
>>> with Flow("test") as flow:
...     output = demo_task(1)
>>> # First, clean up any existing task results
>>> with contextlib.suppress(FileNotFoundError):
...     os.remove("data_1.csv")
>>> # Run the flow. Since the result file doesn't exist, this will run the task
>>> start = time.time()
>>> state = FlowRunner(flow=flow, task_runner_cls=DSTaskRunner).run(
...     task_runner_state_handlers=[checkpoint_handler]
... )
>>> print(f"Took more than 5 seconds: {(time.time() - start) > 5}")
Took more than 5 seconds: True
>>> # Run the flow again. Now that the result file exists, the task is short-circuited
>>> start = time.time()
>>> state = FlowRunner(flow=flow, task_runner_cls=DSTaskRunner).run(
...     task_runner_state_handlers=[checkpoint_handler]
... )
>>> print(f"Took less than 1 second: {(time.time() - start) < 1}")
Took less than 1 second: True
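To make the map problem mentioned earlier concrete, here is a small simulation, assuming a plain dictionary stands in for files on disk and `run_mapped` is a hypothetical function rather than anything in Prefect or prefect_ds. With a fixed filename, every mapped iteration after the first reuses the first iteration's checkpoint.

```python
def run_mapped(template, ids):
    """Simulate a checkpointed map; a dict stands in for files on disk."""
    files = {}
    results = []
    for i in ids:
        path = template.format(id=i)
        if path not in files:
            # The "read" fails, so compute the value and "write" it
            files[path] = i * 10
        # Otherwise reuse whatever the existing file holds
        results.append(files[path])
    return results


print(run_mapped("data.csv", [1, 2, 3]))       # [10, 10, 10]
print(run_mapped("data_{id}.csv", [1, 2, 3]))  # [10, 20, 30]
```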
DSFlowRunner
An extension to Prefect's FlowRunner that automatically purges the results of upstream tasks
once all of their downstream tasks have been run. This can be useful if your task outputs are
large datasets; by default Prefect stores the results of every task for the duration of the
flow, which can overwhelm your RAM if your results are all things like multi-GB Pandas
DataFrames. While PandasResultHandler, checkpoint_handler, and DSTaskRunner are all designed to
be used together, DSFlowRunner can have value on its own.
>>> from prefect_ds.flow_runner import DSFlowRunner
>>> @task()
... def generate_data():
...     return pd.DataFrame({"one": [1, 2, 3], "two": [4, 5, 6]})
>>> @task()
... def double_data(input_data):
...     return input_data * 2
>>> with Flow("test") as flow:
...     initial_data = generate_data()
...     two_x_data = double_data(initial_data)
...     four_x_data = double_data(two_x_data)
>>> state = DSFlowRunner(flow=flow).run(return_tasks=flow.tasks)
>>> state.result[initial_data].result  # actual type is prefect_ds.result.PurgedResult
>>> state.result[two_x_data].result  # another PurgedResult
>>> state.result[four_x_data].result  # final result is correct, even though upstream results are gone
   one  two
0    4   16
1    8   20
2   12   24
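One way to picture the purging behavior is as reference counting over the task graph: each result is kept until the last task that consumes it has run. The sketch below is an assumption about the general technique, not DSFlowRunner's actual code; `run_and_purge` and the `"<purged>"` sentinel (standing in for PurgedResult) are hypothetical.

```python
PURGED = "<purged>"  # stand-in for a placeholder such as PurgedResult


def run_and_purge(tasks, upstream):
    """Run tasks in order, dropping each result once nothing downstream needs it.

    tasks: name -> function, listed in topological order (dicts keep insertion order).
    upstream: name -> list of upstream task names whose results are passed in.
    """
    # Count how many downstream tasks still need each result
    remaining = {name: 0 for name in tasks}
    for deps in upstream.values():
        for dep in deps:
            remaining[dep] += 1

    results = {}
    for name, func in tasks.items():
        deps = upstream.get(name, [])
        results[name] = func(*(results[d] for d in deps))
        for dep in deps:
            remaining[dep] -= 1
            if remaining[dep] == 0:
                # Last consumer has run, so release the upstream result
                results[dep] = PURGED
    return results


tasks = {
    "initial": lambda: [1, 2, 3],
    "two_x": lambda data: [2 * x for x in data],
    "four_x": lambda data: [2 * x for x in data],
}
upstream = {"two_x": ["initial"], "four_x": ["two_x"]}

print(run_and_purge(tasks, upstream))
# {'initial': '<purged>', 'two_x': '<purged>', 'four_x': [4, 8, 12]}
```

Terminal tasks have no consumers, so their results are never purged and remain available after the run.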
Caveat
While these components have unit tests covering what I consider to be typical use cases, I have not attempted to comprehensively test every possible interaction with Prefect. As my understanding of Prefect is still relatively immature, I expect there are edge cases where the added functionality of prefect_ds breaks something in Prefect. I think this is especially likely with Prefect Cloud, which I have not tested at all. If you find such a situation, please feel free to post an issue describing the problem and (ideally) including a minimal reproducible example of the bug.