Open-source Airflow DAG generator for SQLMesh projects
SQLMesh DAG Generator
Generate Apache Airflow DAGs from SQLMesh projects - no cloud dependencies required.
Transform your SQLMesh models into production-ready Airflow DAGs with full data lineage, automatically!
✨ Key Features
- 🔥 Dynamic DAG Generation (Default): Fire and forget - place the DAG once; models are auto-discovered at runtime
- 📅 Auto-Scheduling: Automatically detects the DAG schedule from SQLMesh model intervals - no manual configuration!
- 🔒 Runtime Connection Parametrization: Pass database credentials via Airflow Connections - no hardcoded secrets!
- ✅ Full Lineage in Airflow: Each SQLMesh model = one Airflow task with proper dependencies
- 🌍 Multi-Environment Support: Use Airflow Variables + SQLMesh gateways for dev/staging/prod
- ⚡ Incremental Models: Proper handling with `data_interval_start`/`data_interval_end`
- 🎯 Enhanced Error Handling: SQLMesh-specific error messages in Airflow logs
- 🛠️ Dual Mode: Dynamic (auto-discovery, default) or Static (full control)
- 🚫 No Vendor Lock-in: Open source, no cloud dependencies
🏢 Enterprise Features (NEW in v0.8.0)
- 🔔 Callbacks: `on_failure_callback`, `on_success_callback`, `sla_miss_callback` for alerting
- 🏷️ Tag-Based Filtering: `include_tags`, `exclude_tags` for Data Mesh team-specific DAGs
- 🏊 Pool Configuration: `pool`, `pool_slots` for resource management
- ⏩ Trigger Downstream DAGs: `trigger_dag_id` for ML pipeline integration
- 🎯 Pattern Filtering: `model_pattern` for regex-based model selection
⚠️ Important: Gateway vs Environment
SQLMesh uses "gateways" to switch between environments, NOT an "environment" parameter.

```python
# ❌ WRONG - the environment parameter is for SQLMesh virtual environments (testing)
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/project",
    environment="prod",  # ERROR: Environment 'prod' was not found
)

# ✅ CORRECT - use gateway to switch between dev/staging/prod
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/project",
    gateway="prod",  # This is how you select your environment!
    # environment defaults to "" (empty string) - perfect for production
)
```

📖 Read Understanding SQLMesh Environments for the complete explanation of the difference between gateway and environment.
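Gateways themselves are defined in your SQLMesh project's `config.yaml`. A minimal sketch, with illustrative connection details (your database types, hosts, and credentials will differ):

```yaml
# config.yaml in your SQLMesh project (illustrative values)
gateways:
  dev:
    connection:
      type: duckdb
      database: dev.db
  prod:
    connection:
      type: postgres
      host: prod-db.internal
      port: 5432
      database: analytics
      user: sqlmesh
default_gateway: dev
```

Passing `gateway="prod"` to the generator selects the `prod` entry from this file.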
🚀 Quick Start (3 Steps)

1. Install

```bash
pip install sqlmesh-dag-generator  # (when published)
# OR
git clone <repo> && cd SQLMeshDAGGenerator && pip install -e .
```

2. Generate DAG (Dynamic Mode - Default!)

```python
from sqlmesh_dag_generator import SQLMeshDAGGenerator

# Point to your SQLMesh project
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/your/sqlmesh/project",
    dag_id="my_pipeline",
    schedule_interval="@daily",
)

# Generate the dynamic DAG (default - fire and forget!)
dag_code = generator.generate_dynamic_dag()

# Save it
with open("my_pipeline.py", "w") as f:
    f.write(dag_code)
```

3. Deploy to Airflow

```bash
cp my_pipeline.py /opt/airflow/dags/
```

That's it! 🎉 Your SQLMesh models are now orchestrated by Airflow. The DAG will auto-discover models at runtime - no regeneration needed when models change!
💡 What You Get

Your SQLMesh project:

```
my_project/
└── models/
    ├── raw_orders.sql
    ├── stg_orders.sql       # depends on raw_orders
    └── orders_summary.sql   # depends on stg_orders
```

Generated Airflow DAG (Graph View):

```
[raw_orders] → [stg_orders] → [orders_summary]
```

✓ Each model = a separate task
✓ SQLMesh dependencies = Airflow dependencies
✓ Full lineage visible in the Airflow UI
📚 Documentation

Getting Started
- Quick Start Guide - step-by-step tutorial (start here!)
- Quick Reference - one-page cheat sheet
- Examples - code examples

Configuration & Features
- Auto-Scheduling Guide - automatic schedule detection
- Runtime Configuration - pass credentials via Airflow Connections
- Multi-Environment Setup - configure for dev/staging/prod ⚠️ IMPORTANT
- Dynamic DAGs - fire-and-forget mode explained

Deployment & Production
- Deployment Checklist - step-by-step deployment guide (NEW!)
- Container Deployment - Docker, Kubernetes, ECS, Cloud Composer 🐳 (NEW!)
- Common Issues - real-world problems and solutions 🔧 (NEW!)
- Troubleshooting - debug guide
- Deployment Warnings - critical production considerations

Reference
- Usage Guide - complete reference
- Migration Guide - fix common configuration issues
- Architecture - technical details
🔥 Why Dynamic Mode (Default)?
Dynamic mode auto-discovers SQLMesh models at runtime:

```python
dag_code = generator.generate_dynamic_dag()  # Default behavior!
```

Benefits:
- ✅ No regeneration needed when SQLMesh models change
- ✅ Always in sync - the DAG updates automatically
- ✅ Multi-environment - uses Airflow Variables
- ✅ Production-ready - enhanced error handling

Want static mode instead? Just use `generator.generate_dag()` - see the Usage Guide.
🎯 Simple Example
The simplest possible usage - just a few lines of code:

```python
from sqlmesh_dag_generator import SQLMeshDAGGenerator

generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/your/sqlmesh/project",
    dag_id="my_pipeline",
)
dag_code = generator.generate_dynamic_dag()
```

See examples/simple_generate.py for a complete runnable example.
Configuration File
Create a `dag_generator_config.yaml`:

```yaml
sqlmesh:
  project_path: "/path/to/sqlmesh/project"
  gateway: "prod"
  environment: ""                 # Empty for production
  default_catalog: "my_catalog"   # Optional: for 3-part naming support

airflow:
  dag_id: "sqlmesh_pipeline"
  schedule_interval: "0 0 * * *"
  default_args:
    owner: "data-team"
    retries: 3
    retry_delay_minutes: 5
  tags:
    - sqlmesh
    - analytics

generation:
  output_dir: "/path/to/airflow/dags"
  operator_type: "python"         # or "bash"
  include_tests: true
  parallel_tasks: true
  auto_replan_on_change: true     # Automatically run 'sqlmesh plan' if models change
  # Plan optimization options (for faster execution when nothing changed)
  skip_backfill: false            # Skip apply if a backfill is required (use CI/CD for initial backfills)
  plan_only: false                # Generate the plan without applying (for review/dry-run)
  log_plan_details: true          # Log detailed plan information (context, plan, apply phases)
  return_value: true              # Enable/disable XCom return values
```
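If you load this file yourself (for example with PyYAML), a light validation pass catches misconfiguration early. A hypothetical sketch, operating on an already-parsed dict; `validate_config` is not part of the library:

```python
# Hypothetical validation helper for the parsed config (not part of the library).
# The dict below mirrors dag_generator_config.yaml after YAML parsing.
config = {
    "sqlmesh": {"project_path": "/path/to/sqlmesh/project", "gateway": "prod", "environment": ""},
    "airflow": {"dag_id": "sqlmesh_pipeline", "schedule_interval": "0 0 * * *"},
    "generation": {"output_dir": "/path/to/airflow/dags", "operator_type": "python"},
}

def validate_config(cfg: dict) -> list:
    """Return a list of human-readable problems; an empty list means the config looks sane."""
    problems = []
    required = {
        "sqlmesh": ["project_path", "gateway"],
        "airflow": ["dag_id"],
        "generation": ["output_dir"],
    }
    for section, keys in required.items():
        body = cfg.get(section)
        if body is None:
            problems.append(f"missing section: {section}")
            continue
        for key in keys:
            if key not in body:
                problems.append(f"missing key: {section}.{key}")
    # Common pitfall from this README: 'environment' set where 'gateway' was meant
    if cfg.get("sqlmesh", {}).get("environment") not in ("", None):
        problems.append("sqlmesh.environment is usually '' in production; use 'gateway' to select prod")
    return problems

print(validate_config(config))  # → []
```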
How It Works
1. Load SQLMesh Project: reads your SQLMesh project configuration and models
2. Extract Dependencies: analyzes SQL queries to build a dependency graph
3. Generate Tasks: creates an Airflow task for each SQLMesh model
4. Set Dependencies: connects tasks based on model dependencies
5. Apply Schedules: preserves cron schedules and execution logic
6. Output DAG: generates a Python file ready for Airflow
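The dependency-extraction and ordering steps can be illustrated with a toy resolver: find upstream models referenced in each model's SQL (here with a naive regex; the real generator relies on SQLMesh's own parser, not regexes), then order tasks topologically:

```python
import re
from graphlib import TopologicalSorter

# Toy models: name -> SQL (illustrative; the generator reads these from your project)
models = {
    "raw_orders": "SELECT * FROM landing.orders",
    "stg_orders": "SELECT * FROM raw_orders WHERE status IS NOT NULL",
    "orders_summary": "SELECT COUNT(*) AS n FROM stg_orders",
}

def upstream(sql: str, known: set) -> set:
    """Naively collect referenced models: any known model name after FROM/JOIN."""
    refs = re.findall(r"\b(?:FROM|JOIN)\s+([\w.]+)", sql, flags=re.IGNORECASE)
    return {r for r in refs if r in known}

# Map each model to its upstream dependencies, then topologically sort
graph = {name: upstream(sql, set(models)) for name, sql in models.items()}
order = list(TopologicalSorter(graph).static_order())
print(order)  # → ['raw_orders', 'stg_orders', 'orders_summary']
```

Upstream models come first in the resulting order, which is exactly the property the generated Airflow task graph preserves.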
Architecture

```
SQLMesh Project
      ↓
SQLMeshDAGGenerator
├── Context Loader       (loads the SQLMesh context)
├── Model Parser         (extracts model metadata)
├── Dependency Resolver  (builds the dependency graph)
└── DAG Builder          (generates the Airflow DAG)
      ↓
Airflow DAG File
```
Advanced Features

Custom Operators

```python
from sqlmesh_dag_generator import SQLMeshDAGGenerator
from airflow.operators.python import PythonOperator

generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/project",
    custom_operator_class=PythonOperator,
    operator_kwargs={"provide_context": True},
)
```
Model Filtering

```python
# Generate a DAG for specific models only
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/project",
    include_models=["model1", "model2"],
    exclude_models=["test_*"],
)
```
Dynamic Task Generation

```python
# Generate tasks with dynamic parallelism
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/project",
    enable_dynamic_tasks=True,
    max_parallel_tasks=10,
)
```
⚠️ Important: Deployment Warnings

🔴 Distributed Airflow Requires a Shared Volume
If you're using KubernetesExecutor, CeleryExecutor, or any distributed Airflow setup, your SQLMesh project MUST be accessible to all workers!

Solutions:
- Option 1 (recommended): mount the project on a shared volume (EFS/NFS/Filestore)
- Option 2: bake the project into the Docker image (loses the fire-and-forget benefit)

See the full guide: docs/DEPLOYMENT_WARNINGS.md

🟡 Operator Type Limitations
- Dynamic mode: Python operator only (current limitation)
- Static mode: supports Python, Bash, and Kubernetes operators

For Bash or Kubernetes operators, use static generation for now.

🟢 Kubernetes Operator Support
To use `operator_type: kubernetes`:

```yaml
generation:
  operator_type: kubernetes
  docker_image: "your-registry/sqlmesh:v1.0"  # REQUIRED
  namespace: "data-pipelines"
```

📚 Full documentation: docs/DEPLOYMENT_WARNINGS.md
Requirements
- Python >= 3.8
- Apache Airflow >= 2.0
- SQLMesh >= 0.20.0
Development

```bash
# Clone the repository
git clone https://github.com/yourusername/sqlmesh-dag-generator.git
cd sqlmesh-dag-generator

# Install in development mode
pip install -e ".[dev]"

# Run the tests
pytest

# Run the linters
black .
ruff check .
```
🔧 Troubleshooting

Common Issues

1. TypeError: Object of type CompletionStatus is not JSON serializable

Cause: Airflow cannot serialize the SQLMesh `CompletionStatus` enum returned by tasks.
Fix: upgrade to v0.7.2+, which automatically converts the status to a string. Alternatively, set `return_value=False` in your generator config:

```python
generator = SQLMeshDAGGenerator(
    ...,
    return_value=False,  # Disable XCom return values
)
```
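The root cause is easy to reproduce with the standard library: a plain `Enum` member is not JSON serializable, while its string value is. The `CompletionStatus` class below is a stand-in for illustration, not SQLMesh's actual enum:

```python
import json
from enum import Enum

class CompletionStatus(Enum):  # stand-in for SQLMesh's enum
    SUCCESS = "success"

# A raw enum member cannot cross the XCom boundary, which serializes to JSON:
try:
    json.dumps({"status": CompletionStatus.SUCCESS})
except TypeError as exc:
    print(exc)  # → Object of type CompletionStatus is not JSON serializable

# Converting to a plain string first makes the payload XCom-safe:
print(json.dumps({"status": CompletionStatus.SUCCESS.value}))  # → {"status": "success"}
```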
2. ConfigError: A query is required and must be a SELECT statement (external models)

Cause: defining external models in .sql files is not supported by the generator.
Fix: define external models in `external_models.yaml` instead.

```yaml
# external_models.yaml
models:
  - name: raw.users
    kind: EXTERNAL
    columns:
      id: INT
      name: TEXT
```
3. "Environment 'prod' was not found"

Cause: you are using `environment="prod"` instead of `gateway="prod"`.
Fix: set `gateway="prod"` and leave `environment` as an empty string (the default).

```python
generator = SQLMeshDAGGenerator(
    ...,
    gateway="prod",
    environment="",  # Correct for production
)
```
4. Changes to models are not picked up

Cause: SQLMesh requires a plan/apply step to register changes.
Fix: enable `auto_replan_on_change` in your generator config:

```python
generator = SQLMeshDAGGenerator(
    ...,
    auto_replan_on_change=True,  # Automatically runs 'sqlmesh plan --auto-apply'
)
```

Or run `sqlmesh plan --auto-apply` manually.
5. sqlmesh_plan_apply task takes 7+ minutes even with no changes

Cause: with `auto_replan_on_change=True`, the plan task may run full backfills even when there are no model changes.
Fix (v0.9.0+): the plan optimization now:
- ✅ Detects "no changes" scenarios and skips the apply (~15 seconds instead of 7+ minutes)
- ✅ Logs detailed timing for each phase (context load, plan compute, apply)

For additional control, use these options:

```python
generator = SQLMeshDAGGenerator(
    ...,
    auto_replan_on_change=True,
    skip_backfill=True,     # Skip apply if a backfill is required (use CI/CD for initial backfills)
    plan_only=True,         # Generate the plan without applying (for review/dry-run)
    log_plan_details=True,  # Log detailed plan information (default: True)
)
```

Recommended CI/CD approach:
- Run `sqlmesh plan --auto-apply` in CI/CD on model changes (handles backfills)
- Set `skip_backfill=True` in the Airflow DAG (only applies non-backfill changes)
- This gives you fast DAG runs (~15s) while ensuring changes are still applied
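A minimal CI job for that approach might look like the following. GitHub Actions is shown only as an example; the workflow name, paths, secret names, and gateway are placeholders you would adapt to your setup:

```yaml
# .github/workflows/sqlmesh-plan.yml (illustrative)
name: Apply SQLMesh plan
on:
  push:
    branches: [main]
    paths: ["models/**"]
jobs:
  plan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - run: pip install sqlmesh
      # Apply changes (including backfills) against the prod gateway
      - run: sqlmesh --gateway prod plan --auto-apply
        env:
          DB_PASSWORD: ${{ secrets.DB_PASSWORD }}
```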
6. 3-part table naming issues (cross-database)

Cause: SQLMesh generates queries with 3-part names (`catalog.schema.table`), which can fail if the catalog name doesn't match.
Fix: set `default_catalog` in your config to tell SQLMesh which catalog to use or omit.

```python
generator = SQLMeshDAGGenerator(
    ...,
    default_catalog="my_catalog",
)
```
Additional Features

Health Check Task
Add a pre-flight check to verify database connectivity before running models:

```python
generator = SQLMeshDAGGenerator(
    ...,
    enable_health_check=True,
)
```

Skip Audits
For faster development iterations, you can skip audit checks:

```python
generator = SQLMeshDAGGenerator(
    ...,
    skip_audits=True,
)
```

Partial DAG Runs
You can generate tasks for a subset of models (useful for testing):

```python
# In your DAG file
generator.create_tasks_in_dag(dag, models=["model_a", "model_b"])
```
Contributing
Contributions are welcome! Please read our Contributing Guide for details.
License
MIT License - see LICENSE file for details.
Comparison with Tobiko Cloud
| Feature | Tobiko Cloud | SQLMesh DAG Generator |
|---|---|---|
| Cost | Paid | Free & Open Source |
| Deployment | Cloud-based | Self-hosted |
| Customization | Limited | Fully Customizable |
| Privacy | External | On-premise |
| Dependencies | Cloud connection | None |
Support
- 📖 Documentation
- 🐛 Issue Tracker
- 💬 Discussions