# distributed-curator
Partition-aware MinHash LSH deduplication library for large-scale text data curation on Apache Spark.
## Headline
2.53 billion documents deduplicated on a 63-node EMR cluster for ~$750 using partition-aware MinHash LSH — zero shuffle during local deduplication.
## Quick Start

```bash
pip install distributed-curator
```

```python
from distributed_curator import partition_aware_deduplicate, get_jar_path
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.jars", get_jar_path()) \
    .getOrCreate()

# input_df must have a "doc_id" column and a text column
result = partition_aware_deduplicate(
    spark=spark,
    input_df=df,
    text_column="text",
    similarity_threshold=0.8,
    num_partitions=1000,
)

# result has original columns + "representative_id" and "is_duplicate"
unique_docs = result.filter(~result.is_duplicate)
```
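To see which documents anchor the largest duplicate clusters, you can aggregate on `representative_id`. A small follow-on sketch (assumes the output schema shown above):

```python
from pyspark.sql import functions as F

# Count cluster sizes per representative document and show the largest clusters
cluster_sizes = (
    result.groupBy("representative_id")
    .agg(F.count("*").alias("cluster_size"))
    .orderBy(F.desc("cluster_size"))
)
cluster_sizes.show(10)
```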
## How It Works
The pipeline assigns documents to partitions based on LSH band hashes so that similar documents are co-located. All comparisons happen locally within partitions with no shuffle, then a two-phase union-find merges components across partition boundaries.
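As a rough illustration of the partition-assignment idea (step 2 in the table below), here is a plain-Python sketch of LSH banding. The library implements this as a Scala UDF; the function name and hashing scheme here are illustrative only:

```python
import hashlib

def band_partition_ids(signature, num_bands, num_partitions):
    """Map a MinHash signature to one candidate partition per LSH band (sketch only).

    Documents that agree on any band hash to the same partition, so
    near-duplicates tend to be co-located before the local dedup step.
    """
    rows_per_band = len(signature) // num_bands
    partition_ids = []
    for b in range(num_bands):
        band = tuple(signature[b * rows_per_band:(b + 1) * rows_per_band])
        digest = hashlib.sha1(repr((b, band)).encode()).hexdigest()
        partition_ids.append(int(digest, 16) % num_partitions)
    return partition_ids
```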
| Step | What happens | Shuffle? |
|---|---|---|
| 1. MinHash | Cython/NumPy SIMD computes signatures via pandas_udf | No |
| 2. Partition assignment | Scala UDF maps LSH bands to partition IDs | No |
| 3. Identity repartition | Documents move to assigned partitions | Yes (once) |
| 4. Local dedup | Scala mapPartitions finds similar pairs within each partition | No |
| 5a. Local union-find | Scala partition-local connected components | No |
| 5b. Global union-find | Driver-side union-find with Eclipse Collections LongLongHashMap | No |
| 6. Mark duplicates | Join results back to input | Yes (once) |
For a detailed architecture walkthrough, see docs/architecture.md.
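Steps 5a and 5b resolve connected components with a two-phase union-find: each partition collapses its own similar pairs, then the driver merges the resulting (doc_id, representative) edges so components spanning partition boundaries end up with a single representative. The library does this in Scala with an Eclipse Collections `LongLongHashMap`; the Python sketch below only illustrates the idea and is not the actual implementation:

```python
class UnionFind:
    """Minimal union-find for illustration (not the library's Scala implementation)."""

    def __init__(self):
        self.parent = {}

    def find(self, x):
        self.parent.setdefault(x, x)
        while self.parent[x] != x:
            self.parent[x] = self.parent[self.parent[x]]  # path halving
            x = self.parent[x]
        return x

    def union(self, a, b):
        ra, rb = self.find(a), self.find(b)
        if ra != rb:
            # Keep the smaller id as the representative, mirroring a
            # "representative_id"-style output.
            if rb < ra:
                ra, rb = rb, ra
            self.parent[rb] = ra


def merge_partition_components(edges):
    """Phase 2 (driver side): merge (doc_id, local_representative) edges
    emitted by each partition's local union-find in phase 1."""
    uf = UnionFind()
    for doc_id, rep_id in edges:
        uf.union(doc_id, rep_id)
    return {doc: uf.find(doc) for doc in uf.parent}
```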
## Benchmark Results

All benchmarks were run on Common Crawl WET files (CC-MAIN-2024-22) with `similarity_threshold=0.9`, `num_hashes=64`, `num_bands=8`.

| Scale | Documents | Duplicates | Duplicate rate | Cluster | Wall time | Cost |
|---|---|---|---|---|---|---|
| 9K WET files | 253M | 55M | 21.82% | 9× r5ad.8xlarge | ~1.5 hr | ~$50 |
| 90K WET files | 2.53B | 827.7M | 32.76% | 63× r6gd.8xlarge | ~4.5 hr | ~$750 |
## Configuration

```python
partition_aware_deduplicate(
    spark,                      # SparkSession
    input_df,                   # DataFrame with doc_id + text columns
    text_column="text",         # name of the text column
    similarity_threshold=0.8,   # Jaccard similarity threshold (0.0–1.0)
    num_hashes=64,              # MinHash signature length
    num_bands=8,                # LSH bands (num_hashes must be divisible by num_bands)
    num_partitions=1000,        # number of partitions for dedup
    ngram=9,                    # character n-gram size for shingling
    checkpoint_path=None,       # [optional] S3/HDFS path to cache intermediate results
    enable_diagnostics=False,   # [optional] enable driver memory logging and heap capture
)
```
**When to tune:**

- `similarity_threshold`: lower catches more near-duplicates, higher is stricter. 0.8–0.9 is typical for web text.
- `num_partitions`: set to `spark.sql.shuffle.partitions`. More partitions = less memory per partition but more overhead.
- `num_bands` / `num_hashes`: controls the LSH probability curve. More bands = higher recall but more comparisons. `num_hashes` must be divisible by `num_bands`. See the sketch after this list.
- `checkpoint_path`: recommended for runs over 1K WET files. Saves MinHash signatures so you don't recompute on retry.
- `ngram`: 9 works well for English web text. Shorter n-grams increase recall but reduce precision.
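As a rough guide for choosing `num_bands` and `num_hashes`, the standard MinHash LSH estimate of the probability that two documents with Jaccard similarity `s` become a candidate pair is `1 - (1 - s^r)^b`, where `b` is the number of bands and `r = num_hashes / num_bands` rows per band. A small standalone sketch (not part of the library) for evaluating candidate settings:

```python
def lsh_detection_probability(s, num_hashes=64, num_bands=8):
    """Probability that two documents with Jaccard similarity s share at
    least one identical LSH band (standard MinHash LSH estimate)."""
    rows_per_band = num_hashes // num_bands
    return 1.0 - (1.0 - s ** rows_per_band) ** num_bands

# With the defaults (64 hashes, 8 bands => 8 rows per band), pairs at
# similarity 0.9 become candidates ~99% of the time, while pairs at 0.5
# become candidates only ~3% of the time.
for s in (0.5, 0.7, 0.8, 0.9, 0.95):
    print(f"s={s:.2f} -> p={lsh_detection_probability(s):.3f}")
```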
## Version Compatibility
| Component | Supported | Planned |
|---|---|---|
| PySpark | 3.5.x | 4.0+ |
| Python | 3.9, 3.10, 3.11, 3.12 | |
| Scala | 2.12 | |
| AWS EMR | 7.5–7.12 (Spark 3.5) | |
| Java | 8, 11, 17 (runtime) | |
The Scala UDF JAR is compiled against Spark 3.5. Spark 4.0 support requires recompilation due to breaking API changes in StructType.
## Documentation
- Architecture — detailed pipeline walkthrough, design decisions, and performance characteristics
- EMR Deployment — Terraform setup, bootstrap configuration, and spark-submit examples
- Contributing — development setup, running tests, and how to submit changes
## License
Apache License 2.0 — see LICENSE for details.