MultiDagRunPlugin for Airflow
Multi DAG run
This plugin contains operators for triggering a DAG multiple times, where the number of DAG run instances to create can be specified dynamically.
It is useful when you need to process a large dataset: you can split the data into chunks and run multiple instances of the same task in parallel, one per chunk.
If you see many target DAG runs waiting to be executed, you can add more workers to process them, which makes scaling straightforward.
Install
pip install airflow_multi_dagrun
Example
Code for scheduling DAGs:
```python
import datetime as dt

from airflow import DAG
from airflow.operators.dagrun_operator import DagRunOrder
from airflow.operators.multi_dagrun import TriggerMultiDagRunOperator


def generate_dag_run():
    for i in range(100):
        yield DagRunOrder(payload={'index': i})


default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2015, 6, 1),
}

dag = DAG('reindex_scheduler', schedule_interval=None, default_args=default_args)

ran_dags = TriggerMultiDagRunOperator(
    task_id='gen_target_dag_run',
    dag=dag,
    trigger_dag_id='example_target_dag',
    python_callable=generate_dag_run,
)
```
This code triggers 100 runs of the DAG with id example_target_dag, passing each run a payload of the form {'index': i}.
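The number of runs does not have to be hard-coded: the callable can yield as many DagRunOrder objects as the data requires. Below is a minimal sketch of computing one payload per chunk, assuming the data can be addressed by integer offsets. The helper name chunk_bounds and the 'start'/'end' payload keys are hypothetical and not part of the plugin; only the 'index' key mirrors the example above.

```python
from typing import Dict, Iterator


def chunk_bounds(total_items: int, chunk_size: int) -> Iterator[Dict[str, int]]:
    """Yield one payload dict per chunk, together covering offsets [0, total_items).

    Hypothetical helper: the 'start' and 'end' keys are an illustrative
    payload layout, not something the plugin requires.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for index, start in enumerate(range(0, total_items, chunk_size)):
        # The last chunk may be shorter than chunk_size.
        end = min(start + chunk_size, total_items)
        yield {'index': index, 'start': start, 'end': end}
```

For example, chunk_bounds(10, 4) yields three payloads covering offsets 0 to 4, 4 to 8 and 8 to 10. Inside generate_dag_run, each dict would be wrapped as DagRunOrder(payload=payload), and the target DAG could then read dag_run.conf['start'] and dag_run.conf['end'].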
Example of the triggered DAG:
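```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='example_target_dag',
    schedule_interval=None,
    default_args={'start_date': datetime.utcnow(), 'owner': 'airflow'},
)


def run_this_func(dag_run, **kwargs):
    # dag_run.conf holds the payload passed by TriggerMultiDagRunOperator
    print("Chunk received: {}".format(dag_run.conf['index']))


chunk_handler = PythonOperator(
    task_id='chunk_handler',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)
```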
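Each triggered run only sees its own payload through dag_run.conf. The sketch below shows one way a handler could map the received index to its share of the data, assuming payloads shaped like {'index': i} and a sequence of records that every run can load identically. The helper name select_chunk and the num_chunks parameter are hypothetical, not part of the plugin.

```python
from typing import Any, Dict, List, Sequence


def select_chunk(conf: Dict[str, Any], records: Sequence[Any], num_chunks: int) -> List[Any]:
    """Return the slice of records belonging to the run identified by conf['index'].

    Hypothetical helper: assumes every run sees the same records and the
    same num_chunks, so the slices are disjoint and cover all records.
    """
    index = conf['index']
    if not 0 <= index < num_chunks:
        raise ValueError("index {} out of range for {} chunks".format(index, num_chunks))
    # Ceiling division so that every record lands in some chunk.
    chunk_size = -(-len(records) // num_chunks)
    start = index * chunk_size
    return list(records[start:start + chunk_size])
```

Inside run_this_func this would be called as select_chunk(dag_run.conf, records, num_chunks). Keeping the payload down to a single index keeps dag_run.conf small, at the cost of requiring the scheduler and the workers to agree on num_chunks.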
Running the example
The example is provided as a docker-compose configuration, so it requires docker and docker-compose to be installed.
- make run: start the docker containers, initialize the database and run the Airflow webserver
- make down: destroy the docker containers
Contributions
If you find a bug or have an idea for an improvement, feel free to open an issue or a pull request.
License
Apache 2.0
Download files
Built Distribution
Hashes for airflow_multi_dagrun-1.1-py3-none-any.whl

| Algorithm | Hash digest |
|---|---|
| SHA256 | fa39739ecba082dc1feb0f3c9574c62ab11fb6ec7c804bdbe909b75ac62c48e6 |
| MD5 | 544e949a46fa949631be32356f6d1126 |
| BLAKE2-256 | 4632d25181654e2e72fc6bc0d5db67a272690e933dfd81f891a1673c2a7c51cc |