Metaflow extension: deploy and run flows as Prefect deployments

metaflow-prefect

License: Apache-2.0 · Python 3.10+

Deploy and run Metaflow flows as Prefect deployments.

metaflow-prefect generates a self-contained Prefect flow file from any Metaflow flow, letting you schedule, deploy, and monitor your pipelines through Prefect while keeping all your existing Metaflow code unchanged.

Install

pip install metaflow-prefect

Or from source:

git clone https://github.com/npow/metaflow-prefect.git
cd metaflow-prefect
pip install -e ".[dev]"

Quick start

python my_flow.py prefect create my_flow_prefect.py
python my_flow_prefect.py

Usage

Generate and run a Prefect flow

# Write the generated file and run it directly
python my_flow.py prefect create my_flow_prefect.py
python my_flow_prefect.py

# Or compile and run in one step
python my_flow.py prefect run

# Register a named deployment on a Prefect server
python my_flow.py prefect deploy --name prod --work-pool my-pool

Linear, split/join, and foreach graph shapes are supported

from metaflow import FlowSpec, step

# Linear
class SimpleFlow(FlowSpec):
    @step
    def start(self):
        self.value = 42
        self.next(self.end)

    @step
    def end(self):
        pass

# Split/join (branch)
class BranchFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.branch_a, self.branch_b)
    ...

# Foreach fan-out (body tasks run concurrently)
class ForeachFlow(FlowSpec):
    @step
    def start(self):
        self.items = [1, 2, 3]
        self.next(self.process, foreach="items")
    ...

Parametrised flows

Parameters defined with metaflow.Parameter are forwarded automatically:

python param_flow.py prefect create param_flow_prefect.py
python param_flow_prefect.py --message "hello" --count 5
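
The README does not show how the generated file parses these options. As a rough illustration only, the sketch below uses argparse to mirror two hypothetical Metaflow parameters named message and count; the types and defaults are assumptions, not taken from metaflow-prefect.

```python
import argparse


def build_parser():
    """Build a CLI that mirrors two hypothetical flow parameters."""
    parser = argparse.ArgumentParser(description="Sketch of a generated flow's CLI")
    # Assumed to correspond to Parameter("message") and Parameter("count").
    parser.add_argument("--message", type=str, default="hi")
    parser.add_argument("--count", type=int, default=1)
    return parser


# Parse the same arguments used in the command above.
args = build_parser().parse_args(["--message", "hello", "--count", "5"])
params = vars(args)
```

The resulting params dictionary is what such a wrapper would hand on to the flow run.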

Step decorator support

@retry, @timeout, and @environment decorators are read from your flow and applied to the generated Prefect tasks automatically — no changes to your flow code required.

from metaflow import FlowSpec, step, retry, timeout, environment

class MyFlow(FlowSpec):
    @retry(times=3, minutes_between_retries=2)
    @timeout(seconds=600)
    @environment(vars={"API_KEY": "secret"})
    @step
    def train(self):
        ...

The generated Prefect task becomes:

@task(name="train", retries=3, timeout_seconds=600, retry_delay_seconds=120)
def _step_train(run_id, prev_task_id):
    _extra_env.update({"API_KEY": "secret"})
    ...
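
The translation from decorator attributes to task settings can be sketched as a small pure function. This is a minimal sketch, not the package's code: prefect_task_kwargs is a hypothetical helper, and summing seconds, minutes, and hours for the timeout is an assumption based on how Metaflow's @timeout combines its arguments.

```python
def prefect_task_kwargs(name, retry=None, timeout=None):
    """Translate Metaflow decorator attributes into Prefect @task keyword arguments.

    `retry` and `timeout` are plain dicts of decorator attributes,
    e.g. {"times": 3, "minutes_between_retries": 2}. None means "not present".
    """
    kwargs = {"name": name}
    if retry is not None:
        kwargs["retries"] = int(retry.get("times", 0))
        # Metaflow expresses the delay in minutes, Prefect in seconds.
        kwargs["retry_delay_seconds"] = int(retry.get("minutes_between_retries", 0)) * 60
    if timeout is not None:
        # Assumption: all time units are added together into one value in seconds.
        kwargs["timeout_seconds"] = (
            int(timeout.get("seconds", 0))
            + int(timeout.get("minutes", 0)) * 60
            + int(timeout.get("hours", 0)) * 3600
        )
    return kwargs


train_kwargs = prefect_task_kwargs(
    "train",
    retry={"times": 3, "minutes_between_retries": 2},
    timeout={"seconds": 600},
)
```

Here train_kwargs holds the same settings as the @task(...) call shown above.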

Configuration

Metadata service and datastore

By default, metaflow-prefect uses whatever metadata and datastore backends are active in your Metaflow environment. The generated Prefect file bakes in METADATA_TYPE and DATASTORE_TYPE at creation time so every step subprocess uses the same backend.

To use a remote metadata service or object store, configure them before running prefect create:

# Remote metadata service + S3 datastore
python my_flow.py \
  --metadata=service \
  --datastore=s3 \
  prefect create my_flow_prefect.py

Or via environment variables (applied to all flows):

export METAFLOW_DEFAULT_METADATA=service
export METAFLOW_DEFAULT_DATASTORE=s3
python my_flow.py prefect create my_flow_prefect.py
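
The two ways of selecting a backend can be pictured as a simple lookup order. The sketch below is illustrative only: resolve_backend and baked_settings are hypothetical helpers, and both the "command line wins over environment" precedence and the "local" fallback are assumptions about typical Metaflow behaviour rather than documented metaflow-prefect internals.

```python
import os


def resolve_backend(cli_value, env_var, environ=None, default="local"):
    """Pick a backend: explicit CLI value, then environment variable, then default."""
    environ = os.environ if environ is None else environ
    if cli_value:
        return cli_value
    return environ.get(env_var, default)


def baked_settings(cli_metadata=None, cli_datastore=None, environ=None):
    """Return the values that would be frozen into the generated file."""
    return {
        "METADATA_TYPE": resolve_backend(
            cli_metadata, "METAFLOW_DEFAULT_METADATA", environ
        ),
        "DATASTORE_TYPE": resolve_backend(
            cli_datastore, "METAFLOW_DEFAULT_DATASTORE", environ
        ),
    }
```

Because the values are resolved once at creation time, changing the environment later does not affect a file that has already been generated.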

Flow-level timeout

Pass --workflow-timeout when creating the flow file to cap the duration of the whole flow run:

python my_flow.py prefect create my_flow_prefect.py --workflow-timeout=3600

Step decorators (--with)

Inject Metaflow step decorators at deploy time without modifying the flow source:

# Run each step inside a sandbox (e.g. metaflow-sandbox extension)
python my_flow.py prefect create my_flow_prefect.py --with=sandbox

# Multiple decorators are supported
python my_flow.py prefect deploy --name prod \
  --with=sandbox \
  --with="resources:cpu=4,memory=8000"
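
Each --with value has the form name or name:key=value,key=value. A minimal sketch of parsing that form is shown below; parse_with_spec is a hypothetical helper written for illustration, and values are left as strings because the real type handling is not described here.

```python
def parse_with_spec(spec):
    """Split a --with value such as "resources:cpu=4,memory=8000".

    Returns (decorator_name, attributes), with attribute values kept as strings.
    """
    name, _, attr_text = spec.partition(":")
    attrs = {}
    if attr_text:
        for pair in attr_text.split(","):
            key, sep, value = pair.partition("=")
            if not sep:
                raise ValueError(f"expected key=value, got {pair!r}")
            attrs[key.strip()] = value.strip()
    return name.strip(), attrs


sandbox_spec = parse_with_spec("sandbox")
resources_spec = parse_with_spec("resources:cpu=4,memory=8000")
```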

@project support

Flows decorated with @project use a project-qualified name for the deployment:

@project(name="my-team")
class MyFlow(FlowSpec):
    ...

# Deployment will be registered as "my-team.MyFlow"
python my_flow.py prefect deploy --name prod
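
The naming rule can be written down in a few lines. This is a sketch with hypothetical helper names; the "flow/deployment" form of the full identifier follows Prefect's usual convention and is an assumption about how the result appears on the server.

```python
def qualified_flow_name(flow_name, project=None):
    """Prefix the flow name with the project name when @project is present."""
    return f"{project}.{flow_name}" if project else flow_name


def deployment_identifier(flow_name, deployment_name, project=None):
    """Assumed Prefect-style identifier: "<flow name>/<deployment name>"."""
    return f"{qualified_flow_name(flow_name, project)}/{deployment_name}"


with_project = qualified_flow_name("MyFlow", project="my-team")
without_project = qualified_flow_name("MyFlow")
```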

How it works

metaflow-prefect generates a self-contained Prefect flow file from your Metaflow flow's DAG. Each Metaflow step becomes a @task. The generated file:

  • runs each step as a subprocess via the standard metaflow step CLI
  • passes --input-paths correctly for joins and foreach splits
  • runs foreach body tasks concurrently via Prefect's task runner
  • maps @retry, @timeout, and @environment decorators to Prefect task settings
  • writes Metaflow artifacts to the Prefect UI as markdown artifacts with a ready-to-use retrieval snippet
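
The first two points can be sketched as a function that builds the command line for one step. This is an illustrative sketch, not the generated code: step_command is a hypothetical helper, the "run_id/step_name/task_id" input-path format is assumed, and the real command likely carries further options (metadata, datastore, retry counts) that are omitted here.

```python
import sys


def step_command(flow_file, step_name, run_id, task_id, input_paths, split_index=None):
    """Build the argv list for running one Metaflow step as a subprocess."""
    cmd = [
        sys.executable, flow_file, "step", step_name,
        "--run-id", str(run_id),
        "--task-id", str(task_id),
        # A join receives one path per incoming branch, comma-separated.
        "--input-paths", ",".join(input_paths),
    ]
    if split_index is not None:
        # Foreach body tasks are told which item of the split they process.
        cmd += ["--split-index", str(split_index)]
    return cmd


join_cmd = step_command(
    "my_flow.py", "join", "1", "4", ["1/branch_a/2", "1/branch_b/3"]
)
body_cmd = step_command(
    "my_flow.py", "process", "1", "5", ["1/start/1"], split_index=0
)
```

A task body would then pass such a list to subprocess.run(cmd, check=True) so that a failing step fails the Prefect task.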

Prefect UI: flow run timeline

The generated flow preserves the Metaflow DAG structure — foreach fan-outs appear as parallel task runs in the Prefect timeline:

[Figure: flow run timeline showing foreach fan-out]
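
The fan-out pattern itself can be illustrated without Prefect. The sketch below uses a standard-library thread pool as a stand-in for Prefect's task runner; run_body and fan_out are hypothetical names, and run_body only doubles its input where the real task would launch a step subprocess.

```python
from concurrent.futures import ThreadPoolExecutor


def run_body(split_index, item):
    """Stand-in for one foreach body task."""
    return {"split_index": split_index, "result": item * 2}


def fan_out(items, max_workers=4):
    """Run one body task per item concurrently and return results in item order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(run_body, i, item) for i, item in enumerate(items)]
        # Collecting in submission order keeps results aligned with the input list.
        return [future.result() for future in futures]


results = fan_out([1, 2, 3])
```

The join step would run only after every future has resolved, which is why the body tasks appear side by side in the timeline.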

Prefect UI: artifact retrieval snippets

After each step completes, a Prefect artifact is posted showing the Metaflow self.* artifact names and a one-liner to fetch each value:

[Figure: artifact tab showing retrieval snippet]
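
As an illustration of what such an artifact might contain, the sketch below renders a small markdown document with one retrieval line per artifact name. The layout and the artifact_markdown helper are hypothetical; the retrieval lines use the Metaflow client form Task(pathspec)[name].data.

```python
def artifact_markdown(flow_name, run_id, step_name, task_id, artifact_names):
    """Render markdown listing artifact names with a retrieval one-liner each."""
    pathspec = f"{flow_name}/{run_id}/{step_name}/{task_id}"
    lines = [
        f"Artifacts for {pathspec}",
        "",
        "from metaflow import Task",
    ]
    for name in artifact_names:
        # One copy-and-paste line per self.* attribute saved by the step.
        lines.append(f"{name} = Task('{pathspec}')['{name}'].data")
    return "\n".join(lines)


markdown = artifact_markdown("SimpleFlow", "1", "start", "2", ["value"])
```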

Supported decorators

Decorator | Behaviour
@retry(times=N, minutes_between_retries=M) | Maps to @task(retries=N, retry_delay_seconds=M*60)
@timeout(seconds=N) / @timeout(minutes=N) | Maps to @task(timeout_seconds=N); a value given in minutes is converted to seconds (N*60)
@environment(vars={...}) | Merges vars into the step subprocess environment
@schedule(cron=...) | Used as the deployment cron schedule
@project(name=...) | Prefixes the deployment name with the project name

Unsupported decorators (@batch, @slurm, @trigger, @trigger_on_finish, @exit_hook, @parallel) raise a clear error at compile time.
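
A compile-time check of this kind can be sketched as follows. The exception class, function name, and message wording are hypothetical; only the list of unsupported decorator names comes from the sentence above.

```python
UNSUPPORTED = {
    "batch", "slurm", "trigger", "trigger_on_finish", "exit_hook", "parallel",
}


class UnsupportedDecoratorError(Exception):
    """Raised before any code is generated."""


def check_decorators(decorators_by_owner):
    """Fail fast if a flow or step carries a decorator that cannot be translated.

    `decorators_by_owner` maps a step (or flow) name to its decorator names.
    """
    for owner, names in decorators_by_owner.items():
        bad = sorted(set(names) & UNSUPPORTED)
        if bad:
            listed = ", ".join("@" + name for name in bad)
            raise UnsupportedDecoratorError(
                f"{owner}: {listed} cannot be translated to Prefect"
            )


# A flow that only uses supported decorators passes silently.
check_decorators({"start": ["retry"], "end": []})
```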

Limitations

Limitation | Detail
No @condition support | Metaflow's conditional branching (@condition) is not supported; it raises a compile-time error to prevent generating incorrect code.
No parallel_foreach | parallel_foreach=True (Metaflow's MPI-style multi-node execution) is an architectural limitation: it requires @batch or @kubernetes backends and runs as a single distributed job, which has no Prefect equivalent. It raises an error at compile time.
@resources adds Prefect tags | CPU/GPU/memory hints from @resources are added as tags=["resource:cpu=N", ...] on the generated @task. GPU steps also receive task_run_concurrency_tags=["gpu"] for rate-limiting. These tags are visible in the Prefect UI but do not allocate resources automatically; configure matching resources on the work pool. Sandbox users: use @sandbox(cpu=N, gpu=G) for actual resource allocation; @resources tags remain advisory.
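
The tagging rule for @resources can be sketched as a small function. resource_tags is a hypothetical helper; the "resource:key=value" tag format and the "gpu" concurrency tag follow the description above, while the ordering of the tags is an assumption.

```python
def resource_tags(cpu=None, gpu=None, memory=None):
    """Turn @resources hints into advisory tags and GPU concurrency tags."""
    hints = (("cpu", cpu), ("gpu", gpu), ("memory", memory))
    tags = [f"resource:{key}={value}" for key, value in hints if value is not None]
    # Only steps that request at least one GPU get the rate-limiting tag.
    concurrency_tags = ["gpu"] if gpu else []
    return tags, concurrency_tags


cpu_step = resource_tags(cpu=4, memory=8000)
gpu_step = resource_tags(gpu=1)
```

Neither list reserves hardware; they only label the task so that limits can be configured on the Prefect side.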

Development

git clone https://github.com/npow/metaflow-prefect.git
cd metaflow-prefect
pip install -e ".[dev]"
pytest -v

License

Apache 2.0
