
Use Python's multiprocessing module to execute the tasks (Python functions and Windows .bat scripts) of a DAG in parallel

Project description

Overview

Dagline is a package for executing the tasks of a DAG (directed acyclic graph) flow in parallel. A task can be either a Python function or a Windows batch (.bat) script.
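To illustrate the general idea of running DAG tasks in parallel with multiprocessing, here is a minimal sketch. It is not dagline's implementation. It assumes task IDs stand in for operators and that `run_task` (a hypothetical placeholder) does the actual work. The standard-library `graphlib.TopologicalSorter` tracks which tasks have all their parents finished, and each batch of ready tasks is dispatched to a `multiprocessing.Pool`.

```python
import multiprocessing as mp
from graphlib import TopologicalSorter


def run_task(task_id: str) -> str:
    # Hypothetical stand-in for real work (a Python callable or a .bat script).
    return f"done {task_id}"


def run_dag(tasks_flow: dict[str, list[str]], processes: int = 4) -> list[str]:
    """Run tasks in dependency order, executing independent tasks in parallel."""
    sorter = TopologicalSorter(tasks_flow)  # mapping: child -> [parents]
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    log: list[str] = []
    with mp.Pool(processes) as pool:
        while sorter.is_active():
            # Every task whose parents have all finished.
            ready = sorted(sorter.get_ready())
            # Run the ready batch in parallel and wait for all of it.
            log.extend(pool.map(run_task, ready))
            sorter.done(*ready)
    return log


if __name__ == "__main__":  # required on Windows, where workers are spawned
    flow = {
        "read_excel": [],
        "read_csv": [],
        "load_to_db": ["read_excel", "read_csv"],
        "housekeep": ["load_to_db"],
    }
    print(run_dag(flow))
```

This version waits for a whole batch before starting the next one; a scheduler that dispatches each task as soon as its own parents finish would use the pool more fully.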

System Requirements

  • Python 3.10+

Installation

  • python -m pip install dagline

Use-cases

  • Create a local folder to hold your DAG files. The folder and file names are up to you:
    dags\
    └── dag_example.py
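The CLI only receives the folder (`dag_files_home`) and a `dag_id`, which is consistent with the file names being free to choose. One plausible way a runner could pick up every DAG file in such a folder is sketched below with `importlib`. This is an assumption about how discovery might work, not dagline's documented loader, and `load_dag_modules` is a hypothetical name.

```python
import importlib.util
from pathlib import Path
from types import ModuleType


def load_dag_modules(dag_files_home: str) -> dict[str, ModuleType]:
    """Import every *.py file in dag_files_home; return the modules keyed by file stem."""
    modules: dict[str, ModuleType] = {}
    for path in sorted(Path(dag_files_home).glob("*.py")):
        spec = importlib.util.spec_from_file_location(path.stem, str(path))
        if spec is None or spec.loader is None:
            continue  # not importable as a module; skip it
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)  # runs the file's top-level code
        modules[path.stem] = module
    return modules
```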
     
  • Define the DAG: open dag_example.py and enter the following content:
from dagline.models.dag import DAG
from dagline.models.operators.winbat import WinbatOperator
from dagline.models.operators.python import PythonOperator


def read_excel():
    '''Do your work here.'''
    print('call this func read_excel')

def read_csv():
    '''Do your work here.'''
    print('call this func read_csv')

def load_to_db():
    '''Do your work here.'''
    print('call this func load_to_db')


task1 = PythonOperator(task_id='read_excel', python_callable=read_excel)
task2 = PythonOperator(task_id='read_csv', python_callable=read_csv)
task3 = PythonOperator(task_id='load_to_db', python_callable=load_to_db)
task4 = WinbatOperator(task_id='housekeep', bat_command=r"C:\xxx\housekeep.bat")


# DAG graph: child task -> [parent tasks]
tasks_flow = {
    task1: [],
    task2: [],
    task3: [task1, task2],
    task4: [task3],
}


with DAG(
    dag_id='dag_example',
    tasks_flow=tasks_flow,
    # Raw string: in a plain string "\x" starts a hex escape, so "C:\xxx..." is a SyntaxError.
    logfile=r"C:\xxx\dag_example.log",
    retry_cnt=3,
    retry_delay=10,  # seconds
) as dag:
    pass
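The `tasks_flow` mapping (child task → list of parent tasks) has the same shape that the standard-library `graphlib.TopologicalSorter` accepts, so it can be used to sanity-check a flow before running it. This sketch uses task IDs as stand-ins for the operator objects (an assumption for brevity); it returns one valid serial order and raises `graphlib.CycleError` if the flow contains a cycle. dagline itself may validate flows differently.

```python
from graphlib import CycleError, TopologicalSorter

# Same shape as tasks_flow above, with task IDs standing in for operators.
tasks_flow = {
    "read_excel": [],
    "read_csv": [],
    "load_to_db": ["read_excel", "read_csv"],
    "housekeep": ["load_to_db"],
}


def execution_order(flow: dict[str, list[str]]) -> list[str]:
    """Return one valid serial order; raise CycleError if the flow has a cycle."""
    return list(TopologicalSorter(flow).static_order())


print(execution_order(tasks_flow))
```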
  • Run a DAG in parallel, or run it starting from specified tasks

python -m dagline dags run <dag_files_home> <dag_id> [--start_with_task_ids <task_ids>]
        dag_files_home           The folder containing the DAG files
        dag_id                   The ID of the DAG
        --start_with_task_ids    Optional. A list of task IDs; if provided, the DAG starts from these tasks instead of from the beginning

>python -m dagline dags run C:\xxx\dags_tasks\dags dag_example
>python -m dagline dags run C:\xxx\dags_tasks\dags dag_example --start_with_task_ids read_csv
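The `--start_with_task_ids` option is described only as starting the DAG from the given tasks rather than from the beginning. Assuming that means "run those tasks and everything downstream of them" (an interpretation, not confirmed by the docs), the selected set can be computed by inverting `tasks_flow` and walking the children. `tasks_to_run` is a hypothetical helper, and task IDs stand in for operators.

```python
def tasks_to_run(flow: dict[str, list[str]], start_ids: list[str]) -> set[str]:
    """Return start_ids plus every task downstream of them."""
    # Invert child -> [parents] into parent -> [children].
    children: dict[str, list[str]] = {task: [] for task in flow}
    for child, parents in flow.items():
        for parent in parents:
            children.setdefault(parent, []).append(child)
    # Depth-first walk from the start tasks, collecting each task once.
    selected: set[str] = set()
    stack = list(start_ids)
    while stack:
        task = stack.pop()
        if task not in selected:
            selected.add(task)
            stack.extend(children.get(task, []))
    return selected


example_flow = {
    "read_excel": [],
    "read_csv": [],
    "load_to_db": ["read_excel", "read_csv"],
    "housekeep": ["load_to_db"],
}
print(sorted(tasks_to_run(example_flow, ["read_csv"])))
```

With the example flow, starting from `read_csv` selects `read_csv`, `load_to_db`, and `housekeep`, and skips `read_excel`.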
  • Run a single task of the DAG

python -m dagline tasks run <dag_files_home> <dag_id> <task_id>
        dag_files_home           The folder containing the DAG files
        dag_id                   The ID of the DAG
        task_id                  The ID of the task; only this task in the DAG is run

>python -m dagline tasks run C:\xxx\dags_tasks\dags dag_example read_csv
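The example DAG sets `retry_cnt=3` and `retry_delay=10`, but how dagline applies them is not spelled out. Assuming `retry_cnt` counts retries after the first attempt and `retry_delay` is the pause in seconds between attempts, a task run (such as the single-task run above) could be wrapped like this. `run_with_retries` is a hypothetical helper.

```python
import time
from typing import Any, Callable


def run_with_retries(func: Callable[[], Any], retry_cnt: int = 3, retry_delay: float = 10) -> Any:
    """Call func; if it raises, retry up to retry_cnt more times, waiting retry_delay seconds between tries."""
    for attempt in range(retry_cnt + 1):
        try:
            return func()
        except Exception:
            if attempt == retry_cnt:
                raise  # retries exhausted: re-raise the last error
            time.sleep(retry_delay)
```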
  • Visualize a DAG on an HTML page

python -m dagline dags show <dag_files_home> <dag_id>
        dag_files_home           The folder containing the DAG files
        dag_id                   The ID of the DAG

>python -m dagline dags show C:\xxx\dags_tasks\dags dag_example
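The `dags show` command renders the graph on an HTML page, and its output format isn't described here. As a swapped-in alternative for inspecting the same structure, the sketch below turns a `tasks_flow`-style mapping (task IDs standing in for operators) into Graphviz DOT text, with edges drawn from parent to child. `to_dot` is a hypothetical helper, not part of dagline.

```python
def to_dot(flow: dict[str, list[str]], name: str = "dag") -> str:
    """Render a child -> [parents] mapping as Graphviz DOT, edges pointing parent -> child."""
    lines = [f'digraph "{name}" {{']
    for child, parents in flow.items():
        lines.append(f'  "{child}";')  # declare every task, even ones with no edges
        for parent in parents:
            lines.append(f'  "{parent}" -> "{child}";')
    lines.append("}")
    return "\n".join(lines)


flow = {
    "read_excel": [],
    "read_csv": [],
    "load_to_db": ["read_excel", "read_csv"],
    "housekeep": ["load_to_db"],
}
print(to_dot(flow, "dag_example"))
```

The printed text can be passed to Graphviz's `dot` tool to produce an image.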



Download files


Source Distribution

dagline-0.1.0.tar.gz (8.5 kB)

Built Distribution

dagline-0.1.0-py3-none-any.whl (12.6 kB)
