
Run Metaflow flows as Kestra workflows


metaflow-kestra

License: Apache-2.0 · Python 3.10+

Deploy and run Metaflow flows as Kestra workflows.

metaflow-kestra compiles any Metaflow flow into a Kestra YAML workflow, letting you schedule, deploy, and monitor your pipelines through Kestra while keeping all your existing Metaflow code unchanged.

Install

pip install metaflow-kestra

Or from source:

git clone https://github.com/npow/metaflow-kestra.git
cd metaflow-kestra
pip install -e ".[dev]"

Quick start

python my_flow.py kestra run --kestra-host http://localhost:8080 --wait

Usage

Generate, deploy, and run

# Compile the flow to a Kestra YAML
python my_flow.py kestra compile flow.yaml

# Deploy to a running Kestra server
python my_flow.py kestra create --kestra-host http://localhost:8080

# Compile, deploy, and trigger in one step
python my_flow.py kestra run --kestra-host http://localhost:8080 --wait

All graph shapes are supported

from metaflow import FlowSpec, step

# Linear
class SimpleFlow(FlowSpec):
    @step
    def start(self):
        self.value = 42
        self.next(self.end)
    @step
    def end(self): pass

# Split/join (branch)
class BranchFlow(FlowSpec):
    @step
    def start(self):
        self.data = [1, 2, 3]
        self.next(self.branch_a, self.branch_b)
    ...

# Foreach fan-out (body tasks run concurrently)
class ForeachFlow(FlowSpec):
    @step
    def start(self):
        self.items = [1, 2, 3]
        self.next(self.process, foreach="items")
    ...
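A compiler has to tell these three shapes apart before it can map them onto Kestra tasks. The sketch below shows one way to classify the transition that leaves each step. It assumes a hypothetical, simplified graph representation (`StepNode`, with a successor list and an optional foreach variable) rather than Metaflow's own graph object, and it fills in hypothetical `branch_a`/`branch_b`/`join` steps for the elided BranchFlow body.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class StepNode:
    """Hypothetical, simplified view of one Metaflow step (not Metaflow's own graph API)."""
    name: str
    out: list[str] = field(default_factory=list)  # targets passed to self.next(...)
    foreach: Optional[str] = None                 # set for self.next(..., foreach="...")


def classify(node: StepNode) -> str:
    """Name the kind of transition that leaves `node`."""
    if not node.out:
        return "end"       # no successors: the end step
    if node.foreach is not None:
        return "foreach"   # one target, fanned out once per item
    if len(node.out) > 1:
        return "split"     # static branches that run side by side
    return "linear"        # exactly one successor


# BranchFlow's shape, with hypothetical branch and join steps filled in.
branch_graph = {
    "start": StepNode("start", out=["branch_a", "branch_b"]),
    "branch_a": StepNode("branch_a", out=["join"]),
    "branch_b": StepNode("branch_b", out=["join"]),
    "join": StepNode("join", out=["end"]),
    "end": StepNode("end"),
}

for name, node in branch_graph.items():
    print(f"{name:9} -> {classify(node)}")
```

A join is still a "linear" transition on its outgoing side; what makes it a join is that it receives several inputs, which matters when building `--input-paths` rather than when choosing a wrapper task.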

Parametrised flows

metaflow.Parameter definitions become Kestra input fields automatically:

from metaflow import FlowSpec, Parameter, step

class ParamFlow(FlowSpec):
    greeting = Parameter("greeting", default="hello")
    count    = Parameter("count", default=3, type=int)
    ...

# Trigger via the CLI
python param_flow.py kestra run --wait

# Or trigger manually in the Kestra UI, where the inputs appear as typed form fields
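The type mapping behind this (INT, STRING, FLOAT, BOOLEAN, as listed under Supported constructs) can be sketched as follows. This is a minimal illustration, assuming parameters arrive as plain `(name, default, type)` tuples and that each generated input uses Kestra's `id`, `type` and `defaults` keys; `to_kestra_inputs` and `kestra_type` are hypothetical helper names, and the real compiler may emit additional fields.

```python
from typing import Any

# Python type of a Metaflow Parameter -> Kestra input type.
# bool is checked before int because bool is a subclass of int.
_TYPE_MAP = [(bool, "BOOLEAN"), (int, "INT"), (float, "FLOAT"), (str, "STRING")]


def kestra_type(py_type: type) -> str:
    """Map a Python type to a Kestra input type name."""
    for candidate, kestra_name in _TYPE_MAP:
        if issubclass(py_type, candidate):
            return kestra_name
    return "STRING"  # fall back to a string input for anything unrecognised


def to_kestra_inputs(params: list[tuple[str, Any, type]]) -> list[dict[str, Any]]:
    """Build Kestra input definitions from (name, default, type) tuples."""
    return [
        {"id": name, "type": kestra_type(py_type), "defaults": default}
        for name, default, py_type in params
    ]


# The two parameters of ParamFlow above.
print(to_kestra_inputs([("greeting", "hello", str), ("count", 3, int)]))
```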

Scheduling

@schedule maps to a Kestra cron trigger:

from metaflow import FlowSpec, schedule

@schedule(cron="0 9 * * 1")   # every Monday at 9 AM
class WeeklyFlow(FlowSpec):
    ...
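As a sketch of what that mapping produces, the code below builds a trigger definition and checks the cron expression against concrete dates. The trigger type `io.kestra.plugin.core.trigger.Schedule` is the core Schedule trigger in recent Kestra releases (an assumption about the version you run); `schedule_trigger` and `cron_matches` are hypothetical helpers, and the toy matcher understands only `*` and single integers, not ranges, lists or steps.

```python
from datetime import datetime


def schedule_trigger(cron: str) -> dict:
    """Hypothetical helper: a Kestra Schedule trigger for a Metaflow cron string."""
    return {"id": "schedule", "type": "io.kestra.plugin.core.trigger.Schedule", "cron": cron}


def cron_matches(cron: str, when: datetime) -> bool:
    """Toy matcher for 5-field cron strings whose fields are '*' or a single integer.

    It ANDs all fields, so it ignores cron's OR rule for the case where both
    day-of-month and day-of-week are restricted.
    """
    minute, hour, dom, month, dow = cron.split()
    # Python: Monday == 0 ... Sunday == 6; cron: Sunday == 0, Monday == 1.
    cron_dow = (when.weekday() + 1) % 7
    actual = [when.minute, when.hour, when.day, when.month, cron_dow]
    fields = [minute, hour, dom, month, dow]
    return all(f == "*" or int(f) == v for f, v in zip(fields, actual))


trigger = schedule_trigger("0 9 * * 1")
print(cron_matches(trigger["cron"], datetime(2024, 1, 1, 9, 0)))  # 1 Jan 2024 was a Monday
print(cron_matches(trigger["cron"], datetime(2024, 1, 2, 9, 0)))  # Tuesday
```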

Step decorator support

@retry is read from your flow and applied to the generated Kestra task automatically:

from metaflow import FlowSpec, retry, step

class MyFlow(FlowSpec):
    @retry(times=3, minutes_between_retries=2)
    @step
    def train(self):
        ...
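One plausible translation of those `@retry` arguments into a Kestra retry block is sketched below. It assumes Kestra's constant retry policy with the `type`, `interval` (an ISO-8601 duration) and `maxAttempt` fields; `kestra_retry` is a hypothetical helper. Mapping `times` directly to `maxAttempt` is also an assumption: whether the initial attempt counts towards that limit depends on your Kestra version, so check the retry documentation before relying on an exact number.

```python
def kestra_retry(times: int, minutes_between_retries: int) -> dict:
    """Hypothetical helper: translate Metaflow @retry arguments into a Kestra retry block.

    Assumes a constant back-off and maps `times` straight to `maxAttempt`.
    """
    return {
        "type": "constant",
        "interval": f"PT{minutes_between_retries}M",  # ISO-8601 duration, e.g. PT2M
        "maxAttempt": times,
    }


print(kestra_retry(times=3, minutes_between_retries=2))
```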

Configuration

Metadata service and datastore

metaflow-kestra bakes the active metadata and datastore backends into the generated YAML at compile time, so every step subprocess uses the same backend. To use a specific backend:

python my_flow.py \
  --metadata=service \
  --datastore=s3 \
  kestra compile flow.yaml

Or via environment variables:

export METAFLOW_DEFAULT_METADATA=service
export METAFLOW_DEFAULT_DATASTORE=s3
python my_flow.py kestra compile flow.yaml
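"Baked in at compile time" means the backend choice is resolved once, when the YAML is generated, rather than when each step runs on a Kestra worker. A minimal sketch of that idea, assuming a hypothetical `backend_flags` helper that reads the two environment variables above and falls back to Metaflow's out-of-the-box `local` defaults (it ignores `~/.metaflowconfig` and explicit `--metadata`/`--datastore` flags for brevity):

```python
import os
from collections.abc import Mapping
from typing import Optional


def backend_flags(env: Optional[Mapping[str, str]] = None) -> list[str]:
    """Hypothetical helper: resolve the metadata/datastore backends once, at compile time,
    so every generated step command can be pinned to the same ones."""
    env = os.environ if env is None else env
    metadata = env.get("METAFLOW_DEFAULT_METADATA", "local")
    datastore = env.get("METAFLOW_DEFAULT_DATASTORE", "local")
    return [f"--metadata={metadata}", f"--datastore={datastore}"]


# The flags computed here would be written into the YAML, not re-read when a step runs.
print(backend_flags({"METAFLOW_DEFAULT_METADATA": "service", "METAFLOW_DEFAULT_DATASTORE": "s3"}))
```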

Authentication

python my_flow.py kestra run \
  --kestra-host http://kestra.internal:8080 \
  --kestra-user admin@example.com \
  --kestra-password secret \
  --wait
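This README does not spell out how the credentials are sent. Assuming they are used for HTTP Basic authentication (the scheme behind Kestra's built-in basic auth), the header a client would send can be computed with the standard library; `basic_auth_header` is a hypothetical helper, not part of the metaflow-kestra API.

```python
import base64


def basic_auth_header(user: str, password: str) -> dict[str, str]:
    """Build an HTTP Basic Authorization header (RFC 7617): base64 of "user:password"."""
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


print(basic_auth_header("admin@example.com", "secret"))
```

Base64 is an encoding, not encryption, so credentials sent this way should travel over HTTPS when the Kestra host is not local.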

How it works

metaflow-kestra walks your Metaflow flow's DAG and emits a Kestra YAML workflow. Each Metaflow step becomes an io.kestra.plugin.scripts.python.Script task. The generated YAML:

  • runs a metaflow_init task first to create the _parameters artifact and assign a stable run ID
  • runs each step as a subprocess via the standard metaflow step CLI
  • passes --input-paths correctly for joins and foreach splits
  • wraps split branches in a Parallel task so they execute concurrently
  • wraps foreach body tasks in a ForEach task
  • maps @retry to Kestra task retry configuration
  • maps @schedule to a Kestra cron trigger
  • writes Metaflow artifact names and a ready-to-use retrieval snippet to Kestra task outputs after each step
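To make the subprocess and `--input-paths` bullets concrete, here is a minimal sketch that assembles the argument list for one step. The `run_id/step_name/task_id` path format and the `--run-id`, `--task-id` and `--input-paths` options come from Metaflow's `step` command; the helper names, the run ID `kestra-42` and all task IDs are made-up illustrative values, and the generated YAML likely passes further options (for example a split index inside a foreach).

```python
def input_paths(run_id: str, parents: list[tuple[str, str]]) -> str:
    """Metaflow-style --input-paths value: comma-separated run_id/step/task_id entries."""
    return ",".join(f"{run_id}/{step}/{task_id}" for step, task_id in parents)


def step_command(flow_file: str, step: str, run_id: str, task_id: str,
                 parents: list[tuple[str, str]]) -> list[str]:
    """Hypothetical helper: argv for running one step through Metaflow's `step` command."""
    return [
        "python", flow_file, "step", step,
        "--run-id", run_id,
        "--task-id", task_id,
        "--input-paths", input_paths(run_id, parents),
    ]


# The start step reads the _parameters artifact written by the init task.
print(step_command("branch_flow.py", "start", "kestra-42", "1", [("_parameters", "0")]))
# A join receives one input path per branch.
print(step_command("branch_flow.py", "join", "kestra-42", "4", [("branch_a", "2"), ("branch_b", "3")]))
```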

Kestra UI: execution topology and timeline

The generated flow preserves the Metaflow DAG structure. Branch and foreach fan-outs appear as nested parallel task groups in the topology view, and their tasks overlap in the Gantt timeline because they run concurrently:

[Screenshot: branch flow execution topology]

[Screenshot: foreach flow Gantt timeline]

Kestra UI: Metaflow artifact outputs

After each step completes, two extra output variables are posted to the Kestra task:

| Variable | Content |
| --- | --- |
| metaflow_artifacts | Comma-separated list of artifact names produced by the step |
| metaflow_snippet | Ready-to-paste Python code that loads those artifacts via the Metaflow client |

[Screenshot: metaflow_snippet output showing Python accessor code]

[Screenshot: metaflow_artifacts output listing artifact names]
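Below is a minimal sketch of how those two values could be produced for a finished task. The exact snippet text metaflow-kestra writes may differ; this version uses the Metaflow client's `Task` class with a `Flow/run_id/step/task_id` pathspec and `task['name'].data` to read each artifact. `build_snippet`, the run ID and the task ID are hypothetical.

```python
def build_snippet(flow: str, run_id: str, step: str, task_id: str,
                  artifacts: list[str]) -> str:
    """Hypothetical helper: Python code that loads a task's artifacts via the Metaflow client."""
    lines = [
        "from metaflow import Task",
        f'task = Task("{flow}/{run_id}/{step}/{task_id}")',
    ]
    lines += [f"{name} = task['{name}'].data" for name in artifacts]
    return "\n".join(lines)


artifacts = ["greeting", "count"]
print(",".join(artifacts))  # what a metaflow_artifacts value could look like
print(build_snippet("ParamFlow", "kestra-42", "start", "1", artifacts))
```

Running the generated code only works if the Metaflow client is configured with the same metadata service and datastore the run used.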

Supported constructs

| Construct | Kestra mapping |
| --- | --- |
| Linear steps | Sequential Script tasks |
| self.next(a, b) split | Parallel task wrapping branch tasks |
| @step with inputs (join) | Sequential task after Parallel completes |
| self.next(step, foreach=items) | ForEach task |
| Parameter | Kestra inputs with type mapping (INT, STRING, FLOAT, BOOLEAN) |
| @schedule(cron=...) | Kestra Schedule trigger |
| @retry | Kestra task retry configuration |
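The split and join rows of this table can be sketched as a small task-tree builder. It assumes a hypothetical "stage" list (a single step name, or a list of branch names for a split) instead of Metaflow's graph, and uses `io.kestra.plugin.core.flow.Parallel` as the Parallel task type found in recent Kestra releases. It leaves out the `script` body and every other property a real Script task needs, so its output is an outline, not a deployable flow.

```python
from typing import Union

# A stage is either one step name (runs on its own) or a list of step names
# that form the branches of a split and run side by side.
Stage = Union[str, list[str]]

SCRIPT = "io.kestra.plugin.scripts.python.Script"
PARALLEL = "io.kestra.plugin.core.flow.Parallel"  # assumed core Parallel task type


def script_task(step: str) -> dict:
    """Skeleton Script task for one step (real tasks also need a script body)."""
    return {"id": step, "type": SCRIPT}


def build_tasks(stages: list[Stage]) -> list[dict]:
    """Hypothetical helper: linear steps become sequential Script tasks and
    the branches of a split are wrapped in a single Parallel task."""
    tasks = []
    for stage in stages:
        if isinstance(stage, list):
            tasks.append({
                "id": "parallel_" + "_".join(stage),
                "type": PARALLEL,
                "tasks": [script_task(s) for s in stage],
            })
        else:
            tasks.append(script_task(stage))
    return tasks


# BranchFlow-like shape: start -> (branch_a | branch_b) -> join -> end
for task in build_tasks(["start", ["branch_a", "branch_b"], "join", "end"]):
    print(task["id"], task["type"].rsplit(".", 1)[-1])
```

Because the join is simply the next entry in the sequential list, it starts only after the Parallel task completes, which matches the "Sequential task after Parallel completes" row.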

Development

git clone https://github.com/npow/metaflow-kestra.git
cd metaflow-kestra
pip install -e ".[dev]"

# Start a local Kestra instance (requires Docker)
docker compose up -d

# Run the integration test suite
KESTRA_HOST=http://localhost:8090 \
python -m pytest tests/test_e2e.py -m integration -v

License

Apache 2.0

Download files

Download the file for your platform.

Source Distribution

metaflow_kestra-0.3.3.tar.gz (34.2 kB)

Uploaded Source

Built Distribution


metaflow_kestra-0.3.3-py3-none-any.whl (25.1 kB)

Uploaded Python 3

File details

Details for the file metaflow_kestra-0.3.3.tar.gz.

File metadata

  • Download URL: metaflow_kestra-0.3.3.tar.gz
  • Upload date:
  • Size: 34.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for metaflow_kestra-0.3.3.tar.gz
Algorithm Hash digest
SHA256 e144fbf3a763d2ec9a2777e3c840e4f4ae6381ceeda251fb86dfcda0b92d19c4
MD5 6ea5bdb22ef84effeeb5ec75b1c98557
BLAKE2b-256 3a98440fa432846b49594bbe9765af3ac11bd86d83114b027eb727e1a8862d0e


Provenance

The following attestation bundles were made for metaflow_kestra-0.3.3.tar.gz:

Publisher: publish.yml on npow/metaflow-kestra

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file metaflow_kestra-0.3.3-py3-none-any.whl.


File hashes

Hashes for metaflow_kestra-0.3.3-py3-none-any.whl
Algorithm Hash digest
SHA256 2a8b5921311735568b10d7cc852d091fbd2a521f58e4e7cbabc065151eb18f85
MD5 5e34db9c9eda79218452c3572c8d10fa
BLAKE2b-256 dcbf4a4fd52cd24688b6432f9e6245885bc6d964ab02a64cfda3163ee40f69c9


Provenance

The following attestation bundles were made for metaflow_kestra-0.3.3-py3-none-any.whl:

Publisher: publish.yml on npow/metaflow-kestra

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
