A data matching and canonicalization library with multiple database connector support
CanonMap
CanonMap is a Python library for data matching and canonicalization with multiple database connector support. It provides a powerful entity resolution pipeline that can match and canonicalize entity names against database records using multiple blocking strategies and similarity scoring algorithms.
Features
- Multi-Strategy Blocking: Uses phonetic (Double Metaphone), Soundex, initialism, and exact matching to efficiently filter candidate matches
- Advanced Scoring: Combines multiple similarity metrics (Levenshtein, Jaro-Winkler, token overlap, trigram, phonetic, soundex, initialism) with configurable weights
- MySQL Connector: Built-in MySQL connection pooling with transaction support
- Helper Fields: Automatic generation of indexed helper columns (phonetic, soundex, initialism) for fast blocking queries
- Data Import: Import CSV/XLSX files with automatic type inference and schema creation
- Database Management: High-level API for creating tables, fields, constraints, and managing database schemas
- FastAPI Integration: Example FastAPI application scaffold for REST API deployment
- CLI Tools: Command-line interface for scaffolding FastAPI applications
- Production-Ready Logging: Environment-aware logging (Rich for dev, JSON for production)
Installation
pip install canonmap
For development dependencies:
pip install canonmap[dev]
For FastAPI integration:
pip install canonmap[fastapi]
Quick Start
Basic Usage
from canonmap import MySQLConnector, MySQLConfig, MappingPipeline, EntityMappingRequest
# Configure database connection
config = MySQLConfig.from_env() # Reads from MYSQL_HOST, MYSQL_USER, etc.
# Or manually:
# config = MySQLConfig(
# host="localhost",
# user="root",
# password="password",
# database="mydb"
# )
connector = MySQLConnector(config)
connector.initialize_pool()
# Create mapping pipeline
pipeline = MappingPipeline(connector)
# Match an entity
request = EntityMappingRequest(
    entity_name="John Smith",
    candidate_table_name="customers",
    candidate_field_name="full_name",
    top_n=10,
    max_prefilter=1000
)
response = pipeline.run(request)
# Access results
for result in response.results:
    print(f"{result.canonical_entity} (score: {result.score})")
Using Custom Weights
from canonmap import MappingWeights
weights = MappingWeights(
    exact=6.0,        # High weight for exact matches
    levenshtein=1.0,  # Edit distance similarity
    jaro=1.2,         # Jaro-Winkler similarity
    token=2.0,        # Token overlap
    trigram=1.0,      # 3-gram similarity
    phonetic=1.0,     # Phonetic similarity
    soundex=1.0,      # Soundex similarity
    initialism=0.5,   # Initialism matching
    multi_bonus=1.0   # Bonus for multiple matching features
)
response = pipeline.run(request, mapping_weights=weights)
Core Concepts
Entity Mapping Pipeline
The mapping pipeline follows a two-stage process:
- Blocking: Filters candidate records using fast indexing strategies (phonetic, soundex, initialism, exact)
- Scoring: Computes similarity scores for filtered candidates using multiple string similarity metrics
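Before either stage runs, the input entity is normalized. As a rough, stdlib-only sketch of the kind of normalization involved (Unicode NFKD normalization, punctuation removal, whitespace collapse, lowercasing, as described under "How It Works Internally") — CanonMap's own implementation may differ in detail:

```python
import re
import unicodedata

def normalize(text: str) -> str:
    """Illustrative normalization: NFKD-decompose, strip accents,
    replace punctuation with spaces, collapse whitespace, lowercase."""
    text = unicodedata.normalize("NFKD", text)
    text = "".join(ch for ch in text if not unicodedata.combining(ch))
    text = re.sub(r"[^\w\s]", " ", text)
    return re.sub(r"\s+", " ", text).strip().lower()

print(normalize("  Señor  O'Brien! "))  # senor o brien
```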
Blocking Strategies
- Phonetic: Uses Double Metaphone algorithm to match similar-sounding names
- Soundex: Uses Soundex codes for phonetic matching
- Initialism: Matches based on first letters of words (e.g., "IBM" matches "International Business Machines")
- Exact: Case-insensitive exact matching
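For instance, the initialism strategy reduces each candidate to the first letters of its words. A minimal sketch of that reduction (this is illustrative; CanonMap's own key generation may handle more edge cases):

```python
import re

def initialism(text: str) -> str:
    """First letter of each word, uppercased -- the key used for
    initialism blocking (e.g. 'International Business Machines' -> 'IBM')."""
    return "".join(w[0] for w in re.findall(r"[A-Za-z0-9]+", text)).upper()

print(initialism("International Business Machines"))  # IBM
```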
Helper Fields
Helper fields are pre-computed indexed columns that speed up blocking queries:
- __field_name_phonetic__: Double Metaphone codes
- __field_name_soundex__: Soundex codes
- __field_name_initialism__: Initialism strings
These can be automatically generated using the DBClient.create_helper_fields() method.
Usage Examples
Database Client Operations
from canonmap import MySQLConnector, MySQLConfig, DBClient
connector = MySQLConnector(MySQLConfig.from_env())
client = DBClient(connector)
# Import a CSV file
rows_imported = client.import_table_from_file(
    "customers.csv",
    table_name="customers",
    if_table_exists="append"  # or "replace" or "fail"
)
# Create helper fields for fast matching
client.create_helper_fields({
    "table_name": "customers",
    "field_name": "full_name",
    "transforms": ["phonetic", "soundex", "initialism"],
    "if_exists": "replace"  # or "skip" or "error"
})
# Create a table
client.create_table(
    table_name="products",
    fields_ddl="id BIGINT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(255) NOT NULL"
)
# Add a field
client.create_field(
    table_name="products",
    field_name="description",
    field_ddl="TEXT"
)
Prefiltering with SQL
# Use SQL to prefilter candidates before blocking
request = EntityMappingRequest(
    entity_name="John Smith",
    candidate_table_name="customers",
    candidate_field_name="full_name",
    prefilter_sql="SELECT * FROM customers WHERE status = 'active' AND created_at > '2020-01-01'",
    top_n=10
)
response = pipeline.run(request)
Development Mode
The run_dev() method provides additional debugging and deterministic behavior:
response = pipeline.run_dev(
    entity_mapping_request=request,
    mapping_weights=weights,
    per_strategy_limits={"phonetic": 500, "soundex": 300},
    global_prefilter_cap=1000,
    debug=True
)
Architecture
Package Structure
canonmap/
├── connectors/
│ └── mysql_connector/
│ ├── connector.py # Connection pooling
│ ├── config.py # Configuration management
│ ├── db_client.py # High-level DB operations
│ ├── services/ # Service layer
│ │ ├── helper_fields_service.py
│ │ ├── import_table_service.py
│ │ ├── schema_service.py
│ │ └── ...
│ └── utils/ # Utilities
│ ├── blocking.py
│ ├── transforms.py
│ └── ...
├── mapping/
│ ├── mapping_pipeline.py # Main pipeline
│ ├── models.py # Pydantic models
│ └── utils/
│ ├── blocking.py # Blocking strategies
│ ├── scoring.py # Similarity scoring
│ └── normalize.py # Text normalization
├── cli.py # CLI interface
└── logger.py # Logging configuration
Key Components
MySQLConnector
Manages MySQL connection pooling and provides:
- Connection context managers
- Transaction support
- Query execution with automatic LIMIT enforcement
- Write protection (optional)
MappingPipeline
Orchestrates the entity matching process:
- Normalizes input entities
- Executes blocking strategies in parallel
- Scores candidates concurrently
- Returns ranked results
DBClient
High-level database operations:
- Table and field creation
- Data import from files
- Helper field generation
- Schema management
- Constraint management
API Reference
Models
EntityMappingRequest
class EntityMappingRequest(BaseModel):
    entity_name: str                      # Entity to match
    candidate_table_name: str             # Table to search
    candidate_field_name: str             # Field to match against
    top_n: int = 20                       # Number of results to return
    max_prefilter: int = 1000             # Max candidates per blocking strategy
    semantic_rerank: bool = False         # Future: semantic reranking
    prefilter_sql: Optional[str] = None   # Optional SQL prefilter
EntityMappingResponse
class EntityMappingResponse(BaseModel):
    results: List[SingleMappedEntity]

class SingleMappedEntity(BaseModel):
    raw_entity: str             # Original input
    canonical_entity: str       # Matched entity
    canonical_table_name: str   # Source table
    canonical_field_name: str   # Source field
    score: float                # Similarity score
MappingWeights
class MappingWeights(BaseModel):
    exact: float = 6.0
    levenshtein: float = 1.0
    jaro: float = 1.2
    token: float = 2.0
    trigram: float = 1.0
    phonetic: float = 1.0
    soundex: float = 1.0
    initialism: float = 0.5
    multi_bonus: float = 1.0
Methods
MappingPipeline.run()
def run(
    self,
    entity_mapping_request: Union[EntityMappingRequest, Dict[str, Any]],
    mapping_weights: Optional[Union[MappingWeights, Dict[str, Any]]] = None,
) -> EntityMappingResponse
Main pipeline execution method. Returns top N matches sorted by score.
MappingPipeline.run_dev()
def run_dev(
    self,
    entity_mapping_request: Union[EntityMappingRequest, Dict[str, Any]],
    mapping_weights: Optional[Union[MappingWeights, Dict[str, Any]]] = None,
    per_strategy_limits: Optional[Dict[str, int]] = None,
    global_prefilter_cap: Optional[int] = None,
    debug: bool = True,
) -> EntityMappingResponse
Development mode with additional controls and debugging.
Configuration
Environment Variables
CanonMap uses environment variables for database configuration:
MYSQL_HOST=localhost
MYSQL_USER=root
MYSQL_PASSWORD=your_password
MYSQL_DATABASE=your_database
MYSQL_POOL_NAME=mypool # Optional, default: "mypool"
MYSQL_POOL_SIZE=5 # Optional, default: 5
ENV=dev # Optional, default: "dev" (affects logging)
Logging
Logging automatically adapts to environment:
- Development (ENV=dev): Rich, colorized console output
- Production (ENV=prod): JSON-formatted logs for cloud logging
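For a sense of what this switch amounts to, here is a minimal sketch using only the standard library (CanonMap's actual logger uses Rich in development and is more featureful; the formatter below is a simplified stand-in):

```python
import json
import logging
import os

class JsonFormatter(logging.Formatter):
    """Emit each record as a single JSON object, suitable for cloud log ingestion."""
    def format(self, record):
        return json.dumps({"level": record.levelname, "msg": record.getMessage()})

handler = logging.StreamHandler()
if os.getenv("ENV", "dev") == "prod":
    handler.setFormatter(JsonFormatter())  # structured logs in production
logger = logging.getLogger("canonmap-demo")
logger.addHandler(handler)
logger.warning("pool exhausted")
```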
CLI Usage
CanonMap includes a CLI tool for scaffolding FastAPI applications:
# Install CLI
pip install canonmap
# Create a new FastAPI app
cm make api
# Or specify a name
cm make api --name my_api
This creates a FastAPI application scaffold with:
- Entity mapping endpoints
- Database routes
- Example configuration
- CORS middleware
FastAPI Integration
The package includes an example FastAPI application in canonmap._example_usage.app. After scaffolding:
# Set up environment
cp .env.example .env
# Edit .env with your database credentials
# Run the API
uvicorn your_api_name.main:app --reload
Example Endpoints
- POST /entity/map-entity: Map an entity to canonical forms
- Database management endpoints (see example routes)
Performance Considerations
Helper Fields
For best performance, create helper fields on fields you frequently match against:
client.create_helper_fields({
    "table_name": "customers",
    "field_name": "full_name",
    "transforms": ["phonetic", "soundex", "initialism"]
})
This creates indexed columns that dramatically speed up blocking queries.
Prefiltering
Use prefilter_sql to reduce the candidate set before blocking:
request = EntityMappingRequest(
    entity_name="John Smith",
    candidate_table_name="customers",
    candidate_field_name="full_name",
    prefilter_sql="SELECT * FROM customers WHERE active = 1"
)
Parallel Execution
The pipeline automatically uses parallel execution for:
- Multiple blocking strategies
- Candidate scoring
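The pattern of fanning strategies out across a thread pool and unioning their candidates can be sketched as follows; the strategy functions here are hypothetical stand-ins, not CanonMap's real blockers:

```python
from concurrent.futures import ThreadPoolExecutor

# Stand-in blocking strategies: each returns a set of candidate strings.
def phonetic_block(q): return {"jon smith", "john smyth"}
def exact_block(q): return {"john smith"}

def run_blocking(query, strategies):
    """Run all blocking strategies in parallel and union their candidates."""
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(lambda s: s(query), strategies))
    candidates = set()
    for r in results:
        candidates |= r  # set union removes duplicates across strategies
    return candidates

print(sorted(run_blocking("john smith", [phonetic_block, exact_block])))
```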
Development
Setup
# Clone the repository
git clone https://github.com/vaberry/canonmap.git
cd canonmap
# Install in development mode
pip install -e ".[dev]"
# Run tests
pytest
# Format code
black src/
isort src/
# Type checking
mypy src/
Project Structure
- src/canonmap/: Main package code
- tests/: Test suite
- scripts/: Build and deployment scripts
Dependencies
Core Dependencies
- pydantic>=2.0.0: Data validation and settings
- pandas>=2.0: Data manipulation
- mysql-connector-python>=8.0.0: MySQL connectivity
- metaphone>=0.6: Phonetic matching
- python-Levenshtein>=0.20.0: String similarity
- jellyfish>=0.9.0: String matching algorithms
- rich>=12.0.0: Rich console output
- SQLAlchemy>=2.0: Database abstraction
Optional Dependencies
- fastapi>=0.100.0: FastAPI integration
- uvicorn>=0.20.0: ASGI server
- cohere>=4.0.0: Semantic reranking (future feature)
- python-dotenv>=1.0.0: Environment variable management
- openpyxl>=3.1: Excel file support
- chardet>=4.0.0: Character encoding detection
Advanced Features
Schema Generation
Generate comprehensive schema metadata for your database:
schema = client.generate_schema(
    table_fields=["customers.full_name", "products.name"],  # Optional: specific fields
    num_examples=10,              # Number of example values per field
    save_location="./schemas",    # Optional: save to disk
    schema_name="my_schema.json",
    if_schema_exists="replace"
)
This generates metadata including:
- Data types and column definitions
- Example values for each field
- Datetime format inference
- Nullability and default values
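The datetime-format inference mentioned above can be pictured as trying a handful of common format strings against sampled values and keeping the first that parses all of them. This is only an illustration of the idea; CanonMap's actual heuristic and format list are not shown here:

```python
from datetime import datetime

# Hypothetical candidate list; a real implementation would cover more formats.
CANDIDATE_FORMATS = ["%Y-%m-%d", "%m/%d/%Y", "%Y-%m-%d %H:%M:%S"]

def infer_datetime_format(samples):
    """Return the first candidate format that parses every sample, else None."""
    for fmt in CANDIDATE_FORMATS:
        try:
            for s in samples:
                datetime.strptime(s, fmt)
            return fmt
        except ValueError:
            continue
    return None

print(infer_datetime_format(["2020-01-01", "2021-06-30"]))  # %Y-%m-%d
```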
Type Inference for Imports
When importing files, CanonMap automatically infers optimal MySQL types with robust error handling and automatic type widening for out-of-range values.
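"Type widening" here means promoting a column to a larger type when observed values exceed the current type's range. A sketch under the standard MySQL integer limits (the function name and structure are illustrative, not CanonMap's API):

```python
def widen_int_type(values):
    """Pick the smallest signed MySQL integer type that fits all values.
    Ranges are the documented MySQL limits for TINYINT/INT/BIGINT."""
    lo, hi = min(values), max(values)
    if -128 <= lo and hi <= 127:
        return "TINYINT"
    if -2147483648 <= lo and hi <= 2147483647:
        return "INT"
    return "BIGINT"

print(widen_int_type([1, 300000]))  # INT: 300000 overflows TINYINT
```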
How It Works Internally
Matching Pipeline Flow
1. Input Normalization: The entity name is normalized (Unicode NFKD normalization, punctuation removal, whitespace collapse, lowercasing)
2. Blocking Phase (parallel via ThreadPoolExecutor):
   - Phonetic Blocking: Generates Double Metaphone codes and queries the helper column __field_phonetic__ with a LIKE pattern
   - Soundex Blocking: Generates Soundex codes and queries the helper column __field_soundex__, falling back to the MySQL SOUNDEX() function
   - Initialism Blocking: Extracts the first letters of words and queries the helper column __field_initialism__ with an exact match
   - Exact Blocking: Normalized exact match with a LIKE pattern on the source field
3. Candidate Union: Candidates from all strategies are combined into a single set (duplicates removed)
4. Scoring Phase (parallel via ThreadPoolExecutor): Each candidate is scored against the normalized query using a weighted combination of:
   - Exact match (binary; default weight 6.0)
   - Levenshtein ratio (default weight 1.0)
   - Jaro-Winkler similarity (default weight 1.2)
   - Token overlap, i.e. first/last token matching (default weight 2.0)
   - Trigram similarity over 3-character sequences (default weight 1.0)
   - Phonetic similarity via Double Metaphone comparison (default weight 1.0)
   - Soundex similarity (default weight 1.0)
   - Initialism match (default weight 0.5)
   - Multi-bonus (default weight 1.0), applied when multiple features match
   The final score is the weighted sum of all features.
5. Ranking: Results are sorted by score (descending) with deterministic tie-breaking by entity name
6. Top-N Selection: Returns the top N results based on the top_n parameter
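The "weighted sum of features" idea in the scoring phase can be illustrated with a toy scorer. The feature names mirror MappingWeights, but the metrics below (difflib ratio, Jaccard token overlap) are stdlib stand-ins for the library's real Levenshtein/Jaro/trigram/phonetic features:

```python
import difflib

def toy_score(query: str, candidate: str, weights: dict) -> float:
    """Weighted sum of illustrative similarity features, plus a multi-bonus
    when more than one feature fires -- a sketch, not CanonMap's scorer."""
    q, c = query.lower().strip(), candidate.lower().strip()
    q_tokens, c_tokens = set(q.split()), set(c.split())
    features = {
        "exact": 1.0 if q == c else 0.0,
        "levenshtein": difflib.SequenceMatcher(None, q, c).ratio(),
        "token": len(q_tokens & c_tokens) / max(len(q_tokens | c_tokens), 1),
    }
    score = sum(weights.get(name, 0.0) * value for name, value in features.items())
    if sum(v > 0 for v in features.values()) > 1:
        score += weights.get("multi_bonus", 0.0)
    return score

print(toy_score("john smith", "John Smith",
                {"exact": 6.0, "levenshtein": 1.0, "token": 2.0, "multi_bonus": 1.0}))
```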
Helper Field Generation Process
Helper fields are generated through a robust batch-processing system:
1. Column Detection: Checks for existing helper columns with the naming convention __field_name_transform__ or field_name_transform__
2. Primary Key Detection: Identifies a primary key or auto-increment column for stable paging
3. Temporary Key Creation: If no stable key exists, creates a temporary auto-increment column __cm_tmp_pk__
4. Batch Processing: Processes records in configurable chunks (default chunk size)
5. Transform Application: Applies transform functions (phonetic via Double Metaphone, soundex, initialism)
6. Bulk Updates: Uses CASE statements for efficient bulk updates by primary key
7. Automatic Retry: Implements retry logic for transient database errors
8. Cleanup: Removes temporary columns after processing
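The bulk-update step above builds one UPDATE with a CASE expression keyed by primary key instead of one UPDATE per row. A sketch of that SQL construction — table and column names are hypothetical, and real code would use parameterized queries rather than string interpolation:

```python
def build_case_update(table, pk, col, pk_to_value):
    """Build a single UPDATE ... SET col = CASE pk WHEN ... statement
    covering many rows at once (illustrative; values are not escaped here)."""
    cases = " ".join(f"WHEN {k} THEN '{v}'" for k, v in pk_to_value.items())
    ids = ", ".join(str(k) for k in pk_to_value)
    return (f"UPDATE {table} SET {col} = CASE {pk} {cases} END "
            f"WHERE {pk} IN ({ids})")

print(build_case_update("customers", "id", "__full_name_soundex__",
                        {1: "S530", 2: "J525"}))
```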
Troubleshooting
Common Issues
"Helper field already exists" error
- Use if_exists="replace" or if_exists="skip" when creating helper fields
- Or drop the column manually: ALTER TABLE table_name DROP COLUMN __field_name_phonetic__
Slow matching performance
- Ensure helper fields are created and indexed
- Use prefilter_sql to reduce the candidate set
- Adjust max_prefilter to limit candidates per strategy
- Consider creating indexes on helper columns manually: CREATE INDEX idx_phonetic ON table_name(__field_name_phonetic__)
Connection pool errors
- Increase MYSQL_POOL_SIZE if you have many concurrent operations
- Check MySQL server connection limits: SHOW VARIABLES LIKE 'max_connections'
- Ensure proper connection cleanup (always use context managers)
Import errors with CSV/XLSX
- Check file encoding (UTF-8 recommended)
- Verify column names don't conflict with MySQL reserved words
- For large files, consider chunking or using direct MySQL LOAD DATA
- Check for out-of-range numeric values (library will attempt automatic type widening)
No matches returned
- Verify helper fields exist for the blocking strategies you're using
- Check that prefilter_sql isn't too restrictive
- Try increasing max_prefilter to allow more candidates
- Use run_dev() with debug=True to see per-strategy candidate counts
Limitations
- Currently supports MySQL only (other database connectors planned)
- Helper fields must be created before matching for optimal performance (the first run may be slower)
- Large candidate sets (>10,000) may require tuning max_prefilter and per_strategy_limits
- Semantic reranking (semantic_rerank=True) is planned but not yet implemented
- Helper field generation requires a stable key (primary key or auto-increment column)
Roadmap
- Additional database connectors (PostgreSQL, SQLite)
- Semantic reranking using Cohere API
- Machine learning-based scoring models
- Real-time streaming matching
- Distributed matching for very large datasets
- Additional blocking strategies (fuzzy matching, n-gram indexing)
License
MIT License - see LICENSE file for details.
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Support
For issues, questions, or contributions, please visit the GitHub repository.
Author
Vince Berry - vincent.berry11@gmail.com
CanonMap - Intelligent entity resolution and canonicalization for Python