# metaflow-dagster

Metaflow extension to compile and deploy flows as Dagster jobs.

metaflow-dagster generates a self-contained Dagster definitions file from any Metaflow flow, letting you schedule, monitor, and launch your pipelines through Dagster while keeping all your existing Metaflow code unchanged.
## Install

```shell
pip install metaflow-dagster
```

Or from source:

```shell
git clone https://github.com/npow/metaflow-dagster.git
cd metaflow-dagster
pip install -e ".[test]"
```
## Quick start

```shell
python my_flow.py dagster create dagster_defs.py
dagster dev -f dagster_defs.py
```
## Usage

### Generate and run a Dagster job

```shell
python my_flow.py dagster create dagster_defs.py
dagster dev -f dagster_defs.py
```

Or execute directly in Python:

```python
from dagster_defs import MyFlow

result = MyFlow.execute_in_process()
```
### All graph shapes are supported

```python
from metaflow import FlowSpec, Parameter, step

# Linear
class SimpleFlow(FlowSpec):
    @step
    def start(self):
        self.value = 42
        self.next(self.end)

    @step
    def end(self):
        pass

# Split/join (static branch)
class BranchFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.branch_a, self.branch_b)
    ...

# Conditional (dynamic branch: only one path runs at runtime)
class ConditionalFlow(FlowSpec):
    value = Parameter("value", default=42, type=int)

    @step
    def start(self):
        self.route = "high" if self.value >= 50 else "low"
        self.next({"high": self.high_branch, "low": self.low_branch}, condition="route")

    @step
    def high_branch(self): ...

    @step
    def low_branch(self): ...

    @step
    def join(self): ...

# Foreach fan-out
class ForeachFlow(FlowSpec):
    @step
    def start(self):
        self.items = [1, 2, 3]
        self.next(self.process, foreach="items")
    ...
```
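Whatever the shape, every flow above reduces to a set of step-to-step edges that a compiler can walk in dependency order when wiring up one Dagster op per step. A minimal sketch of that idea using only the standard library (this is illustrative, not the extension's actual code; the edge dict mirrors the `BranchFlow` example):

```python
# Illustrative sketch: a split/join flow as "node -> predecessors" edges,
# walked in topological order the way a step-to-op compiler would.
from graphlib import TopologicalSorter

edges = {
    "start": [],                       # no predecessors
    "branch_a": ["start"],
    "branch_b": ["start"],
    "join": ["branch_a", "branch_b"],  # fan-in
    "end": ["join"],
}

order = list(TopologicalSorter(edges).static_order())
print(order)  # 'start' first, 'end' last; branch order may vary
```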
### Parametrised flows

Parameters defined with `metaflow.Parameter` are forwarded automatically as a typed Dagster Config class on the start op:

```shell
python param_flow.py dagster create param_flow_dagster.py
```

Then pass them via Dagster's run config when launching from the UI, or via a config file when using the CLI:

```yaml
# config.yaml
ops:
  op_start:
    config:
      greeting: Hi
      count: 5
```

```shell
python -m dagster job execute -f param_flow_dagster.py -j ParametrizedFlow -c config.yaml
```
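Conceptually, the parameter defaults just need to land in the nested run-config shape shown in `config.yaml`. A hedged sketch of that mapping (the `params` dict here is a made-up stand-in for whatever the extension extracts from the flow; `op_start` matches the op name used above):

```python
# Illustrative sketch: Metaflow Parameter defaults folded into the
# "ops -> op_start -> config" run-config structure Dagster expects.
params = {"greeting": ("Hi", str), "count": (5, int)}  # name -> (default, type)

run_config = {
    "ops": {
        "op_start": {
            "config": {name: default for name, (default, _type) in params.items()}
        }
    }
}
print(run_config["ops"]["op_start"]["config"])  # {'greeting': 'Hi', 'count': 5}
```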
### Step decorators (--with)

Inject Metaflow step decorators at deploy time without modifying the flow source:

```shell
# Run every step in a sandbox (e.g. the metaflow-sandbox extension)
python my_flow.py dagster create my_flow_dagster.py --with=sandbox

# Multiple decorators are supported
python my_flow.py dagster create my_flow_dagster.py \
    --with=sandbox \
    --with='resources:cpu=4,memory=8000'
```
### Retries and timeouts

`@retry` and `@timeout` on any step are picked up automatically. The generated op gets a Dagster `RetryPolicy` and an `op_execution_timeout` tag; no extra configuration is needed:

```python
class MyFlow(FlowSpec):
    @retry(times=3, minutes_between_retries=2)
    @timeout(seconds=300)
    @step
    def train(self):
        ...
```

Generates:

```python
@op(retry_policy=RetryPolicy(max_retries=3, delay=120),
    tags={"dagster/op_execution_timeout": "300"})
def op_train(context): ...
```

Each Dagster retry passes the correct `--retry-count` to Metaflow so attempt numbering stays consistent.
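The unit conversion behind that generated op is straightforward: Metaflow expresses the retry delay in minutes while Dagster's `RetryPolicy.delay` takes seconds. A small sketch of the mapping (the helper name is hypothetical):

```python
# Illustrative sketch: map @retry(times=..., minutes_between_retries=...)
# onto the keyword arguments a Dagster RetryPolicy would receive.
def retry_policy_args(times: int, minutes_between_retries: int) -> dict:
    return {"max_retries": times, "delay": minutes_between_retries * 60}

print(retry_policy_args(3, 2))  # {'max_retries': 3, 'delay': 120}
```

This matches the `RetryPolicy(max_retries=3, delay=120)` shown in the generated code above.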
### Environment variables

`@environment(vars={...})` on a step passes those variables to the `metaflow step` subprocess:

```python
@environment(vars={"TOKENIZERS_PARALLELISM": "false"})
@step
def embed(self): ...
```
### Project namespace

If the flow uses `@project(name=...)`, the Dagster job name is automatically prefixed:

```python
@project(name="recommendations")
class TrainFlow(FlowSpec): ...
```

```shell
python train_flow.py dagster create out.py
# job name: recommendations_TrainFlow
```
### Workflow timeout

Cap the total wall-clock time for the entire job run:

```shell
python my_flow.py dagster create my_flow_dagster.py --workflow-timeout 3600
```

### Attach tags

Metaflow tags are forwarded to every `metaflow step` subprocess at compile time:

```shell
python my_flow.py dagster create my_flow_dagster.py --tag env:prod --tag version:2
```

### Custom job name

```shell
python my_flow.py dagster create my_flow_dagster.py --name nightly_pipeline
```
## Configuration

### Metadata service and datastore

By default, metaflow-dagster uses whatever metadata and datastore backends are active in your Metaflow environment. The generated file bakes in those settings at creation time, so every step subprocess uses the same backend.

To use a remote metadata service or object store, configure them before running `dagster create`:

```shell
python my_flow.py \
    --metadata=service \
    --datastore=s3 \
    dagster create my_flow_dagster.py
```

Or via environment variables:

```shell
export METAFLOW_DEFAULT_METADATA=service
export METAFLOW_DEFAULT_DATASTORE=s3
python my_flow.py dagster create my_flow_dagster.py
```
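One way to picture "baking in" the settings: the generated file could carry the backend values captured at `dagster create` time and inject them into each step subprocess's environment. This is a hedged sketch of that idea, not the extension's actual mechanism; `BAKED_SETTINGS` and `step_env` are hypothetical names:

```python
# Illustrative sketch: pin the Metaflow backends captured at generation
# time by overlaying them on the environment of each step subprocess.
import os

BAKED_SETTINGS = {  # captured when the definitions file was generated
    "METAFLOW_DEFAULT_METADATA": "service",
    "METAFLOW_DEFAULT_DATASTORE": "s3",
}

def step_env() -> dict:
    env = dict(os.environ)     # inherit the launcher's environment
    env.update(BAKED_SETTINGS)  # baked values win, so every step agrees
    return env

print(step_env()["METAFLOW_DEFAULT_DATASTORE"])  # s3
```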
### Scheduling

If your flow has a `@schedule` decorator, the generated file includes a `ScheduleDefinition` automatically. No extra configuration is needed.
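Metaflow's `@schedule` accepts either an explicit cron expression or shortcuts like `hourly`, `daily`, and `weekly`, while a Dagster `ScheduleDefinition` wants a cron string. A sketch of the translation under that assumption (the mapping and the `to_cron` helper are illustrative, not taken from the extension's source):

```python
# Illustrative sketch: translate @schedule shortcuts to cron strings
# suitable for a Dagster ScheduleDefinition's cron_schedule argument.
def to_cron(hourly=False, daily=False, weekly=False, cron=None):
    if cron:
        return cron          # explicit cron wins
    if hourly:
        return "0 * * * *"   # top of every hour
    if daily:
        return "0 0 * * *"   # midnight every day
    if weekly:
        return "0 0 * * 0"   # midnight every Sunday
    raise ValueError("no schedule given")

print(to_cron(daily=True))  # 0 0 * * *
```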
## How it works

metaflow-dagster compiles your Metaflow flow's DAG into a self-contained Dagster definitions file. Each Metaflow step becomes a Dagster `@op`. The generated file:

- runs each step as a subprocess via the standard `metaflow step` CLI
- passes `--input-paths` correctly for joins and foreach splits
- emits Metaflow artifact keys and a retrieval snippet to the Dagster UI after each step
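For a join step, "passes `--input-paths` correctly" means handing the step every upstream task it must merge. A sketch of how such a value could be assembled, assuming entries of the form `run_id/step/task_id` joined by commas (the path format and the `input_paths` helper are assumptions for illustration):

```python
# Illustrative sketch: build an --input-paths value for a join step from
# the (step_name, task_id) pairs of its upstream branches.
def input_paths(run_id: str, upstream: list[tuple[str, str]]) -> str:
    """upstream: list of (step_name, task_id) pairs feeding the join."""
    return ",".join(f"{run_id}/{step}/{task}" for step, task in upstream)

print(input_paths("7", [("branch_a", "2"), ("branch_b", "3")]))
# 7/branch_a/2,7/branch_b/3
```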
### Job graph

The compiled DAG is fully visible in Dagster: typed inputs, fan-out branches, and fan-in joins.

### Launchpad

Parametrised flows get a typed config schema in the Dagster launchpad, populated from your Metaflow `Parameter` defaults.

### Run timeline

Each Metaflow step appears as a Dagster op with real wall-clock timing. Parallel branches run concurrently.

### Artifact retrieval

After each step, the op emits the artifact keys and a ready-to-copy retrieval snippet, without loading the values themselves:

```python
from metaflow import Task

task = Task('BranchingFlow/dagster-d75a08c398a3/start/1')
task['value'].data
```

### Step logs

Every op logs the exact `metaflow step` CLI command it ran. Flow `print()` output streams through Dagster's log panel.
## Development

```shell
git clone https://github.com/npow/metaflow-dagster.git
cd metaflow-dagster
pip install -e ".[test]"

# Fast compilation tests only (seconds)
pytest -v -m "not e2e"

# Full end-to-end suite (compiles + runs via `dagster job execute`)
pytest -v -m e2e
```

The e2e tests compile each flow to a real Dagster definitions file, execute it via `dagster job execute` against a temporary SQLite-backed Dagster instance, and verify the Metaflow artifacts on disk. No mocks, and no webserver required.
## License