
distributed-curator

Partition-aware MinHash LSH deduplication library for large-scale text data curation on Apache Spark.

Headline

2.53 billion documents deduplicated on a 63-node EMR cluster for ~$500 using partition-aware MinHash LSH — zero shuffle during local deduplication.

Quick Start

pip install distributed-curator

from distributed_curator import partition_aware_deduplicate, get_jar_path
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.jars", get_jar_path()) \
    .getOrCreate()

# input_df must have a "doc_id" column and a text column
result = partition_aware_deduplicate(
    spark=spark,
    input_df=df,
    text_column="text",
    similarity_threshold=0.8,
    num_partitions=1000,
)

# result has original columns + "representative_id" and "is_duplicate"
unique_docs = result.filter(~result.is_duplicate)

How It Works

The pipeline assigns documents to partitions based on LSH band hashes so that similar documents are co-located. All comparisons happen locally within partitions with no shuffle, then a two-phase union-find merges components across partition boundaries.

Step                     What happens                                                      Shuffle?
1.  MinHash              Cython/NumPy SIMD computes signatures via pandas_udf              No
2.  Partition assignment Scala UDF maps LSH bands to partition IDs                         No
3.  Identity repartition Documents move to their assigned partitions                       Yes (once)
4.  Local dedup          Scala mapPartitions finds similar pairs within each partition     No
5a. Local union-find     Partition-local connected components in Scala                     No
5b. Global union-find    Driver-side union-find with Eclipse Collections LongLongHashMap   No
6.  Mark duplicates      Join results back to the input                                    Yes (once)
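The core idea behind steps 1–2 can be sketched in plain Python. This is an illustrative toy, not the library's Cython/Scala implementation: the hash family (seeded BLAKE2b) and the band-to-partition mapping are assumptions for demonstration only.

```python
import hashlib


def _seeded_hash(seed, shingle):
    # 64-bit hash of a shingle under a given seed; a stand-in for the
    # library's hash family.
    digest = hashlib.blake2b(f"{seed}:{shingle}".encode(), digest_size=8).digest()
    return int.from_bytes(digest, "big")


def minhash_signature(text, num_hashes=64, ngram=9):
    # Shingle the text into character n-grams, then take the minimum
    # hash value per seed -- the classic MinHash signature.
    shingles = {text[i:i + ngram] for i in range(max(1, len(text) - ngram + 1))}
    return [min(_seeded_hash(seed, s) for s in shingles) for seed in range(num_hashes)]


def band_partition_ids(sig, num_bands=8, num_partitions=1000):
    # Split the signature into bands and hash each band to a partition ID.
    # Documents that agree on any band map to a shared partition, so
    # similar documents end up co-located.
    rows = len(sig) // num_bands
    return [hash(tuple(sig[b * rows:(b + 1) * rows])) % num_partitions
            for b in range(num_bands)]
```

Identical documents produce identical signatures and therefore identical band partitions, which is why all of their comparisons can happen locally after the single identity repartition.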

For a detailed architecture walkthrough, see docs/architecture.md.
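The two-phase merge in steps 5a–5b can be illustrated with a minimal union-find. This is a sketch with made-up data: the real implementation runs phase 1 inside Scala partitions and phase 2 on the driver with a primitive Eclipse Collections LongLongHashMap, not a Python dict.

```python
class UnionFind:
    def __init__(self):
        self.parent = {}

    def find(self, x):
        self.parent.setdefault(x, x)
        while self.parent[x] != x:
            self.parent[x] = self.parent[self.parent[x]]  # path halving
            x = self.parent[x]
        return x

    def union(self, a, b):
        ra, rb = self.find(a), self.find(b)
        if ra != rb:
            # Smallest doc ID wins, analogous to a representative_id.
            self.parent[max(ra, rb)] = min(ra, rb)


# Phase 1 (per partition): local union-find emits (doc_id, local_root) pairs.
partition_edges = [
    [(2, 1), (3, 1)],   # partition A linked docs 1, 2, 3
    [(5, 3)],           # partition B linked docs 3, 5 (doc 3 spans both)
]

# Phase 2 (driver): union all pairs; components that span partition
# boundaries merge because they share a document.
uf = UnionFind()
for edges in partition_edges:
    for doc, root in edges:
        uf.union(doc, root)

print(sorted({d: uf.find(d) for d in [1, 2, 3, 5]}.items()))
# → [(1, 1), (2, 1), (3, 1), (5, 1)]
```

Doc 3 appears in both partitions, so the driver-side pass collapses the two local components into one global component rooted at doc 1.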

Benchmark Results

All benchmarks run on Common Crawl WET files (CC-MAIN-2024-22) with similarity_threshold=0.9, num_hashes=64, num_bands=8.

Scale     Documents   Duplicates   Rate     Cluster             Time      Cost
9K WET    253M        55M          21.82%   9× r5ad.8xlarge     ~1.5 hr   ~$50
90K WET   2.53B       827.7M       32.76%   63× r6gd.8xlarge    ~4.5 hr   ~$500

Configuration

partition_aware_deduplicate(
    spark,                          # SparkSession
    input_df,                       # DataFrame with doc_id + text columns
    text_column="text",             # name of the text column
    similarity_threshold=0.8,       # Jaccard similarity threshold (0.0–1.0)
    num_hashes=64,                  # MinHash signature length
    num_bands=8,                    # LSH bands (num_hashes must be divisible by num_bands)
    num_partitions=1000,            # number of partitions for dedup
    ngram=9,                        # character n-gram size for shingling
    checkpoint_path=None,           # [optional] S3/HDFS path to cache intermediate results
    enable_diagnostics=False,       # [optional] enable driver memory logging and heap capture
)

When to tune:

  • similarity_threshold: lower values catch more near-duplicates; higher values are stricter. 0.8–0.9 is typical for web text.
  • num_partitions: set to match spark.sql.shuffle.partitions. More partitions mean less memory per partition but more scheduling overhead.
  • num_bands / num_hashes: control the LSH probability curve. More bands raise recall but generate more candidate comparisons. num_hashes must be divisible by num_bands.
  • checkpoint_path: recommended for runs over 1K WET files; saves MinHash signatures so they are not recomputed on retry.
  • ngram: 9 works well for English web text. Shorter n-grams increase recall but reduce precision.
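To see how num_bands and num_hashes shape the probability curve, the standard LSH candidate probability can be evaluated directly. This is the generic S-curve formula for banded MinHash, not a function exported by this library:

```python
def candidate_probability(s, num_hashes=64, num_bands=8):
    """Probability that two documents with Jaccard similarity s share at
    least one LSH band: 1 - (1 - s^r)^b, where r = rows per band."""
    rows = num_hashes // num_bands
    return 1.0 - (1.0 - s ** rows) ** num_bands


# With the defaults (64 hashes, 8 bands, 8 rows per band), the curve is
# steep around s ≈ 0.8: dissimilar pairs are rarely compared while pairs
# above the threshold are almost always candidates.
for s in (0.5, 0.7, 0.8, 0.9, 0.95):
    print(f"s={s:.2f}  P(candidate)={candidate_probability(s):.3f}")
```

Plugging in your own num_bands and num_hashes before a large run is a cheap way to check that the curve's steep region sits near your similarity_threshold.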

Version Compatibility

Component   Supported                 Planned
PySpark     3.5.x                     4.0+
Python      3.9, 3.10, 3.11, 3.12
Scala       2.12
AWS EMR     7.5–7.12 (Spark 3.5)
Java        8, 11, 17 (runtime)

The Scala UDF JAR is compiled against Spark 3.5. Spark 4.0 support requires recompilation due to breaking API changes in StructType.

Documentation

  • Architecture — detailed pipeline walkthrough, design decisions, and performance characteristics
  • EMR Deployment — Terraform setup, bootstrap configuration, and spark-submit examples
  • Contributing — development setup, running tests, and how to submit changes

License

Apache License 2.0 — see LICENSE for details.
