Micro-ORM with optional Pydantic (Pyodide compatible with lite models)
SQLer
English | Japanese (日本語)
A lightweight, JSON-first micro-ORM for SQLite (sync + async). Define Pydantic-style models, persist them as JSON, and query with a fluent API — with optional safe models that enforce optimistic versioning.
Why SQLer?
This started as a personal toolkit for very fast prototyping: small scripts that made it effortless to sketch data models, shove them into SQLite as JSON, and iterate. The result became SQLer, a tidy, dependency-light package that keeps that prototyping speed but adds the pieces you need for real projects (indexes, relationships, integrity policies, and honest concurrency).
Features
- Document-style models backed by SQLite JSON1
- Fluent query builder: filter, exclude, contains, isin, .any().where(...)
- Extended field operations: between, is_null, is_not_null, startswith, endswith, glob, in_list
- NULL-safe comparisons: F("field") == None generates proper IS NULL
- Aggregations: sum, avg, min, max, exists, paginate
- Relationships with simple reference storage and hydration
- Safe models with _version and optimistic locking (stale writes raise)
- Configurable intent rebasing for automatic conflict resolution
- Bulk operations (bulk_upsert, bulk_insert)
- Transaction-aware saves: model.save() respects explicit transactions (rollback works!)
- Index management: create_index, drop_index, list_indexes, index_exists
- Model mixins: timestamps, soft delete (with active(), only_deleted(), with_deleted()), lifecycle hooks
- Auto-calling hooks: HooksMixin automatically calls before_save / after_save in save()
- Integrity policies on delete: restrict, set_null, cascade
- Query logging for debugging and performance profiling
- Raw SQL escape hatch (parameterized), with model hydration when returning _id, data
- Sync & Async APIs with matching semantics
- WAL-friendly concurrency via thread-local connections (many readers, one writer)
- Smart table naming: proper English pluralization (category→categories, box→boxes)
- Opt-in perf tests and practical indexing guidance
- Query caching with TTL and LRU eviction
- Data export/import: CSV, JSON, JSONL (sync + async)
- Full-text search via FTS5 with SearchableMixin
- Connection pooling for high-concurrency scenarios
- Schema migrations with version tracking (sync + async)
- Metrics collection for monitoring and Prometheus export
- Database operations: backup, restore, health checks, vacuum
- Change tracking with dirty field detection and partial updates
Interactive Tour
Learn SQLer through marimo notebooks! Run and modify code directly in your browser.
| Tour | Topics |
|---|---|
| 01. Fundamentals | Setup, models, CRUD, queries, aggregations |
| 02. Relationships | Model references, hydration, cross-model queries |
| 03. Safe Models | Optimistic locking, StaleVersionError, conflict resolution |
| 04. Transactions | Atomic operations, rollback, nested transactions |
| 05. Mixins | Timestamps, soft delete, audit logging, lifecycle hooks |
| 06. Advanced | Bulk ops, indexes, integrity policies, raw SQL |
| 07. Export/Import | CSV, JSON, JSONL data export and import |
| 08. Full-Text Search | FTS5 indexing, boolean queries, ranking, highlights |
| 09. Change Tracking | TrackedModel, dirty checking, partial updates, diff |
| 10. Database Operations | Health checks, statistics, vacuum, pagination, logging |
| 11. Metrics & Caching | Prometheus metrics, query caching, connection pools |
Run Locally (Interactive)
Clone the repo and run any tour interactively with marimo:
git clone https://github.com/gabu-quest/sqler.git
cd sqler
uv sync --dev
uv run marimo edit examples/tour_01_fundamentals.py
What's New (Latest)
Transaction-Aware Saves
model.save() now respects explicit transactions. Previously, saves would commit immediately even inside a transaction block, breaking rollback behavior. Now:
with db.transaction():
    User(name="Alice").save()  # NOT committed yet
    User(name="Bob").save()    # NOT committed yet
    raise RuntimeError("Oops!")
# Both saves are rolled back!
Soft Delete Convenience Methods
SoftDeleteMixin now provides class methods for querying:
User.active() # Only non-deleted records
User.only_deleted() # Only deleted records
User.with_deleted() # All records including deleted
Extended Query Builder
New field operations for more expressive queries:
F("age").between(18, 65) # BETWEEN ? AND ?
F("email").is_null() # IS NULL
F("email").is_not_null() # IS NOT NULL
F("name").startswith("Al") # LIKE 'Al%' ESCAPE '\'
F("email").endswith("@gmail.com") # LIKE '%@gmail.com' ESCAPE '\'
F("path").glob("/home/*") # GLOB pattern
F("status").in_list(["a", "b"]) # IN (?, ?)
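The ESCAPE '\' clause in the comments above matters when the search text itself contains % or _. The following is a minimal sketch of that idea using only the standard library; escape_like is a hypothetical helper written for this illustration, not a SQLer function, and SQLer's own escaping may differ in detail.

```python
import sqlite3


def escape_like(text: str, escape: str = "\\") -> str:
    """Escape LIKE wildcards so user text is matched literally."""
    return (
        text.replace(escape, escape + escape)  # escape the escape char first
        .replace("%", escape + "%")
        .replace("_", escape + "_")
    )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (name TEXT)")
conn.executemany(
    "INSERT INTO t VALUES (?)", [("50%_off",), ("50 percent",), ("5000",)]
)

prefix = "50%"  # the user wants names that literally start with "50%"
rows = conn.execute(
    "SELECT name FROM t WHERE name LIKE ? ESCAPE '\\' ORDER BY name",
    [escape_like(prefix) + "%"],  # only the trailing % acts as a wildcard
).fetchall()
names = [r[0] for r in rows]
print(names)
```

Without the escaping step, the pattern would be 50%% and all three rows would match.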
NULL-Safe Comparisons
F("field") == None now generates IS NULL (not = NULL):
# Before: broken SQL "= NULL" (always false)
# After: correct SQL "IS NULL"
User.query().filter(F("deleted_at") == None) # Works correctly!
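To see why = NULL is a problem in plain SQL, here is a small standalone sketch with the standard library's sqlite3 module. The table layout (an _id column plus a JSON data column) is an assumption made for the illustration, not a statement about SQLer's exact schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (_id INTEGER PRIMARY KEY, data JSON NOT NULL)")
conn.executemany(
    "INSERT INTO users (data) VALUES (?)",
    [
        ('{"name": "Active", "deleted_at": null}',),
        ('{"name": "Deleted", "deleted_at": "2024-01-01T00:00:00"}',),
    ],
)

col = "json_extract(data, '$.deleted_at')"
# "= NULL" evaluates to NULL (unknown) for every row, so nothing matches.
eq_null = conn.execute(f"SELECT COUNT(*) FROM users WHERE {col} = NULL").fetchone()[0]
# "IS NULL" is the comparison that actually finds missing values.
is_null = conn.execute(f"SELECT COUNT(*) FROM users WHERE {col} IS NULL").fetchone()[0]
print(eq_null, is_null)
```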
Configurable Intent Rebasing
Control automatic conflict resolution for safe models:
from sqler import SQLerSafeModel, RebaseConfig, PERMISSIVE_REBASE_CONFIG, NO_REBASE_CONFIG

class Counter(SQLerSafeModel):
    count: int = 0
    _rebase_config = PERMISSIVE_REBASE_CONFIG  # Allow rebasing any numeric field
Auto-Calling Lifecycle Hooks
HooksMixin now automatically calls hooks in save() and delete():
class AuditedUser(HooksMixin, SQLerModel):
    email: str

    def before_save(self) -> bool:
        self.email = self.email.lower()
        return True  # False aborts save

user = AuditedUser(email="ALICE@TEST.COM")
user.save()  # Hooks called automatically!
Index Management
Query and manage indexes programmatically:
db.list_indexes() # All indexes
db.list_indexes("users") # Indexes on 'users' table
db.index_exists("idx_users_email") # Check if exists
Smart Table Naming
Proper English pluralization for table names:
class Category(SQLerModel): ... # → categories (not categorys)
class Box(SQLerModel): ... # → boxes (not boxs)
class Company(SQLerModel): ... # → companies (not companys)
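The naming rule can be pictured with a few lines of plain Python. This is only a sketch of one rule set that reproduces the table names appearing in this README (categories, boxes, companies, and also xs for class X); pluralize and table_name are hypothetical names, and SQLer's real implementation may handle more cases or handle edge cases differently.

```python
def pluralize(word: str) -> str:
    """Naive English pluralization for lowercase names (illustrative only)."""
    vowels = "aeiou"
    # consonant + y -> ies (category -> categories)
    if len(word) > 1 and word.endswith("y") and word[-2] not in vowels:
        return word[:-1] + "ies"
    # sibilant endings take "es" (box -> boxes); single letters just take "s"
    if len(word) > 1 and word.endswith(("s", "x", "z", "ch", "sh")):
        return word + "es"
    return word + "s"


def table_name(class_name: str) -> str:
    """Lowercase the class name, then pluralize it."""
    return pluralize(class_name.lower())


for name in ["Category", "Box", "Company", "User", "X"]:
    print(name, "->", table_name(name))
```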
Install
pip install sqler
Requires Python 3.12+ and SQLite with JSON1 (bundled on most platforms).
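If you are unsure whether the SQLite build behind your interpreter includes the JSON functions, you can simply call one. A minimal check using only the standard library (has_json1 is a name chosen for this sketch, not part of SQLer):

```python
import sqlite3


def has_json1() -> bool:
    """Return True if the linked SQLite can evaluate JSON functions."""
    conn = sqlite3.connect(":memory:")
    try:
        # json_extract comes from JSON1 (built in by default since SQLite 3.38).
        row = conn.execute("""SELECT json_extract('{"a": 1}', '$.a')""").fetchone()
        return row[0] == 1
    except sqlite3.OperationalError:
        # "no such function: json_extract" means JSON1 is missing.
        return False
    finally:
        conn.close()


print(sqlite3.sqlite_version, has_json1())
```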
Public API Contract
Each subsection carries a Contract ID. The suite in tests/test_readme.py executes these snippets using only the documented public APIs. When the contract section changes, the tests must change with it, so CI proves the README.
[C01] Sync quickstart: models, save, query
from sqler import SQLerDB, SQLerModel
from sqler.query import SQLerField as F
class Prefecture(SQLerModel):
    name: str
    region: str
    population: int
    foods: list[str] | None = None

class City(SQLerModel):
    name: str
    population: int
    prefecture: Prefecture | None = None
db = SQLerDB.in_memory()
Prefecture.set_db(db)
City.set_db(db)
kyoto = Prefecture(name="Kyoto", region="Kansai", population=2_585_000, foods=["matcha","yudofu"]).save()
osaka = Prefecture(name="Osaka", region="Kansai", population=8_839_000, foods=["takoyaki"]).save()
shiga = Prefecture(name="Shiga", region="Kansai", population=1_413_000, foods=["funazushi"]).save()
City(name="Kyoto City", population=1_469_000, prefecture=kyoto).save()
City(name="Osaka City", population=2_750_000, prefecture=osaka).save()
City(name="Otsu", population=343_000, prefecture=shiga).save()
big = Prefecture.query().filter(F("population") > 1_000_000).order_by("population", desc=True).all()
assert [p.name for p in big][:2] == ["Osaka", "Kyoto"]
[C02] Async quickstart (matching semantics)
import asyncio
from sqler import AsyncSQLerDB, AsyncSQLerModel
from sqler.query import SQLerField as F
class AUser(AsyncSQLerModel):
    name: str
    age: int

async def main():
    db = AsyncSQLerDB.in_memory()
    await db.connect()
    AUser.set_db(db)
    await AUser(name="Ada", age=36).save()
    adults = await AUser.query().filter(F("age") >= 18).order_by("age").all()
    assert any(u.name == "Ada" for u in adults)
    await db.close()

asyncio.run(main())
[C03] Query builder: .any().where(...)
from sqler import SQLerDB, SQLerModel
from sqler.query import SQLerField as F
class Order(SQLerModel):
    customer: str
    items: list[dict] | None = None
db = SQLerDB.in_memory()
Order.set_db(db)
Order(customer="C1", items=[{"sku":"RamenSet","qty":3}, {"sku":"Gyoza","qty":1}]).save()
Order(customer="C2", items=[{"sku":"RamenSet","qty":1}]).save()
expr = F(["items"]).any().where((F("sku") == "RamenSet") & (F("qty") >= 2))
hits = Order.query().filter(expr).all()
assert [h.customer for h in hits] == ["C1"]
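The key property of .any().where(...) is that both conditions must hold for the same array element. In raw SQLite this kind of query is commonly written with EXISTS over json_each. The sketch below shows that shape with the standard library only; the table layout and the SQL are assumptions for illustration, and the SQL that SQLer actually generates may look different (use debug() to inspect it).

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (_id INTEGER PRIMARY KEY, data JSON NOT NULL)")
docs = [
    {"customer": "C1", "items": [{"sku": "RamenSet", "qty": 3}, {"sku": "Gyoza", "qty": 1}]},
    {"customer": "C2", "items": [{"sku": "RamenSet", "qty": 1}]},
]
conn.executemany("INSERT INTO orders (data) VALUES (?)", [(json.dumps(d),) for d in docs])

# EXISTS + json_each: both predicates are checked against the SAME element.
sql = """
SELECT json_extract(o.data, '$.customer')
FROM orders AS o
WHERE EXISTS (
    SELECT 1
    FROM json_each(o.data, '$.items') AS item
    WHERE json_extract(item.value, '$.sku') = ?
      AND json_extract(item.value, '$.qty') >= ?
)
"""
hits = [row[0] for row in conn.execute(sql, ["RamenSet", 2])]
print(hits)
```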
[C04] Relationships: hydration & cross-filtering
from sqler import SQLerDB, SQLerModel
class Address(SQLerModel):
    city: str
    country: str

class User(SQLerModel):
    name: str
    address: Address | None = None
db = SQLerDB.in_memory()
Address.set_db(db)
User.set_db(db)
home = Address(city="Kyoto", country="JP").save()
user = User(name="Alice", address=home).save()
got = User.from_id(user._id)
assert got.address.city == "Kyoto"
qs = User.query().filter(User.ref("address").field("city") == "Kyoto")
assert any(row.name == "Alice" for row in qs.all())
[C05] Index helpers, debug(), and explain_query_plan()
from sqler import SQLerDB, SQLerModel
from sqler.query import SQLerField as F
db = SQLerDB.in_memory()
class Prefecture(SQLerModel):
    name: str
    region: str
    population: int
Prefecture.set_db(db)
Prefecture(name="A", region="x", population=10).save()
Prefecture(name="B", region="x", population=2_000_000).save()
db.create_index("prefectures", "population")
Prefecture.ensure_index("population")
q = Prefecture.query().filter(F("population") >= 1_000_000)
sql, params = q.debug()
assert isinstance(sql, str) and isinstance(params, list)
plan = q.explain_query_plan(Prefecture.db().adapter)
assert plan and len(list(plan)) >= 1
[C06] Safe models: optimistic versioning
from sqler import SQLerDB, SQLerSafeModel, StaleVersionError
class Account(SQLerSafeModel):
    owner: str
    balance: int
db = SQLerDB.in_memory()
Account.set_db(db)
acc = Account(owner="Ada", balance=100).save()
acc.balance = 120
acc.save()
table = getattr(Account, "__tablename__", "accounts")
db.adapter.execute(
    f"UPDATE {table} SET data = json_set(data,'$._version', json_extract(data,'$._version') + 1) WHERE _id = ?;",
    [acc._id],
)
db.adapter.commit()
acc.balance = 130
try:
    acc.save()
except StaleVersionError:
    pass
else:
    raise AssertionError("stale writes must raise")
[C07] bulk_upsert — one id per input, order preserved
from sqler import SQLerDB, SQLerModel
class BU(SQLerModel):
    name: str
    age: int
db = SQLerDB.in_memory()
BU.set_db(db)
rows = [{"name":"A"}, {"name":"B"}, {"_id": 42, "name":"C"}]
ids = db.bulk_upsert("bus", rows)
assert ids[2] == 42
assert all(isinstance(i, int) and i > 0 for i in ids)
[C08] Raw SQL escape hatch + Model.from_id
rows = db.execute_sql(
    "SELECT _id FROM bus WHERE json_extract(data,'$.name') = ?",
    ["A"],
)
ids = [r.get("_id") if isinstance(r, dict) else r[0] for r in rows]
hydrated = [BU.from_id(i) for i in ids]
assert all(isinstance(h, BU) for h in hydrated)
[C09] Delete policies: restrict
from sqler import SQLerDB, SQLerModel, ReferentialIntegrityError
class U(SQLerModel):
    name: str

class Post(SQLerModel):
    title: str
    author: dict | None = None
db = SQLerDB.in_memory()
U.set_db(db)
Post.set_db(db)
u = U(name="Writer").save()
Post(title="Post A", author={"_table": U.__tablename__, "_id": u._id}).save()
try:
    u.delete_with_policy(on_delete="restrict")
except ReferentialIntegrityError:
    pass
else:
    raise AssertionError("restrict deletes must block when referenced")
[C10] Index variants: unique + partial
from sqler import SQLerDB, SQLerModel
class X(SQLerModel):
    name: str
    email: str | None = None
db = SQLerDB.in_memory()
X.set_db(db)
db.create_index("xs", "email", unique=True)
db.create_index("xs", "name", where="json_extract(data,'$.name') IS NOT NULL")
Quickstart (Sync)
[C11] Create, query, close
from sqler import SQLerDB, SQLerModel
from sqler.query import SQLerField as F
class User(SQLerModel):
    name: str
    age: int
db = SQLerDB.on_disk("app.db")
User.set_db(db) # binds model to table "users" (override with table="...")
# Create / save
u = User(name="Alice", age=30)
u.save()
print(u._id) # assigned _id
# Query
adults = User.query().filter(F("age") >= 18).order_by("age").all()
print([a.name for a in adults])
db.close()
Quickstart (Async)
[C12] Async match to sync
import asyncio
from sqler import AsyncSQLerDB, AsyncSQLerModel
from sqler.query import SQLerField as F
class AUser(AsyncSQLerModel):
    name: str
    age: int

async def main():
    db = AsyncSQLerDB.in_memory()
    await db.connect()
    AUser.set_db(db)
    u = AUser(name="Ada", age=36)
    await u.save()
    adults = await AUser.query().filter(F("age") >= 18).order_by("age").all()
    print([a.name for a in adults])
    await db.close()

asyncio.run(main())
Safe Models & Optimistic Versioning
Use SQLerSafeModel when you need concurrency safety. New rows start with _version = 0. Each update is conditional on the in-memory _version matching the stored one; on success the version is bumped by 1. If the row changed underneath you, a StaleVersionError is raised.
[C13] Safe model collision handling
from sqler import SQLerDB, SQLerSafeModel, StaleVersionError
class Account(SQLerSafeModel):
    owner: str
    balance: int
db = SQLerDB.on_disk("bank.db")
Account.set_db(db)
acc = Account(owner="Ada", balance=100)
acc.save() # _version == 0
acc.balance = 120
acc.save() # _version == 1
# Simulate concurrent change
db.adapter.execute("UPDATE accounts SET _version = _version + 1 WHERE _id = ?;", [acc._id])
db.adapter.commit()
# This write is stale → raises
try:
    acc.balance = 130
    acc.save()
except StaleVersionError:
    acc.refresh()  # reloads both fields and _version
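The mechanism behind this is a compare-and-swap UPDATE, the statement quoted in the Concurrency Model section. The sketch below replays the same scenario with the standard library only, so you can see the check in isolation. StaleWrite and save_versioned are hypothetical stand-ins written for this example, and the table layout is an assumption.

```python
import json
import sqlite3


class StaleWrite(Exception):
    """Hypothetical stand-in for SQLer's StaleVersionError."""


def save_versioned(conn, _id, doc, expected_version):
    """Write doc only if the stored _version still equals expected_version."""
    cur = conn.execute(
        "UPDATE accounts SET data = json(?), _version = _version + 1 "
        "WHERE _id = ? AND _version = ?",
        [json.dumps(doc), _id, expected_version],
    )
    if cur.rowcount == 0:  # no row matched: it is gone or was changed meanwhile
        raise StaleWrite(f"row {_id} is no longer at version {expected_version}")
    return expected_version + 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE accounts (_id INTEGER PRIMARY KEY, data JSON NOT NULL, "
    "_version INTEGER NOT NULL DEFAULT 0)"
)
conn.execute("""INSERT INTO accounts (data) VALUES ('{"owner": "Ada", "balance": 100}')""")

version = save_versioned(conn, 1, {"owner": "Ada", "balance": 120}, 0)  # 0 -> 1
# A concurrent writer bumps the row behind our back.
conn.execute("UPDATE accounts SET _version = _version + 1 WHERE _id = 1")
try:
    save_versioned(conn, 1, {"owner": "Ada", "balance": 130}, version)
    outcome = "saved"
except StaleWrite:
    outcome = "stale"
print(version, outcome)
```

Because the version check and the write happen in one statement, there is no window between "read the version" and "write the row" in which another writer can slip in unnoticed.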
Relationships
Store references to other models, hydrate them automatically, and filter across JSON references.
[C14] Store and query relationships
from sqler import SQLerDB, SQLerModel
class Address(SQLerModel):
    city: str
    country: str

class User(SQLerModel):
    name: str
    address: Address | None = None
db = SQLerDB.in_memory()
Address.set_db(db)
User.set_db(db)
home = Address(city="Kyoto", country="JP").save()
user = User(name="Alice", address=home).save()
loaded = User.from_id(user._id)
assert loaded.address.city == "Kyoto"
q = User.query().filter(User.ref("address").field("city") == "Kyoto")
assert [row.name for row in q.all()] == ["Alice"]
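Conceptually, a stored reference is a small JSON object pointing at another row, and a cross-model filter follows that pointer. The sketch below illustrates the idea in raw SQLite. It assumes the {"_table": ..., "_id": ...} reference shape used in the delete-policy examples and an (_id, data) table layout; whether SQLer stores model-typed fields exactly this way, and what SQL it generates, is not confirmed here.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE addresses (_id INTEGER PRIMARY KEY, data JSON NOT NULL)")
conn.execute("CREATE TABLE users (_id INTEGER PRIMARY KEY, data JSON NOT NULL)")

cur = conn.execute(
    "INSERT INTO addresses (data) VALUES (?)",
    [json.dumps({"city": "Kyoto", "country": "JP"})],
)
# The user document keeps only a small reference to the address row.
ref = {"_table": "addresses", "_id": cur.lastrowid}
conn.execute(
    "INSERT INTO users (data) VALUES (?)",
    [json.dumps({"name": "Alice", "address": ref})],
)

# Cross-filter: follow the stored _id into the referenced table.
sql = """
SELECT json_extract(u.data, '$.name')
FROM users AS u
JOIN addresses AS a ON a._id = json_extract(u.data, '$.address._id')
WHERE json_extract(a.data, '$.city') = ?
"""
names = [row[0] for row in conn.execute(sql, ["Kyoto"])]
print(names)
```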
Query Builder
- Fields: F("age"), F(["items","qty"])
- Predicates: ==, !=, <, <=, >, >=, contains, isin
- Boolean ops: & (AND), | (OR), ~ (NOT)
- Exclude: invert a predicate set
- Arrays: .any() and scoped .any().where(...)
Queries returned by Model.query() expose introspection helpers: .debug() returns (sql, params), and the .sql() and .params() methods mirror the underlying SQLerQuery properties.
[C15] Query builder patterns
from sqler import SQLerDB, SQLerModel
from sqler.query import SQLerField as F
class QueryUser(SQLerModel):
    name: str
    age: int
    tags: list[str] | None = None
    tier: int | None = None

class QueryOrder(SQLerModel):
    customer: str
    items: list[dict] | None = None
db = SQLerDB.in_memory()
QueryUser.set_db(db)
QueryOrder.set_db(db)
QueryUser(name="Ada", age=36, tags=["pro", "python"], tier=1).save()
QueryUser(name="Bob", age=20, tags=["hobby"], tier=3).save()
QueryOrder(customer="Ada", items=[{"sku": "ABC", "qty": 3}]).save()
QueryOrder(customer="Bob", items=[{"sku": "XYZ", "qty": 1}]).save()
q1 = QueryUser.query().filter(F("tags").contains("pro"))
assert [u.name for u in q1.all()] == ["Ada"]
q2 = QueryUser.query().filter(F("tier").isin([1, 2]))
assert [u.name for u in q2.all()] == ["Ada"]
q3 = QueryUser.query().exclude(F("name").like("test%"))
assert {u.name for u in q3.all()} == {"Ada", "Bob"}
expr = F(["items"]).any().where((F("sku") == "ABC") & (F("qty") >= 2))
q4 = QueryOrder.query().filter(expr)
assert [o.customer for o in q4.all()] == ["Ada"]
sql, params = QueryUser.query().filter(F("age") >= 18).debug()
assert isinstance(sql, str) and params == [18]
plan = QueryUser.query().filter(F("age") >= 18).explain_query_plan(QueryUser.db().adapter)
assert plan and len(list(plan)) >= 1
Data Integrity
Delete Policies (restrict, set_null, cascade)
Control how deletions affect JSON references in related rows.
- restrict (default): prevent deletion if anything still references the row
- set_null: null out the JSON field that holds the reference (field must be nullable)
- cascade: recursively delete referrers (depth-first, cycle-safe)
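To make the three policies concrete, here is a toy model of them over plain dictionaries. It is a sketch of the semantics described above, not SQLer's implementation: the Store type, Restricted, referrers, and this delete_with_policy function are all hypothetical and operate on an in-memory dict instead of SQLite.

```python
from typing import Any

# Hypothetical in-memory store: {table: {_id: document}}
Store = dict[str, dict[int, dict[str, Any]]]


class Restricted(Exception):
    """Stand-in for ReferentialIntegrityError."""


def referrers(store: Store, table: str, _id: int):
    """Yield (table, row_id, field) for every field that points at the target."""
    for t, rows in store.items():
        for rid, doc in rows.items():
            for field, value in doc.items():
                if isinstance(value, dict) and value.get("_table") == table and value.get("_id") == _id:
                    yield t, rid, field


def delete_with_policy(store: Store, table: str, _id: int, on_delete: str = "restrict", _seen=None):
    seen = _seen if _seen is not None else set()
    if (table, _id) in seen:  # cycle guard for cascade
        return
    seen.add((table, _id))
    refs = list(referrers(store, table, _id))  # snapshot before mutating
    if on_delete == "restrict" and refs:
        raise Restricted(f"{table}:{_id} is still referenced")
    for t, rid, field in refs:
        if on_delete == "set_null":
            store[t][rid][field] = None
        elif on_delete == "cascade":
            delete_with_policy(store, t, rid, "cascade", seen)  # depth-first
    store[table].pop(_id, None)


def fresh() -> Store:
    return {
        "users": {1: {"name": "Writer"}},
        "posts": {10: {"title": "Post A", "author": {"_table": "users", "_id": 1}}},
    }


s = fresh()
delete_with_policy(s, "users", 1, "set_null")
print(s)
```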
[C16] Delete policies in action
from sqler import SQLerDB, SQLerModel, ReferentialIntegrityError
class DIUser(SQLerModel):
    name: str

class Post(SQLerModel):
    title: str
    author: dict | None = None
# restrict: raises while references exist
restrict_db = SQLerDB.in_memory()
DIUser.set_db(restrict_db)
Post.set_db(restrict_db)
writer = DIUser(name="Writer").save()
Post(title="Post A", author={"_table": "diusers", "_id": writer._id}).save()
try:
    writer.delete_with_policy(on_delete="restrict")
except ReferentialIntegrityError:
    pass
# set_null: clears JSON ref before delete
set_null_db = SQLerDB.in_memory()
DIUser.set_db(set_null_db)
Post.set_db(set_null_db)
nullable = DIUser(name="Nullable").save()
post = Post(title="Post B", author={"_table": "diusers", "_id": nullable._id}).save()
nullable.delete_with_policy(on_delete="set_null")
assert Post.from_id(post._id).author is None
# cascade: remove dependents recursively
cascade_db = SQLerDB.in_memory()
DIUser.set_db(cascade_db)
Post.set_db(cascade_db)
cascade = DIUser(name="Cascade").save()
Post(title="Post C", author={"_table": "diusers", "_id": cascade._id}).save()
cascade.delete_with_policy(on_delete="cascade")
assert Post.query().count() == 0
Reference Validation
Detect orphans proactively:
[C17] Reference validation
from sqler import SQLerDB, SQLerModel
class RefUser(SQLerModel):
    name: str

class RefPost(SQLerModel):
    title: str
    author: dict | None = None
db = SQLerDB.in_memory()
RefUser.set_db(db)
RefPost.set_db(db)
user = RefUser(name="Ada").save()
dangling = RefPost(title="Lost", author={"_table": RefUser.__tablename__, "_id": user._id}).save()
db.delete_document(RefUser.__tablename__, user._id) # simulate manual deletion
broken = RefPost.validate_references()
assert broken and broken[0].row_id == dangling._id
# Returned items are sqler.models.BrokenRef dataclasses
Bulk Operations
Write many documents efficiently.
[C18] Bulk upsert
from sqler import SQLerDB, SQLerModel
class BulkUser(SQLerModel):
    name: str
    age: int | None = None
db = SQLerDB.in_memory()
BulkUser.set_db(db)
rows = [{"name": "A"}, {"name": "B"}, {"_id": 42, "name": "C"}]
ids = db.bulk_upsert(BulkUser.__tablename__, rows)
assert len(ids) == 3 and 42 in ids
Notes:
- If SQLite supports RETURNING, SQLer uses it; otherwise a safe fallback is used.
- For sustained heavy writes, favor a single-process writer (SQLite has a single writer at a time).
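The RETURNING note can be illustrated with the standard library. RETURNING arrived in SQLite 3.35.0, so a version check decides which path to take. This is a simplified sketch under assumed names (insert_docs, HAS_RETURNING, an (_id, data) table); it inserts row by row and is not how SQLer's bulk_upsert is implemented.

```python
import json
import sqlite3

# RETURNING was added in SQLite 3.35.0.
HAS_RETURNING = sqlite3.sqlite_version_info >= (3, 35, 0)


def insert_docs(conn, table, docs):
    """Insert documents one by one and return their ids in input order.

    `table` must be a trusted identifier; it is interpolated into the SQL.
    """
    ids = []
    for doc in docs:
        payload = json.dumps(doc)
        if HAS_RETURNING:
            rows = conn.execute(
                f"INSERT INTO {table} (data) VALUES (?) RETURNING _id", [payload]
            ).fetchall()
            ids.append(rows[0][0])
        else:
            # Fallback: lastrowid is the rowid of this cursor's last INSERT.
            cur = conn.execute(f"INSERT INTO {table} (data) VALUES (?)", [payload])
            ids.append(cur.lastrowid)
    return ids


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE bulkusers (_id INTEGER PRIMARY KEY, data JSON NOT NULL)")
ids = insert_docs(conn, "bulkusers", [{"name": "A"}, {"name": "B"}, {"name": "C"}])
print(ids)
```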
Transactions
Use explicit transactions for atomic multi-operation batches.
[C22] Transaction context manager
from sqler import SQLerDB, SQLerModel
class TxUser(SQLerModel):
    name: str
    balance: int = 0
db = SQLerDB.in_memory()
TxUser.set_db(db)
# Verify transaction API exists and works
tx = db.transaction()
assert hasattr(tx, "__enter__")
assert hasattr(tx, "__exit__")
assert hasattr(tx, "commit")
assert hasattr(tx, "rollback")
# Transaction context manager usage
with db.transaction():
    db.insert_document("txusers", {"name": "Alice", "balance": 100})
    db.insert_document("txusers", {"name": "Bob", "balance": 200})
assert TxUser.query().count() == 2
# Explicit commit/rollback methods
tx = db.transaction()
tx.__enter__()
db.insert_document("txusers", {"name": "Charlie", "balance": 300})
tx.commit()
assert TxUser.query().count() == 3
Query Aggregations
Perform aggregate calculations directly in the database.
[C23] Sum, avg, min, max
from sqler import SQLerDB, SQLerModel
from sqler.query import SQLerField as F
class Product(SQLerModel):
    name: str
    price: float
    quantity: int
db = SQLerDB.in_memory()
Product.set_db(db)
Product(name="Apple", price=1.50, quantity=100).save()
Product(name="Banana", price=0.75, quantity=150).save()
Product(name="Cherry", price=3.00, quantity=50).save()
q = Product.query()
assert q.sum("quantity") == 300
assert q.avg("price") == 1.75
assert q.min("price") == 0.75
assert q.max("price") == 3.00
# With filters
expensive = Product.query().filter(F("price") > 1.0)
assert expensive.sum("quantity") == 150 # Apple + Cherry
[C24] Exists check
from sqler import SQLerDB, SQLerModel
from sqler.query import SQLerField as F
class Item(SQLerModel):
    name: str
    active: bool = True
db = SQLerDB.in_memory()
Item.set_db(db)
Item(name="Widget", active=True).save()
assert Item.query().filter(F("active") == True).exists() == True
assert Item.query().filter(F("name") == "Missing").exists() == False
Pagination
Built-in pagination with navigation helpers.
[C25] Paginate results
from sqler import SQLerDB, SQLerModel, PaginatedResult
class Article(SQLerModel):
    title: str
    views: int = 0
db = SQLerDB.in_memory()
Article.set_db(db)
for i in range(25):
    Article(title=f"Article {i}", views=i * 10).save()
# Get page 2 with 10 items per page
page = Article.query().order_by("views", desc=True).paginate(page=2, per_page=10)
assert isinstance(page, PaginatedResult)
assert len(page.items) == 10
assert page.page == 2
assert page.total == 25
assert page.total_pages == 3
assert page.has_next == True
assert page.has_prev == True
assert page.next_page == 3
assert page.prev_page == 1
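The navigation fields follow from simple arithmetic on total and per_page. The sketch below reproduces the numbers from C25 in plain Python; Page and paginate are hypothetical stand-ins that mirror the attribute names used above, not SQLer's PaginatedResult itself.

```python
import math
from dataclasses import dataclass


@dataclass
class Page:
    """Hypothetical stand-in mirroring the fields asserted in C25."""
    items: list
    page: int
    per_page: int
    total: int

    @property
    def total_pages(self) -> int:
        return math.ceil(self.total / self.per_page) if self.total else 0

    @property
    def has_next(self) -> bool:
        return self.page < self.total_pages

    @property
    def has_prev(self) -> bool:
        return self.page > 1

    @property
    def next_page(self):
        return self.page + 1 if self.has_next else None

    @property
    def prev_page(self):
        return self.page - 1 if self.has_prev else None


def paginate(rows: list, page: int, per_page: int) -> Page:
    offset = (page - 1) * per_page  # the value a SQL OFFSET would use
    return Page(rows[offset:offset + per_page], page, per_page, len(rows))


p = paginate(list(range(25)), page=2, per_page=10)
print(p.items, p.total_pages, p.next_page, p.prev_page)
```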
Model Mixins
Reusable mixins for common functionality.
[C26] Timestamps
from sqler import SQLerDB, SQLerModel, TimestampMixin
class Post(TimestampMixin, SQLerModel):
    title: str
    content: str
db = SQLerDB.in_memory()
Post.set_db(db)
post = Post(title="Hello", content="World")
post._set_timestamps() # Call before save for auto-timestamps
post = post.save()
assert post.created_at is not None
assert post.updated_at is not None
[C27] Soft delete
from sqler import SQLerDB, SQLerModel, SoftDeleteMixin
class Comment(SoftDeleteMixin, SQLerModel):
    text: str
db = SQLerDB.in_memory()
Comment.set_db(db)
c = Comment(text="Nice post!").save()
assert c.is_deleted == False
c.soft_delete()
assert c.is_deleted == True
assert c.deleted_at is not None
c.restore()
assert c.is_deleted == False
assert c.deleted_at is None
# Verify the comment was restored and can be queried
all_comments = Comment.query().all()
assert len(all_comments) == 1
assert all_comments[0].is_deleted == False
[C28] Lifecycle hooks
from sqler import SQLerDB, SQLerModel, HooksMixin
class AuditedUser(HooksMixin, SQLerModel):
    email: str
    normalized: bool = False

    def before_save(self) -> bool:
        self.email = self.email.lower().strip()
        self.normalized = True
        return True  # Continue with save

    def after_save(self) -> None:
        pass  # Log, notify, etc.
db = SQLerDB.in_memory()
AuditedUser.set_db(db)
# Hooks can also be called explicitly; HooksMixin's save() invokes them automatically (see [C37])
u = AuditedUser(email=" ALICE@Example.COM ")
if u.before_save():
    u = u.save()
    u.after_save()
assert u.email == "alice@example.com"
assert u.normalized == True
Query Logging
Debug and profile queries with the built-in logger.
[C29] Query logging
from sqler import SQLerDB, SQLerModel, query_logger
from sqler.query import SQLerField as F
class LoggedUser(SQLerModel):
    name: str
    age: int
db = SQLerDB.in_memory()
LoggedUser.set_db(db)
# Enable logging
query_logger.enable()
LoggedUser(name="Ada", age=36).save()
LoggedUser(name="Bob", age=25).save()
# Note: query_logger captures queries when integrated with adapter
# Here we demonstrate the logger API
query_logger.log("SELECT * FROM loggedusers", [], 0.5)
query_logger.log("SELECT * FROM loggedusers WHERE age > ?", [30], 1.2)
# Get logged queries
logs = query_logger.logs
assert len(logs) >= 2
# Get slow queries
slow = query_logger.get_slow_queries(threshold_ms=1.0)
assert len(slow) >= 1
# Get stats
stats = query_logger.get_stats()
assert "count" in stats
assert "avg_time_ms" in stats
query_logger.disable()
query_logger.clear()
Transaction-Aware Operations
Model operations now respect explicit transactions, allowing proper rollback behavior.
[C30] Transaction-aware model.save()
from sqler import SQLerDB, SQLerModel
class TxItem(SQLerModel):
    name: str
db = SQLerDB.in_memory()
TxItem.set_db(db)
# Saves inside transaction respect rollback
try:
    with db.transaction():
        TxItem(name="A").save()
        TxItem(name="B").save()
        raise RuntimeError("abort!")
except RuntimeError:
    pass
# Nothing was saved due to rollback
assert TxItem.query().count() == 0
# Without transaction, saves commit immediately
TxItem(name="C").save()
TxItem(name="D").save()
assert TxItem.query().count() == 2
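The same all-or-nothing behavior can be observed directly in SQLite. The sketch below uses only the standard library and an assumed (_id, data) table; it shows the underlying BEGIN / ROLLBACK semantics, not SQLer's transaction implementation.

```python
import sqlite3

# isolation_level=None puts sqlite3 in autocommit mode, so BEGIN/ROLLBACK are ours to issue.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE txitems (_id INTEGER PRIMARY KEY, data JSON NOT NULL)")

try:
    conn.execute("BEGIN")
    conn.execute("""INSERT INTO txitems (data) VALUES ('{"name": "A"}')""")
    conn.execute("""INSERT INTO txitems (data) VALUES ('{"name": "B"}')""")
    raise RuntimeError("abort!")
except RuntimeError:
    conn.execute("ROLLBACK")  # both inserts disappear together

after_rollback = conn.execute("SELECT COUNT(*) FROM txitems").fetchone()[0]

# Outside an explicit transaction, each statement commits on its own.
conn.execute("""INSERT INTO txitems (data) VALUES ('{"name": "C"}')""")
after_autocommit = conn.execute("SELECT COUNT(*) FROM txitems").fetchone()[0]
print(after_rollback, after_autocommit)
```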
Extended Query Builder
New field operations for more expressive queries.
[C31] Field operations: between, startswith, endswith, glob
from sqler import SQLerDB, SQLerModel
from sqler.query import SQLerField as F
class Employee(SQLerModel):
    name: str
    age: int
    email: str
db = SQLerDB.in_memory()
Employee.set_db(db)
Employee(name="Alice", age=25, email="alice@example.com").save()
Employee(name="Bob", age=35, email="bob@test.org").save()
Employee(name="Charlie", age=45, email="charlie@example.com").save()
# between (inclusive)
mid_age = Employee.query().filter(F("age").between(30, 40)).all()
assert [e.name for e in mid_age] == ["Bob"]
# startswith
alice = Employee.query().filter(F("name").startswith("Al")).all()
assert [e.name for e in alice] == ["Alice"]
# endswith
example_emails = Employee.query().filter(F("email").endswith("@example.com")).all()
assert len(example_emails) == 2
# is_null / is_not_null
Employee(name="NoEmail", age=30, email="").save() # empty but not null
all_with_email = Employee.query().filter(F("email").is_not_null()).all()
assert len(all_with_email) == 4
[C32] NULL-safe comparison with == None
from sqler import SQLerDB, SQLerModel, SoftDeleteMixin
from sqler.query import SQLerField as F
class SoftUser(SoftDeleteMixin, SQLerModel):
    name: str
db = SQLerDB.in_memory()
SoftUser.set_db(db)
active = SoftUser(name="Active").save()
deleted = SoftUser(name="Deleted").save()
deleted.soft_delete()
# F("field") == None generates IS NULL (correct SQL)
# F("field") != None generates IS NOT NULL
active_users = SoftUser.query().filter(F("deleted_at") == None).all()
deleted_users = SoftUser.query().filter(F("deleted_at") != None).all()
assert len(active_users) == 1 and active_users[0].name == "Active"
assert len(deleted_users) == 1 and deleted_users[0].name == "Deleted"
[C33] in_list for multiple value matching
from sqler import SQLerDB, SQLerModel
from sqler.query import SQLerField as F
class Status(SQLerModel):
    code: str
    label: str
db = SQLerDB.in_memory()
Status.set_db(db)
Status(code="A", label="Active").save()
Status(code="P", label="Pending").save()
Status(code="C", label="Closed").save()
Status(code="D", label="Draft").save()
# in_list with values
open_statuses = Status.query().filter(F("code").in_list(["A", "P"])).all()
assert len(open_statuses) == 2
# Empty list returns nothing
empty = Status.query().filter(F("code").in_list([])).all()
assert len(empty) == 0
Soft Delete Convenience Methods
Query soft-deleted records easily with class methods.
[C34] SoftDeleteMixin class methods
from sqler import SQLerDB, SQLerModel, SoftDeleteMixin
class Document(SoftDeleteMixin, SQLerModel):
    title: str
db = SQLerDB.in_memory()
Document.set_db(db)
doc1 = Document(title="Active Doc").save()
doc2 = Document(title="Deleted Doc").save()
doc3 = Document(title="Another Active").save()
doc2.soft_delete()
# active() - only non-deleted
active = Document.active().all()
assert len(active) == 2
assert all(d.deleted_at is None for d in active)
# only_deleted() - only deleted
deleted = Document.only_deleted().all()
assert len(deleted) == 1
assert deleted[0].title == "Deleted Doc"
# with_deleted() - all records
all_docs = Document.with_deleted().all()
assert len(all_docs) == 3
Index Management
Query and manage database indexes programmatically.
[C35] list_indexes and index_exists
from sqler import SQLerDB, SQLerModel
class Product(SQLerModel):
    sku: str
    price: float
db = SQLerDB.in_memory()
Product.set_db(db)
# Create indexes
db.create_index("products", "sku", unique=True, name="idx_products_sku")
db.create_index("products", "price", name="idx_products_price")
# List all indexes
all_indexes = db.list_indexes()
assert len(all_indexes) >= 2
# List indexes for specific table
product_indexes = db.list_indexes("products")
assert len(product_indexes) == 2
# Check if index exists
assert db.index_exists("idx_products_sku") == True
assert db.index_exists("nonexistent_index") == False
# Index info includes uniqueness
sku_idx = next(i for i in product_indexes if i["name"] == "idx_products_sku")
assert sku_idx["unique"] == True
Configurable Intent Rebasing
Control how safe models handle concurrent numeric field updates.
[C36] RebaseConfig for safe models
from sqler import SQLerDB, SQLerSafeModel, StaleVersionError
from sqler.models.utils import RebaseConfig, PERMISSIVE_REBASE_CONFIG, NO_REBASE_CONFIG
class Counter(SQLerSafeModel):
    value: int = 0
    count: int = 0
    # Allow rebasing any numeric field with delta ±1
    _rebase_config = PERMISSIVE_REBASE_CONFIG

class StrictCounter(SQLerSafeModel):
    value: int = 0
    # No rebasing - any conflict raises
    _rebase_config = NO_REBASE_CONFIG
db = SQLerDB.in_memory()
Counter.set_db(db)
StrictCounter.set_db(db)
# Permissive: increments can be rebased
c = Counter(value=0, count=0).save()
c.value += 1
c.count += 1
c.save() # Works even if version changed (for small deltas)
# Strict: no automatic rebasing
s = StrictCounter(value=0).save()
s.value += 1
s.save()
assert s._version == 1
Auto-Calling Lifecycle Hooks
HooksMixin automatically invokes hooks when using save() and delete().
[C37] HooksMixin auto-calling
from sqler import SQLerDB, SQLerModel, HooksMixin
class AuditedItem(HooksMixin, SQLerModel):
    name: str
    normalized: bool = False
    save_count: int = 0

    def before_save(self) -> bool:
        self.name = self.name.strip().lower()
        self.normalized = True
        return True  # Continue with save

    def after_save(self) -> None:
        self.save_count += 1
db = SQLerDB.in_memory()
AuditedItem.set_db(db)
# Hooks are called automatically in save()
item = AuditedItem(name=" HELLO WORLD ")
item = item.save()
assert item.name == "hello world"
assert item.normalized == True
assert item.save_count == 1
# before_save returning False aborts the save
class AbortableItem(HooksMixin, SQLerModel):
    name: str
    valid: bool = True

    def before_save(self) -> bool:
        return self.valid  # Abort if not valid
db2 = SQLerDB.in_memory()
AbortableItem.set_db(db2)
try:
    AbortableItem(name="test", valid=False).save()
except RuntimeError as e:
    assert "before_save() returned False" in str(e)
Advanced Usage
Raw SQL (execute_sql)
Run parameterized SQL. To hydrate models later, return _id and data columns.
[C19] Raw SQL (execute_sql)
from sqler import SQLerDB, SQLerModel
class ReportUser(SQLerModel):
    name: str
    email: str | None = None
db = SQLerDB.in_memory()
ReportUser.set_db(db)
ReportUser(name="Ada", email="ada@example.com").save()
ReportUser(name="Bob", email="bob@example.com").save()
rows = db.execute_sql("""
    SELECT u._id, u.data
    FROM reportusers u
    WHERE json_extract(u.data,'$.name') LIKE ?
""", ["A%"])
assert len(rows) == 1 and rows[0]["_id"] == 1
Indexes (JSON paths)
Build indexes for fields you filter/sort on.
[C20] Index helpers
from sqler import SQLerDB, SQLerModel
class IndexedUser(SQLerModel):
    name: str
    age: int | None = None
    email: str | None = None
    address: dict | None = None
db = SQLerDB.in_memory()
IndexedUser.set_db(db)
# DB-level indexes on JSON paths
db.create_index("indexedusers", "age")
db.create_index("indexedusers", "email", unique=True)
db.create_index(
    "indexedusers",
    "age",
    where="json_extract(data,'$.age') IS NOT NULL",
)
# Relationship-friendly indexes
db.create_index("indexedusers", "address._id")
db.create_index("indexedusers", "address.city")
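In SQLite terms, an index on a JSON path is an expression index over json_extract, and a where= clause makes it a partial index. The sketch below creates both kinds by hand and asks the planner whether an equality filter uses them. The index names and DDL are assumptions for illustration; the statements SQLer emits from create_index may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE indexedusers (_id INTEGER PRIMARY KEY, data JSON NOT NULL)")

# Unique expression index on a JSON path.
conn.execute(
    "CREATE UNIQUE INDEX idx_users_email "
    "ON indexedusers (json_extract(data, '$.email'))"
)
# Partial index: only rows whose age is present are indexed.
conn.execute(
    "CREATE INDEX idx_users_age ON indexedusers (json_extract(data, '$.age')) "
    "WHERE json_extract(data, '$.age') IS NOT NULL"
)

# The filter must use the same expression as the index for the planner to match it.
plan = conn.execute(
    "EXPLAIN QUERY PLAN SELECT _id FROM indexedusers "
    "WHERE json_extract(data, '$.email') = ?",
    ["ada@example.com"],
).fetchall()
detail = " ".join(row[-1] for row in plan)  # last column holds the plan text
print(detail)
```

If the plan text mentions the index name, the filter is served by the index instead of a full table scan.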
Concurrency Model (WAL)
- SQLer uses thread-local connections and enables WAL:
  - journal_mode=WAL, busy_timeout=5000, synchronous=NORMAL
  - Many readers in parallel; one writer (SQLite rule)
- Safe models perform optimistic writes:
  UPDATE ... SET data=json(?), _version=_version+1 WHERE _id=? AND _version=?;
  If no rows match, a StaleVersionError is raised.
- Under bursts, SQLite may report “database is locked”. SQLer uses BEGIN IMMEDIATE and a small backoff to reduce thrash.
- refresh() always re-hydrates _version.
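For reference, these are the pragmas listed above applied by hand with the standard library. This is a sketch of the settings themselves, not of SQLer's connection setup; the temporary file path is just for the demo.

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "app.db")
conn = sqlite3.connect(path)

# WAL needs a real file; in-memory databases keep journal_mode "memory".
mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
conn.execute("PRAGMA busy_timeout=5000")   # wait up to 5 s on a locked database
conn.execute("PRAGMA synchronous=NORMAL")  # fewer fsyncs; a common pairing with WAL

timeout = conn.execute("PRAGMA busy_timeout").fetchone()[0]
sync = conn.execute("PRAGMA synchronous").fetchone()[0]  # 1 means NORMAL
print(mode, timeout, sync)
conn.close()
```

journal_mode=WAL is persistent in the database file, while busy_timeout and synchronous apply per connection, which is one reason a library would set them every time it opens a connection.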
HTTP mapping (FastAPI)
[C21] FastAPI stale version
try:
    from fastapi import HTTPException
except ImportError:  # pragma: no cover - docs fallback
    class HTTPException(Exception):
        def __init__(self, status_code: int, detail: str):
            self.status_code = status_code
            self.detail = detail

from sqler.models import StaleVersionError

try:
    obj.save()
except StaleVersionError:
    raise HTTPException(409, "Version conflict")
Performance Tips
- Index hot JSON paths (e.g., users.age, orders.items.sku)
- Batch writes with bulk_upsert
- For heavy write loads, serialize writes via one process / queue
- Perf suite is opt-in:
  pytest -q -m perf
  pytest -q -m perf --benchmark-save=baseline
  pytest -q -m perf --benchmark-compare=baseline
Errors
SQLer provides a unified exception hierarchy under sqler.exceptions:
- Connection errors
  - NotConnectedError: adapter closed / not connected
  - ConnectionPoolExhaustedError: no connections available
- Query errors
  - NoAdapterError: query executed without adapter
  - InvariantViolationError: malformed row invariant (e.g., NULL JSON)
  - QueryTimeoutError: query exceeded timeout
- Concurrency errors
  - StaleVersionError: optimistic check failed (HTTP 409)
  - DeadlockError: deadlock detected
  - LockTimeoutError: unable to acquire lock
- Integrity errors
  - ReferentialIntegrityError: foreign key constraint violated
  - UniqueConstraintError: unique constraint violated
- Model errors
  - NotBoundError: model not bound to database
  - NotFoundError: model instance not found
  - ValidationError: model validation failed
- Hook errors
  - BeforeSaveError, AfterSaveError: save hook failures
  - BeforeDeleteError, AfterDeleteError: delete hook failures
All exceptions inherit from SQLerError for unified catching. SQLite exceptions (sqlite3.*) bubble with context.
Error Handling Patterns
from sqler import (
    SQLerDB, SQLerModel, SQLerSafeModel,
    StaleVersionError, ReferentialIntegrityError, NotBoundError,
)
from sqler.exceptions import SQLerError
# Pattern 1: Catch all SQLer errors
try:
    user.save()
except SQLerError as e:
    print(f"Database error: {e}")
# Pattern 2: Handle optimistic locking (HTTP 409)
def update_with_retry(model, max_retries=3):
    for attempt in range(max_retries):
        try:
            model.save()
            return model
        except StaleVersionError:
            if attempt < max_retries - 1:
                model.refresh()  # Reload from database
            else:
                raise
# Pattern 3: Safe deletion with referential integrity
def safe_delete(model):
    try:
        model.delete_with_policy(on_delete="restrict")
    except ReferentialIntegrityError as e:
        # Either cascade, set_null, or inform user
        print(f"Cannot delete: still referenced by other records")
        return False
    return True
Troubleshooting
Common Issues
"database is locked"
SQLite allows only one writer at a time. If you see this error:
- Reduce concurrent write operations
- Use transactions to batch writes: with db.transaction(): ...
- Increase busy_timeout pragma (default is 5000ms)
- For high write loads, consider a write queue or single-writer pattern
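As a rough illustration of the settings mentioned above, the following sketch uses the standard-library sqlite3 module to open a WAL-mode connection with a busy timeout and to group several writes into one BEGIN IMMEDIATE transaction. The helper names open_wal_connection and write_batch and the events table are hypothetical, and SQLer's own connection setup may differ in detail.

```python
import os
import sqlite3
import tempfile


def open_wal_connection(path, busy_timeout_ms=5000):
    """Open a connection in autocommit mode and apply WAL-friendly pragmas."""
    conn = sqlite3.connect(path, isolation_level=None)  # we issue BEGIN/COMMIT ourselves
    journal_mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    conn.execute(f"PRAGMA busy_timeout={int(busy_timeout_ms)}")  # wait instead of failing at once
    conn.execute("PRAGMA synchronous=NORMAL")
    return conn, journal_mode


def write_batch(conn, names):
    """Take the write lock up front, then insert all rows in one transaction."""
    conn.execute("BEGIN IMMEDIATE")
    try:
        conn.executemany("INSERT INTO events (name) VALUES (?)", [(n,) for n in names])
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise


db_path = os.path.join(tempfile.mkdtemp(), "demo.db")  # WAL needs an on-disk database
conn, journal_mode = open_wal_connection(db_path)
conn.execute("CREATE TABLE events (_id INTEGER PRIMARY KEY, name TEXT)")
write_batch(conn, ["a", "b", "c"])

row_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
busy_timeout = conn.execute("PRAGMA busy_timeout").fetchone()[0]
conn.close()
```

Batching three inserts into one transaction means the write lock is acquired once rather than three times, which is the main reason batching reduces lock contention.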
"Model X is not bound"
Call Model.set_db(db) before using the model:
db = SQLerDB.in_memory()
User.set_db(db) # Must call this before save/query
StaleVersionError on save
With SQLerSafeModel, this error means the row was modified by another process after you loaded it:
try:
    model.save()
except StaleVersionError:
    model.refresh()  # Reload and decide: retry or abort
Query returns empty but data exists
Check field path syntax for nested fields:
# Correct: use list for nested paths
F(["address", "city"]) == "Kyoto"
# Wrong: string doesn't traverse nested objects
F("address.city") == "Kyoto" # Only works for index creation
Async adapter not connecting
Ensure you explicitly connect and close:
db = AsyncSQLerDB.on_disk("app.db")
await db.connect() # Required!
# ... operations ...
await db.close() # Clean up
Debug Tools
# See generated SQL
q = User.query().filter(F("age") > 30)
print(q.sql()) # SELECT _id, data FROM users WHERE ...
print(q.params()) # [30]
# Full debug tuple
sql, params = q.debug()
# Query execution plan
plan = q.explain_query_plan(db.adapter)
for row in plan:
    print(row)
# Enable query logging
from sqler import query_logger
query_logger.enable()
# ... run queries ...
print(query_logger.get_slow_queries(threshold_ms=10.0))
Query Caching
Cache query results to avoid repeated database calls:
from sqler import QueryCache, cached_query
# Create a cache with max 1000 entries, 5 minute TTL
cache = QueryCache(max_size=1000, default_ttl_seconds=300)
# Cache query results
cache.set("users:active", active_users, table="users")
result = cache.get("users:active")
# Check cache stats
stats = cache.stats
print(f"Hit rate: {stats.hit_rate:.2%}")
# Invalidate by pattern or table
cache.invalidate_pattern("users:*")
cache.invalidate_table("users")
# Use the decorator for automatic caching
@cached_query(ttl_seconds=60)
def get_active_users():
    return User.query().filter(F("active") == True).all()
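To show the idea behind TTL expiry and table-based invalidation, here is a small self-contained sketch in plain Python. It is not SQLer's QueryCache: the class TinyTTLCache and its injectable clock are assumptions made so the behaviour can be demonstrated deterministically.

```python
import time
from typing import Any, Callable, Optional


class TinyTTLCache:
    """Illustrative TTL cache with per-table invalidation (not SQLer's QueryCache)."""

    def __init__(self, default_ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = default_ttl_seconds
        self._clock = clock
        # key -> (expiry time, owning table or None, cached value)
        self._entries: dict[str, tuple[float, Optional[str], Any]] = {}

    def set(self, key: str, value: Any, table: Optional[str] = None) -> None:
        self._entries[key] = (self._clock() + self._ttl, table, value)

    def get(self, key: str) -> Any:
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, _table, value = entry
        if self._clock() >= expires_at:  # expired: drop it lazily on read
            del self._entries[key]
            return None
        return value

    def invalidate_table(self, table: str) -> int:
        """Drop every entry tagged with `table`; return how many were removed."""
        stale = [k for k, (_exp, t, _val) in self._entries.items() if t == table]
        for key in stale:
            del self._entries[key]
        return len(stale)


# A fake clock makes expiry observable without sleeping.
now = [0.0]
cache = TinyTTLCache(default_ttl_seconds=300, clock=lambda: now[0])

cache.set("users:active", ["Alice", "Bob"], table="users")
fresh = cache.get("users:active")

now[0] = 301.0  # move past the 5 minute TTL
expired = cache.get("users:active")

cache.set("users:all", ["Alice", "Bob", "Eve"], table="users")
removed = cache.invalidate_table("users")
```

Tagging entries with the table they were read from lets a write to that table clear exactly the results that may now be out of date.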
Data Export & Import
Export and import data in various formats:
CSV Export/Import
from sqler import export_csv, export_csv_string, import_csv
# Export to file
result = export_csv(User, "users.csv")
print(f"Exported {result.count} records ({result.size_bytes} bytes)")
# Export specific fields
export_csv(User, "users.csv", fields=["name", "email"], include_id=False)
# Export to string
csv_string = export_csv_string(User)
# Import from CSV
result = import_csv(User, "users.csv")
print(f"Imported {result.succeeded}/{result.count} records")
# Import with transform
def normalize(row):
    row["email"] = row["email"].lower()
    return row
import_csv(User, "users.csv", transform=normalize)
JSON Export/Import
from sqler import export_json, export_json_string, import_json
# Export to file
export_json(User, "users.json", indent=2)
# Export to string
json_string = export_json_string(User)
# Import from JSON array file
result = import_json(User, "users.json")
JSONL (JSON Lines) Export/Import
from sqler import export_jsonl, import_jsonl, stream_jsonl
# Export one record per line (streaming-friendly)
export_jsonl(User, "users.jsonl")
# Import JSONL
import_jsonl(User, "users.jsonl")
# Stream records without loading all into memory
for record_json in stream_jsonl(User):
    process(record_json)
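For readers unfamiliar with the JSONL format itself, this standard-library sketch writes one JSON object per line and reads the lines back lazily. The helpers write_jsonl and read_jsonl are illustrative and independent of SQLer's export functions.

```python
import io
import json
from typing import Iterable, Iterator, TextIO


def write_jsonl(records: Iterable[dict], fp: TextIO) -> int:
    """Write one JSON document per line; return the number of records written."""
    count = 0
    for record in records:
        fp.write(json.dumps(record, ensure_ascii=False) + "\n")
        count += 1
    return count


def read_jsonl(fp: TextIO) -> Iterator[dict]:
    """Yield records one at a time so the whole file never has to fit in memory."""
    for line in fp:
        line = line.strip()
        if line:  # tolerate blank lines
            yield json.loads(line)


buffer = io.StringIO()  # stands in for a file opened in text mode
written = write_jsonl([{"name": "Ada"}, {"name": "Bob", "city": "Kyoto"}], buffer)
buffer.seek(0)
loaded = list(read_jsonl(buffer))
```

Because each line is a complete document, a reader can stop, resume, or process records in a pipeline without parsing the rest of the file.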
Async Export/Import
from sqler import async_export_jsonl, async_import_jsonl
# Async versions for high-throughput scenarios
await async_export_jsonl(User, "users.jsonl")
await async_import_jsonl(User, "users.jsonl")
Full-Text Search
SQLer provides FTS5-based full-text search:
Using FTSIndex Directly
from sqler import FTSIndex, SearchResult
class Article(SQLerModel):
    title: str
    content: str
    author: str
Article.set_db(db)
# Create FTS index on specific fields
fts = FTSIndex(Article, fields=["title", "content"])
fts.create(db)
fts.rebuild() # Index existing records
# Search
results = fts.search("Python")
for article in results:
    print(article.title)
# Ranked search with scores
ranked = fts.search_ranked("Python programming")
for result in ranked:
    print(f"{result.model.title} (score: {result.score})")
# Count matches
count = fts.count("Python")
# Index a new document
new_article = Article(title="New", content="content", author="Ada").save()
fts.index(new_article)
Using SearchableMixin
from sqler import SearchableMixin
class Post(SearchableMixin, SQLerModel):
    title: str
    body: str
    class FTS:
        fields = ["title", "body"]
Post.set_db(db)
# Create index once
Post.create_search_index(db)
# Add posts
Post(title="Python Tips", body="Learn Python").save()
Post.rebuild_search_index()
# Search
results = Post.search("Python")
count = Post.search_count("Python")
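For a sense of what FTS5 does underneath, here is a minimal sketch that talks to an FTS5 virtual table directly through the standard-library sqlite3 module. It assumes the SQLite build bundled with your Python has FTS5 compiled in, which is common but not guaranteed. The table name articles_fts and the search helper are illustrative, and this is not how SQLer's FTSIndex is necessarily implemented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Assumption: this SQLite build includes the FTS5 extension.
conn.execute("CREATE VIRTUAL TABLE articles_fts USING fts5(title, content)")
conn.executemany(
    "INSERT INTO articles_fts (rowid, title, content) VALUES (?, ?, ?)",
    [
        (1, "Python Tips", "Learn Python programming step by step"),
        (2, "Gardening", "How to grow tomatoes"),
        (3, "SQLite Notes", "Using SQLite from Python"),
    ],
)


def search(conn, query):
    """Return (rowid, title, score) rows; bm25() yields lower scores for better matches."""
    return conn.execute(
        "SELECT rowid, title, bm25(articles_fts) AS score "
        "FROM articles_fts WHERE articles_fts MATCH ? ORDER BY score",
        [query],
    ).fetchall()


hits = search(conn, "python")  # the default tokenizer matches case-insensitively
hit_ids = sorted(row[0] for row in hits)
match_count = conn.execute(
    "SELECT COUNT(*) FROM articles_fts WHERE articles_fts MATCH ?", ["python"]
).fetchone()[0]
```

Keeping the FTS rowid equal to the document's primary key is a simple way to map search hits back to the stored records.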
Connection Pooling
For high-concurrency scenarios, use connection pooling:
from sqler import PooledSQLerDB, PoolStats
# Create pooled database with 10 connections
db = PooledSQLerDB.on_disk("mydb.db", pool_size=10)
class User(SQLerModel):
    name: str
User.set_db(db)
# Use normally - connections are managed automatically
User(name="Alice").save()
users = User.query().all()
# Check pool stats
stats: PoolStats = db.pool_stats()
print(f"Active: {stats.active_connections}/{stats.pool_size}")
print(f"Waiting: {stats.waiting_requests}")
db.close()
Schema Migrations
Manage database schema changes with versioned migrations:
from sqler import Migration, MigrationRunner
# Define migrations
migrations = [
    Migration(
        version=1,
        name="create_users",
        up=lambda db: db.adapter.execute(
            "CREATE TABLE users (_id INTEGER PRIMARY KEY, data JSON)"
        ),
        down=lambda db: db.adapter.execute("DROP TABLE users"),
    ),
    Migration(
        version=2,
        name="add_posts",
        up=lambda db: db.adapter.execute(
            "CREATE TABLE posts (_id INTEGER PRIMARY KEY, data JSON)"
        ),
        down=lambda db: db.adapter.execute("DROP TABLE posts"),
    ),
]
# Create runner and apply
runner = MigrationRunner(db, migrations)
# Check status
status = runner.status()
print(f"Current: v{status['current_version']}, Pending: {status['pending_count']}")
# Migrate to latest
result = runner.migrate()
if result.success:
    print(f"Applied {len(result.applied)} migrations")
# Migrate to specific version
runner.migrate(target_version=1)
# Rollback
runner.rollback(target_version=0)
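The core idea of a versioned migration runner can be sketched with the standard-library sqlite3 module. This example stores the schema version in SQLite's PRAGMA user_version, which is an assumption chosen for brevity; how SQLer's MigrationRunner records applied versions is not described here. The MIGRATIONS mapping and the migrate helper are hypothetical.

```python
import sqlite3

# version -> forward SQL, applied in ascending order
MIGRATIONS = {
    1: "CREATE TABLE users (_id INTEGER PRIMARY KEY, data JSON)",
    2: "CREATE TABLE posts (_id INTEGER PRIMARY KEY, data JSON)",
}


def current_version(conn):
    return conn.execute("PRAGMA user_version").fetchone()[0]


def migrate(conn, target_version=None):
    """Apply every pending migration up to `target_version`; return the versions applied."""
    target = max(MIGRATIONS) if target_version is None else target_version
    applied = []
    for version in sorted(MIGRATIONS):
        if not (current_version(conn) < version <= target):
            continue  # already applied, or beyond the requested target
        conn.execute("BEGIN")
        try:
            conn.execute(MIGRATIONS[version])
            conn.execute(f"PRAGMA user_version = {version:d}")  # record progress with the change
            conn.execute("COMMIT")
        except Exception:
            conn.execute("ROLLBACK")
            raise
        applied.append(version)
    return applied


conn = sqlite3.connect(":memory:", isolation_level=None)  # explicit transactions only
first_run = migrate(conn)
second_run = migrate(conn)  # nothing left to do
tables = sorted(
    name for (name,) in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
)
```

Running the same migrate call twice is safe because each step is skipped once the recorded version has reached it.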
Async Migrations
from sqler import AsyncMigration, AsyncMigrationRunner
migrations = [
    AsyncMigration(
        version=1,
        name="create_users",
        up=lambda db: db.adapter.execute("CREATE TABLE users ..."),
    ),
]
runner = AsyncMigrationRunner(db, migrations)
result = await runner.migrate()
Metrics Collection
Collect performance metrics for monitoring:
from sqler import metrics
# Enable metrics collection
metrics.enable()
# ... run queries (metrics collected automatically) ...
User(name="Alice").save()
User.query().all()
# Get metrics data
data = metrics.get_metrics()
print(f"Total queries: {data['queries']['total_queries']}")
print(f"Histogram: {data['queries']['histogram']}")
# Get Prometheus-format output
prometheus_text = metrics.prometheus_export()
# Returns:
# sqler_queries_total 150
# sqler_query_duration_ms_bucket{le="1"} 50
# sqler_query_duration_ms_bucket{le="10"} 100
# ...
# Add custom callback for real-time monitoring
metrics.add_callback(lambda log: print(f"Query: {log.sql[:50]}"))
# Reset metrics
metrics.reset()
# Disable when done
metrics.disable()
Database Operations
Production-ready database operations:
Health Checks
from sqler import health_check, is_healthy, HealthStatus
# Quick boolean check
if is_healthy(db):
    print("Database OK")
# Detailed health check
status: HealthStatus = health_check(db)
print(f"Healthy: {status.healthy}")
print(f"Latency: {status.latency_ms:.2f}ms")
print(f"Details: {status.details}")
# Serialize for API response
payload = status.to_dict()
Database Statistics
from sqler import get_stats, DatabaseStats
stats: DatabaseStats = get_stats(db)
print(f"Tables: {stats.table_count}")
print(f"Indexes: {stats.index_count}")
print(f"Page size: {stats.page_size}")
print(f"Page count: {stats.page_count}")
# Serialize for monitoring
payload = stats.to_dict()
Backup and Restore
from sqler import backup, restore, BackupResult
# Create backup
result: BackupResult = backup(db, "/backups/mydb.bak")
if result.success:
    print(f"Backup created: {result.size_bytes} bytes in {result.duration_ms}ms")
# Restore from backup
result = restore(db, "/backups/mydb.bak")
if result.success:
    print("Database restored")
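For context, SQLite exposes an online backup API that Python's sqlite3 module wraps as Connection.backup. The sketch below uses it directly. Whether SQLer's backup helper is built on this API is not stated here, and the helper backup_to_file is a hypothetical name.

```python
import os
import sqlite3
import tempfile


def backup_to_file(src: sqlite3.Connection, dest_path: str) -> int:
    """Copy the whole database behind `src` into `dest_path`; return the file size in bytes."""
    dest = sqlite3.connect(dest_path)
    try:
        src.backup(dest)  # SQLite online backup API
    finally:
        dest.close()
    return os.path.getsize(dest_path)


src = sqlite3.connect(":memory:")
src.execute("CREATE TABLE users (_id INTEGER PRIMARY KEY, data TEXT)")
src.execute("INSERT INTO users (data) VALUES (?)", ['{"name": "Alice"}'])
src.commit()

backup_path = os.path.join(tempfile.mkdtemp(), "users.bak")
size_bytes = backup_to_file(src, backup_path)

# The backup is an ordinary SQLite file and can be opened on its own.
restored = sqlite3.connect(backup_path)
row_count = restored.execute("SELECT COUNT(*) FROM users").fetchone()[0]
restored.close()
```

Opening the backup and running a query against it, as done at the end, is a cheap way to verify that a backup is usable before relying on it.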
Maintenance Operations
from sqler import vacuum, checkpoint
# Reclaim space and defragment
duration_ms = vacuum(db)
# Force WAL checkpoint
checkpoint(db)
Async Operations
from sqler import async_health_check, async_backup, async_get_stats, async_vacuum
status = await async_health_check(db)
await async_backup(db, "/backups/mydb.bak")
stats = await async_get_stats(db)
await async_vacuum(db)
Change Tracking
Track field changes and detect dirty models:
TrackedModel
from sqler import TrackedModel
class User(TrackedModel, SQLerModel):
    name: str
    email: str
    age: int
User.set_db(db)
user = User(name="Alice", email="alice@test.com", age=30)
user.save()
user.mark_clean()
# Modify fields
user.name = "Bob"
user.age = 31
# Check dirty state
print(user.is_dirty) # True
print(user.changed_fields) # {'name', 'age'}
# Get detailed changes (old_value, new_value)
changes = user.get_changes()
# {'name': ('Alice', 'Bob'), 'age': (30, 31)}
# Revert unsaved changes
user.revert_changes()
print(user.name) # 'Alice'
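One simple way to implement this kind of change tracking is to keep a snapshot of the last clean state and compare against it. The sketch below shows that approach in plain Python. TrackedRecord is a hypothetical class written for illustration and is not SQLer's TrackedModel.

```python
class TrackedRecord:
    """Snapshot-based change tracking (illustrative; not SQLer's TrackedModel)."""

    def __init__(self, **fields):
        self.__dict__.update(fields)
        self._snapshot = dict(fields)  # last known clean state

    def mark_clean(self):
        """Treat the current field values as the new baseline."""
        self._snapshot = {name: getattr(self, name) for name in self._snapshot}

    def get_changes(self):
        """Map each changed field to (old_value, new_value)."""
        return {
            name: (old, getattr(self, name))
            for name, old in self._snapshot.items()
            if getattr(self, name) != old
        }

    @property
    def changed_fields(self):
        return set(self.get_changes())

    @property
    def is_dirty(self):
        return bool(self.get_changes())

    def revert_changes(self):
        """Restore every tracked field to its snapshot value."""
        for name, old in self._snapshot.items():
            setattr(self, name, old)


user = TrackedRecord(name="Alice", email="alice@test.com", age=30)
user.name = "Bob"
user.age = 31

dirty_before = user.is_dirty
changes = user.get_changes()

user.revert_changes()
dirty_after = user.is_dirty
```

The snapshot here is a shallow copy, so in-place mutation of a nested list or dict would go unnoticed; a real implementation would need a deep copy or per-field hooks to catch that.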
DiffMixin
from sqler import DiffMixin
class Item(DiffMixin, SQLerModel):
    name: str
    quantity: int
Item.set_db(db)
item1 = Item(name="Apple", quantity=10)
item2 = Item(name="Apple", quantity=15)
# Compare two instances
diff = item1.diff(item2)
# {'quantity': (10, 15)}
# Check equality
item1.is_equal(item2) # False
# Clone with overrides
cloned = item1.clone(quantity=20)
# Item with same name, quantity=20, no _id
Examples
See examples/ for end-to-end scripts:
- sync_model_quickstart.py
- sync_safe_model.py
- async_model_quickstart.py
- async_safe_model.py
- model_arrays_any.py
Run:
uv run python examples/sync_model_quickstart.py
Running the FastAPI Example
SQLer ships with a minimal FastAPI demo under examples/fastapi/app.py.
To run it:
pip install fastapi uvicorn
uv run uvicorn examples.fastapi.app:app --reload
Testing
# Unit
uv run pytest -q
# Perf (opt-in)
uv run pytest -q -m perf
Contributing
- Format & lint:
  uv run ruff format .
  uv run ruff check .
- Tests:
  uv run pytest -q --cov=src --cov-report=term-missing
License
MIT © Contributors