PySpark and Sedona compatibility layer for e6data

Project description

e6-spark-compat

A PySpark and Sedona compatibility layer for e6data, providing seamless migration from PySpark to e6data with minimal code changes.

Overview

e6-spark-compat is a production-ready compatibility library that translates PySpark and Apache Sedona operations into optimized e6data SQL queries. The library implements lazy evaluation, professional SQL generation using SQLGlot, and comprehensive PySpark API compatibility.

Key Features

  • Lazy Evaluation: Build query plans that execute only when actions are called
  • Professional SQL Generation: SQLGlot-based AST optimization for high-performance queries
  • Complete PySpark API: DataFrame operations, SQL functions, window functions, and aggregations
  • Spatial Operations: Full Apache Sedona compatibility with ST_* functions
  • Window Functions: Complete Window specification API for advanced analytics
  • Multiple File Formats: Parquet, ORC, CSV, JSON, and GeoParquet support
  • Dual-Mode Test Suite: 1270 tests run offline (SQL validation) and live (cluster execution)
  • CI/CD: GitHub Actions with offline (automatic) and live (manual) test runs

Installation

For Users

# Install from PyPI
pip install e6data-spark-compatibility

# Install from GitHub
pip install git+https://github.com/e6data/e6-spark-compat.git

# Install from local clone
git clone https://github.com/e6data/e6-spark-compat.git
cd e6-spark-compat
pip install -e .

For Developers

# Clone the repository
git clone https://github.com/e6data/e6-spark-compat.git
cd e6-spark-compat

# Install with development dependencies
pip install -e ".[dev]"

# Run tests
pytest tests/

Quick Start

Migration from PySpark

Simply change your import statements; everything else stays the same:

# Before: PySpark imports
# from pyspark.sql import SparkSession
# from pyspark.sql.functions import col, upper, count, sum, row_number
# from pyspark.sql.window import Window

# After: e6-spark-compat imports
from e6_spark_compat import SparkSession
from e6_spark_compat.sql.functions import col, upper, count, sum, row_number
from e6_spark_compat.sql.window import Window

# Connection configuration
spark = (SparkSession.builder
    .appName("E6DataExample")
    .config("spark.e6data.host", "your-host")
    .config("spark.e6data.username", "your-username")
    .config("spark.e6data.password", "your-password")
    .config("spark.e6data.database", "your-database")
    .config("spark.e6data.catalog", "your-catalog")
    .config("spark.e6data.cluster", "your-cluster-name")
    .config("spark.e6data.secure", True)
    .getOrCreate())

# All PySpark operations work identically
df = spark.read.parquet("s3://bucket/path/to/data.parquet")

# Transformations (lazy evaluation)
result = (df.filter(col("age") > 21)
    .select("name", upper(col("city")).alias("city_upper"), "salary")
    .groupBy("city_upper")
    .agg(
        count("*").alias("total_people"),
        sum("salary").alias("total_salary")
    )
    .orderBy(col("total_people").desc()))

# Action triggers execution
result.show()

Window Functions

from e6_spark_compat.sql.window import Window
from e6_spark_compat.sql.functions import row_number, rank, lag, sum

# Window specifications
window_spec = Window.partitionBy("department").orderBy("salary")
frame_spec = (Window.partitionBy("team").orderBy("hire_date")
    .rowsBetween(Window.UNBOUNDED_PRECEDING, Window.CURRENT_ROW))

# Window functions
df_with_analytics = df.select(
    "*",
    row_number().over(window_spec).alias("rank"),
    lag("salary", 1).over(window_spec).alias("prev_salary"),
    sum("salary").over(frame_spec).alias("running_total")
)

Spatial Support

For spatial operations with Sedona compatibility:

from e6_spark_compat.sedona import SedonaRegistrator
from e6_spark_compat.sql.functions import expr

# Register Sedona functions
SedonaRegistrator.registerAll(spark)

# Use spatial functions
points_df = spark.read.parquet("s3://bucket/points.parquet").alias("points")
polygons_df = spark.read.parquet("s3://bucket/polygons.parquet").alias("polygons")

# Spatial join (reference the aliases inside expr(), not the Python variable names)
result = points_df.join(
    polygons_df,
    expr("ST_Contains(polygons.geometry, ST_Point(points.lon, points.lat))"),
    "inner"
)

Comprehensive Feature Set

DataFrame Operations

  • Transformations: select(), filter()/where(), join(), groupBy(), orderBy()/sort(), limit(), distinct()
  • Set Operations: union(), intersect(), exceptAll()
  • Column Operations: withColumn(), withColumnRenamed(), drop(), cast()
  • Caching: cache()/persist(), unpersist()
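
A minimal sketch of a few of these operations, assuming the spark session and df DataFrame from the Quick Start; the column names and the second DataFrame (archived_df) are illustrative:

from e6_spark_compat.sql.functions import col

# Column operations: derive, rename, drop, and cast columns
employees = (df
    .withColumn("annual_salary", col("salary") * 12)
    .withColumnRenamed("city", "location")
    .drop("internal_id")
    .withColumn("age", col("age").cast("int")))

# Set operations between schema-compatible DataFrames
combined = employees.union(archived_df).distinct()

# Mark the plan for reuse across downstream actions
combined.cache()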

Action Methods

  • Data Retrieval: collect(), count(), first(), take(), head()
  • Display: show(), explain(), describe()
  • Export: toPandas(), write.parquet(), write.orc(), write.csv(), write.json()
  • Views: createOrReplaceTempView(), createGlobalTempView()
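
For example, continuing from the Quick Start result DataFrame (the spark.sql() call assumes temp views are queryable through the session as in PySpark; the output path is illustrative):

# Actions trigger SQL execution on e6data
rows = result.take(5)
total = result.count()

# Export to pandas or write back out
pdf = result.toPandas()
result.write.parquet("s3://bucket/output/people_by_city/")

# Register a temp view for SQL access
result.createOrReplaceTempView("people_by_city")
top_cities = spark.sql("SELECT * FROM people_by_city ORDER BY total_people DESC LIMIT 10")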

SQL Functions (135+)

  • String Functions: upper, lower, concat, substring, trim, split, regexp_replace, length
  • Math Functions: abs, round, floor, ceil, sqrt, pow, sin, cos, log
  • Aggregate Functions: sum, avg, count, min, max, first, last, collect_list
  • Date/Time Functions: year, month, dayofmonth, hour, minute, date_add, date_sub, datediff
  • Window Functions: row_number, rank, dense_rank, percent_rank, ntile, lag, lead
  • Conditional Functions: when, coalesce, isnull, isnan, greatest, least
  • Hash Functions: md5, sha1, sha2, hash, xxhash64
  • Array Functions: array, array_contains, explode, size, array_sort, and 16 more
  • Map Functions: map_keys, map_values, map_from_arrays, map_concat
  • JSON Functions: get_json_object, json_extract, from_json, to_json
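
A short example combining several of these categories (it assumes lit() and current_date() are exposed alongside the functions listed above, as in PySpark; column names are illustrative):

from e6_spark_compat.sql.functions import (
    col, lit, concat, coalesce, when, year, datediff, current_date, round
)

profile = df.select(
    concat(col("first_name"), lit(" "), col("last_name")).alias("full_name"),
    coalesce(col("nickname"), col("first_name")).alias("display_name"),
    when(col("age") >= 65, lit("senior")).otherwise(lit("adult")).alias("age_band"),
    year(col("hire_date")).alias("hire_year"),
    round(datediff(current_date(), col("hire_date")) / 365, 1).alias("tenure_years"),
)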

Spatial Operations (Sedona Compatible)

  • Geometry Functions: All ST_* functions including ST_Point, ST_Polygon, ST_Buffer
  • Spatial Relationships: ST_Contains, ST_Intersects, ST_Distance, ST_Within
  • Transformations: ST_Transform, ST_Centroid, ST_ConvexHull
  • Indexing: H3 and GeoHash spatial indexing support
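
A hedged sketch of column-style spatial usage, assuming the ST_* functions are importable from e6_spark_compat.spatial.functions (per the migration guide below) and accept Column arguments; the dataset and warehouse_geom column are illustrative:

from e6_spark_compat.spatial.functions import ST_Point, ST_Buffer, ST_Distance
from e6_spark_compat.sql.functions import col

stores = spark.read.parquet("s3://bucket/stores.parquet")

# Build point geometries and a 1 km buffer around each store
stores = stores.withColumn("geom", ST_Point(col("lon"), col("lat")))
stores = stores.withColumn("service_area", ST_Buffer(col("geom"), 1000))

# Distance filter between two geometry columns
nearby = stores.filter(ST_Distance(col("geom"), col("warehouse_geom")) < 5000)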

Testing

Test Suite Overview

The test suite uses dual-mode execution: tests run offline by default (mock connection, SQL validation) and can also execute on a live e6data cluster when credentials are provided.

Category | Tests | Description
E2E SQL Generation | 360 | Full pipeline: PySpark API → SQL string validation
TPC-DS Translations | 37 | Standard benchmark queries (37 of 99 TPC-DS queries)
TPC-DS Analytics | 22 | Complex analytics workloads (star schema, RFM, ETL pipelines)
SQLGlot Expressions | 288 | SQL expression generation via SQLGlot
Integration | 227 | Query plan node composition
Unit | 200 | Individual class/method tests
Writer | 38 | DataFrameWriter operations
Workloads | 94 | Customer workload pipeline tests (Kroger DQ + DFM analytics)
Total | 1270 | 1263 pass, 7 known failures

Customer Scripts

Standalone scripts that exercise real customer pipelines end-to-end:

Script | Description | Dataset
kroger_original_test.py | Kroger pre-DQ validation pipeline | mars_full
e6_dfm_on_mars_full.py | DFM pipeline patterns (39 tests) | mars_full
e6_dfm_test.py | DFM feature coverage (42 tests) | TPC-DS
*_rust.py variants | Same scripts with FORMAT_NUMBER workaround for rust engine |

# Run customer script on Java engine
E6DATA_HOST=... S3_READ_BASE=... python tests/customer_scripts/kroger_original_test.py

# Run on rust engine
E6DATA_HOST=... S3_READ_BASE=... python tests/customer_scripts/kroger_original_test_rust.py

Running Tests

# Offline (default) — fast, no cluster needed (~5 seconds)
./run_tests.sh

# Specific file or directory
./run_tests.sh offline 1 tests/workloads/
./run_tests.sh offline 1 tests/workloads/test_dfm_pipeline.py
./run_tests.sh offline 1 tests/workloads/test_dfm_pipeline.py::TestDFMMissingFunctions

# By keyword or marker
./run_tests.sh offline 1 -k "dfm"
./run_tests.sh offline 1 -m e2e

# Or use pytest directly
pytest tests/e2e/                    # E2E tests
pytest tests/tpcds/                  # TPC-DS queries
pytest tests/e2e/ -m spatial         # Spatial tests

Live Mode (cluster execution)

Set environment variables to run the same tests against a real e6data cluster:

export E6DATA_HOST="your-cluster-host"
export E6DATA_PORT="443"
export E6DATA_USERNAME="your-user"
export E6DATA_PASSWORD="your-token"
export E6DATA_DATABASE="tpcds_1000_delta"
export E6DATA_CATALOG="your-catalog"
export E6DATA_CLUSTER="your-cluster"

# Sequential with full HTML report
./run_tests.sh live

# Parallel with N workers
./run_tests.sh live 4
./run_tests.sh live 8

In live mode, each test generates SQL (same as offline) and executes it on the cluster with LIMIT 5. The HTML report captures query IDs, row counts, execution times, and error details.

Test Reports

Reports are auto-generated on every run:

  • HTML: reports/test_report.html — browsable report with expandable details per test:
    • PySpark input source code
    • Generated SQL
    • Query ID (live mode)
    • Engine error details (live mode, failures only)
  • JUnit XML: reports/test_results.xml — for CI/CD integration

CI / GitHub Actions

Offline tests run automatically on every push/PR — no setup needed.

Live tests are triggered manually and require GitHub secrets to be configured:

From GitHub UI:

  1. Go to Actions tab → e6-spark-compat CI
  2. Click Run workflow
  3. Select branch, cluster (general / rust-azure-benchmark), and worker count
  4. Click Run workflow

From CLI:

gh workflow run tests.yml --ref main -f run_live=true -f cluster=general -f concurrency=4

The CI summary includes category breakdown tables and failure details, and the dorny/test-reporter action renders an interactive test results tab.

Architecture

Core Components

  1. Query Plan Tree: AST-like nodes representing SQL operations (Filter, Project, Join, Aggregate, etc.)
  2. SQLGlot Integration: Professional SQL generation using AST-based optimization
  3. Lazy Evaluation: Operations build query plans without immediate execution
  4. Connection Management: Efficient e6data connection handling and query execution
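
As an illustration of the SQLGlot step, here is a conceptual sketch using SQLGlot's public builder API (not the library's internal node classes): a Project over a Filter over a table scan maps naturally onto a SQLGlot expression tree, which then renders dialect-aware SQL. Table and column names are illustrative:

from sqlglot import select

# Filter -> Project over a table scan, expressed as a SQLGlot expression tree
query = (
    select("name", "UPPER(city) AS city_upper")
    .from_("people")
    .where("age > 21")
)
print(query.sql())
# SELECT name, UPPER(city) AS city_upper FROM people WHERE age > 21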

Execution Flow

  1. Build Phase: DataFrame operations create query plan nodes
  2. Optimization Phase: SQLGlot optimizes the query plan using AST transformations
  3. Generation Phase: Query plans generate optimized SQL via SQLGlot
  4. Execution Phase: Actions trigger SQL execution on e6data
  5. Result Phase: Results returned in PySpark-compatible format
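
In code, the phases look roughly like this (assuming explain() prints the generated plan or SQL without executing it):

# Build phase: transformations only construct query plan nodes
plan = df.filter(col("age") > 21).select("name", "salary")

# Generation phase: inspect the SQL without touching the cluster
plan.explain()

# Execution phase: an action sends the SQL to e6data and returns results
rows = plan.collect()
pdf = plan.limit(100).toPandas()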

Development

Project Structure

e6_spark_compat/
├── core/           # Session, connection, query plan, SQL generator
├── sql/            # DataFrame, Column, functions, window, reader, writer, types
├── spatial/        # Sedona-compatible ST_* functions
└── sedona/         # SedonaRegistrator compatibility shim

tests/
├── e2e/              # E2E SQL generation tests (dual-mode: offline + live)
├── tpcds/            # TPC-DS benchmark query translations (37 queries)
├── integration/      # Query plan composition tests
├── sqlglot_tests/    # SQLGlot expression tests
├── unit/             # Unit tests
├── workloads/        # Customer workload pipeline tests
├── customer_scripts/ # Standalone customer pipeline scripts
└── live/             # Live cluster-only execution tests

Code Quality

# Format code
black e6_spark_compat/

# Run linting
flake8 e6_spark_compat/

# Type checking
mypy e6_spark_compat/

Migration Guide

From PySpark

  1. Update Imports: Change pyspark imports to e6_spark_compat
  2. Configuration: Update SparkSession configuration for e6data connection
  3. Test: Run your existing PySpark code - it should work unchanged

From Sedona

  1. Spatial Functions: Replace Sedona imports with e6_spark_compat.spatial.functions
  2. Registration: Use SedonaRegistrator.registerAll(spark) for compatibility
  3. Geometry Types: Spatial operations work identically to Sedona
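
A minimal before/after sketch (the commented Sedona import paths are the typical ones; the e6 spatial module is assumed to mirror Sedona's st_functions):

# Before (Apache Sedona)
# from sedona.register import SedonaRegistrator
# from sedona.sql.st_functions import ST_Buffer

# After (e6-spark-compat)
from e6_spark_compat.sedona import SedonaRegistrator
from e6_spark_compat.spatial.functions import ST_Buffer

SedonaRegistrator.registerAll(spark)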

Contributing

We welcome contributions! Here's how to get started:

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for new functionality
  4. Ensure all tests pass (pytest tests/)
  5. Submit a pull request

Support

  • Documentation: See docs/pyspark-compatibility/ for detailed guides
  • Issues: Report bugs and feature requests on GitHub
  • Contact: support@e6data.com for enterprise support

License

Apache License 2.0 - see LICENSE for details.

