Metaflow Functions
Project description
Metaflow Functions
Metaflow Functions are a way to share computations with other systems. These computations could be any aspect of an ML pipeline, e.g. feature engineering, a model, a whole pipeline in your favorite framework, etc. It could contain your own custom modules and interfaces (e.g. all your custom code) alongside your favorite data science packages.
Background
Functions can be shared without the user of the function needing to know the details of:
- The environment (e.g. python packages),
- The code (e.g. your function and supporting files),
- The parameters (e.g. the weights, or configuration of a model, or details of your custom framework)
All these are provided from a Metaflow Step during training. A Function can be
rehydrated from its reference without worry of these details, only data must be
provided. This is a property we call relocatable computation. Different
runtimes (e.g. default python, spark, ray) can interpret a Metaflow
Function metadata and decide the best runtime implementation.
We provide a default one that works in pure python with no dependencies, with all the encapsulation features mentioned above. Think of a Metaflow Function's as an abstract UX for creating different types of relocatable computations for different user experiences. You write your code once, and we target a back-end that is suitable for your computation.
Metaflow Function UX (DataFrameFunction)
DataFrameFunction is a concrete implementation of a Metaflow
Function. It's a useful abstraction for processing tabular data, and
its meant to work with other tools Metaflow provides like Table, and
MetaflowDataFrame. You can ship around your tabular data processing
functions to external system. We are actively building bridges to
other paved path MLP provided systems like online serving and offline
serving, more to come on these integrations.
DataframeFunction Entities
There are three entities to represent a metaflow dataframe function and the composition of functions:
Note - These are user facing entities, internally these can be implemented as derived classes from common base classes, and additionally derived classes can be created with similar relationships.
-
dataframe_function: The raw, callable function. It's decorated with@dataframe_functionwhich enforces the type structure.@dataframe_function's must take as a first argument a MetaflowDataFrame (possibly typed) and as a second argument aFunctionParameters(Also possibly typed), which indicates the name and types parameters expected.FunctionParametersis required to be passed at the call point or have defaults. Calling this function will run as a normal function in the current environment. -
DataFrameFunction: A class that wraps a_dataframe_function. It represents an atomic execution unit, including its environment and arguments/parameters (e.g. provided from aTask), and packaging of the code for distribution to external systems. It represents a relocatable unit of computation where run time parameters are fixed and only data must be provided. It is a callable that takes only a single parameter, aMetaflowDataFrame, which executes the function on the dataframe in an isolated environment. -
DataFrameFunctionSystem:DataFrameFunctionSystemis a container that represents the composition of one or moreDataFrameFunction's. Implementations provide additional semantics to aid in the composition ofDataFrameFunction's. The result of encapsulatingDataFrameFunction's into a system is anotherDataFrameFunctionThe additional information to express relationships is represented as metadata and can be implemented by a run time.
dataframe_function decorator
from metaflow import MetaflowDataFrame as MDF
from metaflow import FunctionParameters as FP
@dataframe_function
def f0(data: MDF, params: FP = FP(constant=1)) -> MDF:
pandas_df = data.to_pandas()
pandas_df['count'] = pandas_df['count'] + params.constant
return MDF.from_pandas(pandas_df)
A function decorated with @dataframe_function must satisfy the following
conditions:
- The first argument must be a
MetaflowDataFrame(possibly typed) - The second argument must be a
FunctionParameters(also possibly typed). - The function must return a
MetaflowDataFrame(possibly typed).
The function is a callable and can be used just like any other function in python. The annotated function will have support for static type checking as well as runtime type checking. More on typing later.
output1 = f0(input_data)
output2 = f0(input_data, params=FP(constant=1))
DataFrameFunction class
DataFrameFunction constructor takes a function annotated
with dataframe_function and a task that:
- Provides arguments for the
FunctionParameters - Supplies the environment information that is necessary to operate on the
FunctionParameterswithin the function.
This class essentially binds the raw function to a task object and instantiates the function parameters with artifact information from the task. The binding can happen outside a flow as well as within a flow. It is also a callable and can be used as shown below:
# Bind within flow
from my_functions import f0
from metaflow import DataFrameFunction
class BindFlow(FlowSpec):
@step
def start(self):
self.constant = 1
self.next(self.end)
@step
def end(self):
task = ... #use client to get task
self.function = DataFrameFunction(f0, task=task)
# Bind outside flow
F0 = DataFrameFunction(f0, task=task)
The DataFrameFunction is also a callable. Unlike the raw function, it
will only take a single argument, a MetaflowDataFrame, and during execution,
it will execute the function in an isolated environment defined by the
task object. Using it is as simple as:
output = F0(input_data)
Type information
The user can encode type information in the function signature using a combination of python type hints and dataclasses. An example of a function with type information is shown below:
from metaflow import MetaflowDataFrame as MDF
from metaflow import FunctionParameters as FP
from typing import Any, Protocol
class MyMdfInp1(Protocol):
x: Any
class MyMdfInp2(Protocol):
x: int
class MyMdfInp3(Protocol):
x: int
y: int
class Inp4:
x: int
y: str
z: MyDataClass3
class MyMdfOut1(Protocol):
a: Any
class MyMdfOut2(Protocol):
a: Int
# Defining Functions
# The input and output MetaflowDataFrame types are not specified in f0
# We can pass any dataframe to f0, and the output can also be any dataframe
@dataframe_function
def f0(data: MDF, arguments: FP) -> MDF:
pass
# The input dataframe needs to at least have a column called 'x' of type Any
# The output dataframe will at least have a column called 'a' of type Any
@dataframe_function
def f1(data: MDF[MyMdfInp1], arguments: FP) -> MDF[MyMdfOut1]:
pass
# The input dataframe needs to at least have a column called 'x' of type int
# The output dataframe will at least have a column called 'a' of type int
@dataframe_function
def f2(data: MDF[MyMdfInp2], arguments: FP) -> MDF[MyMdfOut2]:
pass
# The input dataframe needs to at least have columns called 'x' and 'y' of type int
# The output dataframe will at least have a column called 'a' of type int
@dataframe_function
def f3(data: MDF[MyMdfInp3], arguments: FP) -> MDF[MyMdfOut2]:
pass
For the purpose of dataframe functions, we will assume that the typing is
covariant. This is common for read-only structures, like Sequence[T], where a
Sequence[Dog] can be treated as a Sequence[Animal] because every Dog is an
Animal. Specifically for MetaflowDataFrame, this means that if a function takes
a dataframe of type A, it can also take a dataframe of type B where B is a subclass
of A.
Additionally, we will use structural typing instead of nominal typing for type checks. What this means is that we will check if the dataframe has the required columns and types, and it is fine if the dataframe has more columns.
Here are some examples of how this would work in practice:
input_data0: Any = MDF.from_pandas(df, ...)
input_data1: MDF[MyMdfInp1] = MDF.from_pandas(df, ...)
input_data2: MDF[MyMdfInp2] = MDF.from_pandas(df, ...)
input_data3: MDF[MyMdfInp3] = MDF.from_pandas(df, ...)
# f0 can take any dataframe
output0 = f0(input_data0) # should work
output1 = f0(input_data1) # should work
output2 = f0(input_data2) # should work
output3 = f0(input_data3) # should work
# f1 can take any dataframe that at least has a column called 'x' of type Any
output0 = f1(input_data0) # fails, since input_data0 may not have a column called 'x'
output1 = f1(input_data1) # should work
output2 = f1(input_data2) # should work
output3 = f1(input_data3) # should work
# f2 can take any dataframe that at least has a column called 'x' of type int
output0 = f2(input_data0) # fails, since input_data0 may not have a column called 'x'
output1 = f2(input_data1) # fails, since input_data1 will have a column called 'x' but it could be not of type int
output2 = f2(input_data2) # should work
output3 = f2(input_data3) # should work
# f3 can take any dataframe that at least has columns called 'x' and 'y' of type int
output0 = f3(input_data0) # fails, since input_data0 may not have columns called 'x' and 'y'
output1 = f3(input_data1) # fails, since input_data1 may not have column 'y' or column 'x' might not be of type int
output2 = f3(input_data2) # fails, since input_data2 may not have column 'y'
output3 = f3(input_data3) # should work
Metaflow will expose a method/class to help users define these protocols encoding type information easily.
from metaflow import MDFSchemaGen
# Returns a protocol that has columns x and y of type Any
my_schema1 = MDFSchemaGen(['x', 'y'])
# Returns a protocol that has columns a and b of type int
my_schema2 = MDFSchemaGen([('a', int), ('b', int)])
# Complex example
# Returns a protocol that has columns x and y of type int
# and column z consists of a list of structs that implement the my_schema2 protocol
# i.e. z is a
my_schema3 = MDFSchemaGen([
('x', int),
('y', int),
('z', List[my_schema2]),
])
The type checks will be done at compile time using mypy, and during
runtime we will raise an error if the input or output dataframes do not respect
the type information. The underlying types to use to encode the type information
of columns is still open for discussion:
arroworavrois an option since we are usingarrowfor serialization and deserialization of the dataframes.pyicebergtypes since the data is most likely coming from Iceberg tables and there will be scaffolding on the JVM side for it already.- Native python types like str , Dict , List and more
DataFrameFunctionSystem UX
DataFrameFunctionSystem is a container that takes one or more
DataFrameFunction's and has methods to compose these DataFrameFunction's
into a DAG. The DataFrameFunctionSystem also has the same semantics as
those of DataFrameFunction, i.e. it is a callable, and it relocatable.
schema0 = MDFSchemaGen(['x1', 'x2'])
schema1 = MDFSchemaGen(['y1', 'y2'])
schema2 = MDFSchemaGen(['z1', 'z2'])
schema3 = MDFSchemaGen(['w1', 'w2'])
inp: MDF[schema0] = MDF.from_pandas(df, ...)
# f0(schema0) -> schema1
# f1(schema1) -> schema2
# f2(schema2) -> schema3
# Users can implicitly define the DAG
# The only requirement is that all functions need to output unique column names
# to help with disambiguation
# f0 -> f1 -> f2
# Consolidated input: [x1, x2]
# Consolidated output: [w1, w2]
my_function_system1 = DataFrameFunctionSystem(
[f0, f1, f2],
)
# Users can explicitly define the DAG and
# Users can remap column names between functions when defining the DAG
# f0 -> f2 -> f1
my_function_system3 = DataFrameFunctionSystem(
[f0, f1, f2],
)
my_function_system3.add_edge(
source="f0",
target="f2",
column_mapping={
"z1": "y1",
"z2": "y2",
}
)
my_function_system3.add_edge(
source="f2",
target="f1",
column_mapping={
"y1": "w1",
"y2": "w2",
}
)
# Usage
output = my_function_system1(input_data)
The output of the DataFrameFunctionSystem is a MetaflowDataFrame that
has all the columns from all the leaf nodes in the DAG. The
DataFrameFunctionSystem class will expose methods to view the DAG, view
the consolidated schema, and more.
Metaflow Function Implementation details
All Metaflow function decorators and classes will inherit from a base decorator and Function class. There will be two entities in the base function and decorator implementation:
MetaflowFunctionDecorator: The raw, callable function that the user can specify using the@mf_functionannotation. The decorator simply marks that a function is a Metaflow function. Note: User will never use this directly, and will instead use its concrete implementation like@dataframe_functionor@pytorch_function.MetaflowFunction: A class that wraps a function annotated with@mf_function. Note: User will never use this directly, and will instead use its concrete implementation likeDataFrameFunctionorPytorchFunction.
Base Function Decorator Implementation
The base function decorator is a simple callable that takes a function, sets
the is_metaflow_function attribute to True, and does some validation of
the function signature.
import functools
class MetaflowFunctionDecorator:
TYPE = "metaflow_function"
def __init__(self, func):
# Preserves original function's metadata
functools.update_wrapper(self, func)
self.func = func
self.func.is_metaflow_function = True
self._validate_function_signature()
def __call__(self, *args, **kwargs):
# Validate arguments before calling the wrapped function
self.validate(args, kwargs)
# Execute the wrapped function
out = self.func(*args, **kwargs)
self.validate(out)
return out
def _validate_function_signature(self):
"""
Placeholder for function signature validation logic.
"""
pass
def validate(self, *args, **kwargs):
"""
Placeholder for validation logic.
"""
pass
Custom decorators like @dataframe_function will inherit from this and add
custom validation logic. For instance,
class DataFrameFunctionDecorator(MetaflowFunctionDecorator):
TYPE = "dataframe_function"
pass
# We will only expose dataframe_function to the user, and not the
# DataFrameFunctionDecorator class
dataframe_function = DataFrameFunctionDecorator
Implementation of DataFrameFunction
The DataFrameFunction class will be a concrete implementation of the
MetaflowFunction base class. The base MetaflowFunction class will expose
the following interface:
class MetaflowFunction(object):
def __init__(self, func, task):
self.func = func
self.task = task
self._validate_function()
def _validate_function(self):
pass
def runtime(self):
# Each Function will implement its own runtime and override the runtime logic from the parent function's
pass
def __call__(self, *args, **kwargs):
return self.func(*args, **kwargs)
def __getstate__(self, *args, **kwargs):
pass
def __setstate__(self, *args, **kwargs):
pass
@classmethod
def from_reference(cls: Type[T], path: str) -> T:
"""
Return a MetaflowFunction object from reference file
"""
pass
Note: An active question is whether the run time validation should happen
in the raw function or in the MetaflowFunction class.
Implementation of Typing
We will simply make MetaflowDataFrame and FunctionParameters a generic class that takes a type parameter.
from typing import Generic, TypeVar, Type
from dataclasses import fields
import pandas as pd
from typing import Any
T_co = TypeVar('T', covariant=True)
T_FP_co = TypeVar('T_FP_co', covariant=True)
class MetaflowDataFrame(Generic[T_co]):
def __init__(self, *args, **kwargs):
pass
class FunctionParameters(Generic[T_FP_co]):
pass
Implementation of DataFrameFunctionSystem
There will be no base class since other function systems may or may not support composition.
class DataFrameFunctionSystem:
def __init__(self, functions: List[DataFrameFunction]):
pass
def add_edge(self, source: str, target: str, column_mapping: Optional[Dict[str, str]] = None):
pass
def _construct_dag(self):
pass
def __call__(self, *args, **kwargs):
pass
def visualize(self):
pass
def get_schema(self):
pass
def __repr__(self):
pass
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file metaflow_functions-0.2.4.tar.gz.
File metadata
- Download URL: metaflow_functions-0.2.4.tar.gz
- Upload date:
- Size: 95.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
3a0d46e6e13e302ef8f4cfb04a3c9dc5dcc29aa61b3a4e6c7217bf9f196d6578
|
|
| MD5 |
c95edb13a9fe3dc2aeb0cd7dcef9e014
|
|
| BLAKE2b-256 |
5a6be8be5b8dfc56701ff30a912a06d8fb0b9ede546a07242533d934114a8a8b
|
Provenance
The following attestation bundles were made for metaflow_functions-0.2.4.tar.gz:
Publisher:
python-publish.yml on Netflix/metaflow-nflx-extensions
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
metaflow_functions-0.2.4.tar.gz -
Subject digest:
3a0d46e6e13e302ef8f4cfb04a3c9dc5dcc29aa61b3a4e6c7217bf9f196d6578 - Sigstore transparency entry: 1334128506
- Sigstore integration time:
-
Permalink:
Netflix/metaflow-nflx-extensions@58c313d7861230af19ffa74283c04e8114ec48ff -
Branch / Tag:
refs/tags/functions/v0.2.4 - Owner: https://github.com/Netflix
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
python-publish.yml@58c313d7861230af19ffa74283c04e8114ec48ff -
Trigger Event:
release
-
Statement type:
File details
Details for the file metaflow_functions-0.2.4-py2.py3-none-any.whl.
File metadata
- Download URL: metaflow_functions-0.2.4-py2.py3-none-any.whl
- Upload date:
- Size: 114.8 kB
- Tags: Python 2, Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
06fbe460ffaf6713e2cb3879111e140e115d65aca65fc25d48248c155c0263fb
|
|
| MD5 |
3d0b235f55e6f3e88763dd6917c8b9f5
|
|
| BLAKE2b-256 |
2a9a1e10c5ff8df05632c9ae0d7c58ba8db318013f24051c8fc24a98e483db35
|
Provenance
The following attestation bundles were made for metaflow_functions-0.2.4-py2.py3-none-any.whl:
Publisher:
python-publish.yml on Netflix/metaflow-nflx-extensions
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
metaflow_functions-0.2.4-py2.py3-none-any.whl -
Subject digest:
06fbe460ffaf6713e2cb3879111e140e115d65aca65fc25d48248c155c0263fb - Sigstore transparency entry: 1334128646
- Sigstore integration time:
-
Permalink:
Netflix/metaflow-nflx-extensions@58c313d7861230af19ffa74283c04e8114ec48ff -
Branch / Tag:
refs/tags/functions/v0.2.4 - Owner: https://github.com/Netflix
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
python-publish.yml@58c313d7861230af19ffa74283c04e8114ec48ff -
Trigger Event:
release
-
Statement type: