Skip to main content

Type annotation system that allows you to specify and validate the schema of PySpark DataFrames using Python type hints for both function arguments and return values.

Project description

:rocket: sparkenforce

sparkenforce is a type annotation system that lets you specify and validate PySpark DataFrame schemas using Python type hints. It validates both function arguments and return values, catching schema mismatches before they cause runtime errors.

Why sparkenforce?

Working with PySpark DataFrames can be error-prone when schemas don't match expectations. sparkenforce helps by:

  • Preventing runtime errors: Catch schema mismatches early with type validation
  • Improving code clarity: Function signatures show exactly what DataFrame structure is expected
  • Enforcing contracts: Ensure functions return DataFrames with the promised schema
  • Better debugging: Clear error messages when validations fail

Installation

Install sparkenforce using pip:

pip install sparkenforce

Or if you're using uv:

uv add sparkenforce

Quick Start

Validating Input DataFrames

import sparkenforce
from pyspark.sql import functions as F

@sparkenforce.validate
def transform_data(df: sparkenforce.Dataset['firstname':str, ...]) -> sparkenforce.Dataset['name':str, 'length':int]:
    """Transform DataFrame with validated input and output schemas."""
    return df.select(
        df.firstname.alias('name'),
        F.length(df.firstname).alias('length')
    )

# If input DataFrame doesn't have 'firstname' column, validation fails
# If return DataFrame doesn't match expected schema, validation fails

Flexible Schemas with Ellipsis

Use ... to allow additional columns beyond the specified ones:

@sparkenforce.validate
def process_names(df: sparkenforce.Dataset['firstname':str, 'lastname':str, ...]):
    """Requires firstname and lastname, but allows other columns too."""
    return df.filter(df.firstname != "")

Return Value Validation

sparkenforce validates that your function returns exactly what you promise:

@sparkenforce.validate
def get_summary(df: sparkenforce.Dataset['firstname':str, ...]) -> sparkenforce.Dataset['firstname':str, 'summary':str, ...]:
    return df.select(
        'firstname',
        F.lit('processed').alias('summary'),
        'lastname'  # Additional columns allowed with ...
    )

Error Handling

When validation fails, sparkenforce provides clear error messages:

# This will raise DatasetValidationError with detailed message:
# "return value columns mismatch. Expected exactly {'name', 'length'}, 
#  got {'lastname', 'firstname'}. missing columns: {'name', 'length'}, 
#  unexpected columns: {'lastname', 'firstname'}"

@sparkenforce.validate  
def bad_function(df: sparkenforce.Dataset['firstname':str, ...]) -> sparkenforce.Dataset['name':str, 'length':int]:
    return df.select('firstname', 'lastname')  # Wrong columns!

Development Setup

Step 1: Create virtual environment

uv venv

Step 2: Activate environment

# Linux/Mac
source .venv/bin/activate

# Windows  
.venv\Scripts\activate

Step 3: Install dependencies

uv sync

CLI Commands

# Run tests
task tests

# Type checking
task type

# Linting
task lint

# Format code
task format

# Coverage report
task coverage

Inspiration

This project builds on dataenforce, extending it with additional validation capabilities for PySpark DataFrame workflows.

License

Apache Software License v2.0

Contact

Created by Agustín Recoba

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sparkenforce-0.1.0.tar.gz (22.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

sparkenforce-0.1.0-py3-none-any.whl (14.9 kB view details)

Uploaded Python 3

File details

Details for the file sparkenforce-0.1.0.tar.gz.

File metadata

  • Download URL: sparkenforce-0.1.0.tar.gz
  • Upload date:
  • Size: 22.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.5.24

File hashes

Hashes for sparkenforce-0.1.0.tar.gz
Algorithm Hash digest
SHA256 73cadf68df9cfee40f925d9eda8406ca9fd25624cc349384ca6a3960123aa3ce
MD5 43b47700f4aa1af185e35b241d3c564b
BLAKE2b-256 4dedf7fb9c7e9b145a166024d7e8b43593e024345bd1e90590ddd5ab1f457f7d

See more details on using hashes here.

File details

Details for the file sparkenforce-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for sparkenforce-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 cde610d2cdf046e6f9f4a752a7124c91884c1fed5d1eb1d51e351a992c33a3d1
MD5 fa06b8f65fdcd4e74d23d5cb48660d92
BLAKE2b-256 9fe7a5e46b69692531777b24ba1924635be80448a3cd041e2247e39a784f5ed3

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page