Metaflow extension: deploy and run flows as Prefect deployments
Project description
metaflow-prefect
Deploy and run Metaflow flows as Prefect deployments.
metaflow-prefect generates a self-contained Prefect flow file from any Metaflow flow, letting
you schedule, deploy, and monitor your pipelines through Prefect while keeping all your existing
Metaflow code unchanged.
Install
pip install metaflow-prefect
Or from source:
git clone https://github.com/npow/metaflow-prefect.git
cd metaflow-prefect
pip install -e ".[dev]"
Quick start
# Compile and run locally (no Prefect server needed)
python my_flow.py prefect run
# Register as a named deployment on a Prefect server
python my_flow.py prefect create --name prod --work-pool my-pool
Usage
Commands
| Command | Description |
|---|---|
prefect run |
Compile and run the flow via Prefect locally (ephemeral, no server needed). |
prefect resume --clone-run-id <id> |
Re-run a failed flow, skipping steps that already succeeded. |
prefect compile <output.py> |
Write the generated Prefect flow file without running it. |
prefect create --name <name> |
Register a named deployment on a running Prefect server. |
prefect trigger --name <name> |
Trigger a run for an existing deployment. |
# Run locally
python my_flow.py prefect run
# Resume a failed run (reuses already-completed step outputs)
python my_flow.py prefect resume --clone-run-id prefect-<uuid>
# Deploy to a Prefect server
python my_flow.py prefect create --name prod --work-pool my-pool
# Trigger a run of the deployed flow
python my_flow.py prefect trigger --name prod
All graph shapes are supported
# Linear
class SimpleFlow(FlowSpec):
@step
def start(self):
self.value = 42
self.next(self.end)
@step
def end(self): pass
# Split/join (static branch)
class BranchFlow(FlowSpec):
@step
def start(self):
self.next(self.branch_a, self.branch_b)
...
# Conditional (dynamic branch — only one path runs at runtime)
class ConditionalFlow(FlowSpec):
value = Parameter("value", default=42, type=int)
@step
def start(self):
self.route = "high" if self.value >= 50 else "low"
self.next({"high": self.high_branch, "low": self.low_branch}, condition="route")
@step
def high_branch(self): ...
@step
def low_branch(self): ...
@step
def join(self): ...
# Foreach fan-out (body tasks run concurrently)
class ForeachFlow(FlowSpec):
@step
def start(self):
self.items = [1, 2, 3]
self.next(self.process, foreach="items")
...
Parametrised flows
Parameters defined with metaflow.Parameter are forwarded automatically:
python param_flow.py prefect run
# Or pass parameters at the CLI:
python param_flow.py prefect compile param_flow_prefect.py
python param_flow_prefect.py --message "hello" --count 5
Step decorator support
@retry, @timeout, @environment, and @resources decorators are read from your flow
and applied to the generated Prefect tasks automatically — no changes to your flow code required.
class MyFlow(FlowSpec):
@retry(times=3, minutes_between_retries=2)
@timeout(seconds=600)
@environment(vars={"API_KEY": "secret"})
@step
def train(self):
...
The generated Prefect task becomes:
@task(name="train", retries=3, timeout_seconds=600, retry_delay_seconds=120)
def _step_train(run_id, prev_task_id):
_extra_env.update({"API_KEY": "secret"})
...
Event-based triggers
@trigger and @trigger_on_finish are wired as Prefect automations when the deployment
is registered. Re-running prefect create updates the automations in place.
@trigger_on_finish(flow="UpstreamFlow")
class MyFlow(FlowSpec):
...
python my_flow.py prefect create --name prod --work-pool my-pool
# → Registers deployment AND creates a Prefect automation:
# "on prefect.flow-run.Completed for flow 'UpstreamFlow' → run deployment 'prod'"
@trigger(event="data.ready")
class MyFlow(FlowSpec):
...
python my_flow.py prefect create --name prod --work-pool my-pool
# → Registers deployment AND creates a Prefect automation:
# "on event 'data.ready' → run deployment 'prod'"
Resume failed runs
Pass --clone-run-id to reuse outputs from steps that already succeeded in a previous run:
python my_flow.py prefect resume --clone-run-id prefect-<uuid-of-failed-run>
Configuration
Metadata service and datastore
By default, metaflow-prefect uses whatever metadata and datastore backends are active in your
Metaflow environment. The generated Prefect file bakes in METADATA_TYPE and DATASTORE_TYPE
at creation time so every step subprocess uses the same backend.
To use a remote metadata service or object store, configure them before running prefect create:
# Remote metadata service + S3 datastore
python my_flow.py \
--metadata=service \
--datastore=s3 \
prefect create --name prod --work-pool my-pool
Or via environment variables (applied to all flows):
export METAFLOW_DEFAULT_METADATA=service
export METAFLOW_DEFAULT_DATASTORE=s3
python my_flow.py prefect create --name prod --work-pool my-pool
Flow-level timeout
python my_flow.py prefect create --name prod --workflow-timeout=3600
Step decorators (--with)
Inject Metaflow step decorators at deploy time without modifying the flow source:
# Run each step inside a sandbox (e.g. metaflow-sandbox extension)
python my_flow.py prefect run --with=sandbox
# Multiple decorators supported at deployment time
python my_flow.py prefect create --name prod \
--with=sandbox \
--with="resources:cpu=4,memory=8000"
@project support
Flows decorated with @project use a project-qualified name for the deployment:
@project(name="my-team")
class MyFlow(FlowSpec):
...
# Deployment will be registered as "my-team.MyFlow"
python my_flow.py prefect create --name prod
How it works
metaflow-prefect generates a self-contained Prefect flow file from your Metaflow flow's DAG.
Each Metaflow step becomes a @task. The generated file:
- runs each step as a subprocess via the standard
metaflow stepCLI - streams stdout and stderr from each step subprocess to the Prefect logger in real time
- passes
--input-pathscorrectly for joins and foreach splits - runs foreach body tasks concurrently via Prefect's task runner
- maps
@retry,@timeout,@environment, and@resourcesdecorators to Prefect task settings - writes Metaflow artifacts to the Prefect UI as markdown artifacts with a ready-to-use retrieval snippet
- creates Prefect automations for
@triggerand@trigger_on_finishwhen deploying
Prefect UI: flow run timeline
The generated flow preserves the Metaflow DAG structure — foreach fan-outs appear as parallel task runs in the Prefect timeline:
Prefect UI: artifact retrieval snippets
After each step completes, a Prefect artifact is posted showing the Metaflow self.* artifact
names and a one-liner to fetch each value:
Supported decorators
| Decorator | Behaviour |
|---|---|
@retry(times=N, minutes_between_retries=M) |
Maps to @task(retries=N, retry_delay_seconds=M*60) |
@timeout(seconds=N) / @timeout(minutes=N) |
Maps to @task(timeout_seconds=N) |
@environment(vars={...}) |
Merges vars into the step subprocess environment |
@resources(cpu=N, gpu=G, memory=M) |
Added as Prefect task tags; GPU steps get a concurrency tag. Advisory only — configure matching resources on the work pool. |
@schedule(cron=...) |
Used as the deployment cron schedule |
@project(name=...) |
Prefixes the deployment name with the project name |
@trigger(event=...) |
Creates a Prefect automation that fires the deployment on the named event |
@trigger_on_finish(flow=...) |
Creates a Prefect automation that fires the deployment when the upstream Prefect flow completes |
Unsupported decorators (@batch, @slurm, @exit_hook, @parallel)
raise a clear error at compile time.
Limitations
| Limitation | Detail |
|---|---|
No parallel_foreach |
parallel_foreach=True (Metaflow's MPI-style multi-node execution) requires @batch or @kubernetes backends and runs as a single distributed job, which has no Prefect equivalent. Raises an error at compile time. |
@resources tags are advisory |
CPU/GPU/memory hints are added as Prefect task tags and are visible in the UI, but do not automatically allocate resources — configure matching resources on the work pool. |
@trigger event scope |
@trigger(event="foo") watches for a Prefect event named "foo". Metaflow's own event system is separate from Prefect's — emit events via Prefect's event API to use this trigger. |
Development
git clone https://github.com/npow/metaflow-prefect.git
cd metaflow-prefect
pip install -e ".[dev]"
pytest -v
License
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file metaflow_prefect-0.3.7.tar.gz.
File metadata
- Download URL: metaflow_prefect-0.3.7.tar.gz
- Upload date:
- Size: 399.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5eff877b93997984dbb98c695e25ff02e5ff5f4111ab8f5cf85f405d0230e8c2
|
|
| MD5 |
7b691756b94ccf845f5315878a201350
|
|
| BLAKE2b-256 |
c458d18195c3212a492087c85aeb89ed04d347134e7cd70875db97b9b802b686
|
Provenance
The following attestation bundles were made for metaflow_prefect-0.3.7.tar.gz:
Publisher:
publish.yml on npow/metaflow-prefect
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
metaflow_prefect-0.3.7.tar.gz -
Subject digest:
5eff877b93997984dbb98c695e25ff02e5ff5f4111ab8f5cf85f405d0230e8c2 - Sigstore transparency entry: 1056129354
- Sigstore integration time:
-
Permalink:
npow/metaflow-prefect@b279ad4f70d95cf56f7341ab7a9a5c071793324c -
Branch / Tag:
refs/tags/v0.3.7 - Owner: https://github.com/npow
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@b279ad4f70d95cf56f7341ab7a9a5c071793324c -
Trigger Event:
push
-
Statement type:
File details
Details for the file metaflow_prefect-0.3.7-py3-none-any.whl.
File metadata
- Download URL: metaflow_prefect-0.3.7-py3-none-any.whl
- Upload date:
- Size: 36.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
68ddf60bde2db2797c425b6ce2f441d8fc2a2ac3882a86f653caa7915e728eb0
|
|
| MD5 |
83d0d48c6f4fad9ed0aa6dcbef616810
|
|
| BLAKE2b-256 |
fbd1a0790a7b7e7810ac9c374dbdcd10aff28069f1a6c0dd950ad6f5cecef3d5
|
Provenance
The following attestation bundles were made for metaflow_prefect-0.3.7-py3-none-any.whl:
Publisher:
publish.yml on npow/metaflow-prefect
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
metaflow_prefect-0.3.7-py3-none-any.whl -
Subject digest:
68ddf60bde2db2797c425b6ce2f441d8fc2a2ac3882a86f653caa7915e728eb0 - Sigstore transparency entry: 1056129355
- Sigstore integration time:
-
Permalink:
npow/metaflow-prefect@b279ad4f70d95cf56f7341ab7a9a5c071793324c -
Branch / Tag:
refs/tags/v0.3.7 - Owner: https://github.com/npow
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@b279ad4f70d95cf56f7341ab7a9a5c071793324c -
Trigger Event:
push
-
Statement type: