Prefect tasks and subflows for interacting with Google Cloud Platform.
prefect-gcp
Welcome!
prefect-gcp is a collection of prebuilt Prefect tasks that can be used to quickly construct Prefect flows.
Getting Started
Python setup
Requires an installation of Python 3.7+.
We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.
These tasks are designed to work with Prefect 2.0. For more information about how to use Prefect, please refer to the Prefect documentation.
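The version requirement above can be checked before installing anything. The following is a minimal sketch using only the standard library; the helper name `meets_minimum` is hypothetical and not part of prefect-gcp.

```python
import sys

# Minimum interpreter version stated in the Python setup section (3.7+).
MINIMUM = (3, 7)


def meets_minimum(version_info=sys.version_info, minimum=MINIMUM) -> bool:
    """Return True if the (major, minor) pair is at least the minimum."""
    return tuple(version_info[:2]) >= minimum


if __name__ == "__main__":
    if not meets_minimum():
        raise SystemExit("prefect-gcp requires Python 3.7 or newer")
    print("Python version is new enough")
```

Running this inside the virtual environment you plan to use confirms that the environment's interpreter, rather than the system one, satisfies the requirement.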
Installation
To use prefect-gcp and Cloud Run:
pip install prefect-gcp
To use Cloud Storage:
pip install "prefect-gcp[cloud_storage]"
To use BigQuery:
pip install "prefect-gcp[bigquery]"
To use Secret Manager:
pip install "prefect-gcp[secret_manager]"
To use Vertex AI:
pip install "prefect-gcp[aiplatform]"
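To see which of the optional extras above are usable in the current environment, one option is to probe for the client libraries they are expected to pull in. This is a sketch under assumptions: the mapping from extra name to module name below is an assumption for illustration, and the helper names are hypothetical.

```python
import importlib.util

# Assumed mapping from each extra to the Google client module it is
# expected to install. These module names are assumptions, not taken
# from the prefect-gcp documentation.
EXTRA_MODULES = {
    "cloud_storage": "google.cloud.storage",
    "bigquery": "google.cloud.bigquery",
    "secret_manager": "google.cloud.secretmanager",
    "aiplatform": "google.cloud.aiplatform",
}


def is_importable(module_name: str) -> bool:
    """Return True if the module can be located on the import path."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        # find_spec raises when a parent package (e.g. "google") is missing.
        return False


def installed_extras(mapping=EXTRA_MODULES) -> dict:
    """Map each extra name to whether its assumed module is importable."""
    return {extra: is_importable(module) for extra, module in mapping.items()}


if __name__ == "__main__":
    for extra, ok in installed_extras().items():
        print(f"{extra}: {'installed' if ok else 'missing'}")
```

A missing entry suggests re-running the matching `pip install "prefect-gcp[...]"` command from the list above.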
A list of available blocks in prefect-gcp and their setup instructions can be found here.
Write and run a flow
Download blob from bucket
from prefect import flow
from prefect_gcp.cloud_storage import GcsBucket

@flow
def download_flow():
    # Load a previously saved GcsBucket block by name
    gcs_bucket = GcsBucket.load("my-bucket")
    # Download the object to a local file and return the local path
    path = gcs_bucket.download_object_to_path("my_folder/notes.txt", "notes.txt")
    return path

download_flow()
Deploy command on Cloud Run
Save the following as prefect_gcp_flow.py:
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_run import CloudRunJob

@flow
def cloud_run_job_flow():
    cloud_run_job = CloudRunJob(
        image="us-docker.pkg.dev/cloudrun/container/job:latest",
        credentials=GcpCredentials.load("MY_BLOCK_NAME"),
        region="us-central1",
        command=["echo", "hello world"],
    )
    return cloud_run_job.run()
Deploy prefect_gcp_flow.py:
from prefect.deployments import Deployment
from prefect_gcp_flow import cloud_run_job_flow

deployment = Deployment.build_from_flow(
    flow=cloud_run_job_flow,
    name="cloud_run_job_deployment",
    version=1,
    work_queue_name="demo",
)
deployment.apply()
Run the deployment either on the UI or through the CLI:
prefect deployment run cloud-run-job-flow/cloud_run_job_deployment
Visit Prefect Deployments for more information about deployments.
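The identifier passed to `prefect deployment run` above has the form `flow-name/deployment-name`. In the example, the flow function `cloud_run_job_flow` appears as `cloud-run-job-flow`, while the deployment name is used exactly as given. The sketch below reproduces that pattern; it assumes the flow was not given an explicit `name`, and both helper names are hypothetical.

```python
def default_flow_name(function_name: str) -> str:
    """Derive the flow name seen in the CLI example from the function name.

    Assumption: the flow has no explicit name, so underscores in the
    function name are shown as hyphens.
    """
    return function_name.replace("_", "-")


def deployment_identifier(function_name: str, deployment_name: str) -> str:
    """Build the 'flow-name/deployment-name' string for the CLI."""
    return f"{default_flow_name(function_name)}/{deployment_name}"


if __name__ == "__main__":
    print(deployment_identifier("cloud_run_job_flow", "cloud_run_job_deployment"))
```

Note that only the flow name is transformed; the deployment name keeps its underscores, which matches the command shown above.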
Get Google auth credentials from GcpCredentials
To instantiate a Google Cloud client, like bigquery.Client, GcpCredentials is not a valid input. Instead, use the get_credentials_from_service_account method.
from google.cloud import bigquery
from prefect import flow
from prefect_gcp import GcpCredentials

@flow
def create_bigquery_client():
    gcp_credentials_block = GcpCredentials.load("BLOCK_NAME")
    # Convert the block into google-auth credentials that the client accepts
    google_auth_credentials = gcp_credentials_block.get_credentials_from_service_account()
    bigquery_client = bigquery.Client(credentials=google_auth_credentials)
Or simply call get_bigquery_client from GcpCredentials.
from prefect import flow
from prefect_gcp import GcpCredentials

@flow
def create_bigquery_client():
    gcp_credentials_block = GcpCredentials.load("BLOCK_NAME")
    bigquery_client = gcp_credentials_block.get_bigquery_client()
Deploy command on Vertex AI as a flow
Save the following as prefect_gcp_flow.py:
from prefect import flow
from prefect_gcp.credentials import GcpCredentials
from prefect_gcp.aiplatform import VertexAICustomTrainingJob

@flow
def vertex_ai_job_flow():
    gcp_credentials = GcpCredentials.load("MY_BLOCK")
    job = VertexAICustomTrainingJob(
        command=["echo", "hello world"],
        region="us-east1",
        image="us-docker.pkg.dev/cloudrun/container/job:latest",
        gcp_credentials=gcp_credentials,
    )
    job.run()

# Guard the direct call so that importing this module from the
# deployment script does not run the flow as a side effect
if __name__ == "__main__":
    vertex_ai_job_flow()
Deploy prefect_gcp_flow.py:
from prefect.deployments import Deployment
from prefect_gcp_flow import vertex_ai_job_flow

deployment = Deployment.build_from_flow(
    flow=vertex_ai_job_flow,
    name="vertex-ai-job-deployment",
    version=1,
    work_queue_name="demo",
)
deployment.apply()
Run the deployment either on the UI or through the CLI:
prefect deployment run vertex-ai-job-flow/vertex-ai-job-deployment
Visit Prefect Deployments for more information about deployments.
Use with_options to customize options on any existing task or flow
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_download_blob_as_bytes

# Create a copy of the prebuilt task with a custom name and retry settings
custom_download = cloud_storage_download_blob_as_bytes.with_options(
    name="My custom task name",
    retries=2,
    retry_delay_seconds=10,
)

@flow
def example_with_options_flow():
    gcp_credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json")
    contents = custom_download("bucket", "blob", gcp_credentials)
    # contents holds the task result, so return it directly rather than calling it
    return contents

example_with_options_flow()
For more tips on how to use tasks and flows in a Collection, check out Using Collections!
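To make the behavior of with_options easier to picture, here is a toy model in plain Python. It is not Prefect's implementation; `ToyTask` and `download` are hypothetical names used only to illustrate the idea that with_options hands back a modified copy and leaves the original task unchanged.

```python
from dataclasses import dataclass, replace
from typing import Callable


@dataclass(frozen=True)
class ToyTask:
    """Illustrative stand-in for a task: a function plus some options."""

    fn: Callable
    name: str
    retries: int = 0
    retry_delay_seconds: float = 0

    def with_options(self, **overrides) -> "ToyTask":
        # Return a new object with the given options replaced;
        # the original instance is not modified.
        return replace(self, **overrides)

    def __call__(self, *args, **kwargs):
        return self.fn(*args, **kwargs)


def download(bucket: str, blob: str) -> bytes:
    # Placeholder body: a real task would fetch the blob contents.
    return f"{bucket}/{blob}".encode()


base = ToyTask(fn=download, name="download")
custom = base.with_options(
    name="My custom task name", retries=2, retry_delay_seconds=10
)

if __name__ == "__main__":
    print(base.name, base.retries)
    print(custom.name, custom.retries)
```

Because the original stays untouched, the same prebuilt task can be customized several different ways within one project.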
Resources
If you encounter any bugs while using prefect-gcp, feel free to open an issue in the prefect-gcp repository.
If you have any questions or issues while using prefect-gcp, you can find help in either the Prefect Discourse forum or the Prefect Slack community.
Feel free to ⭐️ or watch prefect-gcp for updates too!
Development
If you'd like to install a version of prefect-gcp for development, clone the repository and perform an editable install with pip:
git clone https://github.com/PrefectHQ/prefect-gcp.git
cd prefect-gcp/
pip install -e ".[dev]"
# Install linting pre-commit hooks
pre-commit install
Hashes for prefect_gcp-0.2.2-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | 678caac5edc4a1e33422373048d7764521b6fa4de2ce9f7f7122afcbd0c0ba33
MD5 | 371c6e7012e62dbdc45984bf95ed7d8c
BLAKE2b-256 | 1584ded41e5afa390ce4f096986f9fc1c1d396a31c55d5d5549f9486e2204e0e