Type annotation system that allows you to specify and validate the schema of PySpark DataFrames using Python type hints for both function arguments and return values.
sparkenforce
sparkenforce is a type annotation system that lets you specify and validate PySpark DataFrame schemas using Python type hints. It validates both function arguments and return values, catching schema mismatches before they cause runtime errors.
Why sparkenforce?
Working with PySpark DataFrames can be error-prone when schemas don't match expectations. sparkenforce helps by:
- Preventing runtime errors: Catch schema mismatches early
- Improving code clarity: Function signatures show exactly what DataFrame structure is expected
- Enforcing contracts: Ensure functions return DataFrames with the promised schema
- Better debugging: Clear error messages when validations fail
Getting Started
Installation
Install sparkenforce using pip:
pip install sparkenforce
Or if you're using uv:
uv add sparkenforce
Validating Input DataFrames
from sparkenforce import validate
from pyspark.sql import functions as fn
from pyspark.sql import DataFrame
@validate
def add_length(df: DataFrame['firstname': str, ...]) -> DataFrame['name': str, 'length': int]:
    return df.select(
        df.firstname.alias('name'),
        fn.length(df.firstname).alias('length')
    )
# If input DataFrame doesn't have 'firstname' column, validation fails
# If return DataFrame doesn't match expected schema, validation fails
Flexible Schemas with Ellipsis
Use ... to allow additional columns beyond the specified ones:
@validate
def filter_names(df: DataFrame['firstname': str, 'lastname': str, ...]):
    """Requires firstname and lastname, but allows other columns too."""
    return df.filter(df.firstname != "")
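The subscript syntax above is ordinary Python: each 'name': type pair reaches the class as a slice object, and ... arrives as Ellipsis. The sketch below shows that mechanism with a hypothetical stand-in class named SchemaSpy. It is not sparkenforce's implementation, only an illustration of what Python hands to a subscript hook.

```python
class SchemaSpy:
    """Hypothetical stand-in that records what a subscript expression receives."""

    def __class_getitem__(cls, item):
        # A single item arrives bare; several items arrive as a tuple.
        items = item if isinstance(item, tuple) else (item,)
        columns = {}
        allow_extra = False
        for part in items:
            if part is Ellipsis:
                allow_extra = True               # '...' means other columns are allowed
            elif isinstance(part, slice):
                columns[part.start] = part.stop  # 'name': type becomes slice(name, type)
            else:
                columns[part] = None             # bare column name, no type given
        return columns, allow_extra


parsed = SchemaSpy["firstname": str, "lastname": str, ...]
print(parsed)
# ({'firstname': <class 'str'>, 'lastname': <class 'str'>}, True)
```

Without the trailing ..., the second element of the result is False, which corresponds to the "exactly these columns" reading.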
Return Value Validation
sparkenforce validates that your function returns exactly what you promise:
@validate
def get_summary(df: DataFrame['firstname': str, ...]) -> DataFrame['firstname': str, 'summary': str]:
    return df.select(
        'firstname',
        fn.lit('processed').alias('summary'),
    )
Error Handling
When validation fails, sparkenforce provides clear error messages:
# This will raise DataFrameValidationError with detailed message:
# "return value columns mismatch. Expected exactly {'name', 'length'},
# got {'lastname', 'firstname'}. missing columns: {'name', 'length'},
# unexpected columns: {'lastname', 'firstname'}"
@validate
def bad_function(df: DataFrame['firstname': str, ...]) -> DataFrame['name': str, 'length': int]:
    return df.select('firstname', 'lastname')  # Wrong columns!
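The missing/unexpected breakdown in that message comes down to two set differences. A minimal sketch of the comparison follows, assuming plain column-name collections; check_columns is a hypothetical helper, and its message wording is illustrative rather than the library's exact text.

```python
def check_columns(expected, actual, allow_extra=False):
    """Return an error string, or None when the columns are acceptable."""
    expected, actual = set(expected), set(actual)
    missing = expected - actual
    # With an ellipsis in the annotation, extra columns are not an error.
    unexpected = set() if allow_extra else actual - expected
    if not missing and not unexpected:
        return None
    return (
        f"missing columns: {sorted(missing)}, "
        f"unexpected columns: {sorted(unexpected)}"
    )


print(check_columns({"name", "length"}, ["firstname", "lastname"]))
# missing columns: ['length', 'name'], unexpected columns: ['firstname', 'lastname']
```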
More Examples
Check out the examples notebook.
API Reference
Core Components
@validate Decorator
The main decorator for enabling DataFrame schema validation on functions.
# Assumes an active SparkSession bound to the name `spark`
@validate
def process_data(df: DataFrame["id": int, "name": str]) -> DataFrame["result": str]:
    return spark.createDataFrame([("processed",)], ["result"])
Signature: validate(func: Callable) -> Callable
Parameters:
- func - Function to decorate with validation logic
Returns:
- Wrapped function that validates DataFrame arguments and return values
Raises:
- DataFrameValidationError - When schema validation fails
Validation Rules:
- Validates all function parameters annotated with DataFrame[...] types
- Validates return values if annotated with DataFrame[...] types
- Functions without DataFrame annotations are not validated
- Return type None or no return annotation skips return validation
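To make these rules concrete, here is a rough sketch of how a decorator can apply them using inspect.signature. Everything in it is hypothetical: Columns stands in for a DataFrame[...] annotation, plain dicts keyed by column name stand in for DataFrames, and validate_sketch is not the library's validate.

```python
import functools
import inspect


class Columns:
    """Hypothetical annotation marker holding required column names."""

    def __init__(self, *names):
        self.names = set(names)


def _check(label, spec, frame):
    # 'frame' is a dict keyed by column name in this sketch.
    missing = spec.names - set(frame)
    if missing:
        raise TypeError(f"{label} is missing required columns: {sorted(missing)}")


def validate_sketch(func):
    sig = inspect.signature(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bound = sig.bind(*args, **kwargs)
        for name, value in bound.arguments.items():
            spec = sig.parameters[name].annotation
            if isinstance(spec, Columns):   # other parameters are skipped
                _check(f"argument '{name}'", spec, value)
        result = func(*args, **kwargs)
        if isinstance(sig.return_annotation, Columns):  # None or missing: skipped
            _check("return value", sig.return_annotation, result)
        return result

    return wrapper


@validate_sketch
def add_flag(df: Columns("id"), limit: int) -> Columns("id", "flag"):
    return {**df, "flag": bool}


print(add_flag({"id": int}, 10))
```

Calling add_flag({"name": str}, 10) raises TypeError before the function body runs, because the argument check happens first.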
DataFrame Type Annotations
sparkenforce extends PySpark's DataFrame class to support schema specifications using subscript notation.
Column-Only Validation
DataFrame["id", "name"] # Requires exactly these columns
DataFrame["id", "name", ...] # Requires at least these columns
Column + Type Validation
DataFrame["id": int, "name": str] # Exact columns with types
DataFrame["id": int, "name": str, ...] # Minimum columns with types
DataFrame["id": int, "name": Optional[str]] # Optional columns (may not be present)
Supported Types
Python Types:
- int → LongType (with compatibility for IntegerType, ShortType, ByteType)
- str → StringType
- float → DoubleType (with compatibility for FloatType)
- bool → BooleanType
- datetime.datetime → TimestampType
- datetime.date → DateType
- decimal.Decimal → DecimalType
- bytearray → BinaryType
Spark Types:
Any pyspark.sql.types.DataType subclass can be used directly:
from pyspark.sql.types import IntegerType, StringType
DataFrame["id": IntegerType, "name": StringType]
Custom Types: Register custom type mappings for complex types:
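from dataclasses import dataclass
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Top-level export of register_type_mapping is assumed here, as for validate
from sparkenforce import register_type_mapping
@dataclass
class Person:
    name: str
    age: int
person_struct = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
register_type_mapping(Person, person_struct)
DataFrame["person": Person]  # Now supported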
Functions
register_type_mapping(python_type, spark_type)
Register custom mappings between Python types and Spark DataTypes.
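from dataclasses import dataclass
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, concat, lit
from pyspark.sql.types import StructType, StructField, StringType
# Top-level export of register_type_mapping is assumed here, as for validate
from sparkenforce import validate, register_type_mapping
@dataclass
class Name:
    first: str
    last: str
name_type = StructType([
    StructField("first", StringType(), True),
    StructField("last", StringType(), True)
])
register_type_mapping(Name, name_type)
@validate
def process_names(df: DataFrame["person": Name]) -> DataFrame["full_name": str]:
    # Join the two struct fields into a single string column
    return df.select(concat(col("person.first"), lit(" "), col("person.last")).alias("full_name"))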
Parameters:
- python_type (type) - Python type or class to register
- spark_type (pyspark.sql.types.DataType) - Corresponding Spark DataType instance
Use Cases:
- Complex nested structures using dataclasses
- Custom business domain types
- Third-party type integration
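Conceptually, a type mapping of this kind behaves like a lookup table from Python types to Spark types. The sketch below models it with a plain dictionary, assuming Spark types are represented by their names as strings so that it runs without PySpark. The names _TYPE_REGISTRY, register_mapping_sketch, and resolve_sketch are hypothetical, and the library's internals may differ.

```python
import datetime
import decimal
from dataclasses import dataclass

# Defaults copied from the "Supported Types" list; values are Spark type names.
_TYPE_REGISTRY = {
    int: "LongType",
    str: "StringType",
    float: "DoubleType",
    bool: "BooleanType",
    datetime.datetime: "TimestampType",
    datetime.date: "DateType",
    decimal.Decimal: "DecimalType",
    bytearray: "BinaryType",
}


def register_mapping_sketch(python_type, spark_type):
    """Add or override the Spark type used for a Python type."""
    _TYPE_REGISTRY[python_type] = spark_type


def resolve_sketch(python_type):
    """Look up the Spark type for an annotation, failing loudly if unknown."""
    try:
        return _TYPE_REGISTRY[python_type]
    except KeyError:
        raise TypeError(f"no Spark type registered for {python_type!r}") from None


@dataclass
class Name:
    first: str
    last: str


register_mapping_sketch(Name, "StructType(first: StringType, last: StringType)")
print(resolve_sketch(Name))
```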
infer_dataframe_annotation(df)
Generate DataFrame type annotation strings from existing DataFrames.
df = spark.createDataFrame([
    (1, "Alice", 25.5),
    (2, "Bob", 30.0)
], ["id", "name", "score"])
annotation = infer_dataframe_annotation(df)
# Returns: 'DataFrame["id": int, "name": str, "score": float]'
Parameters:
- df (pyspark.sql.DataFrame) - DataFrame to analyze
Returns:
- str - Type annotation string ready for copy-paste into code
Use Cases:
- Reverse engineering schemas from existing DataFrames
- Generating boilerplate for new functions
- Documentation and debugging
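The string format returned above can be reproduced by walking a schema and mapping each Spark type back to a Python name. This sketch assumes the schema is given as (column name, Spark type name) pairs instead of a real DataFrame, and annotation_from_fields and SPARK_TO_PYTHON are hypothetical names, not part of sparkenforce.

```python
# Reverse of the Python-to-Spark mapping for a few common types.
SPARK_TO_PYTHON = {
    "LongType": "int",
    "StringType": "str",
    "DoubleType": "float",
    "BooleanType": "bool",
}


def annotation_from_fields(fields):
    """Build an annotation string from (column, spark_type_name) pairs."""
    parts = [
        # Unknown Spark types fall back to their own name.
        f'"{name}": {SPARK_TO_PYTHON.get(type_name, type_name)}'
        for name, type_name in fields
    ]
    return f"DataFrame[{', '.join(parts)}]"


fields = [("id", "LongType"), ("name", "StringType"), ("score", "DoubleType")]
print(annotation_from_fields(fields))
# DataFrame["id": int, "name": str, "score": float]
```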
Classes
TypedDataFrame
Alternative name for DataFrame type annotations, providing explicit typing semantics.
from sparkenforce import TypedDataFrame
@validate
def process(data: TypedDataFrame["id": int, "name": str]) -> TypedDataFrame["result": str]:
    # Functionally identical to DataFrame[...] but more explicit
    return spark.createDataFrame([("success",)], ["result"])
Usage:
- Drop-in replacement for DataFrame[...] annotations
- Provides clearer semantic meaning in domain-specific code
- Same validation behavior as DataFrame
Exceptions
DataFrameValidationError
Raised when DataFrame schema validation fails.
class DataFrameValidationError(TypeError):
    """Raised when DataFrame validation fails."""
Common Scenarios:
Missing Columns:
DataFrameValidationError: argument 'df' is missing required columns: {'name'}
Type Mismatches:
DataFrameValidationError: argument 'df' column 'age' has incorrect type. Expected LongType(), got StringType()
Return Value Errors:
DataFrameValidationError: return value must be a PySpark DataFrame, got <class 'str'>
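Because the exception subclasses TypeError, callers can catch it specifically or rely on an existing except TypeError handler. The sketch below uses a local stand-in class with the same definition so it runs without sparkenforce installed; run_step and failing_step are hypothetical helpers.

```python
class DataFrameValidationError(TypeError):
    """Local stand-in mirroring the class definition shown above."""


def failing_step():
    # Simulates a validated function rejecting its input.
    raise DataFrameValidationError(
        "argument 'df' is missing required columns: {'name'}"
    )


def run_step(step):
    """Run a pipeline step and turn schema failures into a readable string."""
    try:
        return step()
    except DataFrameValidationError as exc:
        return f"schema problem: {exc}"


print(run_step(failing_step))
# schema problem: argument 'df' is missing required columns: {'name'}
```

Catching the specific class keeps unrelated TypeError bugs from being reported as schema problems.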
Advanced Usage
Optional Columns
Use typing.Optional to mark columns as not always present:
from typing import Optional
@validate
def process(df: DataFrame["id": int, "name": Optional[str]]) -> DataFrame["result": str]:
    # 'name' column may be missing; function should handle that case
    # (fn is pyspark.sql.functions, imported as in the earlier examples)
    if 'name' in df.columns:
        return df.select(fn.concat(fn.col("name"), fn.lit(" processed")).alias("result"))
    else:
        return df.select(fn.lit("no name").alias("result"))
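An Optional[...] annotation can be detected at runtime with the standard typing helpers. The following sketch shows one way to split such an annotation into its inner type and an "optional" flag; split_optional is a hypothetical helper, and this is not necessarily how sparkenforce does it.

```python
from typing import Optional, Union, get_args, get_origin


def split_optional(annotation):
    """Return (inner_type, is_optional) for a column annotation."""
    if get_origin(annotation) is Union:
        args = get_args(annotation)
        non_none = [a for a in args if a is not type(None)]
        # Optional[X] is Union[X, None]: exactly two members, one of them None.
        if len(args) == 2 and len(non_none) == 1:
            return non_none[0], True
    return annotation, False


print(split_optional(Optional[str]))
print(split_optional(int))
```

A validator built on this could skip the "missing column" check when the flag is True while still checking the type if the column is present.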
Flexible Schemas
Use ellipsis (...) for minimum column requirements:
@validate
def add_metadata(df: DataFrame["id": int, ...]) -> DataFrame["id": int, "processed": bool, ...]:
    # Input: requires 'id' column, allows others
    # Output: guarantees 'id' and 'processed' columns, allows others
    return df.withColumn("processed", fn.lit(True))
Type Compatibility
sparkenforce treats closely related Spark types as compatible when checking a column against its annotation:
- Integer types (ByteType, ShortType, IntegerType, LongType) are interchangeable
- Float types (FloatType, DoubleType) are interchangeable
- Timestamp types (TimestampType, TimestampNTZType) are interchangeable
- String variants (StringType, VarcharType) are interchangeable
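One way to picture these rules is as groups of type names, where two types match if they are equal or share a group. The sketch below assumes Spark types are represented by their class names as strings; COMPATIBLE_GROUPS and types_compatible are hypothetical names used only for illustration.

```python
# Groups taken from the compatibility list above.
COMPATIBLE_GROUPS = [
    {"ByteType", "ShortType", "IntegerType", "LongType"},
    {"FloatType", "DoubleType"},
    {"TimestampType", "TimestampNTZType"},
    {"StringType", "VarcharType"},
]


def types_compatible(expected, actual):
    """True when the two type names are equal or belong to the same group."""
    if expected == actual:
        return True
    return any(expected in group and actual in group for group in COMPATIBLE_GROUPS)


print(types_compatible("LongType", "IntegerType"))   # True
print(types_compatible("LongType", "StringType"))    # False
```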
Inspiration
This project builds on dataenforce, extending it with additional validation capabilities for PySpark DataFrame workflows.
License
Apache Software License v2.0
Contact
Created by Agustín Recoba
File details
Details for the file sparkenforce-1.0.0.tar.gz.
File metadata
- Download URL: sparkenforce-1.0.0.tar.gz
- Upload date:
- Size: 27.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.5.24
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 7eb8551aa8009352bcd8f35bb9d0510ef068533310b54cb576e80a542b7813ff |
| MD5 | 67c13be97c963c18330727627dda610c |
| BLAKE2b-256 | f108f4a082e3419b8608aacc1510702ebae7cc4664ab0155cff257d2f373c0b8 |
File details
Details for the file sparkenforce-1.0.0-py3-none-any.whl.
File metadata
- Download URL: sparkenforce-1.0.0-py3-none-any.whl
- Upload date:
- Size: 17.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.5.24
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 70b6a5239bd20d60a74d18083b97fae3942af1d404fccdb2b26e569e68fefb1b |
| MD5 | e658730f3696d06ff074bda1ddd029d2 |
| BLAKE2b-256 | 699539aca98bbe61d589d45e244282260e1e9446432cfdb3f5e5df60c0ec1b48 |