Kubernetes platform configuration library and generated protos.

Kubeflow Pipelines SDK kfp-kubernetes API Reference

The Kubeflow Pipelines SDK kfp-kubernetes Python library (part of the Kubeflow Pipelines project) is an add-on to the Kubeflow Pipelines SDK that enables authoring Kubeflow pipelines with Kubernetes-specific features and concepts, such as Secrets, ConfigMaps, PersistentVolumeClaims, pod labels and annotations, and image pull secrets.

Be sure to check out the full API Reference for more details.

Installation

The kfp-kubernetes package can be installed as a KFP SDK extra dependency.

pip install kfp[kubernetes]

Or installed independently:

pip install kfp-kubernetes
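
To confirm that the package ended up in the active environment after either install command, a quick check with the standard library's importlib.metadata can help. This is a minimal sketch; the helper name installed_version is hypothetical and not part of KFP.

```python
from importlib import metadata
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Report both the SDK and the add-on package.
    for name in ("kfp", "kfp-kubernetes"):
        print(name, installed_version(name) or "not installed")
```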

Getting started

The following example shows a simple pipeline that uses the kfp-kubernetes library to expose a pre-existing Secret as an environment variable in the task's container.

Secret: As environment variable

from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_secret():
    import os
    print(os.environ['SECRET_VAR'])

@dsl.pipeline
def pipeline():
    task = print_secret()
    kubernetes.use_secret_as_env(task,
                                 secret_name='my-secret',
                                 secret_key_to_env={'password': 'SECRET_VAR'})
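
The secret_key_to_env argument maps keys inside the Secret to environment variable names in the container. The sketch below only illustrates that mapping in plain Python; it is not how kfp-kubernetes or Kubernetes implements it, and the function resolve_env and the sample Secret contents are hypothetical.

```python
from typing import Dict


def resolve_env(secret_data: Dict[str, str],
                secret_key_to_env: Dict[str, str]) -> Dict[str, str]:
    """Return the environment variables a container would see.

    Each Secret key listed in the mapping is exposed under the mapped
    environment variable name; keys not listed are not exposed.
    """
    env = {}
    for secret_key, env_name in secret_key_to_env.items():
        # Assumption for this sketch: a missing key is treated as an error.
        if secret_key not in secret_data:
            raise KeyError(f"secret has no key {secret_key!r}")
        env[env_name] = secret_data[secret_key]
    return env


# Hypothetical contents of 'my-secret'.
sample_secret = {"password": "s3cr3t", "username": "admin"}
print(resolve_env(sample_secret, {"password": "SECRET_VAR"}))
```

Only the password key is exposed here; username stays inside the Secret because the mapping does not mention it.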

Other examples

Here is a non-exhaustive list of some other examples of how to use the kfp-kubernetes library. Be sure to check out the full API Reference for more details.

Secret: As mounted volume

from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_secret():
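    # A Secret volume is mounted as a directory with one file per key;
    # 'password' is assumed to be a key in 'my-secret'.
    with open('/mnt/my_vol/password') as f:
        print(f.read())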

@dsl.pipeline
def pipeline():
    task = print_secret()
    kubernetes.use_secret_as_volume(task,
                                    secret_name='my-secret',
                                    mount_path='/mnt/my_vol')
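
A Secret mounted as a volume appears in the container as a directory under the mount path, with one file per Secret key. A minimal sketch of reading every key from such a directory follows; the helper read_mounted_keys is hypothetical, it assumes the values are text, and a temporary directory stands in for the real mount.

```python
import tempfile
from pathlib import Path
from typing import Dict


def read_mounted_keys(mount_path: str) -> Dict[str, str]:
    """Read every key file under a mounted Secret or ConfigMap directory."""
    values = {}
    for entry in sorted(Path(mount_path).iterdir()):
        # Skip hidden bookkeeping entries (names starting with a dot)
        # and anything that is not a regular file.
        if entry.name.startswith(".") or not entry.is_file():
            continue
        values[entry.name] = entry.read_text()
    return values


# Local demonstration: a temporary directory stands in for '/mnt/my_vol'.
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "password").write_text("s3cr3t")
    print(read_mounted_keys(tmp))
```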

Secret: As optional source for a mounted volume

from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_secret():
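    # A Secret volume is mounted as a directory with one file per key;
    # 'password' is assumed to be a key in 'my-secret'.
    with open('/mnt/my_vol/password') as f:
        print(f.read())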

@dsl.pipeline
def pipeline():
    task = print_secret()
    kubernetes.use_secret_as_volume(task,
                                    secret_name='my-secret',
                                    mount_path='/mnt/my_vol',
                                    optional=True)
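
When the source is marked optional, the pod can start even if the Secret does not exist, so the component should tolerate missing files under the mount path. The following is a minimal sketch of such a tolerant read; read_optional_key and the key name are hypothetical.

```python
import os
import tempfile
from typing import Optional


def read_optional_key(mount_path: str, key: str,
                      default: Optional[str] = None) -> Optional[str]:
    """Return the content of mount_path/key, or default if it is absent."""
    path = os.path.join(mount_path, key)
    try:
        with open(path) as f:
            return f.read()
    except (FileNotFoundError, NotADirectoryError):
        # The Secret (or this key) was not provided.
        return default


# Local demonstration: an empty directory stands in for an optional mount
# whose Secret is missing.
with tempfile.TemporaryDirectory() as tmp:
    print(read_optional_key(tmp, "password", default="<no secret>"))
```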

ConfigMap: As environment variable

from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_config_map():
    import os
    print(os.environ['CM_VAR'])

@dsl.pipeline
def pipeline():
    task = print_config_map()
    kubernetes.use_config_map_as_env(task,
                                     config_map_name='my-cm',
                                     config_map_key_to_env={'foo': 'CM_VAR'})

ConfigMap: As mounted volume

from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_config_map():
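    # A ConfigMap volume is mounted as a directory with one file per key;
    # 'foo' is assumed to be a key in 'my-cm'.
    with open('/mnt/my_vol/foo') as f:
        print(f.read())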

@dsl.pipeline
def pipeline():
    task = print_config_map()
    kubernetes.use_config_map_as_volume(task,
                                        config_map_name='my-cm',
                                        mount_path='/mnt/my_vol')

ConfigMap: As optional source for a mounted volume

from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_config_map():
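    # A ConfigMap volume is mounted as a directory with one file per key;
    # 'foo' is assumed to be a key in 'my-cm'.
    with open('/mnt/my_vol/foo') as f:
        print(f.read())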

@dsl.pipeline
def pipeline():
    task = print_config_map()
    kubernetes.use_config_map_as_volume(task,
                                        config_map_name='my-cm',
                                        mount_path='/mnt/my_vol',
                                        optional=True)

PersistentVolumeClaim: Dynamically create PVC, mount, then delete

from kfp import dsl
from kfp import kubernetes

@dsl.component
def make_data():
    with open('/data/file.txt', 'w') as f:
        f.write('my data')

@dsl.component
def read_data():
    with open('/reused_data/file.txt') as f:
        print(f.read())

@dsl.pipeline
def my_pipeline():
    pvc1 = kubernetes.CreatePVC(
        # can also use pvc_name instead of pvc_name_suffix to use a pre-existing PVC
        pvc_name_suffix='-my-pvc',
        access_modes=['ReadWriteOnce'],
        size='5Gi',
        storage_class_name='standard',
    )

    task1 = make_data()
    # normally task sequencing is handled by data exchange via component inputs/outputs
    # but since data is exchanged via volume, we need to call .after explicitly to sequence tasks
    task2 = read_data().after(task1)

    kubernetes.mount_pvc(
        task1,
        pvc_name=pvc1.outputs['name'],
        mount_path='/data',
    )
    kubernetes.mount_pvc(
        task2,
        pvc_name=pvc1.outputs['name'],
        mount_path='/reused_data',
    )

    # wait to delete the PVC until after task2 completes
    delete_pvc1 = kubernetes.DeletePVC(
        pvc_name=pvc1.outputs['name']).after(task2)
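
The size argument uses Kubernetes quantity notation, where binary suffixes such as Gi denote powers of 1024 and decimal suffixes such as G denote powers of 1000. As a rough illustration of the difference, here is a simplified converter; quantity_to_bytes is a hypothetical helper that handles only whole numbers with a few common suffixes, not the full Kubernetes quantity grammar.

```python
# Binary (power-of-two) and decimal (power-of-ten) suffixes.
_BINARY = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40}
_DECIMAL = {"k": 10**3, "M": 10**6, "G": 10**9, "T": 10**12}


def quantity_to_bytes(quantity: str) -> int:
    """Convert a simple quantity string such as '5Gi' to a byte count."""
    for suffix, factor in {**_BINARY, **_DECIMAL}.items():
        if quantity.endswith(suffix):
            return int(quantity[:-len(suffix)]) * factor
    # No recognized suffix: interpret the value as plain bytes.
    return int(quantity)


print(quantity_to_bytes("5Gi"))  # 5368709120
print(quantity_to_bytes("5G"))   # 5000000000
```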

PersistentVolumeClaim: Create PVC on-the-fly tied to your pod's lifecycle

from kfp import dsl
from kfp import kubernetes

@dsl.component
def make_data():
    with open('/data/file.txt', 'w') as f:
        f.write('my data')

@dsl.pipeline
def my_pipeline():
    task1 = make_data()
    # note that the created PVC is automatically cleaned up once the pod is deleted and cannot be shared between pods
    kubernetes.add_ephemeral_volume(
        task1,
        volume_name="my-pvc",
        mount_path="/data",
        access_modes=['ReadWriteOnce'],
        size='5Gi',
    )

Pod Metadata: Add pod labels and annotations to the container pod's definition

from kfp import dsl
from kfp import kubernetes


@dsl.component
def comp():
    pass


@dsl.pipeline
def my_pipeline():
    task = comp()
    kubernetes.add_pod_label(
        task,
        label_key='kubeflow.com/kfp',
        label_value='pipeline-node',
    )
    kubernetes.add_pod_annotation(
        task,
        annotation_key='run_id',
        annotation_value='123456',
    )

Kubernetes Field: Use Kubernetes Field Path as environment variable

from kfp import dsl
from kfp import kubernetes


@dsl.component
def comp():
    pass


@dsl.pipeline
def my_pipeline():
    task = comp()
    kubernetes.use_field_path_as_env(
        task,
        env_name='KFP_RUN_NAME',
        field_path="metadata.annotations['pipelines.kubeflow.org/run_name']"
    )

Timeout: Set timeout in seconds defined as pod spec's activeDeadlineSeconds

from kfp import dsl
from kfp import kubernetes

@dsl.component
def comp():
    pass

@dsl.pipeline
def my_pipeline():
    task = comp()
    kubernetes.set_timeout(task, 20)
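
Because the timeout is given in seconds, longer durations are easier to read when derived from a timedelta. A minimal sketch follows; to_deadline_seconds is a hypothetical helper, and rejecting non-positive values is an assumption of this sketch rather than documented library behavior.

```python
from datetime import timedelta


def to_deadline_seconds(duration: timedelta) -> int:
    """Convert a duration to a whole number of seconds for a timeout."""
    seconds = int(duration.total_seconds())
    # Assumption: a useful deadline is a positive number of seconds.
    if seconds <= 0:
        raise ValueError("timeout must be a positive number of seconds")
    return seconds


print(to_deadline_seconds(timedelta(minutes=30)))  # 1800
```

The result could then be passed as the second argument of kubernetes.set_timeout in place of the literal 20.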

ImagePullPolicy: One of "Always", "Never", "IfNotPresent".

from kfp import dsl
from kfp import kubernetes

@dsl.component
def simple_task():
    print("hello-world")

@dsl.pipeline
def pipeline():
    task = simple_task()
    kubernetes.set_image_pull_policy(task, "Always")
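
The policy strings are case-sensitive, so it can be worth validating a value that comes from configuration before passing it on. The following is a minimal sketch; check_image_pull_policy is a hypothetical helper and not part of kfp-kubernetes.

```python
VALID_IMAGE_PULL_POLICIES = ("Always", "Never", "IfNotPresent")


def check_image_pull_policy(policy: str) -> str:
    """Return the policy unchanged if valid, otherwise raise ValueError."""
    if policy not in VALID_IMAGE_PULL_POLICIES:
        raise ValueError(
            f"invalid image pull policy {policy!r}; "
            f"expected one of {VALID_IMAGE_PULL_POLICIES}")
    return policy


print(check_image_pull_policy("IfNotPresent"))
```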

SecurityContext: Set security context for the task's container

from kfp import dsl
from kfp import kubernetes

@dsl.component
def my_task():
    print("running with custom security context")

@dsl.pipeline
def pipeline():
    task = my_task()
    kubernetes.set_security_context(
        task,
        run_as_user=1000,
        run_as_group=3000,
    )

Note: Cluster administrators can configure default runAsUser and runAsGroup values for all customer workload containers via the pipeline-install-config ConfigMap (defaultSecurityContextRunAsUser and defaultSecurityContextRunAsGroup). When admin defaults are configured, SDK-specified values for those fields are overridden.
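
The precedence described in the note can be pictured as a merge in which configured admin defaults win over SDK-specified values. This is only an illustration of that rule in plain Python, not the backend's actual code; effective_security_context and the dictionary keys are hypothetical.

```python
from typing import Dict, Optional


def effective_security_context(
        sdk_values: Dict[str, int],
        admin_defaults: Dict[str, Optional[int]]) -> Dict[str, int]:
    """Merge SDK values with admin defaults; configured defaults win."""
    effective = dict(sdk_values)
    for field, value in admin_defaults.items():
        # None means the administrator did not configure this field.
        if value is not None:
            effective[field] = value
    return effective


sdk = {"run_as_user": 1000, "run_as_group": 3000}
admin = {"run_as_user": 1001, "run_as_group": None}
print(effective_security_context(sdk, admin))
```

In this example only run_as_user is overridden, because the administrator left run_as_group unset.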

ImagePullSecrets: Set secrets to authenticate image pulls

from kfp import dsl
from kfp import kubernetes

@dsl.container_component
def hello():
    return dsl.ContainerSpec(
        image='some-private-image:tag',
        command=['echo', 'hello']
    )

@dsl.pipeline
def pipeline():
    task = hello()
    kubernetes.set_image_pull_secrets(task, ['secret-name'])

