
A fast & friendly Airflow DAG factory.


DAG Tool

A friendly Airflow DAG build tool for data engineers, driven by YAML file templates.

[!WARNING] This project references the DAG generation code from Astronomer's DAG-Factory, but replaces some of its logic to fit ETL purposes for data engineers.

[!NOTE] Disclaimer: This project overrides all the parameters that must be passed to Airflow objects so that they fit a data engineering context. If you want to use this project's concept, feel free to extend it with your own ideas.

File Structure:

dags/
├── { domain }/
│     ├── { module-dags }/
│     │     ├── __init__.py
│     │     ├── dag.yml
│     │     ├── variables.yml
│     │     └── assets/
│     │         ├── dag-schema-mapping.json
│     │         └── dag-transform-query.sql
│     │
│     └── { module-dags }/
│           ├── __init__.py

[!NOTE] I think this project should also support a multiple-DAG structure like:

dags/
├── { domain }/
│     ├── { module-dags }/
│     │     ├── __init__.py
│     │     ├── dag-{ name-1 }.yml
│     │     ├── dag-{ name-2 }.yml
│     │     ├── variables.yml
│     │     └── assets/
│     │         ├── dag-case-1-schema-mapping.json
│     │         ├── dag-case-1-transform-query.sql
│     │         ├── dag-case-2-schema-mapping.json
│     │         └── dag-case-2-transform-query.sql
│     │
│     └── { module-dags }/
│           ├── __init__.py
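Supporting both layouts mainly changes how templates are discovered in a module folder. A minimal sketch, assuming templates sit directly in the module folder and are named either dag.yml or dag-{name}.yml; the helper find_dag_templates is hypothetical and not part of the dagtool API:

```python
from pathlib import Path
from typing import List


def find_dag_templates(module_dir: Path) -> List[Path]:
    """Return the DAG template files found directly in a module folder.

    Hypothetical helper: matches the single-DAG layout (dag.yml) and the
    proposed multi-DAG layout (dag-{name}.yml). variables.yml and anything
    under assets/ are not matched.
    """
    single = module_dir / "dag.yml"
    templates = [single] if single.is_file() else []
    # glob() is non-recursive, so files inside assets/ are never picked up.
    templates.extend(sorted(p for p in module_dir.glob("dag-*.yml") if p.is_file()))
    return templates
```

Each returned path would then be parsed and built into its own DAG, so one __init__.py can serve several templates.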

Execution Flow:

This project passes each template through a Pydantic model, which acts as a validated interface, before building the Airflow objects from it.

S --> Template --> Pydantic Model --> DAG/Operator Objects --> Execute --> E
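To make the Template to Pydantic Model step concrete, here is a minimal sketch of the kind of validation that stage performs on a parsed dag.yml. It deliberately swaps in standard-library dataclasses for Pydantic so that it runs without dependencies, it only handles a flat tasks list (no groups), and its class names DagModel and TaskModel are hypothetical, not dagtool's actual models:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class TaskModel:
    task: str
    op: str
    upstream: Optional[str] = None
    uses: Optional[str] = None
    params: Dict[str, Any] = field(default_factory=dict)


@dataclass
class DagModel:
    name: str
    schedule: Optional[str] = None
    tags: List[str] = field(default_factory=list)
    tasks: List[TaskModel] = field(default_factory=list)

    @classmethod
    def from_template(cls, data: Dict[str, Any]) -> "DagModel":
        # Fail fast on a missing required key, before any Airflow object exists.
        if "name" not in data:
            raise ValueError("DAG template requires a 'name' key")
        # Unknown or missing task keys raise TypeError from the dataclass __init__.
        tasks = [TaskModel(**item) for item in data.get("tasks", [])]
        return cls(
            name=data["name"],
            schedule=data.get("schedule"),
            tags=list(data.get("tags", [])),
            tasks=tasks,
        )
```

With Pydantic you would also get type coercion and more detailed error messages; this dataclass version only catches missing and unknown keys.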

CI Flow:

S --> DAGs      --> GitSync     --> Airflow K8s Pod
  --> Variables --> API Sync    --> Airflow Variables
  --> Assets    --> CI Merge
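The API Sync step for variables could be sketched as building one request per variable against Airflow's stable REST API (POST /api/v1/variables in Airflow 2.x, with key and value in the JSON body). This is an assumption about how the sync might work, not dagtool's code: build_variable_requests is hypothetical, keying variables by DAG name is a guess based on the keystore-on-dag-name macro, authentication is omitted, and the endpoint path differs between Airflow major versions, so check your version's REST API reference.

```python
import json
import urllib.request
from typing import Any, Dict, List


def build_variable_requests(base_url: str, variables: Dict[str, Any]) -> List[urllib.request.Request]:
    """Build (but do not send) one POST request per Airflow variable."""
    endpoint = f"{base_url.rstrip('/')}/api/v1/variables"
    prepared = []
    for key, value in variables.items():
        # Airflow stores variable values as strings, so serialise non-strings as JSON.
        text = value if isinstance(value, str) else json.dumps(value)
        body = json.dumps({"key": key, "value": text}).encode("utf-8")
        prepared.append(
            urllib.request.Request(
                endpoint,
                data=body,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
        )
    return prepared
```

Each prepared request could then be sent with urllib.request.urlopen once an authentication header has been added.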

Supported Features:

  • JSON Schema validation
  • Passing environment variables
  • Passing Airflow templates

📦 Installation

[!WARNING] This package is not published to PyPI yet.

uv pip install -U dagtool
Airflow Version    Note
2.7.1              The first Airflow version that this project supports.
>=2.7.1,<3.0.0     Common support for Airflow 2.x.
>=3.0.0            Common support for Airflow 3.x.
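If you want a DAG folder to fail early on an unsupported Airflow version, a small check against the version table might look like this. It is a hypothetical helper, not something dagtool ships; it compares versions as integer tuples so that 2.10.x correctly sorts after 2.7.1, and it ignores pre-release suffixes such as rc1:

```python
import re
from typing import Optional, Tuple

MIN_SUPPORTED: Tuple[int, int, int] = (2, 7, 1)


def parse_version(text: str) -> Tuple[int, int, int]:
    # Keep only the leading MAJOR.MINOR.PATCH digits, e.g. "3.1.0rc1" gives (3, 1, 0).
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", text)
    if match is None:
        raise ValueError(f"unrecognised version string: {text!r}")
    major, minor, patch = (int(group) for group in match.groups())
    return major, minor, patch


def support_line(version: str) -> Optional[str]:
    """Return "2.x" or "3.x" for a supported Airflow version, else None."""
    parsed = parse_version(version)
    if parsed[0] not in (2, 3) or parsed < MIN_SUPPORTED:
        return None
    return f"{parsed[0]}.x"
```

In a DAG file you could call support_line(airflow.__version__) and raise an error when it returns None.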

🎯 Usage

This DAG generator engine requires you to define a dag.yml file and to create the engine object in the module's __init__.py file, so that it picks up the current path.

[!NOTE] If you need environment-specific config in the dag.yml file, you can keep the dynamic values in a variables.yml file and reference them in the config template with the macro function {{ var('keystore-on-dag-name') }}.
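The var() macro can be read as a pre-render pass over the template text. A minimal sketch, assuming variables.yml has already been loaded into a flat dictionary; render_vars is a hypothetical name, not dagtool's API. It only substitutes var() calls and leaves other Jinja expressions such as {{ ds }} untouched, which is one way to keep Airflow templates passable through the config:

```python
import re
from collections.abc import Mapping

# Matches {{ var("KEY") }} or {{ var('KEY') }}, allowing spaces inside the braces.
VAR_MACRO = re.compile(r"""\{\{\s*var\(\s*(['"])([^'"]+)\1\s*\)\s*\}\}""")


def render_vars(template: str, variables: Mapping) -> str:
    """Replace var() macros with values; leave other {{ ... }} for Airflow."""

    def substitute(match: re.Match) -> str:
        key = match.group(2)
        if key not in variables:
            raise KeyError(f"variable {key!r} is not defined in variables.yml")
        return str(variables[key])

    return VAR_MACRO.sub(substitute, template)
```

Raising on an unknown key surfaces a typo in dag.yml at build time instead of producing a broken path at run time.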

name: transaction
schedule: "@daily"
owner: "de-oncall@email.com,de@email.com"
authors: ["de-team"]
tags: ["sales", "tier-1", "daily"]
tasks:
  - task: start
    op: empty

  - group: etl_sales_master
    upstream: start
    tasks:
      - task: extract
        op: python
        uses: libs.gcs.csv@1.1.0
        assets:
          - name: schema-mapping.json
            alias: schema
            convertor: basic
        params:
          path: gcs://{{ var("PROJECT_ID") }}/sales/master/date/{ exec_date:%y }

      - task: transform
        upstream: extract
        op: docker
        uses: docker.rgt.co.th/image.transform:0.0.1
        assets:
          - name: transform-query.sql
            alias: transform
        params:
          path: gcs://{{ var("PROJECT_ID") }}/landing/master/date/{ exec_date:%y }

      - task: sink
        op: python
        run: |
          import time
          time.sleep(5)

  - task: end
    upstream: etl_sales_master
    op: empty
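The upstream keys in the template above describe a dependency graph. As a sketch of how such references could be checked and ordered, assuming each node is named by its task or group key and that references are resolved within one tasks list, the standard library's graphlib performs the topological sort; execution_order is a hypothetical helper, not dagtool's implementation:

```python
from graphlib import TopologicalSorter
from typing import Any, Dict, List, Set


def node_id(item: Dict[str, Any]) -> str:
    # An item is either a task (task key) or a task group (group key).
    return item["task"] if "task" in item else item["group"]


def execution_order(items: List[Dict[str, Any]]) -> List[str]:
    """Order the items of one tasks list by their upstream references."""
    graph: Dict[str, Set[str]] = {}
    for item in items:
        upstream = item.get("upstream") or []
        if isinstance(upstream, str):
            upstream = [upstream]
        graph[node_id(item)] = set(upstream)
    unknown = set().union(*graph.values()) - set(graph)
    if unknown:
        raise ValueError(f"unknown upstream reference(s): {sorted(unknown)}")
    # static_order() raises graphlib.CycleError (a ValueError) on a cycle.
    return list(TopologicalSorter(graph).static_order())
```

For the etl_sales_master group, extract is guaranteed to come before transform, while sink has no upstream and may appear anywhere. In Airflow code the same edges would be wired with the >> operator.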
"""# SALES DAG

This DAG extracts data from Google Cloud Storage into a Google BigQuery lakehouse
via the DuckDB engine.

> This DAG is a temporary DAG for ingesting data into GCP.
"""
from dagtool import DagTool

dag = DagTool("sales", path=__file__, docs=__doc__)
dag.build_to_globals(gb=globals())

Output:

The DAG built by this package is named sales_transaction.
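The example suggests that the DAG ID is the name passed to DagTool joined to the name key from dag.yml with an underscore. A sketch of that rule, inferred from this single example rather than confirmed by the source; build_dag_id is hypothetical, and the check mirrors Airflow's dag_id constraint (letters, digits, underscores, dots and dashes, at most 250 characters):

```python
import re

# Characters Airflow accepts in a dag_id, and its maximum length.
DAG_ID_PATTERN = re.compile(r"[\w.-]+")
MAX_DAG_ID_LENGTH = 250


def build_dag_id(prefix: str, template_name: str) -> str:
    """Hypothetical naming rule inferred from the example: PREFIX_NAME."""
    dag_id = f"{prefix}_{template_name}"
    if len(dag_id) > MAX_DAG_ID_LENGTH or not DAG_ID_PATTERN.fullmatch(dag_id):
        raise ValueError(f"invalid dag_id: {dag_id!r}")
    return dag_id
```

Validating the ID while building means a bad name in dag.yml fails during parsing rather than when Airflow registers the DAG.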

💬 Contribute

I do not expect this project to be widely adopted, because it serves a specific purpose, and for a long-term solution you can write your own code without depending on it. For now, you can open a GitHub issue on this project 🙌 to report a bug or request a new feature.



Download files


Source Distribution

dagtool-0.0.1.tar.gz (21.6 kB)

Uploaded Source

Built Distribution


dagtool-0.0.1-py3-none-any.whl (14.4 kB)

Uploaded Python 3

File details

Details for the file dagtool-0.0.1.tar.gz.

File metadata

  • Download URL: dagtool-0.0.1.tar.gz
  • Upload date:
  • Size: 21.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.8

File hashes

Hashes for dagtool-0.0.1.tar.gz
Algorithm Hash digest
SHA256 e36c0de7fde18ff9065f12573199309ed51b29bd6bbf3b9a23fb4eccec0c7c01
MD5 06008b8f914c3eba4f8b291b911c4632
BLAKE2b-256 3f3dffe3a00cd6be021a0608529d8574c694f1a54c0ece2c7f3deb0724153c10


Provenance

The following attestation bundles were made for dagtool-0.0.1.tar.gz:

Publisher: publish.yml on ddeutils/dagtool

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file dagtool-0.0.1-py3-none-any.whl.

File metadata

  • Download URL: dagtool-0.0.1-py3-none-any.whl
  • Upload date:
  • Size: 14.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.8

File hashes

Hashes for dagtool-0.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 e670ee9008cb40d7e5f43262218829df959e8ea73f891c045bb51c03f764832f
MD5 f17ed0e139f08af8f90bfd196c9ed05a
BLAKE2b-256 24f2288cbef500b6250425ac074cd8769d69432c3e07244936ba845646980ce6


Provenance

The following attestation bundles were made for dagtool-0.0.1-py3-none-any.whl:

Publisher: publish.yml on ddeutils/dagtool

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
