Skip to main content

A framework for storing and managing component and application data for machine apps.

Project description

Vention Storage

A framework for storing and managing component and application data with persistence, validation, and audit trails for machine applications.

Table of Contents

✨ Features

  • Persistent storage with SQLite
  • Automatic audit trails (who, when, what changed)
  • Strong typing & validation via SQLModel
  • Lifecycle hooks before/after insert, update, delete
  • Soft delete with deleted_at fields
  • ConnectRPC bundle generation with Create, Read, Update, Delete + audit
  • Health & monitoring actions (audit log, schema diagram)
  • Batch operations for insert/delete
  • Session management with smart reuse & transactions
  • Bootstrap system for one-command setup
  • CSV export/import for backups and migration
  • Database backup/restore with integrity checking

🧠 Concepts & Overview

Vention Storage is a component-based persistence layer for machine apps:

  • Database → SQLite database with managed sessions and transactions
  • ModelAccessor → Strongly-typed Create, Read, Update, Delete interface for your SQLModel classes
  • Hooks → Functions that run before/after Create, Read, Update, Delete operations
  • AuditLog → Automatically records all data mutations
  • RpcBundle → Auto-generated ConnectRPC bundle with Create, Read, Update, Delete + database management actions

⚙️ Installation & Setup

pip install vention-storage

Optional dependencies:

  • sqlalchemy-schemadisplay and Graphviz → enable database schema visualization

MacOS:

brew install graphviz
pip install sqlalchemy-schemadisplay

Linux (Debian/Ubuntu)

sudo apt-get install graphviz
pip install sqlalchemy-schemadisplay

🚀 Quickstart Tutorial

Define a model, bootstrap storage, and get full Create, Read, Update, Delete RPC actions in minutes:

from datetime import datetime
from typing import Optional
from sqlmodel import Field, SQLModel
from communication.app import VentionApp
from storage.bootstrap import bootstrap
from storage.accessor import ModelAccessor
from storage.vention_communication import build_storage_bundle

