
Airflow Toolkit

Collection of Operators, Hooks and utility functions aimed at facilitating ELT pipelines.

Overview

This is an opinionated library focused on ELT, which means that our goal is to facilitate loading data from various data sources into a data lake, as well as loading from a data lake to a data warehouse and running transformations inside a data warehouse.

Airflow's operators notoriously suffer from an NxM problem, where if you have N data sources and M destinations you end up with NxM different operators (FTPToS3, S3ToFTP, PostgresToS3, FTPToPostgres etc.). We aim to mitigate this issue in two ways:

  1. ELT focused: cuts down on the number of possible sources and destinations, as we always want to do source -> data lake -> data warehouse.
  2. Building common interfaces: where possible we treat similar data sources in the same way. Airflow has recently done a good job at this by deprecating the database-specific SQL operators (PostgresOperator, MySQLOperator, etc.) in favour of a more generic SQLExecuteQueryOperator that works with any hook compatible with the dbapi2 interface. We apply this philosophy wherever we can, for example by providing a unified interface for all filesystem data sources, which in turn enables much more generic operators such as SQLToFilesystem and FilesystemToFilesystem.
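To make the NxM problem concrete, here is a small back-of-the-envelope comparison (the source and destination names are illustrative):

```python
# Illustrative arithmetic for the NxM operator explosion.
sources = ["ftp", "sftp", "postgres", "mysql", "http"]  # N = 5
destinations = ["s3", "adls", "gcs", "local"]           # M = 4

# Pairwise operators: one per (source, destination) combination.
pairwise = len(sources) * len(destinations)  # 5 * 4 = 20 operators

# ELT through a data lake: one "source -> lake" operator per source,
# plus one "lake -> warehouse" path per destination.
via_lake = len(sources) + len(destinations)  # 5 + 4 = 9 operators

print(pairwise, via_lake)  # 20 9
```

The gap widens quickly: with 10 sources and 10 destinations, pairwise operators need 100 implementations while the ELT approach needs 20.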

Filesystem Interface/Protocol

We provide thin wrappers over many hooks for filesystem data sources. These wrappers use each hook's specific methods to implement a set of common methods that the operators can then use without worrying about the hook's concrete type. For now we support the following filesystem hooks, some native to Airflow or belonging to other provider packages, and others implemented in this library:

  • WasbHook (Blob Storage/ADLS)
  • S3Hook (S3)
  • SFTPHook (SFTP)
  • FSHook (Local Filesystem)
  • AzureFileShareServicePrincipalHook (Azure Fileshare with support for service principal authentication)
  • AzureDatabricksVolumeHook (Unity Catalog Volumes)
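Conceptually, each wrapper satisfies a small common interface. A minimal sketch using `typing.Protocol` follows; the method names and the in-memory implementation here are hypothetical illustrations, not the library's actual API:

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class FilesystemProtocol(Protocol):
    """Hypothetical common interface each hook wrapper could implement."""

    def read(self, path: str) -> bytes: ...
    def write(self, path: str, data: bytes) -> None: ...
    def check_file(self, path: str) -> bool: ...


class InMemoryFilesystem:
    """Toy implementation, used only to show the protocol in action."""

    def __init__(self) -> None:
        self._files: dict[str, bytes] = {}

    def read(self, path: str) -> bytes:
        return self._files[path]

    def write(self, path: str, data: bytes) -> None:
        self._files[path] = data

    def check_file(self, path: str) -> bool:
        return path in self._files


fs = InMemoryFilesystem()
fs.write("data_lake/2023/10/01/test.csv", b"id,email\n")
print(isinstance(fs, FilesystemProtocol))  # True: structural check via runtime_checkable
```

An operator written against `FilesystemProtocol` works unchanged regardless of which concrete wrapper (S3, SFTP, local, ...) it receives.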

Operators can create the correct hook at runtime from a connection ID whose connection type is, for example, aws or adls. Example code:

from airflow.hooks.base import BaseHook

conn = BaseHook.get_connection(conn_id)
hook = conn.get_hook()

Operators

HTTP to Filesystem (Data Lake)

Downloads the response of an HTTP request to a file in a filesystem (data lake). Example usage:

HttpToFilesystem(
    task_id='test_http_to_data_lake',
    http_conn_id='http_test',
    data_lake_conn_id='data_lake_test',
    data_lake_path=s3_bucket + '/source1/entity1/{{ ds }}/',
    endpoint='/api/users',
    method='GET',
    jmespath_expression='data[:2].{id: id, email: email}',
)

Sensors

Filesystem (generic) File Sensor

This sensor checks whether a file exists in a generic filesystem. The type of filesystem is determined by the connection type.

Currently supported filesystem connections (the conn_type parameter) are:

  • aws (s3)
  • azure_databricks_volume
  • azure_file_share_sp
  • sftp
  • fs (LocalFileSystem)
  • google_cloud_platform
  • wasb (BlobStorageFilesystem)

For example, with a connection to an S3 filesystem:

AIRFLOW_CONN_TEST_S3_FILESYSTEM='{"conn_type": "aws", "extra": {"endpoint_url": "http://localhost:9090"}}'

You can use a sensor like this:

FilesystemFileSensor(
    task_id='check_file_existence_exist_in_s3',
    filesystem_conn_id='test_s3_filesystem',
    source_path='data_lake/2023/10/01/test.csv',
    poke_interval=60,
    timeout=300,
)

JMESPath expressions

APIs often return the response we are interested in wrapped in a key. JMESPath is a query language for JSON that we can use to select exactly the part of the response we need. You can find more information on JMESPath expressions, and test them interactively, at jmespath.org.

The expression in the HttpToFilesystem example above ('data[:2].{id: id, email: email}') selects the first two objects inside the key data, and then keeps only the id and email attributes of each object.

Tests

Integration tests

To guarantee that the library works as intended we have an integration test that attempts to install it in a fresh virtual environment, and we aim to have a test for each Operator.

Running integration tests locally

The lint-and-test.yml workflow sets up the necessary environment variables, but if you want to run them locally you will need the following environment variables:

AIRFLOW_CONN_DATA_LAKE_TEST='{"conn_type": "aws", "extra": {"endpoint_url": "http://localhost:9090"}}'
AIRFLOW_CONN_SFTP_TEST='{"conn_type": "sftp", "host": "localhost", "port": 22, "login": "test_user", "password": "pass"}'
AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
AWS_DEFAULT_REGION=us-east-1
TEST_BUCKET=data_lake
S3_ENDPOINT_URL=http://localhost:9090

With those variables set, run the test suite with:

poetry run pytest tests/ --doctest-modules --junitxml=junit/test-results.xml --cov=com --cov-report=xml --cov-report=html

And you also need to run Adobe's S3 mock container like this:

docker run --rm -p 9090:9090 -e initialBuckets=data_lake -e debug=true -t adobe/s3mock

and the SFTP container like this:

docker run -p 22:22 -d atmoz/sftp test_user:pass:::root_folder

Notifications

Slack (incoming webhook)

If you or your team are using Slack, you can send notifications about failed DAGs using the dag_failure_slack_notification_webhook function (in notifications.slack.webhook). You need to create a new Slack App and enable its "Incoming Webhooks". More information about sending messages using Slack Incoming Webhooks is available in the Slack API documentation.

You need to create a new Airflow connection named SLACK_WEBHOOK_NOTIFICATION_CONN (or AIRFLOW_CONN_SLACK_WEBHOOK_NOTIFICATION_CONN if you are using environment variables).
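For example, as an environment variable (a sketch: the webhook URL below is a placeholder, and the exact connection fields may vary with your Slack provider version):

```shell
# Placeholder values -- substitute the Incoming Webhook URL generated by your Slack App.
export AIRFLOW_CONN_SLACK_WEBHOOK_NOTIFICATION_CONN='{"conn_type": "slackwebhook", "password": "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"}'
```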

The default message has the format below (screenshot omitted), but you can customize it by providing these parameters:

  • text (str) [optional]: the main message that will appear in the notification; if you provide your own Slack blocks, this text is ignored.
  • blocks (dict) [optional]: your own custom Slack blocks for the message.
  • include_blocks (bool) [optional]: whether the default blocks should be used; ignored if you provide your own blocks.
  • source (typing.Literal['DAG', 'TASK']) [optional]: source of the failure (DAG or task). Default: DAG.
  • image_url (str) [optional]: image URL for the notification (accessory). You can set the AIRFLOW_TOOLKIT__SLACK_NOTIFICATION_IMG_URL environment variable instead.
Example of use in a DAG:

from datetime import datetime, timedelta

from airflow import DAG

from airflow.operators.bash import BashOperator
from airflow_toolkit.notifications.slack.webhook import (
    dag_failure_slack_notification_webhook,    # <--- IMPORT
)

with DAG(
    "slack_notification_dkl",
    description="Slack notification on fail",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
    on_failure_callback=dag_failure_slack_notification_webhook(),  # <--- HERE
) as dag:

    t = BashOperator(
        task_id="failing_test",
        depends_on_past=False,
        bash_command="exit 1",
        retries=1,
    )


if __name__ == "__main__":
    dag.test()

You can also use it on a single task by providing the parameter source='TASK':

    t = BashOperator(
        task_id="failing_test",
        depends_on_past=False,
        bash_command="exit 1",
        retries=1,
        on_failure_callback=dag_failure_slack_notification_webhook(source='TASK')
    )

You can provide a custom plain-text message (skipping the formatted Slack blocks):

with DAG(
    ...
    on_failure_callback=dag_failure_slack_notification_webhook(
        text='The task {{ ti.task_id }} failed',
        include_blocks=False
    ),
) as dag:

Or you can pass your own Slack blocks:

custom_slack_blocks = {
    "type": "section",
    "text": {
        "type": "mrkdwn",
        "text": "<https://api.slack.com/reference/block-kit/block|This is an example using custom Slack blocks>"
    }
}

with DAG(
    ...
    on_failure_callback=dag_failure_slack_notification_webhook(
        blocks=custom_slack_blocks
    ),
) as dag:
