Apache Spark on Polars

 ____       _              ____                   _    
|  _ \ ___ | | __ _ _ __  / ___| _ __   __ _ _ __| | __
| |_) / _ \| |/ _` | '__| \___ \| '_ \ / _` | '__| |/ /
|  __/ (_) | | (_| | |     ___) | |_) | (_| | |  |   < 
|_|   \___/|_|\__,_|_|    |____/| .__/ \__,_|_|  |_|\_\
                                |_|                    

🚀 Apache Spark on Polars

Polar Spark brings the PySpark API to Polars, optimized for single-machine workloads.

It is designed as a drop-in replacement for PySpark in scenarios where a full Spark cluster is not needed. A common use case is running fast, lightweight unit tests in CI/CD pipelines 🧪.

Instead of relying on the JVM-based Spark engine, Polar Spark runs on Polars’ Lazy API, powered by a high-performance Rust execution engine 🦀. This avoids the overhead of the JVM, which can be slow and heavy for small or local workloads.

By leveraging Polars, Polar Spark automatically benefits from:

  • 🚀 Advanced query optimization
  • 🧵 Efficient multithreading
  • 🖥️ Excellent performance on modern CPUs

🎯 Goal: Make Polar Spark a seamless PySpark replacement whenever workloads fit on a single machine or within local resource limits.

Installation

pip install polarspark==0.2.2a4

Examples:

Spark session

try:
    from polarspark.sql.session import SparkSession
except ImportError:  # polarspark is not installed; fall back to PySpark
    from pyspark.sql.session import SparkSession

spark = SparkSession.builder.master("local").appName("myapp").getOrCreate()

print(spark)
print(type(spark))

>>> <polarspark.sql.session.SparkSession object at 0x1043bdd90>
>>> <class 'polarspark.sql.session.SparkSession'>
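
The try/except import above can be wrapped in a small helper when several names need the same fallback. The following is a minimal sketch using only the standard library; `resolve_first` is a hypothetical helper name and is not part of Polar Spark or PySpark.

```python
import importlib


def resolve_first(module_names, attr):
    """Return `attr` from the first importable module in `module_names`."""
    for name in module_names:
        try:
            module = importlib.import_module(name)
        except ImportError:
            continue  # package not installed; try the next candidate
        return getattr(module, attr)
    raise ImportError(f"none of {list(module_names)} could be imported")


# Usage sketch (needs polarspark or pyspark installed):
# SparkSession = resolve_first(
#     ["polarspark.sql.session", "pyspark.sql.session"], "SparkSession"
# )
```

Listing polarspark first keeps the lightweight engine as the default, with PySpark used only when Polar Spark is not installed.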

DataFrame API

try:
    from polarspark.sql import Row
    from polarspark.sql.types import *
except ImportError:  # polarspark is not installed; fall back to PySpark
    from pyspark.sql import Row
    from pyspark.sql.types import *
from pprint import pprint
d = [{'name': 'Alice', 'age': 1}, 
     {'name': 'Tome', 'age': 100}, 
     {'name': 'Sim', 'age': 99}]
df = spark.createDataFrame(d)
rows = df.collect()

SQL

spark.sql("CREATE TABLE input_table (value string) USING parquet")
spark.sql("INSERT INTO input_table VALUES (1), (2), (3)")

spark.sql("""
    SELECT * 
    FROM input_table i 
        JOIN my_table m 
    ON i.value = m.age
""").show()

API

pprint(rows)
>>> [Row(age=1, name='Alice'),
>>>  Row(age=100, name='Tome'),
>>>  Row(age=99, name='Sim')]
df.printSchema()
>>> root
>>>  |-- age: long (nullable = true)
>>>  |-- name: string (nullable = true)
# With schema
schema = StructType([
            StructField("name", StringType(), True),
            StructField("age", IntegerType(), True)])
df_no_rows = spark.createDataFrame([], schema=schema)

print(df_no_rows.isEmpty())
>>> True
# or using Spark DDL
df = spark.createDataFrame([("Alice", 3), ("Ben", 5)], schema="name STRING, age INT")
print(df.isEmpty())
>>> False

Read / write Parquet, Delta, CSV etc.

base_path = "/var/tmp"

df1 = spark.read.format("json").load([f"{base_path}/data.json",
                                     f"{base_path}/data.json"
                                     ])
df2 = spark.read.json([f"{base_path}/data.json",
                      f"{base_path}/data.json"])


df1.write.format("csv").save(f"{base_path}/data_json_to_csv.csv", mode="overwrite")

df1 = spark.read.format("csv").load([f"{base_path}/data_json_to_csv.csv",
                                     f"{base_path}/data_json_to_csv.csv"])

df1 = spark.read.format("parquet").load([f"{base_path}/data_json_to_parquet.parquet",
                                         f"{base_path}/data_json_to_parquet.parquet"])
df2 = spark.read.parquet(f"{base_path}/data_json_to_parquet.parquet",
                         f"{base_path}/data_json_to_parquet.parquet")
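
The read examples above assume that data.json already exists under base_path. The sketch below creates such a file with the standard library. It assumes the JSON Lines layout (one object per line) that PySpark's JSON reader expects by default; whether Polar Spark expects the same layout is an assumption. `write_json_lines` is a hypothetical helper, and a temporary directory stands in for "/var/tmp".

```python
import json
import os
import tempfile


def write_json_lines(path, records):
    """Write one JSON object per line (JSON Lines) and return the path."""
    with open(path, "w", encoding="utf-8") as fh:
        for record in records:
            fh.write(json.dumps(record) + "\n")
    return path


base_path = tempfile.mkdtemp()  # hypothetical stand-in for "/var/tmp"
data_file = write_json_lines(
    os.path.join(base_path, "data.json"),
    [{"name": "Alice", "age": 1}, {"name": "Tome", "age": 100}],
)
```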

Streaming (Stateless)

import tempfile

tmpdir = tempfile.mkdtemp()  # directory for streaming checkpoints
df = spark.readStream.format("rate").load()
q = df.writeStream.toTable("output_table", format="parquet", checkpointLocation=tmpdir)
q.stop()
result = spark.sql("SELECT value FROM output_table").collect()

Streaming (foreachBatch)

def collectBatch(batch_df, batch_id):
    batch_df.write.format("parquet").mode("overwrite").saveAsTable("test_table1")

df = spark.readStream.format("text").load("polarspark/test_support/sql/streaming")
q = df.writeStream.foreachBatch(collectBatch).start()
q.processAllAvailable()
collected = spark.sql("select * from test_table1").collect()

In Memory Catalog

df.write.saveAsTable("my_table")
spark.sql("select * from my_table").show()

Some more:

Filter

pprint(df.offset(1).first())
>>>  Row(age=100, name='Tome')
df.show()

shape: (3, 2)
┌─────┬──────────┐
│ age ┆ name     │
│ --- ┆ ---      │
│ i64 ┆ str      │
╞═════╪══════════╡
│ 1   ┆ Alice    │
│ 100 ┆ Tome     │
│ 99  ┆ Sim      │
└─────┴──────────┘
df.explain()
                 0
   ┌─────────────────────────
   │
   │  ╭─────────────────────╮
   │  │ DF ["age", "name"]  │
 0 │  │ PROJECT */2 COLUMNS │
   │  ╰─────────────────────╯
print(repr(df))
>>>  DataFrame[age: bigint, name: string]
print(df.count())
>>>  3
def func(row):
    print("Row -> {}".format(row))

df.foreach(func)

df = spark.createDataFrame(
    [(14, "Tom"), (23, "Alice"), (16, "Bob"), (16, "Bob")], ["age", "name"]
)

def func(itr):
    for person in itr:
        print(person)
        print("Person -> {}".format(person.name))
df.foreachPartition(func)

df.show()
df.distinct().show()

NOTE: Some features are not mapped one-to-one to PySpark and instead rely on Polars. For example, df.show() and df.explain() print the output of the corresponding Polars methods.
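
Because the printed output of df.show() and df.explain() depends on the engine, tests meant to pass on both Polar Spark and PySpark may be more robust if they compare collected rows instead of printed text. The sketch below shows one order-insensitive way to do that. It relies only on rows exposing asDict(), as PySpark's Row does; `rows_as_sorted_dicts` and `FakeRow` are hypothetical names, and `FakeRow` exists only so the sketch runs without Spark installed.

```python
def rows_as_sorted_dicts(rows):
    """Convert Row-like objects to dicts, sorted for order-insensitive comparison."""
    dicts = [row.asDict() for row in rows]
    # repr() of the key-sorted items gives a stable sort key even for mixed value types
    return sorted(dicts, key=lambda d: repr(sorted(d.items())))


class FakeRow:
    """Stand-in for a Row (assumption: real rows expose asDict())."""

    def __init__(self, **fields):
        self._fields = fields

    def asDict(self):
        return dict(self._fields)


actual = [FakeRow(age=1, name="Alice"), FakeRow(age=99, name="Sim")]
expected = [FakeRow(name="Sim", age=99), FakeRow(name="Alice", age=1)]
same = rows_as_sorted_dicts(actual) == rows_as_sorted_dicts(expected)
```

With a real session, `actual` would be the result of `df.collect()`.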
