# mkpipe-extractor-mongodb

MongoDB extractor plugin for MkPipe. Reads MongoDB collections into Spark DataFrames using the official MongoDB Spark Connector.

## Documentation

For more detailed documentation, please visit the GitHub repository.

## License

This project is licensed under the Apache 2.0 License - see the LICENSE file for details.
## Connection Configuration

```yaml
connections:
  mongo_source:
    variant: mongodb
    mongo_uri: 'mongodb://user:password@host:27017/mydb?authSource=admin'
    database: mydb
```
Alternatively, use individual parameters (the URI is constructed automatically):

```yaml
connections:
  mongo_source:
    variant: mongodb
    host: localhost
    port: 27017
    database: mydb
    user: myuser
    password: mypassword
```
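When the URI is assembled from individual parameters, credentials containing reserved characters (`@`, `:`, `/`) must be percent-encoded or the connection string becomes ambiguous. A minimal sketch of such a builder (`build_mongo_uri` is a hypothetical helper, not the plugin's actual function; the `authSource` default is an assumption):

```python
from urllib.parse import quote_plus

def build_mongo_uri(host, port, database, user, password, auth_source="admin"):
    # Percent-encode the credentials so characters like '@' or '/'
    # in a password don't break the URI, then assemble the string.
    return (
        f"mongodb://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}?authSource={auth_source}"
    )

uri = build_mongo_uri("localhost", 27017, "mydb", "myuser", "p@ss/word")
print(uri)  # mongodb://myuser:p%40ss%2Fword@localhost:27017/mydb?authSource=admin
```

If your password contains special characters, prefer the individual-parameter form so escaping is handled for you rather than hand-encoding a `mongo_uri`.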
## Table Configuration

```yaml
pipelines:
  - name: mongo_to_pg
    source: mongo_source
    destination: pg_target
    tables:
      - name: events
        target_name: stg_events
        replication_method: full
```
### Incremental Replication

```yaml
- name: events
  target_name: stg_events
  replication_method: incremental
  iterate_column: updated_at
  iterate_column_type: datetime
```
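Conceptually, an incremental run remembers the highest `iterate_column` value it has seen (the watermark) and fetches only newer documents on the next run. A sketch of the filter this implies (`incremental_match_stage` is a hypothetical illustration, not the plugin's internal code):

```python
from datetime import datetime, timezone

def incremental_match_stage(iterate_column, last_value):
    # Build the $match stage an incremental run would prepend to the read,
    # so only documents newer than the stored watermark are fetched.
    return {"$match": {iterate_column: {"$gt": last_value}}}

watermark = datetime(2024, 1, 1, tzinfo=timezone.utc)
stage = incremental_match_stage("updated_at", watermark)
print(stage)  # {'$match': {'updated_at': {'$gt': datetime.datetime(2024, 1, 1, ...)}}}
```

For this to work well, `iterate_column` should be indexed in MongoDB and monotonically non-decreasing for updated rows.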
### Custom Aggregation Pipeline

```yaml
- name: orders
  target_name: stg_orders
  replication_method: full
  custom_query: '[{"$match": {"status": "active"}}, {"$project": {"_id": 0}}]'
```
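Note that `custom_query` is a string containing a JSON *array* of aggregation stages, not a single object. A quick way to sanity-check the string before running the pipeline (a standalone snippet, not part of the plugin):

```python
import json

custom_query = '[{"$match": {"status": "active"}}, {"$project": {"_id": 0}}]'

# Decode the string and verify it has the shape an aggregation
# pipeline expects: a list of single-operator stage documents.
stages = json.loads(custom_query)
assert isinstance(stages, list), "custom_query must decode to a JSON array"
assert all(key.startswith("$") for stage in stages for key in stage)
print(len(stages))  # 2
```

A common mistake is using single quotes inside the JSON itself; the inner keys and values must use double quotes to be valid JSON.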
## Read Parallelism (Partitioners)

By default the MongoDB Spark Connector reads with a single partition (MongoSinglePartitioner), which means all data flows through one Spark task. For large collections, configuring a partitioner gives a significant speed-up by splitting the read across multiple parallel Spark tasks.
### Available Partitioners

| Partitioner | Best For |
|---|---|
| `MongoSamplePartitioner` | General-purpose large collections (samples the collection to determine split points) |
| `MongoPaginateByCountPartitioner` | When you want an exact number of partitions |
| `MongoPaginateIntoPartitionsPartitioner` | When you want to control records per partition |
| `MongoShardedPartitioner` | Sharded MongoDB clusters |
| `MongoSplitVectorPartitioner` | Uses MongoDB's `splitVector` command (requires admin privileges) |
| `MongoSinglePartitioner` | Small collections or when parallelism is not needed (default) |
### Configuration

```yaml
- name: events
  target_name: stg_events
  replication_method: full
  partitioner: MongoSamplePartitioner
  partitioner_options:
    partition.size: 64           # target partition size in MB (default: 64)
    samples.per.partition: 10    # sample points per partition (default: 10)
```
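As a rough mental model, the number of read partitions is approximately the collection size divided by `partition.size`. A back-of-envelope calculation (illustrative only, not the connector's exact internals; the 10 GB collection size is an assumed example):

```python
import math

collection_size_mb = 10 * 1024   # assumed 10 GB collection
partition_size_mb = 64           # partition.size (default)

# Each Spark task reads roughly one partition of ~partition.size MB,
# so the collection splits into about this many parallel reads:
num_partitions = math.ceil(collection_size_mb / partition_size_mb)
print(num_partitions)  # 160
```

Halving `partition.size` to 32 would double the partition count, which increases parallelism but also the number of concurrent MongoDB cursors.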
`MongoPaginateByCountPartitioner` (exact partition count):

```yaml
partitioner: MongoPaginateByCountPartitioner
partitioner_options:
  numberOfPartitions: 8    # total number of partitions
  partitionKey: _id        # field to paginate on (default: _id)
```
`MongoPaginateIntoPartitionsPartitioner` (records per partition):

```yaml
partitioner: MongoPaginateIntoPartitionsPartitioner
partitioner_options:
  numberOfPartitions: 8
```
### Performance Notes

- Small collections (<1M docs): partitioner overhead is not worth it; omit it.
- Large collections (>5M docs): `MongoSamplePartitioner` is a safe default. Expect a 3–10x read speed-up when Spark has multiple cores/executors.
- `partition.size`: lower values produce more, smaller partitions, meaning more parallelism but also more MongoDB connections.
- The partitioner only affects the read side. Write parallelism is controlled by `write_partitions` in the loader.
## All Table Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | string | required | MongoDB collection name |
| `target_name` | string | required | Destination table/collection name |
| `replication_method` | `full` / `incremental` | `full` | Replication strategy |
| `iterate_column` | string | — | Column used for incremental watermark |
| `iterate_column_type` | string | — | Type hint for watermark column |
| `custom_query` | string | — | MongoDB aggregation pipeline (JSON array string) |
| `partitioner` | string | — | MongoDB Spark partitioner class name |
| `partitioner_options` | map | `{}` | Key-value options passed to the partitioner |
| `tags` | list | `[]` | Tags for selective pipeline execution |
| `pass_on_error` | bool | `false` | Skip table on error instead of failing |