
metaflow-kestra

Requires Python 3.10+. Licensed under Apache-2.0.

Deploy and run Metaflow flows as Kestra workflows.

metaflow-kestra compiles any Metaflow flow into a Kestra YAML workflow, letting you schedule, deploy, and monitor your pipelines through Kestra while keeping all your existing Metaflow code unchanged.

Install

pip install metaflow-kestra

Or from source:

git clone https://github.com/npow/metaflow-kestra.git
cd metaflow-kestra
pip install -e ".[dev]"

Quick start

python my_flow.py kestra run --kestra-host http://localhost:8080 --wait

Usage

Generate, deploy, and run

# Compile the flow to a Kestra YAML
python my_flow.py kestra compile flow.yaml

# Deploy to a running Kestra server
python my_flow.py kestra create --kestra-host http://localhost:8080

# Compile, deploy, and trigger in one step
python my_flow.py kestra run --kestra-host http://localhost:8080 --wait

# Resume a failed run (skips already-completed steps)
python my_flow.py kestra resume --clone-run-id kestra-<hex> \
    --kestra-host http://localhost:8080

All graph shapes are supported

from metaflow import FlowSpec, step

# Linear
class SimpleFlow(FlowSpec):
    @step
    def start(self):
        self.value = 42
        self.next(self.end)
    @step
    def end(self): pass

# Split/join (branch)
class BranchFlow(FlowSpec):
    @step
    def start(self):
        self.data = [1, 2, 3]
        self.next(self.branch_a, self.branch_b)
    ...

# Foreach fan-out (body tasks run concurrently)
class ForeachFlow(FlowSpec):
    @step
    def start(self):
        self.items = [1, 2, 3]
        self.next(self.process, foreach="items")
    ...

Parametrised flows

metaflow.Parameter definitions become Kestra input fields automatically:

class ParamFlow(FlowSpec):
    greeting = Parameter("greeting", default="hello")
    count    = Parameter("count", default=3, type=int)
    ...
# Trigger a run from the CLI
python param_flow.py kestra run --wait

# Or trigger manually in the Kestra UI — inputs appear as typed form fields
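Under the hood the conversion is a small type lookup. A minimal sketch of what such a mapping could look like — the helper name and dict shape are hypothetical, not the package's actual API:

```python
# Hypothetical sketch: Parameter default type -> Kestra input type.
KESTRA_INPUT_TYPES = {
    int: "INT",
    str: "STRING",
    float: "FLOAT",
    bool: "BOOLEAN",
}

def kestra_input_for(name, default):
    """Build a Kestra-style input definition from a Parameter's name and default."""
    return {
        "id": name,
        "type": KESTRA_INPUT_TYPES[type(default)],
        "defaults": default,
    }

print(kestra_input_for("greeting", "hello"))
print(kestra_input_for("count", 3))
```

Note that `type(True)` is `bool`, not `int`, so boolean defaults resolve to `BOOLEAN` rather than `INT`.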

Scheduling

@schedule maps to a Kestra cron trigger:

@schedule(cron="0 9 * * 1")   # every Monday at 9 AM
class WeeklyFlow(FlowSpec):
    ...
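In the generated YAML this lands as a Kestra Schedule trigger, roughly of this shape (the trigger id is illustrative):

```yaml
triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 9 * * 1"
```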

Nested foreach

Multi-level foreach fan-outs are fully supported — each nesting level maps to a nested Kestra ForEach task:

class NestedForeachFlow(FlowSpec):
    @step
    def start(self):
        self.models = ["a", "b"]
        self.next(self.train, foreach="models")

    @step
    def train(self):
        self.seeds = [1, 2, 3]
        self.next(self.run_seed, foreach="seeds")

    @step
    def run_seed(self): ...
    ...
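The generated YAML for this flow nests one ForEach inside another, roughly like the following sketch — the ids, value expressions, and script body are illustrative, not the plugin's exact output:

```yaml
tasks:
  - id: foreach_models
    type: io.kestra.plugin.core.flow.ForEach
    values: "{{ outputs.start.vars.models }}"      # illustrative expression
    tasks:
      - id: foreach_seeds
        type: io.kestra.plugin.core.flow.ForEach
        values: "{{ outputs.train.vars.seeds }}"   # illustrative expression
        tasks:
          - id: run_seed
            type: io.kestra.plugin.scripts.python.Script
            script: |
              # runs the run_seed step via the metaflow step CLI
```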

Step decorator support

@retry is read from your flow and applied to the generated Kestra task automatically:

class MyFlow(FlowSpec):
    @retry(times=3, minutes_between_retries=2)
    @step
    def train(self):
        ...
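On the Kestra side this becomes task-level retry configuration, roughly as sketched below; whether `times` maps to `maxAttempt` directly or off by one is a plugin detail, so treat the numbers as illustrative:

```yaml
  - id: train
    type: io.kestra.plugin.scripts.python.Script
    retry:
      type: constant
      interval: PT2M    # minutes_between_retries=2 as an ISO-8601 duration
      maxAttempt: 3
```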

Resource hints

@resources on a step forwards CPU, memory, and GPU hints to the underlying compute backend via --with=resources:cpu=N,memory=M,gpu=G:

class MyFlow(FlowSpec):
    @resources(cpu=4, memory=8000, gpu=1)
    @step
    def train(self):
        ...
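The flag itself is plain string assembly. A hypothetical sketch of how such a `--with` value could be built (the helper is illustrative, not the package's internals):

```python
# Hypothetical sketch: turn @resources hints into the --with flag string.
def resources_with_flag(cpu=None, memory=None, gpu=None):
    """Assemble a --with=resources:... flag from whichever hints are set."""
    parts = [
        f"{name}={value}"
        for name, value in (("cpu", cpu), ("memory", memory), ("gpu", gpu))
        if value is not None
    ]
    return "--with=resources:" + ",".join(parts)

print(resources_with_flag(cpu=4, memory=8000, gpu=1))
# -> --with=resources:cpu=4,memory=8000,gpu=1
```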

Event-driven triggers (@trigger / @trigger_on_finish)

# Trigger this flow when a named Kestra event label fires
@trigger(event="data.ready")
class MyFlow(FlowSpec):
    ...
# Trigger this flow when UpstreamFlow completes in Kestra
@trigger_on_finish(flow="UpstreamFlow")
class DownstreamFlow(FlowSpec):
    ...

Both translate to Kestra Flow trigger entries in the generated YAML.
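In Kestra terms these are Flow triggers with an execution condition, roughly of this shape (the trigger id and namespace are illustrative):

```yaml
triggers:
  - id: on_upstream_finish
    type: io.kestra.plugin.core.trigger.Flow
    conditions:
      - type: io.kestra.plugin.core.condition.ExecutionFlow
        namespace: company.team        # illustrative
        flowId: upstreamflow
```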

Configuration

Metadata service and datastore

metaflow-kestra bakes the active metadata and datastore backends into the generated YAML at compile time, so every step subprocess uses the same backend. To use a specific backend:

python my_flow.py \
  --metadata=service \
  --datastore=s3 \
  kestra compile flow.yaml

Or via environment variables:

export METAFLOW_DEFAULT_METADATA=service
export METAFLOW_DEFAULT_DATASTORE=s3
python my_flow.py kestra compile flow.yaml

Authentication

python my_flow.py kestra run \
  --kestra-host http://kestra.internal:8080 \
  --kestra-user admin@example.com \
  --kestra-password secret \
  --wait

How it works

metaflow-kestra walks your Metaflow flow's DAG and emits a Kestra YAML workflow definition. Each Metaflow step becomes an io.kestra.plugin.scripts.python.Script task. The generated YAML:

  • runs a metaflow_init task first to create the _parameters artifact and assign a stable run ID
  • runs each step as a subprocess via the standard metaflow step CLI
  • passes --input-paths correctly for joins and foreach splits
  • wraps split branches in a Parallel task so they execute concurrently
  • wraps foreach body tasks in a ForEach task
  • maps @retry to Kestra task retry configuration
  • maps @schedule to a Kestra cron trigger
  • writes Metaflow artifact names and a ready-to-use retrieval snippet to Kestra task outputs after each step
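The DAG walk above can be illustrated with a generic topological traversal — a sketch using only the standard library, not the package's internals:

```python
from graphlib import TopologicalSorter  # stdlib, Python 3.9+

# Toy flow graph: step -> steps it transitions to (a branch that joins).
flow = {
    "start": ["branch_a", "branch_b"],
    "branch_a": ["join"],
    "branch_b": ["join"],
    "join": ["end"],
    "end": [],
}

# TopologicalSorter expects predecessors, so invert the successor edges.
preds = {step: [] for step in flow}
for step, successors in flow.items():
    for succ in successors:
        preds[succ].append(step)

order = list(TopologicalSorter(preds).static_order())
print(order)  # 'start' first, 'end' last; the branches in between
```

Emitting tasks in an order like this guarantees that every task's `--input-paths` refer only to steps already defined earlier in the YAML.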

Kestra UI: execution topology and timeline

The generated flow preserves the Metaflow DAG structure. Branch and foreach fan-outs appear as nested parallel task groups in the topology view and run concurrently in the Gantt timeline:

Branch flow execution topology

Foreach flow Gantt timeline

Kestra UI: Metaflow artifact outputs

After each step completes, two extra output variables are posted to the Kestra task:

Variable             Content
-------------------  ---------------------------------------------------------------------------
metaflow_artifacts   Comma-separated list of artifact names produced by the step
metaflow_snippet     Ready-to-paste Python code to load those artifacts via the Metaflow client

metaflow_snippet showing Python accessor code

metaflow_artifacts listing artifact names
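Both output values are plain strings. A hypothetical sketch of how they could be assembled — the helper and the run id are illustrative, though the `Step('Flow/run/step')` accessor is the standard Metaflow client API:

```python
def artifact_outputs(flow_name, run_id, step_name, artifacts):
    """Build the two output values posted to the Kestra task (illustrative)."""
    snippet = (
        "from metaflow import Step\n"
        f"step = Step('{flow_name}/{run_id}/{step_name}')\n"
        "task = step.task\n"
        + "\n".join(f"{name} = task.data.{name}" for name in artifacts)
    )
    return {
        "metaflow_artifacts": ",".join(artifacts),
        "metaflow_snippet": snippet,
    }

out = artifact_outputs("MyFlow", "kestra-abc123", "train", ["model", "metrics"])
print(out["metaflow_artifacts"])  # model,metrics
```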

Supported constructs

Construct                               Kestra mapping
--------------------------------------  --------------------------------------------------------------
Linear steps                            Sequential Script tasks
self.next(a, b) split                   Parallel task wrapping branch tasks
@step with inputs (join)                Sequential task after Parallel completes
self.next(step, foreach=items)          ForEach task
Nested foreach (multi-level)            Nested ForEach tasks
Parameter                               Kestra inputs with type mapping (INT, STRING, FLOAT, BOOLEAN)
@schedule(cron=...)                     Kestra Schedule trigger
@retry                                  Kestra task retry configuration
@resources(cpu=N, memory=M, gpu=G)      --with=resources:... forwarded to compute backend
@trigger(event=...)                     Kestra Flow trigger with event-label condition
@trigger_on_finish(flow=...)            Kestra Flow trigger on upstream flow completion
kestra resume --clone-run-id <id>       Resume of a failed run, skipping steps that already succeeded

Development

git clone https://github.com/npow/metaflow-kestra.git
cd metaflow-kestra
pip install -e ".[dev]"

# Start a local Kestra instance (requires Docker)
docker compose up -d

# Run the integration test suite
KESTRA_HOST=http://localhost:8090 \
python -m pytest tests/test_e2e.py -m integration -v

License

Apache 2.0
