
Metaflow extension: deploy and run flows as Prefect deployments


metaflow-prefect

License: Apache-2.0 | Python 3.10+

Deploy and run Metaflow flows as Prefect deployments.

metaflow-prefect generates a self-contained Prefect flow file from any Metaflow flow, letting you schedule, deploy, and monitor your pipelines through Prefect while keeping all your existing Metaflow code unchanged.

Install

pip install metaflow-prefect

Or from source:

git clone https://github.com/npow/metaflow-prefect.git
cd metaflow-prefect
pip install -e ".[dev]"

Quick start

# Compile and run locally (no Prefect server needed)
python my_flow.py prefect run

# Register as a named deployment on a Prefect server
python my_flow.py prefect create --name prod --work-pool my-pool

Usage

Commands

| Command | Description |
|---|---|
| prefect run | Compile and run the flow via Prefect locally (ephemeral, no server needed). |
| prefect resume --clone-run-id <id> | Re-run a failed flow, skipping steps that already succeeded. |
| prefect compile <output.py> | Write the generated Prefect flow file without running it. |
| prefect create --name <name> | Register a named deployment on a running Prefect server. |
| prefect trigger --name <name> | Trigger a run of an existing deployment. |

# Run locally
python my_flow.py prefect run

# Resume a failed run (reuses already-completed step outputs)
python my_flow.py prefect resume --clone-run-id prefect-<uuid>

# Deploy to a Prefect server
python my_flow.py prefect create --name prod --work-pool my-pool

# Trigger a run of the deployed flow
python my_flow.py prefect trigger --name prod

Linear, split/join, and foreach graphs are supported

from metaflow import FlowSpec, step

# Linear
class SimpleFlow(FlowSpec):
    @step
    def start(self):
        self.value = 42
        self.next(self.end)

    @step
    def end(self):
        pass

# Split/join (branch)
class BranchFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.branch_a, self.branch_b)
    ...

# Foreach fan-out (body tasks run concurrently)
class ForeachFlow(FlowSpec):
    @step
    def start(self):
        self.items = [1, 2, 3]
        self.next(self.process, foreach="items")
    ...
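Control-wise, the foreach shape is a fan-out followed by a join. The stdlib sketch below shows that pattern. It uses `concurrent.futures.ThreadPoolExecutor` as a stand-in for Prefect's task runner, and a hypothetical `run_body` function as a stand-in for one body-task subprocess. Neither is metaflow-prefect's actual code:

```python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor


def run_body(item: int) -> int:
    """Hypothetical stand-in for one foreach body task (the real tool runs a step subprocess)."""
    return item * 2


def run_foreach(items: list[int], max_workers: int = 4) -> list[int]:
    """Fan out one body task per item, then collect every result for the join."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() runs the body tasks concurrently but returns results in input order.
        return list(pool.map(run_body, items))


if __name__ == "__main__":
    print(run_foreach([1, 2, 3]))  # → [2, 4, 6]
```

The join only starts once every body task has finished, which is what leaving the `with` block guarantees here.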

Parametrised flows

Parameters defined with metaflow.Parameter are forwarded automatically:

python param_flow.py prefect run
# Or compile to a standalone Prefect file and pass parameters on its command line:
python param_flow.py prefect compile param_flow_prefect.py
python param_flow_prefect.py --message "hello" --count 5
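In concept, forwarding parameters means converting the values a Prefect flow run receives into `--name value` options on the Metaflow command line. The sketch below is minimal and rests on three assumptions: parameters arrive as a plain dict, each Parameter name is used verbatim as the option name, and `params_to_cli_args` is a hypothetical helper that is not part of metaflow-prefect:

```python
from __future__ import annotations


def params_to_cli_args(params: dict) -> list[str]:
    """Hypothetical helper: turn {'message': 'hello', 'count': 5} into CLI options.

    Assumes each Parameter name maps to an option of the same name and that
    values are passed as strings, as they would be on a shell command line.
    """
    args: list[str] = []
    for name, value in params.items():
        if value is None:
            # Leave unset values out so Metaflow falls back to the Parameter default.
            continue
        args.extend([f"--{name}", str(value)])
    return args


if __name__ == "__main__":
    print(params_to_cli_args({"message": "hello", "count": 5}))
    # → ['--message', 'hello', '--count', '5']
```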

Step decorator support

@retry, @timeout, @environment, and @resources decorators are read from your flow and applied to the generated Prefect tasks automatically, with no changes to your flow code required.

from metaflow import FlowSpec, environment, retry, step, timeout

class MyFlow(FlowSpec):
    @retry(times=3, minutes_between_retries=2)
    @timeout(seconds=600)
    @environment(vars={"API_KEY": "secret"})
    @step
    def train(self):
        ...

The generated Prefect task becomes:

@task(name="train", retries=3, timeout_seconds=600, retry_delay_seconds=120)
def _step_train(run_id, prev_task_id):
    _extra_env.update({"API_KEY": "secret"})
    ...
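The translation from decorator attributes to Prefect task options is mostly arithmetic. The minimal sketch below assumes the decorator attributes have already been read into plain dicts. `task_options` and the `key:value` tag format are hypothetical illustrations, not metaflow-prefect's actual code. Metaflow's @timeout sums its `seconds`, `minutes`, and `hours` arguments, so the sketch converts the total to seconds:

```python
from __future__ import annotations


def task_options(step_name: str, retry: dict | None = None,
                 timeout: dict | None = None,
                 resources: dict | None = None) -> dict:
    """Hypothetical sketch: map Metaflow decorator attributes to Prefect @task kwargs."""
    opts: dict = {"name": step_name}
    if retry is not None:
        # Metaflow's @retry defaults are times=3, minutes_between_retries=2.
        opts["retries"] = retry.get("times", 3)
        opts["retry_delay_seconds"] = retry.get("minutes_between_retries", 2) * 60
    if timeout is not None:
        # @timeout adds seconds, minutes and hours together into a single limit.
        opts["timeout_seconds"] = (
            timeout.get("seconds", 0)
            + timeout.get("minutes", 0) * 60
            + timeout.get("hours", 0) * 3600
        )
    if resources:
        # Advisory only: surfaced as tags (this tag format is an assumption).
        opts["tags"] = [f"{key}:{value}" for key, value in sorted(resources.items())]
    return opts


if __name__ == "__main__":
    print(task_options("train",
                       retry={"times": 3, "minutes_between_retries": 2},
                       timeout={"seconds": 600}))
    # → {'name': 'train', 'retries': 3, 'retry_delay_seconds': 120, 'timeout_seconds': 600}
```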

Event-based triggers

@trigger and @trigger_on_finish are wired as Prefect automations when the deployment is registered. Re-running prefect create updates the automations in place.

@trigger_on_finish(flow="UpstreamFlow")
class MyFlow(FlowSpec):
    ...

python my_flow.py prefect create --name prod --work-pool my-pool
# → Registers deployment AND creates a Prefect automation:
#   "on prefect.flow-run.Completed for flow 'UpstreamFlow' → run deployment 'prod'"

@trigger(event="data.ready")
class MyFlow(FlowSpec):
    ...

python my_flow.py prefect create --name prod --work-pool my-pool
# → Registers deployment AND creates a Prefect automation:
#   "on event 'data.ready' → run deployment 'prod'"
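Each decorator boils down to an "on event X, run deployment Y" rule. The sketch below describes such a rule as a plain dict. Its keys (`expect`, `match_related`, `posture`) loosely follow the shape of a Prefect event trigger, but `automation_spec` is hypothetical and the payload is not guaranteed to match what metaflow-prefect actually sends:

```python
from __future__ import annotations


def automation_spec(deployment: str, event: str | None = None,
                    upstream_flow: str | None = None) -> dict:
    """Hypothetical sketch: describe the automation implied by @trigger or @trigger_on_finish."""
    if (event is None) == (upstream_flow is None):
        raise ValueError("pass exactly one of event= or upstream_flow=")
    if event is not None:
        # @trigger(event="data.ready"): fire on any event with that name.
        trigger: dict = {"expect": [event]}
    else:
        # @trigger_on_finish(flow="UpstreamFlow"): fire when a run of that flow completes.
        trigger = {
            "expect": ["prefect.flow-run.Completed"],
            "match_related": {
                "prefect.resource.role": "flow",
                "prefect.resource.name": upstream_flow,
            },
        }
    trigger["posture"] = "Reactive"  # react to each matching event as it arrives
    return {"trigger": trigger, "action": {"run_deployment": deployment}}


if __name__ == "__main__":
    print(automation_spec("prod", event="data.ready"))
```

Re-running `prefect create` would then amount to recomputing this description and updating the stored automation rather than adding a second one.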

Resume failed runs

Pass --clone-run-id to reuse outputs from steps that already succeeded in a previous run:

python my_flow.py prefect resume --clone-run-id prefect-<uuid-of-failed-run>
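Outputs from the earlier run can only be reused while the inputs to a step are unchanged. The sketch below illustrates that rule. It assumes the DAG is supplied as a topologically ordered step list plus a parent map, and `plan_resume` is hypothetical rather than the tool's actual logic. A step is cloned only when it succeeded before and every upstream step was cloned too. All other steps re-run:

```python
from __future__ import annotations


def plan_resume(order: list[str], parents: dict[str, list[str]],
                succeeded: set[str]) -> dict[str, str]:
    """Hypothetical sketch: decide which steps to clone and which to re-run on resume.

    `order` must be topological, so every parent is planned before its children.
    """
    plan: dict[str, str] = {}
    for step in order:
        upstream_cloned = all(plan[p] == "clone" for p in parents.get(step, []))
        plan[step] = "clone" if step in succeeded and upstream_cloned else "run"
    return plan


if __name__ == "__main__":
    order = ["start", "a", "b", "join", "end"]
    parents = {"a": ["start"], "b": ["start"], "join": ["a", "b"], "end": ["join"]}
    print(plan_resume(order, parents, succeeded={"start", "a"}))
    # → {'start': 'clone', 'a': 'clone', 'b': 'run', 'join': 'run', 'end': 'run'}
```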

Configuration

Metadata service and datastore

By default, metaflow-prefect uses whatever metadata and datastore backends are active in your Metaflow environment. The generated Prefect file bakes in METADATA_TYPE and DATASTORE_TYPE at creation time so every step subprocess uses the same backend.

To use a remote metadata service or object store, configure them before running prefect create:

# Remote metadata service + S3 datastore
python my_flow.py \
  --metadata=service \
  --datastore=s3 \
  prefect create --name prod --work-pool my-pool

Or via environment variables (applied to all flows):

export METAFLOW_DEFAULT_METADATA=service
export METAFLOW_DEFAULT_DATASTORE=s3
python my_flow.py prefect create --name prod --work-pool my-pool
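Baking in the backends means every step subprocess is launched with the same settings, regardless of the environment the worker happens to have. The sketch below shows one way this could look, using the top-level `--metadata`/`--datastore` options shown above together with the `run-id`, `task-id`, and `input-paths` options of Metaflow's internal `step` command. The constant values and `step_command` are hypothetical:

```python
from __future__ import annotations

import sys

# Hypothetical values, written into the generated file when `prefect create` runs.
METADATA_TYPE = "service"
DATASTORE_TYPE = "s3"


def step_command(flow_file: str, step_name: str, run_id: str, task_id: str,
                 input_paths: str | None = None) -> list[str]:
    """Sketch of a `metaflow step` invocation pinned to the baked-in backends."""
    cmd = [
        sys.executable, flow_file,
        f"--metadata={METADATA_TYPE}",    # same metadata backend for every step
        f"--datastore={DATASTORE_TYPE}",  # same datastore for every step
        "step", step_name,
        "--run-id", run_id,
        "--task-id", task_id,
    ]
    if input_paths:
        cmd += ["--input-paths", input_paths]
    return cmd


if __name__ == "__main__":
    print(step_command("my_flow.py", "train", "prefect-1", "2", "prefect-1/start/1"))
```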

Flow-level timeout

Cap the total duration of each flow run (value in seconds):

python my_flow.py prefect create --name prod --workflow-timeout=3600

Step decorators (--with)

Inject Metaflow step decorators at deploy time without modifying the flow source:

# Run each step inside a sandbox (e.g. metaflow-sandbox extension)
python my_flow.py prefect run --with=sandbox

# Repeat --with to attach several decorators at deployment time
python my_flow.py prefect create --name prod \
  --with=sandbox \
  --with="resources:cpu=4,memory=8000"

@project support

Flows decorated with @project use a project-qualified name for the deployment. Metaflow only accepts lowercase alphanumeric characters and underscores in project names:

@project(name="my_team")
class MyFlow(FlowSpec):
    ...

# Deployment will be registered as "my_team.MyFlow"
python my_flow.py prefect create --name prod

How it works

metaflow-prefect generates a self-contained Prefect flow file from your Metaflow flow's DAG. Each Metaflow step becomes a @task. The generated file:

  • runs each step as a subprocess via the standard metaflow step CLI
  • streams stdout and stderr from each step subprocess to the Prefect logger in real time
  • passes --input-paths correctly for joins and foreach splits
  • runs foreach body tasks concurrently via Prefect's task runner
  • maps @retry, @timeout, @environment, and @resources decorators to Prefect task settings
  • writes Metaflow artifacts to the Prefect UI as markdown artifacts with a ready-to-use retrieval snippet
  • creates Prefect automations for @trigger and @trigger_on_finish when deploying

Prefect UI: flow run timeline

The generated flow preserves the Metaflow DAG structure — foreach fan-outs appear as parallel task runs in the Prefect timeline:

[Screenshot: flow run timeline showing foreach fan-out]

Prefect UI: artifact retrieval snippets

After each step completes, a Prefect artifact is posted showing the Metaflow self.* artifact names and a one-liner to fetch each value:

[Screenshot: artifact tab showing retrieval snippet]
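The retrieval one-liner relies on the Metaflow Client API, where `Task("<flow>/<run>/<step>/<task>").data.<name>` loads an artifact. The sketch below renders a note of that kind. `artifact_markdown` is hypothetical and the layout is an assumption, not the tool's exact output. Depending on your namespace settings, you may need to call `namespace(None)` before the snippet works:

```python
from __future__ import annotations


def artifact_markdown(pathspec: str, names: list[str]) -> str:
    """Hypothetical sketch: list a task's artifacts with a Client API one-liner for each.

    `pathspec` is a Metaflow task pathspec such as "MyFlow/prefect-123/train/4".
    """
    lines = [f"### Artifacts for `{pathspec}`", ""]
    for name in sorted(names):
        lines.append(f'- `{name}`: `Task("{pathspec}").data.{name}`')
    lines += ["", "Load with `from metaflow import Task`."]
    return "\n".join(lines)


if __name__ == "__main__":
    print(artifact_markdown("MyFlow/prefect-123/train/4", ["value", "model"]))
```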

Supported decorators

| Decorator | Behaviour |
|---|---|
| @retry(times=N, minutes_between_retries=M) | Maps to @task(retries=N, retry_delay_seconds=M*60) |
| @timeout(seconds=N) / @timeout(minutes=N) | Maps to @task(timeout_seconds=...), with the duration expressed in seconds (seconds=N gives N, minutes=N gives N*60) |
| @environment(vars={...}) | Merges vars into the step subprocess environment |
| @resources(cpu=N, gpu=G, memory=M) | Added as Prefect task tags; GPU steps get a concurrency tag. Advisory only: configure matching resources on the work pool. |
| @schedule(cron=...) | Used as the deployment cron schedule |
| @project(name=...) | Prefixes the deployment name with the project name |
| @trigger(event=...) | Creates a Prefect automation that fires the deployment on the named event |
| @trigger_on_finish(flow=...) | Creates a Prefect automation that fires the deployment when the upstream Prefect flow completes |

Unsupported decorators (@batch, @slurm, @condition, @exit_hook, @parallel) raise a clear error at compile time.

Limitations

| Limitation | Detail |
|---|---|
| No @condition support | Metaflow's conditional branching (@condition) is not supported; it raises a compile-time error to prevent generating incorrect code. |
| No parallel_foreach | parallel_foreach=True (Metaflow's MPI-style multi-node execution) requires @batch or @kubernetes backends and runs as a single distributed job, which has no Prefect equivalent. Raises an error at compile time. |
| @resources tags are advisory | CPU/GPU/memory hints are added as Prefect task tags and are visible in the UI, but do not automatically allocate resources; configure matching resources on the work pool. |
| @trigger event scope | @trigger(event="foo") watches for a Prefect event named "foo". Metaflow's own event system is separate from Prefect's; emit events via Prefect's event API to use this trigger. |

Development

git clone https://github.com/npow/metaflow-prefect.git
cd metaflow-prefect
pip install -e ".[dev]"
pytest -v

License

Apache 2.0



Download files

Source distribution: metaflow_prefect-0.3.4.tar.gz (392.1 kB)

Built distribution: metaflow_prefect-0.3.4-py3-none-any.whl (35.8 kB, Python 3)

File details

metaflow_prefect-0.3.4.tar.gz

  • Size: 392.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

| Algorithm | Hash digest |
|---|---|
| SHA256 | 83bac278c0bf4848ca4857e75fa33f0590988e45ef28c4759edfb447a85a04e0 |
| MD5 | 99774f68e6829684d72d25f5e7785e5b |
| BLAKE2b-256 | dde731e55b5334b2dda83ee9727ca17749b67093eba8e38df9ab8f8d4536e66c |

Provenance: attestation bundle published by publish.yml on npow/metaflow-prefect. Attestation values reflect the state when the release was signed and may no longer be current.

metaflow_prefect-0.3.4-py3-none-any.whl

| Algorithm | Hash digest |
|---|---|
| SHA256 | ef85c4f201fcbdc482e480dc1f27553d91f2354d3de89e305b20eeaca01b5ef6 |
| MD5 | 3b1816a9d90d8a7553f04842b6e45c73 |
| BLAKE2b-256 | 5e36d5411411c814de2898098d60ce6fd1a297d0ab4b7f4afc7ea911f10de5d5 |

Provenance: attestation bundle published by publish.yml on npow/metaflow-prefect.
