AirSQL
A decorator-based SQL execution framework for Airflow that provides clean, Python-like syntax for data operations.
Features
- 🎯 Decorator-based syntax - Clean, intuitive Python decorators for SQL operations
- 🔗 Native Airflow integration - Uses Airflow connections and follows Airflow patterns
- 🗃️ Multi-database support - Works with Postgres, BigQuery, and more
- 📄 SQL file support - Keep complex queries in separate .sql files with Jinja templating
- ⚡ Flexible outputs - Write to tables, return DataFrames, or save to files
- 🔄 Smart operations - Built-in support for replace, merge/upsert operations
- 🌐 Cross-database queries - Query across different databases (planned with DataFusion)
- 🔍 Data quality checks - Built-in SQL check operators compatible with dbt tests
- 📊 Transfer operators - Move data between BigQuery, Postgres, and GCS
- 👁️ Smart sensors - SQL sensors with retry logic for BigQuery and Postgres
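In Postgres, a merge/upsert like the one listed above is commonly written as `INSERT ... ON CONFLICT ... DO UPDATE`. The sketch below shows that idea on an in-memory SQLite database (SQLite 3.24+ accepts the same clause) so it runs without a server. The table and column names are illustrative, and this is not airsql's internal implementation.

```python
import sqlite3

def upsert_rows(conn, rows):
    """Insert (date, total) rows, updating `total` when the date already exists."""
    # ON CONFLICT targets the primary key; `excluded` is the row that failed to insert.
    conn.executemany(
        """
        INSERT INTO daily_summary (date, total) VALUES (?, ?)
        ON CONFLICT (date) DO UPDATE SET total = excluded.total
        """,
        rows,
    )
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_summary (date TEXT PRIMARY KEY, total REAL)")
upsert_rows(conn, [("2024-01-01", 10.0), ("2024-01-02", 5.0)])
# A second run with an overlapping key updates that row instead of duplicating it.
upsert_rows(conn, [("2024-01-02", 7.5), ("2024-01-03", 1.0)])
print(conn.execute("SELECT date, total FROM daily_summary ORDER BY date").fetchall())
# [('2024-01-01', 10.0), ('2024-01-02', 7.5), ('2024-01-03', 1.0)]
```

Re-running the same upsert is idempotent, which is what makes this pattern a good fit for retried Airflow tasks.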
Installation
pip install airsql
Or if you're using uv:
uv add airsql
Quick Start
Basic Usage
1. Simple DataFrame Query
from airsql import sql, Table, File
@sql.dataframe(source_conn="postgres_conn")
def get_active_users():
    return "SELECT * FROM users WHERE active = true"
# Use in DAG
df_task = get_active_users()
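The pattern here is that the decorated function only builds a SQL string and the decorator executes it. A minimal sketch of that idea follows; `dataframe_query` is a hypothetical name, an in-memory SQLite connection stands in for the Airflow connection, and rows come back as a list of dicts instead of a DataFrame to avoid a pandas dependency. Unlike the real decorator, which creates a task when called inside a DAG, this sketch runs the query immediately.

```python
import functools
import sqlite3

def dataframe_query(conn):
    """Hypothetical stand-in for a @sql.dataframe-style decorator.

    The wrapped function returns SQL; the decorator runs it on `conn` and
    returns the result rows as a list of dicts keyed by column name.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            query = func(*args, **kwargs)
            cursor = conn.execute(query)
            columns = [col[0] for col in cursor.description]
            return [dict(zip(columns, row)) for row in cursor.fetchall()]
        return wrapper
    return decorator

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT, active BOOLEAN)")
conn.executemany("INSERT INTO users VALUES (?, ?, ?)", [(1, "ana", 1), (2, "bo", 0)])

@dataframe_query(conn)
def get_active_users():
    return "SELECT id, name FROM users WHERE active = 1"

print(get_active_users())  # [{'id': 1, 'name': 'ana'}]
```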
2. Query with Table References
@sql.dataframe
def user_activity_analysis(users_table, events_table):
    return """
    SELECT u.id, u.name, COUNT(e.id) as event_count
    FROM {{ users_table }} u
    LEFT JOIN {{ events_table }} e ON u.id = e.user_id
    GROUP BY u.id, u.name
    """
# Use in DAG
analysis_task = user_activity_analysis(
    users_table=Table("postgres_conn", "users.active_users"),
    events_table=Table("bigquery_conn", "analytics.user_events"),
)
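Each `{{ ... }}` placeholder is filled from the matching `Table` argument. The sketch below illustrates that substitution with a small regex renderer instead of Jinja, and a hypothetical `TableRef` dataclass in place of airsql's `Table` (whose field names are not assumed here). Rendering only swaps in table names; actually joining tables that live on two different connections requires cross-database execution, which the feature list describes as planned.

```python
import re
from dataclasses import dataclass

@dataclass(frozen=True)
class TableRef:
    """Hypothetical stand-in for airsql's Table: a connection ID plus a table name."""
    conn_id: str
    name: str

def render_sql(template, **tables):
    """Replace each {{ name }} placeholder with the referenced table's name.

    A minimal regex substitute for Jinja rendering; unknown names raise KeyError.
    """
    def substitute(match):
        return tables[match.group(1)].name
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", substitute, template)

sql_text = """
SELECT u.id, u.name, COUNT(e.id) AS event_count
FROM {{ users_table }} u
LEFT JOIN {{ events_table }} e ON u.id = e.user_id
GROUP BY u.id, u.name
"""
rendered = render_sql(
    sql_text,
    users_table=TableRef("postgres_conn", "users.active_users"),
    events_table=TableRef("bigquery_conn", "analytics.user_events"),
)
print(rendered)
```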
3. Replace Table Content
@sql.replace(output_table=Table("postgres_conn", "reports.daily_summary"))
def create_daily_report(transactions_table):
    return """
    SELECT DATE(created_at) as date, SUM(amount) as total
    FROM {{ transactions_table }}
    GROUP BY DATE(created_at)
    """
# Use in DAG
report_task = create_daily_report(
    transactions_table=Table("postgres_conn", "transactions.orders"),
)
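"Replace" means the output table ends up holding exactly the query result. One common way to get that, sketched below on SQLite, is to delete the existing rows and insert the new ones inside a single transaction so readers never see a half-written table. The `replace_table` helper is hypothetical and is not how airsql necessarily does it; a warehouse might instead use `CREATE OR REPLACE TABLE` or a staging-table swap.

```python
import sqlite3

def replace_table(conn, target, select_sql):
    """Overwrite `target` with the result of `select_sql` in one transaction.

    `target` is interpolated directly, so it must be a trusted identifier.
    """
    with conn:  # commits on success, rolls back if either statement fails
        conn.execute(f"DELETE FROM {target}")
        conn.execute(f"INSERT INTO {target} {select_sql}")

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (created_at TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [("2024-01-01 09:00", 10.0), ("2024-01-01 17:30", 5.0), ("2024-01-02 12:00", 2.5)],
)
conn.execute("CREATE TABLE daily_summary (date TEXT, total REAL)")
conn.execute("INSERT INTO daily_summary VALUES ('stale', 0)")
conn.commit()

replace_table(
    conn,
    "daily_summary",
    "SELECT DATE(created_at) AS date, SUM(amount) AS total FROM orders GROUP BY DATE(created_at)",
)
rows = conn.execute("SELECT * FROM daily_summary ORDER BY date").fetchall()
print(rows)  # [('2024-01-01', 15.0), ('2024-01-02', 2.5)]
```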
4. Data Quality Checks
@sql.check(conn_id="bigquery_conn")
def test_no_nulls(table):
    return "SELECT COUNT(*) FROM {{ table }} WHERE id IS NULL"

@sql.check(conn_id="postgres_conn")
def test_row_count(table):
    return "SELECT CASE WHEN COUNT(*) > 0 THEN 1 ELSE 0 END FROM {{ table }}"
# Use in DAG
null_check = test_no_nulls(table=Table("bigquery_conn", "analytics.users"))
count_check = test_row_count(table=Table("postgres_conn", "staging.orders"))
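SQL checks generally follow one of two conventions. Airflow's `SQLCheckOperator` runs the query and fails if the result is empty or any value in the first row is falsy; dbt tests select the failing records and pass when none come back. This section does not state which rule `@sql.check` applies, so the sketch below only contrasts the two on SQLite; the helper names are hypothetical. Under the first-row rule, a null check is usually phrased as `COUNT(*) = 0` so that a clean table yields a truthy value.

```python
import sqlite3

def airflow_style_check(conn, query):
    """Pass if the first row exists and every value in it is truthy (SQLCheckOperator rule)."""
    row = conn.execute(query).fetchone()
    return row is not None and all(bool(value) for value in row)

def dbt_style_check(conn, query):
    """Pass if the query returns no rows (dbt rule: the query selects failing records)."""
    return conn.execute(query).fetchone() is None

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER)")
conn.executemany("INSERT INTO users VALUES (?)", [(1,), (2,)])

print(airflow_style_check(conn, "SELECT CASE WHEN COUNT(*) > 0 THEN 1 ELSE 0 END FROM users"))  # True
print(airflow_style_check(conn, "SELECT COUNT(*) = 0 FROM users WHERE id IS NULL"))  # True
print(dbt_style_check(conn, "SELECT * FROM users WHERE id IS NULL"))  # True
```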
5. Transfer Operations
from airsql import BigQueryToPostgresOperator, PostgresToBigQueryOperator
# Transfer from BigQuery to Postgres
bq_to_pg = BigQueryToPostgresOperator(
    task_id="transfer_users",
    source_project_dataset_table="my-project.analytics.users",
    postgres_conn_id="postgres_default",
    destination_table="staging.users",
    gcs_bucket="temp-bucket",
    gcp_conn_id="google_cloud_default",
)
# Transfer from Postgres to BigQuery
pg_to_bq = PostgresToBigQueryOperator(
    task_id="transfer_orders",
    postgres_conn_id="postgres_default",
    sql="SELECT * FROM orders WHERE date >= '2024-01-01'",
    destination_project_dataset_table="my-project.staging.orders",
    gcs_bucket="temp-bucket",
    gcp_conn_id="google_cloud_default",
)
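Both operators take a `gcs_bucket`, which suggests the data is staged as files in Cloud Storage between the export and the load; that reading is an assumption. The sketch below mimics an export-then-load pattern with a local temporary directory standing in for the bucket and two in-memory SQLite databases standing in for the source and destination. It does not use the real BigQuery or Postgres hooks, and `export_to_staging` / `load_from_staging` are hypothetical names.

```python
import csv
import sqlite3
import tempfile
from pathlib import Path

def export_to_staging(conn, query, staging_dir, name):
    """Write query results, with a header row, to a CSV file in the staging area."""
    cursor = conn.execute(query)
    path = Path(staging_dir) / f"{name}.csv"
    with path.open("w", newline="") as fh:
        writer = csv.writer(fh)
        writer.writerow([col[0] for col in cursor.description])
        writer.writerows(cursor)
    return path

def load_from_staging(conn, path, table):
    """Load a staged CSV into `table` (a trusted identifier) using its header for column names."""
    with path.open(newline="") as fh:
        reader = csv.reader(fh)
        columns = next(reader)
        placeholders = ", ".join("?" for _ in columns)
        with conn:  # one transaction for the whole load
            conn.executemany(
                f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})",
                reader,
            )

source = sqlite3.connect(":memory:")
source.execute("CREATE TABLE orders (id INTEGER, date TEXT)")
source.executemany("INSERT INTO orders VALUES (?, ?)", [(1, "2023-12-31"), (2, "2024-01-05")])

destination = sqlite3.connect(":memory:")
destination.execute("CREATE TABLE orders (id INTEGER, date TEXT)")

with tempfile.TemporaryDirectory() as staging:
    staged = export_to_staging(
        source, "SELECT * FROM orders WHERE date >= '2024-01-01'", staging, "orders"
    )
    load_from_staging(destination, staged, "orders")

print(destination.execute("SELECT * FROM orders").fetchall())  # [(2, '2024-01-05')]
```

Staging through files decouples the two databases: the source never needs network access to the destination, and a failed load can be retried from the staged file.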
For more examples and detailed documentation, see the full documentation.
Migration from retize.sql
This package is the evolution of retize.sql. The main changes:
- Package renamed from retize.sql to airsql
- Table class: schema field renamed to dataset (avoids Pydantic warnings)
- Asset URIs changed from rtz:// to airsql://
- Improved organization with sensors and transfers in submodules
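Because the asset URI scheme changed, DAGs that reference the old URIs need them rewritten. A minimal helper, assuming only the scheme changed and the rest of the URI kept its form; the function name and the example URI path are hypothetical:

```python
def migrate_asset_uri(uri: str) -> str:
    """Rewrite a legacy rtz:// asset URI to the airsql:// scheme; other URIs pass through."""
    old_prefix, new_prefix = "rtz://", "airsql://"
    if uri.startswith(old_prefix):
        return new_prefix + uri[len(old_prefix):]
    return uri

print(migrate_asset_uri("rtz://postgres_conn/reports.daily_summary"))
# airsql://postgres_conn/reports.daily_summary
```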
License
This project is licensed under the MIT License.
Download files
File details
Details for the file airsql-0.8.0.tar.gz.
File metadata
- Download URL: airsql-0.8.0.tar.gz
- Upload date:
- Size: 35.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.6 (publish, Ubuntu 24.04, CI)
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 62d1ed678b07bb3a6e07874eb9048d33b1e6ffc4ab9f697464c4ed10271ef710 |
| MD5 | fbc3b3949ad34e6ae0d64794f8be7c96 |
| BLAKE2b-256 | 6e54cdc4b79ae652bfdf3b522332e3b869d6ae81d9d24be86a8b2d2fbf2a1cdf |
File details
Details for the file airsql-0.8.0-py3-none-any.whl.
File metadata
- Download URL: airsql-0.8.0-py3-none-any.whl
- Upload date:
- Size: 44.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.6 (publish, Ubuntu 24.04, CI)
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8e5598d390ff97072b0df607029c1fb5093a584db63d99cb6ae7422d884fc4ce |
| MD5 | 93a7c039675f260ee6791e17850fb712 |
| BLAKE2b-256 | 285c8113575120439b82e0bd0a8517f3a0d9c2a9528fef06efa1dd23cc893050 |