Awehflow
Configuration-based Apache Airflow pipelines with metric logging and alerting out of the box.
Prerequisites
You will need the following to run this code:
- Python 3
Installation

```bash
pip install awehflow[default]
```

If you are installing on Google Cloud Composer with Airflow 1.10.2:

```bash
pip install awehflow[composer]
```
Event & metric tables
Create a PostgreSQL database that can be referenced via an Airflow connection. In the database, create the following tables:

Jobs data table:

```sql
CREATE TABLE public.jobs (
    id serial4 NOT NULL,
    run_id varchar NOT NULL,
    dag_id varchar NULL,
    "name" varchar NULL,
    project varchar NULL,
    status varchar NULL,
    engineers json NULL,
    error json NULL,
    start_time timestamptz NULL,
    end_time timestamptz NULL,
    reference_time timestamptz NULL,
    CONSTRAINT job_id_pkey PRIMARY KEY (id),
    CONSTRAINT run_id_dag_id_unique UNIQUE (run_id, dag_id)
);
```

Task metrics table:

```sql
CREATE TABLE public.task_metrics (
    id serial4 NOT NULL,
    run_id varchar NULL,
    dag_id varchar NULL,
    task_id varchar NULL,
    job_name varchar NULL,
    value json NULL,
    created_time timestamptz NULL,
    reference_time timestamptz NULL,
    CONSTRAINT task_metrics_id_pkey PRIMARY KEY (id)
);
```
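Once awehflow is writing to these tables, they can feed monitoring dashboards or ad-hoc checks. The sketch below is illustrative only (the query and helper are assumptions about how you might consume the `jobs` table, not part of awehflow; in a real deployment the rows would come from psycopg2 or an Airflow `PostgresHook` using the same connection):

```python
# Illustrative monitoring query against the jobs table defined above.
# Table and column names come from the DDL; everything else is an example.
FAILED_JOBS_QUERY = """
SELECT dag_id, "name", status, error, start_time, end_time
FROM public.jobs
WHERE status = 'failure'
  AND start_time > now() - interval '1 day'
ORDER BY start_time DESC;
"""

def failed_job_summary(rows):
    """Summarise rows returned by the query above as 'dag_id: name' strings."""
    return [f"{dag_id}: {name}" for dag_id, name, *_ in rows]

# Example with hand-written rows standing in for a real fetch:
rows = [("my_first_dag", "my_first_dag", "failure", None, None, None)]
print(failed_job_summary(rows))  # ['my_first_dag: my_first_dag']
```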
Usage
Usage of awehflow can be broken up into two parts: bootstrapping and configuration of pipelines.
Bootstrap
In order to expose the generated pipelines (airflow DAGs) for airflow to pick up when scanning for DAGs, one has to create a DagLoader that points to a folder where the pipeline configuration files will be located:
```python
import os

from awehflow.alerts.slack import SlackAlerter
from awehflow.core import DagLoader
from awehflow.events.postgres import PostgresMetricsEventHandler

"""airflow doesn't pick up DAGs in files unless
the words 'airflow' and 'DAG' feature in the file"""

configs_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'configs')

metrics_handler = PostgresMetricsEventHandler(
    jobs_table='jobs',
    task_metrics_table='task_metrics'
)

slack_alerter = SlackAlerter(channel='#airflow')

loader = DagLoader(
    project="awehflow-demo",
    configs_path=configs_path,
    event_handlers=[metrics_handler],
    alerters=[slack_alerter]
)

dags = loader.load(global_symbol_table=globals())
```
As seen in the code snippet, one can also pass in "event handlers" and "alerters" to perform actions on certain pipeline events and potentially alert the user of certain events on a given channel. See the sections below for more detail.
The global symbol table needs to be passed to the loader since airflow scans it for objects of type DAG, and then synchronises the state with its own internal state store.
*Caveat*: airflow ignores python files that don't contain the words "airflow" and "DAG". It is thus advised to put those words in a comment to ensure the generated DAGs get picked up when the DagBag is getting filled.
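The caveat above comes from a keyword heuristic Airflow applies before importing a file. A simplified stdlib-only sketch of that behaviour (not Airflow's actual implementation):

```python
def might_contain_dag(file_contents: str) -> bool:
    """Simplified version of the pre-import check Airflow's DagBag applies:
    a .py file is only imported if both magic words appear somewhere in it."""
    return "airflow" in file_contents and "DAG" in file_contents

# A generated-DAG bootstrap file without the magic words would be skipped:
print(might_contain_dag("loader = DagLoader(...)"))  # False

# ...which is why awehflow bootstrap files carry them in a comment:
print(might_contain_dag('"""airflow DAG"""\nloader = DagLoader(...)'))  # True
```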
Event Handlers
As a pipeline generated using awehflow
is running, certain events get emitted. An event handler gives the user the option of running code when these events occur.
The following events are potentially emitted as a pipeline runs:
- start
- success
- failure
- task_metric
Existing event handlers include:
- PostgresMetricsEventHandler: persists pipeline metrics to a Postgres database
- PublishToGooglePubSubEventHandler: events get passed straight to a Google Pub/Sub topic
An AlertsEventHandler gets automatically added to a pipeline. Events get passed along to registered alerters.
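Conceptually, an event handler is an object that receives each emitted event and reacts to it. A minimal stdlib-only sketch of the fan-out pattern (the class and method names here are assumptions for illustration, not awehflow's exact API):

```python
class CollectingEventHandler:
    """Hypothetical handler: collects events in memory instead of, say,
    persisting them to Postgres."""
    def __init__(self):
        self.seen = []

    def handle(self, event):
        self.seen.append(event)

def emit(handlers, name, payload):
    """Fan an emitted pipeline event out to every registered handler."""
    for handler in handlers:
        handler.handle({"name": name, "body": payload})

handler = CollectingEventHandler()
emit([handler], "start", {"run_id": "2022-01-01", "dag_id": "my_first_dag"})
emit([handler], "success", {"run_id": "2022-01-01", "dag_id": "my_first_dag"})
print([e["name"] for e in handler.seen])  # ['start', 'success']
```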
Alerters
An Alerter is merely a class that implements an alert method. By default a SlackAlerter is configured in the dags/PROJECT/bootstrap.py file of an awehflow project. awehflow supports the addition of multiple alerters, which allows success or failure events to be sent to multiple channels.
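Since an alerter only needs an alert method, writing a custom one is straightforward. A hedged sketch (the event shape and method signature are assumptions for illustration; a real alerter would post to Slack or Google Chat rather than print):

```python
class PrintAlerter:
    """Hypothetical alerter: formats and prints the alert instead of
    posting it to an external channel."""
    def alert(self, event):
        message = f"[{event['name']}] job {event['body']['name']}"
        print(message)
        return message

msg = PrintAlerter().alert({"name": "failure", "body": {"name": "my_first_dag"}})
# prints "[failure] job my_first_dag"
```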
YAML configuration
In order to add alerts to an awehflow DAG, add the following to the root of the configuration:

```yaml
alert_on:
  - 'failure' # Send out a formatted message if a task in the DAG fails. This is optional
  - 'success' # Send out a formatted message once the DAG completes successfully. This is optional
```
Available alerters
SlackAlerter - awehflow.alerts.slack.SlackAlerter
Sends an alert to a specified Slack channel via the Slack webhook functionality.
- Parameters
  - channel - The name of the channel that the alerts should be sent to
  - slack_conn_id - The name of the airflow connection that contains the token information, default: slack_default
- Connection requirements - Create an HTTP connection with the name specified for slack_conn_id; the required HTTP fields are:
  - password - The Slack token issued by your admin team, which allows for the sending of messages via the Slack Python API
GoogleChatAlerter - awehflow.alerts.googlechat.GoogleChatAlerter
Sends an alert to the configured Google Chat space.
- Parameters
  - gchat_conn_id - The name of the airflow connection that contains the GChat space information, default: gchat_default
- Connection requirements - Create an HTTP connection with the name specified for gchat_conn_id; the required HTTP fields are:
  - host - The GChat spaces URL, e.g. https://chat.googleapis.com/v1/spaces
  - password - The GChat space's key configuration information, e.g. https://chat.googleapis.com/v1/spaces/SPACES_ID?key=SPACES_KEY
    - SPACES_ID - should be supplied by your GChat admin team
    - SPACES_KEY - should be supplied by your GChat admin team
Configuration
Awehflow configuration files can be written as .yml or .hocon files; both formats are supported.
Shown below is a sample HOCON configuration file:
```hocon
{
  name: my_first_dag,
  start_date: 2022-01-01,
  catchup: true,
  schedule: "10 0 * * *",
  version: 1,
  alert_on: [
    success,
    failure
  ],
  params: {
    default: {
      source_folder: /tmp
    },
    production: {
      source_folder: /data
    }
  },
  default_dag_args: {
    retries: 1
  },
  dependencies: [
    {
      id: 'ping_sensor'
      operator: 'airflow.sensors.bash.BashSensor'
      params: {
        bash_command: 'echo ping'
        mode: 'reschedule'
      }
    }
  ],
  tasks: [
    {
      id: first_dummy_task,
      operator: airflow.operators.dummy.DummyOperator,
    },
    {
      id: first_bash_task,
      operator: airflow.operators.bash.BashOperator,
      params: {
        bash_command: 'echo "Hello World"'
      },
      upstream: [
        first_dummy_task
      ]
    }
  ]
}
```
This configuration does the following:
- Creates a DAG called my_first_dag
- Scheduled to run daily, 10 minutes past midnight
- Catchup has been enabled to ensure all runs of the DAG since 2022-01-01 are executed
- Dependencies
  - First check if the command echo ping succeeds
- Tasks
  - First run a dummy task that does nothing
  - If the dummy task succeeds, execute the bash command
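The upstream lists in the tasks section describe a small dependency graph that the loader turns into DAG edges. As a stdlib-only illustration of the idea (awehflow's own loader does this internally; the function below is not its API), a config like the one above can be resolved into an execution order:

```python
# Tasks as they would appear after parsing the config above
# (only the fields relevant to ordering are kept).
config_tasks = [
    {"id": "first_dummy_task"},
    {"id": "first_bash_task", "upstream": ["first_dummy_task"]},
]

def execution_order(tasks):
    """Topologically sort tasks so each task runs after its upstream tasks."""
    order, placed = [], set()
    pending = list(tasks)
    while pending:
        for task in pending:
            if set(task.get("upstream", [])) <= placed:
                order.append(task["id"])
                placed.add(task["id"])
                pending.remove(task)
                break
        else:
            raise ValueError("cycle in upstream definitions")
    return order

print(execution_order(config_tasks))  # ['first_dummy_task', 'first_bash_task']
```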
Running the tests
Tests may be run with:

```bash
python -m unittest discover tests
```

or, to run code coverage too:

```bash
coverage run -m unittest discover tests && coverage html
```