Skip to main content

Extensions for kubeflow pipeline sdk.

Project description


PyPI version Build Status Coverage Status Documentation Status Code style: black Downloads

kfx is a python package with the namespace kfx. Currently, it provides the following sub-packages

NOTE this is currently alpha

There will likely to have breaking changes, and feel free to do a feature request

Known issues

  • kfx.vis.vega.vega_web_app and KfpArtifact does not work well together (see example) because of CORs - the web app is hosted inside an iFrame which prevents it from accessing the ml-pipeline-ui API server.
  • kfx.vis.vega.vega_web_app is only supported in the latest kubeflow pipeline UI (as inline is only supported after 0.2.5)


Refer to

Quick start


pip install kfx


Example: Using ContainerOpTransform to configure the internal k8s properties of kubeflow pipelines tasks.

kfx.dsl.ContainerOpTransform is a helper to modify the interal k8s properties (e.g. resources, environment variables, etc) of kubeflow pipeline tasks.

import kfp.components
import kfp.dsl
import kfx.dsl

transforms = (
    .set_resources(cpu="500m", memory=("1G", "4G"))
    .set_env_vars({"ENV": "production"})
    .set_env_var_from_secret("AWS_ACCESS_KEY", secret_name="aws", secret_key="access_key")
    .set_annotations({"": "some-arn"})

def echo(text: str) -> str:
    return text

def pipeline(text: str):
    op1 = echo(text)
    op2 = echo("%s-%s" % text)

    # u can apply the transform on op1 only
    # op1.apply(transforms)

    # or apply on all ops in the pipeline

Example: Using ArtifactLocationHelper and KfpArtifact to determine the uri of your data artifact generated by the kubeflow pipeline task.

kfx.dsl.ArtifactLocationHelper is a helper to modify the kubeflow pipeline task so that you can use kfx.dsl.KfpArtifact to represent the artifact generated inside the task.

import kfp.components
import kfp.dsl
import kfx.dsl

# creates the helper that has the argo configs (tells you how artifacts will be stored)
# see
helper = kfx.dsl.ArtifactLocationHelper(
    scheme="minio", bucket="mlpipeline", key_prefix="artifacts/"

def test_op(
    mlpipeline_ui_metadata: OutputTextFile(str), markdown_data_file: OutputTextFile(str)
    "A test kubeflow pipeline task."

    import json

    import kfx.dsl
    import kfx.vis
    import kfx.vis.vega

    # `KfpArtifact` provides the reference to data artifact created
    # inside this task
    spec = {
        "$schema": "",
        "description": "A simple bar chart",
        "data": {
            "values": [
                {"a": "A", "b": 28},
                {"a": "B", "b": 55},
                {"a": "C", "b": 43},
                {"a": "D", "b": 91},
                {"a": "E", "b": 81},
                {"a": "F", "b": 53},
                {"a": "G", "b": 19},
                {"a": "H", "b": 87},
                {"a": "I", "b": 52},
        "mark": "bar",
        "encoding": {
            "x": {"field": "a", "type": "ordinal"},
            "y": {"field": "b", "type": "quantitative"},

    # write the markdown to the `markdown-data` artifact
    markdown_data_file.write("### hello world")

    # creates an ui metadata object
    ui_metadata = kfx.vis.kfp_ui_metadata(
        # Describes the vis to generate in the kubeflow pipeline UI.
            # markdown vis from a markdown artifact.
            # `KfpArtifact` provides the reference to data artifact created
            # inside this task
            # a vega web app from the vega data artifact.

    # writes the ui metadata object as the `mlpipeline-ui-metadata` artifact

    # prints the uri to the markdown artifact

def test_pipeline():
    "A test kubeflow pipeline"

    op: kfp.dsl.ContainerOp = test_op()

    # modify kfp operator with artifact location metadata through env vars

Example: Using pydantic data models to generate mlpipeline-metrics.json and mlpipeline-ui-metadata.json.

(See also and

kfx.vis has helper functions (with corresponding hints) to describe and create mlpipeline-metrics.json and mlpipeline-ui-metadata.json files (required by kubeflow pipeline UI to render any metrics or visualizations).

import functools

import kfp.components

# install kfx
kfx_component = functools.partial(kfp.components.func_to_container_op, packages_to_install=["kfx"])

def some_op(
    # mlpipeline_metrics is a path - i.e. open(mlpipeline_metrics, "w")
    mlpipeline_metrics: kfp.components.OutputPath(str),
    # mlpipeline_ui_metadata is a FileLike obj - i.e. mlpipeline_ui_metadata.write("something")
    mlpipeline_ui_metadata: kfp.components.OutputTextFile(str),
    "kfp operator that provides metrics and metadata for visualizations."

    # import inside kfp task
    import kfx.vis

    # output metrics to mlpipeline_metrics path
        # render as percent
        kfx.vis.kfp_metric("recall-score", 0.9, percent=true),
        # override metric format with custom value
        kfx.vis.kfp_metric(name="percision-score", value=0.8, metric_format="PERCENTAGE"),
        # render raw score
        kfx.vis.kfp_metric("raw-score", 123.45),

    # output visualization metadata to mlpipeline_ui_metadata obj
            # creates a confusion matrix vis
                labels=["True", "False"],
            # creates a markdown with inline source
                "# Inline Markdown: [A link](",
            # creates a markdown with a remote source
            # creates a ROC curve with a remote source
            # creates a Table with a remote source
                header=["col1", "col2"],
            # creates a tensorboard viewer
            # creates a custom web app from a remote html file
            # creates a Vega-Lite vis as a web app
                "$schema": "",
                "description": "A simple bar chart with embedded data.",
                "data": {
                    "values": [
                        {"a": "A", "b": 28}, {"a": "B", "b": 55}, {"a": "C", "b": 43},
                        {"a": "D", "b": 91}, {"a": "E", "b": 81}, {"a": "F", "b": 53},
                        {"a": "G", "b": 19}, {"a": "H", "b": 87}, {"a": "I", "b": 52}
                "mark": "bar",
                "encoding": {
                    "x": {"field": "a", "type": "ordinal"},
                    "y": {"field": "b", "type": "quantitative"}

Developer guide

This project used:

  • isort: to manage import order
  • pylint: to manage general coding best practices
  • flake8: to manage code complexity and coding best practices
  • black: to manage formats and styles
  • pydocstyle: to manage docstr style/format
  • pytest/coverage: to manage unit tests and code coverage
  • bandit: to find common security issues
  • pyenv: to manage dev env: python version (3.6)
  • pipenv: to manage dev env: python packages

Convention for unit tests are to suffix with _test and colocate with the actual python module - i.e. <module_name>

The version of the package is read from version.txt - i.e. please update the appropriate semantic version (major -> breaking changes, minor -> new features, patch -> bug fix, postfix -> pre-release/post-release).


# autoformat codes with docformatter, isort, and black
make format

# check style, formats, and code complexity
make check

# check style, formats, code complexity, and run unit tests
make test

# test everything including building the package and check the sdist
make test-all

# run unit test only
make test-only

# generate and update the requirements.txt and requirements-dev.txt
make requirements

# generate the docs with sphinx and autoapi extension
make docs

# generate distributions
make dists

# publish to pypi with twine (twine must be configured)
make publish

Project details

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kfx-0.1.0.tar.gz (21.3 kB view hashes)

Uploaded source

Built Distribution

kfx-0.1.0-py3-none-any.whl (21.6 kB view hashes)

Uploaded py3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page