🚀 sparkenforce

sparkenforce is a type annotation system that lets you specify and validate PySpark DataFrame schemas using Python type hints. It validates both function arguments and return values, so a schema mismatch is reported at the function boundary instead of surfacing later as a harder-to-trace error.

Why sparkenforce?

Working with PySpark DataFrames can be error-prone when schemas don't match expectations. sparkenforce helps by:

  • Catching errors early: Schema mismatches are reported when a decorated function is called or returns
  • Improving code clarity: Function signatures show exactly what DataFrame structure is expected
  • Enforcing contracts: Functions must return DataFrames with the promised schema
  • Easing debugging: Error messages name the missing and unexpected columns when validation fails

Installation

Install sparkenforce using pip:

pip install sparkenforce

Or if you're using uv:

uv add sparkenforce

Quick Start

Validating Input DataFrames

import sparkenforce
from pyspark.sql import functions as fn

@sparkenforce.validate
def transform_data(df: sparkenforce.Dataset['firstname':str, ...]) -> sparkenforce.Dataset['name':str, 'length':int]:
    """Transform DataFrame with validated input and output schemas."""
    return df.select(
        df.firstname.alias('name'),
        fn.length(df.firstname).alias('length')
    )

# Validation fails if the input DataFrame has no 'firstname' column.
# Validation also fails if the returned DataFrame does not match the declared schema.
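The annotation syntax above is ordinary Python subscripting: each 'column':type pair reaches the class as a slice object, and ... arrives as Ellipsis. The sketch below shows how such a subscript can be unpacked. SchemaSketch is a hypothetical stand-in written for illustration, not sparkenforce's actual Dataset class, and its handling of the parsed items is an assumption.

```python
class SchemaSketch:
    """Hypothetical stand-in for a Dataset-like annotation type (illustration only)."""

    def __class_getitem__(cls, items):
        # A single item arrives bare; several items arrive as a tuple.
        if not isinstance(items, tuple):
            items = (items,)
        columns = {}
        allow_extra = False
        for item in items:
            if item is Ellipsis:
                # `...` marks the schema as open to additional columns.
                allow_extra = True
            elif isinstance(item, slice):
                # 'firstname':str is parsed as slice('firstname', str, None).
                columns[item.start] = item.stop
            else:
                raise TypeError(f"unsupported schema item: {item!r}")
        return columns, allow_extra


columns, allow_extra = SchemaSketch['firstname':str, ...]
print(columns)      # {'firstname': <class 'str'>}
print(allow_extra)  # True
```

Because the subscript is evaluated when the function is defined, a malformed annotation in this sketch fails at import time rather than at call time.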

Flexible Schemas with Ellipsis

Use ... to allow additional columns beyond the specified ones:

@sparkenforce.validate
def process_names(df: sparkenforce.Dataset['firstname':str, 'lastname':str, ...]):
    """Requires firstname and lastname, but allows other columns too."""
    return df.filter(df.firstname != "")
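The matching rule that ... implies can be illustrated on plain column names, without Spark. This is a minimal sketch under the assumption that an open schema requires the listed columns to be a subset of the actual ones, while a closed schema requires an exact match. columns_match is a hypothetical helper, not part of sparkenforce, and it ignores column types for brevity.

```python
def columns_match(expected, actual, allow_extra):
    """Return True when the actual column names satisfy the expected ones.

    Hypothetical helper for illustration: compares names only, not types.
    """
    expected, actual = set(expected), set(actual)
    if allow_extra:
        # Open schema: every expected column must be present, extras are fine.
        return expected <= actual
    # Closed schema: the column sets must be identical.
    return expected == actual


required = {'firstname', 'lastname'}
print(columns_match(required, ['firstname', 'lastname', 'age'], allow_extra=True))   # True
print(columns_match(required, ['firstname', 'lastname', 'age'], allow_extra=False))  # False
print(columns_match(required, ['firstname'], allow_extra=True))                      # False
```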

Return Value Validation

sparkenforce checks the returned DataFrame against the return annotation. With ... in the annotation, the listed columns are required and additional columns are allowed:

@sparkenforce.validate
def get_summary(df: sparkenforce.Dataset['firstname':str, 'lastname':str, ...]) -> sparkenforce.Dataset['firstname':str, 'summary':str, ...]:
    # The input annotation requires 'lastname' because the select below reads it.
    return df.select(
        'firstname',
        fn.lit('processed').alias('summary'),
        'lastname'  # Additional columns allowed with ...
    )

Error Handling

When validation fails, sparkenforce provides clear error messages:

# This will raise DatasetValidationError with detailed message:
# "return value columns mismatch. Expected exactly {'name', 'length'},
#  got {'lastname', 'firstname'}. missing columns: {'name', 'length'},
#  unexpected columns: {'lastname', 'firstname'}"

@sparkenforce.validate
def bad_function(df: sparkenforce.Dataset['firstname':str, 'lastname':str, ...]) -> sparkenforce.Dataset['name':str, 'length':int]:
    return df.select('firstname', 'lastname')  # Wrong columns: the annotation promises 'name' and 'length'
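The "missing" and "unexpected" parts of such a message are plain set differences between the promised and the actual column names. The sketch below reproduces that idea without Spark. SchemaMismatchSketch and check_exact_columns are hypothetical names, and the message format is an approximation, not the text sparkenforce emits in DatasetValidationError.

```python
class SchemaMismatchSketch(Exception):
    """Hypothetical error type for this sketch only."""


def check_exact_columns(expected, actual, label="return value"):
    """Raise when the actual column names differ from the expected ones."""
    expected, actual = set(expected), set(actual)
    missing = expected - actual      # promised but not returned
    unexpected = actual - expected   # returned but not promised
    if missing or unexpected:
        # Sort the names so the message is stable across runs.
        raise SchemaMismatchSketch(
            f"{label} columns mismatch: "
            f"missing {sorted(missing)}, unexpected {sorted(unexpected)}"
        )


try:
    check_exact_columns({'name', 'length'}, ['firstname', 'lastname'])
except SchemaMismatchSketch as exc:
    print(exc)
    # return value columns mismatch: missing ['length', 'name'], unexpected ['firstname', 'lastname']
```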

Development Setup

Step 1: Create virtual environment

uv venv

Step 2: Activate environment

# Linux/Mac
source .venv/bin/activate

# Windows
.venv\Scripts\activate

Step 3: Install dependencies

uv sync

CLI Commands

# Run tests
task tests

# Type checking
task type

# Linting
task lint

# Format code
task format

# Coverage report
task coverage

Inspiration

This project builds on dataenforce, extending it with additional validation capabilities for PySpark DataFrame workflows.

License

Apache Software License v2.0

Contact

Created by Agustín Recoba
