# superlake

A modern, intuitive Python package for data lakehouse operations.

## Main SuperLake Classes
- SuperSpark: Instantiates a SparkSession with Delta Lake support.
- SuperDeltaTable: Manages Delta tables (create, read, write, optimize, vacuum, SCD2, schema evolution, etc.).
- SuperPipeline: Orchestrates data pipelines from source to bronze and silver layers, including CDC and transformation logic.
- SuperGoldPipeline: Manages gold-layer aggregations and writes results to gold tables.
- SuperDataframe: Utility class for DataFrame cleaning, casting, and manipulation.
- SuperLogger: Logging and metrics for pipeline operations.
## Quick Example Usage

```python
from superlake.core import (
    SuperSpark, SuperDeltaTable, TableSaveMode, SchemaEvolution,
    SuperPipeline, SuperGoldPipeline
)
from superlake.monitoring import SuperLogger
import pyspark.sql.types as T
from datetime import datetime

# Initialize Spark and logger
spark = SuperSpark()
logger = SuperLogger()
superlake_dt = datetime.now()

# Define a Delta table
bronze_customer = SuperDeltaTable(
    table_name="01_bronze.customer",
    table_path="/path/to/bronze/customer",
    table_schema=T.StructType([
        T.StructField("customer_id", T.StringType(), False),
        T.StructField("name", T.StringType(), True),
        T.StructField("email", T.StringType(), True),
        T.StructField("country", T.StringType(), True),
        T.StructField("signup_date", T.DateType(), True),
        T.StructField("superlake_dt", T.TimestampType(), True)
    ]),
    table_save_mode=TableSaveMode.Append,
    primary_keys=["customer_id"],
    partition_cols=["superlake_dt"],
    schema_evolution_option=SchemaEvolution.Merge,
    logger=logger,
    managed=True
)

# Define CDC and transformation functions
def customer_cdc(spark):
    # Return a DataFrame with new/changed customer data
    ...

def customer_tra(df):
    # Clean and transform customer data
    return df

# Create and run a pipeline
customer_pipeline = SuperPipeline(
    superlake_dt=superlake_dt,
    bronze_table=bronze_customer,
    silver_table=...,  # another SuperDeltaTable
    cdc_function=customer_cdc,
    tra_function=customer_tra,
    logger=logger,
    spark=spark,
    environment="test"
)
customer_pipeline.execute()
```
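The `cdc_function` above is expected to return only rows that are new or changed since the last run. Stripped of Spark, the usual watermark pattern behind it looks like this sketch (the `updated_at` column and the helper name are assumptions for illustration, not part of superlake's API):

```python
from datetime import datetime

def customer_cdc_sketch(rows, last_load_ts):
    """Return only rows created or updated after the previous load.

    In the real pipeline, cdc_function receives a SparkSession and
    returns a DataFrame; here plain dicts stand in for rows.
    """
    return [r for r in rows if r["updated_at"] > last_load_ts]
```

Filtering at the source like this keeps each bronze append incremental instead of reloading the full table every run.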
See `example/superlake_example.py` for a full pipeline example, including gold-table aggregation.
## Download files

Source distribution: `superlake-0.1.2.tar.gz` (16.8 kB)

Built distribution: `superlake-0.1.2-py3-none-any.whl` (17.1 kB)
## File details: superlake-0.1.2.tar.gz

- Size: 16.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.9

| Algorithm | Hash digest |
|---|---|
| SHA256 | 181ec6982addd204f2bf0a8bb3a2b2475a1db6f6fe634c54a18389a5c4fc26f1 |
| MD5 | f58d680effad7bb3f3af1202e73acee0 |
| BLAKE2b-256 | 08c63ba48c55159cb11f67bd708bd8571d5095f684d73e6ff3552bf8b1a7493b |
## File details: superlake-0.1.2-py3-none-any.whl

- Size: 17.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.9

| Algorithm | Hash digest |
|---|---|
| SHA256 | 0fe628437a45a0e81996e111507d482051eba8e262e05e02e66510aa5616636e |
| MD5 | 030feaef73b325d78997dee31619fbfe |
| BLAKE2b-256 | 1035dda3e4341efd5bde0361a7b50017c6384281b2aab101483c32c3c9705d2d |