Kubernetes platform configuration library and generated protos.

Kubeflow Pipelines SDK kfp-kubernetes API Reference

The Kubeflow Pipelines SDK kfp-kubernetes Python library (part of the Kubeflow Pipelines project) is an add-on to the Kubeflow Pipelines SDK that enables authoring Kubeflow pipelines with Kubernetes-specific features and concepts, such as:

  • Secrets
  • ConfigMaps
  • PersistentVolumeClaims
  • Pod labels and annotations
  • Field paths as environment variables
  • Timeouts (activeDeadlineSeconds)
  • Image pull policies

Be sure to check out the full API Reference for more details.

Installation

The kfp-kubernetes package can be installed as a KFP SDK extra dependency.

pip install kfp[kubernetes]

Or installed independently:

pip install kfp-kubernetes
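
Either command makes the extension importable alongside the core KFP SDK; the following import should then succeed:

from kfp import kubernetes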

Getting started

The following is an example of a simple pipeline that uses the kfp-kubernetes library to expose a pre-existing Kubernetes Secret as an environment variable in the task's container.

Secret: As environment variable

from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_secret():
    import os
    print(os.environ['SECRET_VAR'])

@dsl.pipeline
def pipeline():
    task = print_secret()
    kubernetes.use_secret_as_env(task,
                                 secret_name='my-secret',
                                 secret_key_to_env={'password': 'SECRET_VAR'})
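
This example assumes a Secret named my-secret containing a key password already exists in the namespace where the pipeline runs; kfp-kubernetes only references the Secret, it does not create it. The pipeline then compiles like any other KFP pipeline. A minimal sketch using the standard KFP compiler (the output filename is an arbitrary choice):

from kfp import compiler

compiler.Compiler().compile(pipeline, package_path='pipeline.yaml')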

Other examples

Here is a non-exhaustive list of other ways to use the kfp-kubernetes library. Be sure to check out the full API Reference for more details.

Secret: As mounted volume

from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_secret():
    # each key in the secret appears as a file under the mount path;
    # 'password' assumes the secret contains a key with that name
    with open('/mnt/my_vol/password') as f:
        print(f.read())

@dsl.pipeline
def pipeline():
    task = print_secret()
    kubernetes.use_secret_as_volume(task,
                                    secret_name='my-secret',
                                    mount_path='/mnt/my_vol')

Secret: As optional source for a mounted volume

from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_secret():
    # each key in the secret appears as a file under the mount path;
    # 'password' assumes the secret contains a key with that name
    with open('/mnt/my_vol/password') as f:
        print(f.read())

@dsl.pipeline
def pipeline():
    task = print_secret()
    kubernetes.use_secret_as_volume(task,
                                    secret_name='my-secret',
                                    mount_path='/mnt/my_vol',
                                    optional=True)

ConfigMap: As environment variable

from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_config_map():
    import os
    print(os.environ['CM_VAR'])

@dsl.pipeline
def pipeline():
    task = print_config_map()
    kubernetes.use_config_map_as_env(task,
                                     config_map_name='my-cm',
                                     config_map_key_to_env={'foo': 'CM_VAR'})

ConfigMap: As mounted volume

from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_config_map():
    # each key in the ConfigMap appears as a file under the mount path;
    # 'foo' assumes the ConfigMap contains a key with that name
    with open('/mnt/my_vol/foo') as f:
        print(f.read())

@dsl.pipeline
def pipeline():
    task = print_config_map()
    kubernetes.use_config_map_as_volume(task,
                                        config_map_name='my-cm',
                                        mount_path='/mnt/my_vol')

ConfigMap: As optional source for a mounted volume

from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_config_map():
    # each key in the ConfigMap appears as a file under the mount path;
    # 'foo' assumes the ConfigMap contains a key with that name
    with open('/mnt/my_vol/foo') as f:
        print(f.read())

@dsl.pipeline
def pipeline():
    task = print_config_map()
    kubernetes.use_config_map_as_volume(task,
                                        config_map_name='my-cm',
                                        mount_path='/mnt/my_vol',
                                        optional=True)

PersistentVolumeClaim: Dynamically create PVC, mount, then delete

from kfp import dsl
from kfp import kubernetes

@dsl.component
def make_data():
    with open('/data/file.txt', 'w') as f:
        f.write('my data')

@dsl.component
def read_data():
    with open('/reused_data/file.txt') as f:
        print(f.read())

@dsl.pipeline
def my_pipeline():
    pvc1 = kubernetes.CreatePVC(
        # can also use pvc_name instead of pvc_name_suffix to use a pre-existing PVC
        pvc_name_suffix='-my-pvc',
        access_modes=['ReadWriteOnce'],
        size='5Gi',
        storage_class_name='standard',
    )

    task1 = make_data()
    # normally task sequencing is handled by data exchange via component inputs/outputs
    # but since data is exchanged via volume, we need to call .after explicitly to sequence tasks
    task2 = read_data().after(task1)

    kubernetes.mount_pvc(
        task1,
        pvc_name=pvc1.outputs['name'],
        mount_path='/data',
    )
    kubernetes.mount_pvc(
        task2,
        pvc_name=pvc1.outputs['name'],
        mount_path='/reused_data',
    )

    # wait to delete the PVC until after task2 completes
    delete_pvc1 = kubernetes.DeletePVC(
        pvc_name=pvc1.outputs['name']).after(task2)
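
mount_pvc also accepts a plain string name, so a PVC provisioned outside the pipeline can be mounted directly. A minimal sketch, assuming a PVC named my-existing-pvc already exists in the namespace where the run executes:

from kfp import dsl
from kfp import kubernetes

@dsl.component
def read_data():
    with open('/data/file.txt') as f:
        print(f.read())

@dsl.pipeline
def my_pipeline():
    task = read_data()
    # 'my-existing-pvc' is pre-existing; the pipeline neither creates nor deletes it
    kubernetes.mount_pvc(
        task,
        pvc_name='my-existing-pvc',
        mount_path='/data',
    )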

PersistentVolumeClaim: Create a PVC on the fly, tied to the pod's lifecycle

from kfp import dsl
from kfp import kubernetes

@dsl.component
def make_data():
    with open('/data/file.txt', 'w') as f:
        f.write('my data')

@dsl.pipeline
def my_pipeline():
    task1 = make_data()
    # the created PVC is automatically cleaned up once the pod is deleted
    # and cannot be shared between pods
    kubernetes.add_ephemeral_volume(
        task1,
        volume_name="my-pvc",
        mount_path="/data",
        access_modes=['ReadWriteOnce'],
        size='5Gi',
    )

Pod Metadata: Add pod labels and annotations to the container pod's definition

from kfp import dsl
from kfp import kubernetes


@dsl.component
def comp():
    pass


@dsl.pipeline
def my_pipeline():
    task = comp()
    kubernetes.add_pod_label(
        task,
        label_key='kubeflow.com/kfp',
        label_value='pipeline-node',
    )
    kubernetes.add_pod_annotation(
        task,
        annotation_key='run_id',
        annotation_value='123456',
    )

Kubernetes Field: Use a Kubernetes field path as an environment variable

from kfp import dsl
from kfp import kubernetes


@dsl.component
def comp():
    pass


@dsl.pipeline
def my_pipeline():
    task = comp()
    kubernetes.use_field_path_as_env(
        task,
        env_name='KFP_RUN_NAME',
        field_path="metadata.annotations['pipelines.kubeflow.org/run_name']"
    )
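
Other fields exposed by the Kubernetes downward API can be used the same way. A minimal sketch using the standard metadata.name field path (the variable name POD_NAME is an arbitrary choice):

from kfp import dsl
from kfp import kubernetes


@dsl.component
def comp():
    import os
    print(os.environ['POD_NAME'])


@dsl.pipeline
def my_pipeline():
    task = comp()
    # 'metadata.name' resolves to the pod's own name via the downward API
    kubernetes.use_field_path_as_env(
        task,
        env_name='POD_NAME',
        field_path='metadata.name',
    )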

Timeout: Set a timeout in seconds, applied as the pod spec's activeDeadlineSeconds

from kfp import dsl
from kfp import kubernetes

@dsl.component
def comp():
    pass

@dsl.pipeline
def my_pipeline():
    task = comp()
    kubernetes.set_timeout(task, 20)

ImagePullPolicy: One of "Always", "Never", or "IfNotPresent"

from kfp import dsl
from kfp import kubernetes

@dsl.component
def simple_task():
    print("hello-world")

@dsl.pipeline
def pipeline():
    task = simple_task()
    kubernetes.set_image_pull_policy(task, "Always")
