
Update, Upsert, and Merge from Python dataframes to SQL Server and Azure SQL database.


mssql_dataframe


Provides efficient mechanisms for updating and merging data into Transact-SQL tables from Python dataframes. This is accomplished by utilizing the fast_executemany feature of pyodbc to quickly insert into a source SQL temporary table, and then updating/merging into a target SQL table from that temporary table.

In practice this module may be useful for updating models, web scraping, or general data engineering tasks.
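The temp-table mechanism above can be sketched in miniature. The helper below only builds the T-SQL text involved (the function and object names are hypothetical, chosen for illustration; the statements the library actually generates differ in detail):

```python
# A minimal sketch of the temp-table update pattern: stage rows into a
# temporary table, then update the target from it. Hypothetical names.

def build_update_statements(target, columns, match):
    """Build T-SQL for a temp-table based bulk update."""
    temp = '##__update_source'
    col_list = ', '.join(columns + match)
    set_clause = ', '.join(f't.{c} = s.{c}' for c in columns)
    on_clause = ' AND '.join(f't.{c} = s.{c}' for c in match)
    return [
        # 1. create an empty temp table with the needed columns
        f'SELECT TOP 0 {col_list} INTO {temp} FROM {target};',
        # 2. (dataframe rows are then bulk inserted into the temp table
        #    with pyodbc's fast_executemany, not shown here)
        # 3. update the target by joining on the match columns
        f'UPDATE t SET {set_clause} FROM {target} AS t'
        f' INNER JOIN {temp} AS s ON {on_clause};',
    ]

for statement in build_update_statements('SomeTable', ['ColumnA'], ['ColumnB']):
    print(statement)
```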

Quick sample (continue reading for full examples):

# update SQL table using the dataframe's index and the SQL primary key
sql.write.update('SomeTable', dataframe[['ColumnA']])
# update SQL table using other columns
sql.write.update('SomeTable', dataframe[['ColumnA','ColumnB','ColumnC']], match_columns=['ColumnB','ColumnC'])
# merge (insert/update/delete) into SQL table using the dataframe's index and the SQL primary key
sql.write.merge('SomeTable', dataframe[['ColumnA','ColumnB']])
# merge into SQL table using other columns
sql.write.merge('SomeTable', dataframe[['ColumnA','ColumnB','ColumnC']], match_columns=['ColumnC'])
# upsert (update if exists, otherwise insert)
sql.write.merge('SomeTable', dataframe[['ColumnA']], delete_unmatched=False)


Dependencies

pandas: the Python DataFrame library.

pyodbc: the ODBC interface used to execute Transact-SQL statements.

Installation

pip install mssql-dataframe

Core Functionality

Initialization

Connect to an on-premises database using pyodbc. Connecting to an Azure SQL database is also possible by passing a server_name in the format server_name='<server>.database.windows.net' along with a username and password.

If adjust_sql_objects=True (default is False):

  1. columns will be created if they do not exist
  2. column size will increase if needed, for example from TINYINT to INT
  3. an SQL table will be created if it does not exist
import time
import pandas as pd

from mssql_dataframe.connect import connect
from mssql_dataframe.collection import SQLServer

# connect to database using pyodbc
db = connect(database_name='master', server_name='localhost')
# initialize the main package
sql = SQLServer(db, adjust_sql_objects=True)

Creating SQL Tables

SQL tables can be created directly from a dataframe object.

Note:

  1. a "best fit" SQL data type is determined in SQL
  2. primary_key='index' creates an SQL primary key based on the dataframe's index

Here a global temporary table is created, but in practice a user table would be created so that it persists in the database after the connection is closed.

# create a large sample dataframe of 100,000 records
df = pd.DataFrame({
    'ColumnA': list(range(0,100000,1)),
    'ColumnB': [0]*100000 
})
df.index.name = 'PK_Column'
# create the table with the index as the SQL primary key
sql.create.table_from_dataframe(table_name='##sample_update', dataframe=df,
    primary_key='index'
)
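The "best fit" type determination happens in SQL, but its effect can be pictured with a simplified Python sketch. The thresholds and fallback below are assumptions for illustration only, not the library's implementation:

```python
import pandas as pd

def best_fit_sql_type(series: pd.Series) -> str:
    """Simplified sketch of choosing a 'best fit' SQL type for a column."""
    if pd.api.types.is_integer_dtype(series):
        lo, hi = series.min(), series.max()
        if 0 <= lo and hi <= 255:
            return 'TINYINT'
        if -32768 <= lo and hi <= 32767:
            return 'SMALLINT'
        if -2147483648 <= lo and hi <= 2147483647:
            return 'INT'
        return 'BIGINT'
    if pd.api.types.is_float_dtype(series):
        return 'FLOAT'
    if pd.api.types.is_datetime64_any_dtype(series):
        return 'DATETIME2'
    # fall back to VARCHAR sized to the longest string
    return f'VARCHAR({int(series.astype(str).str.len().max())})'

df = pd.DataFrame({'ColumnA': range(100000), 'ColumnB': [0] * 100000})
print(best_fit_sql_type(df['ColumnA']))  # values up to 99999 need INT
print(best_fit_sql_type(df['ColumnB']))  # all zeros fit in TINYINT
```

This mirrors the behavior shown later, where ColumnB is first created as TINYINT and widened to INT once larger values are written.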

Insert from Dataframe

100,000 records are inserted in approximately 2 seconds in a localhost database.

Note:

  1. since include_timestamps=True a new column named _time_insert is created automatically.
time_start = time.time()
sql.write.insert(table_name='##sample_update', dataframe=df, include_timestamps=True)
print('Inserted {} records in {} seconds'.format(len(df), round(time.time()-time_start,2)))

Reading into Dataframe

Reading data from an SQL table into a dataframe is straightforward and allows for limit, order, and where conditions.

Note:

  1. SQL primary key column "PK_Column" has been placed as the dataframe's index.
result = sql.read.select('##sample_update', limit=5)
result
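For intuition, the same limit/order/where semantics can be expressed directly in pandas. This is only a conceptual comparison, not the read.select API:

```python
import pandas as pd

df = pd.DataFrame({'ColumnA': [3, 1, 2], 'ColumnB': [10, 20, 30]})
# roughly: SELECT TOP 2 * FROM t WHERE ColumnB > 10 ORDER BY ColumnA
result = (
    df[df['ColumnB'] > 10]   # where condition
    .sort_values('ColumnA')  # order
    .head(2)                 # limit
)
print(result)
```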

Update from Dataframe

100,000 records are updated in approximately 3 seconds in a localhost database.

Note:

  1. a new _time_update column is created since include_timestamps=True
  2. ColumnC is created with data type VARCHAR(4) since that is ColumnC's max length
  3. ColumnB is changed from data type TINYINT to INT to fit the updated values
  4. since match_columns=None, the SQL primary key / dataframe index is used to perform the update
# update a dataframe column
df['ColumnB'] = list(range(0,100000,1))
# create a new dataframe column
df['ColumnC'] = 'aaaa'
time_start = time.time()
# update records in the target SQL table
sql.write.update('##sample_update', df[['ColumnB','ColumnC']], match_columns=None)
print('Updated {} records in {} seconds'.format(len(df), round(time.time()-time_start,2)))

Any size dataframe can be used to update matching records in SQL. Here match_columns is specified instead of using the dataframe's index/SQL primary key.

# update ColumnA to -1 where ColumnB=0
df_small = pd.DataFrame({'ColumnB': [0], 'ColumnA': [-1]})
time_start = time.time()
# update the target table using columns
sql.write.update('##sample_update', df_small[['ColumnB','ColumnA']],
    match_columns = ['ColumnB']
)
print('Updated ? records in {} seconds'.format(round(time.time()-time_start,2)))

Merge from Dataframe

Merging inserts, updates, and/or deletes records in a target SQL table depending on how records are matched, using the T-SQL MERGE statement. This can also cover an UPSERT-style action (update if exists, otherwise insert).
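The shape of the underlying T-SQL MERGE can be sketched as follows. The helper and the names in it are hypothetical, a simplified illustration rather than the exact statement the library generates:

```python
# Sketch of a T-SQL MERGE covering insert, update, and optional delete.

def build_merge_statement(target, source, match, update_cols, delete_unmatched=True):
    """Build a simplified T-SQL MERGE statement."""
    on = ' AND '.join(f't.{c} = s.{c}' for c in match)
    sets = ', '.join(f't.{c} = s.{c}' for c in update_cols)
    cols = ', '.join(match + update_cols)
    vals = ', '.join(f's.{c}' for c in match + update_cols)
    statement = (
        f'MERGE {target} AS t USING {source} AS s ON {on}\n'
        f' WHEN MATCHED THEN UPDATE SET {sets}\n'
        f' WHEN NOT MATCHED BY TARGET THEN INSERT ({cols}) VALUES ({vals})'
    )
    if delete_unmatched:
        # target records absent from the source are deleted
        statement += '\n WHEN NOT MATCHED BY SOURCE THEN DELETE'
    return statement + ';'

print(build_merge_statement('##sample_merge', '##__merge_source',
                            ['PK_Column'], ['ColumnA', 'ColumnB']))
```

Dropping the WHEN NOT MATCHED BY SOURCE clause (delete_unmatched=False) yields the upsert behaviour described further below.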

First, create a sample table to merge into.

df_merge = pd.DataFrame({
    'State': ['A','B'],
    'ColumnA': [1,2],
    'ColumnB': ['a','b']
}, index=[0,1])
df_merge.index.name = 'PK_Column'
sql.create.table_from_dataframe(table_name='##sample_merge', dataframe=df_merge,
    primary_key='index'
)
sql.write.insert(table_name='##sample_merge', dataframe=df_merge, include_timestamps=True)
result = sql.read.select('##sample_merge', limit=5)
result

Simulate dataframe records that have been deleted, updated, and added before merging them into the target table.

df_source = df_merge.copy()
# a deleted record
df_source = df_source[df_source.index!=0]
# an updated record
df_source.loc[1,'ColumnA'] = 3
# a new record
df_append = pd.DataFrame({'State': ['C'], 'ColumnA': [6], 'ColumnB': ['d']}, index=[2])
df_append.index.name = 'PK_Column'
df_source = pd.concat([df_source, df_append])

Performing the merge, note:

  1. a new _time_update column since include_timestamps=True
  2. the record where State=A has been deleted
  3. the record where State=B has been updated
  4. a new record has been inserted for State=C
sql.write.merge(table_name='##sample_merge', dataframe=df_source, 
    match_columns=['PK_Column','State'], include_timestamps=True
)
result = sql.read.select('##sample_merge', limit=5)
result

It's possible to specify additional criteria for record deletion. This is useful when records are being merged incrementally from a dataframe, for example in long-running web scraping or model training tasks.

Note:

  1. a match on State is also required for a record to be deleted
  2. the record where State=A remains in the table since the delete condition was not met
  3. one of the records for State=B is updated (since _pk=1 is in the dataframe)
  4. one of the records for State=B is deleted (since _pk=2 is not in the dataframe)
  5. a record for State=C has been inserted
# create a sample table and insert sample records
df_condition = pd.DataFrame({
    'State': ['A','B','B'],
    'ColumnA': [3,4,4],
    'ColumnB': ['a','b','b']
}, index=[0,1,2])
df_condition.index.name='_pk'
sql.create.table_from_dataframe("##sample_merge_delete_condition", df_condition, 
    primary_key='index'
)
sql.write.insert("##sample_merge_delete_condition", df_condition, include_timestamps=True)

# simulate deleted records
df_condition = df_condition[df_condition.index==1]
# simulate updated records
df_condition.loc[df_condition.index==1,'ColumnA'] = 5
df_condition.loc[df_condition.index==1,'ColumnB'] = 'c'
# simulate new record
df_condition = pd.concat([
    df_condition,
    pd.DataFrame({'State':['C'],'ColumnA':[6],'ColumnB':['d']}, index=[3])
])
df_condition.index.name = '_pk'

# perform merge
sql.write.merge('##sample_merge_delete_condition', df_condition, match_columns=['_pk'], 
    delete_conditions=['State']
)
result = sql.read.select('##sample_merge_delete_condition', limit=5)
result

Performing an upsert action (if exists update, otherwise insert) is possible by passing in the parameter delete_unmatched=False.

Note:

  1. the record where State=A remains after the merge
# create a sample table and insert sample records
df_upsert = pd.DataFrame({
    'ColumnA': [3,4]
})
sql.create.table_from_dataframe("##sample_upsert", df_upsert, primary_key='index')
sql.write.insert("##sample_upsert", df_upsert, include_timestamps=False)

# simulate a deleted record
df_upsert = df_upsert[df_upsert.index!=0]
# simulate an updated record
df_upsert.loc[df_upsert.index==1,'ColumnA'] = 5
# simulate a new record
df_upsert = pd.concat([df_upsert, pd.DataFrame({'ColumnA': [6]}, index=[2])])

# perform the merge
sql.write.merge('##sample_upsert', df_upsert, delete_unmatched=False)
result = sql.read.select('##sample_upsert', limit=5)
result

Additional Functionality

Altering SQL Objects Manually

If this module creates or alters an SQL object in an undesired manner, the underlying modify methods can be used manually to correct it.

Note:

  1. Column1 was initially inferred to be a BIT/BOOLEAN as it consisted of only 1s and 0s.
# import function to get SQL schema
from mssql_dataframe.core.helpers import get_schema
# create a sample SQL table
df_modify = pd.DataFrame({'Column1': [0,1,0,1]})
sql.create.table_from_dataframe('##sample_modify', df_modify)
sql.write.insert('##sample_modify', df_modify)
# get SQL schema and records
schema = get_schema(sql.connection, '##sample_modify')
result = sql.read.select('##sample_modify')
schema[['data_type','python_type','is_nullable']]
result
# manually change the SQL data type
sql.modify.column('##sample_modify', 'alter', 'Column1', data_type='TINYINT', 
    not_null=False
)
schema = get_schema(sql.connection, '##sample_modify')
result = sql.read.select('##sample_modify')
schema[['data_type','python_type','is_nullable']]
result

Dynamic SQL

Table and column names are passed through the stored procedure sp_executesql and/or escaped with the T-SQL function QUOTENAME to prevent dynamic strings from being executed directly.

For example, a column is added to a table using the pattern:

statement = '''
DECLARE @SQLStatement AS NVARCHAR(MAX);
DECLARE @TableName SYSNAME = ?;
DECLARE @ColumnName SYSNAME = ?;
DECLARE @ColumnType SYSNAME = ?;

SET @SQLStatement = 
    N'ALTER TABLE '+QUOTENAME(@TableName)+
    N' ADD '+QUOTENAME(@ColumnName)+N' '+QUOTENAME(@ColumnType)+N';';

EXEC sp_executesql 
    @SQLStatement,
    N'@TableName SYSNAME, @ColumnName SYSNAME, @ColumnType SYSNAME',
    @TableName=@TableName, @ColumnName=@ColumnName, @ColumnType=@ColumnType;
'''

args = ['DynamicSQLTableName','DynamicSQLColumnName','DynamicSQLDataType']
cursor.execute(statement, *args)
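QUOTENAME's escaping can be mimicked in Python to see why it blocks injection through object names. This is a simplified re-implementation for illustration only, not part of the library:

```python
def quotename(name: str) -> str:
    """Mimic T-SQL QUOTENAME: bracket-delimit and escape closing brackets."""
    return '[' + name.replace(']', ']]') + ']'

print(quotename('SomeTable'))  # [SomeTable]
# an attempted breakout is neutralised by doubling the closing bracket,
# so the whole string stays a single delimited identifier
print(quotename('x]; DROP TABLE t; --'))
```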

Contributing

See CONTRIBUTING.md

See Also

A similar project is pangres, but it doesn't support SQL Server / Transact-SQL. The primary motivation for creating a new project is the difference in Transact-SQL syntax, specifically MERGE in T-SQL versus UPSERT in other SQL flavors.
