Skip to main content

Prefect integrations with Microsoft Azure services

Project description

prefect-azure

PyPI

prefect-azure is a collection of Prefect integrations for orchestration workflows with Azure.

Getting Started

Installation

Install prefect-azure with pip

pip install prefect-azure

To use Blob Storage:

pip install "prefect-azure[blob_storage]"

To use Cosmos DB:

pip install "prefect-azure[cosmos_db]"

To use ML Datastore:

pip install "prefect-azure[ml_datastore]"

Examples

Download a blob

from prefect import flow

from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_download

@flow
def example_blob_storage_download_flow():
    connection_string = "connection_string"
    blob_storage_credentials = AzureBlobStorageCredentials(
        connection_string=connection_string,
    )
    data = blob_storage_download(
        blob="prefect.txt",
        container="prefect",
        azure_credentials=blob_storage_credentials,
    )
    return data

example_blob_storage_download_flow()

Use with_options to customize options on any existing task or flow:

custom_blob_storage_download_flow = example_blob_storage_download_flow.with_options(
    name="My custom task name",
    retries=2,
    retry_delay_seconds=10,
)

Run a command on an Azure container instance

from prefect import flow
from prefect_azure import AzureContainerInstanceCredentials
from prefect_azure.container_instance import AzureContainerInstanceJob


@flow
def container_instance_job_flow():
    aci_credentials = AzureContainerInstanceCredentials.load("MY_BLOCK_NAME")
    container_instance_job = AzureContainerInstanceJob(
        aci_credentials=aci_credentials,
        resource_group_name="azure_resource_group.example.name",
        subscription_id="<MY_AZURE_SUBSCRIPTION_ID>",
        command=["echo", "hello world"],
    )
    return container_instance_job.run()

Use Azure Container Instance as infrastructure

If we have a_flow_module.py:

from prefect import flow, get_run_logger

@flow
def log_hello_flow(name="Marvin"):
    logger = get_run_logger()
    logger.info(f"{name} said hello!")

if __name__ == "__main__":
    log_hello_flow()

We can run that flow using an Azure Container Instance, but first create the infrastructure block:

from prefect_azure import AzureContainerInstanceCredentials
from prefect_azure.container_instance import AzureContainerInstanceJob

container_instance_job = AzureContainerInstanceJob(
    aci_credentials=AzureContainerInstanceCredentials.load("MY_BLOCK_NAME"),
    resource_group_name="azure_resource_group.example.name",
    subscription_id="<MY_AZURE_SUBSCRIPTION_ID>",
)
container_instance_job.save("aci-dev")

Then, create the deployment either on the UI or through the CLI:

prefect deployment build a_flow_module.py:log_hello_flow --name aci-dev -ib container-instance-job/aci-dev

Visit Prefect Deployments for more information about deployments.

Azure Container Instance Worker

The Azure Container Instance worker is an excellent way to run your workflows on Azure.

To get started, create an Azure Container Instances typed work pool:

prefect work-pool create -t azure-container-instance my-aci-work-pool

Then, run a worker that pulls jobs from the work pool:

prefect worker start -n my-aci-worker -p my-aci-work-pool

The worker should automatically read the work pool's type and start an Azure Container Instance worker.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

prefect_azure-0.3.8.tar.gz (55.4 kB view hashes)

Uploaded Source

Built Distribution

prefect_azure-0.3.8-py3-none-any.whl (37.2 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page