MongoDB loader for mkpipe.

Project description

mkpipe-loader-mongodb

MongoDB loader plugin for MkPipe. Writes Spark DataFrames into MongoDB collections using the official MongoDB Spark Connector.

Documentation

For more detailed documentation, please visit the GitHub repository.

License

This project is licensed under the Apache 2.0 License - see the LICENSE file for details.


Connection Configuration

connections:
  mongo_target:
    variant: mongodb
    mongo_uri: 'mongodb://user:password@host:27017/mydb?authSource=admin'
    database: mydb

Alternatively, use individual parameters (URI is constructed automatically):

connections:
  mongo_target:
    variant: mongodb
    host: localhost
    port: 27017
    database: mydb
    user: myuser
    password: mypassword
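
When individual parameters are given, the loader assembles a standard MongoDB connection string from them. The sketch below only illustrates that construction and is not the loader's actual code; details such as credential escaping or whether authSource is appended are assumptions, as is the helper name.

# Illustration only: roughly how a URI could be assembled from the
# individual parameters above; not the loader's actual code.
from urllib.parse import quote_plus

def build_mongo_uri(host, port, database, user, password):
    # Escaping credentials with quote_plus is an assumption here.
    return (
        f"mongodb://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )

print(build_mongo_uri("localhost", 27017, "mydb", "myuser", "mypassword"))
# -> mongodb://myuser:mypassword@localhost:27017/mydb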

Table Configuration

pipelines:
  - name: pg_to_mongo
    source: pg_source
    destination: mongo_target
    tables:
      - name: public.events
        target_name: stg_events
        replication_method: full

Write Strategy

Control how data is written to MongoDB:

      - name: public.events
        target_name: stg_events
        write_strategy: upsert       # append | replace | upsert
        write_key: [event_id]        # required for upsert

How each strategy behaves in MongoDB:

  • append: Insert documents via the Spark connector (default for incremental).
  • replace: Drop the collection, then insert (default for full). Use if_exists: append to preserve the existing collection.
  • upsert: Create a unique index on the write_key columns, then write with the Spark connector using operationType=replace, matching on write_key.

Note: upsert requires write_key. The loader automatically creates a unique compound index on the write_key columns before writing. Existing documents matching the key are replaced; new documents are inserted.
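
In Spark terms, the upsert path corresponds roughly to the sketch below. It is an illustration built on pymongo and the MongoDB Spark Connector's operationType and idFieldList write options, not the loader's actual implementation; the helper name and exact option usage are assumptions.

# Rough sketch of the upsert path, assuming MongoDB Spark Connector 10.x
# write options and pymongo; not the loader's actual code.
from pymongo import ASCENDING, MongoClient

def upsert_write(df, mongo_uri, database, collection, write_key):
    # 1. Ensure a unique compound index on the write_key columns.
    client = MongoClient(mongo_uri)
    client[database][collection].create_index(
        [(col, ASCENDING) for col in write_key], unique=True
    )
    client.close()

    # 2. Replace documents matching the key fields, insert the rest.
    (
        df.write.format("mongodb")
        .mode("append")
        .option("connection.uri", mongo_uri)
        .option("database", database)
        .option("collection", collection)
        .option("operationType", "replace")          # replace on key match
        .option("idFieldList", ",".join(write_key))  # fields identifying a document
        .save()
    )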

Migration: If you previously used dedup_columns for implicit upsert behavior, switch to explicit write_strategy: upsert with write_key. The old behavior still works but emits a deprecation warning.


Write Parallelism

By default Spark writes to MongoDB using however many partitions the DataFrame currently has. You can control write parallelism with write_partitions, which calls coalesce before the write to reduce the number of open connections to MongoDB:

      - name: public.events
        target_name: stg_events
        replication_method: full
        write_partitions: 4     # coalesce DataFrame to N partitions before writing

When to use write_partitions

  • Reduce connections: MongoDB has a connection limit per node. If Spark has many executors, each partition opens its own connection. Lowering write_partitions reduces connection count.
  • Increase throughput: A small number of large batches is generally faster than many small batches. A value of 4–8 is a good starting point.
  • coalesce vs repartition: coalesce avoids a shuffle (preferred for writes). If the source has very few partitions and you want to increase them, use repartition; that requires a code-level change, not a YAML setting (see the sketch below).
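
For comparison, the coalesce/repartition distinction in a plain PySpark job looks like this. Only the coalesce case is exposed through YAML as write_partitions; the sample DataFrame and connection values below are placeholders.

# Illustration of coalesce vs. repartition before a MongoDB write;
# the sample DataFrame and connection values are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["event_id", "payload"])

reduced = df.coalesce(4)      # narrow transformation, no shuffle; what write_partitions does
widened = df.repartition(16)  # full shuffle; only worth it when the source has too few partitions

(
    reduced.write.format("mongodb")
    .mode("append")
    .option("connection.uri", "mongodb://localhost:27017")
    .option("database", "mydb")
    .option("collection", "stg_events")
    .save()
)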

Performance Notes

  • Write speed is mostly limited by MongoDB's write capacity and network, not Spark.
  • write_partitions is most effective when reducing an already-large partition count.
  • For append-mode incremental loads the default partition count is usually fine.

All Table Parameters

  • name (string, required): Source table/collection name.
  • target_name (string, required): MongoDB destination collection name.
  • replication_method (full / incremental, default full): Replication strategy.
  • write_partitions (int): Coalesce the DataFrame to N partitions before writing.
  • batchsize (int, default 10000): Records per write batch.
  • write_strategy (string): append, replace, or upsert.
  • write_key (list): Key columns for upsert (a unique index is created automatically).
  • if_exists (string): replace (drop + create) or append (preserve collection); inherits from settings.
  • dedup_columns (list): Columns used for mkpipe_id hash deduplication.
  • tags (list, default []): Tags for selective pipeline execution.
  • pass_on_error (bool, default false): Skip the table on error instead of failing.
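
If batchsize needs tuning, it presumably corresponds to a per-batch limit on the connector side. The sketch below assumes it maps to the connector's maxBatchSize write option, which is not confirmed by this documentation; df stands for a Spark DataFrame and the connection values are placeholders.

# Illustration only: assumes batchsize maps to the MongoDB Spark
# Connector's maxBatchSize write option; df is a Spark DataFrame.
(
    df.write.format("mongodb")
    .mode("append")
    .option("connection.uri", "mongodb://localhost:27017")
    .option("database", "mydb")
    .option("collection", "stg_events")
    .option("maxBatchSize", 10000)   # records per write batch
    .save()
)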

