Python utilities for connection to the Snowflake data warehouse

# SnowConn

This repository is a wrapper around the snowflake SQLAlchemy library. It manages the creation of connections and provides a few convenience functions that should be good enough to cover most use cases yet be flexible enough to allow additional wrappers to be written around to serve more specific use cases for different teams.

## Installation

pip install snowconn


pip install 'git+ssh://git@github.com/Daltix/SnowConn.git@master#egg=snowconn'


If you want to use pandas functionality (read/write from/to pandas dataframes) you can install as follows:

pip install snowconn[pandas]


If you want to enable SSO authentication you can install as follows:

pip install snowconn[storage]


If you want to install all functionality (AWS secrets manager connection, SSO, pandas) you can install as follows:

pip install snowconn[all]


## Connection

Everything is implemented in a single SnowConn class. To import it is always the same:

from snowconn import SnowConn


### (1) Connection using your own personal creds

Install snowsql and configure ~/.snowsql/config as per the instructions

You can test that it is correctly installed by then executing snowsql from the command line.

WARNING Be sure to configure your account name like the following:

accountname = ACCOUNT_ID.REGION


(example accountname = eq90000.eu-west-1)

If you don't include the region part (eu-west-1 in the example above), it will hang for about a minute and then give you a permission denied.

Now that you are able to execute snowsql to successfully connect, you are ready to use the SnowConn.connect function:

with SnowConn.connect() as conn:


That's it you are connected! You can connect to a specific schema / database with the following:

with SnowConn.connect('daltix_database', 'public') as conn:


** NOTE: Connect using SSO ** If you are using SSO (Okta or others), you need to update your .snowsqlk/config with the following modifications:

• Include an "authenticator" line, see here for possible values and their meaning).

### (2) Connection using AWS Secrets Manager

You need to have boto3 installed which you can do so with the following:

pip install boto3


Now you must satisfy the folloing requirements:

1. Have a secret stored in an accessable aws account
2. The secret must have the following keys:
• USERNAME
• PASSWORD
• ACCOUNT
• ROLE

For this example, we will assume the price_plotter is the secret manager that we will be using.

Now that you know the name of the secret, you MUST be sure that the context in which it is running has access to read that secret. Once this is done, you can now execute the following code:

with SnowConn.connect(methods=['secretsmanager'], credsman_name='price_plotter') as conn:


Alternatively you can use the specific connect_secretsmanager method:

with SnowConn.connect_secretsmanager('price_plotter') as conn:


And you are connected! You can also pass the database and schema along

with SnowConn.connect_secretsmanager('price_plotter', 'daltix', 'public') as conn:


An example of a policy that gives access to the price_plotter looks like this:

{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"secretsmanager:GetResourcePolicy",
"secretsmanager:GetSecretValue",
"secretsmanager:DescribeSecret",
"secretsmanager:ListSecretVersionIds"
],
}
]
}


And an example of this in a serverless.yml looks like this:

iamRoleStatements:
- Effect: Allow
Action:
- secretsmanager:DescribeSecret
- secretsmanager:List*
Resource:
- "*"
- Effect: Allow
Action:
- secretsmanager:*
Resource:
- { Fn::Sub: "arn:aws:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:price_plotter-??????" }


## API

Now that you're connected, there are a few low-level functions that you can use to programatically interact with the snowflake tables that you have access to.

The rest of these examples assume that you have used one of the above methods to connect and have access to the daltix.public.price table.

### Creating a connection

Creating a connection is very easy (see examples above for connection options):

with SnowConn.connect() as conn:


You can also create connections manually without using a context (this is not recommended, see Known Issues section below), make sure to close the connection after you are done:

conn = SnowConn.connect()
conn.close() # close the connection when done


### execute_simple

The exc_simple function is used for when you have a single statement to execute and the result set can fit into memory. It takes a single argument which a string of the SQL statement that you with to execute. Take the following for example:

