Apache Airflow operator that loads data into Snowflake with built-in data quality checks. Supports S3, GCS, and Azure.

SnowflakeDataQualityGateOperator

An Apache Airflow provider that loads data from cloud storage (S3, GCS, or Azure) into Snowflake with a built-in data quality gate. Bad data is caught in a transient staging table before it ever touches your production tables.

Why?

Many cloud-to-Snowflake pipelines COPY INTO production blindly. When bad data lands, you find out hours later from a broken dashboard. This operator enforces a stage → validate → promote pattern in a single, reusable task.

Installation

pip install airflow-provider-s3-snowflake-quality

Quick start

from airflow_s3_snowflake_quality.operators import SnowflakeDataQualityGateOperator
from airflow_s3_snowflake_quality.checks import (
    RowCountCheck, NullCheck, UniquenessCheck,
    FreshnessCheck, AcceptedValuesCheck, CustomSQLCheck,
)

load_orders = SnowflakeDataQualityGateOperator(
    task_id="load_orders",
    source_path="s3://my-data-lake/orders/dt={{ ds }}/",
    file_format="PARQUET",
    snowflake_conn_id="snowflake_prod",
    target_database="ANALYTICS",
    target_schema="PUBLIC",
    target_table="ORDERS",
    load_strategy="merge",
    merge_keys=["ORDER_ID"],
    storage_integration="s3_integration",
    quality_checks=[
        RowCountCheck(min=1000),
        NullCheck(column="ORDER_ID", max_fraction=0.0),
        UniquenessCheck(columns=["ORDER_ID"]),
        FreshnessCheck(column="ORDER_TS", max_age_hours=24),
        AcceptedValuesCheck(column="STATUS", values=["pending", "shipped", "delivered", "cancelled"]),
        CustomSQLCheck(sql="SELECT * FROM {staging_table} WHERE TOTAL_AMOUNT < 0"),
    ],
)

Works with GCS and Azure too — just change the source_path:

source_path="gcs://bucket/orders/dt={{ ds }}/"       # Google Cloud Storage
source_path="azure://container/orders/dt={{ ds }}/"   # Azure Blob

How it works

┌──────────┐     ┌────────────────────┐     ┌──────────┐     ┌────────────┐
│  Cloud   │────▶│  Staging Table     │────▶│ Quality  │────▶│ Production │
│  Storage │     │  (transient)       │     │ Checks   │     │ Table      │
└──────────┘     └────────────────────┘     └──────────┘     └────────────┘
  S3 / GCS                                       │ FAIL
  / Azure                                        ▼
                                            Task fails or
                                            returns report
  1. Stage: COPY INTO a transient Snowflake table from cloud storage
  2. Validate: Run every configured quality check against the staged data
  3. Promote: If all checks pass, move data via append, replace, or merge
  4. Cleanup: Staging table is always dropped, even on failure
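
The four steps above can be sketched in a few lines. This is only an illustration of the pattern, not the operator's actual implementation: it uses Python's built-in sqlite3 as a stand-in for Snowflake, and the function name load_with_quality_gate, the table names, and the (name, sql) check format are all hypothetical.

```python
import sqlite3


def load_with_quality_gate(conn, rows, checks):
    """Stage rows, validate them, promote on success, and always clean up.

    `checks` is a list of (name, sql) pairs (a hypothetical format); a check
    fails if its SQL returns any rows from the staging table.
    """
    cur = conn.cursor()
    cur.execute(
        "CREATE TEMP TABLE orders_staging (order_id INTEGER, total_amount REAL)"
    )
    try:
        # 1. Stage: load the incoming data into the staging table only.
        cur.executemany("INSERT INTO orders_staging VALUES (?, ?)", rows)

        # 2. Validate: collect the names of all checks that return rows.
        failed = [name for name, sql in checks if cur.execute(sql).fetchall()]
        if failed:
            raise ValueError(f"quality checks failed: {failed}")

        # 3. Promote: only reached when every check passed (append strategy).
        cur.execute("INSERT INTO orders SELECT * FROM orders_staging")
    finally:
        # 4. Cleanup: runs on success and on failure alike.
        cur.execute("DROP TABLE IF EXISTS orders_staging")
        conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, total_amount REAL)")
checks = [
    ("no_negative_totals", "SELECT * FROM orders_staging WHERE total_amount < 0"),
]

load_with_quality_gate(conn, [(1, 10.0), (2, 5.5)], checks)  # promoted
try:
    load_with_quality_gate(conn, [(3, -1.0)], checks)  # rejected by the gate
except ValueError as exc:
    print(exc)
```

The try/finally is the important part: the production table is written only after validation, and the staging table is removed whichever path is taken.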

Operator parameters

| Parameter | Type | Description |
|---|---|---|
| source_path | str | Cloud storage path (s3://, gcs://, azure://). Jinja-templatable |
| file_format | str | PARQUET, CSV, JSON, AVRO, ORC |
| snowflake_conn_id | str | Airflow connection ID for Snowflake |
| target_database | str | Snowflake database |
| target_schema | str | Snowflake schema |
| target_table | str | Snowflake table |
| load_strategy | str | "append" (default), "replace", or "merge" |
| merge_keys | list[str] | Required for merge strategy |
| quality_checks | list[BaseCheck] | Quality checks to run |
| on_failure | str | "fail" (default) or "warn" |
| storage_integration | str | Snowflake storage integration name |
| copy_options | str | Extra COPY INTO options |
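
To make the three load_strategy values concrete, here is a rough sketch of the kind of Snowflake SQL each one could translate to when promoting from staging. The statements the operator really generates are not documented here, so treat the helper build_promote_sql and its exact output as assumptions for illustration.

```python
def build_promote_sql(strategy, target, staging, columns, merge_keys=None):
    """Return a promote statement for the given strategy (illustrative only)."""
    col_list = ", ".join(columns)
    select = f"SELECT {col_list} FROM {staging}"

    if strategy == "append":
        # Add the staged rows to whatever is already in the target.
        return f"INSERT INTO {target} ({col_list}) {select}"

    if strategy == "replace":
        # Snowflake's INSERT OVERWRITE truncates the target before inserting.
        return f"INSERT OVERWRITE INTO {target} ({col_list}) {select}"

    if strategy == "merge":
        if not merge_keys:
            raise ValueError("merge_keys is required for the merge strategy")
        on = " AND ".join(f"t.{k} = s.{k}" for k in merge_keys)
        updates = ", ".join(
            f"t.{c} = s.{c}" for c in columns if c not in merge_keys
        )
        values = ", ".join(f"s.{c}" for c in columns)
        # Skip the update clause when every column is a merge key.
        matched = f"WHEN MATCHED THEN UPDATE SET {updates} " if updates else ""
        return (
            f"MERGE INTO {target} t USING {staging} s ON {on} "
            f"{matched}"
            f"WHEN NOT MATCHED THEN INSERT ({col_list}) VALUES ({values})"
        )

    raise ValueError(f"unknown load_strategy: {strategy!r}")


print(build_promote_sql(
    "merge", "ANALYTICS.PUBLIC.ORDERS", "ORDERS_STAGING",
    ["ORDER_ID", "STATUS"], merge_keys=["ORDER_ID"],
))
```

With "merge", rows whose keys already exist in the target are updated and the rest are inserted, which is why merge_keys is required for that strategy.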

Quality checks

| Check | What it does | Example |
|---|---|---|
| RowCountCheck(min, max) | Row count within bounds | RowCountCheck(min=1000) |
| NullCheck(column, max_fraction) | Null rate below threshold | NullCheck(column="ID", max_fraction=0.0) |
| UniquenessCheck(columns) | Column(s) are unique | UniquenessCheck(columns=["ID"]) |
| FreshnessCheck(column, max_age_hours) | Newest timestamp is recent | FreshnessCheck(column="TS", max_age_hours=24) |
| AcceptedValuesCheck(column, values) | All values in whitelist | AcceptedValuesCheck(column="STATUS", values=[...]) |
| CustomSQLCheck(sql, name) | SQL returns 0 rows to pass | CustomSQLCheck(sql="SELECT * FROM {staging_table} WHERE ...") |
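
For intuition, checks such as NullCheck and UniquenessCheck boil down to simple aggregate queries against the staging table. The sketch below shows one plausible way to compute a null fraction and find duplicate keys. It runs against sqlite3 rather than Snowflake, and the helper names null_fraction and duplicate_keys are hypothetical, not part of this package.

```python
import sqlite3


def null_fraction(cur, table, column):
    """Fraction of rows where `column` is NULL (0.0 for an empty table)."""
    # COUNT(column) skips NULLs, so the difference is the NULL count.
    total, nulls = cur.execute(
        f"SELECT COUNT(*), COUNT(*) - COUNT({column}) FROM {table}"
    ).fetchone()
    return nulls / total if total else 0.0


def duplicate_keys(cur, table, columns):
    """Return (key..., count) tuples for key values that appear more than once."""
    cols = ", ".join(columns)
    return cur.execute(
        f"SELECT {cols}, COUNT(*) FROM {table} "
        f"GROUP BY {cols} HAVING COUNT(*) > 1"
    ).fetchall()


conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE staging (id INTEGER, status TEXT)")
cur.executemany(
    "INSERT INTO staging VALUES (?, ?)",
    [(1, "pending"), (1, "shipped"), (2, None), (3, "delivered")],
)

print(null_fraction(cur, "staging", "status"))
print(duplicate_keys(cur, "staging", ["id"]))
```

A null check would then compare the fraction against max_fraction, and a uniqueness check would pass only when the duplicate list is empty.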

Writing custom checks

from airflow_s3_snowflake_quality.checks.base import BaseCheck, CheckResult, CheckStatus

class MyCheck(BaseCheck):
    def validate(self, cursor, staging_table) -> CheckResult:
        count = self._execute_scalar(cursor, f"SELECT COUNT(*) FROM {staging_table} WHERE ...")
        if count > 0:
            return CheckResult(check_name=self.name, status=CheckStatus.FAILED, message=f"Found {count} bad rows")
        return CheckResult(check_name=self.name, status=CheckStatus.PASSED, message="All good")

Testing

python3 -m venv venv && source venv/bin/activate
pip install -e ".[dev]" && pip install duckdb pyarrow

make all              # lint + typecheck + 62 pytest tests (97% coverage)
make test-local-e2e   # 5 end-to-end scenarios with DuckDB

See integration_test/ for full S3 + Snowflake + Airflow Docker-based testing.

License

Apache 2.0
