Skip to main content

A CSV importer for MongoDB

Project description

PyImport - A Powerful CSV Importer for MongoDB

Documentation Status Python 3.9+ License

PyImport is a Python command-line tool for importing CSV data into MongoDB with automatic type detection, parallel processing, and graceful handling of "dirty" data.

Unlike MongoDB's native mongoimport, PyImport focuses on handling real-world messy data, automatic type inference, and high-performance parallel imports.

Version: 2.0.1 Author: Joe Drumgoole (joe@joedrumgoole.com | BlueSky) License: Apache 2.0 Source: github.com/jdrumgoole/pyimport Documentation: pyimport.readthedocs.io

Key Features

  • 🆕 Nested Document Mapping (v2.0) - Transform flat CSV data into rich hierarchical MongoDB documents using dot notation paths
  • Automatic Type Detection - Generate field files with inferred types using --genfieldfile
  • Graceful Error Handling - Falls back to strings on type conversion errors instead of failing
  • Multiple Import Strategies - Sync, async, multi-process, and threaded imports
  • Parallel Processing - Split large files and import in parallel for maximum throughput
  • Flexible Date Parsing - Multiple date formats with fast ISO date parsing (100x faster)
  • Performance Optimized - Recent improvements provide 20-35% faster imports
  • URL Support - Import directly from URLs or local files
  • Audit Tracking - Optional audit records for import tracking and monitoring
  • Restart Capability - Resume interrupted imports from where they left off with --restart

Performance

  • Sync: ~24,000-32,000 docs/sec
  • Async: ~30,000-40,000 docs/sec
  • Multi-process: ~50,000+ docs/sec

Requirements

  • Python: 3.9 or higher
  • MongoDB: 4.0 or higher

Installation

From PyPI (Recommended)

pip install pyimport

From Source

git clone https://github.com/jdrumgoole/pyimport.git
cd pyimport
poetry install

Verify Installation

pyimport --version
# Output: pyimport 2.0.1

Python API

PyImport provides a clean programmatic Python API for integrating CSV imports into your applications:

from pyimport.api import PyImportAPI

# Simple import
api = PyImportAPI(database="mydb", collection="mycol")
result = api.import_csv("data.csv", has_header=True)
print(f"Imported {result.total_written} records")

# Advanced usage with builder pattern
from pyimport.api import PyImportBuilder

result = (PyImportBuilder()
    .connect("mongodb://localhost:27017")
    .database("mydb")
    .collection("mycol")
    .csv_file("data.csv")
    .has_header(True)
    .parallel("multi", workers=4)
    .add_timestamp()
    .import_data())

Full API Documentation: API Guide

Quick Start

Step 1: Create a Simple CSV File

# Create a test CSV file
echo "name,age,city" > test.csv
echo "Alice,30,NYC" >> test.csv
echo "Bob,25,LA" >> test.csv

Step 2: Generate Field File (Type Definitions)

pyimport --genfieldfile test.csv
# Output: Created field filename 'test.tff' from 'test.csv'

This creates a test.tff file that defines the type of each column (string, int, date, etc.).

Step 3: Import to MongoDB

pyimport --database mydb --collection people test.csv
# Imports data using the auto-generated test.tff field file

Step 4: Verify Import

mongosh mydb --eval "db.people.find().pretty()"

🆕 Nested Document Mapping (v2.0)

Transform flat CSV data into rich hierarchical MongoDB documents!

PyImport v2.0 introduces powerful nested document mapping using dot notation paths in field files. This allows you to organize related fields into logical hierarchies, making your MongoDB documents more structured and queryable.

Quick Example

Transform this flat CSV:

first_name,last_name,city,state,zip
Alice,Smith,NYC,NY,10001

Into this nested MongoDB document:

{
  "name": {
    "first": "Alice",
    "last": "Smith"
  },
  "address": {
    "city": "NYC",
    "state": "NY",
    "zip": "10001"
  }
}

How It Works

Simply add a path field to your .tff field file using dot notation:

[first_name]
type = "str"
name = "first_name"
path = "name.first"  # ← Nested path

[last_name]
type = "str"
name = "last_name"
path = "name.last"   # ← Nested path

[city]
type = "str"
name = "city"
path = "address.city"  # ← Nested path

[state]
type = "str"
name = "state"
path = "address.state"

[zip]
type = "int"
name = "zip"
path = "address.zip"

Then import as usual:

pyimport --database mydb --collection people --fieldfile people_v2.tff people.csv

Real-World Examples

Healthcare Data - Organize hospital A&E data into departments, performance metrics, and admissions:

[SHA]
type = "str"
path = "organization.sha"

