SQL template generator

SQL Template

SQL Template Generator (aka SqlPlate) provides a generator object for SQL template statements through a Python API. All SQL template files are stored in the Jinja template format, a powerful templating package.

[!NOTE] This project collects the SQL use cases that come up in data engineering work, such as SCD2 with the MERGE operator.

The SQL template files are laid out as follows:

templates/
   ├─ databricks/
   │     ├─ 📂macros/
   │     │     ╰─ ⚙️ delta.jinja
   │     ╰─ 📂latest/
   │           ├─ 📜 etl.delta.sql
   │           ├─ 📜 etl.scd2.sql
   │           ╰─ 📜 select.sql
   ├─ sqlite/
   │     ╰─ 📂latest/
   │           ╰─📜 etl.delta.sql
   ├─ synapse/
   │     ╰─ 📂latest/
   │           ╰─📜 etl.delta.sql
   ╰─ utils/
         ╰─ ⚙️ etl_vars.jinja

[!IMPORTANT] The primary objective of this project is to generate ETL statements that support changing the underlying service. You can change the SQL compute service without changing the SQL statement definition.
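The idea of keeping one set of options while swapping the compute service can be illustrated with a small, stdlib-only sketch. It is not how SqlPlate is implemented: `string.Template` stands in for Jinja, and the `TEMPLATES` mapping, the `render` helper, and both template bodies are hypothetical, simplified upsert statements written for this example.

```python
from string import Template

# Hypothetical, simplified stand-ins for per-system template files.
# The real project keeps Jinja templates under templates/<system>/latest/.
TEMPLATES = {
    "databricks": Template(
        "MERGE INTO $table AS t USING $source AS s ON t.$pk = s.$pk "
        "WHEN MATCHED THEN UPDATE SET t.$col = s.$col "
        "WHEN NOT MATCHED THEN INSERT ($pk, $col) VALUES (s.$pk, s.$col)"
    ),
    "sqlite": Template(
        "INSERT INTO $table ($pk, $col) "
        "SELECT $pk, $col FROM $source WHERE true "
        "ON CONFLICT ($pk) DO UPDATE SET $col = excluded.$col"
    ),
}


def render(system: str, **options: str) -> str:
    """Render the upsert statement for one system from shared options."""
    return TEMPLATES[system].substitute(**options)


# The same options produce a dialect-specific statement per system.
options = {"table": "tgt", "source": "src", "pk": "id", "col": "name"}
for system in TEMPLATES:
    print(f"-- {system}")
    print(render(system, **options))
```

Only the `system` argument changes between the two calls; the options describing the table, key, and column stay the same.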

:package: Installation

pip install -U sqlplate

:fork_and_knife: Usage

Generate SQL template

Start by passing option parameters, then generate the Delta ETL SQL statement to run on the Azure Databricks service.

from datetime import datetime
from sqlplate import SQLPlate

statement: str = (
    SQLPlate.format('databricks')
    .template('etl.delta')
    .option('catalog', 'catalog-name')
    .option('schema', 'schema-name')
    .option('table', 'table-name')
    .option('pk', 'pk_col')
    .option('columns', ['col01', 'col02'])
    .option('query', 'SELECT * FROM catalog-name.schema-name.source-name')
    .option('load_src', 'SOURCE_FOO')
    .option('load_id', 1)
    .option('load_date', datetime(2025, 2, 1, 10))
    .option('only_main', True)
    .load()
)
print(statement.strip())

The resulting SQL statement:

MERGE INTO catalog-name.schema-name.table-name AS target
USING (
    WITH change_query AS (
        SELECT
            src.*,
        CASE WHEN tgt.pk_col IS NULL THEN 99
             WHEN hash(src.col01, src.col02) <> hash(tgt.col01, tgt.col02) THEN 1
             ELSE 0 END AS data_change
        FROM ( SELECT * FROM catalog-name.schema-name.source-name ) AS src
        LEFT JOIN catalog-name.schema-name.table-name AS tgt
            ON  tgt.col01 = src.col01
AND tgt.col02 = src.col02
    )
    SELECT * EXCEPT( data_change ) FROM change_query WHERE data_change IN (99, 1)
) AS source
    ON  target.pk_col = source.pk_col
WHEN MATCHED THEN UPDATE
    SET target.col01            = source.col01
    ,   target.col02            = source.col02
    ,   target.updt_load_src    = 'SOURCE_FOO'
    ,   target.updt_load_id     = 1
    ,   target.updt_load_date   = to_timestamp('20250201', 'yyyyMMdd')
WHEN NOT MATCHED THEN INSERT
    (
        col01, col02, pk_col, load_src, load_id, load_date, updt_load_src, updt_load_id, updt_load_date
    )
    VALUES (
        source.col01,
        source.col02,
        source.pk_col,
        'SOURCE_FOO',
        1,
        20250201,
        'SOURCE_FOO',
        1,
        to_timestamp('20250201', 'yyyyMMdd')
    )
;
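The `data_change` flag in the `change_query` CTE above sorts each source row into one of three buckets: `99` for a row with no target match, `1` for a matched row whose tracked columns differ, and `0` for an unchanged row. Only the first two reach the MERGE. The sketch below mirrors that classification in plain Python. It assumes rows are matched on the primary key and compares column tuples directly where the SQL compares `hash(...)` values; the function names are hypothetical and not part of SqlPlate.

```python
from typing import Optional

NEW, CHANGED, UNCHANGED = 99, 1, 0


def data_change(src: dict, tgt: Optional[dict], columns: list) -> int:
    """Classify one source row against its matching target row, if any."""
    if tgt is None:
        return NEW  # no target row: the row will be inserted
    src_values = tuple(src[c] for c in columns)
    tgt_values = tuple(tgt[c] for c in columns)
    # Direct tuple comparison stands in for the SQL hash() comparison.
    return CHANGED if src_values != tgt_values else UNCHANGED


def rows_to_merge(source: list, target_by_pk: dict, pk: str, columns: list) -> list:
    """Keep only new or changed rows, like WHERE data_change IN (99, 1)."""
    return [
        row
        for row in source
        if data_change(row, target_by_pk.get(row[pk]), columns) in (NEW, CHANGED)
    ]


source = [
    {"pk_col": 1, "col01": "a", "col02": "b"},  # unchanged
    {"pk_col": 2, "col01": "x", "col02": "y"},  # changed
    {"pk_col": 3, "col01": "n", "col02": "m"},  # new
]
target_by_pk = {
    1: {"pk_col": 1, "col01": "a", "col02": "b"},
    2: {"pk_col": 2, "col01": "x", "col02": "old"},
}
print([r["pk_col"] for r in rows_to_merge(source, target_by_pk, "pk_col", ["col01", "col02"])])
```

Filtering out unchanged rows before the MERGE keeps the update set small, which is the purpose of the CTE in the generated statement.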

Data Quality

This package only generates SQL statements. For data quality, it provides quality templates that you can use instead.

from sqlplate import SQLPlate

statement: str = (
    SQLPlate.format('databricks')
    .template('quality.check')
    .option('catalog', 'catalog-name')
    .option('schema', 'schema-name')
    .option('table', 'table-name')
    .option('filter', "load_date >= to_timestamp('20250201', 'yyyyMMdd')")
    .option('unique', ['pk_col'])
    .option('notnull', ['col01', 'col02'])
    .check("contain", ["col01"], "IN ('A', 'B', 'C')")
    .check("gt_10000", ["col03"], "> 10000")
    .load()
)
print(statement.strip())

The resulting SQL statement:

WITH source AS (
    SELECT
        *
    FROM
        catalog-name.schema-name.table-name
    WHERE load_date >= to_timestamp('20250201', 'yyyyMMdd')
)
, records AS (
    SELECT COUNT(1) AS table_records FROM source
)
SELECT
    (SELECT table_records FROM records) AS table_records
    , (SELECT COUNT( DISTINCT pk_col ) FROM source) = table_records AS unique_pk_col
    , (SELECT COUNT_IF( col01 IS NULL ) FROM source) = 0 AS notnull_col01
    , (SELECT COUNT_IF( col02 IS NULL ) FROM source) = 0 AS notnull_col02
    , (SELECT COUNT(1) FILTER(WHERE col01 IN ('A', 'B', 'C')) FROM source) = table_records AS contain_col01
    , (SELECT COUNT(1) FILTER(WHERE col03 > 10000) FROM source) = table_records AS gt_10000_col03
FROM records
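To see what the quality statement returns, the same unique and not-null checks can be run against an in-memory SQLite table. This is a rough sketch that uses only the standard library, not SqlPlate's own template: `run_quality_checks` is a hypothetical helper, and it uses `SUM(col IS NULL)` where the Databricks statement uses `COUNT_IF`.

```python
import sqlite3


def run_quality_checks(conn, table, unique, notnull):
    """Build and run one row of boolean quality flags for a table.

    Identifiers are interpolated directly, so pass trusted names only.
    """
    checks = ["COUNT(*) AS table_records"]
    for col in unique:
        # Unique when the distinct count equals the total row count.
        checks.append(f"COUNT(DISTINCT {col}) = COUNT(*) AS unique_{col}")
    for col in notnull:
        # Not null when no row has a NULL in the column.
        checks.append(f"SUM({col} IS NULL) = 0 AS notnull_{col}")
    cursor = conn.execute(f"SELECT {', '.join(checks)} FROM {table}")
    names = [description[0] for description in cursor.description]
    return dict(zip(names, cursor.fetchone()))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (pk_col INTEGER, col01 TEXT)")
conn.executemany(
    "INSERT INTO demo VALUES (?, ?)",
    [(1, "A"), (2, None), (2, "C")],  # duplicate key and one NULL
)
print(run_quality_checks(conn, "demo", unique=["pk_col"], notnull=["col01"]))
```

Each flag is `1` when the check passes and `0` when it fails, so the duplicate key and the NULL value in this sample both show up as `0`.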

:chains: Support Systems

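| System             | Progress Status | System Integration Test | Remark                            |
|--------------------|-----------------|-------------------------|-----------------------------------|
| databricks         | 🟢              | 🟡                      | Azure Databricks                  |
| postgres           | 🔴              | 🔴                      |                                   |
| mysql              | 🔴              | 🔴                      |                                   |
| mssql              | 🔴              | 🔴                      | Microsoft SQL Server              |
| synapse            | 🔴              | 🔴                      | Azure Synapse Dedicated SQL Pool  |
| synapse-serverless | 🔴              | 🔴                      | Azure Synapse Serverless SQL Pool |
| bigquery           | 🟡              | 🔴                      | Google BigQuery                   |
| snowflake          | 🔴              | 🔴                      |                                   |
| sqlite             | 🟡              | 🟡                      |                                   |
| duckdb             | 🟡              | 🟡                      |                                   |
| redshift           | 🔴              | 🔴                      | Amazon Redshift                   |
| athena             | 🔴              | 🔴                      | Amazon Athena                     |
| trino              | 🔴              | 🔴                      | Trino                             |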

[!NOTE]

  • 🟢 Complete
  • 🟡 In progress
  • 🔴 Not developed yet
  • 🟣 No plan to support

:speech_balloon: Contribute

I do not expect this project to be widely adopted because it has a specific purpose, and for a long-term solution you can write the same thing in your own code without depending on this project. For now, you can open a GitHub issue on this project :raised_hands: to report a bug or request a new feature.
