Slowly Changing Dimensions implementation with Databricks Delta Lake
What is a Slowly Changing Dimension?
Slowly Changing Dimensions (SCD) are dimensions that change over time. In a data warehouse, we need to track changes to dimension attributes in order to keep reports accurate.
There are typically three types of SCD:
- Type 1: SCD1, No history preservation
- Type 2: SCD2, Unlimited history preservation (each change adds a new row)
- Type 3: SCD3, Limited history preservation
For example, suppose we have the following dataset:
ShortName | Fruit | Color | Price |
---|---|---|---|
FA | Fiji Apple | Red | 3.6 |
BN | Banana | Yellow | 1 |
GG | Green Grape | Green | 2 |
RG | Red Grape | Red | 2 |
If we change the price of "Fiji Apple" to 3.5, the dataset becomes:
with SCD1:
ShortName | Fruit | Color | Price |
---|---|---|---|
FA | Fiji Apple | Red | 3.5 |
BN | Banana | Yellow | 1 |
GG | Green Grape | Green | 2 |
RG | Red Grape | Red | 2 |
with SCD2:
ShortName | Fruit | Color | Price | is_last |
---|---|---|---|---|
FA | Fiji Apple | Red | 3.5 | Y |
FA | Fiji Apple | Red | 3.6 | N |
BN | Banana | Yellow | 1 | Y |
GG | Green Grape | Green | 2 | Y |
RG | Red Grape | Red | 2 | Y |
with SCD3:
ShortName | Fruit | Color | Price | Color_old | Price_old |
---|---|---|---|---|---|
FA | Fiji Apple | Red | 3.5 | Red | 3.6 |
BN | Banana | Yellow | 1 | NULL | NULL |
GG | Green Grape | Green | 2 | NULL | NULL |
RG | Red Grape | Red | 2 | NULL | NULL |
SCD implementation in Databricks
This repository provides implementations of SCD1, SCD2, and SCD3 in Python with Databricks Delta Lake.
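The package is published on PyPI as abxscd. A minimal setup might look like the sketch below; the exact import path of the SCD functions is an assumption based on the package name, so adjust it to match the installed module layout.

```python
# Install once per cluster or environment:
#   pip install abxscd

# Assumed import path; adjust to the package's actual module layout.
from abxscd import SCD1, SCD2, SCD3
```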
- SCD1
SCD1(df, target_table_name, target_partition_keys, key_cols, current_time):
Parameters:
- df: source dataframe
- target_table_name: target table name
- target_partition_keys: partition key column(s) of the target table
- key_cols: comma-separated string of key columns that uniquely identify a row
- current_time: current timestamp
Here is example code showing SCD1:
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
import datetime
# create sample dataset
df1 = spark.createDataFrame([('FA', 'Fiji Apple', 'Red', 3.6)
                            ,('BN', 'Banana', 'Yellow', 1.0)
                            ,('GG', 'Green Grape', 'Green', 2.0)
                            ,('RG', 'Red Grape', 'Red', 2.0)],
                            ['ShortName', 'Fruit', 'Color', 'Price'])
# prepare parameters
current_time = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
target_partition_keys = ['ShortName']
key_cols = "ShortName,Fruit"
target_table_name_scd1 = 'default.table_scd1'
# call the SCD1 function to load the initial data
SCD1(df1, target_table_name_scd1, target_partition_keys, key_cols, current_time)
# display the result
display(spark.sql(f"select * from {target_table_name_scd1}"))
Change the price of "Fiji Apple" to 3.5 and run SCD1 again:
df2 = spark.createDataFrame([('FA', 'Fiji Apple', 'Red', 3.5)
                            ,('BN', 'Banana', 'Yellow', 1.0)
                            ,('GG', 'Green Grape', 'Green', 2.0)
                            ,('RG', 'Red Grape', 'Red', 2.0)],
                            ['ShortName', 'Fruit', 'Color', 'Price'])
# call the SCD1 function again
SCD1(df2, target_table_name_scd1, target_partition_keys, key_cols, current_time)
display(spark.sql(f"select * from {target_table_name_scd1}"))
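For reference, an SCD1 upsert maps naturally onto Delta Lake's MERGE API: matched rows are simply overwritten and new keys are inserted. The following is a minimal sketch of that pattern, not the abxscd implementation; it assumes the target Delta table already exists and takes the key columns as a Python list.

```python
from delta.tables import DeltaTable

def scd1_merge_sketch(spark, source_df, target_table_name, key_cols):
    """Minimal SCD1 upsert: overwrite matched rows, insert new ones."""
    target = DeltaTable.forName(spark, target_table_name)
    # Build the join condition from the business-key columns.
    condition = " AND ".join(f"t.{c} = s.{c}" for c in key_cols)
    (target.alias("t")
        .merge(source_df.alias("s"), condition)
        .whenMatchedUpdateAll()     # SCD1: latest values replace the old ones
        .whenNotMatchedInsertAll()  # brand-new keys are inserted
        .execute())

# Example call (hypothetical helper, mirroring the example above):
# scd1_merge_sketch(spark, df2, 'default.table_scd1', ['ShortName', 'Fruit'])
```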
- SCD2
SCD2(df, target_table_name, target_partition_keys, key_cols, current_time):
Parameters:
- df: source dataframe
- target_table_name: target table name
- target_partition_keys: partition key column(s) of the target table
- key_cols: comma-separated string of key columns that uniquely identify a row
- current_time: current timestamp
Here is example code showing SCD2:
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
import datetime
# create sample dataset
df1 = spark.createDataFrame([('FA', 'Fiji Apple', 'Red', 3.6)
                            ,('BN', 'Banana', 'Yellow', 1.0)
                            ,('GG', 'Green Grape', 'Green', 2.0)
                            ,('RG', 'Red Grape', 'Red', 2.0)],
                            ['ShortName', 'Fruit', 'Color', 'Price'])
# prepare parameters
current_time = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
target_partition_keys = ['ShortName']
key_cols = "ShortName,Fruit"
target_table_name_scd2 = 'default.table_scd2'
# call the SCD2 function to load the initial data
SCD2(df1, target_table_name_scd2, target_partition_keys, key_cols, current_time)
# display the result
display(spark.sql(f"select * from {target_table_name_scd2}"))
Change the price of "Fiji Apple" to 3.5 and run SCD2 again:
df2 = spark.createDataFrame([('FA', 'Fiji Apple', 'Red', 3.5)
                            ,('BN', 'Banana', 'Yellow', 1.0)
                            ,('GG', 'Green Grape', 'Green', 2.0)
                            ,('RG', 'Red Grape', 'Red', 2.0)],
                            ['ShortName', 'Fruit', 'Color', 'Price'])
# call the SCD2 function again
SCD2(df2, target_table_name_scd2, target_partition_keys, key_cols, current_time)
display(spark.sql(f"select * from {target_table_name_scd2}"))
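An SCD2 load is typically done in two steps: close out the current row of any key whose attributes changed, then append the incoming rows as the new current versions. The sketch below illustrates that pattern with Delta Lake; it is not the abxscd implementation, and it assumes the target Delta table already exists with exactly the example schema plus the is_last flag shown in the SCD2 table above.

```python
from delta.tables import DeltaTable
from pyspark.sql import functions as F

def scd2_merge_sketch(spark, source_df, target_table_name, key_cols):
    """Minimal SCD2: close superseded rows, append new versions with is_last = 'Y'."""
    # Keep only source rows that are new or differ from the current target version.
    current = spark.table(target_table_name).filter("is_last = 'Y'")
    changed = source_df.subtract(current.select(source_df.columns))

    # Step 1: matching current rows in the target are no longer the latest version.
    target = DeltaTable.forName(spark, target_table_name)
    condition = " AND ".join(f"t.{c} = s.{c}" for c in key_cols)
    (target.alias("t")
        .merge(changed.alias("s"), condition + " AND t.is_last = 'Y'")
        .whenMatchedUpdate(set={"is_last": F.lit("N")})
        .execute())

    # Step 2: append the changed/new rows as the current versions.
    (changed.withColumn("is_last", F.lit("Y"))
        .write.format("delta").mode("append").saveAsTable(target_table_name))

# Example call (hypothetical helper, mirroring the example above):
# scd2_merge_sketch(spark, df2, 'default.table_scd2', ['ShortName', 'Fruit'])
```

A production version would also stamp an effective-from timestamp (the role of the current_time parameter above); that column is omitted here because the example table only shows is_last.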
- SCD3
SCD3(df, target_table_name, target_partition_keys, key_cols, current_time):
Parameters:
- df: source dataframe
- target_table_name: target table name
- target_partition_keys: partition key column(s) of the target table
- key_cols: comma-separated string of key columns that uniquely identify a row
- current_time: current timestamp
Here is example code showing SCD3:
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
import datetime
# create sample dataset
df1 = spark.createDataFrame([('FA', 'Fiji Apple', 'Red', 3.6)
                            ,('BN', 'Banana', 'Yellow', 1.0)
                            ,('GG', 'Green Grape', 'Green', 2.0)
                            ,('RG', 'Red Grape', 'Red', 2.0)],
                            ['ShortName', 'Fruit', 'Color', 'Price'])
# prepare parameters
current_time = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
target_partition_keys = ['ShortName']
key_cols = "ShortName,Fruit"
target_table_name_scd3 = 'default.table_scd3'
# call the SCD3 function to load the initial data
SCD3(df1, target_table_name_scd3, target_partition_keys, key_cols, current_time)
# display the result
display(spark.sql(f"select * from {target_table_name_scd3}"))
Change the price of "Fiji Apple" to 3.5 and run SCD3 again:
df2 = spark.createDataFrame([('FA', 'Fiji Apple', 'Red', 3.5)
                            ,('BN', 'Banana', 'Yellow', 1.0)
                            ,('GG', 'Green Grape', 'Green', 2.0)
                            ,('RG', 'Red Grape', 'Red', 2.0)],
                            ['ShortName', 'Fruit', 'Color', 'Price'])
# call the SCD3 function again
SCD3(df2, target_table_name_scd3, target_partition_keys, key_cols, current_time)
display(spark.sql(f"select * from {target_table_name_scd3}"))
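SCD3 keeps a limited history by shifting the previous value of each tracked attribute into a *_old column on the same row. The sketch below is a generic Delta Lake illustration of that idea, not the abxscd internals; the tracked columns (Color, Price) and their *_old counterparts follow the SCD3 table shown earlier and are assumed to already exist on the target table.

```python
from delta.tables import DeltaTable

def scd3_merge_sketch(spark, source_df, target_table_name, key_cols, tracked_cols):
    """Minimal SCD3: move current values into <col>_old, then take the new values."""
    target = DeltaTable.forName(spark, target_table_name)
    condition = " AND ".join(f"t.{c} = s.{c}" for c in key_cols)
    # Only rewrite history when at least one tracked attribute actually changed.
    changed = " OR ".join(f"t.{c} <> s.{c}" for c in tracked_cols)

    # For each tracked column, keep the previous value in <col>_old and take the new value.
    update_set = {}
    for c in tracked_cols:
        update_set[f"{c}_old"] = f"t.{c}"
        update_set[c] = f"s.{c}"
    # New keys get only the business and tracked columns; their *_old columns stay NULL.
    insert_values = {c: f"s.{c}" for c in key_cols + tracked_cols}

    (target.alias("t")
        .merge(source_df.alias("s"), condition)
        .whenMatchedUpdate(condition=changed, set=update_set)
        .whenNotMatchedInsert(values=insert_values)
        .execute())

# Example call (hypothetical helper, mirroring the tables above):
# scd3_merge_sketch(spark, df2, 'default.table_scd3',
#                   ['ShortName', 'Fruit'], ['Color', 'Price'])
```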