
mkpipe-extractor-mongodb

MongoDB extractor plugin for MkPipe. Reads MongoDB collections into Spark DataFrames using the official MongoDB Spark Connector.

Documentation

For more detailed documentation, please visit the GitHub repository.

License

This project is licensed under the Apache 2.0 License - see the LICENSE file for details.


Connection Configuration

connections:
  mongo_source:
    variant: mongodb
    mongo_uri: 'mongodb://user:password@host:27017/mydb?authSource=admin'
    database: mydb

Alternatively, use individual parameters (URI is constructed automatically):

connections:
  mongo_source:
    variant: mongodb
    host: localhost
    port: 27017
    database: mydb
    user: myuser
    password: mypassword
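
As a rough sketch of what "constructed automatically" means, the individual parameters can be assembled into a connection URI like this (the helper name `build_mongo_uri` is hypothetical, and the plugin's actual construction may differ, e.g. by appending `authSource`; credentials should be percent-encoded so special characters do not break the URI):

```python
from urllib.parse import quote_plus

def build_mongo_uri(host, port, database, user=None, password=None):
    """Assemble a MongoDB connection URI from individual parameters.

    User and password are URL-encoded so characters such as '@' or ':'
    in credentials do not corrupt the URI.
    """
    auth = ""
    if user and password:
        auth = f"{quote_plus(user)}:{quote_plus(password)}@"
    return f"mongodb://{auth}{host}:{port}/{database}"

print(build_mongo_uri("localhost", 27017, "mydb", "myuser", "mypassword"))
# mongodb://myuser:mypassword@localhost:27017/mydb
```

Passing the full `mongo_uri` directly is the safer choice when you need URI options such as `authSource` or `replicaSet`.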

Table Configuration

pipelines:
  - name: mongo_to_pg
    source: mongo_source
    destination: pg_target
    tables:
      - name: events
        target_name: stg_events
        replication_method: full

Incremental Replication

      - name: events
        target_name: stg_events
        replication_method: incremental
        iterate_column: updated_at
        iterate_column_type: datetime
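
Conceptually, incremental replication filters each run down to documents newer than the last stored watermark. A minimal sketch of that filter as an aggregation stage (the function name is hypothetical; the plugin's internal watermark handling may differ):

```python
def incremental_match_stage(iterate_column, last_value):
    """Build a $match stage that selects only documents whose watermark
    column is strictly greater than the value seen on the previous run."""
    return {"$match": {iterate_column: {"$gt": last_value}}}

print(incremental_match_stage("updated_at", "2024-01-01T00:00:00Z"))
```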

Custom Aggregation Pipeline

      - name: orders
        target_name: stg_orders
        replication_method: full
        custom_query: '[{"$match": {"status": "active"}}, {"$project": {"_id": 0}}]'
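
Because `custom_query` is a JSON array string, a malformed pipeline only surfaces once the Spark job runs. A quick sanity check is to parse the string locally first (a sketch; `parse_custom_query` is a hypothetical helper, not part of the plugin):

```python
import json

def parse_custom_query(custom_query):
    """Parse custom_query and verify it is a JSON array of stage objects,
    as MongoDB aggregation pipeline syntax requires."""
    pipeline = json.loads(custom_query)
    if not isinstance(pipeline, list) or not all(isinstance(s, dict) for s in pipeline):
        raise ValueError("custom_query must be a JSON array of pipeline stages")
    return pipeline

stages = parse_custom_query('[{"$match": {"status": "active"}}, {"$project": {"_id": 0}}]')
print(len(stages))  # 2
```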

Read Parallelism (Partitioners)

By default the MongoDB Spark Connector reads with a single partition (MongoSinglePartitioner), which means all data flows through one Spark task. For large collections, configuring a partitioner gives a significant speed-up by splitting the read across multiple parallel Spark tasks.

Available Partitioners

  • MongoSamplePartitioner: general-purpose large collections (samples the collection to determine split points)
  • MongoPaginateByCountPartitioner: when you want an exact number of partitions
  • MongoPaginateIntoPartitionsPartitioner: when you want to control records per partition
  • MongoShardedPartitioner: sharded MongoDB clusters
  • MongoSplitVectorPartitioner: uses MongoDB's splitVector command (requires admin privileges)
  • MongoSinglePartitioner: small collections, or when parallelism is not needed (default)
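
Under the hood, these settings presumably become options on the Spark DataFrame reader. As a sketch of that mapping (the `spark.mongodb.input.*` key prefix follows the 3.x MongoDB Spark Connector convention and is an assumption here; the plugin's real translation may differ):

```python
def partitioner_read_options(partitioner, partitioner_options):
    """Translate the YAML partitioner settings into reader option keys in
    the style the MongoDB Spark Connector (3.x) expects."""
    opts = {"spark.mongodb.input.partitioner": partitioner}
    for key, value in partitioner_options.items():
        # Connector option values are strings; nest each key under
        # the partitionerOptions namespace.
        opts[f"spark.mongodb.input.partitionerOptions.{key}"] = str(value)
    return opts

print(partitioner_read_options("MongoPaginateByCountPartitioner",
                               {"numberOfPartitions": 8, "partitionKey": "_id"}))
```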

Configuration

      - name: events
        target_name: stg_events
        replication_method: full
        partitioner: MongoSamplePartitioner
        partitioner_options:
          partition.size: 64          # target partition size in MB (default: 64)
          samples.per.partition: 10   # sample points per partition (default: 10)

MongoPaginateByCountPartitioner — exact partition count

        partitioner: MongoPaginateByCountPartitioner
        partitioner_options:
          numberOfPartitions: 8       # total number of partitions
          partitionKey: _id           # field to paginate on (default: _id)

MongoPaginateIntoPartitionsPartitioner — records per partition

        partitioner: MongoPaginateIntoPartitionsPartitioner
        partitioner_options:
          numberOfPartitions: 8

Performance Notes

  • Small collections (<1M docs): partitioner overhead is not worth it — omit it.
  • Large collections (>5M docs): MongoSamplePartitioner is a safe default. Expect 3–10x read speed-up when Spark has multiple cores/executors.
  • partition.size: lower values → more, smaller partitions → more parallelism but more MongoDB connections.
  • The partitioner only affects the read side. Write parallelism is controlled by write_partitions in the loader.
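
The effect of partition.size on parallelism is simple arithmetic: the connector aims for partitions of roughly partition.size MB each, so the partition count is approximately the collection size divided by that target (a back-of-the-envelope estimate, not the connector's exact algorithm):

```python
import math

def estimate_partitions(collection_size_mb, partition_size_mb=64):
    """Rough partition count when targeting partition_size_mb per split."""
    return max(1, math.ceil(collection_size_mb / partition_size_mb))

# A 10 GB collection with the default 64 MB target vs. a 32 MB target
print(estimate_partitions(10 * 1024))      # 160
print(estimate_partitions(10 * 1024, 32))  # 320
```

Halving partition.size doubles the partition count, and with it both the potential parallelism and the number of concurrent MongoDB connections.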

All Table Parameters

  • name (string, required): MongoDB collection name
  • target_name (string, required): destination table/collection name
  • replication_method (full / incremental, default: full): replication strategy
  • iterate_column (string): column used as the incremental watermark
  • iterate_column_type (string): type hint for the watermark column
  • custom_query (string): MongoDB aggregation pipeline as a JSON array string
  • partitioner (string): MongoDB Spark partitioner class name
  • partitioner_options (map, default: {}): key-value options passed to the partitioner
  • tags (list, default: []): tags for selective pipeline execution
  • pass_on_error (bool, default: false): skip the table on error instead of failing the pipeline
