MongoDB extractor for mkpipe.


mkpipe-extractor-mongodb

MongoDB extractor plugin for MkPipe. Reads MongoDB collections into Spark DataFrames using the official MongoDB Spark Connector.

Documentation

For more detailed documentation, please visit the GitHub repository.

License

This project is licensed under the Apache 2.0 License - see the LICENSE file for details.


Connection Configuration

connections:
  mongo_source:
    variant: mongodb
    mongo_uri: "mongodb://user:password@host:27017/mydb?authSource=admin"
    database: mydb

Alternatively, use individual parameters (URI is constructed automatically):

connections:
  mongo_source:
    variant: mongodb
    host: localhost
    port: 27017
    database: mydb
    user: myuser
    password: mypassword
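When individual parameters are given, the connector presumably assembles them into a standard MongoDB connection URI. A minimal sketch of that construction (hypothetical helper; the actual mkpipe builder may differ, e.g. in `authSource` handling):

```python
from urllib.parse import quote_plus


def build_mongo_uri(host, port, database, user=None, password=None):
    """Assemble a MongoDB connection URI from individual parameters.

    Illustrative sketch only; mkpipe's internal builder may differ.
    """
    if user and password:
        # Percent-encode credentials so special characters survive the URI.
        auth = f"{quote_plus(user)}:{quote_plus(password)}@"
    else:
        auth = ""
    return f"mongodb://{auth}{host}:{port}/{database}"


print(build_mongo_uri("localhost", 27017, "mydb", "myuser", "p@ss"))
```

Percent-encoding matters here: a password containing `@` or `:` would otherwise break URI parsing.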

Table Configuration

pipelines:
  - name: mongo_to_pg
    source: mongo_source
    destination: pg_target
    tables:
      - name: events
        target_name: stg_events
        replication_method: full

Incremental Replication

- name: events
  target_name: stg_events
  replication_method: incremental
  iterate_column: updated_at
  iterate_column_type: datetime

Custom Aggregation Pipeline

- name: orders
  target_name: stg_orders
  replication_method: full
  custom_query: '[{"$match": {"status": "active"}}, {"$project": {"_id": 0}}]'
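Since `custom_query` is a JSON array serialized as a string, one way to avoid quoting mistakes is to build the pipeline as Python objects and serialize it with `json.dumps` when generating the config:

```python
import json

# Build the aggregation pipeline as Python data structures, then serialize.
# This avoids escaping/quoting mistakes in hand-written JSON strings.
pipeline = [
    {"$match": {"status": "active"}},
    {"$project": {"_id": 0}},
]
custom_query = json.dumps(pipeline)
print(custom_query)
```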

Connection-Level Extra Options

Any key-value pairs under extra are forwarded directly as Spark read options. This is useful for setting defaults that apply to all tables using this connection (e.g. a partitioner for Amazon DocumentDB).

connections:
  docdb_source:
    variant: mongodb
    mongo_uri: "mongodb://user:pass@docdb-host:27017/mydb?authMechanism=SCRAM-SHA-1"
    database: mydb
    extra:
      partitioner: com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner
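Conceptually, this means connection-level `extra` keys act as defaults that per-table settings can override. A sketch of that merge (assumed behavior, not confirmed from the mkpipe source):

```python
def merged_read_options(connection_extra: dict, table_options: dict) -> dict:
    """Connection-level defaults first; table-level keys override them."""
    options = dict(connection_extra)
    options.update(table_options)
    return options


opts = merged_read_options(
    {"partitioner": "com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner"},
    {"partitioner": "com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner"},
)
```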

Read Parallelism (Partitioners)

By default the MongoDB Spark Connector uses AutoBucketPartitioner, which relies on the $bucketAuto aggregation stage. Amazon DocumentDB does not support $bucketAuto, so the default fails there. For DocumentDB, or when you need explicit control over read parallelism, configure one of the alternative partitioners below.

Available Partitioners (Spark Connector v10.x)

All class names are under the com.mongodb.spark.sql.connector.read.partitioner package.

| Class Name | `partitioner` value (full class path) | Best For |
|---|---|---|
| AutoBucketPartitioner | `com.mongodb.spark.sql.connector.read.partitioner.AutoBucketPartitioner` | Default; uses `$bucketAuto` (not supported on DocumentDB) |
| SamplePartitioner | `com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner` | General-purpose large collections; uses `$sample` (works on DocumentDB) |
| PaginateBySizePartitioner | `com.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitioner` | When you want partitions based on data size |
| PaginateIntoPartitionsPartitioner | `com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner` | When you want a fixed number of partitions |
| ShardedPartitioner | `com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner` | Sharded MongoDB clusters |
| SinglePartitionPartitioner | `com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner` | Small collections, or safest fallback for DocumentDB |

Configuration

- name: events
  target_name: stg_events
  replication_method: full
  partitioner: com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner
  partitioner_options:
    partition.size: "64"          # target partition size in MB (default: 64)
    samples.per.partition: "10"   # sample points per partition (default: 10)
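As a rough rule of thumb (an assumption for planning purposes, not a guarantee from the connector, whose sampling can yield a different count), the number of partitions SamplePartitioner produces is approximately the collection's data size divided by `partition.size`:

```python
import math


def estimated_partitions(collection_size_mb: float, partition_size_mb: float = 64.0) -> int:
    """Rough estimate of partition count: data size divided by target partition size.

    The connector's actual count depends on its sampling, so treat this as a
    back-of-the-envelope figure only.
    """
    return max(1, math.ceil(collection_size_mb / partition_size_mb))


estimated_partitions(1000)  # a ~1 GB collection at the 64 MB default
```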

PaginateIntoPartitionsPartitioner — fixed partition count

partitioner: com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner
partitioner_options:
  max.number.of.partitions: "8"

SinglePartitionPartitioner — no partitioning

partitioner: com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner

Amazon DocumentDB Compatibility

DocumentDB does not support $bucketAuto. Use one of:

  • SamplePartitioner — uses $sample, supported by DocumentDB. Good balance of parallelism and compatibility.
  • SinglePartitionPartitioner — safest option, no special aggregation stages required. All data is read in a single partition; use write_partitions in the loader or Spark repartitioning for downstream parallelism.

You can set the partitioner per-table (as shown above) or per-connection via extra (see Connection-Level Extra Options).

Performance Notes

  • Small collections (<1M docs): partitioner overhead is not worth it — omit it or use SinglePartitionPartitioner.
  • Large collections (>5M docs): SamplePartitioner is a safe default. Expect 3–10x read speed-up when Spark has multiple cores/executors.
  • partition.size: lower values → more, smaller partitions → more parallelism but more MongoDB connections.
  • The partitioner only affects the read side. Write parallelism is controlled by write_partitions in the loader.

All Table Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | string | required | MongoDB collection name |
| target_name | string | required | Destination table/collection name |
| replication_method | `full` / `incremental` | `full` | Replication strategy |
| iterate_column | string | | Column used for incremental watermark |
| iterate_column_type | string | | Type hint for watermark column |
| custom_query | string | | MongoDB aggregation pipeline (JSON array string) |
| partitioner | string | | MongoDB Spark partitioner class name |
| partitioner_options | map | `{}` | Key-value options passed to the partitioner |
| tags | list | `[]` | Tags for selective pipeline execution |
| pass_on_error | bool | `false` | Skip table on error instead of failing |
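Putting several of the parameters above together, a table entry might look like this (values illustrative):

```yaml
- name: orders
  target_name: stg_orders
  replication_method: incremental
  iterate_column: updated_at
  iterate_column_type: datetime
  partitioner: com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner
  partitioner_options:
    partition.size: "64"
  tags: [nightly]
  pass_on_error: true
```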
