AirSQL

A decorator-based SQL execution framework for Airflow that provides clean, Python-like syntax for data operations.

Features

  • 🎯 Decorator-based syntax - Clean, intuitive Python decorators for SQL operations
  • 🔗 Native Airflow integration - Uses Airflow connections and follows Airflow patterns
  • 🗃️ Multi-database support - Works with Postgres, BigQuery, and more
  • 📄 SQL file support - Keep complex queries in separate .sql files with Jinja templating
  • Flexible outputs - Write to tables, return DataFrames, or save to files
  • 🔄 Smart operations - Built-in support for replace, merge/upsert operations
  • 🌐 Cross-database queries - Query across different databases (planned with DataFusion)
  • 🔍 Data quality checks - Built-in SQL check operators compatible with dbt tests
  • 📊 Transfer operators - Move data between BigQuery, Postgres, and GCS
  • 👁️ Smart sensors - SQL sensors with retry logic for BigQuery and Postgres

Installation

pip install airsql

Or, if you're using uv:

uv add airsql

Quick Start

Basic Usage

1. Simple DataFrame Query

from airsql import sql, Table, File

@sql.dataframe(source_conn="postgres_conn")
def get_active_users():
    return "SELECT * FROM users WHERE active = true"

# Use in DAG
df_task = get_active_users()

2. Query with Table References

@sql.dataframe
def user_activity_analysis(users_table, events_table):
    return """
    SELECT u.id, u.name, COUNT(e.id) as event_count
    FROM {{ users_table }} u
    LEFT JOIN {{ events_table }} e ON u.id = e.user_id
    GROUP BY u.id, u.name
    """

# Use in DAG
analysis_task = user_activity_analysis(
    users_table=Table("postgres_conn", "users.active_users"),
    events_table=Table("bigquery_conn", "analytics.user_events")
)
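
To make the templating step concrete, here is a minimal sketch of how `{{ users_table }}` style placeholders could be turned into qualified table names before the query runs. The helper `render_sql` is hypothetical and uses only the standard library; it is not AirSQL's API, and AirSQL's own Jinja-based rendering may resolve `Table` objects differently.

```python
import re

# Hypothetical helper, not part of AirSQL: replaces {{ name }} placeholders
# with the qualified table names supplied by the caller.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_sql(template: str, tables: dict[str, str]) -> str:
    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in tables:
            # Fail loudly rather than send an unrendered placeholder to the database.
            raise KeyError(f"no table bound to placeholder {name!r}")
        return tables[name]

    return PLACEHOLDER.sub(substitute, template)


template = """
SELECT u.id, u.name, COUNT(e.id) AS event_count
FROM {{ users_table }} u
LEFT JOIN {{ events_table }} e ON u.id = e.user_id
GROUP BY u.id, u.name
"""

rendered = render_sql(
    template,
    {"users_table": "users.active_users", "events_table": "analytics.user_events"},
)
print(rendered)
```

Raising on an unbound placeholder is a design choice in this sketch: a missing argument surfaces at render time instead of as a database syntax error.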

3. Replace Table Content

@sql.replace(output_table=Table("postgres_conn", "reports.daily_summary"))
def create_daily_report(transactions_table):
    return """
    SELECT DATE(created_at) as date, SUM(amount) as total
    FROM {{ transactions_table }}
    GROUP BY DATE(created_at)
    """

# Use in DAG
report_task = create_daily_report(
    transactions_table=Table("postgres_conn", "transactions.orders")
)
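
The difference between a replace and the merge/upsert operation listed under Features can be illustrated with plain SQL. The sketch below uses the standard-library `sqlite3` module and a made-up `daily_summary` table; the functions `replace_table` and `merge_table` are illustrative names, and the SQL AirSQL issues against Postgres or BigQuery is not shown in this README and may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_summary (date TEXT PRIMARY KEY, total REAL)")
conn.execute(
    "INSERT INTO daily_summary VALUES ('2024-01-01', 10.0), ('2024-01-02', 20.0)"
)

new_rows = [("2024-01-02", 25.0), ("2024-01-03", 30.0)]


def replace_table(conn, rows):
    # Replace: discard existing rows, then load the new result set in one transaction.
    with conn:
        conn.execute("DELETE FROM daily_summary")
        conn.executemany("INSERT INTO daily_summary VALUES (?, ?)", rows)


def merge_table(conn, rows):
    # Merge/upsert: insert new keys and update rows whose key already exists.
    with conn:
        conn.executemany(
            "INSERT INTO daily_summary (date, total) VALUES (?, ?) "
            "ON CONFLICT(date) DO UPDATE SET total = excluded.total",
            rows,
        )


merge_table(conn, new_rows)
merged = conn.execute("SELECT date, total FROM daily_summary ORDER BY date").fetchall()

replace_table(conn, new_rows)
replaced = conn.execute("SELECT date, total FROM daily_summary ORDER BY date").fetchall()
```

After the merge, the 2024-01-01 row survives and 2024-01-02 is updated; after the replace, only the two new rows remain.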

4. Data Quality Checks

@sql.check(conn_id="bigquery_conn")
def test_no_nulls(table):
    return "SELECT COUNT(*) FROM {{ table }} WHERE id IS NULL"

@sql.check(conn_id="postgres_conn")
def test_row_count(table):
    return "SELECT CASE WHEN COUNT(*) > 0 THEN 1 ELSE 0 END FROM {{ table }}"

# Use in DAG
null_check = test_no_nulls(table=Table("bigquery_conn", "analytics.users"))
count_check = test_row_count(table=Table("postgres_conn", "staging.orders"))
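
How a check query's result is judged depends on the pass/fail convention in use. The sketch below assumes the convention of Airflow's `SQLCheckOperator`: the first row is fetched and the check passes only if every value in it is truthy. Whether `@sql.check` follows that convention or a dbt-style one (pass when no failing rows are returned) is not stated in this README, so confirm it before relying on a query that returns a raw count. `check_passes` is a hypothetical helper and `sqlite3` stands in for a real connection.

```python
import sqlite3


def check_passes(conn, sql):
    # Assumed convention (Airflow's SQLCheckOperator): fetch the first row
    # and pass only if every value in it is truthy.
    row = conn.execute(sql).fetchone()
    return row is not None and all(bool(value) for value in row)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [(1, "ada"), (2, "grace")])

# Returns the number of NULL ids: 0 here, which is falsy under this convention.
null_count_passes = check_passes(
    conn, "SELECT COUNT(*) FROM users WHERE id IS NULL"
)

# Returns 1 when no NULL ids exist, so the check passes on clean data.
no_nulls_passes = check_passes(
    conn, "SELECT COUNT(*) = 0 FROM users WHERE id IS NULL"
)

# Passes when the table has at least one row.
has_rows_passes = check_passes(
    conn, "SELECT CASE WHEN COUNT(*) > 0 THEN 1 ELSE 0 END FROM users"
)
```

Under this assumed convention a bare null count fails on clean data, which is why the second query compares the count to zero.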

5. Transfer Operations

from airsql import BigQueryToPostgresOperator, PostgresToBigQueryOperator

# Transfer from BigQuery to Postgres
bq_to_pg = BigQueryToPostgresOperator(
    task_id="transfer_users",
    source_project_dataset_table="my-project.analytics.users",
    postgres_conn_id="postgres_default",
    destination_table="staging.users",
    gcs_bucket="temp-bucket",
    gcp_conn_id="google_cloud_default"
)

# Transfer from Postgres to BigQuery
pg_to_bq = PostgresToBigQueryOperator(
    task_id="transfer_orders",
    postgres_conn_id="postgres_default",
    sql="SELECT * FROM orders WHERE date >= '2024-01-01'",
    destination_project_dataset_table="my-project.staging.orders",
    gcs_bucket="temp-bucket",
    gcp_conn_id="google_cloud_default"
)
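
Both operators take a `gcs_bucket`, which suggests that data is staged in a bucket between the export and the load. That is an inference from the parameters above, not something this README confirms. The sketch below shows the general export, stage, load pattern using only the standard library: two in-memory SQLite databases stand in for the source and destination, and a temporary CSV file stands in for the bucket. All table and file names are illustrative.

```python
import csv
import sqlite3
import tempfile
from pathlib import Path

source = sqlite3.connect(":memory:")
source.execute("CREATE TABLE orders (id INTEGER, date TEXT)")
source.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, "2023-12-31"), (2, "2024-01-01"), (3, "2024-02-15")],
)

destination = sqlite3.connect(":memory:")
destination.execute("CREATE TABLE staging_orders (id INTEGER, date TEXT)")

with tempfile.TemporaryDirectory() as staging_dir:
    staged_file = Path(staging_dir) / "orders.csv"

    # Step 1: export the query result to a staged file.
    with staged_file.open("w", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerows(
            source.execute("SELECT id, date FROM orders WHERE date >= '2024-01-01'")
        )

    # Step 2: load the staged file into the destination table.
    with staged_file.open(newline="") as handle:
        rows = [(int(row[0]), row[1]) for row in csv.reader(handle)]
    with destination:
        destination.executemany("INSERT INTO staging_orders VALUES (?, ?)", rows)

loaded = destination.execute(
    "SELECT id, date FROM staging_orders ORDER BY id"
).fetchall()
```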

For more examples and details, see the full documentation.

Migration from retize.sql

This package is the successor to retize.sql. The main changes are:

  • Package renamed from retize.sql to airsql
  • Table class schema field renamed to dataset (avoids Pydantic warnings)
  • Asset URIs changed from rtz:// to airsql://
  • Improved organization with sensors and transfers in submodules
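
The two purely textual renames above (package name and URI scheme) can be applied mechanically. The sketch below is a hypothetical helper, not a tool shipped with AirSQL, and the sample URI is made up for illustration. It deliberately leaves the `schema` to `dataset` rename alone, since that change needs a look at each `Table` call.

```python
import re


def migrate_source(text: str) -> str:
    # Rename the package in import statements and dotted references.
    text = re.sub(r"\bretize\.sql\b", "airsql", text)
    # Update asset URIs to the new scheme.
    text = text.replace("rtz://", "airsql://")
    return text


old = 'from retize.sql import sql, Table\nuri = "rtz://postgres_conn/users"\n'
new = migrate_source(old)
print(new)
```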

License

This project is licensed under the MIT License.
