Open-source Airflow DAG generator for SQLMesh projects

SQLMesh DAG Generator

Generate Apache Airflow DAGs from SQLMesh projects - no cloud dependencies required.

Transform your SQLMesh models into production-ready Airflow DAGs with full data lineage, automatically!

✨ Key Features

  • 🔥 Dynamic DAG Generation (Default): Fire-and-forget - place DAG once, auto-discovers models at runtime
  • 📅 Auto-Scheduling: Automatically detects DAG schedule from SQLMesh model intervals - no manual configuration!
  • 🔐 Runtime Connection Parametrization: Pass database credentials via Airflow Connections - no hardcoded secrets!
  • ✅ Full Lineage in Airflow: Each SQLMesh model = One Airflow task with proper dependencies
  • 🌍 Multi-Environment Support: Use Airflow Variables + SQLMesh gateways for dev/staging/prod
  • ⚡ Incremental Models: Proper handling with data_interval_start/end
  • 🩺 Integrity Guardrails: Warn when sub-hourly incremental models run with catchup=False, with optional bounded recovery helpers
  • 🎯 Enhanced Error Handling: SQLMesh-specific error messages in Airflow logs
  • 🛠️ Dual Mode: Dynamic (auto-discovery, default) or Static (full control)
  • 🚫 No Vendor Lock-in: Open source, no cloud dependencies

โš ๏ธ Important: Gateway vs Environment

SQLMesh uses "gateways" to switch between environments, NOT an "environment" parameter.

# โŒ WRONG - environment parameter is deprecated
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/project",
    environment="prod",  # This doesn't work!
)

# ✅ CORRECT - Use gateway to switch environments
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/project",
    gateway="prod"  # This is how you select your environment!
)

See Multi-Environment Configuration Guide for complete setup instructions.

🚀 Quick Start (3 Steps)

1. Install

pip install sqlmesh-dag-generator
# OR install from source
git clone <repo> && cd SQLMeshDAGGenerator && pip install -e .

2. Generate DAG (Dynamic Mode - Default!)

from sqlmesh_dag_generator import SQLMeshDAGGenerator

# Point to your SQLMesh project
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/your/sqlmesh/project",
    dag_id="my_pipeline",
    schedule_interval="@daily"
)

# Generate dynamic DAG (default - fire and forget!)
dag_code = generator.generate_dynamic_dag()

# Save it
with open("my_pipeline.py", "w") as f:
    f.write(dag_code)

3. Deploy to Airflow

cp my_pipeline.py /opt/airflow/dags/

That's it! 🎉 Your SQLMesh models are now orchestrated by Airflow. The DAG will auto-discover models at runtime - no regeneration needed when models change!

Recovery And Completeness

SQLMesh DAG Generator forwards Airflow's data_interval_start and data_interval_end into ctx.run(start=..., end=...). That means the package executes the interval Airflow gives it, but it does not invent missed Airflow runs on its own.

If you run sub-hourly incremental models with catchup=False, outages can leave completeness gaps unless you replay the missed windows.
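To make the size of such a gap concrete, here is a minimal sketch of how the missed windows could be enumerated for a fixed cadence. The helper name missing_intervals and the 15-minute step are assumptions for illustration only; they are not part of the package's API.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

Interval = Tuple[datetime, datetime]


def missing_intervals(last_end: datetime, now: datetime, step: timedelta) -> List[Interval]:
    """Return the fully elapsed [start, end) windows between last_end and now.

    Hypothetical helper, not part of sqlmesh-dag-generator.
    """
    if step <= timedelta(0):
        raise ValueError("step must be positive")
    windows: List[Interval] = []
    start = last_end
    # Emit only windows that have completely elapsed; a partial trailing window is skipped.
    while start + step <= now:
        windows.append((start, start + step))
        start += step
    return windows


# Example: the last successful run ended at 00:00 and the scheduler recovers at 01:10.
last_processed = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
current_time = datetime(2024, 1, 1, 1, 10, tzinfo=timezone.utc)
gaps = missing_intervals(last_processed, current_time, timedelta(minutes=15))
for window_start, window_end in gaps:
    print(window_start.isoformat(), "->", window_end.isoformat())
```

With catchup=False, Airflow would schedule only the latest of these windows, so the earlier ones stay unprocessed until something replays them.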

The package now supports an explicit recovery policy:

  • recovery_mode="disabled": no runtime recovery tasks are added.
  • recovery_mode="warn": add an integrity guard task that detects missing intervals and logs them.
  • recovery_mode="bounded_auto" (default): add the same guard task plus a bounded recovery task that replays missing intervals when the gap is within recovery_max_intervals.

Example:

generator = SQLMeshDAGGenerator(
  sqlmesh_project_path="/path/to/project",
  dag_id="my_pipeline",
  recovery_mode="bounded_auto",
  recovery_max_intervals=6,
)

When recovery_mode is "warn" or "bounded_auto", the package adds stable helper tasks to the DAG instead of mutating the graph at runtime:

  • sqlmesh_integrity_guard
  • sqlmesh_recovery_backfill in bounded_auto mode

This keeps recovery explicit and observable in Airflow while preserving the default "no surprise backfills" behavior.
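The three modes can be read as a small decision rule. The sketch below is one way to express it, not the package's implementation: plan_recovery is a hypothetical function, and the choice to replay nothing when the gap exceeds the bound is an assumption (the text above only says that replay happens when the gap is within recovery_max_intervals).

```python
from typing import List, Sequence

VALID_MODES = {"disabled", "warn", "bounded_auto"}


def plan_recovery(missing: Sequence[str], mode: str, max_intervals: int) -> List[str]:
    """Return the intervals to replay under the given policy (hypothetical helper)."""
    if mode not in VALID_MODES:
        raise ValueError(f"unknown recovery_mode: {mode!r}")
    if mode == "disabled" or not missing:
        return []
    # Both remaining modes report the gap, mirroring the integrity guard task.
    print(f"integrity guard: {len(missing)} missing interval(s)")
    if mode == "warn":
        return []
    # bounded_auto: replay only when the gap fits within the configured bound.
    if len(missing) > max_intervals:
        # Assumption: larger gaps are left for a manual backfill.
        print("gap exceeds recovery_max_intervals; no automatic replay")
        return []
    return list(missing)


small_gap = ["00:00", "00:15", "00:30"]
large_gap = [f"window_{i}" for i in range(7)]
print(plan_recovery(small_gap, "bounded_auto", 6))
print(plan_recovery(large_gap, "bounded_auto", 6))
```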

Manual Backfill Task

For larger historical gaps, the package also exposes a first-class manual backfill task helper. Use it in a separate unscheduled DAG and trigger it only when you need to replay a broader window:

from datetime import datetime, timedelta
from airflow import DAG
from sqlmesh_dag_generator import SQLMeshDAGGenerator

generator = SQLMeshDAGGenerator(
  sqlmesh_project_path="/path/to/project",
  dag_id="my_pipeline",
  gateway="prod",
)

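with DAG(
    dag_id="my_pipeline_manual_backfill",
    schedule=None,  # Airflow >= 2.4; on Airflow 2.0-2.3 use schedule_interval=None
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # Adds the backfill task to this DAG; trigger the DAG manually to run it.
    generator.create_manual_backfill_task(
        dag,
        default_start="2024-01-01T00:00:00",
        execution_timeout=timedelta(hours=12),
    )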

The task reads optional dag_run.conf overrides:

  • start: ISO-8601 start boundary
  • end: ISO-8601 end boundary. If omitted, the task backfills up to the current UTC time.
  • models: one model name or a list of model names. If omitted, the task backfills all models.
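The override rules above can be sketched as a small resolver. This is an illustration under stated assumptions, not the task's actual code: resolve_backfill_request and _parse_utc are hypothetical names, and treating timestamps without an offset as UTC is an assumption.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Mapping, Optional


def _parse_utc(value: str) -> datetime:
    """Parse an ISO-8601 string; naive timestamps are treated as UTC (assumption)."""
    parsed = datetime.fromisoformat(value)
    return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)


def resolve_backfill_request(
    conf: Optional[Mapping[str, Any]],
    default_start: str,
    now: datetime,
) -> Dict[str, Any]:
    """Merge dag_run.conf overrides with defaults (hypothetical helper)."""
    conf = conf or {}
    start = _parse_utc(conf.get("start") or default_start)
    # A missing "end" means "backfill up to the current UTC time".
    end = _parse_utc(conf["end"]) if conf.get("end") else now
    if start >= end:
        raise ValueError("start must be earlier than end")
    models = conf.get("models")
    if isinstance(models, str):
        models = [models]  # accept a single model name as well as a list
    # models=None stands for "all models".
    return {"start": start, "end": end, "models": models}


request = resolve_backfill_request(
    {"start": "2024-03-01T00:00:00", "models": "stg_orders"},
    default_start="2024-01-01T00:00:00",
    now=datetime(2024, 3, 2, tzinfo=timezone.utc),
)
print(request)
```

A matching trigger from the Airflow CLI would pass the same keys as JSON, for example airflow dags trigger --conf '{"start": "2024-03-01T00:00:00", "models": "stg_orders"}' my_pipeline_manual_backfill.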

💡 What You Get

Your SQLMesh Project:

my_project/
└── models/
    ├── raw_orders.sql
    ├── stg_orders.sql      # depends on raw_orders
    └── orders_summary.sql  # depends on stg_orders

Generated Airflow DAG:

Airflow Graph View:
  [raw_orders] → [stg_orders] → [orders_summary]

✅ Each model = separate task
✅ SQLMesh dependencies = Airflow dependencies
✅ Full lineage visible in Airflow UI

📚 Documentation

🔥 Why Dynamic Mode (Default)?

Dynamic mode auto-discovers SQLMesh models at runtime:

dag_code = generator.generate_dynamic_dag()  # Default behavior!

Benefits:

  • ✅ No regeneration needed when SQLMesh models change
  • ✅ Always in sync - DAG updates automatically
  • ✅ Multi-environment - Uses Airflow Variables
  • ✅ Production-ready - Enhanced error handling

Want static mode instead? Just use generator.generate_dag() - see Usage Guide.

🎯 Simple Example

The simplest possible usage - just 3 lines of code:

from sqlmesh_dag_generator import SQLMeshDAGGenerator

generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/your/sqlmesh/project",
    dag_id="my_pipeline"
)

dag_code = generator.generate_dynamic_dag()

See examples/simple_generate.py for a complete runnable example.

๐Ÿค Contributing

Contributions welcome! See CONTRIBUTING.md for guidelines.

📄 License

MIT License - see LICENSE file for details.


Built with ❤️ for the data engineering community

Configuration File

Create a dag_generator_config.yaml:

sqlmesh:
  project_path: "/path/to/sqlmesh/project"
  environment: "prod"  # deprecated, see "Gateway vs Environment" above
  gateway: "local"     # the gateway selects the target environment

airflow:
  dag_id: "sqlmesh_pipeline"
  schedule_interval: "0 0 * * *"
  default_args:
    owner: "data-team"
    retries: 3
    retry_delay_minutes: 5
  tags:
    - sqlmesh
    - analytics

generation:
  output_dir: "/path/to/airflow/dags"
  operator_type: "python"  # or "bash"
  include_tests: true
  parallel_tasks: true

How It Works

  1. Load SQLMesh Project: Reads your SQLMesh project configuration and models
  2. Extract Dependencies: Analyzes SQL queries to build dependency graph
  3. Generate Tasks: Creates Airflow tasks for each SQLMesh model
  4. Set Dependencies: Connects tasks based on model dependencies
  5. Apply Schedules: Preserves cron schedules and execution logic
  6. Output DAG: Generates Python file ready for Airflow
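Steps 2 to 4 amount to building a dependency graph and ordering it. The sketch below illustrates the idea with the standard library only; it uses graphlib, which requires Python 3.9 or newer, and a hard-coded dependency mapping in place of what the package would read from a SQLMesh context. It is an illustration of the technique, not the package's internal code.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Illustrative mapping: model name -> set of upstream models it depends on.
dependencies = {
    "raw_orders": set(),
    "stg_orders": {"raw_orders"},
    "orders_summary": {"stg_orders"},
}

# A valid execution order places every model after all of its upstream models.
execution_order = list(TopologicalSorter(dependencies).static_order())

# Each (upstream, downstream) pair corresponds to one Airflow dependency edge,
# i.e. upstream_task >> downstream_task in a DAG file.
edges = [
    (upstream, model)
    for model, upstreams in dependencies.items()
    for upstream in sorted(upstreams)
]

print(execution_order)
print(edges)
```

A cycle in the mapping would raise graphlib.CycleError, which is a convenient place to fail early before any DAG file is written.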

Architecture

SQLMesh Project
    ↓
SQLMeshDAGGenerator
    ├── Context Loader (loads SQLMesh context)
    ├── Model Parser (extracts model metadata)
    ├── Dependency Resolver (builds dependency graph)
    └── DAG Builder (generates Airflow DAG)
    ↓
Airflow DAG File

Advanced Features

Custom Operators

from sqlmesh_dag_generator import SQLMeshDAGGenerator
from airflow.operators.python import PythonOperator

generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/project",
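    custom_operator_class=PythonOperator,
    # provide_context is deprecated in Airflow 2.x (the context is always passed),
    # so pass only keyword arguments the operator still accepts.
    operator_kwargs={"retries": 2},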
)

Model Filtering

# Generate DAG for specific models only
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/project",
    include_models=["model1", "model2"],
    exclude_models=["test_*"]
)
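The "test_*" pattern suggests glob-style matching. The sketch below shows how such include and exclude lists could be applied; the glob semantics, the rule that exclusion wins over inclusion, and the helper name filter_models are all assumptions rather than documented behavior.

```python
from fnmatch import fnmatchcase
from typing import Iterable, List, Optional, Sequence


def filter_models(
    names: Iterable[str],
    include: Optional[Sequence[str]] = None,
    exclude: Optional[Sequence[str]] = None,
) -> List[str]:
    """Keep names matching any include pattern and no exclude pattern (hypothetical helper)."""
    selected = []
    for name in names:
        # include=None means "no restriction"; otherwise at least one pattern must match.
        if include is not None and not any(fnmatchcase(name, p) for p in include):
            continue
        # Assumption: an exclude match always removes the model.
        if exclude and any(fnmatchcase(name, p) for p in exclude):
            continue
        selected.append(name)
    return selected


all_models = ["model1", "model2", "model3", "test_model1"]
print(filter_models(all_models, include=["model1", "model2"], exclude=["test_*"]))
print(filter_models(all_models, exclude=["test_*"]))
```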

Dynamic Task Generation

# Generate tasks with dynamic parallelism
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/project",
    enable_dynamic_tasks=True,
    max_parallel_tasks=10
)

โš ๏ธ Important: Deployment Warnings

๐Ÿ”ด Distributed Airflow Requires Shared Volume

If you're using KubernetesExecutor, CeleryExecutor, or any distributed Airflow setup:

Your SQLMesh project MUST be accessible to all workers!

Solutions:

  • Option 1 (Recommended): Mount project on shared volume (EFS/NFS/Filestore)
  • Option 2: Bake project into Docker image (loses fire-and-forget benefit)

See full guide: docs/DEPLOYMENT_WARNINGS.md
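A quick way to confirm that a worker can actually see the project is a preflight check run on that worker. The sketch below is a hypothetical helper, not something the package ships; the list of config file names and the expectation of a models/ directory are assumptions about a typical SQLMesh project layout.

```python
import tempfile
from pathlib import Path
from typing import List

# Assumed marker files for a SQLMesh project root.
CONFIG_NAMES = ("config.yaml", "config.yml", "config.py")


def project_problems(project_path: str) -> List[str]:
    """Return human-readable problems; an empty list means the project looks reachable."""
    root = Path(project_path)
    if not root.is_dir():
        return [f"{root} is not a directory on this worker"]
    problems = []
    if not any((root / name).is_file() for name in CONFIG_NAMES):
        problems.append(f"no SQLMesh config file found in {root}")
    if not (root / "models").is_dir():
        problems.append(f"no models/ directory found in {root}")
    return problems


# Demonstration against a throwaway directory that mimics a project layout.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "config.yaml").write_text("gateways: {}\n")
    (Path(tmp) / "models").mkdir()
    healthy = project_problems(tmp)

unreachable = project_problems("/nonexistent/sqlmesh/project")
print(healthy)
print(unreachable)
```

Running a check like this as the first task of a DAG surfaces a missing mount as a clear failure instead of an opaque SQLMesh loading error.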

🟡 Operator Type Limitations

  • Dynamic Mode: Python operator only (current limitation)
  • Static Mode: Supports Python, Bash, and Kubernetes operators

For Bash/Kubernetes in dynamic mode, use static generation for now.

🟢 Kubernetes Operator Support

To use operator_type: kubernetes:

generation:
  operator_type: kubernetes
  docker_image: "your-registry/sqlmesh:v1.0"  # REQUIRED
  namespace: "data-pipelines"

📖 Full Documentation: docs/DEPLOYMENT_WARNINGS.md

Requirements

  • Python >= 3.8
  • Apache Airflow >= 2.0
  • SQLMesh >= 0.20.0

Development

# Clone the repository
git clone https://github.com/yourusername/sqlmesh-dag-generator.git
cd sqlmesh-dag-generator

# Install in development mode
pip install -e ".[dev]"

# Run tests
pytest

# Run linter
black .
ruff check .

Contributing

Contributions are welcome! Please read our Contributing Guide for details.

License

MIT License - see LICENSE file for details.

Comparison with Tobiko Cloud

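| Feature       | Tobiko Cloud     | SQLMesh DAG Generator |
|---------------|------------------|-----------------------|
| Cost          | Paid             | Free & Open Source    |
| Deployment    | Cloud-based      | Self-hosted           |
| Customization | Limited          | Fully Customizable    |
| Privacy       | External         | On-premise            |
| Dependencies  | Cloud connection | None                  |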
