# metaflow-orchestrator-kit

Development kit for building Metaflow orchestrator extensions.

Build a correct Metaflow orchestrator extension the first time — scaffold, validate, and prove compliance.
## Table of Contents
- The problem
- Install
- 5-minute quickstart
- What the scaffold generates
- Extension package layout
- Capabilities
- Usage
- Common pitfalls
- Development
## The problem
You want to integrate a new scheduler with Metaflow. You know you need a DeployerImpl, a DeployedFlow, and a TriggeredRun — but the contract is nowhere written down. No spec lists what your deployer must handle. No test suite validates your implementation. You figure it out by reading existing orchestrators and rediscovering the same handful of non-obvious bugs every author hits: retry counts hardcoded to zero, --branch missing from step subprocesses, config env vars absent from containers, run_params passed as a tuple.
This kit is the missing artifact: a written contract, a scaffold that pre-fills every requirement, a static validator that catches mistakes before CI, and a compliance test suite you can run against a live scheduler.
## Install

```bash
pip install metaflow-orchestrator-kit

# with dev dependencies:
pip install "metaflow-orchestrator-kit[dev]"
```
## 5-minute quickstart

```bash
# 1. Scaffold
python -m metaflow_orchestrator_kit.scaffold my_scheduler
cd my_scheduler/

# 2. Fill in the scheduler-specific parts (all marked # TODO: SCHEDULER API)
#    - my_scheduler_deployer.py: _compile_workflow(), _build_step_command()
#    - my_scheduler_objects.py: trigger(), from_deployment(), status property
#    - my_scheduler_cli.py: create(), trigger()

# 3. Validate (static analysis — no scheduler needed)
python -m metaflow_orchestrator_kit.validate ./

# 4. Test (one command — writes config, finds flows, runs compliance suite)
python -m metaflow_orchestrator_kit.test \
    --scheduler-type my_scheduler \
    --deploy-args host=http://localhost:8000

# 5. If all green: commit the generated ux-tests-my_scheduler.yml to your repo
```
## What the scaffold generates

```bash
python -m metaflow_orchestrator_kit.scaffold my_scheduler
```

Creates `./my_scheduler/` containing:

| File | Purpose |
|---|---|
| `my_scheduler_deployer.py` | `DeployerImpl` subclass — all required plumbing pre-solved |
| `my_scheduler_objects.py` | `DeployedFlow` / `TriggeredRun` subclasses |
| `my_scheduler_cli.py` | CLI entry-point group (`create`, `trigger`) |
| `mfextinit_my_scheduler.py` | Extension registration (auto-discovered by Metaflow) |
| `ux-tests-my_scheduler.yml` | GitHub Actions workflow skeleton |
The generated `_build_step_command()` has all Metaflow plumbing pre-solved. You fill in only the scheduler API calls:

```python
# Generated deployer — pre-solved (do not change these)
required_env = {
    "METAFLOW_FLOW_CONFIG_VALUE": flow_config_value or "",  # Cap.CONFIG_EXPR
    "METAFLOW_DATASTORE_SYSROOT_LOCAL": datastore_sysroot,
    "METAFLOW_SERVICE_URL": os.environ.get("METAFLOW_SERVICE_URL", ""),
    "PATH": os.environ.get("PATH", ""),
}

def _build_step_command(self, step_name, run_id, task_id, input_paths,
                        branch=None, retry_count=0, environment_type="local"):
    cmd = [sys.executable, flow_file, "--no-pylint", "--environment", environment_type]
    if branch:
        cmd += ["--branch", branch]  # Cap.PROJECT_BRANCH
    cmd += ["step", step_name, "--run-id", run_id, "--task-id", task_id,
            "--retry-count", str(retry_count),  # TODO: SCHEDULER API — replace 0
            "--input-paths", input_paths]
    return cmd
```
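For a feel of the resulting command line, here is a self-contained sketch of the same construction logic. It is not the generated code: `flow_file` is passed as a parameter here purely for illustration, whereas the real method closes over deployer state.

```python
import sys

def build_step_command(flow_file, step_name, run_id, task_id, input_paths,
                       branch=None, retry_count=0, environment_type="local"):
    """Standalone sketch of the generated _build_step_command() logic."""
    cmd = [sys.executable, flow_file, "--no-pylint", "--environment", environment_type]
    if branch:
        # forwarded to EVERY step subprocess, not just start (Cap.PROJECT_BRANCH)
        cmd += ["--branch", branch]
    cmd += ["step", step_name, "--run-id", run_id, "--task-id", task_id,
            "--retry-count", str(retry_count),
            "--input-paths", input_paths]
    return cmd

cmd = build_step_command("flow.py", "train", "42", "7", "42/start/1",
                         branch="user.alice", retry_count=2)
assert "--branch" in cmd
assert cmd[cmd.index("--retry-count") + 1] == "2"
```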
## Extension package layout

Once you're ready to publish, place the generated files in the proper namespace package structure:

```
your_package/
  metaflow_extensions/              ← NO __init__.py here (implicit namespace package)
    my_scheduler/
      plugins/
        mfextinit_my_scheduler.py   ← extension registration
        my_scheduler/
          my_scheduler_deployer.py  ← DeployerImpl subclass
          my_scheduler_objects.py   ← DeployedFlow / TriggeredRun
          my_scheduler_cli.py       ← CLI group
```

After `pip install -e .`, `Deployer(flow_file).my_scheduler(...)` is available with no other registration needed. Metaflow discovers `mfextinit_*.py` automatically.

**CRITICAL:** `metaflow_extensions/` must NOT have an `__init__.py`. It is an implicit namespace package. Adding `__init__.py` breaks extension discovery — `Deployer(flow).my_scheduler()` will silently not exist after install.
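The no-`__init__.py` rule is easy to enforce in your own test suite. A minimal sketch with a hypothetical helper name:

```python
from pathlib import Path

def is_valid_namespace_root(pkg_root):
    """True if metaflow_extensions/ exists and has no __init__.py,
    i.e. it remains an implicit namespace package (PEP 420)."""
    ext = Path(pkg_root) / "metaflow_extensions"
    return ext.is_dir() and not (ext / "__init__.py").exists()

# Example against a throwaway layout:
import os, tempfile
root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, "metaflow_extensions", "my_scheduler"))
assert is_valid_namespace_root(root)       # no __init__.py → discoverable
open(os.path.join(root, "metaflow_extensions", "__init__.py"), "w").close()
assert not is_valid_namespace_root(root)   # __init__.py added → discovery breaks
```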
## Capabilities

### Required — every orchestrator must pass these

| Capability | What it means |
|---|---|
| `Cap.LINEAR_DAG` | `start` → one or more steps → `end` |
| `Cap.BRANCHING` | static split/join: parallel branches merged at a join step |
| `Cap.FOREACH` | dynamic fan-out: `foreach` produces N tasks at runtime |
| `Cap.RETRY` | real attempt count from scheduler's native counter (not hardcoded 0) |
| `Cap.CATCH` | `@catch` decorator: catch step exception and continue the flow |
| `Cap.TIMEOUT` | `@timeout` decorator: abort a step that exceeds the time limit |
| `Cap.RESOURCES` | `@resources` passthrough: CPU/memory hints forwarded to scheduler |
| `Cap.PROJECT_BRANCH` | `--branch` forwarded to every step subprocess, not just `start` |
| `Cap.CONFIG_EXPR` | `METAFLOW_FLOW_CONFIG_VALUE` injected into every container/subprocess |
| `Cap.RUN_PARAMS` | `trigger()` run params as list, not tuple |
| `Cap.FROM_DEPLOYMENT` | `from_deployment(identifier)` handles dotted names (`project.branch.FlowName`) |
| `Cap.CONDA` | `@conda` environment creation at task runtime; passes `--environment conda` to step subprocesses |
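One of the trickier rows is `Cap.FROM_DEPLOYMENT`: the identifier may be a bare flow name or a dotted `project.branch.FlowName`, where the branch segment itself may contain dots (e.g. `user.alice`). A minimal parsing sketch with a hypothetical helper name, assuming the project name and flow name contain no dots:

```python
def parse_identifier(identifier):
    """Split 'project.branch.FlowName' into (project, branch, flow).
    A bare 'FlowName' yields (None, None, 'FlowName')."""
    parts = identifier.split(".")
    if len(parts) == 1:
        return (None, None, identifier)
    if len(parts) >= 3:
        # first segment = project, last = flow, everything between = branch
        return (parts[0], ".".join(parts[1:-1]), parts[-1])
    raise ValueError(f"unrecognized deployment identifier: {identifier!r}")

assert parse_identifier("demo.prod.TrainFlow") == ("demo", "prod", "TrainFlow")
assert parse_identifier("demo.user.alice.TrainFlow") == ("demo", "user.alice", "TrainFlow")
assert parse_identifier("TrainFlow") == (None, None, "TrainFlow")
```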
### Optional — implement or explicitly declare unsupported

| Capability | What it means |
|---|---|
| `Cap.NESTED_FOREACH` | `foreach` inside `foreach` |
| `Cap.RESUME` | `ORIGIN_RUN_ID` resume: re-run from a previously failed step |
| `Cap.SCHEDULE` | `@schedule` cron trigger |
Declare your supported capabilities in the deployer:

```python
from metaflow_orchestrator_kit import Cap, REQUIRED

SUPPORTED_CAPABILITIES = REQUIRED | {Cap.NESTED_FOREACH, Cap.SCHEDULE}
```
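A self-contained sketch of how such a declaration can be checked, modeling `Cap` as a stdlib `Enum` and `REQUIRED` as a frozenset (the kit's actual types may differ):

```python
from enum import Enum, auto

class Cap(Enum):
    LINEAR_DAG = auto()
    BRANCHING = auto()
    RETRY = auto()
    NESTED_FOREACH = auto()   # optional capability
    SCHEDULE = auto()         # optional capability

# Illustrative subset of the required contract
REQUIRED = frozenset({Cap.LINEAR_DAG, Cap.BRANCHING, Cap.RETRY})

SUPPORTED_CAPABILITIES = REQUIRED | {Cap.NESTED_FOREACH, Cap.SCHEDULE}

# A validator can flag any required capability the deployer failed to declare:
missing = REQUIRED - SUPPORTED_CAPABILITIES
assert not missing   # empty set → the deployer meets the required contract
```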
## Usage

### Validate (static analysis — no scheduler needed)

```bash
python -m metaflow_orchestrator_kit.validate ./my_scheduler/

# equivalent:
metaflow-orchestrator-validate ./my_scheduler/
```

Example output:

```
Validating: /path/to/my_scheduler/
PASS  metaflow_extensions/ has no __init__.py
PASS  mfextinit_<name>.py exists
PASS  DEPLOYER_IMPL_PROVIDERS_DESC has correct structure
PASS  run_params uses list() not tuple()
FAIL  METAFLOW_FLOW_CONFIG_VALUE in step env
      Problem: METAFLOW_FLOW_CONFIG_VALUE not found in any extension file
      Fix: Extract at compile time: from metaflow.flowspec import FlowStateItems; ...
PASS  --branch passed to step commands
PASS  retry_count reads from attempt, not hardcoded to 0
PASS  DATASTORE_SYSROOT captured at compile time
PASS  ENVIRONMENT_TYPE passed to step command
Results: 8 passed, 1 failed
```
### Test (one-command compliance suite)

```bash
python -m metaflow_orchestrator_kit.test \
    --scheduler-type my_scheduler \
    --deploy-args host=http://localhost:8000,token=abc123

# Full options:
python -m metaflow_orchestrator_kit.test \
    --scheduler-type windmill \
    --deploy-args windmill_host=http://localhost:8000,windmill_token=abc123 \
    --metaflow-src /path/to/metaflow \
    --test-modules compliance,basic,config,dag \
    --workers 4
```

This command:

- Writes `ux_test_config_windmill.yaml` automatically (scheduler-specific name avoids conflicts)
- Creates an isolated `METAFLOW_DATASTORE_SYSROOT_LOCAL` per run (prevents contamination between concurrent test runs)
- Verifies scheduler reachability before starting tests
- Runs the compliance tests and reports pass/fail per capability
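The isolated-datastore step is simple to reproduce in your own harness. A sketch with a hypothetical helper name (the kit's internals may differ):

```python
import os
import tempfile

def isolated_local_sysroot(prefix="mfo-kit-"):
    """Point Metaflow's local datastore at a fresh directory so concurrent
    test runs never share (or clobber) each other's artifacts."""
    path = tempfile.mkdtemp(prefix=prefix)
    os.environ["METAFLOW_DATASTORE_SYSROOT_LOCAL"] = path
    return path

a, b = isolated_local_sysroot(), isolated_local_sysroot()
assert a != b                       # every run gets its own root
assert os.path.isdir(a) and os.path.isdir(b)
```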
### Wire compliance into CI

```yaml
- name: Run compliance tests
  run: |
    python -m metaflow_orchestrator_kit.test \
      --scheduler-type my_scheduler \
      --deploy-args host=${{ secrets.SCHEDULER_HOST }}
```

Or use the generated `ux-tests-my_scheduler.yml` as a starting point.
## Common pitfalls

Every new orchestrator implementation hits the same bugs. The scaffold pre-solves most of them; the validator catches the rest before CI. 33 pitfalls have been documented from the full histories of kestra, prefect, temporal, dagster, flyte, windmill, and mage.

The top 5 pitfalls that every implementation hits:

1. `run_params` tuple vs list — Click returns tuples; the deployer API requires a list: `run_params = list(run_params) if run_params else []`
2. `METAFLOW_FLOW_CONFIG_VALUE` missing — must be baked into every step container env at compile time
3. `--branch` not forwarded — must be in every step command, not just the start step
4. `retry_count` wrong — derive from the scheduler's native counter; most are 1-indexed, so use `max(0, counter - 1)`
5. Docker worker filesystem isolation — step scripts reference host paths that don't exist inside worker containers; add volume mounts
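Pitfalls 1 and 4 reduce to one-line normalizations. A sketch with hypothetical helper names:

```python
def normalize_run_params(run_params):
    """Click hands trigger() a tuple; the deployer API expects a list."""
    return list(run_params) if run_params else []

def metaflow_retry_count(scheduler_attempt):
    """Most schedulers count attempts from 1; Metaflow's --retry-count is 0-based."""
    return max(0, scheduler_attempt - 1)

assert normalize_run_params(("alpha=1", "beta=2")) == ["alpha=1", "beta=2"]
assert normalize_run_params(None) == []
assert metaflow_retry_count(1) == 0   # first attempt → retry count 0
assert metaflow_retry_count(3) == 2
```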
Full pitfall documentation, grouped by topic:

- Contract & API pitfalls — deployer protocol, `from_deployment`, init, cancellation
- Environment variable pitfalls — `METAFLOW_FLOW_CONFIG_VALUE`, `--branch`, sysroot, `--input-paths`
- Docker & CI pitfalls — volume mounts, root-owned files, GHA env vars, conda PATH
- Scheduler API pitfalls — auth, async indexing, `--tag` placement, coverage artifacts
## Development

```bash
git clone https://github.com/npow/metaflow-orchestrator-kit
cd metaflow-orchestrator-kit
pip install -e ".[dev]"
pytest
```