
Type annotation system that allows you to specify and validate the schema of PySpark DataFrames using Python type hints for both function arguments and return values.

sparkenforce

sparkenforce is a type annotation system that lets you specify and validate PySpark DataFrame schemas using Python type hints. When a decorated function is called, it validates both the arguments and the return value, so schema mismatches are reported at the function boundary instead of surfacing later as harder-to-trace errors.

Why sparkenforce?

Working with PySpark DataFrames can be error-prone when schemas don't match expectations. sparkenforce helps by:

  • Catching errors early: Schema mismatches are reported where data enters or leaves a function, not deep inside later transformations
  • Improving code clarity: Function signatures show exactly what DataFrame structure is expected
  • Enforcing contracts: Ensure functions return DataFrames with the promised schema
  • Better debugging: Clear error messages when validations fail

Getting Started

Installation

Install sparkenforce using pip:

pip install sparkenforce

Or if you're using uv:

uv add sparkenforce

Validating Input DataFrames

from sparkenforce import validate
from pyspark.sql import functions as fn
from pyspark.sql import DataFrame

@validate
def add_length(df: DataFrame['firstname': str, ...]) -> DataFrame['name': str, 'length': int]:
    return df.select(
        df.firstname.alias('name'),
        fn.length(df.firstname).alias('length')
    )

# If input DataFrame doesn't have 'firstname' column, validation fails
# If return DataFrame doesn't match expected schema, validation fails

Flexible Schemas with Ellipsis

Use ... to allow additional columns beyond the specified ones:

@validate
def filter_names(df: DataFrame['firstname': str, 'lastname': str, ...]):
    """Requires firstname and lastname, but allows other columns too."""
    return df.filter(df.firstname != "")
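The difference between an exact column list and one ending in `...` comes down to a set comparison. The sketch below models that rule in plain Python; it is not sparkenforce's implementation, and the helper name `columns_match` is hypothetical.

```python
def columns_match(actual, required, allow_extra):
    """Hypothetical helper modelling the column rule described above.

    allow_extra=True plays the role of a trailing `...` in the annotation.
    """
    actual, required = set(actual), set(required)
    if allow_extra:
        # With `...`: every required column must be present; extra columns are ignored.
        return required <= actual
    # Without `...`: the column sets must be identical.
    return actual == required


# Models DataFrame['firstname': str, 'lastname': str, ...] versus the same list without `...`
print(columns_match(["firstname", "lastname", "age"], ["firstname", "lastname"], allow_extra=True))   # True
print(columns_match(["firstname", "lastname", "age"], ["firstname", "lastname"], allow_extra=False))  # False
```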

Return Value Validation

sparkenforce validates that your function returns exactly what you promise:

@validate
def get_summary(df: DataFrame['firstname': str, ...]) -> DataFrame['firstname': str, 'summary': str]:
    return df.select(
        'firstname',
        fn.lit('processed').alias('summary'),
    )

Error Handling

When validation fails, sparkenforce provides clear error messages:

# Calling this function raises DataFrameValidationError with a detailed message:
# "return value columns mismatch. Expected exactly {'name', 'length'},
#  got {'lastname', 'firstname'}. missing columns: {'name', 'length'},
#  unexpected columns: {'lastname', 'firstname'}"

@validate
def bad_function(df: DataFrame['firstname': str, ...]) -> DataFrame['name': str, 'length': int]:
    return df.select('firstname', 'lastname')  # Wrong columns!
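The "missing" and "unexpected" parts of that message are two set differences between the promised and the actual columns. A minimal sketch of that split, using a hypothetical helper `column_diff` (not part of sparkenforce):

```python
def column_diff(expected, actual):
    """Hypothetical helper: split a column mismatch into missing and unexpected names."""
    expected, actual = set(expected), set(actual)
    missing = expected - actual      # promised by the annotation but absent
    unexpected = actual - expected   # present but not allowed by an exact annotation
    return missing, unexpected


missing, unexpected = column_diff({"name", "length"}, {"firstname", "lastname"})
print(f"missing columns: {sorted(missing)}, unexpected columns: {sorted(unexpected)}")
# missing columns: ['length', 'name'], unexpected columns: ['firstname', 'lastname']
```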

More Examples

Check out the examples notebook.

API Reference

Core Components

@validate Decorator

The main decorator for enabling DataFrame schema validation on functions.

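from pyspark.sql import DataFrame
from sparkenforce import validate

@validate
def process_data(df: DataFrame["id": int, "name": str]) -> DataFrame["result": str]:
    return spark.createDataFrame([("processed",)], ["result"])  # `spark`: an active SparkSession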

Signature: validate(func: Callable) -> Callable

Parameters:

  • func - Function to decorate with validation logic

Returns:

  • Wrapped function that validates DataFrame arguments and return values

Raises:

  • DataFrameValidationError - When schema validation fails

Validation Rules:

  • Validates all function parameters annotated with DataFrame[...] types
  • Validates return values if annotated with DataFrame[...] types
  • Functions without DataFrame annotations are not validated
  • Return type None or no return annotation skips return validation
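The rules above can be approximated with `inspect.signature`: bind the call, check each annotated argument, call the function, then check the result only if the return annotation is a schema. The sketch below is a toy model, not sparkenforce's code; `Schema`, `ValidationError`, `toy_validate` and `FakeFrame` are hypothetical stand-ins so it runs without PySpark.

```python
import functools
import inspect
from dataclasses import dataclass


@dataclass(frozen=True)
class Schema:
    """Hypothetical stand-in for a DataFrame[...] annotation."""

    columns: frozenset
    allow_extra: bool = False  # True plays the role of a trailing `...`


class ValidationError(TypeError):
    """Stand-in for DataFrameValidationError."""


def _check(label, value, schema):
    cols = set(value.columns)  # like PySpark, assume `.columns` is a list of names
    ok = schema.columns <= cols if schema.allow_extra else cols == schema.columns
    if not ok:
        raise ValidationError(
            f"{label} columns mismatch: expected {sorted(schema.columns)}, got {sorted(cols)}"
        )


def toy_validate(func):
    sig = inspect.signature(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bound = sig.bind(*args, **kwargs)
        # Only parameters annotated with a Schema are checked; all others pass through.
        for name, value in bound.arguments.items():
            annotation = sig.parameters[name].annotation
            if isinstance(annotation, Schema):
                _check(f"argument '{name}'", value, annotation)
        result = func(*args, **kwargs)
        # No return annotation, `-> None`, or any non-Schema annotation skips the return check.
        if isinstance(sig.return_annotation, Schema):
            _check("return value", result, sig.return_annotation)
        return result

    return wrapper


@dataclass
class FakeFrame:
    """Minimal object with a `.columns` attribute, standing in for a Spark DataFrame."""

    columns: list


@toy_validate
def rename(df: Schema(frozenset({"firstname"}), allow_extra=True)) -> Schema(frozenset({"name"})):
    return FakeFrame(["name"])


print(rename(FakeFrame(["firstname", "age"])).columns)  # ['name']
```

Calling `rename(FakeFrame(["age"]))` would raise `ValidationError` before the function body runs, because the required `firstname` column is absent.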

DataFrame Type Annotations

sparkenforce extends PySpark's DataFrame class to support schema specifications using subscript notation.

Column-Only Validation

DataFrame["id", "name"]       # Requires exactly these columns
DataFrame["id", "name", ...]  # Requires at least these columns

Column + Type Validation

from typing import Optional

DataFrame["id": int, "name": str]            # Exact columns with types
DataFrame["id": int, "name": str, ...]       # Minimum columns with types
DataFrame["id": int, "name": Optional[str]]  # Optional columns (may not be present)
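One way to read `Optional[str]` in an exact schema: the column is allowed but not required. The sketch below models that reading with `typing.get_origin`/`get_args`; it assumes this is how optional columns interact with exact schemas, and `split_optional` and `exact_columns_ok` are hypothetical helpers, not sparkenforce functions.

```python
import types
from typing import Optional, Union, get_args, get_origin

# typing.Union covers Optional[X]; types.UnionType covers `X | None` on Python 3.10+.
_UNION_ORIGINS = (Union, getattr(types, "UnionType", Union))


def split_optional(hint):
    """Return (inner_hint, is_optional) for hints such as Optional[str]."""
    args = get_args(hint)
    if get_origin(hint) in _UNION_ORIGINS and type(None) in args:
        inner = [a for a in args if a is not type(None)]
        return (inner[0] if len(inner) == 1 else Union[tuple(inner)]), True
    return hint, False


def exact_columns_ok(actual, spec):
    """Hypothetical exact-schema check: Optional columns may be absent, no others allowed."""
    actual = set(actual)
    required = {name for name, hint in spec.items() if not split_optional(hint)[1]}
    return required <= actual <= set(spec)


spec = {"id": int, "name": Optional[str]}
print(exact_columns_ok(["id"], spec))          # True: 'name' is optional
print(exact_columns_ok(["id", "name"], spec))  # True
print(exact_columns_ok(["name"], spec))        # False: 'id' is required
```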

Supported Types

Python Types:

  • int → LongType (with compatibility for IntegerType, ShortType, ByteType)
  • str → StringType
  • float → DoubleType (with compatibility for FloatType)
  • bool → BooleanType
  • datetime.datetime → TimestampType
  • datetime.date → DateType
  • decimal.Decimal → DecimalType
  • bytearray → BinaryType
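The table above is essentially a dictionary lookup keyed on the exact Python type. A minimal sketch, using Spark type names as strings so no PySpark import is needed; `PY_TO_SPARK` and `spark_type_name` are hypothetical names.

```python
import datetime
import decimal

# Python type -> Spark type name, as listed above (names only, no PySpark import).
PY_TO_SPARK = {
    int: "LongType",
    str: "StringType",
    float: "DoubleType",
    bool: "BooleanType",
    datetime.datetime: "TimestampType",
    datetime.date: "DateType",
    decimal.Decimal: "DecimalType",
    bytearray: "BinaryType",
}


def spark_type_name(py_type):
    # Exact-key lookup: bool subclasses int and datetime subclasses date,
    # so an issubclass-based search could pick the wrong entry.
    try:
        return PY_TO_SPARK[py_type]
    except KeyError:
        raise TypeError(f"no Spark mapping for {py_type!r}") from None


print(spark_type_name(bool))               # BooleanType
print(spark_type_name(datetime.datetime))  # TimestampType
```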

Spark Types: Any pyspark.sql.types.DataType subclass can be used directly:

from pyspark.sql.types import IntegerType, StringType
DataFrame["id": IntegerType, "name": StringType]

Custom Types: Register custom type mappings for complex types:

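from dataclasses import dataclass

from pyspark.sql import DataFrame
from pyspark.sql.types import IntegerType, StringType, StructField, StructType
from sparkenforce import register_type_mapping

@dataclass
class Person:
    name: str
    age: int

person_struct = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

register_type_mapping(Person, person_struct)
DataFrame["person": Person]  # Now supported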

Functions

register_type_mapping(python_type, spark_type)

Register custom mappings between Python types and Spark DataTypes.

from dataclasses import dataclass

from pyspark.sql import DataFrame
from pyspark.sql import functions as fn
from pyspark.sql.types import StructType, StructField, StringType
from sparkenforce import register_type_mapping, validate

@dataclass
class Name:
    first: str
    last: str

name_type = StructType([
    StructField("first", StringType(), True),
    StructField("last", StringType(), True)
])

register_type_mapping(Name, name_type)

@validate
def process_names(df: DataFrame["person": Name]) -> DataFrame["full_name": str]:
    return df.select(
        fn.concat(fn.col("person.first"), fn.lit(" "), fn.col("person.last")).alias("full_name")
    )

Parameters:

  • python_type (type) - Python type or class to register
  • spark_type (pyspark.sql.types.DataType) - Corresponding Spark DataType instance

Use Cases:

  • Complex nested structures using dataclasses
  • Custom business domain types
  • Third-party type integration
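Conceptually, a registration call like this fills a lookup table that is consulted alongside the built-in mappings. The sketch below assumes custom registrations take precedence over built-ins; `register_mapping`, `resolve` and the string-valued Spark types are hypothetical simplifications, not sparkenforce's API.

```python
from dataclasses import dataclass

_BUILTIN = {int: "LongType", str: "StringType"}  # abbreviated built-in table
_CUSTOM = {}


def register_mapping(py_type, spark_type):
    """Hypothetical registry: remember a Spark type for a user-defined Python type."""
    _CUSTOM[py_type] = spark_type


def resolve(py_type):
    # Assumed precedence: custom registrations first, then the built-in table.
    if py_type in _CUSTOM:
        return _CUSTOM[py_type]
    if py_type in _BUILTIN:
        return _BUILTIN[py_type]
    raise TypeError(f"unsupported annotation type: {py_type!r}")


@dataclass
class Name:
    first: str
    last: str


register_mapping(Name, "struct<first:string,last:string>")
print(resolve(Name))  # struct<first:string,last:string>
```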

infer_dataframe_annotation(df)

Generate DataFrame type annotation strings from existing DataFrames.

from sparkenforce import infer_dataframe_annotation

# `spark`: an active SparkSession
df = spark.createDataFrame([
    (1, "Alice", 25.5),
    (2, "Bob", 30.0)
], ["id", "name", "score"])

annotation = infer_dataframe_annotation(df)
# Returns: 'DataFrame["id": int, "name": str, "score": float]'

Parameters:

  • df (pyspark.sql.DataFrame) - DataFrame to analyze

Returns:

  • str - Type annotation string ready for copy-paste into code

Use Cases:

  • Reverse engineering schemas from existing DataFrames
  • Generating boilerplate for new functions
  • Documentation and debugging
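Inferring an annotation is the reverse of the type table: map each field's Spark type back to a Python type name and join the pieces. The sketch works on `(column, spark_type_name)` pairs; with a real DataFrame these could be built from `df.schema.fields` as `(f.name, type(f.dataType).__name__)`. `SPARK_TO_PY` and `annotation_from_schema` are hypothetical, and sparkenforce's own function may differ.

```python
# Spark type name -> Python annotation name (reverse of the type table; hypothetical subset).
SPARK_TO_PY = {
    "LongType": "int", "IntegerType": "int", "ShortType": "int", "ByteType": "int",
    "DoubleType": "float", "FloatType": "float",
    "StringType": "str", "BooleanType": "bool",
    "TimestampType": "datetime.datetime", "DateType": "datetime.date",
    "DecimalType": "decimal.Decimal", "BinaryType": "bytearray",
}


def annotation_from_schema(fields):
    """Build a DataFrame[...] string from (column, spark_type_name) pairs."""
    parts = [f'"{name}": {SPARK_TO_PY[type_name]}' for name, type_name in fields]
    return f"DataFrame[{', '.join(parts)}]"


print(annotation_from_schema([("id", "LongType"), ("name", "StringType"), ("score", "DoubleType")]))
# DataFrame["id": int, "name": str, "score": float]
```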

Classes

TypedDataFrame

Alternative name for DataFrame type annotations, providing explicit typing semantics.

from sparkenforce import TypedDataFrame, validate

@validate
def process(data: TypedDataFrame["id": int, "name": str]) -> TypedDataFrame["result": str]:
    # Functionally identical to DataFrame[...] but more explicit
    # (`spark`: an active SparkSession)
    return spark.createDataFrame([("success",)], ["result"])

Usage:

  • Drop-in replacement for DataFrame[...] annotations
  • Provides clearer semantic meaning in domain-specific code
  • Same validation behavior as DataFrame

Exceptions

DataFrameValidationError

Raised when DataFrame schema validation fails.

class DataFrameValidationError(TypeError):
    """Raised when DataFrame validation fails."""

Common Scenarios:

Missing Columns:

DataFrameValidationError: argument 'df' is missing required columns: {'name'}

Type Mismatches:

DataFrameValidationError: argument 'df' column 'age' has incorrect type. Expected LongType(), got StringType()

Return Value Errors:

DataFrameValidationError: return value must be a PySpark DataFrame, got <class 'str'>
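Because DataFrameValidationError subclasses TypeError, existing `except TypeError` handlers also catch validation failures. The sketch uses a local class with the same definition so it runs without sparkenforce installed; `run` and `failing_step` are hypothetical.

```python
class DataFrameValidationError(TypeError):
    """Local stand-in mirroring the definition above (no sparkenforce import)."""


def run(step):
    try:
        return step()
    except TypeError as exc:  # also catches DataFrameValidationError
        return f"rejected: {exc}"


def failing_step():
    raise DataFrameValidationError("argument 'df' is missing required columns: {'name'}")


print(run(failing_step))
# rejected: argument 'df' is missing required columns: {'name'}
```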

Advanced Usage

Optional Columns

Use typing.Optional to mark columns as not always present:

from typing import Optional

from pyspark.sql import DataFrame
from pyspark.sql import functions as fn
from sparkenforce import validate

@validate
def process(df: DataFrame["id": int, "name": Optional[str]]) -> DataFrame["result": str]:
    # 'name' column may be missing; the function must handle that case
    if 'name' in df.columns:
        return df.select(fn.concat(fn.col("name"), fn.lit(" processed")).alias("result"))
    else:
        return df.select(fn.lit("no name").alias("result"))

Flexible Schemas

Use ellipsis (...) for minimum column requirements:

from pyspark.sql import DataFrame
from pyspark.sql import functions as fn
from sparkenforce import validate

@validate
def add_metadata(df: DataFrame["id": int, ...]) -> DataFrame["id": int, "processed": bool, ...]:
    # Input: requires 'id' column, allows others
    # Output: guarantees 'id' and 'processed' columns, allows others
    return df.withColumn("processed", fn.lit(True))

Type Compatibility

sparkenforce treats closely related Spark types as compatible when checking column types:

  • Integer types (ByteType, ShortType, IntegerType, LongType) are interchangeable
  • Float types (FloatType, DoubleType) are interchangeable
  • Timestamp types (TimestampType, TimestampNTZType) are interchangeable
  • String variants (StringType, VarcharType) are interchangeable
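These rules amount to grouping type names into equivalence classes. A minimal sketch, comparing type names only (so parameters such as DecimalType precision or VarcharType length are ignored here; the real check may treat them differently); `COMPATIBLE_GROUPS` and `types_compatible` are hypothetical.

```python
# Groups of Spark type names treated as interchangeable, per the list above.
COMPATIBLE_GROUPS = [
    {"ByteType", "ShortType", "IntegerType", "LongType"},
    {"FloatType", "DoubleType"},
    {"TimestampType", "TimestampNTZType"},
    {"StringType", "VarcharType"},
]


def types_compatible(expected, actual):
    """Hypothetical check: identical names, or both names in the same group."""
    if expected == actual:
        return True
    return any(expected in group and actual in group for group in COMPATIBLE_GROUPS)


print(types_compatible("LongType", "IntegerType"))  # True
print(types_compatible("LongType", "DoubleType"))   # False
```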

Inspiration

This project builds on dataenforce, extending it with additional validation capabilities for PySpark DataFrame workflows.

License

Apache Software License v2.0

Contact

Created by Agustín Recoba


Download files


Source Distribution

sparkenforce-1.0.0.tar.gz (27.8 kB)

Uploaded Source

Built Distribution


sparkenforce-1.0.0-py3-none-any.whl (17.6 kB)

Uploaded Python 3

File details

Details for the file sparkenforce-1.0.0.tar.gz.

File metadata

  • Download URL: sparkenforce-1.0.0.tar.gz
  • Size: 27.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.5.24

File hashes

Hashes for sparkenforce-1.0.0.tar.gz
Algorithm Hash digest
SHA256 7eb8551aa8009352bcd8f35bb9d0510ef068533310b54cb576e80a542b7813ff
MD5 67c13be97c963c18330727627dda610c
BLAKE2b-256 f108f4a082e3419b8608aacc1510702ebae7cc4664ab0155cff257d2f373c0b8


File details

Details for the file sparkenforce-1.0.0-py3-none-any.whl.

File hashes

Hashes for sparkenforce-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 70b6a5239bd20d60a74d18083b97fae3942af1d404fccdb2b26e569e68fefb1b
MD5 e658730f3696d06ff074bda1ddd029d2
BLAKE2b-256 699539aca98bbe61d589d45e244282260e1e9446432cfdb3f5e5df60c0ec1b48
