Skip to main content

Database functionality for Mindtrace

Project description

PyPI version License Downloads

Mindtrace Database Module

A powerful, flexible Object-Document Mapping (ODM) system that provides a unified interface for working with multiple database backends in the Mindtrace project. Write once, run on MongoDB, Redis, or both.

Key Features

  • Unified Interface - One API for multiple databases
  • Multi-Model Support - Manage multiple document types in a single ODM instance
  • Dynamic Switching - Switch between MongoDB and Redis at runtime
  • Simplified Document Models - Define once, use everywhere
  • Full Async/Sync Support - Both MongoDB and Redis support sync and async interfaces
  • Seamless Interface Compatibility - Use sync code with async databases and vice versa
  • Advanced Querying - Rich query capabilities across all databases
  • Comprehensive Error Handling - Clear, actionable error messages
  • Full Test Coverage - Thoroughly tested with unit and integration tests

Quick Start

The Simple Way: Unified Documents

Define your document model once and use it with any database:

from mindtrace.database import UnifiedMindtraceDocument, UnifiedMindtraceODM, BackendType

from pydantic import Field

# 1. Define your document model (works with both MongoDB and Redis)
class User(UnifiedMindtraceDocument):
    name: str = Field(description="User's full name")
    age: int = Field(ge=0, description="User's age")
    email: str = Field(description="User's email address")
    skills: list[str] = Field(default_factory=list)
    
    class Meta:
        collection_name = "users"
        global_key_prefix = "myapp"
        indexed_fields = ["email", "name"]
        unique_fields = ["email"]

# 2. Create ODM instance (supports both MongoDB and Redis)
db = UnifiedMindtraceODM(
    unified_model_cls=User,
    mongo_db_uri="mongodb://localhost:27017",
    mongo_db_name="myapp",
    redis_url="redis://localhost:6379",
    preferred_backend=BackendType.MONGO  # Start with MongoDB
    # init_mode=InitMode.ASYNC,  # Both use ASYNC (or SYNC)
    # If None, MongoDB defaults to ASYNC and Redis defaults to SYNC
)

# 3. Use it (Same API regardless of database - both sync and async work)
user = User(name="Alice", age=30, email="alice@example.com", skills=["Python"])

# Async operations (work with both MongoDB and Redis)
inserted_user = await db.insert_async(user)
retrieved_user = await db.get_async(inserted_user.id)
retrieved_user.age = 31
updated_user = await db.update_async(retrieved_user)
python_users = await db.find_async({"skills": "Python"})
all_users = await db.all_async()

# Sync operations (also work with both MongoDB and Redis)
inserted_user = db.insert(user)
retrieved_user = db.get(inserted_user.id)
retrieved_user.age = 31
updated_user = db.update(retrieved_user)
python_users = db.find({"skills": "Python"})
all_users = db.all()

# Switch databases on the fly
db.switch_backend(BackendType.REDIS)
redis_user = db.insert(user)  # Now using Redis (sync)
# or
redis_user = await db.insert_async(user)  # Redis with async interface

# Multi-model mode (works with all ODMs)
db = UnifiedMindtraceODM(
    unified_models={'user': User, 'address': Address},
    mongo_db_uri="mongodb://localhost:27017",
    mongo_db_name="myapp",
    redis_url="redis://localhost:6379"
)

# Access models via attributes
address = await db.address.insert_async(Address(street="123 Main St", city="NYC"))
user = await db.user.insert_async(User(name="Alice", email="alice@example.com"))

# All operations work per model
users = await db.user.all_async()
addresses = await db.address.all_async()

Traditional Way: Database-Specific Models

If you prefer more control, you can define database-specific models:

from mindtrace.database import (
    MongoMindtraceODM, 
    RedisMindtraceODM,
    MindtraceDocument,
    MindtraceRedisDocument
)
from beanie import Indexed
from redis_om import Field as RedisField
from typing import Annotated

# MongoDB model
class MongoUser(MindtraceDocument):
    name: str
    email: Annotated[str, Indexed(unique=True)]
    age: int
    
    class Settings:
        name = "users"

# Redis model
class RedisUser(MindtraceRedisDocument):
    name: str = RedisField(index=True)
    email: str = RedisField(index=True)
    age: int = RedisField(index=True)
    
    class Meta:
        global_key_prefix = "myapp"

