
Lightning-fast PySpark testing without JVM - 10x faster with 100% API compatibility

This project has been archived by its maintainers. No new releases are expected.

Project description

Mock Spark

🚀 Test PySpark code at lightning speed—no JVM required

Python 3.9+ • PySpark 3.2-3.5 • MIT License

⚡ 10x faster tests • 🎯 Drop-in PySpark replacement • 📦 Zero JVM overhead • 🧵 Thread-safe Polars backend


Why Mock Spark?

Tired of waiting 30+ seconds for Spark to initialize in every test?

Mock Spark is a lightweight PySpark replacement that runs your tests 10x faster by eliminating JVM overhead. Your existing PySpark code works unchanged—just swap the import.

# Before
from pyspark.sql import SparkSession

# After  
from mock_spark.sql import SparkSession

Key Benefits

| Feature | Description |
|---|---|
| ⚡ 10x Faster | No JVM startup (30s → 0.1s) |
| 🎯 Drop-in Replacement | Use existing PySpark code unchanged |
| 📦 Zero Java | Pure Python with Polars backend (thread-safe, no SQL required) |
| 🧪 100% Compatible | Full PySpark 3.2-3.5 API support |
| 🔄 Lazy Evaluation | Mirrors PySpark's execution model |
| 🏭 Production Ready | 650+ passing tests, 100% mypy typed |
| 🧵 Thread-Safe | Polars backend designed for parallel execution |
| 🔧 Modular Design | DDL parsing via standalone spark-ddl-parser package |
| 🎯 Type Safe | Full type checking with ty, comprehensive type annotations |

Perfect For

  • Unit Testing - Fast, isolated test execution with automatic cleanup
  • CI/CD Pipelines - Reliable tests without infrastructure or resource leaks
  • Local Development - Prototype without Spark cluster
  • Documentation - Runnable examples without setup
  • Learning - Understand PySpark without complexity
  • Integration Tests - Configurable memory limits for large dataset testing

Quick Start

Installation

pip install mock-spark

Basic Usage

from mock_spark.sql import SparkSession, functions as F

# Create session
spark = SparkSession("MyApp")

# Your PySpark code works as-is
data = [{"name": "Alice", "age": 25}, {"name": "Bob", "age": 30}]
df = spark.createDataFrame(data)

# All operations work
result = df.filter(F.col("age") > 25).select("name").collect()
print(result)
# Output: [Row(name='Bob')]

# Show the DataFrame
df.show()
# Output:
# DataFrame[2 rows, 2 columns]
# age  name
# 25   Alice
# 30   Bob

Storage API (Mock-Spark-Specific)

Mock-Spark provides a convenient _storage API for managing databases and tables. Note: this is a mock-spark-specific convenience API that does not exist in PySpark. For PySpark compatibility, use SQL commands or DataFrame operations instead:

# Mock-Spark: using the _storage API (convenient but NOT PySpark-compatible)
spark._storage.create_schema("test_db")
spark._storage.create_table("test_db", "users", schema)
spark._storage.insert_data("test_db", "users", data)
df = spark._storage.query_table("test_db", "users")

# Both Mock-Spark and PySpark: Using SQL commands (recommended for compatibility)
spark.sql("CREATE DATABASE IF NOT EXISTS test_db")
spark.sql("CREATE TABLE test_db.users (name STRING, age INT)")
df.write.saveAsTable("test_db.users")  # Write DataFrame to table
df = spark.table("test_db.users")  # Read table as DataFrame

# PySpark equivalent for insert_data:
# df = spark.createDataFrame(data, schema)
# df.write.mode("append").saveAsTable("test_db.users")

Migration Guide:

  • spark._storage.create_schema() → spark.sql("CREATE DATABASE IF NOT EXISTS ...")
  • spark._storage.create_table() → spark.sql("CREATE TABLE ...") or df.write.saveAsTable()
  • spark._storage.insert_data() → df.write.mode("append").saveAsTable()
  • spark._storage.query_table() → spark.table() or spark.sql("SELECT * FROM ...")

See the Storage API Guide and Migration Guide for more details.

Testing Example

import pytest
from mock_spark.sql import SparkSession, functions as F

