kfx
Extensions for the Kubeflow Pipelines SDK.
kfx is a python package with the namespace kfx. It currently provides the following sub-packages:
- kfx.dsl - Extensions to the Kubeflow Pipelines DSL.
- kfx.vis - Data models and helpers to generate the mlpipeline-metrics.json and mlpipeline-ui-metadata.json files required to render visualizations in the Kubeflow Pipelines UI. See also https://www.kubeflow.org/docs/pipelines/sdk/pipelines-metrics/ and https://www.kubeflow.org/docs/pipelines/sdk/output-viewer/.
- Documentation: https://kfx.readthedocs.io
- Repo: https://github.com/e2fyi/kfx
NOTE: this package is currently in alpha. Expect breaking changes; feel free to submit a feature request.
Known issues
- kfx.vis.vega.vega_web_app and KfpArtifact do not work well together (see example) because of CORS - the web app is hosted inside an iframe, which prevents it from accessing the ml-pipeline-ui API server.
- kfx.vis.vega.vega_web_app is only supported in the latest Kubeflow Pipelines UI (inline storage is only supported after 0.2.5).
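Because of the CORS issue above, a practical workaround is to inline the chart data directly in the Vega-Lite spec instead of referencing an artifact, so the iframe-hosted web app never needs to call the ml-pipeline-ui API server. A minimal sketch (the data values are illustrative):

import kfx.vis
import kfx.vis.vega

# Inline the data values in the spec itself so the web app does not
# need to fetch any artifact through the ml-pipeline-ui API server.
spec = {
    "$schema": "https://vega.github.io/schema/vega-lite/v4.json",
    "description": "Bar chart with inlined data.",
    "data": {"values": [{"a": "A", "b": 28}, {"a": "B", "b": 55}]},
    "mark": "bar",
    "encoding": {
        "x": {"field": "a", "type": "ordinal"},
        "y": {"field": "b", "type": "quantitative"},
    },
}
ui_metadata = kfx.vis.kfp_ui_metadata([kfx.vis.vega.vega_web_app(spec)])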
Changelog
Refer to CHANGELOG.md.
Quick start
Installation
pip install kfx
Usage
Example: Using ContainerOpTransform to configure the internal k8s properties of kubeflow pipeline tasks.
kfx.dsl.ContainerOpTransform is a helper to modify the internal k8s properties (e.g. resources, environment variables, etc.) of kubeflow pipeline tasks.
import kfp.components
import kfp.dsl
import kfx.dsl

transforms = (
    kfx.dsl.ContainerOpTransform()
    .set_resources(cpu="500m", memory=("1G", "4G"))
    .set_image_pull_policy("Always")
    .set_env_vars({"ENV": "production"})
    .set_env_var_from_secret("AWS_ACCESS_KEY", secret_name="aws", secret_key="access_key")
    .set_annotations({"iam.amazonaws.com/role": "some-arn"})
)

@kfp.components.func_to_container_op
def echo(text: str) -> str:
    print(text)
    return text

@kfp.dsl.pipeline(name="demo")
def pipeline(text: str):
    op1 = echo(text)
    op2 = echo("%s-%s" % (text, text))

    # you can apply the transform on op1 only
    # op1.apply(transforms)

    # or apply on all ops in the pipeline
    kfp.dsl.get_pipeline_conf().add_op_transformer(transforms)
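The pipeline can then be compiled as usual with the kfp compiler - a minimal sketch (the output filename is arbitrary):

import kfp.compiler

# compile the pipeline (with the transform applied) into a workflow spec
# that can be uploaded to the Kubeflow Pipelines UI
kfp.compiler.Compiler().compile(pipeline, "demo_pipeline.yaml")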
Example: Using ArtifactLocationHelper and KfpArtifact to determine the uri of your data artifact generated by the kubeflow pipeline task.
kfx.dsl.ArtifactLocationHelper is a helper to modify the kubeflow pipeline task so that you can use kfx.dsl.KfpArtifact to represent the artifact generated inside the task.
import kfp.components
import kfp.dsl
import kfx.dsl

# creates the helper that has the argo configs (tells you how artifacts will be stored)
# see https://github.com/argoproj/argo/blob/master/docs/workflow-controller-configmap.yaml
helper = kfx.dsl.ArtifactLocationHelper(
    scheme="minio", bucket="mlpipeline", key_prefix="artifacts/"
)

@kfp.components.func_to_container_op
def test_op(
    mlpipeline_ui_metadata: kfp.components.OutputTextFile(str),
    markdown_data_file: kfp.components.OutputTextFile(str),
):
    "A test kubeflow pipeline task."
    import json

    import kfx.dsl
    import kfx.vis
    import kfx.vis.vega

    # a vega-lite spec for a simple bar chart
    spec = {
        "$schema": "https://vega.github.io/schema/vega-lite/v4.json",
        "description": "A simple bar chart",
        "data": {
            "values": [
                {"a": "A", "b": 28},
                {"a": "B", "b": 55},
                {"a": "C", "b": 43},
                {"a": "D", "b": 91},
                {"a": "E", "b": 81},
                {"a": "F", "b": 53},
                {"a": "G", "b": 19},
                {"a": "H", "b": 87},
                {"a": "I", "b": 52},
            ]
        },
        "mark": "bar",
        "encoding": {
            "x": {"field": "a", "type": "ordinal"},
            "y": {"field": "b", "type": "quantitative"},
        },
    }

    # write the markdown to the `markdown-data` artifact
    markdown_data_file.write("### hello world")

    # creates an ui metadata object
    ui_metadata = kfx.vis.kfp_ui_metadata(
        # Describes the vis to generate in the kubeflow pipeline UI.
        [
            # markdown vis from a markdown artifact.
            # `KfpArtifact` provides the reference to the data artifact created
            # inside this task
            kfx.vis.markdown(kfx.dsl.KfpArtifact("markdown_data_file")),
            # a vega web app rendering the spec above.
            kfx.vis.vega.vega_web_app(spec),
        ]
    )

    # writes the ui metadata object as the `mlpipeline-ui-metadata` artifact
    mlpipeline_ui_metadata.write(kfx.vis.asjson(ui_metadata))

    # prints the uri to the markdown artifact
    print(ui_metadata.outputs[0].source)

@kfp.dsl.pipeline()
def test_pipeline():
    "A test kubeflow pipeline"
    op: kfp.dsl.ContainerOp = test_op()

    # modify kfp operator with artifact location metadata through env vars
    op.apply(helper.set_envs())
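At runtime, the KfpArtifact reference above resolves to a uri under the configured artifact store. The exact key layout follows the Argo artifact repository config that helper.set_envs() mirrors; the workflow and pod names below are hypothetical placeholders:

# Illustrative only - with the helper configured above,
# kfx.dsl.KfpArtifact("markdown_data_file") would resolve to something like:
#   minio://mlpipeline/artifacts/<workflow-name>/<pod-name>/markdown_data_file.tgz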
Example: Using pydantic data models to generate mlpipeline-metrics.json and mlpipeline-ui-metadata.json.
(See also https://www.kubeflow.org/docs/pipelines/sdk/output-viewer/ and https://www.kubeflow.org/docs/pipelines/sdk/pipelines-metrics/.)
kfx.vis has helper functions (with corresponding type hints) to describe and create mlpipeline-metrics.json and mlpipeline-ui-metadata.json files (required by the kubeflow pipeline UI to render any metrics or visualizations).
import functools

import kfp.components

# install kfx
kfx_component = functools.partial(kfp.components.func_to_container_op, packages_to_install=["kfx"])

@kfx_component
def some_op(
    # mlpipeline_metrics is a path - i.e. open(mlpipeline_metrics, "w")
    mlpipeline_metrics: kfp.components.OutputPath(str),
    # mlpipeline_ui_metadata is a FileLike obj - i.e. mlpipeline_ui_metadata.write("something")
    mlpipeline_ui_metadata: kfp.components.OutputTextFile(str),
):
    "kfp operator that provides metrics and metadata for visualizations."

    # import inside kfp task
    import kfx.vis
    import kfx.vis.vega

    # output metrics to mlpipeline_metrics path
    kfx.vis.kfp_metrics([
        # render as percent
        kfx.vis.kfp_metric("recall-score", 0.9, percent=True),
        # override metric format with custom value
        kfx.vis.kfp_metric(name="precision-score", value=0.8, metric_format="PERCENTAGE"),
        # render raw score
        kfx.vis.kfp_metric("raw-score", 123.45),
    ]).write_to(mlpipeline_metrics)

    # output visualization metadata to mlpipeline_ui_metadata obj
    kfx.vis.kfp_ui_metadata(
        [
            # creates a confusion matrix vis
            kfx.vis.confusion_matrix(
                source="gs://your_project/your_bucket/your_cm_file",
                labels=["True", "False"],
            ),
            # creates a markdown with inline source
            kfx.vis.markdown(
                "# Inline Markdown: [A link](https://www.kubeflow.org/)",
                storage="inline",
            ),
            # creates a markdown with a remote source
            kfx.vis.markdown(
                "gs://your_project/your_bucket/your_markdown_file",
            ),
            # creates a ROC curve with a remote source
            kfx.vis.roc(
                "gs://your_project/your_bucket/your_roc_file",
            ),
            # creates a Table with a remote source
            kfx.vis.table(
                "gs://your_project/your_bucket/your_csv_file",
                header=["col1", "col2"],
            ),
            # creates a tensorboard viewer
            kfx.vis.tensorboard(
                "gs://your_project/your_bucket/logs/*",
            ),
            # creates a custom web app from a remote html file
            kfx.vis.web_app(
                "gs://your_project/your_bucket/your_html_file",
            ),
            # creates a Vega-Lite vis as a web app
            kfx.vis.vega.vega_web_app(spec={
                "$schema": "https://vega.github.io/schema/vega-lite/v4.json",
                "description": "A simple bar chart with embedded data.",
                "data": {
                    "values": [
                        {"a": "A", "b": 28}, {"a": "B", "b": 55}, {"a": "C", "b": 43},
                        {"a": "D", "b": 91}, {"a": "E", "b": 81}, {"a": "F", "b": 53},
                        {"a": "G", "b": 19}, {"a": "H", "b": 87}, {"a": "I", "b": 52}
                    ]
                },
                "mark": "bar",
                "encoding": {
                    "x": {"field": "a", "type": "ordinal"},
                    "y": {"field": "b", "type": "quantitative"}
                }
            })
        ]
    ).write_to(mlpipeline_ui_metadata)
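For reference, the kfp_metrics call above writes a mlpipeline-metrics.json payload roughly like the following - a sketch of the standard KFP metrics schema (exact field ordering and defaults may differ):

{
  "metrics": [
    {"name": "recall-score", "numberValue": 0.9, "format": "PERCENTAGE"},
    {"name": "precision-score", "numberValue": 0.8, "format": "PERCENTAGE"},
    {"name": "raw-score", "numberValue": 123.45, "format": "RAW"}
  ]
}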
Developer guide
This project uses:
- isort: to manage import order
- pylint: to manage general coding best practices
- flake8: to manage code complexity and coding best practices
- black: to manage formats and styles
- pydocstyle: to manage docstr style/format
- pytest/coverage: to manage unit tests and code coverage
- bandit: to find common security issues
- pyenv: to manage dev env: python version (3.6)
- pipenv: to manage dev env: python packages
The convention for unit tests is to suffix them with _test and colocate them with the actual python module - i.e. <module_name>_test.py.
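For example, a module and its unit tests might sit side by side like this (hypothetical paths for illustration):

kfx/vis/_helpers.py       # the actual module
kfx/vis/_helpers_test.py  # its colocated unit tests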
The version of the package is read from version.txt - please update the appropriate semantic version (major -> breaking changes, minor -> new features, patch -> bug fix, postfix -> pre-release/post-release).
Makefile:
# autoformat codes with docformatter, isort, and black
make format
# check style, formats, and code complexity
make check
# check style, formats, code complexity, and run unit tests
make test
# test everything including building the package and check the sdist
make test-all
# run unit test only
make test-only
# generate and update the requirements.txt and requirements-dev.txt
make requirements
# generate the docs with sphinx and autoapi extension
make docs
# generate distributions
make dists
# publish to pypi with twine (twine must be configured)
make publish