nosql-delta-bridge

NoSQL to Delta Lake ingestion with schema enforcement, type coercion, and a dead-letter queue. Every document either lands in your Delta table or in a DLQ with an explicit rejection reason. Nothing silently crashes your pipeline.
The Problem
MongoDB's schema-free nature is a feature for application developers. For pipelines, it's a minefield.
In my last role I built and maintained more than 10 ETL jobs pulling data from MongoDB collections into Delta Lake using Spark. The problem was never simple type mismatches — it was structural, and it came in three flavors:
Polymorphic fields. A single field typed as anyOf[object|bool|string] in the JSON Schema. Spark infers the schema from a sample, commits to it, and the moment a document outside that sample has a different shape, the entire write fails with a cryptic cast error. The only safe move was casting everything to StringType — which meant no type guarantees in the raw table and defensive re-casting in every downstream job.

Inconsistent nested structs. Arrays of structs where fields appeared or disappeared depending on the document version. A subfield present in some documents, missing in others, nested structs with subfields that changed shape across batches. Every job ended up with the same boilerplate: rebuild the struct by hand, cast every field explicitly, handle missing fields with lit(None) (a pattern sketched below).

Silent failures. When the pipeline didn't crash outright, bad documents were silently coerced or dropped. No DLQ, no audit trail, no contract that said "this field must be this type." Problems surfaced three jobs downstream, not at the boundary where they happened.
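A minimal sketch of that struct-rebuilding boilerplate in PySpark (column names are hypothetical, and raw_df is assumed to be a DataFrame already loaded from the source collection):

```python
# Illustrative only: the defensive re-casting every downstream Spark job carried.
from pyspark.sql import functions as F

fixed_df = raw_df.withColumn(
    "address",
    F.struct(
        F.col("address.city").cast("string").alias("city"),
        F.col("address.street").cast("string").alias("street"),
        # subfield missing in some document versions -> fill it in explicitly
        F.lit(None).cast("string").alias("zip"),
    ),
)
```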
nosql-delta-bridge is the abstraction that should have existed from the start.
Every document either lands in the Delta table or in a dead-letter queue with an
explicit rejection reason. Type conflicts, missing required fields, and structural
mismatches are all caught at ingestion — nothing silently crashes, nothing
silently disappears.
Architecture
raw JSON documents
│
▼
┌───────────────┐
│ infer.py │ Examines a batch and infers a unified schema.
│ │ Conflict resolution: widest type wins, all fields
└───────┬───────┘ nullable by default.
│ schema: dict[str, FieldSchema]
▼
┌───────────────┐
│ flatten.py │ Flattens nested objects into dot-notation columns.
│ │ address.city, preferences.notifications.email, etc.
└───────┬───────┘
│ flat documents
▼
┌───────────────┐
│ coerce.py │ Applies per-field type coercion against the schema.
│ │ cast (silent), reject (→ DLQ), or flag.
└───────┬───────┘
│ typed documents rejected documents
▼ ▼
┌───────────────┐ ┌───────────────────┐
│ writer.py │ │ dlq.py │
│ │ │ │
│ Delta Lake │ │ NDJSON file or │
│ (delta-rs) │ │ S3/GCS/Azure Blob│
└───────────────┘ └───────────────────┘
Audit metadata columns are added automatically on every write:
| Column | Description |
|---|---|
| _ingested_at | UTC timestamp of the write |
| _source_collection | Name of the source collection |
| _schema_version | Short hash of the schema used for this batch |
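For a quick check after a write, the table can be opened with the deltalake (delta-rs) Python package the writer is built on; a sketch against the Quickstart table path below:

```python
from deltalake import DeltaTable

# Load the Delta table written by `bridge ingest` and inspect the audit columns.
dt = DeltaTable("./delta/users")
df = dt.to_pandas()
print(df[["_ingested_at", "_source_collection", "_schema_version"]].drop_duplicates())
```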
Quickstart
pip install nosql-delta-bridge
Infer a schema from known-good data
bridge infer historical.json --output users.schema.json
historical.json · 500 documents · 8 fields
schema written → users.schema.json
The schema file is a plain JSON contract that controls what the pipeline will accept:
{
"name": {"dtype": "string", "nullable": false},
"age": {"dtype": "integer", "nullable": false},
"email": {"dtype": "string", "nullable": true}
}
Ingest new documents against the fixed schema
bridge ingest incoming.json ./delta/users --schema users.schema.json --dlq rejected.ndjson
incoming.json · 11 documents · schema from users.schema.json (8 fields)
written: 8 → delta/users
rejected: 3 → rejected.ndjson
schema evolved: +4 field(s) ['address', 'address.city', 'address.street', 'address.zip'] → users.schema.json
Every rejected document in rejected.ndjson has an explicit reason:
{"_dlq_reason": "cast failed on 'age': invalid literal for int() with base 10: 'thirty-two'", "_dlq_stage": "coerce", ...}
{"_dlq_reason": "null value on non-nullable field 'name'", "_dlq_stage": "coerce", ...}
Without a schema file
bridge ingest batch.json ./delta/users
Schema is inferred from the batch itself. All documents land; the DLQ stays empty. Useful for first-pass bronze ingestion.
Pipeline Behavior
The two-command workflow (bridge infer + bridge ingest --schema) is what makes the DLQ meaningful. When the schema is inferred from the same batch being ingested, it always accommodates every value in that batch — types widen, nulls make fields nullable — so nothing is ever rejected. The DLQ only has teeth when coerce runs against a pre-established schema contract.
| Situation | Written | DLQ | Schema file |
|---|---|---|---|
| Clean batch | all | empty | unchanged |
| New optional field appears | all | empty | updated (field added, nullable) |
| Null on a required field | valid docs | nulled docs | unchanged |
| Castable type mismatch ("25" → integer) | all, cast silently | empty | unchanged |
| Non-castable mismatch ("twenty-five" → integer) | valid docs | bad docs | unchanged + warning |
| Partial type widening (some docs changed type) | valid-type docs | bad-type docs | unchanged + warning |
| Full type migration (all docs changed type) | 0 | all docs | unchanged + warning |
| Lossy float → integer cast (25.7 → int) | clean docs | lossy docs | unchanged |
Type widening is never automatic. When a field goes from integer to string, the pipeline cannot distinguish a legitimate migration from a data quality incident. Both look identical from the outside. The correct response is to stop, warn, and surface the decision to a human:
# re-infer from new authoritative data and overwrite the table
bridge infer new_batch.json --output users.schema.json
bridge ingest new_batch.json ./delta/users --schema users.schema.json --mode overwrite
Flattening
Nested objects are expanded into dot-notation columns before coercion and write. Only leaf fields appear in the schema — parent keys are excluded when their children are tracked.
# {"address": {"city": "SP", "zip": "01310"}}
# becomes:
# {"address.city": "SP", "address.zip": "01310"}
# "address" does NOT appear as a column
max_depth
Controls how many levels deep the flattener recurses. The default is 5, which covers most real-world documents. Beyond the limit, the remaining sub-object is kept as a single opaque column instead of being expanded further.
Given this document:
{"user": {"profile": {"address": {"city": "São Paulo"}}}}
| max_depth | Result |
|---|---|
| 5 (default) | user.profile.address.city = "São Paulo" |
| 2 | user.profile = {"address": {"city": "São Paulo"}} |
| 1 | user = {"profile": {"address": {"city": "São Paulo"}}} |
The column still exists in Delta when the limit is hit — it holds a serialized object instead of a scalar. You can query it, but not with simple column predicates.
Lower max_depth when documents are extremely wide or deeply nested across many schema variants. A product catalog with 10-level-deep specs objects across 500 product types can easily produce hundreds of distinct column names with the default depth. Capping the recursion keeps the schema manageable; the deep structure lands in an opaque column you can process separately.
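The behavior is easy to verify directly with the flatten_document helper (the same function shown under Custom separator below); the expected outputs follow the table above:

```python
from nosql_delta_bridge.flatten import flatten_document

doc = {"user": {"profile": {"address": {"city": "São Paulo"}}}}

flatten_document(doc, max_depth=5)
# expected: {"user.profile.address.city": "São Paulo"}

flatten_document(doc, max_depth=2)
# expected: {"user.profile": {"address": {"city": "São Paulo"}}}
```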
Arrays
Arrays are never recursed into — they are kept as Python lists and stored as list columns in Delta Lake:
{"tags": ["python", "delta", "etl"]}
# stays as:
{"tags": ["python", "delta", "etl"]}
{"orders": [{"id": 1, "total": 99.0}, {"id": 2, "total": 45.0}]}
# also stays as:
{"orders": [{"id": 1, "total": 99.0}, {"id": 2, "total": 45.0}]}
Exploding an array (one document → many rows) is a cardinality change that breaks joins and aggregations on all other columns. That is a deliberate transform — a LATERAL VIEW EXPLODE in Spark or UNNEST in SQL — not a side effect of flattening. Do it explicitly before feeding documents into the pipeline if needed.
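If downstream consumers genuinely need one row per array element, do that as a separate pre-processing step before bridge ingest; a minimal sketch with hypothetical field names:

```python
def explode_orders(doc: dict):
    """Yield one document per element of the 'orders' array (hypothetical field)."""
    base = {k: v for k, v in doc.items() if k != "orders"}
    for order in doc.get("orders", []):
        yield {**base, "order": order}

doc = {"customer": "ana", "orders": [{"id": 1, "total": 99.0}, {"id": 2, "total": 45.0}]}
exploded = list(explode_orders(doc))
# -> two documents, one per order, each still carrying "customer"
```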
Custom separator
from nosql_delta_bridge.flatten import flatten_document
flat = flatten_document(doc, max_depth=2, separator="__")
# produces: address__city, address__zip
Schema Evolution
New fields are added automatically and safely. Existing field types and nullability are never changed by an ingestion run — the schema file is a contract, not a snapshot.
When a new field appears in a batch:
- It is added to the Delta table as a nullable column via schema_mode="merge".
- Historical rows get null for that column — no data loss.
- The schema file is updated to include the new field.
- Existing fields in the schema file are written back verbatim — nullable flags are never silently widened.
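As an illustration, continuing the Quickstart contract, the schema file after a batch introduces an address.city field would gain one nullable entry while the existing entries stay verbatim (exact ordering not guaranteed):

```json
{
  "name": {"dtype": "string", "nullable": false},
  "age": {"dtype": "integer", "nullable": false},
  "email": {"dtype": "string", "nullable": true},
  "address.city": {"dtype": "string", "nullable": true}
}
```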
Writing to Cloud Storage
Pass credentials as --storage-option KEY=VALUE (repeatable). Both the Delta table URI and the DLQ path accept cloud URIs. The same credentials are forwarded to both the writer and the DLQ automatically.
| Backend | URI scheme | Status |
|---|---|---|
| AWS S3 | s3:// | tested end-to-end (Cloudflare R2) |
| Azure Blob Storage | az:// | tested against Azurite emulator |
| GCS | gs:// | writer supported; DLQ not yet implemented — contributions welcome |
| Local | plain path | tested |
AWS S3 / Cloudflare R2
bridge ingest incoming.json s3://my-bucket/delta/users \
--schema users.schema.json \
--dlq s3://my-bucket/dlq/users.ndjson \
--storage-option AWS_ACCESS_KEY_ID=abc123 \
--storage-option AWS_SECRET_ACCESS_KEY=secret \
--storage-option AWS_ENDPOINT_URL=https://<account>.r2.cloudflarestorage.com \
--storage-option AWS_REGION=auto
Azure Blob Storage
bridge ingest incoming.json az://my-container/delta/users \
--schema users.schema.json \
--dlq az://my-container/dlq/users.ndjson \
--storage-option AZURE_STORAGE_ACCOUNT_NAME=myaccount \
  --storage-option AZURE_STORAGE_ACCOUNT_KEY=...
For the Azure Storage Emulator (Azurite):
bridge ingest incoming.json az://my-container/delta/users \
--storage-option AZURE_STORAGE_ACCOUNT_NAME=devstoreaccount1 \
--storage-option AZURE_STORAGE_ACCOUNT_KEY=... \
--storage-option AZURE_STORAGE_USE_EMULATOR=true
Python API
from nosql_delta_bridge.writer import WriterConfig, write_batch
from nosql_delta_bridge.dlq import DeadLetterQueue
storage_options = {
"AWS_ACCESS_KEY_ID": "...",
"AWS_SECRET_ACCESS_KEY": "...",
"AWS_ENDPOINT_URL": "https://<account>.r2.cloudflarestorage.com",
"AWS_REGION": "auto",
}
config = WriterConfig(
table_uri="s3://my-bucket/delta/users",
source_collection="users",
storage_options=storage_options,
)
with DeadLetterQueue("s3://my-bucket/dlq/users.ndjson", storage_options=storage_options) as dlq:
...
The writer uses delta-rs (Rust object_store) for all cloud backends. The DLQ uses fsspec, which handles credential translation internally — the same storage_options key format works across backends without code changes.
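To read a cloud DLQ back outside the pipeline, plain fsspec works as well; a sketch against the R2 example above (note that fsspec's S3 backend takes lowercase key/secret arguments and a client_kwargs endpoint rather than the AWS_* names):

```python
import json
import fsspec

# Read the rejected documents back from object storage for inspection.
with fsspec.open(
    "s3://my-bucket/dlq/users.ndjson",
    "r",
    key="abc123",       # AWS_ACCESS_KEY_ID
    secret="secret",    # AWS_SECRET_ACCESS_KEY
    client_kwargs={"endpoint_url": "https://<account>.r2.cloudflarestorage.com"},
) as f:
    rejected = [json.loads(line) for line in f if line.strip()]
```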
Running Tests
pytest
141 unit tests across all five modules and the CLI. Fixtures in tests/fixtures/ are fully synthetic — documents with missing fields, type conflicts, deeply nested structures, and mixed-type arrays. No external services required.
To run Azure integration tests (requires Docker):
pytest -m integration
Key Decisions
Two schemas per ingestion run, not one. When a fixed schema is provided, the pipeline derives two schemas from a merge:
- coerce_schema = old schema only. Enforces the established contract strictly.
- write_schema = old fields (types and nullability preserved) + new fields from the batch (added as nullable). Prevents type conflicts with the existing Delta table while allowing safe growth.
Type widening is operator-triggered, not automatic. The pipeline cannot tell whether a field going from integer to string is intentional (the source system changed) or accidental (bad data). Both look identical from the outside. The correct response is to stop, warn, and surface the decision to a human.
DLQ as NDJSON, not Delta. Rejected documents have no guaranteed schema — that is why they were rejected. NDJSON is the only format that can hold arbitrary shapes, is appendable across runs without schema negotiation, and is directly queryable by DuckDB's read_ndjson.
delta-rs over Spark. No cluster required. The full pipeline runs locally on a laptop and writes to a Delta table that any Spark or DuckDB reader can consume. This makes it reproducible for anyone cloning the repo.
Lossy float-to-integer casts are rejections, not silent truncation. 30.0 → 30 is lossless and accepted. 25.7 → 25 silently drops .7. In a financial or scientific context that is a data quality bug. The coerce layer enforces float.is_integer() before casting.
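The rule fits in one function; a sketch of the policy, not the library's internal API:

```python
def cast_float_to_int(value: float) -> int:
    # 30.0 -> 30 is lossless and accepted; 25.7 would silently drop .7, so reject.
    if not value.is_integer():
        raise ValueError(f"lossy float -> int cast rejected: {value}")
    return int(value)
```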
Project Structure
nosql_delta_bridge/
├── infer.py # Schema inference and merging
├── coerce.py # Type coercion with cast / reject / flag
├── flatten.py # Nested object and array flattening
├── writer.py # Delta Lake writer with schema evolution
├── dlq.py # Dead letter queue (local or cloud via fsspec)
└── cli.py # bridge infer / bridge ingest commands
tests/
└── fixtures/ # Synthetic messy JSON documents (no external deps)
examples/
├── mongo_to_delta.py # Local two-phase demo: infer schema, then ingest with DLQ
├── airflow_mongo_to_delta.py # Production DAG: MongoDB Atlas → Airflow → Delta Lake on R2
└── data/ # Synthetic JSON fixtures used by the local demo