
Update, Upsert, and Merge from Python dataframes to SQL Server and Azure SQL database.


mssql_dataframe


A data engineering package for Python pandas dataframes and Microsoft SQL Server / Azure SQL Database.

Provides efficient mechanisms for updating and merging from Python dataframes into SQL tables. This is accomplished by first bulk inserting into a temporary SQL table, and then updating/merging from that temporary table into the target SQL table.
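
The temporary-table pattern can be sketched in raw T-SQL, shown here as Python strings. The table and column names are hypothetical, and the exact statements mssql_dataframe generates internally may differ:

```python
# A minimal sketch of the temp-table update pattern, using hypothetical
# table/column names; not the SQL the library actually emits.
target = "dbo.Sales"
temp = "#Sales_source"
key = "SaleID"
columns = ["Amount", "Region"]

# 1. Stage the dataframe rows in a temporary table (fast bulk insert).
create_temp = f"SELECT TOP 0 {key}, {', '.join(columns)} INTO {temp} FROM {target};"

# 2. Update the target from the staged rows in a single set-based statement,
#    which is far faster than one round-trip per row.
assignments = ", ".join(f"t.{c} = s.{c}" for c in columns)
update = (
    f"UPDATE t SET {assignments} "
    f"FROM {target} AS t "
    f"JOIN {temp} AS s ON t.{key} = s.{key};"
)
print(update)
```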

Dependencies

pandas: Python DataFrames.

pyodbc: ODBC driver used for executing Transact-SQL statements.

Installation

pip install mssql-dataframe

Quick Start

Initialization

import pandas as pd
from mssql_dataframe import SQLServer

# connect to database using pyodbc
sql = SQLServer(database='master', server='localhost')
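
Since the connection is made through pyodbc, a roughly equivalent raw connection string looks like the following. The driver name is an assumption and depends on what is installed on your machine:

```python
# A sketch of the kind of pyodbc connection string the SQLServer class
# presumably builds; adjust the driver name to one installed locally.
driver = "ODBC Driver 17 for SQL Server"  # assumption: may differ per installation
connection_string = (
    f"Driver={{{driver}}};"
    "Server=localhost;"
    "Database=master;"
    "Trusted_Connection=yes;"
)
# import pyodbc; connection = pyodbc.connect(connection_string)
print(connection_string)
```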

Create Sample Table

Create a global temporary table for demonstration purposes. Notice that a dataframe is returned with improved data types assigned, and that its index corresponds to the SQL primary key.

# create a demonstration dataframe
df = pd.DataFrame({
    'ColumnA': ['1','2','3','4','5'],
    'ColumnB': ['a  .','b!','  c','d','e'],
    'ColumnC': [False, True, True, False, False]
}, index=pd.Index([0, 1, 2, 3, 4], name='PrimaryKey'))

# create the table using a dataframe
df = sql.create.table_from_dataframe(
    table_name='##mssql_dataframe',
    dataframe=df,
    primary_key='index'
)

Updating SQL Table

Update an SQL table using the primary key. If no match columns are provided, the primary key (the dataframe's index) is used automatically.

# perform a basic text cleaning task
df['ColumnB'] = df['ColumnB'].str.replace(r'[^\w\s]', '', regex=True)
df['ColumnB'] = df['ColumnB'].str.strip()

# perform the update in SQL
updated = sql.write.update('##mssql_dataframe', df[['ColumnB']])

# read the result from SQL after the update
result = sql.read.table('##mssql_dataframe')

Update an SQL table using other columns that are not the primary key by specifying match columns.

# update ColumnA to 0 where ColumnC is False
sample = pd.DataFrame({
    'ColumnA': [0],
    'ColumnC': [False]
})

# perform the update in SQL
updated = sql.write.update('##mssql_dataframe', sample, match_columns='ColumnC')

# read the result from SQL after the update
result = sql.read.table('##mssql_dataframe')
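
In pandas terms, a match-column update behaves roughly like the loop below. This is an illustration only, not the library's implementation; the real work happens in a single set-based SQL statement:

```python
import pandas as pd

# Simplified current state of the table.
table = pd.DataFrame({
    'ColumnA': [1, 2, 3, 4, 5],
    'ColumnC': [False, True, True, False, False],
}, index=pd.Index([0, 1, 2, 3, 4], name='PrimaryKey'))

# Each sample row updates every table row whose match column agrees.
sample = pd.DataFrame({'ColumnA': [0], 'ColumnC': [False]})
for _, row in sample.iterrows():
    table.loc[table['ColumnC'] == row['ColumnC'], 'ColumnA'] = row['ColumnA']

print(table['ColumnA'].tolist())  # → [0, 2, 3, 0, 0]
```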

Merging/Upsert SQL Table

Merging the dataframe into an SQL table will:

  1. Insert new records in the dataframe that are not in the SQL table.
  2. Update records in the SQL table that are also in the dataframe.
  3. Delete records in the SQL table that are not in the dataframe (if upsert=False, the default).

# read what is currently in the table
sample = sql.read.table('##mssql_dataframe', order_column='ColumnA', order_direction='ASC')

# simulate new records
sample = pd.concat([
    sample,
    pd.DataFrame(
        [
            [9, 'x', True],
            [9, 'y', True],
        ],
        columns=['ColumnA', 'ColumnB', 'ColumnC'],
        index=pd.Index([5, 6], name='PrimaryKey')
    )
])

# simulate updated records
sample.loc[sample['ColumnB'].isin(['d','e']),'ColumnA'] = 1

# simulate deleted records
sample = sample[~sample['ColumnA'].isin([2,3])]

# perform the merge
merged = sql.write.merge('##mssql_dataframe', sample)

# read the result from SQL after the merge
# records for PrimaryKey 5 & 6 have been inserted
# records for PrimaryKey 0, 3, & 4 have been updated
# records for PrimaryKey 1 & 2 have been deleted
result = sql.read.table('##mssql_dataframe')
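
The three merge behaviors can be simulated in pandas by comparing index sets. This is an illustration only; the actual merge runs as a single T-SQL statement against the temporary table:

```python
import pandas as pd

# Simplified table and incoming dataframe, keyed by PrimaryKey.
table = pd.DataFrame({'value': [10, 20, 30]}, index=pd.Index([0, 1, 2], name='PrimaryKey'))
df = pd.DataFrame({'value': [21, 31, 40]}, index=pd.Index([1, 2, 3], name='PrimaryKey'))

inserts = df.index.difference(table.index)    # in dataframe, not in table
updates = df.index.intersection(table.index)  # in both
deletes = table.index.difference(df.index)    # in table, not in dataframe

# Apply: update matches, drop deletes, append inserts.
table.loc[updates, 'value'] = df.loc[updates, 'value']
table = table.drop(deletes)
table = pd.concat([table, df.loc[inserts]])

print(table['value'].tolist())  # → [21, 31, 40]
```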

Additional functionality allows data to be incrementally merged into an SQL table. This can be useful for file processing, web scraping multiple pages, or other batch processing situations.

# read what is currently in the table
sample = sql.read.table('##mssql_dataframe', order_column='ColumnA', order_direction='ASC')

# simulate new records
sample = pd.concat([
    sample,
    pd.DataFrame(
        [
            [10, 'z', False],
            [10, 'z', True],
            [0, 'A', True]
        ],
        columns=['ColumnA', 'ColumnB', 'ColumnC'],
        index=pd.Index([7, 8, 9], name='PrimaryKey')
    )
])

# simulate updated records
sample.loc[sample['ColumnA']==1, 'ColumnC'] = True

# simulate deleted records
sample = sample[sample['ColumnB']!='a']
sample = sample[sample['ColumnA']!=9]

# perform the merge
merged = sql.write.merge('##mssql_dataframe', sample, delete_requires=['ColumnA'])

# read the result from SQL after the merge
# records for PrimaryKey 5 & 6 were not deleted (value 9 not in dataframe ColumnA)
# record for PrimaryKey 0 was deleted (value 0 in dataframe ColumnA)
# records for PrimaryKey 7, 8, & 9 have been inserted
# records for PrimaryKey 3 & 4 have been updated
result = sql.read.table('##mssql_dataframe')
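
The delete_requires behavior can be sketched in pandas as well: a table row missing from the dataframe is only deleted when its value in each delete_requires column also appears in the dataframe. Again an illustration, not the library's implementation:

```python
import pandas as pd

# SQL table state: PrimaryKeys 0, 3, 5 holding ColumnA values 0, 1, 9.
table = pd.DataFrame({'ColumnA': [0, 1, 9]}, index=pd.Index([0, 3, 5], name='PrimaryKey'))
# Incoming dataframe: PrimaryKeys 0 and 5 are absent.
df = pd.DataFrame({'ColumnA': [1, 0, 10]}, index=pd.Index([3, 9, 7], name='PrimaryKey'))

missing = table.index.difference(df.index)                     # deletion candidates: 0 and 5
qualifies = table.loc[missing, 'ColumnA'].isin(df['ColumnA'])  # delete only if ColumnA value is present
deletes = qualifies[qualifies].index

print(list(deletes))  # → [0]  (ColumnA value 9 is absent from the dataframe, so PrimaryKey 5 survives)
```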

Upsert functionality is accomplished by setting upsert=True. This results in records only being inserted or updated; no records are deleted.

# simulate a new record
sample = pd.concat([
    sample,
    pd.DataFrame(
        [
            [11, 'z', False],
        ],
        columns=['ColumnA', 'ColumnB', 'ColumnC'],
        index=pd.Index([10], name='PrimaryKey')
    )
])

# simulate an updated record
sample.at[3,'ColumnA'] = 12

# perform the upsert
merged = sql.write.merge('##mssql_dataframe', sample, upsert=True)

# read the result from SQL after the upsert
# record for PrimaryKey 3 was updated
# record for PrimaryKey 10 was inserted
# all other records remain unchanged
result = sql.read.table('##mssql_dataframe')
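
Setting upsert=True simply drops the delete branch of the merge; in pandas terms the operation reduces to the following (illustration only, not the library's implementation):

```python
import pandas as pd

table = pd.DataFrame({'value': [10, 20]}, index=pd.Index([0, 1], name='PrimaryKey'))
df = pd.DataFrame({'value': [21, 30]}, index=pd.Index([1, 2], name='PrimaryKey'))

# Upsert: update matches and append new rows.
# Rows only in the table (PrimaryKey 0) are left untouched.
updates = df.index.intersection(table.index)
inserts = df.index.difference(table.index)
table.loc[updates, 'value'] = df.loc[updates, 'value']
table = pd.concat([table, df.loc[inserts]])

print(table['value'].tolist())  # → [10, 21, 30]
```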

Additional Functionality

include_metadata_timestamps

If mssql_dataframe is initialized with include_metadata_timestamps=True, insert, update, and merge operations will include columns detailing when records were inserted or updated. These timestamps are in server time.

# initialized with flag to include metadata timestamps
sql = SQLServer(include_metadata_timestamps=True)

# create sample table
df = pd.DataFrame({
    'ColumnA': ['1','2','3','4','5'],
}, index=pd.Index([0, 1, 2, 3, 4], name='PrimaryKey'))

df = sql.create.table_from_dataframe(
    table_name='##mssql_metadata',
    dataframe=df,
    primary_key='index'
)

# all records have a _time_insert value
result = sql.read.table('##mssql_metadata')

# simulate an updated record
result.at[0,'ColumnA'] = 9
updated = sql.write.update('##mssql_metadata', result.loc[[0]])

# record 0 now has a _time_update value
# the _time_update column was automatically created
result = sql.read.table('##mssql_metadata')

Manual SQL Column Modification

mssql_dataframe contains methods to adjust SQL columns.

import pandas as pd
from mssql_dataframe import SQLServer

sql = SQLServer()

# create sample table
df = pd.DataFrame({
    'ColumnA': ['1','2','3','4','5'],
}, index=pd.Index([0, 1, 2, 3, 4], name='PrimaryKey'))

df = sql.create.table_from_dataframe(
    table_name='##mssql_modify',
    dataframe=df,
    primary_key='index'
)

# modify ColumnA
sql.modify.column('##mssql_modify', 'alter', 'ColumnA', 'BIGINT', is_nullable=True)

# notice ColumnA is now BIGINT and nullable
schema = sql.get_schema('##mssql_modify')
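
The column modification above corresponds to T-SQL along these lines. The helper is hypothetical, written for illustration; the statement the library actually emits may differ:

```python
# Build the ALTER statement a modify-column call implies.
# Hypothetical helper; not part of the mssql_dataframe API.
def alter_column(table, column, data_type, is_nullable):
    null_clause = "NULL" if is_nullable else "NOT NULL"
    return f"ALTER TABLE {table} ALTER COLUMN {column} {data_type} {null_clause};"

statement = alter_column('##mssql_modify', 'ColumnA', 'BIGINT', is_nullable=True)
print(statement)  # → ALTER TABLE ##mssql_modify ALTER COLUMN ColumnA BIGINT NULL;
```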

Automatic SQL Object Creation and Modification

SQL objects will be created/modified as needed if the class is initialized with autoadjust_sql_objects=True.

  1. Tables will be created if they do not exist.
  2. Column size will increase if needed, for example from TINYINT to BIGINT or VARCHAR(5) to VARCHAR(10).
import pandas as pd
from mssql_dataframe import SQLServer

sql = SQLServer(autoadjust_sql_objects=True)

# sample dataframe
df = pd.DataFrame({
    'ColumnA': [1,2,3,4,5],
}, index=pd.Index([0, 1, 2, 3, 4], name='PrimaryKey'))

# create table by inserting into a table that doesn't exist
df = sql.write.insert('##mssql_auto', df)

# automatically add a column
new = pd.DataFrame({
    'ColumnA': [6],
    'ColumnB' : ['z']
}, index=pd.Index([5], name='PrimaryKey'))
new = sql.write.insert('##mssql_auto', new)

# automatically modify columns
alter = pd.DataFrame({
    'ColumnA': [1000],
    'ColumnB' : ['zzz']
}, index=pd.Index([6], name='PrimaryKey'))
alter = sql.write.insert('##mssql_auto', alter)

# a column is not automatically converted to a different data category; an error is raised instead
error = pd.DataFrame({
    'ColumnA': ['z'],
}, index=pd.Index([7], name='PrimaryKey'))
try:
    error = sql.write.insert('##mssql_auto', error)
except sql.exceptions.DataframeColumnInvalidValue:
    print('ColumnA not changed to string like column.')
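
The size increase in step 2 above can be pictured as choosing the narrowest SQL Server integer type that holds every value. This helper is hypothetical and written for illustration; the library's actual type rules may differ:

```python
def smallest_int_type(values):
    """Pick the narrowest T-SQL integer type that can hold every value.

    TINYINT is unsigned (0-255) in SQL Server; the rest are signed.
    """
    low, high = min(values), max(values)
    if 0 <= low and high <= 255:
        return 'TINYINT'
    if -2**15 <= low and high <= 2**15 - 1:
        return 'SMALLINT'
    if -2**31 <= low and high <= 2**31 - 1:
        return 'INT'
    return 'BIGINT'

print(smallest_int_type([1, 2, 3, 4, 5]))        # → TINYINT
print(smallest_int_type([1, 2, 3, 4, 5, 1000]))  # → SMALLINT
```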

Contributing

See CONTRIBUTING.md