class User(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    email: str
    deleted_at: Optional[datetime] = Field(default=None, index=True)

# Initialize database
bootstrap(
    database_url="sqlite:///./my_app.db",
    create_tables=True
)

# Create accessor
user_accessor = ModelAccessor(User, "users")

# Build RPC bundle and add to app
app = VentionApp(name="my-app")
storage_bundle = build_storage_bundle(
    accessors=[user_accessor],
    max_records_per_model=100,
    enable_db_actions=True
)
app.add_bundle(storage_bundle)
app.finalize()

➡️ You now have Create, Read, Update, Delete, audit, backup, and CSV actions available via ConnectRPC.

🛠 How-to Guides

Bootstrap Multiple Models

# user_accessor was created earlier in the Quickstart example
# Reuse it here to bootstrap multiple models at once

product_accessor = ModelAccessor(Product, "products")

# Build bundle with multiple accessors
storage_bundle = build_storage_bundle(
    accessors=[user_accessor, product_accessor],
    max_records_per_model=100,
    enable_db_actions=True
)
app.add_bundle(storage_bundle)

Export to CSV

# Using ConnectRPC client
from communication.client import ConnectClient

client = ConnectClient("http://localhost:8000")
response = await client.call("Database_ExportZip", {})
with open("backup.zip", "wb") as f:
    f.write(response.data)

Backup & Restore

# Backup
backup_response = await client.call("Database_BackupSqlite", {})
with open(backup_response.filename, "wb") as f:
    f.write(backup_response.data)

# Restore
with open("backup.sqlite", "rb") as f:
    restore_response = await client.call(
        "Database_RestoreSqlite",
        {
            "bytes": f.read(),
            "filename": "backup.sqlite",
            "integrity_check": True,
            "dry_run": False
        }
    )

Use Lifecycle Hooks

@user_accessor.before_insert()
def validate_email(session, instance):
    if "@" not in instance.email:
        raise ValueError("Invalid email")

@user_accessor.after_insert()
def log_creation(session, instance):
    print(f"User created: {instance.name}")

Query Audit Logs

from storage.auditor import AuditLog
from sqlmodel import select

with database.transaction() as session:
    logs = session.exec(select(AuditLog).where(AuditLog.component == "users")).all()

Using the model accessors

# Create
user = user_accessor.insert(User(name="Alice", email="alice@example.com"), actor="admin")

# Read
user = user_accessor.get(user.id)

# Update
user.name = "Alice Smith"
user_accessor.save(user, actor="admin")

# Delete
user_accessor.delete(user.id, actor="admin")

# Restore (for soft-deleted models)
user_accessor.restore(user.id, actor="admin")

# Find users by exact match
users = user_accessor.find(user_accessor.where.email == "alice@example.com")

# Multiple conditions (AND logic)
users = user_accessor.find(
    user_accessor.where.name == "Alice",
    user_accessor.where.email == "alice@example.com"
)

# Comparison operators
adults = user_accessor.find(user_accessor.where.age >= 18)
recent = user_accessor.find(user_accessor.where.created_at > cutoff_date)

# String operations
smiths = user_accessor.find(user_accessor.where.name.contains("Smith"))
gmail_users = user_accessor.find(user_accessor.where.email.endswith("@gmail.com"))
search = user_accessor.find(user_accessor.where.name.ilike("%alice%"))  # case-insensitive

# Collection check
admins = user_accessor.find(user_accessor.where.role.in_(["admin", "superadmin"]))

# Null checks
unverified = user_accessor.find(user_accessor.where.verified_at.is_(None))
verified = user_accessor.find(user_accessor.where.verified_at.isnot(None))

# With pagination and sorting
page = user_accessor.find(
    user_accessor.where.status == "active",
    limit=10,
    offset=20,
    order_by="created_at",
    order_desc=True
)

# Include soft-deleted records
all_users = user_accessor.find(
    user_accessor.where.role == "admin",
    include_deleted=True
)

Using ConnectRPC Client

Once the bundle is added to your VentionApp, each ModelAccessor automatically exposes full CRUD actions via ConnectRPC.

Example: interacting with the Users RPC actions.

import { createPromiseClient } from "@connectrpc/connect";
import { createConnectTransport } from "@connectrpc/connect-web";

const transport = createConnectTransport({
  baseUrl: "http://localhost:8000",
});

const client = createPromiseClient(YourServiceClient, transport);

// Create
export async function createUser(name: string, email: string) {
  const res = await client.usersCreateRecord({
    record: { name, email },
    actor: "operator"
  });
  return res.record;
}

// Read
export async function getUser(id: number) {
  const res = await client.usersGetRecord({
    recordId: id,
    includeDeleted: false
  });
  return res.record;
}

// Update
export async function updateUser(id: number, name: string) {
  const res = await client.usersUpdateRecord({
    recordId: id,
    record: { name },
    actor: "operator"
  });
  return res.record;
}

// Delete (soft delete if model supports deleted_at)
export async function deleteUser(id: number) {
  await client.usersDeleteRecord({
    recordId: id,
    actor: "operator"
  });
}

// Restore
export async function restoreUser(id: number) {
  const res = await client.usersRestoreRecord({
    recordId: id,
    actor: "operator"
  });
  return res.record;
}

// List
export async function listUsers() {
  const res = await client.usersListRecords({
    includeDeleted: false
  });
  return res.records;
}

// Find by exact match
export async function findUserByEmail(email: string) {
  const res = await client.usersFindRecords({
    filters: [
      { field: "email", operation: "eq", value: email }
    ]
  });
  return res.records;
}

// Find with multiple conditions (AND logic)
export async function findActiveAdmins() {
  const res = await client.usersFindRecords({
    filters: [
      { field: "role", operation: "eq", value: "admin" },
      { field: "age", operation: "gte", value: "18" }
    ]
  });
  return res.records;
}

// Find with null checks
export async function findUnverifiedUsers() {
  const res = await client.usersFindRecords({
    filters: [
      { field: "verified_at", operation: "is_null" }
    ]
  });
  return res.records;
}

// Complex query example
export async function findRecentPremiumUsers(cutoffDate: string) {
  const res = await client.usersFindRecords({
    filters: [
      { field: "subscription", operation: "in", value: ["premium", "enterprise"] },
      { field: "created_at", operation: "gte", value: cutoffDate },
      { field: "email_verified", operation: "is_not_null" }
    ],
    limit: 50,
    orderBy: "created_at",
    orderDesc: true
  });
  return res.records;
}

Filter Operations Reference

Operation Description
eq Exact match
ne Not equal to value
gt Greater than value
gte Greater than or equal
lt Less than value
lte Less than or equal
in Value in array
not_in Value not in array
contains Field contains substring
starts_with Field starts with prefix
ends_with Field ends with suffix
like Case-insensitive pattern match
is_null Field is null (no value needed)
is_not_null Field is not null (no value needed)

📖 API Reference

bootstrap

def bootstrap(
    *,
    database_url: Optional[str] = None,
    create_tables: bool = True,
) -> None

Initialize the database engine and optionally create tables. This function performs environment setup only.

build_storage_bundle

def build_storage_bundle(
    *,
    accessors: Sequence[ModelAccessor[Any]],
    max_records_per_model: Optional[int] = 5,
    enable_db_actions: bool = True,
) -> RpcBundle

Build a ConnectRPC RpcBundle exposing CRUD and database utilities. Returns an RpcBundle that can be added to a VentionApp using app.add_bundle().

ModelAccessor

ModelAccessor(
    model: Type[ModelType],
    component_name: str,
    *,
    enable_auditing: bool = True,
)

Read

  • get(id, include_deleted=False) -> Optional[ModelType]
  • all(include_deleted=False) -> List[ModelType]

Write

  • insert(obj, actor="internal") -> ModelType
  • save(obj, actor="internal") -> ModelType
  • delete(id, actor="internal") -> bool
  • restore(id, actor="internal") -> bool

Batch

  • insert_many(objs, actor="internal") -> List[ModelType]
  • delete_many(ids, actor="internal") -> int

Hooks

  • @accessor.before_insert()
  • @accessor.after_insert()
  • @accessor.before_update()
  • @accessor.after_update()
  • @accessor.before_delete()
  • @accessor.after_delete()

Parameters

  • enable_auditing: If False, disables audit logging for this accessor. Useful for models that shouldn't be audited (e.g., audit logs themselves). Defaults to True.

Database Helpers

  • database.set_database_url(url: str) -> None
  • database.get_engine() -> Engine
  • database.transaction() -> Iterator[Session]
  • database.use_session(session: Optional[Session] = None) -> Iterator[Session]

AuditLog model

class AuditLog(SQLModel, table=True):
    id: int
    timestamp: datetime
    component: str
    record_id: int
    operation: str
    actor: str
    before: Optional[Dict[str, Any]]
    after: Optional[Dict[str, Any]]

🔍 Troubleshooting & FAQ

  • Diagram endpoint fails → Ensure Graphviz + sqlalchemy-schemadisplay are installed.
  • No audit actor shown → Provide X-User header in API requests.
  • Soft delete not working → Your model must have a deleted_at field.
  • Restore fails → Ensure integrity_check=True passes when restoring backups.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

vention_storage-0.6.3.tar.gz (22.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

vention_storage-0.6.3-py3-none-any.whl (24.3 kB view details)

Uploaded Python 3

File details

Details for the file vention_storage-0.6.3.tar.gz.

File metadata

  • Download URL: vention_storage-0.6.3.tar.gz
  • Upload date:
  • Size: 22.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.10.12 Linux/6.11.0-1018-azure

File hashes

Hashes for vention_storage-0.6.3.tar.gz
Algorithm Hash digest
SHA256 3d20642909055ea58bb2d01a9ecdf7c528792540ea6f9ee24957cfb6f0e24d1b
MD5 cf15d81d132a921c95c14e74b3fb8989
BLAKE2b-256 5474b7fa1ab8fdf788358210af892f1c9dcbb7774877b2fbf5ee07445f12fa35

See more details on using hashes here.

File details

Details for the file vention_storage-0.6.3-py3-none-any.whl.

File metadata

  • Download URL: vention_storage-0.6.3-py3-none-any.whl
  • Upload date:
  • Size: 24.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.10.12 Linux/6.11.0-1018-azure

File hashes

Hashes for vention_storage-0.6.3-py3-none-any.whl
Algorithm Hash digest
SHA256 27c5e13306a567214f22decb75bb8fe816759a490042d653b4f09dea03790747
MD5 c54a4fbc89dfea7b12a0ce3c171632ff
BLAKE2b-256 5a2d6bca7d676992977d53a13fd01426c6580060e99c86c688e5f20c54129881

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page