DagSmith

Smith production-ready Airflow DAGs from YAML — schema-validated, registry-driven, GCP-ready.

Build: CI, Nightly
Quality: Coverage ≥80%, Ruff, mypy strict
Stack: Python 3.13+, Airflow ≥3.2, Pydantic v2
Version: 0.1.0
License: Apache 2.0


Overview

DagSmith is a code-generation framework that compiles structured YAML pipeline definitions into fully typed, production-ready Apache Airflow DAG files. Instead of writing repetitive Python boilerplate for each DAG, you declare your pipeline in YAML and DagSmith handles the rest: imports, operator instantiation, dependency wiring, and code formatting.

┌──────────────┐     ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│  YAML Spec   │────▶│   Validate   │────▶│   Generate   │────▶│  Format      │
│  (author)    │     │  (Pydantic)  │     │  (.py DAG)   │     │  (ruff)      │
└──────────────┘     └──────────────┘     └──────────────┘     └──────────────┘
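
The validate and generate stages in the diagram can be pictured with a small sketch. It is not DagSmith's code: it uses a plain dataclass in place of the Pydantic models, the names TaskSpec, validate, and generate are hypothetical, and the rendered text only hints at what a generated DAG file might contain.

```python
from dataclasses import dataclass


@dataclass
class TaskSpec:
    """Hypothetical stand-in for a validated task model."""
    task_id: str
    operator: str


def validate(raw: dict) -> list[TaskSpec]:
    """Validate stage: reject bad specs before any code is rendered."""
    tasks = []
    for item in raw.get("tasks", []):
        if "task_id" not in item or "operator" not in item:
            raise ValueError(f"task is missing task_id or operator: {item!r}")
        tasks.append(TaskSpec(item["task_id"], item["operator"]))
    return tasks


def generate(dag_id: str, tasks: list[TaskSpec]) -> str:
    """Generate stage: render Python source text for the DAG file."""
    lines = [f"with DAG(dag_id={dag_id!r}) as dag:"]
    for task in tasks:
        lines.append(f"    {task.task_id} = {task.operator}(task_id={task.task_id!r})")
    return "\n".join(lines) + "\n"


spec = {
    "dag": {"dag_id": "sequential_bq"},
    "tasks": [{"task_id": "stage_data", "operator": "BigQueryInsertJobOperator"}],
}
source = generate(spec["dag"]["dag_id"], validate(spec))
print(source)
```

The point of the ordering is that a malformed task raises in validate, so nothing is rendered or written for a spec that fails validation.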

Key Features

Feature | Description
Author-time validation | Pydantic schemas catch bad config before code generation, not at Airflow deploy time
Pluggable registry | Add new operators/sensors to a YAML config file with zero Python code changes
GCP-native | First-class BigQuery and GCS operator support with automatic FinOps label injection
Clean output | Generated DAGs are human-readable, ruff-formatted Python you can review and version-control
16+ built-in operators | BigQuery, GCS, Python, Bash, Branching, Sensors, Triggers, TaskGroups (each with full validation)
Generic plugin system | Register any Airflow operator/sensor in YAML and use immediately
Variable substitution | ${VAR__NAME__VAR} expansion across all YAML sections before validation
Full CLI toolkit | generate, validate, list, resolve with colorized output and CI-friendly exit codes

Quick Start

Prerequisites

Tool | Version | Purpose
Python | ≥ 3.13 | Runtime
uv | latest | Package manager & script runner
ruff | latest | Linter & formatter (post-processing)

Install

# Install uv
# Linux / macOS:
curl -LsSf https://astral.sh/uv/install.sh | sh
# Windows (PowerShell):
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"

# Install ruff
# Linux / macOS:
curl -LsSf https://astral.sh/ruff/install.sh | sh
# Windows (PowerShell):
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/ruff/install.ps1 | iex"

# Install project dependencies
uv sync --group dev

Generate your first DAG

# Generate all example DAGs
dagsmith generate examples/

# Generate a single DAG
dagsmith generate examples/01_simple_bq_pipeline.yaml

# Validate without generating
dagsmith validate examples/ --strict

# List registered operators
dagsmith list

# Preview variable-expanded YAML
dagsmith resolve examples/01_simple_bq_pipeline.yaml

CLI Reference

dagsmith generate

Render YAML specs into .py DAG files.

dagsmith generate TARGETS [options]

Flag | Description
TARGETS | One or more YAML file paths or directories (required)
-p, --pattern REGEX | Filter YAML files by filename regex
-o, --output-dir DIR | Output directory (default: ./dags/)
--dry-run | Validate and render without writing files
-x, --fail-fast | Stop on first failure
--no-format | Skip ruff post-processing

dagsmith validate

Validate YAML specs without generating code. Ideal for CI gates.

dagsmith validate TARGETS [options]

Flag | Description
TARGETS | One or more YAML file paths or directories (required)
-p, --pattern REGEX | Filter YAML files by filename regex
--strict | Treat warnings (missing metadata, zero retries, isolated tasks) as errors

dagsmith list

Display all registered operators, sensors, and utilities.

dagsmith list [options]

Flag | Description
--origin | Filter by section: standard, third_party, custom
--type | Filter by class type: operator, sensor, util, model

dagsmith resolve

Expand ${VAR__...__VAR} references and output the fully resolved YAML.

dagsmith resolve TARGETS [options]

Flag | Description
TARGETS | One or more YAML file paths or directories (required)
-p, --pattern REGEX | Filter YAML files by filename regex
-o, --output FILE | Write resolved YAML to a file instead of stdout
-x, --fail-fast | Stop on first variable expansion error

Global flags: -v, --verbose (debug output), -q, --quiet (warnings/errors only)
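
Since validate is described as suited to CI gates, a pipeline step could wrap it roughly as follows. This is a sketch only: the helper names validate_command and run_gate are hypothetical, it uses just the flags listed above, and it assumes an exit code of 0 means success because the exact exit codes are not documented here.

```python
import subprocess


def validate_command(targets, strict=True, pattern=None):
    """Build the argv list for `dagsmith validate` from documented flags."""
    cmd = ["dagsmith", "validate", *targets]
    if pattern:
        cmd += ["--pattern", pattern]
    if strict:
        cmd.append("--strict")
    return cmd


def run_gate(targets):
    """Run the validation and report success (assumes exit code 0 means OK)."""
    return subprocess.run(validate_command(targets)).returncode == 0


print(validate_command(["examples/"]))
# prints ['dagsmith', 'validate', 'examples/', '--strict']
```

Passing the command as a list rather than a shell string avoids quoting problems when a target path or regex contains spaces or special characters.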

YAML Spec Format

Structure

variables:          # Optional — ${VAR} substitution
configurations:     # Optional — reusable config values
metadata:           # Required — documentation metadata
dag:                # Required — airflow.DAG() constructor
gcp:                # Required — GCP connection defaults
default_args:       # Optional — applied to every task
user_defined_macros: # Optional — Jinja macros
tasks:              # Optional — operator/sensor/group specs
dependencies:       # Optional — task execution order

Full Example

variables:
  VAR__PROJECT_ID__VAR: "my-gcp-project-001"

configurations:
  base_path: "/home/airflow/gcs/dags/${VAR__PROJECT_ID__VAR}/"

metadata:
  title: "Daily Account Activity Load"
  owner: "data-team@example.com"
  email: "data-team@example.com"
  version: "1.0.0"
  jira: "DE-101"
  developer_name: "daily_load"

dag:
  dag_id: "sequential_bq"
  description: "Load daily account activity into BigQuery."
  schedule: "0 6 * * *"
  start_date: "2026-01-02 12:13:14"
  timezone: "America/New_York"
  catchup: false
  max_active_runs: 1
  dagrun_timeout: 7200
  is_paused_upon_creation: true
  tags:
    - "warehouse:bigquery"
    - "module:daily_load"

gcp:
  project_id: "${VAR__PROJECT_ID__VAR}"
  location: "us-east4"

default_args:
  owner: "airflow"
  retries: 1
  retry_delay: 60
  email: [ "data-team@example.com" ]
  email_on_failure: true

tasks:
  - task_id: "stage_data"
    operator: BigQueryInsertJobOperator
    sql: "sql/stage_acct_activity.sql"
    params:
      project_id: "${VAR__PROJECT_ID__VAR}"
      src_dataset: "warehouse_tables"

  - task_id: "transform_data"
    operator: BigQueryInsertJobOperator
    sql: "sql/transform_acct_activity.sql"

  - task_id: "load_final"
    operator: BigQueryInsertJobOperator
    sql: "sql/load_acct_activity.sql"
    retries: 3

dependencies:
  - "stage_data >> transform_data >> load_final"

More YAML patterns

Task Groups

tasks:
  - operator: TaskGroup
    group_id: "staging"
    tooltip: "Stage source tables"
    tasks:
      - task_id: "stage_orders"
        operator: BigQueryInsertJobOperator
        sql: "sql/stage_orders.sql"
      - task_id: "stage_customers"
        operator: BigQueryInsertJobOperator
        sql: "sql/stage_customers.sql"
    dependencies:
      - "stage_orders >> stage_customers"

dependencies:
  - "start >> staging >> aggregation"

Sensors

- task_id: "wait_for_upstream"
  operator: ExternalTaskSensor
  external_dag_id: "upstream_pipeline"
  external_task_id: "final_step"
  mode: "reschedule"
  poke_interval: 300
  timeout: 21600
  allowed_states: [ "success" ]
  execution_delta: 3600

Python Callable

- task_id: "validate_params"
  operator: PythonOperator
  python_callable: "callables.validators.validate_params"
  op_kwargs:
    env: "{{ params.env }}"
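
A dotted python_callable string has to become an import in the generated file. One plausible way to split it is sketched below; the function names are hypothetical, and the sketch ignores the alias that the project's own resolver is described as returning.

```python
def split_callable_path(dotted: str) -> tuple[str, str]:
    """Split 'pkg.module.func' into ('pkg.module', 'func')."""
    module, sep, name = dotted.rpartition(".")
    if not sep or not module or not name:
        raise ValueError(f"expected a dotted path like 'module.func', got {dotted!r}")
    return module, name


def callable_import_line(dotted: str) -> str:
    """Render the import statement a generated DAG would need."""
    module, name = split_callable_path(dotted)
    return f"from {module} import {name}"


print(callable_import_line("callables.validators.validate_params"))
# prints: from callables.validators import validate_params
```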

Generic Plugin Operator

# No Python code needed — register in airflow_registry.yaml
- task_id: "notify_slack"
  operator: SlackWebhookOperator
  slack_webhook_conn_id: "slack_default"
  message: "Daily load completed for {{ ds }}"
  channel: "#data-alerts"

Dependency Syntax

dependencies:
  - "task_a >> task_b >> task_c"         # sequential
  - "[task_x, task_y] >> task_z"         # fan-in
  - "task_z >> [task_a, task_b]"         # fan-out
  - "task_c << [task_a, task_b]"         # fan-in (reverse)
  - "group_a >> group_b"                 # task group references
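
The dependency strings above can be read as a list of (upstream, downstream) edges. The sketch below shows one way to parse them; parse_dependency is a hypothetical name and this is not the project's parser, which may handle more cases (nested groups, validation of unknown task ids).

```python
import re


def parse_dependency(line: str) -> list[tuple[str, str]]:
    """Turn 'a >> [b, c]' style strings into (upstream, downstream) edges."""
    # Splitting on a capturing group keeps the operators in the result:
    # operands land at even indexes, operators at odd indexes.
    parts = re.split(r"\s*(>>|<<)\s*", line.strip())
    groups = [
        [name.strip() for name in part.strip("[] ").split(",") if name.strip()]
        for part in parts[0::2]
    ]
    operators = parts[1::2]

    edges = []
    for left, op, right in zip(groups, operators, groups[1:]):
        # '>>' points left to right; '<<' reverses the direction.
        upstream, downstream = (left, right) if op == ">>" else (right, left)
        edges.extend((u, d) for u in upstream for d in downstream)
    return edges


print(parse_dependency("task_a >> task_b >> task_c"))
# prints [('task_a', 'task_b'), ('task_b', 'task_c')]
print(parse_dependency("task_c << [task_a, task_b]"))
# prints [('task_a', 'task_c'), ('task_b', 'task_c')]
```

Each adjacent pair of operands is handled independently, which matches how chained >> and << expressions evaluate in Airflow itself.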

Variables & Substitution

variables:
  VAR__PROJECT_ID__VAR: "my-gcp-project-001"
  VAR__DATASET__VAR: "warehouse_tables"

gcp:
  project_id: "${VAR__PROJECT_ID__VAR}"

Naming rules: variable names must be all uppercase, begin with VAR__, and end with __VAR.
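
The substitution step can be approximated with a regular expression. This is a minimal sketch, not the loader's implementation: the expand function is hypothetical, it assumes digits and underscores are allowed between the VAR__ prefix and __VAR suffix, and it assumes an undefined reference should be an error.

```python
import re

# Assumed naming rule: uppercase letters, digits, and underscores inside the markers.
NAME = re.compile(r"VAR__[A-Z0-9_]+__VAR")
REFERENCE = re.compile(r"\$\{(VAR__[A-Z0-9_]+__VAR)\}")


def expand(text: str, variables: dict[str, str]) -> str:
    """Replace every ${VAR__...__VAR} reference in text with its value."""
    for name in variables:
        if not NAME.fullmatch(name):
            raise ValueError(f"invalid variable name: {name}")

    def replace(match: re.Match) -> str:
        name = match.group(1)
        if name not in variables:
            raise KeyError(f"undefined variable: {name}")
        return variables[name]

    return REFERENCE.sub(replace, text)


variables = {"VAR__PROJECT_ID__VAR": "my-gcp-project-001"}
print(expand("/home/airflow/gcs/dags/${VAR__PROJECT_ID__VAR}/", variables))
# prints /home/airflow/gcs/dags/my-gcp-project-001/
```

The distinctive prefix and suffix keep the pattern from colliding with Jinja templates such as {{ ds }}, which pass through untouched.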

Field Aliases

Canonical | Alias | Section
retry_delay | retry_delay_seconds | default_args, task-level
sla | sla_seconds | default_args
schedule | schedule_interval | dag
gcp_conn_id | google_cloud_conn_id | gcp
execution_delta | execution_delta_seconds | ExternalTaskSensor
execution_date | logical_date | TriggerDagRunOperator
poke_interval | poll_interval | GCSObjectsWithPrefixExistenceSensor
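
Alias handling amounts to renaming keys before validation. The sketch below illustrates the idea with a flat mapping; in DagSmith the aliases are scoped to the sections listed in the table, and both the normalize_keys name and the choice to reject a key given under both names are assumptions.

```python
# Alias -> canonical name, taken from a few rows of the table above.
ALIASES = {
    "retry_delay_seconds": "retry_delay",
    "sla_seconds": "sla",
    "schedule_interval": "schedule",
    "google_cloud_conn_id": "gcp_conn_id",
}


def normalize_keys(section: dict) -> dict:
    """Return a copy of section with aliases renamed to canonical keys."""
    normalized = {}
    for key, value in section.items():
        canonical = ALIASES.get(key, key)
        if canonical in normalized:
            # Assumed policy: supplying both spellings is ambiguous, so fail.
            raise ValueError(f"'{canonical}' was given more than once")
        normalized[canonical] = value
    return normalized


print(normalize_keys({"owner": "airflow", "retry_delay_seconds": 60}))
# prints {'owner': 'airflow', 'retry_delay': 60}
```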

Supported Operators & Sensors

Built-in operators have dedicated Pydantic models with full field-level validation.

Standard (Airflow core)

Operator | Description
PythonOperator | Run a Python callable
BranchPythonOperator | Branch based on callable return value
BashOperator | Execute a bash command
EmptyOperator | No-op placeholder / pipeline marker
TriggerDagRunOperator | Trigger another DAG
ExternalTaskSensor | Wait for a task in another DAG

BigQuery

Operator | Description
BigQueryInsertJobOperator | Run SQL via BigQuery Jobs API
BigQueryCheckOperator | Assert a SQL query returns truthy
BigQueryValueCheckOperator | Assert a SQL scalar matches expected value
BigQueryTableExistenceSensor | Wait for a table to exist

GCS (Google Cloud Storage)

Operator | Description
GCSToBigQueryOperator | Load GCS files into BigQuery
GCSToGCSOperator | Copy/move objects between GCS buckets
GCSDeleteObjectsOperator | Delete objects from a GCS bucket
GCSObjectsWithPrefixExistenceSensor | Wait for objects with a prefix to exist

Generic Plugin

Any operator/sensor registered in configs/airflow_registry.yaml works immediately — no Python code changes.

Generic Plugin System

# 1. Register in configs/airflow_registry.yaml (or DAGSMITH_EXTRA_REGISTRY)
airflow_class_registry:
  custom:
    SlackWebhookOperator:
      module: airflow.providers.slack.operators.slack_webhook
      class: SlackWebhookOperator
      type: operator

# 2. Use in any YAML spec
tasks:
  - task_id: "notify"
    operator: SlackWebhookOperator
    slack_webhook_conn_id: "slack_default"
    message: "Pipeline complete!"

Use the DAGSMITH_EXTRA_REGISTRY environment variable to maintain a separate registry file without modifying the bundled config:

export DAGSMITH_EXTRA_REGISTRY=/path/to/my_registry.yaml
dagsmith generate specs/

Aspect | Built-in operators | Generic operators | Generic sensors
Field validation | Full Pydantic schema | None (runtime errors) | Sensor fields only
Registry entry | Not required | Required | Required
Python changes | None | None | None
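
A registry entry carries enough information to produce the import for a generic operator. The sketch below mirrors the YAML structure shown in step 1 as a Python dict; find_entry and registry_import_line are hypothetical helpers, not the project's API, and how the bundled and extra registries are merged is not covered.

```python
# Same shape as the airflow_registry.yaml snippet, expressed as a dict.
REGISTRY = {
    "airflow_class_registry": {
        "custom": {
            "SlackWebhookOperator": {
                "module": "airflow.providers.slack.operators.slack_webhook",
                "class": "SlackWebhookOperator",
                "type": "operator",
            }
        }
    }
}


def find_entry(registry: dict, name: str) -> tuple[str, dict]:
    """Search every section (standard, third_party, custom) for name."""
    for origin, entries in registry["airflow_class_registry"].items():
        if name in entries:
            return origin, entries[name]
    raise KeyError(f"{name} is not registered")


def registry_import_line(entry: dict) -> str:
    """Render the import statement for a registry entry."""
    return f"from {entry['module']} import {entry['class']}"


origin, entry = find_entry(REGISTRY, "SlackWebhookOperator")
print(origin, "->", registry_import_line(entry))
```

Because the lookup is driven entirely by data, registering a new operator changes only the YAML file, which is what makes the zero-code path possible.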

FinOps Labels

Every BigQueryInsertJobOperator task automatically gets FinOps labels injected from configs/airflow_registry.yaml:

Label | Value (Jinja template)
dag_id | {{ dag.dag_id }}
task_id | {{ task.task_id }}
execution_date | {{ ds_nodash }}
instance_name | {{ var.value.composer_env_name | default('composer') }}
run_id | Cleaned, lowercase dag_run.run_id
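
The run_id label needs cleaning because BigQuery label values may contain only lowercase letters, digits, underscores, and dashes, with a maximum length of 63 characters, while Airflow run ids typically include colons and plus signs. The exact cleaning DagSmith applies is not shown here; the sketch below is one plausible version, and clean_label_value is a hypothetical name.

```python
import re


def clean_label_value(value: str) -> str:
    """Make a string safe to use as a BigQuery label value."""
    # Lowercase first, then replace every disallowed character with '_'.
    cleaned = re.sub(r"[^a-z0-9_-]", "_", value.lower())
    # BigQuery caps label values at 63 characters.
    return cleaned[:63]


print(clean_label_value("scheduled__2026-01-02T06:00:00+00:00"))
# prints scheduled__2026-01-02t06_00_00_00_00
```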

Project Layout

dagsmith/
  configs/
    airflow_registry.yaml        # operator/sensor registry + FinOps labels
  examples/                      # 18 sample YAML DAG specs
  references/
    reference_template.yaml      # fully documented YAML template
  docs/                          # interactive HTML documentation (GitHub Pages)
  src/
    cli.py                       # CLI: generate, validate, list, resolve
    loader.py                    # YAML loading + ${VAR} expansion + validation
    code_generator.py            # renders YamlDagSpec -> .py string
    callables.py                 # dotted-path -> (module, fn, alias) resolver
    dependencies.py              # >> / << dependency string parser
    cron.py                      # cron expression humanizer
    utils.py                     # py_repr, safe_var, humanize_readable_time
    registry/
      core.py                    # loads airflow_registry.yaml, get_import_line
      models.py                  # RegistryEntry, RegistryConfig Pydantic models
    schemas/
      __init__.py                # YamlDagSpec root model, discriminated unions
      base.py                    # BaseTaskSpec, BaseSensorOperatorSpec, DagSpec
      generic.py                 # GenericOperatorSpec, GenericSensorSpec
      shared_renderers.py        # render_common_fields, render_bigquery_common_fields
      bigquery/                  # BQ operator/sensor specs + renderers
      gcs/                       # GCS operator/sensor specs + renderers
      standard/                  # PythonOperator, BashOperator, etc.
  tests/                         # mirrors src/ layout
  pyproject.toml                 # deps, ruff, mypy, pytest config
  Dockerfile                     # lightweight runtime image (python:3.13-slim + uv)

Documentation

Full interactive documentation is available under docs/:

Page | Description
Overview | Features, how it works, operator table
Quick Start | Installation and first DAG
CLI Reference | All commands, flags, and examples
YAML Spec Format | Every section and field documented
Operators & Sensors | All 16 operators with field details
Callables | Callbacks, python_callable, placement guide
Best Practices | FinOps, aliases, architecture, tips
Full YAML Template | Copy-paste reference + generated output examples

GitHub Pages: to host the documentation site, enable Pages with the source set to the /docs folder.

Development

Toolchain

Tool | Purpose | Config
uv | Package manager + script runner | pyproject.toml
ruff | Lint + format (line-length: 110) | [tool.ruff]
mypy | Strict type checking (Python 3.13) | [tool.mypy]
pytest | Testing (coverage ≥ 80%) | [tool.pytest]
yamllint | YAML linting (line-length: 110) | yamllint-config.yml
pre-commit | Git hooks (lint, format, license headers) | .pre-commit-config.yaml

Commands

uv sync --group dev             # install all deps (runtime + dev)
uv run pytest                   # run tests (coverage threshold: 80%)
uv run pytest -x                # fail fast
uv run pytest -n auto           # parallel execution
uv run ruff check --fix .       # lint + auto-fix
uv run ruff format .            # format
uv run mypy .                   # type-check

Adding a new operator

  1. Zero-code path: Register in configs/airflow_registry.yaml and use as a generic plugin immediately.
  2. Full validation path: Create spec class + renderer in src/schemas/<category>/, add to discriminated unions in src/schemas/__init__.py, add renderer dispatch in code_generator.py, register in airflow_registry.yaml, add tests.

References & Examples

  • Reference Template — Fully documented YAML template covering every supported section and field with inline comments, defaults, aliases, and usage notes.
  • Examples — 18 numbered sample YAML specs demonstrating specific patterns: simple pipelines, fan-out/fan-in, task groups, sensors, triggers, Python callables, GCS operations, generic plugins, branching, nested groups, macros, and task-level callbacks.
  • Interactive Docs — Browsable HTML documentation with syntax-highlighted examples and copy-to-clipboard.

Contributing

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/my-feature)
  3. Run the full test suite (uv run pytest)
  4. Ensure linting and type checking pass (uv run ruff check . && uv run mypy .)
  5. Submit a pull request

License

Licensed under the Apache License 2.0.

Copyright 2026 DagSmith Contributors (Mayuresh Kedari)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0
