Prefect integrations with the Dask execution framework.

Coordinate and parallelize your dataflow with prefect-dask


Visit the full docs here to see additional examples and the API reference.

The prefect-dask collection makes it easy to include distributed processing for your flows. Check out the examples below to get started!

Getting Started

Integrate with Prefect flows

Perhaps you're already working with Prefect flows. Say your flow downloads many images to train your machine learning model. Unfortunately, it takes a long time to download your images because your code is running sequentially.

After installing prefect-dask you can parallelize your flow in three simple steps:

  1. Add the import: from prefect_dask import DaskTaskRunner
  2. Specify the task runner in the flow decorator: @flow(task_runner=DaskTaskRunner)
  3. Submit tasks to the flow's task runner: a_task.submit(*args, **kwargs)

The parallelized code runs in about 1/3 of the time in our test! And that's without distributing the workload over multiple machines. Here's the before and after!

=== "Before"

```python hl_lines="1"
# Completed in 15.2 seconds

from typing import List
from pathlib import Path

import httpx
from prefect import flow, task

URL_FORMAT = (
    "https://www.cpc.ncep.noaa.gov/products/NMME/archive/"
    "{year:04d}{month:02d}0800/current/images/nino34.rescaling.ENSMEAN.png"
)

@task
def download_image(year: int, month: int, directory: Path) -> Path:
    # download image from URL
    url = URL_FORMAT.format(year=year, month=month)
    resp = httpx.get(url)

    # save content to directory/YYYYMM.png
    file_path = (directory / url.split("/")[-1]).with_stem(f"{year:04d}{month:02d}")
    file_path.write_bytes(resp.content)
    return file_path

@flow
def download_nino_34_plumes_from_year(year: int) -> List[Path]:
    # create a directory to hold images
    directory = Path("data")
    directory.mkdir(exist_ok=True)

    # download all images
    file_paths = []
    for month in range(1, 12 + 1):
        file_path = download_image(year, month, directory)
        file_paths.append(file_path)
    return file_paths

if __name__ == "__main__":
    download_nino_34_plumes_from_year(2022)
```

=== "After"

```python hl_lines="1 8 26 35"
# Completed in 5.7 seconds

from typing import List
from pathlib import Path

import httpx
from prefect import flow, task
from prefect_dask import DaskTaskRunner

URL_FORMAT = (
    "https://www.cpc.ncep.noaa.gov/products/NMME/archive/"
    "{year:04d}{month:02d}0800/current/images/nino34.rescaling.ENSMEAN.png"
)

@task
def download_image(year: int, month: int, directory: Path) -> Path:
    # download image from URL
    url = URL_FORMAT.format(year=year, month=month)
    resp = httpx.get(url)

    # save content to directory/YYYYMM.png
    file_path = (directory / url.split("/")[-1]).with_stem(f"{year:04d}{month:02d}")
    file_path.write_bytes(resp.content)
    return file_path

@flow(task_runner=DaskTaskRunner(cluster_kwargs={"processes": False}))
def download_nino_34_plumes_from_year(year: int) -> List[Path]:
    # create a directory to hold images
    directory = Path("data")
    directory.mkdir(exist_ok=True)

    # download all images
    file_paths = []
    for month in range(1, 12 + 1):
        file_path = download_image.submit(year, month, directory)
        file_paths.append(file_path)
    return file_paths

if __name__ == "__main__":
    download_nino_34_plumes_from_year(2022)
```

The original flow completes in 15.2 seconds.

With just a few minor tweaks, the runtime drops to 5.7 seconds, roughly 2.7 times faster!
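To see why submitting tasks helps with I/O-bound work like this, here is a minimal sketch of the same pattern using only the standard library. It is not prefect-dask code: ThreadPoolExecutor stands in for a DaskTaskRunner running with threads (processes set to False), and fake_download is a hypothetical placeholder that sleeps instead of making a network request.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_download(month: int) -> str:
    # Hypothetical stand-in for download_image: sleep instead of calling httpx.get
    time.sleep(0.05)
    return f"2022{month:02d}.png"


def run_sequentially(months):
    # Like calling the task directly: each call blocks until it finishes
    return [fake_download(month) for month in months]


def run_with_submit(months):
    # Like a_task.submit(...): schedule every call first, then collect the results
    with ThreadPoolExecutor(max_workers=len(months)) as pool:
        futures = [pool.submit(fake_download, month) for month in months]
        return [future.result() for future in futures]


def timed(func, *args):
    # Return the function's result together with its wall-clock duration
    start = time.perf_counter()
    result = func(*args)
    return result, time.perf_counter() - start


months = list(range(1, 12 + 1))
sequential_result, sequential_seconds = timed(run_sequentially, months)
concurrent_result, concurrent_seconds = timed(run_with_submit, months)
print(f"sequential: {sequential_seconds:.2f}s, concurrent: {concurrent_seconds:.2f}s")
```

Both versions return the same file names in the same order. Only the waiting overlaps, which is why threads are usually enough when the tasks spend most of their time on the network.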

Integrate with Dask client/cluster and collections

Suppose you have an existing Dask client/cluster and collection, like a dask.dataframe.DataFrame, and you want to add observability.

With prefect-dask, there's no major overhaul necessary because Prefect was designed with incremental adoption in mind! It's as easy as:

  1. Adding the imports
  2. Sprinkling a few task and flow decorators
  3. Using the get_dask_client context manager on collections to distribute work across workers
  4. Specifying the task runner and client's address in the flow decorator
  5. Submitting the tasks to the flow's task runner

=== "Before"

```python
import dask.dataframe
import dask.distributed



client = dask.distributed.Client()


def read_data(start: str, end: str) -> dask.dataframe.DataFrame:
    df = dask.datasets.timeseries(start, end, partition_freq="4w")
    return df


def process_data(df: dask.dataframe.DataFrame) -> dask.dataframe.DataFrame:

    df_yearly_avg = df.groupby(df.index.year).mean()
    return df_yearly_avg.compute()


def dask_pipeline():
    df = read_data("1988", "2022")
    df_yearly_average = process_data(df)
    return df_yearly_average

dask_pipeline()
```

=== "After"

```python hl_lines="3 4 8 13 15 19 21 22"
import dask.dataframe
import dask.distributed
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

client = dask.distributed.Client()

@task
def read_data(start: str, end: str) -> dask.dataframe.DataFrame:
    df = dask.datasets.timeseries(start, end, partition_freq="4w")
    return df

@task
def process_data(df: dask.dataframe.DataFrame) -> dask.dataframe.DataFrame:
    with get_dask_client():
        df_yearly_avg = df.groupby(df.index.year).mean()
        return df_yearly_avg.compute()

@flow(task_runner=DaskTaskRunner(address=client.scheduler.address))
def dask_pipeline():
    df = read_data.submit("1988", "2022")
    df_yearly_average = process_data.submit(df)
    return df_yearly_average

dask_pipeline()
```

Now, you can conveniently see when each task completed, both in the terminal and the UI!

14:10:09.845 | INFO    | prefect.engine - Created flow run 'chocolate-pony' for flow 'dask-flow'
14:10:09.847 | INFO    | prefect.task_runner.dask - Connecting to an existing Dask cluster at tcp://127.0.0.1:59255
14:10:09.857 | INFO    | distributed.scheduler - Receive client connection: Client-8c1e0f24-9133-11ed-800e-86f2469c4e7a
14:10:09.859 | INFO    | distributed.core - Starting established connection to tcp://127.0.0.1:59516
14:10:09.862 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at http://127.0.0.1:8787/status
14:10:11.344 | INFO    | Flow run 'chocolate-pony' - Created task run 'read_data-5bc97744-0' for task 'read_data'
14:10:11.626 | INFO    | Flow run 'chocolate-pony' - Submitted task run 'read_data-5bc97744-0' for execution.
14:10:11.795 | INFO    | Flow run 'chocolate-pony' - Created task run 'process_data-090555ba-0' for task 'process_data'
14:10:11.798 | INFO    | Flow run 'chocolate-pony' - Submitted task run 'process_data-090555ba-0' for execution.
14:10:13.279 | INFO    | Task run 'read_data-5bc97744-0' - Finished in state Completed()
14:11:43.539 | INFO    | Task run 'process_data-090555ba-0' - Finished in state Completed()
14:11:43.883 | INFO    | Flow run 'chocolate-pony' - Finished in state Completed('All states completed.')
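
Because every log line carries a timestamp, per-task durations can be recovered from the terminal output. The sketch below parses four lines copied from the log above. The regular expressions and the helper name task_durations are illustrative assumptions, not part of Prefect, and the log format may differ between versions.

```python
import re
from datetime import datetime

# Four lines copied from the terminal output above
LOG = """\
14:10:11.626 | INFO    | Flow run 'chocolate-pony' - Submitted task run 'read_data-5bc97744-0' for execution.
14:10:11.798 | INFO    | Flow run 'chocolate-pony' - Submitted task run 'process_data-090555ba-0' for execution.
14:10:13.279 | INFO    | Task run 'read_data-5bc97744-0' - Finished in state Completed()
14:11:43.539 | INFO    | Task run 'process_data-090555ba-0' - Finished in state Completed()
"""

# Assumed patterns for the two message types we care about
SUBMITTED = re.compile(r"^(\S+) \| .*Submitted task run '([^']+)'")
FINISHED = re.compile(r"^(\S+) \| .*Task run '([^']+)' - Finished")


def parse_time(text: str) -> datetime:
    # Timestamps look like 14:10:11.626 (hours, minutes, seconds, milliseconds)
    return datetime.strptime(text, "%H:%M:%S.%f")


def task_durations(log: str) -> dict:
    # Map each task run name to the seconds between submission and completion
    submitted = {}
    durations = {}
    for line in log.splitlines():
        match = SUBMITTED.match(line)
        if match:
            submitted[match.group(2)] = parse_time(match.group(1))
            continue
        match = FINISHED.match(line)
        if match and match.group(2) in submitted:
            elapsed = parse_time(match.group(1)) - submitted[match.group(2)]
            durations[match.group(2)] = elapsed.total_seconds()
    return durations


durations = task_durations(LOG)
for name, seconds in durations.items():
    print(f"{name}: {seconds:.3f}s")
```

For this run, read_data takes about 1.65 seconds and process_data about 91.7 seconds, measured from submission to completion.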

Resources

For additional examples, check out the Usage Guide!

Installation

Get started by installing prefect-dask!

=== "pip"

```bash
pip install -U prefect-dask
```

=== "conda"

```bash
conda install -c conda-forge prefect-dask
```

Requires an installation of Python 3.7+.

We recommend using a Python virtual environment manager such as pipenv, conda, or virtualenv.

These tasks are designed to work with Prefect 2. For more information about how to use Prefect, please refer to the Prefect documentation.
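
If you want to confirm your environment before running the examples, a small check like the one below may help. It is only a sketch: the helper names are hypothetical, and it tests whether the prefect_dask module can be found, not whether its version is compatible with your Prefect installation.

```python
import importlib.util
import sys

MINIMUM_PYTHON = (3, 7)  # taken from the Python 3.7+ requirement stated above


def python_is_supported(version_info=sys.version_info) -> bool:
    # Compare only the major and minor version numbers
    return tuple(version_info[:2]) >= MINIMUM_PYTHON


def is_installed(module_name: str) -> bool:
    # find_spec returns None when a top-level module cannot be located
    return importlib.util.find_spec(module_name) is not None


print("Python version supported:", python_is_supported())
print("prefect_dask importable:", is_installed("prefect_dask"))
```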

Feedback

If you encounter any bugs while using prefect-dask, feel free to open an issue in the prefect-dask repository.

If you have any questions or issues while using prefect-dask, you can find help in either the Prefect Discourse forum or the Prefect Slack community.

Feel free to star or watch prefect-dask for updates too!

Contributing

If you'd like to contribute a fix or a new feature to prefect-dask, please propose changes through a pull request from a fork of the repository.

Here are the steps:

  1. Fork the repository
  2. Clone the forked repository
  3. Install the repository and its dependencies:
     pip install -e ".[dev]"
  4. Make desired changes
  5. Add tests
  6. Insert an entry to CHANGELOG.md
  7. Install pre-commit to perform quality checks prior to commit:
     pre-commit install
  8. git commit, git push, and create a pull request