>>> conn.execute_simple('select * from price limit 1;')
[{'DALTIX_ID': '0d3c30353035a6ab5747237a1f2600bbf5ddd27401372c5effe0f2790a88ad56', 'SHOP': 'ahed', 'COUNTRY': 'de', 'PRODUCT_ID': '616846.0', 'LOCATION': 'base', 'PRICE': 37.99, 'PROMO_PRICE': None, 'PRICE_STD': None, 'PROMO_PRICE_STD': None, 'UNIT': None, 'UNIT_STD': None, 'IS_MAIN': True, 'VENDOR': None, 'VENDOR_STD': None, 'DOWNLOADED_ON': datetime.datetime(2018, 11, 18, 0, 0, 1), 'DOWNLOADED_ON_LOCAL': datetime.datetime(2018, 11, 18, 1, 0, 1), 'DOWNLOADED_ON_DATE': datetime.date(2018, 11, 18), 'IS_LATEST_PRICE': False}]


### execute_string

If you have multiple sql statements in a single string that you want to execute or the resultset is larger than will fit into memory, this is the function that you want to use. It returns a list of cursors that are a result of each of the statements that are contained in the string. See here for the full documentation.

>>> conn.execute_string('create temporary table price_small as (select * from price limit 1); select * from price_small;')
[<snowflake.connector.cursor.SnowflakeCursor object at 0x10f537898>, <snowflake.connector.cursor.SnowflakeCursor object at 0x10f52c588>]


### execute_file

If you have the contents of an sql file that you want to execute, you can use this function. For example:

echo "select * from price limit 1;" > query.sql

>>> conn.execute_file('query.sql')
>>> [<snowflake.connector.cursor.SnowflakeCursor object at 0x1188d6390>]


This also returns a list of cursors the same as execute_string does. In fact, this function is nothing more than a very simple wrapper around execute_string.

Use this function to read the results of a query into a dataframe. Note that pandas is NOT a dependency of this repo so if you want to use it you must satisfy this dependency yourself.

It takes one sql string as an argument and returns a dataframe.

>>> conn.read_df('select daltix_id, downloaded_on, price from price limit 5;')
0  0d3c30353035a6ab5747237a1f2600bbf5ddd27401372c 2018-11-18 00:00:01  37.99
3  807e2a7706b8c515264fa55bed3891d5685ac5ee0148f0 2018-11-18 00:00:04   3.70
>>>


### write_df

Use this to write a dataframe to Snowflake. This is a very thin wrapper around the pandas DataFrame.to_sql() function.

Unfortunately, it doesn't play nice with dictionaries and arrays so the use cases are quite limited. Hopefully we will improve upon this in the future.

### get_current_role

Returns the current role.

### close

Use this to cleanly close all connections that have ever been associated with this instance of SnowConn. If you don't use this your process will hang for a while without saying anything before it actually exits.

## Accessing the connection objects directly

These functions are mostly wrappers around 2 connection libraries:

Should you need to use either of these yourself, you can ask for the connections yourself with the following functions:

### get_raw_connection

This will return the instance of a snowflake connector which is documented here. It is a good choice if you have very simple needs and for some reason none of the functions in the rest of this repo are serving your needs.

### get_alchemy_engine

This is the result of create_engine() which was called during connect() . It does not represent an active connection to the database but rather acts as a factory for connections.

This is useful for using the most commonly abstracted things in other libraries such as dashboards, pandas, etc. However, like SQLAlchemy in general, despite being very widely supported and feature-complete, it is not the simplest API so it should probably not be your first choice unless you know exactly that you need it.

### get_connection

This returns the result of the creating the sqlalchemy engine and then calling connect() on it. Unlike the result of get_alchemy_engine this represents an active connection to Snowflake and this has a session associated with it.

You can see the object documentation here

## Known issues

There is a bug with snowflake-connector which causes some connections to Snowflake to not close properly in certain circumstances. This can cause timeout errors.

You can handle this in two ways: the first is to wrap usage of the connection in a try/finally block to ensure the connection is explicitly closed, like this:

from snowconn import SnowConn
conn = SnowConn.connect(...)
try:
result = execute_string(query) # or result = read_df(query), etc
finally:
conn.close()


The second way is to use SnowConn with the with syntax, as follows:

with SnowConn.connect() as conn:


## Project details

Uploaded source