Apache Spark on Polars

 ____       _              ____                   _    
|  _ \ ___ | | __ _ _ __  / ___| _ __   __ _ _ __| | __
| |_) / _ \| |/ _` | '__| \___ \| '_ \ / _` | '__| |/ /
|  __/ (_) | | (_| | |     ___) | |_) | (_| | |  |   < 
|_|   \___/|_|\__,_|_|    |____/| .__/ \__,_|_|  |_|\_\
                                |_|                    

🚀 Apache Spark on Polars

Polar Spark brings the PySpark API to Polars, optimized for single-machine workloads.

It is designed as a drop-in replacement for PySpark in scenarios where a full Spark cluster is not needed. A common use case is running fast, lightweight unit tests in CI/CD pipelines 🧪.

Instead of relying on the JVM-based Spark engine, Polar Spark runs on Polars’ Lazy API, powered by a high-performance Rust execution engine 🦀. This avoids the overhead of the JVM, which can be slow and heavy for small or local workloads.

By leveraging Polars, Polar Spark automatically benefits from:

  • 🚀 Advanced query optimization
  • 🧵 Efficient multithreading
  • 🖥️ Excellent performance on modern CPUs

🎯 Goal: Make Polar Spark a seamless PySpark replacement whenever workloads fit on a single machine or within local resource limits.

Installation

pip install polarspark==0.2.2a4

Examples:

Spark session

try:
    # Prefer Polar Spark when it is installed
    from polarspark.sql.session import SparkSession
except ImportError:
    # Otherwise fall back to PySpark
    from pyspark.sql.session import SparkSession

spark = SparkSession.builder.master("local").appName("myapp").getOrCreate()

print(spark)
print(type(spark))

>>> <polarspark.sql.session.SparkSession object at 0x1043bdd90>
>>> <class 'polarspark.sql.session.SparkSession'>
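The try/except import above generalises to a small helper, which can be handy in a test suite that should run against whichever backend happens to be installed. The following is a minimal sketch: `first_importable` is a hypothetical helper (not part of Polar Spark or PySpark), and it assumes both packages expose `SparkSession` at the module paths used in the example above.

```python
import importlib


def first_importable(candidates):
    """Return (name, module) for the first module name that imports cleanly."""
    for name in candidates:
        try:
            return name, importlib.import_module(name)
        except ImportError:
            # Covers ModuleNotFoundError too; try the next candidate
            continue
    raise ImportError(f"none of {list(candidates)!r} could be imported")


# Prefer Polar Spark, then PySpark; leave SparkSession unset if neither exists
try:
    backend_name, session_module = first_importable(
        ["polarspark.sql.session", "pyspark.sql.session"]
    )
    SparkSession = session_module.SparkSession
except ImportError:
    backend_name, SparkSession = None, None

print("backend:", backend_name)
```

A test suite could skip Spark-dependent tests when `SparkSession` is `None` instead of failing at import time.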

DataFrame API

try:
    from polarspark.sql import Row
    from polarspark.sql.types import *
except ImportError:
    from pyspark.sql import Row
    from pyspark.sql.types import *
from pprint import pprint

# Column names and types are inferred from the dictionaries
d = [{'name': 'Alice', 'age': 1},
     {'name': 'Tome', 'age': 100},
     {'name': 'Sim', 'age': 99}]
df = spark.createDataFrame(d)
rows = df.collect()

SQL

spark.sql("CREATE TABLE input_table (value string) USING parquet")
spark.sql("INSERT INTO input_table VALUES (1), (2), (3)")

# my_table must already exist in the catalog,
# e.g. via df.write.saveAsTable("my_table")
spark.sql("""
    SELECT *
    FROM input_table i
        JOIN my_table m
    ON i.value = m.age
""").show()

API

pprint(rows)
>>> [Row(age=1, name='Alice'),
>>>  Row(age=100, name='Tome'),
>>>  Row(age=99, name='Sim')]
df.printSchema()
>>> root
>>>  |-- age: long (nullable = true)
>>>  |-- name: string (nullable = true)
# With schema
schema = StructType([
            StructField("name", StringType(), True),
            StructField("age", IntegerType(), True)])
df_no_rows = spark.createDataFrame([], schema=schema)

print(df_no_rows.isEmpty())
>>> True
# or using Spark DDL
df = spark.createDataFrame([("Alice", 3), ("Ben", 5)], schema="name STRING, age INT")
print(df.isEmpty())
>>> False
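The two schema forms above describe the same columns: a DDL string such as `"name STRING, age INT"` is shorthand for a list of named, typed fields. As a rough illustration of that correspondence only, here is a minimal plain-Python sketch that splits a flat DDL string into `(name, type)` pairs. `parse_flat_ddl` is a hypothetical helper, not how Polar Spark or PySpark parses DDL, and it deliberately ignores nested or parameterised types such as `STRUCT<...>` or `DECIMAL(10,2)`.

```python
def parse_flat_ddl(ddl):
    """Split a flat 'name TYPE, name TYPE' string into (name, type) pairs.

    Illustration only: types containing commas or spaces are not supported.
    """
    fields = []
    for part in ddl.split(","):
        name, type_name = part.split()  # exactly two tokens per column
        fields.append((name, type_name.lower()))
    return fields


fields = parse_flat_ddl("name STRING, age INT")
print(fields)
```

Each pair corresponds to one `StructField` in the explicit `StructType` form, with the column name first and the type second.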

Read / write Parquet, Delta, CSV etc.

base_path = "/var/tmp"

df1 = spark.read.format("json").load([f"{base_path}/data.json",
                                     f"{base_path}/data.json"
                                     ])
df2 = spark.read.json([f"{base_path}/data.json",
                      f"{base_path}/data.json"])


df1.write.format("csv").save(f"{base_path}/data_json_to_csv.csv", mode="overwrite")

df1 = spark.read.format("csv").load([f"{base_path}/data_json_to_csv.csv",
                                     f"{base_path}/data_json_to_csv.csv"])

# Assumes data_json_to_parquet.parquet already exists under base_path
df1 = spark.read.format("parquet").load([f"{base_path}/data_json_to_parquet.parquet",
                                         f"{base_path}/data_json_to_parquet.parquet"])
df2 = spark.read.parquet(f"{base_path}/data_json_to_parquet.parquet",
                         f"{base_path}/data_json_to_parquet.parquet")
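The read examples assume that `data.json` already exists under `base_path`. PySpark's JSON reader expects one JSON object per line by default (JSON Lines); assuming Polar Spark follows the same convention, a sample input file could be produced with the standard library as sketched below. The temporary directory and the records are illustrative stand-ins, not part of the project.

```python
import json
import tempfile
from pathlib import Path

records = [
    {"name": "Alice", "age": 1},
    {"name": "Tome", "age": 100},
    {"name": "Sim", "age": 99},
]

# A throwaway directory stands in for base_path ("/var/tmp" in the example)
base_path = Path(tempfile.mkdtemp())
data_file = base_path / "data.json"

# JSON Lines layout: one object per line, no enclosing array
with data_file.open("w", encoding="utf-8") as fh:
    for record in records:
        fh.write(json.dumps(record) + "\n")

print(data_file)
```

Pointing `base_path` in the read examples at this directory gives them a file to load.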

Streaming (Stateless)

import tempfile

tmpdir = tempfile.mkdtemp()  # directory for streaming checkpoints

df = spark.readStream.format("rate").load()
q = df.writeStream.toTable("output_table", format="parquet", checkpointLocation=tmpdir)
q.stop()
result = spark.sql("SELECT value FROM output_table").collect()

Streaming (foreachBatch)

def collectBatch(batch_df, batch_id):
    # Called once per micro-batch; each batch overwrites the table
    batch_df.write.format("parquet").mode("overwrite").saveAsTable("test_table1")

df = spark.readStream.format("text").load("polarspark/test_support/sql/streaming")
q = df.writeStream.foreachBatch(collectBatch).start()
q.processAllAvailable()
collected = spark.sql("select * from test_table1").collect()
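The `foreachBatch` callback receives each micro-batch together with an increasing batch id. The plain-Python sketch below illustrates that callback contract only; `run_micro_batches` and `collect_batch` are hypothetical names, lists stand in for DataFrames, and nothing here reflects how Polar Spark schedules batches internally.

```python
from itertools import islice


def run_micro_batches(source, batch_size, on_batch):
    """Feed fixed-size batches from `source` to `on_batch(batch, batch_id)`.

    Returns the number of batches delivered.
    """
    iterator = iter(source)
    batch_id = 0
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            break  # source exhausted
        on_batch(batch, batch_id)
        batch_id += 1
    return batch_id


seen = {}


def collect_batch(batch, batch_id):
    # Sink keyed by batch id, so replaying a batch overwrites rather than duplicates
    seen[batch_id] = batch


delivered = run_micro_batches(range(5), batch_size=2, on_batch=collect_batch)
print(delivered, seen)
```

Keying the sink on `batch_id` is one common way to keep a callback idempotent if a batch is delivered more than once.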

In Memory Catalog

df.write.saveAsTable("my_table")
spark.sql("select * from my_table").show()

Some more:

Filter

pprint(df.offset(1).first())
>>>  Row(age=100, name='Tome')
df.show()

shape: (3, 2)
┌─────┬──────────┐
│ age ┆ name     │
│ --- ┆ ---      │
│ i64 ┆ str      │
╞═════╪══════════╡
│ 1   ┆ Alice    │
│ 100 ┆ Tome     │
│ 99  ┆ Sim      │
└─────┴──────────┘
df.explain()
                 0
   ┌─────────────────────────
   │
   │  ╭─────────────────────╮
   │  │ DF ["age", "name"]  │
 0 │  │ PROJECT */2 COLUMNS │
   │  ╰─────────────────────╯
print(repr(df))
>>>  DataFrame[age: bigint, name: string]
print(df.count())
>>>  3
def func(row):
    print("Row -> {}".format(row))

df.foreach(func)

df = spark.createDataFrame(
    [(14, "Tom"), (23, "Alice"), (16, "Bob"), (16, "Bob")], ["age", "name"]
)

def func(itr):
    for person in itr:
        print(person)
        print("Person -> {}".format(person.name))
df.foreachPartition(func)

df.show()
df.distinct().show()

NOTE: Some features are not mapped one-to-one to PySpark but rely on Polars instead. For example, df.show() and df.explain() print the output of the corresponding Polars methods.
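Because the text printed by df.show() and df.explain() differs between backends, tests meant to pass on both are safer comparing collected rows than captured output. The helper below is a minimal sketch under that assumption: `normalize_rows` is a hypothetical name, and it relies on rows exposing `asDict()` as PySpark's `Row` does (assumed, not verified, to hold for Polar Spark), falling back to plain mappings.

```python
def normalize_rows(rows):
    """Return rows as a sorted list of sorted (column, value) tuples.

    The result ignores both column order and row order.
    """
    normalized = []
    for row in rows:
        # PySpark's Row offers asDict(); plain dicts are accepted as well
        as_dict = row.asDict() if hasattr(row, "asDict") else dict(row)
        normalized.append(tuple(sorted(as_dict.items())))
    # Sort on repr so rows containing None values still order without errors
    return sorted(normalized, key=repr)


expected = [{"name": "Alice", "age": 1}, {"name": "Tome", "age": 100}]
actual = [{"age": 100, "name": "Tome"}, {"age": 1, "name": "Alice"}]
print(normalize_rows(actual) == normalize_rows(expected))
```

In a test, `normalize_rows(df.collect())` could be compared with a list of expected dictionaries regardless of which backend produced the rows.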
