Skip to main content

A framework for storing and managing component and application data for machine apps.

Project description

Vention Storage

A framework for storing and managing component and application data with persistence, validation, and audit trails for machine applications.

Table of Contents

✨ Features

  • Persistent storage with SQLite
  • Automatic audit trails (who, when, what changed)
  • Strong typing & validation via SQLModel
  • Lifecycle hooks before/after insert, update, delete
  • Soft delete with deleted_at fields
  • ConnectRPC bundle generation with Create, Read, Update, Delete + audit
  • Health & monitoring actions (audit log, schema diagram)
  • Batch operations for insert/delete
  • Session management with smart reuse & transactions
  • Bootstrap system for one-command setup
  • Opt-in SQLite auto-migrations for safe additive schema changes
  • CSV export/import for backups and migration
  • Database backup/restore with integrity checking

🧠 Concepts & Overview

Vention Storage is a component-based persistence layer for machine apps:

  • Database → SQLite database with managed sessions and transactions
  • ModelAccessor → Strongly-typed Create, Read, Update, Delete interface for your SQLModel classes
  • Hooks → Functions that run before/after Create, Read, Update, Delete operations
  • AuditLog → Automatically records all data mutations
  • RpcBundle → Auto-generated ConnectRPC bundle with Create, Read, Update, Delete + database management actions

⚙️ Installation & Setup

pip install vention-storage

Optional dependencies:

  • sqlalchemy-schemadisplay and Graphviz → enable database schema visualization

MacOS:

brew install graphviz
pip install sqlalchemy-schemadisplay

Linux (Debian/Ubuntu)

sudo apt-get install graphviz
pip install sqlalchemy-schemadisplay

🚀 Quickstart Tutorial

Define a model, bootstrap storage, and get full Create, Read, Update, Delete RPC actions in minutes:

from datetime import datetime
from typing import Optional
from sqlmodel import Field, SQLModel
from communication.app import VentionApp
from storage.bootstrap import bootstrap
from storage.accessor import ModelAccessor
from storage.vention_communication import build_storage_rpc_bundle

