Skip to main content

A decorator-based SQL execution framework for Airflow

Project description

AirSQL

A decorator-based SQL execution framework for Airflow that provides clean, Python-like syntax for data operations.

Features

  • 🎯 Decorator-based syntax - Clean, intuitive Python decorators for SQL operations
  • 🔗 Native Airflow integration - Uses Airflow connections and follows Airflow patterns
  • 🗃️ Multi-database support - Works with Postgres, BigQuery, and more
  • 📄 SQL file support - Keep complex queries in separate .sql files with Jinja templating
  • Flexible outputs - Write to tables, return DataFrames, or save to files
  • 🔄 Smart operations - Built-in support for replace, merge/upsert operations
  • 🌐 Cross-database queries - Query across different databases (planned with DataFusion)
  • 🔍 Data quality checks - Built-in SQL check operators compatible with dbt tests
  • 📊 Transfer operators - Move data between BigQuery, Postgres, and GCS
  • 👁️ Smart sensors - SQL sensors with retry logic for BigQuery and Postgres

Installation

pip install airsql

Or if you're using uv:

uv add airsql

Quick Start

Basic Usage

1. Simple DataFrame Query

from airsql import sql, Table, File

@sql.dataframe(source_conn="postgres_conn")
def get_active_users():
    return "SELECT * FROM users WHERE active = true"

# Use in DAG
df_task = get_active_users()

2. Query with Table References

@sql.dataframe
def user_activity_analysis(users_table, events_table):
    return """
    SELECT u.id, u.name, COUNT(e.id) as event_count
    FROM {{ users_table }} u
    LEFT JOIN {{ events_table }} e ON u.id = e.user_id
    GROUP BY u.id, u.name
    """

# Use in DAG
analysis_task = user_activity_analysis(
    users_table=Table("postgres_conn", "users.active_users"),
    events_table=Table("bigquery_conn", "analytics.user_events")
)

3. Replace Table Content

@sql.replace(output_table=Table("postgres_conn", "reports.daily_summary"))
def create_daily_report(transactions_table):
    return """
    SELECT DATE(created_at) as date, SUM(amount) as total
    FROM {{ transactions_table }}
    GROUP BY DATE(created_at)
    """

# Use in DAG
report_task = create_daily_report(
    transactions_table=Table("postgres_conn", "transactions.orders")
)

4. Data Quality Checks

@sql.check(conn_id="bigquery_conn")
def test_no_nulls(table):
    return "SELECT COUNT(*) FROM {{ table }} WHERE id IS NULL"

@sql.check(conn_id="postgres_conn")
def test_row_count(table):
    return "SELECT CASE WHEN COUNT(*) > 0 THEN 1 ELSE 0 END FROM {{ table }}"

# Use in DAG
null_check = test_no_nulls(table=Table("bigquery_conn", "analytics.users"))
count_check = test_row_count(table=Table("postgres_conn", "staging.orders"))

5. Transfer Operations

from airsql import BigQueryToPostgresOperator, PostgresToBigQueryOperator

# Transfer from BigQuery to Postgres
bq_to_pg = BigQueryToPostgresOperator(
    task_id="transfer_users",
    source_project_dataset_table="my-project.analytics.users",
    postgres_conn_id="postgres_default",
    destination_table="staging.users",
    gcs_bucket="temp-bucket",
    gcp_conn_id="google_cloud_default"
)

# Transfer from Postgres to BigQuery
pg_to_bq = PostgresToBigQueryOperator(
    task_id="transfer_orders",
    postgres_conn_id="postgres_default",
    sql="SELECT * FROM orders WHERE date >= '2024-01-01'",
    destination_project_dataset_table="my-project.staging.orders",
    gcs_bucket="temp-bucket",
    gcp_conn_id="google_cloud_default"
)

For more examples and detailed documentation, see the full documentation.

Migration from retize.sql

This package is the evolution of retize.sql. The main changes:

  • Package renamed from retize.sql to airsql
  • Table class schema field renamed to dataset (avoids Pydantic warnings)
  • Asset URIs changed from rtz:// to airsql://
  • Improved organization with sensors and transfers in submodules

License

This project is licensed under the MIT License.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

airsql-0.6.3.tar.gz (31.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

airsql-0.6.3-py3-none-any.whl (40.3 kB view details)

Uploaded Python 3

File details

Details for the file airsql-0.6.3.tar.gz.

File metadata

  • Download URL: airsql-0.6.3.tar.gz
  • Upload date:
  • Size: 31.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.2 {"installer":{"name":"uv","version":"0.10.2","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for airsql-0.6.3.tar.gz
Algorithm Hash digest
SHA256 86e43376b22e6b6b87ce35f72577950bffdd6070b0a95e8f38fb03896a5f4398
MD5 1da55b6d968ba1f3c7b273bf1312061c
BLAKE2b-256 62c9311c3f1e579bcf15630899d0b4156e84d9d12d5637cd046fe24aa8be9a7c

See more details on using hashes here.

File details

Details for the file airsql-0.6.3-py3-none-any.whl.

File metadata

  • Download URL: airsql-0.6.3-py3-none-any.whl
  • Upload date:
  • Size: 40.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.2 {"installer":{"name":"uv","version":"0.10.2","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for airsql-0.6.3-py3-none-any.whl
Algorithm Hash digest
SHA256 ce8c3a53c98129e9861c58517fe199c3de8164c4adec77fff18c3b1852efabdd
MD5 d91b54b5f95254b346fe5046d912871b
BLAKE2b-256 bdb547b8f78e48d95aeb713f8a204e9fe6629edd8e3c275a88b6c87811391b45

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page