A data matching and canonicalization library with multiple database connector support

Project description

CanonMap

CanonMap is a Python library for data matching and canonicalization with multiple database connector support. It provides a powerful entity resolution pipeline that can match and canonicalize entity names against database records using multiple blocking strategies and similarity scoring algorithms.

Features

  • Multi-Strategy Blocking: Uses phonetic (Double Metaphone), Soundex, initialism, and exact matching to efficiently filter candidate matches
  • Advanced Scoring: Combines multiple similarity metrics (Levenshtein, Jaro-Winkler, token overlap, trigram, phonetic, soundex, initialism) with configurable weights
  • MySQL Connector: Built-in MySQL connection pooling with transaction support
  • Helper Fields: Automatic generation of indexed helper columns (phonetic, soundex, initialism) for fast blocking queries
  • Data Import: Import CSV/XLSX files with automatic type inference and schema creation
  • Database Management: High-level API for creating tables, fields, constraints, and managing database schemas
  • FastAPI Integration: Example FastAPI application scaffold for REST API deployment
  • CLI Tools: Command-line interface for scaffolding FastAPI applications
  • Production-Ready Logging: Environment-aware logging (Rich for dev, JSON for production)

Installation

pip install canonmap

For development dependencies:

pip install canonmap[dev]

For FastAPI integration:

pip install canonmap[fastapi]

Quick Start

Basic Usage

from canonmap import MySQLConnector, MySQLConfig, MappingPipeline, EntityMappingRequest

# Configure database connection
config = MySQLConfig.from_env()  # Reads from MYSQL_HOST, MYSQL_USER, etc.
# Or manually:
# config = MySQLConfig(
#     host="localhost",
#     user="root",
#     password="password",
#     database="mydb"
# )

connector = MySQLConnector(config)
connector.initialize_pool()

# Create mapping pipeline
pipeline = MappingPipeline(connector)

# Match an entity
request = EntityMappingRequest(
    entity_name="John Smith",
    candidate_table_name="customers",
    candidate_field_name="full_name",
    top_n=10,
    max_prefilter=1000
)

response = pipeline.run(request)

# Access results
for result in response.results:
    print(f"{result.canonical_entity} (score: {result.score})")

Using Custom Weights

from canonmap import MappingWeights

weights = MappingWeights(
    exact=6.0,        # High weight for exact matches
    levenshtein=1.0,  # Edit distance similarity
    jaro=1.2,         # Jaro-Winkler similarity
    token=2.0,        # Token overlap
    trigram=1.0,      # 3-gram similarity
    phonetic=1.0,     # Phonetic similarity
    soundex=1.0,      # Soundex similarity
    initialism=0.5,   # Initialism matching
    multi_bonus=1.0   # Bonus for multiple matching features
)

response = pipeline.run(request, mapping_weights=weights)

Core Concepts

Entity Mapping Pipeline

The mapping pipeline follows a two-stage process:

  1. Blocking: Filters candidate records using fast indexing strategies (phonetic, soundex, initialism, exact)
  2. Scoring: Computes similarity scores for filtered candidates using multiple string similarity metrics
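The two-stage shape can be sketched in a few lines of plain Python. This is an illustrative sketch only; `block_keys` and `score` are hypothetical stand-ins, not CanonMap APIs:

```python
def match(query, candidates, block_keys, score, top_n=10):
    # Stage 1: blocking -- keep only candidates that share at least one
    # cheap index key with the query.
    qkeys = block_keys(query)
    blocked = [c for c in candidates if block_keys(c) & qkeys]
    # Stage 2: scoring -- rank survivors with a more expensive similarity,
    # breaking ties deterministically by name.
    return sorted(blocked, key=lambda c: (-score(query, c), c))[:top_n]

# Toy blocking key: the set of first letters of each word.
initials = lambda s: {w[0].lower() for w in s.split()}
# Toy score: number of shared lowercase words.
overlap = lambda a, b: len(set(a.lower().split()) & set(b.lower().split()))

matches = match("John Smith", ["Jane Smith", "Bob Jones", "John Smyth"],
                initials, overlap)
```

Blocking is what keeps the expensive scoring stage tractable: only records that share a cheap key with the query are ever scored.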

Blocking Strategies

  • Phonetic: Uses Double Metaphone algorithm to match similar-sounding names
  • Soundex: Uses Soundex codes for phonetic matching
  • Initialism: Matches based on first letters of words (e.g., "IBM" matches "International Business Machines")
  • Exact: Case-insensitive exact matching
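CanonMap computes these codes via the metaphone and jellyfish packages; for intuition, here are minimal pure-Python versions of the two simplest transforms, Soundex and initialism:

```python
def soundex(word: str) -> str:
    # Classic American Soundex: first letter plus three digits.
    codes = {c: d for letters, d in
             [("bfpv", "1"), ("cgjkqsxz", "2"), ("dt", "3"),
              ("l", "4"), ("mn", "5"), ("r", "6")]
             for c in letters}
    word = "".join(c for c in word.lower() if c.isalpha())
    if not word:
        return ""
    out, prev = [word[0].upper()], codes.get(word[0], "")
    for c in word[1:]:
        d = codes.get(c, "")
        if d and d != prev:
            out.append(d)
        if c not in "hw":  # h/w are transparent: they keep the previous code
            prev = d
    return ("".join(out) + "000")[:4]

def initialism(phrase: str) -> str:
    # First letter of each word: "International Business Machines" -> "IBM"
    return "".join(w[0].upper() for w in phrase.split() if w)
```

For example, `soundex("Robert")` and `soundex("Rupert")` both yield `"R163"`, which is exactly why Soundex makes a useful blocking key for similar-sounding names.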

Helper Fields

Helper fields are pre-computed indexed columns that speed up blocking queries:

  • __field_name_phonetic__: Double Metaphone codes
  • __field_name_soundex__: Soundex codes
  • __field_name_initialism__: Initialism strings

These can be automatically generated using the DBClient.create_helper_fields() method.

Usage Examples

Database Client Operations

from canonmap import MySQLConnector, MySQLConfig, DBClient

connector = MySQLConnector(MySQLConfig.from_env())
client = DBClient(connector)

# Import a CSV file
rows_imported = client.import_table_from_file(
    "customers.csv",
    table_name="customers",
    if_table_exists="append"  # or "replace" or "fail"
)

# Create helper fields for fast matching
client.create_helper_fields({
    "table_name": "customers",
    "field_name": "full_name",
    "transforms": ["phonetic", "soundex", "initialism"],
    "if_exists": "replace"  # or "skip" or "error"
})

# Create a table
client.create_table(
    table_name="products",
    fields_ddl="id BIGINT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(255) NOT NULL"
)

# Add a field
client.create_field(
    table_name="products",
    field_name="description",
    field_ddl="TEXT"
)

Prefiltering with SQL

# Use SQL to prefilter candidates before blocking
request = EntityMappingRequest(
    entity_name="John Smith",
    candidate_table_name="customers",
    candidate_field_name="full_name",
    prefilter_sql="SELECT * FROM customers WHERE status = 'active' AND created_at > '2020-01-01'",
    top_n=10
)

response = pipeline.run(request)

Development Mode

The run_dev() method provides additional debugging and deterministic behavior:

response = pipeline.run_dev(
    entity_mapping_request=request,
    mapping_weights=weights,
    per_strategy_limits={"phonetic": 500, "soundex": 300},
    global_prefilter_cap=1000,
    debug=True
)

Architecture

Package Structure

canonmap/
├── connectors/
│   └── mysql_connector/
│       ├── connector.py          # Connection pooling
│       ├── config.py              # Configuration management
│       ├── db_client.py           # High-level DB operations
│       ├── services/              # Service layer
│       │   ├── helper_fields_service.py
│       │   ├── import_table_service.py
│       │   ├── schema_service.py
│       │   └── ...
│       └── utils/                 # Utilities
│           ├── blocking.py
│           ├── transforms.py
│           └── ...
├── mapping/
│   ├── mapping_pipeline.py       # Main pipeline
│   ├── models.py                  # Pydantic models
│   └── utils/
│       ├── blocking.py            # Blocking strategies
│       ├── scoring.py             # Similarity scoring
│       └── normalize.py           # Text normalization
├── cli.py                         # CLI interface
└── logger.py                      # Logging configuration

Key Components

MySQLConnector

Manages MySQL connection pooling and provides:

  • Connection context managers
  • Transaction support
  • Query execution with automatic LIMIT enforcement
  • Write protection (optional)

MappingPipeline

Orchestrates the entity matching process:

  • Normalizes input entities
  • Executes blocking strategies in parallel
  • Scores candidates concurrently
  • Returns ranked results

DBClient

High-level database operations:

  • Table and field creation
  • Data import from files
  • Helper field generation
  • Schema management
  • Constraint management

API Reference

Models

EntityMappingRequest

class EntityMappingRequest(BaseModel):
    entity_name: str                     # Entity to match
    candidate_table_name: str            # Table to search
    candidate_field_name: str            # Field to match against
    top_n: int = 20                      # Number of results to return
    max_prefilter: int = 1000            # Max candidates per blocking strategy
    semantic_rerank: bool = False        # Future: semantic reranking
    prefilter_sql: Optional[str] = None  # Optional SQL prefilter

EntityMappingResponse

class EntityMappingResponse(BaseModel):
    results: List[SingleMappedEntity]

class SingleMappedEntity(BaseModel):
    raw_entity: str            # Original input
    canonical_entity: str      # Matched entity
    canonical_table_name: str  # Source table
    canonical_field_name: str  # Source field
    score: float               # Similarity score

MappingWeights

class MappingWeights(BaseModel):
    exact: float = 6.0
    levenshtein: float = 1.0
    jaro: float = 1.2
    token: float = 2.0
    trigram: float = 1.0
    phonetic: float = 1.0
    soundex: float = 1.0
    initialism: float = 0.5
    multi_bonus: float = 1.0

Methods

MappingPipeline.run()

def run(
    self,
    entity_mapping_request: Union[EntityMappingRequest, Dict[str, Any]],
    mapping_weights: Optional[Union[MappingWeights, Dict[str, Any]]] = None,
) -> EntityMappingResponse

Main pipeline execution method. Returns top N matches sorted by score.

MappingPipeline.run_dev()

def run_dev(
    self,
    entity_mapping_request: Union[EntityMappingRequest, Dict[str, Any]],
    mapping_weights: Optional[Union[MappingWeights, Dict[str, Any]]] = None,
    per_strategy_limits: Optional[Dict[str, int]] = None,
    global_prefilter_cap: Optional[int] = None,
    debug: bool = True,
) -> EntityMappingResponse

Development mode with additional controls and debugging.

Configuration

Environment Variables

CanonMap uses environment variables for database configuration:

MYSQL_HOST=localhost
MYSQL_USER=root
MYSQL_PASSWORD=your_password
MYSQL_DATABASE=your_database
MYSQL_POOL_NAME=mypool          # Optional, default: "mypool"
MYSQL_POOL_SIZE=5               # Optional, default: 5
ENV=dev                         # Optional, default: "dev" (affects logging)

Logging

Logging automatically adapts to environment:

  • Development (ENV=dev): Rich, colorized console output
  • Production (ENV=prod): JSON-formatted logs for cloud logging
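The pattern can be approximated with the standard logging module (a sketch of the idea, not CanonMap's actual implementation; the real library uses a Rich handler in dev):

```python
import json
import logging
import os

class JsonFormatter(logging.Formatter):
    # One JSON object per line, the shape cloud log collectors expect.
    def format(self, record):
        return json.dumps({"level": record.levelname,
                           "logger": record.name,
                           "message": record.getMessage()})

def get_logger(name: str) -> logging.Logger:
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid attaching duplicate handlers on re-import
        handler = logging.StreamHandler()
        if os.getenv("ENV", "dev") == "prod":
            handler.setFormatter(JsonFormatter())
        else:  # dev: human-readable output (the real library uses Rich here)
            handler.setFormatter(
                logging.Formatter("%(levelname)s %(name)s: %(message)s"))
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger
```

The environment check happens once at logger creation, so switching formats is just a matter of setting ENV before the process starts.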

CLI Usage

CanonMap includes a CLI tool for scaffolding FastAPI applications:

# Install CLI
pip install canonmap

# Create a new FastAPI app
cm make api

# Or specify a name
cm make api --name my_api

This creates a FastAPI application scaffold with:

  • Entity mapping endpoints
  • Database routes
  • Example configuration
  • CORS middleware

FastAPI Integration

The package includes an example FastAPI application in canonmap._example_usage.app. After scaffolding:

# Set up environment
cp .env.example .env
# Edit .env with your database credentials

# Run the API
uvicorn your_api_name.main:app --reload

Example Endpoints

  • POST /entity/map-entity: Map an entity to canonical forms
  • Database management endpoints (see example routes)

Performance Considerations

Helper Fields

For best performance, create helper fields on fields you frequently match against:

client.create_helper_fields({
    "table_name": "customers",
    "field_name": "full_name",
    "transforms": ["phonetic", "soundex", "initialism"]
})

This creates indexed columns that dramatically speed up blocking queries.

Prefiltering

Use prefilter_sql to reduce the candidate set before blocking:

request = EntityMappingRequest(
    entity_name="John Smith",
    candidate_table_name="customers",
    candidate_field_name="full_name",
    prefilter_sql="SELECT * FROM customers WHERE active = 1"
)

Parallel Execution

The pipeline automatically uses parallel execution for:

  • Multiple blocking strategies
  • Candidate scoring

Development

Setup

# Clone the repository
git clone https://github.com/vaberry/canonmap.git
cd canonmap

# Install in development mode
pip install -e ".[dev]"

# Run tests
pytest

# Format code
black src/
isort src/

# Type checking
mypy src/

Project Structure

  • src/canonmap/: Main package code
  • tests/: Test suite
  • scripts/: Build and deployment scripts

Dependencies

Core Dependencies

  • pydantic>=2.0.0: Data validation and settings
  • pandas>=2.0: Data manipulation
  • mysql-connector-python>=8.0.0: MySQL connectivity
  • metaphone>=0.6: Phonetic matching
  • python-Levenshtein>=0.20.0: String similarity
  • jellyfish>=0.9.0: String matching algorithms
  • rich>=12.0.0: Rich console output
  • SQLAlchemy>=2.0: Database abstraction

Optional Dependencies

  • fastapi>=0.100.0: FastAPI integration
  • uvicorn>=0.20.0: ASGI server
  • cohere>=4.0.0: Semantic reranking (future feature)
  • python-dotenv>=1.0.0: Environment variable management
  • openpyxl>=3.1: Excel file support
  • chardet>=4.0.0: Character encoding detection

Advanced Features

Schema Generation

Generate comprehensive schema metadata for your database:

schema = client.generate_schema(
    table_fields=["customers.full_name", "products.name"],  # Optional: specific fields
    num_examples=10,  # Number of example values per field
    save_location="./schemas",  # Optional: save to disk
    schema_name="my_schema.json",
    if_schema_exists="replace"
)

This generates metadata including:

  • Data types and column definitions
  • Example values for each field
  • Datetime format inference
  • Nullability and default values

Type Inference for Imports

When importing files, CanonMap automatically infers optimal MySQL types with robust error handling and automatic type widening for out-of-range values.
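The widening idea can be illustrated with a simplified integer-type picker (the library's real inference also handles floats, dates, and strings; the ranges below are MySQL's signed integer bounds):

```python
# Narrowest-fit integer inference with automatic widening: if a value
# falls outside the current type's range, step up to the next type.
INT_TYPES = [
    ("TINYINT", -128, 127),
    ("SMALLINT", -32768, 32767),
    ("MEDIUMINT", -8388608, 8388607),
    ("INT", -2147483648, 2147483647),
    ("BIGINT", -2**63, 2**63 - 1),
]

def infer_int_type(values):
    lo, hi = min(values), max(values)
    for name, vmin, vmax in INT_TYPES:
        if vmin <= lo and hi <= vmax:
            return name
    return "DECIMAL(65,0)"  # beyond BIGINT: widen to an exact decimal
```

A column containing only [1, 200] would be typed SMALLINT, while one stray value above 2^63 - 1 pushes the whole column to DECIMAL.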

How It Works Internally

Matching Pipeline Flow

  1. Input Normalization: Entity name is normalized (Unicode NFKD normalization, punctuation removal, whitespace collapse, lowercasing)

  2. Blocking Phase (Parallel using ThreadPoolExecutor):

    • Phonetic Blocking: Generates Double Metaphone codes, queries helper column __field_phonetic__ with LIKE pattern
    • Soundex Blocking: Generates Soundex codes, queries helper column __field_soundex__ or falls back to MySQL SOUNDEX() function
    • Initialism Blocking: Extracts first letters of words, queries helper column __field_initialism__ with exact match
    • Exact Blocking: Normalized exact match with LIKE pattern on the source field
  3. Candidate Union: All candidates from different strategies are combined into a single set (duplicates removed)

  4. Scoring Phase (Parallel using ThreadPoolExecutor):

    • Each candidate is scored against the normalized query using weighted combination of:
      • Exact match (binary, weight: 6.0 default)
      • Levenshtein ratio (weight: 1.0 default)
      • Jaro-Winkler similarity (weight: 1.2 default)
      • Token overlap - first/last token matching (weight: 2.0 default)
      • Trigram similarity - 3-character sequences (weight: 1.0 default)
      • Phonetic similarity - Double Metaphone comparison (weight: 1.0 default)
      • Soundex similarity (weight: 1.0 default)
      • Initialism match (weight: 0.5 default)
    • Multi-bonus (weight: 1.0 default) applied when multiple features match
    • Final score = weighted sum of all features
  5. Ranking: Results sorted by score (descending) with deterministic tie-breaking by entity name

  6. Top-N Selection: Returns the top N results based on top_n parameter
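Steps 1, 4, and 5 above can be sketched as follows (illustrative only; the weights dict and the reduced feature set are simplifications of the real scorer):

```python
import re
import unicodedata

def normalize(text: str) -> str:
    # Step 1: NFKD-decompose, strip accents, drop punctuation,
    # collapse whitespace, lowercase.
    text = unicodedata.normalize("NFKD", text)
    text = "".join(c for c in text if not unicodedata.combining(c))
    text = re.sub(r"[^\w\s]", " ", text)
    return re.sub(r"\s+", " ", text).strip().lower()

def trigram_sim(a: str, b: str) -> float:
    # Jaccard overlap of 3-character windows.
    grams = lambda s: {s[i:i + 3] for i in range(len(s) - 2)} or {s}
    ga, gb = grams(a), grams(b)
    return len(ga & gb) / len(ga | gb)

def score(query: str, candidate: str, weights) -> float:
    # Step 4: weighted sum of features. A real scorer also adds
    # Levenshtein, Jaro-Winkler, phonetic, soundex, and initialism terms.
    q, c = normalize(query), normalize(candidate)
    return (weights["exact"] * (q == c)
            + weights["trigram"] * trigram_sim(q, c))
```

Step 5's deterministic ranking then amounts to `sorted(candidates, key=lambda c: (-score(query, c, weights), c))`, so equal scores always come back in the same order.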

Helper Field Generation Process

Helper fields are generated through a robust batch processing system:

  1. Column Detection: Checks for existing helper columns with naming convention __field_name_transform__ or field_name_transform__
  2. Primary Key Detection: Identifies primary key or auto-increment column for stable paging
  3. Temporary Key Creation: If no stable key exists, creates temporary auto-increment column __cm_tmp_pk__
  4. Batch Processing: Processes records in configurable chunks (default chunk size)
  5. Transform Application: Applies transform functions (phonetic via Double Metaphone, soundex, initialism)
  6. Bulk Updates: Uses CASE statements for efficient bulk updates by primary key
  7. Automatic Retry: Implements retry logic for transient database errors
  8. Cleanup: Removes temporary columns after processing
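Step 6's CASE-based bulk update can be illustrated with a small SQL builder (a sketch: it inlines literal values for readability, whereas production code should escape or parameterize them):

```python
def build_case_update(table, column, pk, values):
    # values maps primary-key -> precomputed transform code.
    # One UPDATE touches the whole batch instead of one round trip per row.
    whens = "\n        ".join(f"WHEN {k} THEN '{v}'"
                              for k, v in sorted(values.items()))
    ids = ", ".join(str(k) for k in sorted(values))
    return (f"UPDATE {table}\n"
            f"SET {column} = CASE {pk}\n        {whens}\n    END\n"
            f"WHERE {pk} IN ({ids});")

sql = build_case_update("customers", "__full_name_soundex__", "id",
                        {1: "S530", 2: "J520"})
```

Batching rows into a single CASE update like this is what lets helper-field generation stay fast even on large tables.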

Troubleshooting

Common Issues

"Helper field already exists" error

  • Use if_exists="replace" or if_exists="skip" when creating helper fields
  • Or drop the column manually: ALTER TABLE table_name DROP COLUMN __field_name_phonetic__

Slow matching performance

  • Ensure helper fields are created and indexed
  • Use prefilter_sql to reduce candidate set
  • Adjust max_prefilter to limit candidates per strategy
  • Consider creating indexes on helper columns manually: CREATE INDEX idx_phonetic ON table_name(__field_name_phonetic__)

Connection pool errors

  • Increase MYSQL_POOL_SIZE if you have many concurrent operations
  • Check MySQL server connection limits: SHOW VARIABLES LIKE 'max_connections'
  • Ensure proper connection cleanup (always use context managers)

Import errors with CSV/XLSX

  • Check file encoding (UTF-8 recommended)
  • Verify column names don't conflict with MySQL reserved words
  • For large files, consider chunking or using direct MySQL LOAD DATA
  • Check for out-of-range numeric values (library will attempt automatic type widening)

No matches returned

  • Verify helper fields exist for the blocking strategies you're using
  • Check that prefilter_sql isn't too restrictive
  • Try increasing max_prefilter to allow more candidates
  • Use run_dev() with debug=True to see per-strategy candidate counts

Limitations

  • Currently supports MySQL only (other database connectors planned)
  • Helper fields must be created to reach optimal performance (the first run without them may be slower)
  • Large candidate sets (>10,000) may require tuning max_prefilter and per_strategy_limits
  • Semantic reranking (semantic_rerank=True) is planned but not yet implemented
  • Helper field generation requires a stable key (primary key or auto-increment column)

Roadmap

  • Additional database connectors (PostgreSQL, SQLite)
  • Semantic reranking using Cohere API
  • Machine learning-based scoring models
  • Real-time streaming matching
  • Distributed matching for very large datasets
  • Additional blocking strategies (fuzzy matching, n-gram indexing)

License

MIT License - see LICENSE file for details.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Support

For issues, questions, or contributions, please visit the GitHub repository.

Author

Vince Berry - vincent.berry11@gmail.com


CanonMap - Intelligent entity resolution and canonicalization for Python

Project details


Release history

This version

1.0.2

Download files

Download the file for your platform.

Source Distribution

canonmap-1.0.2.tar.gz (43.0 kB)

Uploaded Source

Built Distribution

canonmap-1.0.2-py3-none-any.whl (59.7 kB)

Uploaded Python 3

File details

Details for the file canonmap-1.0.2.tar.gz.

File metadata

  • Download URL: canonmap-1.0.2.tar.gz
  • Upload date:
  • Size: 43.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for canonmap-1.0.2.tar.gz:

  • SHA256: 0971fada54e557090c1c39b3ac8fda971b4f2ce3e89f228d476883d2065d4fa6
  • MD5: 38e8b602668ba2652fb6685fb904b79a
  • BLAKE2b-256: 3db536cceaed0e252aed5177318ab5cd88230139f0f980491bd129bb7636b445

File details

Details for the file canonmap-1.0.2-py3-none-any.whl.

File metadata

  • Download URL: canonmap-1.0.2-py3-none-any.whl
  • Upload date:
  • Size: 59.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for canonmap-1.0.2-py3-none-any.whl:

  • SHA256: 88e3a7efa974ff69c9bec539ba585fef7faa74be833530828d4634792a0c765e
  • MD5: 95c9dd4b2bf2d90b3a3e7160e680dafe
  • BLAKE2b-256: 919668fa36601f497a8ade0a0f08863c830724321407f41867ff74f801976882