# Use them separately
mongo_db = MongoMindtraceODM(
    model_cls=MongoUser,
    db_uri="mongodb://localhost:27017",
    db_name="myapp"
)

redis_db = RedisMindtraceODM(
    model_cls=RedisUser,
    redis_url="redis://localhost:6379"
)

Multi-Model Support

All ODMs now support managing multiple document types in a single instance. This allows you to work with related models (e.g., User and Address) through a single ODM instance with attribute-based access.

Usage Pattern

# Instead of creating separate ODMs for each model:
user_db = MongoMindtraceODM(model_cls=User, ...)
address_db = MongoMindtraceODM(model_cls=Address, ...)

# Use multi-model mode:
db = MongoMindtraceODM(
    models={'user': User, 'address': Address},
    db_uri="mongodb://localhost:27017",
    db_name="myapp"
)

# Access models via attributes
await db.user.insert(user)
await db.address.insert(address)
users = await db.user.all()
addresses = await db.address.all()

Benefits:

  • Shared Connection - All models share the same database connection
  • Unified Initialization - All models initialize together
  • Cleaner Code - Single ODM instance instead of multiple
  • Consistent API - Same operations work across all models

Note: In multi-model mode, you must use attribute-based access (db.user.insert()). Direct methods (db.insert()) will raise a ValueError to prevent ambiguity.

Available ODMs

1. UnifiedMindtraceODM (Recommended)

The flagship ODM that provides a unified interface for multiple databases:

Key Features:

  • Single Interface: One API for all databases
  • Runtime Switching: Change databases without code changes
  • Automatic Model Generation: Converts unified models to database-specific formats
  • Flexible Configuration: Use one or multiple databases

Configuration Options:

# Option 1: Single unified model
db = UnifiedMindtraceODM(
    unified_model_cls=MyUnifiedDoc,
    mongo_db_uri="mongodb://localhost:27017",
    mongo_db_name="mydb",
    redis_url="redis://localhost:6379",
    preferred_backend=BackendType.MONGO
)

# Option 2: Multiple unified models (multi-model mode)
db = UnifiedMindtraceODM(
    unified_models={'user': User, 'address': Address},
    mongo_db_uri="mongodb://localhost:27017",
    mongo_db_name="mydb",
    redis_url="redis://localhost:6379",
    preferred_backend=BackendType.MONGO
)
# Access via: db.user, db.address

# Option 3: Separate models
db = UnifiedMindtraceODM(
    mongo_model_cls=MyMongoDoc,
    redis_model_cls=MyRedisDoc,
    mongo_db_uri="mongodb://localhost:27017",
    mongo_db_name="mydb",
    redis_url="redis://localhost:6379",
    preferred_backend=BackendType.REDIS
)

# Option 4: Single database
db = UnifiedMindtraceODM(
    unified_model_cls=MyUnifiedDoc,
    mongo_db_uri="mongodb://localhost:27017",
    mongo_db_name="mydb",
    preferred_backend=BackendType.MONGO
)

2. MongoMindtraceODM

Specialized MongoDB ODM using Beanie. Natively async, but supports sync interface too

Single Model Mode

from mindtrace.database import MongoMindtraceODM, MindtraceDocument

class User(MindtraceDocument):
    name: str
    email: str
    
    class Settings:
        name = "users"
        use_cache = False

db = MongoMindtraceODM(
    model_cls=User,
    db_uri="mongodb://localhost:27017",
    db_name="myapp"
)

# Async operations (native)
user = await db.insert(User(name="Alice", email="alice@example.com"))
user.age = 31
updated_user = await db.update(user)
all_users = await db.all()

# Sync operations (wrapper methods - use from sync code)
user = db.insert_sync(User(name="Bob", email="bob@example.com"))
user.age = 32
updated_user = db.update_sync(user)
all_users = db.all_sync()

# Supports MongoDB-specific features
pipeline = [{"$match": {"age": {"$gte": 18}}}]
results = await db.aggregate(pipeline)

Multi-Model Mode

from mindtrace.database import MongoMindtraceODM, MindtraceDocument

class Address(MindtraceDocument):
    street: str
    city: str
    
    class Settings:
        name = "addresses"
        use_cache = False

class User(MindtraceDocument):
    name: str
    email: str
    
    class Settings:
        name = "users"
        use_cache = False

# Create ODM with multiple models
db = MongoMindtraceODM(
    models={'user': User, 'address': Address},
    db_uri="mongodb://localhost:27017",
    db_name="myapp"
)

