
The Merge Algebra Toolkit – composable, streaming, verified CRDT merge for datasets


🔀 crdt-merge

Conflict-free merge, dedup & sync for DataFrames, JSON and datasets – powered by CRDTs

PyPI · Python 3.9+ · Apache-2.0 License · Tests: 133 passed

Merge any two datasets in one function call. No conflicts. No coordination. No data loss.

Quick Start • What's New in v0.3.0 • Strategies • Streaming • Benchmarks • API Reference • All Languages


🌍 Available in Every Language

| Language | Package | Install | Repo |
|---|---|---|---|
| Python 🐍 | crdt-merge | pip install crdt-merge | You are here |
| TypeScript | crdt-merge | npm install crdt-merge | crdt-merge-ts |
| Rust 🦀 | crdt-merge | cargo add crdt-merge | crdt-merge-rs |
| CLI 🖥️ | included in Rust | cargo install crdt-merge | crdt-merge-rs |
| Java ☕ | io.optitransfer:crdt-merge | Maven / Gradle | crdt-merge-java |

🤗 Try it in the browser →


🆕 What's New in v0.3.0

"The Schema Release" โ€” from simple merge to composable merge toolkit

| Feature | What It Does | Why It Matters |
|---|---|---|
| 🎯 Composable Merge Strategies | Define per-column merge rules (LWW, MaxWins, UnionSet, Priority…) | True CRDT commutativity – order never matters |
| 🌊 Streaming Merge | O(batch_size) memory instead of O(n) | Merge 1M+ rows in 3 MB of RAM |
| 🔬 Verification Engine | Prove CRDT guarantees hold on YOUR data | Trust, don't hope (staging for v0.4.0) |
| 📊 Delta Sync | Only sync what changed since last merge | 95%+ bandwidth savings (staging for v0.5.0) |

100% backward compatible – all existing merge(), dedup(), diff() APIs work exactly as before.

📋 Version History

| Version | Codename | Highlights |
|---|---|---|
| v0.3.0 | The Schema Release | Composable strategies, streaming merge, 1M+ row scale |
| v0.2.0 | IP Protection | Apache-2.0 license, patent protection, all 4 languages |
| v0.1.0 | Launch | Core merge, dedup, diff, CRDT types, 5 modules |

🎯 The Problem

You have two versions of a dataset. Maybe two crawlers ran in parallel. Maybe two annotators edited the same file. Maybe you're merging community contributions.

Today: Write custom merge scripts, lose data, or block on a coordinator.

With crdt-merge: One function call. Zero conflicts. Mathematically guaranteed.

from crdt_merge import merge

merged = merge(df_a, df_b, key="id")  # done.

⚡ Quick Start

pip install crdt-merge                     # zero dependencies (pure Python)
pip install crdt-merge[pandas]             # with pandas support
pip install crdt-merge[datasets]           # with HuggingFace Datasets support
pip install crdt-merge[fast]               # with orjson + xxhash for max speed
pip install crdt-merge[all]                # everything

Merge Two DataFrames

import pandas as pd
from crdt_merge import merge

# Two contributors edited the same dataset
df_a = pd.DataFrame({"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"]})
df_b = pd.DataFrame({"id": [2, 3, 4], "name": ["Robert", "Charlie", "Diana"]})

merged = merge(df_a, df_b, key="id")
# id=1: Alice (only in A)
# id=2: Robert (B wins – latest)
# id=3: Charlie (same in both)
# id=4: Diana (only in B)

Merge Two HuggingFace Datasets

from crdt_merge import merge_datasets

merged = merge_datasets("user/dataset-v1", "user/dataset-v2", key="id")

Deduplicate Anything

from crdt_merge import dedup

texts = ["Hello world", "hello  world", "HELLO WORLD", "Something else"]
unique, duplicate_indices = dedup(texts)
# unique = ["Hello world", "Something else"]  (case/whitespace normalized)

Deep-Merge JSON/Configs

from crdt_merge import merge_dicts

config_a = {"model": {"name": "bert", "layers": 12}, "tags": ["nlp"]}
config_b = {"model": {"name": "bert-large", "dropout": 0.1}, "tags": ["qa"]}

merged = merge_dicts(config_a, config_b)
# {"model": {"name": "bert-large", "layers": 12, "dropout": 0.1}, "tags": ["nlp", "qa"]}

See What Changed

from crdt_merge import diff

changes = diff(df_old, df_new, key="id")
print(changes["summary"])
# "+5 added, -2 removed, ~3 modified, =990 unchanged"

🎯 Composable Merge Strategies

New in v0.3.0 – define exactly how each column resolves conflicts

Instead of "B always wins", define per-column merge rules:

from crdt_merge.strategies import MergeSchema, LWW, MaxWins, MinWins, UnionSet, Priority

schema = MergeSchema(
    default=LWW(),                                    # newest timestamp wins (default)
    score=MaxWins(),                                   # highest score always wins
    priority=MinWins(),                                # lowest priority number wins
    tags=UnionSet(separator=","),                       # merge comma-separated sets
    status=Priority(["draft", "review", "published"]), # defined progression
)

# Resolve a single row conflict
merged_row = schema.resolve_row(row_a, row_b, timestamp_col="_ts")

# Resolve entire datasets
merged = schema.resolve_all(dataset_a, dataset_b, key="_id", timestamp_col="_ts")

8 Built-in Strategies

| Strategy | What It Does | Example |
|---|---|---|
| LWW() | Latest timestamp wins | Name, email – last edit wins |
| MaxWins() | Highest value wins | Scores, ratings, counters |
| MinWins() | Lowest value wins | Priority numbers, prices |
| UnionSet(sep) | Merge delimited sets | "a,b" + "b,c" → "a,b,c" |
| Priority(order) | Defined progression | draft → review → published |
| Concat(sep) | Concatenate + dedup | Append notes, merge logs |
| LongestWins() | Longest string wins | Descriptions, bios |
| Custom(fn) | Your function | Any custom logic |

Why this matters: LWW().resolve() is truly commutative – resolve(A, B) == resolve(B, A) regardless of argument order. This is the mathematically correct CRDT merge path.
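
A quick sketch of that check, assuming resolve() takes the same (a, b, ts_a, ts_b) arguments as the Custom(fn) signature listed in the API reference below:

from crdt_merge.strategies import LWW

lww = LWW()
a, ts_a = "alice@old.example", 100   # older write
b, ts_b = "alice@new.example", 200   # newer write

# Same winner no matter which side goes first
assert lww.resolve(a, b, ts_a, ts_b) == lww.resolve(b, a, ts_b, ts_a)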


🌊 Streaming Merge

New in v0.3.0 – merge datasets larger than your RAM

from crdt_merge.streaming import merge_stream, merge_sorted_stream

# Unsorted data – batched merge with O(batch_size) memory
for batch in merge_stream(large_a, large_b, key="_id", batch_size=5000):
    write_batch(batch)

# Pre-sorted data – true streaming, constant memory
# (read_csv_streaming and write_batch stand in for your own I/O)
def sorted_gen_a():
    for row in read_csv_streaming("a.csv"):
        yield row

def sorted_gen_b():
    for row in read_csv_streaming("b.csv"):
        yield row

for batch in merge_sorted_stream(sorted_gen_a(), sorted_gen_b(), key="_id"):
    write_batch(batch)  # each batch is batch_size rows, memory stays flat

Memory Model

| Approach | Memory | Scale Limit |
|---|---|---|
| merge() | O(n) – grows with data | ~2M rows (8 GB RAM) |
| merge_stream() | O(batch_size) – configurable | Unlimited (disk-bound) |
| merge_sorted_stream() | O(batch_size) – constant | Unlimited (disk-bound) |

At 1M rows, merge_sorted_stream uses 3 MB regardless of input size. The classic merge() would need 688 MB.


🧠 Why CRDTs

CRDT = Conflict-free Replicated Data Type. A data structure with one mathematical superpower:

Any two copies can merge, in any order, at any time, and the result is always identical and always correct.

Three mathematical guarantees (proven, not hoped):

| Property | What it means |
|---|---|
| Commutative | merge(A, B) == merge(B, A) – order doesn't matter |
| Associative | merge(merge(A, B), C) == merge(A, merge(B, C)) – grouping doesn't matter |
| Idempotent | merge(A, A) == A – re-merging is safe |

This means: zero coordination, zero locks, zero conflicts. Two workers can independently edit a dataset and merge later – the result is mathematically guaranteed correct.

Note: The core merge() function uses "B-wins" overlay semantics (like git merge remote). For true order-independent commutativity, use MergeSchema with LWW() โ€” see Composable Merge Strategies.
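
A sketch of the contrast, reusing the Quick Start DataFrames plus an illustrative _ts timestamp column:

from crdt_merge import merge
from crdt_merge.strategies import MergeSchema, LWW

# Overlay semantics: swapping arguments changes who wins conflicts
overlay_ab = merge(df_a, df_b, key="id")  # conflicting rows take df_b's values
overlay_ba = merge(df_b, df_a, key="id")  # conflicting rows take df_a's values

# LWW semantics: timestamps decide, so argument order is irrelevant
schema = MergeSchema(default=LWW())
lww_ab = schema.resolve_all(df_a, df_b, key="id", timestamp_col="_ts")
lww_ba = schema.resolve_all(df_b, df_a, key="id", timestamp_col="_ts")  # same result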

Built-in CRDT Types

| Type | Use Case | Example |
|---|---|---|
| GCounter | Grow-only counters | Download counts, page views |
| PNCounter | Increment + decrement | Stock levels, balances |
| LWWRegister | Single value (latest wins) | Name, email, status fields |
| ORSet | Add/remove set | Tags, memberships, dedup sets |
| LWWMap | Key-value map | Row merges, config objects |
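
As a behavioral sketch only – the node_id argument and the increment()/merge()/value names below are assumptions about the API, not documented on this page:

from crdt_merge import GCounter

# Two replicas count independently, with no coordination
a = GCounter(node_id="worker-a")   # constructor argument is an assumption
b = GCounter(node_id="worker-b")
a.increment(3)
b.increment(5)

a.merge(b)           # CRDT merge: commutative, associative, idempotent
assert a.value == 8  # hypothetical accessor for the converged total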

📊 Benchmarks

Core Operations (v0.2.0 baseline)

Tested on real data (rotten_tomatoes dataset, 8,530 rows):

| Operation | Size | Time | Throughput |
|---|---|---|---|
| DataFrame merge | 1K + 1K → 1.5K | 3.6 ms | 411K rows/sec |
| DataFrame merge | 10K + 10K → 15K | 42.6 ms | 352K rows/sec |
| DataFrame merge | 50K + 50K → 75K | 234 ms | 320K rows/sec |
| Exact dedup | 9K texts | 76 ms | 118K texts/sec |
| GCounter ops | 100K increments | – | 8.6M ops/sec |
| OR-Set ops | 10K adds | – | 250K+ ops/sec |

v0.3.0 Stress Results (224 measurements)

Tested in sandbox (1.9 GB RAM):

| Module | Scale | Throughput | Memory | Notes |
|---|---|---|---|---|
| Core merge | 500K + 500K | 57K rows/sec | 688 MB | Linear scaling, predictable |
| Core merge | 2M rows | 55K rows/sec | ~1.5 GB | Sandbox ceiling |
| Strategies | 1M rows | 58K rows/sec | Proportional | All 8 strategies verified |
| merge_stream | 1M rows | 115K rows/sec | O(batch) | 2× faster than core |
| merge_sorted_stream | 1M rows | 200K rows/sec | 3 MB flat | ⭐ Memory constant at any scale |
| Delta compute | 500K rows | 953K rows/sec | Minimal | Near 1M/sec – sync is instant |
| Delta apply | 500K → 500K | 253K rows/sec | Proportional | Full round-trip |
| CRDT verification | 500 trials | All pass | – | Commutativity, associativity, idempotency ✅ |

📓 Run A100 Stress Tests → push past sandbox limits with 80 GB GPU RAM

📋 Full stress reports

Zero dependencies. Pure Python. Works offline. Works everywhere.


📖 API Reference

Core (since v0.1.0)

merge(df_a, df_b, key=None, timestamp_col=None, prefer="latest", dedup=True)

Merge two DataFrames (pandas, polars, or list of dicts).

  • key: Column to match rows. None = append + dedup.
  • timestamp_col: Column with timestamps for conflict resolution.
  • prefer: "latest" (B wins), "a", or "b".
  • dedup: Remove exact duplicate rows.
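
Putting those parameters together (the updated_at column name is illustrative):

from crdt_merge import merge

# Newest row per key wins, using an explicit timestamp column
merged = merge(df_a, df_b, key="id", timestamp_col="updated_at", prefer="latest")

# Or force side A to win every conflict, keeping duplicate removal on
merged_a_wins = merge(df_a, df_b, key="id", prefer="a", dedup=True)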

dedup(items, method="exact", threshold=0.85)

Deduplicate a list of strings. Returns (unique_items, duplicate_indices).

  • method: "exact" or "fuzzy" (bigram similarity).
  • threshold: Similarity threshold for fuzzy dedup (0.0-1.0).
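
For example, fuzzy mode catches near-duplicates that exact matching would keep (the exact grouping depends on the threshold):

from crdt_merge import dedup

texts = ["The quick brown fox", "The quick brown fox!!", "Totally different text"]
unique, duplicate_indices = dedup(texts, method="fuzzy", threshold=0.85)
# expected: the second string collapses into the first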

diff(df_a, df_b, key)

Show what changed between two DataFrames. Returns added, removed, modified, unchanged counts.

merge_dicts(a, b, timestamps_a=None, timestamps_b=None)

Deep-merge two dicts with CRDT semantics. Nested dicts recurse, lists concatenate + dedup.
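
A hedged sketch with explicit timestamps; the per-key dict shape for timestamps_a/timestamps_b is an assumption, not documented here:

from crdt_merge import merge_dicts

config_a = {"lr": 0.001, "epochs": 10}
config_b = {"lr": 0.0005}

merged = merge_dicts(
    config_a, config_b,
    timestamps_a={"lr": 100, "epochs": 100},  # assumed shape: key -> timestamp
    timestamps_b={"lr": 200},
)
# "lr" should come from config_b (newer timestamp); "epochs" survives from config_a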

merge_datasets(dataset_a, dataset_b, key=None, ...)

Merge two HuggingFace Dataset objects or dataset names. Requires pip install crdt-merge[datasets].

dedup_dataset(dataset, columns=None, method="exact", threshold=0.85)

Deduplicate a HuggingFace Dataset. Requires pip install crdt-merge[datasets].

DedupIndex(node_id)

Distributed dedup index backed by CRDT OR-Set. Multiple workers build indices independently, then merge.
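
A workflow sketch; only the DedupIndex(node_id) constructor is documented above, so the add() and merge() method names are hypothetical:

from crdt_merge import DedupIndex

idx_a = DedupIndex(node_id="worker-a")
idx_b = DedupIndex(node_id="worker-b")

for text in ["doc one", "doc two"]:
    idx_a.add(text)    # hypothetical method name
for text in ["doc two", "doc three"]:
    idx_b.add(text)

idx_a.merge(idx_b)     # OR-Set merge: commutative, so merge order doesn't matter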

MinHashDedup(num_hashes=128, threshold=0.5)

Locality-sensitive hashing for O(n) near-duplicate detection at scale.

Strategies (since v0.3.0)

MergeSchema(default=LWW(), **column_strategies)

Define per-column merge strategies. Apply to individual rows or entire datasets.

schema = MergeSchema(default=LWW(), score=MaxWins(), tags=UnionSet(","))
merged_row = schema.resolve_row(row_a, row_b, timestamp_col="_ts")
merged_all = schema.resolve_all(dataset_a, dataset_b, key="_id", timestamp_col="_ts")

Strategy Classes

| Class | Constructor | Resolve Semantics |
|---|---|---|
| LWW() | No args | Newest timestamp wins |
| MaxWins() | No args | max(a, b) |
| MinWins() | No args | min(a, b) |
| UnionSet(separator) | "," or any delimiter | Set union of delimited values |
| Priority(order) | ["draft", "published"] | Later in list wins |
| Concat(separator) | " " or any delimiter | Concatenate + dedup tokens |
| LongestWins() | No args | Longest string wins |
| Custom(fn) | fn(a, b, ts_a, ts_b) → value | Your function |
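
Because Custom(fn) accepts any fn(a, b, ts_a, ts_b) -> value, a new strategy is just a function. Here is a sketch of a prefer-non-null rule (the email column is illustrative):

from crdt_merge.strategies import MergeSchema, LWW, Custom

def prefer_non_null(a, b, ts_a, ts_b):
    # Keep whichever side has a value; tie-break by newest timestamp
    if a is None:
        return b
    if b is None:
        return a
    return b if (ts_b or 0) >= (ts_a or 0) else a

schema = MergeSchema(default=LWW(), email=Custom(prefer_non_null))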

Streaming (since v0.3.0)

merge_stream(a, b, key="_id", batch_size=5000)

Batched merge for large datasets. Yields lists of merged rows. O(batch_size) memory.

merge_sorted_stream(a_iter, b_iter, key="_id", batch_size=5000)

True streaming merge for pre-sorted generators. Constant memory regardless of input size.

StreamStats

Attached to stream results via .stats – tracks batches_emitted, rows_emitted, peak_memory_mb, elapsed_sec.
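
A usage sketch, assuming .stats hangs off the object returned by merge_stream (the page doesn't say exactly where the attribute lives):

from crdt_merge.streaming import merge_stream

stream = merge_stream(large_a, large_b, key="_id", batch_size=5000)
for batch in stream:
    write_batch(batch)  # placeholder sink, as in the streaming examples above

s = stream.stats        # assumption: stats attribute on the stream object
print(s.batches_emitted, s.rows_emitted, s.peak_memory_mb, s.elapsed_sec)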


๐Ÿ—๏ธ Use Cases

  • Dataset curation: Multiple annotators edit simultaneously – merge without conflicts
  • Parallel crawlers: Two crawlers produce overlapping data – merge + dedup automatically
  • Model training: Merge training logs, configs, and metrics from distributed runs
  • Community datasets: Accept contributions from multiple forks without merge conflicts
  • Data pipelines: Incremental processing with automatic state reconciliation
  • Offline-first apps: Sync data between devices that were offline for days
  • Edge computing: Stream-merge sensor data from thousands of IoT nodes in constant memory
  • Multi-tenant SaaS: Per-column merge strategies let different teams own different fields

🤝 Contributing

PRs welcome! Run tests with:

pip install -e ".[dev]"
pytest tests/ -v

📄 License

Licensed under the Apache License, Version 2.0.

Contributing? By opening a pull request, you agree to our Contributor License Agreement.

Copyright 2024–2026 Ryan Gillespie / Optitransfer. See NOTICE for attribution requirements.

For commercial licensing inquiries: rgillespie83@icloud.com, data@optitransfer.ch


Built with math, not hope. 🧬

โญ Star on GitHub โ€ข ๐Ÿค— Try on HuggingFace โ€ข ๐Ÿ“ฆ PyPI

