pyrmute

Pydantic model migrations and schema management with semantic versioning.

pyrmute handles the complexity of data model evolution so you can confidently make changes without breaking your production systems. Version your models, define transformations, and let pyrmute automatically migrate legacy data through multiple versions.

Key Features

  • Version your models - Track schema evolution with semantic versioning
  • Automatic migration chains - Transform data across multiple versions (1.0.0 → 2.0.0 → 3.0.0) in a single call
  • Type-safe transformations - Migrations return validated Pydantic models, catching errors before they reach production
  • Flexible schema export - Generate JSON schemas for all versions with support for $ref, custom generators, and nested models
  • Production-ready - Batch processing, parallel execution, and streaming support for large datasets
  • Only one dependency - Pydantic

When to Use pyrmute

pyrmute excels at handling schema evolution in production systems:

  • Configuration files - Upgrade user config files as your CLI/desktop app evolves (.apprc, config.json, settings.yaml)
  • Message queues & event streams - Handle messages from multiple service versions publishing different schemas (Kafka, RabbitMQ, SQS)
  • ETL & data imports - Import CSV/JSON/Excel files exported over years with evolving structures
  • ML model serving - Manage feature schema evolution across model versions and A/B tests
  • API versioning - Support multiple API versions with automatic request/response migration
  • Database migrations - Transparently migrate legacy data on read without downtime
  • Data archival - Process historical data dumps with various schema versions

See the examples/ directory for complete, runnable code demonstrating these patterns.

Help

See documentation for complete guides and API reference.

Installation

pip install pyrmute

Quick Start

from pydantic import BaseModel
from pyrmute import ModelManager, ModelData

manager = ModelManager()


# Version 1: Simple user model
@manager.model("User", "1.0.0")
class UserV1(BaseModel):
    name: str
    age: int


# Version 2: Split name into components
@manager.model("User", "2.0.0")
class UserV2(BaseModel):
    first_name: str
    last_name: str
    age: int


# Version 3: Add email and make age optional
@manager.model("User", "3.0.0")
class UserV3(BaseModel):
    first_name: str
    last_name: str
    email: str
    age: int | None = None


# Define how to migrate between versions
@manager.migration("User", "1.0.0", "2.0.0")
def split_name(data: ModelData) -> ModelData:
    parts = data["name"].split(" ", 1)
    return {
        "first_name": parts[0],
        "last_name": parts[1] if len(parts) > 1 else "",
        "age": data["age"],
    }


@manager.migration("User", "2.0.0", "3.0.0")
def add_email(data: ModelData) -> ModelData:
    return {
        **data,
        "email": f"{data['first_name'].lower()}@example.com"
    }


# Migrate legacy data to the latest version
legacy_data = {"name": "John Doe", "age": 30}  # or, legacy.model_dump()
current_user = manager.migrate(legacy_data, "User", "1.0.0", "3.0.0")

print(current_user)
# UserV3(first_name='John', last_name='Doe', email='john@example.com', age=30)
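To make the chain concrete, here is a plain-Python sketch of what migrating 1.0.0 → 2.0.0 → 3.0.0 amounts to at the data level. It does not use pyrmute: the `chain` list and the local `ModelData` alias are assumptions for illustration, and the final Pydantic validation step is left out.

```python
from typing import Any

ModelData = dict[str, Any]  # local alias for this sketch only


def split_name(data: ModelData) -> ModelData:
    """1.0.0 -> 2.0.0: split 'name' into first and last name."""
    parts = data["name"].split(" ", 1)
    return {
        "first_name": parts[0],
        "last_name": parts[1] if len(parts) > 1 else "",
        "age": data["age"],
    }


def add_email(data: ModelData) -> ModelData:
    """2.0.0 -> 3.0.0: derive a placeholder email address."""
    return {**data, "email": f"{data['first_name'].lower()}@example.com"}


# Hypothetical ordered chain; with pyrmute you register the steps and
# the manager works out the path between the two versions.
chain = [split_name, add_email]

record: ModelData = {"name": "John Doe", "age": 30}
for step in chain:
    record = step(record)

print(record)
# {'first_name': 'John', 'last_name': 'Doe', 'age': 30, 'email': 'john@example.com'}
```

Each step only needs to know about its two adjacent versions, which is why adding a 4.0.0 later requires one new function rather than rewriting the earlier ones.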

Advanced Usage

Compare Model Versions

# See exactly what changed between versions
diff = manager.diff("User", "1.0.0", "3.0.0")
print(f"Added: {diff.added_fields}")
print(f"Removed: {diff.removed_fields}")
# Render a changelog to Markdown
print(diff.to_markdown(header_depth=4))

With header_depth=4, the generated headings nest under this section, so the output can be embedded directly in this document:

User: 1.0.0 → 3.0.0

Added Fields
  • email: str (required)
  • first_name: str (required)
  • last_name: str (required)
Removed Fields
  • name
Modified Fields
  • age - type: int → int | None - now optional - default added: None
Breaking Changes
  • ⚠️ New required field 'last_name' will fail for existing data without defaults
  • ⚠️ New required field 'first_name' will fail for existing data without defaults
  • ⚠️ New required field 'email' will fail for existing data without defaults
  • ⚠️ Removed fields 'name' will be lost during migration
  • ⚠️ Field 'age' type changed - may cause validation errors

Batch Processing

# Migrate thousands of records efficiently
legacy_users = [
    {"name": "Alice Smith", "age": 28},
    {"name": "Bob Johnson", "age": 35},
    # ... thousands more
]

# Parallel processing for CPU-intensive migrations
users = manager.migrate_batch(
    legacy_users,
    "User",
    from_version="1.0.0",
    to_version="3.0.0",
    parallel=True,
    max_workers=4,
)
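As a rough mental model of parallel batch migration, the sketch below maps one migration function over a list with the standard library's `concurrent.futures`. It does not use pyrmute, and how pyrmute schedules the work internally (threads or processes) is not stated here, so treat the executor choice as an assumption.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any

Record = dict[str, Any]


def split_name(data: Record) -> Record:
    """Same 1.0.0 -> 2.0.0 step as in the Quick Start."""
    parts = data["name"].split(" ", 1)
    return {
        "first_name": parts[0],
        "last_name": parts[1] if len(parts) > 1 else "",
        "age": data["age"],
    }


legacy_users: list[Record] = [
    {"name": "Alice Smith", "age": 28},
    {"name": "Bob Johnson", "age": 35},
]

# Executor.map yields results in input order, so record N of the output
# still corresponds to record N of the input.
with ThreadPoolExecutor(max_workers=4) as pool:
    migrated = list(pool.map(split_name, legacy_users))

print(migrated)
```

Threads keep this sketch simple, but they rarely speed up CPU-bound pure-Python work. A process pool is the usual choice there, at the cost of requiring the migration function and the records to be picklable.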

Streaming Large Datasets

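from collections.abc import Iterator
from typing import Any

# Process huge datasets without loading everything into memory
def load_users_from_database() -> Iterator[dict[str, Any]]:
    yield from database.stream_users()  # `database` is a placeholder for your data layer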


# Migrate and save incrementally
for user in manager.migrate_batch_streaming(
    load_users_from_database(),
    "User",
    from_version="1.0.0",
    to_version="3.0.0",
    chunk_size=1000
):
    database.save(user)
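The memory benefit of streaming comes from pulling a bounded number of records at a time. A minimal stdlib sketch of that chunking idea follows. The `chunked` helper and `fake_source` generator are hypothetical names for illustration, not part of pyrmute.

```python
from collections.abc import Iterable, Iterator
from itertools import islice
from typing import Any

Record = dict[str, Any]


def chunked(records: Iterable[Record], chunk_size: int) -> Iterator[list[Record]]:
    """Yield lists of at most chunk_size records from any iterable."""
    iterator = iter(records)
    # islice pulls at most chunk_size items; an empty list ends the loop.
    while chunk := list(islice(iterator, chunk_size)):
        yield chunk


def fake_source(count: int) -> Iterator[Record]:
    """Stand-in for a database cursor that yields one row at a time."""
    for index in range(count):
        yield {"name": f"User {index}", "age": 20 + index % 50}


sizes = [len(chunk) for chunk in chunked(fake_source(2500), 1000)]
print(sizes)  # [1000, 1000, 500]
```

Only one chunk is held in memory at a time, so peak usage is governed by chunk_size rather than by the size of the dataset.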

Test Your Migrations

# Validate migration logic with test cases
results = manager.test_migration(
    "User",
    from_version="1.0.0",
    to_version="2.0.0",
    test_cases=[
        # (input, expected_output)
        (
            {"name": "Alice Smith", "age": 28},
            {"first_name": "Alice", "last_name": "Smith", "age": 28}
        ),
        (
            {"name": "Bob", "age": 35},
            {"first_name": "Bob", "last_name": "", "age": 35}
        ),
    ]
)

# Use in your test suite
assert results.all_passed, f"Migration failed: {results.failures}"
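When choosing test cases, edge inputs tend to be more informative than the happy path. The sketch below calls the Quick Start's `split_name` logic directly, without pyrmute, to show two inputs worth adding to `test_cases`. The expected outputs follow from how `str.split(" ", 1)` behaves.

```python
from typing import Any

Record = dict[str, Any]


def split_name(data: Record) -> Record:
    """Same 1.0.0 -> 2.0.0 step as in the Quick Start."""
    parts = data["name"].split(" ", 1)
    return {
        "first_name": parts[0],
        "last_name": parts[1] if len(parts) > 1 else "",
        "age": data["age"],
    }


edge_cases: list[tuple[Record, Record]] = [
    # Everything after the first space ends up in last_name.
    (
        {"name": "Mary Jane Watson", "age": 41},
        {"first_name": "Mary", "last_name": "Jane Watson", "age": 41},
    ),
    # A leading space produces an empty first_name.
    (
        {"name": " Bob", "age": 35},
        {"first_name": "", "last_name": "Bob", "age": 35},
    ),
]

failures = [
    (given, expected, split_name(given))
    for given, expected in edge_cases
    if split_name(given) != expected
]
print(failures)  # []
```

The second case documents behavior you may not want. Stripping whitespace before splitting would be a reasonable change to the migration.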

Bidirectional Migrations

# Support both upgrades and downgrades
@manager.migration("Config", "2.0.0", "1.0.0")
def downgrade_config(data: ModelData) -> ModelData:
    """Rollback to v1 format."""
    return {k: v for k, v in data.items() if k in ["setting1", "setting2"]}

# Useful for:
# - Rolling back deployments
# - Normalizing outputs from multiple model versions
# - Supporting legacy systems during transitions
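Downgrades are usually lossy, which is worth checking explicitly. The sketch below pairs the downgrade above with a hypothetical `upgrade_config` that adds a `setting3` field (an assumption, since the Config models are not shown here) and compares the two round trips in plain Python.

```python
from typing import Any

Record = dict[str, Any]


def upgrade_config(data: Record) -> Record:
    """Hypothetical 1.0.0 -> 2.0.0 step: add setting3 with a default."""
    return {**data, "setting3": "default"}


def downgrade_config(data: Record) -> Record:
    """2.0.0 -> 1.0.0: keep only the fields that v1 knows about."""
    return {k: v for k, v in data.items() if k in ["setting1", "setting2"]}


v1 = {"setting1": "a", "setting2": "b"}
v2 = {"setting1": "a", "setting2": "b", "setting3": "custom"}

# Upgrading and then downgrading returns the original v1 data.
assert downgrade_config(upgrade_config(v1)) == v1

# Downgrading and then upgrading does not: the custom value is gone.
round_tripped = upgrade_config(downgrade_config(v2))
print(round_tripped["setting3"])  # default
```

If a rollback must preserve newer fields, they need to be stored somewhere the older version ignores rather than dropped.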

Nested Model Migrations

# Automatically migrates nested Pydantic models
@manager.model("Address", "1.0.0")
class AddressV1(BaseModel):
    street: str
    city: str

@manager.model("Address", "2.0.0")
class AddressV2(BaseModel):
    street: str
    city: str
    postal_code: str

@manager.model("User", "2.0.0")
class UserV2(BaseModel):
    name: str
    address: AddressV2  # Nested model

# When migrating User, Address is automatically migrated too
@manager.migration("Address", "1.0.0", "2.0.0")
def add_postal_code(data: ModelData) -> ModelData:
    return {**data, "postal_code": "00000"}
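At the level of raw data, migrating a nested model means applying the nested model's migration to the corresponding sub-dictionary. The sketch below shows that effect by hand. It does not use pyrmute, and the sample user record is made up for illustration.

```python
from typing import Any

Record = dict[str, Any]


def add_postal_code(data: Record) -> Record:
    """Address 1.0.0 -> 2.0.0: add a placeholder postal code."""
    return {**data, "postal_code": "00000"}


user = {
    "name": "Alice",
    "address": {"street": "1 Main St", "city": "Springfield"},
}

# Apply the Address migration to the nested field and leave the rest as is.
migrated = {**user, "address": add_postal_code(user["address"])}

print(migrated["address"])
# {'street': '1 Main St', 'city': 'Springfield', 'postal_code': '00000'}
```

The original `user` dictionary is left unchanged because both steps build new dictionaries instead of mutating in place.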

Discriminated Unions

from typing import Literal, Union
from pydantic import Field

# Handle complex type hierarchies
@manager.model("CreditCard", "1.0.0")
class CreditCardV1(BaseModel):
    type: Literal["credit_card"] = "credit_card"
    card_number: str

@manager.model("PayPal", "1.0.0")
class PayPalV1(BaseModel):
    type: Literal["paypal"] = "paypal"
    email: str

@manager.model("Payment", "1.0.0")
class PaymentV1(BaseModel):
    method: Union[CreditCardV1, PayPalV1] = Field(discriminator="type")

# Migrations respect discriminated unions

Export JSON Schemas

# Generate schemas for all versions
manager.dump_schemas("schemas/")
# Creates: User_v1.0.0.json, User_v2.0.0.json, User_v3.0.0.json

# Write nested models to separate files and link them with $ref by passing separate_definitions=True.
manager.dump_schemas(
    "schemas/",
    separate_definitions=True,
    ref_template="https://api.example.com/schemas/{model}_v{version}.json"
)

Auto-Migration

# Skip writing migration functions for simple changes
@manager.model("Config", "1.0.0")
class ConfigV1(BaseModel):
    timeout: int = 30


@manager.model("Config", "2.0.0", backward_compatible=True)
class ConfigV2(BaseModel):
    timeout: int = 30
    retries: int = 3  # New field with default


# No migration function needed - defaults are applied automatically
config = manager.migrate({"timeout": 60}, "Config", "1.0.0", "2.0.0")
# ConfigV2(timeout=60, retries=3)

Real-World Examples

Configuration File Evolution

import json
from pathlib import Path
from typing import Literal

# Your CLI tool evolves over time
@manager.model("AppConfig", "1.0.0")
class AppConfigV1(BaseModel):
    api_key: str
    debug: bool = False

@manager.model("AppConfig", "2.0.0")
class AppConfigV2(BaseModel):
    api_key: str
    api_endpoint: str = "https://api.example.com"
    log_level: Literal["DEBUG", "INFO", "ERROR"] = "INFO"

@manager.migration("AppConfig", "1.0.0", "2.0.0")
def upgrade_config(data: dict) -> dict:
    return {
        "api_key": data["api_key"],
        "api_endpoint": "https://api.example.com",
        "log_level": "DEBUG" if data.get("debug") else "INFO",
    }

# Load and auto-upgrade user's config file
def load_config(config_path: Path) -> AppConfigV2:
    with open(config_path) as f:
        data = json.load(f)

    version = data.get("_version", "1.0.0")

    # Migrate to current version
    config = manager.migrate(
        data,
        "AppConfig",
        from_version=version,
        to_version="2.0.0"
    )

    # Save upgraded config with version tag
    with open(config_path, "w") as f:
        json.dump({**config.model_dump(), "_version": "2.0.0"}, f, indent=2)

    return config
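If you want to avoid rewriting a config file that is already current, you can compare the stored `_version` tag before migrating. The sketch below is one way to do that in plain Python. `parse_version`, `needs_upgrade`, and `CURRENT_VERSION` are hypothetical helpers, and the parser only handles plain MAJOR.MINOR.PATCH strings, with no pre-release or build suffixes.

```python
from typing import Any

CURRENT_VERSION = "2.0.0"


def parse_version(version: str) -> tuple[int, int, int]:
    """Turn 'MAJOR.MINOR.PATCH' into a tuple that compares numerically."""
    major, minor, patch = (int(part) for part in version.split("."))
    return major, minor, patch


def needs_upgrade(data: dict[str, Any]) -> bool:
    """Untagged files are treated as 1.0.0, matching load_config above."""
    stored = data.get("_version", "1.0.0")
    return parse_version(stored) < parse_version(CURRENT_VERSION)


print(needs_upgrade({"api_key": "abc", "debug": True}))      # True
print(needs_upgrade({"api_key": "abc", "_version": "2.0.0"}))  # False

# Comparing the raw strings gets multi-digit versions wrong.
print("10.0.0" < "2.0.0")                                # True
print(parse_version("10.0.0") < parse_version("2.0.0"))  # False
```

Comparing parsed tuples rather than strings matters as soon as any version component reaches two digits.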

Message Queue Consumer

from decimal import Decimal

# Handle messages from multiple service versions
# (OrderItem is assumed to be a Pydantic model defined elsewhere)
@manager.model("OrderEvent", "1.0.0")
class OrderEventV1(BaseModel):
    order_id: str
    customer_email: str
    items: list[dict]  # Unstructured

@manager.model("OrderEvent", "2.0.0")
class OrderEventV2(BaseModel):
    order_id: str
    customer_email: str
    items: list[OrderItem]  # Structured
    total: Decimal

def process_message(message: dict, schema_version: str) -> None:
    # Migrate to current schema regardless of source version
    event = manager.migrate(
        message,
        "OrderEvent",
        from_version=schema_version,
        to_version="2.0.0"
    )
    # Process with current schema only
    fulfill_order(event)

ETL Data Import

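# Import historical exports with evolving schemas
import csv
from pathlib import Path

def import_customers(file_path: Path, file_version: str) -> None:
    # newline="" is what the csv module recommends when opening files
    with open(file_path, newline="") as f: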
        reader = csv.DictReader(f)

        # Stream migration for memory efficiency
        for customer in manager.migrate_batch_streaming(
            reader,
            "Customer",
            from_version=file_version,
            to_version="3.0.0",
            chunk_size=1000
        ):
            database.save(customer)

# Handle files from different years
import_customers(Path("exports/2022_customers.csv"), "1.0.0")
import_customers(Path("exports/2023_customers.csv"), "2.0.0")
import_customers(Path("exports/2024_customers.csv"), "3.0.0")
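One thing to keep in mind with CSV input is that `csv.DictReader` yields every value as a string, and an empty cell arrives as `""` rather than `None`. Pydantic's default (lax) validation will generally coerce a numeric string such as `"28"` to `int`, but an empty string is not a valid integer. The sketch below shows the raw rows and a hypothetical `blank_to_none` helper you might apply before migrating. The sample data is made up.

```python
import csv
import io
from typing import Optional

# In-memory stand-in for an exported file; Bob's age cell is empty.
sample = io.StringIO("name,age\nAlice Smith,28\nBob Johnson,\n")


def blank_to_none(row: dict[str, str]) -> dict[str, Optional[str]]:
    """Replace empty cells with None so optional fields validate cleanly."""
    return {key: (value if value != "" else None) for key, value in row.items()}


rows = [blank_to_none(row) for row in csv.DictReader(sample)]

print(rows[0])  # {'name': 'Alice Smith', 'age': '28'}
print(rows[1])  # {'name': 'Bob Johnson', 'age': None}
```

Because the helper works row by row, it can also wrap the reader in a generator expression and keep the import streaming.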

ML Model Serving

# Route requests to appropriate model versions
class InferenceService:
    def predict(self, features: dict, request_version: str) -> BaseModel:
        # Determine target model version (A/B testing, gradual rollout, etc.)
        model_version = self.get_model_version(features["user_id"])

        # Migrate request to model's expected format
        model_input = manager.migrate(
            features,
            "PredictionRequest",
            from_version=request_version,
            to_version=model_version
        )

        # Run inference
        prediction = self.models[model_version].predict(model_input)

        # Normalize output for logging/analytics
        return manager.migrate(
            prediction,
            "PredictionResponse",
            from_version=model_version,
            to_version="3.0.0"
        )

See examples/ for complete runnable code:

  • config_file_migration.py - CLI/desktop app config file evolution
  • message_queue_consumer.py - Kafka/RabbitMQ/SQS consumer handling multiple schemas
  • etl_data_import.py - CSV/JSON/Excel import pipeline with historical data
  • ml_inference_pipeline.py - ML model serving with feature evolution
  • advanced_features.py - Complex Pydantic features (unions, nested models, validators)

Contributing

For guidance on setting up a development environment and how to make a contribution to pyrmute, see Contributing to pyrmute.

Reporting a Security Vulnerability

See our security policy.