# Access models via attribute-based access
address = await db.address.insert(Address(street="123 Main St", city="NYC"))
user = await db.user.insert(User(name="Alice", email="alice@example.com"))

# All operations work per model
users = await db.user.all()
addresses = await db.address.all()

Working with Linked Documents (fetch_links)

MongoDB supports linking documents using Beanie's Link type. Use fetch_links=True to automatically fetch linked documents:

from mindtrace.database import Link, MongoMindtraceODM, MindtraceDocument
from typing import Optional

class Address(MindtraceDocument):
    street: str
    city: str
    
    class Settings:
        name = "addresses"
        use_cache = False

class User(MindtraceDocument):
    name: str
    email: str
    address: Optional[Link[Address]] = None
    
    class Settings:
        name = "users"
        use_cache = False

db = MongoMindtraceODM(
    models={'user': User, 'address': Address},
    db_uri="mongodb://localhost:27017",
    db_name="myapp"
)

# Create linked documents
address = await db.address.insert(Address(street="123 Main St", city="NYC"))
user = await db.user.insert(User(name="Alice", email="alice@example.com", address=address))

# Fetch with linked documents using fetch_links=True
user_with_address = await db.user.get(user.id, fetch_links=True)
print(f"User: {user_with_address.name}, Address: {user_with_address.address.street}")

# Find with linked documents
users = await db.user.find(User.name == "Alice", fetch_links=True)
for u in users:
    if u.address:
        print(f"{u.name} lives at {u.address.street}")

# Without fetch_links, address will be a Link object (not fetched)
user_without_links = await db.user.get(user.id)
# user_without_links.address is a Link object, not the actual Address document

3. RedisMindtraceODM

High-performance Redis ODM with JSON support. Natively sync, but supports async interface too

Single Model Mode

from mindtrace.database import RedisMindtraceODM, MindtraceRedisDocument
from redis_om import Field

class User(MindtraceRedisDocument):
    name: str = Field(index=True)
    email: str = Field(index=True)
    age: int = Field(index=True)
    
    class Meta:
        global_key_prefix = "myapp"

db = RedisMindtraceODM(
    model_cls=User,
    redis_url="redis://localhost:6379"
)

# Sync operations (native)
user = db.insert(User(name="Alice", email="alice@example.com"))
user.age = 31
updated_user = db.update(user)
all_users = db.all()

# Async operations (wrapper methods - use from async code)
user = await db.insert_async(User(name="Bob", email="bob@example.com"))
user.age = 32
updated_user = await db.update_async(user)
all_users = await db.all_async()

# Supports Redis-specific queries
users = db.find(User.age >= 18)

Multi-Model Mode

from mindtrace.database import RedisMindtraceODM, MindtraceRedisDocument
from redis_om import Field

class Address(MindtraceRedisDocument):
    street: str = Field(index=True)
    city: str = Field(index=True)
    
    class Meta:
        global_key_prefix = "myapp"

class User(MindtraceRedisDocument):
    name: str = Field(index=True)
    email: str = Field(index=True)
    address_id: Optional[str] = None
    
    class Meta:
        global_key_prefix = "myapp"

# Create ODM with multiple models
db = RedisMindtraceODM(
    models={'user': User, 'address': Address},
    redis_url="redis://localhost:6379"
)

# Access models via attribute-based access
address = db.address.insert(Address(street="123 Main St", city="NYC"))
user = db.user.insert(User(name="Alice", email="alice@example.com", address_id=address.id))

# All operations work per model
users = db.user.all()
addresses = await db.address.all_async()

4. RegistryMindtraceODM

Flexible ODM using the Mindtrace Registry system, supporting local storage, GCP, and other storage options:

Single Model Mode

from mindtrace.database import RegistryMindtraceODM, DocumentNotFoundError
from mindtrace.registry import Registry, Archiver
from typing import Any, Type
from pydantic import BaseModel
from pathlib import Path

class User(BaseModel):
    name: str
    email: str

class UserArchiver(Archiver):
    def save(self, user: User):
        with open(Path(self.uri) / "user.json", "w") as f:
            f.write(user.model_dump_json())

    def load(self, data_type: Type[Any]) -> User:
        with open(Path(self.uri) / "user.json", "r") as f:
            return User.model_validate_json(f.read())

Registry.register_default_materializer(User, UserArchiver)

