# mkpipe-loader-mongodb

MongoDB loader plugin for MkPipe. Writes Spark DataFrames into MongoDB collections using the official MongoDB Spark Connector.
## Documentation

For more detailed documentation, please visit the GitHub repository.
## License

This project is licensed under the Apache 2.0 License - see the LICENSE file for details.
## Connection Configuration

```yaml
connections:
  mongo_target:
    variant: mongodb
    mongo_uri: 'mongodb://user:password@host:27017/mydb?authSource=admin'
    database: mydb
```
Alternatively, use individual parameters (URI is constructed automatically):
```yaml
connections:
  mongo_target:
    variant: mongodb
    host: localhost
    port: 27017
    database: mydb
    user: myuser
    password: mypassword
```
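The exact URI construction is internal to the loader, but the general shape is easy to sketch. The helper below (`build_mongo_uri` is illustrative, not the plugin's API) shows why credentials must be percent-encoded when a URI is assembled from individual parameters; the `authSource=admin` default is an assumption, not documented loader behavior:

```python
from urllib.parse import quote_plus

def build_mongo_uri(host: str, port: int, database: str,
                    user: str, password: str, auth_source: str = "admin") -> str:
    """Assemble a MongoDB connection URI from individual parameters."""
    # Percent-encode credentials so characters like '@' or ':' don't break the URI
    creds = f"{quote_plus(user)}:{quote_plus(password)}"
    return f"mongodb://{creds}@{host}:{port}/{database}?authSource={auth_source}"
```

For example, a password containing `@` is encoded as `%40` rather than being mistaken for the host separator.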
## Table Configuration

```yaml
pipelines:
  - name: pg_to_mongo
    source: pg_source
    destination: mongo_target
    tables:
      - name: public.events
        target_name: stg_events
        replication_method: full
```
## Write Strategy

Control how data is written to MongoDB:

```yaml
- name: public.events
  target_name: stg_events
  write_strategy: upsert   # append | replace | upsert
  write_key: [event_id]    # required for upsert
```
| Strategy | MongoDB Behavior |
|---|---|
| `append` | Insert documents via Spark connector (default for incremental) |
| `replace` | Drop collection, then insert (default for full) |
| `upsert` | Auto-creates a unique index on `write_key` columns, then writes with the Spark connector's `operationType=replace`, matching on `write_key` |
Note: `upsert` requires `write_key`. The loader automatically creates a unique compound index on the `write_key` columns before writing. Existing documents matching the key are replaced; new documents are inserted.

Migration: If you previously used `dedup_columns` for implicit upsert behavior, switch to explicit `write_strategy: upsert` with `write_key`. The old behavior still works but emits a deprecation warning.
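The replace-matching-on-key semantics can be illustrated with a small in-memory sketch. This is not the connector code (the real write goes through the MongoDB Spark Connector), just a model of what "existing documents matching the key are replaced; new documents are inserted" means:

```python
def simulate_upsert(existing: list[dict], incoming: list[dict],
                    write_key: list[str]) -> list[dict]:
    """In-memory model of upsert-by-key semantics (illustration only)."""
    # Index current documents by their write_key tuple,
    # mirroring the unique compound index the loader creates
    by_key = {tuple(doc[k] for k in write_key): doc for doc in existing}
    for doc in incoming:
        key = tuple(doc[k] for k in write_key)
        by_key[key] = doc  # matching key: whole document replaced; otherwise inserted
    return list(by_key.values())
```

Note that the entire document is replaced on a key match, not merged field by field.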
## Write Parallelism

By default Spark writes to MongoDB using however many partitions the DataFrame currently has. You can control write parallelism with `write_partitions`, which calls `coalesce` before the write to reduce the number of open connections to MongoDB:

```yaml
- name: public.events
  target_name: stg_events
  replication_method: full
  write_partitions: 4   # coalesce DataFrame to N partitions before writing
```
### When to use `write_partitions`

- **Reduce connections:** MongoDB has a connection limit per node. If Spark has many executors, each partition opens its own connection. Lowering `write_partitions` reduces the connection count.
- **Increase throughput:** A small number of large batches is generally faster than many small batches. A value of 4–8 is a good starting point.
- **`coalesce` vs `repartition`:** `coalesce` avoids a shuffle (preferred for writes). If the source has very few partitions and you want to increase them, use `repartition`, but that requires a code-level change, not a YAML setting.
## Performance Notes

- Write speed is mostly limited by MongoDB's write capacity and network, not Spark.
- `write_partitions` is most effective when reducing an already-large partition count.
- For append-mode incremental loads the default partition count is usually fine.
## All Table Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | string | required | Source table/collection name |
| `target_name` | string | required | MongoDB destination collection name |
| `replication_method` | `full` / `incremental` | `full` | Replication strategy |
| `write_partitions` | int | — | Coalesce DataFrame to N partitions before writing |
| `batchsize` | int | `10000` | Records per write batch |
| `write_strategy` | string | — | `append`, `replace`, `upsert` |
| `write_key` | list | — | Key columns for upsert (unique index created automatically) |
| `dedup_columns` | list | — | Columns used for `mkpipe_id` hash deduplication |
| `tags` | list | `[]` | Tags for selective pipeline execution |
| `pass_on_error` | bool | `false` | Skip table on error instead of failing |
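Putting several of these parameters together, a hypothetical pipeline entry might look like this (the table and key names are illustrative, not from the loader's documentation):

```yaml
pipelines:
  - name: pg_to_mongo
    source: pg_source
    destination: mongo_target
    tables:
      - name: public.orders          # illustrative source table
        target_name: stg_orders
        replication_method: incremental
        write_strategy: upsert
        write_key: [order_id]        # unique index created on this column
        write_partitions: 4
        batchsize: 10000
        pass_on_error: true
```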