Apache Airflow operator that loads data into Snowflake with built-in data quality checks. Supports S3, GCS, and Azure.
SnowflakeDataQualityGateOperator
An Apache Airflow provider that loads data from any cloud storage (S3, GCS, Azure) into Snowflake with a built-in data quality gate. Bad data is caught in a transient staging table before it ever touches your production tables.
Why?
Most cloud-to-Snowflake pipelines blindly COPY INTO production. When bad data lands, you find out hours later from a broken dashboard. This operator enforces a stage → validate → promote pattern in a single, reusable task.
Installation
pip install airflow-provider-s3-snowflake-quality
Quick start
from airflow_s3_snowflake_quality.operators import SnowflakeDataQualityGateOperator
from airflow_s3_snowflake_quality.checks import (
    RowCountCheck, NullCheck, UniquenessCheck,
    FreshnessCheck, AcceptedValuesCheck, CustomSQLCheck,
)

load_orders = SnowflakeDataQualityGateOperator(
    task_id="load_orders",
    source_path="s3://my-data-lake/orders/dt={{ ds }}/",
    file_format="PARQUET",
    snowflake_conn_id="snowflake_prod",
    target_database="ANALYTICS",
    target_schema="PUBLIC",
    target_table="ORDERS",
    load_strategy="merge",
    merge_keys=["ORDER_ID"],
    storage_integration="s3_integration",
    quality_checks=[
        RowCountCheck(min=1000),
        NullCheck(column="ORDER_ID", max_fraction=0.0),
        UniquenessCheck(columns=["ORDER_ID"]),
        FreshnessCheck(column="ORDER_TS", max_age_hours=24),
        AcceptedValuesCheck(column="STATUS", values=["pending", "shipped", "delivered", "cancelled"]),
        CustomSQLCheck(sql="SELECT * FROM {staging_table} WHERE TOTAL_AMOUNT < 0"),
    ],
)
Works with GCS and Azure too — just change the source_path:
source_path="gcs://bucket/orders/dt={{ ds }}/" # Google Cloud Storage
source_path="azure://container/orders/dt={{ ds }}/" # Azure Blob
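Since the storage backend is selected by the URL scheme alone, a scheme lookup is one plausible way to picture the dispatch. The sketch below is not the provider's code: `detect_provider` and `SCHEME_TO_PROVIDER` are hypothetical names, and only the three scheme strings are taken from the examples above.

```python
from urllib.parse import urlparse

# Hypothetical mapping; only the scheme names (s3, gcs, azure) come from this README.
SCHEME_TO_PROVIDER = {"s3": "aws", "gcs": "gcp", "azure": "azure"}


def detect_provider(source_path: str) -> str:
    """Return a provider label for a cloud storage path, or raise ValueError."""
    scheme = urlparse(source_path).scheme.lower()
    if scheme not in SCHEME_TO_PROVIDER:
        raise ValueError(f"Unsupported source_path scheme: {scheme!r}")
    return SCHEME_TO_PROVIDER[scheme]
```

A path without one of the three schemes, such as a local file path, raises `ValueError` in this sketch; how the real operator reacts to an unsupported scheme is not documented here.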
How it works
┌──────────┐ ┌────────────────────┐ ┌──────────┐ ┌────────────┐
│ Cloud │────▶│ Staging Table │────▶│ Quality │────▶│ Production │
│ Storage │ │ (transient) │ │ Checks │ │ Table │
└──────────┘ └────────────────────┘ └──────────┘ └────────────┘
S3 / GCS │ FAIL
/ Azure ▼
Task fails or
returns report
- Stage: COPY INTO a transient Snowflake table from cloud storage
- Validate: Run every configured quality check against the staged data
- Promote: If all checks pass, move data via append, replace, or merge
- Cleanup: Staging table is always dropped, even on failure
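The four steps above can be sketched with the standard library's `sqlite3` standing in for Snowflake. This is an illustration of the stage, validate, promote, cleanup pattern under stated assumptions, not the operator's implementation: `load_with_gate`, `no_null_ids`, and the two-column table layout are invented for the example, and promotion uses a plain append.

```python
import sqlite3
from typing import Callable, Iterable, List, Tuple

# A check receives the connection and the staging table name and returns True on pass.
Check = Callable[[sqlite3.Connection, str], bool]


def no_null_ids(conn: sqlite3.Connection, table: str) -> bool:
    """Example check: pass only if no row has a NULL order_id."""
    nulls = conn.execute(
        f"SELECT COUNT(*) FROM {table} WHERE order_id IS NULL"
    ).fetchone()[0]
    return nulls == 0


def load_with_gate(
    conn: sqlite3.Connection,
    rows: Iterable[Tuple],
    checks: List[Check],
    target: str = "orders",
    staging: str = "orders_staging",
) -> bool:
    """Stage rows, run every check, append to target on success, always drop staging."""
    conn.execute(f"CREATE TABLE {staging} (order_id INTEGER, status TEXT)")
    try:
        # Stage: load the incoming rows into the staging table only.
        conn.executemany(f"INSERT INTO {staging} VALUES (?, ?)", rows)
        # Validate: run all checks (no short-circuit) so each one is evaluated.
        results = [check(conn, staging) for check in checks]
        if not all(results):
            return False
        # Promote: append the staged rows to the target table.
        conn.execute(f"INSERT INTO {target} SELECT * FROM {staging}")
        return True
    finally:
        # Cleanup: runs on success, on check failure, and on exceptions.
        conn.execute(f"DROP TABLE IF EXISTS {staging}")
```

The `try`/`finally` block is what guarantees the cleanup step even when a check fails or raises. To try it, open the connection with `sqlite3.connect(":memory:", isolation_level=None)`, create an `orders` table with the same two columns, and call `load_with_gate(conn, rows, [no_null_ids])`.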
Operator parameters
| Parameter | Type | Description |
|---|---|---|
| source_path | str | Cloud storage path (s3://, gcs://, azure://). Jinja-templatable |
| file_format | str | PARQUET, CSV, JSON, AVRO, ORC |
| snowflake_conn_id | str | Airflow connection ID for Snowflake |
| target_database | str | Snowflake database |
| target_schema | str | Snowflake schema |
| target_table | str | Snowflake table |
| load_strategy | str | "append" (default), "replace", or "merge" |
| merge_keys | list[str] | Required for merge strategy |
| quality_checks | list[BaseCheck] | Quality checks to run |
| on_failure | str | "fail" (default) or "warn" |
| storage_integration | str | Snowflake storage integration name |
| copy_options | str | Extra COPY INTO options |
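The table implies a few constraints between parameters, most notably that `merge_keys` must be set when `load_strategy` is `"merge"`. A minimal sketch of that validation follows. `LoadConfig` and `validate_config` are hypothetical names used only for illustration; whether and when the operator itself performs these checks is an assumption.

```python
from dataclasses import dataclass, field
from typing import List

# Allowed values as listed in the parameter table.
VALID_STRATEGIES = {"append", "replace", "merge"}
VALID_ON_FAILURE = {"fail", "warn"}


@dataclass
class LoadConfig:
    """Hypothetical container mirroring three of the operator parameters."""
    load_strategy: str = "append"
    merge_keys: List[str] = field(default_factory=list)
    on_failure: str = "fail"


def validate_config(cfg: LoadConfig) -> None:
    """Raise ValueError if the combination of parameters is inconsistent."""
    if cfg.load_strategy not in VALID_STRATEGIES:
        raise ValueError(f"Unknown load_strategy: {cfg.load_strategy!r}")
    if cfg.load_strategy == "merge" and not cfg.merge_keys:
        raise ValueError("merge_keys is required when load_strategy is 'merge'")
    if cfg.on_failure not in VALID_ON_FAILURE:
        raise ValueError(f"Unknown on_failure: {cfg.on_failure!r}")
```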
Quality checks
| Check | What it does | Example |
|---|---|---|
| RowCountCheck(min, max) | Row count within bounds | RowCountCheck(min=1000) |
| NullCheck(column, max_fraction) | Null rate below threshold | NullCheck(column="ID", max_fraction=0.0) |
| UniquenessCheck(columns) | Column(s) are unique | UniquenessCheck(columns=["ID"]) |
| FreshnessCheck(column, max_age_hours) | Newest timestamp is recent | FreshnessCheck(column="TS", max_age_hours=24) |
| AcceptedValuesCheck(column, values) | All values in whitelist | AcceptedValuesCheck(column="STATUS", values=[...]) |
| CustomSQLCheck(sql, name) | SQL returns 0 rows to pass | CustomSQLCheck(sql="SELECT * FROM {staging_table} WHERE ...") |
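To make the semantics in the table concrete, here is a rough sketch of the kind of queries such checks could run, executed against `sqlite3` rather than Snowflake. The helper names and the SQL are assumptions for illustration; the SQL the provider actually generates may differ. The `{staging_table}` placeholder is filled with `str.format`, which is one plausible reading of the `CustomSQLCheck` example.

```python
import sqlite3
from typing import List


def null_fraction(conn: sqlite3.Connection, table: str, column: str) -> float:
    """Fraction of rows where `column` is NULL (0.0 for an empty table)."""
    total, nulls = conn.execute(
        f"SELECT COUNT(*), SUM(CASE WHEN {column} IS NULL THEN 1 ELSE 0 END) FROM {table}"
    ).fetchone()
    return (nulls / total) if total else 0.0


def duplicate_groups(conn: sqlite3.Connection, table: str, columns: List[str]) -> int:
    """Number of distinct key combinations that occur more than once."""
    cols = ", ".join(columns)
    sql = (
        f"SELECT COUNT(*) FROM "
        f"(SELECT {cols} FROM {table} GROUP BY {cols} HAVING COUNT(*) > 1)"
    )
    return conn.execute(sql).fetchone()[0]


def custom_sql_violations(conn: sqlite3.Connection, sql_template: str, staging_table: str) -> int:
    """Number of rows returned by a custom query; 0 rows means the check passes."""
    sql = sql_template.format(staging_table=staging_table)
    return len(conn.execute(sql).fetchall())
```

Under this reading, `NullCheck` passes when `null_fraction(...) <= max_fraction`, `UniquenessCheck` passes when `duplicate_groups(...) == 0`, and `CustomSQLCheck` passes when `custom_sql_violations(...) == 0`.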
Writing custom checks
from airflow_s3_snowflake_quality.checks.base import BaseCheck, CheckResult, CheckStatus

class MyCheck(BaseCheck):
    def validate(self, cursor, staging_table) -> CheckResult:
        count = self._execute_scalar(cursor, f"SELECT COUNT(*) FROM {staging_table} WHERE ...")
        if count > 0:
            return CheckResult(check_name=self.name, status=CheckStatus.FAILED, message=f"Found {count} bad rows")
        return CheckResult(check_name=self.name, status=CheckStatus.PASSED, message="All good")
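A custom check of this shape can be exercised without Snowflake by pointing it at any DB-API cursor. The sketch below uses local stand-ins for `BaseCheck`, `CheckResult`, and `CheckStatus` so that it runs on its own; their fields and the behaviour of `_execute_scalar` (execute, then return the first column of the first row) are assumptions, not the package's actual definitions. `NegativeAmountCheck` is a hypothetical example check.

```python
import sqlite3
from dataclasses import dataclass
from enum import Enum


class CheckStatus(Enum):  # stand-in for the package's CheckStatus
    PASSED = "passed"
    FAILED = "failed"


@dataclass
class CheckResult:  # stand-in; field names follow the example above
    check_name: str
    status: CheckStatus
    message: str


class BaseCheck:  # stand-in; the real base class may take other arguments
    def __init__(self, name: str):
        self.name = name

    def _execute_scalar(self, cursor, sql: str):
        # Assumed behaviour: run the query and return the single scalar value.
        cursor.execute(sql)
        return cursor.fetchone()[0]


class NegativeAmountCheck(BaseCheck):
    """Hypothetical check: fail if any staged row has a negative total_amount."""

    def validate(self, cursor, staging_table) -> CheckResult:
        count = self._execute_scalar(
            cursor, f"SELECT COUNT(*) FROM {staging_table} WHERE total_amount < 0"
        )
        if count > 0:
            return CheckResult(self.name, CheckStatus.FAILED, f"Found {count} bad rows")
        return CheckResult(self.name, CheckStatus.PASSED, "All good")
```

With an in-memory `sqlite3` cursor and a small `staging` table, `NegativeAmountCheck("negative_amount").validate(cursor, "staging")` returns a `CheckResult` whose `status` can be asserted in a unit test.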
Testing
python3 -m venv venv && source venv/bin/activate
pip install -e ".[dev]" && pip install duckdb pyarrow
make all # lint + typecheck + 62 pytest tests (97% coverage)
make test-local-e2e # 5 end-to-end scenarios with DuckDB
See integration_test/ for full S3 + Snowflake + Airflow Docker-based testing.
License
Apache 2.0