Use Python's multiprocessing module to execute the tasks (Python functions and Windows bat scripts) of a DAG in parallel
Project description
Overview
Dagline is a package for executing the tasks of a DAG flow in parallel. A task can be either a Python function or a Windows bat script.
System Requirements
- Python 3.10+
Installation
- python -m pip install dagline
Use-cases
- Create a local folder for your DAG files. The folder and the .py file can have any name:
dags\
└──dag_example.py
- Define the DAG: open dag_example.py and enter the following content:
from dagline.models.dag import DAG
from dagline.models.operators.winbat import WinbatOperator
from dagline.models.operators.python import PythonOperator

def read_excel():
    '''Do your work here'''
    print('call this func read_excel')

def read_csv():
    '''Do your work here'''
    print('call this func read_csv')

def load_to_db():
    '''Do your work here'''
    print('call this func load_to_db')

task1 = PythonOperator(task_id = 'read_excel', python_callable = read_excel)
task2 = PythonOperator(task_id = 'read_csv', python_callable = read_csv)
task3 = PythonOperator(task_id = 'load_to_db', python_callable = load_to_db)
task4 = WinbatOperator(task_id = 'housekeep', bat_command = r"C:\xxx\housekeep.bat")

# DAG graph, child task : [parent tasks]
tasks_flow = {
    task1 : [],
    task2 : [],
    task3 : [task1, task2],
    task4 : [task3]
}

with DAG(
    dag_id = 'dag_example',
    tasks_flow = tasks_flow,
    # Raw string: without the r prefix, "\x" is an invalid escape and a syntax error
    logfile = r"C:\xxx\dag_example.log",
    retry_cnt = 3,
    retry_delay = 10 #seconds
) as dag:
    pass
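To see which tasks the "child task : [parent tasks]" mapping allows to run at the same time, the sketch below groups the example tasks into waves using the standard library's graphlib module. It is only an illustration of the dependency rule, not Dagline's scheduler: the helper name parallel_waves is hypothetical, and plain task-id strings stand in for the operator objects so the snippet runs without Dagline installed.

```python
from graphlib import TopologicalSorter

def parallel_waves(flow):
    """Group tasks into waves; every task in a wave has all of its parents finished."""
    sorter = TopologicalSorter(flow)  # same shape as tasks_flow: child -> [parents]
    sorter.prepare()                  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose parents are all done
        waves.append(ready)
        sorter.done(*ready)                 # mark the wave finished to unlock children
    return waves

# Task ids from dag_example.py, written as strings (stand-ins for the operators)
flow = {
    "read_excel": [],
    "read_csv": [],
    "load_to_db": ["read_excel", "read_csv"],
    "housekeep": ["load_to_db"],
}

waves = parallel_waves(flow)
print(waves)  # [['read_csv', 'read_excel'], ['load_to_db'], ['housekeep']]
```

Here read_excel and read_csv have no parents, so they can run side by side; load_to_db waits for both, and housekeep waits for load_to_db.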
- Run a DAG in parallel, or start a DAG from specified tasks
python -m dagline dags run <dag_files_home> <dag_id> [--start_with_task_ids <task_ids>]
    dag_files_home           The folder containing the DAG files
    dag_id                   The id of the DAG
    --start_with_task_ids    A list of task ids. If provided, the DAG starts from these tasks instead of from the beginning of the DAG
>python -m dagline dags run C:\xxx\dags_tasks\dags dag_example
>python -m dagline dags run C:\xxx\dags_tasks\dags dag_example --start_with_task_ids read_csv
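The following sketch shows one plausible reading of --start_with_task_ids: the named tasks and everything downstream of them are selected, while upstream tasks are skipped. This interpretation is an assumption, and Dagline's actual selection logic may differ. The helper name tasks_from is hypothetical, and task ids are plain strings.

```python
def tasks_from(flow, start_ids):
    """Return the start tasks plus all of their descendants.

    flow maps each child task id to the list of its parent task ids.
    """
    # Invert child -> [parents] into parent -> [children]
    children = {}
    for child, parents in flow.items():
        for parent in parents:
            children.setdefault(parent, []).append(child)

    selected = set()
    stack = list(start_ids)
    while stack:
        task = stack.pop()
        if task not in selected:
            selected.add(task)
            stack.extend(children.get(task, []))  # follow edges downstream
    return selected

flow = {
    "read_excel": [],
    "read_csv": [],
    "load_to_db": ["read_excel", "read_csv"],
    "housekeep": ["load_to_db"],
}

selected = tasks_from(flow, ["read_csv"])
print(sorted(selected))  # ['housekeep', 'load_to_db', 'read_csv']
```

Under this reading, starting from read_csv leaves read_excel out of the run even though load_to_db lists it as a parent.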
- Run a single task of the DAG
python -m dagline tasks run <dag_files_home> <dag_id> <task_id>
    dag_files_home    The folder containing the DAG files
    dag_id            The id of the DAG
    task_id           The id of the task; only this task of the DAG is run
>python -m dagline tasks run C:\xxx\dags_tasks\dags dag_example read_csv
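If you want to launch these commands from another Python script, a small helper can assemble the argument list. The sketch below only builds the command from the syntax shown above; the helper name dagline_command is hypothetical, and how Dagline reports failures through its exit code is not covered here.

```python
import sys

def dagline_command(group, action, dag_files_home, dag_id, *extra):
    """Build the argument list for `python -m dagline <group> <action> ...`."""
    # sys.executable keeps the command on the interpreter that runs this script
    return [sys.executable, "-m", "dagline", group, action, dag_files_home, dag_id, *extra]

dags_home = r"C:\xxx\dags_tasks\dags"  # placeholder path, as in the examples above

run_dag = dagline_command("dags", "run", dags_home, "dag_example")
run_task = dagline_command("tasks", "run", dags_home, "dag_example", "read_csv")

print(run_task[1:])
# To execute a command, pass the list to subprocess.run(run_task)
```

Passing a list rather than a single string avoids quoting problems when the folder path contains spaces.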
- Visualize a DAG on an HTML page
python -m dagline dags show <dag_files_home> <dag_id>
    dag_files_home    The folder containing the DAG files
    dag_id            The id of the DAG
>python -m dagline dags show C:\xxx\dags_tasks\dags dag_example
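How Dagline renders its HTML page is not described here. As a rough illustration of what any DAG visualization needs, namely one edge per parent-child pair, the sketch below converts a "child : [parents]" mapping into Graphviz DOT text. The DOT format and the helper name to_dot are assumptions chosen for illustration and are not Dagline's output format.

```python
def to_dot(dag_id, flow):
    """Render a child -> [parents] mapping as Graphviz DOT text."""
    lines = [f'digraph "{dag_id}" {{']
    for child, parents in flow.items():
        lines.append(f'    "{child}";')  # declare the node so parentless tasks appear too
        for parent in parents:
            lines.append(f'    "{parent}" -> "{child}";')  # arrow points from parent to child
    lines.append("}")
    return "\n".join(lines)

flow = {
    "read_excel": [],
    "read_csv": [],
    "load_to_db": ["read_excel", "read_csv"],
    "housekeep": ["load_to_db"],
}

dot = to_dot("dag_example", flow)
print(dot)
```

The printed text can be pasted into any Graphviz viewer to check that the dependencies match what you intended.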