[Type1_Attendances]
type = "int"
path = "departments.type1.attendances"

[Percentage_4Hours]
type = "float"
path = "performance.within_4_hours_pct"

Geospatial Data - Structure NYC taxi data with nested coordinates for MongoDB geospatial queries:

[pickup_longitude]
type = "float"
path = "pickup.location.coordinates.longitude"

[pickup_latitude]
type = "float"
path = "pickup.location.coordinates.latitude"

[fare_amount]
type = "float"
path = "payment.fare"

Key Benefits

  • Better Organization - Group related fields logically (e.g., address.*, contact.*, payment.*)
  • Easier Queries - Query nested paths: db.collection.find({"address.city": "NYC"})
  • Better Indexes - Create indexes on nested fields: db.collection.createIndex({"payment.total": 1})
  • Backward Compatible - Mix v1.0 (flat) and v2.0 (nested) fields in the same file
  • Minimal Overhead - Less than 5% performance impact
  • Geospatial Support - Perfect for organizing coordinates for MongoDB geospatial queries

Learn More

See the complete Nested Document Mapping Guide for:

  • Deep nesting examples (5+ levels)
  • Path validation rules
  • Migration from v1.0 to v2.0
  • Common patterns and best practices
  • MongoDB query examples

Advanced Usage

Fast Parallel Import for Large Files

pyimport --multi --splitfile --autosplit 8 --poolsize 4 \
         --database mydb --collection mycol largefile.csv

This splits the file into 8 chunks and processes them with 4 parallel workers.

Async Import (High Performance)

pyimport --asyncpro --database mydb --collection mycol data.csv

Import from URL

pyimport --database mydb --collection taxi \
         https://jdrumgoole.s3.eu-west-1.amazonaws.com/2018_Yellow_Taxi_Trip_Data_1000.csv

Track Imports with Audit

# Import with audit tracking enabled
pyimport --audit --audithost mongodb://localhost:27017 \
         --database mydb --collection mycol largefile.csv

Audit records capture metadata about each import including filename, record count, elapsed time, and command-line arguments for monitoring and debugging.

Restart Interrupted Imports

PyImport can resume interrupted multi-file imports from where they left off:

# Start a multi-file import with audit tracking
pyimport --audit --database mydb --collection mycol file1.csv file2.csv file3.csv

# If interrupted, restart using the batch ID
pyimport --restart --batch-id abc123 --database mydb --collection mycol \
         file1.csv file2.csv file3.csv

# Or let PyImport auto-detect the incomplete batch
pyimport --restart --database mydb --collection mycol \
         file1.csv file2.csv file3.csv

Key Features:

  • Progress Tracking - Records checkpoints every N documents (configurable with --checkpoint-interval)
  • File-Level Restart - Skips already completed files, only processes remaining files
  • Auto-Detection - Automatically finds the last incomplete batch if --batch-id not specified
  • Works with All Import Modes - Supports sync, async, multi-process, and threaded imports

Example: Import 10 large files in parallel. If the process crashes after completing 7 files, restart will automatically skip those 7 and only process the remaining 3 files.

Requirements:

  • Restart requires --audit to be enabled for progress tracking
  • Pass the same file list on restart to identify which files were completed

Why PyImport?

MongoDB's native mongoimport is excellent, but PyImport offers several additional capabilities:

PyImport Advantages

Feature PyImport mongoimport
Type inference Automatic with --genfieldfile Manual with --columnsHaveTypes
Dirty data handling Graceful fallback to string Strict, may fail
Date formats Multiple formats, automatic detection Limited
Parallel processing Built-in --multi, --asyncpro, --threads Requires external scripting
Audit tracking Built-in --audit with progress tracking Not built-in
Restart capability Full restart support with --restart Not available
URL imports Direct URL support Requires pre-download
File splitting Automatic with --splitfile Manual
Performance optimization Pre-compiled converters, fast ISO dates Standard

mongoimport Advantages

  • Richer security options (Kerberos, LDAP, x.509)
  • MongoDB Enterprise Advanced features
  • JSON file imports (in addition to CSV)
  • Official MongoDB support

When to Use PyImport

Choose PyImport when you need to:

  • Handle messy, inconsistent, or "dirty" CSV data
  • Automatically infer types from CSV columns
  • Import large files quickly with parallel processing
  • Import data directly from URLs
  • Add metadata (timestamps, filenames, line numbers) to documents
  • Track import operations with audit records
  • Resume interrupted multi-file imports without re-processing completed files

Field Files (.tff)

Field files are TOML-formatted files that define column types and formats for CSV imports. They enable automatic type conversion during import.

Automatic Generation

The easiest way to create a field file is to generate it automatically:

pyimport --genfieldfile data.csv
# Creates data.tff with inferred types

Supported Types

  • str - String (text)
  • int - Integer
  • float - Floating point number
  • date - Date without time
  • datetime - Date with time
  • isodate - ISO format date (YYYY-MM-DD) - fastest parsing
  • bool - Boolean (true/false)
  • timestamp - Unix timestamp

Field File Naming

PyImport automatically looks for field files with the .tff extension:

  • For data.csv, it looks for data.tff
  • You can specify a custom field file with --fieldfile

Example Field File

For a CSV file with inventory data:

Inventory Item Amount Last Order
Screws 300 1-Jan-2016
Bolts 150 3-Feb-2017
Nails 25 31-Dec-2017

Running pyimport --genfieldfile inventory.csv generates:

# Created 'inventory.tff'
# at UTC: 2025-10-12 by pyimport.fieldfile

["Inventory Item"]
type = "str"
name = "Inventory Item"

["Amount"]
type = "int"
name = "Amount"

["Last Order"]
type = "date"
name = "Last Order"
format = "%d-%b-%Y"  # Date format string

[DEFAULTS_SECTION]
delimiter = ","
has_header = true

Type Inference

PyImport analyzes the first data row after the header to infer types:

  1. Tries to parse as int
  2. If that fails, tries float
  3. If that fails, tries date
  4. Falls back to str

You can manually edit .tff files to correct types if inference is incorrect.

Graceful Error Handling

If type conversion fails during import, PyImport falls back to storing the value as a string instead of failing the entire import (unless --onerror fail is specified).

Date Format Strings

Date and datetime fields support strptime format strings:

["order_date"]
type = "date"
format = "%Y-%m-%d"  # 2024-12-31

Common format codes:

  • %Y - 4-digit year (2024)
  • %m - Month (01-12)
  • %d - Day (01-31)
  • %H - Hour (00-23)
  • %M - Minute (00-59)
  • %S - Second (00-59)

Date Parsing Performance

For best performance, choose the right date type:

  1. isodate (fastest) - Use for ISO format dates (YYYY-MM-DD)

    • 100x faster than generic date parsing
    ["created_date"]
    type = "isodate"
    
  2. date/datetime with format (fast) - Use when all dates have the same format

    ["order_date"]
    type = "datetime"
    format = "%Y-%m-%d %H:%M:%S"
    
  3. date/datetime without format (slow) - Use only for inconsistent date formats

    ["flexible_date"]
    type = "date"  # No format - uses slow dateutil.parser
    

Complete Documentation

For comprehensive documentation including all CLI options, advanced features, and examples, visit:

📖 Full Documentation at readthedocs.io

Documentation includes:

Common Options

Basic Options

