Snowflake SQLAlchemy Dialect
Project description
Snowflake SQLAlchemy
Snowflake SQLAlchemy runs on the top of the Snowflake Connector for Python as a dialect to bridge a Snowflake database and SQLAlchemy applications.
SQLAlchemy version 1.4 is a legacy version (see reference), which is why we are working on a version that supports only SQLAlchemy 2.x. The version will be available as a "release candidate" to enable earlier testing. We will publish more details together with its release. The current version will still be supported for two years in accordance with https://docs.snowflake.com/en/release-notes/requirements#recommended-client-versions
Table of contents:
- Snowflake SQLAlchemy
- Prerequisites
- Installing Snowflake SQLAlchemy
- Verifying Your Installation
- Parameters and Behavior
- Connection Parameters
- Opening and Closing Connection
- Auto-increment Behavior
- Object Name Case Handling
- Index Support
- Numpy Data Type Support
- DECFLOAT Data Type Support
- VECTOR Data Type Support
- UUID Data Type Support
- Cache Column Metadata
- Cross-Database Reflection
- VARIANT, ARRAY and OBJECT Support
- Structured Data Types Support
- CLUSTER BY Support
- Alembic Support
- Key Pair Authentication Support
- Merge Command Support
- Bulk Insert Optimization for ORM Models
- CopyIntoStorage Support
- Iceberg Table with Snowflake Catalog support
- Hybrid Table support
- Dynamic Tables support
- Notes
- Verifying Package Signatures
- Support
- Known Limitations
Prerequisites
Snowflake Connector for Python
The only requirement for Snowflake SQLAlchemy is the Snowflake Connector for Python; however, the connector does not need to be installed because installing Snowflake SQLAlchemy automatically installs the connector.
Data Analytics and Web Application Frameworks (Optional)
Snowflake SQLAlchemy can be used with Pandas, Jupyter and Pyramid, which provide higher levels of application frameworks for data analytics and web applications. However, building a working environment from scratch is not a trivial task, particularly for novice users. Installing the frameworks requires C compilers and tools, and choosing the right tools and versions is a hurdle that might deter users from using Python applications.
An easier way to build an environment is through Anaconda, which provides a complete, precompiled technology stack for all users, including non-Python experts such as data analysts and students. For Anaconda installation instructions, see the Anaconda install documentation. The Snowflake SQLAlchemy package can then be installed on top of Anaconda using pip.
Installing Snowflake SQLAlchemy
The Snowflake SQLAlchemy package can be installed from the public PyPI repository using pip:
pip install --upgrade snowflake-sqlalchemy
pip automatically installs all required modules, including the Snowflake Connector for Python.
Verifying Your Installation
-
Create a file (e.g.
validate.py) that contains the following Python sample code, which connects to Snowflake and displays the Snowflake version:from sqlalchemy import create_engine engine = create_engine( 'snowflake://{user}:{password}@{account}/'.format( user='<your_user_login_name>', password='<your_password>', account='<your_account_name>', ) ) try: connection = engine.connect() results = connection.execute('select current_version()').fetchone() print(results[0]) finally: connection.close() engine.dispose()
-
Replace
<your_user_login_name>,<your_password>, and<your_account_name>with the appropriate values for your Snowflake account and user.For more details, see Connection Parameters.
-
Execute the sample code. For example, if you created a file named
validate.py:python validate.pyThe Snowflake version (e.g.
1.48.0) should be displayed.
Parameters and Behavior
As much as possible, Snowflake SQLAlchemy provides compatible functionality for SQLAlchemy applications. For information on using SQLAlchemy, see the SQLAlchemy documentation.
However, Snowflake SQLAlchemy also provides Snowflake-specific parameters and behavior, which are described in the following sections.
Connection Parameters
Snowflake SQLAlchemy uses the following syntax for the connection string used to connect to Snowflake and initiate a session:
'snowflake://<user_login_name>:<password>@<account_name>'
Where:
<user_login_name>is the login name for your Snowflake user.<password>is the password for your Snowflake user.<account_name>is the name of your Snowflake account.
Include the region in the <account_name> if applicable, more info is available here.
You can optionally specify the initial database and schema for the Snowflake session by including them at the end of the connection string, separated by /. You can also specify the initial warehouse and role for the session as a parameter string at the end of the connection string:
'snowflake://<user_login_name>:<password>@<account_name>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>'
Escaping Special Characters such as %, @ signs in Passwords
As pointed out in SQLAlchemy, URLs
containing special characters need to be URL encoded to be parsed correctly. This includes the %, @ signs. Unescaped password containing special
characters could lead to authentication failure.
The encoding for the password can be generated using urllib.parse:
import urllib.parse
urllib.parse.quote("kx@% jj5/g")
'kx%40%25%20jj5/g'
Note: urllib.parse.quote_plus may also be used if there is no space in the string, as urllib.parse.quote_plus will replace space with +.
To create an engine with the proper encodings, either manually constructing the url string by formatting
or taking advantage of the snowflake.sqlalchemy.URL helper method:
import urllib.parse
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
quoted_password = urllib.parse.quote("kx@% jj5/g")
# 1. manually constructing an url string
url = f'snowflake://testuser1:{quoted_password}@abc123/testdb/public?warehouse=testwh&role=myrole'
engine = create_engine(url)
# 2. using the snowflake.sqlalchemy.URL helper method
engine = create_engine(URL(
account = 'abc123',
user = 'testuser1',
password = quoted_password,
database = 'testdb',
schema = 'public',
warehouse = 'testwh',
role='myrole',
))
Note: After login, the initial database, schema, warehouse and role specified in the connection string can always be changed for the session.
The following example calls the create_engine method with the user name testuser1, password 0123456, account name abc123, database testdb, schema public, warehouse testwh, and role myrole:
from sqlalchemy import create_engine
engine = create_engine(
'snowflake://testuser1:0123456@abc123/testdb/public?warehouse=testwh&role=myrole'
)
Other parameters, such as timezone, can also be specified as a URI parameter or in connect_args parameters. For example:
from sqlalchemy import create_engine
engine = create_engine(
'snowflake://testuser1:0123456@abc123/testdb/public?warehouse=testwh&role=myrole',
connect_args={
'timezone': 'America/Los_Angeles',
}
)
For convenience, you can use the snowflake.sqlalchemy.URL method to construct the connection string and connect to the database. The following example constructs the same connection string from the previous example:
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
engine = create_engine(URL(
account = 'abc123',
user = 'testuser1',
password = '0123456',
database = 'testdb',
schema = 'public',
warehouse = 'testwh',
role='myrole',
timezone = 'America/Los_Angeles',
))
Using a proxy server
Use the supported environment variables, HTTPS_PROXY, HTTP_PROXY and NO_PROXY to configure a proxy server.
Using session parameters
Snowflake session parameters (such as QUERY_TAG) cannot be set directly through the URL helper.
Instead, pass them via the connect_args parameter of create_engine, using the session_parameters dict — the same way you would through the Python connector:
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
engine = create_engine(
URL(
# CONNECTION_PARAMETERS
),
connect_args={
"session_parameters": {
"QUERY_TAG": "SOME_QUERY_TAGS",
}
},
)
Session parameters set this way apply to all queries executed within the session.
To change a session parameter for specific queries mid-session, use ALTER SESSION:
from sqlalchemy import text
with engine.connect() as conn:
conn.execute(text("ALTER SESSION SET QUERY_TAG = 'batch_job_1'"))
conn.execute(text("...")) # Uses 'batch_job_1'
conn.execute(text("ALTER SESSION SET QUERY_TAG = 'batch_job_2'"))
conn.execute(text("...")) # Uses 'batch_job_2'
conn.execute(text("ALTER SESSION UNSET QUERY_TAG"))
conn.execute(text("...")) # No tag
Opening and Closing Connection
Open a connection by executing engine.connect(); avoid using engine.execute(). Make certain to close the connection by executing connection.close() before
engine.dispose(); otherwise, the Python Garbage collector removes the resources required to communicate with Snowflake, preventing the Python connector from closing the session properly.
# Avoid this.
engine = create_engine(...)
engine.execute(<SQL>)
engine.dispose()
# Better.
engine = create_engine(...)
connection = engine.connect()
try:
connection.execute(text(<SQL>))
finally:
connection.close()
engine.dispose()
# Best
try:
with engine.connect() as connection:
connection.execute(text(<SQL>))
# or
connection.exec_driver_sql(<SQL>)
finally:
engine.dispose()
Auto-increment Behavior
Auto-incrementing a value requires the Sequence object. Include the Sequence object in the primary key column to automatically increment the value as each new record is inserted. For example:
t = Table('mytable', metadata,
Column('id', Integer, Sequence('id_seq'), primary_key=True),
Column(...), ...
)
Object Name Case Handling
Snowflake stores all case-insensitive object names in uppercase text. In contrast, SQLAlchemy considers all lowercase object names to be case-insensitive. Snowflake SQLAlchemy converts the object name case during schema-level communication, i.e. during table and index reflection. If you use uppercase object names, SQLAlchemy assumes they are case-sensitive and encloses the names with quotes. This behavior will cause mismatches against data dictionary data received from Snowflake, so unless identifier names have been truly created as case sensitive using quotes, e.g., "TestDb", all lowercase names should be used on the SQLAlchemy side.
Index Support
Indexes are supported only for Hybrid Tables in Snowflake SQLAlchemy. For more details on limitations and use cases, refer to the Create Index documentation. You can create an index using the following methods:
Single Column Index
You can create a single column index by setting the index=True parameter on the column or by explicitly defining an Index object.
hybrid_test_table_1 = HybridTable(
"table_name",
metadata,
Column("column1", Integer, primary_key=True),
Column("column2", String, index=True),
Index("index_1", "column1", "column2")
)
metadata.create_all(engine_testaccount)
Multi-Column Index
For multi-column indexes, you define the Index object specifying the columns that should be indexed.
hybrid_test_table_1 = HybridTable(
"table_name",
metadata,
Column("column1", Integer, primary_key=True),
Column("column2", String),
Index("index_1", "column1", "column2")
)
metadata.create_all(engine_testaccount)
Numpy Data Type Support
Snowflake SQLAlchemy supports binding and fetching NumPy data types. Binding is always supported. To enable fetching NumPy data types, add numpy=True to the connection parameters.
The following example shows the round trip of numpy.datetime64 data:
import numpy as np
import pandas as pd
engine = create_engine(URL(
account = 'abc123',
user = 'testuser1',
password = 'pass',
database = 'db',
schema = 'public',
warehouse = 'testwh',
role='myrole',
numpy=True,
))
specific_date = np.datetime64('2016-03-04T12:03:05.123456789Z')
with engine.connect() as connection:
connection.exec_driver_sql(
"CREATE OR REPLACE TABLE ts_tbl(c1 TIMESTAMP_NTZ)")
connection.exec_driver_sql(
"INSERT INTO ts_tbl(c1) values(%s)", (specific_date,)
)
df = pd.read_sql_query("SELECT * FROM ts_tbl", connection)
assert df.c1.values[0] == specific_date
The following NumPy data types are supported:
- numpy.int64
- numpy.float64
- numpy.datatime64
DECFLOAT Data Type Support
Snowflake SQLAlchemy supports the DECFLOAT data type, which provides decimal floating-point with up to 38 significant digits. For more information, see the Snowflake DECFLOAT documentation.
from sqlalchemy import Column, Integer, MetaData, Table
from snowflake.sqlalchemy import DECFLOAT
metadata = MetaData()
t = Table('my_table', metadata,
Column('id', Integer, primary_key=True),
Column('value', DECFLOAT()),
)
metadata.create_all(engine)
DECFLOAT Precision
The Snowflake Python connector uses Python's decimal module context when converting DECFLOAT values to Python Decimal objects. Python's default decimal context precision is 28 digits, which can truncate DECFLOAT values that use up to 38 digits.
To preserve full 38-digit precision, add enable_decfloat=True to the connection URL:
from sqlalchemy import create_engine
engine = create_engine(
'snowflake://testuser1:0123456@abc123/testdb/public?warehouse=testwh&enable_decfloat=True'
)
Or using the snowflake.sqlalchemy.URL helper:
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
engine = create_engine(URL(
account = 'abc123',
user = 'testuser1',
password = '0123456',
database = 'testdb',
schema = 'public',
warehouse = 'testwh',
enable_decfloat = True,
))
Note: DECFLOAT does not support special values (inf, -inf, NaN) unlike FLOAT.
Why is enable_decfloat not enabled by default? Enabling it sets decimal.getcontext().prec = 38, which modifies Python's thread-local decimal context and affects all Decimal operations in that thread, not just database queries. To avoid unexpected side effects on application code, the dialect emits a warning when DECFLOAT values are retrieved without full precision enabled, guiding users to opt-in explicitly.
VECTOR Data Type Support
Snowflake SQLAlchemy supports the VECTOR data type with varying element type and dimension.
For more information, see the Snowflake documentation.
from sqlalchemy import Column, Integer, Float, MetaData, Table
from snowflake.sqlalchemy import VECTOR
metadata = MetaData()
t = Table('my_table', metadata,
Column('id', Integer, primary_key=True),
Column('int_vec', VECTOR(Integer, 20)),
Column('float_vec', VECTOR(Float, 40)),
)
metadata.create_all(engine)
UUID Data Type Support
SQLAlchemy 2.x only. The generic
UUIDtype does not exist in SQLAlchemy 1.4; on that version SnowflakeUUIDcolumns are reflected asNullType.
Snowflake stores UUID values internally as text strings. When a UUID column is reflected, snowflake-sqlalchemy maps it to sqlalchemy.sql.sqltypes.UUID with as_uuid=False, so query results are returned as plain hyphenated strings (e.g. "6ba7b810-9dad-11d1-80b4-00c04fd430c8") rather than uuid.UUID objects.
from sqlalchemy import Column, MetaData, Table, text
from sqlalchemy.sql.sqltypes import UUID # SA 2.x
metadata = MetaData()
# Define a table with a UUID primary key
t = Table(
"my_table",
metadata,
Column("id", UUID, primary_key=True),
)
metadata.create_all(engine)
# Reflected UUID columns come back as strings by default
with engine.connect() as conn:
row = conn.execute(text("SELECT id FROM my_table LIMIT 1")).fetchone()
print(type(row[0])) # <class 'str'>
print(row[0]) # "6ba7b810-9dad-11d1-80b4-00c04fd430c8"
If you prefer uuid.UUID objects in query results, pass as_uuid=True explicitly:
Column("id", UUID(as_uuid=True), primary_key=True)
Note that Alembic autogenerate will render the column as UUID(as_uuid=True) in generated migration files, whereas the default (as_uuid=False) renders as UUID().
Timestamp and Timezone Support
Snowflake SQLAlchemy provides three Snowflake-specific timestamp types that map directly to their Snowflake counterparts:
from sqlalchemy import Column, Integer, MetaData, Table, create_engine
from snowflake.sqlalchemy import TIMESTAMP_NTZ, TIMESTAMP_TZ, TIMESTAMP_LTZ
engine = create_engine(...)
metadata = MetaData()
t = Table('events', metadata,
Column('id', Integer, primary_key=True),
Column('created_at', TIMESTAMP_NTZ()), # TIMESTAMP WITHOUT TIME ZONE
Column('scheduled_at', TIMESTAMP_TZ()), # TIMESTAMP WITH TIME ZONE
Column('logged_at', TIMESTAMP_LTZ()), # TIMESTAMP WITH LOCAL TIME ZONE
)
metadata.create_all(engine)
SQLAlchemy's generic DateTime and TIMESTAMP types also support timezone-aware columns via the timezone parameter. When timezone=True is set, the dialect emits TIMESTAMP_TZ instead of the default TIMESTAMP_NTZ:
from sqlalchemy import Column, DateTime, Integer, MetaData, Table, create_engine
from sqlalchemy.types import TIMESTAMP
engine = create_engine(...)
metadata = MetaData()
t = Table('events', metadata,
Column('id', Integer, primary_key=True),
Column('naive_ts', DateTime()), # produces TIMESTAMP_NTZ
Column('aware_ts', DateTime(timezone=True)), # produces TIMESTAMP_TZ
Column('naive_ts2', TIMESTAMP()), # produces TIMESTAMP_NTZ
Column('aware_ts2', TIMESTAMP(timezone=True)), # produces TIMESTAMP_TZ
)
metadata.create_all(engine)
This also applies when using pandas to_sql() with timezone-aware datetime columns, which infers DateTime(timezone=True) automatically (see #199).
Note on Time and timezones: SQLAlchemy's Time type accepts a timezone parameter, but Snowflake's TIME data type does not support time zones. Using Time(timezone=True) will compile to plain TIME and the timezone flag will have no effect. If you need to store time data with time-zone information, use a timestamp type such as TIMESTAMP_TZ or DateTime(timezone=True) instead.
Cache Column Metadata
SQLAlchemy provides the runtime inspection API to get the runtime information about the various objects. One common use case is retrieving all tables and their column metadata in a schema to construct a schema catalog. For example, alembic manages database schema migrations on top of SQLAlchemy. A typical flow (SQLAlchemy 1.4) is:
inspector = inspect(engine)
schema = inspector.default_schema_name
for table_name in inspector.get_table_names(schema):
column_metadata = inspector.get_columns(table_name, schema)
primary_keys = inspector.get_pk_constraint(table_name, schema)
foreign_keys = inspector.get_foreign_keys(table_name, schema)
...
In this flow, running a separate query per table can be slow for large schemas. Snowflake SQLAlchemy optimises this with schema-wide cached queries and, where appropriate, fast per-table queries.
Single-Table vs Multi-Table Reflection Performance
SQLAlchemy 2.x (automatic)
SQLAlchemy 2.x distinguishes bulk reflection from single-table inspection at the framework level:
MetaData.reflect()/Table(..., autoload_with=engine)— callsget_multi_columns,get_multi_pk_constraint,get_multi_foreign_keys, andget_multi_unique_constraints. Each issues one schema-wideSHOWorinformation_schemaquery and caches the result for all tables in the schema.inspector.get_columns(table_name)— issues a singleDESC TABLEquery directly against that table. This is fast and correct for all table types including temporary tables.
No configuration is needed; the routing is handled automatically by the SA 2.x dispatch layer.
Note on reflected type representations: Because inspector.get_columns() uses DESC TABLE, reflected types always include Snowflake's resolved default sizes (e.g. BINARY(8388608) instead of BINARY, VARCHAR(16777216) instead of VARCHAR). The type objects are functionally identical; only str() output differs. Use isinstance() checks rather than string comparison for type introspection.
from sqlalchemy import MetaData, inspect, create_engine
engine = create_engine('snowflake://...')
# SA 2.x: one schema-wide query per metadata type, all tables cached at once
metadata = MetaData()
metadata.reflect(bind=engine, schema='public')
# SA 2.x: direct DESC TABLE, no schema-wide query issued
inspector = inspect(engine)
columns = inspector.get_columns('my_table', schema='public')
Per-table optimisation
On SQLAlchemy 2.x, get_pk_constraint, get_unique_constraints, get_foreign_keys, get_indexes, and get_columns automatically use per-table queries (SHOW … IN TABLE, DESC TABLE) for single-table Inspector calls (e.g. Inspector.get_pk_constraint(), pandas.read_sql_table()). MetaData.reflect() continues to use the schema-wide get_multi_* hooks, which issue one query per reflection pass.
On SQLAlchemy 1.4, MetaData.reflect() calls the singular methods per-table. Add cache_column_metadata=True to the connection URL to opt in to per-table queries for get_pk_constraint, get_unique_constraints, get_foreign_keys, get_indexes, and get_columns. Without this flag, the existing schema-wide queries are used unchanged.
engine = create_engine(URL(
account = 'abc123',
user = 'testuser1',
password = 'pass',
database = 'db',
schema = 'public',
warehouse = 'testwh',
role='myrole',
cache_column_metadata=True, # SA 1.4 only: enables per-table reflection
))
Performance Implications
For schemas with many tables (100+), schema-wide queries issued once during MetaData.reflect() are far more efficient than per-table queries in a loop:
- Schema-wide
SHOW PRIMARY KEYS IN SCHEMA(all tables): < 1 second - Per-table loop over 1 000 tables: 1 000+ round-trips
For single-table inspection via Inspector, per-table queries (DESC TABLE, SHOW … IN TABLE) are faster than fetching the entire schema.
Best Practices
- For bulk reflection: use
metadata.reflect()— schema-wide queries are issued once and cached. - For single-table inspection: use
inspector.get_columns()/inspector.get_pk_constraint()etc. — per-table queries are used automatically (SA 2.x) or withcache_column_metadata=True(SA 1.4). - For very large schemas: reflect only the tables you need:
metadata.reflect(bind=engine, schema='public', only=['table1', 'table2'])
Cross-Database Reflection
Snowflake SQLAlchemy supports reflecting tables from different databases using the database.schema notation in the schema parameter. This allows you to work with tables from multiple databases in a single session without using raw SQL.
To reflect a table from a different database, use the database.schema notation:
from sqlalchemy import create_engine, MetaData, Table, select
# Connect to database_a
engine = create_engine('snowflake://user:pass@account/database_a')
metadata = MetaData()
# Reflect a table from database_b using database.schema notation
bananas = Table(
'bananas',
metadata,
schema='database_b.schema_b', # Cross-database schema
autoload_with=engine
)
# Reflect a table from the current database
apples = Table(
'apples',
metadata,
schema='schema_a', # Uses current database (database_a)
autoload_with=engine
)
# Create a cross-database join
stmt = select(apples, bananas).join(
bananas,
apples.c.id == bananas.c.id
)
# The generated SQL will use fully-qualified names for cross-database tables:
# SELECT ... FROM schema_a.apples
# JOIN database_b.schema_b.bananas ON ...
You can also reflect all tables from a different database:
metadata.reflect(
bind=engine,
schema='database_b.schema_b'
)
Note about schema names containing dots: If your Snowflake schema name literally contains a dot character (e.g., created with CREATE SCHEMA "my.schema"), you must quote it in the Python string:
# For schema literally named "my.schema" in database_b
table = Table('mytable', metadata, schema='database_b."my.schema"', autoload_with=engine)
The SQL compilation layer already treats unquoted dots as separators. This cross-database reflection feature makes the reflection layer consistent with that existing behavior.
VARIANT, ARRAY and OBJECT Support
Snowflake SQLAlchemy supports fetching VARIANT, ARRAY and OBJECT data types. All types are converted into str in Python so that you can convert them to native data types using json.loads.
This example shows how to create a table including VARIANT, ARRAY, and OBJECT data type columns.
from snowflake.sqlalchemy import (VARIANT, ARRAY, OBJECT)
t = Table('my_semi_strucutred_datatype_table', metadata,
Column('va', VARIANT),
Column('ob', OBJECT),
Column('ar', ARRAY))
metdata.create_all(engine)
In order to retrieve VARIANT, ARRAY, and OBJECT data type columns and convert them to the native Python data types, fetch data and call the json.loads method as follows:
import json
connection = engine.connect()
results = connection.execute(select([t])
row = results.fetchone()
data_variant = json.loads(row[0])
data_object = json.loads(row[1])
data_array = json.loads(row[2])
Structured Data Types Support
This module defines custom SQLAlchemy types for Snowflake structured data, specifically for Iceberg tables. The types —MAP, OBJECT, and ARRAY— allow you to store complex data structures in your SQLAlchemy models. For detailed information, refer to the Snowflake Structured data types documentation.
MAP
The MAP type represents a collection of key-value pairs, where each key and value can have different types.
- Key Type: The type of the keys (e.g.,
TEXT,NUMBER). - Value Type: The type of the values (e.g.,
TEXT,NUMBER). - Not Null: Whether
NULLvalues are allowed (default isFalse).
Example Usage
IcebergTable(
table_name,
metadata,
Column("id", Integer, primary_key=True),
Column("map_col", MAP(NUMBER(10, 0), TEXT(16777216))),
external_volume="external_volume",
base_location="base_location",
)
OBJECT
The OBJECT type represents a semi-structured object with named fields. Each field can have a specific type, and you can also specify whether each field is nullable.
- Items Types: A dictionary of field names and their types. The type can optionally include a nullable flag (
Truefor not nullable,Falsefor nullable, default isFalse).
Example Usage
IcebergTable(
table_name,
metadata,
Column("id", Integer, primary_key=True),
Column(
"object_col",
OBJECT(key1=(TEXT(16777216), False), key2=(NUMBER(10, 0), False)),
OBJECT(key1=TEXT(16777216), key2=NUMBER(10, 0)), # Without nullable flag
),
external_volume="external_volume",
base_location="base_location",
)
ARRAY
The ARRAY type represents an ordered list of values, where each element has the same type. The type of the elements is defined when creating the array.
- Value Type: The type of the elements in the array (e.g.,
TEXT,NUMBER). - Not Null: Whether
NULLvalues are allowed (default isFalse).
Example Usage
IcebergTable(
table_name,
metadata,
Column("id", Integer, primary_key=True),
Column("array_col", ARRAY(TEXT(16777216))),
external_volume="external_volume",
base_location="base_location",
)
CLUSTER BY Support
Snowflake SQLAchemy supports the CLUSTER BY parameter for tables. For information about the parameter, see :doc:/sql-reference/sql/create-table.
This example shows how to create a table with two columns, id and name, as the clustering keys:
t = Table('myuser', metadata,
Column('id', Integer, primary_key=True),
Column('name', String),
snowflake_clusterby=['id', 'name', text('id > 5')], ...
)
metadata.create_all(engine)
Alembic Support
Alembic is a database migration tool on top of SQLAlchemy. Snowflake SQLAlchemy works by adding the following code to alembic/env.py so that Alembic can recognize Snowflake SQLAlchemy.
from alembic.ddl.impl import DefaultImpl
class SnowflakeImpl(DefaultImpl):
__dialect__ = 'snowflake'
See Alembic Documentation for general usage.
Key Pair Authentication Support
Snowflake SQLAlchemy supports key pair authentication by leveraging its Snowflake Connector for Python underpinnings. See Using Key Pair Authentication for steps to create the private and public keys.
The private key parameter is passed through connect_args as follows:
...
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization
with open("rsa_key.p8", "rb") as key:
p_key= serialization.load_pem_private_key(
key.read(),
password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
backend=default_backend()
)
pkb = p_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption())
engine = create_engine(URL(
account='abc123',
user='testuser1',
),
connect_args={
'private_key': pkb,
},
)
Where PRIVATE_KEY_PASSPHRASE is a passphrase to decrypt the private key file, rsa_key.p8.
Currently a private key parameter is not accepted by the snowflake.sqlalchemy.URL method.
Merge Command Support
Snowflake SQLAlchemy supports upserting with its MergeInto custom expression.
See Merge for full documentation.
Use it as follows:
from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, create_engine
from snowflake.sqlalchemy import MergeInto
engine = create_engine(db.url, echo=False)
session = sessionmaker(bind=engine)()
connection = engine.connect()
meta = MetaData()
meta.reflect(bind=session.bind)
t1 = meta.tables['t1']
t2 = meta.tables['t2']
merge = MergeInto(target=t1, source=t2, on=t1.c.t1key == t2.c.t2key)
merge.when_matched_then_delete().where(t2.c.marked == 1)
merge.when_matched_then_update().where(t2.c.isnewstatus == 1).values(val = t2.c.newval, status=t2.c.newstatus)
merge.when_matched_then_update().values(val=t2.c.newval)
merge.when_not_matched_then_insert().values(val=t2.c.newval, status=t2.c.newstatus)
connection.execute(merge)
Bulk Insert Optimization for ORM Models
When using Session.bulk_save_objects() with models that have nullable optional
columns, SQLAlchemy groups objects into separate INSERT batches based on each
object's set of non-None column keys. If some objects were constructed without
supplying every nullable column, they produce different key sets and SQLAlchemy
emits O(N) INSERT statements instead of a single executemany batch.
Snowflake SQLAlchemy provides two components that together solve this problem:
SnowflakeBase(SQLAlchemy 2.x only) — aDeclarativeBasesubclass whose constructor pre-populates every plain-nullable column withNone(or its scalar Python default) at construction time, so all instances share the same column-key set regardless of which kwargs the caller supplied.snowflake_declarative_base()— a factory function compatible with both SQLAlchemy 1.4 and 2.x that produces a declarative base with the same pre-population behaviour.SnowflakeSession— aSessionsubclass that passesrender_nulls=Trueto the internal bulk-save call, preventing pre-populatedNonevalues from being stripped before grouping. Must be used together withSnowflakeBaseorsnowflake_declarative_base()for full effect.
SQLAlchemy 2.x example:
from sqlalchemy import Column, Integer, String, create_engine
from snowflake.sqlalchemy import SnowflakeBase, SnowflakeSession
class MyModel(SnowflakeBase):
__tablename__ = "my_model"
id = Column(Integer, primary_key=True)
name = Column(String) # nullable, no default
status = Column(String, default="active") # scalar default
engine = create_engine("snowflake://...")
SnowflakeBase.metadata.create_all(engine)
session = SnowflakeSession(bind=engine)
# All objects share the same column-key set — emits a single executemany INSERT
session.bulk_save_objects([
MyModel(id=1),
MyModel(id=2, name="foo"),
MyModel(id=3, status="inactive"),
])
session.commit()
SQLAlchemy 1.4 and 2.x compatible example:
from sqlalchemy import Column, Integer, String, create_engine
from snowflake.sqlalchemy import snowflake_declarative_base, SnowflakeSession
Base = snowflake_declarative_base()
class MyModel(Base):
__tablename__ = "my_model"
id = Column(Integer, primary_key=True)
name = Column(String)
status = Column(String, default="active")
engine = create_engine("snowflake://...")
Base.metadata.create_all(engine)
session = SnowflakeSession(bind=engine)
session.bulk_save_objects([
MyModel(id=1),
MyModel(id=2, name="foo"),
])
session.commit()
Notes:
SnowflakeBaseis only available in SQLAlchemy 2.x. Usesnowflake_declarative_base()when your code must run on both SA 1.4 and 2.x.- Columns with
server_default, callable Python defaults (default=fn), or SQL-expression defaults (default=func.now()) are intentionally left absent from pre-population. Objects that differ on such columns may still be placed in separate INSERT batches — this is the same behaviour as stock SQLAlchemy. SnowflakeSessionalone (without the matching base class) is not sufficient: the base class is required to unify the column-key sets beforeSnowflakeSessioncan batch them together.
CopyIntoStorage Support
Snowflake SQLAlchemy supports saving tables/query results into different stages, as well as into Azure Containers and
AWS buckets with its custom CopyIntoStorage expression. See Copy into
for full documentation.
Use it as follows:
from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, create_engine
from snowflake.sqlalchemy import CopyIntoStorage, AWSBucket, CSVFormatter
engine = create_engine(db.url, echo=False)
session = sessionmaker(bind=engine)()
connection = engine.connect()
meta = MetaData()
meta.reflect(bind=session.bind)
users = meta.tables['users']
copy_into = CopyIntoStorage(from_=users,
into=AWSBucket.from_uri('s3://my_private_backup').encryption_aws_sse_kms('1234abcd-12ab-34cd-56ef-1234567890ab'),
formatter=CSVFormatter().null_if(['null', 'Null']))
connection.execute(copy_into)
Iceberg Table with Snowflake Catalog support
Snowflake SQLAlchemy supports Iceberg Tables with the Snowflake Catalog, along with various related parameters. For detailed information about Iceberg Tables, refer to the Snowflake CREATE ICEBERG documentation.
To create an Iceberg Table using Snowflake SQLAlchemy, you can define the table using the SQLAlchemy Core syntax as follows:
table = IcebergTable(
"myuser",
metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
external_volume=external_volume_name,
base_location="my_iceberg_table",
as_query="SELECT * FROM table"
)
Alternatively, you can define the table using a declarative approach:
class MyUser(Base):
__tablename__ = "myuser"
@classmethod
def __table_cls__(cls, name, metadata, *arg, **kw):
return IcebergTable(name, metadata, *arg, **kw)
__table_args__ = {
"external_volume": "my_external_volume",
"base_location": "my_iceberg_table",
"as_query": "SELECT * FROM table",
}
id = Column(Integer, primary_key=True)
name = Column(String)
Hybrid Table support
Snowflake SQLAlchemy supports Hybrid Tables with indexes. For detailed information, refer to the Snowflake CREATE HYBRID TABLE documentation.
To create a Hybrid Table and add an index, you can use the SQLAlchemy Core syntax as follows:
table = HybridTable(
"myuser",
metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
Index("idx_name", "name")
)
Alternatively, you can define the table using the declarative approach:
class MyUser(Base):
__tablename__ = "myuser"
@classmethod
def __table_cls__(cls, name, metadata, *arg, **kw):
return HybridTable(name, metadata, *arg, **kw)
__table_args__ = (
Index("idx_name", "name"),
)
id = Column(Integer, primary_key=True)
name = Column(String)
Dynamic Tables support
Snowflake SQLAlchemy supports Dynamic Tables. For detailed information, refer to the Snowflake CREATE DYNAMIC TABLE documentation.
To create a Dynamic Table, you can use the SQLAlchemy Core syntax as follows:
dynamic_test_table_1 = DynamicTable(
"dynamic_MyUser",
metadata,
Column("id", Integer),
Column("name", String),
target_lag=(1, TimeUnit.HOURS), # Additionally, you can use SnowflakeKeyword.DOWNSTREAM
warehouse='test_wh',
refresh_mode=SnowflakeKeyword.FULL,
as_query="SELECT id, name from MyUser;"
)
Alternatively, you can define a table without columns using the SQLAlchemy select() construct:
dynamic_test_table_1 = DynamicTable(
"dynamic_MyUser",
metadata,
target_lag=(1, TimeUnit.HOURS),
warehouse='test_wh',
refresh_mode=SnowflakeKeyword.FULL,
as_query=select(MyUser.id, MyUser.name)
)
Notes
- Defining a primary key in a Dynamic Table is not supported, meaning declarative tables don’t support Dynamic Tables.
- When using the
as_queryparameter with a string, you must explicitly define the columns. However, if you use the SQLAlchemyselect()construct, you don’t need to explicitly define the columns. - Direct data insertion into Dynamic Tables is not supported.
Verifying Package Signatures
To ensure the authenticity and integrity of the Python package, follow the steps below to verify the package signature using cosign.
Steps to verify the signature:
-
Install cosign:
- This example is using golang installation: installing-cosign-with-go
-
Download the file from the repository like pypi:
-
Download the signature files from the release tag, replace the version number with the version you are verifying:
-
Verify signature:
# replace the version number with the version you are verifying ./cosign verify-blob snowflake_sqlalchemy-1.7.3-py3-none-any.whl \ --certificate snowflake_sqlalchemy-1.7.3-py3-none-any.whl.crt \ --certificate-identity https://github.com/snowflakedb/snowflake-sqlalchemy/.github/workflows/python-publish.yml@refs/tags/v1.7.3 \ --certificate-oidc-issuer https://token.actions.githubusercontent.com \ --signature snowflake_sqlalchemy-1.7.3-py3-none-any.whl.sig Verified OK
Support
Feel free to file an issue or submit a PR here for general cases. For official support, contact Snowflake support at: https://community.snowflake.com/s/article/How-To-Submit-a-Support-Case-in-Snowflake-Lodge
Known Limitations
Identity columns as primary keys
Using SQLAlchemy's Identity() construct on a primary key column is not compatible with the SQLAlchemy ORM when targeting Snowflake.
Why it fails: After an INSERT, the ORM must retrieve the generated primary key to populate the in-memory object. SQLAlchemy supports two mechanisms for this — RETURNING (not available in Snowflake) and cursor.lastrowid (the Snowflake Python connector returns None for this attribute because Snowflake has no native rowid concept — see snowflake-connector-python#1201). With neither mechanism available, the ORM receives None as the primary key and raises:
sqlalchemy.orm.exc.FlushError: Instance <MyModel at 0x...> has a NULL identity key after a flush ...
Example that fails:
from sqlalchemy import Column, Identity, Integer, String
from sqlalchemy.orm import declarative_base, Session
Base = declarative_base()
class MyModel(Base):
__tablename__ = "my_model"
id = Column(Integer, Identity(start=1, increment=1), primary_key=True) # does not work with ORM
name = Column(String)
Base.metadata.create_all(engine)
with Session(engine) as session:
session.add(MyModel(name="test"))
session.commit() # raises FlushError: NULL identity key
The dialect emits a SnowflakeWarning at DDL compile time when Identity() is detected on a primary key column to surface this problem early. The warning is emitted once per unique (table, column) pair per Python process — repeated DDL compilation of the same schema does not produce duplicate output.
To silence the warning entirely, use Python's standard warning filter:
import warnings
from snowflake.sqlalchemy.exc import SnowflakeWarning
warnings.filterwarnings("ignore", category=SnowflakeWarning)
Or set it via the PYTHONWARNINGS environment variable before starting your application:
PYTHONWARNINGS=ignore::snowflake.sqlalchemy.exc.SnowflakeWarning python my_app.py
Workaround — use Sequence() instead:
from sqlalchemy import Column, Integer, Sequence, String
from sqlalchemy.orm import declarative_base, Session
Base = declarative_base()
class MyModel(Base):
__tablename__ = "my_model"
id = Column(Integer, Sequence("my_model_id_seq"), primary_key=True)
name = Column(String)
Base.metadata.create_all(engine)
with Session(engine) as session:
session.add(MyModel(name="test"))
session.commit() # id is populated correctly
Sequence objects are fully supported by the Snowflake dialect and are the recommended way to generate auto-incrementing primary keys when using the ORM. See the Auto-increment Behavior section for more details.
Case-sensitive identifiers
Snowflake stores unquoted identifiers in UPPERCASE and treats them case-insensitively. SQLAlchemy uses lowercase for case-insensitive identifiers. The dialect bridges this gap via normalize_name / denormalize_name, but a few edge cases require explicit opt-in.
How reflection maps Snowflake names to SQLAlchemy names
When the dialect reflects a table, each column name passes through normalize_name, which produces one of three outcomes depending on how the identifier was stored in Snowflake:
| Snowflake stored form | How it was created | normalize_name returns (default) |
normalize_name returns (case_sensitive_identifiers=True) |
SQLAlchemy treats it as |
|---|---|---|---|---|
MYCOL (all-uppercase) |
CREATE TABLE t (MYCOL INT) — unquoted |
"mycol" (plain str) |
"mycol" (plain str) |
case-insensitive |
mycol (lowercase) |
CREATE TABLE t ("mycol" INT) — quoted |
quoted_name("mycol", True) |
quoted_name("mycol", True) |
case-sensitive |
MyCol (mixed-case) |
CREATE TABLE t ("MyCol" INT) — quoted |
"MyCol" (plain str) |
quoted_name("MyCol", True) |
case-sensitive — emitted SQL is "MyCol" in both modes (_requires_quotes forces quoting for any uppercase character) |
With the flag off, the only observable difference for mixed-case names is the Python type: isinstance(name, quoted_name) is False and .quote is None. Emitted SQL, dict key equality, and hashing are identical because quoted_name is a str subclass. Enable the flag when downstream code (e.g. an Alembic render_item hook or a custom preparer subclass) needs to distinguish case-sensitive reflected names by type.
You can observe this directly via the inspector:
from sqlalchemy import inspect
from sqlalchemy.sql.elements import quoted_name
inspector = inspect(engine)
for col in inspector.get_columns("my_table"):
name = col["name"]
if isinstance(name, quoted_name) and name.quote:
print(f"{name!r} — case-sensitive (was created quoted in Snowflake)")
else:
print(f"{name!r} — case-insensitive (Snowflake stores as {name.upper()})")
This means that after reflection, accessing a case-insensitive column requires the lowercase name. Given a table created as:
CREATE TABLE my_table (mycol INT); -- unquoted: stored as MYCOL
metadata.reflect(bind=engine)
t = metadata.tables["my_table"]
t.c.mycol # correct — Snowflake stored MYCOL, reflected as "mycol"
t.c["MYCOL"] # KeyError — the reflected key is lowercase
And a case-sensitive column (quoted in Snowflake) is accessed by its exact reflected name:
t.c[quoted_name("mycol", True)] # correct — matches the reflected quoted_name key
t.c.mycol # also works — quoted_name.__eq__ compares by value
Lowercase column names in CLUSTER BY
Column objects wrapped in quoted_name("mycol", True) are treated as case-sensitive by SQLAlchemy. Pass them directly to snowflake_clusterby:
from sqlalchemy.sql.elements import quoted_name
from sqlalchemy import Column, Integer, MetaData, Table
t = Table(
"my_table",
MetaData(),
Column(quoted_name("mycol", True), Integer),
snowflake_clusterby=[quoted_name("mycol", True)],
)
# Generates: CLUSTER BY ("mycol")
Without quoted_name(..., True) the column name is treated as case-insensitive and Snowflake resolves it as MYCOL.
ALL-UPPERCASE identifiers that are SQL reserved words (e.g. TABLE, SELECT)
Most identifiers are handled correctly without any flag at the SQL layer:
- A quoted lowercase name (
"mycol"in Snowflake) reflects asquoted_name("mycol", True)— already case-sensitive, in both flag modes. - A quoted mixed-case name (
"MyCol") reflects as a plainstr"MyCol"by default; withcase_sensitive_identifiers=Trueit reflects asquoted_name("MyCol", True). Emitted SQL is"MyCol"either way because the preparer's_requires_quotesheuristic force-quotes any name containing uppercase characters, so only code that inspects.quoteorisinstance(..., quoted_name)observes a difference. - An unquoted name (
MYCOLstored as all-uppercase) reflects as"mycol"— correctly case-insensitive.
The one gap is an identifier whose all-uppercase form is also a SQL reserved word. For example, a table literally named TABLE (created as CREATE TABLE "TABLE" ...) stores as TABLE in Snowflake. When the dialect reflects it, normalize_name("TABLE") cannot tell whether TABLE is a plain uppercase column or the keyword TABLE, so by default it returns "TABLE" unchanged — an all-uppercase string that SQLAlchemy treats as case-sensitive. This causes a key mismatch: the table was reflected under the key "TABLE" but the same normalize_name call made during DDL would produce "table".
Enable case_sensitive_identifiers to fix this: the dialect will return quoted_name("table", True) for any all-uppercase identifier that is a reserved word, matching the standard SQLAlchemy convention:
from sqlalchemy import create_engine
engine = create_engine(
"snowflake://user:pass@account/db",
case_sensitive_identifiers=True,
)
Or via URL:
snowflake://user:pass@account/db?case_sensitive_identifiers=True
Hard limit: Enabling this flag changes the dict key used by normalize_name("TABLE") from "TABLE" to quoted_name("table", True). Because hash("TABLE") != hash("table"), any existing code that accesses metadata.tables["TABLE"] by the uppercase key will miss after the flag is enabled. Enabling the flag on an existing codebase requires auditing all string-keyed lookups into metadata.tables and table.c for names that happen to be reserved words.
Case-sensitive schema names — table and model definitions
The most reliable way to declare a case-sensitive schema across multiple tables is MetaData(schema=quoted_name(..., True)). All tables and ORM models that share that MetaData object inherit the schema automatically — no repetition per table:
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.sql.elements import quoted_name
# Declare once — every Table attached to this MetaData inherits it.
metadata = MetaData(schema=quoted_name("myschema", True))
orders = Table("orders", metadata, Column("id", Integer, primary_key=True))
items = Table("items", metadata, Column("id", Integer, primary_key=True))
# SQL: "myschema".ORDERS and "myschema".ITEMS
# (table names are plain strings — Snowflake uppercases them)
To also make the table names case-sensitive, wrap them in quoted_name as well:
from sqlalchemy.sql.elements import quoted_name
orders = Table(quoted_name("orders", True), metadata, Column("id", Integer, primary_key=True))
items = Table(quoted_name("items", True), metadata, Column("id", Integer, primary_key=True))
# SQL: "myschema"."orders" and "myschema"."items"
When tables belong to different schemas, or when the MetaData is shared and you need per-table control, pass schema explicitly on the first table and reuse its .schema attribute on the rest:
orders = Table("orders", metadata,
Column("id", Integer, primary_key=True),
schema=quoted_name("myschema", True))
# Reuse the schema object directly — no need to repeat quoted_name.
items = Table("items", metadata,
Column("id", Integer, primary_key=True),
schema=orders.schema)
Table.schema is always a quoted_name instance. Assigning from another table's .schema carries the quote=True flag intact.
For ORM declarative models, set the schema on Base.metadata so every model inherits it:
from sqlalchemy import Column, Integer
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.sql.elements import quoted_name
class Base(DeclarativeBase):
metadata = MetaData(schema=quoted_name("myschema", True))
class Orders(Base):
__tablename__ = "orders"
id = Column(Integer, primary_key=True)
class Items(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True)
For per-model override, use __table_args__:
class Orders(Base):
__tablename__ = "orders"
__table_args__ = {"schema": quoted_name("myschema", True)}
id = Column(Integer, primary_key=True)
class Items(Base):
__tablename__ = "items"
__table_args__ = {"schema": Orders.__table__.schema} # reuse — no repetition
id = Column(Integer, primary_key=True)
Plain strings with inner double-quotes — '"myschema"' — are also accepted and produce the same result as quoted_name("myschema", True) when the dialect is built with case_sensitive_identifiers=True:
engine = create_engine(
"snowflake://user:pass@account/db",
case_sensitive_identifiers=True,
)
Table("orders", metadata, schema='"myschema"') # → "myschema".ORDERS
Table("orders", metadata, schema='"mydb"."myschema"') # → "mydb"."myschema".ORDERS
Without the flag, the schema parser extracts the inner value but leaves the quote attribute at its default (None), so the preparer's _requires_quotes heuristic decides whether to re-quote it — and for an all-lowercase value like myschema the heuristic returns False, stripping the quotes from the emitted SQL:
# Flag off (default):
Table("orders", metadata, schema='"myschema"')
# SQL: myschema.ORDERS ← Snowflake resolves this to MYSCHEMA (case-insensitive)
When case-sensitivity matters, either enable the flag or use quoted_name("myschema", True) / MetaData(schema=quoted_name(..., True)) explicitly — those forms carry quote=True on the value itself and are honoured regardless of the flag. Prefer quoted_name in code that is read often; prefer the quoted-string form together with the flag when the value comes from a config that already embeds the quotes.
Identifiers that contain a literal double-quote character use the standard SQL escape "". The parser handles this correctly — '"my""schema"' is treated as a single identifier named my"schema and round-trips to the SQL form "my""schema" in both flag modes (the embedded " is not a legal identifier character, so _requires_quotes forces quoting regardless of the flag):
Table("orders", metadata, schema='"my""schema"') # schema name: my"schema
# SQL: "my""schema".ORDERS
Case-sensitive schema names — engine connection
create_snowflake_engine sets the default schema for the connection (equivalent to USE SCHEMA on connect). Use it when all queries in the session target the same case-sensitive schema and you do not want to qualify every table individually:
from snowflake.sqlalchemy import create_snowflake_engine
engine = create_snowflake_engine(
"snowflake://user:pass@account/mydb",
schema="myschema",
case_sensitive_schema=True,
)
When the URL is stored outside Python code (environment variable, alembic.ini, Docker/Kubernetes config), create_snowflake_engine is not available and the schema must be percent-encoded directly in the URL string. Wrap the schema name in %22 (the percent-encoded form of "):
snowflake://user:pass@account/mydb/%22myschema%22
The dialect decodes %22 back to a literal " before passing the value to the Snowflake connector, which then executes USE SCHEMA "myschema" preserving case.
When building the URL programmatically with SQLAlchemy's URL() helper, you can pass the schema with embedded double-quotes directly — the helper's percent-encoding handles them automatically:
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
engine = create_engine(URL(
account="abc123",
user="testuser1",
password="secret",
database="mydb",
schema='"myschema"', # literal " characters; URL() encodes them as %22 internally
))
This is equivalent to the %22 string form but avoids manual percent-encoding.
Alembic — hand-written migrations
quoted_name works directly in op.create_table and op.add_column, so hand-written migration files need no special handling:
import sqlalchemy as sa
from alembic import op
from sqlalchemy.sql.elements import quoted_name
def upgrade():
op.create_table(
"my_table",
sa.Column(quoted_name("mycol", True), sa.String()), # quoted → "mycol" in Snowflake
sa.Column("name", sa.String()), # unquoted → NAME in Snowflake
)
op.add_column(
"my_table",
sa.Column(quoted_name("extra", True), sa.Integer()),
)
Alembic — case-sensitive schema configuration
When the Alembic version table or the migrations themselves target a case-sensitive schema, use the %22 form in every place Alembic receives a schema string, because these values are passed as plain strings and create_snowflake_engine is not available at that point:
# alembic/env.py
from sqlalchemy import create_engine
url = "snowflake://user:pass@account/mydb/%22myschema%22"
context.configure(
url=url,
target_metadata=target_metadata,
version_table_schema="%22myschema%22", # keeps alembic_version table in the same schema
)
When include_schemas=True is enabled, Alembic calls inspector.get_schema_names() and passes each result to the include_name filter. The dialect returns case-sensitive schema names as quoted_name objects (e.g. quoted_name("myschema", True)). Because quoted_name inherits from str, a plain string comparison is safe:
# alembic/env.py
from sqlalchemy.sql.elements import quoted_name
def include_name(name, type_, parent_names):
if type_ == "schema":
# quoted_name("myschema", True) == "myschema" → True (str subclass equality)
return name in {'"myschema"', "myschema"}
return True
context.configure(
url="snowflake://user:pass@account/mydb/%22myschema%22",
target_metadata=target_metadata,
include_schemas=True,
include_name=include_name,
version_table_schema="%22myschema%22",
)
Note: inspector.get_schema_names() returns the schema as Snowflake stores it — lowercase "myschema" if it was created quoted, or uppercase "MYSCHEMA" if unquoted. After normalize_name this becomes quoted_name("myschema", True) or "myschema" (plain lowercase) respectively. Filter against the lowercase value and both cases are covered by the str equality of quoted_name.
Alembic — autogenerate and case-sensitive columns
Alembic's default renderer serialises quoted_name("mycol", True) as the plain string "mycol", losing the case-sensitivity signal. The generated migration would create a case-insensitive MYCOL column instead of "mycol".
This also affects the comparison phase: when autogenerate detects that a reflected quoted_name("mycol", True) column differs from what it would render, it may emit a spurious alter_column on every run. The fix for both problems is the same — register the render_item hook in env.py:
from snowflake.sqlalchemy.alembic_util import render_item as snowflake_render_item
context.configure(
...,
render_item=snowflake_render_item,
)
Hard limit: Alembic has no dialect-level rendering hook. The render_item callback in env.py is the only injection point and requires a two-line opt-in per project. This cannot be eliminated without upstream Alembic changes.
Patterns that do not work
Plain unquoted string as a case-sensitive schema
# WRONG — "myschema" (no quotes, no quoted_name) is denormalized to MYSCHEMA
Table("my_table", metadata, schema="myschema")
class MyModel(Base):
__tablename__ = "my_table"
__table_args__ = {"schema": "myschema"} # also wrong
A plain lowercase string has no quote attribute, so denormalize_name converts it to MYSCHEMA before it reaches Snowflake. No error is raised — if a schema named MYSCHEMA happens to exist the query succeeds silently against the wrong schema.
Use quoted_name("myschema", True), '"myschema"', or MetaData(schema=quoted_name(..., True)) as shown in the section above.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file snowflake_sqlalchemy-1.10.0.tar.gz.
File metadata
- Download URL: snowflake_sqlalchemy-1.10.0.tar.gz
- Upload date:
- Size: 200.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
70d0caf62ed429e080103211bfa7f221414d7ed7a08caa4c455538159e9b4520
|
|
| MD5 |
2930da9ca7251c4ee053e9672d06a1bc
|
|
| BLAKE2b-256 |
29b87b7f235a05861c8e197b896be39b3f82f93047e46b774379ae17e24ed753
|
File details
Details for the file snowflake_sqlalchemy-1.10.0-py3-none-any.whl.
File metadata
- Download URL: snowflake_sqlalchemy-1.10.0-py3-none-any.whl
- Upload date:
- Size: 102.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e5c1c62d2a203b93beb4a813f6d07f2ea03f2151af90314037b7f4827734cdb0
|
|
| MD5 |
cd9fa9bab54d427ed468aec06c220247
|
|
| BLAKE2b-256 |
68a5885ba93a9c312082a7a5c71037e1497e6a262c753e85db65b9a6909117ea
|