OmegaConf DictConfig/ListConfig support for Flyte
flyteplugins-omegaconf
Enables OmegaConf DictConfig and ListConfig as typed inputs and outputs for Flyte tasks.
Installation
pip install flyteplugins-omegaconf
Installing the package automatically registers the DictConfig and ListConfig transformers with Flyte's TypeEngine via the flyte.plugins.types entry point.
Usage
DictConfig as task inputs and outputs
import flyte
from omegaconf import DictConfig, OmegaConf

env = flyte.TaskEnvironment(name="training", image=...)

@env.task
async def preprocess(cfg: DictConfig) -> DictConfig:
    return OmegaConf.merge(cfg, {"data": {"normalized": True}})

@env.task
async def train(cfg: DictConfig) -> float:
    return run_experiment(cfg.optimizer.lr, cfg.training.epochs)

@env.task
async def pipeline() -> float:
    cfg = OmegaConf.create({"optimizer": {"lr": 0.001}, "training": {"epochs": 10}})
    processed = await preprocess(cfg)
    return await train(processed)
ListConfig as task inputs and outputs
from omegaconf import ListConfig, OmegaConf

@env.task
async def build_lr_schedule(base_lr: float, num_stages: int) -> ListConfig:
    return OmegaConf.create([base_lr * (0.5 ** i) for i in range(num_stages)])

@env.task
async def train_with_schedule(cfg: DictConfig, lr_schedule: ListConfig) -> float:
    final_lr = float(lr_schedule[-1])
    ...
Ways to construct a DictConfig
All of the following are valid ways to create a DictConfig to pass to a task:
1. From a plain dict
cfg = OmegaConf.create({"optimizer": {"lr": 0.001}, "training": {"epochs": 10}})
flyte.run(train, cfg=cfg)
2. From a YAML file
cfg = OmegaConf.load("config.yaml")
flyte.run(train, cfg=cfg)
3. From a typed dataclass (structured config)
from dataclasses import dataclass, field
from omegaconf import OmegaConf

@dataclass
class OptimizerConf:
    lr: float = 0.001
    weight_decay: float = 1e-4

@dataclass
class TrainConf:
    optimizer: OptimizerConf = field(default_factory=OptimizerConf)
    epochs: int = 10

cfg = OmegaConf.structured(TrainConf())
flyte.run(train, cfg=cfg)
Structured configs provide type validation at assignment time: cfg.optimizer.lr = "oops" raises omegaconf.ValidationError.
4. Merging base config with overrides
base = OmegaConf.load("config.yaml")
override = OmegaConf.create({"optimizer": {"lr": 0.01}})
cfg = OmegaConf.merge(base, override)
flyte.run(train, cfg=cfg)
5. Structured config with MISSING required fields
from omegaconf import MISSING

@dataclass
class TrainConf:
    data_path: str = MISSING  # must be set before accessing
    epochs: int = 10

# Pass with MISSING still unset; serialization succeeds
cfg = OmegaConf.structured(TrainConf())
flyte.run(train, cfg=cfg)

# Or fill it before passing
cfg = OmegaConf.structured(TrainConf(data_path="/data/imagenet"))
flyte.run(train, cfg=cfg)
A config with an unset MISSING field serializes and deserializes successfully — the MISSING sentinel is preserved through the wire format. Accessing the field raises MissingMandatoryValue, so the task will fail if it tries to read an unfilled field.
Structured config deserialization
When a DictConfig is deserialized in a receiving task, the plugin uses Auto mode: it attempts to reconstruct the original dataclass-backed config, and falls back to a plain DictConfig if the class is not importable in the receiving task's environment.
# Task A produces a structured config
cfg = OmegaConf.structured(TrainConf(lr=0.01))
# serialized payload: {"base_dataclass": "mymodule.TrainConf", "values": {...}}

# Task B receives it
async def task_b(cfg: DictConfig) -> ...:
    # If TrainConf is importable: cfg is a TrainConf-backed DictConfig (type-validated)
    # If TrainConf is not importable: cfg is a plain DictConfig (no schema)
    OmegaConf.get_type(cfg)  # TrainConf or dict
To ensure structured configs survive task hops, make sure the dataclass is defined in a module importable by all tasks in the pipeline.
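The fallback can be pictured as a guarded import of the dotted class path stored in base_dataclass. The sketch below uses only the standard library and illustrates the idea rather than the plugin's actual implementation; resolve_base_class and pick_mode are hypothetical names:

```python
import dataclasses
import importlib
from typing import Optional


def resolve_base_class(dotted_path: str) -> Optional[type]:
    """Return the class named by 'package.module.ClassName', or None if unavailable."""
    module_name, _, class_name = dotted_path.rpartition(".")
    try:
        module = importlib.import_module(module_name)
    except (ImportError, ValueError):
        # The module is missing in this environment, or the path had no module part.
        return None
    candidate = getattr(module, class_name, None)
    return candidate if isinstance(candidate, type) else None


def pick_mode(payload: dict) -> str:
    """Return 'structured' if the payload names an importable dataclass, else 'plain'."""
    cls = resolve_base_class(payload["base_dataclass"])
    if cls is not None and dataclasses.is_dataclass(cls):
        return "structured"
    return "plain"


if __name__ == "__main__":
    # A dict-backed payload and an unimportable class both end up in plain mode.
    print(pick_mode({"base_dataclass": "builtins.dict", "values": {}}))
    print(pick_mode({"base_dataclass": "module_not_installed_here.TrainConf", "values": {}}))
```

This is why keeping the dataclass in a shared, installed module matters: the receiving task can only recover the schema if the import succeeds in its own environment.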
YAML reports
The Flyte I/O panel displays the literal wire representation. For a human-readable YAML view, enable a Flyte report
on the task and log the config with log_yaml:
from omegaconf import DictConfig
from flyteplugins.omegaconf.report import log_yaml

@env.task(report=True)
async def train(cfg: DictConfig) -> DictConfig:
    await log_yaml.aio(cfg, title="Input config")
    ...
Wire format
Both DictConfig and ListConfig are serialized as MessagePack binaries with tag "msgpack":
Literal(scalar=Scalar(binary=Binary(value=<msgpack bytes>, tag="msgpack")))
DictConfig payload (msgpack-encoded dict):
{
  "base_dataclass": "mymodule.TrainConf",
  "values": { "optimizer": { "lr": 0.001 }, "training": { "epochs": 10 } }
}
For plain dict-backed configs, base_dataclass is "builtins.dict".
ListConfig payload (msgpack-encoded list):
[0.001, 0.01, 0.1]
OmegaConf variable interpolations are resolved at serialization time (resolve=True). The wire representation always contains concrete values.
Limitations
- Structured config schema strictness: merging keys that don't exist as dataclass fields raises an error. Only declare structured configs when all possible keys are known upfront.
- MISSING fields: a DictConfig with unset MISSING fields serializes fine; the sentinel is preserved on the wire and accessing it still raises MissingMandatoryValue. However, in plain dict mode (when the originating dataclass is not importable in the receiving task), the field's type annotation is lost: the node becomes an AnyNode instead of the declared type (e.g. StringNode). In Auto mode, the schema is recovered from the dataclass, so the annotation is preserved.
- ListConfig structured configs: ListConfig always round-trips as a plain ListConfig; there is no structured (typed-element) ListConfig support.
- Key types: OmegaConf enforces string keys for DictConfig; integer-keyed dicts are not supported.
- Class importability: structured config reconstruction requires the dataclass to be importable in the receiving task. If it is not, the config falls back to a plain DictConfig (Auto mode).
Examples
See the examples/ directory:
- example_dictconfig.py: plain dict configs, nested, interpolation, merging
- example_structured_config.py: structured configs, type validation, MISSING fields, config resolution
- example_listconfig.py: numeric lists, nested lists, list of dicts, LR schedules
- example_pipeline.py: multi-task pipeline with DictConfig and ListConfig flowing between tasks