db = RegistryMindtraceODM(model_cls=User)

user = User(name="John Doe", email="john.doe@example.com")
inserted_user = db.insert(user)

# Update the user
inserted_user.name = "John Smith"
updated_user = db.update(inserted_user)

# Retrieve by ID (raises DocumentNotFoundError if not found)
try:
    user = db.get(inserted_user.id)
except DocumentNotFoundError:
    print("User not found")

Multi-Model Mode

from mindtrace.database import RegistryMindtraceODM
from mindtrace.registry import Registry, Archiver
from typing import Any, Type
from pydantic import BaseModel

class User(BaseModel):
    name: str
    email: str

class Address(BaseModel):
    street: str
    city: str

# Register materializers for both models
Registry.register_default_materializer(User, UserArchiver)
Registry.register_default_materializer(Address, AddressArchiver)

# Create ODM with multiple models
db = RegistryMindtraceODM(
    models={'user': User, 'address': Address}
)

# Access models via attribute-based access
address = db.address.insert(Address(street="123 Main St", city="NYC"))
user = db.user.insert(User(name="John Doe", email="john@example.com"))

# All operations work per model
users = db.user.all()
addresses = db.address.all()

With GCP Storage:

from mindtrace.database import RegistryMindtraceODM
from mindtrace.registry import Registry, GCPRegistryBackend, Archiver
from typing import Any, Type
from pydantic import BaseModel
from pathlib import Path

class User(BaseModel):
    name: str
    email: str

class UserArchiver(Archiver):
    def save(self, user: User):
        with open(Path(self.uri) / "user.json", "w") as f:
            f.write(user.model_dump_json())

    def load(self, data_type: Type[Any]) -> User:
        with open(Path(self.uri) / "user.json", "r") as f:
            return User.model_validate_json(f.read())

Registry.register_default_materializer(User, UserArchiver)

gcp_registry_backend = GCPRegistryBackend(
    uri="gs://my-bucket",
    project_id="my-project",
    bucket_name="my-bucket",
)

db = RegistryMindtraceODM(model_cls=User, backend=gcp_registry_backend)

user = User(name="John Doe", email="john.doe@example.com")
inserted_user = db.insert(user)

# Update the user
inserted_user.name = "John Smith"
updated_user = db.update(inserted_user)

# Retrieve by ID
user = db.get(inserted_user.id)

API Reference

Core Operations

All ODMs support both sync and async interfaces for all operations. Choose the style that fits your codebase.

Async Operations (Recommended for async code)

# Insert a document
inserted_doc = await db.insert_async(doc)

# Get document by ID
doc = await db.get_async("doc_id")

# Get document with linked documents (MongoDB only)
doc_with_links = await db.get_async("doc_id", fetch_links=True)

# Update document
doc.name = "Updated Name"
updated_doc = await db.update_async(doc)

# Delete document
await db.delete_async("doc_id")

# Get all documents
all_docs = await db.all_async()

# Find documents with filters
results = await db.find_async({"name": "Alice"})

# Find documents with linked documents (MongoDB only)
results_with_links = await db.find_async({"name": "Alice"}, fetch_links=True)

Sync Operations (Works with both MongoDB and Redis)

# Insert a document
inserted_doc = db.insert(doc)

# Get document by ID
doc = db.get("doc_id")

# Update document
doc.name = "Updated Name"
updated_doc = db.update(doc)

# Delete document
db.delete("doc_id")

# Get all documents
all_docs = db.all()

# Find documents with filters
results = db.find({"name": "Alice"})

Note:

  • MongoDB: Sync methods use wrapper functions that run async code in an event loop
  • Redis: Async methods run sync operations in a thread pool to avoid blocking the event loop
  • Unified ODM: Automatically routes to the appropriate method based on the active database
  • Document IDs: All backends provide a consistent id attribute on returned documents (MongoDB uses id, Redis uses pk internally but exposes it as id)

Sync/Async Compatibility

Both MongoDB and Redis ODMs support both interfaces:

Database Native Interface Wrapper Interface
MongoDB Async (insert, get, etc.) Sync (insert_sync, get_sync, etc.)
Redis Sync (insert, get, etc.) Async (insert_async, get_async, etc.)

This means you can:

  • Use sync code with MongoDB (via sync wrappers)
  • Use async code with Redis (via async wrappers)
  • Mix and match based on your needs

UnifiedMindtraceODM Specific

