The Merge Algebra Toolkit – composable, streaming, verified CRDT merge for datasets
crdt-merge
Conflict-free merge, dedup & sync for DataFrames, JSON and datasets – powered by CRDTs
Merge any two datasets in one function call. No conflicts. No coordination. No data loss.
Quick Start • What's New in v0.3.0 • Strategies • Streaming • Benchmarks • API Reference • All Languages
Available in Every Language
| Language | Package | Install | Repo |
|---|---|---|---|
| Python | `crdt-merge` | `pip install crdt-merge` | You are here |
| TypeScript | `crdt-merge` | `npm install crdt-merge` | crdt-merge-ts |
| Rust | `crdt-merge` | `cargo add crdt-merge` | crdt-merge-rs |
| Java | `io.optitransfer:crdt-merge` | Maven / Gradle | crdt-merge-java |
| CLI | included in Rust | `cargo install crdt-merge` | crdt-merge-rs |
What's New in v0.3.0
"The Schema Release" – from simple merge to composable merge toolkit
| Feature | What It Does | Why It Matters |
|---|---|---|
| Composable Merge Strategies | Define per-column merge rules (LWW, MaxWins, UnionSet, Priority…) | True CRDT commutativity – order never matters |
| Streaming Merge | O(batch_size) memory instead of O(n) | Merge 1M+ rows in 3 MB of RAM |
| Verification Engine | Prove CRDT guarantees hold on YOUR data | Trust, don't hope (staging for v0.4.0) |
| Delta Sync | Only sync what changed since last merge | 95%+ bandwidth savings (staging for v0.5.0) |
100% backward compatible – all existing `merge()`, `dedup()`, `diff()` APIs work exactly as before.
Version History
| Version | Codename | Highlights |
|---|---|---|
| v0.3.0 | The Schema Release | Composable strategies, streaming merge, 1M+ row scale |
| v0.2.0 | IP Protection | Apache-2.0 license, patent protection, all 4 languages |
| v0.1.0 | Launch | Core merge, dedup, diff, CRDT types, 5 modules |
The Problem
You have two versions of a dataset. Maybe two crawlers ran in parallel. Maybe two annotators edited the same file. Maybe you're merging community contributions.
Today: Write custom merge scripts, lose data, or block on a coordinator.
With crdt-merge: One function call. Zero conflicts. Mathematically guaranteed.
```python
from crdt_merge import merge

merged = merge(df_a, df_b, key="id")  # done.
```
Quick Start
```bash
pip install crdt-merge            # zero dependencies (pure Python)
pip install crdt-merge[pandas]    # with pandas support
pip install crdt-merge[datasets]  # with HuggingFace Datasets support
pip install crdt-merge[fast]      # with orjson + xxhash for max speed
pip install crdt-merge[all]       # everything
```
Merge Two DataFrames
```python
import pandas as pd
from crdt_merge import merge

# Two contributors edited the same dataset
df_a = pd.DataFrame({"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"]})
df_b = pd.DataFrame({"id": [2, 3, 4], "name": ["Robert", "Charlie", "Diana"]})

merged = merge(df_a, df_b, key="id")
# id=1: Alice   (only in A)
# id=2: Robert  (B wins: latest)
# id=3: Charlie (same in both)
# id=4: Diana   (only in B)
```
Merge Two HuggingFace Datasets
```python
from crdt_merge import merge_datasets

merged = merge_datasets("user/dataset-v1", "user/dataset-v2", key="id")
```
Deduplicate Anything
```python
from crdt_merge import dedup

texts = ["Hello world", "hello world", "HELLO WORLD", "Something else"]
unique, duplicate_indices = dedup(texts)
# unique = ["Hello world", "Something else"] (case/whitespace normalized)
```
Deep-Merge JSON/Configs
```python
from crdt_merge import merge_dicts

config_a = {"model": {"name": "bert", "layers": 12}, "tags": ["nlp"]}
config_b = {"model": {"name": "bert-large", "dropout": 0.1}, "tags": ["qa"]}

merged = merge_dicts(config_a, config_b)
# {"model": {"name": "bert-large", "layers": 12, "dropout": 0.1}, "tags": ["nlp", "qa"]}
```
See What Changed
```python
from crdt_merge import diff

changes = diff(df_old, df_new, key="id")
print(changes["summary"])
# "+5 added, -2 removed, ~3 modified, =990 unchanged"
```
Composable Merge Strategies
New in v0.3.0 – define exactly how each column resolves conflicts
Instead of "B always wins", define per-column merge rules:
```python
from crdt_merge.strategies import MergeSchema, LWW, MaxWins, MinWins, UnionSet, Priority

schema = MergeSchema(
    default=LWW(),                 # newest timestamp wins (default)
    score=MaxWins(),               # highest score always wins
    priority=MinWins(),            # lowest priority number wins
    tags=UnionSet(separator=","),  # merge comma-separated sets
    status=Priority(["draft", "review", "published"]),  # defined progression
)

# Resolve a single row conflict
merged_row = schema.resolve_row(row_a, row_b, timestamp_col="_ts")

# Resolve entire datasets
merged = schema.resolve_all(dataset_a, dataset_b, key="_id", timestamp_col="_ts")
```
8 Built-in Strategies
| Strategy | What It Does | Example |
|---|---|---|
| `LWW()` | Latest timestamp wins | Name, email – last edit wins |
| `MaxWins()` | Highest value wins | Scores, ratings, counters |
| `MinWins()` | Lowest value wins | Priority numbers, prices |
| `UnionSet(sep)` | Merge delimited sets | "a,b" + "b,c" → "a,b,c" |
| `Priority(order)` | Defined progression | draft → review → published |
| `Concat(sep)` | Concatenate + dedup | Append notes, merge logs |
| `LongestWins()` | Longest string wins | Descriptions, bios |
| `Custom(fn)` | Your function | Any custom logic |
Why this matters: `LWW().resolve()` is truly commutative – `resolve(A, B) == resolve(B, A)` regardless of argument order. This is the mathematically correct CRDT merge path.
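To see the claim concretely, here's a minimal sketch. It assumes each strategy exposes a `resolve(a, b, ts_a, ts_b)` method mirroring the `Custom(fn)` signature in the API reference below; the exact method signature is an assumption, not confirmed API:

```python
from crdt_merge.strategies import LWW, MaxWins

lww = LWW()
# Newest timestamp wins, so both argument orders pick the ts=200 value
assert lww.resolve("Alice", "Alicia", 100, 200) == "Alicia"
assert lww.resolve("Alicia", "Alice", 200, 100) == "Alicia"

# MaxWins is commutative by construction: max(a, b) == max(b, a)
mx = MaxWins()
assert mx.resolve(3, 7, None, None) == mx.resolve(7, 3, None, None) == 7
```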
Streaming Merge
New in v0.3.0 – merge datasets larger than your RAM
```python
from crdt_merge.streaming import merge_stream, merge_sorted_stream

# Unsorted data: batched merge with O(batch_size) memory
for batch in merge_stream(large_a, large_b, key="_id", batch_size=5000):
    write_batch(batch)

# Pre-sorted data: true streaming, constant memory
def sorted_gen_a():
    for row in read_csv_streaming("a.csv"):
        yield row

for batch in merge_sorted_stream(sorted_gen_a(), sorted_gen_b(), key="_id"):
    write_batch(batch)  # each batch is batch_size rows, memory stays flat
```
Memory Model
| Approach | Memory | Scale Limit |
|---|---|---|
| `merge()` | O(n) – grows with data | ~2M rows (8 GB RAM) |
| `merge_stream()` | O(batch_size) – configurable | Unlimited (disk-bound) |
| `merge_sorted_stream()` | O(batch_size) – constant | Unlimited (disk-bound) |
At 1M rows, `merge_sorted_stream` uses 3 MB regardless of input size. The classic `merge()` would need 688 MB.
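To check the flat-memory behavior on your own machine, here's a minimal sketch using synthetic pre-sorted generators and Python's built-in `tracemalloc`; absolute numbers will differ by environment:

```python
import tracemalloc
from crdt_merge.streaming import merge_sorted_stream

def rows(n, source):
    # Lazily generated and already sorted by "_id": nothing is held in memory
    for i in range(n):
        yield {"_id": i, "source": source}

tracemalloc.start()
total = 0
for batch in merge_sorted_stream(rows(1_000_000, "a"), rows(1_000_000, "b"),
                                 key="_id", batch_size=5000):
    total += len(batch)  # consume and discard each batch immediately
_, peak = tracemalloc.get_traced_memory()
print(f"{total:,} rows merged, peak ~{peak / 1e6:.1f} MB")
```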
Why CRDTs
CRDT = Conflict-free Replicated Data Type. A data structure with one mathematical superpower:
Any two copies can merge – in any order, at any time – and the result is always identical and always correct.
Three mathematical guarantees (proven, not hoped):
| Property | What it means |
|---|---|
| Commutative | `merge(A, B) == merge(B, A)` – order doesn't matter |
| Associative | `merge(merge(A, B), C) == merge(A, merge(B, C))` – grouping doesn't matter |
| Idempotent | `merge(A, A) == A` – re-merging is safe |
This means: zero coordination, zero locks, zero conflicts. Two workers can independently edit a dataset and merge later – the result is mathematically guaranteed correct.
Note: The core `merge()` function uses "B-wins" overlay semantics (like `git merge remote`). For true order-independent commutativity, use `MergeSchema` with `LWW()` – see Composable Merge Strategies.
Built-in CRDT Types
| Type | Use Case | Example |
|---|---|---|
| `GCounter` | Grow-only counters | Download counts, page views |
| `PNCounter` | Increment + decrement | Stock levels, balances |
| `LWWRegister` | Single value (latest wins) | Name, email, status fields |
| `ORSet` | Add/remove set | Tags, memberships, dedup sets |
| `LWWMap` | Key-value map | Row merges, config objects |
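This page doesn't spell out the constructors or methods for these types, so the sketch below guesses at a conventional counter/set CRDT API (`increment`, `add`, `merge`, `value`); treat every method name here as an assumption:

```python
from crdt_merge import GCounter, ORSet  # import path assumed

# Two nodes count independently, then merge (hypothetical method names)
views_a, views_b = GCounter("node-a"), GCounter("node-b")
views_a.increment(3)
views_b.increment(4)
views_a.merge(views_b)
print(views_a.value)  # 7: per-node counts sum, whatever the merge order

# OR-Set: adds from both replicas survive the merge
tags_a, tags_b = ORSet("node-a"), ORSet("node-b")
tags_a.add("nlp")
tags_b.add("qa")
tags_a.merge(tags_b)  # now contains both "nlp" and "qa"
```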
Benchmarks
Core Operations (v0.2.0 baseline)
Tested on real data (rotten_tomatoes dataset, 8,530 rows):
| Operation | Size | Time | Throughput |
|---|---|---|---|
| DataFrame merge | 1K + 1K → 1.5K | 3.6 ms | 411K rows/sec |
| DataFrame merge | 10K + 10K → 15K | 42.6 ms | 352K rows/sec |
| DataFrame merge | 50K + 50K → 75K | 234 ms | 320K rows/sec |
| Exact dedup | 9K texts | 76ms | 118K texts/sec |
| GCounter ops | 100K increments | - | 8.6M ops/sec |
| OR-Set ops | 10K adds | - | 250K+ ops/sec |
v0.3.0 Stress Results (224 measurements)
Tested in sandbox (1.9 GB RAM):
| Module | Scale | Throughput | Memory | Notes |
|---|---|---|---|---|
| Core merge | 500K + 500K | 57K rows/sec | 688 MB | Linear scaling, predictable |
| Core merge | 2M rows | 55K rows/sec | ~1.5 GB | Sandbox ceiling |
| Strategies | 1M rows | 58K rows/sec | Proportional | All 8 strategies verified |
| merge_stream | 1M rows | 115K rows/sec | O(batch) | 2× faster than core |
| merge_sorted_stream | 1M rows | 200K rows/sec | 3 MB flat | Memory constant at any scale |
| Delta compute | 500K rows | 953K rows/sec | Minimal | Near 1M rows/sec – sync is near-instant |
| Delta apply | 500K → 500K | 253K rows/sec | Proportional | Full round-trip |
| CRDT verification | 500 trials | All pass | - | Commutativity, associativity, idempotency ✓ |
Run A100 Stress Tests – push past sandbox limits with 80 GB GPU RAM
Full stress reports
- v0.3.0 Stress Report – 224 measurements across 8 suites
- v0.2.0 Breaking Point Report – baseline measurements (109 tests)
Zero dependencies. Pure Python. Works offline. Works everywhere.
API Reference
Core (since v0.1.0)
merge(df_a, df_b, key=None, timestamp_col=None, prefer="latest", dedup=True)
Merge two DataFrames (pandas, polars, or list of dicts).
- `key`: Column to match rows. `None` = append + dedup.
- `timestamp_col`: Column with timestamps for conflict resolution.
- `prefer`: `"latest"` (B wins), `"a"`, or `"b"`.
- `dedup`: Remove exact duplicate rows.
dedup(items, method="exact", threshold=0.85)
Deduplicate a list of strings. Returns (unique_items, duplicate_indices).
- `method`: `"exact"` or `"fuzzy"` (bigram similarity).
- `threshold`: Similarity threshold for fuzzy dedup (0.0–1.0).
diff(df_a, df_b, key)
Show what changed between two DataFrames. Returns added, removed, modified, unchanged counts.
merge_dicts(a, b, timestamps_a=None, timestamps_b=None)
Deep-merge two dicts with CRDT semantics. Nested dicts recurse, lists concatenate + dedup.
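The timestamp arguments let scalar conflicts resolve by recency instead of plain B-wins; the per-key dict shape below is an assumption, since the expected format isn't documented here:

```python
from crdt_merge import merge_dicts

a = {"lr": 0.001, "epochs": 10}
b = {"lr": 0.0005, "epochs": 12}

# Assumed shape: timestamps keyed the same way as the values
merged = merge_dicts(
    a, b,
    timestamps_a={"lr": 200, "epochs": 100},
    timestamps_b={"lr": 100, "epochs": 300},
)
# Expected with these timestamps: a's newer "lr" wins, b's newer "epochs" wins
# -> {"lr": 0.001, "epochs": 12}
```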
merge_datasets(dataset_a, dataset_b, key=None, ...)
Merge two HuggingFace Dataset objects or dataset names. Requires pip install crdt-merge[datasets].
dedup_dataset(dataset, columns=None, method="exact", threshold=0.85)
Deduplicate a HuggingFace Dataset. Requires pip install crdt-merge[datasets].
DedupIndex(node_id)
Distributed dedup index backed by CRDT OR-Set. Multiple workers build indices independently, then merge.
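A sketch of the independent-build-then-merge flow; `add`, `merge`, and the `in` membership check are assumed method names, not confirmed API:

```python
from crdt_merge import DedupIndex

shard_a = ["hello world", "foo"]
shard_b = ["hello world", "bar"]

# Each worker indexes its own shard with no coordination
idx_a, idx_b = DedupIndex("worker-a"), DedupIndex("worker-b")
for text in shard_a:
    idx_a.add(text)
for text in shard_b:
    idx_b.add(text)

idx_a.merge(idx_b)             # OR-Set merge: order doesn't matter
print("hello world" in idx_a)  # assumed membership check -> True
```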
MinHashDedup(num_hashes=128, threshold=0.5)
Locality-sensitive hashing for O(n) near-duplicate detection at scale.
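A hypothetical usage sketch; only the constructor is documented here, so the `dedup()` method and its return shape below are pure assumptions modeled on the top-level `dedup()`:

```python
from crdt_merge import MinHashDedup

mh = MinHashDedup(num_hashes=128, threshold=0.5)
docs = [
    "the cat sat on the mat",
    "the cat sat on a mat",   # near-duplicate of the first
    "completely unrelated text",
]
# Assumed entry point mirroring dedup(): returns (unique_items, duplicate_indices)
unique, dupes = mh.dedup(docs)
```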
Strategies (since v0.3.0)
MergeSchema(default=LWW(), **column_strategies)
Define per-column merge strategies. Apply to individual rows or entire datasets.
```python
schema = MergeSchema(default=LWW(), score=MaxWins(), tags=UnionSet(","))
merged_row = schema.resolve_row(row_a, row_b, timestamp_col="_ts")
merged_all = schema.resolve_all(dataset_a, dataset_b, key="_id", timestamp_col="_ts")
```
Strategy Classes
| Class | Constructor | Resolve Semantics |
|---|---|---|
| `LWW()` | No args | Newest timestamp wins |
| `MaxWins()` | No args | `max(a, b)` |
| `MinWins()` | No args | `min(a, b)` |
| `UnionSet(separator)` | `","` or any delimiter | Set union of delimited values |
| `Priority(order)` | e.g. `["draft", "published"]` | Later in list wins |
| `Concat(separator)` | `" "` or any delimiter | Concatenate + dedup tokens |
| `LongestWins()` | No args | Longest string wins |
| `Custom(fn)` | `fn(a, b, ts_a, ts_b) → value` | Your function |
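Putting `Custom` to work with the documented `fn(a, b, ts_a, ts_b) → value` shape; importing `Custom` from `crdt_merge.strategies` alongside the other strategy classes is assumed:

```python
from crdt_merge.strategies import MergeSchema, LWW, Custom

def prefer_verified_email(a, b, ts_a, ts_b):
    # Domain rule: a company address beats recency
    if str(a).endswith("@corp.example"):
        return a
    if str(b).endswith("@corp.example"):
        return b
    return b if (ts_b or 0) >= (ts_a or 0) else a  # fall back to last-write-wins

schema = MergeSchema(default=LWW(), email=Custom(prefer_verified_email))
```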
Streaming (since v0.3.0)
merge_stream(a, b, key="_id", batch_size=5000)
Batched merge for large datasets. Yields lists of merged rows. O(batch_size) memory.
merge_sorted_stream(a_iter, b_iter, key="_id", batch_size=5000)
True streaming merge for pre-sorted generators. Constant memory regardless of input size.
StreamStats
Attached to stream results via `.stats` – tracks `batches_emitted`, `rows_emitted`, `peak_memory_mb`, `elapsed_sec`.
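A sketch of reading the stats once a stream is consumed, assuming `.stats` hangs off the object returned by `merge_stream`:

```python
from crdt_merge.streaming import merge_stream

rows_a = [{"_id": 1, "v": "a"}, {"_id": 2, "v": "a"}]
rows_b = [{"_id": 2, "v": "b"}, {"_id": 3, "v": "b"}]

stream = merge_stream(rows_a, rows_b, key="_id", batch_size=1000)
for batch in stream:
    pass  # consume the stream (normally: write each batch out)

s = stream.stats  # assumed attachment point
print(s.batches_emitted, s.rows_emitted, s.peak_memory_mb, s.elapsed_sec)
```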
Use Cases
- Dataset curation: Multiple annotators edit simultaneously → merge without conflicts
- Parallel crawlers: Two crawlers produce overlapping data → merge + dedup automatically
- Model training: Merge training logs, configs, and metrics from distributed runs
- Community datasets: Accept contributions from multiple forks without merge conflicts
- Data pipelines: Incremental processing with automatic state reconciliation
- Offline-first apps: Sync data between devices that were offline for days
- Edge computing: Stream-merge sensor data from thousands of IoT nodes in constant memory
- Multi-tenant SaaS: Per-column merge strategies let different teams own different fields
Contributing
PRs welcome! Run tests with:
```bash
pip install -e ".[dev]"
pytest tests/ -v
```
License
Licensed under the Apache License, Version 2.0.
Contributing? By opening a pull request, you agree to our Contributor License Agreement.
Copyright 2024โ2026 Ryan Gillespie / Optitransfer. See NOTICE for attribution requirements.
For commercial licensing inquiries: rgillespie83@icloud.com, data@optitransfer.ch
Built with math, not hope.
Star on GitHub • Try on HuggingFace • PyPI