superlake
A modern, intuitive Python package for data lakehouse operations
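Installation
superlake is published on PyPI (see the distribution files below). Assuming a standard Python 3 environment with access to PyPI, it can be installed with pip:
pip install superlake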
Main SuperLake Classes
- SuperSpark: Instantiates a SparkSession with Delta Lake support.
- SuperDeltaTable: Manages Delta tables (create, read, write, optimize, vacuum, SCD2, schema evolution, etc.).
- SuperPipeline: Orchestrates data pipelines from source to bronze and silver layers, including CDC and transformation logic.
- SuperGoldPipeline: Manages gold-layer aggregations and writes results to gold tables.
- SuperDataframe: Utility class for DataFrame cleaning, casting, and manipulation.
- SuperLogger: Logging and metrics for pipeline operations.
Quick Example Usage
# superlake library
from superlake.core import (
    SuperSpark, SuperDeltaTable, TableSaveMode, SchemaEvolution,
    SuperPipeline, SuperGoldPipeline
)
from superlake.monitoring import SuperLogger
# third-party (PySpark)
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import DataFrame as SparkDataFrame
# standard library
from datetime import date, datetime
import time
# Initialize Spark
spark = SuperSpark()
logger = SuperLogger()
superlake_dt = datetime.now()
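# note: superlake_dt is the batch timestamp used to tag rows across the bronze, silver and gold tables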
# ------------------------------------------------------------------------------------------------
# Bronze and silver tables, cdc and transformation functions
# ------------------------------------------------------------------------------------------------
# Bronze Customer Table
bronze_customer = SuperDeltaTable(
    table_name="01_bronze.customer",
    table_path="./data/external-table/01_bronze/customer",
    table_schema=T.StructType([
        T.StructField("customer_id", T.StringType(), False),
        T.StructField("name", T.StringType(), True),
        T.StructField("email", T.StringType(), True),
        T.StructField("country", T.StringType(), True),
        T.StructField("signup_date", T.DateType(), True),
        T.StructField("superlake_dt", T.TimestampType(), True)
    ]),
    table_save_mode=TableSaveMode.Append,
    primary_keys=["customer_id"],
    partition_cols=["superlake_dt"],
    pruning_partition_cols=True,
    pruning_primary_keys=False,
    optimize_table=True,
    optimize_zorder_cols=[],
    optimize_target_file_size=100000000,
    compression_codec="snappy",
    schema_evolution_option=SchemaEvolution.Merge,
    logger=logger,
    managed=True  # managed table (in spark-warehouse)
)
# Silver Customer Table
silver_customer = SuperDeltaTable(
    table_name="02_silver.customer",
    table_path="./data/external-table/02_silver/customer",
    table_schema=T.StructType([
        T.StructField("customer_id", T.IntegerType(), False),
        T.StructField("name", T.StringType(), True),
        T.StructField("email", T.StringType(), True),
        T.StructField("country", T.StringType(), True),
        T.StructField("signup_date", T.DateType(), True),
        T.StructField("superlake_dt", T.TimestampType(), True)
    ]),
    table_save_mode=TableSaveMode.MergeSCD,
    primary_keys=["customer_id"],
    partition_cols=["scd_is_current"],
    pruning_partition_cols=True,
    pruning_primary_keys=False,
    optimize_table=True,
    optimize_zorder_cols=["country"],
    optimize_target_file_size=100000000,
    compression_codec="snappy",
    schema_evolution_option=SchemaEvolution.Merge,
    logger=logger,
    scd_change_cols=["name", "email", "country"],
    managed=False  # external table (custom path)
)
# Change Data Capture Function
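# a cdc function receives the SparkSession and returns the new/changed source rows to ingest into bronze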
def customer_cdc(spark):
    # ---------------------------------------------------------------------------------------
    # mock up the customer source data and schema (in practice this would select from a source table)
    customer_source_schema = T.StructType([
        T.StructField("customer_id", T.StringType(), False),
        T.StructField("name", T.StringType(), True),
        T.StructField("email", T.StringType(), True),
        T.StructField("country", T.StringType(), True),
        T.StructField("signup_date", T.DateType(), True)
    ])
    customer_source_data = [
        ("1", "John Doe", "john.doe@example.com", "US", date(2022, 1, 15)),
        ("2", "Jane Smith", "jane.smith@example.com", "FR", date(2022, 2, 20)),
        ("3", "Pedro Alvarez", "pedro.alvarez@example.com", "EN", date(2022, 3, 10)),
        ("4", "Anna Müller", "anna.mueller@example.com", "DE", date(2022, 4, 5)),
        ("5", "Li Wei", "li.wei@example.com", "DE", date(2022, 5, 12))
    ]
    customer_source_df = spark.createDataFrame(customer_source_data, schema=customer_source_schema)
    # ---------------------------------------------------------------------------------------
    # change data capture mechanism
    if silver_customer.table_exists(spark):
        max_customer_id = silver_customer.read(spark).select(F.max("customer_id")).collect()[0][0]
        # shift the watermark back by 2 so already-ingested customers are re-read with their updates
        max_customer_id = max_customer_id - 2
        # simulate a change in the source schema (new phone_number column) and updated rows
        customer_source_schema = T.StructType([
            T.StructField("customer_id", T.StringType(), False),
            T.StructField("phone_number", T.StringType(), True),
            T.StructField("name", T.StringType(), True),
            T.StructField("email", T.StringType(), True),
            T.StructField("country", T.StringType(), True),
            T.StructField("signup_date", T.DateType(), True)
        ])
        customer_source_data = [
            ("1", "0923623623", "John Doe", "john.doe@changed.com", "CH", date(2022, 1, 15)),
            ("2", "0923623624", "Jane changed", "jane.smith@example.com", "EN", date(2022, 2, 20)),
            ("3", "0923623625", "Pedro Alvarez", "pedro.alvarez@example.com", "EN", date(2022, 3, 10)),
            ("4", "0923623626", "Anna Müller", "anna.mueller@example.com", "DE", date(2022, 4, 5)),
            ("5", "0923623627", "Li Wei", "li.wei@example.com", "DE", date(2022, 5, 12))
        ]
        customer_source_df = spark.createDataFrame(customer_source_data, schema=customer_source_schema)
    else:
        # first run: ingest only the first 3 customers to mock an initial CDC load
        customer_source_df = customer_source_df.filter(F.col("customer_id") <= 3)
        max_customer_id = 0
    logger.info(f"CDC max customer id: {max_customer_id}")
    # filter out rows based on the change data capture watermark
    customer_source_df = customer_source_df.filter(F.col("customer_id") > max_customer_id)
    return customer_source_df
# Transformation Function
def customer_tra(df: SparkDataFrame):
    """Clean and transform customer data before it lands in silver."""
    df = (
        df
        .withColumn("customer_id", F.col("customer_id").cast(T.IntegerType()))  # cast to match the silver schema
        .withColumn("email", F.lower(F.col("email")))
        .withColumn("name", F.lower(F.col("name")))
        .withColumn("country", F.upper(F.col("country")))
    )
    return df
# ------------------------------------------------------------------------------------------------
# Gold table and gold function
# ------------------------------------------------------------------------------------------------
# Gold Customer Agg Function
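# a gold function receives (spark, superlake_dt) and returns the DataFrame to write to the gold table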
def gold_customer_agg_function(spark, superlake_dt):
    # aggregate customer count by country for the current superlake_dt
    df = silver_customer.read(spark).filter(F.col("scd_is_current") == True)
    df = df.groupBy("country").agg(F.count("*").alias("customer_count"))
    df = df.withColumn("superlake_dt", F.lit(superlake_dt))
    return df
# Gold Customer Agg Table
gold_customer_agg = SuperDeltaTable(
    table_name="03_gold.customer_agg",
    table_path="./data/external-table/03_gold/customer_agg",
    table_schema=T.StructType([
        T.StructField("country", T.StringType(), True),
        T.StructField("customer_count", T.LongType(), True),
        T.StructField("superlake_dt", T.TimestampType(), True)
    ]),
    table_save_mode=TableSaveMode.Merge,
    primary_keys=["country"],
    partition_cols=[],
    pruning_partition_cols=True,
    pruning_primary_keys=False,
    optimize_table=True,
    optimize_zorder_cols=["country"],
    optimize_target_file_size=100000000,
    compression_codec="snappy",
    schema_evolution_option=SchemaEvolution.Merge,
    logger=logger,
    managed=False
)
# ------------------------------------------------------------------------------------------------
# Customer Data Pipeline from Source > Bronze > Silver > Gold
# ------------------------------------------------------------------------------------------------
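# the pipelines below are executed twice: the second run picks up updated rows and a new source column,
# exercising incremental CDC, SCD2 merges into silver and schema evolution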
print("################################################################################################")
print("------------------------ drop tables -----------------------")
bronze_customer.drop(spark)
silver_customer.drop(spark)
gold_customer_agg.drop(spark)
print("------------------------ pipeline 1 ------------------------")
# set superlake_dt
superlake_dt = datetime.now()
# source > bronze > silver pipeline
customer_pipeline = SuperPipeline(
    superlake_dt=superlake_dt,
    bronze_table=bronze_customer,
    silver_table=silver_customer,
    cdc_function=customer_cdc,
    tra_function=customer_tra,
    logger=logger,
    spark=spark,
    environment="test"
)
customer_pipeline.execute()
# gold pipeline
gold_pipeline = SuperGoldPipeline(
    gold_function=gold_customer_agg_function,
    gold_table=gold_customer_agg,
    logger=logger,
    spark=spark,
    superlake_dt=superlake_dt,
    environment="test"
)
gold_pipeline.execute()
print("-------------------- waiting 5 seconds --------------------")
time.sleep(5)
print("------------------------ pipeline 2 ------------------------")
# set superlake_dt
superlake_dt = datetime.now()
# source > bronze > silver pipeline
customer_pipeline = SuperPipeline(
    superlake_dt=superlake_dt,
    bronze_table=bronze_customer,
    silver_table=silver_customer,
    cdc_function=customer_cdc,
    tra_function=customer_tra,
    logger=logger,
    spark=spark,
    environment="test"
)
customer_pipeline.execute()
# gold pipeline
gold_pipeline = SuperGoldPipeline(
    gold_function=gold_customer_agg_function,
    gold_table=gold_customer_agg,
    logger=logger,
    spark=spark,
    superlake_dt=superlake_dt,
    environment="test"
)
gold_pipeline.execute()
print("------------------------ optimize tables ------------------------")
bronze_customer.optimize(spark)
silver_customer.optimize(spark)
gold_customer_agg.optimize(spark)
print("------------------------ vacuum tables ------------------------")
bronze_customer.vacuum(spark)
silver_customer.vacuum(spark)
gold_customer_agg.vacuum(spark)
Download files
Source Distribution
superlake-0.1.3.tar.gz (18.7 kB)
Built Distribution
superlake-0.1.3-py3-none-any.whl (18.3 kB)
File details
Details for the file superlake-0.1.3.tar.gz.
File metadata
- Download URL: superlake-0.1.3.tar.gz
- Upload date:
- Size: 18.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | c9ab031cbbc3fd2af5143c8c82cf9ccc699a556dd1a90e093a108b3722d8c782 |
| MD5 | 48de8ef00ad9218663d4710e0eea003d |
| BLAKE2b-256 | e27fd218e827844389d3d4b5c10805f0c2095e9e53d3a322b0232eee5777497a |
File details
Details for the file superlake-0.1.3-py3-none-any.whl.
File metadata
- Download URL: superlake-0.1.3-py3-none-any.whl
- Upload date:
- Size: 18.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 6cd4e4d05cf0d41c878b28b075e40910419b06ecb63de77bc54ae6376f0eedba |
| MD5 | 6a8d2f3459c4294743e82224a18e301d |
| BLAKE2b-256 | 005ea396b787b183b0c0f7818d64db686b520b897a19176ff7aa971482d6d47d |