
Typed data workflows on Spark and Delta Lake


uplite — typed data workflows on Spark + Delta Lake

Overview

uplite helps you build small, testable PySpark jobs with a strongly-typed feel:

  • Describe datasets with TableSpec and ColumnSpec (names, types, partitions, docs)
  • Get typed DataFrame helpers (TypedDataFrame) with safe joins and schema verification
  • Define jobs as FlowSteps and compose them into simple workflows
  • Read/write/merge Delta tables through a compact API (JobOps)
  • Work locally or in a metastore/catalog environment

The examples below are distilled from the tests and public API in this repo.

Installation

pip install uplite

Requirements (pulled as dependencies): pyspark>=3.5.0,<=3.5.5, delta-spark>=3.3.0, boto3.

Python 3.12+ is recommended (see pyproject.toml).

Quickstart (local Delta catalog)

  1. Start a Spark session configured for Delta:
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = (
    SparkSession.builder.appName("uplite-quickstart")
    .master("local[2]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.defaultTableFormat", "delta")
    .config("spark.sql.sources.default", "delta")
    # optional: set a local warehouse folder
    .config("spark.sql.warehouse.dir", "/tmp/local-delta-warehouse")
)

# Adds the Delta Lake jars matching the installed delta-spark pip package to spark.jars.packages
spark = configure_spark_with_delta_pip(builder).getOrCreate()

spark.sql("CREATE SCHEMA IF NOT EXISTS uplite;")
spark.sql("USE uplite;")
  2. Tell uplite you are in local mode and provide the catalog/schema parameters used to resolve table names:
import uplite.flow as flow

flow.LOCAL_MODE = True  # local mode uses `<schema>.<table>` instead of `<catalog>.<schema>.<table>`
flow.override_params = {
    "catalog": "local",   # can be any string in local mode
    "schema": "uplite",   # matches the schema you `USE`d above
}
  3. (Optional) Auto-register existing Delta tables from a warehouse folder (local or S3). For a local folder, the expected layout is <WAREHOUSE>/uplite.db/<table>:
from uplite.lake import auto_register_tables

auto_register_tables(spark, "/tmp/local-delta-warehouse")

For S3, point to an S3A prefix that contains uplite.db: auto_register_tables(spark, "s3a://my-bucket/path/to/warehouse").

Define a schema with TableSpec

Use TableSpec and ColumnSpec to describe a Delta table. You can use Spark data types or simple strings.

from pyspark.sql.types import IntegerType, StringType, DoubleType
from uplite.datalang import TableSpec, ColumnSpec

class Promotion(TableSpec):
    promotion_id = ColumnSpec(IntegerType(), description="promotion id")
    target_type = ColumnSpec(StringType(), description="target type")
    target_value = ColumnSpec(StringType(), description="target value")
    promotion_type = ColumnSpec(StringType(), description="promotion type")
    discount_percentage = ColumnSpec(DoubleType(), description="discount percentage")

# Useful introspection helpers
Promotion.table_name()   # "promotion"
Promotion.table_zone()   # derived from package name (e.g. "raw" / "processed")
Promotion.table_id()     # "<zone>.<name>", e.g. "raw.promotion"
Promotion.alias()        # used in SQL as a stable table alias
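The exact naming rules live inside uplite. As a rough mental model, here is a hypothetical sketch that makes two assumptions, neither taken from uplite's code. First, the table name is the snake_cased class name. Second, the zone is the package that contains the spec's module, so a spec defined in a (made-up) module `myproject.raw.promotion` would land in zone `raw`.

```python
import re


def table_name(cls_name: str) -> str:
    """Hypothetical rule: CamelCase class name -> snake_case table name."""
    # Insert "_" wherever a lowercase letter or digit is followed by an uppercase letter
    return re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", cls_name).lower()


def table_zone(module_name: str) -> str:
    """Hypothetical rule: the zone is the package that contains the spec's module."""
    parts = module_name.split(".")
    if len(parts) < 2:
        raise ValueError(f"module {module_name!r} is not inside a zone package")
    return parts[-2]


def table_id(cls_name: str, module_name: str) -> str:
    # "<zone>.<name>", mirroring the documented format of TableSpec.table_id()
    return f"{table_zone(module_name)}.{table_name(cls_name)}"


print(table_id("Promotion", "myproject.raw.promotion"))          # raw.promotion
print(table_id("PromotionHistory", "myproject.processed.specs"))  # processed.promotion_history
```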

Convert raw DataFrames to typed DataFrames

conform_to_spec(df, spec) casts and aligns the DataFrame's columns to the given TableSpec subclass and returns a TypedDataFrame bound to that spec.

from uplite.datalang import conform_to_spec

raw_promos = spark.read.csv("sample_data", header=True)
t_promos = conform_to_spec(raw_promos, Promotion)

# Validate schema before writes/joins
t_promos.verify_schema()
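It helps to see what "casts and aligns" can mean in plain PySpark terms. The idea is to project the columns in spec order, cast each one, and fail fast when a column is missing. This matters for the CSV read above, where every column arrives as a string.

The sketch below is hypothetical and not uplite's implementation:

* The spec is a name-to-SQL-type dict standing in for ColumnSpec objects.
* Dropping extra columns is an assumption.
* The helper only builds the expressions, so it runs without Spark. You would pass them to `DataFrame.selectExpr`.

```python
def conform_exprs(df_columns: list[str], spec: dict[str, str]) -> list[str]:
    """Build selectExpr() projections that align a DataFrame to a spec.

    spec maps column name -> Spark SQL type (hypothetical stand-in for ColumnSpec).
    Columns not in the spec are dropped; missing ones raise ValueError.
    """
    missing = [name for name in spec if name not in df_columns]
    if missing:
        raise ValueError(f"columns missing from DataFrame: {missing}")
    # Spec order wins, so the output schema is stable regardless of the source layout
    return [f"CAST(`{name}` AS {sql_type}) AS `{name}`" for name, sql_type in spec.items()]


promotion_spec = {
    "promotion_id": "INT",
    "target_type": "STRING",
    "discount_percentage": "DOUBLE",
}
exprs = conform_exprs(["discount_percentage", "promotion_id", "target_type", "extra"], promotion_spec)
# With a live session: conformed = raw_promos.selectExpr(*exprs)
```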

Write, read, merge — job building blocks

Use FlowStep + JobFlowMixin to implement a job. Interact with the catalog via self.data (a JobOps).

from pyspark.sql import SparkSession
from uplite.datalang import conform_to_spec
from uplite.flow import FlowStep, JobFlowMixin, WorkflowMixin

class ImportPromotions(JobFlowMixin, FlowStep[None]):
    def define(self, spark: SparkSession) -> None:
        # Read from a raw source
        raw_promos = spark.read.csv("sample_data", header=True)

        # Enforce expected schema and get a TypedDataFrame
        t_promos = conform_to_spec(raw_promos, Promotion)

        # Create/overwrite the Delta table based on the spec
        self.data.write(t_promos, mode="overwrite")

        # Read back as a typed DataFrame
        self.data.read(spark, Promotion).show()

class PromotionWorkflow(WorkflowMixin, FlowStep[None]):
    def __init__(self):
        self.register_sub_flow(ImportPromotions())

# Run it
job = PromotionWorkflow()
job.define(spark)
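The FlowStep/WorkflowMixin split reads like the composite pattern: a workflow is itself a step, and its define() runs the sub-flows registered on it. Below is a hypothetical, Spark-free model of that idea, useful for reasoning about or unit-testing flow structure.

The classes are stand-ins, not uplite's. Running sub-flows in registration order is an assumption.

```python
from typing import Any, Generic, TypeVar

T = TypeVar("T")


class SimpleStep(Generic[T]):
    """Stand-in for FlowStep[T]: one unit of work."""

    def define(self, spark: Any) -> T:
        raise NotImplementedError


class SimpleWorkflow(SimpleStep[None]):
    """Stand-in for WorkflowMixin: runs registered sub-flows in order."""

    def __init__(self) -> None:
        self._sub_flows: list[SimpleStep[Any]] = []

    def register_sub_flow(self, step: SimpleStep[Any]) -> None:
        self._sub_flows.append(step)

    def define(self, spark: Any) -> None:
        for step in self._sub_flows:
            step.define(spark)


class Record(SimpleStep[None]):
    """Toy step that records its name instead of touching Spark."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name, self.log = name, log

    def define(self, spark: Any) -> None:
        self.log.append(self.name)


log: list[str] = []
wf = SimpleWorkflow()
wf.register_sub_flow(Record("import", log))
wf.register_sub_flow(Record("enrich", log))
wf.define(spark=None)
print(log)  # ['import', 'enrich']
```

Because a workflow is also a step, workflows can be nested by registering one as a sub-flow of another.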

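Merging with deduplication (an SCD4 helper is also available). These calls go inside a JobFlowMixin step (e.g. in define()), where self.data is available:

# Upsert with deduplication based on matching columns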
self.data.merge(
    t_promos,
    matching_columns=[Promotion.promotion_id],
    # order_by defaults to matching_columns, but you can specify custom ordering columns
)

# Or perform SCD4-like merge and automatically append change history
self.data.merge_with_scd4(
    t_promos,
    matching_columns=[Promotion.promotion_id],
)
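Deduplicating first matters because a Delta MERGE fails when more than one source row matches the same target row. The helper below is a hedged illustration, not uplite's implementation. It keeps one row per key, choosing the row with the largest ordering value.

The assumptions are:

* Ties keep the first row seen.
* The `updated_at` column is hypothetical.
* Which duplicate uplite keeps when order_by is left at its default is not specified here.

```python
from typing import Any, Iterable

Row = dict[str, Any]


def dedupe_latest(rows: Iterable[Row], keys: list[str], order_by: list[str]) -> list[Row]:
    """Keep one row per key: the one with the greatest order_by tuple.

    Ties keep the first row seen; output preserves first-seen key order.
    """
    best: dict[tuple, Row] = {}
    for row in rows:
        key = tuple(row[k] for k in keys)
        current = best.get(key)
        if current is None or tuple(row[c] for c in order_by) > tuple(current[c] for c in order_by):
            best[key] = row  # replacing an existing key keeps its original position
    return list(best.values())


updates = [
    {"promotion_id": 1, "discount_percentage": 10.0, "updated_at": 1},
    {"promotion_id": 1, "discount_percentage": 15.0, "updated_at": 2},
    {"promotion_id": 2, "discount_percentage": 5.0, "updated_at": 1},
]
result = dedupe_latest(updates, keys=["promotion_id"], order_by=["updated_at"])
print(result)  # one row for promotion 1 (the updated_at=2 version) and one for promotion 2
```

In PySpark, the same idea is usually expressed with `row_number()` over `Window.partitionBy(*keys).orderBy(F.desc(...))`, keeping the rows where the row number equals 1.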

Change Data Feed checkpoints

Consume only new changes from a Delta table using CDF and automatic checkpoints:

with self.data.read_since_latest_checkpoint(spark, Promotion) as changes:
    # process incremental changes ("insert", "update_postimage")
    changes.show()
    # when the context manager exits successfully, the checkpoint is advanced
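The checkpoint behaviour described above is the classic "commit on success" context manager: the checkpoint advances only on a clean exit. Below is a hypothetical, in-memory sketch of that contract. The store, the version numbers, and the function name are all assumptions.

With real Delta tables, the read inside the manager would be something like `spark.read.format("delta").option("readChangeFeed", "true").option("startingVersion", start).table(name)`. The source table also needs the property `delta.enableChangeDataFeed = true`. Whether uplite sets that property for you is not stated here.

```python
from contextlib import contextmanager
from typing import Iterator


class CheckpointStore:
    """Hypothetical stand-in: remembers the last processed table version per consumer."""

    def __init__(self) -> None:
        self._versions: dict[str, int] = {}

    def get(self, consumer: str) -> int:
        return self._versions.get(consumer, -1)  # -1 means nothing processed yet

    def set(self, consumer: str, version: int) -> None:
        self._versions[consumer] = version


@contextmanager
def changes_since_checkpoint(store: CheckpointStore, consumer: str, latest_version: int) -> Iterator[range]:
    """Yield the table versions not yet processed; advance the checkpoint only on success."""
    start = store.get(consumer) + 1
    yield range(start, latest_version + 1)
    # Reached only if the with-block did not raise: commit the new position
    store.set(consumer, latest_version)


store = CheckpointStore()
with changes_since_checkpoint(store, "promo_job", latest_version=3) as versions:
    print(list(versions))  # [0, 1, 2, 3]

try:
    with changes_since_checkpoint(store, "promo_job", latest_version=5) as versions:
        raise RuntimeError("downstream write failed")
except RuntimeError:
    pass
print(store.get("promo_job"))  # still 3: versions 4-5 will be re-read next run
```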

Typed joins

TypedDataFrame.eq_join verifies join keys exist on both specs and returns a typed result:

left = self.data.read(spark, Promotion)
right = self.data.read(spark, Promotion)  # example only

joined = left.eq_join(right, left_on=Promotion.promotion_id, right_on=Promotion.promotion_id)
joined.show()
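The check eq_join performs before joining can be pictured as a membership test on both specs. This is a hypothetical sketch that models specs as plain sets of column names. uplite's TypedDataFrame works on ColumnSpec objects, and its exact error type is not documented here.

```python
def check_join_keys(left_cols: set[str], right_cols: set[str], left_on: str, right_on: str) -> None:
    """Raise before any Spark work if a join key is missing on either side."""
    problems = []
    if left_on not in left_cols:
        problems.append(f"left side has no column {left_on!r}")
    if right_on not in right_cols:
        problems.append(f"right side has no column {right_on!r}")
    if problems:
        raise ValueError("; ".join(problems))


promotion_cols = {"promotion_id", "target_type", "target_value", "promotion_type", "discount_percentage"}
check_join_keys(promotion_cols, promotion_cols, "promotion_id", "promotion_id")  # passes silently
```

Checking against the specs up front produces an error message that names the side at fault.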

Registering existing tables into Spark catalog

uplite.lake.auto_register_tables(spark, warehouse_dir) discovers Delta tables and creates catalog entries:

  • Local directory: expects <warehouse_dir>/uplite.db/<table> layout
  • S3: pass an s3a://bucket/prefix and it will look under <prefix>/uplite.db/
from uplite.lake import auto_register_tables

auto_register_tables(spark, "/tmp/local-delta-warehouse")
# or
auto_register_tables(spark, "s3a://my-bucket/path/to/warehouse")
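For the local-directory case, discovery plus registration can be approximated in two steps. First, find the sub-folders of `<warehouse_dir>/uplite.db` that contain a `_delta_log` directory. Then register each one with `CREATE TABLE ... USING DELTA LOCATION`.

The sketch below is not uplite's implementation:

* The function name is hypothetical.
* It only builds the SQL strings, which you would pass to `spark.sql`.
* It does not cover the S3/boto3 path.

```python
import tempfile
from pathlib import Path


def registration_statements(warehouse_dir: str, schema: str = "uplite") -> list[str]:
    """Return CREATE TABLE statements for every Delta table under <warehouse>/<schema>.db/."""
    schema_dir = Path(warehouse_dir) / f"{schema}.db"
    if not schema_dir.is_dir():
        return []
    statements = []
    for table_dir in sorted(p for p in schema_dir.iterdir() if p.is_dir()):
        # A directory counts as a Delta table if it has a transaction log
        if (table_dir / "_delta_log").is_dir():
            statements.append(
                f"CREATE TABLE IF NOT EXISTS {schema}.{table_dir.name} "
                f"USING DELTA LOCATION '{table_dir.resolve().as_posix()}'"
            )
    return statements


with tempfile.TemporaryDirectory() as wh:
    (Path(wh) / "uplite.db" / "promotion" / "_delta_log").mkdir(parents=True)
    (Path(wh) / "uplite.db" / "scratch").mkdir()  # no _delta_log: skipped
    for stmt in registration_statements(wh):
        print(stmt)  # with a live session: spark.sql(stmt)
```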

Parameter resolution and table names

JobOps builds full table names from module-level parameters in uplite.flow:

  • override_params["catalog"] and override_params["schema"]
  • LOCAL_MODE = True → resolves to <schema>.<table>
  • LOCAL_MODE = False → resolves to <catalog>.<schema>.<table>

Set them once at startup:

import uplite.flow as flow

flow.LOCAL_MODE = True
flow.override_params = {"catalog": "prod", "schema": "uplite"}
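The resolution rule amounts to a small pure function. Here is a hypothetical sketch of it: the function name is made up, and any identifier quoting uplite may apply is not shown.

```python
def resolve_table_name(table: str, params: dict[str, str], local_mode: bool) -> str:
    """Build a full table name following the documented LOCAL_MODE rule."""
    schema = params["schema"]
    if local_mode:
        # Local Spark catalog: no catalog level, only <schema>.<table>
        return f"{schema}.{table}"
    return f"{params['catalog']}.{schema}.{table}"


params = {"catalog": "prod", "schema": "uplite"}
print(resolve_table_name("promotion", params, local_mode=True))   # uplite.promotion
print(resolve_table_name("promotion", params, local_mode=False))  # prod.uplite.promotion
```

This is also why the catalog value can be any string in local mode: it is never read.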

Tips and gotchas

  • Always call conform_to_spec before writing/merging to enforce schema.
  • TypedDataFrame.verify_schema() is used internally by writers/mergers but can be called explicitly.
  • For local development, create and USE the schema in Spark (e.g., uplite) and set LOCAL_MODE = True.
  • If using S3, configure Hadoop AWS/S3A and credentials; boto3 is used for discovery in auto_register_tables.
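Below is a possible starting point for the S3A side. It assumes Spark 3.5.x with its bundled Hadoop 3.3.4, so `org.apache.hadoop:hadoop-aws:3.3.4` on `spark.jars.packages`. It also assumes credentials come from the standard AWS provider chain (environment variables, profile, or instance role).

The keys are standard Hadoop S3A settings, not uplite options; adjust the credentials provider for your setup. The helper name `with_s3a` is hypothetical.

```python
# Standard Hadoop S3A settings, passed through Spark's "spark.hadoop." prefix.
S3A_CONF = {
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    # AWS SDK v1 default provider chain, as used by Hadoop 3.3.x
    "spark.hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
}


def with_s3a(builder):
    """Apply the S3A settings to a SparkSession.builder and return it for chaining."""
    for key, value in S3A_CONF.items():
        builder = builder.config(key, value)
    return builder


# Usage: spark = with_s3a(SparkSession.builder.appName("uplite-s3")).getOrCreate()
```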

Minimal end-to-end example

from pyspark.sql import SparkSession
import uplite.flow as flow
from uplite.datalang import TableSpec, ColumnSpec, conform_to_spec
from uplite.flow import FlowStep, JobFlowMixin, WorkflowMixin
from pyspark.sql.types import IntegerType, StringType

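packages = [
    # Keep the Delta jar in sync with the delta-spark pip package (>=3.3.0)
    "io.delta:delta-spark_2.12:3.3.0",
    # hadoop-aws should match the Hadoop version bundled with Spark (3.3.4 for Spark 3.5.x)
    "org.apache.hadoop:hadoop-aws:3.3.4",
]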

spark = (
    SparkSession.builder.appName("uplite-example")
    .master("local[2]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.sql.defaultTableFormat", "delta")
    .config("spark.sql.sources.default", "delta")
    .config("spark.jars.packages", ",".join(packages))
    .config("spark.sql.warehouse.dir", "data")
    .getOrCreate()
)

spark.sql("CREATE SCHEMA IF NOT EXISTS uplite;")
spark.sql("USE uplite;")

flow.LOCAL_MODE = True
flow.override_params = {"catalog": "local", "schema": "uplite"}

class Promotion(TableSpec):
    promotion_id = ColumnSpec(IntegerType())
    promotion_type = ColumnSpec(StringType())

class Import(JobFlowMixin, FlowStep[None]):
    def define(self, spark: SparkSession) -> None:
        raw_df = spark.createDataFrame([
            {"promotion_id": 1, "promotion_type": "discount"},
            {"promotion_id": 2, "promotion_type": "bundle"},
        ])
        t_df = conform_to_spec(raw_df, Promotion)
        self.data.write(t_df, mode="overwrite")
        self.data.read(spark, Promotion).show()

class Workflow(WorkflowMixin, FlowStep[None]):
    def __init__(self):
        self.register_sub_flow(Import())

Workflow().define(spark)

If you run into anything that feels rough or is missing, please open an issue with a short snippet; most features here are driven by real-world tests and use cases.

Download files

Source Distribution

uplite-1.0.1.tar.gz (13.3 kB)

Uploaded Source

Built Distribution

uplite-1.0.1-py3-none-any.whl (15.1 kB)

Uploaded Python 3

File details

Details for the file uplite-1.0.1.tar.gz.

File metadata

  • Download URL: uplite-1.0.1.tar.gz
  • Size: 13.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.9.15

File hashes

Hashes for uplite-1.0.1.tar.gz
Algorithm Hash digest
SHA256 1ecc43bcd38bbaf5037d27720dbb48239ee0c4ab9330a8b622ddafb2242fe688
MD5 c976a1fbcc61e6f4a452ff78f5f3a598
BLAKE2b-256 fa496623f2f77781d143cecf0ce12747a1d956554efe866f2757c6b5ef1e79fc

File details

Details for the file uplite-1.0.1-py3-none-any.whl.

File metadata

  • Download URL: uplite-1.0.1-py3-none-any.whl
  • Size: 15.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.9.15

File hashes

Hashes for uplite-1.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 83f30217bf9f011e8c701a7e226696ac2722a564a707d705571fcfb81cdbf8d0
MD5 6a48931dca4165669fef90a241433660
BLAKE2b-256 d2ffd31c7e8def510a77fb5d3fa084b0e78a2a9cec9735d5edcf52b4379997b1
