
Open-source Airflow DAG generator for SQLMesh projects


SQLMesh DAG Generator

Generate Apache Airflow DAGs from SQLMesh projects - no cloud dependencies required.

Transform your SQLMesh models into production-ready Airflow DAGs with full data lineage, automatically!

✨ Key Features

  • 🔥 Dynamic DAG Generation (Default): Fire-and-forget - place the DAG once; models are auto-discovered at runtime
  • 📅 Auto-Scheduling: Automatically detects the DAG schedule from SQLMesh model intervals - no manual configuration!
  • 🔐 Runtime Connection Parametrization: Pass database credentials via Airflow Connections - no hardcoded secrets!
  • ✅ Full Lineage in Airflow: Each SQLMesh model = one Airflow task with proper dependencies
  • 🌍 Multi-Environment Support: Use Airflow Variables + SQLMesh gateways for dev/staging/prod
  • ⚡ Incremental Models: Proper handling with data_interval_start/end
  • 🎯 Enhanced Error Handling: SQLMesh-specific error messages in Airflow logs
  • 🛠️ Dual Mode: Dynamic (auto-discovery, default) or Static (full control)
  • 🚫 No Vendor Lock-in: Open source, no cloud dependencies
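The auto-scheduling feature above can be illustrated with a minimal sketch. The mapping and function below are assumptions for illustration only - they are not the generator's actual implementation or API:

```python
# Hypothetical sketch: derive an Airflow schedule preset from SQLMesh
# model interval units. All names here are illustrative, not the library's.

INTERVAL_TO_SCHEDULE = {
    "five_minute": "*/5 * * * *",
    "hour": "@hourly",
    "day": "@daily",
    "week": "@weekly",
    "month": "@monthly",
}

def infer_schedule(interval_units: list) -> str:
    """Pick the finest-grained interval across models as the DAG schedule."""
    # Ordered from finest to coarsest; the first match wins.
    for unit in ["five_minute", "hour", "day", "week", "month"]:
        if unit in interval_units:
            return INTERVAL_TO_SCHEDULE[unit]
    return "@daily"  # fallback when no model declares an interval

print(infer_schedule(["day", "hour"]))  # finest interval wins: @hourly
```

Running the whole project at the finest model interval keeps every model's data up to date; coarser models simply find no new interval to process on most runs.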

๐Ÿข Enterprise Features (NEW in v0.8.0)

  • 🔔 Callbacks: on_failure_callback, on_success_callback, sla_miss_callback for alerting
  • 🏷️ Tag-Based Filtering: include_tags, exclude_tags for Data Mesh team-specific DAGs
  • 🎱 Pool Configuration: pool, pool_slots for resource management
  • ⏩ Trigger Downstream DAGs: trigger_dag_id for ML pipeline integration
  • 🎯 Pattern Filtering: model_pattern for regex-based model selection
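A plain-Python sketch of the include/exclude tag semantics. The assumed behavior (a model is kept if it carries at least one included tag and none of the excluded ones) is an illustration, not a statement about the library's internals:

```python
def filter_models_by_tags(models, include_tags=None, exclude_tags=None):
    """Keep models matching include_tags and not matching exclude_tags.

    `models` maps model name -> set of tags. Assumed semantics:
    include = at least one overlapping tag; exclude = any overlap drops it.
    """
    selected = []
    for name, tags in models.items():
        if include_tags and not (tags & set(include_tags)):
            continue  # no included tag present
        if exclude_tags and (tags & set(exclude_tags)):
            continue  # an excluded tag is present
        selected.append(name)
    return selected

models = {
    "orders": {"finance", "daily"},
    "users": {"marketing"},
    "events_raw": {"marketing", "deprecated"},
}
print(filter_models_by_tags(models, include_tags=["marketing"],
                            exclude_tags=["deprecated"]))  # ['users']
```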

โš ๏ธ Important: Gateway vs Environment

SQLMesh uses "gateways" to switch between environments, NOT an "environment" parameter.

# โŒ WRONG - environment parameter is for SQLMesh virtual environments (testing)
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/project",
    environment="prod",  # ERROR: Environment 'prod' was not found
)

# ✅ CORRECT - use gateway to switch between dev/staging/prod
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/project",
    gateway="prod",  # This is how you select your environment!
    # environment defaults to "" (empty string) - perfect for production
)

📚 Read Understanding SQLMesh Environments for the complete explanation of the difference between gateway and environment.

🚀 Quick Start (3 Steps)

1. Install

pip install sqlmesh-dag-generator
# OR
git clone <repo> && cd SQLMeshDAGGenerator && pip install -e .

2. Generate DAG (Dynamic Mode - Default!)

from sqlmesh_dag_generator import SQLMeshDAGGenerator

# Point to your SQLMesh project
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/your/sqlmesh/project",
    dag_id="my_pipeline",
    schedule_interval="@daily"
)

# Generate dynamic DAG (default - fire and forget!)
dag_code = generator.generate_dynamic_dag()

# Save it
with open("my_pipeline.py", "w") as f:
    f.write(dag_code)

3. Deploy to Airflow

cp my_pipeline.py /opt/airflow/dags/

That's it! 🎉 Your SQLMesh models are now orchestrated by Airflow. The DAG will auto-discover models at runtime - no regeneration needed when models change!

💡 What You Get

Your SQLMesh Project:

my_project/
└── models/
    ├── raw_orders.sql
    ├── stg_orders.sql      # depends on raw_orders
    └── orders_summary.sql  # depends on stg_orders

Generated Airflow DAG:

Airflow Graph View:
  [raw_orders] → [stg_orders] → [orders_summary]

✅ Each model = separate task
✅ SQLMesh dependencies = Airflow dependencies
✅ Full lineage visible in Airflow UI

📚 Documentation

Getting Started

Configuration & Features

Deployment & Production

Reference

🔥 Why Dynamic Mode (Default)?

Dynamic mode auto-discovers SQLMesh models at runtime:

dag_code = generator.generate_dynamic_dag()  # Default behavior!

Benefits:

  • ✅ No regeneration needed when SQLMesh models change
  • ✅ Always in sync - the DAG updates automatically
  • ✅ Multi-environment - uses Airflow Variables
  • ✅ Production-ready - enhanced error handling

Want static mode instead? Just use generator.generate_dag() - see Usage Guide.

🎯 Simple Example

The simplest possible usage - just three statements:

from sqlmesh_dag_generator import SQLMeshDAGGenerator

generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/your/sqlmesh/project",
    dag_id="my_pipeline"
)

dag_code = generator.generate_dynamic_dag()

See examples/simple_generate.py for a complete runnable example.

๐Ÿค Contributing

Contributions welcome! See CONTRIBUTING.md for guidelines.

๐Ÿ“„ License

[Your License Here]


Built with โค๏ธ for the data engineering community

Configuration File

Create a dag_generator_config.yaml:

sqlmesh:
  project_path: "/path/to/sqlmesh/project"
  gateway: "prod"
  environment: ""  # Empty for production
  default_catalog: "my_catalog" # Optional: For 3-part naming support

airflow:
  dag_id: "sqlmesh_pipeline"
  schedule_interval: "0 0 * * *"
  default_args:
    owner: "data-team"
    retries: 3
    retry_delay_minutes: 5
  tags:
    - sqlmesh
    - analytics

generation:
  output_dir: "/path/to/airflow/dags"
  operator_type: "python"  # or "bash"
  include_tests: true
  parallel_tasks: true
  auto_replan_on_change: true  # Automatically run 'sqlmesh plan' if models change
  # Plan optimization options (for faster execution when no changes)
  skip_backfill: false   # Skip apply if backfill is required (use CI/CD for initial backfills)
  plan_only: false       # Generate plan without applying (for review/dry-run)
  log_plan_details: true # Log detailed plan information (context, plan, apply phases)
  return_value: true     # Enable/disable XCom return values

How It Works

  1. Load SQLMesh Project: Reads your SQLMesh project configuration and models
  2. Extract Dependencies: Analyzes SQL queries to build dependency graph
  3. Generate Tasks: Creates Airflow tasks for each SQLMesh model
  4. Set Dependencies: Connects tasks based on model dependencies
  5. Apply Schedules: Preserves cron schedules and execution logic
  6. Output DAG: Generates Python file ready for Airflow
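Steps 2-4 above amount to a topological ordering of the model dependency graph. A self-contained sketch of the idea (illustrative only, not the generator's code), using the example models from earlier:

```python
from graphlib import TopologicalSorter

# Model -> set of upstream models it depends on (as parsed from the SQL).
dependencies = {
    "raw_orders": set(),
    "stg_orders": {"raw_orders"},
    "orders_summary": {"stg_orders"},
}

# A valid execution order: every model runs after its upstreams.
# This is exactly the ordering the generated Airflow task dependencies encode.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['raw_orders', 'stg_orders', 'orders_summary']
```

In the generated DAG, each edge in this graph becomes an `upstream_task >> downstream_task` dependency, which is what makes the full lineage visible in the Airflow graph view.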

Architecture

SQLMesh Project
    ↓
SQLMeshDAGGenerator
    ├── Context Loader (loads SQLMesh context)
    ├── Model Parser (extracts model metadata)
    ├── Dependency Resolver (builds dependency graph)
    └── DAG Builder (generates Airflow DAG)
    ↓
Airflow DAG File

Advanced Features

Custom Operators

from sqlmesh_dag_generator import SQLMeshDAGGenerator
from airflow.operators.python import PythonOperator

generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/project",
    custom_operator_class=PythonOperator,
    operator_kwargs={"provide_context": True}
)

Model Filtering

# Generate DAG for specific models only
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/project",
    include_models=["model1", "model2"],
    exclude_models=["test_*"]
)
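The exclude_models pattern above looks like shell-style globbing. Assuming fnmatch-style matching (an assumption about the semantics, not the library's actual code), the selection logic might look like:

```python
from fnmatch import fnmatch

def select_models(all_models, include=None, exclude=None):
    """Apply include patterns (exact names or globs), then exclude globs."""
    kept = [m for m in all_models
            if include is None or any(fnmatch(m, pat) for pat in include)]
    return [m for m in kept
            if not (exclude and any(fnmatch(m, pat) for pat in exclude))]

models = ["model1", "model2", "test_orders", "test_users"]
print(select_models(models, exclude=["test_*"]))  # ['model1', 'model2']
```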

Dynamic Task Generation

# Generate tasks with dynamic parallelism
generator = SQLMeshDAGGenerator(
    sqlmesh_project_path="/path/to/project",
    enable_dynamic_tasks=True,
    max_parallel_tasks=10
)

โš ๏ธ Important: Deployment Warnings

🔴 Distributed Airflow Requires a Shared Volume

If you're using KubernetesExecutor, CeleryExecutor, or any distributed Airflow setup:

Your SQLMesh project MUST be accessible to all workers!

Solutions:

  • Option 1 (Recommended): Mount project on shared volume (EFS/NFS/Filestore)
  • Option 2: Bake project into Docker image (loses fire-and-forget benefit)

See full guide: docs/DEPLOYMENT_WARNINGS.md

🟡 Operator Type Limitations

  • Dynamic Mode: Python operator only (current limitation)
  • Static Mode: Supports Python, Bash, and Kubernetes operators

For Bash/Kubernetes in dynamic mode, use static generation for now.

🟢 Kubernetes Operator Support

To use operator_type: kubernetes:

generation:
  operator_type: kubernetes
  docker_image: "your-registry/sqlmesh:v1.0"  # REQUIRED
  namespace: "data-pipelines"

📖 Full Documentation: docs/DEPLOYMENT_WARNINGS.md

Requirements

  • Python >= 3.8
  • Apache Airflow >= 2.0
  • SQLMesh >= 0.20.0

Development

# Clone the repository
git clone https://github.com/yourusername/sqlmesh-dag-generator.git
cd sqlmesh-dag-generator

# Install in development mode
pip install -e ".[dev]"

# Run tests
pytest

# Run linter
black .
ruff check .

🔧 Troubleshooting

Common Issues

1. TypeError: Object of type CompletionStatus is not JSON serializable

Cause: Airflow cannot serialize the SQLMesh CompletionStatus enum returned by tasks. Fix: Upgrade to v0.7.2+. This version automatically converts the status to a string. Alternatively, use return_value=False in your generator config:

generator = SQLMeshDAGGenerator(
    ...,
    return_value=False  # Disable XCom return values
)
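The root cause is general Python behavior: json cannot serialize enum members unless they are converted first. A stand-in enum (not SQLMesh's actual class) demonstrates both the failure and the string-conversion fix:

```python
import json
from enum import Enum

class CompletionStatus(Enum):  # stand-in for SQLMesh's enum
    SUCCESS = "success"
    FAILURE = "failure"

status = CompletionStatus.SUCCESS

try:
    json.dumps(status)  # what Airflow's XCom serialization attempts
except TypeError as exc:
    print(exc)  # Object of type CompletionStatus is not JSON serializable

# Converting to a plain string first (what v0.7.2+ does) serializes cleanly:
print(json.dumps(status.value))  # "success"
```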

2. ConfigError: A query is required and must be a SELECT statement (External Models)

Cause: Defining external models in .sql files is not supported by the generator. Fix: Define external models in external_models.yaml instead.

# external_models.yaml
models:
  - name: raw.users
    kind: EXTERNAL
    columns:
      id: INT
      name: TEXT

3. "Environment 'prod' was not found"

Cause: You are using environment="prod" instead of gateway="prod". Fix: Set gateway="prod" and leave environment as empty string (default).

generator = SQLMeshDAGGenerator(
    ...,
    gateway="prod",
    environment=""  # Correct for production
)

4. Changes to models are not picked up

Cause: SQLMesh requires a plan/apply step to register changes. Fix: Enable auto_replan_on_change in your generator config:

generator = SQLMeshDAGGenerator(
    ...,
    auto_replan_on_change=True  # Automatically runs 'sqlmesh plan --auto-apply'
)

Or run sqlmesh plan --auto-apply manually.

5. sqlmesh_plan_apply task takes 7+ minutes even with no changes

Cause: When auto_replan_on_change=True, the plan task may run full backfills even when there are no model changes. Fix (v0.9.0+): The plan optimization now:

  • ✅ Detects "no changes" scenarios and skips apply (~15 seconds instead of 7+ minutes)
  • ✅ Logs detailed timing for each phase (context load, plan compute, apply)

For additional control, use these options:

generator = SQLMeshDAGGenerator(
    ...,
    auto_replan_on_change=True,
    skip_backfill=True,    # Skip apply if backfill is required (use CI/CD for initial backfills)
    plan_only=True,        # Generate plan without applying (for review/dry-run)
    log_plan_details=True  # Log detailed plan information (default: True)
)

Recommended CI/CD approach:

  • Run sqlmesh plan --auto-apply in CI/CD on model changes (handles backfills)
  • Set skip_backfill=True in Airflow DAG (only applies non-backfill changes)
  • This gives you fast DAG runs (~15s) while ensuring changes are still applied

6. 3-Part Table Naming Issues (Cross-Database)

Cause: SQLMesh generates queries with 3-part names (catalog.schema.table) which might fail if the catalog name doesn't match. Fix: Set default_catalog in your config to tell SQLMesh which catalog to omit or use.

generator = SQLMeshDAGGenerator(
    ...,
    default_catalog="my_catalog"
)
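A sketch of the idea behind default_catalog - rendering fully qualified names while omitting the default catalog. The function below is illustrative logic under that assumption, not SQLMesh's implementation:

```python
def render_table_name(table: str, default_catalog: str) -> str:
    """Collapse catalog.schema.table to schema.table when the catalog
    is the default, since some engines reject a foreign catalog prefix."""
    parts = table.split(".")
    if len(parts) == 3 and parts[0] == default_catalog:
        return ".".join(parts[1:])  # drop the redundant catalog part
    return table  # keep cross-catalog references fully qualified

print(render_table_name("my_catalog.analytics.orders", "my_catalog"))
# analytics.orders
print(render_table_name("other_catalog.analytics.orders", "my_catalog"))
# other_catalog.analytics.orders
```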

More Advanced Features

Health Check Task

Add a pre-flight check to verify database connectivity before running models:

generator = SQLMeshDAGGenerator(
    ...,
    enable_health_check=True
)

Skip Audits

For faster development iterations, you can skip audit checks:

generator = SQLMeshDAGGenerator(
    ...,
    skip_audits=True
)

Partial DAG Runs

You can generate tasks for a subset of models (useful for testing):

# In your DAG file
generator.create_tasks_in_dag(dag, models=["model_a", "model_b"])

Contributing

Contributions are welcome! Please read our Contributing Guide for details.

License

MIT License - see LICENSE file for details.

Comparison with Tobiko Cloud

Feature        Tobiko Cloud      SQLMesh DAG Generator
Cost           Paid              Free & Open Source
Deployment     Cloud-based       Self-hosted
Customization  Limited           Fully Customizable
Privacy        External          On-premise
Dependencies   Cloud connection  None


