Kedro plugin for running pipelines on Argo Workflows
What is argo-kedro
argo-kedro is a Kedro plugin for executing Kedro pipelines on Argo Workflows. Its core functionalities are:
- Workflow construction: argo-kedro constructs an Argo Workflow manifest from your Kedro pipeline for execution on your cluster. This ensures that the Kedro pipeline definition remains the single source of truth.
- Defining compute resources: argo-kedro exposes a custom Node type that can be used to control the compute resources available to the node.
- Node fusing: To maximize parallelisation, argo-kedro executes each Kedro node in a dedicated Argo task. The plugin exposes a FusedPipeline object that can be used to co-locate nodes for execution on a single Argo task.
- Custom and init templates: Register custom templates, either to customize the task in which your Kedro node runs, or to execute before the pipeline runs. The former unlocks spinning up auxiliary services for the duration of your Kedro node, e.g., an ephemeral Neo4j sidecar that can be used to run graph algorithms. The latter allows for bootstrapping external systems, e.g., creating an MLflow run for the pipeline.
How do I install argo-kedro?
Set up your Kedro project
Use the Kedro CLI to set up your project, i.e.,
kedro new
Set up your venv
uv sync
Install the plugin
uv add argo-kedro
Next, initialise the plugin. This creates an argo.yml file that houses components of the Argo configuration. The plugin also prompts for the creation of baseline Dockerfile and .dockerignore files.
uv run kedro argo init
Validate the files, and make any changes required.
Setting up your cloud environment
Argo Workflows executes pipelines in a parallelized fashion, i.e., on different compute instances. It's therefore important that data exchanged between nodes is materialized in remote storage, as local data storage is not shared among these machines. Let's start by installing the gcsfs package.
NOTE: The split between the base and cloud environments enables development workflows where local data storage is used when iterating locally, while the cluster uses Google Cloud Storage.
uv add "fsspec[gcs]"
Registering the globals file
Kedro allows customizing variables based on the environment, which unlocks local data storage for testing, while leveraging Cloud Storage for running on the cluster. First, enable the use of the globals in the settings.py file. To do so, replace the CONFIG_LOADER_ARGS setting with the contents below:
# Add the following import on top of the file
from omegaconf.resolvers import oc
CONFIG_LOADER_ARGS = {
    "base_env": "base",
    "default_run_env": "local",
    "merge_strategy": {"parameters": "soft", "globals": "soft"},
    "config_patterns": {
        "globals": ["globals*", "globals*/**", "**/globals*"],
        "parameters": [
            "parameters*",
            "parameters*/**",
            "**/parameters*",
            "**/parameters*/**",
        ],
    },
    "custom_resolvers": {
        "oc.env": oc.env,
    },
}
Parametrizing the base path
Create a new file called globals.yml in the conf/base folder. Start by defining the globals for the base environment.
# Definition for conf/base/globals.yml for local storage
paths:
  base: data
Next, create the globals.yml file for the cloud environment in the conf/cloud folder (create the folder if it does not exist) with the following contents:
# Definition for conf/cloud/globals.yml for cloud storage
paths:
  base: gs://<your_bucket_name>/<your_project_name>/${oc.env:WORKFLOW_ID, dummy}
Important: Replace <your_bucket_name> and <your_project_name> with your bucket and subdirectory, respectively.
The plugin automatically adds a few environment variables to the container. One of these is WORKFLOW_ID, a unique identifier of the workflow, which can be used as a unit of versioning as shown in the path above.
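The fallback in `${oc.env:WORKFLOW_ID, dummy}` can be pictured with the standard library alone. The sketch below is not the resolver's implementation; `resolve_base_path` and the bucket and project values are hypothetical names used purely for illustration.

```python
import os


def resolve_base_path(bucket: str, project: str, env=None) -> str:
    """Build the cloud base path, using 'dummy' when WORKFLOW_ID is unset."""
    # Hypothetical helper: mimics the default-value behaviour of oc.env.
    env = os.environ if env is None else env
    workflow_id = env.get("WORKFLOW_ID", "dummy")
    return f"gs://{bucket}/{project}/{workflow_id}"


# On the cluster WORKFLOW_ID is set by the plugin; locally it is usually absent.
on_cluster = resolve_base_path("my-bucket", "my-project", env={"WORKFLOW_ID": "wf-123"})
on_laptop = resolve_base_path("my-bucket", "my-project", env={})
print(on_cluster)
print(on_laptop)
```

Because every workflow receives its own identifier, each run writes to a separate prefix in the bucket, which is what makes the identifier usable as a unit of versioning.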
Finally, ensure the parametrized path is used in the conf/base/catalog.yml file. For example:
preprocessed_companies:
  type: pandas.ParquetDataset
  # This ensures that local storage is used in the base, while cloud storage
  # is used while running on the cluster.
  filepath: ${globals:paths.base}/02_intermediate/preprocessed_companies.parquet
IMPORTANT: Make sure you replace the data/ string in the conf/base/catalog.yml file with ${globals:paths.base}/, as Kedro is otherwise not aware of the cloud storage. This change allows Kedro to switch between the local and cloud environments easily.
Submitting to the cluster
Ensure you have the correct kubeconfig set
Run the following CLI command to set up the cluster credentials.
gcloud container clusters get-credentials $CLUSTER_NAME --region us-central1 --project $PROJECT
Ensure all catalog entries are registered
This is a very early version of the plugin, which does not support memory datasets. Ensure your pipeline does not use memory datasets, as this will lead to failures. We will be introducing a mechanism that will support this in the future.
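In Kedro, a dataset that is used by a node but has no catalog entry defaults to a MemoryDataset, so a quick pre-flight check is to compare the dataset names used by the pipeline against the catalog. The sketch below works on plain Python data rather than Kedro objects; `find_unregistered_datasets` is a hypothetical helper, and it ignores dataset factory patterns.

```python
def find_unregistered_datasets(nodes, catalog_entries):
    """Return the sorted dataset names used by nodes but missing from the catalog."""
    used = set()
    for node in nodes:
        used.update(node["inputs"])
        used.update(node["outputs"])
    # Parameters are not datasets, so they are excluded from the check.
    datasets = {
        name
        for name in used
        if name != "parameters" and not name.startswith("params:")
    }
    return sorted(datasets - set(catalog_entries))


# Hypothetical pipeline description: each node lists its inputs and outputs.
nodes = [
    {"inputs": ["companies"], "outputs": ["preprocessed_companies"]},
    {
        "inputs": ["preprocessed_companies", "params:model_options"],
        "outputs": ["model_input_table"],
    },
]
catalog_entries = ["companies", "preprocessed_companies"]

missing = find_unregistered_datasets(nodes, catalog_entries)
print(missing)  # ['model_input_table'] would fall back to a memory dataset
```

Any name reported by the check needs a catalog entry with a path in remote storage before the pipeline is submitted.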
Execute pipeline
Run the following command to run on the cluster:
uv run kedro argo submit
Retrying a failed workload
If a pipeline fails, instead of submitting a new pipeline run, you could simply resubmit the failed workflow by running the following command:
uv run kedro argo resubmit
Note: This command overrides the Docker image name and tag specified in the Argo configuration, because resubmitting requires that the workflow's Docker image and tag remain unchanged. No additional configuration is required on your part.
When using resubmit, you can optionally supply a --workflow-name argument to select which existing workflow run to retry; it does not set the name of a new workflow.
Advanced
Configuring machine types
The argo.yml file defines the machine types that nodes in the pipeline can use. The platform team will share a list of valid machine types.
# ...
# argo.yml
machine_types:
  default:
    mem: 16
    cpu: 4
    num_gpu: 0
default_machine_type: default
By default, the default_machine_type is used for all nodes of the pipeline. To configure the machine type for a specific node, import the plugin's Node extension.
from kedro.pipeline import Pipeline
# NOTE: Import from the plugin, this is a drop-in replacement!
from argo_kedro.pipeline import Node

from .nodes import preprocess_companies


def create_pipeline(**kwargs) -> Pipeline:
    return Pipeline(
        [
            Node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
                machine_type="n1-standard-4",  # NOTE: enter a valid machine type from the configuration here
            ),
            ...
        ]
    )
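The fallback from a node-level machine_type to default_machine_type can be sketched with a plain dictionary that mirrors the argo.yml fragment above. This is an illustration under assumptions, not the plugin's code: `resolve_machine_type` is a hypothetical helper, and how the plugin actually validates names is not documented here.

```python
# Mirrors the machine_types section of argo.yml shown above.
ARGO_CONFIG = {
    "machine_types": {
        "default": {"mem": 16, "cpu": 4, "num_gpu": 0},
    },
    "default_machine_type": "default",
}


def resolve_machine_type(config, requested=None):
    """Return (name, resources) for a node, using the default when none is requested."""
    name = requested or config["default_machine_type"]
    if name not in config["machine_types"]:
        valid = ", ".join(sorted(config["machine_types"]))
        raise ValueError(f"Unknown machine type {name!r}; valid options: {valid}")
    return name, config["machine_types"][name]


# A node without machine_type falls back to the configured default.
name, resources = resolve_machine_type(ARGO_CONFIG)
print(name, resources)
```

A check of this kind makes a misspelled machine type fail early instead of at scheduling time.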
GPU support
The template Dockerfile comes with built-in support for running GPU workloads on Nvidia GPUs.
To run a pipeline on GPU, you would need to configure the pipeline machine type to a g2 instance type. Currently supported GPU machine types are:
| Machine Type | CPU (vCPUs) | Memory (GB) | GPUs | GPU memory (GB) |
|---|---|---|---|---|
| g2-standard-4 | 4 | 16 | 1 | 24 |
| g2-standard-8 | 8 | 32 | 1 | 24 |
| g2-standard-12 | 12 | 48 | 1 | 24 |
| g2-standard-16 | 16 | 64 | 1 | 24 |
| g2-standard-24 | 24 | 96 | 2 | 48 |
| g2-standard-32 | 32 | 128 | 1 | 24 |
| g2-standard-48 | 48 | 192 | 4 | 96 |
| g2-standard-96 | 96 | 384 | 8 | 192 |
To use one of these machine types, modify the pipeline code as follows:
from kedro.pipeline import Pipeline
# NOTE: Import from the plugin, this is a drop-in replacement!
from argo_kedro.pipeline import Node

from .nodes import preprocess_companies


def create_pipeline(**kwargs) -> Pipeline:
    return Pipeline(
        [
            Node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
                machine_type="g2-standard-4",  # NOTE: enter a valid machine type from the list above.
            ),
            ...
        ]
    )
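When choosing a value for machine_type, it can help to pick the first row of the table that satisfies a node's requirements. The sketch below encodes the table rows as tuples; `smallest_g2` is a hypothetical helper and not part of the plugin.

```python
# Rows copied from the table above: (name, cpu, memory, gpus, gpu_memory).
G2_MACHINE_TYPES = [
    ("g2-standard-4", 4, 16, 1, 24),
    ("g2-standard-8", 8, 32, 1, 24),
    ("g2-standard-12", 12, 48, 1, 24),
    ("g2-standard-16", 16, 64, 1, 24),
    ("g2-standard-24", 24, 96, 2, 48),
    ("g2-standard-32", 32, 128, 1, 24),
    ("g2-standard-48", 48, 192, 4, 96),
    ("g2-standard-96", 96, 384, 8, 192),
]


def smallest_g2(min_cpu=0, min_memory=0, min_gpus=1):
    """Return the first (smallest) listed machine type meeting all minimums."""
    for name, cpu, memory, gpus, _gpu_memory in G2_MACHINE_TYPES:
        if cpu >= min_cpu and memory >= min_memory and gpus >= min_gpus:
            return name
    raise ValueError("No listed machine type satisfies the requirements")


print(smallest_g2(min_cpu=10))
print(smallest_g2(min_gpus=2))
```

Note that the GPU count does not grow monotonically with CPU count in the table, so filtering on all columns is safer than picking by name.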
Fusing nodes for execution
Why fusing?
To run a Kedro pipeline on Argo, the question of how to map Kedro nodes to Argo tasks arises. There are two immediately obvious, albeit extreme, directions:
- Single Argo task for entire pipeline
  - Pros:
    - Simple setup, Argo task invokes kedro run for entire pipeline
  - Cons:
    - Limited options for leveraging parallelization
    - Entire pipeline has to run with single hardware configuration
    - May be very expensive for pipelines requiring GPUs in some steps
- Argo task for each node in the pipeline
  - Pros:
    - Maximize parallel processing capabilities
    - Allow for different hardware configuration per node
  - Cons:
    - Scheduling overhead for very small Kedro nodes
    - Complex DAG in Argo Workflows
For our use case, a pipeline with hundreds of nodes, we want to enable fusing sets of related nodes for execution on a single Argo task. This avoids scheduling overhead while still supporting heterogeneous hardware configurations within the pipeline.
Note: "Related" is used here in the broad sense of the word, i.e., the nodes may have similar hardware needs, be highly coupled, or all rely on an external service.
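The effect of fusing on the resulting task graph can be sketched on plain data. The example below is a simplified model and not the plugin's algorithm: `build_tasks` is a hypothetical helper that maps nodes to tasks and derives task-level dependencies from dataset names.

```python
def build_tasks(node_io, fused_groups):
    """Map nodes to tasks and return {task: sorted upstream tasks}.

    node_io: {node_name: (inputs, outputs)}
    fused_groups: {task_name: [node_names]}; other nodes get their own task.
    """
    # Every node defaults to its own task; fused nodes share the group's task.
    task_of = {node: node for node in node_io}
    for task, members in fused_groups.items():
        for node in members:
            task_of[node] = task

    # Record which task produces each dataset.
    producer = {}
    for node, (_, outputs) in node_io.items():
        for dataset in outputs:
            producer[dataset] = task_of[node]

    # A task depends on every other task that produces one of its inputs.
    dependencies = {task: set() for task in task_of.values()}
    for node, (inputs, _) in node_io.items():
        task = task_of[node]
        for dataset in inputs:
            upstream = producer.get(dataset)
            if upstream is not None and upstream != task:
                dependencies[task].add(upstream)
    return {task: sorted(upstream) for task, upstream in dependencies.items()}


node_io = {
    "preprocess_companies_node": (["companies"], ["preprocessed_companies"]),
    "preprocess_shuttles_node": (["shuttles"], ["preprocessed_shuttles"]),
    "create_model_input_table_node": (
        ["preprocessed_shuttles", "preprocessed_companies", "reviews"],
        ["model_input_table"],
    ),
}

unfused = build_tasks(node_io, {})
fused = build_tasks(
    node_io,
    {"preprocess_data_fused": ["preprocess_companies_node", "preprocess_shuttles_node"]},
)
print(len(unfused), len(fused))  # 3 tasks without fusing, 2 with fusing
```

With hundreds of nodes, grouping related nodes this way shrinks the number of tasks to schedule while keeping separate tasks where hardware needs differ.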
The FusedPipeline object
The FusedPipeline is an extension of Kedro's Pipeline object that guarantees that the nodes contained within it are executed on the same Argo task. See the following code example:
from kedro.pipeline import Pipeline
from argo_kedro.pipeline import FusedPipeline, Node

from .nodes import create_model_input_table, preprocess_companies, preprocess_shuttles


def create_pipeline(**kwargs) -> Pipeline:
    return Pipeline(
        [
            FusedPipeline(
                nodes=[
                    Node(
                        func=preprocess_companies,
                        inputs="companies",
                        outputs="preprocessed_companies",
                        name="preprocess_companies_node",
                    ),
                    Node(
                        func=preprocess_shuttles,
                        inputs="shuttles",
                        outputs="preprocessed_shuttles",
                        name="preprocess_shuttles_node",
                    ),
                ],
                name="preprocess_data_fused",
                machine_type="n1-standard-1",
            ),
            Node(
                func=create_model_input_table,
                inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"],
                outputs="model_input_table",
                name="create_model_input_table_node",
            ),
        ]
    )
The code snippet above wraps the preprocess_companies_node and preprocess_shuttles_node nodes together for execution on the same machine. Similar to the plugin's Node object, the FusedPipeline accepts a machine_type argument that allows for customizing the machine type to use.
Given that the nodes within the FusedPipeline now execute on the same machine, the plugin performs a small optimization step to reduce IO. Specifically, each intermediate, i.e., non-output, dataset within the FusedPipeline is transformed into a MemoryDataset. This allows Kedro to keep these datasets in memory, without having to materialize them to disk. The behaviour can be toggled via runner.use_memory_datasets in argo.yml.
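Which datasets qualify as intermediate can be illustrated on plain data. The sketch below assumes that "intermediate" means produced and consumed inside the fused group and not needed by any node outside it; the plugin's exact rule may differ. `intermediate_datasets` and the node and dataset names are hypothetical.

```python
def intermediate_datasets(fused_nodes, outside_nodes):
    """Return sorted datasets that never leave the fused group.

    Each node is an (inputs, outputs) pair of dataset-name lists.
    """
    produced = {ds for _, outputs in fused_nodes for ds in outputs}
    consumed_inside = {ds for inputs, _ in fused_nodes for ds in inputs}
    consumed_outside = {ds for inputs, _ in outside_nodes for ds in inputs}
    # Keep only datasets created and used within the group (assumed rule).
    return sorted((produced & consumed_inside) - consumed_outside)


# Hypothetical fused group: clean -> featurize, followed by an outside train node.
fused_nodes = [
    (["raw"], ["cleaned"]),
    (["cleaned"], ["features"]),
]
outside_nodes = [
    (["features"], ["model"]),
]

in_memory = intermediate_datasets(fused_nodes, outside_nodes)
print(in_memory)  # only 'cleaned' can stay in memory; 'features' is read by another task
```

Datasets that cross the fuse boundary still need to be persisted to remote storage, since the downstream task runs on a different machine.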
Using cluster Secrets
Workflows are allowed to consume secrets provided by the cluster. Secrets can be mounted using the template section of the argo.yml file.
# argo.yml
...
template:
  environment:
    # The configuration below mounts the `secret.TOKEN`
    # to the `TOKEN` environment variable.
    - name: TOKEN
      secret_ref:
        name: secret
        key: TOKEN
This ensures that the pods running the workflow nodes have access to the secret. Next, use the oc.env resolver to pull the secret into the globals, catalog or parameters, as follows:
# base/globals.yml
token: ${oc.env:TOKEN}
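If node code reads the mounted secret directly rather than through configuration, failing fast with a clear message helps when the secret mapping is missing. The sketch below uses only the standard library; `read_secret` is a hypothetical helper and not part of the plugin.

```python
import os


def read_secret(name: str, env=None) -> str:
    """Return the secret stored in environment variable `name`, or raise clearly."""
    env = os.environ if env is None else env
    value = env.get(name)
    if not value:
        raise RuntimeError(
            f"Environment variable {name!r} is not set; "
            "check the secret_ref mapping in argo.yml"
        )
    return value


# Simulated pod environment with the TOKEN secret mounted.
token = read_secret("TOKEN", env={"TOKEN": "abc123"})
print(len(token))
```

Printing only the length avoids leaking the secret value into workflow logs.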
Custom templates
This is an experimental capability of the plugin.
The plugin allows for customizing the workflow manifest through the templates section in the argo.yml file. A first use-case that comes to mind is customizing the Argo task that executes a specific Kedro node. For instance, to spin up an auxiliary service for the duration of the node.
# argo.yml
template:
  templates:
    - name: neo4j
      container:
        name: kedro
        command: ["/bin/sh", "-c"]
        env:
          - name: NEO4J_HOST
            value: "bolt://127.0.0.1:7687"
        args: |
          echo "Setting up temporary directories..."
          mkdir -p /data/tmp /data/spark-temp /data/spark-warehouse /data/checkpoints
          echo "Waiting for Neo4j to be ready..."
          until curl -s http://localhost:7474 > /dev/null; do
            echo "Waiting..."
            sleep 5
          done
          echo "Neo4j is ready. Starting main application..."
          kedro run -p "{{inputs.parameters.pipeline}}" -e "{{inputs.parameters.environment}}" -n "{{inputs.parameters.kedro_nodes}}"
      sidecars:
        - name: neo4j
          image: neo4j:5.21.0-enterprise
from kedro.pipeline import Pipeline
# NOTE: Import from the plugin, this is a drop-in replacement!
from argo_kedro.pipeline import Node

from .nodes import run_neo4j_graph_algorithm


def create_pipeline(**kwargs) -> Pipeline:
    return Pipeline(
        [
            Node(
                func=run_neo4j_graph_algorithm,
                inputs=["nodes", "edges"],
                outputs="output",
                name="run_neo4j_graph_algorithm",
                template="neo4j",  # Name of the custom template defined in argo.yml
            ),
        ]
    )
Init templates
This is an experimental capability of the plugin.
template:
  templates:
    - name: init-mlflow-run
      container:
        name: init-mlflow-run
        command: ["python", "-c"]
        args: |
          import os
          import mlflow
          mlflow.set_tracking_uri(os.environ["MLFLOW_TRACKING_URI"])
          mlflow.set_experiment(os.environ["MLFLOW_EXPERIMENT_NAME"])
          run = mlflow.start_run(run_name=os.environ["WORKFLOW_ID"])
          with open("/tmp/mlflow_run_id", "w", encoding="utf-8") as f:
              f.write(run.info.run_id)
      outputs:
        parameters:
          - name: mlflow_run_id
            path: /tmp/mlflow_run_id
  init_templates: ["init-mlflow-run"]
A template can optionally be listed under init_templates. This has the following effect:
- The template is run before the tasks that represent the Kedro pipeline.
- All template outputs are passed to the tasks of the pipeline, and mounted as environment variables.
Known issues
Default pipeline with fusing for sub-pipelines
Kedro handles the __default__ pipeline through auto-detection. Given its current implementation of the sum operator, fusing is ignored if the full pipeline is wrapped in a FusedPipeline object. The current workaround is to use the sum_pipelines function from the package to build the default pipeline, i.e.,
from kedro.framework.project import find_pipelines
from kedro.pipeline import Pipeline
from argo_kedro.pipeline import sum_pipelines


def register_pipelines() -> dict[str, Pipeline]:
    """Register the project's pipelines.

    Returns:
        A mapping from pipeline names to ``Pipeline`` objects.
    """
    pipelines = find_pipelines()
    pipelines["__default__"] = sum_pipelines(pipelines.values())
    return pipelines
Kedro run with node target within fusing boundary
The current implementation of the fusing algorithm does not allow invocations of kedro run with a target node within the fuse boundary. The current workaround is to target the fused node in its entirety.
Common errors
Authentication errors while submitting to the cluster
Occasionally, the combination of the fsspec[gcs] and kubernetes dependencies leads to inconsistencies. A current solution is to pin the following dependency:
proto-plus==1.24.0.dev1
Dataset saving errors
The Google Cloud filesystem implementation sometimes causes issues with Kedro, resulting in VersionedDataset errors even when versioning is disabled.
DatasetError: Cannot save versioned dataset '...' to
'...' because a file with
the same name already exists in the directory. This is likely because versioning
was enabled on a dataset already saved previously.
To fix the issue, pin the version of the following library:
gcsfs==2024.3.1
Brand and Trademark Notice
Important: The "Every Cure" name, logo, and related trademarks are the exclusive property of Every Cure. Contributors and users of this open-source project are not authorized to use the Every Cure brand, logo, or trademarks in any way that suggests endorsement, affiliation, or sponsorship without explicit written permission from Every Cure.
This project is open source and available under the terms of its license, but the Every Cure brand and trademarks remain protected. Please respect these intellectual property rights.