

nosql-delta-bridge


Every document either lands in your Delta table or in a DLQ with an explicit rejection reason. Nothing silently crashes your pipeline.

The Problem

MongoDB's schema-free nature is a feature for application developers. For pipelines, it's a minefield.

In my last role I built and maintained more than 10 ETL jobs pulling data from MongoDB collections into Delta Lake using Spark. The problem was never simple type mismatches — it was structural, and it came in three flavors:

Polymorphic fields. A single field typed as anyOf[object|bool|string] in the JSON Schema. Spark infers the schema from a sample, commits to it, and the moment a document outside that sample has a different shape, the entire write fails with a cryptic cast error. The only safe move was casting everything to StringType — which meant no type guarantees in the raw table and defensive re-casting in every downstream job.

Inconsistent nested structs. Arrays of structs where fields appeared or disappeared depending on the document version. A subfield present in some documents, missing in others, nested structs with subfields that changed shape across batches. Every job ended up with the same boilerplate: rebuild the struct by hand, cast every field explicitly, handle missing fields with lit(None).

Silent failures. When the pipeline didn't crash outright, bad documents were silently coerced or dropped. No DLQ, no audit trail, no contract that said "this field must be this type." Problems surfaced three jobs downstream, not at the boundary where they happened.

nosql-delta-bridge is the abstraction that should have existed from the start. Every document either lands in the Delta table or in a dead-letter queue with an explicit rejection reason. Type conflicts, missing required fields, and structural mismatches are all caught at ingestion — nothing silently crashes, nothing silently disappears.


Architecture

raw JSON documents
        │
        ▼
┌───────────────┐
│    infer.py   │  Examines a batch and infers a unified schema.
│               │  Conflict resolution: widest type wins, all fields 
└───────┬───────┘  nullable by default.
        │  schema: dict[str, FieldSchema]
        ▼
┌───────────────┐
│   flatten.py  │  Flattens nested objects into dot-notation columns.
│               │  address.city, preferences.notifications.email, etc.
└───────┬───────┘
        │  flat documents
        ▼
┌───────────────┐
│   coerce.py   │  Applies per-field type coercion against the schema.
│               │  cast (silent), reject (→ DLQ), or flag.
└───────┬───────┘
        │  typed documents          rejected documents
        ▼                                   ▼
┌───────────────┐               ┌───────────────────┐
│   writer.py   │               │      dlq.py       │
│               │               │                   │
│  Delta Lake   │               │  NDJSON file or   │
│  (delta-rs)   │               │  S3/GCS/Azure Blob│
└───────────────┘               └───────────────────┘
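The inference stage's conflict-resolution rule ("widest type wins, all fields nullable by default") can be sketched in plain Python. This is a simplified illustration with hypothetical helper names, not the actual infer.py source:

```python
# Widening order sketch: int + float widens to float; any other
# mix of types widens all the way to string.
_WIDEN = {
    frozenset({"integer"}): "integer",
    frozenset({"float"}): "float",
    frozenset({"integer", "float"}): "float",
    frozenset({"string"}): "string",
    frozenset({"boolean"}): "boolean",
}

def _dtype(value):
    if isinstance(value, bool):   # check bool before int: bool is an int subclass
        return "boolean"
    if isinstance(value, int):
        return "integer"
    if isinstance(value, float):
        return "float"
    return "string"

def infer_schema(docs):
    """Return {field: {"dtype": ..., "nullable": ...}} for a batch of flat dicts."""
    all_fields = {field for doc in docs for field in doc}
    seen_types = {}   # field -> set of observed dtypes
    seen_null = {}    # field -> saw a null or a missing occurrence
    for doc in docs:
        for field in all_fields:
            if field not in doc or doc[field] is None:
                seen_null[field] = True       # missing counts as nullable
                continue
            seen_types.setdefault(field, set()).add(_dtype(doc[field]))
            seen_null.setdefault(field, False)
    return {
        field: {
            "dtype": _WIDEN.get(frozenset(types), "string"),  # widest type wins
            "nullable": seen_null.get(field, False),
        }
        for field, types in seen_types.items()
    }
```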

Audit metadata columns are added automatically on every write:

Column               Description
───────────────────  ─────────────────────────────────────────────
_ingested_at         UTC timestamp of the write
_source_collection   Name of the source collection
_schema_version      Short hash of the schema used for this batch
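As a rough sketch of how such audit columns could be attached (a hypothetical helper; the exact timestamp format and hashing scheme are assumptions, not the library's implementation):

```python
import hashlib
import json
from datetime import datetime, timezone

def add_audit_columns(doc, source_collection, schema):
    """Attach the three audit metadata columns to a typed document (sketch)."""
    # short hash: first 8 hex chars of the canonical schema JSON (assumed scheme)
    schema_bytes = json.dumps(schema, sort_keys=True).encode()
    return {
        **doc,
        "_ingested_at": datetime.now(timezone.utc).isoformat(),
        "_source_collection": source_collection,
        "_schema_version": hashlib.sha256(schema_bytes).hexdigest()[:8],
    }
```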

Quickstart

pip install nosql-delta-bridge

Infer a schema from known-good data

bridge infer historical.json --output users.schema.json
historical.json  ·  500 documents  ·  8 fields
  schema written  →  users.schema.json

The schema file is a plain JSON contract that controls what the pipeline will accept:

{
  "name":  {"dtype": "string",  "nullable": false},
  "age":   {"dtype": "integer", "nullable": false},
  "email": {"dtype": "string",  "nullable": true}
}

Ingest new documents against the fixed schema

bridge ingest incoming.json ./delta/users --schema users.schema.json --dlq rejected.ndjson
incoming.json  ·  11 documents  ·  schema from users.schema.json (8 fields)
  written:   8  →  delta/users
  rejected:  3  →  rejected.ndjson
  schema evolved: +4 field(s) ['address', 'address.city', 'address.street', 'address.zip']  →  users.schema.json

Every rejected document in rejected.ndjson has an explicit reason:

{"_dlq_reason": "cast failed on 'age': invalid literal for int() with base 10: 'thirty-two'", "_dlq_stage": "coerce", ...}
{"_dlq_reason": "null value on non-nullable field 'name'", "_dlq_stage": "coerce", ...}

Without a schema file

bridge ingest batch.json ./delta/users

Schema is inferred from the batch itself. All documents land; the DLQ stays empty. Useful for first-pass bronze ingestion.


Pipeline Behavior

The two-command workflow (bridge infer + bridge ingest --schema) is what makes the DLQ meaningful. When the schema is inferred from the same batch being ingested, it always accommodates every value in that batch — types widen, nulls make fields nullable — so nothing is ever rejected. The DLQ only has teeth when coerce runs against a pre-established schema contract.

Situation                                        Written             DLQ            Schema file
───────────────────────────────────────────────  ──────────────────  ─────────────  ───────────────────────────────
Clean batch                                      all                 empty          unchanged
New optional field appears                       all                 empty          updated (field added, nullable)
Null on a required field                         valid docs          nulled docs    unchanged
Castable type mismatch ("25" → integer)          all, cast silently  empty          unchanged
Non-castable mismatch ("twenty-five" → integer)  valid docs          bad docs       unchanged + warning
Partial type widening (some docs changed type)   valid-type docs     bad-type docs  unchanged + warning
Full type migration (all docs changed type)      0                   all docs       unchanged + warning
Lossy float → integer cast (25.7 → int)          clean docs          lossy docs     unchanged
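A minimal sketch of coercion against a fixed contract, showing how an explicit rejection reason is produced. The function name and return shape here are illustrative assumptions; the real coerce.py surely differs in detail:

```python
# note: bool() on strings is simplistic; a real implementation would
# handle "true"/"false" explicitly. This is an illustration only.
_CASTERS = {"integer": int, "float": float, "string": str, "boolean": bool}

def coerce_document(doc, schema):
    """Return (typed_doc, None) on success or (None, dlq_record) on rejection."""
    typed = {}
    for field, spec in schema.items():
        value = doc.get(field)
        if value is None:
            if not spec["nullable"]:
                return None, {**doc, "_dlq_stage": "coerce",
                              "_dlq_reason": f"null value on non-nullable field '{field}'"}
            typed[field] = None
            continue
        try:
            typed[field] = _CASTERS[spec["dtype"]](value)
        except (ValueError, TypeError) as exc:
            return None, {**doc, "_dlq_stage": "coerce",
                          "_dlq_reason": f"cast failed on '{field}': {exc}"}
    return typed, None
```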

Type widening is never automatic. When a field goes from integer to string, the pipeline cannot distinguish a legitimate migration from a data quality incident. Both look identical from the outside. The correct response is to stop, warn, and surface the decision to a human:

# re-infer from new authoritative data and overwrite the table
bridge infer new_batch.json --output users.schema.json
bridge ingest new_batch.json ./delta/users --schema users.schema.json --mode overwrite

Flattening

Nested objects are expanded into dot-notation columns before coercion and write. Only leaf fields appear in the schema — parent keys are excluded when their children are tracked.

# {"address": {"city": "SP", "zip": "01310"}}
# becomes:
# {"address.city": "SP", "address.zip": "01310"}
# "address" does NOT appear as a column

max_depth

Controls how many levels deep the flattener recurses. The default is 5, which covers most real-world documents. Beyond the limit, the remaining sub-object is kept as a single opaque column instead of being expanded further.

Given this document:

{"user": {"profile": {"address": {"city": "São Paulo"}}}}
max_depth     Result
────────────  ─────────────────────────────────────────────────────
5 (default)   user.profile.address.city = "São Paulo"
2             user.profile = {"address": {"city": "São Paulo"}}
1             user = {"profile": {"address": {"city": "São Paulo"}}}

The column still exists in Delta when the limit is hit — it holds a serialized object instead of a scalar. You can query it, but not with simple column predicates.

Lower max_depth when documents are extremely wide or deeply nested across many schema variants. A product catalog with 10-level-deep specs objects across 500 product types can easily produce hundreds of distinct column names with the default depth. Capping the recursion keeps the schema manageable; the deep structure lands in an opaque column you can process separately.
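The flattening rules above (dot-notation leaves, max_depth cap, opaque remainder) can be sketched as a small recursive function. This is a simplified reimplementation for illustration, not the actual flatten.py source:

```python
def flatten_document(doc, max_depth=5, separator="."):
    """Flatten nested dicts into separator-joined leaf columns (sketch).

    Arrays are never recursed into; once max_depth is reached, the
    remaining sub-object is kept as a single opaque value.
    """
    flat = {}

    def _walk(obj, prefix, depth):
        for key, value in obj.items():
            name = f"{prefix}{separator}{key}" if prefix else key
            if isinstance(value, dict) and depth < max_depth:
                _walk(value, name, depth + 1)  # expand one more level
            else:
                flat[name] = value             # leaf, array, or depth-capped object

    _walk(doc, "", 1)
    return flat
```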

Arrays

Arrays are never recursed into — they are kept as Python lists and stored as list columns in Delta Lake:

{"tags": ["python", "delta", "etl"]}
# stays as:
{"tags": ["python", "delta", "etl"]}

{"orders": [{"id": 1, "total": 99.0}, {"id": 2, "total": 45.0}]}
# also stays as:
{"orders": [{"id": 1, "total": 99.0}, {"id": 2, "total": 45.0}]}

Exploding an array (one document → many rows) is a cardinality change that breaks joins and aggregations on all other columns. That is a deliberate transform — a LATERAL VIEW EXPLODE in Spark or UNNEST in SQL — not a side effect of flattening. Do it explicitly before feeding documents into the pipeline if needed.
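If you do need one row per array element, the deliberate pre-ingestion explode step might look like this. A hedged sketch: explode_on is a hypothetical helper, not part of the library:

```python
def explode_on(doc, array_field):
    """One document -> one document per element of doc[array_field] (sketch).

    Scalar columns are duplicated onto every output row, which is exactly
    the cardinality change that should be explicit, never a side effect.
    """
    rows = []
    for element in doc.get(array_field, []):
        row = {k: v for k, v in doc.items() if k != array_field}
        if isinstance(element, dict):
            # struct element: promote its fields to dot-notation columns
            row.update({f"{array_field}.{k}": v for k, v in element.items()})
        else:
            row[array_field] = element
        rows.append(row)
    return rows
```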

Custom separator

from nosql_delta_bridge.flatten import flatten_document

flat = flatten_document(doc, max_depth=2, separator="__")
# produces: address__city, address__zip

Schema Evolution

New fields are added automatically and safely. Existing field types and nullability are never changed by an ingestion run — the schema file is a contract, not a snapshot.

When a new field appears in a batch:

  • It is added to the Delta table as a nullable column via schema_mode="merge".
  • Historical rows get null for that column — no data loss.
  • The schema file is updated to include the new field.
  • Existing fields in the schema file are written back verbatim — nullable flags are never silently widened.
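Under these rules, deriving the write-time schema from the old contract and a freshly inferred batch schema reduces to a merge along the following lines (an illustrative sketch; the function name and schema dict shape are assumptions):

```python
def derive_write_schema(old_schema, batch_schema):
    """Old fields verbatim; fields new in the batch are added as nullable (sketch)."""
    merged = dict(old_schema)  # existing types and nullability preserved exactly
    for field, spec in batch_schema.items():
        if field not in merged:
            # new field: added as nullable so historical rows can hold null
            merged[field] = {"dtype": spec["dtype"], "nullable": True}
    return merged
```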

Writing to Cloud Storage

Pass credentials as --storage-option KEY=VALUE (repeatable). Both the Delta table URI and the DLQ path accept cloud URIs. The same credentials are forwarded to both the writer and the DLQ automatically.

Backend              URI scheme   Status
───────────────────  ───────────  ────────────────────────────────────────────────────────────────
AWS S3               s3://        tested end-to-end (Cloudflare R2)
Azure Blob Storage   az://        tested against Azurite emulator
GCS                  gs://        writer supported; DLQ not yet implemented — contributions welcome
Local                plain path   tested

AWS S3 / Cloudflare R2

bridge ingest incoming.json s3://my-bucket/delta/users \
  --schema users.schema.json \
  --dlq s3://my-bucket/dlq/users.ndjson \
  --storage-option AWS_ACCESS_KEY_ID=abc123 \
  --storage-option AWS_SECRET_ACCESS_KEY=secret \
  --storage-option AWS_ENDPOINT_URL=https://<account>.r2.cloudflarestorage.com \
  --storage-option AWS_REGION=auto

Azure Blob Storage

bridge ingest incoming.json az://my-container/delta/users \
  --schema users.schema.json \
  --dlq az://my-container/dlq/users.ndjson \
  --storage-option AZURE_STORAGE_ACCOUNT_NAME=myaccount \
  --storage-option AZURE_STORAGE_ACCOUNT_KEY=...

For the Azure Storage Emulator (Azurite):

bridge ingest incoming.json az://my-container/delta/users \
  --storage-option AZURE_STORAGE_ACCOUNT_NAME=devstoreaccount1 \
  --storage-option AZURE_STORAGE_ACCOUNT_KEY=... \
  --storage-option AZURE_STORAGE_USE_EMULATOR=true

Python API

from nosql_delta_bridge.writer import WriterConfig, write_batch
from nosql_delta_bridge.dlq import DeadLetterQueue

storage_options = {
    "AWS_ACCESS_KEY_ID":     "...",
    "AWS_SECRET_ACCESS_KEY": "...",
    "AWS_ENDPOINT_URL":      "https://<account>.r2.cloudflarestorage.com",
    "AWS_REGION":            "auto",
}

config = WriterConfig(
    table_uri="s3://my-bucket/delta/users",
    source_collection="users",
    storage_options=storage_options,
)

with DeadLetterQueue("s3://my-bucket/dlq/users.ndjson", storage_options=storage_options) as dlq:
    ...

The writer uses delta-rs (Rust object_store) for all cloud backends. The DLQ uses fsspec, which handles credential translation internally — the same storage_options key format works across backends without code changes.


Running Tests

pytest

141 unit tests across all five modules and the CLI. Fixtures in tests/fixtures/ are fully synthetic — documents with missing fields, type conflicts, deeply nested structures, and mixed-type arrays. No external services required.

To run Azure integration tests (requires Docker):

pytest -m integration

Key Decisions

Two schemas per ingestion run, not one. When a fixed schema is provided, the pipeline derives two schemas from a merge:

  • coerce_schema = old schema only. Enforces the established contract strictly.
  • write_schema = old fields (types and nullability preserved) + new fields from the batch (added as nullable). Prevents type conflicts with the existing Delta table while allowing safe growth.

Type widening is operator-triggered, not automatic. The pipeline cannot tell whether a field going from integer to string is intentional (the source system changed) or accidental (bad data). Both look identical from the outside. The correct response is to stop, warn, and surface the decision to a human.

DLQ as NDJSON, not Delta. Rejected documents have no guaranteed schema — that is why they were rejected. NDJSON is the only format that can hold arbitrary shapes, is appendable across runs without schema negotiation, and is directly queryable by DuckDB's read_ndjson.

delta-rs over Spark. No cluster required. The full pipeline runs locally on a laptop and writes to a Delta table that any Spark or DuckDB reader can consume. This makes it reproducible for anyone cloning the repo.

Lossy float-to-integer casts are rejections, not silent truncation. 30.0 → 30 is lossless and accepted. 25.7 → 25 silently drops .7. In a financial or scientific context that is a data quality bug. The coerce layer enforces float.is_integer() before casting.
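The guard is essentially a one-line check on float.is_integer(); a sketch (the actual coerce.py may structure this differently):

```python
def cast_float_to_int(value):
    """Accept only lossless float -> int casts; raise on anything lossy (sketch)."""
    if isinstance(value, float) and not value.is_integer():
        # 25.7 -> 25 would silently drop .7: reject instead of truncating
        raise ValueError(f"lossy float -> integer cast rejected: {value}")
    return int(value)
```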


Project Structure

nosql_delta_bridge/
├── infer.py       # Schema inference and merging
├── coerce.py      # Type coercion with cast / reject / flag
├── flatten.py     # Nested object and array flattening
├── writer.py      # Delta Lake writer with schema evolution
├── dlq.py         # Dead letter queue (local or cloud via fsspec)
└── cli.py         # bridge infer / bridge ingest commands

tests/
└── fixtures/      # Synthetic messy JSON documents (no external deps)

examples/
├── mongo_to_delta.py          # Local two-phase demo: infer schema, then ingest with DLQ
├── airflow_mongo_to_delta.py  # Production DAG: MongoDB Atlas → Airflow → Delta Lake on R2
└── data/                      # Synthetic JSON fixtures used by the local demo


