Flowmelet - Airflow DAG Compile-time Generator with unified features from Airflow, dagfactory, Gusty, and DBT
Flowmelet
Flowmelet is an Airflow DAG generator that builds native DAGs from a model-based structure of YAML files. The library is inspired by Dagfactory, Gusty, and DBT.
Features
Recursive Compile-time Validations
root/domain1/layer1/validate_inlets.validation.py
def validate_python_operator_node(nodes_bag, node_id, node_type, node_data):
    # Only task nodes are required to declare inlets.
    is_a_task = node_type == "task"
    if 'inlets' not in node_data and is_a_task:
        raise ValueError(f"[INLETS_NOT_FOUND]: {node_id}")
Output
ValueError: [INLETS_NOT_FOUND]: root/domain1/layer1/model_2
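The following is a minimal sketch of how a validator like the one above might be applied to every node at compile time. The `run_validators` driver and the shape of `nodes_bag` (a dict keyed by node path, each entry holding `type` and `data`) are assumptions made for illustration; they are not taken from Flowmelet's code.

```python
def validate_python_operator_node(nodes_bag, node_id, node_type, node_data):
    """Reject task nodes that do not declare inlets."""
    is_a_task = node_type == "task"
    if is_a_task and "inlets" not in node_data:
        raise ValueError(f"[INLETS_NOT_FOUND]: {node_id}")


def run_validators(nodes_bag, validators):
    """Hypothetical driver: call every validator on every node."""
    for node_id, node in nodes_bag.items():
        for validate in validators:
            validate(nodes_bag, node_id, node["type"], node["data"])


# Illustrative data only; the real node structure may differ.
nodes_bag = {
    "root/domain1/layer1/model_1": {"type": "task", "data": {"inlets": []}},
    "root/domain1/layer1/model_2": {"type": "task", "data": {}},
}

try:
    run_validators(nodes_bag, [validate_python_operator_node])
    error_message = None
except ValueError as exc:
    error_message = str(exc)

print(error_message)
```

Failing fast at compile time means a missing field surfaces before any DAG file is written, rather than when Airflow parses or runs the DAG.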
Task Reference Expressions (DBT Inspired)
root/domain1/layer1/layer3/model_1.yml
operator: airflow.operators.bash_operator.BashOperator
params:
  table_name: schema.model_1_table
  extras:
    extra_1: extra_1
    extra_2: extra_2
sql: |
  select * from table
root/domain1/layer1/layer3/model_2.yml
dependencies:
  - model_1
operator: airflow.operators.bash_operator.BashOperator
params:
  table_name: schema.model_2_table
  extras: ${ task['root/domain1/layer1/layer3/model_2'].params.extras }
sql: |
  select * from ${ task['root/domain1/layer1/layer3/model_2'].params.table_name }
Output
root/domain1/layer1/layer3/model_2.out
dependencies:
  - model_1
operator: airflow.operators.bash_operator.BashOperator
params:
  table_name: schema.model_1_table
  extras:
    extra_1: extra_1
    extra_2: extra_2
sql: |
  select * from schema.model_2_table
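As a rough illustration of how `${ task['...'].params.<key> }` expressions could be expanded, the sketch below uses a regular expression to substitute string values. The `resolve_refs` function, the `TASK_REF` pattern, and the `tasks` dictionary are hypothetical. The sketch handles only string interpolation, not whole-structure substitution such as the `extras` mapping above.

```python
import re

# Matches ${ task['<path>'].params.<dotted.key> }
TASK_REF = re.compile(r"\$\{\s*task\['([^']+)'\]\.params\.([\w.]+)\s*\}")


def resolve_refs(text, tasks):
    """Replace each task reference in text with the referenced param value."""

    def substitute(match):
        task_path, dotted_key = match.group(1), match.group(2)
        value = tasks[task_path]["params"]
        # Walk nested params, e.g. "extras.extra_1".
        for key in dotted_key.split("."):
            value = value[key]
        return str(value)

    return TASK_REF.sub(substitute, text)


# Illustrative data only.
tasks = {
    "root/domain1/layer1/layer3/model_1": {
        "params": {
            "table_name": "schema.model_1_table",
            "extras": {"extra_1": "extra_1"},
        }
    }
}

sql = "select * from ${ task['root/domain1/layer1/layer3/model_1'].params.table_name }"
resolved = resolve_refs(sql, tasks)
print(resolved)
```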
Simplified External Dependencies
root/domain1/model_2.yml
dependencies:
- root/domain1/layer1/model_1 #External DAG Task Dependency
- root/domain1/layer2 #External DAG Dependency
operator: airflow.operators.bash_operator.BashOperator
sql: |
  select * from table
Output
root/domain1/___sensor_root_domain1_layer1_model_1.out
dependencies: []
external_dag_id: root_domain1_layer1
external_task_id: model_1
mode: poke
operator: airflow.operators.sensors.ExternalTaskSensor
poke_interval: 600
root/domain1/___sensor_root_domain1_layer2.out
dependencies: []
external_dag_id: root_domain1_layer2
external_task_id: None
mode: poke
operator: airflow.operators.sensors.ExternalTaskSensor
poke_interval: 600
timeout: 3600
root/domain1/model_2.out
dependencies:
- ___sensor_root_domain1_layer1_model_1
- ___sensor_root_domain1_layer2
operator: airflow.operators.bash_operator.BashOperator
original_dependencies:
- root/domain1/layer1/model_1
- root/domain1/layer2
sql: select * from table
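The mapping from a path-style dependency to a sensor task can be sketched as follows. This is an assumption-laden illustration: `build_sensor` and the `known_dags` set are hypothetical names, and the rule used here (a path that matches a known DAG folder is a DAG-level dependency, otherwise its last segment is the task) is inferred from the sample output rather than from Flowmelet's source.

```python
def build_sensor(dependency, known_dags):
    """Turn a path dependency into an ExternalTaskSensor-style config."""
    if dependency in known_dags:
        # The whole path names a DAG: wait for the DAG, not a single task.
        dag_path, task_id = dependency, None
    else:
        # The last path segment names the task inside the external DAG.
        dag_path, _, task_id = dependency.rpartition("/")

    name = "___sensor_" + dependency.replace("/", "_")
    config = {
        "dependencies": [],
        "external_dag_id": dag_path.replace("/", "_"),
        "external_task_id": task_id,
        "mode": "poke",
        "operator": "airflow.operators.sensors.ExternalTaskSensor",
        "poke_interval": 600,
    }
    return name, config


# Illustrative DAG folders only.
known_dags = {"root/domain1/layer1", "root/domain1/layer2"}

task_sensor = build_sensor("root/domain1/layer1/model_1", known_dags)
dag_sensor = build_sensor("root/domain1/layer2", known_dags)
print(task_sensor[0], dag_sensor[0])
```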
Auto-dependencies injections
root/domain1/model_2.yml
#dependencies will be auto injected from sql e.g. task["root/domain1/model_1"].params.table_name
operator: airflow.operators.bash_operator.BashOperator
params:
  table_name: schema.model_2_table
sql: |
  select * from ${ task['root/domain1/model_1'].params.table_name }
Output
root/domain1/model_2.out
dependencies:
- model_1
operator: airflow.operators.bash_operator.BashOperator
params:
  table_name: schema.model_2_table
sql: select * from schema.model_1_table
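One way dependencies could be inferred from task references is sketched below. The `infer_dependencies` function and its rule (a reference to a task in the same folder becomes a short task id, anything else stays a full path) are assumptions for illustration, chosen so that the result matches the sample output above.

```python
import re

# Captures the path inside task['...'] or task["..."].
TASK_REF = re.compile(r"task\[['\"]([^'\"]+)['\"]\]")


def infer_dependencies(node_path, sql):
    """Return the task ids referenced in sql, in order, without duplicates."""
    node_dir = node_path.rpartition("/")[0]
    deps = []
    for ref in TASK_REF.findall(sql):
        ref_dir, _, ref_name = ref.rpartition("/")
        # Same folder: depend on the sibling task by its short id.
        dep = ref_name if ref_dir == node_dir else ref
        if dep not in deps:
            deps.append(dep)
    return deps


sql = "select * from ${ task['root/domain1/model_1'].params.table_name }"
dependencies = infer_dependencies("root/domain1/model_2", sql)
print(dependencies)
```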
Recursive Task Decorators
root/data_quality_check_1.decorator.yml
operator: airflow.operators.bash_operator.BashOperator
bash_command: 'echo DQ CHECK 1 FOR ALL NODES WITH airflow.operators.bash_operator.BashOperator.. DETECTED_TASK: ${task.task_id}...'
trigger_rule: 'all_done'
execution_rule: 'after'
target_pattern: airflow.operators.bash_operator.BashOperator
Output
root/domain1/___decorator_data_quality_check_1_model_2.out
bash_command:
  echo DQ CHECK 1 FOR ALL NODES WITH airflow.operators.bash_operator.BashOperator.. DETECTED_TASK: model_2...
dependencies:
- model_2
execution_rule: after
operator: airflow.operators.bash_operator.BashOperator
target_pattern: airflow.operators.bash_operator.BashOperator
trigger_rule: all_done
Other outputs
- root/domain1/layer1/___decorator_data_quality_check_1_model_1.out
- root/domain1/layer1/layer3/___decorator_data_quality_check_1_model_1.out
- root/domain1/layer2/___decorator_data_quality_check_1_model_1.out
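The expansion of a decorator into one task per matching node might look like the sketch below. `expand_decorator` is a hypothetical helper. It assumes `target_pattern` is compared to the operator by exact string match and that `execution_rule: after` means the decorator task depends on the task it decorates; the real matching rules may differ.

```python
def expand_decorator(decorator_name, decorator, nodes):
    """Create one decorator task per node whose operator matches target_pattern."""
    generated = {}
    for node_path, node in nodes.items():
        if node["operator"] != decorator["target_pattern"]:
            continue
        folder, _, task_id = node_path.rpartition("/")
        config = dict(decorator)
        # Fill in the id of the task being decorated.
        config["bash_command"] = decorator["bash_command"].replace(
            "${task.task_id}", task_id
        )
        # "after": the decorator runs downstream of the decorated task.
        config["dependencies"] = [task_id]
        generated[f"{folder}/___decorator_{decorator_name}_{task_id}"] = config
    return generated


BASH = "airflow.operators.bash_operator.BashOperator"

decorator = {
    "operator": BASH,
    "bash_command": "echo DQ CHECK 1... DETECTED_TASK: ${task.task_id}...",
    "trigger_rule": "all_done",
    "execution_rule": "after",
    "target_pattern": BASH,
}

# Illustrative nodes only; the second one does not match the pattern.
nodes = {
    "root/domain1/model_2": {"operator": BASH},
    "root/domain1/model_3": {"operator": "airflow.operators.python_operator.PythonOperator"},
}

generated = expand_decorator("data_quality_check_1", decorator, nodes)
print(sorted(generated))
```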
Recursive Lineage UI (DBT Inspired)
Quick Start
- Clone https://github.com/yhow101/flowmelet.git
- Run the following command to install dependencies
pip install -r requirements.txt
- Run the following command
python dag_generator.py --dags_path <BASE_PATH>/samples/recursive_decorators
- The generated files should appear in the target folder
Parameters
Parameter | Description | Required | Default Value | Sample |
---|---|---|---|---|
dags_path | DAGS directory location | X | N/A | python dag_generator.py --dags_path <BASE_PATH>/samples/recursive_decorators |
mode | native: generate only native DAGs; dagfactory: generate only DAGfactory DAGs; lineage: generate only lineages; debug: generate only DAG and task configs for debugging; all: generate all outputs; cleanup: remove all generated files under the specified dags directory | | all | python dag_generator.py --dags_path <BASE_PATH>/samples/recursive_decorators --mode native |
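For readers who want to see how the two parameters fit together, here is a hedged sketch of an equivalent command-line parser built with Python's standard `argparse` module. It mirrors the table above but is not taken from `dag_generator.py`; `build_parser` and `MODES` are illustrative names.

```python
import argparse

# Mode names as listed in the parameters table.
MODES = ["native", "dagfactory", "lineage", "debug", "all", "cleanup"]


def build_parser():
    """Hypothetical parser mirroring the documented parameters."""
    parser = argparse.ArgumentParser(prog="dag_generator.py")
    parser.add_argument("--dags_path", required=True, help="DAGS directory location")
    parser.add_argument(
        "--mode", choices=MODES, default="all", help="Which outputs to generate"
    )
    return parser


# Parse an explicit argument list so the sketch runs without a real command line.
args = build_parser().parse_args(["--dags_path", "samples/recursive_decorators"])
print(args.dags_path, args.mode)
```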