
MultiDagRunPlugin for Airflow

Project description


Multi dag run

This plugin provides operators for triggering a DAG multiple times, letting you dynamically specify how many DAG run instances to create.

It is useful when you need to process a large amount of data: split it into chunks and run multiple instances of the same DAG in parallel, one per chunk.

When many target DAG runs are launched, you can simply add more workers, which makes it easy to scale.

Install

pip install airflow_multi_dagrun
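
A quick sanity check (an optional step, not from the project docs) is to confirm the operator is importable in the environment where Airflow runs:

# The import path is the one used in the example below.
from airflow_multi_dagrun.operators import TriggerMultiDagRunOperator

print(TriggerMultiDagRunOperator.__name__)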

Example

Code for scheduling DAGs:

import datetime as dt
from airflow import DAG

from airflow_multi_dagrun.operators import TriggerMultiDagRunOperator


def generate_dag_run():
    # Each yielded dict becomes the conf payload of one triggered DAG run.
    for i in range(100):
        yield {'index': i}


default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2015, 6, 1),
}


dag = DAG('reindex_scheduler', schedule_interval=None, default_args=default_args)


ran_dags = TriggerMultiDagRunOperator(
    task_id='gen_target_dag_run',
    dag=dag,
    trigger_dag_id='example_target_dag',
    python_callable=generate_dag_run,
)

This code will trigger the DAG with id example_target_dag 100 times, passing the generated payload to each run as its conf.
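
For the chunking use case mentioned earlier, the callable can yield one conf per chunk. Below is a minimal sketch reusing the same dag object and operator; the file list, chunk size, the generate_chunked_dag_runs name, and the 'files' key are illustrative assumptions, not part of the plugin:

def generate_chunked_dag_runs():
    # Illustrative input data: split 1000 items into chunks of 10 and
    # trigger one DAG run per chunk.
    items = ['file_{}.csv'.format(i) for i in range(1000)]
    chunk_size = 10
    for start in range(0, len(items), chunk_size):
        yield {'files': items[start:start + chunk_size]}


chunked_runs = TriggerMultiDagRunOperator(
    task_id='gen_chunked_dag_runs',
    dag=dag,
    trigger_dag_id='example_target_dag',
    python_callable=generate_chunked_dag_runs,
)

The triggered DAG would then read dag_run.conf['files'] instead of 'index'.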

Example of the triggered DAG:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator  # airflow.operators.python_operator on Airflow 1.x


dag = DAG(
    dag_id='example_target_dag',
    schedule_interval=None,
    default_args={'start_date': datetime.utcnow(), 'owner': 'airflow'},
)


def run_this_func(dag_run, **kwargs):
    # The conf dict passed by the scheduler DAG is available on the dag_run object.
    print("Chunk received: {}".format(dag_run.conf['index']))


chunk_handler = PythonOperator(
    task_id='chunk_handler',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)
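
On Airflow 2, provide_context is deprecated and has no effect: the context is passed automatically to any callable that accepts **kwargs, and the same conf can be read from it. A minimal sketch of the handler in that style, reusing the dag and PythonOperator import above (the _v2 names are only for illustration):

def run_this_func_v2(**context):
    # On Airflow 2 the DagRun object is available in the task context.
    dag_run = context['dag_run']
    print("Chunk received: {}".format(dag_run.conf['index']))


chunk_handler_v2 = PythonOperator(
    task_id='chunk_handler_v2',
    python_callable=run_this_func_v2,
    dag=dag,
)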

Run example

A docker-compose config is provided, so Docker and docker-compose must be installed.

  1. make init - create db
  2. make add-admin - create admin user (it asks for a password)
  3. make web - start docker containers, run airflow webserver
  4. make scheduler - start docker containers, run airflow scheduler

make down will stop and remove the docker containers.

Contributions

If you have found a bug or have an idea for an improvement, feel free to create an issue or a pull request.

License

Apache 2.0

Download files

Download the file for your platform.

Source Distribution

airflow_multi_dagrun-2.3.1.tar.gz (7.8 kB)

Uploaded Source

Built Distribution

airflow_multi_dagrun-2.3.1-py3-none-any.whl (9.2 kB)

Uploaded Python 3

File details

Details for the file airflow_multi_dagrun-2.3.1.tar.gz.

File metadata

  • Download URL: airflow_multi_dagrun-2.3.1.tar.gz
  • Upload date:
  • Size: 7.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.6.1 requests/2.25.0 setuptools/47.1.0 requests-toolbelt/0.9.1 tqdm/4.52.0 CPython/3.7.8

File hashes

Hashes for airflow_multi_dagrun-2.3.1.tar.gz

  • SHA256: 603aecfda855c27932f3146dd5d63008058a4e9cd9f4a47499830895555b1f3a
  • MD5: ea94d17d4cb1e952f5574242ca225524
  • BLAKE2b-256: 789f7373f28d46f67e692db88562d315b00718ed8688a19d428943a3f95eae3d


File details

Details for the file airflow_multi_dagrun-2.3.1-py3-none-any.whl.

File metadata

  • Download URL: airflow_multi_dagrun-2.3.1-py3-none-any.whl
  • Upload date:
  • Size: 9.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.6.1 requests/2.25.0 setuptools/47.1.0 requests-toolbelt/0.9.1 tqdm/4.52.0 CPython/3.7.8

File hashes

Hashes for airflow_multi_dagrun-2.3.1-py3-none-any.whl

  • SHA256: 4c8febf19ad169ecca01e0bea3fcdaab071283ad0a8817dfcd6ca3189890a7a1
  • MD5: 70810fe95c39f884a2c9a55506f8346d
  • BLAKE2b-256: 58153478035ec3a2b719fe3e5ded0200eabe08f4602e73f91b936cb2ff0a2239