-h, --help              Show help message
--version               Show version number
--database NAME         Database name [default: PYIM]
--collection NAME       Collection name [default: imported]
--mdburi URI           MongoDB connection URI [default: mongodb://localhost:27017]

Field File Options

--genfieldfile          Generate field file from CSV
--fieldfile FILE        Specify custom field file path
--delimiter CHAR        Field delimiter [default: ,]
--hasheader             CSV has header line

Performance Options

--multi                 Multi-process parallel import
--asyncpro             Async parallel import (high performance)
--threads              Thread-based parallel import
--poolsize N           Number of parallel workers [default: 4]
--batchsize N          Batch size for bulk inserts [default: 1000]

File Splitting Options

--splitfile            Split file for parallel processing
--autosplit N          Split into N chunks
--keepsplits           Don't delete split files after import

Audit Options

--audit                Enable audit tracking
--audithost URI        MongoDB URI for audit records
--auditdatabase NAME   Database for audit records [default: PYIMPORT_AUDIT]
--auditcollection NAME Collection for audit records [default: audit]

Restart Options

--restart              Resume an interrupted import
--batch-id ID          Specify batch ID to restart (auto-detects if omitted)
--checkpoint-interval N Records progress every N documents [default: 10000]

Data Enrichment Options

--addfilename          Add filename to each document
--addtimestamp now     Add current timestamp
--addtimestamp gen     Add generated ObjectId timestamp
--locator              Add filename and line number
--addfield key=value   Add custom field to all documents

Error Handling Options

--onerror fail         Stop on first error
--onerror warn         Log errors and continue [default]
--onerror ignore       Silently skip errors

Example Workflows

Simple Import

pyimport --genfieldfile data.csv
pyimport --database mydb --collection mycol data.csv

High-Performance Import

pyimport --multi --splitfile --autosplit 8 --poolsize 4 \
         --batchsize 5000 --database mydb --collection mycol \
         largefile.csv

Import with Metadata

pyimport --addfilename --addtimestamp now --locator \
         --database mydb --collection mycol data.csv

Import with Audit Tracking

pyimport --audit --audithost mongodb://localhost:27017 \
         --database mydb --collection mycol largefile.csv

This creates audit records in the audit collection tracking import metadata for monitoring and debugging.

Restart an Interrupted Import

# Start import with audit enabled
pyimport --audit --multi --database mydb --collection mycol \
         file1.csv file2.csv file3.csv file4.csv file5.csv

# Process is interrupted after completing file1.csv and file2.csv...

# Restart - will skip completed files and only process file3-5
pyimport --restart --multi --database mydb --collection mycol \
         file1.csv file2.csv file3.csv file4.csv file5.csv

The restart feature works with all import strategies (sync, async, multi-process, threaded).

Contributing

Contributions are welcome! Please feel free to submit issues, feature requests, or pull requests.

Development Setup

git clone https://github.com/jdrumgoole/pyimport.git
cd pyimport
poetry install --with dev

# Run tests
poetry run pytest

# Run all tests with coverage
invoke test-all

Testing

PyImport has comprehensive test coverage (72%+):

# Run all tests
invoke test-all

# Run specific test suites
cd test/test_command && poetry run pytest
cd test/test_e2e && poetry run pytest

# Quick smoke tests
invoke quick-test

Version History

2.0.1 (Current) - Python 3.9 Support & Reliability Improvements

  • Python 3.9 Support: Extended compatibility to Python 3.9+
    • All 329 tests pass on Python 3.9, 3.10, 3.11, 3.12, and 3.13
  • Improved Write Reliability: Changed default write concern from 0 to 1 with journaling enabled
    • Better data durability and eliminates race conditions

2.0.0 - Major Feature Release

  • 🎉 NEW: TFF v2.0 Format - Nested document mapping with dot notation paths
    • Transform flat CSV into hierarchical MongoDB documents
    • Simple dot notation syntax: path = "address.city"
    • 100% backward compatible with v1.0 field files
    • Real-world tested with healthcare and geospatial data
    • Minimal performance overhead (<5%)
  • Fixed: Enricher TypeError when handling nested documents
  • Fixed: PyMongo compatibility - updated deprecated j= parameter to journal=
  • Comprehensive Testing: 80+ tests with 100% coverage on new code
  • Documentation: Complete nested mapping guide with examples

1.10.9

  • Optimized test suite with parallel execution (pytest-xdist)
  • Improved publish workflow performance (30-40% faster)
  • New invoke tasks for faster development

1.10.0

  • NEW: Restart Capability - Resume interrupted multi-file imports with --restart
  • Progress tracking with configurable checkpoint intervals
  • Auto-detection of incomplete batches
  • File-level restart (skips completed files)
  • Works with all import strategies (sync, async, multi-process, threaded)

1.9.0

  • Comprehensive documentation (2,700+ lines)
  • Performance improvements (20-35% faster)
  • Test coverage improvements (72%)
  • Read the Docs integration

See CHANGELOG for complete version history.

Links

Support

License

Apache License 2.0 - See LICENSE file for details.


Made with ❤️ by Joe Drumgoole

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyimport-2.0.3.tar.gz (81.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pyimport-2.0.3-py3-none-any.whl (98.8 kB view details)

Uploaded Python 3

File details

Details for the file pyimport-2.0.3.tar.gz.

File metadata

  • Download URL: pyimport-2.0.3.tar.gz
  • Upload date:
  • Size: 81.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.3 CPython/3.11.9 Darwin/24.6.0

File hashes

Hashes for pyimport-2.0.3.tar.gz
Algorithm Hash digest
SHA256 1a801da66aa9dcc13aa39c8db816acdf261af548f9a2839bc53cc35852406747
MD5 bd2734bf11cc69e34e47be023cbd5c3d
BLAKE2b-256 e1a63d5c8ad89296a84c3ebca3fab460e4351285f602d5de0ca26156238fe37c

See more details on using hashes here.

File details

Details for the file pyimport-2.0.3-py3-none-any.whl.

File metadata

  • Download URL: pyimport-2.0.3-py3-none-any.whl
  • Upload date:
  • Size: 98.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.3 CPython/3.11.9 Darwin/24.6.0

File hashes

Hashes for pyimport-2.0.3-py3-none-any.whl
Algorithm Hash digest
SHA256 12e9cb2bceca0bad8d4d5d8bbc18c3bfad334335007c7f42deef68b8993e8708
MD5 9473137381217519b1829529b158b91f
BLAKE2b-256 06229f205ffc5f88eef77f2fd98ec600e7b6fc72d5804cf4984223df511c23bf

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page