Vention Storage

A framework for storing and managing component and application data with persistence, validation, and audit trails for machine applications.

✨ Features

  • Persistent storage with SQLite
  • Automatic audit trails (who, when, what changed)
  • Strong typing & validation via SQLModel
  • Lifecycle hooks before/after insert, update, delete
  • Soft delete with deleted_at fields
  • ConnectRPC bundle generation with Create, Read, Update, Delete + audit
  • Health & monitoring actions (audit log, schema diagram)
  • Batch operations for insert/delete
  • Session management with smart reuse & transactions
  • Bootstrap system for one-command setup
  • Opt-in SQLite auto-migrations for safe additive schema changes
  • CSV export/import for backups and migration
  • Database backup/restore with integrity checking

🧠 Concepts & Overview

Vention Storage is a component-based persistence layer for machine apps:

  • Database → SQLite database with managed sessions and transactions
  • ModelAccessor → Strongly-typed Create, Read, Update, Delete interface for your SQLModel classes
  • Hooks → Functions that run before/after insert, update, and delete operations
  • AuditLog → Automatically records all data mutations
  • RpcBundle → Auto-generated ConnectRPC bundle with Create, Read, Update, Delete + database management actions

⚙️ Installation & Setup

pip install vention-storage

Optional dependencies:

  • sqlalchemy-schemadisplay and Graphviz → enable database schema visualization

macOS:

brew install graphviz
pip install sqlalchemy-schemadisplay

Linux (Debian/Ubuntu):

sudo apt-get install graphviz
pip install sqlalchemy-schemadisplay

🚀 Quickstart Tutorial

Define a model, bootstrap storage, and get full Create, Read, Update, Delete RPC actions in minutes:

from datetime import datetime
from typing import Optional
from sqlmodel import Field, SQLModel
from communication.app import VentionApp
from storage.bootstrap import bootstrap
from storage.accessor import ModelAccessor
from storage.vention_communication import build_storage_rpc_bundle

class User(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    email: str
    deleted_at: Optional[datetime] = Field(default=None, index=True)

# Initialize database
bootstrap(
    database_url="sqlite:///./my_app.db",
    auto_migrate=True,
)

# Create accessor
user_accessor = ModelAccessor(User, "users")

# Build RPC bundle and add to app
app = VentionApp(name="my-app")
storage_bundle = build_storage_rpc_bundle(
    accessors=[user_accessor],
    max_records_per_model=100,
    enable_db_actions=True
)
app.register_rpc_plugin(storage_bundle)
app.finalize()

➡️ You now have CRUD, audit, backup, and CSV actions available via ConnectRPC, plus opt-in schema migrations applied at startup.

auto_migrate=True is intentionally conservative. It supports creating missing tables, adding safe columns, backfilling scalar defaults, and creating missing indexes. Destructive or ambiguous changes still require a manual migration.
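
To make "safe additive change" concrete, here is a minimal sketch using only the standard-library sqlite3 module. It is not the library's migration code; the helper name add_missing_columns and the role column are hypothetical, chosen only to show why adding a column with a scalar default is safe for existing rows.

```python
import sqlite3


def add_missing_columns(conn: sqlite3.Connection, table: str, wanted: dict) -> list:
    """Add every column in `wanted` (name -> SQL type/default clause) that the table lacks."""
    # PRAGMA table_info yields one row per column; index 1 is the column name
    existing = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    added = []
    for name, ddl in wanted.items():
        if name not in existing:
            conn.execute(f"ALTER TABLE {table} ADD COLUMN {name} {ddl}")
            added.append(name)
    return added


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
conn.execute("INSERT INTO users (name) VALUES ('Alice')")

# "name" already exists and is skipped; "role" is new and gets a scalar default
added = add_missing_columns(
    conn,
    "users",
    {"name": "TEXT NOT NULL", "role": "TEXT NOT NULL DEFAULT 'operator'"},
)
row = conn.execute("SELECT name, role FROM users").fetchone()
print(added, row)
```

Existing rows pick up the default value, so no data is lost or reinterpreted. A change that cannot be expressed this way (dropping or retyping a column, for example) falls outside the additive category.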

🛠 How-to Guides

Bootstrap Multiple Models

# user_accessor was created earlier in the Quickstart example
# Reuse it here to bootstrap multiple models at once

class Product(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    sku: str

product_accessor = ModelAccessor(Product, "products")

# Build bundle with multiple accessors
storage_bundle = build_storage_rpc_bundle(
    accessors=[user_accessor, product_accessor],
    max_records_per_model=100,
    enable_db_actions=True
)
app.register_rpc_plugin(storage_bundle)

Export to CSV

# Using ConnectRPC client
from communication.client import ConnectClient

client = ConnectClient("http://localhost:8000")

# `await` is only valid inside a coroutine; run with asyncio.run(export_zip())
async def export_zip() -> None:
    response = await client.call("Database_ExportZip", {})
    with open("backup.zip", "wb") as f:
        f.write(response.data)
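
Once the archive is on disk, it can be inspected with the standard-library zipfile module. The sketch below assumes the export contains one CSV file per model; the member names used here (users.csv, products.csv) are hypothetical stand-ins, and the archive is built in memory so the example runs without a server.

```python
import io
import zipfile


def list_csv_members(zip_bytes: bytes) -> list:
    """Return the names of the CSV files inside a ZIP archive, sorted alphabetically."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        return sorted(n for n in archive.namelist() if n.lower().endswith(".csv"))


# Stand-in archive; in practice, read the bytes from backup.zip instead
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("users.csv", "id,name,email\n1,Alice,alice@example.com\n")
    archive.writestr("products.csv", "id,name,sku\n")
    archive.writestr("README.txt", "not a csv")

members = list_csv_members(buffer.getvalue())
print(members)
```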

Backup & Restore

# Both coroutines reuse `client` from the export example above.

# Backup
async def backup_database() -> None:
    backup_response = await client.call("Database_BackupSqlite", {})
    with open(backup_response.filename, "wb") as f:
        f.write(backup_response.data)

# Restore
async def restore_database():
    with open("backup.sqlite", "rb") as f:
        payload = f.read()
    return await client.call(
        "Database_RestoreSqlite",
        {
            "bytes": payload,
            "filename": "backup.sqlite",
            "integrity_check": True,
            "dry_run": False
        }
    )
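
Before uploading a backup, it can help to check the file locally. The following sketch uses SQLite's own PRAGMA integrity_check through the standard-library sqlite3 module. Whether the server-side integrity_check option runs the same pragma is an assumption; the helper name sqlite_file_is_healthy is hypothetical.

```python
import os
import sqlite3
import tempfile


def sqlite_file_is_healthy(path: str) -> bool:
    """Return True if SQLite's built-in integrity check reports 'ok' for the file."""
    conn = sqlite3.connect(path)
    try:
        result = conn.execute("PRAGMA integrity_check").fetchone()
        return result is not None and result[0] == "ok"
    except sqlite3.DatabaseError:
        # Raised when the file is not a valid SQLite database at all
        return False
    finally:
        conn.close()


with tempfile.TemporaryDirectory() as tmp:
    # A small but valid database file
    good = os.path.join(tmp, "backup.sqlite")
    conn = sqlite3.connect(good)
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
    conn.commit()
    conn.close()

    # A file that only pretends to be a database
    bad = os.path.join(tmp, "broken.sqlite")
    with open(bad, "wb") as f:
        f.write(b"not a database " * 512)

    good_ok = sqlite_file_is_healthy(good)
    bad_ok = sqlite_file_is_healthy(bad)

print(good_ok, bad_ok)
```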

Use Lifecycle Hooks

@user_accessor.before_insert()
def validate_email(session, instance):
    if "@" not in instance.email:
        raise ValueError("Invalid email")

@user_accessor.after_insert()
def log_creation(session, instance):
    print(f"User created: {instance.name}")
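
Because a hook is an ordinary function taking (session, instance), its body can be written once and exercised without a database. The sketch below is one possible stricter email check; the pattern and the name check_email are illustrative. Registering it for updates assumes before_update hooks receive the same two arguments as before_insert hooks, which the examples above do not confirm.

```python
import re
from types import SimpleNamespace

# Deliberately loose pattern: something@something.something, no whitespace
EMAIL_PATTERN = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def check_email(session, instance):
    """Hook body: reject records whose email does not look like user@host.tld."""
    if not EMAIL_PATTERN.match(instance.email):
        raise ValueError(f"Invalid email: {instance.email!r}")


# Registration, commented out so the sketch runs without the library:
# user_accessor.before_insert()(check_email)
# user_accessor.before_update()(check_email)  # assumes the same hook signature

# Exercise the hook body directly with stand-in objects
check_email(None, SimpleNamespace(email="alice@example.com"))  # passes silently
try:
    check_email(None, SimpleNamespace(email="alice-at-example"))
    rejected = False
except ValueError:
    rejected = True
print(rejected)
```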

Query Audit Logs

from storage.auditor import AuditLog
from storage import database
from sqlmodel import select

with database.transaction() as session:
    logs = session.exec(select(AuditLog).where(AuditLog.component == "users")).all()

Using the Model Accessors

# Create
user = user_accessor.insert(User(name="Alice", email="alice@example.com"), actor="admin")

# Read
user = user_accessor.get(user.id)

# Update
user.name = "Alice Smith"
user_accessor.save(user, actor="admin")

# Delete
user_accessor.delete(user.id, actor="admin")

# Restore (for soft-deleted models)
user_accessor.restore(user.id, actor="admin")

# Find users by exact match
users = user_accessor.find(user_accessor.where.email == "alice@example.com")

# Multiple conditions (AND logic)
users = user_accessor.find(
    user_accessor.where.name == "Alice",
    user_accessor.where.email == "alice@example.com"
)

# Comparison operators
recent_users = user_accessor.find(user_accessor.where.id > 10)
first_users = user_accessor.find(user_accessor.where.id <= 5)

# String operations
smiths = user_accessor.find(user_accessor.where.name.contains("Smith"))
gmail_users = user_accessor.find(user_accessor.where.email.endswith("@gmail.com"))
search = user_accessor.find(user_accessor.where.name.ilike("%alice%"))  # case-insensitive

# Collection check
selected_users = user_accessor.find(
    user_accessor.where.email.in_(["alice@example.com", "bob@example.com"])
)

# Null checks
active_users = user_accessor.find(user_accessor.where.deleted_at.is_(None))
deleted_users = user_accessor.find(user_accessor.where.deleted_at.isnot(None))

# With pagination and sorting
page = user_accessor.find(
    user_accessor.where.email.contains("@example.com"),
    limit=10,
    offset=20,
    order_by="id",
    order_desc=True
)

# Include soft-deleted records
all_users = user_accessor.find(
    user_accessor.where.email.contains("@example.com"),
    include_deleted=True
)

Using ConnectRPC Client

Once the bundle is added to your VentionApp, each ModelAccessor automatically exposes full CRUD actions via ConnectRPC.

Example: interacting with the Users RPC actions.

import { createPromiseClient } from "@connectrpc/connect";
import { createConnectTransport } from "@connectrpc/connect-web";

const transport = createConnectTransport({
  baseUrl: "http://localhost:8000",
});

const client = createPromiseClient(YourServiceClient, transport);

// Create
export async function createUser(name: string, email: string) {
  const res = await client.usersCreateRecord({
    record: { name, email },
    actor: "operator"
  });
  return res.record;
}

// Read
export async function getUser(id: number) {
  const res = await client.usersGetRecord({
    recordId: id,
    includeDeleted: false
  });
  return res.record;
}

// Update
export async function updateUser(id: number, name: string) {
  const res = await client.usersUpdateRecord({
    recordId: id,
    record: { name },
    actor: "operator"
  });
  return res.record;
}

// Delete (soft delete if model supports deleted_at)
export async function deleteUser(id: number) {
  await client.usersDeleteRecord({
    recordId: id,
    actor: "operator"
  });
}

// Restore
export async function restoreUser(id: number) {
  const res = await client.usersRestoreRecord({
    recordId: id,
    actor: "operator"
  });
  return res.record;
}

// List
export async function listUsers() {
  const res = await client.usersListRecords({
    includeDeleted: false
  });
  return res.records;
}

// Find by exact match
export async function findUserByEmail(email: string) {
  const res = await client.usersFindRecords({
    filters: [
      { field: "email", operation: "eq", value: email }
    ]
  });
  return res.records;
}

// Find with multiple conditions (AND logic)
export async function findNamedUser(name: string, email: string) {
  const res = await client.usersFindRecords({
    filters: [
      { field: "name", operation: "eq", value: name },
      { field: "email", operation: "eq", value: email }
    ]
  });
  return res.records;
}

// Find with null checks
export async function findDeletedUsers() {
  const res = await client.usersFindRecords({
    filters: [
      { field: "deleted_at", operation: "is_not_null" }
    ]
  });
  return res.records;
}

// Complex query example
export async function findRecentUsers(minId: string) {
  const res = await client.usersFindRecords({
    filters: [
      { field: "email", operation: "contains", value: "@example.com" },
      { field: "id", operation: "gte", value: minId },
      { field: "deleted_at", operation: "is_null" }
    ],
    limit: 50,
    orderBy: "id",
    orderDesc: true
  });
  return res.records;
}

Filter Operations Reference

Operation     Description
eq            Exact match
ne            Not equal to value
gt            Greater than value
gte           Greater than or equal
lt            Less than value
lte           Less than or equal
in            Value in array
not_in        Value not in array
contains      Field contains substring
startswith    Field starts with prefix
endswith      Field ends with suffix
like          Case-insensitive pattern match
is_null       Field is null (no value needed)
is_not_null   Field is not null (no value needed)
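
The semantics of these operations can be illustrated with a small in-memory evaluator. This is a sketch for reasoning about filters, not the server's implementation: the function names are hypothetical, filters are combined with AND as in the examples above, and treating comparisons against a null field as non-matching (as SQL does) is an assumption.

```python
import re
from typing import Any


def like_to_regex(pattern: str):
    """Translate a SQL LIKE pattern (% and _) into a case-insensitive regex."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")
        elif ch == "_":
            parts.append(".")
        else:
            parts.append(re.escape(ch))
    return re.compile("".join(parts), re.IGNORECASE | re.DOTALL)


def matches(record: dict, field: str, operation: str, value: Any = None) -> bool:
    """Evaluate one filter against one record."""
    actual = record.get(field)
    if operation == "is_null":
        return actual is None
    if operation == "is_not_null":
        return actual is not None
    if actual is None:
        return False  # assumption: mirrors SQL, where comparing with NULL never matches
    ops = {
        "eq": lambda: actual == value,
        "ne": lambda: actual != value,
        "gt": lambda: actual > value,
        "gte": lambda: actual >= value,
        "lt": lambda: actual < value,
        "lte": lambda: actual <= value,
        "in": lambda: actual in value,
        "not_in": lambda: actual not in value,
        "contains": lambda: value in actual,
        "startswith": lambda: actual.startswith(value),
        "endswith": lambda: actual.endswith(value),
        "like": lambda: like_to_regex(value).fullmatch(actual) is not None,
    }
    return ops[operation]()


def find(records: list, filters: list) -> list:
    """Return the records that satisfy every filter (AND logic)."""
    return [
        r for r in records
        if all(matches(r, f["field"], f["operation"], f.get("value")) for f in filters)
    ]


users = [
    {"id": 1, "name": "Alice", "email": "alice@example.com", "deleted_at": None},
    {"id": 2, "name": "Bob", "email": "bob@gmail.com", "deleted_at": "2024-01-01"},
    {"id": 3, "name": "Alice Smith", "email": "asmith@example.com", "deleted_at": None},
]

result = find(users, [
    {"field": "email", "operation": "contains", "value": "@example.com"},
    {"field": "id", "operation": "gte", "value": 2},
    {"field": "deleted_at", "operation": "is_null"},
])
like_hits = find(users, [{"field": "name", "operation": "like", "value": "%alice%"}])
print([u["id"] for u in result], [u["id"] for u in like_hits])
```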

📖 API Reference

bootstrap

def bootstrap(
    *,
    database_url: Optional[str] = None,
    auto_migrate: bool = False,
) -> None

Initialize the database engine and ensure the schema exists.

When auto_migrate=False, bootstrap() creates all tables from SQLModel.metadata.

When auto_migrate=True, bootstrap() plans and applies supported SQLite schema migrations using SQLModel.metadata. All models must be imported before calling bootstrap() so they are present in SQLModel.metadata.

build_storage_rpc_bundle

def build_storage_rpc_bundle(
    *,
    accessors: Sequence[ModelAccessor[Any]],
    max_records_per_model: Optional[int] = 5,
    enable_db_actions: bool = True,
) -> RpcBundle

Build a ConnectRPC RpcBundle exposing CRUD and database utilities. Returns an RpcBundle that can be registered on a VentionApp using app.register_rpc_plugin().

ModelAccessor

ModelAccessor(
    model: Type[ModelType],
    component_name: str,
    *,
    enable_auditing: bool = True,
)

Read

  • get(id, include_deleted=False) -> Optional[ModelType]
  • all(include_deleted=False) -> List[ModelType]

Write

  • insert(obj, actor="internal") -> ModelType
  • save(obj, actor="internal") -> ModelType
  • delete(id, actor="internal") -> bool
  • restore(id, actor="internal") -> bool

Batch

  • insert_many(objs, actor="internal") -> List[ModelType]
  • delete_many(ids, actor="internal") -> int

Hooks

  • @accessor.before_insert()
  • @accessor.after_insert()
  • @accessor.before_update()
  • @accessor.after_update()
  • @accessor.before_delete()
  • @accessor.after_delete()

Parameters

  • enable_auditing: If False, disables audit logging for this accessor. Useful for models that shouldn't be audited (e.g., audit logs themselves). Defaults to True.

Database Helpers

  • database.set_database_url(url: str) -> None
  • database.get_engine() -> Engine
  • database.transaction() -> Iterator[Session]
  • database.use_session(session: Optional[Session] = None) -> Iterator[Session]
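
The optional session argument of use_session suggests a "reuse if given, otherwise open and manage" pattern. The sketch below shows that pattern with plain sqlite3 connections standing in for sessions. It is an illustration of the idea only; the name use_connection is hypothetical and the library's actual commit and rollback behavior is not confirmed here.

```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator, Optional


@contextmanager
def use_connection(
    db_path: str, existing: Optional[sqlite3.Connection] = None
) -> Iterator[sqlite3.Connection]:
    """Yield `existing` untouched if provided; otherwise open, commit or roll back, and close."""
    if existing is not None:
        # Reuse: the outer owner stays responsible for commit, rollback, and close
        yield existing
        return
    conn = sqlite3.connect(db_path)
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


with use_connection(":memory:") as outer:
    outer.execute("CREATE TABLE users (name TEXT)")
    # The nested call receives the outer connection and reuses it
    with use_connection(":memory:", outer) as inner:
        same = inner is outer
        inner.execute("INSERT INTO users VALUES ('Alice')")
    count = outer.execute("SELECT COUNT(*) FROM users").fetchone()[0]

print(same, count)
```

Passing the outer handle down keeps nested operations inside one transaction, so they succeed or fail together.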

AuditLog model

class AuditLog(SQLModel, table=True):
    id: int
    timestamp: datetime
    component: str
    record_id: int
    operation: str
    actor: str
    before: Optional[Dict[str, Any]]
    after: Optional[Dict[str, Any]]
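
Since each entry carries before and after snapshots, the changed fields can be derived by comparing the two dictionaries. The helper below is a minimal sketch; the name changed_fields is hypothetical, and it assumes before is None for inserts and after is None for deletes, which the Optional types suggest but do not guarantee.

```python
from typing import Any, Dict, Optional, Tuple


def changed_fields(
    before: Optional[Dict[str, Any]], after: Optional[Dict[str, Any]]
) -> Dict[str, Tuple[Any, Any]]:
    """Map each field whose value differs to an (old, new) pair."""
    before = before or {}
    after = after or {}
    keys = sorted(set(before) | set(after))
    return {
        key: (before.get(key), after.get(key))
        for key in keys
        if before.get(key) != after.get(key)
    }


update_diff = changed_fields(
    {"id": 1, "name": "Alice", "email": "alice@example.com"},
    {"id": 1, "name": "Alice Smith", "email": "alice@example.com"},
)
insert_diff = changed_fields(None, {"id": 1, "name": "Alice"})
print(update_diff)
print(insert_diff)
```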

🔍 Troubleshooting & FAQ

  • Diagram endpoint fails → Ensure Graphviz + sqlalchemy-schemadisplay are installed.
  • No audit actor shown → Provide an X-User header in API requests.
  • Soft delete not working → Your model must have a deleted_at field.
  • Restore fails → With integrity_check=True, the uploaded backup must pass the integrity check before it is restored.
  • Auto-migrate rejects my schema change → Only additive SQLite changes are supported automatically. Type changes, destructive edits, FK/PK changes, unique-constraint additions, and ambiguous renames still need a manual migration.
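
For changes that auto-migrate rejects, a common manual approach in SQLite is to rebuild the table: create the new shape, copy the rows, and swap the names inside one transaction. The sketch below changes a hypothetical sku column from TEXT to INTEGER using only the standard-library sqlite3 module. It is a generic SQLite technique, not a tool shipped with this package, and a real migration would also recreate indexes and should be run against a backup first.

```python
import sqlite3

# isolation_level=None puts the connection in autocommit mode,
# so the transaction below is controlled explicitly with BEGIN/COMMIT
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT NOT NULL, sku TEXT NOT NULL)"
)
conn.executemany(
    "INSERT INTO products (name, sku) VALUES (?, ?)",
    [("Gripper", "1001"), ("Conveyor", "1002")],
)

conn.execute("BEGIN")
try:
    # 1. Create the table in its new shape
    conn.execute(
        "CREATE TABLE products_new "
        "(id INTEGER PRIMARY KEY, name TEXT NOT NULL, sku INTEGER NOT NULL)"
    )
    # 2. Copy the rows, converting the column explicitly
    conn.execute(
        "INSERT INTO products_new (id, name, sku) "
        "SELECT id, name, CAST(sku AS INTEGER) FROM products"
    )
    # 3. Swap the new table into place
    conn.execute("DROP TABLE products")
    conn.execute("ALTER TABLE products_new RENAME TO products")
    conn.execute("COMMIT")
except Exception:
    conn.execute("ROLLBACK")
    raise

rows = conn.execute("SELECT id, name, sku FROM products ORDER BY id").fetchall()
print(rows)
```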
