Seamless integration between Dagster and SQLMesh for modern data engineering workflows
SQLMesh-Dagster Integration
This module provides a complete integration between SQLMesh and Dagster, allowing SQLMesh models to be materialized as Dagster assets with support for audits, metadata, and adaptive scheduling.
Features
🎯 SQLMesh Model to Dagster Asset Conversion
- Individual asset control : Each SQLMesh model becomes a separate Dagster asset with granular success/failure control
- Automatic materialization : SQLMesh models are automatically converted to Dagster assets
- External assets support : SQLMesh sources (external assets) are mapped to Dagster AssetKeys
- Automatic dependencies : Dependencies between models are preserved in Dagster
- Partitioning : Support for partitioned SQLMesh models (managed by SQLMesh, no integration with Dagster partitions - no Dagster → SQLMesh backfill)
📊 SQLMesh Metadata Integration to Dagster
- Complete metadata : Cron, tags, kind, dialect, query, partitioned_by, clustered_by
- Code versioning : Uses SQLMesh data_hash for Dagster versioning
- Column descriptions : Table metadata with descriptions
- Customizable tags : SQLMesh tags mapping to Dagster
✅ SQLMesh Audits to Asset Checks Conversion
- Automatic audits : SQLMesh audits become Dagster AssetCheckSpec
- AssetCheckResult : Automatic emission of audit results with proper output handling
- Audit metadata : SQL query, arguments, dialect, blocking status
- Non-blocking : Dagster checks are non-blocking (SQLMesh handles blocking)
- Fallback handling : Graceful handling when no evaluation events are found
⏰ Adaptive Scheduling
- Automatic analysis : Detection of the finest granularity from SQLMesh crons
- Adaptive schedule : Automatic creation of a Dagster schedule based on crons
- Intelligent execution : SQLMesh manages which models should be executed
- Monitoring : Detailed logs and granularity metadata
🔧 All-in-One Factory
- Simple configuration : Single function to configure everything
- Extensible translator : Customizable translator system
- Automatic validation : External dependencies validation
- Retry policy : Centralized retry policy configuration
Basic Usage
Simple Factory (Recommended)
from dagster import RetryPolicy, AssetKey, Backoff
from .decorators import sqlmesh_definitions_factory
from .translator import SQLMeshTranslator

class SlingToSqlmeshTranslator(SQLMeshTranslator):
    def get_external_asset_key(self, external_fqn: str) -> AssetKey:
        """
        Custom mapping for external assets.
        SQLMesh: 'jaffle_db.main.raw_source_customers' → Sling: ['target', 'main', 'raw_source_customers']
        """
        parts = external_fqn.replace('"', '').split('.')
        if len(parts) >= 3:
            catalog, schema, table = parts[0], parts[1], parts[2]
            return AssetKey(['target', 'main', table])
        return AssetKey(['external'] + parts[1:])

# All-in-one factory: everything configured in one line!
defs = sqlmesh_definitions_factory(
    project_dir="sqlmesh_project",
    gateway="postgres",
    translator=SlingToSqlmeshTranslator(),
    concurrency_limit=1,
    name="sqlmesh_multi_asset",
    group_name="sqlmesh",
    op_tags={"team": "data", "env": "prod"},
    retry_policy=RetryPolicy(max_retries=1, delay=30.0, backoff=Backoff.EXPONENTIAL),
)
Advanced Configuration
from dagster import Backoff, Definitions, RetryPolicy
from .decorators import sqlmesh_assets_factory, sqlmesh_adaptive_schedule_factory
from .resource import SQLMeshResource
from .translator import SQLMeshTranslator

# SQLMesh resource configuration
# (SlingToSqlmeshTranslator is the translator defined in the Simple Factory example)
sqlmesh_resource = SQLMeshResource(
    project_dir="sqlmesh_project",
    gateway="postgres",
    translator=SlingToSqlmeshTranslator(),
    concurrency_limit=1,
    ignore_cron=True,  # only for testing purposes
)

# SQLMesh assets configuration
sqlmesh_assets = sqlmesh_assets_factory(
    sqlmesh_resource=sqlmesh_resource,
    name="sqlmesh_multi_asset",
    group_name="sqlmesh",
    op_tags={"team": "data", "env": "prod"},
    retry_policy=RetryPolicy(max_retries=1, delay=30.0, backoff=Backoff.EXPONENTIAL),
)

# Adaptive schedule and job created automatically
sqlmesh_adaptive_schedule, sqlmesh_job, _ = sqlmesh_adaptive_schedule_factory(
    sqlmesh_resource=sqlmesh_resource
)

defs = Definitions(
    assets=[sqlmesh_assets],
    jobs=[sqlmesh_job],
    schedules=[sqlmesh_adaptive_schedule],
    resources={
        "sqlmesh": sqlmesh_resource,
    },
)
Custom Translator
To map external assets (SQLMesh sources) to your Dagster conventions, you can create a custom translator:
from .translator import SQLMeshTranslator
import dagster as dg

class MyCustomTranslator(SQLMeshTranslator):
    def get_external_asset_key(self, external_fqn: str) -> dg.AssetKey:
        """
        Custom mapping for external assets.
        Example: 'jaffle_db.main.raw_source_customers' → ['target', 'main', 'raw_source_customers']
        """
        parts = external_fqn.replace('"', '').split('.')
        # We ignore the catalog (jaffle_db), we take the rest
        return dg.AssetKey(['target'] + parts[1:])

    def get_group_name(self, context, model) -> str:
        """
        Custom mapping for groups.
        """
        model_name = getattr(model, "view_name", "")
        if model_name.startswith("stg_"):
            return "staging"
        elif model_name.startswith("mart_"):
            return "marts"
        return super().get_group_name(context, model)
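The key-path logic in get_external_asset_key can be tried without Dagster installed. The sketch below mirrors the string handling of the example above; external_key_path and its prefix parameter are hypothetical names used only for illustration, and the function returns a plain list rather than an AssetKey.

```python
def external_key_path(external_fqn: str, prefix: str = "target") -> list[str]:
    """Return the key path for an external table FQN, dropping the catalog.

    Hypothetical helper: mirrors the body of get_external_asset_key above,
    but returns a list of strings instead of a Dagster AssetKey.
    """
    # Strip SQL identifier quotes, then split catalog.schema.table
    parts = external_fqn.replace('"', "").split(".")
    # Ignore the first part (the catalog) and keep the rest.
    # Note: a single-part name yields only the prefix.
    return [prefix] + parts[1:]


print(external_key_path("jaffle_db.main.raw_source_customers"))
print(external_key_path('"jaffle_db"."main"."raw_source_customers"'))
```

Both calls give the same path, since quoted and unquoted FQNs are normalized before splitting.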
Translator Methods
The SQLMeshTranslator exposes several methods you can override:
get_external_asset_key(external_fqn: str) -> AssetKey
Maps an external asset FQN to a Dagster AssetKey.
get_asset_key(model) -> AssetKey
Maps a SQLMesh model to a Dagster AssetKey.
get_group_name(context, model) -> str
Determines the group for a model.
get_tags(context, model) -> dict
Generates tags for a model.
get_metadata(model, keys: list[str]) -> dict
Extracts specified metadata from the model.
Asset Checks and Audits
Automatic Audit Conversion
SQLMesh audits are automatically converted to Dagster AssetCheckSpec:
-- SQLMesh audit (in the model definition)
MODEL (
    name customers,
    audits (
        not_null(column=id),
        unique_values(columns=[id, email])
    )
);

# In Dagster, the not_null audit automatically becomes:
AssetCheckSpec(
    name="not_null",
    asset=AssetKey(["customers"]),
    blocking=False,  # SQLMesh handles blocking
    description="SQLMesh audit: not_null(column=id)"
)
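To make the mapping concrete, here is a minimal sketch of how one audit could be turned into the fields of a check spec. It uses plain dictionaries rather than Dagster objects, and audit_to_check_spec is a hypothetical helper, not a function of this module.

```python
def audit_to_check_spec(model_name: str, audit_name: str, audit_args: dict) -> dict:
    """Build the fields of a check spec from one SQLMesh audit (illustrative only)."""
    # Render the arguments the way they appear in the description above
    rendered_args = ", ".join(f"{key}={value}" for key, value in audit_args.items())
    return {
        "name": audit_name,
        "asset_key": [model_name],
        "blocking": False,  # SQLMesh handles blocking, so the Dagster check never blocks
        "description": f"SQLMesh audit: {audit_name}({rendered_args})",
    }


spec = audit_to_check_spec("customers", "not_null", {"column": "id"})
print(spec["description"])
```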
AssetCheckResult Emission
During execution, audit results are emitted as AssetCheckResult:
AssetCheckResult(
    passed=True,
    asset_key=AssetKey(["customers"]),
    check_name="not_null",
    metadata={
        "sqlmesh_model_name": "customers",
        "audit_query": "SELECT COUNT(*) FROM customers WHERE id IS NULL",
        "audit_blocking": False,
        "audit_dialect": "postgres",
        "audit_args": {"column": "id"}
    }
)
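The fallback handling listed under Features can be pictured as follows. This is only a sketch under stated assumptions: the event shape (dictionaries with audit_name and passed keys), the build_check_results helper, and the choice to report a check as passed when no evaluation event was captured are all assumptions made for illustration, not the module's documented behavior.

```python
def build_check_results(declared_checks: list[str], audit_events: list[dict]) -> list[dict]:
    """Pair each declared check with its captured audit event, if any."""
    events_by_name = {event["audit_name"]: event for event in audit_events}
    results = []
    for check_name in declared_checks:
        event = events_by_name.get(check_name)
        if event is None:
            # Assumed fallback: no evaluation event was captured for this check
            # (for example because SQLMesh skipped the model).
            results.append({
                "check_name": check_name,
                "passed": True,
                "metadata": {"note": "no evaluation event found"},
            })
        else:
            results.append({
                "check_name": check_name,
                "passed": bool(event["passed"]),
                "metadata": {"audit_args": event.get("args", {})},
            })
    return results


events = [{"audit_name": "not_null", "passed": True, "args": {"column": "id"}}]
for result in build_check_results(["not_null", "unique_values"], events):
    print(result)
```

Every declared check gets exactly one result, which keeps the number of outputs equal to the number of declared checks.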
Adaptive Scheduling
Automatic Cron Analysis
The system automatically analyzes all SQLMesh crons and determines the finest granularity:
# If you have models with different crons:
# - customers: @daily
# - orders: @hourly
# - events: */5 * * * * (every 5 minutes)
# The adaptive schedule will be: */5 * * * * (every 5 minutes)
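The selection of the finest granularity can be sketched in a few lines. This is a simplified illustration, not the module's implementation: it only understands the macros and the simple patterns handled below, and cron_interval_minutes and finest_cron are hypothetical names.

```python
import re

# Approximate interval, in minutes, for a few common cron macros
MACRO_MINUTES = {
    "@hourly": 60,
    "@daily": 24 * 60,
    "@weekly": 7 * 24 * 60,
}


def cron_interval_minutes(cron: str) -> int:
    """Return the approximate interval in minutes of a simple cron expression."""
    cron = cron.strip()
    if cron in MACRO_MINUTES:
        return MACRO_MINUTES[cron]
    fields = cron.split()
    if len(fields) != 5:
        raise ValueError(f"unsupported cron expression: {cron!r}")
    minute, hour = fields[0], fields[1]
    step = re.fullmatch(r"\*/(\d+)", minute)
    if step and fields[1:] == ["*"] * 4:
        return int(step.group(1))  # "*/N * * * *": every N minutes
    if minute.isdigit() and fields[1:] == ["*"] * 4:
        return 60  # "M * * * *": once per hour
    if minute.isdigit() and hour.isdigit() and fields[2:] == ["*"] * 3:
        return 24 * 60  # "M H * * *": once per day
    raise ValueError(f"unsupported cron expression: {cron!r}")


def finest_cron(crons: list[str]) -> str:
    """Pick the cron expression with the shortest interval."""
    return min(crons, key=cron_interval_minutes)


print(finest_cron(["@daily", "@hourly", "*/5 * * * *"]))
```

A production version would more likely rely on a cron parsing library than on pattern matching like this.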
Intelligent Execution
The schedule runs sqlmesh run on all models, but SQLMesh automatically manages which models should be executed:
# The schedule simply does:
sqlmesh_resource.context.run(
    ignore_cron=False,  # SQLMesh respects crons
    execution_time=datetime.datetime.now(),
)
Architecture
Individual Asset Pattern
Each SQLMesh model becomes a separate Dagster asset that:
- Materializes independently : Each asset calls sqlmesh.materialize_assets_threaded() for its specific model
- Controls success/failure : Each asset can succeed or fail individually based on SQLMesh execution results
- Handles dependencies : Uses translator.get_model_deps_with_external() for proper dependency mapping
- Manages checks : Each asset handles its own audit results with AssetCheckResult outputs
Benefits of Individual Assets
- Granular control : Each asset can succeed or fail independently in the Dagster UI
- Clear visibility : See exactly which models are running, succeeded, or failed
- Individual retries : Failed assets can be retried without affecting others
- Better monitoring : Track performance and issues per model
- Flexible scheduling : Different assets can have different schedules if needed
SQLMeshResource
- Manages SQLMesh context and caching
- Implements strict singleton pattern
- Uses AnyIO for multithreading
- Accepts a custom translator
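For readers unfamiliar with the pattern, a singleton resource can be sketched as below. This is a generic illustration of the technique, not the code of SQLMeshResource: SingletonResource and its project_dir attribute are hypothetical, and the actual resource may enforce uniqueness differently.

```python
import threading


class SingletonResource:
    """Generic singleton: every instantiation returns the same object."""

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        # The lock prevents two threads from creating two instances at once
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, project_dir: str = "sqlmesh_project"):
        # __init__ runs on every call, so configure only the first time
        if getattr(self, "_configured", False):
            return
        self.project_dir = project_dir
        self._configured = True


first = SingletonResource("sqlmesh_project")
second = SingletonResource("another_project")
print(first is second, second.project_dir)
```

In this sketch the second call silently reuses the first configuration; a stricter variant could raise an error when the arguments differ.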
SQLMeshTranslator
- Maps SQLMesh concepts to Dagster
- Extensible via inheritance
- Handles external assets and dependencies
SQLMesh Metadata via Tags
You can pass metadata from SQLMesh models to Dagster assets using the tag convention dagster:property:value:
-- In your SQLMesh model
MODEL (
    name customers,
    tags ARRAY["dagster:group_name:sqlmesh_datamarts"],
    -- ... other model properties
);
Supported Properties
Currently supported Dagster properties via tags:
- dagster:group_name:value : Sets the Dagster asset group name
  - Example: "dagster:group_name:sqlmesh_datamarts"
  - Result: Asset will be in the "sqlmesh_datamarts" group
Tag Convention
The convention follows the pattern: dagster:property:value
- dagster : Prefix to indicate this is for Dagster
- property : The Dagster property to update on the asset
- value : The value to set for that property
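Parsing this convention amounts to splitting each tag on its first two colons. The sketch below shows one way to do it; parse_dagster_tags is a hypothetical helper, not part of the module's API.

```python
def parse_dagster_tags(tags: list[str]) -> dict[str, str]:
    """Collect {property: value} from tags shaped like 'dagster:property:value'."""
    properties = {}
    for tag in tags:
        # Split on the first two colons only, so the value may itself contain ':'
        parts = tag.split(":", 2)
        if len(parts) == 3 and parts[0] == "dagster" and parts[1] and parts[2]:
            properties[parts[1]] = parts[2]
    return properties


print(parse_dagster_tags(["dagster:group_name:sqlmesh_datamarts", "pii"]))
```

Tags that do not follow the convention, such as "pii" here, are simply ignored.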
Priority Order
When determining asset properties, the translator follows this priority:
- SQLMesh tags : dagster:group_name:value (highest priority)
- Factory parameter : group_name="sqlmesh" in factory call
- Default logic : Automatic group determination based on model path
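The priority order can be expressed as a simple chain of fallbacks. The function below is a sketch with hypothetical names (resolve_group_name and its parameters); the default value stands in for whatever the path-based logic would produce.

```python
from typing import Optional


def resolve_group_name(
    tags: list[str],
    factory_group_name: Optional[str],
    default_group_name: str,
) -> str:
    """Apply the priority order: SQLMesh tag, then factory parameter, then default."""
    prefix = "dagster:group_name:"
    for tag in tags:
        # 1. A dagster:group_name:value tag on the model wins
        if tag.startswith(prefix) and len(tag) > len(prefix):
            return tag[len(prefix):]
    # 2. Otherwise use the group_name passed to the factory, if any
    if factory_group_name:
        return factory_group_name
    # 3. Otherwise fall back to the default (path-based) group
    return default_group_name


print(resolve_group_name(["dagster:group_name:sqlmesh_datamarts"], "sqlmesh", "marts"))
print(resolve_group_name([], "sqlmesh", "marts"))
print(resolve_group_name([], None, "marts"))
```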
sqlmesh_definitions_factory
- All-in-one factory for simple configuration
- Automatically creates: resource, assets, job, schedule
- Validates external dependencies
- Returns Definitions directly
SQLMeshEventCaptureConsole
- Custom SQLMesh console to capture events
- Captures audit results for AssetCheckResult
- Handles metadata serialization
Plan + Run Architecture
Individual Asset Materialization
Each Dagster asset materializes its specific SQLMesh model using:
- Model Selection : get_models_to_materialize() selects the specific model for the asset
- Materialization : sqlmesh.materialize_assets_threaded() executes the model
- Result Handling : Console events determine success/failure and audit results
Implementation Details
# In each individual asset
def model_asset(context: AssetExecutionContext, sqlmesh: SQLMeshResource):
    # Materialize this specific model
    models_to_materialize = get_models_to_materialize(
        [current_asset_spec.key],
        sqlmesh.get_models,
        sqlmesh.translator,
    )

    # Execute materialization
    plan = sqlmesh.materialize_assets_threaded(models_to_materialize, context=context)

    # Check results via console events
    failed_models_events = sqlmesh._console.get_failed_models_events()
    evaluation_events = sqlmesh._console.get_evaluation_events()

    # Return MaterializeResult + AssetCheckResult for audits
    return MaterializeResult(...), *check_results
This approach provides granular control while maintaining all SQLMesh integration features.
Performance
- Individual execution : Each asset runs its own SQLMesh materialization (may result in multiple sqlmesh run calls)
- Strict singleton : Only one active SQLMesh instance
- Caching : Contexts, models and translators are cached
- Multithreading : Uses AnyIO to avoid Dagster blocking
- Lazy loading : Resources are loaded on demand
- Early validation : External dependencies validation before execution
- Optimized execution : SQLMesh automatically skips models that don't need materialization
Development Workflow
SQLMesh Development Philosophy
This module follows SQLMesh's philosophy of separation of concerns:
- Development : Use SQLMesh CLI for development and schema changes
- Production : Use SQLMesh CLI for promoting changes
- Orchestration : Use this Dagster module only for running models
Development Workflow
1. Local Development
# Develop your models locally
sqlmesh plan dev
sqlmesh apply dev
# Test your changes
sqlmesh run dev
2. Production Promotion
# Promote changes to production
sqlmesh plan prod  # manual step: review the plan, then apply it
# Or use CI/CD pipeline
# - sqlmesh plan prod
3. Dagster Orchestration
# Dagster takes over for production runs
# - Automatic scheduling via adaptive schedule
# - Manual runs via Dagster UI
# - Only executes: sqlmesh run prod
Module Responsibilities
What this module DOES:
- ✅ Orchestrates sqlmesh run commands
- ✅ Schedules model execution
- ✅ Monitors execution and audits
- ✅ Emits Dagster events and metadata
What this module DOES NOT do:
- ❌ Plan changes (sqlmesh plan)
- ❌ Apply changes (sqlmesh apply)
- ❌ Handle breaking changes
- ❌ Manage environments
Breaking Changes Management
Breaking changes are handled outside this module:
- Development : sqlmesh plan dev + manual review
- Production : sqlmesh plan prod + CI/CD approval
- Orchestration : This module only runs approved models
Environment Separation
# Development (SQLMesh CLI)
sqlmesh plan dev
sqlmesh apply dev
sqlmesh run dev
# Production (Dagster module)
# Automatically runs: sqlmesh run prod
# Based on schedules and triggers
This separation ensures:
- ✅ Clear responsibilities : Development vs Orchestration
- ✅ Safe deployments : Breaking changes handled by SQLMesh CLI
- ✅ Reliable orchestration : Dagster only runs approved models
- ✅ CI/CD friendly : Standard SQLMesh workflow for deployments
Limitations
- Multiple SQLMesh runs : Each asset triggers its own sqlmesh run (may impact performance with many assets)
- No Dagster → SQLMesh backfill : Partitions managed only by SQLMesh itself (run a materialization to backfill)
- Breaking changes : Handled outside the module (SQLMesh CLI or CI/CD)
- Environment management : SQLMesh CLI or CI/CD
Troubleshooting
Common Issues
"Invalid cron" errors
- Cause : Cron faster than 5 minutes
- Solution : Use ignore_cron=True for testing
External asset mapping errors
- Cause : Translator doesn't handle FQN format
- Solution : Check the get_external_asset_key method
Performance issues
- Cause : Too many models loaded
- Solution : Use concurrency_limit and caching