Easy testing with sparkless or PySpark on demand

This project has been archived.

The maintainers of this project have marked this project as archived. No new releases are expected.

sparkless-testing

A Python package that simplifies running tests with either sparkless (mock) or PySpark (real) engines on demand. Write tests once, run them with either engine seamlessly.

Why sparkless-testing?

  • 🚀 Fast Mock Testing: Use sparkless for lightning-fast unit tests without JVM overhead
  • 🔄 Real Engine Validation: Test against real PySpark to catch integration issues
  • 🎯 Single Test Suite: Write tests once, run with both engines automatically
  • 🔧 Zero Configuration: Automatic engine detection and session management
  • Parallel Ready: Optimized for parallel test execution with pytest-xdist

Features

  • Automatic Engine Detection - Automatically detects and configures available engines
  • Pytest Fixtures - Ready-to-use fixtures for both engines
  • Session Management - Automatic session creation, cleanup, and isolation
  • Test Utilities - Helpers for common test patterns
  • Parametrization Support - Run tests with both engines automatically
  • Parallel Testing Support - Optimized for parallel test execution with pytest-xdist
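
To make "automatic engine detection" concrete, here is a minimal sketch of how availability could be checked with the standard library alone. The helper names `sketch_detect_engines` and `sketch_pick_engine` are hypothetical and are not the package's `detect_available_engines()`; the mock-first preference order is also an assumption.

```python
from importlib.util import find_spec


def sketch_detect_engines() -> dict[str, bool]:
    """Report which engine packages are importable, without importing them."""
    # Hypothetical helper: find_spec returns None when a top-level package is absent.
    return {
        "mock": find_spec("sparkless") is not None,
        "pyspark": find_spec("pyspark") is not None,
    }


def sketch_pick_engine(available: dict[str, bool]) -> str:
    """Assumed policy: prefer the fast mock engine, then fall back to PySpark."""
    for name in ("mock", "pyspark"):
        if available.get(name):
            return name
    raise RuntimeError("no Spark engine installed: install sparkless or pyspark")
```

Checking with `find_spec` avoids importing PySpark just to learn whether it exists, which keeps the check cheap.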

Installation

pip install sparkless-testing

For specific engines:

# Install with mock engine (sparkless)
pip install "sparkless-testing[mock]"

# Install with PySpark engine
pip install "sparkless-testing[pyspark]"

# Install with all engines
# (quotes keep shells such as zsh from expanding the square brackets)
pip install "sparkless-testing[dev]"

Quick Start

Basic Usage

import pytest
from sparkless_testing import pytest_fixtures

# Use the spark_session fixture
def test_my_function(spark_session):
    df = spark_session.createDataFrame([{"id": 1, "name": "Alice"}])
    assert df.count() == 1
    assert df.collect()[0]["name"] == "Alice"

Explicit Engine Selection

def test_mock_only(mock_spark_session):
    # Only runs with sparkless
    df = mock_spark_session.createDataFrame([{"id": 1}])
    assert df.count() == 1

def test_pyspark_only(pyspark_session):
    # Only runs with PySpark
    df = pyspark_session.createDataFrame([{"id": 1}])
    assert df.count() == 1

Using Functions and Types

def test_with_functions(spark_session, spark_functions, spark_types):
    F = spark_functions
    Types = spark_types
    
    df = spark_session.createDataFrame([{"name": "Alice"}])
    result = df.select(F.upper(F.col("name"))).collect()
    assert result[0][0] == "ALICE"

Note: Always use spark_functions together with spark_session. PySpark functions like F.col() require an active SparkContext, which is provided by the spark_session fixture.

Parametrized Tests (Both Engines)

You can run tests with both engines in two ways:

Option 1: Using the decorator

@pytest.mark.parametrize_engines
def test_both_engines(spark_session):
    # Runs with both engines automatically
    df = spark_session.createDataFrame([{"id": 1}])
    assert df.count() == 1

Option 2: Using the both_engines fixture

def test_both_engines(both_engines):
    spark, engine_type = both_engines
    # Runs with both engines automatically
    df = spark.createDataFrame([{"id": 1}])
    assert df.count() == 1
    # engine_type will be EngineType.MOCK or EngineType.PYSPARK

Environment-Based Configuration

# Run tests with mock engine
SPARK_MODE=mock pytest tests/

# Run tests with PySpark engine
SPARK_MODE=pyspark pytest tests/

# Auto-detect (default)
SPARK_MODE=auto pytest tests/

Pytest Markers

@pytest.mark.spark_engine("mock")
def test_mock_specific(spark_session):
    # Forces mock engine
    pass

@pytest.mark.spark_engine("pyspark")
def test_pyspark_specific(spark_session):
    # Forces PySpark engine
    pass
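
This page does not say whether the package registers its markers with pytest. If pytest reports unknown-marker warnings, or the suite runs with `--strict-markers`, a project-level `conftest.py` could register them as sketched below. The marker descriptions are assumptions written for illustration; `addinivalue_line` is pytest's standard way to register markers.

```python
# conftest.py (sketch)
def pytest_configure(config):
    """Register the markers used above so pytest recognizes them."""
    config.addinivalue_line(
        "markers",
        "spark_engine(name): force the 'mock' or 'pyspark' engine for this test",
    )
    config.addinivalue_line(
        "markers",
        "parametrize_engines: run the test once per available engine",
    )
```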

Configuration

Environment Variables

  • SPARK_MODE: mock, pyspark, or auto (default: auto)
  • SPARK_TEST_WAREHOUSE_DIR: Custom warehouse directory for PySpark tests
  • SPARK_TEST_APP_NAME: Custom app name prefix (default: sparkless-testing)
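
As a rough illustration of how these three variables and their documented defaults fit together, the following sketch reads them into one value. `EnvSettings` and `read_env_settings` are hypothetical names, and the case-insensitive handling of `SPARK_MODE` is an assumption rather than documented behavior.

```python
import os
from typing import Mapping, NamedTuple, Optional

VALID_MODES = ("mock", "pyspark", "auto")


class EnvSettings(NamedTuple):  # hypothetical container, for illustration only
    mode: str
    warehouse_dir: Optional[str]
    app_name: str


def read_env_settings(env: Mapping[str, str] = os.environ) -> EnvSettings:
    """Read the documented variables, applying the documented defaults."""
    # Assumption: values are normalized to lowercase before validation.
    mode = env.get("SPARK_MODE", "auto").strip().lower()
    if mode not in VALID_MODES:
        raise ValueError(f"SPARK_MODE must be one of {VALID_MODES}, got {mode!r}")
    return EnvSettings(
        mode=mode,
        warehouse_dir=env.get("SPARK_TEST_WAREHOUSE_DIR") or None,
        app_name=env.get("SPARK_TEST_APP_NAME", "sparkless-testing"),
    )
```

Passing a plain dict instead of `os.environ` makes the function easy to exercise in a unit test.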

Programmatic Configuration

from sparkless_testing import (
    EngineType,
    SessionConfig,
    auto_configure_engine,
    create_session,
    create_pyspark_session,
)

# Auto-configure based on availability
auto_configure_engine()

# Or specify preferred engine
auto_configure_engine(EngineType.MOCK)

# Create session manually
spark = create_session(engine_type=EngineType.MOCK)

# Create session with custom configuration
config = SessionConfig(
    app_name="my-test",
    shuffle_partitions=2,
)
spark = create_pyspark_session(config=config)
# ... use spark ...
spark.stop()

API Reference

Engine Configuration

  • EngineType: Enum for engine types (MOCK, PYSPARK, AUTO)
  • configure_engine(): Manually configure engine components
  • get_engine(): Get current engine configuration
  • detect_available_engines(): Check which engines are installed
  • auto_configure_engine(): Automatically configure based on availability

Session Factory

  • create_mock_session(): Create sparkless session
  • create_pyspark_session(): Create PySpark session
  • create_session(): Factory function that creates appropriate session
  • SessionConfig: Configuration dataclass for session creation
    • app_name: Application name for the Spark session
    • warehouse_dir: Custom warehouse directory (optional)
    • enable_ui: Enable Spark UI (default: False)
    • shuffle_partitions: Number of shuffle partitions (default: 1)
    • parallelism: Default parallelism (default: 1)
    • adaptive_enabled: Enable adaptive query execution (default: False)
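
The fields above plausibly correspond to standard Spark configuration keys. The sketch below shows one such mapping with a stand-in dataclass; `SketchSessionConfig` and `to_spark_conf` are hypothetical and not part of the package, and the default `app_name` is an assumption. The Spark keys themselves are real Spark settings.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchSessionConfig:  # hypothetical stand-in, not the package's SessionConfig
    app_name: str = "sparkless-testing"  # assumed default
    warehouse_dir: Optional[str] = None
    enable_ui: bool = False
    shuffle_partitions: int = 1
    parallelism: int = 1
    adaptive_enabled: bool = False

    def to_spark_conf(self) -> dict[str, str]:
        """Translate the fields into Spark configuration key/value strings."""
        conf = {
            "spark.app.name": self.app_name,
            "spark.ui.enabled": str(self.enable_ui).lower(),
            "spark.sql.shuffle.partitions": str(self.shuffle_partitions),
            "spark.default.parallelism": str(self.parallelism),
            "spark.sql.adaptive.enabled": str(self.adaptive_enabled).lower(),
        }
        # Only set the warehouse directory when one was requested.
        if self.warehouse_dir is not None:
            conf["spark.sql.warehouse.dir"] = self.warehouse_dir
        return conf
```

The small defaults (one shuffle partition, no UI, no adaptive execution) suit tiny test datasets, where scheduling overhead outweighs any benefit from parallelism.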

Test Utilities

  • detect_spark_type(spark): Detect if session is PySpark or mock
  • create_test_dataframe(spark, data, schema): Compatibility wrapper for DataFrame creation
  • is_dataframe_like(obj): Check if object is DataFrame-like

Pytest Fixtures

  • spark_session: Main fixture (auto-detects engine based on SPARK_MODE or markers)
  • mock_spark_session: Explicitly use sparkless (mock) engine
  • pyspark_session: Explicitly use PySpark (real) engine
  • both_engines: Fixture that yields (spark, engine_type) for both engines (parametrized)
  • spark_functions: Functions module (F) for current engine
  • spark_types: Types module for current engine
  • spark_engine_type: Current engine type as string ("mock" or "pyspark")

Advanced Usage

Custom Session Configuration

from sparkless_testing import SessionConfig, create_pyspark_session

def test_with_custom_config():
    config = SessionConfig(
        app_name="custom-test",
        warehouse_dir="/tmp/my-warehouse",
        enable_ui=True,
        shuffle_partitions=4,
        parallelism=4,
    )
    spark = create_pyspark_session(config=config)
    try:
        ...  # your test code
    finally:
        # Stop the session even if the test body raises
        spark.stop()

Parallel Test Execution

# Run tests in parallel with pytest-xdist
pytest -n 2  # Use 2 workers
pytest -n auto  # Auto-detect worker count

Best Practices for Parallel Testing:

  • Mock (sparkless) tests can run with high parallelism (-n 10+)
  • PySpark tests work best with fewer workers (-n 2 or -n 4)
  • Each test gets a unique session name and warehouse directory
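
The last point can be illustrated with a short sketch of how a unique session name and warehouse directory might be derived per test. The helper `sketch_unique_session_paths` is hypothetical and not the package's implementation. The `PYTEST_XDIST_WORKER` variable is set by pytest-xdist in each worker process (for example `gw0`); the `main` fallback for non-parallel runs is an assumption.

```python
import os
import tempfile
import uuid


def sketch_unique_session_paths(prefix: str = "sparkless-testing") -> tuple[str, str]:
    """Return an (app_name, warehouse_dir) pair that no other test shares."""
    # pytest-xdist exposes the worker id; fall back to "main" when not parallel.
    worker = os.environ.get("PYTEST_XDIST_WORKER", "main")
    token = uuid.uuid4().hex[:8]  # random suffix so repeated calls never collide
    app_name = f"{prefix}-{worker}-{token}"
    # mkdtemp creates a fresh directory, so warehouses are isolated on disk.
    warehouse_dir = tempfile.mkdtemp(prefix=f"{app_name}-warehouse-")
    return app_name, warehouse_dir
```

Isolating the warehouse directory matters mostly for PySpark, where concurrent sessions writing to a shared location can interfere with one another.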

Using Test Utilities

from sparkless_testing.utils import create_test_dataframe, detect_spark_type

def test_with_utilities(spark_session):
    # Create DataFrame with schema handling
    data = [("Alice", 25), ("Bob", 30)]
    schema = ["name", "age"]
    df = create_test_dataframe(spark_session, data, schema)
    
    # Detect engine type
    engine = detect_spark_type(spark_session)
    assert engine in ("mock", "pyspark")

Examples

Complete Test Example

import pytest
from sparkless_testing.pytest_fixtures import spark_session, spark_functions

def test_data_transformation(spark_session, spark_functions):
    F = spark_functions
    
    # Create test data
    data = [
        {"id": 1, "name": "Alice", "age": 25},
        {"id": 2, "name": "Bob", "age": 30},
    ]
    df = spark_session.createDataFrame(data)
    
    # Transform data
    result = (
        df.filter(F.col("age") > 25)
        .select("name", "age")
        .collect()
    )
    
    # Assertions
    assert len(result) == 1
    assert result[0]["name"] == "Bob"

Using with Both Engines

@pytest.mark.parametrize_engines
def test_compatibility(spark_session, spark_engine_type):
    # This test runs with both engines
    df = spark_session.createDataFrame([{"value": 42}])
    
    # Engine-specific logic if needed
    if spark_engine_type == "mock":
        # Mock-specific assertions
        pass
    else:
        # PySpark-specific assertions
        pass
    
    assert df.count() == 1

Troubleshooting

Tests Hang with Parallel Execution

If tests hang when using pytest -n, try:

  • Use fewer workers: pytest -n 2 instead of pytest -n 10
  • Run PySpark tests sequentially: pytest -n 0 for PySpark-only tests
  • Use mock engine for faster parallel execution: SPARK_MODE=mock pytest -n 10

SparkContext Errors

Always use spark_functions together with spark_session:

def test_correct(spark_session, spark_functions):  # ✅ Correct
    F = spark_functions
    df = spark_session.createDataFrame([{"x": 1}])
    result = df.select(F.col("x")).collect()

def test_incorrect(spark_functions):  # ❌ May fail with PySpark
    F = spark_functions
    # No spark_session requested, so there may be no active SparkContext
    F.col("x")

Migration Guide

From Manual Engine Switching

Before:

import os
from sparkless import SparkSession as MockSparkSession
from pyspark.sql import SparkSession as PySparkSession

def test_my_function():
    if os.environ.get("SPARK_MODE") == "mock":
        spark = MockSparkSession("test")
    else:
        spark = PySparkSession.builder.appName("test").getOrCreate()
    # ... test code ...
    spark.stop()

After:

from sparkless_testing.pytest_fixtures import spark_session

def test_my_function(spark_session):
    ...  # test code goes here
    # spark_session is cleaned up automatically after the test

Requirements

  • Python 3.9+
  • pytest 7.0+

Optional:

  • sparkless>=3.19.0 (for mock engine)
  • pyspark>=3.5.0 (for real engine)
  • pytest-xdist (for parallel test execution)

License

MIT License - see LICENSE file for details.

Made with ❤️ for the data engineering community
