
Flowmelet - Airflow DAG Compile-time Generator unifying features from Airflow, Dagfactory, Gusty, and DBT


Flowmelet


Flowmelet is an Airflow DAG generator that compiles native Airflow DAGs from YAML model files organized in a folder structure. It is inspired by Dagfactory, Gusty, and DBT.

Features

Recursive Compile-time Validations

root/domain1/layer1/validate_inlets.validation.py

def validate_python_operator_node(nodes_bag, node_id, node_type, node_data):
    # Fail compilation for any task node that does not declare its inlets.
    is_a_task = node_type == "task"
    if 'inlets' not in node_data and is_a_task:
        raise ValueError(f"[INLETS_NOT_FOUND]: {node_id}")

Output

ValueError: [INLETS_NOT_FOUND]: root/domain1/layer1/model_2
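The README does not show how Flowmelet discovers and calls validator functions, so the following is only a minimal sketch of the idea. It assumes that a validator applies to every node nested below the folder it is defined in, and that nodes_bag maps node IDs to (node_type, node_data) pairs. The names run_validations, SAMPLE_NODES, and the "dag" node type are hypothetical.

```python
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical layout: node_id -> (node_type, node_data)
NodesBag = Dict[str, Tuple[str, Dict[str, Any]]]
Validator = Callable[[NodesBag, str, str, Dict[str, Any]], None]


def validate_python_operator_node(nodes_bag, node_id, node_type, node_data):
    # Same check as validate_inlets.validation.py: tasks must declare inlets.
    is_a_task = node_type == "task"
    if "inlets" not in node_data and is_a_task:
        raise ValueError(f"[INLETS_NOT_FOUND]: {node_id}")


def run_validations(nodes_bag: NodesBag, validators_by_dir: Dict[str, List[Validator]]) -> None:
    """Apply each validator to every node nested below the folder it was defined in.

    The first validation error propagates and aborts compilation.
    """
    for node_id, (node_type, node_data) in nodes_bag.items():
        for scope, validators in validators_by_dir.items():
            # Assumption: "recursive" means the validator covers all descendants.
            if node_id.startswith(scope + "/"):
                for validator in validators:
                    validator(nodes_bag, node_id, node_type, node_data)


SAMPLE_NODES: NodesBag = {
    "root/domain1/layer1": ("dag", {}),
    "root/domain1/layer1/model_1": ("task", {"inlets": ["schema.source_table"]}),
    "root/domain1/layer1/model_2": ("task", {"operator": "airflow.operators.bash_operator.BashOperator"}),
}
SAMPLE_VALIDATORS = {"root/domain1/layer1": [validate_python_operator_node]}

if __name__ == "__main__":
    try:
        run_validations(SAMPLE_NODES, SAMPLE_VALIDATORS)
    except ValueError as err:
        print(f"ValueError: {err}")
```

Scoping validators by folder keeps a check such as the inlets rule local to one layer while still reaching every model nested beneath it.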

Task Reference Expressions (DBT Inspired)

root/domain1/layer1/layer3/model_1.yml

operator: airflow.operators.bash_operator.BashOperator
params:
   table_name: schema.model_1_table
   extras:
      extra_1: extra_1
      extra_2: extra_2
sql: |
   select * from table

root/domain1/layer1/layer3/model_2.yml

dependencies:
   - model_1
operator: airflow.operators.bash_operator.BashOperator
params:
   table_name: schema.model_2_table
   extras: ${ task['root/domain1/layer1/layer3/model_1'].params.extras }
sql: |
   select * from ${ task['root/domain1/layer1/layer3/model_1'].params.table_name }

Output

root/domain1/layer1/layer3/model_2.out

dependencies:
   - model_1
operator: airflow.operators.bash_operator.BashOperator
params:
   table_name: schema.model_2_table
   extras:
      extra_1: extra_1
      extra_2: extra_2
sql: |
   select * from schema.model_1_table
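Conceptually, each ${ task['<path>'].<attribute path> } expression is looked up in the already-parsed model files and replaced with the value it points at. The sketch below is illustrative, not Flowmelet's implementation: it assumes models are loaded into a dict keyed by path and that expressions only use the task['...'].a.b form. The names resolve_references, lookup, and REF_PATTERN are hypothetical. A value that consists of a single reference keeps its structured type (so extras stays a mapping), while a reference embedded in a longer string is interpolated as text.

```python
import re
from typing import Any, Dict

# Matches ${ task['<path>'].<dotted.attribute.path> }
REF_PATTERN = re.compile(r"\$\{\s*task\['([^']+)'\]\.([\w.]+)\s*\}")


def lookup(models: Dict[str, Any], path: str, attrs: str) -> Any:
    """Follow a dotted attribute path (e.g. params.table_name) inside one model."""
    value = models[path]
    for key in attrs.split("."):
        value = value[key]
    return value


def resolve_references(value: Any, models: Dict[str, Any]) -> Any:
    """Recursively replace ${ task[...] } expressions inside dicts, lists and strings."""
    if isinstance(value, dict):
        return {k: resolve_references(v, models) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve_references(v, models) for v in value]
    if not isinstance(value, str):
        return value
    whole = REF_PATTERN.fullmatch(value.strip())
    if whole:
        # The whole value is one reference: return the referenced object unchanged.
        return lookup(models, whole.group(1), whole.group(2))
    # Otherwise interpolate every reference into the surrounding text.
    return REF_PATTERN.sub(lambda m: str(lookup(models, m.group(1), m.group(2))), value)


MODELS = {
    "root/domain1/layer1/layer3/model_1": {
        "params": {
            "table_name": "schema.model_1_table",
            "extras": {"extra_1": "extra_1", "extra_2": "extra_2"},
        },
    },
}
MODEL_2 = {
    "dependencies": ["model_1"],
    "params": {
        "table_name": "schema.model_2_table",
        "extras": "${ task['root/domain1/layer1/layer3/model_1'].params.extras }",
    },
    "sql": "select * from ${ task['root/domain1/layer1/layer3/model_1'].params.table_name }\n",
}

if __name__ == "__main__":
    resolved = resolve_references(MODEL_2, MODELS)
    print(resolved["params"]["extras"])
    print(resolved["sql"], end="")
```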

Simplified External Dependencies

root/domain1/model_2.yml

dependencies:
   - root/domain1/layer1/model_1 #External DAG Task Dependency
   - root/domain1/layer2 #External DAG Dependency
operator: airflow.operators.bash_operator.BashOperator
sql: |
   select * from table

Output

root/domain1/___sensor_root_domain1_layer1_model_1.out

dependencies: []
external_dag_id: root_domain1_layer1
external_task_id: model_1
mode: poke
operator: airflow.operators.sensors.ExternalTaskSensor
poke_interval: 600

root/domain1/___sensor_root_domain1_layer2.out

dependencies: []
external_dag_id: root_domain1_layer2
external_task_id: None
mode: poke
operator: airflow.operators.sensors.ExternalTaskSensor
poke_interval: 600
timeout: 3600

root/domain1/model_2.out

dependencies:
   - ___sensor_root_domain1_layer1_model_1
   - ___sensor_root_domain1_layer2
operator: airflow.operators.bash_operator.BashOperator
original_dependencies:
   - root/domain1/layer1/model_1
   - root/domain1/layer2
sql: select * from table
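The generated files follow a visible naming pattern: the DAG folder path becomes external_dag_id with "/" replaced by "_", a trailing model name becomes external_task_id, and the sensor is named ___sensor_ plus the flattened dependency path. The sketch below reproduces that pattern under stated assumptions: the generator knows which paths are DAG folders (passed here as dag_dirs), every path-style dependency is treated as external, and a whole-DAG dependency uses Python None for the task. The helpers make_sensor and rewrite_dependencies and the task_id key are hypothetical; timeout is left out because the two sample sensors differ on it.

```python
from typing import Any, Dict, List, Set, Tuple

SENSOR_OPERATOR = "airflow.operators.sensors.ExternalTaskSensor"


def make_sensor(dependency: str, dag_dirs: Set[str]) -> Dict[str, Any]:
    """Build an ExternalTaskSensor config for one path-style dependency."""
    if dependency in dag_dirs:
        dag_path, task_id = dependency, None  # whole-DAG dependency
    else:
        dag_path, _, task_id = dependency.rpartition("/")  # DAG folder + task name
    return {
        "task_id": "___sensor_" + dependency.replace("/", "_"),
        "operator": SENSOR_OPERATOR,
        "external_dag_id": dag_path.replace("/", "_"),
        "external_task_id": task_id,
        "mode": "poke",
        "poke_interval": 600,
        "dependencies": [],
    }


def rewrite_dependencies(
    model: Dict[str, Any], dag_dirs: Set[str]
) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
    """Swap path-style dependencies for sensor task ids; return (model, sensors)."""
    original = list(model.get("dependencies", []))
    sensors: List[Dict[str, Any]] = []
    new_deps: List[str] = []
    for dep in original:
        if "/" in dep:  # assumption: every path-style dependency is external
            sensor = make_sensor(dep, dag_dirs)
            sensors.append(sensor)
            new_deps.append(sensor["task_id"])
        else:
            new_deps.append(dep)
    rewritten = {**model, "dependencies": new_deps, "original_dependencies": original}
    return rewritten, sensors


if __name__ == "__main__":
    model_2 = {
        "dependencies": ["root/domain1/layer1/model_1", "root/domain1/layer2"],
        "operator": "airflow.operators.bash_operator.BashOperator",
        "sql": "select * from table",
    }
    rewritten, sensors = rewrite_dependencies(model_2, {"root/domain1/layer1", "root/domain1/layer2"})
    print(rewritten["dependencies"])
```

Keeping original_dependencies alongside the rewritten list mirrors the .out file above and preserves the author's intent for lineage.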

Automatic Dependency Injection

root/domain1/model_2.yml

# dependencies are injected automatically from task references in sql, e.g. task['root/domain1/model_1'].params.table_name
operator: airflow.operators.bash_operator.BashOperator
params:
   table_name: schema.model_2_table
sql: |
   select * from ${ task['root/domain1/model_1'].params.table_name }

Output

root/domain1/model_2.out

dependencies:
   - model_1
operator: airflow.operators.bash_operator.BashOperator
params:
   table_name: schema.model_2_table
sql: select * from schema.model_1_table
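Dependency injection can be derived from the same reference syntax: every task['<path>'] mentioned in a model becomes an upstream dependency. This is a sketch, not Flowmelet's code. It assumes that a reference to a model in the same folder is shortened to the bare model name (as model_1 is in the output above) and that references elsewhere keep their full path so the external-dependency handling can turn them into sensors. infer_dependencies is a hypothetical helper; both quote styles are accepted because the YAML comment above uses double quotes.

```python
import posixpath
import re
from typing import List

# task['<path>'] or task["<path>"]
TASK_REF = re.compile(r"task\[\s*['\"]([^'\"]+)['\"]\s*\]")


def infer_dependencies(model_path: str, text: str) -> List[str]:
    """Return upstream dependencies referenced via task[...] in text, in first-seen order."""
    own_dir = posixpath.dirname(model_path)
    deps: List[str] = []
    for ref in TASK_REF.findall(text):
        if ref == model_path:
            continue  # a model never depends on itself
        # Same-folder references become short names; others stay as full paths.
        dep = posixpath.basename(ref) if posixpath.dirname(ref) == own_dir else ref
        if dep not in deps:
            deps.append(dep)
    return deps


if __name__ == "__main__":
    sql = "select * from ${ task['root/domain1/model_1'].params.table_name }"
    print(infer_dependencies("root/domain1/model_2", sql))
```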

Recursive Task Decorators

root/data_quality_check_1.decorator.yml

operator: airflow.operators.bash_operator.BashOperator
bash_command: 'echo DQ CHECK 1 FOR ALL NODES WITH airflow.operators.bash_operator.BashOperator..  DETECTED_TASK: ${task.task_id}...'
trigger_rule: 'all_done'
execution_rule: 'after'
target_pattern: airflow.operators.bash_operator.BashOperator

Output

root/domain1/___decorator_data_quality_check_1_model_2.out

bash_command:
   echo DQ CHECK 1 FOR ALL NODES WITH airflow.operators.bash_operator.BashOperator..  DETECTED_TASK: model_2...
dependencies:
   - model_2
execution_rule: after
operator: airflow.operators.bash_operator.BashOperator
target_pattern: airflow.operators.bash_operator.BashOperator
trigger_rule: all_done

Other outputs

  • root/domain1/layer1/___decorator_data_quality_check_1_model_1.out
  • root/domain1/layer1/layer3/___decorator_data_quality_check_1_model_1.out
  • root/domain1/layer2/___decorator_data_quality_check_1_model_1.out
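A decorator file acts as a template that is stamped out once for each matching task in or below the folder it lives in. The sketch below assumes that target_pattern is treated as a regular expression matched against each task's operator (an exact operator path, as in the sample, also matches), that ${task.task_id} is the only placeholder, and that execution_rule: after makes the generated task depend on the decorated one. It also skips generated sensor and decorator tasks (prefixed ___) so a decorator never decorates its own output. apply_decorator and the task layout are hypothetical.

```python
import posixpath
import re
from typing import Any, Dict

Task = Dict[str, Any]


def apply_decorator(name: str, decorator_dir: str, decorator: Task,
                    tasks: Dict[str, Task]) -> Dict[str, Task]:
    """Generate one decorator task per matching task in or below decorator_dir.

    tasks maps full task paths (e.g. root/domain1/model_2) to their configs;
    the result maps generated task paths to their configs.
    """
    if decorator.get("execution_rule") != "after":
        raise ValueError("this sketch only handles execution_rule: after")
    pattern = re.compile(decorator["target_pattern"])
    generated: Dict[str, Task] = {}
    for path, task in tasks.items():
        task_dir, task_id = posixpath.split(path)
        if not path.startswith(decorator_dir + "/"):
            continue  # outside the decorator's folder
        if task_id.startswith("___"):
            continue  # never decorate generated sensor/decorator tasks
        if not pattern.fullmatch(task.get("operator", "")):
            continue
        config = {
            key: value.replace("${task.task_id}", task_id) if isinstance(value, str) else value
            for key, value in decorator.items()
        }
        config["dependencies"] = [task_id]  # 'after': run once the target finishes
        generated[f"{task_dir}/___decorator_{name}_{task_id}"] = config
    return generated


BASH = "airflow.operators.bash_operator.BashOperator"
DECORATOR: Task = {
    "operator": BASH,
    "bash_command": f"echo DQ CHECK 1 FOR ALL NODES WITH {BASH}..  DETECTED_TASK: ${{task.task_id}}...",
    "trigger_rule": "all_done",
    "execution_rule": "after",
    "target_pattern": BASH,
}
TASKS: Dict[str, Task] = {
    "root/domain1/model_2": {"operator": BASH},
    "root/domain1/___sensor_root_domain1_layer2": {"operator": "airflow.operators.sensors.ExternalTaskSensor"},
    "root/domain1/layer1/model_1": {"operator": BASH},
}

if __name__ == "__main__":
    for path, config in apply_decorator("data_quality_check_1", "root", DECORATOR, TASKS).items():
        print(path, config["dependencies"])
```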

Recursive Lineage UI (DBT Inspired)

Sample Lineage
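The lineage view is drawn from the same dependency graph the generator builds. As a minimal sketch of what "recursive" lineage means, assuming dependencies are available as a mapping from each node to its direct upstream nodes, the full upstream lineage of a node is the transitive closure of that mapping. upstream_lineage is a hypothetical helper, and root/source/raw in the example graph is made up for illustration.

```python
from typing import Dict, List, Set


def upstream_lineage(node: str, deps: Dict[str, List[str]]) -> Set[str]:
    """Return every node that node depends on, directly or transitively."""
    seen: Set[str] = set()

    def visit(current: str) -> None:
        for parent in deps.get(current, []):
            if parent not in seen:  # guards against cycles and shared ancestors
                seen.add(parent)
                visit(parent)

    visit(node)
    return seen


DEPS = {
    "root/domain1/model_2": ["root/domain1/layer1/model_1", "root/domain1/layer2"],
    "root/domain1/layer1/model_1": ["root/source/raw"],
}

if __name__ == "__main__":
    print(sorted(upstream_lineage("root/domain1/model_2", DEPS)))
```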

Quick Start

  1. Clone https://github.com/yhow101/flowmelet.git
  2. Run the following command to install dependencies
    pip install -r requirements.txt
    
  3. Run the following command to generate the DAGs
    python dag_generator.py --dags_path <BASE_PATH>/samples/recursive_decorators
    
  4. You should see all generated files in the target folder

Parameters

  • dags_path (required): DAGs directory location. Default: N/A.
    Sample: python dag_generator.py --dags_path <BASE_PATH>/samples/recursive_decorators
  • mode (optional): which outputs to generate. Default: all. Accepted values:
      native - Generate only native DAGs
      dagfactory - Generate only Dagfactory DAGs
      lineage - Generate only lineages
      debug - Generate only DAG and task configs for debugging
      all - Generate all outputs
      cleanup - Clean all generated files under the specified DAGs directory
    Sample: python dag_generator.py --dags_path <BASE_PATH>/samples/recursive_decorators --mode native
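For wrapping the generator in other tooling, the two documented options map directly onto a standard argparse definition. This is a hypothetical reconstruction from the parameter list above, not Flowmelet's actual parser: only --dags_path, --mode, the six mode values, and the default of all come from the README; build_parser and the help strings are illustrative.

```python
import argparse
from typing import List

MODES: List[str] = ["native", "dagfactory", "lineage", "debug", "all", "cleanup"]


def build_parser() -> argparse.ArgumentParser:
    """Mirror the documented dag_generator.py options."""
    parser = argparse.ArgumentParser(description="Generate Airflow DAGs from YAML model files.")
    parser.add_argument("--dags_path", required=True, help="DAGs directory location")
    parser.add_argument("--mode", choices=MODES, default="all",
                        help="which outputs to generate (default: all)")
    return parser


if __name__ == "__main__":
    args = build_parser().parse_args(
        ["--dags_path", "samples/recursive_decorators", "--mode", "native"]
    )
    print(args.dags_path, args.mode)
```

Restricting --mode with choices makes argparse reject unknown modes with a usage error before any files are touched.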

Download files


Source distribution: flowmelet-0.0.10.tar.gz (188.3 kB)

Built distribution: flowmelet-0.0.10-py3-none-any.whl (187.8 kB, Python 3)