Additional methods for the unified ODM:

# Database management
db.switch_backend(BackendType.REDIS)
current_type = db.get_current_backend_type()
is_async = db.is_async()

# Database availability
has_mongo = db.has_mongo_backend()
has_redis = db.has_redis_backend()

# Direct access to underlying ODMs
mongo_odm = db.get_mongo_backend()
redis_odm = db.get_redis_backend()

# Model access
raw_model = db.get_raw_model()
unified_model = db.get_unified_model()

Advanced Querying

MongoDB (through UnifiedMindtraceODM)

from mindtrace.database import Link

# MongoDB-style queries
users = await db.find_async({"age": {"$gte": 18}})
users = await db.find_async({"skills": {"$in": ["Python", "JavaScript"]}})

# Fetch linked documents using fetch_links=True
user = await db.get_async(user_id, fetch_links=True)
if user.address:
    print(f"User lives at {user.address.street}")

# Find with linked documents
users = await db.find_async({"name": "Alice"}, fetch_links=True)
for u in users:
    if u.address:
        print(f"{u.name} - {u.address.city}")

# Using Beanie expressions with links (get raw model first)
UserMongo = db.get_raw_model()
users = await db.find_async(UserMongo.name == "Alice", fetch_links=True)

# Aggregation pipelines (when using MongoDB)
if db.get_current_backend_type() == BackendType.MONGO:
    pipeline = [
        {"$match": {"age": {"$gte": 18}}},
        {"$group": {"_id": "$department", "count": {"$sum": 1}}}
    ]
    results = await db.get_mongo_backend().aggregate(pipeline)

Redis (through UnifiedMindtraceODM)

# Switch to Redis for these queries
db.switch_backend(BackendType.REDIS)

# Redis OM expressions
Model = db.get_raw_model()
users = db.find(Model.age >= 18)
users = db.find(Model.name == "Alice")
users = db.find(Model.skills << "Python")  # Contains

Initialization Options

By default, ODMs auto-initialize on first operation. For more control, use constructor parameters:

from mindtrace.database import InitMode

db = UnifiedMindtraceODM(
    unified_model_cls=User,
    mongo_db_uri="mongodb://localhost:27017",
    mongo_db_name="myapp",
    redis_url="redis://localhost:6379",
    preferred_backend=BackendType.MONGO,
    auto_init=True,              # Initialize at creation time
    init_mode=InitMode.SYNC,     # Use sync initialization
)

InitMode options:

  • InitMode.SYNC - Synchronous initialization (blocks until complete)
  • InitMode.ASYNC - Deferred initialization (completes on first async operation)

Default behavior:

  • MongoDB defaults to InitMode.ASYNC
  • Redis defaults to InitMode.SYNC

Error Handling

The module provides comprehensive error handling with consistent exceptions across all backends:

from mindtrace.database import DocumentNotFoundError, DuplicateInsertError

try:
    user = await db.get_async("non_existent_id")
except DocumentNotFoundError as e:
    print(f"User not found: {e}")

try:
    await db.insert_async(duplicate_user)
except DuplicateInsertError as e:
    print(f"User already exists: {e}")

# All backends raise DocumentNotFoundError (not KeyError) for consistency
try:
    user = db.get("missing_id")
except DocumentNotFoundError:
    print("Document not found")

# Multi-model mode errors
try:
    db.insert(user)  # In multi-model mode, this raises ValueError
except ValueError as e:
    print(f"Use db.model_name.insert() instead: {e}")
from mindtrace.database import DocumentNotFoundError, DuplicateInsertError

try:
    user = await db.get_async("non_existent_id")
except DocumentNotFoundError as e:
    print(f"User not found: {e}")

try:
    await db.insert_async(duplicate_user)
except DuplicateInsertError as e:
    print(f"User already exists: {e}")

Testing

The database module includes comprehensive test coverage with both unit and integration tests.

Test Structure

tests/
├── unit/mindtrace/database/          # Unit tests (no DB required)
│   ├── test_mongo_unit.py
│   ├── test_redis_unit.py
│   ├── test_registry_odm_backend.py
│   └── test_unified_unit.py
└── integration/mindtrace/database/   # Integration tests (DB required)
    ├── test_mongo.py
    ├── test_redis_odm.py
    ├── test_registry_odm.py
    └── test_unified.py

Running Tests

Quick Start - All Tests

ds test: database

