
Metaflow extension: deploy and run flows as Prefect deployments

metaflow-prefect

License: Apache-2.0. Python 3.10+.

Deploy and run Metaflow flows as Prefect deployments.

metaflow-prefect generates a self-contained Prefect flow file from any Metaflow flow, letting you schedule, deploy, and monitor your pipelines through Prefect while keeping all your existing Metaflow code unchanged.

Install

pip install metaflow-prefect

Or from source:

git clone https://github.com/npow/metaflow-prefect.git
cd metaflow-prefect
pip install -e ".[dev]"

Quick start

python my_flow.py prefect create my_flow_prefect.py
python my_flow_prefect.py

Usage

Generate and run a Prefect flow

# Write the generated file and run it directly
python my_flow.py prefect create my_flow_prefect.py
python my_flow_prefect.py

# Or compile and run in one step
python my_flow.py prefect run

# Register a named deployment on a Prefect server
python my_flow.py prefect deploy --name prod --work-pool my-pool

All graph shapes are supported

from metaflow import FlowSpec, step

# Linear
class SimpleFlow(FlowSpec):
    @step
    def start(self):
        self.value = 42
        self.next(self.end)

    @step
    def end(self):
        pass

# Split/join (branch)
class BranchFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.branch_a, self.branch_b)
    ...

# Foreach fan-out (body tasks run concurrently)
class ForeachFlow(FlowSpec):
    @step
    def start(self):
        self.items = [1, 2, 3]
        self.next(self.process, foreach="items")
    ...
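To make the foreach shape concrete, here is a minimal stdlib-only sketch of the fan-out/join pattern. It uses concurrent.futures as a stand-in for Prefect's task runner and is not the code metaflow-prefect generates; process and run_foreach are hypothetical names chosen for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def process(item):
    """Stand-in for a foreach body step: handles a single item."""
    return item * 2


def run_foreach(items, body, max_workers=4):
    """Fan out `body` over `items` concurrently, then join the results."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() returns results in input order even if tasks finish out of order.
        return list(pool.map(body, items))


if __name__ == "__main__":
    print(run_foreach([1, 2, 3], process))
```

The join step sees one result per item, in the same order as the list the foreach iterated over.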

Parametrised flows

Parameters defined with metaflow.Parameter are forwarded automatically:

python param_flow.py prefect create param_flow_prefect.py
python param_flow_prefect.py --message "hello" --count 5

Step decorator support

The @retry, @timeout, and @environment decorators are read from your flow and applied to the generated Prefect tasks automatically, so no changes to your flow code are required.

from metaflow import FlowSpec, environment, retry, step, timeout

class MyFlow(FlowSpec):
    @retry(times=3, minutes_between_retries=2)
    @timeout(seconds=600)
    @environment(vars={"API_KEY": "secret"})
    @step
    def train(self):
        ...

The generated Prefect task becomes:

@task(name="train", retries=3, timeout_seconds=600, retry_delay_seconds=120)
def _step_train(run_id, prev_task_id):
    _extra_env.update({"API_KEY": "secret"})
    ...
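The translation from decorator attributes to task settings can be sketched as a small pure function. This is an illustration under stated assumptions, not the extension's actual code: prefect_task_kwargs is a hypothetical helper, decorator attributes are passed as plain dicts, and @timeout values given in minutes or hours are assumed to be converted to seconds.

```python
def prefect_task_kwargs(step_name, retry=None, timeout=None):
    """Translate Metaflow decorator attributes into Prefect @task keyword arguments.

    `retry` and `timeout` are plain dicts of decorator attributes,
    e.g. {"times": 3, "minutes_between_retries": 2} (hypothetical input shape).
    """
    kwargs = {"name": step_name}
    if retry is not None:
        kwargs["retries"] = retry.get("times", 0)
        # Metaflow expresses the retry delay in minutes; Prefect expects seconds.
        kwargs["retry_delay_seconds"] = retry.get("minutes_between_retries", 0) * 60
    if timeout is not None:
        # Assumption: all time units are summed into a single seconds value.
        kwargs["timeout_seconds"] = (
            timeout.get("hours", 0) * 3600
            + timeout.get("minutes", 0) * 60
            + timeout.get("seconds", 0)
        )
    return kwargs


if __name__ == "__main__":
    print(prefect_task_kwargs(
        "train",
        retry={"times": 3, "minutes_between_retries": 2},
        timeout={"seconds": 600},
    ))
```

With the attributes from the train step above, the sketch yields the same four settings that appear in the generated @task call.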

Configuration

Metadata service and datastore

By default, metaflow-prefect uses whatever metadata and datastore backends are active in your Metaflow environment. The generated Prefect file bakes in METADATA_TYPE and DATASTORE_TYPE at creation time so every step subprocess uses the same backend.

To use a remote metadata service or object store, configure them before running prefect create:

# Remote metadata service + S3 datastore
python my_flow.py \
  --metadata=service \
  --datastore=s3 \
  prefect create my_flow_prefect.py

Or via environment variables (applied to all flows):

export METAFLOW_DEFAULT_METADATA=service
export METAFLOW_DEFAULT_DATASTORE=s3
python my_flow.py prefect create my_flow_prefect.py
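As a rough illustration of what "baking in" the backends at creation time could look like, the sketch below resolves the two settings from the environment once and renders them as constants for a generated file. The function names and the fallback to "local" are assumptions made for this example; the real generator may resolve its configuration differently, and command-line flags are not modelled here.

```python
import os


def resolve_backends(environ=None):
    """Read metadata and datastore types from METAFLOW_DEFAULT_* variables.

    Falls back to "local" when a variable is unset (an assumption of this sketch).
    """
    environ = os.environ if environ is None else environ
    return {
        "METADATA_TYPE": environ.get("METAFLOW_DEFAULT_METADATA", "local"),
        "DATASTORE_TYPE": environ.get("METAFLOW_DEFAULT_DATASTORE", "local"),
    }


def render_constants(backends):
    """Render the resolved values as module-level constants, one per line."""
    return "\n".join(
        f"{name} = {value!r}" for name, value in sorted(backends.items())
    )


if __name__ == "__main__":
    print(render_constants(resolve_backends()))
```

Because the values are written into the file as literals, changing the environment after creation would not affect a file produced this way; the file would need to be regenerated.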

Flow-level timeout

python my_flow.py prefect create my_flow_prefect.py --workflow-timeout=3600

Step decorators (--with)

Inject Metaflow step decorators at deploy time without modifying the flow source:

# Run each step inside a sandbox (e.g. metaflow-sandbox extension)
python my_flow.py prefect create my_flow_prefect.py --with=sandbox

# Multiple decorators are supported
python my_flow.py prefect deploy --name prod \
  --with=sandbox \
  --with="resources:cpu=4,memory=8000"
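The --with values above follow a name:key=value,key=value shape. A minimal parser for that shape might look like the following; parse_decospec is a hypothetical helper written for this example, and it keeps attribute values as strings rather than guessing their types.

```python
def parse_decospec(spec):
    """Split 'name:key=value,key=value' into (name, attributes)."""
    name, _, attr_text = spec.partition(":")
    attrs = {}
    # filter(None, ...) skips the empty string produced when there are no attributes.
    for pair in filter(None, attr_text.split(",")):
        key, sep, value = pair.partition("=")
        if not sep:
            raise ValueError(f"expected key=value, got {pair!r}")
        attrs[key.strip()] = value.strip()
    return name.strip(), attrs


if __name__ == "__main__":
    print(parse_decospec("sandbox"))
    print(parse_decospec("resources:cpu=4,memory=8000"))
```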

@project support

Flows decorated with @project use a project-qualified name for the deployment:

@project(name="my-team")
class MyFlow(FlowSpec):
    ...

# Deployment will be registered as "my-team.MyFlow"
python my_flow.py prefect deploy --name prod
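The naming rule in this example can be written as a one-line helper. This is only a sketch of the rule as described here; qualified_flow_name is a hypothetical name, and any further qualification the extension may apply is not covered.

```python
def qualified_flow_name(flow_name, project=None):
    """Prefix the flow name with the project name when @project is present."""
    return f"{project}.{flow_name}" if project else flow_name


if __name__ == "__main__":
    print(qualified_flow_name("MyFlow", project="my-team"))
```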

How it works

metaflow-prefect generates a self-contained Prefect flow file from your Metaflow flow's DAG. Each Metaflow step becomes a @task. The generated file:

  • runs each step as a subprocess via the standard metaflow step CLI
  • passes --input-paths correctly for joins and foreach splits
  • runs foreach body tasks concurrently via Prefect's task runner
  • maps @retry, @timeout, and @environment decorators to Prefect task settings
  • writes Metaflow artifacts to the Prefect UI as markdown artifacts with a ready-to-use retrieval snippet
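The first two points in the list above can be sketched as a function that assembles the command line for one step. It only builds the argument list and does not launch anything. The option names (--run-id, --task-id, --input-paths, --split-index) are taken from Metaflow's step command as commonly documented; the exact arguments the generated file passes are an assumption, and build_step_command is a hypothetical helper.

```python
import sys


def build_step_command(flow_file, step_name, run_id, task_id,
                       input_paths, split_index=None):
    """Assemble an argument list for running one Metaflow step as a subprocess.

    `input_paths` holds one "run_id/step_name/task_id" entry per parent task:
    a single entry for a linear step, several for a join.
    """
    cmd = [
        sys.executable, flow_file, "step", step_name,
        "--run-id", str(run_id),
        "--task-id", str(task_id),
        "--input-paths", ",".join(input_paths),
    ]
    if split_index is not None:
        # Foreach body tasks also need to know which item they handle.
        cmd += ["--split-index", str(split_index)]
    return cmd


if __name__ == "__main__":
    print(build_step_command(
        "my_flow.py", "join", "7", "4",
        ["7/branch_a/2", "7/branch_b/3"],
    ))
```

A list like this could be handed to subprocess.run; a join passes every parent task in --input-paths, while a foreach body task passes its single parent plus a split index.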

Prefect UI: flow run timeline

The generated flow preserves the Metaflow DAG structure: foreach fan-outs appear as parallel task runs in the Prefect timeline.

[Figure: flow run timeline showing foreach fan-out]

Prefect UI: artifact retrieval snippets

After each step completes, a Prefect artifact is posted showing the Metaflow self.* artifact names and a one-liner to fetch each value:

[Figure: artifact tab showing retrieval snippet]
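A retrieval snippet of this kind could be produced along the following lines. The markdown layout is invented for illustration and may differ from what the extension posts; artifact_markdown is a hypothetical helper. The one-liner it emits relies on the Metaflow client's Task object, addressed by a flow/run/step/task pathspec.

```python
def artifact_markdown(flow_name, run_id, step_name, task_id, artifact_names):
    """Build a markdown summary with one retrieval one-liner per artifact."""
    pathspec = f"{flow_name}/{run_id}/{step_name}/{task_id}"
    lines = [f"### Metaflow artifacts: `{pathspec}`", ""]
    for name in sorted(artifact_names):
        # Each one-liner assumes `from metaflow import Task` has been run first.
        lines.append(f"- `{name}`: `Task('{pathspec}').data.{name}`")
    return "\n".join(lines)


if __name__ == "__main__":
    print(artifact_markdown("SimpleFlow", "12", "start", "1", ["value"]))
```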

Supported decorators

Decorator                                     Behaviour
@retry(times=N, minutes_between_retries=M)    Maps to @task(retries=N, retry_delay_seconds=M*60)
@timeout(seconds=N) / @timeout(minutes=N)     Maps to @task(timeout_seconds=N) for seconds, or timeout_seconds=N*60 for minutes
@environment(vars={...})                      Merges vars into the step subprocess environment
@schedule(cron=...)                           Used as the deployment cron schedule
@project(name=...)                            Prefixes the deployment name with the project name

Unsupported decorators (@batch, @slurm, @trigger, @trigger_on_finish, @exit_hook, @parallel) raise a clear error at compile time.
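A compile-time check of this kind can be sketched as follows. The decorator names come from the list above; the exception class, function name, and error wording are hypothetical and only illustrate the fail-fast idea.

```python
UNSUPPORTED = frozenset(
    {"batch", "slurm", "trigger", "trigger_on_finish", "exit_hook", "parallel"}
)


class UnsupportedDecoratorError(Exception):
    """Raised when a flow uses a decorator that cannot be translated."""


def check_decorators(step_name, decorator_names):
    """Raise before any file is generated if an unsupported decorator is used."""
    bad = sorted(set(decorator_names) & UNSUPPORTED)
    if bad:
        listed = ", ".join(f"@{name}" for name in bad)
        raise UnsupportedDecoratorError(
            f"step '{step_name}' uses unsupported decorator(s): {listed}"
        )


if __name__ == "__main__":
    check_decorators("train", ["retry", "timeout"])  # passes silently
    try:
        check_decorators("train", ["retry", "batch"])
    except UnsupportedDecoratorError as exc:
        print(exc)
```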

Development

git clone https://github.com/npow/metaflow-prefect.git
cd metaflow-prefect
pip install -e ".[dev]"
pytest -v

License

Apache 2.0
