A fast & friendly Airflow DAG factory
DAG Tool
A friendly Airflow DAG build tool for data engineers, driven by YAML file templates.
[!WARNING] This project borrows the DAG generation code from Astronomer's DAG-Factory, but I replaced some of the logic to fit ETL purposes for data engineers.
[!NOTE] Disclaimer: This project overrides all the parameters that need to be passed to Airflow objects with a data engineering context. So, if you want to use this project's concept, you can extend it with your own ideas.
File Structure:
dags/
├── { domain }/
│   ├── { module-dags }/
│   │   ├── __init__.py
│   │   ├── dag.yml
│   │   ├── variables.yml
│   │   └── assets/
│   │       ├── dag-schema-mapping.json
│   │       └── dag-transform-query.sql
│   │
│   └── { module-dags }/
│       ├── __init__.py
[!NOTE] I think this project should also support a multiple-DAG structure like:
dags/
├── { domain }/
│   ├── { module-dags }/
│   │   ├── __init__.py
│   │   ├── dag-{ name-1 }.yml
│   │   ├── dag-{ name-2 }.yml
│   │   ├── variables.yml
│   │   └── assets/
│   │       ├── dag-case-1-schema-mapping.json
│   │       ├── dag-case-1-transform-query.sql
│   │       ├── dag-case-2-schema-mapping.json
│   │       └── dag-case-2-transform-query.sql
│   │
│   └── { module-dags }/
│       ├── __init__.py
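A minimal sketch of how the template files in such a layout might be discovered. The function name find_dag_templates and the matching rule are assumptions made for illustration, not part of this project's API:

```python
from pathlib import Path


def find_dag_templates(module_dir: Path) -> list[Path]:
    """Return every DAG template in a module directory, sorted by path.

    Matches the single-file layout (dag.yml) and the proposed multi-file
    layout (dag-{ name }.yml). variables.yml is never matched because its
    name does not start with "dag".
    """
    return sorted(
        p
        for p in module_dir.glob("dag*.yml")
        if p.name == "dag.yml" or p.name.startswith("dag-")
    )
```

Each returned path could then be loaded and built as its own DAG, while variables.yml and the assets/ folder stay shared across the module.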
Execution Flow:
This project loads each template into a Pydantic model interface before passing it on to Airflow objects.
S --> Template --> Pydantic Model --> DAG/Operator Objects --> Execute --> E
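The flow above can be sketched roughly as follows. This is an illustration only, not the project's actual code: stdlib dataclasses stand in for the Pydantic models so that the sketch runs without dependencies, and TaskModel, DagModel, and build are hypothetical names. The last step returns a plain dictionary where the real tool would construct Airflow DAG and operator objects.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class TaskModel:
    """Typed view of one entry under `tasks` (stand-in for a Pydantic model)."""

    task: str
    op: str
    upstream: Optional[str] = None

    def __post_init__(self) -> None:
        # Fail early, before any Airflow object would be created.
        if not self.task:
            raise ValueError("task name must not be empty")


@dataclass
class DagModel:
    """Typed view of the whole template."""

    name: str
    schedule: Optional[str] = None
    tasks: list[TaskModel] = field(default_factory=list)

    @classmethod
    def from_template(cls, data: dict[str, Any]) -> "DagModel":
        # Unknown task keys raise TypeError here, so typos surface at load time.
        tasks = [TaskModel(**t) for t in data.get("tasks", [])]
        return cls(name=data["name"], schedule=data.get("schedule"), tasks=tasks)


def build(model: DagModel) -> dict[str, Any]:
    """Stand-in for the DAG/Operator step: describe what would be built."""
    return {
        "dag_id": model.name,
        "schedule": model.schedule,
        "operators": [(t.task, t.op, t.upstream) for t in model.tasks],
    }


template = {
    "name": "transaction",
    "schedule": "@daily",
    "tasks": [
        {"task": "start", "op": "empty"},
        {"task": "end", "op": "empty", "upstream": "start"},
    ],
}
dag_spec = build(DagModel.from_template(template))
```

Keeping a typed model between the template and the Airflow objects means that a malformed template fails when it is loaded rather than when the DAG runs.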
CI Flow:
S --> DAGs      --> GitSync  --> Airflow K8s Pod
  --> Variables --> API Sync --> Airflow Variables
  --> Assets    --> CI Merge
Supported Features:
- JSON Schema validation
- Passing environment variables
- Passing Airflow templates
📦 Installation
[!WARNING] This package is not published to PyPI yet.
uv pip install -U dagtool
| Airflow Version | Supported | Notes |
|---|---|---|
| 2.7.1 | ✅ | This is the first Airflow version that this project supports. |
| >=2.7.1,<3.0.0 | ❌ | Common version support for Airflow version 2.x.x |
| >=3.x.x | ❌ | Common version support for Airflow version 3.x.x |
🎯 Usage
The DAG generator engine needs you to define a dag.yml file and to create the engine
object in the __init__.py file so that it can resolve the current path.
[!NOTE] If you want environment-specific configuration in the dag.yaml file, you can use a variable.yaml file to supply dynamic values, which are marked in the config template via the macro function {{ var('keystore-on-dag-name') }}.
name: transaction
schedule: "@daily"
owner: "de-oncall@email.com,de@email.com"
authors: ["de-team"]
tags: ["sales", "tier-1", "daily"]
tasks:
  - task: start
    op: empty

  - group: etl_sales_master
    upstream: start
    tasks:
      - task: extract
        op: python
        uses: libs.gcs.csv@1.1.0
        assets:
          - name: schema-mapping.json
            alias: schema
            convertor: basic
        params:
          path: gcs://{{ var("PROJECT_ID") }}/sales/master/date/{ exec_date:%y }

      - task: transform
        upstream: extract
        op: docker
        uses: docker.rgt.co.th/image.transform:0.0.1
        assets:
          - name: transform-query.sql
            alias: transform
        params:
          path: gcs://{{ var("PROJECT_ID") }}/landing/master/date/{ exec_date:%y }

      - task: sink
        op: python
        run: |
          import time
          time.sleep(5)

  - task: end
    upstream: etl_sales_master
    op: empty
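The var macro used in the template can be illustrated with a small renderer. This is only a sketch under assumptions: the project's real resolution logic is not shown here, render_vars is a hypothetical helper, and a plain dictionary stands in for the values loaded from the variables file. The { exec_date:%y } placeholder is left untouched on purpose.

```python
import re

# Matches {{ var("KEY") }} or {{ var('KEY') }} with optional inner spaces.
VAR_PATTERN = re.compile(r"""\{\{\s*var\(\s*(['"])(?P<key>[^'"]+)\1\s*\)\s*\}\}""")


def render_vars(text: str, variables: dict[str, str]) -> str:
    """Replace each var(...) macro with its value; raise KeyError if missing."""
    return VAR_PATTERN.sub(lambda m: variables[m.group("key")], text)


path = 'gcs://{{ var("PROJECT_ID") }}/sales/master/date/{ exec_date:%y }'
rendered = render_vars(path, {"PROJECT_ID": "my-project"})
# rendered == "gcs://my-project/sales/master/date/{ exec_date:%y }"
```

Raising on a missing key, instead of leaving the macro in place, makes a misconfigured environment visible before the DAG is deployed.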
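"""# SALES DAG

This DAG extracts data from Google Cloud Storage into the Google BigQuery LakeHouse
via the DuckDB engine.

> This is a temporary DAG for ingesting data into GCP.
"""
from dagtool import DagTool

# Create the engine for this module; `path` lets it locate the files next to
# this __init__.py, and `docs` passes the module docstring above.
dag = DagTool("sales", path=__file__, docs=__doc__)

# Register the generated DAG objects in this module's globals.
dag.build_to_globals(gb=globals())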
Output:
The DAG built by this package will be named sales_transaction.
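The upstream keys in the dag.yml example imply an execution order between tasks and groups. Airflow resolves these dependencies itself once the operators are wired together, so the following is only an illustration of the idea, using the stdlib graphlib module and a hypothetical execution_order helper:

```python
from graphlib import TopologicalSorter


def execution_order(tasks: list[dict]) -> list[str]:
    """Order task (or group) names so that each one follows its upstream."""
    graph: dict[str, set[str]] = {}
    for item in tasks:
        # An entry is named by either its `task` key or its `group` key.
        name = item.get("task") or item.get("group")
        upstream = item.get("upstream")
        graph[name] = {upstream} if upstream else set()
    return list(TopologicalSorter(graph).static_order())


# Top-level entries from the example template, deliberately shuffled.
top_level = [
    {"task": "end", "upstream": "etl_sales_master", "op": "empty"},
    {"group": "etl_sales_master", "upstream": "start"},
    {"task": "start", "op": "empty"},
]
order = execution_order(top_level)
# order == ["start", "etl_sales_master", "end"]
```

The order in which entries appear in the file does not matter; only the upstream references do.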
💬 Contribute
I do not expect this project to be widely adopted because it serves a specific purpose, and for a long-term solution you can write your own code without depending on this project. For now, you can open a GitHub issue on this project :raised_hands: to report a bug or request a new feature.