fastrecon

A focused, high-performance reconciliation engine for comparing SQL tables, SQL queries, CSV files, and Parquet files at scale. Built on DuckDB, Polars, and Apache Arrow.

fastrecon is not a pandas replacement. It is a reconciliation engine — built specifically for proving that two datasets are (or aren't) the same.

Why fastrecon

Most data teams hand-roll reconciliation with pandas, ad-hoc SQL, or shell scripts. None scale. fastrecon gives you one consistent API across every common combination:

Left              Right
SQL table         SQL table
SQL table         SQL query
SQL query         SQL query
SQL table/query   CSV / Parquet
CSV / Parquet     CSV / Parquet

Everything is normalized into a single internal relation (a DuckDB view), then compared with pushdown-friendly SQL — no whole-dataset materialization in Python.

Install

pip install fastrecon                 # core
pip install "fastrecon[postgres]"     # + psycopg
pip install "fastrecon[mysql]"        # + pymysql

Requires Python 3.9+.

Quick start

from fastrecon import compare, SqlTable, ParquetFile

result = compare(
    left=SqlTable(conn="postgresql://user:pw@host/db", table="public.orders"),
    right=ParquetFile(path="orders.parquet"),
    keys=["order_id"],
    compare_mode="keyed",
    exclude_columns=["load_ts"],
    tolerances={"amount": 0.01},
)

print(result.summary())
print(result.to_json(indent=True))

Sample output:

status               : MISMATCH
compare_mode         : keyed
row_count_left       : 1,000,001
row_count_right      : 1,000,000
schema_match         : True
data_match           : False
missing_in_left      : 0
missing_in_right     : 1
changed_rows         : 4
duplicate_keys_left  : 0
duplicate_keys_right : 0
elapsed_sec          : 1.842
engine               : duckdb+polars

Compare modes

Mode       What it does
schema     Column names, types, missing/extra columns
rowcount   Schema + row counts on both sides
keyed      Schema + counts + key-based diff (missing / changed / dup keys)
profile    Schema + counts + per-column null/distinct/min/max
hash       Schema + counts + one whole-dataset checksum per side

keyed mode is the default and supports partition-wise execution for big-data workloads — see below. hash mode is the fastest path when you only need a yes/no answer (see "Hash & checksum compare").
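
For example, a profile run returns the same summary plus per-column statistics under column_stats (a minimal sketch; left and right are the sources from the Quick start):

res = compare(left, right, keys=["order_id"], compare_mode="profile")
print(res.summary())
print(res.column_stats)   # per-column null / distinct / min / max on both sides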

Partition-wise compare (big data)

Joining 100M+ rows in one shot is dangerous. fastrecon can split a keyed compare into independent partitions and aggregate the results. Each partition runs as its own filtered SQL job inside DuckDB, so memory stays bounded by the partition size, not the dataset size.

from fastrecon import compare, SqlTable, ParquetFile, PartitionSpec

# Partition by a low-cardinality column (e.g. country, status, load_date)
result = compare(
    left=SqlTable(conn=SRC, table="orders"),
    right=ParquetFile("orders/*.parquet"),
    keys=["order_id"],
    partition=PartitionSpec(column="region", strategy="value"),
)

# Or hash-bucket any column (works for high-cardinality keys too)
result = compare(
    left=..., right=..., keys=["order_id"],
    partition=PartitionSpec(column="order_id", strategy="hash", buckets=64),
)

# Or explicit ranges (great for dates / sequential ids)
result = compare(
    left=..., right=..., keys=["order_id"],
    partition=PartitionSpec(
        column="order_dt", strategy="range",
        boundaries=[("2026-01-01", "2026-02-01"),
                    ("2026-02-01", "2026-03-01"),
                    ("2026-03-01", "2026-04-01")],
    ),
)

print(result.summary())
for p in result.column_stats["partitions"]:
    print(p)   # per-partition counts + match flag

Strategies

  • value — best for low-cardinality partition keys (region, status, load_date). Auto-discovers distinct values from both sides; capped by max_partitions (default 1000).
  • hash — best for any column, especially high-cardinality keys. buckets=N controls partition count and memory footprint.
  • range — best for ordered columns (dates, sequential ids). Half-open [lo, hi) boundaries; you supply them.

What you get back

When you pass partition=..., the result includes a per-partition breakdown under column_stats:

result.column_stats["partitioned_by"]
# {"column": "region", "strategy": "value", "n_partitions": 5}

result.column_stats["partitions"]
# [
#   {"partition": "EU", "row_count_left": 312_054, "row_count_right": 312_054,
#    "missing_in_left": 0, "missing_in_right": 0, "changed_rows": 2,
#    "duplicate_keys_left": 0, "duplicate_keys_right": 0, "match": False},
#   ...
# ]

Top-level counts (missing_in_left, changed_rows, etc.) are aggregated across partitions; sample_mismatches is a globally capped sample drawn from any partition.
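
A quick sanity check on that contract (a sketch assuming "aggregated" means summed counts, using the result shape shown above):

parts = result.column_stats["partitions"]
assert result.changed_rows == sum(p["changed_rows"] for p in parts)
assert result.missing_in_right == sum(p["missing_in_right"] for p in parts)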

Choosing a strategy

  • You know the data has natural partitions (load_date, region, tenant_id) → use value.
  • You don't, and just want bounded memory → use hash with buckets ≈ dataset_rows / 5_000_000 (see the sizing sketch after this list).
  • The data is time-series and you want to reconcile per window → use range with date boundaries.
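
A rough sizing sketch for that hash-bucket rule of thumb (the row count here is a made-up example):

# Aim for roughly 5M rows per hash bucket
dataset_rows = 80_000_000
buckets = max(1, dataset_rows // 5_000_000)   # -> 16 partitions of ~5M rows each

result = compare(
    left=..., right=..., keys=["order_id"],
    partition=PartitionSpec(column="order_id", strategy="hash", buckets=buckets),
)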

Hash & checksum compare

Two complementary fast paths for the question "are these the same?":

1. Whole-dataset checksum (compare_mode="hash") — one fingerprint per side, single pass, no join. Fastest possible answer when you don't need to know which rows differ:

res = compare(left, right, keys=["order_id"], compare_mode="hash")
print(res.status)                       # MATCH or MISMATCH
print(res.column_stats["hash"])
# {'algo': 'xxhash64',
#  'left_checksum':  '8ac3…f12d',
#  'right_checksum': '8ac3…f12d',
#  'columns_hashed': ['name', 'amount', 'qty']}

The fingerprint is bit_xor(hash(normalized_col1, ...)) aggregated per side, so it's order-independent — re-sorting a file or swapping ETL load order does not change the digest. Key columns are excluded by default (pass keys=None to fingerprint the whole row instead). All ReconConfig normalization (trim / case / decimal scale / timezone / exclude_columns / column_mapping) is applied so a hash-mode MATCH means the same thing as a keyed-mode MATCH on identical data.

2. Per-row hash inside keyed mode (ReconConfig.row_hash=True) — keeps the per-row diff but replaces the per-column equality check with a single 64-bit integer compare after the join. Big win on wide tables (50+ columns); the only trade-off is that sample_mismatches["changed"] carries just the keys of differing rows, not the per-column __left/__right values.

from fastrecon import ReconConfig, compare

cfg = ReconConfig(row_hash=True)
res = compare(left, right, keys=["order_id"], config=cfg)

CLI:

fastrecon compare --left ... --right ... --keys id --hash-only       # mode 1
fastrecon compare --left ... --right ... --keys id --row-hash        # mode 2

Algorithm. The default fingerprint is DuckDB's built-in hash() — a 64-bit non-cryptographic hash in the same performance class as xxhash64. It runs entirely inside the engine with zero Python overhead. md5 / sha256 variants are deliberately not exposed here; if you need a cryptographic digest for audit, run custom SQL via SqlQuery.
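
To see why the bit_xor-of-hashes fingerprint is order-independent, here is a standalone DuckDB sketch of the idea (illustrative only; not fastrecon's internal SQL, and the table and columns are made up):

import duckdb

con = duckdb.connect()
con.execute("CREATE TABLE t (name TEXT, amount DOUBLE, qty INT)")
con.execute("INSERT INTO t VALUES ('a', 1.0, 2), ('b', 3.0, 4)")

fingerprint_sql = "SELECT bit_xor(hash(concat_ws('|', name, amount, qty))) FROM t"
digest_1 = con.execute(fingerprint_sql).fetchone()[0]

# Same rows, different load order: bit_xor is commutative, so the digest is identical
con.execute("DELETE FROM t")
con.execute("INSERT INTO t VALUES ('b', 3.0, 4), ('a', 1.0, 2)")
digest_2 = con.execute(fingerprint_sql).fetchone()[0]
assert digest_1 == digest_2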

Configuration & normalization

Reconciliation is mostly about handling the messy reality of "the same" data:

from fastrecon import ReconConfig, compare

cfg = ReconConfig(
    trim_strings=True,
    case_sensitive=False,
    null_equals_empty=True,
    decimal_scale=2,
    timestamp_tz="UTC",
    column_mapping={"orderId": "order_id"},   # left -> right rename
    exclude_columns=["load_ts", "etl_batch"],
    tolerances={"amount": 0.01, "tax": 0.01},
    sample_limit=200,
)

result = compare(left, right, keys=["order_id"], config=cfg)

Result object

compare() returns a ReconResult with:

  • status — MATCH / MISMATCH / ERROR
  • row_count_left, row_count_right
  • schema_match, data_match, schema_diff
  • missing_in_left, missing_in_right, changed_rows
  • duplicate_keys_left, duplicate_keys_right
  • sample_mismatches — sample rows for each mismatch class
  • column_stats — populated in profile mode
  • execution_metrics — elapsed_sec, engine

Use result.summary() for a printable report or result.to_json() / result.to_dict() to ship it to a logger, dashboard, or CI gate.
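
For example, a minimal CI-gate sketch (left, right, and keys as in the Quick start; the logger wiring is your own):

import json
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("recon")

res = compare(left, right, keys=["order_id"])
log.info("recon result: %s", json.dumps(res.to_dict(), default=str))

if res.status != "MATCH":
    raise SystemExit(1)   # fail the build on MISMATCH or ERROR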

Sources

from fastrecon import SqlTable, SqlQuery, CsvFile, ParquetFile

SqlTable(conn="postgresql://...", table="schema.orders")
SqlQuery(conn="postgresql://...", query="SELECT * FROM orders WHERE dt >= '2026-01-01'")
CsvFile("/path/to/orders.csv", options={"delim": ","})
ParquetFile("/path/to/orders.parquet")        # also supports DuckDB globs: 'data/*.parquet'

Architecture

fastrecon/
├── api.py                  # public compare()
├── config.py               # ReconConfig
├── sources/                # SqlTable / SqlQuery / CsvFile / ParquetFile
├── engines/                # DuckDB execution engine
├── compare/                # schema / rowcount / keyed / profile
├── output/                 # ReconResult (summary, to_dict, to_json)
└── utils/                  # normalization, logging

Internally:

  1. Each source is registered into an in-memory DuckDB connection as a view (zero-copy from Arrow when possible).
  2. Schema is read with DESCRIBE.
  3. Row counts, anti-joins, and inner joins run in DuckDB — no full Python materialization.
  4. Mismatch samples are pulled lazily, capped by sample_limit.
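
The same four steps, sketched outside fastrecon with raw DuckDB and Arrow (this illustrates the approach, not the library's actual code):

import duckdb
import pyarrow as pa

left = pa.table({"order_id": [1, 2, 3], "amount": [10.0, 20.0, 30.0]})
right = pa.table({"order_id": [1, 2], "amount": [10.0, 21.0]})

con = duckdb.connect()
con.register("l", left)     # 1. zero-copy views over the Arrow tables
con.register("r", right)

print(con.execute("DESCRIBE l").fetchall())        # 2. schema

missing_in_right = con.execute(                    # 3. anti-join for missing keys
    "SELECT count(*) FROM l LEFT JOIN r ON l.order_id = r.order_id "
    "WHERE r.order_id IS NULL"
).fetchone()[0]
changed_rows = con.execute(                        #    inner join + value comparison
    "SELECT count(*) FROM l JOIN r ON l.order_id = r.order_id "
    "WHERE l.amount IS DISTINCT FROM r.amount"
).fetchone()[0]

samples = con.execute(                             # 4. capped mismatch sample
    "SELECT l.order_id, l.amount AS amount_left, r.amount AS amount_right "
    "FROM l JOIN r ON l.order_id = r.order_id "
    "WHERE l.amount IS DISTINCT FROM r.amount LIMIT 5"
).fetchall()
print(missing_in_right, changed_rows, samples)     # 1 1 [(2, 20.0, 21.0)]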

CLI

fastrecon ships with a first-class typer-built command-line interface — drop it into any CI pipeline:

fastrecon compare \
  --left  csv:./orders_today.csv \
  --right 'postgres:postgresql://u:p@h/db#public.orders' \
  --keys order_id \
  --tolerance amount=0.01 \
  --partition region:value \
  --report html:./report.html \
  --report junit:./report.xml \
  --fail-on mismatch

Source URI grammar (passed to --left / --right):

URI                                             Meaning
csv:<path>                                      CSV file
parquet:<path>                                  Parquet file
sqltable:<sqlalchemy_url>#<table>               SQL table (any SQLAlchemy backend)
sqlquery:<sqlalchemy_url>#<SELECT ...>          Arbitrary SQL
postgres:<sqlalchemy_url>#<table>               Native DuckDB postgres_scanner
postgres-query:<sqlalchemy_url>#<SELECT ...>    Native scanner with custom SQL

Reports: --report <fmt>:<path> is repeatable; supported formats are html, junit, json.

Exit codes (driven by --fail-on {never,mismatch,error}, default mismatch): 0 MATCH, 1 MISMATCH, 2 ERROR. The same semantics are exposed on ReconResult.exit_code.

Add --verbose to enable rich-formatted structured logging of source loads, partition timings, and report writes.

Backwards compatibility. The legacy --left-type/--left-path/--left-conn/... flag set from 0.3.x still works and is hidden from --help.

Use fastrecon in CI

The CLI returns a non-zero exit code on mismatch, so CI pipelines fail builds automatically. In the examples below, both the HTML and JUnit reports are uploaded as artifacts so engineers can inspect them after the fact.

GitHub Actions

name: nightly-recon
on:
  schedule: [{ cron: "0 6 * * *" }]
jobs:
  recon:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: { python-version: "3.11" }
      - run: pip install 'fastrecon[postgres]'
      - name: Reconcile orders snapshot vs warehouse
        env:
          PG_URL: ${{ secrets.WAREHOUSE_URL }}
        run: |
          fastrecon compare \
            --left  parquet:./snapshots/orders.parquet \
            --right "postgres:${PG_URL}#public.orders" \
            --keys order_id \
            --tolerance amount=0.01 \
            --partition region:value \
            --report html:./recon.html \
            --report junit:./recon.xml \
            --fail-on mismatch
      - if: always()
        uses: actions/upload-artifact@v4
        with:
          name: recon-report
          path: |
            recon.html
            recon.xml
      - if: always()
        uses: mikepenz/action-junit-report@v4
        with:
          report_paths: recon.xml

GitLab CI

recon:
  image: python:3.11-slim
  script:
    - pip install 'fastrecon[postgres]'
    - |
      fastrecon compare \
        --left  "parquet:./snapshots/orders.parquet" \
        --right "postgres:${WAREHOUSE_URL}#public.orders" \
        --keys order_id \
        --tolerance amount=0.01 \
        --partition region:value \
        --report html:./recon.html \
        --report junit:./recon.xml \
        --fail-on mismatch
  artifacts:
    when: always
    paths: [recon.html]
    reports:
      junit: recon.xml

Reports

Self-contained HTML and JUnit XML reports — no template engine, no external assets, perfect for emailing or attaching to a CI build:

res = compare(left, right, keys=["id"])
res.to_html("report.html")           # standalone HTML, embeddable in CI artifacts
res.to_junit("report.xml")           # JUnit XML — Jenkins / GitLab / Buildkite read this natively
res.exit_code                        # 0 / 1 / 2 for shell scripts

The HTML report includes the summary, schema diff, per-partition heatmap (when partitioned), and tables of mismatch samples. The JUnit report emits one <testcase> per partition so dashboards pinpoint which slice failed.

Streaming SQL loader & native Postgres scanner

Both SQL sources stream batches via a server-side cursor (Arrow RecordBatchReader → DuckDB), so you don't fetchall() 100M rows into Python before doing anything useful.

SqlTable(conn=URL, table="orders", chunk_size=200_000)   # batch size
SqlQuery(conn=URL, query="SELECT ...", chunk_size=200_000)

# Opt out for drivers that don't support server-side cursors:
SqlTable(conn=URL, table="orders", streaming=False)

When to use                                           Setting
Default — large or unknown size                       streaming=True (default), tune chunk_size
Tiny result set, want to avoid per-batch overhead     streaming=False
Driver doesn't support stream_results=True            streaming=False

For Postgres specifically, use PostgresSource to bypass SQLAlchemy entirely. DuckDB's native postgres_scanner extension talks libpq directly, pushes filters down, and zero-copies result batches into the engine:

from fastrecon import PostgresSource, ParquetFile, compare

result = compare(
    left=PostgresSource(conn="postgresql://u:p@h/db", table="public.orders"),
    right=ParquetFile("orders.parquet"),
    keys=["order_id"],
)

Use PostgresSource whenever both speed and memory matter — it's the recommended path for production warehouses.

Benchmarks

A reproducible head-to-head suite lives under benchmarks/. fastrecon is benchmarked against four other tools at 10K / 1M / 10M / 100M rows across four canonical reconciliation scenarios:

Tool           What it is
fastrecon      This library (DuckDB + Arrow).
datacompy      Capital One's pandas-bound recon library.
data-diff      Datafold's now-maintenance-mode in-DB differ.
pandas-merge   Hand-rolled pd.merge(..., indicator=True) baseline.
pyspark        Spark local[*] full-outer join (eqNullSafe).

Each tool runs in its own isolated venv. Every cell is run multiple times (smoke=1, PR=3, nightly/full=5) and reported as median (p95) · peak RSS · rows/sec · ✓, so the headline numbers aren't single-shot noise. Cells that OOM, time out, or hit a missing dependency surface as DNF with a reason — never as a silent zero.

  • Smoke (10K) reference table is checked in at benchmarks/results/reference.md.
  • PR (1M) runs on every pull request; download the bench-pr-1m artifact.
  • Nightly (10M) runs on cron and publishes to a static dashboard (benchmarks/dashboard/) on the gh-pages branch.
  • Full (100M) is manual-trigger only — datacompy and pandas-merge typically OOM at this tier.

See docs/benchmarks.md for the full methodology, metric definitions, and instructions for reproducing locally.

Roadmap

  • ✅ MVP: package, sources, schema/rowcount/keyed/profile compare, JSON result, tests
  • ✅ Partition-wise compare (value / hash / range strategies)
  • ✅ Streaming SQL loader (Arrow RecordBatchReader)
  • ✅ Native Postgres scanner via DuckDB postgres extension
  • ✅ HTML + JUnit XML reports + CLI with exit codes
  • ✅ Hash / checksum compare mode + per-row hash opt-in for keyed mode
  • ✅ Multi-tool benchmark suite (datacompy, data-diff, pandas-merge, pyspark)
  • ✅ Multi-sample timing with median / p95 / per-sample CI spread summary
  • ✅ Static benchmark trend dashboard (Chart.js, published to gh-pages)
  • ⏳ Parallel partition execution (thread pool)
  • ⏳ Snowflake / BigQuery / Delta / Iceberg sources
  • ⏳ Rust extension (PyO3) for hashing / normalization hot path

License

MIT
