
A toolkit of operators, hooks and utilities for Apache Airflow 3


Airflow Toolkit

Collection of operators, hooks and utilities for building ELT pipelines on Apache Airflow.


Why airflow-toolkit?

The modern ELT pattern

Every data pipeline should follow a single direction:

Source  (API / SFTP / SQL database)
  ↓  Extract
Data Lake  (S3 / Azure Blob / GCS / ADLS / …)
  ↓  Load
Warehouse / Database  (Postgres / Databricks / Snowflake / …)

The data lake layer is not optional. Every load leaves an immutable copy of the raw data before it reaches the warehouse. This gives you:

  • Free reprocessing — if a transformation has a bug, re-run it from the raw files without calling the source API again.
  • Full traceability — every row in the warehouse can be traced back to the exact source file that produced it.
  • Decoupled steps — extraction and loading are independent tasks; each one can be retried or replaced without affecting the other.

The N×M operator problem

Apache Airflow's built-in operators are point-to-point: one operator per source/destination pair (FTPToS3Operator, S3ToFTPOperator, PostgresToS3Operator, …). With N sources and M destinations you end up managing N×M operators, each with its own implementation and failure modes. Any cross-cutting change (authentication, retry logic, metadata columns) must be replicated across all of them.

Our solution

airflow-toolkit solves this with two ideas:

  1. A common FilesystemProtocol — a thin, uniform interface over S3, Azure Blob, GCS, ADLS, SFTP, local filesystem, and Databricks Volumes. Operators talk to the protocol, not to the backend. Swapping backends requires no code change in the DAG (a rough sketch of the idea follows this list).

  2. Operators by technology family — instead of one operator per combination, we provide generic operators (XToFilesystem, FilesystemToX) that work with any backend through the protocol. The matrix collapses from N×M to a handful of composable building blocks.
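
As a rough illustration of the first idea, a protocol of this kind can be expressed as a structural interface that every backend implements. The sketch below is illustrative only; the method names are assumptions, not the library's actual API.

from typing import Protocol


class FilesystemLike(Protocol):
    """Hypothetical uniform filesystem interface; method names are illustrative."""

    def read(self, path: str) -> bytes:
        """Return the contents of a single file."""
        ...

    def write(self, path: str, data: bytes) -> None:
        """Write bytes to a file at the given path."""
        ...

    def list_files(self, path: str) -> list[str]:
        """List file paths under a prefix or directory."""
        ...

Because operators are written against an interface like this, an S3 backend and an SFTP backend are interchangeable from the DAG's point of view.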


Installation

airflow-toolkit requires Python 3.11–3.13 and Apache Airflow 3. Install the airflow3 extra and only the provider extras you actually use:

# Databricks only — no cloud provider deps
pip install "airflow-toolkit[airflow3,databricks]"

# S3 + HTTP
pip install "airflow-toolkit[airflow3,amazon,http]"

# Everything
pip install "airflow-toolkit[airflow3-full]"

Available extras

Extra | Installs | Use when
airflow3 | apache-airflow>=3, providers-fab, pendulum | Required when combining individual extras
databricks | databricks-sdk, databricks-sql-connector, deltalake, pandas | Databricks Volumes, DuckDB→Delta Lake
amazon | providers-amazon | S3 filesystem backend
google | providers-google | GCS filesystem backend
azure | providers-microsoft-azure | Azure Blob / ADLS filesystem backend
sftp | providers-sftp | SFTP filesystem backend
slack | providers-slack | Slack failure notifications
http | providers-http, requests, jmespath, pandas | HttpToFilesystem, MultiHttpToFilesystem
duckdb | airflow-provider-duckdb | DuckdbToDeltalake operator
sqlite | providers-sqlite | SQLite as source or destination
airflow3-full | all of the above | Quick start / development

Design principles

  • One data-flow direction: source → data lake → warehouse. Every operation must follow one of two shapes: Extract (any source → filesystem) or Load (filesystem → warehouse); a minimal sketch of the two shapes chained in a DAG follows this list.
  • The filesystem layer is mandatory. Every load must leave an auditable trace before reaching the warehouse.
  • Out of scope: direct database-to-database copies (SQLToSQL), warehouse maintenance operations (VACUUM, OPTIMIZE, MERGE INTO), data quality checks, and streaming ingestion. These either bypass the lake or belong to a separate tooling layer.
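
A minimal sketch of the two shapes chained in one DAG, using operators described in detail later in this document. Connection ids, paths, and the 'parquet' source_format value are assumptions for illustration, not fixed names:

from datetime import datetime, timedelta

from airflow.sdk import DAG
from airflow_toolkit.providers.filesystem.operators.filesystem import SQLToFilesystem
from airflow_toolkit.providers.deltalake.operators.filesystem_to_database import FilesystemToDatabaseOperator

with DAG(
    'orders_elt',
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    # Extract: source database -> data lake (Parquet files)
    extract = SQLToFilesystem(
        task_id='extract_orders',
        source_sql_conn_id='my_source_db',
        destination_fs_conn_id='my_data_lake',
        sql="SELECT * FROM orders WHERE updated_at::date = '{{ ds }}'",
        destination_path='raw/orders/{{ ds }}/',
    )

    # Load: data lake -> warehouse, leaving an auditable copy in the lake
    load = FilesystemToDatabaseOperator(
        task_id='load_orders',
        filesystem_conn_id='my_data_lake',
        database_conn_id='my_warehouse',
        filesystem_path='raw/orders/{{ ds }}/',
        db_schema='public',
        db_table='orders',
        source_format='parquet',
        table_aggregation_type='append',
    )

    extract >> load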

Filesystem Protocol

FilesystemProtocol is a common interface implemented for the following backends. The correct implementation is resolved at runtime from the Airflow connection's conn_type:

Backend | conn_type | Provider
Amazon S3 | aws | apache-airflow-providers-amazon
Azure Blob Storage / ADLS | wasb | apache-airflow-providers-microsoft-azure
Google Cloud Storage | google_cloud_platform | apache-airflow-providers-google
SFTP | sftp | apache-airflow-providers-sftp
Local filesystem | fs | built-in
Azure File Share (Service Principal) | azure_file_share_sp | this library
Databricks Unity Catalog Volume | azure_databricks_volume | this library

Operators resolve the backend automatically:

# S3 connection example
AIRFLOW_CONN_MY_DATA_LAKE='{"conn_type": "aws", "extra": {"endpoint_url": "https://s3.amazonaws.com"}}'

# Azure Blob connection example
AIRFLOW_CONN_MY_DATA_LAKE='{"conn_type": "wasb", "extra": {"connection_string": "DefaultEndpointsProtocol=https;..."}}'

Changing the connection's conn_type is all that is needed to switch backends — no operator code changes.


Operators

HttpToFilesystem

Calls an HTTP endpoint and writes the response to any filesystem. Supports pagination, JMESPath filtering, compression, and custom response transformations.

from airflow_toolkit.providers.filesystem.operators.http_to_filesystem import HttpToFilesystem

HttpToFilesystem(
    task_id='fetch_orders',
    http_conn_id='my_api',
    filesystem_conn_id='my_data_lake',
    filesystem_path='raw/orders/{{ ds }}/',
    endpoint='/api/v1/orders',
    method='GET',
    jmespath_expression='data',   # select the 'data' key from the JSON response
    save_format='jsonl',
)

With cursor-based pagination:

def next_page(response):
    # Return the request parameters for the next page, or None to stop paginating.
    cursor = response.json().get('next_cursor')
    if not cursor:
        return None
    return {'data': {'cursor': cursor}}

HttpToFilesystem(
    task_id='fetch_events',
    http_conn_id='my_api',
    filesystem_conn_id='my_data_lake',
    filesystem_path='raw/events/{{ ds }}/',
    endpoint='/api/v1/events',
    method='POST',
    data={'start_date': '{{ ds }}'},
    pagination_function=next_page,
    save_format='jsonl',
)

MultiHttpToFilesystem

Runs multiple HTTP requests in a single Airflow task, saving each response as a separate file. Useful for fetching multiple entities or date ranges without creating one task per request.

from airflow_toolkit.providers.filesystem.operators.http_to_filesystem import MultiHttpToFilesystem

MultiHttpToFilesystem(
    task_id='fetch_reference_data',
    http_conn_id='my_api',
    filesystem_conn_id='my_data_lake',
    filesystem_path='raw/reference/{{ ds }}/',
    method='GET',
    save_format='jsonl',
    multi_requests=[
        {'endpoint': '/api/v1/categories'},
        {'endpoint': '/api/v1/statuses'},
        {'endpoint': '/api/v1/regions'},
    ],
)

Each entry in multi_requests can override any base parameter (endpoint, method, headers, data, jmespath_expression, save_format, compression).
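
For instance, one entry might override the operator-level JMESPath filter and add a request header, while the other entries inherit the defaults. The endpoints, expressions, and header values below are illustrative:

from airflow_toolkit.providers.filesystem.operators.http_to_filesystem import MultiHttpToFilesystem

MultiHttpToFilesystem(
    task_id='fetch_reference_data_with_overrides',
    http_conn_id='my_api',
    filesystem_conn_id='my_data_lake',
    filesystem_path='raw/reference/{{ ds }}/',
    method='GET',
    save_format='jsonl',
    jmespath_expression='data',
    multi_requests=[
        {'endpoint': '/api/v1/categories'},
        {'endpoint': '/api/v1/statuses'},
        # Per-request overrides: a different JMESPath filter and an extra header
        {
            'endpoint': '/api/v1/regions',
            'jmespath_expression': 'data.regions',
            'headers': {'Accept': 'application/json'},
        },
    ],
)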

SQLToFilesystem

Runs a SQL query against any DbApiHook-compatible database and writes the result as Parquet files to any filesystem.

from airflow_toolkit.providers.filesystem.operators.filesystem import SQLToFilesystem

SQLToFilesystem(
    task_id='export_orders',
    source_sql_conn_id='my_postgres',
    destination_fs_conn_id='my_data_lake',
    sql="SELECT * FROM orders WHERE updated_at::date = '{{ ds }}'",
    destination_path='raw/orders/{{ ds }}/',
)

For large tables, use batch_size to write multiple Parquet part files:

SQLToFilesystem(
    task_id='export_large_table',
    source_sql_conn_id='my_postgres',
    destination_fs_conn_id='my_data_lake',
    sql='SELECT * FROM events',
    destination_path='raw/events/{{ ds }}/',
    batch_size=100_000,
)

FilesystemToFilesystem

Copies files between any two filesystem backends. Because both sides use FilesystemProtocol, switching a backend requires only changing the connection — not the operator.

from airflow_toolkit.providers.filesystem.operators.filesystem import FilesystemToFilesystem

FilesystemToFilesystem(
    task_id='landing_to_raw',
    source_fs_conn_id='sftp_source',       # any supported conn_type
    destination_fs_conn_id='s3_data_lake', # any supported conn_type
    source_path='exports/{{ ds }}/',
    destination_path='raw/exports/{{ ds }}/',
)

Replace sftp_source with an Azure Blob connection and the rest of the DAG stays unchanged.

An optional data_transformation callable lets you process each file in-flight:

import gzip


def decompress_and_decode(data: bytes, filename: str, context: dict) -> bytes:
    # Called for each file copied; the returned bytes are written to the destination.
    return gzip.decompress(data)

FilesystemToFilesystem(
    task_id='decompress_files',
    source_fs_conn_id='sftp_source',
    destination_fs_conn_id='s3_data_lake',
    source_path='exports/{{ ds }}/',
    destination_path='raw/exports/{{ ds }}/',
    data_transformation=decompress_and_decode,
)

FilesystemToDatabase

Reads files (CSV, JSON, or Parquet) from any filesystem and loads them into any SQLAlchemy-compatible database. Handles schema drift automatically: columns present in the file but missing from the table are added; columns present in the table but missing from the file are filled with NULL.

from airflow_toolkit.providers.deltalake.operators.filesystem_to_database import FilesystemToDatabaseOperator

FilesystemToDatabaseOperator(
    task_id='load_orders',
    filesystem_conn_id='my_data_lake',     # any supported conn_type
    database_conn_id='my_postgres',        # any SQLAlchemy-compatible connection
    filesystem_path='raw/orders/{{ ds }}/',
    db_schema='public',
    db_table='orders',
    source_format='csv',
    table_aggregation_type='append',       # 'append' | 'replace' | 'fail'
    metadata={
        '_ds':          '{{ ds }}',
        '_loaded_at':   '{{ dag_run.start_date.isoformat() }}',
    },
    include_source_path=True,              # adds _LOADED_FROM column for traceability
)
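
For example, if a later batch of order files gains a new discount column, that column is added to public.orders on the next load; if the files stop carrying a region column that the table already has, the rows loaded from those files receive NULL in region. (Column names here are illustrative.)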

DuckdbToDeltalake

Executes a DuckDB SQL query and writes the result directly to a Delta Lake table on Azure storage. Useful for in-process transformations that land results as an open table format.

from airflow_toolkit.providers.deltalake.operators.duckdb_to_deltalake import DuckdbToDeltalakeOperator

DuckdbToDeltalakeOperator(
    task_id='transform_to_delta',
    duckdb_conn_id='my_duckdb',
    delta_lake_conn_id='my_azure_storage',
    source_query="""
        SELECT
            order_id,
            customer_id,
            total_amount,
            created_at::DATE AS order_date
        FROM read_parquet('az://my-container/raw/orders/{{ ds }}/*.parquet')
    """,
    table_path='az://my-container/delta/orders',
    write_mode='append',   # 'append' | 'overwrite' | 'error' | 'ignore'
    extensions=['azure'],
)

Sensors

FilesystemFileSensor

Waits until a file exists in any supported filesystem. The backend is resolved from the connection's conn_type.

from airflow_toolkit.providers.deltalake.sensors.filesystem_file import FilesystemFileSensor

FilesystemFileSensor(
    task_id='wait_for_daily_export',
    filesystem_conn_id='my_data_lake',
    source_path='raw/orders/{{ ds }}/_SUCCESS',
    poke_interval=60,
    timeout=3600,
)

Supported conn_type values: aws, wasb, google_cloud_platform, sftp, fs, azure_file_share_sp, azure_databricks_volume.


Hooks

AzureFileShareServicePrincipalHook

Connects to Azure File Share using a Service Principal (client credentials flow). Use conn_type: azure_file_share_sp.

AIRFLOW_CONN_MY_FILE_SHARE='{
  "conn_type": "azure_file_share_sp",
  "host": "<storage-account>.file.core.windows.net",
  "login": "<service-principal-client-id>",
  "password": "<service-principal-secret>",
  "extra": {
    "tenant_id": "<tenant-id>",
    "share_name": "<share-name>",
    "protocol": "https"
  }
}'

Once the connection is defined, pass it as filesystem_conn_id to any operator.
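
For example, the connection above can drive the file sensor described later in this document (the path is illustrative):

from airflow_toolkit.providers.deltalake.sensors.filesystem_file import FilesystemFileSensor

FilesystemFileSensor(
    task_id='wait_for_share_export',
    filesystem_conn_id='my_file_share',   # the azure_file_share_sp connection defined above
    source_path='exports/{{ ds }}/_SUCCESS',
    poke_interval=60,
    timeout=3600,
)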

AzureDatabricksVolumeHook

Provides access to Databricks Unity Catalog Volumes as a filesystem backend. Use conn_type: azure_databricks_volume.

AzureDatabricksSqlHook

A DbApiHook-compatible hook for running SQL against Databricks SQL Warehouses via a Service Principal. Use conn_type: azure_databricks_sql.

AIRFLOW_CONN_MY_DATABRICKS='{
  "conn_type": "azure_databricks_sql",
  "host": "<workspace>.azuredatabricks.net",
  "login": "<service-principal-client-id>",
  "password": "<service-principal-secret>",
  "extra": {
    "http_path": "/sql/1.0/warehouses/<warehouse-id>",
    "catalog": "my_catalog",
    "schema": "my_schema"
  }
}'

Because AzureDatabricksSqlHook implements DbApiHook, it can be used as source_sql_conn_id in SQLToFilesystem.
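
For example, a query against the warehouse defined above can be exported to the data lake with the same SQLToFilesystem operator shown earlier. The table name and paths below are illustrative:

from airflow_toolkit.providers.filesystem.operators.filesystem import SQLToFilesystem

SQLToFilesystem(
    task_id='export_databricks_orders',
    source_sql_conn_id='my_databricks',        # the azure_databricks_sql connection above
    destination_fs_conn_id='my_data_lake',
    sql="SELECT * FROM my_catalog.my_schema.orders WHERE order_date = DATE '{{ ds }}'",
    destination_path='raw/databricks/orders/{{ ds }}/',
)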


Notifications

Slack (incoming webhook)

Send DAG or task failure alerts to a Slack channel using dag_failure_slack_notification_webhook. Requires a Slack App with Incoming Webhooks enabled.

Create an Airflow connection named SLACK_WEBHOOK_NOTIFICATION_CONN (or set AIRFLOW_CONN_SLACK_WEBHOOK_NOTIFICATION_CONN).
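
For example, as an environment variable. The field layout below is an assumption based on the Slack provider's incoming-webhook connection type; check the provider documentation for your version:

AIRFLOW_CONN_SLACK_WEBHOOK_NOTIFICATION_CONN='{"conn_type": "slackwebhook", "password": "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"}'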

DAG-level notification

from datetime import datetime, timedelta
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow_toolkit.notifications.slack.webhook import dag_failure_slack_notification_webhook

with DAG(
    'my_pipeline',
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    on_failure_callback=dag_failure_slack_notification_webhook(),
) as dag:

    t = BashOperator(task_id='run', bash_command='python my_script.py')

Task-level notification

BashOperator(
    task_id='run',
    bash_command='python my_script.py',
    on_failure_callback=dag_failure_slack_notification_webhook(source='TASK'),
)

Custom message

on_failure_callback=dag_failure_slack_notification_webhook(
    text='Pipeline {{ dag.dag_id }} failed on {{ ds }}',
    include_blocks=False,
)

Custom Slack blocks

on_failure_callback=dag_failure_slack_notification_webhook(
    blocks={
        'type': 'section',
        'text': {'type': 'mrkdwn', 'text': '*Pipeline failed* — check the logs.'},
    }
)

Default notification format: a formatted Slack Block Kit message (screenshot omitted).

Parameters:

Parameter | Type | Description
text | str (optional) | Plain-text message. Overrides blocks if provided.
blocks | dict (optional) | Custom Slack Block Kit payload.
include_blocks | bool (optional) | Whether to include the default formatted block.
source | 'DAG' or 'TASK' (optional) | Source of the failure. Default: 'DAG'.
image_url | str (optional) | Accessory image URL. Can also be set via AIRFLOW_TOOLKIT__SLACK_NOTIFICATION_IMG_URL.

Running Tests

Integration tests

Integration tests install the library in a clean virtual environment and exercise each operator end-to-end. You need Docker running locally.

Start the required containers:

# S3-compatible mock (Adobe S3Mock)
docker run --rm -p 9090:9090 -e initialBuckets=data_lake -e debug=true adobe/s3mock

# SFTP server
docker run -p 22:22 -d atmoz/sftp test_user:pass:::root_folder

Set the required environment variables and run the test suite:

export AIRFLOW_CONN_DATA_LAKE_TEST='{"conn_type": "aws", "extra": {"endpoint_url": "http://localhost:9090"}}'
export AIRFLOW_CONN_SFTP_TEST='{"conn_type": "sftp", "host": "localhost", "port": 22, "login": "test_user", "password": "pass"}'
export AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
export AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
export AWS_DEFAULT_REGION=us-east-1
export TEST_BUCKET=data_lake
export S3_ENDPOINT_URL=http://localhost:9090

uv run pytest tests/ --junitxml=junit/test-results.xml

The CI pipeline (.github/workflows/lint-and-test.yml) runs the full matrix automatically on every push: Airflow 3 on Python 3.11, 3.12, and 3.13.

Python 3.14: Airflow 3.2+ supports Python 3.14, but pyarrow (a transitive dependency of the [databricks] extra via deltalake) does not yet publish pre-built wheels for Python 3.14. Until it does, Python 3.14 is excluded from the CI matrix and the official test badge. The package can still be installed on Python 3.14 using extras that do not depend on pyarrow (e.g. [airflow3,http,amazon,sftp]).
