e6-spark-compat
A PySpark and Apache Sedona compatibility layer for e6data, enabling migration from PySpark with minimal code changes.
Overview
e6-spark-compat is a production-ready compatibility library that translates PySpark and Apache Sedona operations into optimized e6data SQL queries. The library implements lazy evaluation, professional SQL generation using SQLGlot, and comprehensive PySpark API compatibility.
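For example, a filter-and-select chain only builds a query plan; the SQL is generated and executed when an action such as show() is called. The snippet below is illustrative only (connection configuration is omitted here, see Quick Start below, and the generated SQL shown in the comment is approximate):

```python
from e6_spark_compat import SparkSession
from e6_spark_compat.sql.functions import col

spark = SparkSession.builder.getOrCreate()  # connection config omitted; see Quick Start
df = spark.read.parquet("s3://bucket/people.parquet")

# Lazy: these calls only build a query plan; nothing runs on the cluster yet
adults = df.filter(col("age") > 21).select("name", "age")

# Action: the plan is compiled to a single SQL statement (roughly
# SELECT name, age FROM <source> WHERE age > 21) and executed on e6data
adults.show()
```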
Key Features
- Lazy Evaluation: Build query plans that execute only when actions are called
- Professional SQL Generation: SQLGlot-based AST optimization for high-performance queries
- Complete PySpark API: DataFrame operations, SQL functions, window functions, and aggregations
- Spatial Operations: Full Apache Sedona compatibility with ST_* functions
- Window Functions: Complete Window specification API for advanced analytics
- Multiple File Formats: Parquet, ORC, CSV, JSON, and GeoParquet support
- Dual-Mode Test Suite: 1270 tests run offline (SQL validation) and live (cluster execution)
- CI/CD: GitHub Actions with offline (automatic) and live (manual) test runs
Installation
For Users
```bash
# Install from PyPI
pip install e6data-spark-compatibility

# Install from GitHub
pip install git+https://github.com/e6data/e6-spark-compat.git

# Install from local clone
git clone https://github.com/e6data/e6-spark-compat.git
cd e6-spark-compat
pip install -e .
```
For Developers
```bash
# Clone the repository
git clone https://github.com/e6data/e6-spark-compat.git
cd e6-spark-compat

# Install with development dependencies
pip install -e ".[dev]"

# Run tests
pytest tests/
```
Quick Start
Migration from PySpark
Simply change your import statements - everything else stays the same:
```python
# Before: PySpark imports
# from pyspark.sql import SparkSession
# from pyspark.sql.functions import col, upper, count, sum, row_number
# from pyspark.sql.window import Window

# After: e6-spark-compat imports
from e6_spark_compat import SparkSession
from e6_spark_compat.sql.functions import col, upper, count, sum, row_number
from e6_spark_compat.sql.window import Window

# Connection configuration
spark = (SparkSession.builder
    .appName("E6DataExample")
    .config("spark.e6data.host", "your-host")
    .config("spark.e6data.username", "your-username")
    .config("spark.e6data.password", "your-password")
    .config("spark.e6data.database", "your-database")
    .config("spark.e6data.catalog", "your-catalog")
    .config("spark.e6data.cluster", "your-cluster-name")
    .config("spark.e6data.secure", True)
    .getOrCreate())

# All PySpark operations work identically
df = spark.read.parquet("s3://bucket/path/to/data.parquet")

# Transformations (lazy evaluation)
result = (df.filter(col("age") > 21)
    .select("name", upper(col("city")).alias("city_upper"), "salary")
    .groupBy("city_upper")
    .agg(
        count("*").alias("total_people"),
        sum("salary").alias("total_salary")
    )
    .orderBy(col("total_people").desc()))

# Action triggers execution
result.show()
```
Window Functions
```python
from e6_spark_compat.sql.window import Window
from e6_spark_compat.sql.functions import row_number, rank, lag, sum

# Window specifications
window_spec = Window.partitionBy("department").orderBy("salary")
frame_spec = (Window.partitionBy("team").orderBy("hire_date")
    .rowsBetween(Window.UNBOUNDED_PRECEDING, Window.CURRENT_ROW))

# Window functions
df_with_analytics = df.select(
    "*",
    row_number().over(window_spec).alias("rank"),
    lag("salary", 1).over(window_spec).alias("prev_salary"),
    sum("salary").over(frame_spec).alias("running_total")
)
```
Spatial Support
For spatial operations with Sedona compatibility:
```python
from e6_spark_compat.sedona import SedonaRegistrator
from e6_spark_compat.sql.functions import expr

# Register Sedona functions
SedonaRegistrator.registerAll(spark)

# Use spatial functions
points_df = spark.read.parquet("s3://bucket/points.parquet")
polygons_df = spark.read.parquet("s3://bucket/polygons.parquet")

# Spatial join
result = points_df.join(
    polygons_df,
    expr("ST_Contains(polygons_df.geometry, ST_Point(points_df.lon, points_df.lat))"),
    "inner"
)
```
Comprehensive Feature Set
DataFrame Operations
- Transformations: `select()`, `filter()`/`where()`, `join()`, `groupBy()`, `orderBy()`/`sort()`, `limit()`, `distinct()`
- Set Operations: `union()`, `intersect()`, `exceptAll()`
- Column Operations: `withColumn()`, `withColumnRenamed()`, `drop()`, `cast()`
- Caching: `cache()`/`persist()`, `unpersist()`
Action Methods
- Data Retrieval: `collect()`, `count()`, `first()`, `take()`, `head()`
- Display: `show()`, `explain()`, `describe()`
- Export: `toPandas()`, `write.parquet()`, `write.orc()`, `write.csv()`, `write.json()`
- Views: `createOrReplaceTempView()`, `createGlobalTempView()`
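A brief, illustrative sketch combining the actions listed above (the `df` variable and the output path are placeholders):

```python
# Retrieve a small sample locally
rows = df.limit(10).collect()
total = df.count()

# Convert to pandas for local analysis
pdf = df.toPandas()

# Write results out and register a temp view
df.write.parquet("s3://bucket/output/")
df.createOrReplaceTempView("people")
```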
SQL Functions (135+)
- String Functions: `upper`, `lower`, `concat`, `substring`, `trim`, `split`, `regexp_replace`, `length`
- Math Functions: `abs`, `round`, `floor`, `ceil`, `sqrt`, `pow`, `sin`, `cos`, `log`
- Aggregate Functions: `sum`, `avg`, `count`, `min`, `max`, `first`, `last`, `collect_list`
- Date/Time Functions: `year`, `month`, `dayofmonth`, `hour`, `minute`, `date_add`, `date_sub`, `datediff`
- Window Functions: `row_number`, `rank`, `dense_rank`, `percent_rank`, `ntile`, `lag`, `lead`
- Conditional Functions: `when`, `coalesce`, `isnull`, `isnan`, `greatest`, `least`
- Hash Functions: `md5`, `sha1`, `sha2`, `hash`, `xxhash64`
- Array Functions: `array`, `array_contains`, `explode`, `size`, `array_sort`, and 16 more
- Map Functions: `map_keys`, `map_values`, `map_from_arrays`, `map_concat`
- JSON Functions: `get_json_object`, `json_extract`, `from_json`, `to_json`
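A small example combining several of the functions listed above (the column names are illustrative):

```python
from e6_spark_compat.sql.functions import (
    col, when, coalesce, upper, concat, year, datediff
)

enriched = df.select(
    upper(col("city")).alias("city"),
    concat(col("first_name"), col("last_name")).alias("full_name"),
    year(col("hire_date")).alias("hire_year"),
    datediff(col("end_date"), col("start_date")).alias("tenure_days"),
    when(col("salary") > 100000, "senior").otherwise("standard").alias("band"),
    coalesce(col("bonus"), col("base_bonus")).alias("bonus"),
)
```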
Spatial Operations (Sedona Compatible)
- Geometry Functions: all ST_* functions, including `ST_Point`, `ST_Polygon`, `ST_Buffer`
- Spatial Relationships: `ST_Contains`, `ST_Intersects`, `ST_Distance`, `ST_Within`
- Transformations: `ST_Transform`, `ST_Centroid`, `ST_ConvexHull`
- Indexing: H3 and GeoHash spatial indexing support
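Beyond the spatial join in the Quick Start, the same expr-based pattern covers the relationship and transformation functions. A sketch with illustrative column names, coordinates, and distance threshold (units depend on the coordinate system):

```python
from e6_spark_compat.sql.functions import expr

# Distance filter: keep points close to a reference location
nearby = points_df.filter(
    expr("ST_Distance(ST_Point(lon, lat), ST_Point(-122.4194, 37.7749)) < 0.01")
)

# Buffer geometries and compute centroids
buffered = polygons_df.select(
    expr("ST_Buffer(geometry, 0.001)").alias("buffered_geom"),
    expr("ST_Centroid(geometry)").alias("centroid"),
)
```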
Testing
Test Suite Overview
The test suite uses dual-mode execution: tests run offline by default (mock connection, SQL validation) and can also execute on a live e6data cluster when credentials are provided.
| Category | Tests | Description |
|---|---|---|
| E2E SQL Generation | 360 | Full pipeline: PySpark API → SQL string validation |
| TPC-DS Translations | 37 | Standard benchmark queries (37 of 99 TPC-DS queries) |
| TPC-DS Analytics | 22 | Complex analytics workloads (star schema, RFM, ETL pipelines) |
| SQLGlot Expressions | 288 | SQL expression generation via SQLGlot |
| Integration | 227 | Query plan node composition |
| Unit | 200 | Individual class/method tests |
| Writer | 38 | DataFrameWriter operations |
| Workloads | 94 | Customer workload pipeline tests (Kroger DQ + DFM analytics) |
| Total | 1270 | 1263 pass, 7 known failures |
Customer Scripts
Standalone scripts that exercise real customer pipelines end-to-end:
| Script | Description | Dataset |
|---|---|---|
| `kroger_original_test.py` | Kroger pre-DQ validation pipeline | mars_full |
| `e6_dfm_on_mars_full.py` | DFM pipeline patterns (39 tests) | mars_full |
| `e6_dfm_test.py` | DFM feature coverage (42 tests) | TPC-DS |
| `*_rust.py` variants | Same scripts with a FORMAT_NUMBER workaround for the rust engine | |
```bash
# Run customer script on Java engine
E6DATA_HOST=... S3_READ_BASE=... python tests/customer_scripts/kroger_original_test.py

# Run on rust engine
E6DATA_HOST=... S3_READ_BASE=... python tests/customer_scripts/kroger_original_test_rust.py
```
Running Tests
```bash
# Offline (default): fast, no cluster needed (~5 seconds)
./run_tests.sh

# Specific file or directory
./run_tests.sh offline 1 tests/workloads/
./run_tests.sh offline 1 tests/workloads/test_dfm_pipeline.py
./run_tests.sh offline 1 tests/workloads/test_dfm_pipeline.py::TestDFMMissingFunctions

# By keyword or marker
./run_tests.sh offline 1 -k "dfm"
./run_tests.sh offline 1 -m e2e

# Or use pytest directly
pytest tests/e2e/              # E2E tests
pytest tests/tpcds/            # TPC-DS queries
pytest tests/e2e/ -m spatial   # Spatial tests
```
Live Mode (cluster execution)
Set environment variables to run the same tests against a real e6data cluster:
```bash
export E6DATA_HOST="your-cluster-host"
export E6DATA_PORT="443"
export E6DATA_USERNAME="your-user"
export E6DATA_PASSWORD="your-token"
export E6DATA_DATABASE="tpcds_1000_delta"
export E6DATA_CATALOG="your-catalog"
export E6DATA_CLUSTER="your-cluster"

# Sequential with full HTML report
./run_tests.sh live

# Parallel with N workers
./run_tests.sh live 4
./run_tests.sh live 8
```
In live mode, each test generates SQL (same as offline) and executes it on the cluster with LIMIT 5. The HTML report captures query IDs, row counts, execution times, and error details.
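One way to cap a generated query for live runs is with the public sqlglot API; this is a rough sketch only, and the suite's actual wrapping logic may differ:

```python
import sqlglot

def cap_rows(sql: str, n: int = 5) -> str:
    # Parse the generated SQL and attach a LIMIT via SQLGlot's expression builder
    return sqlglot.parse_one(sql).limit(n).sql(dialect="spark")

capped = cap_rows("SELECT name, age FROM people WHERE age > 21")
# -> SELECT name, age FROM people WHERE age > 21 LIMIT 5
```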
Test Reports
Reports are auto-generated on every run:
- HTML: `reports/test_report.html`, a browsable report with expandable details per test:
  - PySpark input source code
  - Generated SQL
  - Query ID (live mode)
  - Engine error details (live mode, failures only)
- JUnit XML: `reports/test_results.xml`, for CI/CD integration
CI / GitHub Actions
Offline tests run automatically on every push/PR; no setup is needed.
Live tests are triggered manually (requires GitHub secrets to be configured):
From the GitHub UI:
- Go to the Actions tab → e6-spark-compat CI
- Click Run workflow
- Select the branch, cluster (`general` or `rust-azure-benchmark`), and worker count
- Click Run workflow
From the CLI:
```bash
gh workflow run tests.yml --ref main -f run_live=true -f cluster=general -f concurrency=4
```
The CI summary includes category breakdown tables and failure details, and dorny/test-reporter renders an interactive test results tab.
Architecture
Core Components
- Query Plan Tree: AST-like nodes representing SQL operations (Filter, Project, Join, Aggregate, etc.)
- SQLGlot Integration: Professional SQL generation using AST-based optimization
- Lazy Evaluation: Operations build query plans without immediate execution
- Connection Management: Efficient e6data connection handling and query execution
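As a mental model only (the real node classes under core/ are not shown in this README), the plan tree can be pictured as small composable nodes:

```python
from dataclasses import dataclass

# Hypothetical, simplified plan nodes for illustration
@dataclass
class Scan:
    path: str

@dataclass
class Filter:
    child: object
    predicate: str

@dataclass
class Project:
    child: object
    columns: list

# df.filter(col("age") > 21).select("name") builds a tree shaped roughly like:
plan = Project(Filter(Scan("s3://bucket/people.parquet"), "age > 21"), ["name"])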
Execution Flow
- Build Phase: DataFrame operations create query plan nodes
- Optimization Phase: SQLGlot optimizes the query plan using AST transformations
- Generation Phase: Query plans generate optimized SQL via SQLGlot
- Execution Phase: Actions trigger SQL execution on e6data
- Result Phase: Results returned in PySpark-compatible format
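The optimization and generation phases can be reproduced with the standalone sqlglot package. A minimal sketch, independent of this library's internal code:

```python
import sqlglot
from sqlglot.optimizer import optimize

# Equivalent SQL for: df.filter(col("age") > 21).groupBy("city").agg(count("*"))
raw = "SELECT city, COUNT(*) AS total FROM people WHERE age > 21 GROUP BY city"

# Optimization phase: AST-level rewrites (qualification, simplification, etc.)
ast = optimize(
    sqlglot.parse_one(raw),
    schema={"people": {"city": "string", "age": "int"}},
)

# Generation phase: render dialect-specific SQL from the optimized AST
print(ast.sql(dialect="spark", pretty=True))
```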
Development
Project Structure
```
e6_spark_compat/
├── core/              # Session, connection, query plan, SQL generator
├── sql/               # DataFrame, Column, functions, window, reader, writer, types
├── spatial/           # Sedona-compatible ST_* functions
└── sedona/            # SedonaRegistrator compatibility shim

tests/
├── e2e/               # E2E SQL generation tests (dual-mode: offline + live)
├── tpcds/             # TPC-DS benchmark query translations (37 queries)
├── integration/       # Query plan composition tests
├── sqlglot_tests/     # SQLGlot expression tests
├── unit/              # Unit tests
├── workloads/         # Customer workload pipeline tests
├── customer_scripts/  # Standalone customer pipeline scripts
└── live/              # Live cluster-only execution tests
```
Code Quality
```bash
# Format code
black e6_spark_compat/

# Run linting
flake8 e6_spark_compat/

# Type checking
mypy e6_spark_compat/
```
Migration Guide
From PySpark
- Update Imports: change `pyspark` imports to `e6_spark_compat`
- Configuration: update the SparkSession configuration for the e6data connection
- Test: run your existing PySpark code; it should work unchanged
From Sedona
- Spatial Functions: replace Sedona imports with `e6_spark_compat.spatial.functions`
- Registration: use `SedonaRegistrator.registerAll(spark)` for compatibility (see the example below)
- Geometry Types: spatial operations work identically to Sedona
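In practice the Sedona migration is an import swap, for example (assuming existing code uses the classic SedonaRegistrator entry point):

```python
# Before (Apache Sedona)
# from sedona.register import SedonaRegistrator
# SedonaRegistrator.registerAll(spark)

# After (e6-spark-compat)
from e6_spark_compat.sedona import SedonaRegistrator
SedonaRegistrator.registerAll(spark)
```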
Contributing
We welcome contributions! Here's how to get started:
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass (`pytest tests/`)
- Submit a pull request
Support
- Documentation: see `docs/pyspark-compatibility/` for detailed guides
- Issues: report bugs and feature requests on GitHub
- Contact: support@e6data.com for enterprise support
License
Apache License 2.0 - see LICENSE for details.