
Type annotation system that allows you to specify and validate the schema of PySpark DataFrames using Python type hints for both function arguments and return values.


🚀 sparkenforce

sparkenforce is a type annotation system that lets you specify and validate PySpark DataFrame schemas using Python type hints. It validates both function arguments and return values, so a schema mismatch is reported at the function boundary instead of surfacing later as an obscure failure deeper in the pipeline.

Why sparkenforce?

Working with PySpark DataFrames can be error-prone when schemas don't match expectations. sparkenforce helps by:

  • Failing fast: Schema mismatches are caught when a decorated function is called, not deep inside a job
  • Improving code clarity: Function signatures show exactly what DataFrame structure is expected
  • Enforcing contracts: Ensure functions return DataFrames with the promised schema
  • Better debugging: Clear error messages when validations fail

Installation

Install sparkenforce using pip:

pip install sparkenforce

Or if you're using uv:

uv add sparkenforce

Quick Start

Validating Input DataFrames

import sparkenforce
from pyspark.sql import functions as fn

@sparkenforce.validate
def transform_data(df: sparkenforce.Dataset['firstname':str, ...]) -> sparkenforce.Dataset['name':str, 'length':int]:
    """Transform DataFrame with validated input and output schemas."""
    return df.select(
        df.firstname.alias('name'),
        fn.length(df.firstname).alias('length')
    )

# If the input DataFrame has no 'firstname' column, validation fails.
# If the returned DataFrame doesn't match the declared schema, validation fails.
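
To make the mechanism concrete, here is a minimal sketch of how a decorator of this kind can check annotated arguments and return values. It is not sparkenforce's implementation: `Columns`, `FakeFrame`, and `validate_columns` are hypothetical stand-ins built only on the standard library, and a real DataFrame is replaced by an object that exposes `.columns`.

```python
import functools
import inspect


class Columns:
    """Hypothetical annotation: required column names, optionally open-ended."""

    def __init__(self, *required, allow_extra=False):
        self.required = set(required)
        self.allow_extra = allow_extra

    def check(self, label, frame):
        actual = set(frame.columns)
        # Open schema: required columns must be present. Closed schema: exact match.
        ok = self.required <= actual if self.allow_extra else self.required == actual
        if not ok:
            raise TypeError(
                f"{label}: expected {sorted(self.required)}, got {sorted(actual)}"
            )


def validate_columns(func):
    """Check every Columns-annotated argument, then the return value."""
    signature = inspect.signature(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bound = signature.bind(*args, **kwargs)
        for name, value in bound.arguments.items():
            annotation = signature.parameters[name].annotation
            if isinstance(annotation, Columns):
                annotation.check(f"argument {name!r}", value)
        result = func(*args, **kwargs)
        if isinstance(signature.return_annotation, Columns):
            signature.return_annotation.check("return value", result)
        return result

    return wrapper


class FakeFrame:
    """Stand-in for a DataFrame; it only exposes .columns."""

    def __init__(self, *columns):
        self.columns = list(columns)


@validate_columns
def to_name(df: Columns("firstname", allow_extra=True)) -> Columns("name"):
    return FakeFrame("name")


print(to_name(FakeFrame("firstname", "age")).columns)
```

Because the checks run inside the wrapper, a mismatch is raised at call time, before the function body touches the data.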

Flexible Schemas with Ellipsis

Use ... to allow additional columns beyond the specified ones:

@sparkenforce.validate
def process_names(df: sparkenforce.Dataset['firstname':str, 'lastname':str, ...]):
    """Requires firstname and lastname, but allows other columns too."""
    return df.filter(df.firstname != "")
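
The annotation syntax is ordinary Python subscripting: each 'name':type pair reaches the class as a slice object, and ... arrives as the built-in Ellipsis. The sketch below shows that parsing step in isolation. `Schema` is a hypothetical class written for this illustration, and treating a bare column name as "any type" is an assumption of the sketch, not a documented sparkenforce behaviour.

```python
class Schema:
    """Hypothetical subscriptable type (illustration only, not sparkenforce code)."""

    def __class_getitem__(cls, items):
        # A single item arrives bare; several items arrive as a tuple.
        if not isinstance(items, tuple):
            items = (items,)
        columns = {}
        allow_extra = False
        for item in items:
            if item is Ellipsis:
                # A literal ... marks the schema as open-ended.
                allow_extra = True
            elif isinstance(item, slice):
                # 'name':str is parsed by Python as slice('name', str, None).
                columns[item.start] = item.stop
            else:
                # Assumption of this sketch: a bare name means "any type".
                columns[item] = None
        return columns, allow_extra


parsed = Schema['firstname':str, 'lastname':str, ...]
print(parsed)
```

Here `parsed` holds the two required columns with their Python types, plus a flag recording that extra columns are allowed.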

Return Value Validation

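sparkenforce also checks the returned DataFrame against the annotated return schema. Without ..., the returned columns must match exactly; with ..., additional columns are allowed: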

@sparkenforce.validate
def get_summary(df: sparkenforce.Dataset['firstname':str, ...]) -> sparkenforce.Dataset['firstname':str, 'summary':str, ...]:
    return df.select(
        'firstname',
        fn.lit('processed').alias('summary'),
        'lastname'  # Extra column, allowed by ... in the return schema (must exist in the input)
    )

Error Handling

When validation fails, sparkenforce provides clear error messages:

# Calling bad_function raises DatasetValidationError with a detailed message:
# "return value columns mismatch. Expected exactly {'name', 'length'},
#  got {'lastname', 'firstname'}. missing columns: {'name', 'length'},
#  unexpected columns: {'lastname', 'firstname'}"

@sparkenforce.validate
def bad_function(df: sparkenforce.Dataset['firstname':str, ...]) -> sparkenforce.Dataset['name':str, 'length':int]:
    return df.select('firstname', 'lastname')  # Wrong columns!
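
The missing and unexpected columns in a message like this are plain set differences. The following sketch reproduces that arithmetic for an exact-match schema. `describe_mismatch` is a hypothetical helper, and its wording only approximates the message quoted above.

```python
def describe_mismatch(expected, actual):
    """Return a mismatch message for an exact-match schema, or None if columns agree."""
    expected, actual = set(expected), set(actual)
    if expected == actual:
        return None
    missing = sorted(expected - actual)      # promised but not returned
    unexpected = sorted(actual - expected)   # returned but not promised
    return (
        f"columns mismatch: missing columns {missing}, "
        f"unexpected columns {unexpected}"
    )


message = describe_mismatch({'name', 'length'}, ['firstname', 'lastname'])
print(message)
```

Sorting the differences keeps the message stable between runs, since set iteration order is not guaranteed.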

Development Setup

Step 1: Create virtual environment

uv venv

Step 2: Activate environment

# Linux/Mac
source .venv/bin/activate

# Windows
.venv\Scripts\activate

Step 3: Install dependencies

uv sync

CLI Commands

# Run tests
task tests

# Type checking
task type

# Linting
task lint

# Format code
task format

# Coverage report
task coverage

Inspiration

This project builds on dataenforce, extending it with additional validation capabilities for PySpark DataFrame workflows.

License

Apache Software License v2.0

Contact

Created by Agustín Recoba
