Apache Spark on Polars
Polar Spark brings the PySpark API to Polars, optimized for single-machine workloads.
It is designed as a drop-in replacement for PySpark in scenarios where a full Spark cluster is not needed. A common use case is running fast, lightweight unit tests in CI/CD pipelines 🧪.
Instead of relying on the JVM-based Spark engine, Polar Spark runs on Polars' Lazy API, powered by a high-performance Rust execution engine 🦀. This avoids the JVM's startup time and memory overhead, which can dominate small or local workloads.
By leveraging Polars, Polar Spark automatically benefits from:
- Advanced query optimization
- 🧵 Efficient multithreading
- 🖥️ Excellent performance on modern CPUs
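The Lazy API and the query optimization mentioned above rest on one idea: transformations only build up a plan, and work happens when a result is requested, much like PySpark's split between transformations and actions. The toy class below is a plain-Python illustration of that idea under our own naming (ToyLazyFrame is hypothetical); it is not Polars' or Polar Spark's code.

```python
from typing import Any, Callable, Dict, List, Tuple

Row = Dict[str, Any]


class ToyLazyFrame:
    """Hypothetical toy: records steps in a plan and runs them only on collect()."""

    def __init__(self, rows: List[Row], plan: Tuple[Tuple[str, Any], ...] = ()) -> None:
        self._rows = rows
        self._plan = plan  # ordered (kind, argument) steps; nothing executed yet

    def filter(self, predicate: Callable[[Row], bool]) -> "ToyLazyFrame":
        # Return a new frame with one more step; no rows are touched here
        return ToyLazyFrame(self._rows, self._plan + (("filter", predicate),))

    def select(self, *columns: str) -> "ToyLazyFrame":
        return ToyLazyFrame(self._rows, self._plan + (("select", columns),))

    def explain(self) -> List[str]:
        # Describe the recorded plan without executing it
        return [kind for kind, _ in self._plan]

    def collect(self) -> List[Row]:
        # Only here does any work happen: the steps run in recorded order
        rows = self._rows
        for kind, arg in self._plan:
            if kind == "filter":
                rows = [r for r in rows if arg(r)]
            else:  # "select"
                rows = [{c: r[c] for c in arg} for r in rows]
        return rows


people = [{"name": "Alice", "age": 1}, {"name": "Tome", "age": 100}]
query = ToyLazyFrame(people).filter(lambda r: r["age"] > 50).select("name")
print(query.explain())  # ['filter', 'select']
print(query.collect())  # [{'name': 'Tome'}]
```

A real engine such as Polars rewrites the plan before running it (for example, pushing a filter down toward the data scan); the toy simply executes the steps as recorded.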
🎯 Goal: Make Polar Spark a seamless PySpark replacement whenever workloads fit on a single machine or within local resource limits.
Installation
pip install polarspark==0.2.2a4
Examples:
Spark session
try:
    from polarspark.sql.session import SparkSession
except ImportError:  # fall back to PySpark when Polar Spark is not installed
    from pyspark.sql.session import SparkSession
spark = SparkSession.builder.master("local").appName("myapp").getOrCreate()
print(spark)
print(type(spark))
>>> <polarspark.sql.session.SparkSession object at 0x1043bdd90>
>>> <class 'polarspark.sql.session.SparkSession'>
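The try/except import above lets the same test code run against either engine. A slightly more explicit variant, sketched below with only the standard library, checks which package is installed via importlib.util.find_spec and then imports the session class from it. The helper names spark_backend and load_spark_session_class are hypothetical; the module path <package>.sql.session is taken from the example above.

```python
import importlib
import importlib.util


def spark_backend(preferred: str = "polarspark", fallback: str = "pyspark") -> str:
    """Return the name of the Spark-API package to use (hypothetical helper)."""
    # find_spec returns None when a top-level package is not installed
    if importlib.util.find_spec(preferred) is not None:
        return preferred
    return fallback


def load_spark_session_class():
    """Import SparkSession from whichever package spark_backend() selects."""
    # Both packages expose SparkSession under <package>.sql.session
    module = importlib.import_module(f"{spark_backend()}.sql.session")
    return module.SparkSession
```

Usage would be SparkSession = load_spark_session_class() followed by the same builder call as above. Checking for the package first means that a genuine error raised while importing an installed polarspark surfaces instead of silently switching engines.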
DataFrame API
try:
    from polarspark.sql import Row
    from polarspark.sql.types import *
except ImportError:  # fall back to PySpark when Polar Spark is not installed
    from pyspark.sql import Row
    from pyspark.sql.types import *
from pprint import pprint
d = [{'name': 'Alice', 'age': 1},
     {'name': 'Tome', 'age': 100},
     {'name': 'Sim', 'age': 99}]
df = spark.createDataFrame(d)
rows = df.collect()
SQL
spark.sql("CREATE TABLE input_table (value string) USING parquet")
spark.sql("INSERT INTO input_table VALUES (1), (2), (3)")
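# my_table must already exist, e.g. created with df.write.saveAsTable("my_table")
# as in the In Memory Catalog example
spark.sql("""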
SELECT *
FROM input_table i
JOIN my_table m
ON i.value = m.age
""").show()
API
pprint(rows)
>>> [Row(age=1, name='Alice'),
>>> Row(age=100, name='Tome'),
>>> Row(age=99, name='Sim')]
df.printSchema()
>>> root
>>> |-- age: long (nullable = true)
>>> |-- name: string (nullable = true)
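The printed schema lists age before name even though the dictionaries put name first. PySpark sorts dictionary keys when inferring a schema from dicts and maps Python int to long and str to string; the output above shows Polar Spark following the same rules. The function below is a hypothetical plain-Python sketch of that inference for flat dicts, not either library's code.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical subset of the Python-to-Spark type mapping used during inference
_TYPE_NAMES = {bool: "boolean", int: "long", float: "double", str: "string"}


def infer_schema_from_dicts(records: List[Dict[str, Any]]) -> List[Tuple[str, str]]:
    """Toy sketch: infer (column, type) pairs from flat dicts, sorted by column name."""
    fields: Dict[str, str] = {}
    for record in records:
        for key, value in record.items():
            if value is None:
                continue  # None says nothing about the type
            type_name = _TYPE_NAMES[type(value)]  # unsupported types raise KeyError here
            # Keep the first type seen; reject conflicts instead of widening types
            if fields.setdefault(key, type_name) != type_name:
                raise TypeError(f"conflicting types for column {key!r}")
    return sorted(fields.items())


d = [{'name': 'Alice', 'age': 1},
     {'name': 'Tome', 'age': 100},
     {'name': 'Sim', 'age': 99}]
print(infer_schema_from_dicts(d))  # [('age', 'long'), ('name', 'string')]
```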
# With schema
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)])
df_no_rows = spark.createDataFrame([], schema=schema)
print(df_no_rows.isEmpty())
>>> True
# or using Spark DDL
df = spark.createDataFrame([("Alice", 3), ("Ben", 5)], schema="name STRING, age INT")
print(df.isEmpty())
>>> False
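A DDL string such as "name STRING, age INT" is a comma-separated list of column-name/type pairs. The toy parser below (hypothetical, plain Python, not Polar Spark's parser) splits only on top-level commas, so a type like DECIMAL(10, 2) or ARRAY<STRING> stays intact; it does not handle quoted identifiers or column options.

```python
from typing import List, Tuple


def parse_ddl_schema(ddl: str) -> List[Tuple[str, str]]:
    """Toy sketch: split 'name STRING, age INT' into [(name, type), ...]."""
    fields: List[Tuple[str, str]] = []
    depth = 0                   # nesting level inside (...) or <...>
    current: List[str] = []     # characters of the field being read
    for ch in ddl + ",":        # trailing comma flushes the last field
        if ch in "(<":
            depth += 1
        elif ch in ")>":
            depth -= 1
        if ch == "," and depth == 0:
            name, _, type_name = "".join(current).strip().partition(" ")
            fields.append((name, type_name.strip()))
            current = []
        else:
            current.append(ch)
    return fields


print(parse_ddl_schema("name STRING, age INT"))  # [('name', 'STRING'), ('age', 'INT')]
```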
Read / write Parquet, Delta, CSV etc.
base_path = "/var/tmp"
df1 = spark.read.format("json").load([f"{base_path}/data.json",
f"{base_path}/data.json"
])
df2 = spark.read.json([f"{base_path}/data.json",
f"{base_path}/data.json"])
df1.write.format("csv").save(f"{base_path}/data_json_to_csv.csv", mode="overwrite")
df1 = spark.read.format("csv").load([f"{base_path}/data_json_to_csv.csv",
f"{base_path}/data_json_to_csv.csv"])
df1 = spark.read.format("parquet").load([f"{base_path}/data_json_to_parquet.parquet",
f"{base_path}/data_json_to_parquet.parquet"])
df2 = spark.read.parquet(f"{base_path}/data_json_to_parquet.parquet",
f"{base_path}/data_json_to_parquet.parquet")
Streaming (Stateless)
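import tempfile
tmpdir = tempfile.mkdtemp()  # checkpoint directory for the streaming query
df = spark.readStream.format("rate").load()
# toTable() starts the query and writes each micro-batch to output_table
q = df.writeStream.toTable("output_table", format="parquet", checkpointLocation=tmpdir)
q.stop()  # stopping right away may leave output_table with few or no rows
result = spark.sql("SELECT value FROM output_table").collect()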
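The rate source used above produces rows with two columns, timestamp and value, where value counts up from 0 at a configurable rowsPerSecond. The generator below is a hypothetical plain-Python imitation of that row layout with a fixed start time so the output is reproducible, followed by a stateless step in which each row is handled on its own; it is a reasoning aid, not a model of Polar Spark's source.

```python
from datetime import datetime, timedelta
from typing import Iterator, Tuple


def toy_rate_source(rows_per_second: int, seconds: int,
                    start: datetime) -> Iterator[Tuple[datetime, int]]:
    """Yield (timestamp, value) rows in the shape of the rate source."""
    value = 0
    for second in range(seconds):
        # Simplification: every row generated in one second shares its timestamp
        tick = start + timedelta(seconds=second)
        for _ in range(rows_per_second):
            yield tick, value
            value += 1  # value is a running counter starting at 0


rows = list(toy_rate_source(rows_per_second=2, seconds=3, start=datetime(2024, 1, 1)))
print([value for _, value in rows])  # [0, 1, 2, 3, 4, 5]
# Stateless: each output row depends only on its own input row, no state is carried
doubled = [(ts, value * 2) for ts, value in rows]
```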
Streaming (foreachBatch)
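def collectBatch(batch_df, batch_id):
    # Called once per micro-batch; each batch overwrites test_table1
    batch_df.write.format("parquet").mode("overwrite").saveAsTable("test_table1")
# Directory of text files (path relative to the Polar Spark source tree)
df = spark.readStream.format("text").load("polarspark/test_support/sql/streaming")
q = df.writeStream.foreachBatch(collectBatch).start()
q.processAllAvailable()  # block until the data currently in the directory is processed
collected = spark.sql("select * from test_table1").collect()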
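foreachBatch hands each micro-batch to an ordinary function together with a batch id that increases by one per batch, starting at 0, which is why collectBatch can use batch writers such as saveAsTable. The toy driver below (hypothetical names, plain Python lists standing in for DataFrames) shows that calling contract. Because collectBatch uses mode("overwrite"), test_table1 ends up holding only the last micro-batch.

```python
from typing import Callable, Dict, Iterable, List

Batch = List[str]


def run_foreach_batch(batches: Iterable[Batch],
                      func: Callable[[Batch, int], None]) -> int:
    """Call func(batch, batch_id) for each micro-batch; return how many ran."""
    count = 0
    for batch_id, batch in enumerate(batches):  # ids 0, 1, 2, ...
        func(batch, batch_id)
        count += 1
    return count


tables: Dict[str, Batch] = {}


def collect_batch(batch: Batch, batch_id: int) -> None:
    # Mirrors collectBatch's mode("overwrite"): each batch replaces the table
    tables["test_table1"] = list(batch)


run_foreach_batch([["line 1", "line 2"], ["line 3"]], collect_batch)
print(tables["test_table1"])  # ['line 3']
```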
In Memory Catalog
df.write.saveAsTable("my_table")
spark.sql("select * from my_table").show()
Some more (these use the df created from d in the DataFrame API example):
Filter
pprint(df.offset(1).first())
>>> Row(age=100, name='Tome')
df.show()
shape: (3, 2)
┌─────┬───────┐
│ age ┆ name  │
│ --- ┆ ---   │
│ i64 ┆ str   │
╞═════╪═══════╡
│ 1   ┆ Alice │
│ 100 ┆ Tome  │
│ 99  ┆ Sim   │
└─────┴───────┘
df.explain()
   0
   ┌────────────────────────
   │
   │ ╭─────────────────────╮
   │ │ DF ["age", "name"]  │
 0 │ │ PROJECT */2 COLUMNS │
   │ ╰─────────────────────╯
print(repr(df))
>>> DataFrame[age: bigint, name: string]
print(df.count())
>>> 3
def func(row):
    print("Row -> {}".format(row))
df.foreach(func)
df = spark.createDataFrame(
    [(14, "Tom"), (23, "Alice"), (16, "Bob"), (16, "Bob")], ["age", "name"]
)
def func(itr):
    for person in itr:
        print(person)
        print("Person -> {}".format(person.name))
df.foreachPartition(func)
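foreach calls the function once per row, while foreachPartition calls it once per partition with an iterator over that partition's rows, which makes it the natural place for per-partition setup such as opening a connection. The plain-Python sketch below imitates the difference; the function names are hypothetical and the choice of two partitions is arbitrary.

```python
from typing import Callable, Iterator, List, Tuple

Person = Tuple[int, str]


def toy_foreach(rows: List[Person], func: Callable[[Person], None]) -> None:
    """Call func once per row, like df.foreach."""
    for row in rows:
        func(row)


def toy_foreach_partition(rows: List[Person], num_partitions: int,
                          func: Callable[[Iterator[Person]], None]) -> None:
    """Call func once per partition with an iterator, like df.foreachPartition."""
    if not rows:
        return
    size = -(-len(rows) // num_partitions)  # ceiling division: rows per partition
    for start in range(0, len(rows), size):
        func(iter(rows[start:start + size]))


people = [(14, "Tom"), (23, "Alice"), (16, "Bob"), (16, "Bob")]
calls: List[str] = []
toy_foreach(people, lambda row: calls.append(f"row {row[1]}"))
# Hypothetical choice of 2 partitions
toy_foreach_partition(people, 2, lambda itr: calls.append(f"partition of {len(list(itr))}"))
print(calls)
# ['row Tom', 'row Alice', 'row Bob', 'row Bob', 'partition of 2', 'partition of 2']
```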
df.show()
df.distinct().show()
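distinct() drops the duplicate (16, "Bob") row, so the second show() prints three rows instead of four. A plain-Python equivalent over tuples is sketched below; note that Spark makes no ordering guarantee for distinct(), whereas dict.fromkeys keeps first-seen order.

```python
rows = [(14, "Tom"), (23, "Alice"), (16, "Bob"), (16, "Bob")]
# dict keys are unique, so this drops exact duplicates while keeping first-seen order
distinct_rows = list(dict.fromkeys(rows))
print(len(rows), len(distinct_rows))  # 4 3
```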
NOTE: Some features are not mapped one-to-one to PySpark but rely on Polars directly. For example, df.show() and df.explain() print the output of the corresponding Polars methods, as in the outputs above, rather than PySpark's format.
Download files
File details
Details for the file polarspark-0.2.5rc3.tar.gz.
File metadata
- Download URL: polarspark-0.2.5rc3.tar.gz
- Upload date:
- Size: 416.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.9.26 {"installer":{"name":"uv","version":"0.9.26","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | dd94d68754a25aabdf06cfc846edc8263b129ce3fec917228c6a7226348e7104 |
| MD5 | 80889b1a076ed168005809a44b0d3516 |
| BLAKE2b-256 | 24c15e8594f399fda3cf1708e4a7af1c58528358a54bc04115e490f71a759e03 |
File details
Details for the file polarspark-0.2.5rc3-py3-none-any.whl.
File metadata
- Download URL: polarspark-0.2.5rc3-py3-none-any.whl
- Upload date:
- Size: 464.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.9.26 {"installer":{"name":"uv","version":"0.9.26","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | d0609a59c877d77547976aa6909d3f6943b48fc0a63f534291ac67008e9276f1 |
| MD5 | 5218366b9901e700454173cbcb1538b1 |
| BLAKE2b-256 | f044b237f1906ccfe15796601d89fcd982a79360bb071f18e31dfc76cd7c4586 |
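To check a downloaded file against the digests listed above, hash it in chunks with Python's hashlib and compare the hex digest. The function name sha256_of is hypothetical.

```python
import hashlib


def sha256_of(path: str, chunk_size: int = 1 << 20) -> str:
    """Return the hex SHA256 digest of a file, read in chunks (1 MiB by default)."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # iter() with a sentinel keeps calling f.read() until it returns b"" (EOF)
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

For example, assuming the wheel was downloaded into the current directory, sha256_of("polarspark-0.2.5rc3-py3-none-any.whl") should equal the wheel's SHA256 value in the table above. pip can enforce the same check at install time in hash-checking mode, using a requirements line such as polarspark==0.2.5rc3 --hash=sha256:<digest> together with --require-hashes.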