The Merge Algebra Toolkit: composable, streaming, verified CRDT merge for datasets
# crdt-merge

Conflict-free merge, dedup & sync for DataFrames, JSON and datasets, powered by CRDTs

Latest: v0.4.0 "The Trust Release": merge provenance, verified merges, 16x streaming speedup

Merge any two datasets in one function call. No conflicts. No coordination. No data loss.

Quick Start • What's New in v0.4.0 • Strategies • Streaming • Provenance • Benchmarks • API Reference • All Languages
## Available in Every Language

| Language | Package | Install | Repo |
|---|---|---|---|
| Python | crdt-merge | `pip install crdt-merge` | You are here |
| TypeScript | crdt-merge | `npm install crdt-merge` | crdt-merge-ts |
| Rust | crdt-merge | `cargo add crdt-merge` | crdt-merge-rs |
| Java | io.optitransfer:crdt-merge | Maven / Gradle | crdt-merge-java |
| CLI | included in Rust | `cargo install crdt-merge` | crdt-merge-rs |
## What's New in v0.4.0

"The Trust Release": every merge decision is now auditable and verifiable

| Feature | What It Does | Why It Matters |
|---|---|---|
| Merge Provenance | Full audit trail of every merge decision: which source won, what changed, why | Compliance, debugging, trust |
| `@verified_merge` decorator | Wraps any merge with automatic CRDT property checks | Prove correctness, don't hope |
| 16x streaming speedup | `merge_stream` holds flat at ~400K rows/s regardless of scale | Was degrading 110K → 23K rows/s in v0.3.0 |
| Convergence fix | `verify_convergence` no longer false-flags valid CRDTs | Accurate verification |

100% backward compatible: all existing APIs work exactly as before. 11 modules, 2,820 lines, 175 tests.
## What shipped in v0.3.0 "The Schema Release"

| Feature | What It Does | Why It Matters |
|---|---|---|
| Composable merge strategies | Define per-column merge rules (LWW, MaxWins, UnionSet, Priority, ...) | True CRDT commutativity: order never matters |
| Streaming merge | O(batch_size) memory instead of O(n) | Merge 1M+ rows in 3 MB of RAM |
| Delta sync | Only sync what changed since the last merge | 95%+ bandwidth savings |
| Verification engine | Prove CRDT guarantees hold on YOUR data | Foundation for the v0.4.0 decorator |
## Version History
| Version | Codename | Highlights |
|---|---|---|
| v0.4.0 | The Trust Release | Merge provenance, @verified_merge decorator, 16x streaming speedup |
| v0.3.0 | The Schema Release | Composable strategies, streaming merge, delta sync, 1M+ row scale |
| v0.2.0 | IP Protection | Apache-2.0 license, patent protection, all 4 languages |
| v0.1.0 | Launch | Core merge, dedup, diff, CRDT types, 5 modules |
## The Problem
You have two versions of a dataset. Maybe two crawlers ran in parallel. Maybe two annotators edited the same file. Maybe you're merging community contributions.
Today: Write custom merge scripts, lose data, or block on a coordinator.
With crdt-merge: One function call. Zero conflicts. Mathematically guaranteed.
```python
from crdt_merge import merge

merged = merge(df_a, df_b, key="id")  # done.
```
## Quick Start

```shell
pip install crdt-merge             # zero dependencies (pure Python)
pip install "crdt-merge[pandas]"   # with pandas support
pip install "crdt-merge[datasets]" # with HuggingFace Datasets support
pip install "crdt-merge[fast]"     # with orjson + xxhash for max speed
pip install "crdt-merge[all]"      # everything
```
### Merge Two DataFrames

```python
import pandas as pd
from crdt_merge import merge

# Two contributors edited the same dataset
df_a = pd.DataFrame({"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"]})
df_b = pd.DataFrame({"id": [2, 3, 4], "name": ["Robert", "Charlie", "Diana"]})

merged = merge(df_a, df_b, key="id")
# id=1: Alice   (only in A)
# id=2: Robert  (B wins: latest)
# id=3: Charlie (same in both)
# id=4: Diana   (only in B)
```
### Merge Two HuggingFace Datasets

```python
from crdt_merge import merge_datasets

merged = merge_datasets("user/dataset-v1", "user/dataset-v2", key="id")
```
### Deduplicate Anything

```python
from crdt_merge import dedup

texts = ["Hello world", "hello world", "HELLO WORLD", "Something else"]
unique, duplicate_indices = dedup(texts)
# unique = ["Hello world", "Something else"]  (case/whitespace normalized)
```
### Deep-Merge JSON/Configs

```python
from crdt_merge import merge_dicts

config_a = {"model": {"name": "bert", "layers": 12}, "tags": ["nlp"]}
config_b = {"model": {"name": "bert-large", "dropout": 0.1}, "tags": ["qa"]}
merged = merge_dicts(config_a, config_b)
# {"model": {"name": "bert-large", "layers": 12, "dropout": 0.1}, "tags": ["nlp", "qa"]}
```
### See What Changed

```python
from crdt_merge import diff

changes = diff(df_old, df_new, key="id")
print(changes["summary"])
# "+5 added, -2 removed, ~3 modified, =990 unchanged"
```
## Composable Merge Strategies

Since v0.3.0: define exactly how each column resolves conflicts.

Instead of "B always wins", define per-column merge rules:
```python
from crdt_merge.strategies import MergeSchema, LWW, MaxWins, MinWins, UnionSet, Priority

schema = MergeSchema(
    default=LWW(),                 # newest timestamp wins (default)
    score=MaxWins(),               # highest score always wins
    priority=MinWins(),            # lowest priority number wins
    tags=UnionSet(separator=","),  # merge comma-separated sets
    status=Priority(["draft", "review", "published"]),  # defined progression
)

# Resolve a single row conflict
merged_row = schema.resolve_row(row_a, row_b, timestamp_col="_ts")

# Resolve entire datasets
merged = schema.resolve_all(dataset_a, dataset_b, key="_id", timestamp_col="_ts")
```
### 8 Built-in Strategies

| Strategy | What It Does | Example |
|---|---|---|
| `LWW()` | Latest timestamp wins | Name, email: last edit wins |
| `MaxWins()` | Highest value wins | Scores, ratings, counters |
| `MinWins()` | Lowest value wins | Priority numbers, prices |
| `UnionSet(sep)` | Merge delimited sets | "a,b" + "b,c" → "a,b,c" |
| `Priority(order)` | Defined progression | draft → review → published |
| `Concat(sep)` | Concatenate + dedup | Append notes, merge logs |
| `LongestWins()` | Longest string wins | Descriptions, bios |
| `Custom(fn)` | Your function | Any custom logic |
Why this matters: `LWW().resolve()` is truly commutative: `resolve(A, B) == resolve(B, A)` regardless of argument order. This is the mathematically correct CRDT merge path.
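The LWW rule can be sketched in a few lines of plain Python. This is a simplified model of the idea, not the library's implementation: the newer timestamp wins, and equal timestamps fall back to a deterministic tie-break so the result never depends on argument order.

```python
def lww_resolve(a, b, ts_a, ts_b):
    """Last-Write-Wins sketch: newer timestamp wins, deterministic tie-break.

    Pedagogical model of the LWW idea, not crdt-merge's actual code.
    """
    if ts_a != ts_b:
        return a if ts_a > ts_b else b
    # Equal timestamps: break the tie on the value itself, so that
    # lww_resolve(A, B) == lww_resolve(B, A) always holds.
    return max(a, b, key=repr)

# Swapping the arguments never changes the outcome:
assert lww_resolve("Alice", "Alicia", 10, 20) == "Alicia"
assert lww_resolve("Alicia", "Alice", 20, 10) == "Alicia"
assert lww_resolve("x", "y", 5, 5) == lww_resolve("y", "x", 5, 5)
```

The tie-break is the subtle part: without it, equal-timestamp conflicts would silently depend on which replica called the merge.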
## Merge Provenance & Audit Trails

New in v0.4.0: know exactly what happened in every merge.

Every merge decision is now traceable. Track which source won each row, what fields changed, and why:
```python
from crdt_merge.provenance import merge_with_provenance

result, log = merge_with_provenance(base_df, incoming_df, key="id")

# Inspect every decision
for decision in log.decisions:
    print(f"Row {decision.key}: {decision.source} won ({decision.reason})")

# Export as DataFrame for analysis
audit_df = log.to_dataframe()

# Export as JSON for compliance logs
audit_json = log.to_json()
```
Use cases: regulatory compliance, debugging merge anomalies, building trust with stakeholders who need to know why a value changed.
## Verified Merge Decorator

New in v0.4.0: prove your merges are mathematically correct.

Wrap any merge function with automatic CRDT property verification:
```python
from crdt_merge.verify import verified_merge
import crdt_merge

@verified_merge
def my_merge(a, b, **kwargs):
    return crdt_merge.merge(a, b, **kwargs)

# Automatically checks on every call:
#   Idempotency:   merge(A, A) == A
#   Commutativity: merge(A, B) == merge(B, A)
#   Associativity: merge(merge(A, B), C) == merge(A, merge(B, C))
result = my_merge(df_a, df_b, key="id")
```
Use `@verified_merge` in your test suite or staging environment to mathematically prove your merge pipeline preserves CRDT guarantees.
## Streaming Merge

Since v0.3.0: merge datasets larger than your RAM.

v0.4.0 upgrade: `merge_stream` now delivers 16x faster throughput at scale, holding flat at ~400K rows/s regardless of dataset size (it degraded from 110K to 23K rows/s in v0.3.0).
```python
from crdt_merge.streaming import merge_stream, merge_sorted_stream

# Unsorted data: batched merge with O(batch_size) memory
for batch in merge_stream(large_a, large_b, key="_id", batch_size=5000):
    write_batch(batch)

# Pre-sorted data: true streaming, constant memory
def sorted_gen_a():
    for row in read_csv_streaming("a.csv"):
        yield row

# sorted_gen_b is defined the same way over "b.csv"
for batch in merge_sorted_stream(sorted_gen_a(), sorted_gen_b(), key="_id"):
    write_batch(batch)  # each batch is batch_size rows; memory stays flat
```
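The constant-memory trick behind a sorted-stream merge is a classic two-pointer walk. Here is a plain-Python sketch of the idea (a simplified model, not the library's code), assuming rows are dicts sorted by the key and B wins on key conflicts:

```python
def merge_sorted(a_rows, b_rows, key):
    """Two-pointer merge of two key-sorted row streams; B wins on conflict.

    Sketch of the idea behind merge_sorted_stream: only one row from each
    stream is held in memory at a time, so memory stays flat at any scale.
    """
    sentinel = object()
    a_iter, b_iter = iter(a_rows), iter(b_rows)
    row_a, row_b = next(a_iter, sentinel), next(b_iter, sentinel)
    while row_a is not sentinel or row_b is not sentinel:
        if row_b is sentinel or (row_a is not sentinel and row_a[key] < row_b[key]):
            yield row_a                      # key only (so far) in A
            row_a = next(a_iter, sentinel)
        elif row_a is sentinel or row_b[key] < row_a[key]:
            yield row_b                      # key only (so far) in B
            row_b = next(b_iter, sentinel)
        else:                                # same key: B wins, advance both
            yield row_b
            row_a = next(a_iter, sentinel)
            row_b = next(b_iter, sentinel)

a = [{"_id": 1, "v": "a1"}, {"_id": 3, "v": "a3"}]
b = [{"_id": 2, "v": "b2"}, {"_id": 3, "v": "b3"}]
merged = list(merge_sorted(a, b, key="_id"))
# merged keys: 1, 2, 3, with _id=3 taken from B
```

The real implementation additionally batches output rows and tracks stats, but the memory argument is exactly this: the working set is one row per input stream.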
### Memory & Throughput Model

| Approach | Memory | Throughput (v0.4.0) | Scale Limit |
|---|---|---|---|
| `merge()` | O(n), grows with data | ~55K rows/s | ~2M rows (8 GB RAM) |
| `merge_stream()` | O(batch_size), configurable | ~400K rows/s (flat) | Unlimited (disk-bound) |
| `merge_sorted_stream()` | O(batch_size), constant | ~200K rows/s | Unlimited (disk-bound) |
v0.3.0 → v0.4.0 improvement: `merge_stream` suffered from two hidden performance bugs (a per-batch GC sweep and per-row column reallocation). v0.4.0 removed both, so throughput is now scale-independent.
At 1M rows, merge_sorted_stream uses 3 MB regardless of input size. The classic merge() would need 688 MB.
## Why CRDTs

CRDT = Conflict-free Replicated Data Type: a data structure with one mathematical superpower.

Any two copies can merge, in any order, at any time, and the result is always identical and always correct.
Three mathematical guarantees (proven, not hoped):
| Property | What it means |
|---|---|
| Commutative | `merge(A, B) == merge(B, A)`: order doesn't matter |
| Associative | `merge(merge(A, B), C) == merge(A, merge(B, C))`: grouping doesn't matter |
| Idempotent | `merge(A, A) == A`: re-merging is safe |
This means zero coordination, zero locks, zero conflicts. Two workers can independently edit a dataset and merge later; the result is mathematically guaranteed correct.
Architectural note: the core `merge()` function uses "B-wins" overlay semantics (like merging a remote branch in git). For true order-independent commutativity, use `MergeSchema` with `LWW()`; see Composable Merge Strategies. Both approaches are valid for different use cases.
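The difference between the two paths can be seen in a toy sketch (plain Python, not the library's code): a B-wins overlay depends on which argument is "B", while a field-wise LWW resolve does not.

```python
def b_wins(a, b):
    """Overlay merge: values from b replace values from a (like a patch)."""
    return {**a, **b}

def lww(a, b, ts_a, ts_b):
    """Field-wise last-write-wins using per-record timestamps."""
    out = {}
    for k in a.keys() | b.keys():
        if k not in a:
            out[k] = b[k]
        elif k not in b:
            out[k] = a[k]
        else:
            out[k] = a[k] if ts_a >= ts_b else b[k]
    return out

row_a = {"name": "Bob", "city": "Bern"}    # written at t=10
row_b = {"name": "Robert"}                 # written at t=20

# B-wins is order-dependent: swapping the arguments changes the result
assert b_wins(row_a, row_b)["name"] == "Robert"
assert b_wins(row_b, row_a)["name"] == "Bob"

# LWW is order-independent: the newer write wins either way
assert lww(row_a, row_b, 10, 20) == lww(row_b, row_a, 20, 10)
```

Both are useful: B-wins matches the "apply this patch on top" mental model, LWW matches the "replicas converge regardless of sync order" model.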
### Built-in CRDT Types

| Type | Use Case | Example |
|---|---|---|
| `GCounter` | Grow-only counters | Download counts, page views |
| `PNCounter` | Increment + decrement | Stock levels, balances |
| `LWWRegister` | Single value (latest wins) | Name, email, status fields |
| `ORSet` | Add/remove set | Tags, memberships, dedup sets |
| `LWWMap` | Key-value map | Row merges, config objects |
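To illustrate why these types merge safely, here is a minimal G-Counter in plain Python (a pedagogical sketch; the library's `GCounter` API may differ): each node increments only its own slot, and merge takes the per-node maximum, which is commutative, associative, and idempotent by construction.

```python
class GCounter:
    """Grow-only counter: per-node counts, merged by element-wise max."""

    def __init__(self, node_id):
        self.node_id = node_id
        self.counts = {}

    def increment(self, n=1):
        # A node only ever advances its own slot, so counts never go down.
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + n

    def value(self):
        return sum(self.counts.values())

    def merge(self, other):
        # max() per node: order-independent and safe to repeat.
        merged = GCounter(self.node_id)
        for node in self.counts.keys() | other.counts.keys():
            merged.counts[node] = max(self.counts.get(node, 0),
                                      other.counts.get(node, 0))
        return merged

a, b = GCounter("a"), GCounter("b")
a.increment(3)
b.increment(5)
assert a.merge(b).value() == 8                    # counts combine
assert a.merge(b).counts == b.merge(a).counts     # commutative
assert a.merge(a).value() == a.value()            # idempotent
```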
## Benchmarks

### Core Operations (v0.2.0 baseline)

Tested on real data (rotten_tomatoes dataset, 8,530 rows):
| Operation | Size | Time | Throughput |
|---|---|---|---|
| DataFrame merge | 1K + 1K → 1.5K | 3.6 ms | 411K rows/sec |
| DataFrame merge | 10K + 10K → 15K | 42.6 ms | 352K rows/sec |
| DataFrame merge | 50K + 50K → 75K | 234 ms | 320K rows/sec |
| Exact dedup | 9K texts | 76ms | 118K texts/sec |
| GCounter ops | 100K increments | - | 8.6M ops/sec |
| OR-Set ops | 10K adds | - | 250K+ ops/sec |
### v0.3.0 Stress Results (224 measurements)

Tested in sandbox (1.9 GB RAM):
| Module | Scale | Throughput | Memory | Notes |
|---|---|---|---|---|
| Core merge | 500K + 500K | 57K rows/sec | 688 MB | Linear scaling, predictable |
| Core merge | 2M rows | 55K rows/sec | ~1.5 GB | Sandbox ceiling |
| Strategies | 1M rows | 58K rows/sec | Proportional | All 8 strategies verified |
| merge_stream | 1M rows | 115K rows/sec | O(batch) | 2× faster than core |
| merge_sorted_stream | 1M rows | 200K rows/sec | 3 MB flat | Memory constant at any scale |
| Delta compute | 500K rows | 953K rows/sec | Minimal | Near 1M/sec โ sync is instant |
| Delta apply | 500K โ 500K | 253K rows/sec | Proportional | Full round-trip |
| CRDT verification | 500 trials | All pass | - | Commutativity, associativity, idempotency verified |
### v0.4.0 Streaming Performance (16x improvement)
| Scale | v0.3.0 | v0.4.0 | Improvement |
|---|---|---|---|
| 100K rows | 110K rows/s | 430K rows/s | +288% |
| 500K rows | 64K rows/s | 393K rows/s | +513% |
| 1M rows | 41K rows/s | 410K rows/s | +906% |
| 2M rows | 23K rows/s | 393K rows/s | +1,572% |
v0.3.0 `merge_stream` degraded linearly as data grew. v0.4.0 holds flat: the throughput line is a ruler.
Run A100 Stress Tests: push past sandbox limits with 80 GB GPU RAM
### Full stress reports

- v0.3.0 Stress Report: 224 measurements across 8 suites
- v0.2.0 Breaking Point Report: baseline measurements (109 tests)
Zero dependencies. Pure Python. Works offline. Works everywhere.
## API Reference

### Core (since v0.1.0)
`merge(df_a, df_b, key=None, timestamp_col=None, prefer="latest", dedup=True)`

Merge two DataFrames (pandas, polars, or list of dicts).

- `key`: Column to match rows. `None` = append + dedup.
- `timestamp_col`: Column with timestamps for conflict resolution.
- `prefer`: `"latest"` (B wins), `"a"`, or `"b"`.
- `dedup`: Remove exact duplicate rows.
`dedup(items, method="exact", threshold=0.85)`

Deduplicate a list of strings. Returns `(unique_items, duplicate_indices)`.

- `method`: `"exact"` or `"fuzzy"` (bigram similarity).
- `threshold`: Similarity threshold for fuzzy dedup (0.0-1.0).
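Bigram similarity, the basis of the fuzzy method, is commonly formulated as a Dice coefficient over character bigrams. The sketch below shows that formulation; it is an assumption about the internals, not a quote of the library's code:

```python
def bigrams(text):
    """Set of adjacent character pairs, case/whitespace normalized."""
    t = text.lower().strip()
    return {t[i:i + 2] for i in range(len(t) - 1)}

def bigram_similarity(a, b):
    """Dice coefficient over character bigrams, in [0.0, 1.0]."""
    ba, bb = bigrams(a), bigrams(b)
    if not ba or not bb:
        return 1.0 if a == b else 0.0
    return 2 * len(ba & bb) / (len(ba) + len(bb))

assert bigram_similarity("hello world", "HELLO WORLD") == 1.0
assert bigram_similarity("hello", "help") > bigram_similarity("hello", "cargo")
```

Two strings then count as duplicates when their similarity meets the `threshold` (0.85 by default).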
`diff(df_a, df_b, key)`

Show what changed between two DataFrames. Returns added, removed, modified, unchanged counts.
`merge_dicts(a, b, timestamps_a=None, timestamps_b=None)`

Deep-merge two dicts with CRDT semantics. Nested dicts recurse; lists concatenate + dedup.
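The described semantics (nested dicts recurse, lists concatenate and dedup, scalars overlay) can be sketched as follows; this is a simplified model, not the package's implementation:

```python
def deep_merge(a, b):
    """Deep-merge b into a: dicts recurse, lists concat + dedup, b wins on scalars."""
    if isinstance(a, dict) and isinstance(b, dict):
        out = dict(a)
        for k, v in b.items():
            out[k] = deep_merge(a[k], v) if k in a else v
        return out
    if isinstance(a, list) and isinstance(b, list):
        out, seen = [], set()
        for item in a + b:
            marker = repr(item)      # repr so unhashable items can be deduped
            if marker not in seen:
                seen.add(marker)
                out.append(item)
        return out
    return b  # scalar conflict: incoming value wins

config_a = {"model": {"name": "bert", "layers": 12}, "tags": ["nlp"]}
config_b = {"model": {"name": "bert-large", "dropout": 0.1}, "tags": ["qa", "nlp"]}
merged = deep_merge(config_a, config_b)
# {"model": {"name": "bert-large", "layers": 12, "dropout": 0.1}, "tags": ["nlp", "qa"]}
```

The real `merge_dicts` additionally accepts per-key timestamps, which would replace the "b wins on scalars" rule with LWW resolution.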
`merge_datasets(dataset_a, dataset_b, key=None, ...)`

Merge two HuggingFace Dataset objects or dataset names. Requires `pip install "crdt-merge[datasets]"`.

`dedup_dataset(dataset, columns=None, method="exact", threshold=0.85)`

Deduplicate a HuggingFace Dataset. Requires `pip install "crdt-merge[datasets]"`.

`DedupIndex(node_id)`

Distributed dedup index backed by a CRDT OR-Set. Multiple workers build indices independently, then merge.

`MinHashDedup(num_hashes=128, threshold=0.5)`

Locality-sensitive hashing for O(n) near-duplicate detection at scale.
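The idea behind MinHash can be sketched in a few lines of plain Python. This toy version uses salted hashes over word shingles; the library's `MinHashDedup` presumably uses a tuned hash family and LSH banding, so treat this purely as an illustration of the estimate:

```python
import hashlib

def minhash_signature(text, num_hashes=32):
    """MinHash signature over word shingles: one minimum per salted hash."""
    shingles = set(text.lower().split())
    return [
        min(int(hashlib.md5(f"{seed}:{s}".encode()).hexdigest(), 16)
            for s in shingles)
        for seed in range(num_hashes)
    ]

def estimated_jaccard(sig_a, sig_b):
    """Fraction of matching signature slots approximates Jaccard similarity."""
    return sum(x == y for x, y in zip(sig_a, sig_b)) / len(sig_a)

a = minhash_signature("the quick brown fox jumps over the lazy dog")
b = minhash_signature("the quick brown fox leaps over the lazy dog")
c = minhash_signature("completely different sentence entirely")
assert estimated_jaccard(a, b) > estimated_jaccard(a, c)
```

The O(n) claim comes from bucketing signatures (LSH) so that only candidates sharing a bucket are compared, instead of all O(n²) pairs.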
### Strategies (since v0.3.0)

`MergeSchema(default=LWW(), **column_strategies)`

Define per-column merge strategies. Apply to individual rows or entire datasets.

```python
schema = MergeSchema(default=LWW(), score=MaxWins(), tags=UnionSet(","))
merged_row = schema.resolve_row(row_a, row_b, timestamp_col="_ts")
merged_all = schema.resolve_all(dataset_a, dataset_b, key="_id", timestamp_col="_ts")
```
#### Strategy Classes

| Class | Constructor | Resolve Semantics |
|---|---|---|
| `LWW()` | No args | Newest timestamp wins |
| `MaxWins()` | No args | `max(a, b)` |
| `MinWins()` | No args | `min(a, b)` |
| `UnionSet(separator)` | `","` or any delimiter | Set union of delimited values |
| `Priority(order)` | e.g. `["draft", "published"]` | Later in list wins |
| `Concat(separator)` | `" "` or any delimiter | Concatenate + dedup tokens |
| `LongestWins()` | No args | Longest string wins |
| `Custom(fn)` | `fn(a, b, ts_a, ts_b) -> value` | Your function |
### Provenance (since v0.4.0)

`merge_with_provenance(base, incoming, key, timestamp_col=None)`

Merge two datasets with a full audit trail. Returns `(merged_df, ProvenanceLog)`.

- `ProvenanceLog.decisions`: List of `MergeDecision` objects, one per row.
- `ProvenanceLog.to_dataframe()`: Export the audit trail as a DataFrame.
- `ProvenanceLog.to_json()`: Export as JSON for compliance systems.
`MergeDecision`

| Field | Type | Description |
|---|---|---|
| `key` | any | Row key value |
| `source` | str | `"base"`, `"incoming"`, or `"both"` |
| `reason` | str | Human-readable explanation |
| `fields_changed` | list | Which columns were modified |
### Verification (since v0.3.0, enhanced v0.4.0)

`@verified_merge` (v0.4.0+)

Decorator that wraps a merge function with automatic CRDT property checks. Verifies idempotency, commutativity, and associativity on every call.

`verify_convergence(merge_fn, datasets, key)`

Test convergence across multiple dataset permutations. Returns a verification report.

`verify_crdt_properties(merge_fn, dataset_a, dataset_b, key)`

Check all three CRDT properties (commutative, associative, idempotent) on a pair of datasets.
### Streaming (since v0.3.0, optimized v0.4.0)

`merge_stream(a, b, key="_id", batch_size=5000)`

Batched merge for large datasets. Yields lists of merged rows. O(batch_size) memory. v0.4.0: 16x faster at scale.

`merge_sorted_stream(a_iter, b_iter, key="_id", batch_size=5000)`

True streaming merge for pre-sorted generators. Constant memory regardless of input size.

`StreamStats`

Attached to stream results via `.stats`; tracks `batches_emitted`, `rows_emitted`, `peak_memory_mb`, `elapsed_sec`.
### Delta Sync (since v0.3.0)

`Delta(added, removed, modified)`

Represents changes between two dataset versions. Compute with `compute_delta()`, apply with `apply_delta()`.

`compute_delta(old, new, key)`

Compute the delta between two versions. Returns a `Delta` object.

`apply_delta(base, delta, key)`

Apply a delta to a base dataset to reconstruct the new version.
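The delta model can be sketched in plain Python as a diff/patch round-trip over key-indexed rows (a simplified version; the real `Delta` carries more metadata):

```python
def compute_delta(old, new):
    """Diff two {key: row} mappings into added / removed / modified parts."""
    return {
        "added": {k: v for k, v in new.items() if k not in old},
        "removed": [k for k in old if k not in new],
        "modified": {k: v for k, v in new.items() if k in old and old[k] != v},
    }

def apply_delta(base, delta):
    """Reconstruct the new version from a base plus a delta."""
    out = {k: v for k, v in base.items() if k not in delta["removed"]}
    out.update(delta["added"])
    out.update(delta["modified"])
    return out

old = {1: {"name": "Alice"}, 2: {"name": "Bob"}}
new = {1: {"name": "Alice"}, 2: {"name": "Robert"}, 3: {"name": "Diana"}}
delta = compute_delta(old, new)
assert apply_delta(old, delta) == new  # round-trip reconstructs the new version
```

The bandwidth savings follow directly: unchanged rows (usually the vast majority) never appear in the delta.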
## Use Cases
- Dataset curation: multiple annotators edit simultaneously; merge without conflicts
- Parallel crawlers: two crawlers produce overlapping data; merge + dedup automatically
- Model training: merge training logs, configs, and metrics from distributed runs
- Community datasets: accept contributions from multiple forks without merge conflicts
- Data pipelines: incremental processing with automatic state reconciliation
- Offline-first apps: sync data between devices that were offline for days
- Edge computing: stream-merge sensor data from thousands of IoT nodes in constant memory
- Multi-tenant SaaS: per-column merge strategies let different teams own different fields
- Audit & compliance: full provenance trails for regulated industries (v0.4.0+)
- CI/CD pipelines: `@verified_merge` in tests to prove correctness before deploy (v0.4.0+)
## Contributing

PRs welcome! Run the tests with:

```shell
pip install -e ".[dev]"
pytest tests/ -v
```
## License
Licensed under the Apache License, Version 2.0.
Contributing? By opening a pull request, you agree to our Contributor License Agreement.
Copyright 2024โ2026 Ryan Gillespie / Optitransfer. See NOTICE for attribution requirements.
For commercial licensing inquiries: rgillespie83@icloud.com, data@optitransfer.ch
Built with math, not hope.

Star on GitHub • Try on HuggingFace • PyPI
## File details

### crdt_merge-0.5.0.tar.gz

- Download URL: crdt_merge-0.5.0.tar.gz
- Upload date:
- Size: 70.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.13

| Algorithm | Hash digest |
|---|---|
| SHA256 | `9851701f72a177fb58e3b0a20c6d69cbb607383213ba4ab16d6049299ed4f570` |
| MD5 | `07d2c65f173506b6875e6907ed6f6117` |
| BLAKE2b-256 | `e683628034ee91d47eb78e6135e70c262d31b60821db3c9ad1026c90951b765f` |
### crdt_merge-0.5.0-py3-none-any.whl

- Download URL: crdt_merge-0.5.0-py3-none-any.whl
- Upload date:
- Size: 53.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.13

| Algorithm | Hash digest |
|---|---|
| SHA256 | `7d40997443d7c9c126cb2f49b709685bd73ec916af9f85f548876e6a63d237ba` |
| MD5 | `d33bd42e4a1fc7386c299806e3be4239` |
| BLAKE2b-256 | `de33f3cdcf213ff38bcd9bde0ef82f360d8582a02332747a07fe31d9d105efe6` |