Efficient large-CSV processing for Apache Spark / Databricks with auto-salting, skew detection, and partition tuning
Project description
saltmill
Efficient large CSV processing for PySpark and Databricks.
saltmill automatically computes optimal salt buckets, partition counts, and Spark configuration for processing CSV files of any size — from a single API call.
The problem
Reading a 500 GB CSV file naively in Spark causes data skew, memory pressure, and slow shuffles. Fixing it requires manually tuning:
- Salt bucket count
- Repartition strategy
spark.sql.shuffle.partitionsspark.sql.files.maxPartitionBytes- Databricks Delta write optimizations
saltmill does all of this automatically, based on file size and cluster parallelism.
Installation
pip install saltmill
PySpark is a peer dependency — saltmill works with whatever version your cluster runs.
Quick start
import saltmill
df = saltmill.read(spark, "s3://my-bucket/large.csv")
That's it. saltmill will:
- Detect file size via Hadoop FileSystem
- Auto-compute salt buckets and partition count
- Apply optimized Spark configs (shuffle partitions, maxPartitionBytes, Delta settings on Databricks)
- Infer schema from a 0.1% sample of the first file
- Read and return a well-partitioned DataFrame
Usage
Module-level function (simplest)
import saltmill
df = saltmill.read(spark, "s3://bucket/huge.csv")
Class-based (more control)
from saltmill import SaltMill
sm = SaltMill(spark, workers=32)
df = sm.read("s3://bucket/huge.csv")
Multiple files
df = saltmill.read(
spark,
["s3://bucket/2024-01.csv", "s3://bucket/2024-02.csv"],
hint_size_gb=500,
)
With explicit schema
df = saltmill.read(
spark,
"s3://bucket/sales.csv",
schema={
"order_id": "long",
"region": "string",
"amount": "double",
"created_at": "timestamp",
},
partition_col="region",
)
With a PySpark StructType
from pyspark.sql.types import StructType, StructField, LongType, StringType
schema = StructType([
StructField("id", LongType(), True),
StructField("name", StringType(), True),
])
df = saltmill.read(spark, "s3://bucket/data.csv", schema=schema)
Preview tuning parameters without reading
sm = SaltMill(spark)
params = sm.tune("s3://bucket/huge.csv", hint_size_gb=500)
print(params.summary())
# saltmill tuning → file: 500.0 GB, workers: 64, salt_buckets: 64,
# partitions: 640, maxPartitionBytes: 64 MB
Write to Delta Lake
sm = SaltMill(spark)
df = sm.read("s3://bucket/huge.csv", partition_col="region")
sm.write_delta(df, "s3://bucket/delta/sales", partition_by="region")
How it works
Salting
saltmill assigns each row a random bucket using:
df.withColumn("_salt", pmod(monotonically_increasing_id(), salt_buckets))
.repartition(num_partitions, partition_col, "_salt")
.drop("_salt")
This breaks data skew even when a join or group-by column is highly imbalanced.
Auto-tuning formula
| Input | Rule |
|---|---|
| File size | 1 salt bucket per 8 GB, rounded to nearest power of 2 |
| Salt buckets | Clamped to [8, 512] |
| Partitions | salt_buckets × 10, rounded up to nearest multiple of worker count |
| maxPartitionBytes | 64 MB (matches default HDFS block) |
Example: 500 GB file, 64 workers
file_size_gb = 500
salt_buckets = round_pow2(500 / 8) = round_pow2(62.5) = 64
num_partitions = 64 × 10 = 640 (already a multiple of 64)
shuffle_partitions = 640
maxPartitionBytes = 64 MB
This matches the pattern proven in production:
# What saltmill does internally
spark.conf.set("spark.sql.shuffle.partitions", 640)
spark.conf.set("spark.sql.files.maxPartitionBytes", 64 * 1024 * 1024)
dfw = (
df.withColumn("_salt", pmod(monotonically_increasing_id(), 64))
.repartition(640, "region", "_salt")
.drop("_salt")
)
Databricks-specific settings
When running on Databricks, saltmill also sets:
spark.databricks.delta.optimizeWrite.enabled = true
spark.databricks.delta.autoCompact.enabled = true
Schema dict shorthand
| Alias | Spark type |
|---|---|
"str", "string" |
StringType |
"int", "integer" |
IntegerType |
"long", "bigint" |
LongType |
"float" |
FloatType |
"double" |
DoubleType |
"bool", "boolean" |
BooleanType |
"date" |
DateType |
"timestamp" |
TimestampType |
"decimal" |
DecimalType(38,10) |
Any Spark SQL type string is also accepted directly (e.g. "decimal(10,2)").
API reference
saltmill.read(spark, paths, *, schema, partition_col, workers, salt_buckets, num_partitions, hint_size_gb, delimiter, encoding, null_value, verbose)
Module-level convenience function. See class docs for parameter details.
SaltMill(spark, *, workers, verbose)
| Parameter | Type | Default | Description |
|---|---|---|---|
spark |
SparkSession | required | Active session |
workers |
int | auto-detected | Worker node count |
verbose |
bool | True | Print tuning summary |
SaltMill.read(paths, *, schema, partition_col, salt_buckets, num_partitions, hint_size_gb, delimiter, encoding, null_value)
| Parameter | Type | Default | Description |
|---|---|---|---|
paths |
str or list | required | CSV path(s) |
schema |
StructType / dict / None | None | Schema or auto-infer |
partition_col |
str or list | None | Extra partition key(s) |
salt_buckets |
int | auto | Override salt bucket count |
num_partitions |
int | auto | Override total partitions |
hint_size_gb |
float | None | File size hint (when detection fails) |
delimiter |
str | "," |
CSV separator |
encoding |
str | "UTF-8" |
File encoding |
null_value |
str | "" |
Null string |
SaltMill.tune(paths, *, salt_buckets, num_partitions, hint_size_gb) → TuningParams
Returns computed tuning parameters without reading any data.
SaltMill.write_delta(df, path, *, partition_by, mode)
Writes a DataFrame to Delta Lake with Databricks-optimized settings.
Development
pip install -e ".[dev]"
pytest
License
Apache 2.0
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file saltmill_spark-0.2.0.tar.gz.
File metadata
- Download URL: saltmill_spark-0.2.0.tar.gz
- Upload date:
- Size: 21.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
cdf62af77e586f4688c71f4a3249580f0513bc961e587970542b4178cedd4bf8
|
|
| MD5 |
f438674d781dd57395e3ee025a4bf170
|
|
| BLAKE2b-256 |
c0b503d90a1562f8ce27e6a0dd2264c7f83c5e835c742b59533692e05b508f05
|
Provenance
The following attestation bundles were made for saltmill_spark-0.2.0.tar.gz:
Publisher:
publish.yml on Yuvaraj-Munirathinam/saltmill
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
saltmill_spark-0.2.0.tar.gz -
Subject digest:
cdf62af77e586f4688c71f4a3249580f0513bc961e587970542b4178cedd4bf8 - Sigstore transparency entry: 2020841983
- Sigstore integration time:
-
Permalink:
Yuvaraj-Munirathinam/saltmill@4d73c1df1a60b4b5491a32d34b876f7ed5227224 -
Branch / Tag:
refs/tags/v0.2.0 - Owner: https://github.com/Yuvaraj-Munirathinam
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@4d73c1df1a60b4b5491a32d34b876f7ed5227224 -
Trigger Event:
push
-
Statement type:
File details
Details for the file saltmill_spark-0.2.0-py3-none-any.whl.
File metadata
- Download URL: saltmill_spark-0.2.0-py3-none-any.whl
- Upload date:
- Size: 24.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
4e68ad1581cd9b303c67fb992b881dc08685ef33bf1931fab705c624b4ffa349
|
|
| MD5 |
eb386ba6cafc84399605d17d7da3a47e
|
|
| BLAKE2b-256 |
f9daa67b671792c0df10df05d516566b03e6b785aeb3920aa7bce7557458f7fd
|
Provenance
The following attestation bundles were made for saltmill_spark-0.2.0-py3-none-any.whl:
Publisher:
publish.yml on Yuvaraj-Munirathinam/saltmill
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
saltmill_spark-0.2.0-py3-none-any.whl -
Subject digest:
4e68ad1581cd9b303c67fb992b881dc08685ef33bf1931fab705c624b4ffa349 - Sigstore transparency entry: 2020842106
- Sigstore integration time:
-
Permalink:
Yuvaraj-Munirathinam/saltmill@4d73c1df1a60b4b5491a32d34b876f7ed5227224 -
Branch / Tag:
refs/tags/v0.2.0 - Owner: https://github.com/Yuvaraj-Munirathinam
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@4d73c1df1a60b4b5491a32d34b876f7ed5227224 -
Trigger Event:
push
-
Statement type: