Typed data workflows on Spark and Delta Lake
uplite — typed data workflows on Spark + Delta Lake
Overview
uplite helps you build small, testable PySpark jobs with a strongly-typed feel:
- Describe datasets with TableSpec and ColumnSpec (names, types, partitions, docs)
- Get typed DataFrame helpers (TypedDataFrame) with safe joins and schema verification
- Define jobs as FlowSteps, compose them into simple workflows
- Read/write/merge Delta tables through a compact API (JobOps)
- Work locally or in a metastore/catalog environment
The examples below are distilled from the tests and public API in this repo.
Installation
pip install uplite
Requirements (pulled as dependencies): pyspark>=3.5.0,<=3.5.5, delta-spark>=3.3.0, boto3.
Python 3.12+ is recommended (see pyproject.toml).
Quickstart (local Delta catalog)
- Start a Spark session configured for Delta:
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder.appName("uplite-quickstart")
    .master("local[2]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.defaultTableFormat", "delta")
    .config("spark.sql.sources.default", "delta")
    # optional: set a local warehouse folder
    .config("spark.sql.warehouse.dir", "/tmp/local-delta-warehouse")
    .getOrCreate()
)
spark.sql("CREATE SCHEMA IF NOT EXISTS uplite;")
spark.sql("USE uplite;")
- Tell uplite you are in local mode and provide catalog/schema parameters used to resolve table names:
import uplite.flow as flow
flow.LOCAL_MODE = True # local mode uses `<schema>.<table>` instead of `<catalog>.<schema>.<table>`
flow.override_params = {
    "catalog": "local",  # can be any string in local mode
    "schema": "uplite",  # matches the schema you `USE`d above
}
- (Optional) Auto-register existing Delta tables from a warehouse folder (local or S3). For a local folder, the expected layout is <WAREHOUSE>/uplite.db/<table>:
from uplite.lake import auto_register_tables
auto_register_tables(spark, "/tmp/local-delta-warehouse")
For S3, point to an S3A prefix that contains uplite.db: auto_register_tables(spark, "s3a://my-bucket/path/to/warehouse").
Define a schema with TableSpec
Use TableSpec and ColumnSpec to describe a Delta table. You can use Spark data types or simple strings.
from pyspark.sql.types import IntegerType, StringType, DoubleType
from uplite.datalang import TableSpec, ColumnSpec
class Promotion(TableSpec):
    promotion_id = ColumnSpec(IntegerType(), description="promotion id")
    target_type = ColumnSpec(StringType(), description="target type")
    target_value = ColumnSpec(StringType(), description="target value")
    promotion_type = ColumnSpec(StringType(), description="promotion type")
    discount_percentage = ColumnSpec(DoubleType(), description="discount percentage")

# Useful introspection helpers
Promotion.table_name() # "promotion"
Promotion.table_zone() # derived from package name (e.g. "raw" / "processed")
Promotion.table_id() # "<zone>.<name>", e.g. "raw.promotion"
Promotion.alias() # used in SQL as a stable table alias
Convert raw DataFrames to typed DataFrames
conform_to_spec(df, TableSpec) casts/aligns columns and returns a TypedDataFrame bound to your spec.
from uplite.datalang import conform_to_spec
raw_promos = spark.read.csv("sample_data", header=True)
t_promos = conform_to_spec(raw_promos, Promotion)
# Validate schema before writes/joins
t_promos.verify_schema()
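Casting matters here because spark.read.csv with header=True and no explicit schema reads every column as a string. As a rough mental model of what "casts/aligns columns" involves, the sketch below conforms plain Python rows to a spec. It is not uplite's implementation: SPEC and conform_rows are hypothetical names, it works on dictionaries rather than Spark DataFrames, and dropping extra columns and failing on missing ones are assumptions about the behaviour.

```python
from typing import Any, Callable

# Hypothetical stand-in for a TableSpec: column name -> caster, in spec order.
SPEC: dict[str, Callable[[Any], Any]] = {
    "promotion_id": int,
    "promotion_type": str,
    "discount_percentage": float,
}


def conform_rows(rows, spec):
    """Keep only spec columns, in spec order, casting each non-null value."""
    conformed = []
    for row in rows:
        missing = [name for name in spec if name not in row]
        if missing:
            # Assumption: a missing column is an error rather than a null fill.
            raise ValueError(f"missing columns: {missing}")
        conformed.append(
            {
                name: None if row[name] is None else cast(row[name])
                for name, cast in spec.items()
            }
        )
    return conformed


# CSV-like input: everything is a string, columns are out of order,
# and there is one column the spec does not know about.
raw = [
    {
        "promotion_type": "discount",
        "promotion_id": "1",
        "discount_percentage": "12.5",
        "extra": "x",
    }
]
typed = conform_rows(raw, SPEC)
```

After the call, typed holds one row whose keys follow the spec order and whose values carry the spec's types.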
Write, read, merge — job building blocks
Use FlowStep + JobFlowMixin to implement a job. Interact with the catalog via self.data (a JobOps).
from uplite.flow import FlowStep, JobFlowMixin, WorkflowMixin
from pyspark.sql import SparkSession
class ImportPromotions(JobFlowMixin, FlowStep[None]):
    def define(self, spark: SparkSession) -> None:
        # Read from a raw source
        raw_promos = spark.read.csv("sample_data", header=True)
        # Enforce expected schema and get a TypedDataFrame
        t_promos = conform_to_spec(raw_promos, Promotion)
        # Create/overwrite the Delta table based on the spec
        self.data.write(t_promos, mode="overwrite")
        # Read back as a typed DataFrame
        self.data.read(spark, Promotion).show()

class PromotionWorkflow(WorkflowMixin, FlowStep[None]):
    def __init__(self):
        self.register_sub_flow(ImportPromotions())

# Run it
job = PromotionWorkflow()
job.define(spark)
Merging with deduplication (SCD4 helper available):
# Upsert with deduplication based on matching columns
self.data.merge(
    t_promos,
    matching_columns=[Promotion.promotion_id],
    # order_by defaults to matching_columns, but you can specify custom ordering columns
)
# Or perform SCD4-like merge and automatically append change history
self.data.merge_with_scd4(
    t_promos,
    matching_columns=[Promotion.promotion_id],
)
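Deduplicating the source before an upsert matters because a Delta MERGE can fail when several source rows match the same target row. The sketch below shows the general idea on plain Python rows: keep one row per key, chosen by an ordering. It is an illustration only. dedupe_latest and the updated_at column are hypothetical, and which row uplite actually keeps is not stated here, so "highest order_by value wins" is an assumption.

```python
def dedupe_latest(rows, matching_columns, order_by=None):
    """Keep one row per key; the row with the greatest order_by value wins.

    Ties go to the row that appears last in the input.
    """
    order_by = order_by or matching_columns
    latest = {}
    for row in rows:
        key = tuple(row[c] for c in matching_columns)
        rank = tuple(row[c] for c in order_by)
        current = latest.get(key)
        if current is None or rank >= tuple(current[c] for c in order_by):
            latest[key] = row
    return list(latest.values())


rows = [
    {"promotion_id": 1, "promotion_type": "discount", "updated_at": 1},
    {"promotion_id": 1, "promotion_type": "bundle", "updated_at": 2},
    {"promotion_id": 2, "promotion_type": "gift", "updated_at": 1},
]
# Two source rows share promotion_id 1; only the newer one survives.
deduped = dedupe_latest(rows, ["promotion_id"], order_by=["updated_at"])
```

With at most one source row per key, the subsequent upsert has an unambiguous update for each matching target row.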
Change Data Feed checkpoints
Consume only new changes from a Delta table using CDF and automatic checkpoints:
with self.data.read_since_latest_checkpoint(spark, Promotion) as changes:
    # process incremental changes ("insert", "update_postimage")
    changes.show()
# when the context manager exits successfully, the checkpoint is advanced
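The "advance only on success" behaviour is what makes a failed run safe to retry. A minimal sketch of that pattern in plain Python follows. It does not use Spark or uplite: CheckpointStore, read_since_checkpoint and the version-keyed feed dictionary are hypothetical stand-ins for the real checkpoint storage and Change Data Feed.

```python
from contextlib import contextmanager


class CheckpointStore:
    """Hypothetical in-memory store: table name -> last processed version."""

    def __init__(self):
        self.versions = {}


@contextmanager
def read_since_checkpoint(store, table, changes_by_version):
    start = store.versions.get(table, -1)
    pending = {v: rows for v, rows in changes_by_version.items() if v > start}
    yield [row for v in sorted(pending) for row in pending[v]]
    # Reached only if the with-body finished without raising.
    if pending:
        store.versions[table] = max(pending)


store = CheckpointStore()
feed = {0: ["insert:1"], 1: ["update_postimage:1"]}

with read_since_checkpoint(store, "promotion", feed) as changes:
    first_batch = list(changes)

feed[2] = ["insert:2"]
try:
    with read_since_checkpoint(store, "promotion", feed) as changes:
        raise RuntimeError("processing failed")
except RuntimeError:
    pass  # the checkpoint stays at version 1

with read_since_checkpoint(store, "promotion", feed) as changes:
    retry_batch = list(changes)
```

Because the failed block never reaches the code after the yield, the retry sees version 2 again instead of silently skipping it.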
Typed joins
TypedDataFrame.eq_join verifies join keys exist on both specs and returns a typed result:
left = self.data.read(spark, Promotion)
right = self.data.read(spark, Promotion) # example only
joined = left.eq_join(right, left_on=Promotion.promotion_id, right_on=Promotion.promotion_id)
joined.show()
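To make the key check concrete, here is a small sketch of an equality join that validates both keys before doing any work. It operates on lists of dictionaries, not on TypedDataFrame. eq_join_rows, the column sets and the redemption rows are hypothetical, and it says nothing about how uplite types the joined result.

```python
def eq_join_rows(left, right, left_columns, right_columns, left_on, right_on):
    """Inner equality join that fails fast when a key column is unknown."""
    if left_on not in left_columns:
        raise KeyError(f"left side has no column {left_on!r}")
    if right_on not in right_columns:
        raise KeyError(f"right side has no column {right_on!r}")
    # Index the right side by key so each left row is matched in one lookup.
    index = {}
    for row in right:
        index.setdefault(row[right_on], []).append(row)
    return [{**l, **r} for l in left for r in index.get(l[left_on], [])]


PROMOTION_COLUMNS = {"promotion_id", "promotion_type"}
REDEMPTION_COLUMNS = {"redemption_id", "promotion_id"}  # hypothetical second table

promos = [
    {"promotion_id": 1, "promotion_type": "discount"},
    {"promotion_id": 2, "promotion_type": "bundle"},
]
redemptions = [{"redemption_id": 10, "promotion_id": 1}]

joined_rows = eq_join_rows(
    promos, redemptions,
    PROMOTION_COLUMNS, REDEMPTION_COLUMNS,
    left_on="promotion_id", right_on="promotion_id",
)
```

A misspelled key such as "promo_id" raises immediately, instead of surfacing later as a Spark analysis error deep inside a job.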
Registering existing tables into Spark catalog
uplite.lake.auto_register_tables(spark, warehouse_dir) discovers Delta tables and creates catalog entries:
- Local directory: expects <warehouse_dir>/uplite.db/<table> layout
- S3: pass an s3a://bucket/prefix and it will look under <prefix>/uplite.db/
from uplite.lake import auto_register_tables
auto_register_tables(spark, "/tmp/local-delta-warehouse")
# or
auto_register_tables(spark, "s3a://my-bucket/path/to/warehouse")
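For the local case, discovery can be pictured as a directory scan: a Delta table directory contains a _delta_log folder, and an existing table path can be registered with CREATE TABLE ... USING DELTA LOCATION. The sketch below shows that idea with the standard library only. It is a guess at the general approach, not uplite's code: discover_delta_tables and registration_sql are hypothetical helpers, and the statements are only built, not executed.

```python
import tempfile
from pathlib import Path


def discover_delta_tables(warehouse_dir, schema="uplite"):
    """Map table name -> path for each Delta table under <warehouse>/<schema>.db."""
    schema_dir = Path(warehouse_dir) / f"{schema}.db"
    if not schema_dir.is_dir():
        return {}
    return {
        child.name: child
        for child in sorted(schema_dir.iterdir())
        if (child / "_delta_log").is_dir()  # marker of a Delta table
    }


def registration_sql(tables, schema="uplite"):
    """Build one CREATE TABLE statement per discovered table."""
    return [
        f"CREATE TABLE IF NOT EXISTS {schema}.{name} "
        f"USING DELTA LOCATION '{path.as_posix()}'"
        for name, path in tables.items()
    ]


# Fake warehouse: one Delta table and one unrelated folder.
with tempfile.TemporaryDirectory() as warehouse:
    root = Path(warehouse) / "uplite.db"
    (root / "promotion" / "_delta_log").mkdir(parents=True)
    (root / "scratch").mkdir()
    tables = discover_delta_tables(warehouse)
    statements = registration_sql(tables)
```

In a real session each statement would be passed to spark.sql; the S3 variant would list keys under the prefix instead of walking a local directory.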
Parameter resolution and table names
JobOps builds full table names from module-level parameters in uplite.flow:
- override_params["catalog"] and override_params["schema"]
- LOCAL_MODE = True → resolves to <schema>.<table>
- LOCAL_MODE = False → resolves to <catalog>.<schema>.<table>
Set them once at startup:
import uplite.flow as flow
flow.LOCAL_MODE = True
flow.override_params = {"catalog": "prod", "schema": "uplite"}
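The resolution rule can be written out as a few lines of plain Python. resolve_table_name is a hypothetical helper for illustration, not part of the uplite API, and it leaves out any identifier quoting the library may apply.

```python
def resolve_table_name(table, params, local_mode):
    """Two-part name in local mode, three-part name otherwise."""
    parts = [params["schema"], table]
    if not local_mode:
        parts.insert(0, params["catalog"])
    return ".".join(parts)


params = {"catalog": "prod", "schema": "uplite"}
local_name = resolve_table_name("promotion", params, local_mode=True)
catalog_name = resolve_table_name("promotion", params, local_mode=False)
```

Here local_name is "uplite.promotion" and catalog_name is "prod.uplite.promotion", which is why the catalog value can be any string while LOCAL_MODE is True.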
Tips and gotchas
- Always call conform_to_spec before writing/merging to enforce schema.
- TypedDataFrame.verify_schema() is used internally by writers/mergers but can be called explicitly.
- For local development, create and USE the schema in Spark (e.g., uplite) and set LOCAL_MODE = True.
- If using S3, configure Hadoop AWS/S3A and credentials; boto3 is used for discovery in auto_register_tables.
Minimal end-to-end example
from pyspark.sql import SparkSession
import uplite.flow as flow
from uplite.datalang import TableSpec, ColumnSpec, conform_to_spec
from uplite.flow import FlowStep, JobFlowMixin, WorkflowMixin
from pyspark.sql.types import IntegerType, StringType
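packages = [
    # Keep the Delta JAR in step with the installed delta-spark package (>=3.3.0, see Requirements)
    "io.delta:delta-spark_2.12:3.3.0",
    # Only needed for S3 access; match the Hadoop version bundled with your PySpark build
    "org.apache.hadoop:hadoop-aws:3.3.1",
]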
spark = (
    SparkSession.builder.appName("uplite-example")
    .master("local[2]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.sql.defaultTableFormat", "delta")
    .config("spark.sql.sources.default", "delta")
    .config("spark.jars.packages", ",".join(packages))
    .config("spark.sql.warehouse.dir", "data")
    .getOrCreate()
)
spark.sql("CREATE SCHEMA IF NOT EXISTS uplite;")
spark.sql("USE uplite;")
flow.LOCAL_MODE = True
flow.override_params = {"catalog": "local", "schema": "uplite"}
class Promotion(TableSpec):
    promotion_id = ColumnSpec(IntegerType())
    promotion_type = ColumnSpec(StringType())

class Import(JobFlowMixin, FlowStep[None]):
    def define(self, spark: SparkSession) -> None:
        raw_df = spark.createDataFrame([
            {"promotion_id": 1, "promotion_type": "discount"},
            {"promotion_id": 2, "promotion_type": "bundle"},
        ])
        t_df = conform_to_spec(raw_df, Promotion)
        self.data.write(t_df, mode="overwrite")
        self.data.read(spark, Promotion).show()

class Workflow(WorkflowMixin, FlowStep[None]):
    def __init__(self):
        self.register_sub_flow(Import())

Workflow().define(spark)
If you run into anything that feels rough or is missing, please open an issue with a short snippet — most features here are driven by real-world tests and use-cases.