A package for database session management using sqlalchemy and snowpark libraries

SQLAlchemy-Snowpark Session Management Module

This package simplifies managing database sessions with SQLAlchemy and Snowpark. It lets you create a session with Snowflake, Redshift, PostgreSQL, and other supported databases from a connection string or from environment variables.

Overview

This README provides instructions for installing, setting up, and using the SQLAlchemy-Snowpark module, so that users can connect to their data warehouses and use the built-in methods to query them and manage results.


Installation

You can install the module using pip:

pip install sqlalchemy-snowpark

Sqlalchemy - Snowpark

Snowflake

This Python module helps establish a database session to Snowflake using either SQLAlchemy or the Snowpark Python connector. It supports creating connections from a provided connection string or from credentials stored in environment variables.

Requires DB_SOURCE, USERNAME, HOST, PASSWORD, ROLE, WAREHOUSE, and DATABASE.

To create a session using SQLAlchemy, set the following environment variable:

export DB_ENGINE=sqlalchemy

To create a Snowpark session instead, set:

export DB_ENGINE=snowpark

1. Create DB Session Using a Connection String

If you have a connection string, you can create a session like this:

from sqlalchemy_snowpark.connection import get_db_session

connection_string = "snowflake://user:password@account/database/schema?warehouse=warehouse&role=role"
session = get_db_session(connection_string)
session.close()
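
To make sure the session is closed even when a query raises, the call can be wrapped in `contextlib.closing`. The sketch below uses a stand-in `FakeSession` class and a `run_with_session` helper, both hypothetical and present only so the example runs without a database. In real use the factory would call `get_db_session`, assuming the object it returns exposes `close()` as shown above.

```python
from contextlib import closing


class FakeSession:
    """Hypothetical stand-in for the object returned by get_db_session."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def run_with_session(session_factory, work):
    """Open a session, run work(session), and always close the session."""
    with closing(session_factory()) as session:
        return work(session)


# Keep track of the sessions created so we can inspect them afterwards.
created = []


def factory():
    # In real code this would be: return get_db_session(connection_string)
    session = FakeSession()
    created.append(session)
    return session


result = run_with_session(factory, lambda session: "done")
```

The session is closed whether `work` returns normally or raises, so callers do not need a separate `try`/`finally` around every query.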

2. Create DB Session Using environment variables

Environment Variables

The following environment variables are required if no connection string is provided:

export user={snowflake_username}
export password={snowflake_password}
export account={snowflake_account}
export database={snowflake_database}
export schema={snowflake_schema}
export warehouse={snowflake_warehouse}
export role={snowflake_role}

from sqlalchemy_snowpark.connection import get_db_session

session = get_db_session()
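
Because a missing variable only shows up when the connection fails, it can help to check the environment first. The following is a minimal sketch, not part of the package: `missing_env_vars` is a hypothetical helper, and the variable names are copied from the export lines above.

```python
import os

# Variable names taken from the export lines above.
SNOWFLAKE_ENV_VARS = (
    "user", "password", "account", "database", "schema", "warehouse", "role",
)


def missing_env_vars(names, environ=None):
    """Return the names that are unset or empty in the given environment."""
    env = os.environ if environ is None else environ
    return [name for name in names if not env.get(name)]


# Example with an explicit mapping instead of the real process environment.
example_env = {"user": "alice", "password": "secret", "account": "acme"}
missing = missing_env_vars(SNOWFLAKE_ENV_VARS, example_env)
```

Calling `missing_env_vars(SNOWFLAKE_ENV_VARS)` with no second argument checks the real process environment before `get_db_session()` is called.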

Whitelisting

If a network policy is active on the Snowflake account and incoming IPs are blocked or restricted, you need to whitelist the StepFunction IP.

Follow these steps:

  1. Navigate to the Admin -> Security section by clicking "Admin" in the left navigation panel.

  2. Switch to Network Rules. Create a new rule by clicking the + Network Rule button.
     a. Name: SFN_RULE
     b. Choose Type: IPv4 and Mode: Ingress
     c. Under Identifiers, add the IP 18.210.244.167

  3. Switch to Network Policy. Create a new policy by clicking the + Network Policy button.
     a. Name: SFN_POLICY
     b. In the Allowed section, select SFN_RULE from the Select Rule dropdown, then click Create to create the policy.
     c. Click the dotted icon (...) at the end of the policy name and click Activate to start the policy.

  4. Navigate back to the worksheet and run the following statement, using the StepFunctions public IP address:

    ALTER NETWORK POLICY SFN_POLICY SET ALLOWED_IP_LIST=('18.210.244.167')
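
Before running the statement, the allowed list can be sanity-checked locally. The sketch below uses Python's standard `ipaddress` module and assumes each entry is either a plain IPv4 address or a CIDR range. `is_ip_allowed` is a hypothetical helper; it does not contact Snowflake or read the actual policy.

```python
import ipaddress


def is_ip_allowed(ip, allowed_ip_list):
    """Return True if ip matches any entry (single address or CIDR range)."""
    address = ipaddress.ip_address(ip)
    return any(
        # strict=False accepts entries whose host bits are set, e.g. 10.0.0.5/24
        address in ipaddress.ip_network(entry, strict=False)
        for entry in allowed_ip_list
    )


allowed = ["18.210.244.167"]
stepfunction_ok = is_ip_allowed("18.210.244.167", allowed)
other_ok = is_ip_allowed("10.0.0.1", allowed)
```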

Redshift

Requires USERNAME, HOST, PASSWORD, and DATABASE.

1. Create DB Session Using a Connection String

Direct Connection (Redshift in Public Subnet)

from sqlalchemy_snowpark.connection import get_db_session
from sqlalchemy.engine.url import URL

# Define the connection parameters
redshift_connection_string = URL.create(
    drivername="redshift+redshift_connector",  # The driver to use
    username="your_username",  # Your Redshift username
    password="your_password",  # Your Redshift password
    host="your_redshift_cluster_host",  # Redshift cluster endpoint
    port=5439,  # Default port for Redshift
    database="your_database_name",  # The name of your Redshift database
    query={"sslmode": "verify-ca"}  # Optional: to ensure the connection is encrypted
)

session = get_db_session(redshift_connection_string)
session.close()

2. Create DB Session Using Environment Variables

Environment Variables

The following environment variables are required if no connection string is provided:

export user={redshift_username}
export password={redshift_password}
export host={redshift_host}
export database={redshift_database}

from sqlalchemy_snowpark.connection import get_db_session

session = get_db_session()

PostgreSQL

Requires USERNAME, HOST, PASSWORD, and DATABASE.

1. Create DB Session Using a Connection String

from sqlalchemy_snowpark.connection import get_db_session

# username, password, host, and database are placeholders for your own values
postgresql_connection_string = f"postgresql+psycopg2://{username}:{password}@{host}:5432/{database}"
session = get_db_session(postgresql_connection_string)
session.close()
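
Credentials that contain characters such as `@` or `/` break a URL built by plain string formatting, so they need to be percent-encoded first. A minimal sketch using the standard library follows; `build_postgresql_url` and the example values are hypothetical and not part of this package.

```python
from urllib.parse import quote_plus


def build_postgresql_url(username, password, host, database, port=5432):
    """Build a postgresql+psycopg2 URL with percent-encoded credentials."""
    return (
        f"postgresql+psycopg2://{quote_plus(username)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


# The "@" and "/" in the password are encoded as %40 and %2F.
url = build_postgresql_url("app_user", "p@ss/word", "db.example.com", "analytics")
```

The resulting string can then be passed to `get_db_session` in place of the f-string shown above.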

2. Create DB Session Using Environment Variables

Environment Variables

The following environment variables are required if no connection string is provided:

export user={postgresql_username}
export password={postgresql_password}
export host={postgresql_host}
export database={postgresql_database}

from sqlalchemy_snowpark.connection import get_db_session

session = get_db_session()

Handling Connection

Once the session is established, you can interact with your data warehouse using most of SQLAlchemy's ORM capabilities.


ORM Capabilities

Methods Overview

  1. fetchone(get_obj=False) Fetches one record. If get_obj=True, the result is returned as a dictionary; otherwise the plain result is returned.

Usage

result = db_session.execute(query).fetchone()
# OR
result = db_session.execute(query).fetchone(get_obj=True)

  2. fetchmany(count, get_obj=False) Fetches multiple records at a time. If get_obj=True, the result is returned as a list of dictionaries; otherwise the plain result is returned.

Usage

result = db_session.execute(query).fetchmany(count=10, get_obj=False)
# OR
result = db_session.execute(query).fetchmany(count=10, get_obj=True)

  3. fetchall(get_obj=False) Fetches all records at once. If get_obj=True, the result is returned as a list of dictionaries; otherwise the plain result is returned.

Usage

result = db_session.execute(query).fetchall()
# OR
result = db_session.execute(query).fetchall(get_obj=True)

  4. mappings_one() Fetches one record in dictionary format.

Usage

result = db_session.execute(query).mappings_one()

  5. mappings_all() Fetches all records in dictionary format.

Usage

result = db_session.execute(query).mappings_all()

  6. close() Closes the database session.

Usage

db_session.close()

  7. execute(query) Executes a database query.

Usage

db_session.execute(query).fetchone()

  8. commit() Commits the transaction.

Usage

db_session.commit()

  9. rollback() Rolls back the transaction.

Usage

db_session.rollback()

  10. add(model_class, data) Adds a new record to the database.

Note: model_class is the SQLAlchemy model class.

Usage

db_session.add(model_class=ModelClassName, data=data_in_dict_format)

  11. create_table(cls, checkfirst=True) Creates a table based on the model class.

Usage

db_session.create_table(cls=model_class, checkfirst=True)

  12. query(model_class, filter: dict = dict(), filter_by: dict = dict(), order_by: list = [], fields: list = [], limit: int = None) Runs a query using a model class, as in SQLAlchemy. Executes a SQL query on the Snowflake database using the provided parameters.

Parameters:

  • schema (str): The name of the database schema.
  • table (str): The name of the table to query.
  • filter (dict): A dictionary containing a single filter condition.
    • Ex: {"column_1": "='value1'"}
    • Note: filter and filter_by cannot be used together.
  • filter_by (dict): A dictionary containing the filter conditions. The dictionary should have only one key-value pair. The key should be the logical operator and the value should be one of the following:
    • i) When only one logical operator is defined, the value should be a dictionary.

      • Ex:
        {
            "and": {
                "column_1": "='value1'",
                "column_2": "!='value1'",
                "column_3": " in ('value', 'value')",
            }
        }
    • ii) When multiple operators are defined, the value should be a list containing a single dictionary, where each key denotes another logical operator.

      • Ex:
        {
            "and": [
                {
                    "and": {
                        "column_1": "='value1'",
                        "column_2": "!='value1'",
                        "column_3": " in ('value', 'value')",
                    },
                    "or": {
                        "column_1": "='value1'",
                        "column_2": "!='value1'",
                        "column_3": " in ('value', 'value')",
                    },
                }
            ]
        }
  • fields (list, optional): A list of column names to select. If not provided, all columns will be selected.
  • limit (int, optional): The maximum number of rows to return. If not provided, all rows will be returned.

Usage

filter = {
    "filter_1": f"='{filter_1_value}'",
    "filter_2": f"='{filter_2_value}'",
    "filter_3": f"!='{filter_3_value}'",
    "filter_4": f"in ({filter_4_value}, {filter_5_value})",
}
result = db_session.query(
    model_class=ModelClassName,
    fields=["field_name"],
    filter=filter,
    limit=10,
    offset=10,
).fetchone()

  13. update(model_class, data: dict, filter=None, filter_by=None) Updates records in the database table, using the model class, based on the provided filter conditions.

Usage

update_filter = {
    "filter_1": f"='{filter_1_value}'",
    "filter_2": f"='{filter_2_value}'",
}
update_record = {"column_1": "value_1"}
db_session.update(
    model_class=ModelClassName,
    data=update_record,
    filter=update_filter,
)

  14. delete(model_class, filter=None, filter_by=None) Deletes records from the database table based on the provided filter conditions.

Usage

db_session.delete(
    model_class=ModelClassName,
    filter={"column_1": f"='{value_1}'"},
)
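
The filter values in the examples above are SQL fragments written by hand, so a value containing a single quote would produce invalid SQL. The sketch below shows one way to build those fragments with basic quoting. The helper names (`sql_literal`, `equals`, `not_equals`, `in_list`) are hypothetical and not part of this package, and the sketch assumes the fragments are inserted into the SQL text as written, as the examples suggest. Doubling single quotes is standard SQL escaping, but it is not a substitute for bound parameters when handling untrusted input.

```python
def sql_literal(value):
    """Render a Python value as a SQL literal, doubling single quotes in strings."""
    if isinstance(value, bool):
        return "TRUE" if value else "FALSE"
    if isinstance(value, (int, float)):
        return str(value)
    return "'" + str(value).replace("'", "''") + "'"


def equals(value):
    """Build an equality fragment such as ='value1'."""
    return f"={sql_literal(value)}"


def not_equals(value):
    """Build an inequality fragment such as !='value1'."""
    return f"!={sql_literal(value)}"


def in_list(values):
    """Build a membership fragment such as  in ('a', 'b')."""
    return " in (" + ", ".join(sql_literal(v) for v in values) + ")"


# Column names here are illustrative only.
query_filter = {
    "last_name": equals("O'Brien"),
    "status": not_equals("inactive"),
    "region": in_list(["EU", "US"]),
}
```

The resulting dictionary has the same shape as the `filter` argument shown in the usage examples for `query`, `update`, and `delete`.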

Troubleshooting

Common Issues

  • Invalid Credentials: Ensure that the USERNAME and PASSWORD are correct.
  • Host Unreachable: Verify the HOST address and network connectivity.
  • Unsupported Data Source: Check if the DB_SOURCE is among the supported ones (snowflake, redshift, postgresql).
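
The data source check can also be done up front, before a connection is attempted. This is a minimal sketch: `normalize_db_source` is a hypothetical helper, the supported names are the three listed above, and trimming and lower-casing the value is an assumption made here, since whether the package itself is case-sensitive is not documented.

```python
SUPPORTED_SOURCES = ("snowflake", "redshift", "postgresql")


def normalize_db_source(value):
    """Return the lower-cased source name, or raise ValueError if unsupported."""
    source = (value or "").strip().lower()
    if source not in SUPPORTED_SOURCES:
        raise ValueError(
            f"Unsupported DB_SOURCE {value!r}; "
            f"expected one of {', '.join(SUPPORTED_SOURCES)}"
        )
    return source


source = normalize_db_source(" Snowflake ")
```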

Error Handling

The get_db_session method prints exceptions to help identify issues during the connection process. Ensure that the provided connection details are accurate and the data warehouse is accessible.


Conclusion

This module simplifies the process of connecting to various data warehouses. Follow the setup instructions carefully, and refer to the examples for guidance on using the get_db_session function. For further assistance, check the documentation or raise an issue on the project's GitHub repository.

