An opinionated framework for ETL built on top of Airflow

gusty

gusty allows you to manage your Airflow DAGs, tasks, and task groups with greater ease. It can automatically generate dependencies between tasks and external dependencies for tasks in other DAGs.

The gusty approach to Airflow is that individual tasks are represented as YAML, where an operator and its arguments, along with its dependencies and external dependencies, are specified in a .yml file. By passing a directory path of these YAML task specifications to gusty's create_dag function, you can have your DAGs create themselves.
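
For example, a minimal DAG directory might be laid out like the sketch below; all file and folder names are illustrative, and the METADATA.yml file is optional.

/usr/local/airflow/dags/hello_world/
├── METADATA.yml         # optional DAG-level settings
├── hello_world.yml      # one task per .yml file
└── goodbye_world.yml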

In addition to parsing YAML files, gusty also parses YAML front matter in .ipynb and .Rmd files, allowing you to include Python and R notebook formats in your data pipeline straightaway.
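
As a rough sketch, an .Rmd file's YAML front matter could carry the same fields as a task .yml. The operator below, local.RmdOperator, is a hypothetical custom operator for rendering RMarkdown, not something gusty itself provides:

---
operator: local.RmdOperator   # hypothetical custom operator; see the custom operators section below
dependencies:
  - hello_world
---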

Lastly, gusty's create_dag function can be passed any keyword argument from Airflow's DAG class, as well as dictionaries for task group defaults and external dependency sensor defaults. And if you'd rather, gusty can pick up per-DAG and per-task-group specifications via YAML files titled METADATA.yml - which will override any defaults passed to create_dag - so you can specify defaults and override those defaults with metadata.

gusty works with both Airflow 1.x and Airflow 2.x, and automatically generates task groups in Airflow 2.x. Plus, you can specify task group dependencies and external_dependencies in each task group's METADATA.yml file.

In short, gusty allows you to focus on the tasks in a pipeline instead of the scaffolding.

Up and running with create_dag

gusty's core function is create_dag. To have gusty generate a DAG, provide a path to a directory that contains .yml files for the DAG's tasks. The create_dag function can take any keyword arguments from Airflow's DAG class, as well as dictionaries for task group defaults (task_group_defaults) and external dependency sensor defaults (wait_for_defaults).

An example of the entire .py file that generates your DAG looks like this:

import airflow
from datetime import timedelta
from airflow.utils.dates import days_ago
from gusty import create_dag

dag = create_dag(
  '/usr/local/airflow/dags/hello_world',
  description="A dag created without metadata",
  schedule_interval="0 0 * * *",
  default_args={
      "owner": "gusty",
      "depends_on_past": False,
      "start_date": days_ago(1),
      "email": "gusty@gusty.com",
      "email_on_failure": False,
      "email_on_retry": False,
      "retries": 3,
      "retry_delay": timedelta(minutes=5),
  },
  task_group_defaults={"prefix_group_id": True},
  wait_for_defaults={"retries": 10},
  latest_only=False
  )

Note that you must import airflow.

The resulting DAG will be named after the directory, in this case, hello_world. By default, gusty will create a latest_only DAG, where every job in the DAG will only run for the most recent run date, regardless of whether a backfill is triggered. This behavior is disabled above using latest_only=False. As with anything in gusty, all of these parameters can also be specified in METADATA.yml files.
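
For instance, disabling the latest_only behavior via metadata instead of create_dag is just a one-line entry in the DAG's METADATA.yml:

latest_only: False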

Examples of how to create a directory of DAG task files can be found in the gusty repo's example folder.

Containerized Demo

As an additional resource, you can check out a containerized demo of gusty and Airflow over at the gusty-demo repo, which illustrates how gusty and a few custom operators can make SQL queries, Jupyter notebooks, and RMarkdown documents all work together in the same data pipeline.

For more details on the gusty package, please see below.

Hello World in YAML

YAML Tasks

Instead of importing and calling a BashOperator directly, you can specify the operator and the bash_command parameter (which is a required field for Airflow's BashOperator) in a .yml file:

operator: airflow.operators.bash.BashOperator
bash_command: echo hello world

gusty takes this .yml and turns it into a task based on its file name. If this file was called hello_world.yml, the resulting task would show up in your DAG as hello_world.

Dependencies

You can set dependencies between tasks in .yml files as well. Here is another task, goodbye_world, that depends on hello_world:

operator: airflow.operators.bash.BashOperator
bash_command: echo goodbye world
dependencies:
  - hello_world

This will automatically set the goodbye_world task downstream of the hello_world task.

External dependencies for tasks in other DAGs can be set using the dag_id: task_id format:

external_dependencies:
  - dag_id: task_id

To wait for an entire external DAG to run successfully, just use the format dag_id: all instead.
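
Putting the two formats together, a task that waits on both a single external task and an entire external DAG might look like the sketch below; the DAG and task names are illustrative.

operator: airflow.operators.bash.BashOperator
bash_command: echo goodbye world
external_dependencies:
  - upstream_dag: some_task
  - another_dag: all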

DAGs as YAML

As mentioned, your DAGs can also be represented as .yml files. Specifically, DAGs should be represented in a file called METADATA.yml. Similar to the basic Airflow tutorial, our DAG's .yml might look something like this:

description: "A Gusty version of the DAG described by this Airflow tutorial: https://airflow.apache.org/docs/stable/tutorial.html"
schedule_interval: "1 0 * * *"
default_args:
    owner: airflow
    depends_on_past: False
    start_date: !days_ago 1
    email: airflow@example.com
    email_on_failure: False
    email_on_retry: False
    retries: 1
    retry_delay: !timedelta 'minutes: 5'
#   queue: bash_queue
#   pool: backfill
#   priority_weight: 10
#   end_date: !datetime [2016, 1, 1]
#   wait_for_downstream: false
#   sla: !timedelta 'hours: 2'
#   trigger_rule: all_success

Task Groups

As of Airflow 2.0.0, task groups provide Airflow users with another layer of organization. gusty fully supports task groups, and creating a task group is as easy as adding a subfolder within your DAG's main folder and placing task .yml files in it. gusty will take care of the rest.

By default, gusty turns off prefixing task names with task group names, but you can enable this functionality by either adding prefix_group_id: True to a task group's METADATA.yml, or adding task_group_defaults={"prefix_group_id": True} to your call to create_dag. As mentioned, you can set defaults in your call to create_dag, then override those defaults using per-task-group METADATA.yml files.
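
For example, a task group's METADATA.yml might combine prefix_group_id with the dependencies and external_dependencies described earlier; the task and DAG names below are illustrative.

prefix_group_id: True
dependencies:
  - hello_world
external_dependencies:
  - upstream_dag: all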

gusty also accepts a suffix_group_id parameter, which will place the task group name at the end of the task name, if that's what you want!

In short, if a parameter is available on Airflow's task groups, it's available in gusty.

External Task Sensors

When you specify external dependencies, gusty will use Airflow's ExternalTaskSensor to create wait_for_ tasks in your DAG. Using the wait_for_defaults parameter in create_dag, you can specify the behavior of these ExternalTaskSensor tasks, such as mode ("poke" or "reschedule") and poke_interval.

You can also specify external dependencies at the DAG level if you want, and gusty will ensure that DAG-level external dependencies sit at the root of your DAG.
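
A hedged sketch of such a DAG-level entry in METADATA.yml, assuming the external dependency format matches the task-level one (DAG and task names are illustrative):

external_dependencies:
  - upstream_dag: all
  - another_dag: some_task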

Root Tasks

gusty also lets you specify "root tasks" for your DAG, where a root task is "some task that should happen before any other task in the DAG". To enable this, just provide a list of root_tasks in the DAG's METADATA.yml or in create_dag. Root tasks only work if they have no upstream or downstream dependencies, which enables gusty to place these tasks at the root of your DAG.
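
A minimal sketch of root tasks in a DAG's METADATA.yml, assuming a task file named initialize_warehouse.yml exists in the DAG folder:

root_tasks:
  - initialize_warehouse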

Operators

Calling Airflow Operators

gusty will take a parameterized .yml file for any operator, given a string that includes the module path and the operator class, such as airflow.operators.bash.BashOperator or airflow.providers.amazon.aws.transfers.s3_to_redshift.S3ToRedshiftOperator. In theory, if it's available in a module, you can use a .yml to define it.

Since sensors are also operators, you can utilize them with gusty, too!
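
As a sketch, a sensor task defined in YAML might look like the following; FileSensor's filepath and the sensor's poke_interval are ordinary Airflow arguments, and the values are illustrative.

operator: airflow.sensors.filesystem.FileSensor
filepath: /tmp/ready.csv
poke_interval: 60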

Calling Custom Operators

gusty will also work with any of your custom operators, so long as those operators are located in an operators directory in your designated AIRFLOW_HOME.

In order for your local operators to import properly, they must follow the pattern of a snake_case file name and a CamelCase operator name; for example, an operator called YourOperator must live in a file called your_operator.py.

Just as the BashOperator above was accessed via its full module path, airflow.operators.bash.BashOperator, your local operators are accessed via the local keyword, e.g. local.YourOperator.
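
For example, a task built on a hypothetical custom operator living at $AIRFLOW_HOME/operators/your_operator.py might be specified like this, where some_param is an illustrative argument of that operator:

operator: local.YourOperator
some_param: some_value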
