AirSQL

A decorator-based SQL execution framework for Airflow that provides clean, Python-like syntax for data operations.

Features

  • 🎯 Decorator-based syntax - Clean, intuitive Python decorators for SQL operations
  • 🔗 Native Airflow integration - Uses Airflow connections and follows Airflow patterns
  • 🗃️ Multi-database support - Works with Postgres, BigQuery, and more
  • 📄 SQL file support - Keep complex queries in separate .sql files with Jinja templating
  • Flexible outputs - Write to tables, return DataFrames, or save to files
  • 🔄 Smart operations - Built-in support for replace, merge/upsert operations
  • 🌐 Cross-database queries - Query across different databases (planned with DataFusion)
  • 🔍 Data quality checks - Built-in SQL check operators compatible with dbt tests
  • 📊 Transfer operators - Move data between BigQuery, Postgres, and GCS
  • 👁️ Smart sensors - SQL sensors with retry logic for BigQuery and Postgres

Installation

pip install airsql

Or if you're using uv:

uv add airsql

Quick Start

Basic Usage

1. Simple DataFrame Query

from airsql import sql, Table, File

@sql.dataframe(source_conn="postgres_conn")
def get_active_users():
    return "SELECT * FROM users WHERE active = true"

# Use in DAG
df_task = get_active_users()

2. Query with Table References

@sql.dataframe
def user_activity_analysis(users_table, events_table):
    return """
    SELECT u.id, u.name, COUNT(e.id) as event_count
    FROM {{ users_table }} u
    LEFT JOIN {{ events_table }} e ON u.id = e.user_id
    GROUP BY u.id, u.name
    """

# Use in DAG
analysis_task = user_activity_analysis(
    users_table=Table("postgres_conn", "users.active_users"),
    events_table=Table("bigquery_conn", "analytics.user_events")
)
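
To make the placeholder mechanics concrete, here is a minimal sketch of how a {{ users_table }} placeholder could be swapped for a qualified table name. It uses only the standard library and is not AirSQL's implementation: TableRef and render_sql are hypothetical names introduced for illustration, and the real package relies on Jinja templating.

```python
import re
from dataclasses import dataclass


@dataclass(frozen=True)
class TableRef:
    """Hypothetical stand-in for airsql.Table: a connection id plus a table name."""
    conn_id: str
    table_name: str


# Matches {{ name }} with optional whitespace inside the braces.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_sql(template, **tables):
    """Replace each {{ name }} placeholder with the matching table name."""
    def substitute(match):
        name = match.group(1)
        if name not in tables:
            raise KeyError(f"no table passed for placeholder {name!r}")
        return tables[name].table_name

    return _PLACEHOLDER.sub(substitute, template)


query = render_sql(
    "SELECT u.id FROM {{ users_table }} u JOIN {{events_table}} e ON u.id = e.user_id",
    users_table=TableRef("postgres_conn", "users.active_users"),
    events_table=TableRef("bigquery_conn", "analytics.user_events"),
)
print(query)
```

The sketch ignores the connection id entirely. Resolving tables that live on different connections is a separate problem that it does not attempt.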

3. Replace Table Content

@sql.replace(output_table=Table("postgres_conn", "reports.daily_summary"))
def create_daily_report(transactions_table):
    return """
    SELECT DATE(created_at) as date, SUM(amount) as total
    FROM {{ transactions_table }}
    GROUP BY DATE(created_at)
    """

# Use in DAG
report_task = create_daily_report(
    transactions_table=Table("postgres_conn", "transactions.orders")
)
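
The text above does not say what "replace" does internally. One common interpretation is to rebuild the target table from the query result inside a transaction, so that a failed run leaves the previous contents in place. The sketch below illustrates that pattern with the standard-library sqlite3 module. It is an assumption about the general technique, not AirSQL's code, and replace_table is a hypothetical helper.

```python
import sqlite3


def replace_table(conn, target, select_sql):
    """Rebuild `target` from `select_sql` in one transaction (hypothetical helper).

    `target` is interpolated directly into SQL, so it must come from trusted code.
    """
    conn.execute("BEGIN")
    try:
        conn.execute(f"DROP TABLE IF EXISTS {target}")
        conn.execute(f"CREATE TABLE {target} AS {select_sql}")
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise


# isolation_level=None puts sqlite3 in autocommit mode, so BEGIN/COMMIT are explicit.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE orders (created_at TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [
        ("2024-01-01 09:00:00", 10.0),
        ("2024-01-01 17:30:00", 5.0),
        ("2024-01-02 08:15:00", 7.5),
    ],
)

summary_sql = """
SELECT DATE(created_at) AS day, SUM(amount) AS total
FROM orders
GROUP BY DATE(created_at)
"""
replace_table(conn, "daily_summary", summary_sql)
replace_table(conn, "daily_summary", summary_sql)  # second run replaces, does not append

rows = conn.execute("SELECT day, total FROM daily_summary ORDER BY day").fetchall()
print(rows)
```

Running the helper twice leaves one row per day, which is the property that distinguishes a replace from an append.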

4. Data Quality Checks

@sql.check(conn_id="bigquery_conn")
def test_no_nulls(table):
    return "SELECT COUNT(*) FROM {{ table }} WHERE id IS NULL"

@sql.check(conn_id="postgres_conn")
def test_row_count(table):
    return "SELECT CASE WHEN COUNT(*) > 0 THEN 1 ELSE 0 END FROM {{ table }}"

# Use in DAG
null_check = test_no_nulls(table=Table("bigquery_conn", "analytics.users"))
count_check = test_row_count(table=Table("postgres_conn", "staging.orders"))
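
The two checks return different kinds of values: the first returns a count of offending rows, the second returns 1 or 0. How AirSQL turns these values into a pass or a fail is not stated here, so the sketch below only shows what each query returns. It runs against an in-memory SQLite table using the standard library, and the table and its rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [(1, "ana"), (2, "bo"), (None, "orphan")],  # one row with a NULL id
)

# Same shape as test_no_nulls: counts the rows that violate the rule.
null_count = conn.execute(
    "SELECT COUNT(*) FROM users WHERE id IS NULL"
).fetchone()[0]

# Same shape as test_row_count: 1 if the table has any rows, else 0.
has_rows = conn.execute(
    "SELECT CASE WHEN COUNT(*) > 0 THEN 1 ELSE 0 END FROM users"
).fetchone()[0]

print(null_count, has_rows)
```

When writing your own checks, confirm which convention the operator expects before relying on either form.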

5. Transfer Operations

from airsql import BigQueryToPostgresOperator, PostgresToBigQueryOperator

# Transfer from BigQuery to Postgres
bq_to_pg = BigQueryToPostgresOperator(
    task_id="transfer_users",
    source_project_dataset_table="my-project.analytics.users",
    postgres_conn_id="postgres_default",
    destination_table="staging.users",
    gcs_bucket="temp-bucket",
    gcp_conn_id="google_cloud_default"
)

# Transfer from Postgres to BigQuery
pg_to_bq = PostgresToBigQueryOperator(
    task_id="transfer_orders",
    postgres_conn_id="postgres_default",
    sql="SELECT * FROM orders WHERE date >= '2024-01-01'",
    destination_project_dataset_table="my-project.staging.orders",
    gcs_bucket="temp-bucket",
    gcp_conn_id="google_cloud_default"
)

For more examples and detailed documentation, see the full documentation.

Migration from retize.sql

This package is the evolution of retize.sql. The main changes:

  • Package renamed from retize.sql to airsql
  • Table class schema field renamed to dataset (avoids Pydantic warnings)
  • Asset URIs changed from rtz:// to airsql://
  • Improved organization with sensors and transfers in submodules
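
For a codebase that still uses retize.sql, the package rename and the URI scheme change are mechanical text substitutions. The sketch below applies both to a source string using the standard library. migrate_source is a hypothetical helper, not a tool shipped with AirSQL, and the sample import line and URI are invented for illustration. The schema to dataset rename is left out on purpose, because a plain text substitution could touch unrelated uses of the word "schema".

```python
import re


def migrate_source(text):
    """Rewrite retize.sql references and rtz:// URIs to their airsql equivalents."""
    # Match `retize.sql` only as a whole dotted name, not inside longer identifiers.
    text = re.sub(r"\bretize\.sql\b", "airsql", text)
    text = text.replace("rtz://", "airsql://")
    return text


# Invented sample input: the import style and URI layout are assumptions.
old = 'from retize.sql import sql, Table\nuri = "rtz://postgres_conn/users"\n'
new = migrate_source(old)
print(new)
```

Review the resulting diff by hand before committing it, since string literals and comments are rewritten too.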

License

This project is licensed under the MIT License.


Source Distribution

airsql-0.7.5.tar.gz (33.4 kB view details)

Uploaded Source

Built Distribution

airsql-0.7.5-py3-none-any.whl (42.7 kB view details)

Uploaded Python 3

File details

Details for the file airsql-0.7.5.tar.gz.

File metadata

  • Download URL: airsql-0.7.5.tar.gz
  • Upload date:
  • Size: 33.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.6

File hashes

Hashes for airsql-0.7.5.tar.gz
Algorithm Hash digest
SHA256 a963501f45839cf7f7605fdfa01178671672f4de0dd276bce8cddf5805697872
MD5 dd2d7a4c568d51fb766b87130d6c32be
BLAKE2b-256 ccd23233adc59fa9dd50959197bf87a85775d0ed5b2c0bd7249ec93fca908433

File details

Details for the file airsql-0.7.5-py3-none-any.whl.

File metadata

  • Download URL: airsql-0.7.5-py3-none-any.whl
  • Upload date:
  • Size: 42.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.6

File hashes

Hashes for airsql-0.7.5-py3-none-any.whl
Algorithm Hash digest
SHA256 4b004daf760380132874dc76dcbc85251d1f5543460adba84c74ecaf0f83f4ec
MD5 65ddf43c04f998b1d707e7715530af0f
BLAKE2b-256 258ccc9ab6ad9b5c3529b7ff2cf67d594333613d2f84a8ea6db26056124e18bd
