
A package for database session management using the SQLAlchemy and Snowpark libraries


SQLAlchemy-Snowpark Session Management Module

This module simplifies the management of database sessions using SQLAlchemy and Snowpark. It lets users create a session with Snowflake, PostgreSQL, and other supported databases using a connection string or environment variables.

Overview

This README provides instructions for installing, setting up, and using the sqlalchemy-snowpark module, so users can easily establish connections to their data warehouses and use the built-in methods to query them and manage results.


Installation

You can install the module using pip:

pip install sqlalchemy-snowpark

SQLAlchemy - Snowpark

Snowflake

This Python module helps establish a database session to Snowflake using SQLAlchemy or the Snowpark Python connector. It supports creating connections via a provided connection string or via environment variables for credentials.

Requires DB_SOURCE, USERNAME, HOST, PASSWORD, ROLE, WAREHOUSE, and DATABASE.

If you want to create a session using SQLAlchemy, set the following environment variable:

export DB_ENGINE=sqlalchemy

and if you want to create a Snowpark session, set the following environment variable:

export DB_ENGINE=snowpark

1. Create DB Session Using a Connection String

If you have a connection string, you can create a session like this:

from sqlalchemy_snowpark.connection import get_db_session

connection_string = "snowflake://user:password@account/database/schema?warehouse=warehouse&role=role"
session = get_db_session(connection_string)
session.close()
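
To verify the session works, you can run a quick query using the execute and fetchone helpers described under ORM Capabilities below (the query text here is just an illustrative example):

session = get_db_session(connection_string)
# sanity check: fetch a single value to confirm connectivity
result = session.execute("select current_version()").fetchone()
print(result)
session.close()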

2. Create DB Session Using Environment Variables

The following environment variables are required if no connection string is provided:

Note: for a Snowpark session (DB_ENGINE=snowpark), this is the only supported option.

export SNOWFLAKE_USER={snowflake_username}
export SNOWFLAKE_PASSWORD={snowflake_password}
export SNOWFLAKE_ACCOUNT={snowflake_account}
export SNOWFLAKE_DATABASE={snowflake_database}
export SNOWFLAKE_SCHEMA={snowflake_schema}
export SNOWFLAKE_WAREHOUSE={snowflake_warehouse}
export SNOWFLAKE_ROLE={snowflake_role}
export SNOWFLAKE_HOST={snowflake_host}

from sqlalchemy_snowpark.connection import get_db_session

session = get_db_session()
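
The same variables can also be set from Python before the call, which is convenient in scripted environments. A minimal sketch (the credential values are placeholders to substitute or inject from a secrets manager):

import os

from sqlalchemy_snowpark.connection import get_db_session

# placeholders: replace with real credentials
os.environ["SNOWFLAKE_USER"] = "my_user"
os.environ["SNOWFLAKE_PASSWORD"] = "my_password"
os.environ["SNOWFLAKE_ACCOUNT"] = "my_account"
os.environ["SNOWFLAKE_DATABASE"] = "my_database"
os.environ["SNOWFLAKE_SCHEMA"] = "my_schema"
os.environ["SNOWFLAKE_WAREHOUSE"] = "my_warehouse"
os.environ["SNOWFLAKE_ROLE"] = "my_role"

session = get_db_session()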

Whitelisting

If a network policy is activated in the Snowflake account and incoming IPs are restricted, the Step Function IP needs to be whitelisted.

Follow the steps below:

  1. Navigate to the Admin -> Security section by clicking on "Admin" in the left navigation panel.

  2. Switch to Network Rules and create a new rule by clicking the + Network Rule button:
     a. Name: SFN_RULE
     b. Choose Type: IPv4 and Mode: Ingress
     c. Under Identifiers, add the IP 18.210.244.167

  3. Switch to Network Policy and create a new policy by clicking the + Network Policy button:
     a. Name: SFN_POLICY
     b. Under the Allowed section, select SFN_RULE in the Select Rule dropdown, then click the Create button to create the policy.
     c. Click the dotted icon (...) at the end of the policy name and click Activate to start the policy.

  4. Navigate back to the worksheet and replace the placeholder with the Step Functions public IP address.

    ALTER NETWORK POLICY SFN_POLICY SET ALLOWED_IP_LIST=('18.210.244.167')

Redshift

Requires USERNAME, HOST, PASSWORD, and DATABASE.

1. Create DB Session Using a Connection String

# Direct connection (Redshift in a public subnet)
from sqlalchemy_snowpark.connection import get_db_session
from sqlalchemy.engine.url import URL

# Define the connection parameters
redshift_connection_string = URL.create(
    drivername="redshift+redshift_connector",  # The driver to use
    username="your_username",  # Your Redshift username
    password="your_password",  # Your Redshift password
    host="your_redshift_cluster_host",  # Redshift cluster endpoint
    port=5439,  # Default port for Redshift
    database="your_database_name",  # The name of your Redshift database
    query={"sslmode": "verify-ca"}  # Optional: to ensure the connection is encrypted
)

session = get_db_session(redshift_connection_string)
session.close()
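
If you prefer a plain string over URL.create, the equivalent standard SQLAlchemy URL looks like this (username, password, host, and database are placeholders, as above):

redshift_connection_string = (
    "redshift+redshift_connector://your_username:your_password"
    "@your_redshift_cluster_host:5439/your_database_name?sslmode=verify-ca"
)
session = get_db_session(redshift_connection_string)
session.close()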

2. Create DB Session Using Environment Variables

The following environment variables are required if no connection string is provided:

export REDSHIFT_USERNAME={redshift_username}
export REDSHIFT_PASSWORD={redshift_password}
export REDSHIFT_HOST={redshift_host}
export REDSHIFT_DATABASE={redshift_database}

from sqlalchemy_snowpark.connection import get_db_session

session = get_db_session()

PostgreSQL

Requires USERNAME, HOST, PASSWORD, and DATABASE.

1. Create DB Session Using a Connection String

from sqlalchemy_snowpark.connection import get_db_session

# username, password, host, and database are placeholders for your connection details
postgresql_connection_string = f"postgresql+psycopg2://{username}:{password}@{host}:5432/{database}"
session = get_db_session(postgresql_connection_string)
session.close()

2. Create DB Session Using Environment Variables

The following environment variables are required if no connection string is provided:

export POSTGRESQL_USERNAME={postgresql_username}
export POSTGRESQL_PASSWORD={postgresql_password}
export POSTGRESQL_HOST={postgresql_host}
export POSTGRESQL_DATABASE={postgresql_database}

from sqlalchemy_snowpark.connection import get_db_session

session = get_db_session()

Handling Connection

Once the session is established, you can interact with your data warehouse using most of SQLAlchemy's ORM capabilities.
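
For example, a typical round trip using the helpers documented below (the schema and table names are illustrative):

session = get_db_session()
# fetch all rows as dictionaries and iterate over them
rows = session.execute("select id, name from my_schema.my_table").mappings_all()
for row in rows:
    print(row)
session.close()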


ORM Capabilities

Methods Overview

  1. fetchone(get_obj=False) This method fetches a single record. If get_obj=True, the result is returned in dictionary format; otherwise the plain result is returned.

Usage

result = db_session.execute(query).fetchone() 
# OR
result = db_session.execute(query).fetchone(get_obj=True)
  2. fetchmany(count, get_obj=False) This method fetches multiple records at a time. If get_obj=True, the results are returned in dictionary format; otherwise the plain results are returned.

Usage

result = db_session.execute(query).fetchmany(count=10, get_obj=False) 
# OR
result = db_session.execute(query).fetchmany(count=10, get_obj=True)
  3. fetchall(get_obj=False) This method fetches all records at once. If get_obj=True, the results are returned in dictionary format; otherwise the plain results are returned.

Usage

result = db_session.execute(query).fetchall() 
# OR
result = db_session.execute(query).fetchall(get_obj=True)
  4. mappings_one() This method fetches one record in dictionary format.

Usage

result = db_session.execute(query).mappings_one() 
  5. mappings_all() This method fetches all records in dictionary format.

Usage

result = db_session.execute(query).mappings_all() 
  6. close() This method closes the database session.

Usage

db_session.close()
  7. execute(query) This method executes a database query.

Usage

db_session.execute({query}).fetchone()

  8. commit() This method commits the current transaction.

Usage

db_session.commit()
  9. rollback() This method rolls back the current transaction.

Usage

db_session.rollback()
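
A common pattern combining execute, commit, and rollback (the UPDATE statement and table name are illustrative):

try:
    # attempt the write, then commit on success
    db_session.execute("update my_table set column_1 = 'value_1'")
    db_session.commit()
except Exception:
    # undo the partial transaction before propagating the error
    db_session.rollback()
    raise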
  10. add(model_class, data) This method adds a new record to the database.

Note: model_class is the SQLAlchemy model class.

Usage

db_session.add(model_class={model_class}, data={data_in_dict_format})
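The add, create_table, and query helpers all expect a SQLAlchemy model class. A minimal sketch of such a class, assuming standard SQLAlchemy declarative models (User and users are hypothetical names):

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

# hypothetical model used for illustration only
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)

db_session.add(model_class=User, data={"id": 1, "name": "Alice"})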
  11. create_table(cls, checkfirst=True) This method creates a table based on the model class.

Usage

db_session.create_table(cls=model_class, checkfirst=True)
  12. query(model_class, filter: dict = dict(), filter_by: dict = dict(), order_by: list = [], fields: list = [], limit: int = None) This method runs a query using a model class, just as in standard SQLAlchemy, executing a SQL query on the database with the provided parameters.

Parameters:

  • model_class: The SQLAlchemy model class to query.
  • filter (dict): A dictionary containing a single filter condition.
    • Ex: {"column_1": "='value1'"}
    • Note: filter and filter_by cannot both be defined.
  • filter_by (dict): A dictionary containing the filter conditions. The dictionary should have only one key-value pair; the key is a logical operator and the value is one of the following:
    • i) When only one logical operator is defined, the value should be a dictionary:

      {"and": {"column_1": "='value1'", "column_2": "!='value2'", "column_3": " in ('value', 'value')"}}
    • ii) When multiple operators are defined, the value should be a list of a single dictionary, where each key denotes another logical operator:

      {"and": [{"and": {"column_1": "='value1'", "column_2": "!='value2'"}, "or": {"column_3": "='value3'", "column_4": " in ('value', 'value')"}}]}
  • order_by (list, optional): A list of columns to order the results by.
  • fields (list, optional): A list of column names to select. If not provided, all columns will be selected.
  • limit (int, optional): The maximum number of rows to return. If not provided, all rows will be returned.

Usage

filter = {
    "filter_1": f"='{filter_1_value}'",
    "filter_2": f"='{filter_2_value}'",
    "filter_3": f"!='{filter_3_value}'",
    "filter_4": f"in ({filter_4_value}, {filter_5_value})",
}
result = db_session.query(
    model_class=ModelClassName,
    fields=["field_name"],
    filter=filter,
    limit=10,
).fetchone()
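
A sketch using filter_by instead of filter, following the single-operator format described above (the column names and values are placeholders):

filter_by = {
    "and": {
        "column_1": f"='{value_1}'",
        "column_2": f"!='{value_2}'",
    }
}
result = db_session.query(
    model_class=ModelClassName,
    filter_by=filter_by,
).fetchall()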
  13. update(model_class, data: dict, filter=None, filter_by=None) This method updates records in the table, via a model class, based on the provided filter conditions.

Usage

update_filter = {
    "filter_1": f"='{filter_1_value}'",
    "filter_2": f"='{filter_2_value}'",
}
update_record = dict()
update_record["column_1"] = "value_1"
db_session.update(
    model_class=ModelClassName,
    data=update_record,
    filter=update_filter,
)
  14. delete(model_class, filter=None, filter_by=None) This method deletes records from the table based on the provided filter conditions.

Usage

db_session.delete(
    model_class=ModelClassName,
    filter={"column_1": f"='{value_1}'"},
)

Troubleshooting

Common Issues

  • Invalid Credentials: Ensure that the USERNAME and PASSWORD are correct.
  • Host Unreachable: Verify the HOST address and network connectivity.
  • Unsupported Data Source: Check if the DB_SOURCE is among the supported ones (snowflake, redshift, postgresql).

Error Handling

The get_db_session method prints exceptions to help identify issues during the connection process. Ensure that the provided connection details are accurate and the data warehouse is accessible.
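Since connection problems surface when the session is created, a minimal defensive sketch, assuming a failed connection ultimately raises, is:

from sqlalchemy_snowpark.connection import get_db_session

try:
    session = get_db_session()
except Exception as exc:
    # assumption: a failed connection raises; the library also prints the underlying error
    raise RuntimeError(f"Could not establish a database session: {exc}") from exc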


Conclusion

This module simplifies the process of connecting to various data warehouses. Follow the setup instructions carefully, and refer to the examples for guidance on using the get_db_session function. For further assistance, check the documentation or raise an issue on the project's GitHub repository.

