Easy testing with sparkless or PySpark on demand

This project has been archived.

The maintainers of this project have marked this project as archived. No new releases are expected.

sparkless-testing

A Python package that simplifies running tests with either sparkless (mock) or PySpark (real) engines on demand. Write tests once, run them with either engine seamlessly.

Why sparkless-testing?

  • 🚀 Fast Mock Testing: Use sparkless for lightning-fast unit tests without JVM overhead
  • 🔄 Real Engine Validation: Test against real PySpark to catch integration issues
  • 🎯 Single Test Suite: Write tests once, run with both engines automatically
  • 🔧 Zero Configuration: Automatic engine detection and session management
  • Parallel Ready: Optimized for parallel test execution with pytest-xdist

Features

  • Automatic Engine Detection - Automatically detects and configures available engines
  • Pytest Fixtures - Ready-to-use fixtures for both engines
  • Session Management - Automatic session creation, cleanup, and isolation
  • Test Utilities - Helpers for common test patterns
  • Parametrization Support - Run tests with both engines automatically
  • Parallel Testing Support - Optimized for parallel test execution with pytest-xdist

Installation

pip install sparkless-testing

For specific engines:

# Install with mock engine (sparkless)
# (extras are quoted so shells such as zsh do not expand the brackets)
pip install "sparkless-testing[mock]"

# Install with PySpark engine
pip install "sparkless-testing[pyspark]"

# Install with Delta Lake support
pip install "sparkless-testing[delta]"

# Install with all engines
pip install "sparkless-testing[dev]"

Quick Start

Basic Usage

import pytest
from sparkless_testing import pytest_fixtures

# Use the spark_session fixture
def test_my_function(spark_session):
    df = spark_session.createDataFrame([{"id": 1, "name": "Alice"}])
    assert df.count() == 1
    assert df.collect()[0]["name"] == "Alice"

Explicit Engine Selection

def test_mock_only(mock_spark_session):
    # Only runs with sparkless
    df = mock_spark_session.createDataFrame([{"id": 1}])
    assert df.count() == 1

def test_pyspark_only(pyspark_session):
    # Only runs with PySpark
    df = pyspark_session.createDataFrame([{"id": 1}])
    assert df.count() == 1

Using Functions and Types

def test_with_functions(spark_session, spark_functions, spark_types):
    F = spark_functions
    Types = spark_types
    
    df = spark_session.createDataFrame([{"name": "Alice"}])
    result = df.select(F.upper(F.col("name"))).collect()
    assert result[0][0] == "ALICE"

Note: Always use spark_functions together with spark_session. PySpark functions like F.col() require an active SparkContext, which is provided by the spark_session fixture.

Parametrized Tests (Both Engines)

You can run tests with both engines in two ways:

Option 1: Using the parametrize_engines marker

import pytest

@pytest.mark.parametrize_engines
def test_both_engines(spark_session):
    # Runs with both engines automatically
    df = spark_session.createDataFrame([{"id": 1}])
    assert df.count() == 1

Option 2: Using the both_engines fixture

def test_both_engines(both_engines):
    spark, engine_type = both_engines
    # Runs with both engines automatically
    df = spark.createDataFrame([{"id": 1}])
    assert df.count() == 1
    # engine_type will be EngineType.MOCK or EngineType.PYSPARK

Environment-Based Configuration

# Run tests with mock engine
SPARK_MODE=mock pytest tests/

# Run tests with PySpark engine
SPARK_MODE=pyspark pytest tests/

# Auto-detect (default)
SPARK_MODE=auto pytest tests/

Pytest Markers

import pytest

@pytest.mark.spark_engine("mock")
def test_mock_specific(spark_session):
    # Forces mock engine
    pass

@pytest.mark.spark_engine("pyspark")
def test_pyspark_specific(spark_session):
    # Forces PySpark engine
    pass

Configuration

Environment Variables

  • SPARK_MODE: mock, pyspark, or auto (default: auto)
  • SPARK_TEST_WAREHOUSE_DIR: Custom warehouse directory for PySpark tests
  • SPARK_TEST_APP_NAME: Custom app name prefix (default: sparkless-testing)
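
The way these variables resolve to a mode and an app name can be sketched roughly as below. This is a minimal illustration, not the package's implementation: `resolve_spark_mode` and `resolve_app_name` are hypothetical helpers, and the case-insensitive matching and the error on unknown values are assumptions. Only the variable names and their defaults come from the list above.

```python
import os

VALID_MODES = ("mock", "pyspark", "auto")


def resolve_spark_mode(environ=None):
    """Return the requested engine mode, falling back to "auto" (hypothetical helper)."""
    env = os.environ if environ is None else environ
    # Assumption: values are matched case-insensitively and surrounding spaces ignored.
    mode = env.get("SPARK_MODE", "auto").strip().lower()
    if mode not in VALID_MODES:
        raise ValueError(f"SPARK_MODE must be one of {VALID_MODES}, got {mode!r}")
    return mode


def resolve_app_name(environ=None):
    """Return the app name prefix, falling back to the documented default."""
    env = os.environ if environ is None else environ
    return env.get("SPARK_TEST_APP_NAME", "sparkless-testing")
```

Passing a plain dict instead of `os.environ` keeps the helpers easy to exercise in tests without mutating the real environment.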

Programmatic Configuration

from sparkless_testing import (
    EngineType,
    SessionConfig,
    auto_configure_engine,
    create_session,
    create_pyspark_session,
)

# Auto-configure based on availability
auto_configure_engine()

# Or specify preferred engine
auto_configure_engine(EngineType.MOCK)

# Create session manually
spark = create_session(engine_type=EngineType.MOCK)

# Create session with custom configuration
config = SessionConfig(
    app_name="my-test",
    enable_delta=True,
    shuffle_partitions=2,
)
spark = create_pyspark_session(config=config)
# ... use spark ...
spark.stop()

API Reference

Engine Configuration

  • EngineType: Enum for engine types (MOCK, PYSPARK, AUTO)
  • configure_engine(): Manually configure engine components
  • get_engine(): Get current engine configuration
  • detect_available_engines(): Check which engines are installed
  • auto_configure_engine(): Automatically configure based on availability
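
To give a feel for what availability-based configuration involves, here is a small sketch built on the standard library only. It is not how `detect_available_engines()` or `auto_configure_engine()` are actually implemented; the function names ending in `_sketch` are hypothetical, and preferring the mock engine when both are installed is an assumption.

```python
import importlib.util


def detect_engines_sketch():
    """Report which engine packages are importable, without importing them."""
    return {
        "mock": importlib.util.find_spec("sparkless") is not None,
        "pyspark": importlib.util.find_spec("pyspark") is not None,
    }


def pick_engine_sketch(available, preferred="auto"):
    """Choose an engine name from an availability mapping."""
    if preferred != "auto":
        if not available.get(preferred, False):
            raise RuntimeError(f"engine {preferred!r} is not installed")
        return preferred
    # Assumption: the lightweight mock engine wins when both are present.
    for name in ("mock", "pyspark"):
        if available.get(name):
            return name
    raise RuntimeError("neither sparkless nor pyspark is installed")
```

For example, `pick_engine_sketch(detect_engines_sketch())` returns an engine name when at least one engine is installed and raises otherwise.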

Session Factory

  • create_mock_session(): Create sparkless session
  • create_pyspark_session(): Create PySpark session
  • create_session(): Factory function that creates appropriate session
  • SessionConfig: Configuration dataclass for session creation
    • app_name: Application name for the Spark session
    • warehouse_dir: Custom warehouse directory (optional)
    • enable_delta: Enable Delta Lake support (default: True for programmatic use, False in test fixtures)
    • enable_ui: Enable Spark UI (default: False)
    • shuffle_partitions: Number of shuffle partitions (default: 1)
    • parallelism: Default parallelism (default: 1)
    • adaptive_enabled: Enable adaptive query execution (default: False)

Test Utilities

  • detect_spark_type(spark): Detect if session is PySpark or mock
  • create_test_dataframe(spark, data, schema): Compatibility wrapper for DataFrame creation
  • is_dataframe_like(obj): Check if object is DataFrame-like
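
A "DataFrame-like" check is typically duck-typed, so that it works for both sparkless and PySpark objects without importing either. The following is only a sketch of that idea: `is_dataframe_like_sketch` and `FakeFrame` are hypothetical, and the set of attributes the real `is_dataframe_like` inspects is not documented here.

```python
def is_dataframe_like_sketch(obj):
    """Return True if obj exposes a minimal DataFrame-style surface."""
    required_methods = ("collect", "count", "select")
    has_methods = all(callable(getattr(obj, name, None)) for name in required_methods)
    return has_methods and hasattr(obj, "columns")


class FakeFrame:
    """Tiny stand-in used only to exercise the check."""
    columns = ["id"]

    def collect(self):
        return [{"id": 1}]

    def count(self):
        return 1

    def select(self, *cols):
        return self
```

With this sketch, `is_dataframe_like_sketch(FakeFrame())` is true, while a plain list is rejected because it has no `collect` or `select` method.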

Pytest Fixtures

  • spark_session: Main fixture (auto-detects engine based on SPARK_MODE or markers)
  • mock_spark_session: Explicitly use sparkless (mock) engine
  • pyspark_session: Explicitly use PySpark (real) engine
  • both_engines: Fixture that yields (spark, engine_type) for both engines (parametrized)
  • spark_functions: Functions module (F) for current engine
  • spark_types: Types module for current engine
  • spark_engine_type: Current engine type as string ("mock" or "pyspark")

Note: Test fixtures automatically disable Delta Lake for parallel testing compatibility. To use Delta Lake, create sessions programmatically with enable_delta=True in SessionConfig.

Advanced Usage

Custom Session Configuration

from sparkless_testing import SessionConfig, create_pyspark_session

def test_with_custom_config():
    config = SessionConfig(
        app_name="custom-test",
        warehouse_dir="/tmp/my-warehouse",
        enable_delta=True,
        enable_ui=True,
        shuffle_partitions=4,
        parallelism=4,
    )
    spark = create_pyspark_session(config=config)
    try:
        ...  # your test code
    finally:
        # Stop the session even if the test body raises
        spark.stop()

Parallel Test Execution

# Run tests in parallel with pytest-xdist
pytest -n 2  # Use 2 workers
pytest -n auto  # Auto-detect worker count

Best Practices for Parallel Testing:

  • Mock (sparkless) tests can run with high parallelism (-n 10+)
  • PySpark tests work best with fewer workers (-n 2 or -n 4)
  • Test fixtures automatically disable Delta Lake to avoid conflicts
  • Each test gets a unique session name and warehouse directory
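
The last point, unique session names and warehouse directories, can be illustrated with a short sketch. It is not the package's code: `unique_session_resources` is a hypothetical helper and the naming scheme is an assumption. It relies on the `PYTEST_XDIST_WORKER` environment variable, which pytest-xdist sets in each worker process (for example `gw0`).

```python
import os
import tempfile
import uuid


def unique_session_resources(prefix="sparkless-testing", environ=None):
    """Return an (app_name, warehouse_dir) pair that no other test shares."""
    env = os.environ if environ is None else environ
    # "main" is used when the tests are not running under pytest-xdist.
    worker = env.get("PYTEST_XDIST_WORKER", "main")
    token = uuid.uuid4().hex[:8]
    app_name = f"{prefix}-{worker}-{token}"
    # mkdtemp creates a fresh directory; the caller is responsible for removing it.
    warehouse_dir = tempfile.mkdtemp(prefix=f"{app_name}-warehouse-")
    return app_name, warehouse_dir
```

Keeping the warehouse directory separate per test avoids workers writing table metadata into the same location, which is one common source of conflicts in parallel runs.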

Using Test Utilities

from sparkless_testing.utils import create_test_dataframe, detect_spark_type

def test_with_utilities(spark_session):
    # Create DataFrame with schema handling
    data = [("Alice", 25), ("Bob", 30)]
    schema = ["name", "age"]
    df = create_test_dataframe(spark_session, data, schema)
    
    # Detect engine type
    engine = detect_spark_type(spark_session)
    assert engine in ("mock", "pyspark")

Examples

Complete Test Example

import pytest
from sparkless_testing.pytest_fixtures import spark_session, spark_functions

def test_data_transformation(spark_session, spark_functions):
    F = spark_functions
    
    # Create test data
    data = [
        {"id": 1, "name": "Alice", "age": 25},
        {"id": 2, "name": "Bob", "age": 30},
    ]
    df = spark_session.createDataFrame(data)
    
    # Transform data
    result = (
        df.filter(F.col("age") > 25)
        .select("name", "age")
        .collect()
    )
    
    # Assertions
    assert len(result) == 1
    assert result[0]["name"] == "Bob"

Using with Both Engines

import pytest

@pytest.mark.parametrize_engines
def test_compatibility(spark_session, spark_engine_type):
    # This test runs with both engines
    df = spark_session.createDataFrame([{"value": 42}])
    
    # Engine-specific logic if needed
    if spark_engine_type == "mock":
        # Mock-specific assertions
        pass
    else:
        # PySpark-specific assertions
        pass
    
    assert df.count() == 1

Troubleshooting

Tests Hang with Parallel Execution

If tests hang when using pytest -n, try:

  • Use fewer workers: pytest -n 2 instead of pytest -n 10
  • Run PySpark tests sequentially: pytest -n 0 for PySpark-only tests
  • Use mock engine for faster parallel execution: SPARK_MODE=mock pytest -n 10

Delta Lake Not Available

Delta Lake is disabled by default in test fixtures. To enable:

from sparkless_testing import SessionConfig, create_pyspark_session

config = SessionConfig(app_name="test", enable_delta=True)
spark = create_pyspark_session(config=config)

SparkContext Errors

Always use spark_functions together with spark_session:

def test_correct(spark_session, spark_functions):  # ✅ Correct
    F = spark_functions
    df = spark_session.createDataFrame([{"x": 1}])
    result = df.select(F.col("x")).collect()

def test_incorrect(spark_functions):  # ❌ May fail with PySpark
    F = spark_functions
    # Missing active SparkContext

Migration Guide

From Manual Engine Switching

Before:

import os
from sparkless import SparkSession as MockSparkSession
from pyspark.sql import SparkSession as PySparkSession

def test_my_function():
    if os.environ.get("SPARK_MODE") == "mock":
        spark = MockSparkSession("test")
    else:
        spark = PySparkSession.builder.appName("test").getOrCreate()
    # ... test code ...
    spark.stop()

After:

from sparkless_testing.pytest_fixtures import spark_session

def test_my_function(spark_session):
    ...  # test code goes here
    # spark_session is cleaned up automatically by the fixture

Requirements

  • Python 3.9+
  • pytest 7.0+

Optional:

  • sparkless>=3.19.0 (for mock engine)
  • pyspark>=3.5.0 (for real engine)
  • delta-spark>=3.0.0 (for Delta Lake support)
  • pytest-xdist (for parallel test execution)

License

MIT License - see LICENSE file for details.

Made with ❤️ for the data engineering community

