A simple plugin to use with pytest

pytest-pyspark-utils

A pytest plugin that provides a reusable spark session fixture and automated Delta table caching for PySpark testing. Eliminates boilerplate Spark session setup and speeds up tests by caching CSV/JSONL-to-Delta conversions.

Features

  • Session-scoped spark fixture — one Spark session per test run, shared across all tests
  • Optional Delta Lake support via configurable Maven JAR coordinates
  • Delta table caching — CSV/JSONL files are converted to Delta once and cached between runs
  • Per-test isolation — each test gets its own copy of the Delta tables via the delta_tables fixture
  • PySpark version-agnostic — works with PySpark 3.x and 4.x
  • Configurable via pytest.ini, pyproject.toml, or CLI flags

Installation

Install the plugin with your chosen PySpark version:

# PySpark 4.x
pip install "pytest-pyspark-utils[pyspark4]"

# PySpark 3.x
pip install "pytest-pyspark-utils[pyspark3]"

# Or pin an exact version alongside the plugin
pip install pytest-pyspark-utils pyspark==4.0.2

Usage of fixtures

Spark fixture

Once installed, the spark fixture is automatically available in all your tests — no import or conftest wiring needed.

from pyspark.sql import SparkSession

def test_something(spark: SparkSession):
    df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
    assert df.count() == 2

delta_tables fixture

The plugin can automatically convert your test data files (CSV or JSONL) into cached Delta tables and register them as Spark SQL tables. Each test gets an isolated copy.

1. Organize your test data

tests/test_my_feature/
├── conftest.py
├── input/
│   ├── users.csv
│   └── orders.csv
├── expected/
│   └── results.csv
└── test_my_feature.py

2. Define your table config in conftest.py

# tests/test_my_feature/conftest.py
import pytest
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pytest_pyspark_utils import TableConfig

users_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
])

orders_schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("user_id", IntegerType(), True),
    StructField("amount", IntegerType(), True),
])

@pytest.fixture(scope="module")
def delta_tables_config():
    return {
        "users": TableConfig(
            source="input",
            schema=users_schema,
            partition_by=["id"],
        ),
        "orders": TableConfig(
            source="input",
            schema=orders_schema,
        ),
        "expected_results": TableConfig(
            source="expected",
            schema=orders_schema,
        ),
    }

3. Use delta_tables in your tests

# tests/test_my_feature/test_my_feature.py
def test_user_orders(spark, delta_tables):
    users = spark.table("users")
    orders = spark.table("orders")

    result = users.join(orders, users.id == orders.user_id)
    assert result.count() > 0

def test_another_scenario(spark, delta_tables):
    # Each test gets a fresh, isolated copy of all tables
    spark.sql("DELETE FROM users WHERE id = 1")
    assert spark.table("users").count() == 4  # assumes users.csv has 5 rows; won't affect other tests

How delta_tables works

The fixture chain operates in two layers:

  1. Module-level caching (runs once per test file): Reads CSV/JSONL files, converts them to Delta format, and caches the result in <test_dir>/_delta_cache/. On subsequent runs, the cached Delta table is reused without reconversion as long as the source file and schema are unchanged.

  2. Function-level isolation (runs per test): Copies the cached Delta tables to a temporary directory, drops any existing Hive tables, and re-registers fresh tables pointing to the isolated copy.

This means the first run pays the conversion cost, but subsequent runs are fast — and every test is guaranteed a clean slate.
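
The two layers described above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the plugin's implementation: the function names, the SHA-256 fingerprint, and the `convert` callback (a stand-in for the CSV/JSONL-to-Delta write) are all hypothetical.

```python
import hashlib
import shutil
import tempfile
from pathlib import Path


def cache_key(source_file: Path, schema_repr: str) -> str:
    """Fingerprint the source bytes together with a textual schema description."""
    digest = hashlib.sha256()
    digest.update(source_file.read_bytes())
    digest.update(schema_repr.encode("utf-8"))
    return digest.hexdigest()


def ensure_cached(source_file: Path, schema_repr: str, cache_root: Path, convert) -> Path:
    """Layer 1: convert only when no cache entry matches the current fingerprint."""
    target = cache_root / source_file.stem / cache_key(source_file, schema_repr)
    if not target.exists():
        target.mkdir(parents=True)
        convert(source_file, target)  # hypothetical stand-in for the Delta write
    return target


def isolated_copy(cached: Path) -> Path:
    """Layer 2: give one test its own writable copy of the cached table."""
    workdir = Path(tempfile.mkdtemp(prefix="delta_isolated_"))
    return Path(shutil.copytree(cached, workdir / cached.parent.name))
```

Because the fingerprint covers both the file contents and the schema, editing either one produces a new cache entry, while an unchanged pair maps to the existing directory and skips the conversion.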

TableConfig reference

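from dataclasses import dataclass
from typing import List, Optional

from pyspark.sql.types import StructType


@dataclass
class TableConfig:
    source: str = "input"                     # "input", "expected", or an absolute path
    schema: Optional[StructType] = None       # PySpark schema (recommended for consistency)
    table_name: Optional[str] = None          # Defaults to the dict key
    partition_by: Optional[List[str]] = None  # Columns to partition the Delta table by
    liquid_clustering: bool = False           # Use liquid clustering instead of partitioning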

Field              Description
source             Where to find the data file. "input" resolves to <test_dir>/input/, "expected" resolves to <test_dir>/expected/. Alternatively, pass an absolute path.
schema             PySpark StructType. If omitted, the schema is inferred from the file.
table_name         The Spark SQL table name. Defaults to the dictionary key.
partition_by       List of columns to partition the Delta table by.
liquid_clustering  Use Delta liquid clustering instead of traditional partitioning.
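
As an illustration of the source rule in the table, the sketch below maps a source value to a directory. The helper name `resolve_source_dir` is hypothetical, and rejecting relative paths is an assumption; the plugin may handle that case differently.

```python
from pathlib import Path


def resolve_source_dir(source: str, test_dir: Path) -> Path:
    """Map a TableConfig.source value to a directory (hypothetical helper)."""
    if source in ("input", "expected"):
        # The two keywords resolve relative to the test's own directory.
        return test_dir / source
    path = Path(source)
    if not path.is_absolute():
        # Assumption: anything else must be absolute, as the table states.
        raise ValueError(
            f"source must be 'input', 'expected', or an absolute path: {source!r}"
        )
    return path
```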

Configuration

With Delta Lake

Set the Delta JAR coordinates in pyproject.toml:

[tool.pytest.ini_options]
delta_jar = "io.delta:delta-spark_2.13:4.0.1"
spark_app_name = "my-project-tests"
delta_cache_dir = "_delta_cache"

Or in pytest.ini:

[pytest]
delta_jar = io.delta:delta-spark_2.13:4.0.1
spark_app_name = my-project-tests

Or pass it directly on the command line:

pytest --delta-jar=io.delta:delta-spark_2.13:4.0.1

When delta_jar is not set, the fixture starts a plain Spark session without Delta extensions.
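
To make the difference between the two modes concrete, the sketch below builds the Spark settings a session would need in each case. The function `spark_conf` is hypothetical and not part of the plugin; the configuration keys and class names are the standard ones from the Delta Lake documentation, and whether the plugin sets exactly these is an assumption.

```python
from typing import Dict, Optional


def spark_conf(app_name: str = "pytest-pyspark",
               delta_jar: Optional[str] = None) -> Dict[str, str]:
    """Return Spark settings for a plain or Delta-enabled session (sketch)."""
    conf = {"spark.app.name": app_name}
    if delta_jar:
        # Download the Delta JAR from Maven and enable the Delta extensions.
        conf["spark.jars.packages"] = delta_jar
        conf["spark.sql.extensions"] = "io.delta.sql.DeltaSparkSessionExtension"
        conf["spark.sql.catalog.spark_catalog"] = (
            "org.apache.spark.sql.delta.catalog.DeltaCatalog"
        )
    return conf
```

Each key/value pair can be passed to `SparkSession.builder.config(key, value)` before calling `getOrCreate()`.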

Delta JAR coordinates by PySpark version

PySpark  Delta JAR coordinates
4.0.x    io.delta:delta-spark_2.13:4.0.1
3.5.x    io.delta:delta-spark_2.12:3.2.0
3.3.x    io.delta:delta-core_2.12:2.3.0
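
If a project tests against several PySpark versions, the table can be encoded as a lookup. This is a small sketch; `DELTA_JARS` and `delta_jar_for` are hypothetical names, and only the three versions listed above are covered.

```python
# Coordinates copied from the table above, keyed by (major, minor).
DELTA_JARS = {
    (4, 0): "io.delta:delta-spark_2.13:4.0.1",
    (3, 5): "io.delta:delta-spark_2.12:3.2.0",
    (3, 3): "io.delta:delta-core_2.12:2.3.0",
}


def delta_jar_for(pyspark_version: str) -> str:
    """Return the Delta JAR coordinates for a PySpark version string."""
    major, minor = (int(part) for part in pyspark_version.split(".")[:2])
    if (major, minor) not in DELTA_JARS:
        raise ValueError(f"no Delta JAR listed for PySpark {major}.{minor}.x")
    return DELTA_JARS[(major, minor)]
```

The result of `delta_jar_for(pyspark.__version__)` could then be passed to `pytest --delta-jar=...` from a wrapper script.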

Available options

Option     pytest.ini key   CLI flag      Default         Description
Delta JAR  delta_jar        --delta-jar   (none)          Maven coordinates for Delta Lake JAR
App name   spark_app_name   (not listed)  pytest-pyspark  Spark application name
Cache dir  delta_cache_dir  (not listed)  _delta_cache    Directory name for cached Delta tables

How it works

The plugin registers several fixtures via the pytest entry point:

Fixture                  Scope     Description
spark                    session   PySpark session with optional Delta support
delta_tables             function  Isolated Delta tables, registered as Spark SQL tables
prepare_tables_for_test  module    Lower-level helper for custom table preparation
drop_hive_objects        function  Drops all Spark SQL tables (cleanup utility)

License

GNU GPL v3.0+
