🚀 sparkenforce

sparkenforce is a type annotation system that lets you specify and validate PySpark DataFrame schemas using Python type hints. It validates both function arguments and return values, catching schema mismatches before they cause runtime errors.

Why sparkenforce?

Working with PySpark DataFrames can be error-prone when schemas don't match expectations. sparkenforce helps by:

  • Preventing runtime errors: catches schema mismatches early through type validation
  • Improving code clarity: function signatures show exactly what DataFrame structure is expected
  • Enforcing contracts: ensures functions return DataFrames with the promised schema
  • Easing debugging: reports clear error messages when validation fails

Installation

Install sparkenforce using pip:

pip install sparkenforce

Or if you're using uv:

uv add sparkenforce
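
To confirm that the install landed in the active environment, the standard library can report the installed version. This is a minimal sketch: `installed_version` is a hypothetical helper name, not part of sparkenforce, and it only reads package metadata without importing the package.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string of `package`, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


# Prints a version string if sparkenforce is installed, otherwise None.
print(installed_version("sparkenforce"))
```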

Quick Start

Validating Input DataFrames

import sparkenforce
from pyspark.sql import functions as fn

@sparkenforce.validate
def transform_data(df: sparkenforce.Dataset['firstname':str, ...]) -> sparkenforce.Dataset['name':str, 'length':int]:
    """Transform DataFrame with validated input and output schemas."""
    return df.select(
        df.firstname.alias('name'),
        fn.length(df.firstname).alias('length')
    )

# If the input DataFrame has no 'firstname' column, validation fails
# If the returned DataFrame doesn't match the declared schema, validation fails
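
The subscript syntax above is plain Python: inside square brackets, 'firstname':str becomes a slice object and ... becomes Ellipsis. The sketch below shows what a class receives in that position. `SchemaSpec` is a hypothetical stand-in written for illustration; it is not sparkenforce's Dataset and makes no claim about how the library stores its schema.

```python
from typing import Any


class SchemaSpec:
    """Hypothetical stand-in for Dataset; it only records what the subscript received."""

    def __class_getitem__(cls, item: Any) -> dict:
        # A single key arrives bare; several keys arrive as a tuple.
        items = item if isinstance(item, tuple) else (item,)
        columns = {}
        allow_extra = False
        for entry in items:
            if entry is Ellipsis:
                allow_extra = True  # `...` marks the schema as open
            elif isinstance(entry, slice):
                # 'name':str is parsed as slice('name', str, None)
                columns[entry.start] = entry.stop
            else:
                columns[entry] = None  # bare name with no type (assumption of this sketch)
        return {"columns": columns, "allow_extra": allow_extra}


print(SchemaSpec['firstname':str, ...])
print(SchemaSpec['name':str, 'length':int])
```

Because the column names and types travel as ordinary slice objects, the annotation needs no string parsing.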

Flexible Schemas with Ellipsis

Use ... to allow additional columns beyond the specified ones:

@sparkenforce.validate
def process_names(df: sparkenforce.Dataset['firstname':str, 'lastname':str, ...]):
    """Requires firstname and lastname, but allows other columns too."""
    return df.filter(df.firstname != "")
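
The difference between an open schema (with ...) and a closed one comes down to whether extra columns count as a problem. A minimal sketch of that rule using set differences follows; `compare_columns` is a hypothetical helper, and the logic is an assumption based on the behaviour described here, not the library's actual code.

```python
def compare_columns(actual, required, allow_extra):
    """Return (missing, unexpected) column sets for a schema check."""
    actual, required = set(actual), set(required)
    missing = required - actual
    # Extra columns only count as a problem when the schema is closed (no `...`).
    unexpected = set() if allow_extra else actual - required
    return missing, unexpected


cols = ["firstname", "lastname", "age"]

# Open schema: 'age' is tolerated.
print(compare_columns(cols, ["firstname", "lastname"], allow_extra=True))

# Closed schema: 'age' is reported as unexpected.
print(compare_columns(cols, ["firstname", "lastname"], allow_extra=False))
```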

Return Value Validation

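sparkenforce checks the returned DataFrame against the return annotation. Without ..., the columns must match exactly; with ..., the listed columns are required and extra ones are allowed: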

@sparkenforce.validate
def get_summary(df: sparkenforce.Dataset['firstname':str, 'lastname':str, ...]) -> sparkenforce.Dataset['firstname':str, 'summary':str, ...]:
    return df.select(
        'firstname',
        fn.lit('processed').alias('summary'),
        'lastname'  # Additional columns allowed with ...
    )
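
To make the mechanism concrete, here is a minimal sketch of how a decorator can read annotations and check both arguments and the return value. Everything in it (`FakeFrame`, `Columns`, this `validate`, and the use of TypeError) is a hypothetical stand-in that runs without Spark; it illustrates the general technique and is not sparkenforce's implementation.

```python
import functools
import inspect
from dataclasses import dataclass


@dataclass
class FakeFrame:
    """Stand-in for a DataFrame: only the column names matter here."""
    columns: list


@dataclass(frozen=True)
class Columns:
    """Hypothetical annotation object: required names plus an open/closed flag."""
    required: frozenset
    allow_extra: bool = False

    def problems(self, frame):
        actual = set(frame.columns)
        missing = self.required - actual
        unexpected = set() if self.allow_extra else actual - self.required
        return missing, unexpected


def _check(label, annotation, value):
    if not isinstance(annotation, Columns):
        return  # unannotated or unrelated annotation: nothing to validate
    missing, unexpected = annotation.problems(value)
    if missing or unexpected:
        raise TypeError(
            f"{label} columns mismatch. missing: {sorted(missing)}, "
            f"unexpected: {sorted(unexpected)}"
        )


def validate(func):
    """Check annotated arguments before the call and the result after it."""
    signature = inspect.signature(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bound = signature.bind(*args, **kwargs)
        for name, value in bound.arguments.items():
            _check(f"argument {name}", signature.parameters[name].annotation, value)
        result = func(*args, **kwargs)
        _check("return value", signature.return_annotation, result)
        return result

    return wrapper


OPEN_INPUT = Columns(frozenset({"firstname"}), allow_extra=True)
CLOSED_OUTPUT = Columns(frozenset({"firstname", "summary"}))


@validate
def summarize(df: OPEN_INPUT) -> CLOSED_OUTPUT:
    return FakeFrame(df.columns + ["summary"])


print(summarize(FakeFrame(["firstname"])).columns)
```

With a closed return schema, passing a frame that also carries 'lastname' makes the result fail the check, because the extra column flows through to the output.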

Error Handling

When validation fails, sparkenforce provides clear error messages:

# Calling bad_function raises DatasetValidationError with a detailed message:
# "return value columns mismatch. Expected exactly {'name', 'length'},
#  got {'lastname', 'firstname'}. missing columns: {'name', 'length'},
#  unexpected columns: {'lastname', 'firstname'}"

@sparkenforce.validate
def bad_function(df: sparkenforce.Dataset['firstname':str, ...]) -> sparkenforce.Dataset['name':str, 'length':int]:
    return df.select('firstname', 'lastname')  # Wrong columns!
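
At the call site, a validation failure can be handled like any other exception. The sketch below shows one possible pattern for a pipeline step. It defines a local stand-in `DatasetValidationError` so that it runs without Spark, since the import path of the real exception is not shown here; `run_step` and the stand-in `bad_function` are likewise hypothetical.

```python
import logging

logger = logging.getLogger("pipeline")


class DatasetValidationError(Exception):
    """Local stand-in for the library's exception (real import path not shown here)."""


def bad_function(columns):
    # Stand-in for a decorated function whose result violates its annotation.
    raise DatasetValidationError(
        "return value columns mismatch. Expected exactly {'name', 'length'}, "
        "got {'lastname', 'firstname'}."
    )


def run_step(step, data):
    """Run one pipeline step; log a schema violation and return None instead of crashing."""
    try:
        return step(data)
    except DatasetValidationError as exc:
        logger.error("schema check failed in %s: %s", step.__name__, exc)
        return None


result = run_step(bad_function, ["firstname", "lastname"])
print(result)
```

Returning None suits jobs that can skip a failed step; where a schema violation should stop the run, letting the exception propagate is the simpler choice.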

Development Setup

Step 1: Create virtual environment

uv venv

Step 2: Activate environment

# Linux/Mac
source .venv/bin/activate

# Windows
.venv\Scripts\activate

Step 3: Install dependencies

uv sync

CLI Commands

# Run tests
task tests

# Type checking
task type

# Linting
task lint

# Format code
task format

# Coverage report
task coverage

Inspiration

This project builds on dataenforce, extending it with additional validation capabilities for PySpark DataFrame workflows.

License

Apache Software License v2.0

Contact

Created by Agustín Recoba
