# airflow-unfactor 🛫➡️🌊

> "Airflow is for airports. Welcome to modern orchestration."

An MCP server that refactors Apache Airflow DAG code into Prefect flow code with AI assistance. Built with FastMCP.
## Features
- 🔄 Refactor-Focused — Translates Airflow DAG patterns into maintainable Prefect flow patterns
- 📚 Educational — Comments explain why Prefect does it better
- ✅ Test Generation — Every converted flow comes with pytest tests
- 🤖 AI-Assisted — Smart analysis of complex DAG patterns
- 📦 Batch Support — Refactor entire DAG projects at once
- 🧙 Visual Wizard — Step-by-step UI for guided migrations
## Installation

```bash
# Using uv (recommended)
uv pip install airflow-unfactor

# Using pip
pip install airflow-unfactor

# From source (for development)
git clone https://github.com/gabcoyne/airflow-unfactor.git
cd airflow-unfactor
uv pip install -e ".[dev]"
```
## Quick Start

### Use with Claude Desktop

Add to your Claude Desktop config (`~/Library/Application Support/Claude/claude_desktop_config.json`):

```json
{
  "mcpServers": {
    "airflow-unfactor": {
      "command": "uvx",
      "args": ["airflow-unfactor"]
    }
  }
}
```
Then ask Claude:

> "Refactor the DAG in `dags/my_etl.py` into a Prefect flow"
### Use with Cursor

Add to your Cursor MCP config:

```json
{
  "mcpServers": {
    "airflow-unfactor": {
      "command": "uvx",
      "args": ["airflow-unfactor"]
    }
  }
}
```
### Run Standalone

```bash
# Start the MCP server
airflow-unfactor

# Or with uv
uvx airflow-unfactor
```
## Visual Wizard

For a guided, step-by-step migration experience:

```bash
# Install with UI support (quotes keep zsh from expanding the brackets)
pip install "airflow-unfactor[ui]"

# Start the wizard
airflow-unfactor --ui

# Open in browser
open http://localhost:8765
```
The wizard provides:
- Visual DAG analysis with complexity scoring
- Live preview of converted flows
- Validation with confidence scores
- Complete project export as ZIP
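The complexity score can be thought of as a weighted blend of graph properties. The following is an illustrative heuristic only, not the wizard's actual scoring (the `complexity_score` function and its weights are invented for this sketch):

```python
# Illustrative heuristic only -- not the wizard's actual scoring.
# Blend task count, operator variety, and edge density into 0-100.

def complexity_score(tasks: list[str], operators: set[str],
                     edges: set[tuple[str, str]]) -> int:
    """Return a rough 0-100 complexity estimate for a DAG."""
    task_points = min(len(tasks) * 5, 40)            # many tasks -> more work
    operator_points = min(len(operators) * 10, 30)   # varied operators -> more work
    max_edges = max(len(tasks) * (len(tasks) - 1) // 2, 1)
    density_points = round(30 * len(edges) / max_edges)
    return min(task_points + operator_points + density_points, 100)
```

A two-task `PythonOperator` pipeline like the example below scores low; a fan-out DAG mixing sensors, branches, and custom operators climbs toward 100.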
## MCP Tools

| Tool | Description |
|---|---|
| `analyze` | Analyze a DAG's structure, operators, and complexity |
| `convert` | Refactor a DAG into a Prefect flow with tests |
| `validate` | Verify refactoring maintains behavioral equivalence |
| `explain` | Learn Airflow concepts and Prefect equivalents |
| `batch` | Refactor multiple DAGs at once |
| `scaffold` | Generate a complete Prefect project from DAG directory |
## Validating Conversions

The `validate` tool compares your original DAG with the converted flow to ensure behavioral equivalence:

```python
# Via MCP
result = await validate(
    original_dag="path/to/dag.py",
    converted_flow="path/to/flow.py",
)

# Returns JSON with:
# - is_valid: Overall pass/fail
# - task_count_match: Whether task counts match
# - dependency_preserved: Whether dependencies are preserved
# - confidence_score: 0-100 confidence rating
# - issues: Specific mismatches found
```
The validator:
- Extracts task graphs from both files
- Ignores DummyOperator/EmptyOperator (not needed in Prefect)
- Detects XCom patterns and verifies they're converted to return values
- Reports actionable issues when mismatches are found
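Conceptually, the dependency check boils down to comparing edge sets extracted from both files. Here is a minimal sketch of that comparison step, not the library's actual implementation (the `compare_graphs` function and its field names are modeled on the JSON output above):

```python
# Illustrative sketch: compare the task graphs of a DAG and its
# converted flow, ignoring placeholder tasks that Prefect doesn't need.

def compare_graphs(dag_edges: set[tuple[str, str]],
                   flow_edges: set[tuple[str, str]],
                   ignored: frozenset[str] = frozenset({"dummy", "empty"})) -> dict:
    """Return a validation summary for two dependency edge sets."""
    def prune(edges):
        # Drop edges touching DummyOperator/EmptyOperator-style placeholders.
        return {(a, b) for a, b in edges if a not in ignored and b not in ignored}

    dag_edges, flow_edges = prune(dag_edges), prune(flow_edges)
    missing = dag_edges - flow_edges   # dependencies lost in conversion
    extra = flow_edges - dag_edges     # dependencies invented by conversion
    total = len(dag_edges) or 1
    confidence = round(100 * len(dag_edges & flow_edges) / total)
    return {
        "is_valid": not missing and not extra,
        "dependency_preserved": not missing,
        "confidence_score": confidence,
        "issues": [f"missing edge {a} -> {b}" for a, b in sorted(missing)]
                + [f"unexpected edge {a} -> {b}" for a, b in sorted(extra)],
    }
```

For the `my_etl` example later in this README, both sides reduce to the single edge `("extract", "transform")`, so the comparison passes with full confidence.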
## Recommended Target Layout (New Prefect Project)

When refactoring Airflow DAGs into a new codebase, keep generated output organized instead of dropping files into a single folder.

Suggested structure (inspired by prefecthq/flows style):

```text
your-project/
  flows/                  # Prefect flow entrypoints
  tasks/                  # Reusable task functions
  deployments/            # deployment.yaml / deployment scripts
  infrastructure/         # work pool / worker config helpers
  tests/
    flows/
    tasks/
  migration/
    airflow_sources/      # original DAGs kept read-only for reference
    conversion_notes/     # runbook outputs and manual TODOs
```

This helps keep migration work auditable and reduces long-term repo clutter.
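The skeleton above can be created in one step (the `your-project` name is a placeholder, as in the layout):

```bash
# Create the suggested project skeleton from the parent directory
mkdir -p your-project/flows your-project/tasks \
         your-project/deployments your-project/infrastructure \
         your-project/tests/flows your-project/tests/tasks \
         your-project/migration/airflow_sources \
         your-project/migration/conversion_notes
```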
## Example

**Airflow DAG:**

```python
from airflow import DAG
from airflow.operators.python import PythonOperator

def extract():
    return {"users": [1, 2, 3]}

def transform(ti):
    data = ti.xcom_pull(task_ids="extract")
    return [u * 2 for u in data["users"]]

with DAG("my_etl", ...) as dag:
    t1 = PythonOperator(task_id="extract", python_callable=extract)
    t2 = PythonOperator(task_id="transform", python_callable=transform)
    t1 >> t2
```
**Converted Prefect Flow:**

```python
from prefect import flow, task

# ✨ Prefect Advantage: Direct Data Passing
# No XCom push/pull - data flows in-memory between tasks.

@task
def extract():
    return {"users": [1, 2, 3]}

@task
def transform(data):
    return [u * 2 for u in data["users"]]

@flow(name="my_etl")
def my_etl():
    data = extract()          # Direct return
    result = transform(data)  # Direct parameter
    return result
```
Plus generated tests! See Testing for details.
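A generated test for this flow might look like the following sketch (hypothetical output; the task bodies are repeated inline so the example is self-contained, whereas a generated project would import them from the flow module):

```python
# Hypothetical example of a generated test. Prefect tasks wrap plain
# functions, so the pipeline logic is unit-testable without a server.

def extract():
    return {"users": [1, 2, 3]}

def transform(data):
    return [u * 2 for u in data["users"]]

def test_extract_returns_expected_keys():
    assert set(extract()) == {"users"}

def test_transform_doubles_each_user():
    assert transform(extract()) == [2, 4, 6]
```

With Prefect installed, the same assertions can also run against the decorated tasks via their underlying functions, e.g. `transform.fn(extract.fn())`.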
## Documentation

Full documentation: [gabcoyne.github.io/airflow-unfactor](https://gabcoyne.github.io/airflow-unfactor)
## Development

```bash
# Clone and install
git clone https://github.com/gabcoyne/airflow-unfactor.git
cd airflow-unfactor
uv pip install -e ".[dev]"

# Run tests
pytest

# Task tracking with Beads
bd ready
```
## Contributing

Contributions welcome! See CONTRIBUTING.md.

## License

MIT - see LICENSE.

Made with 💙 by Prefect