class User(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    email: str
    deleted_at: Optional[datetime] = Field(default=None, index=True)

# Initialize database
bootstrap(
    database_url="sqlite:///./my_app.db",
    auto_migrate=True,
)

# Create accessor
user_accessor = ModelAccessor(User, "users")

# Build RPC bundle and add to app
app = VentionApp(name="my-app")
storage_bundle = build_storage_rpc_bundle(
    accessors=[user_accessor],
    max_records_per_model=100,
    enable_db_actions=True
)
app.register_rpc_plugin(storage_bundle)
app.finalize()

➡️ You now have Create, Read, Update, Delete, audit, backup, CSV, and opt-in startup migrations available via ConnectRPC.

auto_migrate=True is intentionally conservative. It supports creating missing tables, adding safe columns, backfilling scalar defaults, and creating missing indexes. Destructive or ambiguous changes still require a manual migration.

🛠 How-to Guides

Bootstrap Multiple Models

# user_accessor was created earlier in the Quickstart example
# Reuse it here to bootstrap multiple models at once

class Product(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    sku: str

product_accessor = ModelAccessor(Product, "products")

# Build bundle with multiple accessors
storage_bundle = build_storage_rpc_bundle(
    accessors=[user_accessor, product_accessor],
    max_records_per_model=100,
    enable_db_actions=True
)
app.register_rpc_plugin(storage_bundle)

Export to CSV

# Using ConnectRPC client
from communication.client import ConnectClient

client = ConnectClient("http://localhost:8000")
response = await client.call("Database_ExportZip", {})
with open("backup.zip", "wb") as f:
    f.write(response.data)

Backup & Restore

# Backup
backup_response = await client.call("Database_BackupSqlite", {})
with open(backup_response.filename, "wb") as f:
    f.write(backup_response.data)

# Restore
with open("backup.sqlite", "rb") as f:
    restore_response = await client.call(
        "Database_RestoreSqlite",
        {
            "bytes": f.read(),
            "filename": "backup.sqlite",
            "integrity_check": True,
            "dry_run": False
        }
    )

Use Lifecycle Hooks

@user_accessor.before_insert()
def validate_email(session, instance):
    if "@" not in instance.email:
        raise ValueError("Invalid email")

@user_accessor.after_insert()
def log_creation(session, instance):
    print(f"User created: {instance.name}")

Query Audit Logs

from storage.auditor import AuditLog
from storage import database
from sqlmodel import select

with database.transaction() as session:
    logs = session.exec(select(AuditLog).where(AuditLog.component == "users")).all()

Using the model accessors

# Create
user = user_accessor.insert(User(name="Alice", email="alice@example.com"), actor="admin")

# Read
user = user_accessor.get(user.id)

# Update
user.name = "Alice Smith"
user_accessor.save(user, actor="admin")

# Delete
user_accessor.delete(user.id, actor="admin")

# Restore (for soft-deleted models)
user_accessor.restore(user.id, actor="admin")

# Find users by exact match
users = user_accessor.find(user_accessor.where.email == "alice@example.com")

# Multiple conditions (AND logic)
users = user_accessor.find(
    user_accessor.where.name == "Alice",
    user_accessor.where.email == "alice@example.com"
)

# Comparison operators
recent_users = user_accessor.find(user_accessor.where.id > 10)
first_users = user_accessor.find(user_accessor.where.id <= 5)

# String operations
smiths = user_accessor.find(user_accessor.where.name.contains("Smith"))
gmail_users = user_accessor.find(user_accessor.where.email.endswith("@gmail.com"))
search = user_accessor.find(user_accessor.where.name.ilike("%alice%"))  # case-insensitive

# Collection check
selected_users = user_accessor.find(
    user_accessor.where.email.in_(["alice@example.com", "bob@example.com"])
)

# Null checks
active_users = user_accessor.find(user_accessor.where.deleted_at.is_(None))
deleted_users = user_accessor.find(user_accessor.where.deleted_at.isnot(None))

# With pagination and sorting
page = user_accessor.find(
    user_accessor.where.email.contains("@example.com"),
    limit=10,
    offset=20,
    order_by="id",
    order_desc=True
)

# Include soft-deleted records
all_users = user_accessor.find(
    user_accessor.where.email.contains("@example.com"),
    include_deleted=True
)

Using ConnectRPC Client

Once the bundle is added to your VentionApp, each ModelAccessor automatically exposes full CRUD actions via ConnectRPC.

Example: interacting with the Users RPC actions.

import { createPromiseClient } from "@connectrpc/connect";
import { createConnectTransport } from "@connectrpc/connect-web";

const transport = createConnectTransport({
  baseUrl: "http://localhost:8000",
});

const client = createPromiseClient(YourServiceClient, transport);

// Create
export async function createUser(name: string, email: string) {
  const res = await client.usersCreateRecord({
    record: { name, email },
    actor: "operator"
  });
  return res.record;
}

// Read
export async function getUser(id: number) {
  const res = await client.usersGetRecord({
    recordId: id,
    includeDeleted: false
  });
  return res.record;
}

// Update
export async function updateUser(id: number, name: string) {
  const res = await client.usersUpdateRecord({
    recordId: id,
    record: { name },
    actor: "operator"
  });
  return res.record;
}

// Delete (soft delete if model supports deleted_at)
export async function deleteUser(id: number) {
  await client.usersDeleteRecord({
    recordId: id,
    actor: "operator"
  });
}

// Restore
export async function restoreUser(id: number) {
  const res = await client.usersRestoreRecord({
    recordId: id,
    actor: "operator"
  });
  return res.record;
}

// List
export async function listUsers() {
  const res = await client.usersListRecords({
    includeDeleted: false
  });
  return res.records;
}

// Find by exact match
export async function findUserByEmail(email: string) {
  const res = await client.usersFindRecords({
    filters: [
      { field: "email", operation: "eq", value: email }
    ]
  });
  return res.records;
}

// Find with multiple conditions (AND logic)
export async function findNamedUser(name: string, email: string) {
  const res = await client.usersFindRecords({
    filters: [
      { field: "name", operation: "eq", value: name },
      { field: "email", operation: "eq", value: email }
    ]
  });
  return res.records;
}

// Find with null checks
export async function findDeletedUsers() {
  const res = await client.usersFindRecords({
    filters: [
      { field: "deleted_at", operation: "is_not_null" }
    ]
  });
  return res.records;
}

// Complex query example
export async function findRecentUsers(minId: string) {
  const res = await client.usersFindRecords({
    filters: [
      { field: "email", operation: "contains", value: "@example.com" },
      { field: "id", operation: "gte", value: minId },
      { field: "deleted_at", operation: "is_null" }
    ],
    limit: 50,
    orderBy: "id",
    orderDesc: true
  });
  return res.records;
}

Filter Operations Reference

Operation Description
eq Exact match
ne Not equal to value
gt Greater than value
gte Greater than or equal
lt Less than value
lte Less than or equal
in Value in array
not_in Value not in array
contains Field contains substring
startswith Field starts with prefix
endswith Field ends with suffix
like Case-insensitive pattern match
is_null Field is null (no value needed)
is_not_null Field is not null (no value needed)

📖 API Reference

bootstrap

def bootstrap(
    *,
    database_url: Optional[str] = None,
    auto_migrate: bool = False,
) -> None

Initialize the database engine and ensure the schema exists.

When auto_migrate=False, bootstrap() creates all tables from SQLModel.metadata.

When auto_migrate=True, bootstrap() plans and applies supported SQLite schema migrations using SQLModel.metadata. All models must be imported before calling bootstrap() so they are present in SQLModel.metadata.

build_storage_rpc_bundle

def build_storage_rpc_bundle(
    *,
    accessors: Sequence[ModelAccessor[Any]],
    max_records_per_model: Optional[int] = 5,
    enable_db_actions: bool = True,
) -> RpcBundle

Build a ConnectRPC RpcBundle exposing CRUD and database utilities. Returns an RpcBundle that can be registered on a VentionApp using app.register_rpc_plugin().

ModelAccessor

ModelAccessor(
    model: Type[ModelType],
    component_name: str,
    *,
    enable_auditing: bool = True,
)

Read

  • get(id, include_deleted=False) -> Optional[ModelType]
  • all(include_deleted=False) -> List[ModelType]

Write

  • insert(obj, actor="internal") -> ModelType
  • save(obj, actor="internal") -> ModelType
  • delete(id, actor="internal") -> bool
  • restore(id, actor="internal") -> bool

Batch

  • insert_many(objs, actor="internal") -> List[ModelType]
  • delete_many(ids, actor="internal") -> int

Hooks

  • @accessor.before_insert()
  • @accessor.after_insert()
  • @accessor.before_update()
  • @accessor.after_update()
  • @accessor.before_delete()
  • @accessor.after_delete()

Parameters

  • enable_auditing: If False, disables audit logging for this accessor. Useful for models that shouldn't be audited (e.g., audit logs themselves). Defaults to True.

Database Helpers

  • database.set_database_url(url: str) -> None
  • database.get_engine() -> Engine
  • database.transaction() -> Iterator[Session]
  • database.use_session(session: Optional[Session] = None) -> Iterator[Session]

AuditLog model

class AuditLog(SQLModel, table=True):
    id: int
    timestamp: datetime
    component: str
    record_id: int
    operation: str
    actor: str
    before: Optional[Dict[str, Any]]
    after: Optional[Dict[str, Any]]

🔍 Troubleshooting & FAQ

  • Diagram endpoint fails → Ensure Graphviz + sqlalchemy-schemadisplay are installed.
  • No audit actor shown → Provide an X-User header in API requests.
  • Soft delete not working → Your model must have a deleted_at field.
  • Restore fails → Ensure integrity_check=True passes when restoring backups.
  • Auto-migrate rejects my schema change → Only additive SQLite changes are supported automatically. Type changes, destructive edits, FK/PK changes, unique-constraint additions, and ambiguous renames still need a manual migration.

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

vention_storage-0.12.2.tar.gz (41.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

vention_storage-0.12.2-py3-none-any.whl (32.1 kB view details)

Uploaded Python 3

File details

Details for the file vention_storage-0.12.2.tar.gz.

File metadata

  • Download URL: vention_storage-0.12.2.tar.gz
  • Upload date:
  • Size: 41.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.6.9

File hashes

Hashes for vention_storage-0.12.2.tar.gz
Algorithm Hash digest
SHA256 ef9b4c69db69c85b47692096646309b6f408af70dc36b14fcde311b71eef5fe2
MD5 ab69e11a32b3e53a52798bae9ff3ce2d
BLAKE2b-256 11af1ed57a07bf122ea0bc90a191f327d8f561f688665a6f44773ba798b98ac3

See more details on using hashes here.

File details

Details for the file vention_storage-0.12.2-py3-none-any.whl.

File metadata

File hashes

Hashes for vention_storage-0.12.2-py3-none-any.whl
Algorithm Hash digest
SHA256 8caf131face96f4c032e9acb89f63cb0fa28d3ab89a67cb604f2195d051fb1dd
MD5 51e95994b629bcdf82d0dbe849cad6fc
BLAKE2b-256 abaee9020dcea6d87aa6245a7c11aab39a29724606f00c8ff5f0ffa27255e615

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page