Apache Airflow operator that loads data into Snowflake with built-in data quality checks. Supports S3, GCS, and Azure.

SnowflakeDataQualityGateOperator

An Apache Airflow provider that loads data from cloud storage (S3, GCS, or Azure) into Snowflake behind a built-in data quality gate. Bad data is caught in a transient staging table before it reaches your production tables.

Why?

Most cloud-to-Snowflake pipelines blindly COPY INTO production. When bad data lands, you find out hours later from a broken dashboard. This operator enforces a stage → validate → promote pattern in a single, reusable task.

Installation

pip install airflow-provider-s3-snowflake-quality

Quick start

from airflow_s3_snowflake_quality.operators import SnowflakeDataQualityGateOperator
from airflow_s3_snowflake_quality.checks import (
    RowCountCheck, NullCheck, UniquenessCheck,
    FreshnessCheck, AcceptedValuesCheck, CustomSQLCheck,
)

load_orders = SnowflakeDataQualityGateOperator(
    task_id="load_orders",
    source_path="s3://my-data-lake/orders/dt={{ ds }}/",
    file_format="PARQUET",
    snowflake_conn_id="snowflake_prod",
    target_database="ANALYTICS",
    target_schema="PUBLIC",
    target_table="ORDERS",
    load_strategy="merge",
    merge_keys=["ORDER_ID"],
    storage_integration="s3_integration",
    quality_checks=[
        RowCountCheck(min=1000),
        NullCheck(column="ORDER_ID", max_fraction=0.0),
        UniquenessCheck(columns=["ORDER_ID"]),
        FreshnessCheck(column="ORDER_TS", max_age_hours=24),
        AcceptedValuesCheck(column="STATUS", values=["pending", "shipped", "delivered", "cancelled"]),
        CustomSQLCheck(sql="SELECT * FROM {staging_table} WHERE TOTAL_AMOUNT < 0"),
    ],
)

Works with GCS and Azure too — just change the source_path:

source_path="gcs://bucket/orders/dt={{ ds }}/"       # Google Cloud Storage
source_path="azure://container/orders/dt={{ ds }}/"   # Azure Blob
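Since the storage backend is selected by the URL scheme alone, it can help to validate paths before they reach the operator. The following is a minimal sketch of such a pre-flight check, not the package's own logic: `SUPPORTED_SCHEMES` and `detect_provider` are hypothetical names introduced here for illustration.

```python
from urllib.parse import urlparse

# Hypothetical mapping of the three schemes named in this README.
SUPPORTED_SCHEMES = {
    "s3": "Amazon S3",
    "gcs": "Google Cloud Storage",
    "azure": "Azure Blob Storage",
}


def detect_provider(source_path: str) -> str:
    """Return a provider label for source_path, or raise on an unknown scheme."""
    scheme = urlparse(source_path).scheme.lower()
    if scheme not in SUPPORTED_SCHEMES:
        raise ValueError(
            f"Unsupported scheme {scheme!r}; expected one of {sorted(SUPPORTED_SCHEMES)}"
        )
    return SUPPORTED_SCHEMES[scheme]


provider = detect_provider("gcs://bucket/orders/dt=2024-01-01/")
```

A check like this fails fast at DAG-parse or test time rather than inside a running task.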

How it works

┌──────────┐     ┌────────────────────┐     ┌──────────┐     ┌────────────┐
│  Cloud   │────▶│  Staging Table     │────▶│ Quality  │────▶│ Production │
│  Storage │     │  (transient)       │     │ Checks   │     │ Table      │
└──────────┘     └────────────────────┘     └──────────┘     └────────────┘
  S3 / GCS                                       │ FAIL
  / Azure                                        ▼
                                            Task fails or
                                            returns report
  1. Stage: COPY INTO a transient Snowflake table from cloud storage
  2. Validate: Run every configured quality check against the staged data
  3. Promote: If all checks pass, move data via append, replace, or merge
  4. Cleanup: Staging table is always dropped, even on failure
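The four steps above can be sketched in plain Python. This is an illustration of the control flow only, under the assumption that cleanup is implemented with a `finally` block; it is not the operator's actual source. `run_gate`, `Result`, and the abbreviated SQL strings (which omit file format and storage integration options) are hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Result:
    name: str
    passed: bool


def run_gate(
    execute: Callable[[str], None],
    checks: List[Callable[[str], Result]],
    target: str,
    staging: str,
    source_path: str,
) -> List[Result]:
    """Stage, validate, promote on success, and always clean up."""
    try:
        # 1. Stage: load raw files into a transient table shaped like the target.
        execute(f"CREATE TRANSIENT TABLE {staging} LIKE {target}")
        execute(f"COPY INTO {staging} FROM '{source_path}'")

        # 2. Validate: run every check so the failure report is complete.
        results = [check(staging) for check in checks]
        failed = [r.name for r in results if not r.passed]
        if failed:
            raise ValueError(f"Quality gate failed: {failed}")

        # 3. Promote: only reached when every check passed (append shown here).
        execute(f"INSERT INTO {target} SELECT * FROM {staging}")
        return results
    finally:
        # 4. Cleanup: runs on success and on failure alike.
        execute(f"DROP TABLE IF EXISTS {staging}")


# Record statements instead of sending them to Snowflake.
executed: List[str] = []
run_gate(executed.append, [lambda table: Result("row_count", True)],
         "ORDERS", "ORDERS_STG", "s3://my-data-lake/orders/")

failed_run: List[str] = []
error = ""
try:
    run_gate(failed_run.append, [lambda table: Result("null_check", False)],
             "ORDERS", "ORDERS_STG", "s3://my-data-lake/orders/")
except ValueError as exc:
    error = str(exc)
```

In the failing run, no `INSERT` is recorded and the `DROP` still is, which is the property the gate exists to guarantee.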

Operator parameters

| Parameter | Type | Description |
| --- | --- | --- |
| source_path | str | Cloud storage path (s3://, gcs://, azure://). Jinja-templatable |
| file_format | str | PARQUET, CSV, JSON, AVRO, ORC |
| snowflake_conn_id | str | Airflow connection ID for Snowflake |
| target_database | str | Snowflake database |
| target_schema | str | Snowflake schema |
| target_table | str | Snowflake table |
| load_strategy | str | "append" (default), "replace", or "merge" |
| merge_keys | list[str] | Required for merge strategy |
| quality_checks | list[BaseCheck] | Quality checks to run |
| on_failure | str | "fail" (default) or "warn" |
| storage_integration | str | Snowflake storage integration name |
| copy_options | str | Extra COPY INTO options |
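To make the relationship between `load_strategy="merge"` and `merge_keys` concrete, here is a rough sketch of how a Snowflake `MERGE` statement could be assembled from a list of keys. The helper `build_merge_sql` and its `columns` argument are hypothetical; the statement the operator actually generates may differ.

```python
from typing import List


def build_merge_sql(
    target: str, staging: str, columns: List[str], merge_keys: List[str]
) -> str:
    """Build an upsert from staging into target, matching rows on merge_keys."""
    if not merge_keys:
        raise ValueError("merge_keys is required for the merge strategy")

    # Rows match when every key column is equal.
    on_clause = " AND ".join(f"t.{k} = s.{k}" for k in merge_keys)
    non_keys = [c for c in columns if c not in merge_keys]

    sql = f"MERGE INTO {target} t USING {staging} s ON {on_clause}"
    if non_keys:
        # Matched rows get their non-key columns overwritten.
        updates = ", ".join(f"t.{c} = s.{c}" for c in non_keys)
        sql += f" WHEN MATCHED THEN UPDATE SET {updates}"
    # Unmatched rows are inserted whole.
    insert_cols = ", ".join(columns)
    insert_vals = ", ".join(f"s.{c}" for c in columns)
    sql += f" WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})"
    return sql


merge_sql = build_merge_sql(
    "ORDERS", "ORDERS_STG", ["ORDER_ID", "STATUS", "TOTAL_AMOUNT"], ["ORDER_ID"]
)
```

This also shows why a `UniquenessCheck` on the merge keys pairs well with the merge strategy: duplicate keys in the staging table make the match ambiguous.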

Quality checks

| Check | What it does | Example |
| --- | --- | --- |
| RowCountCheck(min, max) | Row count within bounds | RowCountCheck(min=1000) |
| NullCheck(column, max_fraction) | Null rate below threshold | NullCheck(column="ID", max_fraction=0.0) |
| UniquenessCheck(columns) | Column(s) are unique | UniquenessCheck(columns=["ID"]) |
| FreshnessCheck(column, max_age_hours) | Newest timestamp is recent | FreshnessCheck(column="TS", max_age_hours=24) |
| AcceptedValuesCheck(column, values) | All values in whitelist | AcceptedValuesCheck(column="STATUS", values=[...]) |
| CustomSQLCheck(sql, name) | SQL returns 0 rows to pass | CustomSQLCheck(sql="SELECT * FROM {staging_table} WHERE ...") |
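The semantics in the table can be tried out locally. The sketch below uses an in-memory SQLite table as a stand-in for the staging table and computes three of the checks by hand. The queries are assumptions about what such checks might run, not the package's actual SQL; in particular, ignoring NULLs when counting duplicates is a choice made here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE staging (order_id INTEGER, total_amount REAL)")
conn.executemany(
    "INSERT INTO staging VALUES (?, ?)",
    [(1, 10.0), (2, 25.5), (2, 7.0), (None, -3.0)],
)


def scalar(sql: str):
    """Return the first column of the first row."""
    return conn.execute(sql).fetchone()[0]


# NullCheck: fraction of rows where the column is NULL.
total = scalar("SELECT COUNT(*) FROM staging")
nulls = scalar("SELECT COUNT(*) FROM staging WHERE order_id IS NULL")
null_fraction = nulls / total

# UniquenessCheck: number of key values that appear more than once.
duplicate_keys = scalar(
    "SELECT COUNT(*) FROM ("
    "SELECT order_id FROM staging WHERE order_id IS NOT NULL "
    "GROUP BY order_id HAVING COUNT(*) > 1)"
)

# CustomSQLCheck: the query selects offending rows, so zero rows means pass.
offending = conn.execute(
    "SELECT * FROM staging WHERE total_amount < 0"
).fetchall()

report = {
    "null_check": null_fraction <= 0.0,
    "uniqueness_check": duplicate_keys == 0,
    "custom_sql_check": len(offending) == 0,
}
```

With this sample data all three checks fail: one of four IDs is NULL, ID 2 is duplicated, and one row has a negative amount.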

Writing custom checks

from airflow_s3_snowflake_quality.checks.base import BaseCheck, CheckResult, CheckStatus

class MyCheck(BaseCheck):
    def validate(self, cursor, staging_table) -> CheckResult:
        count = self._execute_scalar(cursor, f"SELECT COUNT(*) FROM {staging_table} WHERE ...")
        if count > 0:
            return CheckResult(check_name=self.name, status=CheckStatus.FAILED, message=f"Found {count} bad rows")
        return CheckResult(check_name=self.name, status=CheckStatus.PASSED, message="All good")

Testing

python3 -m venv venv && source venv/bin/activate
pip install -e ".[dev]" && pip install duckdb pyarrow

make all              # lint + typecheck + 62 pytest tests (97% coverage)
make test-local-e2e   # 5 end-to-end scenarios with DuckDB

See integration_test/ for full S3 + Snowflake + Airflow Docker-based testing.

License

Apache 2.0
