MongoDB extractor for mkpipe.

mkpipe-extractor-mongodb

MongoDB extractor plugin for MkPipe. Reads MongoDB collections into Spark DataFrames using the official MongoDB Spark Connector.

Documentation

For more detailed documentation, please visit the GitHub repository.

License

This project is licensed under the Apache 2.0 License - see the LICENSE file for details.


Connection Configuration

connections:
  mongo_source:
    variant: mongodb
    mongo_uri: 'mongodb://user:password@host:27017/mydb?authSource=admin'
    database: mydb

Alternatively, use individual parameters (URI is constructed automatically):

connections:
  mongo_source:
    variant: mongodb
    host: localhost
    port: 27017
    database: mydb
    user: myuser
    password: mypassword
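
As a rough illustration of what "constructed automatically" could mean, the sketch below assembles a URI from the individual parameters. The `build_mongo_uri` helper is hypothetical and is not taken from the plugin's source; the plugin may add further options (for example `authSource`) that are not shown here.

```python
from urllib.parse import quote_plus


def build_mongo_uri(host, port, database, user=None, password=None):
    """Assemble a mongodb:// URI from individual connection parameters.

    Hypothetical helper for illustration only; not the plugin's actual code.
    """
    credentials = ""
    if user is not None:
        # Percent-encode credentials so characters such as '@' or ':' stay valid.
        credentials = quote_plus(user)
        if password is not None:
            credentials += ":" + quote_plus(password)
        credentials += "@"
    return f"mongodb://{credentials}{host}:{port}/{database}"


uri = build_mongo_uri("localhost", 27017, "mydb", "myuser", "mypassword")
print(uri)
```

Percent-encoding matters when a password contains reserved characters; if you write `mongo_uri` by hand, the same encoding applies.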

Table Configuration

pipelines:
  - name: mongo_to_pg
    source: mongo_source
    destination: pg_target
    tables:
      - name: events
        target_name: stg_events
        replication_method: full

Incremental Replication

      - name: events
        target_name: stg_events
        replication_method: incremental
        iterate_column: updated_at
        iterate_column_type: datetime
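
Conceptually, incremental replication reads only documents whose `iterate_column` value is beyond the last watermark. The sketch below shows one way such a filter could be expressed as a `$match` stage. The helper name, the strict `$gt` comparison, and the handling of the first run are assumptions for illustration; this README does not describe how MkPipe stores or applies the watermark.

```python
from datetime import datetime, timezone


def incremental_match_stage(iterate_column, last_value):
    """Return a $match stage selecting documents newer than the watermark.

    Hypothetical helper. `last_value` is the highest value seen in the
    previous run, or None on the first run (which then matches everything).
    """
    if last_value is None:
        return {"$match": {}}
    return {"$match": {iterate_column: {"$gt": last_value}}}


watermark = datetime(2024, 1, 1, tzinfo=timezone.utc)
stage = incremental_match_stage("updated_at", watermark)
print(stage)
```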

Custom Aggregation Pipeline

      - name: orders
        target_name: stg_orders
        replication_method: full
        custom_query: '[{"$match": {"status": "active"}}, {"$project": {"_id": 0}}]'
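
Because `custom_query` is a JSON array embedded in a YAML string, quoting mistakes are easy to make. One option is to build the pipeline as Python objects and serialize it, as sketched below. The `is_valid_pipeline` check is a hypothetical helper that only verifies the shape (a JSON array of single-key stage objects); it does not validate the stages against MongoDB.

```python
import json

# The pipeline from the example above, written as Python objects.
pipeline = [
    {"$match": {"status": "active"}},
    {"$project": {"_id": 0}},
]

# Serialize to the JSON array string expected by custom_query.
custom_query = json.dumps(pipeline)
print(custom_query)


def is_valid_pipeline(text):
    """Return True if text parses as a JSON array of single-key stage objects."""
    try:
        stages = json.loads(text)
    except json.JSONDecodeError:
        return False
    return isinstance(stages, list) and all(
        isinstance(stage, dict) and len(stage) == 1 for stage in stages
    )
```

Wrapping the resulting string in single quotes in YAML, as in the example above, avoids having to escape the double quotes inside the JSON.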

Read Parallelism (Partitioners)

By default the MongoDB Spark Connector reads with a single partition (MongoSinglePartitioner), so all data flows through one Spark task. For large collections, configuring a partitioner splits the read across multiple parallel Spark tasks and can speed it up considerably.

Available Partitioners

| Partitioner | Best For |
|---|---|
| MongoSamplePartitioner | General-purpose large collections (samples the collection to determine split points) |
| MongoPaginateByCountPartitioner | When you want an exact number of partitions |
| MongoPaginateIntoPartitionsPartitioner | When you want to control records per partition |
| MongoShardedPartitioner | Sharded MongoDB clusters |
| MongoSplitVectorPartitioner | Uses MongoDB's splitVector command (requires admin privileges) |
| MongoSinglePartitioner | Small collections or when parallelism is not needed (default) |

Configuration

      - name: events
        target_name: stg_events
        replication_method: full
        partitioner: MongoSamplePartitioner
        partitioner_options:
          partition.size: 64          # target partition size in MB (default: 64)
          samples.per.partition: 10   # sample points per partition (default: 10)

MongoPaginateByCountPartitioner — exact partition count

        partitioner: MongoPaginateByCountPartitioner
        partitioner_options:
          numberOfPartitions: 8       # total number of partitions
          partitionKey: _id           # field to paginate on (default: _id)

MongoPaginateIntoPartitionsPartitioner — records per partition

        partitioner: MongoPaginateIntoPartitionsPartitioner
        partitioner_options:
          numberOfPartitions: 8

Performance Notes

  • Small collections (<1M docs): the partitioner's overhead outweighs the benefit, so omit it.
  • Large collections (>5M docs): MongoSamplePartitioner is a safe default. Expect a 3–10x read speed-up when Spark has multiple cores/executors.
  • partition.size: lower values produce more, smaller partitions, which increases parallelism but also opens more MongoDB connections.
  • The partitioner only affects the read side. Write parallelism is controlled by write_partitions in the loader.
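
To get a feel for the partition.size trade-off, the back-of-the-envelope sketch below estimates how many partitions a size-based partitioner would produce. It is an approximation under the assumption that partitions are roughly collection size divided by target size; the connector's actual count depends on sampling and average document size. The `estimate_partitions` helper and the 10 GB figure are illustrative only.

```python
import math


def estimate_partitions(collection_size_mb, partition_size_mb=64):
    """Approximate partition count for a size-based partitioner."""
    if partition_size_mb <= 0:
        raise ValueError("partition_size_mb must be positive")
    # At least one partition, even for an empty or tiny collection.
    return max(1, math.ceil(collection_size_mb / partition_size_mb))


# A hypothetical 10 GB collection at three target partition sizes.
for size_mb in (64, 32, 16):
    print(size_mb, estimate_partitions(10_240, size_mb))
```

Halving partition.size doubles the estimated partition count, which helps only while Spark has idle cores to run the extra tasks.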

All Table Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | string | required | MongoDB collection name |
| target_name | string | required | Destination table/collection name |
| replication_method | full / incremental | full | Replication strategy |
| iterate_column | string | | Column used for incremental watermark |
| iterate_column_type | string | | Type hint for watermark column |
| custom_query | string | | MongoDB aggregation pipeline (JSON array string) |
| partitioner | string | | MongoDB Spark partitioner class name |
| partitioner_options | map | {} | Key-value options passed to the partitioner |
| tags | list | [] | Tags for selective pipeline execution |
| pass_on_error | bool | false | Skip table on error instead of failing |
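
The parameters and defaults above can be mirrored in a small data class, which is handy for validating a table entry before running a pipeline. This is a sketch only: `TableConfig` is a hypothetical name, not a class exported by MkPipe, and the only validation shown is the documented set of `replication_method` values.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class TableConfig:
    """Hypothetical mirror of the table parameters listed above."""

    name: str                                  # MongoDB collection name
    target_name: str                           # destination table/collection name
    replication_method: str = "full"           # "full" or "incremental"
    iterate_column: Optional[str] = None
    iterate_column_type: Optional[str] = None
    custom_query: Optional[str] = None         # JSON array string
    partitioner: Optional[str] = None
    partitioner_options: dict = field(default_factory=dict)
    tags: list = field(default_factory=list)
    pass_on_error: bool = False

    def __post_init__(self):
        # Reject values outside the two documented replication strategies.
        if self.replication_method not in ("full", "incremental"):
            raise ValueError(
                f"unknown replication_method: {self.replication_method!r}"
            )


events = TableConfig(name="events", target_name="stg_events")
print(events)
```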
