Development kit for building Metaflow orchestrator extensions
metaflow-orchestrator-kit
Build a correct Metaflow orchestrator the first time — scaffold, declare capabilities, and prove compliance.
The problem
You want to integrate a new scheduler with Metaflow. You know you need to implement a DeployerImpl, a DeployedFlow, and a TriggeredRun. But Metaflow's orchestrator contract is not written down anywhere. There is no spec listing what your deployer must handle, no test suite to run against your implementation, and no scaffold to get you started. You figure it out by reading existing orchestrator code and hitting a handful of non-obvious bugs that every new orchestrator author rediscovers independently: retry counts hardcoded to zero, --branch missing from step subprocesses, config env vars absent from containers. This kit is the missing artifact: a written contract, a compliance test suite you can run locally, and a scaffold that pre-fills every requirement so none of them are easy to skip.
5-minute quickstart
# 1. Install
pip install metaflow-orchestrator-kit
# 2. Scaffold
python -m metaflow_orchestrator_kit.scaffold my_scheduler
cd my_scheduler/
# 3. Fill in the scheduler-specific parts (all marked with # TODO: SCHEDULER API)
# - my_scheduler_deployer.py: _compile_workflow(), _build_step_command()
# - my_scheduler_objects.py: trigger(), from_deployment(), status property
# - my_scheduler_cli.py: create(), trigger()
# 4. Validate (catches all known pitfalls without running tests)
python -m metaflow_orchestrator_kit.validate ./
# 5. Test (one command — writes config, finds flows, runs compliance suite)
python -m metaflow_orchestrator_kit.test \
--scheduler-type my_scheduler \
--deploy-args host=http://localhost:8000
# 6. If all green: set up GHA with the generated ux-tests-my_scheduler.yml
What the scaffold generates
python -m metaflow_orchestrator_kit.scaffold my_scheduler
Creates ./my_scheduler/ containing:
- my_scheduler_deployer.py — DeployerImpl subclass, all required plumbing pre-solved
- my_scheduler_objects.py — DeployedFlow / TriggeredRun subclasses
- my_scheduler_cli.py — CLI entry-point group
- mfextinit_my_scheduler.py — extension registration (auto-discovered by Metaflow)
- ux-tests-my_scheduler.yml — GitHub Actions workflow skeleton
The generated _build_step_command() includes all pre-solved Metaflow plumbing. You fill in only the scheduler API calls:
# Generated deployer — pre-solved plumbing (do not change these)
required_env = {
    "METAFLOW_FLOW_CONFIG_VALUE": flow_config_value or "",  # Cap.CONFIG_EXPR
    "METAFLOW_DATASTORE_SYSROOT_LOCAL": datastore_sysroot,
    "METAFLOW_SERVICE_URL": os.environ.get("METAFLOW_SERVICE_URL", ""),
    "PATH": os.environ.get("PATH", ""),
}

def _build_step_command(self, step_name, run_id, task_id, input_paths,
                        branch=None, retry_count=0, environment_type="local"):
    cmd = [sys.executable, self.flow_file, "--no-pylint",
           "--environment", environment_type]
    if branch:
        cmd += ["--branch", branch]  # Cap.PROJECT_BRANCH
    cmd += ["step", step_name, "--run-id", run_id, "--task-id", task_id,
            # TODO: SCHEDULER API — pass the scheduler's real attempt count,
            # never a hardcoded 0
            "--retry-count", str(retry_count),
            "--input-paths", input_paths]
    return cmd
Install
pip install metaflow-orchestrator-kit
# with dev dependencies:
pip install "metaflow-orchestrator-kit[dev]"
Usage
1. Scaffold
python -m metaflow_orchestrator_kit.scaffold my_scheduler [output_dir]
# equivalent short form:
metaflow-orchestrator-scaffold my_scheduler
2. Validate (static analysis — no scheduler needed)
python -m metaflow_orchestrator_kit.validate ./my_scheduler/
# equivalent:
metaflow-orchestrator-validate ./my_scheduler/
Example output:
Validating: /path/to/my_scheduler/
PASS mfextinit_<name>.py exists
PASS DEPLOYER_IMPL_PROVIDERS_DESC has correct structure
PASS run_params uses list() not tuple()
FAIL METAFLOW_FLOW_CONFIG_VALUE in step env
Problem: METAFLOW_FLOW_CONFIG_VALUE not found in my_scheduler_deployer.py
Fix: Extract at compile time: from metaflow.flowspec import FlowStateItems; ...
PASS --branch passed to step commands
PASS retry_count reads from attempt, not hardcoded to 0
PASS DATASTORE_SYSROOT captured at compile time
PASS ENVIRONMENT_TYPE passed to step command
PASS from_deployment handles dotted names
Results: 8 passed, 1 failed
3. Test (one-command compliance suite)
python -m metaflow_orchestrator_kit.test \
--scheduler-type my_scheduler \
--deploy-args host=http://localhost:8000,token=abc123
# Full options:
python -m metaflow_orchestrator_kit.test \
--scheduler-type windmill \
--deploy-args windmill_host=http://localhost:8000,windmill_token=abc123 \
--metaflow-src /path/to/metaflow \
--test-modules compliance,basic,config,dag \
--workers 4
# equivalent:
metaflow-orchestrator-test --scheduler-type windmill --deploy-args ...
This command:
- Writes ux_test_config_generated.yaml automatically
- Finds the test flows (from installed metaflow or from --metaflow-src)
- Runs the compliance tests
- Reports a clear pass/fail summary
4. Declare capabilities
from metaflow_orchestrator_kit import Cap, REQUIRED
# REQUIRED is the minimum set every orchestrator must pass.
# Add optional capabilities your scheduler actually supports.
SUPPORTED_CAPABILITIES = REQUIRED | {Cap.NESTED_FOREACH, Cap.SCHEDULE}
5. Wire compliance into CI
- name: Run compliance tests
  run: |
    python -m metaflow_orchestrator_kit.test \
      --scheduler-type my_scheduler \
      --deploy-args host=${{ secrets.SCHEDULER_HOST }}
Or use the generated ux-tests-my_scheduler.yml as a starting point.
Extension package layout
your_package/
  metaflow_extensions/
    my_scheduler/
      plugins/
        mfextinit_my_scheduler.py    <- extension registration
        my_scheduler/
          my_scheduler_deployer.py   <- DeployerImpl subclass
          my_scheduler_objects.py    <- DeployedFlow / TriggeredRun
          my_scheduler_cli.py        <- CLI group
After pip install -e ., Deployer(flow_file).my_scheduler(...) is available with no other registration needed. Metaflow discovers mfextinit_*.py automatically.
Common pitfalls
Every new orchestrator implementation hits the same bugs. The scaffold pre-solves most of them; the validator catches the rest before CI.
1. run_params tuple vs list (Cap.RUN_PARAMS) — Click's multi-value options return tuples. Passing a tuple to trigger() causes TypeError when two or more params are given. Fix: run_params = list(run_params) if run_params else [].
2. --branch not forwarded to step subprocesses (Cap.PROJECT_BRANCH) — @project reads current.branch_name from the --branch flag at step runtime. Without it, all step tasks produce an empty branch name. Fix: include --branch <branch> in every step command the scheduler launches.
3. METAFLOW_FLOW_CONFIG_VALUE missing from container env (Cap.CONFIG_EXPR) — @config and @project use this env var to reconstruct the config dict at task runtime. Without it, tasks run with empty config. Fix: read flow._flow_state[FlowStateItems.CONFIGS] at compile time and JSON-serialize it into the container environment.
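The fix can be sketched as follows. The FlowStateItems lookup mentioned above is metaflow-internal; here a plain dict stands in for the resolved configs read at compile time:

```python
import json

def config_env_entry(configs):
    """Build the env-var entry that lets @config reconstruct the config
    dict at task runtime. `configs` stands in for the value of
    flow._flow_state[FlowStateItems.CONFIGS], read at compile time."""
    return {"METAFLOW_FLOW_CONFIG_VALUE": json.dumps(configs or {})}
```

The serialized value must reach every task container, not just start.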
4. retry_count hardcoded to 0 (Cap.RETRY) — Metaflow's @retry uses the attempt number to decide whether to retry. Hardcoding 0 means the flow always sees attempt 0 and never retries. Fix: derive from the scheduler's native counter (AWS_BATCH_JOB_ATTEMPT, Kubernetes restartCount, Airflow try_number - 1, etc.).
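One way to sketch that derivation (AWS_BATCH_JOB_ATTEMPT is real and counts from 1; MY_SCHEDULER_ATTEMPT is a hypothetical stand-in for your scheduler's native counter):

```python
import os

def current_attempt(env=None):
    """Map the scheduler's native retry counter onto Metaflow's
    zero-based attempt number; defaults to 0 on the first attempt."""
    env = os.environ if env is None else env
    if "AWS_BATCH_JOB_ATTEMPT" in env:     # AWS Batch counts from 1
        return int(env["AWS_BATCH_JOB_ATTEMPT"]) - 1
    if "MY_SCHEDULER_ATTEMPT" in env:      # hypothetical, zero-based counter
        return int(env["MY_SCHEDULER_ATTEMPT"])
    return 0
```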
5. from_deployment() fails on dotted names (Cap.FROM_DEPLOYMENT) — DAG IDs for @project-decorated flows are dotted: project.branch.FlowName. Using the full string as a Python class name raises SyntaxError. Fix: flow_name = identifier.split(".")[-1].
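A minimal sketch of that parsing, assuming the project.branch.FlowName shape described above; bare flow names pass through unchanged:

```python
def parse_identifier(identifier):
    """Split a dotted DAG id into (project, branch, flow_name);
    project and branch are None for undecorated flows."""
    parts = identifier.split(".")
    if len(parts) >= 3:
        return parts[0], ".".join(parts[1:-1]), parts[-1]
    return None, None, parts[-1]
```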
6. @conda broken for in-process executors (Cap.CONDA) — @conda uses a class-level _metaflow_home that is only set in runtime_init(). For subprocess-based orchestrators this is called automatically when the step subprocess re-enters the Metaflow runtime. For in-process executors (Dagster execute_job(), Windmill sync functions) runtime_init() is never called, leaving conda packages absent from PYTHONPATH and steps failing with ModuleNotFoundError. Fix: wrap the step command with ["conda", "run", "--no-capture-output", "-n", conda_env_name, "python", ...]. For subprocess-based orchestrators, pass --environment conda to the step command (already in the generated scaffold).
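The in-process fix can be sketched as a small wrapper; how you resolve the conda env name is scheduler-specific and assumed here:

```python
def wrap_with_conda(cmd, conda_env_name):
    """Prefix an in-process step command with `conda run` so the step's
    conda packages resolve even though runtime_init() never runs."""
    return ["conda", "run", "--no-capture-output",
            "-n", conda_env_name] + list(cmd)
```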
7. Extension not auto-discovered (Deployer missing .my_scheduler()) — Deployer(flow_file).my_scheduler() raises AttributeError with no indication why. Metaflow reads DEPLOYER_IMPL_PROVIDERS_DESC from mfextinit_<name>.py; if it's missing, misnamed, in the wrong directory, or the descriptor is malformed, the deployer is silently not registered. Fix: ensure mfextinit_<name>.py lives at metaflow_extensions/<name>/plugins/ and DEPLOYER_IMPL_PROVIDERS_DESC = [("<name>", ".<name>.<name>_deployer.<Class>DeployerImpl")]. Run python -m metaflow_orchestrator_kit.validate . to catch this before CI.
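For reference, the whole registration file can be this small (class and module names follow the scaffold's my_scheduler example and are illustrative):

```python
# mfextinit_my_scheduler.py — lives at metaflow_extensions/my_scheduler/plugins/
DEPLOYER_IMPL_PROVIDERS_DESC = [
    ("my_scheduler",
     ".my_scheduler.my_scheduler_deployer.MySchedulerDeployerImpl"),
]
```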
8. Docker workers do not share /tmp between steps; bash input passing varies by scheduler — Each step in a Docker-worker scheduler (Windmill, Prefect, Argo) runs in a separate container. Files written to /tmp in the init step (e.g. a run_id file) are NOT visible to the next step container.
Additionally, different schedulers pass inputs to bash scripts in different ways:
- Some schedulers inject inputs as environment variables ($MY_INPUT).
- Windmill CE injects inputs via a prepended shell preamble (my_input="value" prepended before the script). This is a shell variable, not an exported env var. If your script then runs export SOME_OTHER_VAR=..., it may shadow the prepended variable. This causes inputs to appear empty even though the scheduler shows them as passed correctly.
Options for reliable inter-step data passing in Docker schedulers:
- Use same_worker: true (Windmill-specific): forces all steps in the flow to run in the same worker container, so /tmp is shared. This is the most reliable option for local integration testing.
- Use the scheduler's native return-value mechanism: some schedulers let bash modules return values (e.g. Windmill stdout) that subsequent modules receive as inputs. Test with a minimal script before relying on this pattern.
- Use an external store: write the run ID to a shared key-value store (Redis, database) and read it from subsequent steps.
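The external-store option can be sketched with a minimal key-value interface. An in-memory dict is used here for illustration; swapping in a Redis client is an assumption about your infrastructure:

```python
class RunIdStore:
    """Publish/fetch the run id across step containers that do not
    share a filesystem. The backend only needs dict-like get/set."""
    def __init__(self, backend=None):
        self._kv = {} if backend is None else backend

    def _key(self, flow_name, deploy_id):
        return f"{flow_name}:{deploy_id}:run_id"

    def publish(self, flow_name, deploy_id, run_id):
        self._kv[self._key(flow_name, deploy_id)] = run_id

    def fetch(self, flow_name, deploy_id):
        return self._kv.get(self._key(flow_name, deploy_id))
```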
9. Docker-based workers cannot reach the local filesystem (host paths) — Schedulers that run workers in Docker containers (Windmill, Prefect, Argo) isolate the worker filesystem from the host. The step command uses the absolute host path to the flow file (e.g. /Users/me/project/flow.py), but that path does not exist inside the container. The same applies to METAFLOW_DATASTORE_SYSROOT_LOCAL: if the sysroot path is a host-local directory, the worker writes to a different directory than the deployer reads from, so wait_for_deployed_run() polls forever.
Recommended fix (production): Build a custom worker Docker image that has Metaflow installed via pip install metaflow and use a shared object store (S3/MinIO) as the datastore. This avoids all filesystem sharing problems.
Quick fix (local devstack only): Add volume mounts to your docker-compose worker service and set PYTHONPATH:
volumes:
  - /Users:/Users   # macOS — use /home:/home on Linux
  - /tmp:/tmp
Warning: Volume mounts expose your entire host user directory to the container, including your conda site-packages. Python running inside the container will discover and load all Metaflow extensions installed on the host — including any internal/private extensions that depend on services not available inside the container (e.g. a service metadata provider that requires an internal API). This causes cryptic failures like Cannot locate metadata_provider plugin 'service'. Mitigation: set PYTHONPATH to only the OSS metaflow source, not your full site-packages path, and do NOT include the extension package itself in PYTHONPATH:
# In the bash script emitted by the compiler (wrong):
export PYTHONPATH=/path/to/metaflow:/path/to/metaflow-myscheduler
# Correct: only the core source, not extension packages
export PYTHONPATH=/path/to/metaflow
If you still see extension-loading failures, the container's Python may discover metaflow_extensions/ directories within the mounted source tree. The safest solution for local testing is to install Metaflow inside the worker container's init script:
pip install metaflow requests # in the step's bash preamble
10. METAFLOW_DATASTORE_SYSROOT_LOCAL vs --datastore-root use different path formats — METAFLOW_DATASTORE_SYSROOT_LOCAL is the sysroot (parent directory, no .metaflow suffix). The --datastore-root CLI argument to step and init commands is the full datastore path which includes the .metaflow subdirectory that LocalStorage appends. Using the same value for both causes steps to write to a double-nested path (.metaflow/.metaflow):
# Correct (these are different values):
export METAFLOW_DATASTORE_SYSROOT_LOCAL='/tmp/mytest' # no .metaflow
python3 flow.py step start ... --datastore-root /tmp/mytest/.metaflow # with .metaflow
# Wrong (do not reuse the same value):
export METAFLOW_DATASTORE_SYSROOT_LOCAL='/tmp/mytest/.metaflow' # double .metaflow on read
python3 flow.py step start ... --datastore-root /tmp/mytest/.metaflow
At compile time, use os.environ.get("METAFLOW_DATASTORE_SYSROOT_LOCAL") for the env var and sysroot + "/.metaflow" for the --datastore-root argument.
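The two values can be derived from one setting so they never drift, a sketch of the compile-time logic just described:

```python
import os

def datastore_paths(sysroot):
    """Return (env var value, --datastore-root value): the sysroot as-is,
    and the sysroot plus the .metaflow suffix LocalStorage appends."""
    return sysroot, os.path.join(sysroot, ".metaflow")
```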
11. Every step including start requires --input-paths — Without --input-paths, the step subprocess fails with UnboundLocalError: cannot access local variable 'inputs'. This is required for every step, not just foreach splits. The format is {run_id}/{parent_step}/{parent_task_id}. The start step's parent is the _parameters task created by init:
# start step input (parent is init):
--input-paths "${run_id}/_parameters/1"
# other steps (parent is previous step, task_id 1):
--input-paths "${run_id}/{parent_step}/1"
# join steps (multiple parents, comma-separated):
--input-paths "${run_id}/branch_a/1,${run_id}/branch_b/1"
12. --run-param is not an OSS Metaflow init option; use METAFLOW_PARAMETER_<NAME> env vars instead — Some internal Metaflow forks added --run-param "name=value" to the init command to pre-populate parameters. OSS Metaflow's init does not have this option; passing it causes Error: no such option: --run-param. In OSS Metaflow, flow parameters are passed to the start step via METAFLOW_PARAMETER_<NAME> environment variables, not via --run-param in init:
# Wrong (NFLX-only):
python flow.py init --run-id $RUN_ID --task-id 1 --run-param "alpha=0.9"
# Correct (OSS): set env vars for the start step container instead:
export METAFLOW_PARAMETER_ALPHA="0.9"
python flow.py step start --run-id $RUN_ID --task-id 1 ...
Do not include --run-param in the init step command if your extension must work with OSS Metaflow.
13. init command missing --task-id — The Metaflow init subcommand requires --task-id in OSS Metaflow (some internal forks made it optional). If you generate an init script without --task-id, the init step fails with Error: Missing option '--task-id'. Fix: always include --task-id 1 in the init command (the init step always runs as task 1):
# Wrong — missing --task-id:
python flow.py init --run-id $RUN_ID
# Correct:
python flow.py init --run-id $RUN_ID --task-id 1
14. --tag passed as a global CLI flag instead of a subcommand argument — Metaflow's flow CLI does not accept --tag as a top-level global option. --tag is only valid after the subcommand (step, run, init). Passing it before the subcommand causes Error: no such option: --tag inside the Docker worker. Fix: append --tag <value> to the command list after the step subcommand and step name, not before them:
# Wrong — --tag before "step":
cmd = [python, flow, "--no-pylint", "--tag", tag, "step", step_name, ...]
# Correct — --tag after the step subcommand:
cmd = [python, flow, "--no-pylint", "step", step_name, "--tag", tag, ...]
15. Scheduler auth tokens expire — If your scheduler issues short-lived auth tokens (Windmill, Kestra), tests that start a long-running deploy+trigger sequence may fail with 401 on the trigger API call because the token used at create() time has expired by the time trigger() is called. Fix: either use long-lived tokens (service account tokens in Windmill: Settings > Users & Tokens > Tokens > Add token with no expiry), or fetch a fresh token at the start of each trigger() call rather than caching the token from create().
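A sketch of the fetch-fresh-token approach; the TTL, refresh margin, and fetch_fn hook are assumptions, and fetch_fn should call your scheduler's auth endpoint:

```python
import time

class TokenProvider:
    """Return a valid token on demand instead of caching the one acquired
    at create() time; refetches when the cached token is within `margin`
    seconds of its assumed expiry."""
    def __init__(self, fetch_fn, ttl_seconds=3600, margin=60):
        self._fetch = fetch_fn
        self._ttl = ttl_seconds
        self._margin = margin
        self._token = None
        self._acquired = 0.0

    def token(self):
        near_expiry = time.time() - self._acquired > self._ttl - self._margin
        if self._token is None or near_expiry:
            self._token = self._fetch()
            self._acquired = time.time()
        return self._token
```

In the deployer, call provider.token() inside trigger() rather than storing the token on the object at create() time.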
16. trigger CLI command must write deployer_attribute_file BEFORE executing steps — The Metaflow Deployer API calls the trigger CLI command as a subprocess and waits for deployer_attribute_file to appear (via handle_timeout, default 3600s). For orchestrators that execute steps directly in the trigger subprocess (Mage, Dagster direct execution), the trigger command typically writes the attribute file AFTER all steps complete. This causes the deployer's trigger() call to block for the entire flow execution time. If the test timeout (pytest --timeout) is shorter than the flow execution time, the subprocess is killed before writing the file and the test fails with Timeout. Fix: write deployer_attribute_file BEFORE executing the steps, then run steps in the background or synchronously afterward. This way handle_timeout returns immediately with the run pathspec, and wait_for_deployed_run() polls for completion:
# In the trigger CLI command:
run_id = "myscheduler-" + uuid.uuid4().hex[:12]
pathspec = f"{flow_name}/{run_id}"
# WRITE THE FILE FIRST before running any steps.
# This unblocks Deployer.trigger() immediately.
if deployer_attribute_file:
with open(deployer_attribute_file, "w") as f:
json.dump({"pathspec": pathspec}, f)
# THEN execute the steps (synchronously or async).
_execute_flow_steps(run_id, ...)
17. Scheduler internal indexing delay after workflow creation — Some schedulers (Mage, Prefect, Windmill) index or cache newly-created pipelines/DAGs asynchronously. If you make a second API call immediately after the creation POST (e.g. creating a schedule, listing runs, or triggering), the scheduler may return 500 or 'NoneType' object has no attribute 'uuid' because the pipeline is not yet in the cache. Fix: add a short delay between _create_pipeline() / _compile_workflow() and any subsequent API call that references the newly-created resource. A 1–2 second sleep is enough for most schedulers. If the second call is not strictly required for the trigger to work (e.g. schedule creation for Mage), make it optional and catch failures gracefully:
try:
schedule_id = _create_api_trigger(client, pipeline_uuid)
except Exception:
schedule_id = None # non-fatal: trigger works without a registered schedule
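When the follow-up call is required, a bounded retry loop covers the indexing window; the attempt count and delay below are arbitrary defaults, not scheduler-specific guidance:

```python
import time

def call_with_retry(fn, attempts=5, delay=1.0):
    """Retry an API call that may fail transiently while the scheduler
    is still indexing a newly created pipeline. Raises the last error
    if every attempt fails."""
    last_exc = None
    for _ in range(attempts):
        try:
            return fn()
        except Exception as exc:   # narrow this to your client's error type
            last_exc = exc
            time.sleep(delay)
    raise last_exc
```

For example, the schedule creation above could become call_with_retry(lambda: _create_api_trigger(client, pipeline_uuid)).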
Capabilities
Required — every orchestrator must pass these
| Capability | What it means |
|---|---|
| Cap.LINEAR_DAG | start → one or more steps → end |
| Cap.BRANCHING | static split/join: parallel branches merged at a join step |
| Cap.FOREACH | dynamic fan-out: foreach produces N tasks at runtime |
| Cap.RETRY | real attempt count from scheduler's native counter (not hardcoded 0) |
| Cap.CATCH | @catch decorator: catch step exception and continue the flow |
| Cap.TIMEOUT | @timeout decorator: abort a step that exceeds the time limit |
| Cap.RESOURCES | @resources passthrough: CPU/memory hints forwarded to scheduler |
| Cap.PROJECT_BRANCH | --branch forwarded to every step subprocess, not just start |
| Cap.CONFIG_EXPR | METAFLOW_FLOW_CONFIG_VALUE injected into every container/subprocess |
| Cap.RUN_PARAMS | trigger() run params as list, not tuple |
| Cap.FROM_DEPLOYMENT | from_deployment(identifier) handles dotted names (project.branch.FlowName) |
Optional — implement or explicitly declare unsupported
| Capability | What it means |
|---|---|
| Cap.NESTED_FOREACH | foreach inside foreach |
| Cap.CONDA | @conda environment creation at task runtime |
| Cap.RESUME | ORIGIN_RUN_ID resume: re-run from a previously failed step |
| Cap.SCHEDULE | @schedule cron trigger |
Example ux_test_config.yaml
backends:
  - name: my-scheduler
    scheduler_type: my_scheduler
    cluster: null
    decospec: null
    deploy_args:
      host: http://localhost:8080
    enabled: true
Development
git clone https://github.com/npow/metaflow-orchestrator-kit
cd metaflow-orchestrator-kit
pip install -e ".[dev]"
pytest