A Tabular Helper API library that wraps snowflake-connector-python with typed connection management, multi-format profile support, and a normalized query return shape.

tha-snowflake-runner

Install

pip install tha-snowflake-runner

Quick start

from tha_snowflake_runner import ThaSnowflake

sf = ThaSnowflake(role="ANALYST", warehouse="COMPUTE_WH", database="PROD", schema="PUBLIC")

# inline SQL
result = sf.query("SELECT id, name FROM users WHERE active = %s", params=(True,))
# {"rows": [{"ID": "u1", "NAME": "Alice"}], "rowcount": 1, "status": None}

# or load SQL from a file
result = sf.query(file="queries/users.sql", params=(True,))

Connection modes

Mode 1 — native connections.toml (default)

Delegates profile lookup to the connector, which reads connections.toml from ~/.snowflake by default, or from the directory named by the SNOWFLAKE_HOME environment variable when it is set.

sf = ThaSnowflake(role="ANALYST", warehouse="WH")

# Named profile
sf = ThaSnowflake(connection_name="prod", role="ANALYST", warehouse="WH")

Mode 2 — custom connections file

Pass any .toml, .ini/.cfg, or .json file. Both flat ([profile]) and nested ([connections.profile]) TOML styles are supported.

sf = ThaSnowflake(
    connections_file="~/my_connections.toml",
    connection_name="prod",
    role="ANALYST",
    warehouse="WH",
)

# List available profile names from the file
names = sf.list_profiles()  # ["default", "prod", "dev"]

# Module-level convenience — no ThaSnowflake instance needed
from tha_snowflake_runner import list_profiles
names = list_profiles("~/my_connections.toml")

Mode 3 — inline (no file)

Supply all connection parameters directly. Setting account triggers inline mode.

# Okta SSO — opens a browser window
sf = ThaSnowflake(account="myorg", user="me@example.com", authenticator="externalbrowser")

# Password (service accounts)
sf = ThaSnowflake(account="myorg", user="svc@example.com", password="secret")

# Key-pair
sf = ThaSnowflake(
    account="myorg",
    user="svc@example.com",
    private_key_file="~/keys/rsa_key.p8",
    private_key_passphrase="mypass",
)

# OAuth token
sf = ThaSnowflake(account="myorg", authenticator="oauth", token="...")

Pass quiet_connect=True to suppress connector stdout (e.g. the externalbrowser browser-open message).
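
For readers curious how stdout suppression of this kind can work, here is a minimal sketch using contextlib.redirect_stdout. It is an assumption about the general technique, not a description of what quiet_connect does internally; call_quietly and noisy_connect are hypothetical names.

```python
import contextlib
import io


def call_quietly(func, *args, **kwargs):
    """Run func while discarding anything it prints to sys.stdout."""
    sink = io.StringIO()
    with contextlib.redirect_stdout(sink):
        return func(*args, **kwargs)


def noisy_connect():
    # Stand-in for a connect call that prints a browser-open notice.
    print("Opening browser for SSO login...")
    return "connection"


conn = call_quietly(noisy_connect)  # nothing is printed
print(conn)
```

This approach only intercepts Python-level writes to sys.stdout; output written directly to the file descriptor by native code would still appear.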

Query return shape

Every query() call returns a normalized dict:

result = sf.query("SELECT * FROM orders WHERE id = %s", params=("o1",))
# {
#   "rows": [{"ID": "o1", "TOTAL": 99.0}],
#   "rowcount": 1,
#   "status": None,        # None on success, error string on Snowflake query failure
# }

sf.rows always holds the result of the most recent query (thread-local).
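
Because failures are reported through the status key rather than shown here as exceptions, callers may want a small guard before using the rows. The following is a sketch built only on the documented dict shape; rows_or_raise is a hypothetical helper, not part of the library, and the sample dicts are hand-written stand-ins for real results.

```python
from typing import Any


def rows_or_raise(result: dict[str, Any]) -> list[dict[str, Any]]:
    """Return result["rows"], or raise if status carries an error string."""
    status = result.get("status")
    if status is not None:
        raise RuntimeError(f"query failed: {status}")
    return result["rows"]


# Hand-written example in the documented shape.
ok = {"rows": [{"ID": "o1", "TOTAL": 99.0}], "rowcount": 1, "status": None}

for row in rows_or_raise(ok):
    print(row["ID"], row["TOTAL"])
```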

Session — multiple queries on one connection

with sf.session(role="ANALYST", warehouse="WH") as sess:
    users = sess.query("SELECT * FROM users")
    orders = sess.query("SELECT * FROM orders")
    # one connection opened, two queries run, connection closed on exit

To use a session without a with block, call open_session(); the caller is responsible for closing it:

sess = sf.open_session(role="ANALYST", warehouse="WH")
try:
    result = sess.query("SELECT * FROM users")
finally:
    sess.close()  # always release the connection, even if the query raises

Pass accumulate=True to append rows across queries into sess.rows:

with sf.session(accumulate=True) as sess:
    sess.query("SELECT * FROM users WHERE region = 'US'")
    sess.query("SELECT * FROM users WHERE region = 'EU'")
    all_users = sess.rows  # combined result from both queries

The return value of each sess.query() is always the current query's result only — sess.rows is the accumulation point.
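
The difference between the per-query return value and the accumulated rows can be modelled without a database. ToySession below is a toy stand-in written for illustration, not the library's Session; its query() takes ready-made rows instead of SQL so the example runs anywhere.

```python
class ToySession:
    """Toy model of the accumulate behaviour described above."""

    def __init__(self, accumulate=False):
        self.accumulate = accumulate
        self.rows = []

    def query(self, fetched_rows):
        # fetched_rows stands in for the rows a real query would return.
        result = {
            "rows": list(fetched_rows),
            "rowcount": len(fetched_rows),
            "status": None,
        }
        if self.accumulate:
            self.rows.extend(result["rows"])  # append across queries
        else:
            self.rows = list(result["rows"])  # latest query only
        return result


sess = ToySession(accumulate=True)
us = sess.query([{"ID": "u1", "REGION": "US"}])
eu = sess.query([{"ID": "u2", "REGION": "EU"}])

print(len(eu["rows"]))  # rows from the second query only
print(len(sess.rows))   # rows from both queries
```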

Threading

sf.rows is thread-local — each thread sees only its own query results. For concurrent workloads, open a Session inside each worker:

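from concurrent.futures import ThreadPoolExecutor

# Example statements; replace with your own SQL
queries = ["SELECT * FROM users", "SELECT * FROM orders"]

def worker(sql):
    # Each worker opens its own Session, so no connection is shared across threads
    with sf.session() as sess:
        return sess.query(sql)

with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(worker, queries))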
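
What "thread-local" means for an attribute like sf.rows can be shown with threading.local from the standard library. This is a sketch of the general mechanism under the assumption that something similar backs sf.rows; ToyRunner is a hypothetical class and makes no claim about the library's internals.

```python
import threading


class ToyRunner:
    """Toy object whose rows attribute is stored per thread."""

    def __init__(self):
        self._local = threading.local()

    @property
    def rows(self):
        # A thread that has not queried yet sees an empty list.
        return getattr(self._local, "rows", [])

    def query(self, value):
        self._local.rows = [{"VALUE": value}]
        return self._local.rows


runner = ToyRunner()
runner.query("main")

seen = {}


def worker(value):
    runner.query(value)
    seen[value] = runner.rows  # this thread's rows only


threads = [threading.Thread(target=worker, args=(v,)) for v in ("a", "b")]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(runner.rows)  # the main thread still sees its own result
```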

Retry

sf = ThaSnowflake(
    account="myorg",
    retry_connect=3,         # up to 3 retries (4 total attempts)
    retry_on=(OSError,),     # only retry on these exception types
    retry_delay=2.0,         # seconds between attempts
    status_cb=print,         # called with a message on each failed attempt
)
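
The relationship between these four settings can be sketched as a plain retry loop. The code below is an assumption about the semantics suggested by the comments above (N retries means N + 1 attempts, only listed exception types are retried); connect_with_retry and flaky are hypothetical names and this is not the library's implementation.

```python
import time


def connect_with_retry(connect, *, retry_connect=0, retry_on=(),
                       retry_delay=1.0, status_cb=None):
    """Call connect(), retrying up to retry_connect times on retry_on errors."""
    attempts = retry_connect + 1
    for attempt in range(1, attempts + 1):
        try:
            return connect()
        except retry_on as exc:  # an empty tuple matches nothing
            if status_cb is not None:
                status_cb(f"attempt {attempt}/{attempts} failed: {exc!r}")
            if attempt == attempts:
                raise  # retries exhausted
            time.sleep(retry_delay)


calls = {"n": 0}


def flaky():
    # Stand-in for a connect call that fails twice, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise OSError("network down")
    return "connected"


messages = []
outcome = connect_with_retry(
    flaky, retry_connect=3, retry_on=(OSError,),
    retry_delay=0.0, status_cb=messages.append,
)
print(outcome, calls["n"], len(messages))
```

Exceptions outside retry_on propagate immediately in this sketch, which is why the default empty tuple disables retries even when retry_connect is set.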

API

ThaSnowflake(*, ...)

Parameter               Type             Default    Description
----------------------  ---------------  ---------  -----------
connection_name         str              "default"  Profile name for Mode 1/2
connections_file        str | None       None       Path to custom connections file (Mode 2)
account                 str | None       None       Snowflake account identifier (Mode 3)
user                    str | None       None       Username (Mode 3)
authenticator           str | None       None       "externalbrowser", "oauth", etc.
password                str | None       None       Password auth (Mode 3)
private_key_file        str | None       None       Path to .p8 key file; ~ is expanded
private_key_passphrase  str | None       None       Passphrase for encrypted private key
token                   str | None       None       OAuth token (Mode 3)
role                    str | None       None       Default Snowflake role
warehouse               str | None       None       Default warehouse
database                str | None       None       Default database
schema                  str | None       None       Default schema
quiet_connect           bool             False      Suppress connector stdout on connect
status_cb               callable | None  None       Called with status strings (retry messages, etc.)
mode                    str              "app"      Reserved: "app" or "cli"
retry_connect           int              0          Number of reconnect retries on failure
retry_on                type | tuple     ()         Exception types that trigger a retry
retry_delay             float            1.0        Seconds to wait between retry attempts

Methods

Method: set_context(*, role, warehouse, database, schema)
Returns: None
Description: Update default context; only non-None values are applied

Method: build_connect_kwargs(*, role, warehouse, database, schema)
Returns: dict
Description: Return kwargs that would be passed to snowflake.connector.connect; useful for debugging

Method: connect(**kwargs)
Returns: SnowflakeConnection
Description: Open and return a raw connection

Method: connection(**kwargs)
Returns: context manager
Description: Open a connection, close it on exit

Method: session(*, accumulate=False, **kwargs)
Returns: context manager → Session
Description: Open a Session backed by one connection; closes on exit

Method: open_session(*, accumulate=False, **kwargs)
Returns: Session
Description: Open and return a Session without a context manager; caller must call sess.close()

Method: query(sql=None, *, file=None, params, conn, role, warehouse, database, schema)
Returns: dict
Description: Execute a SELECT; pass sql or file= (not both); returns {"rows", "rowcount", "status"}

Method: list_profiles()
Returns: list[str]
Description: Profile names from connections_file (requires Mode 2)
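
The "only non-None values are applied" rule for set_context can be pictured as a filtered dictionary merge. This is a sketch of that rule alone, written under the assumption that context is held as a simple mapping; merge_context is a hypothetical helper, not a library function.

```python
def merge_context(defaults, **overrides):
    """Return defaults updated with every override that is not None."""
    merged = dict(defaults)
    merged.update({k: v for k, v in overrides.items() if v is not None})
    return merged


current = {"role": "ANALYST", "warehouse": "WH"}
updated = merge_context(current, role=None, warehouse="BIG_WH", database="PROD")
print(updated)
```

Passing role=None leaves the existing role in place instead of clearing it, which matches the behaviour described for set_context.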

Session

Obtain via sf.session() or sf.open_session(). Not thread-safe: use one Session per thread.

Member: query(sql=None, *, file=None, params)
Description: Execute a SELECT on the persistent connection; pass sql or file= (not both); returns {"rows", "rowcount", "status"}

Member: rows
Description: Running result: latest query only when accumulate=False (default), all queries combined when accumulate=True

Member: close()
Description: Close the underlying connection
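
The "sql or file= (not both)" rule amounts to an exclusive-argument check followed by an optional file read. The sketch below shows one way to express it; resolve_sql is a hypothetical helper, and raising ValueError when neither or both are given is an assumption, since the error the library raises is not stated here.

```python
from pathlib import Path


def resolve_sql(sql=None, *, file=None):
    """Return the SQL text from exactly one of sql or file=."""
    if (sql is None) == (file is None):
        # Both given, or neither given.
        raise ValueError("pass exactly one of sql or file=")
    if sql is not None:
        return sql
    # Assumption: SQL files are UTF-8 text and ~ should be expanded.
    return Path(file).expanduser().read_text(encoding="utf-8")


print(resolve_sql("SELECT 1"))
```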

list_profiles(path) — module-level

from tha_snowflake_runner import list_profiles

names = list_profiles("~/connections.toml")  # ["default", "prod", "dev"]

Supports the same formats as Mode 2: .toml, .ini, .cfg, .json.

Scope

tha-snowflake-runner is read-only by design — query() and stream() only. Write operations (INSERT, UPDATE, DELETE, MERGE, DDL) are intentionally not supported; use the raw connector directly for those.

Alternatives

tha-snowflake-runner is intentionally narrow: no ORM, no Snowpark, no async — just a thin typed wrapper for common query patterns and connection-file management.

License

MIT
