
The Merge Algebra Toolkit — composable, streaming, verified CRDT merge for datasets

Project description

crdt-merge

Conflict-free merge for structured data. Define strategies. Merge datasets. Prove correctness. Audit every field. Stream at any scale. Zero dependencies.



What's New in v0.6.0

v0.6.0 — "The Architecture Release" (2026-03-28)

  • 7 new modules: clocks, schema_evolution, merkle, arrow, gossip, async_merge, parallel
  • Arrow-native merge engine for 10-50× performance gains
  • Hybrid Logical Clocks for distributed CRDT ordering
  • Schema evolution handles mismatched schemas automatically
  • Merkle trees enable efficient incremental sync
  • Gossip protocol for anti-entropy state convergence
  • Async and parallel wrappers for non-blocking/multi-core merges
  • 705 tests, zero regressions

What is crdt-merge?

crdt-merge is a Python library for merging datasets with conflict resolution. Instead of "last write wins" or manual dedup scripts, you declare per-field merge strategies — and the library guarantees deterministic, order-independent results with full audit trails.

pip install crdt-merge

What it does

  • Merges datasets with configurable per-field strategies (max wins, min wins, union, priority, custom)
  • Streams merges at any scale — O(1) memory for sorted sources, tested to 100M rows
  • Proves correctness — the @verified_merge decorator verifies commutativity, associativity, and idempotency
  • Audits everything — per-field provenance trails show exactly which source won each field and why
  • Serializes for the wire — compact binary format for cross-language CRDT exchange
  • Zero dependencies — pure Python, embeds anywhere

What it is NOT

  • Not a real-time collaboration tool. For collaborative text editing, see Yjs or Loro.
  • Not a database. No persistence, no queries, no networking. It's a library.
  • Not a distributed system. Includes gossip state tracking and Merkle sync primitives (v0.6.0), but no built-in networking or consensus. It provides the building blocks that distributed systems can use.

Who is it for?

Anyone merging structured data from multiple sources: data pipelines, ETL, multi-node sync, offline-first apps, federated datasets. Your realistic alternatives today are pandas.merge() (no conflict resolution) or hand-written dedup scripts.


Quick Start

1. Basic Merge

from crdt_merge import merge

# Two datasets with conflicting scores for the same person
dataset_a = [{"id": 1, "name": "Alice", "score": 100}]
dataset_b = [{"id": 1, "name": "Alice", "score": 150}]

result = merge(dataset_a, dataset_b, key="id")
# → [{"id": 1, "name": "Alice", "score": 150}]

By default, when two rows share the same key, the second dataset wins ties. You can control this with prefer="a", prefer="b", or prefer="latest" (uses timestamps).
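
For example, to keep the first dataset's value instead (a minimal sketch reusing the datasets above):

result = merge(dataset_a, dataset_b, key="id", prefer="a")
# → [{"id": 1, "name": "Alice", "score": 100}]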

2. Per-Field Strategies with MergeSchema

The real power is declaring different strategies for different fields:

from crdt_merge import merge
from crdt_merge.strategies import MergeSchema, MaxWins, LWW, UnionSet

schema = MergeSchema(
    default=LWW(),           # Most fields: last-writer-wins
    score=MaxWins(),          # Scores: highest value wins
    tags=UnionSet()           # Tags: merge as set union
)

a = [{"id": 1, "score": 80, "tags": "python;data", "status": "draft"}]
b = [{"id": 1, "score": 95, "tags": "python;ml",   "status": "published"}]

result = merge(a, b, key="id", schema=schema)
# score: 95 (MaxWins), tags: "data;ml;python" (UnionSet), status: "published" (LWW)

8 built-in strategies: LWW, MaxWins, MinWins, UnionSet, Priority, Concat, LongestWins, Custom(fn)

3. Streaming Merge at Scale

from crdt_merge import merge_sorted_stream

# Merge two sorted sources with O(1) memory — works with generators from disk/network
def source_a():
    for i in range(10_000_000):
        yield {"id": i, "value": f"a_{i}"}

def source_b():
    for i in range(0, 10_000_000, 2):
        yield {"id": i, "value": f"b_{i}"}

for batch in merge_sorted_stream(source_a(), source_b(), key="id", batch_size=10000):
    process(batch)  # Memory never exceeds batch_size

4. CRDT Primitives

from crdt_merge import GCounter, PNCounter, LWWRegister, ORSet, LWWMap

# Grow-only counter — nodes increment independently, merge via max-per-node
a = GCounter("node_a")
a.increment("node_a", 10)
b = GCounter("node_b")
b.increment("node_b", 5)
merged = a.merge(b)
print(merged.value)  # 15 — guaranteed correct regardless of merge order

All primitives satisfy CRDT properties: commutative (a ⊔ b = b ⊔ a), associative ((a ⊔ b) ⊔ c = a ⊔ (b ⊔ c)), idempotent (a ⊔ a = a).
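
These properties can be checked directly on the counters above (a minimal sketch; it assumes merge() returns a new counter, as in the example):

# Commutative: merge order doesn't change the result
assert a.merge(b).value == b.merge(a).value
# Associative: grouping doesn't change the result
c = GCounter("node_c")
c.increment("node_c", 7)
assert a.merge(b).merge(c).value == a.merge(b.merge(c)).value
# Idempotent: merging a replica with itself is a no-op
assert a.merge(a).value == a.value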


Feature Matrix

| Module | Feature | Since | Description |
|---|---|---|---|
| core | GCounter, PNCounter, LWWRegister, ORSet, LWWMap | v0.1.0 | CRDT primitives with merge, serialize, deserialize |
| dataframe | merge(), diff() | v0.1.0 | Dataset merge with key matching, conflict resolution, schema support |
| dedup | dedup_list(), dedup_records(), MinHashDedup | v0.1.0 | Exact, fuzzy (bigram), and MinHash deduplication |
| json_merge | merge_dicts(), merge_json_lines() | v0.1.0 | Deep dict merge with LWW, None-as-missing semantics |
| strategies | MergeSchema + 8 strategies | v0.3.0 | Per-field strategy DSL: LWW, MaxWins, MinWins, UnionSet, Priority, Concat, LongestWins, Custom |
| streaming | merge_stream(), merge_sorted_stream() | v0.3.0 | O(batch_size) and O(1) memory streaming merge |
| delta | Delta, DeltaStore, compute_delta(), compose_deltas() | v0.3.0 | Delta-state CRDT sync — compute, apply, compose deltas |
| provenance | merge_with_provenance(), ProvenanceLog | v0.4.0 | Per-field audit trail: which source won, which strategy, why |
| verify | @verified_merge decorator | v0.4.0 | Property-based testing of commutativity, associativity, idempotency |
| wire | serialize(), deserialize(), serialize_batch() | v0.5.0 | Compact binary wire format for all CRDT types |
| probabilistic | MergeableHLL, MergeableBloom, MergeableCMS | v0.5.0 | Probabilistic data structures with CRDT merge semantics |
| datasets_ext | merge_datasets(), dedup_dataset() | v0.1.0 | HuggingFace Datasets integration (optional) |
| clocks | HybridLogicalClock, HLC timestamps | v0.6.0 | Hybrid Logical Clocks for distributed CRDT ordering |
| schema_evolution | Column mapping, type coercion | v0.6.0 | Automatic schema evolution for mismatched datasets |
| merkle | MerkleHashTree, diff, sync | v0.6.0 | Merkle hash trees for efficient incremental sync |
| arrow | Arrow-native merge engine | v0.6.0 | Apache Arrow-native merge path (10-50× speedup) |
| gossip | GossipState, anti-entropy protocol | v0.6.0 | Gossip protocol state tracking for convergence |
| async_merge | async_merge(), async_stream() | v0.6.0 | Async/await wrappers for non-blocking merges |
| parallel | parallel_merge(), multi-core execution | v0.6.0 | Parallel merge execution across multiple cores |

20 modules, ~7,300 lines of source, zero required dependencies.


API Reference

merge() — The Main Entry Point

from crdt_merge import merge

result = merge(
    df_a,                    # First dataset (list of dicts, pandas DataFrame, or Polars DataFrame)
    df_b,                    # Second dataset
    key="id",                # Column to match rows on (raises KeyError if not found)
    prefer="latest",         # "a", "b", or "latest" — conflict resolution (raises ValueError if invalid)
    schema=None,             # Optional MergeSchema for per-field strategies (overrides prefer)
    timestamp_col=None,      # Column with timestamps for LWW resolution
    dedup=True,              # Remove exact duplicates in output
    fuzzy_dedup=False,       # Also remove near-duplicates
    fuzzy_threshold=0.85,    # Similarity threshold for fuzzy dedup
)
  • When key is provided: rows with matching keys are merged, unique rows from both sides are preserved (see the sketch after this list).
  • When key is None: datasets are appended and deduplication is applied.
  • When schema is provided: per-field strategies override the prefer parameter for matched rows.
  • Input/output format matches: pass pandas in → get pandas out. Pass list of dicts → get list of dicts.
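
To illustrate the key-matching semantics (a minimal sketch; output row order is illustrative):

a = [{"id": 1, "v": "x"}, {"id": 2, "v": "y"}]
b = [{"id": 2, "v": "z"}, {"id": 3, "v": "w"}]
result = merge(a, b, key="id", prefer="b")
# id 1 survives from a, id 3 from b; the conflicting id 2 resolves to b's value
# → [{"id": 1, "v": "x"}, {"id": 2, "v": "z"}, {"id": 3, "v": "w"}]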

merge_with_provenance() — Merge + Full Audit Trail

from crdt_merge import merge_with_provenance

merged, log = merge_with_provenance(
    df_a, df_b,
    key="id",
    schema=my_schema,          # Optional MergeSchema
    timestamp_col=None,        # Optional timestamp column
    as_dataframe=False,        # Set True to get pandas DataFrame output
)

# Inspect the audit trail
print(log.summary())           # Human-readable summary
print(log.total_conflicts)     # Number of field-level conflicts resolved
for entry in log.entries:
    print(entry.field, entry.winner, entry.strategy)

# Export
from crdt_merge import export_provenance
json_str = export_provenance(log, format="json")  # Returns string
csv_str = export_provenance(log, format="csv")     # Returns string
log_dict = log.to_dict()                           # Returns dict

merge_stream() — Streaming Merge

from crdt_merge import merge_stream, StreamStats

stats = StreamStats()
for batch in merge_stream(
    source_a,                # Iterable of dicts (streamed)
    source_b,                # Iterable of dicts (loaded into memory)
    key="id",
    batch_size=5000,
    schema=my_schema,        # Optional MergeSchema
    prefer="b",              # Optional prefer shorthand
    timestamp_col=None,
    stats=stats,
):
    process(batch)

print(f"{stats.rows_per_second:.0f} rows/s, {stats.batch_count} batches")

Memory: O(|source_b| + batch_size). Loads source_b fully for key lookup, streams source_a in batches.

merge_sorted_stream() — True O(1) Memory Merge

from crdt_merge import merge_sorted_stream

for batch in merge_sorted_stream(
    sorted_source_a,          # MUST be sorted by key ascending
    sorted_source_b,          # MUST be sorted by key ascending
    key="id",
    batch_size=5000,
    schema=my_schema,
    verify_order=True,        # Raises ValueError if sources aren't sorted
):
    process(batch)

Memory: O(batch_size). Reads one row at a time from each source and buffers at most one output batch. Tested to 100M rows at 10.8 MB.
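
Because correctness depends on input order, verify_order=True fails fast instead of producing a silently wrong merge (a minimal sketch, assuming the default batch_size):

out_of_order = iter([{"id": 3}, {"id": 1}])
try:
    for batch in merge_sorted_stream(out_of_order, iter([]), key="id",
                                     verify_order=True):
        pass
except ValueError as exc:
    print(f"unsorted source rejected: {exc}")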

Composable Strategies

from crdt_merge.strategies import (
    MergeSchema, LWW, MaxWins, MinWins, UnionSet,
    Priority, Concat, LongestWins, Custom
)

# Declare per-field strategies
schema = MergeSchema(
    default=LWW(),                                    # Fallback for unspecified fields
    score=MaxWins(),                                   # Highest value wins
    rating=MinWins(),                                  # Lowest value wins
    tags=UnionSet(delimiter=";"),                       # Set union of delimited values
    status=Priority(order=["draft", "review", "live"]), # Priority ranking
    notes=Concat(delimiter="\n"),                       # Concatenate with dedup
    title=LongestWins(),                               # Longer string wins
    custom_field=Custom(fn=my_merge_fn),               # Your own function
)

# Apply to any merge function
result = merge(df_a, df_b, key="id", schema=schema)
merged, log = merge_with_provenance(df_a, df_b, key="id", schema=schema)
for batch in merge_stream(src_a, src_b, key="id", schema=schema):
    ...

# Serialize for storage/transmission
d = schema.to_dict()    # → {"__default__": {"strategy": "LWW"}, "score": {"strategy": "MaxWins"}, ...}
restored = MergeSchema.from_dict(d)

Note: Custom(fn) strategies cannot be serialized — to_dict() stores {"strategy": "Custom"} and from_dict() raises ValueError to prevent silent behavior change.
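
A sketch of that guard in practice (assuming from_dict() raises as documented; re-attach the function by rebuilding the schema):

fn = lambda a, b: max(a, b)
schema = MergeSchema(default=LWW(), score=Custom(fn=fn))
d = schema.to_dict()               # score entry stores only {"strategy": "Custom"}
try:
    MergeSchema.from_dict(d)
except ValueError:
    restored = MergeSchema(default=LWW(), score=Custom(fn=fn))  # rebuild explicitly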

Delta Sync

from crdt_merge.delta import compute_delta, apply_delta, compose_deltas, DeltaStore

# Compute what changed between two versions
delta = compute_delta(old_records, new_records, key="id")
print(delta.size)  # Number of changes

# Apply delta to bring a remote node up to date
updated = apply_delta(remote_records, delta, key="id")

# Compose multiple deltas: delta(v1→v2) ⊔ delta(v2→v3) = delta(v1→v3)
combined = compose_deltas(delta_1, delta_2, key="id")

# DeltaStore tracks versions automatically
store = DeltaStore(key="id", node_id="node_a")
delta = store.ingest(new_records)  # Returns delta from previous state
print(store.version, store.size)

Note: DeltaStore is in-memory. Use Delta.to_dict() / Delta.from_dict() to persist deltas externally.
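
A minimal persistence sketch using those hooks (the file name and JSON layout are illustrative, not part of the library):

import json
from crdt_merge.delta import Delta

with open("delta_v1_v2.json", "w") as f:
    json.dump(delta.to_dict(), f)          # persist outside the process

with open("delta_v1_v2.json") as f:
    restored = Delta.from_dict(json.load(f))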

Binary Wire Format

from crdt_merge import serialize, deserialize, peek_type, wire_size, serialize_batch, deserialize_batch

# Serialize any CRDT type to compact binary
gc = GCounter("node1")
gc.increment("node1", 100)
wire_bytes = serialize(gc, compress=True)   # zlib compression
restored = deserialize(wire_bytes)          # → GCounter with value 100

# Inspect without deserializing
type_name = peek_type(wire_bytes)           # → "g_counter"
info = wire_size(wire_bytes)                # → {total_bytes, header_bytes, payload_bytes, ...}

# Batch serialize/deserialize
batch_bytes = serialize_batch([gc, pn, lww])
objects = deserialize_batch(batch_bytes)    # → [GCounter, PNCounter, LWWRegister]

Supported types: GCounter, PNCounter, LWWRegister, ORSet, LWWMap, MergeableHLL, MergeableBloom, MergeableCMS, Delta.

Wire format specification: Deterministic byte layout with 12-byte header (magic, version, type, flags, length). Any language implementation that speaks this format can interoperate.
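
A purely illustrative peek at those 12 bytes — the exact field offsets and widths live in the spec, so this sketch relies only on the documented header length and the public helpers:

header = wire_bytes[:12]                      # magic, version, type, flags, length
print(header.hex())                           # raw header bytes
print(peek_type(wire_bytes))                  # type without full deserialization
print(wire_size(wire_bytes)["header_bytes"])  # should report 12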

Probabilistic CRDTs

from crdt_merge import MergeableHLL, MergeableBloom, MergeableCMS

# HyperLogLog — estimate unique counts across distributed nodes
hll_a = MergeableHLL(precision=14)
hll_a.add_all(user_ids_node_a)
hll_b = MergeableHLL(precision=14)
hll_b.add_all(user_ids_node_b)
merged = hll_a.merge(hll_b)       # register-max merge
print(f"~{merged.cardinality():.0f} unique users (±0.81%)")

# Bloom filter — distributed membership testing
bloom_a = MergeableBloom(capacity=1_000_000, fp_rate=0.01)
bloom_a.add("blocked_ip")
bloom_b = MergeableBloom(capacity=1_000_000, fp_rate=0.01)
bloom_b.add("another_blocked_ip")
merged = bloom_a.merge(bloom_b)    # bitwise-OR merge

# Count-Min Sketch — distributed frequency estimation
cms_a = MergeableCMS(width=1000, depth=5)
cms_a.add("event_type", count=3)
cms_b = MergeableCMS(width=1000, depth=5)
cms_b.add("event_type", count=4)
merged = cms_a.merge(cms_b)        # element-wise max merge

All three structures satisfy CRDT merge properties and can be serialized via the wire format.
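
For example, an HLL can be shipped between nodes and merged after deserialization (a minimal sketch reusing serialize()/deserialize() from the wire format section):

payload = serialize(hll_a, compress=True)   # compact binary HLL
replica = deserialize(payload)              # reconstruct on the receiving node
merged = replica.merge(hll_b)               # merge works the same post-roundtrip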

Verified Merge

from crdt_merge import verified_merge

@verified_merge(samples=100, key="id")
def my_merge(a, b, key="id"):
    return merge(a, b, key=key)

# Calling my_merge() automatically verifies:
# - Commutativity: my_merge(a, b) == my_merge(b, a)
# - Associativity: my_merge(my_merge(a, b), c) == my_merge(a, my_merge(b, c))
# - Idempotency: my_merge(a, a) == a
# Raises CRDTVerificationError if any property fails.
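
As a counter-example, a merge that blindly lets its second argument win is order-dependent, so the commutativity check fails on conflicting inputs (a minimal sketch; the CRDTVerificationError import path is assumed):

from crdt_merge import CRDTVerificationError

@verified_merge(samples=100, key="id")
def take_b(a, b, key="id"):
    rows = {r[key]: dict(r) for r in a}
    rows.update({r[key]: dict(r) for r in b})   # b overwrites a unconditionally
    return list(rows.values())

try:
    take_b([{"id": 1, "v": 1}], [{"id": 1, "v": 2}])
except CRDTVerificationError:
    print("commutativity violated, as expected")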

JSON Deep Merge

from crdt_merge import merge_dicts, merge_json_lines

# Deep merge two dicts with LWW semantics
merged = merge_dicts(
    {"user": {"name": "Alice", "score": 80}},
    {"user": {"name": "Alice", "score": 95, "level": 5}},
)
# → {"user": {"name": "Alice", "score": 95, "level": 5}}

# None values in B are treated as missing (A's value preserved)
merged = merge_dicts({"x": 10}, {"x": None})
# → {"x": 10}

# Merge JSONL files line by line
merged_lines = merge_json_lines(jsonl_a, jsonl_b, key="id")

Deduplication

from crdt_merge import dedup, dedup_records, DedupIndex, MinHashDedup

# Exact dedup on strings
unique = dedup(["hello", "world", "hello"])

# Fuzzy dedup on records
unique_records = dedup_records(records, key="title", threshold=0.85)

# MinHash for large-scale approximate dedup
mh = MinHashDedup(num_hashes=128, threshold=0.5)
unique = mh.dedup(items, text_fn=lambda x: x["description"])

# DedupIndex — CRDT-mergeable dedup state
idx_a = DedupIndex("node_a")
idx_a.add_exact("item1")
idx_b = DedupIndex("node_b")
idx_b.add_exact("item2")
merged_idx = idx_a.merge(idx_b)

Benchmarks — A100 Stress Tests

All benchmarks run on Google Colab A100 (83.5 GB RAM). Full results in docs/benchmarks/.

v0.3.0 — Core Performance (173 measurements)

| Operation | Scale | Throughput | Memory | Scaling |
|---|---|---|---|---|
| merge() | 100K → 10M rows | 39–42K rows/s | O(n) — 0.3 MB/1K rows | 7% degradation over 100× scale |
| resolve_row() (strategies) | 100K → 5M rows | 42–44K rows/s | Zero overhead | Flat — O(1) per row |
| merge_stream() (batched) | 100K → 5M rows | 9.7–21K rows/s | O(\|source_b\|) | ⚠️ 54% degradation at 5M |
| merge_sorted_stream() | 100K → 100M rows | 1.05–1.17M rows/s | 10.8 MB constant | ✅ 10.5% degradation over 1000× |

Key findings:

  • merge_sorted_stream() is the star: 1.2M rows/s with truly constant 10.8 MB memory at 100M rows. O(1) memory proven at scale.
  • Core merge is honest O(n): No hidden quadratics. Stable throughput to 10M rows.
  • Strategies have zero overhead: 42–44K rows/s vs 39–42K for raw merge. The DSL is free.
  • merge_stream() caveat: Loads source_b into memory. Use merge_sorted_stream() for truly large-scale work.

v0.4.0 — Provenance & Verification (55 measurements)

All provenance and verification features tested. Schema-aware merge with 50-row datasets: 1.62ms. Export to JSON/CSV verified.

v0.5.0 — Wire Format & Probabilistic (50 measurements)

All wire format roundtrips verified (GCounter, PNCounter, LWWRegister, ORSet, LWWMap + probabilistic types). Batch serialize/deserialize, compression, and peek_type all verified.

v0.6.0 — Architecture & Performance (280 new tests)

Arrow-native merge engine achieves 10-50× speedup over dict-based path. HLC clock ordering, Merkle tree sync, gossip convergence, schema evolution, async/parallel wrappers all verified. 705 total tests, zero regressions.

Notebooks: Available in notebooks/ for independent reproduction on Google Colab.


Cross-Language Ports

crdt-merge follows a reference + protocol architecture:

| Language | Package | Version | Status |
|---|---|---|---|
| Python (reference) | crdt-merge | v0.6.0 | ✅ Full feature set |
| TypeScript | crdt-merge | v0.2.0 | Core CRDTs + merge |
| Rust | crdt-merge | v0.2.0 | Core CRDTs + merge |
| Java | crdt-merge | v0.2.0 | Source complete |

Architecture: Python is the reference implementation where all innovation starts. The Rust crate will become a thin protocol engine (~1,000 lines) implementing wire format + merge semantics. FFI wrappers (Go, C#, Swift) will wrap the protocol engine. The golden rule: Python serialize → Rust deserialize → merge → serialize → Python deserialize must roundtrip perfectly.


Roadmap

Released

| Version | Name | Highlights |
|---|---|---|
| v0.1.0 | Initial Release | Core CRDTs, merge, diff, dedup, JSON merge, HuggingFace integration |
| v0.2.0 | License Update | Apache-2.0 + patent protection |
| v0.3.0 | The Schema Release | MergeSchema DSL, 8 strategies, streaming merge, delta sync |
| v0.4.0 | The Audit Release | Provenance tracking, @verified_merge, streaming optimizations |
| v0.5.0 | The Protocol Release | Binary wire format, probabilistic CRDTs (HLL, Bloom, CMS) |
| v0.6.0 | The Architecture Release | HLC clocks, schema evolution, Merkle trees, Arrow-native merge, gossip protocol, async/parallel merge, multi-key composites |

Upcoming

| Version | Name | Key Features |
|---|---|---|
| v0.7.0 | The SQL Release | MergeQL — CRDT merge as DuckDB SQL UDF, self-merging Parquet files |
| v0.8.0 | The AI Release | ModelCRDT — AI model merging (TIES/DARE/SLERP as strategies), conflict topology visualization |
| v0.9.0 | The Compliance Release | UnmergeEngine — reversible CRDT merge for GDPR erasure, parallel merge |
| v1.0.0 | The Platform Release | API freeze, cross-language port sync, full documentation, production certification |

Full roadmap: docs/roadmap/v051_v100_unicorn_roadmap.md


Known Limitations

These are honest constraints, including several resolved in v0.6.0:

| Limitation | Details | Planned Fix |
|---|---|---|
| Python dict merge path | merge() converts DataFrames to list-of-dicts internally. Slow for >1M rows. | ✅ Resolved in v0.6.0 — Arrow-native engine (10-50× speedup) |
| No type system | Strategies operate on Any. No type checking during merge. | ✅ Resolved in v0.6.0 — Schema evolution with column mapping + type coercion |
| Single-threaded | All operations are synchronous, single-threaded Python. | ✅ Resolved in v0.6.0 — Async wrappers + parallel merge execution |
| Single key column | merge() supports one key column only. Composite keys require manual concatenation. | ✅ Resolved in v0.6.0 — Multi-key composite merges |
| No persistence | DeltaStore is in-memory. State lost on process exit. Use to_dict()/from_dict() to serialize externally. | By design — persistence belongs in the application layer |
| No networking | Gossip protocol provides state tracking but no built-in transport. Wire format enables interop. | By design — transport belongs in the application layer |
| O(n²) fuzzy dedup | _fuzzy_dedup_records compares every record against all existing records. Unusable above ~10K records. | Algorithmic improvement planned |
| Wire format doesn't include MergeSchema | You can serialize CRDTs and Deltas, but not MergeSchema over the wire. | Planned for future release |

Installation

# Core — zero dependencies
pip install crdt-merge

# With optional accelerators for heavy workloads
pip install crdt-merge[fast]       # orjson + xxhash
pip install crdt-merge[pandas]     # pandas DataFrame support
pip install crdt-merge[polars]     # Polars DataFrame support
pip install crdt-merge[datasets]   # HuggingFace Datasets
pip install crdt-merge[all]        # Everything
pip install crdt-merge[dev]        # pytest + hypothesis for development

Requirements: Python 3.9+. No required dependencies. Works on Linux, macOS, Windows.


Test Results

705 tests across 21 test files. 703 passed, 2 skipped (pandas optional), 0 failures.

| Test File | Tests | Status |
|---|---|---|
| test_core.py | 30 | ✅ |
| test_dataframe.py | 14 | ✅ |
| test_dedup.py | 15 | ✅ |
| test_json_merge.py | 10 | ✅ |
| test_strategies.py | 39 | ✅ |
| test_streaming.py | 19 | ✅ |
| test_provenance.py | 24 | ✅ (2 skipped) |
| test_verified_merge.py | 10 | ✅ |
| test_wire.py | 40 | ✅ |
| test_probabilistic.py | 42 | ✅ |
| test_v050_integration.py | 157 | ✅ |
| test_longest_wins.py | 11 | ✅ |
| test_stress_v030.py | 8 | ✅ |
| test_benchmark.py | 6 | ✅ |
| test_clocks.py | 40 | ✅ |
| test_schema_evolution.py | 40 | ✅ |
| test_merkle.py | 40 | ✅ |
| test_arrow.py | 40 | ✅ |
| test_gossip.py | 40 | ✅ |
| test_async_merge.py | 40 | ✅ |
| test_parallel.py | 40 | ✅ |

Version history:

| Version | Tests | Growth |
|---|---|---|
| v0.1.0 | 45 | |
| v0.2.0 | 88 | +43 |
| v0.3.0 | 133 | +45 |
| v0.4.0 | 277 | +144 |
| v0.5.0 | 425 | +148 |
| v0.6.0 | 705 | +280 |

Full details: TEST_RESULTS.md


Project Stats

| Metric | Value |
|---|---|
| Modules | 20 |
| Source lines | ~7,300 |
| Test lines | ~6,800 |
| Test:source ratio | ~0.93:1 |
| Dependencies | 0 (required) |
| Python versions | 3.9, 3.10, 3.11, 3.12 |
| License | Apache-2.0 |

Contributing

We welcome contributions. Please see CLA.md before submitting pull requests.

Commercial licensing & inquiries:


License

Copyright 2026 Ryan Gillespie

Licensed under the Apache License, Version 2.0. See LICENSE for the full text.

Patent grant included — see PATENTS.



Download files

Download the file for your platform.

Source Distribution

crdt_merge-0.6.0.tar.gz (143.9 kB)


Built Distribution


crdt_merge-0.6.0-py3-none-any.whl (86.1 kB)


File details

Details for the file crdt_merge-0.6.0.tar.gz.

File metadata

  • Download URL: crdt_merge-0.6.0.tar.gz
  • Size: 143.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.13

File hashes

Hashes for crdt_merge-0.6.0.tar.gz
| Algorithm | Hash digest |
|---|---|
| SHA256 | 78d1728d34aa8903dc62322e414e90f8c5a59ffa885bca7f0dbe08ccb81df147 |
| MD5 | 386fe0e0049351f2c7430d8a83a25577 |
| BLAKE2b-256 | 8a5bd2fb06e4514b89af2cd28036b3ad31d874460827451e46497d68f8fffdef |


File details

Details for the file crdt_merge-0.6.0-py3-none-any.whl.

File metadata

  • Download URL: crdt_merge-0.6.0-py3-none-any.whl
  • Size: 86.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.13

File hashes

Hashes for crdt_merge-0.6.0-py3-none-any.whl
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8957bcc210844541473f7eb08f1d118e24947ff3091ed058a6ca4773418b648e |
| MD5 | dec941b66d377017e1b9a98789198fcf |
| BLAKE2b-256 | 028025cb24ba2ed17795bb0b2eade551dac7724385eecadd28bf933640780726 |

