# metaflow-temporal

Run Metaflow flows as Temporal workflows.
metaflow-temporal generates a self-contained Temporal worker file from any Metaflow flow, letting
you schedule, deploy, and monitor your pipelines through Temporal while keeping all your existing
Metaflow code unchanged.
## Install

```bash
pip install metaflow-temporal
```

Or from source:

```bash
git clone https://github.com/npow/metaflow-temporal.git
cd metaflow-temporal
pip install -e ".[dev]"
```
## Quick start

```bash
python my_flow.py temporal create --output my_flow_worker.py
python my_flow_worker.py            # start the worker
python my_flow_worker.py trigger    # trigger a run
```
## Usage

### Generate and run a worker

```bash
python my_flow.py temporal create --output my_flow_worker.py
python my_flow_worker.py
```
### All graph shapes are supported

```python
# Linear
class SimpleFlow(FlowSpec):
    @step
    def start(self):
        self.value = 42
        self.next(self.end)

    @step
    def end(self):
        pass

# Split/join (branch)
class BranchFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.branch_a, self.branch_b)
    ...

# Foreach fan-out
class ForeachFlow(FlowSpec):
    @step
    def start(self):
        self.items = ["alpha", "beta", "gamma"]
        self.next(self.process, foreach="items")
    ...
```
### Parametrised flows

Parameters defined with `metaflow.Parameter` are forwarded automatically:

```bash
python param_flow.py temporal create --output param_flow_worker.py
python param_flow_worker.py trigger greeting=world
```
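Conceptually, the `trigger key=value` arguments map onto Metaflow run parameters. A minimal sketch of that parsing, with hypothetical helper names (not the extension's actual code):

```python
def parse_trigger_args(args):
    """Split ["greeting=world", ...] into a {name: value} dict."""
    params = {}
    for arg in args:
        name, _, value = arg.partition("=")
        params[name] = value
    return params

def to_cli_flags(params):
    """Render parameters as Metaflow CLI flags, e.g. --greeting world."""
    flags = []
    for name, value in params.items():
        flags += [f"--{name}", str(value)]
    return flags
```

Anything after `trigger` that is not a `key=value` pair is a trigger-CLI concern; the sketch only covers the happy path.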
## Configuration

### Metadata service and datastore

By default, metaflow-temporal uses whatever metadata and datastore backends are active in your
Metaflow environment. The generated worker file bakes in `metadata_type` and `datastore_type`
at creation time, so every step subprocess uses the same backend.

To use a remote metadata service or object store, configure them before running `temporal create`:

```bash
# Remote metadata service + S3 datastore
python my_flow.py \
    --metadata=service \
    --datastore=s3 \
    temporal create my_flow_worker.py
```

Or via environment variables (applied to all flows):

```bash
export METAFLOW_DEFAULT_METADATA=service
export METAFLOW_DEFAULT_DATASTORE=s3
python my_flow.py temporal create my_flow_worker.py
```
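To illustrate what "bakes in" means, a generated worker might pin the backend choices as constants and pass them to every step subprocess. This is a hypothetical fragment for intuition only, not the actual generated code:

```python
# Values captured once, at `temporal create` time (hypothetical fragment).
METADATA_TYPE = "service"
DATASTORE_TYPE = "s3"

def step_command(flow_file, step_name, run_id):
    """Build the subprocess argv for one step, pinning the backends
    so every step of the run talks to the same metadata/datastore."""
    return [
        "python", flow_file,
        f"--metadata={METADATA_TYPE}",
        f"--datastore={DATASTORE_TYPE}",
        "step", step_name,
        "--run-id", run_id,
    ]
```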
### Compute backends and step decorators

Step decorators like `@kubernetes`, `@batch`, `@conda`, and third-party backends such as
`@sandbox`, `@daytona`, and `@e2b` are automatically forwarded to each step subprocess at
runtime. No extra configuration is needed; decorate your steps as usual:

```python
from metaflow import FlowSpec, step, kubernetes

class MyFlow(FlowSpec):
    @kubernetes(cpu=4, memory=8000)
    @step
    def train(self):
        ...
```

To inject a decorator at deploy time without modifying the flow source:

```bash
python my_flow.py --with=sandbox temporal create my_flow_worker.py
```
### Scheduling

Decorate your flow with `@schedule` and the worker automatically registers a Temporal Schedule
when it starts. The schedule fires the workflow on the specified cron:

```python
from metaflow import FlowSpec, schedule, step

@schedule(hourly=True)  # or daily=True, weekly=True, cron="0 6 * * ? *"
class DailyFlow(FlowSpec):
    @step
    def start(self):
        ...
```

```bash
python daily_flow.py temporal create daily_flow_worker.py
python daily_flow_worker.py  # registers the schedule and starts listening
```
### Run tags

Tags are attached to every run produced by the worker:

```bash
python my_flow.py temporal create --output my_flow_worker.py --tag team:ml --tag env:prod
```
### Project namespace isolation

Flows decorated with `@project` get full namespace isolation between branches. Different branches
use separate task queues and store runs under the project-aware name
(`<project>.<branch>.<FlowName>`), so user, staging, and production deployments never collide.

```python
from metaflow import FlowSpec, project, step

@project(name="myproject")
class TrainFlow(FlowSpec):
    ...
```

```bash
# User branch (default): myproject.user.alice.TrainFlow
python train_flow.py temporal create --output train_worker.py

# Named test branch: myproject.test.staging.TrainFlow
python train_flow.py temporal create --output train_worker.py --branch staging

# Production: myproject.prod.TrainFlow
python train_flow.py temporal create --output train_worker.py --production
```

Project tags (`project:myproject`, `project_branch:prod`) are automatically added to every run.
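The branch-to-name mapping shown in the comments above can be sketched as a pure function mirroring Metaflow's `@project` convention (hypothetical helper, not part of the package):

```python
def project_flow_name(project, flow_name, user="alice",
                      branch=None, production=False):
    """Compute the project-aware flow name for a deployment target."""
    if production:
        return f"{project}.prod.{flow_name}"
    if branch:
        return f"{project}.test.{branch}.{flow_name}"
    return f"{project}.user.{user}.{flow_name}"
```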
### Workflow execution timeout

Cap how long a single workflow execution may run before Temporal cancels it:

```bash
python my_flow.py temporal create --output my_flow_worker.py --workflow-timeout 7200
```
## Saga pattern (automatic compensation)

Long-running flows that touch external systems (hotel bookings, cloud provisioning, payment
processing) need a way to undo completed work when a later step fails. The Saga pattern solves
this: each compensatable step declares a compensation handler that runs automatically, in
reverse order (LIFO), if the workflow fails.

Import `step` from the extension instead of Metaflow. It's a drop-in replacement that adds
an optional `.compensate` decorator for declaring compensation handlers inline:

```python
from metaflow import FlowSpec
from metaflow_extensions.temporal.plugins.temporal import step  # shadows metaflow.step

class BookingFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.book_hotel)

    @step
    def book_hotel(self):
        self.hotel_id = reserve_hotel()  # side effect
        self.next(self.book_flight)

    @book_hotel.compensate
    def cancel_hotel(self):
        cancel_reservation(self.hotel_id)  # self.hotel_id injected from forward step

    @step
    def book_flight(self):
        self.flight_id = reserve_flight()  # side effect
        self.next(self.confirm)

    @book_flight.compensate
    def cancel_flight(self):
        cancel_reservation(self.flight_id)

    @step
    def confirm(self):
        raise RuntimeError("Payment declined")  # triggers compensations

    @step
    def end(self):
        pass
```

When `confirm` raises, the workflow automatically runs `cancel_flight` then `cancel_hotel`
(reverse order), then re-raises the original error.
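The reverse-order unwinding amounts to a stack of undo handlers: push one as each step succeeds, then pop and run them on failure. A minimal plain-Python sketch of that mechanism (hypothetical names, independent of the extension's internals):

```python
class CompensationStack:
    """Push an undo handler as each compensatable step succeeds;
    on failure, pop and run them in reverse (LIFO) order."""

    def __init__(self):
        self._handlers = []

    def register(self, step_name, handler):
        self._handlers.append((step_name, handler))

    def unwind(self):
        """Run handlers newest-first and return the step names compensated.
        Handler errors are swallowed: compensation is best-effort."""
        ran = []
        while self._handlers:
            step_name, handler = self._handlers.pop()
            try:
                handler()
            except Exception:
                pass  # a failing compensation does not cascade
            ran.append(step_name)
        return ran
```

Registering `book_hotel` then `book_flight` and unwinding runs the flight cancellation before the hotel cancellation, matching the order described above.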
### API

- Each compensatable step gets at most one `.compensate` handler (enforced structurally).
- The handler receives the forward step's persisted artifacts as `self.<attr>`.
- The handler must not call `self.next()`.
- Non-saga steps use `@step` identically to plain Metaflow; no change needed.
- Compensation failures are logged but do not cascade (best-effort execution).

Compensations are tracked for all step types: linear steps, foreach body tasks, and parallel
split/join branches.
### Resume a failed run

If a workflow fails partway through, resume it from where it left off. Completed steps are
skipped and only unfinished steps re-execute:

```bash
python my_flow.py temporal resume temporal-myflow-abc123
```

The run ID is printed when you trigger a run or shown in the Temporal UI. The resumed run
reuses the same Metaflow run ID, so all artifacts remain under the same pathspec.
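Conceptually, resuming filters the execution plan down to the steps that never finished. A toy sketch of that selection (hypothetical helper, not the extension's actual logic):

```python
def steps_to_run(order, completed):
    """Given the full step order and the set of steps that completed in
    the failed run, return only the steps that still need to execute."""
    return [step for step in order if step not in completed]
```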
### Graceful shutdown

The worker handles SIGTERM and SIGINT cleanly; in-progress activities finish before the
process exits:

```bash
kill -TERM <worker_pid>   # or Ctrl-C
# Worker shutting down gracefully.
```
### Namespace

Forward a Metaflow namespace to every step subprocess:

```bash
python my_flow.py temporal create --output my_flow_worker.py --namespace production
```
### Auto-trigger on upstream completion

Decorate your flow with `@trigger_on_finish` and the worker will poll for completed upstream
runs and automatically trigger this flow when one finishes:

```python
from metaflow import FlowSpec, trigger_on_finish, step

@trigger_on_finish(flow="UpstreamFlow")
class DownstreamFlow(FlowSpec):
    ...
```
## How it works

metaflow-temporal compiles your flow's DAG into a self-contained worker file. Each Metaflow
step becomes a Temporal Activity (`@activity.defn`). A single `MetaflowWorkflow`
(`@workflow.defn`) interprets the `CONFIG` dict to orchestrate them.

The worker file:

- runs each step as a subprocess via the standard `metaflow step` CLI
- passes `--input-paths` correctly for joins and foreach splits
- records artifact names in each activity result with a ready-to-use retrieval snippet
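For intuition, the compiled graph for a simple linear flow might look roughly like the dict below, with the workflow walking it step by step. This is a hypothetical shape; the actual generated `CONFIG` format may differ:

```python
# Hypothetical shape of the compiled step graph for a linear flow.
CONFIG = {
    "start":     {"type": "linear", "next": ["transform"]},
    "transform": {"type": "linear", "next": ["end"]},
    "end":       {"type": "linear", "next": []},
}

def execution_order(config, step="start"):
    """Walk a purely linear graph and return the steps in run order.
    (Splits and foreach fan-outs would need real scheduling logic.)"""
    order = []
    while step is not None:
        order.append(step)
        successors = config[step]["next"]
        step = successors[0] if successors else None
    return order
```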
### Temporal UI: workflow list

Multiple Metaflow flows show up as `MetaflowWorkflow` executions, each labelled by flow name.

### Temporal UI: run timeline

Foreach fan-outs and split/join branches appear as parallel Activity runs in the timeline.

### Temporal UI: artifact names

After each step completes, the activity result contains the artifact names and a one-liner to
fetch values from the Metaflow datastore:

```python
# Fetch any artifact value: copy the snippet from the Temporal UI
import metaflow

task = metaflow.Task("ArtifactFlow/temporal-abc123/transform/temporal-transform-0")
value = task.data.doubled
```
## Development

```bash
git clone https://github.com/npow/metaflow-temporal.git
cd metaflow-temporal
pip install -e ".[dev]"
pytest -v
```

For integration tests against a real Temporal server (in-process worker):

```bash
docker compose up -d
pytest -v -m integration
```

For true end-to-end tests (generates real worker files, starts them as subprocesses):

```bash
docker compose up -d
pytest -v -m e2e --timeout=300
```

The Temporal UI is available at http://localhost:8080.
## License