pyrmute
Pydantic model migrations and schema management with semantic versioning.
pyrmute handles the complexity of data model evolution so you can confidently make changes without breaking your production systems. Version your models, define transformations, and let pyrmute automatically migrate legacy data through multiple versions.
Key Features
- Version your models - Track schema evolution with semantic versioning
- Automatic migration chains - Transform data across multiple versions (1.0.0 → 2.0.0 → 3.0.0) in a single call
- Type-safe transformations - Migrations return validated Pydantic models, catching errors before they reach production
- Flexible schema export - Generate JSON schemas for all versions with support for $ref, custom generators, and nested models
- Production-ready - Batch processing, parallel execution, and streaming support for large datasets
- Only one dependency - Pydantic
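To make the "automatic migration chains" idea concrete, here is a minimal sketch in plain Python that does not use pyrmute at all. The `MIGRATIONS` registry, the `register` decorator, and `migrate_chain` are hypothetical names invented for illustration, and the sketch assumes each version has at most one outgoing step. pyrmute's internals may work quite differently.

```python
from typing import Any, Callable

Data = dict[str, Any]
Step = Callable[[Data], Data]

# Hypothetical registry: source version -> (target version, step function).
MIGRATIONS: dict[str, tuple[str, Step]] = {}


def register(from_version: str, to_version: str) -> Callable[[Step], Step]:
    """Record a single-step migration between two adjacent versions."""
    def decorator(func: Step) -> Step:
        MIGRATIONS[from_version] = (to_version, func)
        return func
    return decorator


@register("1.0.0", "2.0.0")
def split_name(data: Data) -> Data:
    first, _, last = data["name"].partition(" ")
    return {"first_name": first, "last_name": last, "age": data["age"]}


@register("2.0.0", "3.0.0")
def add_email(data: Data) -> Data:
    return {**data, "email": f"{data['first_name'].lower()}@example.com"}


def migrate_chain(data: Data, from_version: str, to_version: str) -> Data:
    """Apply registered steps one after another until the target is reached."""
    version = from_version
    while version != to_version:
        if version not in MIGRATIONS:
            raise LookupError(f"no migration path from {version} to {to_version}")
        version, step = MIGRATIONS[version]
        data = step(data)
    return data


result = migrate_chain({"name": "John Doe", "age": 30}, "1.0.0", "3.0.0")
```

The caller names only the start and end versions; the intermediate 2.0.0 shape never has to be handled by application code.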
When to Use pyrmute
pyrmute excels at handling schema evolution in production systems:
- Configuration files - Upgrade user config files as your CLI/desktop app evolves (.apprc, config.json, settings.yaml)
- Message queues & event streams - Handle messages from multiple service versions publishing different schemas (Kafka, RabbitMQ, SQS)
- ETL & data imports - Import CSV/JSON/Excel files exported over years with evolving structures
- ML model serving - Manage feature schema evolution across model versions and A/B tests
- API versioning - Support multiple API versions with automatic request/response migration
- Database migrations - Transparently migrate legacy data on read without downtime
- Data archival - Process historical data dumps with various schema versions
See the examples/ directory for complete, runnable code demonstrating these patterns.
Help
See documentation for complete guides and API reference.
Installation
pip install pyrmute
Quick Start
from pydantic import BaseModel
from pyrmute import ModelManager, ModelData

manager = ModelManager()

# Version 1: Simple user model
@manager.model("User", "1.0.0")
class UserV1(BaseModel):
    name: str
    age: int

# Version 2: Split name into components
@manager.model("User", "2.0.0")
class UserV2(BaseModel):
    first_name: str
    last_name: str
    age: int

# Version 3: Add email and make age optional
@manager.model("User", "3.0.0")
class UserV3(BaseModel):
    first_name: str
    last_name: str
    email: str
    age: int | None = None

# Define how to migrate between versions
@manager.migration("User", "1.0.0", "2.0.0")
def split_name(data: ModelData) -> ModelData:
    # Split on the first space only; a single-word name gets an empty last name
    parts = data["name"].split(" ", 1)
    return {
        "first_name": parts[0],
        "last_name": parts[1] if len(parts) > 1 else "",
        "age": data["age"],
    }

@manager.migration("User", "2.0.0", "3.0.0")
def add_email(data: ModelData) -> ModelData:
    return {
        **data,
        "email": f"{data['first_name'].lower()}@example.com",
    }

# Migrate legacy data to the latest version
legacy_data = {"name": "John Doe", "age": 30}  # or, legacy.model_dump()
current_user = manager.migrate(legacy_data, "User", "1.0.0", "3.0.0")
print(current_user)
# first_name='John' last_name='Doe' email='john@example.com' age=30
Advanced Usage
Compare Model Versions
# See exactly what changed between versions
diff = manager.diff("User", "1.0.0", "3.0.0")
print(f"Added: {diff.added_fields}")
print(f"Removed: {diff.removed_fields}")
# Render a changelog to Markdown
print(diff.to_markdown(header_depth=4))
With header_depth=4 the output can be embedded nicely into this document.
User: 1.0.0 → 3.0.0
Added Fields
- email: str (required)
- first_name: str (required)
- last_name: str (required)
Removed Fields
- name
Modified Fields
- age - type: int → int | None - now optional - default added: None
Breaking Changes
- ⚠️ New required field 'last_name' will fail for existing data without defaults
- ⚠️ New required field 'first_name' will fail for existing data without defaults
- ⚠️ New required field 'email' will fail for existing data without defaults
- ⚠️ Removed fields 'name' will be lost during migration
- ⚠️ Field 'age' type changed - may cause validation errors
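The changelog above boils down to set operations on field names plus a per-field comparison. The following is a rough sketch of that idea using plain dictionaries that map field names to type strings. `FieldDiff` and `diff_fields` are hypothetical names; this is not how pyrmute's `diff` is implemented, and it ignores defaults and required/optional status.

```python
from dataclasses import dataclass


@dataclass
class FieldDiff:
    added: list[str]
    removed: list[str]
    modified: dict[str, tuple[str, str]]


def diff_fields(old: dict[str, str], new: dict[str, str]) -> FieldDiff:
    """Compare two {field name: type} mappings."""
    return FieldDiff(
        added=sorted(new.keys() - old.keys()),
        removed=sorted(old.keys() - new.keys()),
        modified={
            name: (old[name], new[name])
            for name in sorted(old.keys() & new.keys())
            if old[name] != new[name]
        },
    )


user_v1 = {"name": "str", "age": "int"}
user_v3 = {"first_name": "str", "last_name": "str", "email": "str", "age": "int | None"}

field_diff = diff_fields(user_v1, user_v3)
print(field_diff.added)     # ['email', 'first_name', 'last_name']
print(field_diff.removed)   # ['name']
print(field_diff.modified)  # {'age': ('int', 'int | None')}
```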
Batch Processing
# Migrate thousands of records efficiently
legacy_users = [
    {"name": "Alice Smith", "age": 28},
    {"name": "Bob Johnson", "age": 35},
    # ... thousands more
]

# Parallel processing for CPU-intensive migrations
users = manager.migrate_batch(
    legacy_users,
    "User",
    from_version="1.0.0",
    to_version="3.0.0",
    parallel=True,
    max_workers=4,
)
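For intuition about what a worker pool does with a batch, here is a stdlib-only sketch that maps one migration function over a list of records. It uses `ThreadPoolExecutor` purely to keep the example simple and portable; genuinely CPU-bound migrations would normally call for processes instead. `migrate_all` is a hypothetical helper, and nothing here reflects how `migrate_batch` is actually implemented.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any

Data = dict[str, Any]


def split_name(data: Data) -> Data:
    first, _, last = data["name"].partition(" ")
    return {"first_name": first, "last_name": last, "age": data["age"]}


def migrate_all(records: list[Data], max_workers: int = 4) -> list[Data]:
    """Apply split_name to every record; Executor.map keeps the input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(split_name, records))


legacy_users = [
    {"name": "Alice Smith", "age": 28},
    {"name": "Bob Johnson", "age": 35},
]
migrated = migrate_all(legacy_users)
```

Because `Executor.map` yields results in submission order, the output list lines up with the input list even when workers finish out of order.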
Streaming Large Datasets
from collections.abc import Iterator
from typing import Any

# Process huge datasets without loading everything into memory
# (`database` stands in for your own data-access layer)
def load_users_from_database() -> Iterator[dict[str, Any]]:
    yield from database.stream_users()

# Migrate and save incrementally
for user in manager.migrate_batch_streaming(
    load_users_from_database(),
    "User",
    from_version="1.0.0",
    to_version="3.0.0",
    chunk_size=1000,
):
    database.save(user)
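The memory benefit of streaming comes from pulling records lazily and holding only one chunk at a time. A minimal sketch of that pattern with `itertools.islice` follows; `chunked` and `migrate_streaming` are hypothetical helpers, and the "migration" is a stand-in that only tags each record.

```python
from collections.abc import Iterable, Iterator
from itertools import islice
from typing import Any

Data = dict[str, Any]


def chunked(records: Iterable[Data], chunk_size: int) -> Iterator[list[Data]]:
    """Yield lists of at most chunk_size records, reading lazily."""
    iterator = iter(records)
    while chunk := list(islice(iterator, chunk_size)):
        yield chunk


def migrate_streaming(records: Iterable[Data], chunk_size: int) -> Iterator[Data]:
    """Process one chunk at a time so only chunk_size records are held at once."""
    for chunk in chunked(records, chunk_size):
        for record in chunk:
            yield {**record, "migrated": True}  # stand-in for a real migration


sizes = [len(chunk) for chunk in chunked(({"id": i} for i in range(5)), 2)]
out = list(migrate_streaming(({"id": i} for i in range(5)), chunk_size=2))
```

With five records and a chunk size of two, the source is consumed in chunks of 2, 2, and 1.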
Test Your Migrations
# Validate migration logic with test cases
results = manager.test_migration(
    "User",
    from_version="1.0.0",
    to_version="2.0.0",
    test_cases=[
        # (input, expected_output)
        (
            {"name": "Alice Smith", "age": 28},
            {"first_name": "Alice", "last_name": "Smith", "age": 28},
        ),
        (
            {"name": "Bob", "age": 35},
            {"first_name": "Bob", "last_name": "", "age": 35},
        ),
    ],
)

# Use in your test suite
assert results.all_passed, f"Migration failed: {results.failures}"
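Since a migration is just a function from one dictionary to another, the same table-driven idea can be sketched without any library. The `run_cases` helper below is hypothetical and only illustrates the (input, expected output) pattern; the third case carries a deliberately wrong expectation to show what a reported failure looks like.

```python
from typing import Any, Callable

Data = dict[str, Any]


def split_name(data: Data) -> Data:
    parts = data["name"].split(" ", 1)
    return {
        "first_name": parts[0],
        "last_name": parts[1] if len(parts) > 1 else "",
        "age": data["age"],
    }


def run_cases(
    migrate: Callable[[Data], Data], cases: list[tuple[Data, Data]]
) -> list[tuple[Data, Data, Data]]:
    """Return (input, expected, actual) for every case whose output differs."""
    failures = []
    for source, expected in cases:
        actual = migrate(dict(source))  # copy so the case itself is not mutated
        if actual != expected:
            failures.append((source, expected, actual))
    return failures


failures = run_cases(
    split_name,
    [
        ({"name": "Alice Smith", "age": 28}, {"first_name": "Alice", "last_name": "Smith", "age": 28}),
        ({"name": "Bob", "age": 35}, {"first_name": "Bob", "last_name": "", "age": 35}),
        # Wrong on purpose: split(" ", 1) keeps "van Rossum" together.
        ({"name": "Guido van Rossum", "age": 60}, {"first_name": "Guido", "last_name": "Rossum", "age": 60}),
    ],
)
```

Multi-word surnames are the kind of edge case worth adding to a migration's test table early.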
Bidirectional Migrations
# Support both upgrades and downgrades
@manager.migration("Config", "2.0.0", "1.0.0")
def downgrade_config(data: ModelData) -> ModelData:
    """Rollback to v1 format."""
    return {k: v for k, v in data.items() if k in ["setting1", "setting2"]}

# Useful for:
# - Rolling back deployments
# - Normalizing outputs from multiple model versions
# - Supporting legacy systems during transitions
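One thing to keep in mind with downgrades is that they are usually lossy: fields unknown to the older version are dropped and cannot be recovered by upgrading again. The sketch below shows this with plain functions; `setting3` and both helpers are hypothetical and exist only for the illustration.

```python
from typing import Any

Data = dict[str, Any]

V1_FIELDS = ("setting1", "setting2")


def upgrade(data: Data) -> Data:
    """v1 -> v2: add a field that v1 never had (hypothetical 'setting3')."""
    return {**data, "setting3": "default"}


def downgrade(data: Data) -> Data:
    """v2 -> v1: keep only the fields v1 knows about."""
    return {k: v for k, v in data.items() if k in V1_FIELDS}


v1 = {"setting1": "a", "setting2": "b"}
v2 = {"setting1": "a", "setting2": "b", "setting3": "custom"}

lossless = downgrade(upgrade(v1)) == v1   # upgrade then downgrade restores v1
round_tripped = upgrade(downgrade(v2))    # downgrade then upgrade loses 'custom'
```

If a rollback must preserve newer fields, they need to be stored somewhere outside the downgraded payload.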
Nested Model Migrations
# Automatically migrates nested Pydantic models
@manager.model("Address", "1.0.0")
class AddressV1(BaseModel):
    street: str
    city: str

@manager.model("Address", "2.0.0")
class AddressV2(BaseModel):
    street: str
    city: str
    postal_code: str

@manager.model("User", "2.0.0")
class UserV2(BaseModel):
    name: str
    address: AddressV2  # Nested model

# When migrating User, Address is automatically migrated too
@manager.migration("Address", "1.0.0", "2.0.0")
def add_postal_code(data: ModelData) -> ModelData:
    return {**data, "postal_code": "00000"}
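Conceptually, migrating a nested model means applying the inner model's migration to the corresponding sub-dictionary while leaving the outer fields alone. A hand-written sketch of that step is shown below; `migrate_user` is a hypothetical helper written out explicitly, whereas the README states pyrmute handles this automatically.

```python
from typing import Any

Data = dict[str, Any]


def add_postal_code(address: Data) -> Data:
    return {**address, "postal_code": "00000"}


def migrate_user(user: Data) -> Data:
    """Migrate the nested address and leave the other fields untouched."""
    return {**user, "address": add_postal_code(user["address"])}


legacy = {"name": "Alice", "address": {"street": "1 Main St", "city": "Springfield"}}
current = migrate_user(legacy)
```

Both functions build new dictionaries, so the legacy record is left unmodified.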
Discriminated Unions
from typing import Literal, Union
from pydantic import Field

# Handle complex type hierarchies
@manager.model("CreditCard", "1.0.0")
class CreditCardV1(BaseModel):
    type: Literal["credit_card"] = "credit_card"
    card_number: str

@manager.model("PayPal", "1.0.0")
class PayPalV1(BaseModel):
    type: Literal["paypal"] = "paypal"
    email: str

@manager.model("Payment", "1.0.0")
class PaymentV1(BaseModel):
    method: Union[CreditCardV1, PayPalV1] = Field(discriminator="type")

# Migrations respect discriminated unions
Export JSON Schemas
# Generate schemas for all versions
manager.dump_schemas("schemas/")
# Creates: User_v1.0.0.json, User_v2.0.0.json, User_v3.0.0.json

# Use separate files with $ref for nested models with 'enable_ref=True'.
manager.dump_schemas(
    "schemas/",
    separate_definitions=True,
    ref_template="https://api.example.com/schemas/{model}_v{version}.json",
)
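To see how the file names and the `ref_template` string relate, the sketch below fills in the `{model}` and `{version}` placeholders with `str.format`. That substitution mechanism is an assumption made for illustration, and `schema_filename` and `schema_ref` are hypothetical helpers rather than pyrmute functions.

```python
ref_template = "https://api.example.com/schemas/{model}_v{version}.json"


def schema_filename(model: str, version: str) -> str:
    """File name pattern matching the example output, e.g. User_v1.0.0.json."""
    return f"{model}_v{version}.json"


def schema_ref(model: str, version: str) -> dict[str, str]:
    """Build a JSON Schema $ref object from the template (assumed str.format)."""
    return {"$ref": ref_template.format(model=model, version=version)}


filename = schema_filename("User", "1.0.0")
ref = schema_ref("Address", "2.0.0")
```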
Auto-Migration
# Skip writing migration functions for simple changes
@manager.model("Config", "1.0.0")
class ConfigV1(BaseModel):
    timeout: int = 30

@manager.model("Config", "2.0.0", backward_compatible=True)
class ConfigV2(BaseModel):
    timeout: int = 30
    retries: int = 3  # New field with default

# No migration function needed - defaults are applied automatically
config = manager.migrate({"timeout": 60}, "Config", "1.0.0", "2.0.0")
# ConfigV2(timeout=60, retries=3)
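The reason no migration function is needed here is that every new field carries a default, so old data only has to be overlaid on those defaults. In plain Python the idea looks roughly like this; `CONFIG_V2_DEFAULTS` and `apply_defaults` are hypothetical names, and in the example above Pydantic itself supplies the defaults during validation.

```python
from typing import Any

Data = dict[str, Any]

# Defaults declared by the hypothetical target version.
CONFIG_V2_DEFAULTS: Data = {"timeout": 30, "retries": 3}


def apply_defaults(data: Data, defaults: Data) -> Data:
    """Fill in missing keys; values already present in data win."""
    return {**defaults, **data}


merged = apply_defaults({"timeout": 60}, CONFIG_V2_DEFAULTS)
print(merged)  # {'timeout': 60, 'retries': 3}
```

This only works for additive changes. A renamed or required field without a default still needs an explicit migration.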
Real-World Examples
Configuration File Evolution
import json
from pathlib import Path
from typing import Literal

# Your CLI tool evolves over time
@manager.model("AppConfig", "1.0.0")
class AppConfigV1(BaseModel):
    api_key: str
    debug: bool = False

@manager.model("AppConfig", "2.0.0")
class AppConfigV2(BaseModel):
    api_key: str
    api_endpoint: str = "https://api.example.com"
    log_level: Literal["DEBUG", "INFO", "ERROR"] = "INFO"

@manager.migration("AppConfig", "1.0.0", "2.0.0")
def upgrade_config(data: dict) -> dict:
    return {
        "api_key": data["api_key"],
        "api_endpoint": "https://api.example.com",
        "log_level": "DEBUG" if data.get("debug") else "INFO",
    }

# Load and auto-upgrade user's config file
def load_config(config_path: Path) -> AppConfigV2:
    with open(config_path) as f:
        data = json.load(f)

    # Files written before version tagging are treated as 1.0.0
    version = data.get("_version", "1.0.0")

    # Migrate to current version
    config = manager.migrate(
        data,
        "AppConfig",
        from_version=version,
        to_version="2.0.0",
    )

    # Save upgraded config with version tag
    with open(config_path, "w") as f:
        json.dump({**config.model_dump(), "_version": "2.0.0"}, f, indent=2)

    return config
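The `_version` tag is what makes this pattern work: it tells the loader which schema a file was written with, and its absence marks a file that predates tagging. The stdlib-only sketch below isolates that read/write round trip; `read_version` and `write_tagged` are hypothetical helpers, and the upgraded payload is written by hand rather than produced by a migration.

```python
import json
import tempfile
from pathlib import Path


def read_version(config_path: Path, default: str = "1.0.0") -> tuple[str, dict]:
    """Return (version, payload) with the '_version' tag split off."""
    data = json.loads(config_path.read_text())
    version = data.pop("_version", default)
    return version, data


def write_tagged(config_path: Path, payload: dict, version: str) -> None:
    """Write the payload back with its schema version recorded alongside it."""
    config_path.write_text(json.dumps({**payload, "_version": version}, indent=2))


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "config.json"
    path.write_text(json.dumps({"api_key": "secret", "debug": True}))  # untagged v1 file
    before = read_version(path)
    write_tagged(path, {"api_key": "secret", "log_level": "DEBUG"}, "2.0.0")
    after = read_version(path)
```

After the first load, the file is tagged, so later runs can skip straight to the current version.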
Message Queue Consumer
from decimal import Decimal

# Handle messages from multiple service versions
# (OrderItem and fulfill_order are assumed to be defined elsewhere in your code)
@manager.model("OrderEvent", "1.0.0")
class OrderEventV1(BaseModel):
    order_id: str
    customer_email: str
    items: list[dict]  # Unstructured

@manager.model("OrderEvent", "2.0.0")
class OrderEventV2(BaseModel):
    order_id: str
    customer_email: str
    items: list[OrderItem]  # Structured
    total: Decimal

def process_message(message: dict, schema_version: str) -> None:
    # Migrate to current schema regardless of source version
    event = manager.migrate(
        message,
        "OrderEvent",
        from_version=schema_version,
        to_version="2.0.0",
    )
    # Process with current schema only
    fulfill_order(event)
ETL Data Import
# Import historical exports with evolving schemas
import csv
from pathlib import Path

def import_customers(file_path: Path, file_version: str) -> None:
    # newline="" is the csv module's recommended way to open CSV files
    with open(file_path, newline="") as f:
        reader = csv.DictReader(f)
        # Stream migration for memory efficiency
        for customer in manager.migrate_batch_streaming(
            reader,
            "Customer",
            from_version=file_version,
            to_version="3.0.0",
            chunk_size=1000,
        ):
            database.save(customer)

# Handle files from different years
import_customers(Path("exports/2022_customers.csv"), "1.0.0")
import_customers(Path("exports/2023_customers.csv"), "2.0.0")
import_customers(Path("exports/2024_customers.csv"), "3.0.0")
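One detail worth knowing when feeding CSV rows into migrations: `csv.DictReader` yields every value as a string, so a migration function that works on the raw dictionary may see `"28"` rather than `28`. The sketch below shows an explicit conversion step; `typed_rows` is a hypothetical helper, and whether such a step is needed depends on when validation runs in your pipeline.

```python
import csv
import io
from collections.abc import Iterator
from typing import Any

raw = "name,age\nAlice Smith,28\nBob Johnson,35\n"


def typed_rows(handle: io.TextIOBase) -> Iterator[dict[str, Any]]:
    """Yield rows with 'age' converted from str to int."""
    for row in csv.DictReader(handle):
        yield {**row, "age": int(row["age"])}


untyped = list(csv.DictReader(io.StringIO(raw)))
rows = list(typed_rows(io.StringIO(raw)))
```

Here `untyped[0]["age"]` is the string `"28"`, while `rows[0]["age"]` is the integer `28`.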
ML Model Serving
# Route requests to appropriate model versions
class InferenceService:
    def predict(self, features: dict, request_version: str) -> BaseModel:
        # Determine target model version (A/B testing, gradual rollout, etc.)
        model_version = self.get_model_version(features["user_id"])

        # Migrate request to model's expected format
        model_input = manager.migrate(
            features,
            "PredictionRequest",
            from_version=request_version,
            to_version=model_version,
        )

        # Run inference
        prediction = self.models[model_version].predict(model_input)

        # Normalize output for logging/analytics
        return manager.migrate(
            prediction,
            "PredictionResponse",
            from_version=model_version,
            to_version="3.0.0",
        )
See examples/ for complete runnable code:
- config_file_migration.py - CLI/desktop app config file evolution
- message_queue_consumer.py - Kafka/RabbitMQ/SQS consumer handling multiple schemas
- etl_data_import.py - CSV/JSON/Excel import pipeline with historical data
- ml_inference_pipeline.py - ML model serving with feature evolution
- advanced_features.py - Complex Pydantic features (unions, nested models, validators)
Contributing
For guidance on setting up a development environment and how to make a contribution to pyrmute, see Contributing to pyrmute.
Reporting a Security Vulnerability
See our security policy.
File details
Details for the file pyrmute-0.4.0.tar.gz.
File metadata
- Download URL: pyrmute-0.4.0.tar.gz
- Upload date:
- Size: 114.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 18018b899aca3dc01a87c9aedb09dec7d32a0951e1bfc2ca7aaa29168ab51fa5 |
| MD5 | f0e9fe82136199d573f2d9a83145fd8e |
| BLAKE2b-256 | 604dfa9ae0f5e07563a7f12ad496e1be986d19c803a86d73c9ab040d8b25a1d5 |
Provenance
The following attestation bundles were made for pyrmute-0.4.0.tar.gz:
Publisher: publish.yml on mferrera/pyrmute
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: pyrmute-0.4.0.tar.gz
- Subject digest: 18018b899aca3dc01a87c9aedb09dec7d32a0951e1bfc2ca7aaa29168ab51fa5
- Sigstore transparency entry: 600844428
- Sigstore integration time:
- Permalink: mferrera/pyrmute@e405e83a538d542fdb7ebf88e9b023d82475dbd3
- Branch / Tag: refs/tags/0.4.0
- Owner: https://github.com/mferrera
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@e405e83a538d542fdb7ebf88e9b023d82475dbd3
- Trigger Event: release
File details
Details for the file pyrmute-0.4.0-py3-none-any.whl.
File metadata
- Download URL: pyrmute-0.4.0-py3-none-any.whl
- Upload date:
- Size: 29.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | ed5965b3b21127389f56bf0059b39533d439e64ead357336c6465c8ba918617f |
| MD5 | 66806105dd75c5c75a9f2ac1badc3ce5 |
| BLAKE2b-256 | a80de449f403102dca6b1e5643887602426abb069cb565ce5c133c5831bf7957 |
Provenance
The following attestation bundles were made for pyrmute-0.4.0-py3-none-any.whl:
Publisher: publish.yml on mferrera/pyrmute
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: pyrmute-0.4.0-py3-none-any.whl
- Subject digest: ed5965b3b21127389f56bf0059b39533d439e64ead357336c6465c8ba918617f
- Sigstore transparency entry: 600844431
- Sigstore integration time:
- Permalink: mferrera/pyrmute@e405e83a538d542fdb7ebf88e9b023d82475dbd3
- Branch / Tag: refs/tags/0.4.0
- Owner: https://github.com/mferrera
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@e405e83a538d542fdb7ebf88e9b023d82475dbd3
- Trigger Event: release