nosql-delta-bridge

A Python library for ingesting schema-free NoSQL documents into Delta Lake with predictable, auditable results.

The Problem

In my previous role I ran ETL jobs pulling data from MongoDB into Delta Lake using Spark. The pipeline worked fine — until it didn't. A single document where age was "thirty-two" instead of 32, or where a previously stable status field suddenly appeared as an integer, was enough to break the entire write. The Spark job would fail with a cryptic cast error, the table was left in an inconsistent state, and ops scrambled to figure out which document was the culprit.

The fix was always the same: widen the schema, re-run, and hope the same thing didn't happen tomorrow. There was no DLQ, no audit trail, and no contract that said "this field must be an integer." Bad documents were either silently coerced, silently dropped, or allowed to crash everything.

nosql-delta-bridge is built to eliminate that class of problem. Every document either lands in the Delta table or in a dead-letter queue with an explicit rejection reason. Nothing is silently dropped. Nothing silently crashes the pipeline.


Architecture

raw JSON documents
        │
        ▼
┌───────────────┐
│    infer.py   │  Examines a batch and infers a unified schema.
│               │  Conflict resolution: widest type wins, all fields 
└───────┬───────┘  nullable by default.
        │  schema: dict[str, FieldSchema]
        ▼
┌───────────────┐
│   flatten.py  │  Flattens nested objects into dot-notation columns.
│               │  address.city, preferences.notifications.email, etc.
└───────┬───────┘
        │  flat documents
        ▼
┌───────────────┐
│   coerce.py   │  Applies per-field type coercion against the schema.
│               │  cast (silent), reject (→ DLQ), or flag.
└───────┬───────┘
        │  typed documents          rejected documents
        ▼                                   ▼
┌───────────────┐               ┌───────────────────┐
│   writer.py   │               │      dlq.py       │
│               │               │                   │
│  Delta Lake   │               │  NDJSON file or   │
│  (delta-rs)   │               │  S3/GCS/Azure Blob│
└───────────────┘               └───────────────────┘

Audit metadata columns are added automatically on every write:

Column               Description
_ingested_at         UTC timestamp of the write
_source_collection   Name of the source collection
_schema_version      Short hash of the schema used for this batch

Quickstart

pip install -e ".[dev]"

Infer a schema from known-good data

bridge infer historical.json --output users.schema.json
historical.json  ·  500 documents  ·  8 fields
  schema written  →  users.schema.json

The schema file is a plain JSON contract that controls what the pipeline will accept:

{
  "name":  {"dtype": "string",  "nullable": false},
  "age":   {"dtype": "integer", "nullable": false},
  "email": {"dtype": "string",  "nullable": true}
}

Ingest new documents against the fixed schema

bridge ingest incoming.json ./delta/users --schema users.schema.json --dlq rejected.ndjson
incoming.json  ·  11 documents  ·  schema from users.schema.json (8 fields)
  written:   8  →  delta/users
  rejected:  3  →  rejected.ndjson
  schema evolved: +4 field(s) ['address', 'address.city', 'address.street', 'address.zip']  →  users.schema.json

Every rejected document in rejected.ndjson has an explicit reason:

{"_dlq_reason": "cast failed on 'age': invalid literal for int() with base 10: 'thirty-two'", "_dlq_stage": "coerce", ...}
{"_dlq_reason": "null value on non-nullable field 'name'", "_dlq_stage": "coerce", ...}

Without a schema file

bridge ingest batch.json ./delta/users

Schema is inferred from the batch itself. All documents land; the DLQ stays empty. Useful for first-pass bronze ingestion.


Pipeline Behavior

The two-command workflow (bridge infer + bridge ingest --schema) is what makes the DLQ meaningful. When the schema is inferred from the same batch being ingested, it always accommodates every value in that batch — types widen, nulls make fields nullable — so nothing is ever rejected. The DLQ only has teeth when coerce runs against a pre-established schema contract.

Situation                                         Written             DLQ            Schema file
Clean batch                                       all                 empty          unchanged
New optional field appears                        all                 empty          updated (field added, nullable)
Null on a required field                          valid docs          nulled docs    unchanged
Castable type mismatch ("25" → integer)           all, cast silently  empty          unchanged
Non-castable mismatch ("twenty-five" → integer)   valid docs          bad docs       unchanged + warning
Partial type widening (some docs changed type)    valid-type docs     bad-type docs  unchanged + warning
Full type migration (all docs changed type)       0                   all docs       unchanged + warning
Lossy float → integer cast (25.7 → int)           clean docs          lossy docs     unchanged

Type widening is never automatic. When a field goes from integer to string, the pipeline cannot distinguish a legitimate migration from a data quality incident. Both look identical from the outside. The correct response is to stop, warn, and surface the decision to a human:

# re-infer from new authoritative data and overwrite the table
bridge infer new_batch.json --output users.schema.json
bridge ingest new_batch.json ./delta/users --schema users.schema.json --mode overwrite

Flattening

Nested objects are expanded into dot-notation columns before coercion and write. Only leaf fields appear in the schema — parent keys are excluded when their children are tracked.

# {"address": {"city": "SP", "zip": "01310"}}
# becomes:
# {"address.city": "SP", "address.zip": "01310"}
# "address" does NOT appear as a column

max_depth

Controls how many levels deep the flattener recurses. The default is 5, which covers most real-world documents. Beyond the limit, the remaining sub-object is kept as a single opaque column instead of being expanded further.

Given this document:

{"user": {"profile": {"address": {"city": "São Paulo"}}}}
max_depth     Result
5 (default)   user.profile.address.city = "São Paulo"
2             user.profile = {"address": {"city": "São Paulo"}}
1             user = {"profile": {"address": {"city": "São Paulo"}}}

The column still exists in Delta when the limit is hit — it holds a serialized object instead of a scalar. You can query it, but not with simple column predicates.
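
The same behavior can be checked from Python with flatten_document (its full signature appears under Custom separator below); a quick sketch, assuming the defaults described above:

from nosql_delta_bridge.flatten import flatten_document

doc = {"user": {"profile": {"address": {"city": "São Paulo"}}}}

flatten_document(doc)                # default max_depth=5
# {"user.profile.address.city": "São Paulo"}

flatten_document(doc, max_depth=2)
# {"user.profile": {"address": {"city": "São Paulo"}}}  <- kept as one opaque column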

Lower max_depth when documents are extremely wide or deeply nested across many schema variants. A product catalog with 10-level-deep specs objects across 500 product types can easily produce hundreds of distinct column names with the default depth. Capping the recursion keeps the schema manageable; the deep structure lands in an opaque column you can process separately.

Arrays

Arrays are never recursed into — they are kept as Python lists and stored as list columns in Delta Lake:

{"tags": ["python", "delta", "etl"]}
# stays as:
{"tags": ["python", "delta", "etl"]}

{"orders": [{"id": 1, "total": 99.0}, {"id": 2, "total": 45.0}]}
# also stays as:
{"orders": [{"id": 1, "total": 99.0}, {"id": 2, "total": 45.0}]}

Exploding an array (one document → many rows) is a cardinality change that breaks joins and aggregations on all other columns. That is a deliberate transform — a LATERAL VIEW EXPLODE in Spark or UNNEST in SQL — not a side effect of flattening. Do it explicitly before feeding documents into the pipeline if needed.
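
If one row per array element is what you actually need, do the explode as a pre-processing step. A minimal sketch in plain Python (the orders field and the order.id / order.total column names come from the example above and are purely illustrative, not anything the library mandates):

def explode_orders(doc):
    # Hypothetical pre-processing: emit one document per element of 'orders',
    # copying the scalar fields and promoting each order to top-level keys.
    base = {k: v for k, v in doc.items() if k != "orders"}
    for order in doc.get("orders", []):
        yield {**base, "order.id": order["id"], "order.total": order["total"]}

doc = {"customer": "ana", "orders": [{"id": 1, "total": 99.0}, {"id": 2, "total": 45.0}]}
exploded = list(explode_orders(doc))   # two documents, ready to feed into the pipeline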

Custom separator

from nosql_delta_bridge.flatten import flatten_document

flat = flatten_document(doc, max_depth=2, separator="__")
# produces: address__city, address__zip

Schema Evolution

New fields are added automatically and safely. Existing field types and nullability are never changed by an ingestion run — the schema file is a contract, not a snapshot.

When a new field appears in a batch:

  • It is added to the Delta table as a nullable column via schema_mode="merge".
  • Historical rows get null for that column — no data loss.
  • The schema file is updated to include the new field.
  • Existing fields in the schema file are written back verbatim — nullable flags are never silently widened.
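
For illustration, if a batch introduces a phone field that is not in users.schema.json (field name hypothetical), the file after the run would keep every existing entry verbatim and add the new one as nullable:

{
  "name":  {"dtype": "string",  "nullable": false},
  "age":   {"dtype": "integer", "nullable": false},
  "email": {"dtype": "string",  "nullable": true},
  "phone": {"dtype": "string",  "nullable": true}
}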

Writing to Cloud Storage

Pass credentials as --storage-option KEY=VALUE (repeatable). Both the Delta table URI and the DLQ path accept cloud URIs. The same credentials are forwarded to both the writer and the DLQ automatically.

Backend              URI scheme   Status
AWS S3               s3://        tested end-to-end (Cloudflare R2)
Azure Blob Storage   az://        tested against Azurite emulator
GCS                  gs://        writer supported; DLQ untested
Local                plain path   tested

AWS S3 / Cloudflare R2

bridge ingest incoming.json s3://my-bucket/delta/users \
  --schema users.schema.json \
  --dlq s3://my-bucket/dlq/users.ndjson \
  --storage-option AWS_ACCESS_KEY_ID=abc123 \
  --storage-option AWS_SECRET_ACCESS_KEY=secret \
  --storage-option AWS_ENDPOINT_URL=https://<account>.r2.cloudflarestorage.com \
  --storage-option AWS_REGION=auto

Azure Blob Storage

bridge ingest incoming.json az://my-container/delta/users \
  --schema users.schema.json \
  --dlq az://my-container/dlq/users.ndjson \
  --storage-option AZURE_STORAGE_ACCOUNT_NAME=myaccount \
  --storage-option AZURE_STORAGE_ACCOUNT_KEY=...

For the Azure Storage Emulator (Azurite):

bridge ingest incoming.json az://my-container/delta/users \
  --storage-option AZURE_STORAGE_ACCOUNT_NAME=devstoreaccount1 \
  --storage-option AZURE_STORAGE_ACCOUNT_KEY=... \
  --storage-option AZURE_STORAGE_USE_EMULATOR=true

Python API

from nosql_delta_bridge.writer import WriterConfig, write_batch
from nosql_delta_bridge.dlq import DeadLetterQueue

storage_options = {
    "AWS_ACCESS_KEY_ID":     "...",
    "AWS_SECRET_ACCESS_KEY": "...",
    "AWS_ENDPOINT_URL":      "https://<account>.r2.cloudflarestorage.com",
    "AWS_REGION":            "auto",
}

config = WriterConfig(
    table_uri="s3://my-bucket/delta/users",
    source_collection="users",
    storage_options=storage_options,
)

with DeadLetterQueue("s3://my-bucket/dlq/users.ndjson", storage_options=storage_options) as dlq:
    ...

The writer uses delta-rs (Rust object_store) for all cloud backends. The DLQ uses fsspec, which handles credential translation internally — the same storage_options key format works across backends without code changes.
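
Because the table is written with delta-rs, it can be read back directly with the deltalake package. A sketch (not part of this library), assuming the same storage_options dict as above:

from deltalake import DeltaTable

dt = DeltaTable("s3://my-bucket/delta/users", storage_options=storage_options)
df = dt.to_pandas()
# audit columns are present on every row:
df[["_ingested_at", "_source_collection", "_schema_version"]].head()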


Running Tests

pytest

141 unit tests across all five modules and the CLI. Fixtures in tests/fixtures/ are fully synthetic — documents with missing fields, type conflicts, deeply nested structures, and mixed-type arrays. No external services required.

To run Azure integration tests (requires Docker):

pytest -m integration

Key Decisions

Two schemas per ingestion run, not one. When a fixed schema is provided, the pipeline derives two schemas from a merge:

  • coerce_schema = old schema only. Enforces the established contract strictly.
  • write_schema = old fields (types and nullability preserved) + new fields from the batch (added as nullable). Prevents type conflicts with the existing Delta table while allowing safe growth.
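
A rough sketch of that derivation (helper name and the dict-of-dicts schema shape are illustrative, not the library's internal API):

def derive_schemas(old_schema: dict, batch_schema: dict) -> tuple[dict, dict]:
    # coerce_schema: the established contract, enforced exactly as-is.
    coerce_schema = dict(old_schema)
    # write_schema: old fields verbatim, plus any new batch fields added as nullable.
    write_schema = dict(old_schema)
    for field, spec in batch_schema.items():
        if field not in old_schema:
            write_schema[field] = {**spec, "nullable": True}
    return coerce_schema, write_schema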

Type widening is operator-triggered, not automatic. The pipeline cannot tell whether a field going from integer to string is intentional (the source system changed) or accidental (bad data). Both look identical from the outside. The correct response is to stop, warn, and surface the decision to a human.

DLQ as NDJSON, not Delta. Rejected documents have no guaranteed schema — that is why they were rejected. NDJSON is the only format that can hold arbitrary shapes, is appendable across runs without schema negotiation, and is directly queryable by DuckDB's read_ndjson.
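
For example, summarizing rejection reasons needs nothing beyond the DuckDB Python client; a sketch, with the file name taken from the Quickstart above:

import duckdb

duckdb.sql("""
    SELECT _dlq_stage, _dlq_reason, count(*) AS n
    FROM read_ndjson_auto('rejected.ndjson')
    GROUP BY _dlq_stage, _dlq_reason
    ORDER BY n DESC
""").show()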

delta-rs over Spark. No cluster required. The full pipeline runs locally on a laptop and writes to a Delta table that any Spark or DuckDB reader can consume. This makes it reproducible for anyone cloning the repo.

Lossy float-to-integer casts are rejections, not silent truncation. 30.0 → 30 is lossless and accepted. 25.7 → 25 silently drops .7. In a financial or scientific context that is a data quality bug. The coerce layer enforces float.is_integer() before casting.
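
A sketch of that guard (illustrative only, not the coerce.py source):

def cast_float_to_int(value: float) -> int:
    # Lossless floats cast; anything with a fractional part is a rejection, not a truncation.
    if not value.is_integer():
        raise ValueError(f"lossy cast: {value} has a fractional part")
    return int(value)

cast_float_to_int(30.0)   # -> 30
cast_float_to_int(25.7)   # raises; the document is routed to the DLQ instead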


What I Would Do Differently

PySpark-native flatten and coerce. The library uses Pandas + PyArrow, which works well for batch sizes up to a few million documents per run. The original problem this solves — Spark jobs pulling from MongoDB into Delta Lake — operates at a different scale. A PySpark variant of the flatten and coerce stages expressed as column transformations or UDFs would allow the same schema enforcement logic to run distributed across a cluster. Schema inference could still run driver-side on a representative sample and then be broadcast to workers. The DLQ in that context would write to a partitioned Delta table rather than NDJSON. The current design made a deliberate trade-off: no cluster required means anyone can clone the repo and run the full pipeline on a laptop. That reproducibility has value, but it is not the right choice for a 100M-document-per-hour production job.

--allow-widening flag for explicit type migrations. For the case where all documents in a batch changed type, an opt-in flag could auto-rewrite the Delta table with the evolved schema instead of requiring two manual commands. Kept out of scope deliberately — the conservative default should be explicit, not opt-out.


Project Structure

nosql_delta_bridge/
├── infer.py       # Schema inference and merging
├── coerce.py      # Type coercion with cast / reject / flag
├── flatten.py     # Nested object and array flattening
├── writer.py      # Delta Lake writer with schema evolution
├── dlq.py         # Dead letter queue (local or cloud via fsspec)
└── cli.py         # bridge infer / bridge ingest commands

tests/
└── fixtures/      # Synthetic messy JSON documents (no external deps)

examples/
├── mongo_to_delta.py          # Local two-phase demo: infer schema, then ingest with DLQ
├── airflow_mongo_to_delta.py  # Production DAG: MongoDB Atlas → Airflow → Delta Lake on R2
└── data/                      # Synthetic JSON fixtures used by the local demo
