Prefect integrations with Microsoft Planetary Computer

prefect-planetary-computer

Visit the full docs here to see additional examples and the API reference.

Prefect integrations with the Microsoft Planetary Computer (PC).

Overview

This collection includes a Credentials Block 🔑 to store and retrieve a PC subscription key and JupyterHub token, with convenience methods for interacting with the PC Data Catalog 🌍 and Dask Gateway 🚀 server.

For more information about:

  • using Azure services with Prefect and the Planetary Computer, check out the prefect-azure collection.
  • the integration between Prefect and Dask, check out the prefect-dask collection.
  • taking advantage of the Planetary Computer data catalog and compute resources, check out the Planetary Computer documentation.

Resources

For more tips on how to use tasks and flows in a Collection, check out Using Collections!

Installation

Install prefect-planetary-computer with pip:

pip install prefect-planetary-computer

Requires an installation of Python 3.8+.

We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.

These tasks are designed to work with Prefect 2.0. For more information about how to use Prefect, please refer to the Prefect documentation.

Usage

!!! note
    - The following examples are adapted from Planetary Computer - Scale with Dask.
    - They require the following additional packages:
        ```
        pip install xarray zarr adlfs netcdf4 prefect_azure
        ```
    - Make sure the flow execution environment, the Dask scheduler, and the Dask workers share the same Python dependencies, in particular `dask` and `distributed`, [as explained in the Dask docs](https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments).
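
One way to spot mismatched dependencies is to compare package versions across environments. The following is a minimal sketch using only the standard library; the helper names `installed_versions` and `version_mismatches` are hypothetical and not part of prefect-planetary-computer.

```python
from importlib import metadata


def installed_versions(packages=("dask", "distributed")):
    """Return {package: version or None} for the current environment."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None  # package is not installed here
    return versions


def version_mismatches(local, remote):
    """Return {package: (local_version, remote_version)} for entries that differ."""
    return {
        name: (local.get(name), remote.get(name))
        for name in sorted(set(local) | set(remote))
        if local.get(name) != remote.get(name)
    }


if __name__ == "__main__":
    # Versions seen by the flow execution environment.
    print(installed_versions())
```

With a connected `distributed.Client`, `client.run(installed_versions)` should return the same dictionary for each worker, which can then be passed to `version_mismatches`. Dask also ships its own check, `client.get_versions(check=True)`, which may be enough on its own.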

Computing Dask Collections

Computations on Dask collections, such as Dask DataFrames, can run from within a Prefect task: use the credentials block to create a Dask Gateway cluster inside the main flow or the task itself.

# Prefect tasks are executed using the default ConcurrentTaskRunner
# Dask Collections tasks are executed on a new temporary Dask cluster 

import xarray as xr

from prefect import flow, task, get_run_logger
from prefect_planetary_computer import PlanetaryComputerCredentials

from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_upload

pc_credentials = PlanetaryComputerCredentials.load("PC_BLOCK_NAME")
bs_credentials = AzureBlobStorageCredentials.load("BS_BLOCK_NAME")

@task
def compute_mean(asset):
    logger = get_run_logger()

    with pc_credentials.new_gateway_cluster(
        name="test-cluster",
        image="pangeo/pangeo-notebook:latest"
    ) as cluster:

        cluster.adapt(minimum=2, maximum=10)
        client = cluster.get_client()

        ds = xr.open_zarr(
            asset.href,
            **asset.extra_fields["xarray:open_kwargs"],
            storage_options=asset.extra_fields["xarray:storage_options"]
        )
        logger.info(f"Daymet dataset info\n: {ds}")
    
        timeseries = ds["tmin"].mean(dim=["x", "y"]).compute()
        logger.info(f"Mean timeseries info\n: {timeseries}")

    return timeseries

@flow
def pc_dask_flow():

    # get a configured PySTAC client
    catalog = pc_credentials.get_stac_catalog()

    # compute the minimum daily temperature averaged over all of Hawaii, 
    # using the Daymet dataset
    asset = catalog.get_collection("daymet-daily-hi").assets["zarr-abfs"]
    prefect_future = compute_mean.submit(asset)
    timeseries = prefect_future.result()

    # save NetCDF timeseries file
    timeseries.to_netcdf("timeseries.nc")

    # upload to 'my-container' blob storage container
    with open("timeseries.nc", "rb") as f:
        blob = blob_storage_upload(
            data=f.read(),
            container="my-container",
            blob="timeseries.nc",
            blob_storage_credentials=bs_credentials,
            overwrite=False,
        )

    # return the blob name of the uploaded timeseries object
    return blob

pc_dask_flow()

Using the Dask Task Runner

Prefect's prefect_dask.DaskTaskRunner automatically instantiates a temporary Dask cluster at flow execution time, enabling submission of both Prefect and Dask Collections tasks.

!!! warning
    - prefect-dask requires `distributed==2022.2.0` when `python_version < '3.8'`, and `distributed>=2022.5.0,<=2023.3.1` otherwise.
    - This setup requires less configuration on the Dask workers' side when using Prefect Cloud, which you can get started with for free.
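
To check an environment against the `distributed` range quoted in the warning above, a small standard-library sketch like the following may help. The bounds are copied from that warning; the helpers `parse_calver` and `is_supported` are hypothetical names, and the parsing is deliberately simplistic (it only reads the leading `YYYY.M.P` part of the version string).

```python
import re
from importlib import metadata

# Bounds copied from the warning above (the Python 3.8+ case).
MIN_VERSION = (2022, 5, 0)
MAX_VERSION = (2023, 3, 1)


def parse_calver(version):
    """Turn the leading 'YYYY.M.P' of a version string into a tuple of ints."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"Unrecognized version string: {version!r}")
    return tuple(int(part) for part in match.groups())


def is_supported(version, lower=MIN_VERSION, upper=MAX_VERSION):
    """True when lower <= version <= upper (both bounds inclusive)."""
    return lower <= parse_calver(version) <= upper


if __name__ == "__main__":
    try:
        found = metadata.version("distributed")
    except metadata.PackageNotFoundError:
        print("distributed is not installed in this environment")
    else:
        print(f"distributed {found} supported: {is_supported(found)}")
```

Running the same check in the worker image as well as in the flow execution environment helps confirm that both sides fall inside the supported range.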

# Both Prefect tasks and Dask Collections tasks are executed
# on a new temporary Dask cluster 
import xarray as xr

from prefect import flow, task, get_run_logger
from prefect_planetary_computer import PlanetaryComputerCredentials

from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_upload

from prefect_dask import get_dask_client 

pc_credentials = PlanetaryComputerCredentials.load("PC_BLOCK_NAME")
bs_credentials = AzureBlobStorageCredentials.load("BS_BLOCK_NAME")

pc_runner = pc_credentials.get_dask_task_runner(
    cluster_kwargs={
        "image": "pangeo/pangeo-notebook:latest",
    },
    adapt_kwargs={'minimum': 1, 'maximum': 10, 'active': True}
)

@task
def compute_mean(asset):
    logger = get_run_logger()

    with get_dask_client() as client:
        ds = xr.open_zarr(
            asset.href,
            **asset.extra_fields["xarray:open_kwargs"],
            storage_options=asset.extra_fields["xarray:storage_options"]
        )
        logger.info(f"Daymet dataset info\n: {ds}")

        timeseries = ds["tmin"].mean(dim=["x", "y"]).compute()
        logger.info(f"Mean timeseries info\n: {timeseries}")

    return timeseries

@flow(task_runner=pc_runner)
def pc_dask_flow():
    
    # get a configured PySTAC client
    catalog = pc_credentials.get_stac_catalog()

    # compute the minimum daily temperature averaged over all of Hawaii, 
    # using the Daymet dataset
    asset = catalog.get_collection("daymet-daily-hi").assets["zarr-abfs"]

    mean_task = compute_mean.submit(asset)
    timeseries = mean_task.result()

    # save NetCDF timeseries file
    timeseries.to_netcdf("timeseries.nc")

    # upload to 'my-container' blob storage container
    with open("timeseries.nc", "rb") as f:
        blob = blob_storage_upload(
            data=f.read(),
            container="my-container",
            blob="timeseries.nc",
            blob_storage_credentials=bs_credentials,
            overwrite=False,
        )

    # return the blob name of the uploaded timeseries object
    return blob

pc_dask_flow()

Feedback

If you encounter any bugs while using prefect-planetary-computer, feel free to open an issue in the prefect-planetary-computer repository.

If you have any questions or issues while using prefect-planetary-computer, you can find help in either the Prefect Discourse forum or the Prefect Slack community.

Feel free to star or watch prefect-planetary-computer for updates too!

Contributing

If you'd like to fix an issue or add a feature to prefect-planetary-computer, please propose changes through a pull request from a fork of the repository.

Here are the steps:

  1. Fork the repository
  2. Clone the forked repository
  3. Install the repository and its dependencies:
     pip install -e ".[dev]"
  4. Make desired changes
  5. Add tests
  6. Insert an entry to CHANGELOG.md
  7. Install pre-commit to perform quality checks prior to commit:
     pre-commit install
  8. git commit, git push, and create a pull request
