Airflow plugin for Google Cloud Run Jobs
airflow-google-cloud-run-plugin
Airflow plugin for orchestrating Google Cloud Run jobs.
Features
- Easier-to-use alternative to KubernetesPodOperator
- Securely use sensitive data stored in Google Cloud Secret Manager
- Create tasks with isolated dependencies
- Enables polyglot workflows
Resources
Core Operators
CloudRunJobOperator
CRUD-Based Operators
CloudRunCreateJobOperator
CloudRunGetJobOperator 🔜
CloudRunUpdateJobOperator 🔜
CloudRunDeleteJobOperator
CloudRunListJobsOperator 🔜
Hooks
CloudRunJobHook
Sensors
CloudRunJobExecutionSensor 🔜
Usage
Simple Job Lifecycle
from airflow import DAG
from airflow_google_cloud_run_plugin.operators.cloud_run import CloudRunJobOperator

with DAG(dag_id="example_dag") as dag:
    job = CloudRunJobOperator(
        task_id="example-job",
        name="example-job",
        location="us-central1",
        project_id="example-project",
        image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
        command=["echo"],
        cpu="1000m",
        memory="512Mi",
        create_if_not_exists=True,
        delete_on_exit=True,
    )
CRUD Job Lifecycle
from airflow import DAG
from airflow_google_cloud_run_plugin.operators.cloud_run import (
    CloudRunJobOperator,
    CloudRunCreateJobOperator,
    CloudRunDeleteJobOperator,
)

with DAG(dag_id="example_dag") as dag:
    create_job = CloudRunCreateJobOperator(
        task_id="create",
        name="example-job",
        location="us-central1",
        project_id="example-project",
        image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
        command=["echo"],
        cpu="1000m",
        memory="512Mi",
    )

    run_job = CloudRunJobOperator(
        task_id="run",
        name="example-job",
        location="us-central1",
        project_id="example-project",
    )

    delete_job = CloudRunDeleteJobOperator(
        task_id="delete",
        name="example-job",
        location="us-central1",
        project_id="example-project",
    )

    create_job >> run_job >> delete_job
Using Environment Variables
from airflow import DAG
from airflow_google_cloud_run_plugin.operators.cloud_run import CloudRunJobOperator

# Simple environment variable
FOO = {
    "name": "FOO",
    "value": "not_so_secret_value_123",
}

# Environment variable from Secret Manager
BAR = {
    "name": "BAR",
    "valueFrom": {
        "secretKeyRef": {
            "name": "super_secret_password",
            "key": "1",  # or "latest" for latest secret version
        }
    },
}

with DAG(dag_id="example_dag") as dag:
    job = CloudRunJobOperator(
        task_id="example-job",
        name="example-job",
        location="us-central1",
        project_id="example-project",
        image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
        command=["echo"],
        args=["$FOO", "$BAR"],
        env_vars=[FOO, BAR],
        cpu="1000m",
        memory="512Mi",
        create_if_not_exists=True,
        delete_on_exit=True,
    )
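The nested dictionaries above are easy to mistype, so it can help to build them with small helper functions. The following is a minimal sketch, not part of the plugin: `env_var` and `secret_env_var` are hypothetical names, and they only reproduce the dictionary shapes used for FOO and BAR in the example above.

```python
from typing import Any, Dict


def env_var(name: str, value: str) -> Dict[str, Any]:
    """Hypothetical helper: build a plain environment variable entry."""
    return {"name": name, "value": value}


def secret_env_var(name: str, secret_name: str, version: str = "latest") -> Dict[str, Any]:
    """Hypothetical helper: build an entry that reads from Secret Manager.

    `version` is the secret version ("1", "2", ...) or "latest".
    """
    return {
        "name": name,
        "valueFrom": {
            "secretKeyRef": {
                "name": secret_name,
                "key": version,
            }
        },
    }


# Same values as the FOO and BAR dictionaries in the example above.
FOO = env_var("FOO", "not_so_secret_value_123")
BAR = secret_env_var("BAR", "super_secret_password", version="1")
env_vars = [FOO, BAR]
```

The resulting `env_vars` list could then be passed to `CloudRunJobOperator(env_vars=env_vars, ...)` exactly as in the example above.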
Improvement Suggestions
- Add support for Cloud Run services
- Nicer user experience for defining args and commands
- Use approach from other GCP operators once this issue is resolved https://github.com/googleapis/python-run/issues/64
- Add operators for all CRUD operations
- Add run sensor (see link)
- Enable volume mounts (see TaskSpec)
- Allow user to configure resource requirements requests (see ResourceRequirements)
- Add remaining container options (see Container)
- Allow non-default credentials and for user to specify service account (see link)
- Allow a failure threshold: if more than one task is specified, the user should be able to specify the number of failures allowed
- Add custom links for log URIs
- Add wrapper class for easier environment variable definition. Similar to Secret from Kubernetes provider (see link)
- Add slight time padding between job create and run
- Add ability to choose to replace the job with new config values if values have changed
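As an illustration of the "nicer user experience for defining args and commands" suggestion, the sketch below splits a single shell-style string into the `command` and `args` lists the operator examples use. It is only one possible approach and is not implemented by the plugin: `split_command` is a hypothetical name, and the split relies on Python's standard `shlex` module.

```python
import shlex
from typing import Dict, List


def split_command(command_line: str) -> Dict[str, List[str]]:
    """Hypothetical helper: turn a shell-style string into operator kwargs."""
    # shlex.split honours quoting but does not expand variables such as $FOO.
    tokens = shlex.split(command_line)
    if not tokens:
        raise ValueError("command_line must contain at least one token")
    # The first token becomes the command, the remaining tokens its args.
    return {"command": tokens[:1], "args": tokens[1:]}


job_kwargs = split_command('echo "hello world" $FOO')
```

Under this sketch, the result could be unpacked into an operator call as `CloudRunJobOperator(..., **job_kwargs)`.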