Open-source Airflow DAG generator for SQLMesh projects
SQLMesh DAG Generator
Generate Apache Airflow DAGs from SQLMesh projects - no cloud dependencies required.
Transform your SQLMesh models into production-ready Airflow DAGs with full data lineage, automatically!
Key Features
- Dynamic DAG Generation (Default): Fire-and-forget - place DAG once, auto-discovers models at runtime
- Auto-Scheduling: Automatically detects DAG schedule from SQLMesh model intervals - no manual configuration!
- Runtime Connection Parametrization: Pass database credentials via Airflow Connections - no hardcoded secrets!
- Full Lineage in Airflow: Each SQLMesh model = One Airflow task with proper dependencies
- Multi-Environment Support: Use Airflow Variables + SQLMesh gateways for dev/staging/prod
- Incremental Models: Proper handling with data_interval_start/end
- Integrity Guardrails: Warn when sub-hourly incremental models run with catchup=False, with optional bounded recovery helpers
- Enhanced Error Handling: SQLMesh-specific error messages in Airflow logs
- Dual Mode: Dynamic (auto-discovery, default) or Static (full control)
- No Vendor Lock-in: Open source, no cloud dependencies
Important: Gateway vs Environment
SQLMesh uses "gateways" to switch between environments, NOT an "environment" parameter.
# WRONG - environment parameter is deprecated
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/project",
    environment="prod",  # This doesn't work!
)
# CORRECT - Use gateway to switch environments
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/project",
    gateway="prod",  # This is how you select your environment!
)
See Multi-Environment Configuration Guide for complete setup instructions.
Quick Start (3 Steps)
1. Install
pip install sqlmesh-dag-generator # (when published)
# OR
git clone <repo> && cd SQLMeshDAGGenerator && pip install -e .
2. Generate DAG (Dynamic Mode - Default!)
from sqlmesh_dag_generator import SQLMeshDAGGenerator
# Point to your SQLMesh project
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/your/sqlmesh/project",
    dag_id="my_pipeline",
    schedule_interval="@daily",
)
# Generate dynamic DAG (default - fire and forget!)
dag_code = generator.generate_dynamic_dag()
# Save it
with open("my_pipeline.py", "w") as f:
    f.write(dag_code)
3. Deploy to Airflow
cp my_pipeline.py /opt/airflow/dags/
That's it! Your SQLMesh models are now orchestrated by Airflow. The DAG will auto-discover models at runtime - no regeneration needed when models change!
Recovery And Completeness
SQLMesh DAG Generator forwards Airflow's data_interval_start and data_interval_end into ctx.run(start=..., end=...).
That means the package executes the interval Airflow gives it, but it does not invent missed Airflow runs on its own.
If you run sub-hourly incremental models with catchup=False, outages can leave completeness gaps unless you replay the missed windows.
The package now supports an explicit recovery policy:
- recovery_mode="disabled": no runtime recovery tasks are added.
- recovery_mode="warn": add an integrity guard task that detects missing intervals and logs them.
- recovery_mode="bounded_auto" (default): add the same guard task plus a bounded recovery task that replays missing intervals when the gap is within recovery_max_intervals.
Example:
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/project",
    dag_id="my_pipeline",
    recovery_mode="bounded_auto",
    recovery_max_intervals=6,
)
When recovery_mode is enabled, the package adds stable helper tasks to the DAG instead of mutating the graph at runtime:
- sqlmesh_integrity_guard
- sqlmesh_recovery_backfill in bounded_auto mode
This keeps recovery explicit and observable in Airflow while preserving the default "no surprise backfills" behavior.
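To make the three modes concrete, here is a minimal sketch of the decision they describe. It is not the package's implementation: `missing_intervals` and `plan_recovery` are hypothetical helpers, and the arithmetic assumes fixed-size intervals. Two behaviors are assumptions that the text above does not confirm: that "within recovery_max_intervals" means less than or equal to it, and that a gap larger than the bound is only reported.

```python
from datetime import datetime, timedelta


def missing_intervals(last_done, now, step):
    """Return the start times of whole intervals between last_done and now."""
    starts = []
    cursor = last_done
    while cursor + step <= now:
        starts.append(cursor)
        cursor += step
    return starts


def plan_recovery(missing, mode, max_intervals):
    """Pick an action for a gap under the three documented modes."""
    if mode == "disabled" or not missing:
        return "skip", []
    if mode == "warn":
        return "warn", []
    if mode == "bounded_auto":
        # Assumption: "within" is read as "less than or equal to".
        if len(missing) <= max_intervals:
            return "replay", missing
        # Assumption: an oversized gap is only reported, never replayed.
        return "warn", []
    raise ValueError(f"unknown recovery_mode: {mode!r}")


# A 15-minute model whose last completed interval ended at 10:00,
# checked again at 11:00 after an outage.
last_done = datetime(2024, 1, 1, 10, 0)
now = datetime(2024, 1, 1, 11, 0)
gap = missing_intervals(last_done, now, timedelta(minutes=15))

action, to_replay = plan_recovery(gap, "bounded_auto", max_intervals=6)
print(action, len(to_replay))
```

In this scenario the gap is four intervals, which fits under a bound of six, so the sketch chooses to replay them. With a bound of three it would fall back to a warning instead.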
Manual Backfill Task
For larger historical gaps, the package also exposes a first-class manual backfill task helper. Use it in a separate unscheduled DAG and trigger it only when you need to replay a broader window:
from datetime import datetime, timedelta
from airflow import DAG
from sqlmesh_dag_generator import SQLMeshDAGGenerator
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/project",
    dag_id="my_pipeline",
    gateway="prod",
)
with DAG(
    dag_id="my_pipeline_manual_backfill",
    schedule=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    generator.create_manual_backfill_task(
        dag,
        default_start="2024-01-01T00:00:00",
        execution_timeout=timedelta(hours=12),
    )
The task reads optional dag_run.conf overrides:
- start: ISO-8601 start boundary.
- end: ISO-8601 end boundary. If omitted, the task backfills up to the current UTC time.
- models: one model name or a list of model names. If omitted, the task backfills all models.
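The override rules can be sketched in plain Python. This illustrates the documented behavior only and is not the task's actual code: `resolve_backfill_request` is a hypothetical helper, and treating timezone-naive timestamps as UTC is an assumption.

```python
from datetime import datetime, timezone


def resolve_backfill_request(conf, default_start, now=None):
    """Merge optional dag_run.conf overrides with the task's defaults."""
    conf = conf or {}
    now = now or datetime.now(timezone.utc)

    start = datetime.fromisoformat(conf.get("start", default_start))
    end = datetime.fromisoformat(conf["end"]) if "end" in conf else now

    # Assumption: naive timestamps are UTC, so start and end are comparable.
    if start.tzinfo is None:
        start = start.replace(tzinfo=timezone.utc)
    if end.tzinfo is None:
        end = end.replace(tzinfo=timezone.utc)
    if start >= end:
        raise ValueError("backfill start must be before end")

    models = conf.get("models")
    if isinstance(models, str):
        models = [models]  # a single model name becomes a one-item list
    return start, end, models  # models is None when every model should run


start, end, models = resolve_backfill_request(
    {"end": "2024-01-08T00:00:00", "models": "stg_orders"},
    default_start="2024-01-01T00:00:00",
)
print(start.isoformat(), end.isoformat(), models)
```

Here `start` falls back to the default because the conf does not override it, while `end` and `models` come from the conf.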
What You Get
Your SQLMesh Project:
my_project/
└── models/
    ├── raw_orders.sql
    ├── stg_orders.sql        # depends on raw_orders
    └── orders_summary.sql    # depends on stg_orders
Generated Airflow DAG:
Airflow Graph View:
[raw_orders] → [stg_orders] → [orders_summary]
- Each model = separate task
- SQLMesh dependencies = Airflow dependencies
- Full lineage visible in Airflow UI
Documentation
- Quick Start Guide - Step-by-step tutorial (start here!)
- Quick Reference - One-page cheat sheet
- Auto-Scheduling Guide - Automatic schedule detection (NEW!)
- Runtime Configuration - Pass credentials via Airflow Connections
- Multi-Environment Setup - Configure for dev/staging/prod (IMPORTANT)
- Migration Guide - Fix common configuration issues
- Troubleshooting - Common issues and solutions
- Usage Guide - Complete reference
- Dynamic DAGs - Fire-and-forget mode explained
- Deployment Warnings - Critical production considerations
- Examples - Code examples
- Architecture - Technical details
Why Dynamic Mode (Default)?
Dynamic mode auto-discovers SQLMesh models at runtime:
dag_code = generator.generate_dynamic_dag() # Default behavior!
Benefits:
- No regeneration needed when SQLMesh models change
- Always in sync - DAG updates automatically
- Multi-environment - Uses Airflow Variables
- Production-ready - Enhanced error handling
Want static mode instead? Just use generator.generate_dag() - see Usage Guide.
Simple Example
The simplest possible usage takes three statements: import, configure, generate.
from sqlmesh_dag_generator import SQLMeshDAGGenerator
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/your/sqlmesh/project",
    dag_id="my_pipeline",
)
dag_code = generator.generate_dynamic_dag()
See examples/simple_generate.py for a complete runnable example.
Contributing
Contributions welcome! See CONTRIBUTING.md for guidelines.
Built with ❤️ for the data engineering community
Configuration File
Create a dag_generator_config.yaml:
sqlmesh:
  project_path: "/path/to/sqlmesh/project"
  environment: "prod"
  gateway: "local"
airflow:
  dag_id: "sqlmesh_pipeline"
  schedule_interval: "0 0 * * *"
  default_args:
    owner: "data-team"
    retries: 3
    retry_delay_minutes: 5
  tags:
    - sqlmesh
    - analytics
generation:
  output_dir: "/path/to/airflow/dags"
  operator_type: "python"  # or "bash"
  include_tests: true
  parallel_tasks: true
How It Works
- Load SQLMesh Project: Reads your SQLMesh project configuration and models
- Extract Dependencies: Analyzes SQL queries to build dependency graph
- Generate Tasks: Creates Airflow tasks for each SQLMesh model
- Set Dependencies: Connects tasks based on model dependencies
- Apply Schedules: Preserves cron schedules and execution logic
- Output DAG: Generates Python file ready for Airflow
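The Extract Dependencies and Set Dependencies steps amount to building a dependency graph and ordering it. The sketch below illustrates that idea with Python's standard-library graphlib (available from Python 3.9). It is not the generator's internal code, and the dependency mapping is written by hand here rather than extracted from SQL.

```python
from graphlib import TopologicalSorter

# Model name -> set of upstream models it reads from.
# This mirrors the three-model example project shown earlier.
dependencies = {
    "raw_orders": set(),
    "stg_orders": {"raw_orders"},
    "orders_summary": {"stg_orders"},
}

# A valid execution order: every model comes after its upstream models.
order = list(TopologicalSorter(dependencies).static_order())

# The edges an orchestrator would need, as (upstream, downstream) pairs.
edges = sorted(
    (upstream, model)
    for model, upstreams in dependencies.items()
    for upstream in upstreams
)

print(order)
print(edges)
```

Each pair in `edges` corresponds to one task dependency in the resulting DAG. TopologicalSorter also raises `graphlib.CycleError` on circular dependencies, which is a useful check before generating any tasks.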
Architecture
SQLMesh Project
       ↓
SQLMeshDAGGenerator
  ├── Context Loader (loads SQLMesh context)
  ├── Model Parser (extracts model metadata)
  ├── Dependency Resolver (builds dependency graph)
  └── DAG Builder (generates Airflow DAG)
       ↓
Airflow DAG File
Advanced Features
Custom Operators
from sqlmesh_dag_generator import SQLMeshDAGGenerator
from airflow.operators.python import PythonOperator
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/project",
    custom_operator_class=PythonOperator,
    operator_kwargs={"provide_context": True},
)
Model Filtering
# Generate DAG for specific models only
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/project",
    include_models=["model1", "model2"],
    exclude_models=["test_*"],
)
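The `test_*` pattern above looks like a shell-style glob. Assuming that is how patterns are matched, and assuming exclusions take precedence over inclusions (neither is confirmed here), the selection logic could be sketched as follows. `select_models` is a hypothetical helper, not part of the package.

```python
from fnmatch import fnmatchcase


def select_models(names, include=None, exclude=None):
    """Keep names matching any include pattern and no exclude pattern."""
    selected = []
    for name in names:
        # No include list means every model is a candidate.
        included = include is None or any(fnmatchcase(name, p) for p in include)
        # Assumption: an exclude match always wins over an include match.
        excluded = exclude is not None and any(fnmatchcase(name, p) for p in exclude)
        if included and not excluded:
            selected.append(name)
    return selected


models = ["model1", "model2", "model3", "test_model1"]
print(select_models(models, include=["model1", "model2"], exclude=["test_*"]))
print(select_models(models, exclude=["test_*"]))
```

The first call keeps only the two named models; the second keeps everything except names starting with `test_`.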
Dynamic Task Generation
# Generate tasks with dynamic parallelism
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/project",
    enable_dynamic_tasks=True,
    max_parallel_tasks=10,
)
Important: Deployment Warnings
Distributed Airflow Requires Shared Volume
If you're using KubernetesExecutor, CeleryExecutor, or any distributed Airflow setup:
Your SQLMesh project MUST be accessible to all workers!
Solutions:
- Option 1 (Recommended): Mount project on shared volume (EFS/NFS/Filestore)
- Option 2: Bake project into Docker image (loses fire-and-forget benefit)
See full guide: docs/DEPLOYMENT_WARNINGS.md
Operator Type Limitations
- Dynamic Mode: Python operator only (current limitation)
- Static Mode: Supports Python, Bash, and Kubernetes operators
For Bash/Kubernetes in dynamic mode, use static generation for now.
Kubernetes Operator Support
To use operator_type: kubernetes:
generation:
  operator_type: kubernetes
  docker_image: "your-registry/sqlmesh:v1.0"  # REQUIRED
  namespace: "data-pipelines"
Full Documentation: docs/DEPLOYMENT_WARNINGS.md
Requirements
- Python >= 3.8
- Apache Airflow >= 2.0
- SQLMesh >= 0.20.0
Development
# Clone the repository
git clone https://github.com/yourusername/sqlmesh-dag-generator.git
cd sqlmesh-dag-generator
# Install in development mode
pip install -e ".[dev]"
# Run tests
pytest
# Run linter
black .
ruff check .
Contributing
Contributions are welcome! Please read our Contributing Guide for details.
License
MIT License - see LICENSE file for details.
Comparison with Tobiko Cloud
| Feature | Tobiko Cloud | SQLMesh DAG Generator |
|---|---|---|
| Cost | Paid | Free & Open Source |
| Deployment | Cloud-based | Self-hosted |
| Customization | Limited | Fully Customizable |
| Privacy | External | On-premise |
| Dependencies | Cloud connection | None |
Support
- Documentation
- Issue Tracker
- Discussions