Unit Tests Only

ds test: database --unit

Integration Tests (Containers managed by test script)

ds test: database --integration

Targeted Testing

# Test only unified backend
ds test: tests/integration/mindtrace/database/test_unified.py

# Test only MongoDB
ds test: tests/integration/mindtrace/database/test_mongo.py

# Test only Redis
ds test: tests/integration/mindtrace/database/test_redis_odm.py

Test Coverage

The test suite covers:

  • CRUD Operations - Create, Read, Update, Delete
  • Query Operations - Find, filter, search
  • Error Handling - All exception scenarios
  • Database Switching - Dynamic database changes
  • Async/Sync Compatibility - Both programming styles
  • Model Conversion - Unified to database-specific models
  • Edge Cases - Duplicate keys, missing documents, invalid queries

Examples

Complete Example: User Management System

import asyncio
from mindtrace.database import (
    UnifiedMindtraceODM,
    UnifiedMindtraceDocument,
    BackendType,
    DocumentNotFoundError
)
from pydantic import Field
from typing import List

class User(UnifiedMindtraceDocument):
    name: str = Field(description="Full name")
    email: str = Field(description="Email address")  
    age: int = Field(ge=0, le=150, description="Age")
    department: str = Field(description="Department")
    skills: List[str] = Field(default_factory=list)
    
    class Meta:
        collection_name = "employees"
        global_key_prefix = "company"
        indexed_fields = ["email", "department", "skills"]
        unique_fields = ["email"]

async def main():
    # Setup with both MongoDB and Redis
    db = UnifiedMindtraceODM(
        unified_model_cls=User,
        mongo_db_uri="mongodb://localhost:27017",
        mongo_db_name="company",
        redis_url="redis://localhost:6379",
        preferred_backend=BackendType.MONGO
    )
    
    # Create some users
    users = [
        User(
            name="Alice Johnson",
            email="alice@company.com",
            age=30,
            department="Engineering",
            skills=["Python", "MongoDB", "Docker"]
        ),
        User(
            name="Bob Smith", 
            email="bob@company.com",
            age=25,
            department="Engineering",
            skills=["JavaScript", "Redis", "React"]
        ),
        User(
            name="Carol Davis",
            email="carol@company.com", 
            age=35,
            department="Marketing",
            skills=["Analytics", "SQL"]
        )
    ]
    
    # Insert users
    print("Creating users...")
    for user in users:
        try:
            inserted = await db.insert_async(user)
            print(f"Created: {inserted.name} (ID: {inserted.id})")
        except Exception as e:
            print(f"Failed to create {user.name}: {e}")
    
    # Find engineers
    print("\nFinding engineers...")
    engineers = await db.find_async({"department": "Engineering"})
    for eng in engineers:
        print(f"{eng.name} - Skills: {', '.join(eng.skills)}")
    
    # Switch to Redis for fast lookups
    print("\nSwitching to Redis for fast operations...")
    db.switch_backend(BackendType.REDIS)
    
    # Insert more users in Redis (both sync and async work)
    redis_user = User(
        name="Dave Wilson",
        email="dave@company.com",
        age=28,
        department="DevOps",
        skills=["Kubernetes", "Redis", "Monitoring"]
    )
    
    # Use sync interface (native for Redis)
    redis_inserted = db.insert(redis_user)
    print(f"Redis user created (sync): {redis_inserted.name}")
    
    # Or use async interface (wrapper for Redis)
    redis_user2 = User(
        name="Eve Brown",
        email="eve@company.com",
        age=32,
        department="DevOps",
        skills=["Docker", "CI/CD"]
    )
    redis_inserted2 = await db.insert_async(redis_user2)
    print(f"Redis user created (async): {redis_inserted2.name}")
    
    # Demonstrate data isolation
    print(f"\nMongoDB users: {len(await db.get_mongo_backend().all())}")
    print(f"Redis users: {len(db.get_redis_backend().all())}")
    
    # Switch back to MongoDB
    db.switch_backend(BackendType.MONGO)
    print(f"Back to MongoDB - Users: {len(await db.all_async())}")

if __name__ == "__main__":
    asyncio.run(main())

More Examples

Check out the samples/database/ directory for additional examples:

  • using_unified_backend.py - Comprehensive unified ODM usage
  • Advanced querying patterns
  • Database switching strategies
  • Error handling best practices

Best Practices

1. Model Design

