A decorator-based SQL execution framework for Airflow

AirSQL

A decorator-based SQL execution framework for Airflow that provides clean, Python-like syntax for data operations.

Features

  • 🎯 Decorator-based syntax - Clean, intuitive Python decorators for SQL operations
  • 🔗 Native Airflow integration - Uses Airflow connections and follows Airflow patterns
  • 🗃️ Multi-database support - Works with Postgres, BigQuery, and more
  • 📄 SQL file support - Keep complex queries in separate .sql files with Jinja templating
  • Flexible outputs - Write to tables, return DataFrames, or save to files
  • 🔄 Smart operations - Built-in support for replace and merge/upsert operations
  • 🌐 Cross-database queries - Query across different databases (planned with DataFusion)
  • 🔍 Data quality checks - Built-in SQL check operators compatible with dbt tests
  • 📊 Transfer operators - Move data between BigQuery, Postgres, and GCS
  • 👁️ Smart sensors - SQL sensors with retry logic for BigQuery and Postgres

Installation

pip install airsql

Or if you're using uv:

uv add airsql

Quick Start

Basic Usage

1. Simple DataFrame Query

from airsql import sql, Table, File

@sql.dataframe(source_conn="postgres_conn")
def get_active_users():
    return "SELECT * FROM users WHERE active = true"

# Use in DAG
df_task = get_active_users()

2. Query with Table References

from airsql import sql, Table

@sql.dataframe
def user_activity_analysis(users_table, events_table):
    return """
    SELECT u.id, u.name, COUNT(e.id) as event_count
    FROM {{ users_table }} u
    LEFT JOIN {{ events_table }} e ON u.id = e.user_id
    GROUP BY u.id, u.name
    """

# Use in DAG
analysis_task = user_activity_analysis(
    users_table=Table("postgres_conn", "users.active_users"),
    events_table=Table("bigquery_conn", "analytics.user_events")
)
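
To make the role of the {{ users_table }} and {{ events_table }} placeholders concrete, here is a minimal sketch of substituting table references into a query template. It uses only the standard library. The TableRef class and render_sql helper are hypothetical stand-ins, not part of AirSQL's API, and AirSQL itself may render templates differently (for example through Jinja). The sketch also ignores the connection ID, so it says nothing about how a query spanning two databases is executed.

```python
import re
from dataclasses import dataclass


@dataclass(frozen=True)
class TableRef:
    """Hypothetical stand-in for a table reference (not airsql.Table)."""
    conn_id: str
    table_name: str


# Matches {{ name }} with optional whitespace inside the braces.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_sql(template: str, **tables: TableRef) -> str:
    """Replace each {{ name }} placeholder with the referenced table name."""
    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in tables:
            raise KeyError(f"no table supplied for placeholder {name!r}")
        return tables[name].table_name

    return _PLACEHOLDER.sub(substitute, template)


query = render_sql(
    "SELECT u.id FROM {{ users_table }} u JOIN {{events_table}} e ON u.id = e.user_id",
    users_table=TableRef("postgres_conn", "users.active_users"),
    events_table=TableRef("bigquery_conn", "analytics.user_events"),
)
print(query)
```

Raising on an unknown placeholder, rather than leaving it in the query, surfaces a misspelled argument name before any SQL reaches the database.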

3. Replace Table Content

from airsql import sql, Table

@sql.replace(output_table=Table("postgres_conn", "reports.daily_summary"))
def create_daily_report(transactions_table):
    return """
    SELECT DATE(created_at) as date, SUM(amount) as total
    FROM {{ transactions_table }}
    GROUP BY DATE(created_at)
    """

# Use in DAG
report_task = create_daily_report(
    transactions_table=Table("postgres_conn", "transactions.orders")
)
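
A replace operation generally means that the destination table ends up holding exactly the query result, however many times the task runs. The sketch below shows one common way to get that behaviour, using an in-memory SQLite database so it runs anywhere. The replace_table helper and the table names are illustrative assumptions; this is not a description of how AirSQL implements @sql.replace.

```python
import sqlite3

# isolation_level=None disables implicit transactions so BEGIN/COMMIT are explicit.
conn = sqlite3.connect(":memory:", isolation_level=None)


def replace_table(conn: sqlite3.Connection, target: str, select_sql: str) -> None:
    """Rebuild `target` from `select_sql` inside one explicit transaction."""
    # Identifiers cannot be bound as parameters, so `target` must be trusted input.
    conn.execute("BEGIN")
    try:
        conn.execute(f"DROP TABLE IF EXISTS {target}")
        conn.execute(f"CREATE TABLE {target} AS {select_sql}")
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")  # keep the previous table if the rebuild fails
        raise


# Made-up source data for the demonstration.
conn.execute("CREATE TABLE orders (created_at TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [
        ("2024-01-01 09:00:00", 10.0),
        ("2024-01-01 17:30:00", 5.0),
        ("2024-01-02 08:15:00", 7.5),
    ],
)

summary_sql = (
    "SELECT DATE(created_at) AS date, SUM(amount) AS total "
    "FROM orders GROUP BY DATE(created_at)"
)
replace_table(conn, "daily_summary", summary_sql)
replace_table(conn, "daily_summary", summary_sql)  # rerun: same rows, no duplicates

rows = conn.execute("SELECT date, total FROM daily_summary ORDER BY date").fetchall()
print(rows)
```

Running the helper twice leaves the same two summary rows, which is the property that makes a replace-style task safe to retry.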

4. Data Quality Checks

from airsql import sql, Table

@sql.check(conn_id="bigquery_conn")
def test_no_nulls(table):
    return "SELECT COUNT(*) FROM {{ table }} WHERE id IS NULL"

@sql.check(conn_id="postgres_conn")
def test_row_count(table):
    return "SELECT CASE WHEN COUNT(*) > 0 THEN 1 ELSE 0 END FROM {{ table }}"

# Use in DAG
null_check = test_no_nulls(table=Table("bigquery_conn", "analytics.users"))
count_check = test_row_count(table=Table("postgres_conn", "staging.orders"))
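
The two checks above return different kinds of values: the first returns a count of offending rows (0 when the data is clean), while the second returns a 1/0 flag (1 when the table has rows). The sketch below runs both query shapes against an in-memory SQLite database to make those values concrete. How AirSQL maps a returned value to pass or fail is not stated here, so the snippet only reports the raw results. For comparison, Airflow's own SQLCheckOperator fails a check when a value in the first row is falsy, so it is worth confirming which convention applies before relying on a count-style check. The table and its rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [(1, "ana"), (2, "bo"), (None, "orphan")],  # one row with a NULL id
)


def first_value(sql: str):
    """Return the first column of the first row, which is what a check inspects."""
    return conn.execute(sql).fetchone()[0]


# Count-style check: the number of rows that violate the rule.
null_ids = first_value("SELECT COUNT(*) FROM users WHERE id IS NULL")

# Flag-style check: 1 when the condition holds, 0 otherwise.
has_rows = first_value("SELECT CASE WHEN COUNT(*) > 0 THEN 1 ELSE 0 END FROM users")

print(null_ids, has_rows)
```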

5. Transfer Operations

from airsql import BigQueryToPostgresOperator, PostgresToBigQueryOperator

# Transfer from BigQuery to Postgres
bq_to_pg = BigQueryToPostgresOperator(
    task_id="transfer_users",
    source_project_dataset_table="my-project.analytics.users",
    postgres_conn_id="postgres_default",
    destination_table="staging.users",
    gcs_bucket="temp-bucket",
    gcp_conn_id="google_cloud_default"
)

# Transfer from Postgres to BigQuery
pg_to_bq = PostgresToBigQueryOperator(
    task_id="transfer_orders",
    postgres_conn_id="postgres_default",
    sql="SELECT * FROM orders WHERE date >= '2024-01-01'",
    destination_project_dataset_table="my-project.staging.orders",
    gcs_bucket="temp-bucket",
    gcp_conn_id="google_cloud_default"
)

For more examples and details, see the full documentation.

Migration from retize.sql

This package is the successor to retize.sql. The main changes are:

  • Package renamed from retize.sql to airsql
  • Table class schema field renamed to dataset (avoids Pydantic warnings)
  • Asset URIs changed from rtz:// to airsql://
  • Improved organization with sensors and transfers in submodules

License

This project is licensed under the MIT License.

Download files

Source Distribution

airsql-0.9.13.tar.gz (43.8 kB)

Built Distribution

airsql-0.9.13-py3-none-any.whl (52.3 kB)

File details

Details for the file airsql-0.9.13.tar.gz.

File metadata

  • Download URL: airsql-0.9.13.tar.gz
  • Size: 43.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.6

File hashes

Hashes for airsql-0.9.13.tar.gz
Algorithm Hash digest
SHA256 a3b011269fa927bf6bc5a6bbb2703287fe574b58f31a5f5bacffd39edb3fb44d
MD5 536a2f067b56bfefc4100934cfa47402
BLAKE2b-256 b03289a6e64a106fdf4832e62484c3ca287f8445c6acb532802add1a63acaacc

File details

Details for the file airsql-0.9.13-py3-none-any.whl.

File metadata

  • Download URL: airsql-0.9.13-py3-none-any.whl
  • Size: 52.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.6

File hashes

Hashes for airsql-0.9.13-py3-none-any.whl
Algorithm Hash digest
SHA256 bf08cd9d3c53d4e748ffab1aa97fe9de58434957cdce30dd10ec17d420b882cb
MD5 f297c3b233c5721bca502bc3c4e86f7b
BLAKE2b-256 966922b0de217f4fdea48aeaa785116c722260f4204afc21fbe16807b1856f61
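
The published digests can be compared against a downloaded file before installing it. A minimal sketch using only the standard library follows. The local file path is an assumption about where the wheel was saved, and the expected value is the SHA256 digest listed above for the wheel.

```python
import hashlib
from pathlib import Path

# SHA256 digest published for airsql-0.9.13-py3-none-any.whl.
EXPECTED_SHA256 = "bf08cd9d3c53d4e748ffab1aa97fe9de58434957cdce30dd10ec17d420b882cb"


def sha256_of(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash a file in chunks so large downloads are not read into memory at once."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify(path: Path, expected: str = EXPECTED_SHA256) -> bool:
    """Return True when the file's SHA256 digest equals the expected hex string."""
    return sha256_of(path) == expected.lower()


if __name__ == "__main__":
    wheel = Path("airsql-0.9.13-py3-none-any.whl")  # assumed download location
    if wheel.exists():
        print("digest matches" if verify(wheel) else "digest MISMATCH")
```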
