Production-grade MongoDB replication tool with PII redaction, parallel processing, and state management
MongoDB Replication Tool
A production-grade MongoDB replication tool with built-in PII redaction, parallel processing, cascade filtering, and intelligent state management.
✨ Features
Core Capabilities
- Parallel Replication: Process multiple collections simultaneously with configurable worker pools
- Incremental Loading: Cursor-based state management for efficient incremental updates
- PII Redaction: Built-in support for detecting and anonymizing sensitive data using Microsoft Presidio
- Schema Relationship Inference: Automatically detect parent-child relationships between collections
- Cascade Filtering: Replicate related documents across collections based on defined relationships
- Native BSON Support: Preserves MongoDB data types (ObjectId, Date, Decimal128, etc.)
- Multiple Write Modes: Support for replace, append, and merge strategies
- Field Transformations: Apply custom transformations to fields during replication
- Index Management: Automatically replicate indexes from source to destination
State Management
- Run Tracking: Track job runs with comprehensive statistics and error reporting
- Collection State: Detailed per-collection state with cursor position tracking
- Configurable State Collections: Customize state collection names via configuration
- Automatic Index Cleanup: Handles migration from legacy state schemas
📦 Installation
pip install mongo-replication
For development installation:
git clone https://github.com/nhuray/mongo-replication.git
cd mongo-replication
uv sync
🚀 Quick Start
1. Initialize a New Job
The init command launches an interactive wizard that guides you through the setup process:
mongorep init my_job
The wizard will:
- Prompt for source and destination MongoDB URIs
- Validate connections to both databases
- Configure collection discovery patterns
- Set up PII detection settings
- Select anonymization strategies
- Choose which collections to replicate
- Generate configuration file at config/my_job_config.yaml
- Display environment variables to add to .env
Note: The init command can configure both source and destination connections interactively. You can skip the environment variables step if you provide URIs during initialization.
2. Configure Environment Variables (Alternative)
If you prefer to configure via environment variables instead of the interactive wizard, add to your .env file:
MONGOREP_MY_JOB_SOURCE_URI=mongodb://source-host:27017/source_db
MONGOREP_MY_JOB_DESTINATION_URI=mongodb://dest-host:27017/dest_db
MONGOREP_MY_JOB_CONFIG_PATH=config/my_job_config.yaml
MONGOREP_MY_JOB_ENABLED=true
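The variable names above appear to follow a MONGOREP_<JOB>_<SETTING> pattern. The sketch below shows how such variables could be grouped per job. The helper name `job_env_vars` is hypothetical and not part of the tool, and the naming pattern is inferred only from the example above.

```python
import os


def job_env_vars(job_name, environ=None):
    """Collect MONGOREP_<JOB>_* settings for one job (hypothetical helper)."""
    env = os.environ if environ is None else environ
    prefix = f"MONGOREP_{job_name.upper()}_"
    # Strip the prefix and lowercase the remainder, e.g. SOURCE_URI -> source_uri
    return {
        key[len(prefix):].lower(): value
        for key, value in env.items()
        if key.startswith(prefix)
    }


# Values copied from the .env example above, plus one unrelated variable
example_env = {
    "MONGOREP_MY_JOB_SOURCE_URI": "mongodb://source-host:27017/source_db",
    "MONGOREP_MY_JOB_DESTINATION_URI": "mongodb://dest-host:27017/dest_db",
    "MONGOREP_MY_JOB_CONFIG_PATH": "config/my_job_config.yaml",
    "MONGOREP_MY_JOB_ENABLED": "true",
    "PATH": "/usr/bin",
}
settings = job_env_vars("my_job", example_env)
print(settings["source_uri"])
```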
3. Scan Collections (Optional)
After initialization, optionally run scan to analyze collections and detect PII:
mongorep scan my_job
This will:
- Analyze document schemas
- Detect PII fields automatically using Presidio
- Infer schema relationships between collections (if enabled)
- Update configuration with findings
- Generate PII detection report
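As a rough illustration of how a confidence threshold and an allowlist could narrow down scan findings, here is a minimal sketch. The `Detection` record and `filter_detections` function are hypothetical stand-ins, not the tool's or Presidio's API. The 0.85 threshold and the collection.field allowlist format are taken from the configuration shown later in this document.

```python
from dataclasses import dataclass


@dataclass
class Detection:
    """Hypothetical record for one PII finding on a field."""
    field: str        # collection.field, e.g. "users.email"
    entity_type: str  # e.g. "EMAIL_ADDRESS"
    score: float      # detector confidence between 0.0 and 1.0


def filter_detections(detections, threshold=0.85, allowlist=()):
    """Keep findings at or above the threshold that are not allowlisted."""
    skipped = set(allowlist)
    kept = [d for d in detections if d.score >= threshold and d.field not in skipped]
    # Highest confidence first
    return sorted(kept, key=lambda d: d.score, reverse=True)


found = [
    Detection("users.email", "EMAIL_ADDRESS", 0.99),
    Detection("users.user_id", "PHONE_NUMBER", 0.90),  # assumed false positive
    Detection("users.bio", "PERSON", 0.60),
    Detection("users.phone", "PHONE_NUMBER", 0.95),
]
report = filter_detections(found, threshold=0.85, allowlist=["users.user_id"])
print([d.field for d in report])
```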
4. Run Replication
# Replicate all configured collections
mongorep run my_job
# Replicate specific collections
mongorep run my_job --collections users,orders
# Cascade replication from specific document IDs
mongorep run my_job --ids customers=507f1f77bcf86cd799439011
# Cascade replication from a MongoDB query
mongorep run my_job --query customers='{"plan": "Basic"}'
# Interactive mode - select collections to replicate
mongorep run my_job --interactive
# Dry run - preview without executing
mongorep run my_job --dry-run
⚙️ Configuration
Basic Configuration Structure
# MongoDB Replication Tool - Job Configuration
#
# This configuration was generated by 'mongorep init' command.
# You can edit this file to customize your replication settings.
#
# For full documentation, see: src/rep/config/defaults.yaml
#
# Configuration precedence:
# 1. System defaults (defaults.yaml)
# 2. This file (job-specific overrides)
# 3. CLI arguments (highest priority)
# =============================================================================
# SCAN CONFIGURATION
# =============================================================================
# Controls which collections are scanned for PII and how PII is detected.
scan:
  # Collection Discovery
  # ---------------------
  # Use regex patterns to filter which collections are scanned
  discovery:
    # Include patterns: Only scan collections matching these patterns
    # Empty list = scan all collections
    include_patterns: [ ]
    # Exclude patterns: Skip collections matching these patterns
    # Applied after include_patterns
    exclude_patterns: [ ] # Don't exclude any collections
  # Sampling Configuration
  # ----------------------
  # Configure how many documents to sample for analysis
  sampling:
    # Number of documents to analyze per collection
    # Larger sample = more accurate but slower
    sample_size: 10
    # Sampling strategy: 'stratified' or 'random'
    # stratified = distributed across collection, random = random selection
    sample_strategy: stratified
  # PII Detection Settings
  # ----------------------
  # Configure automatic PII detection using Microsoft Presidio
  pii_analysis:
    enabled: True
    # Confidence threshold (0.0-1.0)
    # Higher = fewer false positives, Lower = more sensitive
    confidence_threshold: 0.85
    # PII entity types to detect
    # Common types: EMAIL_ADDRESS, PHONE_NUMBER, PERSON, CREDIT_CARD,
    # US_SSN, IBAN_CODE, IP_ADDRESS, URL
    entity_types:
      - EMAIL_ADDRESS
      - PHONE_NUMBER
      - PERSON
      - CREDIT_CARD
      - IBAN_CODE
      - US_SSN
      - IP_ADDRESS
      - URL
    # Anonymization operators per entity type
    # See docs/presidio.md for all available operators:
    #   Built-in: replace, redact, mask, hash, encrypt, keep
    #   Custom: fake_email, fake_name, fake_phone, smart_mask, smart_fake, etc.
    # Default mappings (configured in src/mongo_replication/config/presidio.yaml):
    #   EMAIL_ADDRESS: smart_mask (preserves domain)
    #   PERSON: replace (replaces with "ANONYMOUS")
    #   PHONE_NUMBER: mask (shows last 4 digits)
    #   US_SSN: mask (shows last 4 digits)
    #   CREDIT_CARD: hash (SHA-256 hash)
    # See docs/presidio.md for complete list
    # Allowlist: Fields to skip PII detection (false positives)
    # Format: collection.field (e.g., users.user_id)
    allowlist: [ ] # No allowlist entries
# =============================================================================
# REPLICATION CONFIGURATION
# =============================================================================
# Controls how collections are replicated from source to destination.
# This section is typically generated after running 'mongorep scan'.
replication:
  # Collection Discovery
  # --------------------
  # Controls which collections are automatically discovered and replicated
  discovery:
    replicate_all: True
    include_patterns: [ ]
    exclude_patterns: [ ]
  # State Management
  # ----------------
  # Configuration for replication state tracking
  state_management:
    runs_collection: _rep_runs
    state_collection: _rep_state
  # Performance Settings
  # --------------------
  # Configuration for parallel processing and batch sizes
  performance:
    # Collections to replicate concurrently
    max_parallel_collections: 5
    # Documents per batch (higher = faster but more memory)
    batch_size: 1000
  # Collection Defaults
  # -------------------
  # Default settings that apply to all collections unless overridden
  defaults:
    # Write strategy: merge (upsert), append (insert), replace (drop/recreate)
    write_disposition: merge
    # Cursor field candidates (checked in order)
    cursor_fields: [ updated_at, updatedAt, meta.updated_at, meta.updatedAt ]
    # Fallback cursor field when no cursor_fields match
    cursor_fallback_field: _id
    # Initial cursor value for first-time replication
    cursor_initial_value: '2020-01-01T00:00:00Z'
    # Error handling: skip (log and continue) or fail (stop replication)
    transform_error_mode: skip
  # Collection-Specific Configuration
  # ----------------------------------
  # Override defaults and specify PII fields for each collection
  collections:
See Configuration Documentation for complete reference.
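The cursor settings above (candidates checked in order, a fallback field, and an initial value) suggest a selection step like the one sketched below. This is an illustration under assumptions, not the tool's implementation: the function names are hypothetical, and the use of a strict `$gt` comparison for the incremental filter is a guess.

```python
def get_path(doc, dotted):
    """Resolve a dotted path such as 'meta.updatedAt' in a nested dict."""
    current = doc
    for part in dotted.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


def pick_cursor_field(sample_doc, candidates, fallback="_id"):
    """Return the first candidate present in the sample, else the fallback."""
    for path in candidates:
        if get_path(sample_doc, path) is not None:
            return path
    return fallback


def incremental_filter(field, last_value):
    """Build a MongoDB-style filter for documents newer than the cursor."""
    # Assumption: strict greater-than; a real tool might use $gte instead
    return {field: {"$gt": last_value}}


candidates = ["updated_at", "updatedAt", "meta.updated_at", "meta.updatedAt"]
doc = {"_id": 1, "meta": {"updatedAt": "2024-05-01T00:00:00Z"}}
field = pick_cursor_field(doc, candidates)
print(field, incremental_filter(field, "2020-01-01T00:00:00Z"))
```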
🔧 CLI Commands
init - Initialize a New Job
The init command provides an interactive wizard to set up a new replication job. It guides you through:
- Configuring source and destination MongoDB connections
- Setting up collection discovery (include/exclude patterns)
- Configuring PII detection settings
- Selecting anonymization strategies per entity type
- Choosing which collections to replicate
mongorep init <job_name> [OPTIONS]
Arguments:
job_name Job ID (e.g., 'prod_db', 'staging_db')
Options:
--output -o PATH Output config file path (default: config/<job>_config.yaml)
--help Show this message and exit.
Examples:
# Initialize configuration for prod_db job
mongorep init prod_db
# Specify custom output path
mongorep init prod_db --output /custom/path/config.yaml
The wizard will:
- Prompt for source MongoDB URI and validate the connection
- Prompt for destination MongoDB URI and validate the connection
- Configure collection discovery with include/exclude patterns
- Set up PII detection (confidence threshold, entity types, sample size)
- Configure anonymization strategies for each PII entity type
- Select collections to replicate (all, specific patterns, or manual selection)
- Generate configuration file at the specified path
- Display environment variables to add to your .env file
After running init, you can:
- Run mongorep scan <job_name> to analyze collections and detect PII
- Run mongorep run <job_name> to start replication
- Manually edit the generated config file to fine-tune settings
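The include/exclude discovery patterns that the wizard configures can be pictured with the short sketch below. It follows the rules stated in the configuration comments (an empty include list selects everything, and excludes are applied after includes). The function name is hypothetical, and using `re.search` rather than a full match is an assumption.

```python
import re


def select_collections(names, include_patterns=(), exclude_patterns=()):
    """Filter collection names with regex include/exclude lists."""
    selected = []
    for name in names:
        # An empty include list means every collection is a candidate
        if include_patterns and not any(re.search(p, name) for p in include_patterns):
            continue
        # Excludes are applied after includes
        if any(re.search(p, name) for p in exclude_patterns):
            continue
        selected.append(name)
    return selected


names = ["users", "orders", "order_items", "system.profile", "_rep_state"]
print(select_collections(names, exclude_patterns=[r"^system\.", r"^_rep_"]))
print(select_collections(names, include_patterns=[r"^order"], exclude_patterns=[r"_items$"]))
```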
scan - Auto-Discover Collections
mongorep scan <job_name> [OPTIONS]
Options:
--output -o TEXT Output path for config file (default: config/<job>_config.yaml)
--collections TEXT Comma-separated list of collections to scan (default: all)
--interactive -i Interactively select collections to scan
--sample-size -s INTEGER Number of documents to sample per collection (default: from config or 1000)
--confidence -c FLOAT Minimum confidence for PII detection (default: from config or 0.85)
--language -l TEXT Language for NLP analysis (default: en)
--no-pii Skip PII analysis (only discover collections)
--help Show this message and exit.
run - Execute Replication
mongorep run <job_name> [OPTIONS]
Options:
--collections TEXT Comma-separated list of collections to replicate (default: all configured)
--interactive -i Interactively select collections to replicate
--dry-run Preview what would be replicated without executing
--parallel -p INTEGER Maximum number of parallel collections (default: from config or 5)
--batch-size -b INTEGER Batch size for document processing
--ids TEXT Cascade replication from specific document IDs.
Format: collection=id1,id2,id3
Example: --ids customers=507f1f77bcf86cd799439011,507f191e810c19729de860ea
--query TEXT Cascade replication from MongoDB query.
Format: collection='{"field": "value"}'
Example: --query customers='{"plan": "Basic"}'
--help Show this message and exit.
Examples:
# Replicate all configured collections
mongorep run my_job
# Replicate specific collections
mongorep run my_job --collections users,orders
# Cascade replication by IDs
mongorep run my_job --ids customers=507f1f77bcf86cd799439011
# Cascade replication by query
mongorep run my_job --query customers='{"plan": "Basic", "status": "active"}'
# Interactive mode
mongorep run my_job --interactive
# Dry run
mongorep run my_job --dry-run
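The --ids and --query values share a collection=value shape. The sketch below shows one way such arguments could be split and validated. Both function names are hypothetical and this is not the tool's parser. Note that the shell removes the single quotes around the JSON before the program sees it.

```python
import json


def parse_ids(arg):
    """Split 'collection=id1,id2' into (collection, [id1, id2])."""
    collection, sep, raw = arg.partition("=")
    ids = [part.strip() for part in raw.split(",") if part.strip()]
    if not sep or not collection or not ids:
        raise ValueError(f"expected collection=id1,id2 but got {arg!r}")
    return collection, ids


def parse_query(arg):
    """Split 'collection={...json...}' into (collection, dict)."""
    collection, sep, raw = arg.partition("=")
    if not sep or not collection:
        raise ValueError(f"expected collection=<json> but got {arg!r}")
    query = json.loads(raw)
    if not isinstance(query, dict):
        raise ValueError("query must be a JSON object")
    return collection, query


print(parse_ids("customers=507f1f77bcf86cd799439011,507f191e810c19729de860ea"))
print(parse_query('customers={"plan": "Basic", "status": "active"}'))
```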
🎯 Advanced Usage
Cascade Replication
Replicate related documents across collections using defined relationships. You can filter the root collection by IDs or by query.
By Specific IDs:
# Replicate specific customers and all related orders, invoices, etc.
mongorep run my_job --ids customers=507f1f77bcf86cd799439011
# Multiple IDs
mongorep run my_job --ids customers=507f1f77bcf86cd799439011,507f191e810c19729de860ea
By MongoDB Query:
# Replicate customers matching a query and all related data
mongorep run my_job --query customers='{"plan": "Basic"}'
# Complex queries
mongorep run my_job --query customers='{"status": "active", "createdAt": {"$gte": "2024-01-01"}}'
Define Relationships in Configuration:
schema_relationships:
  - parent: customers
    child: orders
    parent_field: _id
    child_field: customer_id
  - parent: orders
    child: order_items
    parent_field: _id
    child_field: order_id
The tool will:
- Find documents in the root collection matching your filter (IDs or query)
- Find related documents in child collections based on relationships
- Cascade through the entire relationship chain
- Replicate all matching documents
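The steps above can be sketched as a breadth-first walk over the relationship list. The example uses in-memory lists in place of MongoDB collections, and the `cascade` function is a hypothetical illustration of the idea rather than the tool's code.

```python
from collections import deque


def cascade(data, relationships, root, root_ids):
    """Return {collection: set of _id values} reachable from the root IDs."""
    selected = {root: set(root_ids)}
    queue = deque([root])
    while queue:
        parent = queue.popleft()
        for rel in relationships:
            if rel["parent"] != parent:
                continue
            # Key values of the already selected parent documents
            parent_keys = {
                doc[rel["parent_field"]]
                for doc in data.get(parent, [])
                if doc["_id"] in selected[parent]
            }
            # Child documents whose foreign key points at one of those parents
            matches = {
                doc["_id"]
                for doc in data.get(rel["child"], [])
                if doc.get(rel["child_field"]) in parent_keys
            }
            known = selected.setdefault(rel["child"], set())
            if not matches <= known:  # only continue when something new was found
                known |= matches
                queue.append(rel["child"])
    return selected


relationships = [
    {"parent": "customers", "child": "orders", "parent_field": "_id", "child_field": "customer_id"},
    {"parent": "orders", "child": "order_items", "parent_field": "_id", "child_field": "order_id"},
]
data = {
    "customers": [{"_id": "c1"}, {"_id": "c2"}],
    "orders": [{"_id": "o1", "customer_id": "c1"}, {"_id": "o2", "customer_id": "c2"}],
    "order_items": [{"_id": "i1", "order_id": "o1"}, {"_id": "i2", "order_id": "o2"}],
}
result = cascade(data, relationships, "customers", ["c1"])
print(result)
```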
PII Anonymization
Built-in PII anonymization with support for multi-entity fields:
replication:
  collections:
    users:
      # New format: supports multiple entity types per field
      pii_anonymization:
        - field: email
          operator: mask_email
          entity_type: EMAIL_ADDRESS
        - field: phone
          operator: mask_phone
          entity_type: PHONE_NUMBER
        - field: contact_info # Field with multiple PII types
          operator: mask_person
          entity_type: PERSON
        - field: contact_info # Same field, second entity type
          operator: mask_email
          entity_type: EMAIL_ADDRESS
        - field: ssn
          operator: hash
          entity_type: US_SSN
The scan command automatically detects multi-entity fields and configures operators in confidence order.
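To make the effect of operators concrete, the sketch below implements simplified stand-ins for three behaviors described in the configuration comments: masking an email while preserving the domain, masking all but the last four characters, and SHA-256 hashing. These functions are illustrative assumptions that operate on whole field values. The tool itself delegates to Presidio, which works on detected spans within the text.

```python
import hashlib


def smart_mask_email(value):
    """Mask the local part of an email and keep the domain."""
    local, sep, domain = value.partition("@")
    if not sep:
        return "*" * len(value)
    return "*" * len(local) + "@" + domain


def mask_keep_last4(value, mask_char="*"):
    """Mask everything except the last four characters."""
    return mask_char * max(len(value) - 4, 0) + value[-4:]


def sha256_hex(value):
    """Return the SHA-256 hex digest of the value."""
    return hashlib.sha256(value.encode("utf-8")).hexdigest()


# Stand-in operator table; the names mirror the config, the behavior is assumed
OPERATORS = {"smart_mask": smart_mask_email, "mask": mask_keep_last4, "hash": sha256_hex}


def anonymize(doc, rules):
    """Apply rules in list order and return a new document."""
    out = dict(doc)
    for rule in rules:
        field = rule["field"]
        if out.get(field) is not None:
            out[field] = OPERATORS[rule["operator"]](str(out[field]))
    return out


user = {"name": "Jane", "email": "jane@example.com", "ssn": "123-45-6789"}
rules = [
    {"field": "email", "operator": "smart_mask", "entity_type": "EMAIL_ADDRESS"},
    {"field": "ssn", "operator": "mask", "entity_type": "US_SSN"},
]
masked = anonymize(user, rules)
print(masked)
```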
Field Transformations
Apply custom transformations:
replication:
  collections:
    orders:
      field_transforms:
        - field: billing_plan
          type: regex_replace
          pattern: '.*'
          replacement: 'free'
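A regex_replace transform like the one above can be approximated with Python's `re.sub`. This is a sketch under assumptions: `apply_transforms` is a hypothetical helper, and limiting the substitution to the first match is a choice made for this example, not documented behavior of the tool.

```python
import re


def regex_replace(value, pattern, replacement):
    """Replace the first match of pattern in value."""
    return re.sub(pattern, replacement, value, count=1)


def apply_transforms(doc, transforms):
    """Apply regex_replace transforms to string fields, returning a new dict."""
    out = dict(doc)
    for transform in transforms:
        field = transform["field"]
        if transform["type"] == "regex_replace" and isinstance(out.get(field), str):
            out[field] = regex_replace(out[field], transform["pattern"], transform["replacement"])
    return out


transforms = [
    {"field": "billing_plan", "type": "regex_replace", "pattern": ".*", "replacement": "free"},
]
order = {"_id": 1, "billing_plan": "premium", "total": 42}
print(apply_transforms(order, transforms))
```

The `count=1` argument matters for a pattern such as '.*': in Python 3.7 and later, an unrestricted `re.sub` also replaces the empty match at the end of the string, which would turn 'premium' into 'freefree'. Whether the tool guards against this is not stated here.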
Field Exclusion
Exclude sensitive fields:
replication:
  collections:
    users:
      fields_exclude:
        - password_hash
        - internal_notes
        - legacy_data
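Conceptually, field exclusion drops the listed keys from each document before it is written. A minimal sketch, assuming top-level field names only (`exclude_fields` is a hypothetical helper, and nested paths are not handled):

```python
def exclude_fields(doc, fields_exclude):
    """Return a copy of doc without the excluded top-level fields."""
    excluded = set(fields_exclude)
    return {key: value for key, value in doc.items() if key not in excluded}


user = {"_id": 1, "email": "jane@example.com", "password_hash": "abc123", "legacy_data": {}}
print(exclude_fields(user, ["password_hash", "internal_notes", "legacy_data"]))
```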
💾 State Management
The tool maintains two state collections:
_rep_runs - Job Run Tracking
Tracks each replication job run with:
- Status (running, completed, failed)
- Timestamps and duration
- Document/collection statistics
- Error summaries
_rep_state - Collection State
Per-collection state including:
- Last cursor position for incremental loading
- Processing status
- Error details
- Link to parent run
🐍 Programmatic Usage
Use as a Python library:
from mongo_replication import (
    ConnectionManager,
    ReplicationOrchestrator,
    load_replication_config
)
# Load configuration
config = load_replication_config("config/my_job_config.yaml")
# Setup connections
conn_mgr = ConnectionManager(
    source_uri="mongodb://source:27017/source_db",
    dest_uri="mongodb://dest:27017/dest_db"
)
# Create orchestrator
orchestrator = ReplicationOrchestrator(
    connection_manager=conn_mgr,
    config=config
)
# Execute replication
result = orchestrator.replicate()
print(f"Collections processed: {result.total_collections_processed}")
print(f"Documents replicated: {result.total_documents_processed}")
print(f"Duration: {result.total_duration_seconds}s")
🏗️ Architecture
See Technical Design Documentation for:
- System architecture overview
- State management design
- Parallel processing model
- PII detection pipeline
- Extension points
⚡ Performance Tips
- Batch Size: Adjust based on document size and network latency
  - Large documents: 100-500
  - Small documents: 1000-5000
- Parallel Collections: Balance based on available resources
  - Local replication: 5-10
  - Network replication: 3-5
- Indexes: Ensure cursor fields are indexed on source collections
- Incremental Loading: Use timestamp-based cursor fields for optimal performance
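The interaction between batch size and parallel collections can be pictured with the sketch below, which uses a thread pool and a counting stand-in for the actual write. The function names are hypothetical and the real tool's concurrency model may differ. Memory use grows roughly with batch size times the number of concurrent collections, which is why the two settings are tuned together.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice


def batched(iterable, batch_size):
    """Yield lists of up to batch_size items."""
    iterator = iter(iterable)
    while batch := list(islice(iterator, batch_size)):
        yield batch


def replicate_collection(name, docs, batch_size):
    """Stand-in for a write loop: counts documents batch by batch."""
    written = 0
    for batch in batched(docs, batch_size):
        written += len(batch)  # a real implementation would write the batch here
    return name, written


def replicate_all(collections, max_parallel=5, batch_size=1000):
    """Process collections concurrently, at most max_parallel at a time."""
    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        futures = [
            pool.submit(replicate_collection, name, docs, batch_size)
            for name, docs in collections.items()
        ]
        return dict(future.result() for future in futures)


counts = replicate_all({"users": range(2500), "orders": range(10)}, max_parallel=2, batch_size=1000)
print(counts)
```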
🔍 Troubleshooting
Performance issues
# Reduce parallelism and batch size
mongorep run my_job --parallel 2 --batch-size 500
Connection timeouts
- Increase serverSelectionTimeoutMS in connection URI
- Check network connectivity and firewall rules
Debug Logging
Enable verbose logging:
import logging
logging.basicConfig(level=logging.DEBUG)
🤝 Contributing
Contributions are welcome! Please:
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass
- Submit a pull request
📄 License
MIT License - see LICENSE file for details.
💬 Support
- Issues: GitHub Issues
- Documentation: Full Documentation
🙏 Acknowledgments
Built with: