
airflow-unfactor 🛫➡️🌊

"Airflow is for airports. Welcome to modern orchestration."


An MCP server that refactors Apache Airflow DAG code into Prefect flow code with AI assistance. Built with FastMCP.

Features

  • 🔄 Refactor-Focused — Translates Airflow DAG patterns into maintainable Prefect flow patterns
  • 📚 Educational — Comments explain why Prefect does it better
  • 🧪 Test Generation — Every converted flow comes with pytest tests
  • 🤖 AI-Assisted — Smart analysis of complex DAG patterns
  • 📦 Batch Support — Refactor entire DAG projects at once
  • 🧙 Visual Wizard — Step-by-step UI for guided migrations

Installation

# Using uv (recommended)
uv pip install airflow-unfactor

# Using pip
pip install airflow-unfactor

# From source (for development)
git clone https://github.com/gabcoyne/airflow-unfactor.git
cd airflow-unfactor
uv pip install -e ".[dev]"

Quick Start

Use with Claude Desktop

Add to your Claude Desktop config (~/Library/Application Support/Claude/claude_desktop_config.json):

{
  "mcpServers": {
    "airflow-unfactor": {
      "command": "uvx",
      "args": ["airflow-unfactor"]
    }
  }
}

Then ask Claude:

"Refactor the DAG in dags/my_etl.py into a Prefect flow"

Use with Cursor

Add to your Cursor MCP config:

{
  "mcpServers": {
    "airflow-unfactor": {
      "command": "uvx",
      "args": ["airflow-unfactor"]
    }
  }
}

Run Standalone

# Start the MCP server
airflow-unfactor

# Or with uv
uvx airflow-unfactor

Visual Wizard

For a guided, step-by-step migration experience:

# Install with UI support
pip install airflow-unfactor[ui]

# Start the wizard
airflow-unfactor --ui

# Open in browser
open http://localhost:8765

The wizard provides:

  • Visual DAG analysis with complexity scoring
  • Live preview of converted flows
  • Validation with confidence scores
  • Complete project export as ZIP

MCP Tools

Tool      Description
analyze   Analyze a DAG's structure, operators, and complexity
convert   Refactor a DAG into a Prefect flow with tests
validate  Verify refactoring maintains behavioral equivalence
explain   Learn Airflow concepts and Prefect equivalents
batch     Refactor multiple DAGs at once
scaffold  Generate a complete Prefect project from DAG directory

Validating Conversions

The validate tool compares your original DAG with the converted flow to ensure behavioral equivalence:

# Via MCP
result = await validate(
    original_dag="path/to/dag.py",
    converted_flow="path/to/flow.py"
)

# Returns JSON with:
# - is_valid: Overall pass/fail
# - task_count_match: Whether task counts match
# - dependency_preserved: Whether dependencies are preserved
# - confidence_score: 0-100 confidence rating
# - issues: Specific mismatches found

The validator:

  • Extracts task graphs from both files
  • Ignores DummyOperator/EmptyOperator (not needed in Prefect)
  • Detects XCom patterns and verifies they're converted to return values
  • Reports actionable issues when mismatches are found

Recommended Target Layout (New Prefect Project)

When refactoring Airflow DAGs into a new codebase, keep generated output organized instead of dropping files into a single folder.

Suggested structure (inspired by prefecthq/flows style):

your-project/
  flows/                  # Prefect flow entrypoints
  tasks/                  # Reusable task functions
  deployments/            # deployment.yaml / deployment scripts
  infrastructure/         # work pool / worker config helpers
  tests/
    flows/
    tasks/
  migration/
    airflow_sources/      # original DAGs kept read-only for reference
    conversion_notes/     # runbook outputs and manual TODOs

This helps keep migration work auditable and reduces long-term repo clutter.
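To bootstrap this layout in a fresh repository (a one-off sketch; replace `your-project` with your actual project name):

```shell
# Create the suggested Prefect project layout from the structure above.
mkdir -p your-project/flows your-project/tasks \
         your-project/deployments your-project/infrastructure \
         your-project/tests/flows your-project/tests/tasks \
         your-project/migration/airflow_sources \
         your-project/migration/conversion_notes
```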

Example

Airflow DAG:

from airflow import DAG
from airflow.operators.python import PythonOperator

def extract():
    return {"users": [1, 2, 3]}

def transform(ti):
    data = ti.xcom_pull(task_ids="extract")
    return [u * 2 for u in data["users"]]

with DAG("my_etl", ...) as dag:
    t1 = PythonOperator(task_id="extract", python_callable=extract)
    t2 = PythonOperator(task_id="transform", python_callable=transform)
    t1 >> t2

Converted Prefect Flow:

from prefect import flow, task

# ✨ Prefect Advantage: Direct Data Passing
# No XCom push/pull - data flows in-memory between tasks.

@task
def extract():
    return {"users": [1, 2, 3]}

@task
def transform(data):
    return [u * 2 for u in data["users"]]

@flow(name="my_etl")
def my_etl():
    data = extract()  # Direct return
    result = transform(data)  # Direct parameter
    return result

Plus generated tests! See Testing for details.
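For a sense of what a generated test file might contain, here is a sketch with the task bodies inlined so it runs standalone; real generated tests would import the converted flow module and call each Prefect task's underlying function via its `.fn` attribute:

```python
# Sketch of generated pytest tests; task logic is inlined here
# so the example is self-contained without a Prefect install.
def extract():
    return {"users": [1, 2, 3]}

def transform(data):
    return [u * 2 for u in data["users"]]

def test_extract_returns_seed_users():
    assert extract() == {"users": [1, 2, 3]}

def test_transform_doubles_each_user():
    assert transform({"users": [1, 2, 3]}) == [2, 4, 6]

def test_etl_passes_data_directly():
    # Direct data passing: a return value feeds the next task's parameter,
    # with no XCom push/pull in between.
    assert transform(extract()) == [2, 4, 6]
```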

Documentation

Full documentation: gabcoyne.github.io/airflow-unfactor

Development

# Clone and install
git clone https://github.com/gabcoyne/airflow-unfactor.git
cd airflow-unfactor
uv pip install -e ".[dev]"

# Run tests
pytest

# Task tracking with Beads
bd ready

Contributing

Contributions welcome! See CONTRIBUTING.md.

License

MIT - see LICENSE.


Made with 💙 by Prefect