def test_data_pipeline():
    """Test PySpark logic without Spark cluster."""
    spark = SparkSession("TestApp")
    
    # Test data
    data = [{"score": 95}, {"score": 87}, {"score": 92}]
    df = spark.createDataFrame(data)
    
    # Business logic
    high_scores = df.filter(F.col("score") > 90)
    
    # Assertions
    assert high_scores.count() == 2
    assert high_scores.agg(F.avg("score")).collect()[0][0] == 93.5
    
    # Always clean up
    spark.stop()

Core Features

🚀 Complete PySpark API Compatibility

Mock Spark implements 120+ functions and 70+ DataFrame methods across PySpark 3.2-3.5:

| Category | Description | Examples |
|---|---|---|
| String (40+) | Text manipulation, regex, formatting | upper, concat, regexp_extract, soundex |
| Math (35+) | Arithmetic, trigonometry, rounding | abs, sqrt, sin, cos, ln |
| DateTime (30+) | Date/time operations, timezones | date_add, hour, weekday, convert_timezone |
| Array (25+) | Array manipulation, lambdas | array_distinct, transform, filter, aggregate |
| Aggregate (20+) | Statistical functions | sum, avg, median, percentile, max_by |
| Map (10+) | Dictionary operations | map_keys, map_filter, transform_values |
| Conditional (8+) | Logic and null handling | when, coalesce, ifnull, nullif |
| Window (8+) | Ranking and analytics | row_number, rank, lag, lead |
| XML (9+) | XML parsing and generation | from_xml, to_xml, xpath_* |
| Bitwise (6+) | Bit manipulation | bit_count, bit_and, bit_xor |

📖 See complete function list: PYSPARK_FUNCTION_MATRIX.md

DataFrame Operations

  • Transformations: select, filter, withColumn, drop, distinct, orderBy, replace
  • Aggregations: groupBy, agg, count, sum, avg, min, max, median, mode
  • Joins: inner, left, right, outer, cross
  • Advanced: union, pivot, unpivot, explode, transform

Window Functions

from mock_spark.sql import Window, functions as F

# Ranking and analytics
df = spark.createDataFrame([
    {"name": "Alice", "dept": "IT", "salary": 50000},
    {"name": "Bob", "dept": "HR", "salary": 60000},
    {"name": "Charlie", "dept": "IT", "salary": 70000},
])

result = df.withColumn("rank", F.row_number().over(
    Window.partitionBy("dept").orderBy("salary")
))

# Show results
for row in result.collect():
    print(row)
# Output:
# Row(dept='HR', name='Bob', salary=60000, rank=1)
# Row(dept='IT', name='Alice', salary=50000, rank=1)
# Row(dept='IT', name='Charlie', salary=70000, rank=2)

SQL Support

df = spark.createDataFrame([
    {"name": "Alice", "salary": 50000},
    {"name": "Bob", "salary": 60000},
    {"name": "Charlie", "salary": 70000},
])

# Create temporary view for SQL queries
df.createOrReplaceTempView("employees")

# Execute SQL queries
result = spark.sql("SELECT name, salary FROM employees WHERE salary > 50000")
result.show()
# SQL support enables querying DataFrames using SQL syntax

Delta Lake Format

Full Delta Lake table format support:

# Write as Delta table
df.write.format("delta").mode("overwrite").saveAsTable("catalog.users")

# Time travel - query historical versions
v0_data = spark.read.format("delta").option("versionAsOf", 0).table("catalog.users")

# Schema evolution
new_df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .saveAsTable("catalog.users")

