prefect DS

Tools for making Prefect work better for typical data science workflows.

Install

$ pip install prefect-ds

Usage

prefect_ds is a lightweight wrapper around Prefect, designed to make it easier to run the workflows I typically encounter when doing data science, especially tasks that analyze large datasets and build models. Specifically, it implements the following:

PandasResultHandler

A result handler that reads files into, and writes files from, Pandas DataFrames. It should be able to handle any file type Pandas supports. Unlike built-in handlers such as LocalResultHandler, it requires the full file path to be specified, which makes it easy to inspect task results or use them in other analyses. It also supports templating, so task arguments can be injected into the filenames (useful for things like map).

>>> import os
>>> os.environ["PREFECT__LOGGING__LEVEL"] = "ERROR"

>>> import pandas as pd
>>> import time

>>> from prefect import task
>>> from prefect_ds.pandas_result_handler import PandasResultHandler

>>> # Note the use of the task argument {id} as a template in the filename
>>> @task(result_handler=PandasResultHandler("data_{id}.csv", "csv"))
... def demo_task(id):
...     time.sleep(5)
...     return pd.DataFrame({"one": [1, 2, 3], "two": [4, 5, 6]})

Note that in order to use the templating functionality of PandasResultHandler, you will need to run your flow using the DSTaskRunner (see below for more details).
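To make the templating idea concrete, here is a minimal sketch that does not use Prefect or prefect_ds at all. The helper name render_path is hypothetical, and the sketch assumes the template is filled with Python's str.format from the task's keyword arguments; the actual mechanism inside PandasResultHandler may differ. It shows why a mapped task needs a templated filename: without one, every mapped run resolves to the same path.

```python
# Hypothetical helper, not part of prefect_ds: fill a filename template
# from the keyword arguments a task was called with.
def render_path(template: str, task_kwargs: dict) -> str:
    # str.format raises KeyError if the template names an argument
    # the task did not receive, which surfaces typos early.
    return template.format(**task_kwargs)


# Simulate a task mapped over three ids.
mapped_ids = [1, 2, 3]
templated = [render_path("data_{id}.csv", {"id": i}) for i in mapped_ids]
untemplated = [render_path("data.csv", {"id": i}) for i in mapped_ids]

print(templated)    # ['data_1.csv', 'data_2.csv', 'data_3.csv']
print(untemplated)  # ['data.csv', 'data.csv', 'data.csv']
```

With the templated name each mapped run gets its own file, whereas the untemplated name would have all three runs overwrite each other.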

checkpoint_handler and DSTaskRunner

A state handler that implements filename-based checkpointing, in concert with the specialty result handlers in prefect_ds. It intercepts the state change from Pending to Running and runs the read method of the result handler. If the read succeeds, it loads the returned value as the result of the task and sets the task to the Success state. If the read fails, the task runs as normal and the checkpoint_handler runs the write method of the result handler afterwards. Using the checkpoint_handler makes it much easier to cache data across Prefect runs: you don't have to explicitly persist the final flow state between runs, and you don't have to have the cache expire after a certain amount of time.

This handler is meant to be used with DSTaskRunner, an extension to Prefect's TaskRunner that implements the hacks needed to template task arguments into filenames. Templating is required for cases like map, where without it the checkpoint_handler would read from and write to the same file for every iteration of the map.

>>> import contextlib

>>> from prefect import Flow
>>> from prefect.engine import FlowRunner
>>> from prefect_ds.checkpoint_handler import checkpoint_handler
>>> from prefect_ds.task_runner import DSTaskRunner

>>> with Flow("test") as flow:
...     output = demo_task(1)

>>> # First, clean up any existing task results
>>> with contextlib.suppress(FileNotFoundError):
...     os.remove("data_1.csv")

>>> # Run the flow. Since the result file doesn't exist, the task will run
>>> start = time.time()
>>> state = FlowRunner(flow=flow, task_runner_cls=DSTaskRunner).run(
...     task_runner_state_handlers=[checkpoint_handler]
... )
>>> print(f"Took more than 5 seconds: {(time.time() - start) > 5}")
Took more than 5 seconds: True

>>> # Run the flow again. Now that the result file exists, the task is short-circuited
>>> start = time.time()
>>> state = FlowRunner(flow=flow, task_runner_cls=DSTaskRunner).run(
...     task_runner_state_handlers=[checkpoint_handler]
... )
>>> print(f"Took less than 1 second: {(time.time() - start) < 1}")
Took less than 1 second: True
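The read-first, write-on-miss behaviour described for checkpoint_handler can be sketched in plain Python. This is only an illustration of the pattern, not the prefect_ds implementation: the function run_with_checkpoint is hypothetical, it uses JSON files from the standard library instead of Pandas, and it skips Prefect's state machinery entirely.

```python
import json
import os
import tempfile


def run_with_checkpoint(path, compute):
    """Return (result, was_cached): read `path` if possible, else compute and write it."""
    try:
        with open(path) as f:
            return json.load(f), True      # read succeeded: skip the task body
    except (FileNotFoundError, json.JSONDecodeError):
        result = compute()                 # read failed: run the task as normal
        with open(path, "w") as f:
            json.dump(result, f)           # then persist the result for next time
        return result, False


calls = []


def expensive():
    calls.append(1)  # record each real execution
    return {"one": [1, 2, 3], "two": [4, 5, 6]}


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "data_1.json")
    first, first_cached = run_with_checkpoint(path, expensive)
    second, second_cached = run_with_checkpoint(path, expensive)

print(first_cached, second_cached, len(calls))  # False True 1
```

Because the cache key is simply the file's existence, deleting the file is enough to force a recompute, which matches the cleanup step in the example above.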

DSFlowRunner

An extension to Prefect's FlowRunner that automatically purges the results of upstream tasks once all of their downstream tasks have run. This can be useful if your task outputs are large datasets: by default Prefect stores the results of every task for the duration of the flow, which can exhaust your RAM if the results are things like multi-GB Pandas DataFrames. While PandasResultHandler, checkpoint_handler, and DSTaskRunner are all designed to be used together, DSFlowRunner can have value on its own.

>>> from prefect_ds.flow_runner import DSFlowRunner

>>> @task()
... def generate_data():
...     return pd.DataFrame({"one": [1, 2, 3], "two": [4, 5, 6]})

>>> @task()
... def double_data(input_data):
...     return input_data * 2


>>> with Flow("test") as flow:
...     initial_data = generate_data()
...     two_x_data = double_data(initial_data)
...     four_x_data = double_data(two_x_data)

>>> state = DSFlowRunner(flow=flow).run(return_tasks=flow.tasks)
>>> state.result[initial_data].result # actual type is prefect_ds.result.PurgedResult

>>> state.result[two_x_data].result # another PurgedResult

>>> state.result[four_x_data].result # final result is correct, even though upstream results are gone
   one  two
0    4   16
1    8   20
2   12   24
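One way to picture the purging behaviour is as reference counting over the task graph: count how many downstream tasks still need each result, and drop the result when that count reaches zero. The sketch below illustrates that idea in plain Python under those assumptions. The names run_and_purge and Purged are hypothetical, plain lists stand in for DataFrames, and this is not how DSFlowRunner is actually written.

```python
class Purged:
    """Hypothetical placeholder left behind once a result is released."""

    def __repr__(self):
        return "<Purged>"


def run_and_purge(tasks, order):
    """Run tasks in topological `order`; tasks maps name -> (func, [upstream names])."""
    # Count how many downstream tasks still need each result.
    remaining = {name: 0 for name in tasks}
    for _, upstream in tasks.values():
        for up in upstream:
            remaining[up] += 1

    results = {}
    for name in order:
        func, upstream = tasks[name]
        results[name] = func(*(results[up] for up in upstream))
        for up in upstream:
            remaining[up] -= 1
            if remaining[up] == 0:
                results[up] = Purged()  # last consumer has run, drop the data
    return results


tasks = {
    "generate": (lambda: [1, 2, 3], []),
    "double": (lambda xs: [2 * x for x in xs], ["generate"]),
    "quadruple": (lambda xs: [2 * x for x in xs], ["double"]),
}
results = run_and_purge(tasks, ["generate", "double", "quadruple"])
print(results)  # {'generate': <Purged>, 'double': <Purged>, 'quadruple': [4, 8, 12]}
```

Only the terminal task keeps its data, so peak memory is bounded by the results that are still needed rather than by the whole flow.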

Caveat

While these components have unit tests covering what I consider to be typical use cases, I have not attempted to comprehensively test every possible interaction with Prefect. As my understanding of Prefect is still relatively immature, I expect there are edge cases where the added functionality of prefect_ds breaks something in Prefect. I think this is especially likely with Prefect Cloud, which I have not tested at all. If you find such a situation, please feel free to open an issue describing the problem and (ideally) including a minimal reproducible example of the bug.