# Good: Clear, descriptive models
class Product(UnifiedMindtraceDocument):
    name: str = Field(description="Product name", min_length=1)
    price: float = Field(ge=0, description="Price in USD")
    category: str = Field(description="Product category")
    
    class Meta:
        collection_name = "products"
        indexed_fields = ["category", "name"]
        unique_fields = ["name"]

# Avoid: Unclear models without validation
class Product(UnifiedMindtraceDocument):
    n: str
    p: float
    c: str

2. Error Handling

# Always handle database exceptions
try:
    user = await db.get_async(user_id)
    print(f"Found user: {user.name}")
except DocumentNotFoundError:
    print("User not found - creating new user")
    user = await db.insert_async(User(name="New User", email="new@example.com"))
except Exception as e:
    logger.error(f"Database error: {e}")
    # Handle appropriately

3. Database Selection

# Choose database based on use case
if high_frequency_reads:
    db.switch_backend(BackendType.REDIS)  # Fast reads
else:
    db.switch_backend(BackendType.MONGO)  # Complex queries

Contributing

When adding new features:

  1. Add tests - Both unit and integration tests
  2. Update documentation - Keep README and docstrings current
  3. Follow patterns - Use existing code style and patterns
  4. Test thoroughly - Run the full test suite

Requirements

  • MongoDB 4.4+ (for MongoMindtraceODM)
  • Redis 6.0+ (for RedisMindtraceODM)
  • Core dependencies: pydantic, beanie, redis-om-python

Need Help?

  • Check the samples/database/ directory for working examples
  • Look at the test files for usage patterns
  • Review the docstrings in the source code for detailed API documentation

The Mindtrace Database Module makes it easy to work with multiple databases through a single, powerful interface.


Breaking Changes

Class Name Changes (v0.6.0)

All ODM class names have been simplified by removing the "Backend" suffix:

Old Name New Name
MindtraceODMBackend MindtraceODM
MongoMindtraceODMBackend MongoMindtraceODM
RedisMindtraceODMBackend RedisMindtraceODM
RegistryMindtraceODMBackend RegistryMindtraceODM
UnifiedMindtraceODMBackend UnifiedMindtraceODM

File names also updated:

Old File New File
mindtrace_odm_backend.py mindtrace_odm.py
mongo_odm_backend.py mongo_odm.py
redis_odm_backend.py redis_odm.py
registry_odm_backend.py registry_odm.py
unified_odm_backend.py unified_odm.py

Migration:

# Old
from mindtrace.database import MongoMindtraceODMBackend, UnifiedMindtraceODMBackend
db = MongoMindtraceODMBackend(model_cls=User, db_uri="...", db_name="...")

# New
from mindtrace.database import MongoMindtraceODM, UnifiedMindtraceODM
db = MongoMindtraceODM(model_cls=User, db_uri="...", db_name="...")

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mindtrace_database-0.11.0.tar.gz (58.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mindtrace_database-0.11.0-py3-none-any.whl (50.1 kB view details)

Uploaded Python 3

File details

Details for the file mindtrace_database-0.11.0.tar.gz.

File metadata

  • Download URL: mindtrace_database-0.11.0.tar.gz
  • Upload date:
  • Size: 58.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.8 {"installer":{"name":"uv","version":"0.11.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for mindtrace_database-0.11.0.tar.gz
Algorithm Hash digest
SHA256 0f35e4dbd08099c53222306a30711ea8c5136eca9049951c70248b3254a98933
MD5 19fe9fcb64a18d02b36ce0eb06bd5770
BLAKE2b-256 c2908e4349e6169411890173bde0ce6228fa40ed21f83a51412f96ce97e7b700

See more details on using hashes here.

File details

Details for the file mindtrace_database-0.11.0-py3-none-any.whl.

File metadata

  • Download URL: mindtrace_database-0.11.0-py3-none-any.whl
  • Upload date:
  • Size: 50.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.8 {"installer":{"name":"uv","version":"0.11.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for mindtrace_database-0.11.0-py3-none-any.whl
Algorithm Hash digest
SHA256 70b5a5949885fe7b7ff6fc923f2708809a80b588da3ab78aa3976b886ad43ccd
MD5 113a9b78589e390682ddd5299439aee1
BLAKE2b-256 5fbd8cd0195fe22cd2c4dd54a31eb11e30f4651e73523e29f05ec5945934aac0

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page