Run Metaflow flows as Flyte workflows

metaflow-flyte

CI E2E PyPI License: Apache-2.0 Python 3.10+

Schedule and monitor your Metaflow pipelines through Flyte without rewriting them.

The problem

You've built pipelines in Metaflow and now need Flyte's scheduling, UI, and observability. Rewriting your flows in Flytekit means losing Metaflow's versioning, artifact store, and local execution model, while running both side by side means maintaining two copies of every pipeline.

Quick start

pip install metaflow-flyte

# Generate the Flyte workflow file
python my_flow.py --datastore=s3 flyte create my_flow_remote.py \
    --image my-registry/my-image:latest

# Run locally (no cluster required)
pyflyte run my_flow_remote.py my_flow

# Register and run on a Flyte cluster
pyflyte register --project flytesnacks --domain development my_flow_remote.py
pyflyte run --remote --project flytesnacks --domain development \
    my_flow_remote.py my_flow

Install

pip install metaflow-flyte

Or from source:

git clone https://github.com/npow/metaflow-flyte.git
cd metaflow-flyte
pip install -e ".[dev]"

Usage

Generate and register a Flyte workflow

python my_flow.py --datastore=s3 flyte create my_flow_remote.py \
    --image my-registry/my-image:latest

pyflyte register --project myproject --domain development my_flow_remote.py

All graph shapes are supported

from metaflow import FlowSpec, step

# Linear
class SimpleFlow(FlowSpec):
    @step
    def start(self):
        self.value = 42
        self.next(self.end)

    @step
    def end(self):
        pass

# Split/join (branch)
class BranchFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.branch_a, self.branch_b)
    ...

# Foreach fan-out
class ForeachFlow(FlowSpec):
    @step
    def start(self):
        self.items = [1, 2, 3]
        self.next(self.process, foreach="items")
    ...

Parametrised flows

Parameters defined with metaflow.Parameter are forwarded automatically as Flyte workflow arguments:

python param_flow.py --datastore=s3 flyte create param_flow_remote.py \
    --image my-registry/my-image:latest

Pass parameters at runtime:

pyflyte run --remote ... param_flow_remote.py param_flow --greeting "Hello"
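As a rough illustration of that forwarding, the sketch below turns a dictionary of parameter values into the `--name value` options that follow the workflow name. The helper `parameter_args` is hypothetical and not part of metaflow-flyte; it assumes each Metaflow parameter maps to a workflow input of the same name.

```python
import shlex


def parameter_args(params):
    """Render parameter values as --name value option pairs (hypothetical helper)."""
    args = []
    for name, value in params.items():
        args += [f"--{name}", str(value)]
    return args


# Mirrors the command above; the remote and project flags are left out for brevity.
command = ["pyflyte", "run", "param_flow_remote.py", "param_flow"]
command += parameter_args({"greeting": "Hello"})
print(shlex.join(command))
```

Building the argument list first and joining it with `shlex.join` keeps values that contain spaces correctly quoted.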

Step decorators (--with)

Inject Metaflow step decorators at deploy time without modifying the flow source:

python my_flow.py --datastore=s3 flyte create my_flow_remote.py \
    --image my-registry/my-image:latest \
    --with=kubernetes:cpu=4,memory=8000
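To make the `--with` value easier to read, here is a minimal sketch of how a spec such as `kubernetes:cpu=4,memory=8000` decomposes into a decorator name and its attributes. `parse_with_spec` is a hypothetical helper written for illustration; it is not Metaflow's own parser and keeps every value as a string.

```python
def parse_with_spec(spec):
    """Split 'name:key=value,key=value' into (name, attributes)."""
    name, _, attr_text = spec.partition(":")
    attributes = {}
    # filter(None, ...) drops the empty piece produced when no attributes are given.
    for pair in filter(None, attr_text.split(",")):
        key, _, value = pair.partition("=")
        attributes[key.strip()] = value.strip()
    return name, attributes


name, attributes = parse_with_spec("kubernetes:cpu=4,memory=8000")
print(name, attributes)
```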

Retries

@retry on any step is picked up automatically. The generated Flyte task gets the corresponding retries parameter:

from metaflow import FlowSpec, retry, step

class MyFlow(FlowSpec):
    @retry(times=3)
    @step
    def train(self):
        ...

Scheduled flows

If your flow has a @schedule decorator, the generated file automatically includes a Flyte LaunchPlan with the corresponding cron schedule.

Project namespace

If the flow uses @project(name=...), that project name is automatically used as the Flyte project:

from metaflow import FlowSpec, project

@project(name="recommendations")
class TrainFlow(FlowSpec): ...

How it works

metaflow-flyte compiles your Metaflow flow's DAG into a self-contained Flyte workflow file. Each Metaflow step becomes a @task. The generated file:

  • runs each step as a subprocess via the standard metaflow step CLI
  • derives a stable run_id from the Flyte execution ID so all steps share one Metaflow run
  • passes --input-paths correctly for joins and foreach fan-outs
  • emits Metaflow artifact retrieval snippets to the Flyte UI Deck after each step
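The first three points can be sketched in a few lines. This is an illustration under stated assumptions, not the code metaflow-flyte generates: the `flyte-` run ID prefix is inferred from the example pathspec later in this README, the helper names and task IDs are hypothetical, and the flags shown (`--run-id`, `--task-id`, `--input-paths`) are assumed to be the relevant subset of the Metaflow step CLI.

```python
import sys


def run_id_for(execution_id):
    """Derive one Metaflow run ID per Flyte execution (assumed 'flyte-' prefix)."""
    return f"flyte-{execution_id}"


def step_command(flow_file, step_name, run_id, task_id, input_paths):
    """Build the argv for one step subprocess; the flag set is an assumption."""
    return [
        sys.executable, flow_file, "step", step_name,
        "--run-id", run_id,
        "--task-id", task_id,
        "--input-paths", ",".join(input_paths),
    ]


run_id = run_id_for("atlw559q7zhbg2mw92sq")
# A join receives one input path per incoming branch.
cmd = step_command(
    "my_flow.py", "join", run_id, "join-1",
    [f"{run_id}/branch_a/a-1", f"{run_id}/branch_b/b-1"],
)
print(" ".join(cmd[1:]))
```

Because every task derives the same run ID from the same execution ID, no coordination between tasks is needed for them to land in one Metaflow run. A real task would hand the resulting list to `subprocess.run`.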

Executions list

All workflow executions are visible in the Flyte console with status, duration, and launch plan:

[Screenshot: Flyte console showing multiple succeeded executions]

Linear flow

A simple 3-step linear flow (start → process → end) runs as 4 Flyte tasks — one to generate the shared run ID, then one per Metaflow step:

[Screenshot: Linear flow execution with all tasks succeeded]

Branch flow

Branch flows with split/join (start → branch_a + branch_b → join → end) run the parallel steps concurrently as separate Flyte tasks:

[Screenshot: Branch flow execution showing parallel steps]
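To see why branch_a and branch_b can run at the same time, the sketch below groups the steps of that branch flow into waves using the standard library's `graphlib`. Flyte does its own scheduling from task dependencies; this is only a plain-Python model of the dependency structure, and `concurrent_levels` is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Each step maps to the steps it depends on (the branch flow described above).
dependencies = {
    "start": set(),
    "branch_a": {"start"},
    "branch_b": {"start"},
    "join": {"branch_a", "branch_b"},
    "end": {"join"},
}


def concurrent_levels(deps):
    """Group steps into waves whose members have no unmet dependencies."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    levels = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        levels.append(ready)
        sorter.done(*ready)
    return levels


levels = concurrent_levels(dependencies)
print(levels)
```

Steps that share a wave have no dependency on each other, which is what allows them to run as separate concurrent tasks.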

Foreach flow

Foreach fan-outs use a Flyte @dynamic task to spawn one task per item at runtime. The _foreach_*_dynamic Sub-Workflow node fans out and collects results:

[Screenshot: Foreach flow execution showing dynamic sub-workflow]
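The fan-out and collect pattern can be modelled in plain Python. The sketch below is not flytekit code: it uses a thread pool as a stand-in for the dynamic task, and `process` and `fan_out` are hypothetical names. The point it shows is that the number of tasks depends on the item list, which is only known at runtime.

```python
from concurrent.futures import ThreadPoolExecutor


def process(split_index, item):
    """Stand-in for one foreach task; a real task would run a Metaflow step."""
    return {"split_index": split_index, "result": item * 10}


def fan_out(items):
    """Spawn one task per item, then collect results in item order."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(process, i, item) for i, item in enumerate(items)]
        return [future.result() for future in futures]


results = fan_out([1, 2, 3])
print([r["result"] for r in results])
```

Collecting from the futures in submission order keeps the results aligned with the original items regardless of which task finishes first.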

Task detail and Flyte Deck

Click any task in the execution view to open the detail panel. Each Metaflow step produces a Flyte Deck accessible via the "Flyte Deck" button:

[Screenshot: Task detail panel showing Flyte Deck button]

Metaflow artifact retrieval

The metaflow tab in the Flyte Deck shows the exact Python code to retrieve each artifact from this specific task — using the full FlowName/run_id/step_name/task_id pathspec:

[Screenshot: Metaflow deck tab showing artifact retrieval code]

For tasks that produce multiple artifacts, each one is listed with its access expression:

[Screenshot: Metaflow deck showing message and result artifacts]

Parametrised flows show the parameter values alongside the artifacts:

[Screenshot: Param flow deck showing greeting and message artifacts]

# Retrieve artifacts from any completed Metaflow step
from metaflow import Task
task = Task('LinearFlow/flyte-atlw559q7zhbg2mw92sq/process/62e7a9511b5646b6')

task.data.message   # access the 'message' artifact
task.data.result    # access the 'result' artifact
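The pathspec passed to `Task` has four slash-separated components. As a small illustration, the sketch below splits the example pathspec into named fields; `Pathspec` and `parse_pathspec` are hypothetical helpers, not part of Metaflow or metaflow-flyte.

```python
from typing import NamedTuple


class Pathspec(NamedTuple):
    flow_name: str
    run_id: str
    step_name: str
    task_id: str


def parse_pathspec(pathspec):
    """Split 'FlowName/run_id/step_name/task_id' into named fields."""
    parts = pathspec.split("/")
    if len(parts) != 4:
        raise ValueError(f"expected 4 components, got {len(parts)}: {pathspec!r}")
    return Pathspec(*parts)


spec = parse_pathspec("LinearFlow/flyte-atlw559q7zhbg2mw92sq/process/62e7a9511b5646b6")
print(spec.step_name, spec.run_id)
```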

Configuration

The generated file bakes in datastore and image settings at creation time so every task subprocess uses the same configuration.

# Use S3 datastore with a custom image
python my_flow.py \
  --datastore=s3 \
  flyte create my_flow_remote.py \
  --image my-registry/my-image:latest \
  --project flytesnacks \
  --domain development
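One way to picture "baking in" is a set of constants written into the generated file and applied to each step subprocess's environment. The sketch below assumes that approach; `BAKED_CONFIG` and `step_environment` are hypothetical names, and the actual generated file may pass these settings differently.

```python
import os

# Values fixed when the workflow file is generated (hypothetical constant name).
BAKED_CONFIG = {
    "METAFLOW_DEFAULT_DATASTORE": "s3",
}


def step_environment(base_env=None):
    """Return the environment for a step subprocess, baked values taking priority."""
    env = dict(os.environ if base_env is None else base_env)
    env.update(BAKED_CONFIG)
    return env


env = step_environment({"METAFLOW_DEFAULT_DATASTORE": "local", "PATH": "/usr/bin"})
print(env["METAFLOW_DEFAULT_DATASTORE"])
```

Applying the baked values last means a task pod's ambient environment cannot silently switch one step to a different datastore.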

Docker image requirements

Your Docker image must contain:

  1. The flow Python file at the same absolute path as on your local machine
  2. All Python dependencies (metaflow, flytekit, boto3, etc.)
  3. USERNAME environment variable set (e.g. ENV USERNAME=metaflow)
  4. S3/datastore credentials and endpoint configuration

Example Dockerfile:

FROM python:3.11-slim
RUN pip install "metaflow>=2.9" "flytekit>=1.10" "boto3>=1.26"
COPY my_flow.py /path/to/my_flow.py
ENV USERNAME=metaflow
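Before pushing an image, it can help to run a quick preflight check inside the container. The sketch below covers requirements 1 to 3 only (flow file path, Python modules, USERNAME); credentials and endpoint settings vary too much to check generically. `check_image` is a hypothetical helper, not something metaflow-flyte ships.

```python
import importlib.util
import os


def check_image(flow_path, modules=("metaflow", "flytekit", "boto3"), env=None):
    """Return a list of problems found; an empty list means the checks passed."""
    env = os.environ if env is None else env
    problems = []
    if not os.path.isfile(flow_path):
        problems.append(f"flow file not found at {flow_path}")
    if not env.get("USERNAME"):
        problems.append("USERNAME environment variable is not set")
    for module in modules:
        if importlib.util.find_spec(module) is None:
            problems.append(f"Python module {module!r} is not installed")
    return problems


# Use the same absolute path the flow has on your local machine.
for problem in check_image("/path/to/my_flow.py"):
    print("MISSING:", problem)
```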

Development

git clone https://github.com/npow/metaflow-flyte.git
cd metaflow-flyte
pip install -e ".[dev]"

# Compilation tests only (fast, ~25s)
pytest tests/ -m "not integration and not e2e"

# Compilation + local pyflyte run (~3 min)
pytest tests/ -m "not e2e"

# E2E against a real Flyte cluster (see below)
pytest tests/test_e2e_remote.py -v --timeout=300

The test suite covers three tiers:

  • Tier 1: compile all graph shapes → assert generated file content
  • Tier 2: pyflyte run locally (in-process) → verify Metaflow artifacts written to disk
  • Tier 3: pyflyte run --remote against a live cluster → real Flyte task pods, real S3 artifacts

Running Tier 3 locally:

# Start a local Flyte sandbox
flytectl demo start

# Build and tag the task container image
docker build -f Dockerfile.e2e -t localhost:30000/metaflow-flyte-e2e:dev .
docker push localhost:30000/metaflow-flyte-e2e:dev

# Copy flows to the stable container path and run
sudo mkdir -p /app/tests/flows && sudo cp tests/flows/*.py /app/tests/flows/

E2E_FLOWS_DIR=/app/tests/flows \
FLYTE_TEST_IMAGE=localhost:30000/metaflow-flyte-e2e:dev \
METAFLOW_DEFAULT_DATASTORE=s3 \
METAFLOW_DATASTORE_SYSROOT_S3=s3://my-s3-bucket/metaflow/ \
METAFLOW_S3_ENDPOINT_URL=http://localhost:30002 \
AWS_ACCESS_KEY_ID=minio AWS_SECRET_ACCESS_KEY=miniostorage AWS_DEFAULT_REGION=us-east-1 \
pytest tests/test_e2e_remote.py -v --timeout=300

Tier 3 also runs automatically on every push to main via the E2E workflow.

License

Apache 2.0
