Airflow Tools
Collection of Operators, Hooks and utility functions aimed at facilitating ELT pipelines.
Data Lake Facade
The Data Lake Facade serves as an abstraction over the different Hooks that can be used as a backend, such as:
- Azure Data Lake Storage (ADLS)
- Simple Storage Service (S3)
Operators can create the correct hook at runtime by passing a connection ID with a connection type of aws or adls. Example code:
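from airflow.hooks.base import BaseHook
# conn_id is the ID of an Airflow connection whose type is aws or adls
conn = BaseHook.get_connection(conn_id)
hook = conn.get_hook()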
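The facade idea can be illustrated without Airflow. The following is only a minimal sketch under stated assumptions: `FakeS3Hook`, `FakeAdlsHook`, `SketchDataLakeFacade` and their method names are hypothetical stand-ins invented for this example, not the library's actual classes or the real hook APIs. It shows how a single `write` call could dispatch to a different backend depending on the connection type.

```python
from dataclasses import dataclass, field


@dataclass
class FakeS3Hook:
    """Hypothetical stand-in for an S3-style hook (bucket + key addressing)."""
    objects: dict = field(default_factory=dict)

    def put(self, data: bytes, bucket: str, key: str) -> None:
        self.objects[(bucket, key)] = data


@dataclass
class FakeAdlsHook:
    """Hypothetical stand-in for an ADLS-style hook (single path addressing)."""
    files: dict = field(default_factory=dict)

    def upload(self, path: str, data: bytes) -> None:
        self.files[path] = data


class SketchDataLakeFacade:
    """Illustrative facade: one write() method over two different backends."""

    def __init__(self, conn_type: str, hook) -> None:
        self.conn_type = conn_type
        self.hook = hook

    def write(self, data: bytes, path: str) -> None:
        if self.conn_type == "aws":
            # Split "bucket/some/key" into bucket and key for the S3-style backend.
            bucket, _, key = path.partition("/")
            self.hook.put(data, bucket, key)
        elif self.conn_type == "adls":
            self.hook.upload(path, data)
        else:
            raise ValueError(f"Unsupported connection type: {self.conn_type}")


s3_backend = FakeS3Hook()
SketchDataLakeFacade("aws", s3_backend).write(b"{}", "data_lake/source1/file.json")

adls_backend = FakeAdlsHook()
SketchDataLakeFacade("adls", adls_backend).write(b"{}", "data_lake/source1/file.json")
```

The caller only needs a path and the data; the addressing differences between backends stay inside the facade.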
Operators
HTTP to Data Lake
Fetches data from an HTTP endpoint and writes the response to the data lake. Example usage:
HttpToDataLake(
    task_id='test_http_to_data_lake',
    http_conn_id='http_test',
    data_lake_conn_id='data_lake_test',
    data_lake_path=s3_bucket + '/source1/entity1/{{ ds }}/',
    endpoint='/api/users',
    method='GET',
    jmespath_expression='data[:2].{id: id, email: email}',
)
JMESPATH expressions
APIs often return the data we are interested in wrapped in a key. JMESPath is a query language for JSON that we can use to select the part of the response we are interested in. You can find more information on JMESPath expressions, and test them, on the JMESPath website.
The above expression selects the first two objects inside the key data, and then only the id and email attributes in each object. An example response can be found here.
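To make the selection concrete, here is the same logic written as plain Python. This is a sketch: the response payload is made up for illustration, and `select_users` is a hypothetical helper, not part of the library. With the `jmespath` package installed, `jmespath.search('data[:2].{id: id, email: email}', response)` should return the same result for this payload.

```python
# Hypothetical API response: the records of interest are wrapped in "data".
response = {
    "page": 1,
    "data": [
        {"id": 1, "email": "a@example.com", "first_name": "Ana"},
        {"id": 2, "email": "b@example.com", "first_name": "Bo"},
        {"id": 3, "email": "c@example.com", "first_name": "Cy"},
    ],
}


def select_users(payload):
    """Plain-Python equivalent of 'data[:2].{id: id, email: email}'.

    Assumes the payload contains a "data" list of objects.
    """
    return [
        # Missing attributes become None, as JMESPath would yield null.
        {"id": item.get("id"), "email": item.get("email")}
        for item in payload["data"][:2]
    ]


selected = select_users(response)
print(selected)
```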
Tests
Integration tests
To guarantee that the library works as intended we have an integration test that attempts to install it in a fresh virtual environment, and we aim to have a test for each Operator.
Running integration tests locally
The lint-and-test.yml workflow sets up the necessary environment variables, but if you want to run the tests locally you will need the following environment variables:
AIRFLOW_CONN_DATA_LAKE_TEST='{"conn_type": "aws", "extra": {"endpoint_url": "http://localhost:9090"}}'
AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
AWS_DEFAULT_REGION=us-east-1
TEST_BUCKET=data_lake
S3_ENDPOINT_URL=http://localhost:9090
AIRFLOW_CONN_DATA_LAKE_TEST='{"conn_type": "aws", "extra": {"endpoint_url": "http://localhost:9090"}}' AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY TEST_BUCKET=data_lake S3_ENDPOINT_URL=http://localhost:9090 poetry run pytest tests/ --doctest-modules --junitxml=junit/test-results.xml --cov=com --cov-report=xml --cov-report=html
AIRFLOW_CONN_SFTP_TEST='{"conn_type": "sftp", "host": "localhost", "port": 22, "login": "test_user", "password": "pass"}'
And you also need to run Adobe's S3 mock container like this:
docker run --rm -p 9090:9090 -e initialBuckets=data_lake -e debug=true -t adobe/s3mock
and the SFTP container like this:
docker run -p 22:22 -d atmoz/sftp test_user:pass:::root_folder
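Before starting the tests locally, a quick pre-flight check can confirm that the variables listed above are set. The following is a minimal sketch: `missing_vars` and `connection_type` are hypothetical helpers written for this example and are not part of the library. It assumes only that the connection variables hold JSON, as in the values shown above.

```python
import json
import os

# Variable names taken from the list above.
REQUIRED_VARS = [
    "AIRFLOW_CONN_DATA_LAKE_TEST",
    "AIRFLOW_CONN_SFTP_TEST",
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_DEFAULT_REGION",
    "TEST_BUCKET",
    "S3_ENDPOINT_URL",
]


def missing_vars(env):
    """Return the required variable names that are unset or empty in env."""
    return [name for name in REQUIRED_VARS if not env.get(name)]


def connection_type(env, name):
    """Read conn_type from a JSON-serialized connection variable."""
    return json.loads(env[name])["conn_type"]


if __name__ == "__main__":
    missing = missing_vars(os.environ)
    if missing:
        print("Missing environment variables:", ", ".join(missing))
    else:
        print("Data lake connection type:",
              connection_type(os.environ, "AIRFLOW_CONN_DATA_LAKE_TEST"))
```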
Notifications
Slack (incoming webhook)
If you or your team use Slack, you can receive notifications about failed DAGs using the dag_failure_slack_notification_webhook method (in notifications.slack.webhook). You need to create a new Slack App and enable "Incoming Webhooks". See Slack's documentation on sending messages using Incoming Webhooks for more information.
You need to create a new Airflow connection with the name SLACK_WEBHOOK_NOTIFICATION_CONN (or AIRFLOW_CONN_SLACK_WEBHOOK_NOTIFICATION_CONN if you are using environment variables).
A default message format is used, but you can customize the message by providing the parameters below:
- text (str)[optional]: the main message that will appear in the notification. If you provide it, your Slack blocks will be ignored.
- blocks (dict)[optional]: your own custom Slack blocks for the message.
- include_blocks (bool)[optional]: indicates whether the default blocks are used. Ignored if you provide your own blocks.
- source (typing.Literal['DAG', 'TASK'])[optional]: source of the failure (DAG or task). Default: DAG.
- image_url (str)[optional]: image URL for your notification (accessory). You can use AIRFLOW_TOOLS__SLACK_NOTIFICATION_IMG_URL instead.
Example of use in a DAG
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

from airflow_tools.notifications.slack.webhook import (
    dag_failure_slack_notification_webhook,  # <--- IMPORT
)

with DAG(
    "slack_notification_dkl",
    description="Slack notification on fail",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
    on_failure_callback=dag_failure_slack_notification_webhook(),  # <--- HERE
) as dag:
    t = BashOperator(
        task_id="failing_test",
        depends_on_past=False,
        bash_command="exit 1",
        retries=1,
    )

if __name__ == "__main__":
    dag.test()
You can also use it for a single task only, by passing the parameter source='TASK':
t = BashOperator(
    task_id="failing_test",
    depends_on_past=False,
    bash_command="exit 1",
    retries=1,
    on_failure_callback=dag_failure_slack_notification_webhook(source='TASK'),
)
You can add a custom message (ignoring the slack blocks for a formatted message):
with DAG(
    ...
    on_failure_callback=dag_failure_slack_notification_webhook(
        text='The task {{ ti.task_id }} failed',
        include_blocks=False
    ),
) as dag:
Or you can pass your own Slack blocks:
custom_slack_blocks = {
    "type": "section",
    "text": {
        "type": "mrkdwn",
        "text": "<https://api.slack.com/reference/block-kit/block|This is an example using custom Slack blocks>"
    }
}

with DAG(
    ...
    on_failure_callback=dag_failure_slack_notification_webhook(
        blocks=custom_slack_blocks
    ),
) as dag:
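For orientation, a Slack incoming webhook receives a JSON body in which "blocks" is a list of block objects and "text" serves as the plain-text fallback. The sketch below shows what such a body could look like for the custom block above. `build_webhook_payload` is a hypothetical helper written for this example; how the library itself assembles the request is not shown here and may differ.

```python
import json

custom_slack_block = {
    "type": "section",
    "text": {
        "type": "mrkdwn",
        "text": "<https://api.slack.com/reference/block-kit/block|This is an example using custom Slack blocks>",
    },
}


def build_webhook_payload(blocks, fallback_text):
    """Hypothetical helper: wrap a block (or list of blocks) in a webhook body."""
    # Slack expects "blocks" to be a list, so wrap a single block if needed.
    block_list = blocks if isinstance(blocks, list) else [blocks]
    return {"text": fallback_text, "blocks": block_list}


payload = build_webhook_payload(custom_slack_block, "A DAG failed")
body = json.dumps(payload)  # JSON string that would be POSTed to the webhook URL
print(body)
```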