mkpipe-extractor-mongodb

MongoDB extractor plugin for MkPipe. Reads MongoDB collections into Spark DataFrames using the official MongoDB Spark Connector.

Documentation

For more detailed documentation, please visit the GitHub repository.

License

This project is licensed under the Apache 2.0 License - see the LICENSE file for details.


Connection Configuration

connections:
  mongo_source:
    variant: mongodb
    mongo_uri: "mongodb://user:password@host:27017/mydb?authSource=admin"
    database: mydb

Alternatively, use individual parameters (URI is constructed automatically):

connections:
  mongo_source:
    variant: mongodb
    host: localhost
    port: 27017
    database: mydb
    user: myuser
    password: mypassword
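When individual parameters are given, the extractor assembles the URI itself. The exact construction (including the `authSource=admin` default) is an assumption here; `build_mongo_uri` is an illustrative sketch, not the plugin's actual internal function. Note that credentials must be percent-encoded so special characters survive:

```python
from urllib.parse import quote_plus

def build_mongo_uri(host, port, database, user, password, auth_source="admin"):
    # Percent-encode credentials so characters like '@' or '/' in the
    # password do not break the URI.
    return (
        f"mongodb://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}?authSource={auth_source}"
    )

# e.g. a password containing '@' is encoded as '%40'
uri = build_mongo_uri("localhost", 27017, "mydb", "myuser", "my@pass")
```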

Table Configuration

pipelines:
  - name: mongo_to_pg
    source: mongo_source
    destination: pg_target
    tables:
      - name: events
        target_name: stg_events
        replication_method: full

Incremental Replication

- name: events
  target_name: stg_events
  replication_method: incremental
  iterate_column: updated_at
  iterate_column_type: datetime
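Conceptually, incremental replication filters the collection to documents whose watermark column is newer than the last stored value. A minimal sketch of how that filter maps onto a MongoDB aggregation stage (the function name and the exact comparison operator are illustrative assumptions, not the plugin's internals):

```python
def incremental_match_stage(iterate_column, last_value):
    # Select only documents strictly newer than the stored watermark.
    return {"$match": {iterate_column: {"$gt": last_value}}}

stage = incremental_match_stage("updated_at", "2024-01-01T00:00:00Z")
```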

Custom Aggregation Pipeline

- name: orders
  target_name: stg_orders
  replication_method: full
  custom_query: '[{"$match": {"status": "active"}}, {"$project": {"_id": 0}}]'
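Because `custom_query` is a JSON array encoded as a string, a malformed value fails only at read time. A small validation sketch (assuming the string is ultimately handed to the connector as an aggregation pipeline; `validate_pipeline` is a hypothetical helper, not part of the plugin):

```python
import json

def validate_pipeline(custom_query: str) -> str:
    # Parse the string and confirm it is a JSON array of stage documents,
    # then re-serialize it into a normalized JSON string.
    stages = json.loads(custom_query)
    if not isinstance(stages, list):
        raise ValueError("custom_query must be a JSON array of pipeline stages")
    return json.dumps(stages)

pipeline = validate_pipeline('[{"$match": {"status": "active"}}, {"$project": {"_id": 0}}]')
```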

Connection-Level Extra Options

Any key-value pairs under extra are forwarded directly as Spark read options. This is useful for setting defaults that apply to all tables using this connection (e.g. a partitioner for Amazon DocumentDB).

connections:
  docdb_source:
    variant: mongodb
    mongo_uri: "mongodb://user:pass@docdb-host:27017/mydb?authMechanism=SCRAM-SHA-1"
    database: mydb
    extra:
      partitioner: com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner
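The forwarding described above amounts to merging the `extra` map into the Spark read options built from the connection and table settings. A sketch of that merge, assuming per-table settings take precedence over connection-level defaults (the function and the base option keys are illustrative, not the plugin's actual code):

```python
def build_read_options(connection: dict, table: dict) -> dict:
    # Base options derived from the connection and the table being read.
    options = {
        "connection.uri": connection["mongo_uri"],
        "database": connection["database"],
        "collection": table["name"],
    }
    # Connection-level defaults from `extra` are forwarded as-is...
    options.update(connection.get("extra", {}))
    # ...but an explicit per-table partitioner overrides them (assumed precedence).
    if "partitioner" in table:
        options["partitioner"] = table["partitioner"]
    return options
```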

Read Parallelism (Partitioners)

By default the MongoDB Spark Connector uses AutoBucketPartitioner, which relies on the $bucketAuto aggregation stage. This works on MongoDB but does not work on Amazon DocumentDB (which does not support $bucketAuto). For DocumentDB or when you need explicit control, configure one of the alternative partitioners below.

Available Partitioners (Spark Connector v10.x)

All class names are under the com.mongodb.spark.sql.connector.read.partitioner package.

Class Name                          Best For
AutoBucketPartitioner               Default; uses $bucketAuto (not supported on DocumentDB)
SamplePartitioner                   General-purpose large collections; uses $sample (works on DocumentDB)
PaginateBySizePartitioner           Partitions sized by data volume
PaginateIntoPartitionsPartitioner   A fixed number of partitions
ShardedPartitioner                  Sharded MongoDB clusters
SinglePartitionPartitioner          Small collections, or the safest fallback for DocumentDB

The partitioner value is the class name prefixed with the package above, e.g. com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner.

Configuration

- name: events
  target_name: stg_events
  replication_method: full
  partitioner: com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner
  partitioner_options:
    partition.size: "64"          # target partition size in MB (default: 64)
    samples.per.partition: "10"   # sample points per partition (default: 10)
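In the v10.x connector, per-partitioner settings are passed as read options with a `partitioner.options.` prefix. A sketch of how the table-level `partitioner` and `partitioner_options` keys could be translated into that format (the function is a hypothetical illustration of the mapping, not the plugin's code):

```python
def partitioner_spark_options(partitioner: str, partitioner_options: dict) -> dict:
    # The partitioner class itself is a plain read option...
    opts = {"partitioner": partitioner}
    # ...while each of its settings is prefixed with "partitioner.options."
    # (v10.x connector convention), with values passed as strings.
    for key, value in partitioner_options.items():
        opts[f"partitioner.options.{key}"] = str(value)
    return opts

opts = partitioner_spark_options(
    "com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner",
    {"partition.size": "64", "samples.per.partition": "10"},
)
```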

PaginateIntoPartitionsPartitioner — fixed partition count

partitioner: com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner
partitioner_options:
  max.number.of.partitions: "8"

SinglePartitionPartitioner — no partitioning

partitioner: com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner

Amazon DocumentDB Compatibility

DocumentDB does not support $bucketAuto. Use one of:

  • SamplePartitioner — uses $sample, supported by DocumentDB. Good balance of parallelism and compatibility.
  • SinglePartitionPartitioner — safest option, no special aggregation stages required. All data is read in a single partition; use write_partitions in the loader or Spark repartitioning for downstream parallelism.

You can set the partitioner per-table (as shown above) or per-connection via extra (see Connection-Level Extra Options).

Performance Notes

  • Small collections (<1M docs): partitioner overhead is not worth it — omit it or use SinglePartitionPartitioner.
  • Large collections (>5M docs): SamplePartitioner is a safe default. Expect 3–10x read speed-up when Spark has multiple cores/executors.
  • partition.size: lower values produce more, smaller partitions, which increases read parallelism but also the number of concurrent MongoDB connections.
  • The partitioner only affects the read side. Write parallelism is controlled by write_partitions in the loader.
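For the size-based partitioners, the expected partition count is roughly the collection's data size divided by partition.size. A back-of-the-envelope helper (illustrative only; the connector's actual partitioning also depends on sampling and document distribution):

```python
import math

def estimated_partitions(collection_size_mb: float, partition_size_mb: float = 64) -> int:
    # Rough estimate: total size divided by target partition size,
    # rounded up, with at least one partition.
    return max(1, math.ceil(collection_size_mb / partition_size_mb))

# A ~512 MB collection with the default 64 MB target yields about 8 partitions.
n = estimated_partitions(512, 64)
```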

All Table Parameters

Parameter            Type                 Default    Description
name                 string               required   MongoDB collection name
target_name          string               required   Destination table/collection name
replication_method   full / incremental   full       Replication strategy
iterate_column       string               -          Column used as the incremental watermark
iterate_column_type  string               -          Type hint for the watermark column
custom_query         string               -          MongoDB aggregation pipeline (JSON array string)
partitioner          string               -          MongoDB Spark partitioner class name
partitioner_options  map                  {}         Key-value options passed to the partitioner
tags                 list                 []         Tags for selective pipeline execution
pass_on_error        bool                 false      Skip the table on error instead of failing the run
