
Awehflow


Configuration-based Airflow pipelines with metric logging and alerting out of the box.

Prerequisites

You will need the following to run this code:

  • Python 3

Installation

pip install awehflow[default]

If you are installing on Google Cloud Composer with Airflow 1.10.2:

pip install awehflow[composer]

Usage

Usage of awehflow can be broken up into two parts: bootstrapping and the configuration of pipelines.

Bootstrap

In order to expose the generated pipelines (Airflow DAGs) for Airflow to pick up when scanning for DAGs, one has to create a DagLoader that points to a folder where the pipeline configuration files are located:

import os

from awehflow.alerts.slack import SlackAlerter
from awehflow.core import DagLoader
from awehflow.events.postgres import PostgresMetricsEventHandler

"""airflow doesn't pick up DAGs in files unless 
the words 'airflow' and 'DAG' features"""

configs_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'configs')

metrics_handler = PostgresMetricsEventHandler(jobs_table='jobs', task_metrics_table='task_metrics')

slack_alerter = SlackAlerter(channel='#airflow')

loader = DagLoader(
    project="awehflow-demo",
    configs_path=configs_path,
    event_handlers=[metrics_handler],
    alerters=[slack_alerter]
)

dags = loader.load(global_symbol_table=globals())

As seen in the code snippet, one can also pass in "event handlers" and "alerters" to perform actions on certain pipeline events and to alert users of those events on a given channel. See the sections below for more detail. The global symbol table needs to be passed to the loader since Airflow scans it for objects of type DAG, and then synchronises the state with its own internal state store.

*caveat: Airflow ignores Python files that don't contain the words "airflow" and "DAG". It is thus advised to put those words in a comment to ensure the generated DAGs get picked up when the DagBag is being filled.

Event Handlers

As a pipeline generated using awehflow is running, certain events get emitted. An event handler gives the user the option of running code when these events occur.

The following events are potentially emitted as a pipeline runs:

  • start
  • success
  • failure
  • task_metric

Existing event handlers include:

  • PostgresMetricsEventHandler: persists pipeline metrics to a Postgres database
  • PublishToGooglePubSubEventHandler: events get passed straight to a Google Pub/Sub topic

An AlertsEventHandler gets automatically added to a pipeline. Events get passed along to registered alerters.
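
For illustration, a custom event handler might look like the sketch below. Note that the handle method name and the event structure are assumptions made for this example; consult the awehflow source for the exact interface DagLoader expects.

class PrintEventHandler:
    """Hypothetical event handler that simply prints pipeline events.

    The method name and event shape used here are assumptions for
    illustration; the real awehflow handler interface may differ.
    """

    def handle(self, event):
        # 'event' is assumed to identify one of the emitted event types
        # ('start', 'success', 'failure', 'task_metric') plus run details
        print(f"pipeline event received: {event}")

A handler instance would then be registered by passing it in the event_handlers list of the DagLoader, as shown in the bootstrap example above.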

Alerters

An Alerter is merely a class that implements an alert method. By default a SlackAlerter is configured in the dags/PROJECT/bootstrap.py file of an awehflow project. awehflow supports the registration of multiple alerters, which allows success or failure events to be sent to multiple channels.
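
Since only the alert method is required, a minimal custom alerter could be sketched as follows (the class name and the shape of the event argument are made up for illustration):

class StdoutAlerter:
    """Hypothetical alerter: awehflow only requires an alert method.

    The structure of the event passed in is assumed to match what the
    AlertsEventHandler forwards to registered alerters.
    """

    def alert(self, event):
        print(f"ALERT: {event}")

Multiple alerters can then be registered via the alerters list of the DagLoader, e.g. alerters=[slack_alerter, StdoutAlerter()].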

YAML configuration

In order to add alerts to an awehflow DAG, add the following to the root of the configuration:

alert_on:
  - 'failure' # Send out a formatted message if a task in the DAG fails. This is optional
  - 'success' # Send out a formatted message once the DAG completes successfully. This is optional
Available alerters
SlackAlerter - awehflow.alerts.slack.SlackAlerter

Sends an alert to a specified Slack channel via the Slack webhook functionality.

  • Parameters
    • channel - The name of the channel that the alerts should be sent to
    • slack_conn_id - The name of the airflow connection that contains the token information, default: slack_default
  • Connection requirements - Create an HTTP connection with the name specified for slack_conn_id; the required HTTP fields are:
    • password - The Slack token issued by your admin team, which allows for the sending of messages via the Slack Python API
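
For example, the bootstrap snippet above constructs the Slack alerter as follows; the channel name is illustrative, and slack_conn_id falls back to slack_default when omitted:

from awehflow.alerts.slack import SlackAlerter

# '#airflow' is an example channel; the token is read from the password
# field of the 'slack_default' HTTP connection described above
slack_alerter = SlackAlerter(channel='#airflow', slack_conn_id='slack_default')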
GoogleChatAlerter - awehflow.alerts.googlechat.GoogleChatAlerter

Sends an alert to the configured Google Chat space

  • Parameters
    • gchat_conn_id - The name of the airflow connection that contains the GChat space information, default: gchat_default
  • Connection requirements - Create an HTTP connection with the name specified for the gchat_conn_id; the required HTTP fields are:
    • host - The GChat spaces URL https://chat.googleapis.com/v1/spaces
    • password - The GChat spaces key configuration information, e.g. https://chat.googleapis.com/v1/spaces/SPACES_ID?key=SPACES_KEY
      • SPACES_ID - Should be supplied by your GChat admin team
      • SPACES_KEY - Should be supplied by your GChat admin team
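
Constructed analogously to the Slack alerter; the connection name shown is simply the documented default:

from awehflow.alerts.googlechat import GoogleChatAlerter

# the space URL and key are read from the host and password fields of
# the 'gchat_default' HTTP connection described above
gchat_alerter = GoogleChatAlerter(gchat_conn_id='gchat_default')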

Configuration

Awehflow configuration files can be written as .yml or .hocon files; both formats are supported.

Shown below is a sample HOCON configuration file:

{
  name: my_first_dag,
  start_date: 2022-01-01,
  catchup: true,
  schedule: "10 0 * * *",
  version: 1,
  alert_on: [
    success,
    failure
  ],
  params: {
    default: {
      source_folder: /tmp
    },
    production: {
      source_folder: /data
    }
  },
  default_dag_args: {
    retries: 1
  },
  dependencies: [
    {
      id: 'ping_sensor'
      operator: 'airflow.sensors.bash.BashSensor'
      params: {
        bash_command: 'echo ping'
        mode: 'reschedule'
      }
    }
  ],
  tasks: [
    {
      id: first_dummy_task,
      operator: airflow.operators.dummy.DummyOperator,
    },
    {
      id: first_bash_task,
      operator: airflow.operators.bash.BashOperator,
      params: {
        bash_command: 'echo "Hello World"'
      },
      upstream: [
        first_dummy_task
      ]
    }
  ]
}

This configuration does the following:

  • Creates a DAG called my_first_dag
    • Scheduled to run daily at 10 minutes past midnight
    • Catchup has been enabled to ensure all runs of the DAG since 2022-01-01 are executed
  • Dependencies
    • First check if the command echo ping succeeds
  • Tasks
    • First run a dummy task that does nothing
    • If the dummy task succeeds, execute the bash command

Running the tests

Tests may be run with:

python -m unittest discover tests

or to run code coverage too:

coverage run -m unittest discover tests && coverage html
