Development kit for building Metaflow orchestrator extensions

metaflow-orchestrator-kit

CI PyPI License: Apache-2.0 Python 3.8+

Build a correct Metaflow orchestrator the first time — scaffold, declare capabilities, and prove compliance.

The problem

You want to integrate a new scheduler with Metaflow. You know you need to implement a DeployerImpl, a DeployedFlow, and a TriggeredRun. But Metaflow's orchestrator contract is not written down anywhere. There is no spec listing what your deployer must handle, no test suite to run against your implementation, and no scaffold to get you started. You figure it out by reading existing orchestrator code and hitting a handful of non-obvious bugs that every new orchestrator author rediscovers independently: retry counts hardcoded to zero, --branch missing from step subprocesses, config env vars absent from containers. This kit is the missing artifact: a written contract, a compliance test suite you can run locally, and a scaffold that pre-fills every requirement so none of them are easy to skip.

5-minute quickstart

# 1. Install
pip install metaflow-orchestrator-kit

# 2. Scaffold
python -m metaflow_orchestrator_kit.scaffold my_scheduler
cd my_scheduler/

# 3. Fill in the scheduler-specific parts (all marked with # TODO: SCHEDULER API)
#    - my_scheduler_deployer.py: _compile_workflow(), _build_step_command()
#    - my_scheduler_objects.py:  trigger(), from_deployment(), status property
#    - my_scheduler_cli.py:      create(), trigger()

# 4. Validate (catches all known pitfalls without running tests)
python -m metaflow_orchestrator_kit.validate ./

# 5. Test (one command — writes config, finds flows, runs compliance suite)
python -m metaflow_orchestrator_kit.test \
  --scheduler-type my_scheduler \
  --deploy-args host=http://localhost:8000

# 6. If all green: set up GHA with the generated ux-tests-my_scheduler.yml

What the scaffold generates

python -m metaflow_orchestrator_kit.scaffold my_scheduler

Creates ./my_scheduler/ containing:

my_scheduler_deployer.py    DeployerImpl subclass — all required plumbing pre-solved
my_scheduler_objects.py     DeployedFlow / TriggeredRun subclasses
my_scheduler_cli.py         CLI entry-point group
mfextinit_my_scheduler.py   Extension registration (auto-discovered by Metaflow)
ux-tests-my_scheduler.yml   GitHub Actions workflow skeleton

The generated _build_step_command() includes all pre-solved Metaflow plumbing. You fill in only the scheduler API calls:

# Generated deployer — pre-solved (do not change these)
required_env = {
    "METAFLOW_FLOW_CONFIG_VALUE": flow_config_value or "",   # Cap.CONFIG_EXPR
    "METAFLOW_DATASTORE_SYSROOT_LOCAL": datastore_sysroot,
    "METAFLOW_SERVICE_URL": os.environ.get("METAFLOW_SERVICE_URL", ""),
    "PATH": os.environ.get("PATH", ""),
}

def _build_step_command(self, step_name, run_id, task_id, input_paths,
                         branch=None, retry_count=0, environment_type="local"):
    cmd = [sys.executable, flow_file, "--no-pylint", "--environment", environment_type]
    if branch:
        cmd += ["--branch", branch]   # Cap.PROJECT_BRANCH
    cmd += ["step", step_name, "--run-id", run_id, "--task-id", task_id,
            "--retry-count", str(retry_count),   # Cap.RETRY — pass the real attempt, not 0
            "--input-paths", input_paths]
    return cmd

Install

pip install metaflow-orchestrator-kit
# with dev dependencies:
pip install "metaflow-orchestrator-kit[dev]"

Usage

1. Scaffold

python -m metaflow_orchestrator_kit.scaffold my_scheduler [output_dir]
# equivalent short form:
metaflow-orchestrator-scaffold my_scheduler

2. Validate (static analysis — no scheduler needed)

python -m metaflow_orchestrator_kit.validate ./my_scheduler/
# equivalent:
metaflow-orchestrator-validate ./my_scheduler/

Example output:

Validating: /path/to/my_scheduler/

  PASS  mfextinit_<name>.py exists
  PASS  DEPLOYER_IMPL_PROVIDERS_DESC has correct structure
  PASS  run_params uses list() not tuple()
  FAIL  METAFLOW_FLOW_CONFIG_VALUE in step env
        Problem: METAFLOW_FLOW_CONFIG_VALUE not found in my_scheduler_deployer.py
        Fix: Extract at compile time: from metaflow.flowspec import FlowStateItems; ...
  PASS  --branch passed to step commands
  PASS  retry_count reads from attempt, not hardcoded to 0
  PASS  DATASTORE_SYSROOT captured at compile time
  PASS  ENVIRONMENT_TYPE passed to step command
  PASS  from_deployment handles dotted names

Results: 8 passed, 1 failed

3. Test (one-command compliance suite)

python -m metaflow_orchestrator_kit.test \
  --scheduler-type my_scheduler \
  --deploy-args host=http://localhost:8000,token=abc123

# Full options:
python -m metaflow_orchestrator_kit.test \
  --scheduler-type windmill \
  --deploy-args windmill_host=http://localhost:8000,windmill_token=abc123 \
  --metaflow-src /path/to/metaflow \
  --test-modules compliance,basic,config,dag \
  --workers 4
# equivalent:
metaflow-orchestrator-test --scheduler-type windmill --deploy-args ...

This command:

  1. Writes ux_test_config_generated.yaml automatically
  2. Finds the test flows (from installed metaflow or from --metaflow-src)
  3. Runs the compliance tests
  4. Reports a clear pass/fail summary

4. Declare capabilities

from metaflow_orchestrator_kit import Cap, REQUIRED

# REQUIRED is the minimum set every orchestrator must pass.
# Add optional capabilities your scheduler actually supports.
SUPPORTED_CAPABILITIES = REQUIRED | {Cap.NESTED_FOREACH, Cap.SCHEDULE}

5. Wire compliance into CI

- name: Run compliance tests
  run: |
    python -m metaflow_orchestrator_kit.test \
      --scheduler-type my_scheduler \
      --deploy-args host=${{ secrets.SCHEDULER_HOST }}

Or use the generated ux-tests-my_scheduler.yml as a starting point.

Extension package layout

your_package/
  metaflow_extensions/
    my_scheduler/
      plugins/
        mfextinit_my_scheduler.py     <- extension registration
        my_scheduler/
          my_scheduler_deployer.py    <- DeployerImpl subclass
          my_scheduler_objects.py     <- DeployedFlow / TriggeredRun
          my_scheduler_cli.py         <- CLI group

After pip install -e ., Deployer(flow_file).my_scheduler(...) is available with no other registration needed. Metaflow discovers mfextinit_*.py automatically.

Common pitfalls

Every new orchestrator implementation hits the same bugs. The scaffold pre-solves most of them; the validator catches the rest before CI.

1. run_params tuple vs list (Cap.RUN_PARAMS) — Click's multi-value options return tuples. Passing a tuple to trigger() causes TypeError when two or more params are given. Fix: run_params = list(run_params) if run_params else [].
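The normalization can be sketched as a one-line helper (the name `normalize_run_params` is illustrative, not part of the kit):

```python
def normalize_run_params(run_params):
    # Click multi-value options arrive as a tuple; downstream trigger()
    # code expects a mutable list (and fails on tuples with 2+ params).
    return list(run_params) if run_params else []
```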

2. --branch not forwarded to step subprocesses (Cap.PROJECT_BRANCH) — @project reads current.branch_name from the --branch flag at step runtime. Without it, all step tasks produce an empty branch name. Fix: include --branch <branch> in every step command the scheduler launches.

3. METAFLOW_FLOW_CONFIG_VALUE missing from container env (Cap.CONFIG_EXPR) — @config and @project use this env var to reconstruct the config dict at task runtime. Without it, tasks run with empty config. Fix: read flow._flow_state[FlowStateItems.CONFIGS] at compile time and JSON-serialize it into the container environment.

4. retry_count hardcoded to 0 (Cap.RETRY) — Metaflow's @retry uses the attempt number to decide whether to retry. Hardcoding 0 means the flow always sees attempt 0 and never retries. Fix: derive from the scheduler's native counter (AWS_BATCH_JOB_ATTEMPT, Kubernetes restartCount, Airflow try_number - 1, etc.).
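A sketch of deriving the attempt from scheduler-native counters, shifted to Metaflow's 0-based numbering. AWS_BATCH_JOB_ATTEMPT is a real AWS Batch variable (counts from 1); AIRFLOW_TRY_NUMBER here is a placeholder for however your scheduler exposes its counter:

```python
import os

def detect_retry_count():
    # AWS Batch sets AWS_BATCH_JOB_ATTEMPT starting at 1.
    if "AWS_BATCH_JOB_ATTEMPT" in os.environ:
        return int(os.environ["AWS_BATCH_JOB_ATTEMPT"]) - 1
    # Hypothetical: a scheduler that forwards Airflow's try_number (1-based).
    if "AIRFLOW_TRY_NUMBER" in os.environ:
        return int(os.environ["AIRFLOW_TRY_NUMBER"]) - 1
    return 0  # first attempt
```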

5. from_deployment() fails on dotted names (Cap.FROM_DEPLOYMENT) — DAG IDs for @project-decorated flows are dotted: project.branch.FlowName. Using the full string as a Python class name raises SyntaxError. Fix: flow_name = identifier.split(".")[-1].
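The fix as a helper (`flow_class_name` is an illustrative name):

```python
def flow_class_name(identifier):
    # "project.branch.FlowName" -> "FlowName"; undotted names pass through.
    return identifier.split(".")[-1]
```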

6. @conda broken for in-process executors (Cap.CONDA) — @conda uses a class-level _metaflow_home that is only set in runtime_init(). For subprocess-based orchestrators this is called automatically when the step subprocess re-enters the Metaflow runtime. For in-process executors (Dagster execute_job(), Windmill sync functions) runtime_init() is never called, leaving conda packages absent from PYTHONPATH and steps failing with ModuleNotFoundError. Fix: wrap the step command with ["conda", "run", "--no-capture-output", "-n", conda_env_name, "python", ...]. For subprocess-based orchestrators, pass --environment conda to the step command (already in the generated scaffold).
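The wrapping for the in-process case can be sketched as (helper name is illustrative; conda_env_name is whatever env your compiler resolved):

```python
def wrap_in_conda_env(step_cmd, conda_env_name):
    # Run the step inside the resolved conda env so its packages are
    # importable even when runtime_init() never fires.
    return ["conda", "run", "--no-capture-output", "-n", conda_env_name] + step_cmd
```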

7. Extension not auto-discovered (Deployer missing .my_scheduler()) — Deployer(flow_file).my_scheduler() raises AttributeError with no indication why. Metaflow reads DEPLOYER_IMPL_PROVIDERS_DESC from mfextinit_<name>.py; if it's missing, misnamed, in the wrong directory, or the descriptor is malformed, the deployer is silently not registered. Fix: ensure mfextinit_<name>.py lives at metaflow_extensions/<name>/plugins/ and DEPLOYER_IMPL_PROVIDERS_DESC = [("<name>", ".<name>.<name>_deployer.<Class>DeployerImpl")]. Run python -m metaflow_orchestrator_kit.validate . to catch this before CI.
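For reference, a minimal registration file matching the descriptor template above; the module and class names are the scaffold's defaults, so adjust them to your package:

```python
# metaflow_extensions/my_scheduler/plugins/mfextinit_my_scheduler.py
# A list of (scheduler_name, dotted_import_path) pairs; Metaflow resolves
# the path relative to this plugins/ directory.
DEPLOYER_IMPL_PROVIDERS_DESC = [
    ("my_scheduler", ".my_scheduler.my_scheduler_deployer.MySchedulerDeployerImpl"),
]
```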

8. Docker workers do not share /tmp between steps; bash input passing varies by scheduler — Each step in a Docker-worker scheduler (Windmill, Prefect, Argo) runs in a separate container. Files written to /tmp in the init step (e.g. a run_id file) are NOT visible to the next step container.

Additionally, different schedulers pass inputs to bash scripts in different ways:

  • Some schedulers inject inputs as environment variables ($MY_INPUT)
  • Windmill CE injects inputs via a shell preamble (a my_input="value" assignment prepended before the script body). This is a shell variable, not an exported env var. If your script then runs export SOME_OTHER_VAR=..., it may shadow the preamble variable, so inputs appear empty even though the scheduler shows them as passed correctly.

Options for reliable inter-step data passing in Docker schedulers:

  1. Use same_worker: true (Windmill-specific): forces all steps in the flow to run in the same worker container, so /tmp is shared. This is the most reliable option for local integration testing.
  2. Use the scheduler's native return-value mechanism: some schedulers let bash modules return values (e.g. Windmill stdout) that subsequent modules receive as inputs. Test with a minimal script before relying on this pattern.
  3. Use an external store: write the run ID to a shared key-value store (Redis, database) and read it from subsequent steps.

9. Docker-based workers cannot reach the local filesystem (host paths) — Schedulers that run workers in Docker containers (Windmill, Prefect, Argo) isolate the worker filesystem from the host. The step command uses the absolute host path to the flow file (e.g. /Users/me/project/flow.py), but that path does not exist inside the container. The same applies to METAFLOW_DATASTORE_SYSROOT_LOCAL: if the sysroot path is a host-local directory, the worker writes to a different directory than the deployer reads from, so wait_for_deployed_run() polls forever.

Recommended fix (production): Build a custom worker Docker image that has Metaflow installed via pip install metaflow and use a shared object store (S3/MinIO) as the datastore. This avoids all filesystem sharing problems.

Quick fix (local devstack only): Add volume mounts to your docker-compose worker service and set PYTHONPATH:

volumes:
  - /Users:/Users   # macOS — use /home:/home on Linux
  - /tmp:/tmp

Warning: Volume mounts expose your entire host user directory to the container, including your conda site-packages. Python running inside the container will discover and load all Metaflow extensions installed on the host — including any internal/private extensions that depend on services not available inside the container (e.g. a service metadata provider that requires an internal API). This causes cryptic failures like Cannot locate metadata_provider plugin 'service'. Mitigation: set PYTHONPATH to only the OSS metaflow source, not your full site-packages path, and do NOT include the extension package itself in PYTHONPATH:

# In the bash script emitted by the compiler (wrong):
export PYTHONPATH=/path/to/metaflow:/path/to/metaflow-myscheduler

# Correct: only the core source, not extension packages
export PYTHONPATH=/path/to/metaflow

If you still see extension-loading failures, the container's Python may discover metaflow_extensions/ directories within the mounted source tree. The safest solution for local testing is to install Metaflow inside the worker container's init script:

pip install metaflow requests  # in the step's bash preamble

10. METAFLOW_DATASTORE_SYSROOT_LOCAL vs --datastore-root use different path formats — METAFLOW_DATASTORE_SYSROOT_LOCAL is the sysroot (parent directory, no .metaflow suffix). The --datastore-root CLI argument to step and init commands is the full datastore path, which includes the .metaflow subdirectory that LocalStorage appends. Using the same value for both causes steps to write to a double-nested path (.metaflow/.metaflow):

# Correct (these are different values):
export METAFLOW_DATASTORE_SYSROOT_LOCAL='/tmp/mytest'              # no .metaflow
python3 flow.py step start ... --datastore-root /tmp/mytest/.metaflow  # with .metaflow

# Wrong (do not reuse the same value):
export METAFLOW_DATASTORE_SYSROOT_LOCAL='/tmp/mytest/.metaflow'  # double .metaflow on read
python3 flow.py step start ... --datastore-root /tmp/mytest/.metaflow

At compile time, use os.environ.get("METAFLOW_DATASTORE_SYSROOT_LOCAL") for the env var and sysroot + "/.metaflow" for the --datastore-root argument.
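The compile-time derivation as a sketch (`datastore_paths` is an illustrative helper, and the fallback path is a placeholder):

```python
import os

def datastore_paths():
    # Sysroot: parent directory, no ".metaflow" suffix (the env var's value).
    sysroot = os.environ.get("METAFLOW_DATASTORE_SYSROOT_LOCAL", "/tmp/mytest")
    # --datastore-root: full path including the ".metaflow" subdirectory
    # that LocalStorage appends on top of the sysroot.
    datastore_root = sysroot + "/.metaflow"
    return sysroot, datastore_root
```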

11. Every step including start requires --input-paths — Without --input-paths, the step subprocess fails with UnboundLocalError: cannot access local variable 'inputs'. This is required for every step, not just foreach splits. The format is {run_id}/{parent_step}/{parent_task_id}. The start step's parent is the _parameters task created by init:

# start step input (parent is init):
--input-paths "${run_id}/_parameters/1"

# other steps (parent is previous step, task_id 1):
--input-paths "${run_id}/{parent_step}/1"

# join steps (multiple parents, comma-separated):
--input-paths "${run_id}/branch_a/1,${run_id}/branch_b/1"
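The three shapes above reduce to one helper (illustrative name; assumes parent tasks all ran as task_id 1, as in the linear examples):

```python
def input_paths(run_id, parents, task_id=1):
    # One "{run_id}/{parent_step}/{parent_task_id}" entry per parent;
    # join steps get a comma-separated list of all parents.
    return ",".join(f"{run_id}/{p}/{task_id}" for p in parents)
```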

12. --run-param is not an OSS Metaflow init option; use METAFLOW_PARAMETER_<NAME> env vars instead — Some internal Metaflow forks added --run-param "name=value" to the init command to pre-populate parameters. OSS Metaflow's init does not have this option; passing it causes Error: no such option: --run-param. In OSS Metaflow, flow parameters are passed to the start step via METAFLOW_PARAMETER_<NAME> environment variables, not via --run-param in init:

# Wrong (NFLX-only):
python flow.py init --run-id $RUN_ID --task-id 1 --run-param "alpha=0.9"

# Correct (OSS): set env vars for the start step container instead:
export METAFLOW_PARAMETER_ALPHA="0.9"
python flow.py step start --run-id $RUN_ID --task-id 1 ...

Do not include --run-param in the init step command if your extension must work with OSS Metaflow.
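The conversion from name=value run params to the env vars can be sketched as (illustrative helper; assumes simple alphanumeric parameter names, uppercased for the env var):

```python
def param_env_vars(run_params):
    # "alpha=0.9" -> {"METAFLOW_PARAMETER_ALPHA": "0.9"}
    env = {}
    for pair in run_params:
        name, _, value = pair.partition("=")
        env[f"METAFLOW_PARAMETER_{name.upper()}"] = value
    return env
```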

13. init command missing --task-id — The Metaflow init subcommand requires --task-id in OSS Metaflow (some internal forks made it optional). If you generate an init script without --task-id, the init step fails with Error: Missing option '--task-id'. Fix: always include --task-id 1 in the init command (the init step always runs as task 1):

# Wrong — missing --task-id:
python flow.py init --run-id $RUN_ID

# Correct:
python flow.py init --run-id $RUN_ID --task-id 1

14. --tag passed as a global CLI flag instead of a subcommand argument — Metaflow's flow CLI does not accept --tag as a top-level global option. --tag is only valid after the subcommand (step, run, init). Passing it before the subcommand causes Error: no such option: --tag inside the Docker worker. Fix: append --tag <value> to the command list after "step", "step_name", not before:

# Wrong — --tag before "step":
cmd = [python, flow, "--no-pylint", "--tag", tag, "step", step_name, ...]

# Correct — --tag after the step subcommand:
cmd = [python, flow, "--no-pylint", "step", step_name, "--tag", tag, ...]

15. Scheduler auth tokens expire — If your scheduler issues short-lived auth tokens (Windmill, Kestra), tests that start a long-running deploy+trigger sequence may fail with 401 on the trigger API call because the token used at create() time has expired by the time trigger() is called. Fix: either use long-lived tokens (service account tokens in Windmill: Settings > Users & Tokens > Tokens > Add token with no expiry), or fetch a fresh token at the start of each trigger() call rather than caching the token from create().

16. trigger CLI command must write deployer_attribute_file BEFORE executing steps — The Metaflow Deployer API calls the trigger CLI command as a subprocess and waits for deployer_attribute_file to appear (via handle_timeout, default 3600s). For orchestrators that execute steps directly in the trigger subprocess (Mage, Dagster direct execution), the trigger command typically writes the attribute file AFTER all steps complete. This causes the deployer's trigger() call to block for the entire flow execution time. If the test timeout (pytest --timeout) is shorter than the flow execution time, the subprocess is killed before writing the file and the test fails with Timeout. Fix: write deployer_attribute_file BEFORE executing the steps, then run steps in the background or synchronously afterward. This way handle_timeout returns immediately with the run pathspec, and wait_for_deployed_run() polls for completion:

# In the trigger CLI command:
run_id = "myscheduler-" + uuid.uuid4().hex[:12]
pathspec = f"{flow_name}/{run_id}"

# WRITE THE FILE FIRST before running any steps.
# This unblocks Deployer.trigger() immediately.
if deployer_attribute_file:
    with open(deployer_attribute_file, "w") as f:
        json.dump({"pathspec": pathspec}, f)

# THEN execute the steps (synchronously or async).
_execute_flow_steps(run_id, ...)

17. Scheduler internal indexing delay after workflow creation — Some schedulers (Mage, Prefect, Windmill) index or cache newly-created pipelines/DAGs asynchronously. If you make a second API call immediately after the creation POST (e.g. creating a schedule, listing runs, or triggering), the scheduler may return 500 or 'NoneType' object has no attribute 'uuid' because the pipeline is not yet in the cache. Fix: add a short delay between _create_pipeline() / _compile_workflow() and any subsequent API call that references the newly-created resource. A 1–2 second sleep is enough for most schedulers. If the second call is not strictly required for the trigger to work (e.g. schedule creation for Mage), make it optional and catch failures gracefully:

try:
    schedule_id = _create_api_trigger(client, pipeline_uuid)
except Exception:
    schedule_id = None  # non-fatal: trigger works without a registered schedule

Capabilities

Required — every orchestrator must pass these

Capability What it means
Cap.LINEAR_DAG start → one or more steps → end
Cap.BRANCHING static split/join: parallel branches merged at a join step
Cap.FOREACH dynamic fan-out: foreach produces N tasks at runtime
Cap.RETRY real attempt count from scheduler's native counter (not hardcoded 0)
Cap.CATCH @catch decorator: catch step exception and continue the flow
Cap.TIMEOUT @timeout decorator: abort a step that exceeds the time limit
Cap.RESOURCES @resources passthrough: CPU/memory hints forwarded to scheduler
Cap.PROJECT_BRANCH --branch forwarded to every step subprocess, not just start
Cap.CONFIG_EXPR METAFLOW_FLOW_CONFIG_VALUE injected into every container/subprocess
Cap.RUN_PARAMS trigger() run params as list, not tuple
Cap.FROM_DEPLOYMENT from_deployment(identifier) handles dotted names (project.branch.FlowName)

Optional — implement or explicitly declare unsupported

Capability What it means
Cap.NESTED_FOREACH foreach inside foreach
Cap.CONDA @conda environment creation at task runtime
Cap.RESUME ORIGIN_RUN_ID resume: re-run from a previously failed step
Cap.SCHEDULE @schedule cron trigger

Example ux_test_config.yaml

backends:
  - name: my-scheduler
    scheduler_type: my_scheduler
    cluster: null
    decospec: null
    deploy_args:
      host: http://localhost:8080
    enabled: true

Development

git clone https://github.com/npow/metaflow-orchestrator-kit
cd metaflow-orchestrator-kit
pip install -e ".[dev]"
pytest

License

Apache 2.0
