Database functionality for Mindtrace
Mindtrace Database Module
A powerful, flexible Object-Document Mapping (ODM) system that provides a unified interface for working with multiple database backends in the Mindtrace project. Write once, run on MongoDB, Redis, or both.
Key Features
- Unified Interface - One API for multiple databases
- Multi-Model Support - Manage multiple document types in a single ODM instance
- Dynamic Switching - Switch between MongoDB and Redis at runtime
- Simplified Document Models - Define once, use everywhere
- Full Async/Sync Support - Both MongoDB and Redis support sync and async interfaces
- Seamless Interface Compatibility - Use sync code with async databases and vice versa
- Advanced Querying - Rich query capabilities across all databases
- Comprehensive Error Handling - Clear, actionable error messages
- Full Test Coverage - Thoroughly tested with unit and integration tests
Quick Start
The Simple Way: Unified Documents
Define your document model once and use it with any database:
from mindtrace.database import UnifiedMindtraceDocument, UnifiedMindtraceODM, BackendType
from pydantic import Field
# 1. Define your document model (works with both MongoDB and Redis)
class User(UnifiedMindtraceDocument):
    name: str = Field(description="User's full name")
    age: int = Field(ge=0, description="User's age")
    email: str = Field(description="User's email address")
    skills: list[str] = Field(default_factory=list)
    class Meta:
        collection_name = "users"
        global_key_prefix = "myapp"
        indexed_fields = ["email", "name"]
        unique_fields = ["email"]
# 2. Create ODM instance (supports both MongoDB and Redis)
db = UnifiedMindtraceODM(
    unified_model_cls=User,
    mongo_db_uri="mongodb://localhost:27017",
    mongo_db_name="myapp",
    redis_url="redis://localhost:6379",
    preferred_backend=BackendType.MONGO,  # Start with MongoDB
    # init_mode=InitMode.ASYNC,  # Both use ASYNC (or SYNC)
    # If None, MongoDB defaults to ASYNC and Redis defaults to SYNC
)
# 3. Use it (Same API regardless of database - both sync and async work)
user = User(name="Alice", age=30, email="alice@example.com", skills=["Python"])
# Async operations (work with both MongoDB and Redis)
inserted_user = await db.insert_async(user)
retrieved_user = await db.get_async(inserted_user.id)
retrieved_user.age = 31
updated_user = await db.update_async(retrieved_user)
python_users = await db.find_async({"skills": "Python"})
all_users = await db.all_async()
# Sync operations (also work with both MongoDB and Redis)
inserted_user = db.insert(user)
retrieved_user = db.get(inserted_user.id)
retrieved_user.age = 31
updated_user = db.update(retrieved_user)
python_users = db.find({"skills": "Python"})
all_users = db.all()
# Switch databases on the fly
db.switch_backend(BackendType.REDIS)
redis_user = db.insert(user) # Now using Redis (sync)
# or
redis_user = await db.insert_async(user) # Redis with async interface
# Multi-model mode (works with all ODMs)
# Address is assumed to be a second UnifiedMindtraceDocument with street and city fields
db = UnifiedMindtraceODM(
    unified_models={'user': User, 'address': Address},
    mongo_db_uri="mongodb://localhost:27017",
    mongo_db_name="myapp",
    redis_url="redis://localhost:6379"
)
# Access models via attributes
address = await db.address.insert_async(Address(street="123 Main St", city="NYC"))
user = await db.user.insert_async(User(name="Alice", age=30, email="alice@example.com"))
# All operations work per model
users = await db.user.all_async()
addresses = await db.address.all_async()
Traditional Way: Database-Specific Models
If you prefer more control, you can define database-specific models:
from mindtrace.database import (
    MongoMindtraceODM,
    RedisMindtraceODM,
    MindtraceDocument,
    MindtraceRedisDocument
)
from beanie import Indexed
from redis_om import Field as RedisField
from typing import Annotated
# MongoDB model
class MongoUser(MindtraceDocument):
    name: str
    email: Annotated[str, Indexed(unique=True)]
    age: int
    class Settings:
        name = "users"
# Redis model
class RedisUser(MindtraceRedisDocument):
    name: str = RedisField(index=True)
    email: str = RedisField(index=True)
    age: int = RedisField(index=True)
    class Meta:
        global_key_prefix = "myapp"
# Use them separately
mongo_db = MongoMindtraceODM(
    model_cls=MongoUser,
    db_uri="mongodb://localhost:27017",
    db_name="myapp"
)
redis_db = RedisMindtraceODM(
    model_cls=RedisUser,
    redis_url="redis://localhost:6379"
)
Multi-Model Support
All ODMs now support managing multiple document types in a single instance. This allows you to work with related models (e.g., User and Address) through a single ODM instance with attribute-based access.
Usage Pattern
# Instead of creating separate ODMs for each model:
user_db = MongoMindtraceODM(model_cls=User, ...)
address_db = MongoMindtraceODM(model_cls=Address, ...)
# Use multi-model mode:
db = MongoMindtraceODM(
models={'user': User, 'address': Address},
db_uri="mongodb://localhost:27017",
db_name="myapp"
)
# Access models via attributes
await db.user.insert(user)
await db.address.insert(address)
users = await db.user.all()
addresses = await db.address.all()
Benefits:
- Shared Connection - All models share the same database connection
- Unified Initialization - All models initialize together
- Cleaner Code - Single ODM instance instead of multiple
- Consistent API - Same operations work across all models
Note: In multi-model mode, you must use attribute-based access (db.user.insert()). Direct methods (db.insert()) will raise a ValueError to prevent ambiguity.
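The attribute-based access rule can be pictured with a small stand-alone sketch. It does not use the Mindtrace classes: TinyMultiModelStore and _ModelHandle are hypothetical names, and an in-memory dict stands in for the database. It only illustrates why a direct call is ambiguous once several models share one instance.

```python
from itertools import count


class _ModelHandle:
    """Per-model view over a shared in-memory store (hypothetical helper)."""

    def __init__(self, name, storage, ids):
        self._name = name
        self._storage = storage  # shared dict: (model name, id) -> document
        self._ids = ids          # shared id counter, standing in for a shared connection

    def insert(self, doc):
        doc_id = next(self._ids)
        self._storage[(self._name, doc_id)] = doc
        return doc_id

    def all(self):
        return [d for (name, _), d in self._storage.items() if name == self._name]


class TinyMultiModelStore:
    """Illustrative only: mimics attribute-based access in multi-model mode."""

    def __init__(self, models):
        storage, ids = {}, count(1)
        self._handles = {name: _ModelHandle(name, storage, ids) for name in models}

    def __getattr__(self, name):
        # Invoked only when normal lookup fails, e.g. for store.user
        handles = self.__dict__.get("_handles", {})
        if name in handles:
            return handles[name]
        raise AttributeError(name)

    def insert(self, doc):
        # With several models registered, the target collection is ambiguous.
        raise ValueError("multi-model mode: use store.<model_name>.insert(doc)")


store = TinyMultiModelStore(models={"user": dict, "address": dict})
store.user.insert({"name": "Alice"})
store.address.insert({"city": "NYC"})
```

Each handle sees only its own documents, while the underlying storage is shared, which is the trade-off the benefits list above describes.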
Available ODMs
1. UnifiedMindtraceODM (Recommended)
The flagship ODM that provides a unified interface for multiple databases:
Key Features:
- Single Interface: One API for all databases
- Runtime Switching: Change databases without code changes
- Automatic Model Generation: Converts unified models to database-specific formats
- Flexible Configuration: Use one or multiple databases
Configuration Options:
# Option 1: Single unified model
db = UnifiedMindtraceODM(
unified_model_cls=MyUnifiedDoc,
mongo_db_uri="mongodb://localhost:27017",
mongo_db_name="mydb",
redis_url="redis://localhost:6379",
preferred_backend=BackendType.MONGO
)
# Option 2: Multiple unified models (multi-model mode)
db = UnifiedMindtraceODM(
unified_models={'user': User, 'address': Address},
mongo_db_uri="mongodb://localhost:27017",
mongo_db_name="mydb",
redis_url="redis://localhost:6379",
preferred_backend=BackendType.MONGO
)
# Access via: db.user, db.address
# Option 3: Separate models
db = UnifiedMindtraceODM(
mongo_model_cls=MyMongoDoc,
redis_model_cls=MyRedisDoc,
mongo_db_uri="mongodb://localhost:27017",
mongo_db_name="mydb",
redis_url="redis://localhost:6379",
preferred_backend=BackendType.REDIS
)
# Option 4: Single database
db = UnifiedMindtraceODM(
unified_model_cls=MyUnifiedDoc,
mongo_db_uri="mongodb://localhost:27017",
mongo_db_name="mydb",
preferred_backend=BackendType.MONGO
)
2. MongoMindtraceODM
Specialized MongoDB ODM built on Beanie. It is natively async and also exposes a sync interface through wrapper methods.
Single Model Mode
from mindtrace.database import MongoMindtraceODM, MindtraceDocument
class User(MindtraceDocument):
    name: str
    email: str
    age: int = 0  # declared so the age updates below are valid
    class Settings:
        name = "users"
        use_cache = False
db = MongoMindtraceODM(
    model_cls=User,
    db_uri="mongodb://localhost:27017",
    db_name="myapp"
)
# Async operations (native)
user = await db.insert(User(name="Alice", email="alice@example.com"))
user.age = 31
updated_user = await db.update(user)
all_users = await db.all()
# Sync operations (wrapper methods - use from sync code)
user = db.insert_sync(User(name="Bob", email="bob@example.com"))
user.age = 32
updated_user = db.update_sync(user)
all_users = db.all_sync()
# Supports MongoDB-specific features
pipeline = [{"$match": {"age": {"$gte": 18}}}]
results = await db.aggregate(pipeline)
Multi-Model Mode
from mindtrace.database import MongoMindtraceODM, MindtraceDocument
class Address(MindtraceDocument):
    street: str
    city: str
    class Settings:
        name = "addresses"
        use_cache = False
class User(MindtraceDocument):
    name: str
    email: str
    class Settings:
        name = "users"
        use_cache = False
# Create ODM with multiple models
db = MongoMindtraceODM(
    models={'user': User, 'address': Address},
    db_uri="mongodb://localhost:27017",
    db_name="myapp"
)
# Access models via attribute-based access
address = await db.address.insert(Address(street="123 Main St", city="NYC"))
user = await db.user.insert(User(name="Alice", email="alice@example.com"))
# All operations work per model
users = await db.user.all()
addresses = await db.address.all()
Working with Linked Documents (fetch_links)
MongoDB supports linking documents using Beanie's Link type. Use fetch_links=True to automatically fetch linked documents:
from mindtrace.database import Link, MongoMindtraceODM, MindtraceDocument
from typing import Optional
class Address(MindtraceDocument):
    street: str
    city: str
    class Settings:
        name = "addresses"
        use_cache = False
class User(MindtraceDocument):
    name: str
    email: str
    address: Optional[Link[Address]] = None
    class Settings:
        name = "users"
        use_cache = False
db = MongoMindtraceODM(
    models={'user': User, 'address': Address},
    db_uri="mongodb://localhost:27017",
    db_name="myapp"
)
# Create linked documents
address = await db.address.insert(Address(street="123 Main St", city="NYC"))
user = await db.user.insert(User(name="Alice", email="alice@example.com", address=address))
# Fetch with linked documents using fetch_links=True
user_with_address = await db.user.get(user.id, fetch_links=True)
print(f"User: {user_with_address.name}, Address: {user_with_address.address.street}")
# Find with linked documents
users = await db.user.find(User.name == "Alice", fetch_links=True)
for u in users:
    if u.address:
        print(f"{u.name} lives at {u.address.street}")
# Without fetch_links, address will be a Link object (not fetched)
user_without_links = await db.user.get(user.id)
# user_without_links.address is a Link object, not the actual Address document
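The difference between a stored link and a fetched document can be sketched without a database. The example below is a simplified model, not Beanie's implementation: Ref, COLLECTIONS, and get_user are hypothetical names, and the frozen dataclasses stand in for document classes.

```python
from dataclasses import dataclass, replace
from typing import Optional, Union


@dataclass(frozen=True)
class Ref:
    """Hypothetical unresolved reference: a collection name plus an id."""
    collection: str
    doc_id: str


@dataclass(frozen=True)
class Address:
    street: str


@dataclass(frozen=True)
class User:
    name: str
    address: Optional[Union[Ref, Address]] = None


# In-memory stand-in for two collections.
COLLECTIONS = {
    "addresses": {"a1": Address(street="123 Main St")},
    "users": {"u1": User(name="Alice", address=Ref("addresses", "a1"))},
}


def get_user(user_id, fetch_links=False):
    user = COLLECTIONS["users"][user_id]
    if fetch_links and isinstance(user.address, Ref):
        # Second lookup: swap the reference for the document it points to.
        ref = user.address
        user = replace(user, address=COLLECTIONS[ref.collection][ref.doc_id])
    return user


plain = get_user("u1")                       # address is still a Ref
resolved = get_user("u1", fetch_links=True)  # address is an Address
```

The sketch makes the cost visible: resolving links means at least one extra lookup per linked field, so it is reasonable to request it only when the linked data is actually needed.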
3. RedisMindtraceODM
High-performance Redis ODM with JSON support. It is natively sync and also exposes an async interface through wrapper methods.
Single Model Mode
from mindtrace.database import RedisMindtraceODM, MindtraceRedisDocument
from redis_om import Field
class User(MindtraceRedisDocument):
    name: str = Field(index=True)
    email: str = Field(index=True)
    age: int = Field(index=True)
    class Meta:
        global_key_prefix = "myapp"
db = RedisMindtraceODM(
    model_cls=User,
    redis_url="redis://localhost:6379"
)
# Sync operations (native)
# age is a required field on this model, so it is passed on insert
user = db.insert(User(name="Alice", email="alice@example.com", age=30))
user.age = 31
updated_user = db.update(user)
all_users = db.all()
# Async operations (wrapper methods - use from async code)
user = await db.insert_async(User(name="Bob", email="bob@example.com", age=31))
user.age = 32
updated_user = await db.update_async(user)
all_users = await db.all_async()
# Supports Redis-specific queries
users = db.find(User.age >= 18)
Multi-Model Mode
from mindtrace.database import RedisMindtraceODM, MindtraceRedisDocument
from redis_om import Field
from typing import Optional  # needed for the address_id annotation below
class Address(MindtraceRedisDocument):
    street: str = Field(index=True)
    city: str = Field(index=True)
    class Meta:
        global_key_prefix = "myapp"
class User(MindtraceRedisDocument):
    name: str = Field(index=True)
    email: str = Field(index=True)
    address_id: Optional[str] = None
    class Meta:
        global_key_prefix = "myapp"
# Create ODM with multiple models
db = RedisMindtraceODM(
    models={'user': User, 'address': Address},
    redis_url="redis://localhost:6379"
)
# Access models via attribute-based access
address = db.address.insert(Address(street="123 Main St", city="NYC"))
user = db.user.insert(User(name="Alice", email="alice@example.com", address_id=address.id))
# All operations work per model
users = db.user.all()
addresses = await db.address.all_async()
4. RegistryMindtraceODM
Flexible ODM using the Mindtrace Registry system, supporting local storage, GCP, and other storage options:
Single Model Mode
from mindtrace.database import RegistryMindtraceODM, DocumentNotFoundError
from mindtrace.registry import Registry, Archiver
from typing import Any, Type
from pydantic import BaseModel
from pathlib import Path
class User(BaseModel):
    name: str
    email: str
class UserArchiver(Archiver):
    def save(self, user: User):
        with open(Path(self.uri) / "user.json", "w") as f:
            f.write(user.model_dump_json())
    def load(self, data_type: Type[Any]) -> User:
        with open(Path(self.uri) / "user.json", "r") as f:
            return User.model_validate_json(f.read())
Registry.register_default_materializer(User, UserArchiver)
db = RegistryMindtraceODM(model_cls=User)
user = User(name="John Doe", email="john.doe@example.com")
inserted_user = db.insert(user)
# Update the user
inserted_user.name = "John Smith"
updated_user = db.update(inserted_user)
# Retrieve by ID (raises DocumentNotFoundError if not found)
try:
    user = db.get(inserted_user.id)
except DocumentNotFoundError:
    print("User not found")
Multi-Model Mode
from mindtrace.database import RegistryMindtraceODM
from mindtrace.registry import Registry, Archiver
from typing import Any, Type
from pydantic import BaseModel
class User(BaseModel):
    name: str
    email: str
class Address(BaseModel):
    street: str
    city: str
# Register materializers for both models
# (UserArchiver is defined in the single-model example; AddressArchiver is assumed to follow the same pattern)
Registry.register_default_materializer(User, UserArchiver)
Registry.register_default_materializer(Address, AddressArchiver)
# Create ODM with multiple models
db = RegistryMindtraceODM(
    models={'user': User, 'address': Address}
)
# Access models via attribute-based access
address = db.address.insert(Address(street="123 Main St", city="NYC"))
user = db.user.insert(User(name="John Doe", email="john@example.com"))
# All operations work per model
users = db.user.all()
addresses = db.address.all()
With GCP Storage:
from mindtrace.database import RegistryMindtraceODM
from mindtrace.registry import Registry, GCPRegistryBackend, Archiver
from typing import Any, Type
from pydantic import BaseModel
from pathlib import Path
class User(BaseModel):
    name: str
    email: str
class UserArchiver(Archiver):
    def save(self, user: User):
        with open(Path(self.uri) / "user.json", "w") as f:
            f.write(user.model_dump_json())
    def load(self, data_type: Type[Any]) -> User:
        with open(Path(self.uri) / "user.json", "r") as f:
            return User.model_validate_json(f.read())
Registry.register_default_materializer(User, UserArchiver)
gcp_registry_backend = GCPRegistryBackend(
    uri="gs://my-bucket",
    project_id="my-project",
    bucket_name="my-bucket",
)
db = RegistryMindtraceODM(model_cls=User, backend=gcp_registry_backend)
user = User(name="John Doe", email="john.doe@example.com")
inserted_user = db.insert(user)
# Update the user
inserted_user.name = "John Smith"
updated_user = db.update(inserted_user)
# Retrieve by ID
user = db.get(inserted_user.id)
API Reference
Core Operations
All ODMs support both sync and async interfaces for all operations. Choose the style that fits your codebase.
Async Operations (Recommended for async code)
# Insert a document
inserted_doc = await db.insert_async(doc)
# Get document by ID
doc = await db.get_async("doc_id")
# Get document with linked documents (MongoDB only)
doc_with_links = await db.get_async("doc_id", fetch_links=True)
# Update document
doc.name = "Updated Name"
updated_doc = await db.update_async(doc)
# Delete document
await db.delete_async("doc_id")
# Get all documents
all_docs = await db.all_async()
# Find documents with filters
results = await db.find_async({"name": "Alice"})
# Find documents with linked documents (MongoDB only)
results_with_links = await db.find_async({"name": "Alice"}, fetch_links=True)
Sync Operations (Works with both MongoDB and Redis)
# Insert a document
inserted_doc = db.insert(doc)
# Get document by ID
doc = db.get("doc_id")
# Update document
doc.name = "Updated Name"
updated_doc = db.update(doc)
# Delete document
db.delete("doc_id")
# Get all documents
all_docs = db.all()
# Find documents with filters
results = db.find({"name": "Alice"})
Note:
- MongoDB: Sync methods use wrapper functions that run async code in an event loop
- Redis: Async methods run sync operations in a thread pool to avoid blocking the event loop
- Unified ODM: Automatically routes to the appropriate method based on the active database
- Document IDs: All backends provide a consistent id attribute on returned documents (MongoDB uses id, Redis uses pk internally but exposes it as id)
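The two wrapper strategies named in the note can be sketched with the standard library alone. This is not the module's source: the four function names are hypothetical stand-ins, and the sketch only assumes what the note states, namely that sync wrappers drive async code in an event loop and async wrappers push sync code onto a worker thread.

```python
import asyncio


async def insert_native_async(doc):
    """Stand-in for a natively async call (the MongoDB case)."""
    await asyncio.sleep(0)
    return {**doc, "id": "a1"}


def insert_native_sync(doc):
    """Stand-in for a natively blocking call (the Redis case)."""
    return {**doc, "id": "s1"}


def insert_sync_wrapper(doc):
    # Sync wrapper over async code: run the coroutine to completion in an
    # event loop. asyncio.run() cannot be called from an already running loop.
    return asyncio.run(insert_native_async(doc))


async def insert_async_wrapper(doc):
    # Async wrapper over sync code: hand the blocking call to a worker
    # thread so the event loop stays responsive.
    return await asyncio.to_thread(insert_native_sync, doc)


sync_result = insert_sync_wrapper({"name": "Alice"})
async_result = asyncio.run(insert_async_wrapper({"name": "Bob"}))
```

One practical consequence of this pattern: a sync wrapper built on asyncio.run() is meant for sync callers, so code that is already inside an event loop should call the async method directly.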
Sync/Async Compatibility
Both MongoDB and Redis ODMs support both interfaces:
| Database | Native Interface | Wrapper Interface |
|---|---|---|
| MongoDB | Async (insert, get, etc.) | Sync (insert_sync, get_sync, etc.) |
| Redis | Sync (insert, get, etc.) | Async (insert_async, get_async, etc.) |
This means you can:
- Use sync code with MongoDB (via sync wrappers)
- Use async code with Redis (via async wrappers)
- Mix and match based on your needs
UnifiedMindtraceODM Specific
Additional methods for the unified ODM:
# Database management
db.switch_backend(BackendType.REDIS)
current_type = db.get_current_backend_type()
is_async = db.is_async()
# Database availability
has_mongo = db.has_mongo_backend()
has_redis = db.has_redis_backend()
# Direct access to underlying ODMs
mongo_odm = db.get_mongo_backend()
redis_odm = db.get_redis_backend()
# Model access
raw_model = db.get_raw_model()
unified_model = db.get_unified_model()
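As a rough mental model of the backend-management methods, the sketch below shows a facade that keeps one independent store per backend and routes calls to whichever is active. Backend and TinyUnifiedStore are hypothetical names, plain dicts replace the real databases, and the assumption that switching never copies data between backends follows the "Demonstrate data isolation" step in the complete example of this README.

```python
from enum import Enum


class Backend(Enum):  # hypothetical stand-in for BackendType
    MONGO = "mongo"
    REDIS = "redis"


class TinyUnifiedStore:
    """Illustrative facade: one API routed to several independent stores."""

    def __init__(self, preferred=Backend.MONGO):
        self._stores = {Backend.MONGO: {}, Backend.REDIS: {}}
        self._current = preferred

    def switch_backend(self, backend):
        if backend not in self._stores:
            raise ValueError(f"backend not configured: {backend}")
        self._current = backend

    def get_current_backend_type(self):
        return self._current

    def insert(self, doc_id, doc):
        # Writes go only to the active store.
        self._stores[self._current][doc_id] = doc
        return doc

    def all(self):
        return list(self._stores[self._current].values())


db = TinyUnifiedStore()
db.insert("1", {"name": "Alice"})
db.switch_backend(Backend.REDIS)
db.insert("2", {"name": "Dave"})
redis_docs = db.all()   # only the document written while Redis was active
db.switch_backend(Backend.MONGO)
mongo_docs = db.all()   # only the document written while MongoDB was active
```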
Advanced Querying
MongoDB (through UnifiedMindtraceODM)
from mindtrace.database import Link
# MongoDB-style queries
users = await db.find_async({"age": {"$gte": 18}})
users = await db.find_async({"skills": {"$in": ["Python", "JavaScript"]}})
# Fetch linked documents using fetch_links=True
user = await db.get_async(user_id, fetch_links=True)
if user.address:
    print(f"User lives at {user.address.street}")
# Find with linked documents
users = await db.find_async({"name": "Alice"}, fetch_links=True)
for u in users:
    if u.address:
        print(f"{u.name} - {u.address.city}")
# Using Beanie expressions with links (get raw model first)
UserMongo = db.get_raw_model()
users = await db.find_async(UserMongo.name == "Alice", fetch_links=True)
# Aggregation pipelines (when using MongoDB)
if db.get_current_backend_type() == BackendType.MONGO:
    pipeline = [
        {"$match": {"age": {"$gte": 18}}},
        {"$group": {"_id": "$department", "count": {"$sum": 1}}}
    ]
    results = await db.get_mongo_backend().aggregate(pipeline)
Redis (through UnifiedMindtraceODM)
# Switch to Redis for these queries
db.switch_backend(BackendType.REDIS)
# Redis OM expressions
Model = db.get_raw_model()
users = db.find(Model.age >= 18)
users = db.find(Model.name == "Alice")
users = db.find(Model.skills << "Python") # Contains
Initialization Options
By default, ODMs auto-initialize on first operation. For more control, use constructor parameters:
from mindtrace.database import InitMode
db = UnifiedMindtraceODM(
unified_model_cls=User,
mongo_db_uri="mongodb://localhost:27017",
mongo_db_name="myapp",
redis_url="redis://localhost:6379",
preferred_backend=BackendType.MONGO,
auto_init=True, # Initialize at creation time
init_mode=InitMode.SYNC, # Use sync initialization
)
InitMode options:
- InitMode.SYNC - Synchronous initialization (blocks until complete)
- InitMode.ASYNC - Deferred initialization (completes on first async operation)
Default behavior:
- MongoDB defaults to InitMode.ASYNC
- Redis defaults to InitMode.SYNC
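The distinction between eager and deferred initialization can be illustrated with a toy client. Mode and LazyClient are hypothetical names, and the sketch simplifies the deferred path to a plain method call rather than an async operation; it is not how the ODMs are implemented.

```python
from enum import Enum


class Mode(Enum):  # hypothetical stand-in for InitMode
    SYNC = "sync"
    ASYNC = "async"


class LazyClient:
    """Illustrative only: eager vs. deferred initialization."""

    def __init__(self, auto_init=False, init_mode=Mode.ASYNC):
        self.initialized = False
        # Eager setup happens only when blocking in the constructor is allowed.
        if auto_init and init_mode is Mode.SYNC:
            self._initialize()

    def _initialize(self):
        # A real client would open connections and create indexes here.
        self.initialized = True

    def insert(self, doc):
        if not self.initialized:  # deferred path: set up on first use
            self._initialize()
        return doc


eager = LazyClient(auto_init=True, init_mode=Mode.SYNC)
lazy = LazyClient()
state_before = lazy.initialized   # nothing has been set up yet
lazy.insert({"name": "Alice"})    # first operation triggers setup
```

Eager initialization surfaces connection problems at construction time, while the deferred form keeps construction cheap and postpones any failure to the first operation.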
Error Handling
The module provides comprehensive error handling with consistent exceptions across all backends:
from mindtrace.database import DocumentNotFoundError, DuplicateInsertError
try:
    user = await db.get_async("non_existent_id")
except DocumentNotFoundError as e:
    print(f"User not found: {e}")
try:
    await db.insert_async(duplicate_user)
except DuplicateInsertError as e:
    print(f"User already exists: {e}")
# All backends raise DocumentNotFoundError (not KeyError) for consistency
try:
    user = db.get("missing_id")
except DocumentNotFoundError:
    print("Document not found")
# Multi-model mode errors
try:
    db.insert(user)  # In multi-model mode, this raises ValueError
except ValueError as e:
    print(f"Use db.model_name.insert() instead: {e}")
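One common way to get consistent exceptions across backends is to translate each backend's native error at the boundary. The sketch below shows that pattern under stated assumptions: DictBackend is a hypothetical backend whose native miss is a KeyError, and DocumentNotFoundError is redefined locally so the example runs without the package installed.

```python
class DocumentNotFoundError(Exception):
    """Local stand-in; the real class is imported from mindtrace.database."""


class DictBackend:
    """Hypothetical backend whose native 'not found' signal is a KeyError."""

    def __init__(self):
        self._docs = {"u1": {"name": "Alice"}}

    def get(self, doc_id):
        try:
            return self._docs[doc_id]
        except KeyError as exc:
            # Translate the backend-specific error into the shared one,
            # keeping the original as __cause__ for debugging.
            raise DocumentNotFoundError(f"no document with id {doc_id!r}") from exc


backend = DictBackend()
found = backend.get("u1")
try:
    backend.get("missing_id")
    outcome = "found"
except DocumentNotFoundError as err:
    outcome = str(err)
```

Callers then need a single except clause regardless of which backend is active.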
Testing
The database module includes comprehensive test coverage with both unit and integration tests.
Test Structure
tests/
├── unit/mindtrace/database/ # Unit tests (no DB required)
│ ├── test_mongo_unit.py
│ ├── test_redis_unit.py
│ ├── test_registry_odm_backend.py
│ └── test_unified_unit.py
└── integration/mindtrace/database/ # Integration tests (DB required)
    ├── test_mongo.py
    ├── test_redis_odm.py
    ├── test_registry_odm.py
    └── test_unified.py
Running Tests
Quick Start - All Tests
ds test: database
Unit Tests Only
ds test: database --unit
Integration Tests (Containers managed by test script)
ds test: database --integration
Targeted Testing
# Test only unified backend
ds test: tests/integration/mindtrace/database/test_unified.py
# Test only MongoDB
ds test: tests/integration/mindtrace/database/test_mongo.py
# Test only Redis
ds test: tests/integration/mindtrace/database/test_redis_odm.py
Test Coverage
The test suite covers:
- CRUD Operations - Create, Read, Update, Delete
- Query Operations - Find, filter, search
- Error Handling - All exception scenarios
- Database Switching - Dynamic database changes
- Async/Sync Compatibility - Both programming styles
- Model Conversion - Unified to database-specific models
- Edge Cases - Duplicate keys, missing documents, invalid queries
Examples
Complete Example: User Management System
import asyncio
from mindtrace.database import (
    UnifiedMindtraceODM,
    UnifiedMindtraceDocument,
    BackendType,
    DocumentNotFoundError
)
from pydantic import Field
from typing import List
class User(UnifiedMindtraceDocument):
    name: str = Field(description="Full name")
    email: str = Field(description="Email address")
    age: int = Field(ge=0, le=150, description="Age")
    department: str = Field(description="Department")
    skills: List[str] = Field(default_factory=list)
    class Meta:
        collection_name = "employees"
        global_key_prefix = "company"
        indexed_fields = ["email", "department", "skills"]
        unique_fields = ["email"]
async def main():
    # Setup with both MongoDB and Redis
    db = UnifiedMindtraceODM(
        unified_model_cls=User,
        mongo_db_uri="mongodb://localhost:27017",
        mongo_db_name="company",
        redis_url="redis://localhost:6379",
        preferred_backend=BackendType.MONGO
    )
    # Create some users
    users = [
        User(
            name="Alice Johnson",
            email="alice@company.com",
            age=30,
            department="Engineering",
            skills=["Python", "MongoDB", "Docker"]
        ),
        User(
            name="Bob Smith",
            email="bob@company.com",
            age=25,
            department="Engineering",
            skills=["JavaScript", "Redis", "React"]
        ),
        User(
            name="Carol Davis",
            email="carol@company.com",
            age=35,
            department="Marketing",
            skills=["Analytics", "SQL"]
        )
    ]
    # Insert users
    print("Creating users...")
    for user in users:
        try:
            inserted = await db.insert_async(user)
            print(f"Created: {inserted.name} (ID: {inserted.id})")
        except Exception as e:
            print(f"Failed to create {user.name}: {e}")
    # Find engineers
    print("\nFinding engineers...")
    engineers = await db.find_async({"department": "Engineering"})
    for eng in engineers:
        print(f"{eng.name} - Skills: {', '.join(eng.skills)}")
    # Switch to Redis for fast lookups
    print("\nSwitching to Redis for fast operations...")
    db.switch_backend(BackendType.REDIS)
    # Insert more users in Redis (both sync and async work)
    redis_user = User(
        name="Dave Wilson",
        email="dave@company.com",
        age=28,
        department="DevOps",
        skills=["Kubernetes", "Redis", "Monitoring"]
    )
    # Use sync interface (native for Redis)
    redis_inserted = db.insert(redis_user)
    print(f"Redis user created (sync): {redis_inserted.name}")
    # Or use async interface (wrapper for Redis)
    redis_user2 = User(
        name="Eve Brown",
        email="eve@company.com",
        age=32,
        department="DevOps",
        skills=["Docker", "CI/CD"]
    )
    redis_inserted2 = await db.insert_async(redis_user2)
    print(f"Redis user created (async): {redis_inserted2.name}")
    # Demonstrate data isolation
    print(f"\nMongoDB users: {len(await db.get_mongo_backend().all())}")
    print(f"Redis users: {len(db.get_redis_backend().all())}")
    # Switch back to MongoDB
    db.switch_backend(BackendType.MONGO)
    print(f"Back to MongoDB - Users: {len(await db.all_async())}")
if __name__ == "__main__":
    asyncio.run(main())
More Examples
Check out the samples/database/ directory for additional examples:
- using_unified_backend.py - Comprehensive unified ODM usage
- Advanced querying patterns
- Database switching strategies
- Error handling best practices
Best Practices
1. Model Design
# Good: Clear, descriptive models
class Product(UnifiedMindtraceDocument):
    name: str = Field(description="Product name", min_length=1)
    price: float = Field(ge=0, description="Price in USD")
    category: str = Field(description="Product category")
    class Meta:
        collection_name = "products"
        indexed_fields = ["category", "name"]
        unique_fields = ["name"]
# Avoid: Unclear models without validation
class Product(UnifiedMindtraceDocument):
    n: str
    p: float
    c: str
2. Error Handling
# Always handle database exceptions
try:
    user = await db.get_async(user_id)
    print(f"Found user: {user.name}")
except DocumentNotFoundError:
    print("User not found - creating new user")
    user = await db.insert_async(User(name="New User", email="new@example.com"))
except Exception as e:
    logger.error(f"Database error: {e}")
    # Handle appropriately
3. Database Selection
# Choose database based on use case
if high_frequency_reads:
    db.switch_backend(BackendType.REDIS)  # Fast reads
else:
    db.switch_backend(BackendType.MONGO)  # Complex queries
Contributing
When adding new features:
- Add tests - Both unit and integration tests
- Update documentation - Keep README and docstrings current
- Follow patterns - Use existing code style and patterns
- Test thoroughly - Run the full test suite
Requirements
- MongoDB 4.4+ (for MongoMindtraceODM)
- Redis 6.0+ (for RedisMindtraceODM)
- Core dependencies: pydantic, beanie, redis-om-python
Need Help?
- Check the samples/database/ directory for working examples
- Look at the test files for usage patterns
- Review the docstrings in the source code for detailed API documentation
The Mindtrace Database Module makes it easy to work with multiple databases through a single, powerful interface.
Breaking Changes
Class Name Changes (v0.6.0)
All ODM class names have been simplified by removing the "Backend" suffix:
| Old Name | New Name |
|---|---|
| MindtraceODMBackend | MindtraceODM |
| MongoMindtraceODMBackend | MongoMindtraceODM |
| RedisMindtraceODMBackend | RedisMindtraceODM |
| RegistryMindtraceODMBackend | RegistryMindtraceODM |
| UnifiedMindtraceODMBackend | UnifiedMindtraceODM |
File names also updated:
| Old File | New File |
|---|---|
| mindtrace_odm_backend.py | mindtrace_odm.py |
| mongo_odm_backend.py | mongo_odm.py |
| redis_odm_backend.py | redis_odm.py |
| registry_odm_backend.py | registry_odm.py |
| unified_odm_backend.py | unified_odm.py |
Migration:
# Old
from mindtrace.database import MongoMindtraceODMBackend, UnifiedMindtraceODMBackend
db = MongoMindtraceODMBackend(model_cls=User, db_uri="...", db_name="...")
# New
from mindtrace.database import MongoMindtraceODM, UnifiedMindtraceODM
db = MongoMindtraceODM(model_cls=User, db_uri="...", db_name="...")
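Code that has to run against both pre-0.6.0 and newer releases during a migration could resolve the class by either name. The helper below is a hypothetical convenience, not part of the package; import_renamed is an illustrative name, and the fake legacy_db module exists only so the sketch runs without Mindtrace installed.

```python
import importlib
import sys
import types


def import_renamed(module_name, new_name, old_name):
    """Return module.new_name, falling back to module.old_name if absent."""
    module = importlib.import_module(module_name)
    try:
        return getattr(module, new_name)
    except AttributeError:
        return getattr(module, old_name)


# Demonstration with a fake module that only knows the old class name.
legacy = types.ModuleType("legacy_db")
legacy.MongoMindtraceODMBackend = type("MongoMindtraceODMBackend", (), {})
sys.modules["legacy_db"] = legacy

odm_cls = import_renamed("legacy_db", "MongoMindtraceODM", "MongoMindtraceODMBackend")
```

In a real project the call would name mindtrace.database as the module, with the new and old names taken from the table above. Once every environment is on the new release, the plain import is simpler and the helper can be dropped.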