# MERGE operations for upserts
spark.sql("""
    MERGE INTO catalog.users AS target
    USING updates AS source
    ON target.id = source.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")

Lazy Evaluation

Mock Spark mirrors PySpark's lazy execution model:

# Transformations are queued (not executed)
result = df.filter(F.col("age") > 25).select("name")  

# Actions trigger execution
rows = result.collect()  # ← Execution happens here
count = result.count()    # ← Or here

CTE Query Optimization

DataFrame operation chains are automatically optimized using Common Table Expressions:

# Enable lazy evaluation for CTE optimization
data = [
    {"name": "Alice", "age": 25, "salary": 50000},
    {"name": "Bob", "age": 30, "salary": 60000},
    {"name": "Charlie", "age": 35, "salary": 70000},
    {"name": "David", "age": 28, "salary": 55000},
]
df = spark.createDataFrame(data)

# This entire chain executes as ONE optimized query:
result = (
    df.filter(F.col("age") > 25)           # CTE 0: WHERE clause
      .select("name", "age", "salary")     # CTE 1: Column selection
      .withColumn("bonus", F.col("salary") * 0.1)  # CTE 2: New column
      .orderBy(F.desc("salary"))           # CTE 3: ORDER BY
      .limit(2)                            # CTE 4: LIMIT
).collect()  # Single query execution here

# Result:
# [Row(name='Charlie', age=35, salary=70000, bonus=7000.0),
#  Row(name='Bob', age=30, salary=60000, bonus=6000.0)]

# Performance: 5-10x faster than creating 5 intermediate tables

Backend Architecture

Polars Backend (Default)

Mock Spark uses Polars as the default backend, providing:

  • 🧵 Thread Safety - Designed for parallel execution
  • High Performance - Optimized DataFrame operations
  • 📊 Parquet Storage - Tables persist as Parquet files
  • 🔄 Lazy Evaluation - Automatic query optimization

# Default backend (Polars) - thread-safe, high-performance
spark = SparkSession("MyApp")

# Explicit backend selection
spark = SparkSession.builder \
    .config("spark.mock.backend", "polars") \
    .getOrCreate()

Alternative Backends

# Memory backend for lightweight testing
spark = SparkSession.builder \
    .config("spark.mock.backend", "memory") \
    .getOrCreate()

# File backend for persistent storage
spark = SparkSession.builder \
    .config("spark.mock.backend", "file") \
    .config("spark.mock.backend.basePath", "/tmp/mock_spark") \
    .getOrCreate()

Available Backends:

  • Polars (default): High-performance, thread-safe DataFrame engine
  • Memory: In-memory storage for lightweight testing
  • File: File-based storage for persistent data
  • DuckDB (optional): Legacy SQL backend. Requires the optional DuckDB modules from Mock-Spark 2.x plus the duckdb/duckdb-engine Python packages.

Set MOCK_SPARK_BACKEND to override globally (for example, MOCK_SPARK_BACKEND=memory pytest). See docs/backend_selection.md for a full matrix of options, dependencies, and troubleshooting tips.


Advanced Features

Table Persistence

Tables created with saveAsTable() can persist across multiple sessions:

# First session - create table
spark1 = SparkSession("App1", db_path="test.db")
df = spark1.createDataFrame([{"id": 1, "name": "Alice"}])
df.write.mode("overwrite").saveAsTable("schema.my_table")
spark1.stop()

# Second session - table persists
spark2 = SparkSession("App2", db_path="test.db")
assert spark2.catalog.tableExists("schema", "my_table")  # ✅ True
result = spark2.table("schema.my_table").collect()  # ✅ Works!
spark2.stop()

Key Features:

  • Cross-Session Persistence: Tables persist when using db_path parameter
  • Schema Discovery: Automatically discovers existing schemas and tables
  • Catalog Synchronization: Reliable catalog.tableExists() checks
  • Data Integrity: Full support for append and overwrite modes

Configurable Memory & Isolation

Control memory usage and test isolation:

# Default: 1GB memory limit, no disk spillover (best for tests)
spark = SparkSession("MyApp")

# Custom memory limit
spark = SparkSession("MyApp", max_memory="4GB")

# Allow disk spillover for large datasets
spark = SparkSession(
    "MyApp",
    max_memory="8GB",
    allow_disk_spillover=True  # Uses unique temp directory per session
)

Performance Comparison

Real-world test suite improvements:

| Operation | PySpark | Mock Spark | Speedup |
|---|---|---|---|
| Session Creation | 30-45s | 0.1s | 300x |
| Simple Query | 2-5s | 0.01s | 200x |
| Window Functions | 5-10s | 0.05s | 100x |
| Full Test Suite | 5-10min | 30-60s | 10x |


Recent Updates

Version 3.7.0 - Full SQL DDL/DML Support

  • 🗄️ Complete SQL DDL/DML – Full implementation of CREATE TABLE, DROP TABLE, INSERT INTO, UPDATE, and DELETE FROM statements in the SQL executor.
  • 📝 Enhanced SQL Parser – Comprehensive support for DDL statements with column definitions, IF NOT EXISTS, and IF EXISTS clauses.
  • 💾 INSERT Operations – Support for INSERT INTO ... VALUES (...) with multiple rows and INSERT INTO ... SELECT ... sub-queries.
  • 🔄 UPDATE & DELETE – Full support for UPDATE ... SET ... WHERE ... and DELETE FROM ... WHERE ... with Python-based expression evaluation.
  • 🐛 Bug Fixes – Fixed recursion errors in schema projection and resolved import shadowing issues in SQL executor.
  • Code Quality – Improved linting, formatting, and type safety across the codebase.

Version 3.6.0 - Profiling & Adaptive Execution

  • Feature-Flagged Profiling – Introduced mock_spark.utils.profiling with opt-in instrumentation for Polars hot paths and expression evaluation, plus a new guide at docs/performance/profiling.md.
  • 🔁 Adaptive Execution Simulation – Query plans can now inject synthetic REPARTITION steps based on skew metrics, configurable via QueryOptimizer.configure_adaptive_execution and covered by new regression tests.
  • 🐼 Pandas Backend Choice – Added an optional native pandas mode (MOCK_SPARK_PANDAS_MODE) with benchmarking support (scripts/benchmark_pandas_fallback.py) and documentation in docs/performance/pandas_fallback.md.

Version 3.5.0 - Session-Aware Catalog & Safer Fallbacks

  • 🧭 Session-Literal Helpers – F.current_catalog, F.current_database, F.current_schema, and F.current_user return PySpark-compatible literals and understand the active session (with new regression coverage).
  • 🗃️ Reliable Catalog Context – The Polars backend and unified storage manager now track the selected schema so setCurrentDatabase works end-to-end, and SparkContext.sparkUser() mirrors PySpark behaviour.
  • 🧮 Pure-Python Stats – Lightweight percentile and covariance helpers keep percentile/cov tests green even without NumPy, eliminating native-crash regressions.
  • 🛠️ Dynamic Dispatch – F.call_function("func_name", ...) lets wrappers dynamically invoke registered Mock Spark functions with PySpark-style error messages.

Version 3.4.0 - Workflow & CI Refresh

  • ♻️ Unified Commands – Makefile, install.sh, and docs now point to bash tests/run_all_tests.sh, ruff, and mypy as the standard dev workflow.
  • 🛡️ Automated Gates – New GitHub Actions pipeline runs linting, type-checking, and the full test suite on every push and PR.
  • 🗺️ Forward Roadmap – Published plans/typing_delta_roadmap.md to track mypy debt reduction and Delta feature milestones.
  • 📝 Documentation Sweep – README and quick-start docs highlight the 3.4.0 tooling changes and contributor expectations.

Version 3.3.0 - Type Hardening & Clean Type Check

  • 🧮 Zero mypy Debt – mypy mock_spark now runs clean after migrating the Polars executor, expression evaluator, Delta merge helpers, and reader/writer stack to Python 3.9 union syntax.
  • 🧾 Accurate DataFrame Interfaces – DataFrameReader.load() and related helpers now return IDataFrame consistently while keeping type-only imports behind TYPE_CHECKING.
  • 🧱 Safer Delta & Projection Fallbacks – Python-evaluated select columns always receive string aliases, and Delta merge alias handling no longer leaks None keys into evaluation contexts.
  • 📚 Docs & Metadata Updated – README highlights the new type guarantees and all packaging metadata points to v3.3.0.

Version 3.2.0 - Python 3.9 Baseline & Tooling Refresh

  • 🐍 Python 3.9+ Required – Packaging metadata, tooling configs, and installation docs now align on Python 3.9 as the minimum supported runtime.
  • 🧩 Lean Compatibility Layer – The Python 3.8 sitecustomize shim has been retired; datetime helpers use native typing without runtime fallbacks.
  • 🪄 Type Hint Modernisation – Replaced legacy typing.List/Dict usage with built-in generics (list, dict, tuple) and moved iterators to collections.abc.
  • 🧼 Ruff Formatting by Default – Adopted ruff format across the repository, keeping style consistent with the Ruff rule set.

Version 3.1.0 - Type-Safe Protocols & Tooling

  • 260-File Type Coverage – DataFrame mixins now implement structural typing protocols (SupportsDataFrameOps), giving a clean mypy run across the entire project.
  • 🧹 Zero Ruff Debt – Repository-wide linting is enabled by default; ruff check passes with no warnings thanks to tighter casts, imports, and configuration.
  • 🧭 Backend Selection Docs – Updated configuration builder and new docs/backend_selection.md make it trivial to toggle between Polars, Memory, File, or DuckDB backends.
  • 🧪 Delta Schema Evolution Fixes – Polars mergeSchema appends now align frames to the on-disk schema, restoring compatibility with evolving Delta tables.
  • 🧰 Improved Test Harness – tests/run_all_tests.sh respects virtual environments and ensures documentation examples are executed with the correct interpreter.

Version 3.0.0+ - Code Quality & Cleanup

Dependency Cleanup & Type Safety:

  • 🧹 Removed Legacy Dependencies - Removed unused sqlglot dependency (legacy DuckDB/SQL backend code)
  • 🗑️ Code Cleanup - Removed unused legacy SQL translation modules (sql_translator.py, spark_function_mapper.py)
  • Type Safety - Fixed 177 type errors using ty type checker, improved return type annotations
  • 🔍 Linting - Fixed all 63 ruff linting errors, codebase fully formatted
  • All Tests Passing - Full test suite validated (641+ tests, all passing)
  • 📦 Cleaner Dependencies - Reduced dependency footprint, faster installation

Version 3.0.0 - MAJOR UPDATE

Polars Backend Migration:

  • 🚀 Polars Backend - Complete migration to Polars for thread-safe, high-performance operations
  • 🧵 Thread Safety - Polars is thread-safe by design - no more connection locks or threading issues
  • 📊 Parquet Storage - Tables now persist as Parquet files
  • Performance - Better performance for DataFrame operations
  • All tests passing - Full test suite validated with Polars backend
  • 📦 Production-ready - Stable release with improved architecture

See Migration Guide for details.


Documentation

Getting Started

Related Packages

Core Concepts

Advanced Topics


Development Setup

# Install for development
git clone https://github.com/eddiethedean/mock-spark.git
cd mock-spark
pip install -e ".[dev]"

# Run all tests (with proper isolation)
bash tests/run_all_tests.sh

# Format code
ruff format .
ruff check . --fix

# Type checking
mypy mock_spark tests

# Linting
ruff check .

Contributing

We welcome contributions! Areas of interest:

  • Performance - Further Polars optimizations
  • 📚 Documentation - Examples, guides, tutorials
  • 🐛 Bug Fixes - Edge cases and compatibility issues
  • 🧪 PySpark API Coverage - Additional functions and methods
  • 🧪 Tests - Additional test coverage and scenarios

Known Limitations

While Mock Spark provides comprehensive PySpark compatibility, some advanced features are planned for future releases:

  • Error Handling: Enhanced error messages with recovery strategies
  • Performance: Advanced query optimization, parallel execution, intelligent caching
  • Enterprise: Schema evolution, data lineage, audit logging
  • Compatibility: PySpark 3.6+, Iceberg support

Want to contribute? These are great opportunities for community contributions!


License

MIT License - see LICENSE file for details.


Links


Built with ❤️ for the PySpark community

Star ⭐ this repo if Mock Spark helps speed up your tests!

Project details


Download files

Download the file for your platform.

Source Distribution

mock_spark-3.14.0.tar.gz (419.0 kB view details)

Uploaded Source

Built Distribution


mock_spark-3.14.0-py3-none-any.whl (478.3 kB view details)

Uploaded Python 3

File details

Details for the file mock_spark-3.14.0.tar.gz.

File metadata

  • Download URL: mock_spark-3.14.0.tar.gz
  • Upload date:
  • Size: 419.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.13

File hashes

Hashes for mock_spark-3.14.0.tar.gz

| Algorithm | Hash digest |
|---|---|
| SHA256 | be40fb72b3725a727037b4d61cd38351fd0fbb9276753ac1b9dc42e9baf5551c |
| MD5 | 8b81a36261d81a4577dd07283b57be96 |
| BLAKE2b-256 | c932826f319a94cfd1ccaa42ff7c1a6e24f0a39152f1690f6e8c9c4cfea9accd |


File details

Details for the file mock_spark-3.14.0-py3-none-any.whl.

File metadata

  • Download URL: mock_spark-3.14.0-py3-none-any.whl
  • Upload date:
  • Size: 478.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.13

File hashes

Hashes for mock_spark-3.14.0-py3-none-any.whl

| Algorithm | Hash digest |
|---|---|
| SHA256 | d1dc11ac431f72b04a56ae35cf1f931437f5295aaf9921d9272e06c3f604f90b |
| MD5 | 64e2a65d88c6dd259835afa4ba6658c1 |
| BLAKE2b-256 | fb6c639db313903ff77bc752dbd4b77152bb60bea586e0ada0942af84e5cb67c |

