Easy testing with sparkless or PySpark on demand
This project has been archived.
The maintainers of this project have marked this project as archived. No new releases are expected.
sparkless-testing
A Python package that simplifies running tests with either sparkless (mock) or PySpark (real) engines on demand. Write tests once, run them with either engine seamlessly.
Why sparkless-testing?
- 🚀 Fast Mock Testing: Use sparkless for lightning-fast unit tests without JVM overhead
- 🔄 Real Engine Validation: Test against real PySpark to catch integration issues
- 🎯 Single Test Suite: Write tests once, run with both engines automatically
- 🔧 Zero Configuration: Automatic engine detection and session management
- ⚡ Parallel Ready: Optimized for parallel test execution with pytest-xdist
Features
- Automatic Engine Detection - Automatically detects and configures available engines
- Pytest Fixtures - Ready-to-use fixtures for both engines
- Session Management - Automatic session creation, cleanup, and isolation
- Test Utilities - Helpers for common test patterns
- Parametrization Support - Run tests with both engines automatically
- Parallel Testing Support - Optimized for parallel test execution with pytest-xdist
Installation
pip install sparkless-testing
For specific engines:
# Install with mock engine (sparkless)
pip install sparkless-testing[mock]
# Install with PySpark engine
pip install sparkless-testing[pyspark]
# Install with Delta Lake support
pip install sparkless-testing[delta]
# Install with all engines
pip install sparkless-testing[dev]
Quick Start
Basic Usage
import pytest
from sparkless_testing import pytest_fixtures

# Use the spark_session fixture
def test_my_function(spark_session):
    df = spark_session.createDataFrame([{"id": 1, "name": "Alice"}])
    assert df.count() == 1
    assert df.collect()[0]["name"] == "Alice"
Explicit Engine Selection
def test_mock_only(mock_spark_session):
    # Only runs with sparkless
    df = mock_spark_session.createDataFrame([{"id": 1}])
    assert df.count() == 1

def test_pyspark_only(pyspark_session):
    # Only runs with PySpark
    df = pyspark_session.createDataFrame([{"id": 1}])
    assert df.count() == 1
Using Functions and Types
def test_with_functions(spark_session, spark_functions, spark_types):
    F = spark_functions
    Types = spark_types
    df = spark_session.createDataFrame([{"name": "Alice"}])
    result = df.select(F.upper(F.col("name"))).collect()
    assert result[0][0] == "ALICE"
Note: Always use spark_functions together with spark_session. PySpark functions like F.col() require an active SparkContext, which is provided by the spark_session fixture.
Parametrized Tests (Both Engines)
You can run tests with both engines in two ways:
Option 1: Using the decorator
@pytest.mark.parametrize_engines
def test_both_engines(spark_session):
    # Runs with both engines automatically
    df = spark_session.createDataFrame([{"id": 1}])
    assert df.count() == 1
Option 2: Using the both_engines fixture
def test_both_engines(both_engines):
    spark, engine_type = both_engines
    # Runs with both engines automatically
    df = spark.createDataFrame([{"id": 1}])
    assert df.count() == 1
    # engine_type will be EngineType.MOCK or EngineType.PYSPARK
Environment-Based Configuration
# Run tests with mock engine
SPARK_MODE=mock pytest tests/
# Run tests with PySpark engine
SPARK_MODE=pyspark pytest tests/
# Auto-detect (default)
SPARK_MODE=auto pytest tests/
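As an illustration of how a SPARK_MODE value might be read and validated, here is a minimal sketch using only the standard library. The function name resolve_spark_mode and its behavior on invalid input (case-insensitive matching, raising ValueError) are assumptions made for this example; the package's own handling may differ.

```python
import os

# The three values documented for SPARK_MODE.
VALID_MODES = ("mock", "pyspark", "auto")


def resolve_spark_mode(environ=None):
    """Return the requested mode from SPARK_MODE, defaulting to "auto".

    Hypothetical helper: not part of sparkless-testing.
    """
    env = os.environ if environ is None else environ
    raw = env.get("SPARK_MODE", "auto").strip().lower()
    if raw not in VALID_MODES:
        raise ValueError(f"SPARK_MODE must be one of {VALID_MODES}, got {raw!r}")
    return raw
```

Passing a plain dict as environ keeps the helper easy to test without touching the real process environment.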
Pytest Markers
@pytest.mark.spark_engine("mock")
def test_mock_specific(spark_session):
    # Forces mock engine
    pass

@pytest.mark.spark_engine("pyspark")
def test_pyspark_specific(spark_session):
    # Forces PySpark engine
    pass
Configuration
Environment Variables
- SPARK_MODE: mock, pyspark, or auto (default: auto)
- SPARK_TEST_WAREHOUSE_DIR: Custom warehouse directory for PySpark tests
- SPARK_TEST_APP_NAME: Custom app name prefix (default: sparkless-testing)
Programmatic Configuration
from sparkless_testing import (
    EngineType,
    SessionConfig,
    auto_configure_engine,
    create_session,
    create_pyspark_session,
)

# Auto-configure based on availability
auto_configure_engine()

# Or specify preferred engine
auto_configure_engine(EngineType.MOCK)

# Create session manually
spark = create_session(engine_type=EngineType.MOCK)

# Create session with custom configuration
config = SessionConfig(
    app_name="my-test",
    enable_delta=True,
    shuffle_partitions=2,
)
spark = create_pyspark_session(config=config)
# ... use spark ...
spark.stop()
API Reference
Engine Configuration
- EngineType: Enum for engine types (MOCK, PYSPARK, AUTO)
- configure_engine(): Manually configure engine components
- get_engine(): Get current engine configuration
- detect_available_engines(): Check which engines are installed
- auto_configure_engine(): Automatically configure based on availability
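To make the idea of engine detection concrete, the sketch below checks whether the sparkless and pyspark modules are importable, then picks one. It is not the package's implementation: find_installed_engines and pick_engine are hypothetical names, and the preference order (mock first) is an assumption.

```python
import importlib.util


def find_installed_engines(candidates=None):
    """Map each engine name to True if its top-level module can be imported.

    Hypothetical helper: not part of sparkless-testing.
    """
    if candidates is None:
        candidates = {"mock": "sparkless", "pyspark": "pyspark"}
    # find_spec returns None for a top-level module that is not installed.
    return {
        name: importlib.util.find_spec(module) is not None
        for name, module in candidates.items()
    }


def pick_engine(available, preferred=("mock", "pyspark")):
    """Return the first preferred engine that is available (assumed order)."""
    for name in preferred:
        if available.get(name):
            return name
    raise RuntimeError("no Spark engine is installed")
```

Checking with find_spec avoids importing pyspark just to learn whether it is present, which keeps detection cheap.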
Session Factory
- create_mock_session(): Create sparkless session
- create_pyspark_session(): Create PySpark session
- create_session(): Factory function that creates appropriate session
- SessionConfig: Configuration dataclass for session creation
  - app_name: Application name for the Spark session
  - warehouse_dir: Custom warehouse directory (optional)
  - enable_delta: Enable Delta Lake support (default: True for programmatic use, False in test fixtures)
  - enable_ui: Enable Spark UI (default: False)
  - shuffle_partitions: Number of shuffle partitions (default: 1)
  - parallelism: Default parallelism (default: 1)
  - adaptive_enabled: Enable adaptive query execution (default: False)
Test Utilities
- detect_spark_type(spark): Detect if session is PySpark or mock
- create_test_dataframe(spark, data, schema): Compatibility wrapper for DataFrame creation
- is_dataframe_like(obj): Check if object is DataFrame-like
Pytest Fixtures
- spark_session: Main fixture (auto-detects engine based on SPARK_MODE or markers)
- mock_spark_session: Explicitly use sparkless (mock) engine
- pyspark_session: Explicitly use PySpark (real) engine
- both_engines: Fixture that yields (spark, engine_type) for both engines (parametrized)
- spark_functions: Functions module (F) for current engine
- spark_types: Types module for current engine
- spark_engine_type: Current engine type as string ("mock" or "pyspark")
Note: Test fixtures automatically disable Delta Lake for parallel testing compatibility. To use Delta Lake, create sessions programmatically with enable_delta=True in SessionConfig.
Advanced Usage
Custom Session Configuration
from sparkless_testing import SessionConfig, create_pyspark_session

def test_with_custom_config():
    config = SessionConfig(
        app_name="custom-test",
        warehouse_dir="/tmp/my-warehouse",
        enable_delta=True,
        enable_ui=True,
        shuffle_partitions=4,
        parallelism=4,
    )
    spark = create_pyspark_session(config=config)
    # ... your test code ...
    spark.stop()
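For readers wondering what these options plausibly control, the sketch below mirrors some of the documented fields in a stand-in dataclass and maps them to standard Spark configuration keys. SketchSessionConfig and to_spark_conf are hypothetical names, and the mapping is an assumption about how such a config could be applied, not a description of the package's internals. Delta Lake settings are left out for brevity.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class SketchSessionConfig:
    """Stand-in for SessionConfig, using the defaults listed in the API reference."""

    app_name: str = "sparkless-testing"
    warehouse_dir: Optional[str] = None
    enable_ui: bool = False
    shuffle_partitions: int = 1
    parallelism: int = 1
    adaptive_enabled: bool = False

    def to_spark_conf(self) -> Dict[str, str]:
        """Translate the fields into Spark configuration key/value strings."""
        conf = {
            "spark.app.name": self.app_name,
            "spark.ui.enabled": str(self.enable_ui).lower(),
            "spark.sql.shuffle.partitions": str(self.shuffle_partitions),
            "spark.default.parallelism": str(self.parallelism),
            "spark.sql.adaptive.enabled": str(self.adaptive_enabled).lower(),
        }
        # Only set the warehouse directory when one was given.
        if self.warehouse_dir is not None:
            conf["spark.sql.warehouse.dir"] = self.warehouse_dir
        return conf
```

Small values for shuffle partitions and parallelism suit test-sized data, where the 200-partition Spark default mostly adds scheduling overhead.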
Parallel Test Execution
# Run tests in parallel with pytest-xdist
pytest -n 2 # Use 2 workers
pytest -n auto # Auto-detect worker count
Best Practices for Parallel Testing:
- Mock (sparkless) tests can run with high parallelism (-n 10+)
- PySpark tests work best with fewer workers (-n 2 or -n 4)
- Test fixtures automatically disable Delta Lake to avoid conflicts
- Each test gets a unique session name and warehouse directory
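The last point, a unique session name and warehouse directory per test, can be sketched as follows. This is one possible approach under stated assumptions, not the package's code: unique_session_names is a hypothetical helper, and the naming scheme is invented for illustration. The PYTEST_XDIST_WORKER environment variable is set by pytest-xdist in each worker process.

```python
import os
import tempfile
import uuid


def unique_session_names(prefix="sparkless-testing", environ=None):
    """Return an (app_name, warehouse_dir) pair that no other test will share.

    Hypothetical helper: not part of sparkless-testing.
    """
    env = os.environ if environ is None else environ
    # pytest-xdist exposes the worker id (for example "gw0") in this variable.
    worker = env.get("PYTEST_XDIST_WORKER", "main")
    token = uuid.uuid4().hex[:8]
    app_name = f"{prefix}-{worker}-{token}"
    # mkdtemp creates a fresh directory, so workers never share a warehouse.
    warehouse_dir = tempfile.mkdtemp(prefix=f"{app_name}-")
    return app_name, warehouse_dir
```

Separate warehouse directories matter mostly for PySpark, where concurrent sessions writing managed tables to the same location can collide.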
Using Test Utilities
from sparkless_testing.utils import create_test_dataframe, detect_spark_type

def test_with_utilities(spark_session):
    # Create DataFrame with schema handling
    data = [("Alice", 25), ("Bob", 30)]
    schema = ["name", "age"]
    df = create_test_dataframe(spark_session, data, schema)

    # Detect engine type
    engine = detect_spark_type(spark_session)
    assert engine in ("mock", "pyspark")
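One common way to tell a real PySpark session from a mock is to inspect the module that defines the session class. The sketch below shows that technique; guess_spark_type is a hypothetical name, and treating every non-PySpark object as "mock" is an assumption of this example. How detect_spark_type works internally is not documented here.

```python
def guess_spark_type(spark):
    """Classify a session object by the top-level package of its class.

    Hypothetical helper: not part of sparkless-testing.
    """
    module = type(spark).__module__ or ""
    # PySpark sessions are defined under the "pyspark" package.
    top_level = module.split(".")[0]
    return "pyspark" if top_level == "pyspark" else "mock"
```

Because this only reads the class's module name, it works without importing pyspark, so it stays usable in environments where only the mock engine is installed.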
Examples
Complete Test Example
import pytest
from sparkless_testing.pytest_fixtures import spark_session, spark_functions

def test_data_transformation(spark_session, spark_functions):
    F = spark_functions

    # Create test data
    data = [
        {"id": 1, "name": "Alice", "age": 25},
        {"id": 2, "name": "Bob", "age": 30},
    ]
    df = spark_session.createDataFrame(data)

    # Transform data
    result = (
        df.filter(F.col("age") > 25)
        .select("name", "age")
        .collect()
    )

    # Assertions
    assert len(result) == 1
    assert result[0]["name"] == "Bob"
Using with Both Engines
@pytest.mark.parametrize_engines
def test_compatibility(spark_session, spark_engine_type):
    # This test runs with both engines
    df = spark_session.createDataFrame([{"value": 42}])

    # Engine-specific logic if needed
    if spark_engine_type == "mock":
        # Mock-specific assertions
        pass
    else:
        # PySpark-specific assertions
        pass

    assert df.count() == 1
Troubleshooting
Tests Hang with Parallel Execution
If tests hang when using pytest -n, try:
- Use fewer workers: pytest -n 2 instead of pytest -n 10
- Run PySpark tests sequentially: pytest -n 0 for PySpark-only tests
- Use mock engine for faster parallel execution: SPARK_MODE=mock pytest -n 10
Delta Lake Not Available
Delta Lake is disabled by default in test fixtures. To enable:
from sparkless_testing import SessionConfig, create_pyspark_session
config = SessionConfig(app_name="test", enable_delta=True)
spark = create_pyspark_session(config=config)
SparkContext Errors
Always use spark_functions together with spark_session:
def test_correct(spark_session, spark_functions):  # ✅ Correct
    F = spark_functions
    df = spark_session.createDataFrame([{"x": 1}])
    result = df.select(F.col("x")).collect()

def test_incorrect(spark_functions):  # ❌ May fail with PySpark
    F = spark_functions
    # Missing active SparkContext
Migration Guide
From Manual Engine Switching
Before:
import os
from sparkless import SparkSession as MockSparkSession
from pyspark.sql import SparkSession as PySparkSession

def test_my_function():
    if os.environ.get("SPARK_MODE") == "mock":
        spark = MockSparkSession("test")
    else:
        spark = PySparkSession.builder.appName("test").getOrCreate()
    # ... test code ...
    spark.stop()
After:
from sparkless_testing.pytest_fixtures import spark_session

def test_my_function(spark_session):
    ...  # test code
    # spark_session is automatically cleaned up
Requirements
- Python 3.9+
- pytest 7.0+
Optional:
- sparkless>=3.19.0 (for mock engine)
- pyspark>=3.5.0 (for real engine)
- delta-spark>=3.0.0 (for Delta Lake support)
- pytest-xdist (for parallel test execution)
License
MIT License - see LICENSE file for details.
Links
- GitHub: github.com/eddiethedean/sparkless-testing
- Issues: github.com/eddiethedean/sparkless-testing/issues
Made with ❤️ for the data engineering community