Priority Tags for Airflow Dags

airflow-priority

Overview

This repo provides Airflow plugins for priority-driven DAG failure alerting. In layman's terms, you need only add one of the tags P1, P2, P3, P4, or P5 to your DAG, and that DAG will send a notification to the integrations you have configured (see Integrations), where P1 corresponds to the highest priority and P5 to the lowest.
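
To make the tag convention concrete, here is a minimal sketch of how a priority could be read from a DAG's tag list. The helper name priority_from_tags and the tie-breaking rule (the highest priority wins when several priority tags are present) are assumptions for illustration, not the plugin's actual implementation.

```python
from typing import Iterable, Optional

# The five priority tags described above; P1 is highest, P5 is lowest.
PRIORITY_TAGS = ("P1", "P2", "P3", "P4", "P5")


def priority_from_tags(tags: Iterable[str]) -> Optional[int]:
    """Return the priority (1-5) found in a DAG's tags, or None if untagged.

    Hypothetical helper: if several priority tags are present, the
    highest priority (lowest number) is returned.
    """
    found = [int(tag[1:]) for tag in tags if tag in PRIORITY_TAGS]
    return min(found) if found else None


# In a DAG file the tag would be passed as DAG(..., tags=["etl", "P1"]).
print(priority_from_tags(["etl", "P1"]))   # 1
print(priority_from_tags(["reporting"]))   # None
```

A DAG without any of the five tags yields None here, which matches the idea that untagged DAGs are simply not alerted on.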

Installation

You can install from pip:

pip install airflow-priority

Or via conda:

conda install airflow-priority -c conda-forge

Integrations

Integration   Metric / Tag
New Relic     airflow.custom.priority.p{1,2,3,4,5}.{failed,succeeded,running}
Datadog       airflow.custom.priority.p{1,2,3,4,5}.{failed,succeeded,running}
Discord       N/A
Slack         N/A
Symphony      N/A
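
The brace pattern in the table is shorthand for fifteen metric names per integration. The sketch below expands it, which can be handy when writing monitors or queries. The function name metric_names is hypothetical, and it assumes that a custom metric override replaces only the prefix and keeps the .p<N>.<state> suffix.

```python
from itertools import product
from typing import List

PRIORITIES = (1, 2, 3, 4, 5)
STATES = ("failed", "succeeded", "running")


def metric_names(prefix: str = "airflow.custom.priority") -> List[str]:
    """Expand <prefix>.p{1..5}.{failed,succeeded,running} into full names."""
    return [f"{prefix}.p{p}.{s}" for p, s in product(PRIORITIES, STATES)]


names = metric_names()
print(len(names))  # 15
print(names[0])    # airflow.custom.priority.p1.failed
```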

Datadog

Datadog metric for failed DAG run

Create a new Datadog API key following their guide.

Copy this API key into your airflow.cfg like so:

[priority.datadog]
api_key = the api key
# Optional host change
host = https://api.datadoghq.com
# Optional metric name override, default is "airflow.custom.priority"
metric = my.custom.metric

Ensure your DAGs are configured with priority tags, then run some of them. It is often convenient to have an intentionally failing P1 DAG to test the integration. Once metrics are arriving, you can create custom monitors for the tags.

Discord

Message in Discord reflecting DAG status

Create a new Discord application following the guide from the discord.py library.

Copy your bot's token into your airflow.cfg like so:

[priority.discord]
token = the bot's token
channel = the numerical channel ID, from the URL or by right-clicking the channel

# Configure tag and priority specific channels
channel_P3 = ...
channel_failed_P1 = ...

Ensure your bot is invited into any private channels.
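
The channel overrides above combine a state (such as failed) and a priority tag (such as P1). The sketch below shows one plausible way such options could be resolved, trying the most specific key first. The lookup order and the function name resolve_channel are assumptions for illustration; the plugin's real precedence may differ, and the channel values are placeholders.

```python
from typing import Mapping, Optional


def resolve_channel(options: Mapping[str, str], state: str, priority: int) -> Optional[str]:
    """Pick a channel, trying the most specific option first (assumed order)."""
    tag = f"P{priority}"
    candidates = (
        f"channel_{state}_{tag}",  # e.g. channel_failed_P1
        f"channel_{tag}",          # e.g. channel_P3
        f"channel_{state}",        # e.g. channel_success
        "channel",                 # default channel
    )
    for key in candidates:
        if key in options:
            return options[key]
    return None


options = {
    "channel": "alerts",
    "channel_P3": "low-priority",
    "channel_failed_P1": "pager",
}
print(resolve_channel(options, "failed", 1))  # pager
print(resolve_channel(options, "failed", 3))  # low-priority
print(resolve_channel(options, "failed", 2))  # alerts
```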

Verbose updates

By default, only DAG failure events will be sent. On rerun, failure messages can be updated to reflect running/success.

The plugin can also be configured to always send updates, including when a DAG starts running and/or succeeds. These can also be configured to send to different channels.

[priority.discord]
# Update messages to reflect current state
update_message = true

send_success = true
channel_success = a-different-channel
send_running = true
channel_running = a-different-channel

Customizing message colors

[priority.discord]
failed_color = "#FF0000"
running_color = "#FFFF00"
success_color = "#00FF00"

New Relic

Dashboard of New Relic metrics

Create a new New Relic API key following their guide. Note that the key type should be INGEST - LICENSE.

Copy this API key into your airflow.cfg like so:

[priority.newrelic]
api_key = the api key
# Optional metric name override, default is "airflow.custom.priority"
metric = my.custom.metric

Under Query Your Data in the New Relic UI, you can create a query for the new custom metric:

SELECT sum(`airflow.custom.priority.p1.failed`) FROM Metric FACET dag

With this, you can now create a custom alert. For fast alerting, we recommend the following parameters:

Window duration - 30 seconds
Sliding window aggregation - Disabled
Slide by interval - Not set
Streaming method - Event timer
Timer - 5 seconds

Fill data gaps with - None
Evaluation delay - Not set

Thresholds: Critical: Query result is above or equals 1 at least once in 1 minute

Slack

Message in Slack reflecting DAG status

Configure a new Slack application following the Slack Quickstart.

Ensure your application has the following scopes for public and private channel access:

  • channels:read
  • groups:read
  • chat:write

Enable and install your Slack application into your workspace, and add it as an integration in whatever channel you want it to post.

Copy your Slack application's OAuth token (starting with xoxb-) and your desired channel into your airflow.cfg like so:

[priority.slack]
token = xoxb-...
channel = channel-name

# Configure tag and priority specific channels
channel_P3 = ...
channel_failed_P1 = ...

Verbose updates

By default, only DAG failure events will be sent. On rerun, failure messages can be updated to reflect running/success.

The plugin can also be configured to always send updates, including when a DAG starts running and/or succeeds. These can also be configured to send to different channels.

[priority.slack]
# Update messages to reflect current state
update_message = true

send_success = true
channel_success = a-different-channel
send_running = true
channel_running = a-different-channel

Customizing message colors

[priority.slack]
failed_color = "#FF0000"
running_color = "#FFFF00"
success_color = "#00FF00"

Symphony

Documentation coming soon!

[priority.symphony]
room_name = the room name
message_create_url = https://mycompany.symphony.com/agent/v4/stream/SID/message/create
cert_file = path/to/my/cert.pem
key_file = path/to/my/key.pem
session_auth = https://mycompany-api.symphony.com/sessionauth/v1/authenticate
key_auth = https://mycompany-api.symphony.com/keyauth/v1/authenticate
room_search_url = https://mycompany.symphony.com/pod/v3/room/search

AWS MWAA CloudWatch

To use with AWS MWAA, set the AWS region to match your MWAA instance.

[priority.aws]
region = "us-east-1"

Alternative Syntax

Note: AWS MWAA may require you to use the alternative configuration syntax.

All scoped configuration options can instead be set directly under the [priority] section by prefixing the option name with its scope.

For example, this:

[priority.aws]
region = "us-east-1"

Can be provided as:

[priority]
aws_region = "us-east-1"
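
As a rough illustration of the mapping, the sketch below rewrites scoped sections into the flat form using Python's standard configparser. The rule [priority.<scope>] option -> [priority] <scope>_<option> is inferred from the single example above, and flatten_scoped is a hypothetical helper, not part of the plugin.

```python
import configparser

SCOPED = """
[priority.aws]
region = "us-east-1"
"""


def flatten_scoped(text: str, root: str = "priority") -> configparser.ConfigParser:
    """Rewrite [priority.<scope>] options as <scope>_<option> under [priority]."""
    source = configparser.ConfigParser(interpolation=None)
    source.optionxform = str  # keep option names case-sensitive (e.g. channel_P3)
    source.read_string(text)

    flat = configparser.ConfigParser(interpolation=None)
    flat.optionxform = str
    flat.add_section(root)
    for section in source.sections():
        if section.startswith(root + "."):
            scope = section[len(root) + 1:]
            for option, value in source[section].items():
                flat[root][f"{scope}_{option}"] = value
    return flat


flat = flatten_scoped(SCOPED)
# configparser keeps the surrounding quotes as part of the value.
print(flat["priority"]["aws_region"])  # "us-east-1"
```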

License

This software is licensed under the Apache 2.0 license. See the LICENSE file for details.
