Vention Storage

A framework for storing and managing component and application data with persistence, validation, and audit trails for machine applications.

✨ Features

  • Persistent storage with SQLite
  • Automatic audit trails (who, when, what changed)
  • Strong typing & validation via SQLModel
  • Lifecycle hooks before/after insert, update, delete
  • Soft delete with deleted_at fields
  • ConnectRPC bundle generation with CRUD + audit actions
  • Health & monitoring actions (audit log, schema diagram)
  • Batch operations for insert/delete
  • Session management with smart reuse & transactions
  • Bootstrap system for one-command setup
  • CSV export/import for backups and migration
  • Database backup/restore with integrity checking

🧠 Concepts & Overview

Vention Storage is a component-based persistence layer for machine apps:

  • Database → SQLite database with managed sessions and transactions
  • ModelAccessor → Strongly-typed CRUD interface for your SQLModel classes
  • Hooks → Functions that run before/after insert, update, and delete operations
  • AuditLog → Automatically records all data mutations
  • RpcBundle → Auto-generated ConnectRPC bundle with CRUD + database management actions
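
To make the soft-delete and audit-trail concepts concrete, here is a minimal sketch that uses only Python's built-in sqlite3 module. It is not how Vention Storage is implemented: the table layout, the column names before_state and after_state, and the helper soft_delete are all assumptions made for illustration.

```python
import json
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, deleted_at TEXT)")
conn.execute(
    "CREATE TABLE audit_log (id INTEGER PRIMARY KEY, timestamp TEXT, component TEXT, "
    "record_id INTEGER, operation TEXT, actor TEXT, before_state TEXT, after_state TEXT)"
)


def soft_delete(conn, record_id, actor):
    """Hypothetical helper: mark a row as deleted and record who did it."""
    row = conn.execute(
        "SELECT id, name, deleted_at FROM users WHERE id = ?", (record_id,)
    ).fetchone()
    if row is None or row[2] is not None:
        return False  # missing, or already soft-deleted
    now = datetime.now(timezone.utc).isoformat()
    before = {"id": row[0], "name": row[1], "deleted_at": None}
    after = {**before, "deleted_at": now}
    with conn:  # commits both statements together, rolls back on error
        conn.execute("UPDATE users SET deleted_at = ? WHERE id = ?", (now, record_id))
        conn.execute(
            "INSERT INTO audit_log (timestamp, component, record_id, operation, actor, "
            "before_state, after_state) VALUES (?, ?, ?, ?, ?, ?, ?)",
            (now, "users", record_id, "soft_delete", actor,
             json.dumps(before), json.dumps(after)),
        )
    return True


conn.execute("INSERT INTO users (name) VALUES ('Alice')")
conn.commit()
deleted = soft_delete(conn, 1, actor="admin")
visible = conn.execute("SELECT COUNT(*) FROM users WHERE deleted_at IS NULL").fetchone()[0]
audit_rows = conn.execute("SELECT operation, actor FROM audit_log").fetchall()
```

The row stays in the table, so it can be restored later by clearing deleted_at, and the audit entry keeps both the previous and the new state.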

⚙️ Installation & Setup

pip install vention-storage

Optional dependencies:

  • sqlalchemy-schemadisplay and Graphviz → enable database schema visualization

macOS:

brew install graphviz
pip install sqlalchemy-schemadisplay

Linux (Debian/Ubuntu):

sudo apt-get install graphviz
pip install sqlalchemy-schemadisplay

🚀 Quickstart Tutorial

Define a model, bootstrap storage, and get full CRUD RPC actions in minutes:

from datetime import datetime
from typing import Optional
from sqlmodel import Field, SQLModel
from communication.app import VentionApp
from storage.bootstrap import bootstrap
from storage.accessor import ModelAccessor
from storage.vention_communication import build_storage_bundle

class User(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    email: str
    deleted_at: Optional[datetime] = Field(default=None, index=True)

# Initialize database
bootstrap(
    database_url="sqlite:///./my_app.db",
    create_tables=True
)

# Create accessor
user_accessor = ModelAccessor(User, "users")

# Build RPC bundle and add to app
app = VentionApp(name="my-app")
storage_bundle = build_storage_bundle(
    accessors=[user_accessor],
    max_records_per_model=100,
    enable_db_actions=True
)
app.add_bundle(storage_bundle)
app.finalize()

➡️ You now have CRUD, audit, backup, and CSV actions available via ConnectRPC.

🛠 How-to Guides

Bootstrap Multiple Models

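# user_accessor and app were created in the Quickstart example; reuse them here.
# Product is a second SQLModel table (its fields are illustrative). Define every
# model before calling bootstrap(create_tables=True) so that its table can be created.
class Product(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str

product_accessor = ModelAccessor(Product, "products")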

# Build bundle with multiple accessors
storage_bundle = build_storage_bundle(
    accessors=[user_accessor, product_accessor],
    max_records_per_model=100,
    enable_db_actions=True
)
app.add_bundle(storage_bundle)

Export to CSV

# Using ConnectRPC client (the await calls below must run inside an async function)
from communication.client import ConnectClient

client = ConnectClient("http://localhost:8000")
response = await client.call("Database_ExportZip", {})
with open("backup.zip", "wb") as f:
    f.write(response.data)
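
Once the archive is on disk, it can be useful to check what it contains before relying on it as a backup. The sketch below uses only the standard library and assumes the archive holds one CSV file per table with a header row; that layout is an assumption, not something the text above states, and summarize_csv_zip is a hypothetical helper name.

```python
import csv
import io
import zipfile


def summarize_csv_zip(data: bytes) -> dict:
    """Return {member_name: data_row_count} for every .csv file in a zip archive."""
    summary = {}
    with zipfile.ZipFile(io.BytesIO(data)) as archive:
        for name in archive.namelist():
            if not name.lower().endswith(".csv"):
                continue  # skip anything that is not a CSV export
            with archive.open(name) as raw:
                text = io.TextIOWrapper(raw, encoding="utf-8", newline="")
                rows = list(csv.reader(text))
            summary[name] = max(len(rows) - 1, 0)  # subtract the header row
    return summary
```

For example, summarize_csv_zip(open("backup.zip", "rb").read()) would report how many records each exported table holds.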

Backup & Restore

# Backup
backup_response = await client.call("Database_BackupSqlite", {})
with open(backup_response.filename, "wb") as f:
    f.write(backup_response.data)

# Restore
with open("backup.sqlite", "rb") as f:
    restore_response = await client.call(
        "Database_RestoreSqlite",
        {
            "bytes": f.read(),
            "filename": "backup.sqlite",
            "integrity_check": True,
            "dry_run": False
        }
    )
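
Before uploading a file for restore, you can run SQLite's own integrity check on it locally. This is a sketch built on the standard sqlite3 module and the PRAGMA integrity_check statement; it is independent of Vention Storage and may not match what the integrity_check option does on the server. The helper name sqlite_file_is_healthy is hypothetical.

```python
import sqlite3
from pathlib import Path


def sqlite_file_is_healthy(path: str) -> bool:
    """Run SQLite's built-in integrity check against a database file."""
    # Open read-only through a file: URI so a missing file is not silently created.
    uri = Path(path).resolve().as_uri() + "?mode=ro"
    try:
        conn = sqlite3.connect(uri, uri=True)
        try:
            rows = conn.execute("PRAGMA integrity_check").fetchall()
        finally:
            conn.close()
    except sqlite3.DatabaseError:
        return False  # missing, not a SQLite file, or too damaged to read
    return rows == [("ok",)]
```

A file that fails this check locally is unlikely to restore cleanly, so it is worth running before sending the bytes over RPC.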

Use Lifecycle Hooks

@user_accessor.before_insert()
def validate_email(session, instance):
    if "@" not in instance.email:
        raise ValueError("Invalid email")

@user_accessor.after_insert()
def log_creation(session, instance):
    print(f"User created: {instance.name}")
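
The decorator pattern above can be pictured with a small stand-alone registry. This is a hedged sketch of the general technique, not the library's code: HookRegistry, on, emit, and the insert function are hypothetical names, and records are plain dicts to keep the example self-contained.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List


class HookRegistry:
    """Hypothetical stand-in showing how decorator-registered hooks can be dispatched."""

    def __init__(self) -> None:
        self._hooks: DefaultDict[str, List[Callable]] = defaultdict(list)

    def on(self, event: str) -> Callable:
        def decorator(fn: Callable) -> Callable:
            self._hooks[event].append(fn)
            return fn  # leave the function usable on its own
        return decorator

    def emit(self, event: str, session, instance) -> None:
        for fn in self._hooks[event]:
            fn(session, instance)  # an exception here aborts the operation


hooks = HookRegistry()
events: List[str] = []


@hooks.on("before_insert")
def validate_email(session, instance):
    if "@" not in instance["email"]:
        raise ValueError("Invalid email")


@hooks.on("after_insert")
def log_creation(session, instance):
    events.append(f"User created: {instance['name']}")


def insert(instance):
    hooks.emit("before_insert", None, instance)
    # ... the real write would happen here ...
    hooks.emit("after_insert", None, instance)
    return instance
```

Because the before hook raises before the write step, a record with an invalid email never reaches the after hook.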

Query Audit Logs

from storage import database  # assumed import path for the database helpers
from storage.auditor import AuditLog
from sqlmodel import select

with database.transaction() as session:
    logs = session.exec(select(AuditLog).where(AuditLog.component == "users")).all()
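
Each audit entry carries a before and an after snapshot, so a common next step is to work out which fields actually changed. The helper below is a small sketch that assumes both snapshots are plain dictionaries (or None for inserts and deletes); changed_fields is a hypothetical name, not part of the library.

```python
from typing import Any, Dict, Optional, Tuple


def changed_fields(
    before: Optional[Dict[str, Any]],
    after: Optional[Dict[str, Any]],
) -> Dict[str, Tuple[Any, Any]]:
    """Map each field whose value differs to an (old, new) pair."""
    before = before or {}
    after = after or {}
    keys = sorted(set(before) | set(after))
    return {
        key: (before.get(key), after.get(key))
        for key in keys
        if before.get(key) != after.get(key)
    }
```

Applied to the logs queried above, this could be called as changed_fields(log.before, log.after) for each entry.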

Using the Model Accessors

# Create
user = user_accessor.insert(User(name="Alice", email="alice@example.com"), actor="admin")

# Read
user = user_accessor.get(user.id)

# Update
user.name = "Alice Smith"
user_accessor.save(user, actor="admin")

# Delete
user_accessor.delete(user.id, actor="admin")

# Restore (for soft-deleted models)
user_accessor.restore(user.id, actor="admin")

# Find users by exact match
users = user_accessor.find(user_accessor.where.email == "alice@example.com")

# Multiple conditions (AND logic)
users = user_accessor.find(
    user_accessor.where.name == "Alice",
    user_accessor.where.email == "alice@example.com"
)

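# The queries below use fields (age, created_at, role, verified_at, status) that the
# Quickstart User model does not define; add them to your model, and define
# cutoff_date, before running these lines.
# Comparison operators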
adults = user_accessor.find(user_accessor.where.age >= 18)
recent = user_accessor.find(user_accessor.where.created_at > cutoff_date)

# String operations
smiths = user_accessor.find(user_accessor.where.name.contains("Smith"))
gmail_users = user_accessor.find(user_accessor.where.email.endswith("@gmail.com"))
search = user_accessor.find(user_accessor.where.name.ilike("%alice%"))  # case-insensitive

# Collection check
admins = user_accessor.find(user_accessor.where.role.in_(["admin", "superadmin"]))

# Null checks
unverified = user_accessor.find(user_accessor.where.verified_at.is_(None))
verified = user_accessor.find(user_accessor.where.verified_at.isnot(None))

# With pagination and sorting
page = user_accessor.find(
    user_accessor.where.status == "active",
    limit=10,
    offset=20,
    order_by="created_at",
    order_desc=True
)

# Include soft-deleted records
all_users = user_accessor.find(
    user_accessor.where.role == "admin",
    include_deleted=True
)
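
The limit and offset arguments are easier to work with when derived from a page number. A minimal sketch, assuming 1-based page numbers; page_params is a hypothetical helper, not part of the accessor API.

```python
def page_params(page: int, page_size: int) -> dict:
    """Translate a 1-based page number into limit/offset keyword arguments."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must both be at least 1")
    return {"limit": page_size, "offset": (page - 1) * page_size}
```

With a page size of 10, page 3 gives limit=10 and offset=20, the same values used in the pagination example above, so the result can be unpacked into find with **page_params(3, 10).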

Using ConnectRPC Client

Once the bundle is added to your VentionApp, each ModelAccessor automatically exposes full CRUD actions via ConnectRPC.

Example: interacting with the Users RPC actions.

import { createPromiseClient } from "@connectrpc/connect";
import { createConnectTransport } from "@connectrpc/connect-web";

const transport = createConnectTransport({
  baseUrl: "http://localhost:8000",
});

// YourServiceClient is a placeholder for the service definition generated for your app
const client = createPromiseClient(YourServiceClient, transport);

// Create
export async function createUser(name: string, email: string) {
  const res = await client.usersCreateRecord({
    record: { name, email },
    actor: "operator"
  });
  return res.record;
}

// Read
export async function getUser(id: number) {
  const res = await client.usersGetRecord({
    recordId: id,
    includeDeleted: false
  });
  return res.record;
}

// Update
export async function updateUser(id: number, name: string) {
  const res = await client.usersUpdateRecord({
    recordId: id,
    record: { name },
    actor: "operator"
  });
  return res.record;
}

// Delete (soft delete if model supports deleted_at)
export async function deleteUser(id: number) {
  await client.usersDeleteRecord({
    recordId: id,
    actor: "operator"
  });
}

// Restore
export async function restoreUser(id: number) {
  const res = await client.usersRestoreRecord({
    recordId: id,
    actor: "operator"
  });
  return res.record;
}

// List
export async function listUsers() {
  const res = await client.usersListRecords({
    includeDeleted: false
  });
  return res.records;
}

// Find by exact match
export async function findUserByEmail(email: string) {
  const res = await client.usersFindRecords({
    filters: [
      { field: "email", operation: "eq", value: email }
    ]
  });
  return res.records;
}

// Find with multiple conditions (AND logic)
export async function findAdultAdmins() {
  const res = await client.usersFindRecords({
    filters: [
      { field: "role", operation: "eq", value: "admin" },
      { field: "age", operation: "gte", value: "18" }
    ]
  });
  return res.records;
}

// Find with null checks
export async function findUnverifiedUsers() {
  const res = await client.usersFindRecords({
    filters: [
      { field: "verified_at", operation: "is_null" }
    ]
  });
  return res.records;
}

// Complex query example
export async function findRecentPremiumUsers(cutoffDate: string) {
  const res = await client.usersFindRecords({
    filters: [
      { field: "subscription", operation: "in", value: ["premium", "enterprise"] },
      { field: "created_at", operation: "gte", value: cutoffDate },
      { field: "email_verified", operation: "is_not_null" }
    ],
    limit: 50,
    orderBy: "created_at",
    orderDesc: true
  });
  return res.records;
}

Filter Operations Reference

Operation     Description
eq            Exact match
ne            Not equal to value
gt            Greater than value
gte           Greater than or equal
lt            Less than value
lte           Less than or equal
in            Value in array
not_in        Value not in array
contains      Field contains substring
starts_with   Field starts with prefix
ends_with     Field ends with suffix
like          Case-insensitive pattern match
is_null       Field is null (no value needed)
is_not_null   Field is not null (no value needed)
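
To illustrate what each operation means, the sketch below evaluates the same filter shape against plain Python dictionaries. It is an in-memory approximation written for this document, not the server's implementation: it does no type coercion, its handling of null values may differ from SQL, and the names OPERATIONS and matches are hypothetical.

```python
import re
from typing import Any, Callable, Dict, List


def _like(field: Any, pattern: str) -> bool:
    """SQL-style LIKE: % matches any run of characters, _ matches exactly one."""
    if field is None:
        return False
    regex = "".join(
        ".*" if ch == "%" else "." if ch == "_" else re.escape(ch) for ch in pattern
    )
    return re.fullmatch(regex, str(field), re.IGNORECASE | re.DOTALL) is not None


OPERATIONS: Dict[str, Callable[[Any, Any], bool]] = {
    "eq": lambda field, value: field == value,
    "ne": lambda field, value: field != value,
    "gt": lambda field, value: field is not None and field > value,
    "gte": lambda field, value: field is not None and field >= value,
    "lt": lambda field, value: field is not None and field < value,
    "lte": lambda field, value: field is not None and field <= value,
    "in": lambda field, value: field in value,
    "not_in": lambda field, value: field not in value,
    "contains": lambda field, value: field is not None and value in field,
    "starts_with": lambda field, value: field is not None and field.startswith(value),
    "ends_with": lambda field, value: field is not None and field.endswith(value),
    "like": _like,
    "is_null": lambda field, value: field is None,
    "is_not_null": lambda field, value: field is not None,
}


def matches(record: Dict[str, Any], filters: List[Dict[str, Any]]) -> bool:
    """True when the record satisfies every filter (AND logic)."""
    return all(
        OPERATIONS[f["operation"]](record.get(f["field"]), f.get("value"))
        for f in filters
    )


users = [
    {"name": "Alice Smith", "email": "alice@gmail.com", "age": 30, "verified_at": None},
    {"name": "Bob Jones", "email": "bob@example.com", "age": 17, "verified_at": "2024-01-01"},
]
adults_unverified = [
    u["name"]
    for u in users
    if matches(u, [
        {"field": "age", "operation": "gte", "value": 18},
        {"field": "verified_at", "operation": "is_null"},
    ])
]
```

Filters that omit value, such as is_null, simply receive None as their second argument and ignore it.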

📖 API Reference

bootstrap

def bootstrap(
    *,
    database_url: Optional[str] = None,
    create_tables: bool = True,
) -> None

Initialize the database engine and optionally create tables. This function performs environment setup only.

build_storage_bundle

def build_storage_bundle(
    *,
    accessors: Sequence[ModelAccessor[Any]],
    max_records_per_model: Optional[int] = 5,
    enable_db_actions: bool = True,
) -> RpcBundle

Build a ConnectRPC RpcBundle exposing CRUD and database utilities. Returns an RpcBundle that can be added to a VentionApp using app.add_bundle().

ModelAccessor

ModelAccessor(
    model: Type[ModelType],
    component_name: str,
    *,
    enable_auditing: bool = True,
)

Read

  • get(id, include_deleted=False) -> Optional[ModelType]
  • all(include_deleted=False) -> List[ModelType]

Write

  • insert(obj, actor="internal") -> ModelType
  • save(obj, actor="internal") -> ModelType
  • delete(id, actor="internal") -> bool
  • restore(id, actor="internal") -> bool

Batch

  • insert_many(objs, actor="internal") -> List[ModelType]
  • delete_many(ids, actor="internal") -> int

Hooks

  • @accessor.before_insert()
  • @accessor.after_insert()
  • @accessor.before_update()
  • @accessor.after_update()
  • @accessor.before_delete()
  • @accessor.after_delete()

Parameters

  • enable_auditing: If False, disables audit logging for this accessor. Useful for models that shouldn't be audited (e.g., audit logs themselves). Defaults to True.

Database Helpers

  • database.set_database_url(url: str) -> None
  • database.get_engine() -> Engine
  • database.transaction() -> Iterator[Session]
  • database.use_session(session: Optional[Session] = None) -> Iterator[Session]
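
The transaction helper is used as a context manager elsewhere in this document (with database.transaction() as session). The sketch below shows the commit-on-success, rollback-on-error pattern that such helpers commonly follow, using the standard sqlite3 module. Whether the library behaves exactly this way is an assumption, and this transaction function is a stand-in, not the library's own.

```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def transaction(conn: sqlite3.Connection) -> Iterator[sqlite3.Connection]:
    """Commit if the block succeeds, roll back if it raises."""
    try:
        yield conn
    except Exception:
        conn.rollback()
        raise
    else:
        conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.commit()

with transaction(conn) as tx:
    tx.execute("INSERT INTO users (name) VALUES ('Alice')")

try:
    with transaction(conn) as tx:
        tx.execute("INSERT INTO users (name) VALUES ('Bob')")
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass  # Bob's insert was rolled back

count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
```

Only the first insert survives, which is the property that keeps a record and its audit entry consistent when both are written in one transaction.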

AuditLog model

class AuditLog(SQLModel, table=True):
    id: int
    timestamp: datetime
    component: str
    record_id: int
    operation: str
    actor: str
    before: Optional[Dict[str, Any]]
    after: Optional[Dict[str, Any]]

🔍 Troubleshooting & FAQ

  • Diagram endpoint fails → Ensure Graphviz + sqlalchemy-schemadisplay are installed.
  • No audit actor shown → Provide X-User header in API requests.
  • Soft delete not working → Your model must have a deleted_at field.
  • Restore fails → The backup file must pass the integrity check that runs when integrity_check=True; verify that the file is a complete, uncorrupted SQLite backup.
