
metaflow-temporal

CI · PyPI · License: Apache-2.0 · Python 3.10+

Run Metaflow flows as Temporal workflows.

metaflow-temporal generates a self-contained Temporal worker file from any Metaflow flow, letting you schedule, deploy, and monitor your pipelines through Temporal while keeping all your existing Metaflow code unchanged.

Install

pip install metaflow-temporal

Or from source:

git clone https://github.com/npow/metaflow-temporal.git
cd metaflow-temporal
pip install -e ".[dev]"

Quick start

python my_flow.py temporal create --output my_flow_worker.py
python my_flow_worker.py          # start the worker
python my_flow_worker.py trigger  # trigger a run

Usage

Generate and run a worker

python my_flow.py temporal create --output my_flow_worker.py
python my_flow_worker.py

All graph shapes are supported

# Linear
class SimpleFlow(FlowSpec):
    @step
    def start(self):
        self.value = 42
        self.next(self.end)
    @step
    def end(self): pass

# Split/join (branch)
class BranchFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.branch_a, self.branch_b)
    ...

# Foreach fan-out
class ForeachFlow(FlowSpec):
    @step
    def start(self):
        self.items = ["alpha", "beta", "gamma"]
        self.next(self.process, foreach="items")
    ...

Parametrised flows

Parameters defined with metaflow.Parameter are forwarded automatically:

python param_flow.py temporal create --output param_flow_worker.py
python param_flow_worker.py trigger greeting=world
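Conceptually, the trigger arguments map onto a dict of Parameter values before being handed to the workflow. A minimal sketch of that parsing (the function name parse_trigger_params is illustrative, not part of the package API):

```python
def parse_trigger_params(args):
    """Turn ['greeting=world', 'count=3'] into {'greeting': 'world', 'count': '3'}."""
    params = {}
    for arg in args:
        if "=" not in arg:
            raise ValueError(f"expected key=value, got {arg!r}")
        key, _, value = arg.partition("=")
        params[key] = value
    return params

print(parse_trigger_params(["greeting=world"]))  # {'greeting': 'world'}
```

Values arrive as strings; coercing them to each Parameter's declared type is left to Metaflow's own parameter handling.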

Configuration

Metadata service and datastore

By default, metaflow-temporal uses whatever metadata and datastore backends are active in your Metaflow environment. The generated worker file bakes in metadata_type and datastore_type at creation time so every step subprocess uses the same backend.

To use a remote metadata service or object store, configure them before running temporal create:

# Remote metadata service + S3 datastore
python my_flow.py \
  --metadata=service \
  --datastore=s3 \
  temporal create my_flow_worker.py

Or via environment variables (applied to all flows):

export METAFLOW_DEFAULT_METADATA=service
export METAFLOW_DEFAULT_DATASTORE=s3
python my_flow.py temporal create my_flow_worker.py

Compute backends and step decorators

Step decorators like @kubernetes, @batch, @conda, and third-party backends such as @sandbox, @daytona, and @e2b are automatically forwarded to each step subprocess at runtime. No extra configuration is needed — decorate your steps as usual:

from metaflow import FlowSpec, step, kubernetes

class MyFlow(FlowSpec):
    @kubernetes(cpu=4, memory=8000)
    @step
    def train(self):
        ...

To inject a decorator at deploy time without modifying the flow source:

python my_flow.py --with=sandbox temporal create my_flow_worker.py

Scheduling

Decorate your flow with @schedule and the worker automatically registers a Temporal Schedule when it starts. The schedule fires the workflow on the specified cron expression:

from metaflow import FlowSpec, schedule, step

@schedule(hourly=True)      # or daily=True, weekly=True, cron="0 6 * * ? *"
class DailyFlow(FlowSpec):
    @step
    def start(self):
        ...

python daily_flow.py temporal create daily_flow_worker.py
python daily_flow_worker.py  # registers the schedule and starts listening
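Conceptually, the @schedule arguments reduce to a single cron expression that the worker hands to Temporal's Schedule API. A hedged sketch of that mapping (schedule_to_cron is illustrative, and the exact cron strings used internally may differ):

```python
def schedule_to_cron(hourly=False, daily=False, weekly=False, cron=None):
    """Resolve @schedule keyword arguments to one cron expression (sketch)."""
    if cron is not None:
        return cron              # an explicit cron expression wins
    if hourly:
        return "0 * * * *"       # top of every hour
    if daily:
        return "0 0 * * *"       # midnight every day
    if weekly:
        return "0 0 * * 0"       # midnight every Sunday
    raise ValueError("@schedule given without any interval or cron")
```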

Run tags

Tags are attached to every run produced by the worker:

python my_flow.py temporal create --output my_flow_worker.py --tag team:ml --tag env:prod

Project namespace isolation

Flows decorated with @project get full namespace isolation between branches. Different branches use separate task queues and store runs under the project-aware name (<project>.<branch>.<FlowName>), so user, staging, and production deployments never collide.

from metaflow import FlowSpec, project, step

@project(name="myproject")
class TrainFlow(FlowSpec):
    ...

# User branch (default): myproject.user.alice.TrainFlow
python train_flow.py temporal create --output train_worker.py

# Named test branch: myproject.test.staging.TrainFlow
python train_flow.py temporal create --output train_worker.py --branch staging

# Production: myproject.prod.TrainFlow
python train_flow.py temporal create --output train_worker.py --production

Project tags (project:myproject, project_branch:prod) are automatically added to every run.
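The naming scheme above can be sketched as a small helper (illustrative only; the real resolution is done by Metaflow's @project machinery):

```python
def project_flow_name(project, flow, user=None, branch=None, production=False):
    """Compose the project-aware name <project>.<branch>.<FlowName> (sketch)."""
    if production:
        prefix = "prod" if branch is None else f"prod.{branch}"
    elif branch is not None:
        prefix = f"test.{branch}"
    else:
        prefix = f"user.{user}"
    return f"{project}.{prefix}.{flow}"

print(project_flow_name("myproject", "TrainFlow", user="alice"))
# myproject.user.alice.TrainFlow
```

Each distinct name also implies a distinct task queue, which is what keeps deployments from colliding.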

Workflow execution timeout

Cap how long a single workflow execution may run before Temporal cancels it:

python my_flow.py temporal create --output my_flow_worker.py --workflow-timeout 7200

Saga Pattern (Automatic Compensation)

Long-running flows that touch external systems (hotel bookings, cloud provisioning, payment processing) need a way to undo completed work when a later step fails. The Saga pattern solves this: each compensatable step declares a compensation handler that runs automatically, in reverse order (LIFO), if the workflow fails.

Import step from the extension instead of Metaflow. It's a drop-in replacement that adds an optional .compensate decorator for declaring compensation handlers inline:

from metaflow import FlowSpec
from metaflow_extensions.temporal.plugins.temporal import step  # shadows metaflow.step

class BookingFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.book_hotel)

    @step
    def book_hotel(self):
        self.hotel_id = reserve_hotel()   # side effect
        self.next(self.book_flight)

    @book_hotel.compensate
    def cancel_hotel(self):
        cancel_reservation(self.hotel_id) # self.hotel_id injected from forward step

    @step
    def book_flight(self):
        self.flight_id = reserve_flight() # side effect
        self.next(self.confirm)

    @book_flight.compensate
    def cancel_flight(self):
        cancel_reservation(self.flight_id)

    @step
    def confirm(self):
        raise RuntimeError("Payment declined")  # triggers compensations

    @step
    def end(self):
        pass

When confirm raises, the workflow automatically runs cancel_flight then cancel_hotel (reverse order), then re-raises the original error.

API

  • Each compensatable step gets at most one .compensate handler — enforced structurally.
  • The handler receives the forward step's persisted artifacts as self.<attr>.
  • The handler must not call self.next().
  • Non-saga steps use @step identically to plain Metaflow — no change needed.
  • Compensation failures are logged but do not cascade (best-effort execution).
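The LIFO semantics can be illustrated with a plain-Python sketch (no Temporal involved; run_with_compensation is a stand-in for what the generated workflow does internally):

```python
def run_with_compensation(steps):
    """Run (step_fn, compensate_fn) pairs in order. If any step raises, run the
    compensations of completed steps in reverse (LIFO) order, best-effort,
    then re-raise the original error."""
    completed = []                       # stack of compensation handlers
    try:
        for step_fn, compensate_fn in steps:
            step_fn()
            if compensate_fn is not None:
                completed.append(compensate_fn)
    except Exception:
        for compensate_fn in reversed(completed):
            try:
                compensate_fn()
            except Exception:
                pass                     # compensation failures do not cascade
        raise
```

Applied to BookingFlow above: when confirm raises, cancel_flight runs first, then cancel_hotel, and the original RuntimeError propagates.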

Limitations (v1)

  • Compensations are tracked only for steps executed via the main linear path (_execute_node). Steps inside foreach bodies and parallel split branches are not compensated.

How it works

metaflow-temporal compiles your flow's DAG into a self-contained worker file. Each Metaflow step becomes a Temporal Activity (@activity.defn). A single MetaflowWorkflow (@workflow.defn) interprets the CONFIG dict to orchestrate them.

The worker file:

  • runs each step as a subprocess via the standard metaflow step CLI
  • passes --input-paths correctly for joins and foreach splits
  • records artifact names in each activity result with a ready-to-use retrieval snippet
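For joins in particular, the pathspecs of all parent tasks are combined into one comma-separated --input-paths value in the run-id/step/task-id format that Metaflow's step CLI expects. A minimal sketch (input_paths_for_join is illustrative, not the package's internal name):

```python
def input_paths_for_join(run_id, parent_tasks):
    """Build the --input-paths value for a join step: one pathspec per parent
    task, comma-separated, in Metaflow's run-id/step/task-id format."""
    return ",".join(f"{run_id}/{step}/{task_id}" for step, task_id in parent_tasks)

print(input_paths_for_join(
    "temporal-abc123",
    [("branch_a", "temporal-branch_a-0"), ("branch_b", "temporal-branch_b-0")],
))
# temporal-abc123/branch_a/temporal-branch_a-0,temporal-abc123/branch_b/temporal-branch_b-0
```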

Temporal UI: workflow list

Multiple Metaflow flows show up as MetaflowWorkflow executions, each labelled by flow name:

Workflow list showing multiple Metaflow flows

Temporal UI: run timeline

Foreach fan-outs and split/join branches appear as parallel Activity runs in the timeline:

Run timeline showing parallel foreach and branch activities

Temporal UI: artifact names

After each step completes, the activity result contains the artifact names and a one-liner to fetch values from the Metaflow datastore:

Activity result showing artifact names and fetch snippet

# Fetch any artifact value — copy the snippet from the Temporal UI
import metaflow
task = metaflow.Task("ArtifactFlow/temporal-abc123/transform/temporal-transform-0")
value = task.data.doubled

Development

git clone https://github.com/npow/metaflow-temporal.git
cd metaflow-temporal
pip install -e ".[dev]"
pytest -v

For Tier 2 tests against a real Temporal server:

docker compose up -d
pytest -v -m integration

The Temporal UI is available at http://localhost:8080.

License

Apache 2.0

