MongoDB extractor for mkpipe.

mkpipe-extractor-mongodb

MongoDB extractor plugin for MkPipe. Reads MongoDB collections into Spark DataFrames using the official MongoDB Spark Connector.

Documentation

For more detailed documentation, please visit the GitHub repository.

License

This project is licensed under the Apache 2.0 License - see the LICENSE file for details.


Connection Configuration

connections:
  mongo_source:
    variant: mongodb
    mongo_uri: "mongodb://user:password@host:27017/mydb?authSource=admin"
    database: mydb

Alternatively, use individual parameters (URI is constructed automatically):

connections:
  mongo_source:
    variant: mongodb
    host: localhost
    port: 27017
    database: mydb
    user: myuser
    password: mypassword
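
As a rough illustration of what "constructed automatically" could mean, the sketch below assembles a URI from the individual parameters. The function name `build_mongo_uri` is hypothetical and this is not necessarily how the plugin builds its URI; it only shows why credentials containing reserved characters need percent-encoding.

```python
from urllib.parse import quote_plus


def build_mongo_uri(host, port, database, user=None, password=None):
    """Assemble a mongodb:// URI from individual parameters (hypothetical helper)."""
    credentials = ""
    if user is not None:
        # Percent-encode credentials so characters such as '@' or '/' stay unambiguous.
        credentials = quote_plus(user)
        if password is not None:
            credentials += ":" + quote_plus(password)
        credentials += "@"
    return f"mongodb://{credentials}{host}:{port}/{database}"


uri = build_mongo_uri("localhost", 27017, "mydb", "myuser", "p@ss/word")
print(uri)
```

If a password contains characters like `@`, `:` or `/`, passing it through the individual parameters (or encoding it yourself in `mongo_uri`) avoids a malformed connection string.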

Table Configuration

pipelines:
  - name: mongo_to_pg
    source: mongo_source
    destination: pg_target
    tables:
      - name: events
        target_name: stg_events
        replication_method: full

Incremental Replication

- name: events
  target_name: stg_events
  replication_method: incremental
  iterate_column: updated_at
  iterate_column_type: datetime
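
Conceptually, an incremental run only reads documents whose iterate_column is past the last stored watermark. The sketch below shows one way such a filter could be expressed as an aggregation stage. The function name `incremental_pipeline`, the strict `$gt` comparison, and the use of Extended JSON for dates are assumptions for illustration, not the plugin's confirmed behaviour.

```python
import json
from datetime import datetime, timezone


def incremental_pipeline(iterate_column, last_value):
    """Return a JSON pipeline selecting documents newer than last_value (hypothetical)."""
    if isinstance(last_value, datetime):
        # Represent the datetime in MongoDB Extended JSON ($date) form, in UTC.
        # A timezone-aware datetime is expected here.
        iso = last_value.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        bound = {"$date": iso}
    else:
        bound = last_value
    stages = [{"$match": {iterate_column: {"$gt": bound}}}]
    return json.dumps(stages)


watermark = datetime(2024, 1, 1, tzinfo=timezone.utc)
pipeline = incremental_pipeline("updated_at", watermark)
print(pipeline)
```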

Custom Aggregation Pipeline

- name: orders
  target_name: stg_orders
  replication_method: full
  custom_query: '[{"$match": {"status": "active"}}, {"$project": {"_id": 0}}]'
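
Because custom_query is a JSON array embedded in a YAML string, quoting mistakes are easy to make. A minimal pre-flight check, assuming you want to validate the string before running a pipeline, might look like the sketch below. `parse_custom_query` is a hypothetical helper and not part of the plugin.

```python
import json


def parse_custom_query(custom_query):
    """Parse custom_query and check it is a list of single-operator stages."""
    stages = json.loads(custom_query)
    if not isinstance(stages, list):
        raise ValueError("custom_query must be a JSON array of stages")
    for index, stage in enumerate(stages):
        if not isinstance(stage, dict) or len(stage) != 1:
            raise ValueError(f"stage {index} must be an object with exactly one key")
        (operator,) = stage  # the single key, e.g. "$match"
        if not operator.startswith("$"):
            raise ValueError(f"stage {index} key {operator!r} is not a $-operator")
    return stages


stages = parse_custom_query(
    '[{"$match": {"status": "active"}}, {"$project": {"_id": 0}}]'
)
print([next(iter(stage)) for stage in stages])
```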

Connection-Level Extra Options

Any key-value pairs under extra are forwarded directly as Spark read options. This is useful for setting defaults that apply to all tables using this connection (e.g. a partitioner for Amazon DocumentDB).

connections:
  docdb_source:
    variant: mongodb
    mongo_uri: "mongodb://user:pass@docdb-host:27017/mydb?authMechanism=SCRAM-SHA-1"
    database: mydb
    extra:
      partitioner: com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner
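
To make "forwarded directly as Spark read options" concrete, here is a sketch of how a connection entry could be turned into an options dictionary for the connector. The helper `build_read_options` and the rule that extra entries override the base keys are assumptions. The keys `connection.uri`, `database` and `collection` are the connector's standard read settings.

```python
def build_read_options(connection, collection):
    """Combine base connector settings with the connection's extra map (hypothetical)."""
    options = {
        "connection.uri": connection["mongo_uri"],
        "database": connection["database"],
        "collection": collection,
    }
    # Assumption: extra entries are applied last and passed through unchanged,
    # so they can add settings or override the base ones.
    for key, value in connection.get("extra", {}).items():
        options[str(key)] = str(value)
    return options


docdb_source = {
    "mongo_uri": "mongodb://user:pass@docdb-host:27017/mydb",
    "database": "mydb",
    "extra": {
        "partitioner": "com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner",
    },
}
read_options = build_read_options(docdb_source, "events")
print(read_options)
```

With a live Spark session, a dictionary like this would typically be passed as `spark.read.format("mongodb").options(**read_options).load()`.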

Read Parallelism (Partitioners)

By default the MongoDB Spark Connector uses AutoBucketPartitioner, which relies on the $bucketAuto aggregation stage. This works on MongoDB but does not work on Amazon DocumentDB (which does not support $bucketAuto). For DocumentDB or when you need explicit control, configure one of the alternative partitioners below.

Available Partitioners (Spark Connector v10.x)

All class names are under the com.mongodb.spark.sql.connector.read.partitioner package.

Class Name | partitioner value (full class path) | Best For
AutoBucketPartitioner | com.mongodb.spark.sql.connector.read.partitioner.AutoBucketPartitioner | Default; uses $bucketAuto (not supported on DocumentDB)
SamplePartitioner | com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner | General-purpose large collections; uses $sample (works on DocumentDB)
PaginateBySizePartitioner | com.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitioner | When you want partitions based on data size
PaginateIntoPartitionsPartitioner | com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner | When you want a fixed number of partitions
ShardedPartitioner | com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner | Sharded MongoDB clusters
SinglePartitionPartitioner | com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner | Small collections, or safest fallback for DocumentDB

Configuration

- name: events
  target_name: stg_events
  replication_method: full
  partitioner: com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner
  partitioner_options:
    partition.size: "64"          # target partition size in MB (default: 64)
    samples.per.partition: "10"   # sample points per partition (default: 10)
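
The MongoDB Spark Connector v10.x reads partitioner settings from keys prefixed with `partitioner.options.`. The sketch below shows how the table-level partitioner and partitioner_options could be flattened into that form. The helper name `partitioner_read_options` and the precedence (table-level values override connection-level extra) are assumptions, not confirmed plugin behaviour.

```python
def partitioner_read_options(table, connection_extra=None):
    """Flatten table-level partitioner settings into connector read options (hypothetical)."""
    # Start from connection-level defaults, if any.
    options = dict(connection_extra or {})
    partitioner = table.get("partitioner")
    if partitioner:
        # Assumption: a table-level partitioner replaces the connection-level one.
        options["partitioner"] = partitioner
    for key, value in table.get("partitioner_options", {}).items():
        # e.g. "partition.size" -> "partitioner.options.partition.size"
        options[f"partitioner.options.{key}"] = str(value)
    return options


events_table = {
    "name": "events",
    "partitioner": "com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner",
    "partitioner_options": {"partition.size": "64", "samples.per.partition": "10"},
}
for key, value in partitioner_read_options(events_table).items():
    print(key, "=", value)
```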

PaginateIntoPartitionsPartitioner — fixed partition count

partitioner: com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner
partitioner_options:
  max.number.of.partitions: "8"

SinglePartitionPartitioner — no partitioning

partitioner: com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner

Amazon DocumentDB Compatibility

DocumentDB does not support $bucketAuto. Use one of:

  • SamplePartitioner — uses $sample, supported by DocumentDB. Good balance of parallelism and compatibility.
  • SinglePartitionPartitioner — safest option, no special aggregation stages required. All data is read in a single partition; use write_partitions in the loader or Spark repartitioning for downstream parallelism.

You can set the partitioner per-table (as shown above) or per-connection via extra (see Connection-Level Extra Options).

Performance Notes

  • Small collections (<1M docs): partitioner overhead is not worth it — omit it or use SinglePartitionPartitioner.
  • Large collections (>5M docs): SamplePartitioner is a safe default. Expect 3–10x read speed-up when Spark has multiple cores/executors.
  • partition.size: lower values → more, smaller partitions → more parallelism but more MongoDB connections.
  • The partitioner only affects the read side. Write parallelism is controlled by write_partitions in the loader.
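
The trade-off behind partition.size can be estimated with simple arithmetic: the number of read partitions is roughly the collection size divided by the target partition size. The sketch below is only a back-of-the-envelope estimate under that assumption; the actual count depends on the partitioner and on the collection's statistics.

```python
import math


def estimate_partition_count(collection_size_mb, partition_size_mb=64):
    """Rough estimate of read partitions for a given target partition size."""
    if partition_size_mb <= 0:
        raise ValueError("partition_size_mb must be positive")
    # At least one partition is always needed, even for tiny collections.
    return max(1, math.ceil(collection_size_mb / partition_size_mb))


# A 10 GiB collection at the default 64 MB versus a halved 32 MB target.
print(estimate_partition_count(10 * 1024, 64))
print(estimate_partition_count(10 * 1024, 32))
```

Halving partition.size doubles the estimated partition count, which is why lower values can raise both parallelism and the number of concurrent MongoDB connections.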

All Table Parameters

Parameter | Type | Default | Description
name | string | required | MongoDB collection name
target_name | string | required | Destination table/collection name
replication_method | full / incremental | full | Replication strategy
iterate_column | string | | Column used for incremental watermark
iterate_column_type | string | | Type hint for watermark column
custom_query | string | | MongoDB aggregation pipeline (JSON array string)
partitioner | string | | MongoDB Spark partitioner class name
partitioner_options | map | {} | Key-value options passed to the partitioner
tags | list | [] | Tags for selective pipeline execution
pass_on_error | bool | false | Skip table on error instead of failing

