DagSmith
Smith production-ready Airflow DAGs from YAML — schema-validated, registry-driven, GCP-ready.
Table of Contents
- Overview
- Key Features
- Quick Start
- CLI Reference
- YAML Spec Format
- Supported Operators & Sensors
- Generic Plugin System
- FinOps Labels
- Project Layout
- Documentation
- Development
- Contributing
- License
Overview
DagSmith is a code-generation framework that compiles structured YAML pipeline definitions into fully typed, production-ready Apache Airflow DAG files. Instead of writing repetitive Python boilerplate for each DAG, you declare your pipeline in YAML and DagSmith handles the rest: imports, operator instantiation, dependency wiring, and code formatting.
┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ YAML Spec │────▶│ Validate │────▶│ Generate │────▶│ Format │
│ (author) │ │ (Pydantic) │ │ (.py DAG) │ │ (ruff) │
└──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘
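The stages in the diagram can be pictured as plain functions chained together. The following is a minimal sketch, not DagSmith's actual code: it takes an already-parsed dict instead of a YAML file, uses a hypothetical `TaskSpec` dataclass in place of the Pydantic models, and leaves out the ruff formatting stage.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskSpec:
    """Hypothetical, heavily reduced stand-in for a validated task spec."""

    task_id: str
    operator: str


def validate(raw: dict) -> list[TaskSpec]:
    """Validate stage: reject malformed specs before any code is rendered."""
    if not raw.get("dag", {}).get("dag_id"):
        raise ValueError("dag.dag_id is required")
    tasks = []
    for entry in raw.get("tasks", []):
        missing = {"task_id", "operator"} - entry.keys()
        if missing:
            raise ValueError(f"task is missing fields: {sorted(missing)}")
        tasks.append(TaskSpec(entry["task_id"], entry["operator"]))
    return tasks


def generate(dag_id: str, tasks: list[TaskSpec]) -> str:
    """Generate stage: render Python source text for the DAG file."""
    lines = [f'with DAG(dag_id="{dag_id}") as dag:']
    if not tasks:
        lines.append("    pass")
    for task in tasks:
        lines.append(f'    {task.task_id} = {task.operator}(task_id="{task.task_id}")')
    return "\n".join(lines) + "\n"


spec = {
    "dag": {"dag_id": "sequential_bq"},
    "tasks": [{"task_id": "stage_data", "operator": "EmptyOperator"}],
}
source = generate(spec["dag"]["dag_id"], validate(spec))
print(source)
```

The point of the split is that a bad spec fails in `validate` and never reaches `generate`, which mirrors the author-time validation described under Key Features.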
Key Features
| Feature | Description |
|---|---|
| Author-time validation | Pydantic schemas catch bad config before code generation, not at Airflow deploy time |
| Pluggable registry | Add new operators/sensors to a YAML config file with zero Python code changes |
| GCP-native | First-class BigQuery and GCS operator support with automatic FinOps label injection |
| Clean output | Generated DAGs are human-readable, ruff-formatted Python you can review and version-control |
| 16+ built-in operators | BigQuery, GCS, Python, Bash, Branching, Sensors, Triggers, TaskGroups — each with full validation |
| Generic plugin system | Register any Airflow operator/sensor in YAML and use immediately |
| Variable substitution | ${VAR__NAME__VAR} expansion across all YAML sections before validation |
| Full CLI toolkit | generate, validate, list, resolve with colorized output and CI-friendly exit codes |
Quick Start
Prerequisites
| Tool | Version | Purpose |
|---|---|---|
| Python | ≥ 3.13 | Runtime |
| uv | latest | Package manager & script runner |
| ruff | latest | Linter & formatter (post-processing) |
Install
# Install uv
# Linux / macOS:
curl -LsSf https://astral.sh/uv/install.sh | sh
# Windows (PowerShell):
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
# Install ruff
# Linux / macOS:
curl -LsSf https://astral.sh/ruff/install.sh | sh
# Windows (PowerShell):
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/ruff/install.ps1 | iex"
# Install project dependencies
uv sync --group dev
Generate your first DAG
# Generate all example DAGs
dagsmith generate examples/
# Generate a single DAG
dagsmith generate examples/01_simple_bq_pipeline.yaml
# Validate without generating
dagsmith validate examples/ --strict
# List registered operators
dagsmith list
# Preview variable-expanded YAML
dagsmith resolve examples/01_simple_bq_pipeline.yaml
CLI Reference
dagsmith generate
Render YAML specs into .py DAG files.
dagsmith generate TARGETS [options]
| Flag | Description |
|---|---|
| TARGETS | One or more YAML file paths or directories (required) |
| -p, --pattern REGEX | Filter YAML files by filename regex |
| -o, --output-dir DIR | Output directory (default: ./dags/) |
| --dry-run | Validate and render without writing files |
| -x, --fail-fast | Stop on first failure |
| --no-format | Skip ruff post-processing |
dagsmith validate
Validate YAML specs without generating code. Ideal for CI gates.
dagsmith validate TARGETS [options]
| Flag | Description |
|---|---|
| TARGETS | One or more YAML file paths or directories (required) |
| -p, --pattern REGEX | Filter YAML files by filename regex |
| --strict | Treat warnings (missing metadata, zero retries, isolated tasks) as errors |
dagsmith list
Display all registered operators, sensors, and utilities.
dagsmith list [options]
| Flag | Description |
|---|---|
| --origin | Filter by section: standard, third_party, custom |
| --type | Filter by class type: operator, sensor, util, model |
dagsmith resolve
Expand ${VAR__...__VAR} references and output the fully resolved YAML.
dagsmith resolve TARGETS [options]
| Flag | Description |
|---|---|
| TARGETS | One or more YAML file paths or directories (required) |
| -p, --pattern REGEX | Filter YAML files by filename regex |
| -o, --output FILE | Write resolved YAML to a file instead of stdout |
| -x, --fail-fast | Stop on first variable expansion error |
Global flags: -v, --verbose (debug output), -q, --quiet (warnings/errors only)
YAML Spec Format
Structure
variables: # Optional — ${VAR} substitution
configurations: # Optional — reusable config values
metadata: # Required — documentation metadata
dag: # Required — airflow.DAG() constructor
gcp: # Required — GCP connection defaults
default_args: # Optional — applied to every task
user_defined_macros: # Optional — Jinja macros
tasks: # Optional — operator/sensor/group specs
dependencies: # Optional — task execution order
Full Example
variables:
  VAR__PROJECT_ID__VAR: "my-gcp-project-001"
configurations:
  base_path: "/home/airflow/gcs/dags/${VAR__PROJECT_ID__VAR}/"
metadata:
  title: "Daily Account Activity Load"
  owner: "data-team@example.com"
  email: "data-team@example.com"
  version: "1.0.0"
  jira: "DE-101"
  developer_name: "daily_load"
dag:
  dag_id: "sequential_bq"
  description: "Load daily account activity into BigQuery."
  schedule: "0 6 * * *"
  start_date: "2026-01-02 12:13:14"
  timezone: "America/New_York"
  catchup: false
  max_active_runs: 1
  dagrun_timeout: 7200
  is_paused_upon_creation: true
  tags:
    - "warehouse:bigquery"
    - "module:daily_load"
gcp:
  project_id: "${VAR__PROJECT_ID__VAR}"
  location: "us-east4"
default_args:
  owner: "airflow"
  retries: 1
  retry_delay: 60
  email: [ "data-team@example.com" ]
  email_on_failure: true
tasks:
  - task_id: "stage_data"
    operator: BigQueryInsertJobOperator
    sql: "sql/stage_acct_activity.sql"
    params:
      project_id: "${VAR__PROJECT_ID__VAR}"
      src_dataset: "warehouse_tables"
  - task_id: "transform_data"
    operator: BigQueryInsertJobOperator
    sql: "sql/transform_acct_activity.sql"
  - task_id: "load_final"
    operator: BigQueryInsertJobOperator
    sql: "sql/load_acct_activity.sql"
    retries: 3
dependencies:
  - "stage_data >> transform_data >> load_final"
More YAML patterns
Task Groups
tasks:
  - operator: TaskGroup
    group_id: "staging"
    tooltip: "Stage source tables"
    tasks:
      - task_id: "stage_orders"
        operator: BigQueryInsertJobOperator
        sql: "sql/stage_orders.sql"
      - task_id: "stage_customers"
        operator: BigQueryInsertJobOperator
        sql: "sql/stage_customers.sql"
    dependencies:
      - "stage_orders >> stage_customers"
dependencies:
  - "start >> staging >> aggregation"
Sensors
- task_id: "wait_for_upstream"
  operator: ExternalTaskSensor
  external_dag_id: "upstream_pipeline"
  external_task_id: "final_step"
  mode: "reschedule"
  poke_interval: 300
  timeout: 21600
  allowed_states: [ "success" ]
  execution_delta: 3600
Python Callable
- task_id: "validate_params"
  operator: PythonOperator
  python_callable: "callables.validators.validate_params"
  op_kwargs:
    env: "{{ params.env }}"
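The `python_callable` value is a dotted path, so the generator has to turn it into an import. A minimal sketch of that split, assuming the last path segment is the function name; `split_callable_path` and `import_line` are hypothetical names, and DagSmith's own resolver in `callables.py` may behave differently (it also produces an alias, which is not modelled here).

```python
def split_callable_path(dotted_path: str) -> tuple[str, str]:
    """Split 'pkg.module.func' into ('pkg.module', 'func')."""
    module, separator, function = dotted_path.rpartition(".")
    if not separator or not module or not function.isidentifier():
        raise ValueError(f"not a dotted callable path: {dotted_path!r}")
    return module, function


def import_line(dotted_path: str) -> str:
    """Render the import statement a generated DAG file would need."""
    module, function = split_callable_path(dotted_path)
    return f"from {module} import {function}"


print(import_line("callables.validators.validate_params"))
```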
Generic Plugin Operator
# No Python code needed: register in airflow_registry.yaml
- task_id: "notify_slack"
  operator: SlackWebhookOperator
  slack_webhook_conn_id: "slack_default"
  message: "Daily load completed for {{ ds }}"
  channel: "#data-alerts"
Dependency Syntax
dependencies:
  - "task_a >> task_b >> task_c"   # sequential
  - "[task_x, task_y] >> task_z"   # fan-in
  - "task_z >> [task_a, task_b]"   # fan-out
  - "task_c << [task_a, task_b]"   # fan-in (reverse)
  - "group_a >> group_b"           # task group references
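To make the dependency syntax concrete, here is a small sketch that turns one dependency string into (upstream, downstream) edges. It is an illustration of the notation only, not the parser in `dependencies.py`; the function names are hypothetical, and chained expressions are read left to right in the same way Airflow's bitshift operators evaluate.

```python
import re

_OPERATOR = re.compile(r"(>>|<<)")
_NAME = re.compile(r"[A-Za-z_][\w.\-]*")


def _operand(text: str) -> list[str]:
    """Parse 'task_a' or '[task_a, task_b]' into a list of names."""
    text = text.strip()
    if text.startswith("[") and text.endswith("]"):
        names = [name.strip() for name in text[1:-1].split(",")]
    else:
        names = [text]
    if not all(_NAME.fullmatch(name) for name in names):
        raise ValueError(f"invalid operand: {text!r}")
    return names


def parse_dependency(expression: str) -> list[tuple[str, str]]:
    """Return (upstream, downstream) edges for one dependency string."""
    parts = _OPERATOR.split(expression)
    if len(parts) < 3:
        raise ValueError(f"no >> or << found in {expression!r}")
    edges = []
    left = _operand(parts[0])
    for index in range(1, len(parts), 2):
        operator, right = parts[index], _operand(parts[index + 1])
        for a in left:
            for b in right:
                # '>>' means left runs first; '<<' means right runs first.
                edges.append((a, b) if operator == ">>" else (b, a))
        left = right
    return edges


print(parse_dependency("task_a >> task_b >> task_c"))
print(parse_dependency("task_c << [task_a, task_b]"))
```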
Variables & Substitution
variables:
  VAR__PROJECT_ID__VAR: "my-gcp-project-001"
  VAR__DATASET__VAR: "warehouse_tables"
gcp:
  project_id: "${VAR__PROJECT_ID__VAR}"
Naming rules: variable names must be ALL_UPPERCASE, begin with VAR__, and end with __VAR.
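The substitution step can be approximated in a few lines. This sketch works on an already-parsed structure and makes two assumptions that the rules above do not state: digits are allowed in names, and an undefined reference is an error. `check_names` and `expand` are hypothetical helpers, not DagSmith's loader API.

```python
import re

_NAME = re.compile(r"VAR__[A-Z0-9_]+__VAR")
_REFERENCE = re.compile(r"\$\{([^}]+)\}")


def check_names(variables: dict) -> None:
    """Enforce the naming rules: uppercase, VAR__ prefix, __VAR suffix."""
    bad = [name for name in variables if not _NAME.fullmatch(name)]
    if bad:
        raise ValueError(f"invalid variable names: {bad}")


def expand(value, variables: dict):
    """Recursively replace ${NAME} references in strings, lists, and dicts."""
    if isinstance(value, str):
        def replace(match: re.Match) -> str:
            name = match.group(1)
            if name not in variables:
                raise KeyError(f"undefined variable: {name}")
            return str(variables[name])

        return _REFERENCE.sub(replace, value)
    if isinstance(value, list):
        return [expand(item, variables) for item in value]
    if isinstance(value, dict):
        return {key: expand(item, variables) for key, item in value.items()}
    return value


variables = {"VAR__PROJECT_ID__VAR": "my-gcp-project-001"}
check_names(variables)
resolved = expand({"gcp": {"project_id": "${VAR__PROJECT_ID__VAR}"}}, variables)
print(resolved)
```

Jinja expressions such as `{{ ds }}` pass through untouched because only the `${...}` form is matched.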
Field Aliases
| Canonical | Alias | Section |
|---|---|---|
| retry_delay | retry_delay_seconds | default_args, task-level |
| sla | sla_seconds | default_args |
| schedule | schedule_interval | dag |
| gcp_conn_id | google_cloud_conn_id | gcp |
| execution_delta | execution_delta_seconds | ExternalTaskSensor |
| execution_date | logical_date | TriggerDagRunOperator |
| poke_interval | poll_interval | GCSObjectsWithPrefixExistenceSensor |
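One way to picture alias handling is a rename pass that maps each alias to its canonical field before validation. The sketch below is a simplification under stated assumptions: it ignores the Section column and applies every alias everywhere, and it treats supplying both spellings as an error. `normalize_aliases` is a hypothetical helper, not part of DagSmith.

```python
# Alias -> canonical name, taken from the table above.
ALIASES = {
    "retry_delay_seconds": "retry_delay",
    "sla_seconds": "sla",
    "schedule_interval": "schedule",
    "google_cloud_conn_id": "gcp_conn_id",
    "execution_delta_seconds": "execution_delta",
    "logical_date": "execution_date",
    "poll_interval": "poke_interval",
}


def normalize_aliases(fields: dict) -> dict:
    """Rename aliased keys to their canonical form, rejecting duplicates."""
    result = {}
    for key, value in fields.items():
        canonical = ALIASES.get(key, key)
        if canonical in result:
            raise ValueError(f"{canonical!r} was given more than once (via {key!r})")
        result[canonical] = value
    return result


print(normalize_aliases({"retries": 1, "retry_delay_seconds": 60}))
```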
Supported Operators & Sensors
Built-in operators have dedicated Pydantic models with full field-level validation.
Standard (Airflow core)
| Operator | Description |
|---|---|
| PythonOperator | Run a Python callable |
| BranchPythonOperator | Branch based on callable return value |
| BashOperator | Execute a bash command |
| EmptyOperator | No-op placeholder / pipeline marker |
| TriggerDagRunOperator | Trigger another DAG |
| ExternalTaskSensor | Wait for a task in another DAG |
BigQuery
| Operator | Description |
|---|---|
| BigQueryInsertJobOperator | Run SQL via BigQuery Jobs API |
| BigQueryCheckOperator | Assert a SQL query returns truthy |
| BigQueryValueCheckOperator | Assert a SQL scalar matches expected value |
| BigQueryTableExistenceSensor | Wait for a table to exist |
GCS (Google Cloud Storage)
| Operator | Description |
|---|---|
| GCSToBigQueryOperator | Load GCS files into BigQuery |
| GCSToGCSOperator | Copy/move objects between GCS buckets |
| GCSDeleteObjectsOperator | Delete objects from a GCS bucket |
| GCSObjectsWithPrefixExistenceSensor | Wait for objects with a prefix to exist |
Generic Plugin
Any operator/sensor registered in configs/airflow_registry.yaml works immediately — no Python code changes.
Generic Plugin System
# 1. Register in configs/airflow_registry.yaml (or DAGSMITH_EXTRA_REGISTRY)
airflow_class_registry:
  custom:
    SlackWebhookOperator:
      module: airflow.providers.slack.operators.slack_webhook
      class: SlackWebhookOperator
      type: operator
# 2. Use in any YAML spec
tasks:
  - task_id: "notify"
    operator: SlackWebhookOperator
    slack_webhook_conn_id: "slack_default"
    message: "Pipeline complete!"
Set the DAGSMITH_EXTRA_REGISTRY environment variable to maintain a separate registry file without modifying the bundled config:
export DAGSMITH_EXTRA_REGISTRY=/path/to/my_registry.yaml
dagsmith generate specs/
| | Built-in operators | Generic operators | Generic sensors |
|---|---|---|---|
| Field validation | Full Pydantic schema | None (runtime errors) | Sensor fields only |
| Registry entry | Not required | Required | Required |
| Python changes | None | None | None |
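A registry entry carries everything needed to emit an import for the generated DAG. The sketch below shows that lookup on an in-memory copy of the registry structure from step 1. The name `get_import_line` appears in the project layout, but the signature and behaviour here are assumptions for illustration only.

```python
# In-memory copy of the registry structure from the YAML example above.
REGISTRY = {
    "airflow_class_registry": {
        "custom": {
            "SlackWebhookOperator": {
                "module": "airflow.providers.slack.operators.slack_webhook",
                "class": "SlackWebhookOperator",
                "type": "operator",
            }
        }
    }
}


def get_import_line(registry: dict, name: str) -> str:
    """Search every registry section for `name` and render its import."""
    for entries in registry["airflow_class_registry"].values():
        if name in entries:
            entry = entries[name]
            return f"from {entry['module']} import {entry['class']}"
    raise KeyError(f"{name!r} is not registered")


print(get_import_line(REGISTRY, "SlackWebhookOperator"))
```

Because the lookup is data-driven, adding an operator only means adding a registry entry, which is what makes the zero-code path possible.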
FinOps Labels
Every BigQueryInsertJobOperator task automatically gets FinOps labels injected from configs/airflow_registry.yaml:
| Label | Value (Jinja template) |
|---|---|
| dag_id | {{ dag.dag_id }} |
| task_id | {{ task.task_id }} |
| execution_date | {{ ds_nodash }} |
| instance_name | {{ var.value.composer_env_name \| default('composer') }} |
| run_id | Cleaned, lowercase dag_run.run_id |
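The run_id label needs cleaning because BigQuery label values may only contain lowercase letters, digits, underscores, and dashes, and are limited to 63 characters. The exact cleaning DagSmith applies is not documented here, so the following is only a sketch of one plausible approach; `clean_label_value` is a hypothetical helper.

```python
import re


def clean_label_value(value: str, max_length: int = 63) -> str:
    """Lowercase the value, replace disallowed characters, and truncate."""
    cleaned = re.sub(r"[^a-z0-9_-]", "_", value.lower())
    return cleaned[:max_length]


print(clean_label_value("scheduled__2026-01-02T06:00:00+00:00"))
# prints: scheduled__2026-01-02t06_00_00_00_00
```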
Project Layout
dagsmith/
  configs/
    airflow_registry.yaml # operator/sensor registry + FinOps labels
  examples/ # 18 sample YAML DAG specs
  references/
    reference_template.yaml # fully documented YAML template
  docs/ # interactive HTML documentation (GitHub Pages)
  src/
    cli.py # CLI: generate, validate, list, resolve
    loader.py # YAML loading + ${VAR} expansion + validation
    code_generator.py # renders YamlDagSpec -> .py string
    callables.py # dotted-path -> (module, fn, alias) resolver
    dependencies.py # >> / << dependency string parser
    cron.py # cron expression humanizer
    utils.py # py_repr, safe_var, humanize_readable_time
    registry/
      core.py # loads airflow_registry.yaml, get_import_line
      models.py # RegistryEntry, RegistryConfig Pydantic models
    schemas/
      __init__.py # YamlDagSpec root model, discriminated unions
      base.py # BaseTaskSpec, BaseSensorOperatorSpec, DagSpec
      generic.py # GenericOperatorSpec, GenericSensorSpec
      shared_renderers.py # render_common_fields, render_bigquery_common_fields
      bigquery/ # BQ operator/sensor specs + renderers
      gcs/ # GCS operator/sensor specs + renderers
      standard/ # PythonOperator, BashOperator, etc.
  tests/ # mirrors src/ layout
  pyproject.toml # deps, ruff, mypy, pytest config
  Dockerfile # lightweight runtime image (python:3.13-slim + uv)
Documentation
Full interactive documentation is available under docs/:
| Page | Description |
|---|---|
| Overview | Features, how it works, operator table |
| Quick Start | Installation and first DAG |
| CLI Reference | All commands, flags, and examples |
| YAML Spec Format | Every section and field documented |
| Operators & Sensors | All 16 operators with field details |
| Callables | Callbacks, python_callable, placement guide |
| Best Practices | FinOps, aliases, architecture, tips |
| Full YAML Template | Copy-paste reference + generated output examples |
GitHub Pages: Enable Pages with the source set to the /docs folder to host the documentation site.
Development
Toolchain
| Tool | Purpose | Config |
|---|---|---|
| uv | Package manager + script runner | pyproject.toml |
| ruff | Lint + format (line-length: 110) | [tool.ruff] |
| mypy | Strict type checking (Python 3.13) | [tool.mypy] |
| pytest | Testing (coverage ≥ 80%) | [tool.pytest] |
| yamllint | YAML linting (line-length: 110) | yamllint-config.yml |
| pre-commit | Git hooks (lint, format, license headers) | .pre-commit-config.yaml |
Commands
uv sync --group dev # install all deps (runtime + dev)
uv run pytest # run tests (coverage threshold: 80%)
uv run pytest -x # fail fast
uv run pytest -n auto # parallel execution
uv run ruff check --fix . # lint + auto-fix
uv run ruff format . # format
uv run mypy . # type-check
Adding a new operator
- Zero-code path: Register in configs/airflow_registry.yaml and use as a generic plugin immediately.
- Full validation path: Create a spec class and renderer in src/schemas/<category>/, add them to the discriminated unions in src/schemas/__init__.py, add renderer dispatch in code_generator.py, register in airflow_registry.yaml, and add tests.
References & Examples
- Reference Template — Fully documented YAML template covering every supported section and field with inline comments, defaults, aliases, and usage notes.
- Examples — 18 numbered sample YAML specs demonstrating specific patterns: simple pipelines, fan-out/fan-in, task groups, sensors, triggers, Python callables, GCS operations, generic plugins, branching, nested groups, macros, and task-level callbacks.
- Interactive Docs — Browsable HTML documentation with syntax-highlighted examples and copy-to-clipboard.
Contributing
- Fork the repository
- Create a feature branch (git checkout -b feature/my-feature)
- Run the full test suite (uv run pytest)
- Ensure linting and type checking pass (uv run ruff check . && uv run mypy .)
- Submit a pull request
License
Licensed under the Apache License 2.0.
Copyright 2026 DagSmith Contributors (Mayuresh Kedari)
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0