Zero-config SQLite helper for Python - migrations, CRUD generation, connection pooling, WAL
Project description
sqlite-helper-py
Zero-config SQLite helper for Python — migrations, CRUD generation, connection pooling, and WAL mode in one small, dependency-free package.
Install
pip install sqlite-helper-py
Requires Python 3.9+ and nothing else — the standard library's sqlite3 module does all the work.
Quick Start
from sqlite_helper_py import Database, SchemaBuilder, WALConfig
# Open (or create) a database with WAL mode enabled
db = Database("myapp.db", wal_config=WALConfig())
# Define a table with the fluent builder
users_schema = (
SchemaBuilder("users")
.integer("id", primary_key=True)
.text("email", nullable=False, unique=True)
.text("name", nullable=False)
.boolean("active", default=True)
.timestamps()
.build()
)
# Create the table
db.create_table(users_schema)
# Get a CRUD repository
users = db.repository(users_schema)
# Insert
uid = users.insert({"email": "alice@example.com", "name": "Alice"})
# Fetch by PK
user = users.get(uid)
print(user) # {'id': 1, 'email': 'alice@example.com', 'name': 'Alice', ...}
# Update
users.update(uid, {"name": "Alice Smith"})
# Find by field
results = users.find(active=1)
# Delete
users.delete(uid)
Features
| Feature | Description |
|---|---|
| Connection pooling | Thread-safe pool with configurable size and timeout |
| WAL mode | One-call WAL activation with preset profiles |
| Schema builder | Fluent DSL for table and index definitions |
| Migrations | Versioned, checksummed, transactional migrations |
| CRUD repository | Full Create/Read/Update/Delete with pagination and iteration |
| Query builder | Parameterised SELECT builder — no raw string concatenation |
| Zero dependencies | Pure Python, stdlib sqlite3 only |
Connection Pool
from sqlite_helper_py import ConnectionPool
pool = ConnectionPool("myapp.db", pool_size=10, timeout=30.0)
with pool.connection() as conn:
conn.execute("INSERT INTO logs (msg) VALUES (?)", ("hello",))
with pool.transaction() as conn:
conn.execute("UPDATE accounts SET balance = balance - 50 WHERE id = 1")
conn.execute("UPDATE accounts SET balance = balance + 50 WHERE id = 2")
pool.close()
Context manager
Database implements __enter__ / __exit__:
with Database("myapp.db") as db:
db.create_table(schema)
...
# pool is closed automatically
WAL Mode
from sqlite_helper_py import Database, WALConfig, PROFILE_HIGH_CONCURRENCY
# Apply a built-in profile
db = Database("myapp.db", wal_config=PROFILE_HIGH_CONCURRENCY)
# Or fine-tune
cfg = WALConfig(
synchronous="FULL", # stronger durability
wal_autocheckpoint=500,
cache_size=-65536, # 64 MB
busy_timeout=10_000,
)
status = db.enable_wal(cfg)
print(status)
Available profiles: PROFILE_HIGH_CONCURRENCY, PROFILE_HIGH_DURABILITY, PROFILE_EMBEDDED.
Manual checkpoint:
from sqlite_helper_py import checkpoint
with db.connection() as conn:
wal_frames, checkpointed = checkpoint(conn, mode="TRUNCATE")
Schema Builder
from sqlite_helper_py import SchemaBuilder, ForeignKey
posts_schema = (
SchemaBuilder("posts")
.integer("id", primary_key=True)
.text("title", nullable=False)
.text("body")
.integer("author_id", foreign_key=ForeignKey("users", on_delete="CASCADE"))
.integer("view_count", default=0)
.timestamps()
.index("idx_posts_author", ["author_id"])
.index("idx_posts_title", ["title"], unique=True)
.build()
)
# Print the DDL
print(posts_schema.create_statement())
Direct column definition:
from sqlite_helper_py import TableSchema, Column, ColumnType
schema = TableSchema(
"events",
columns=[
Column("id", ColumnType.INTEGER, primary_key=True, autoincrement=True),
Column("name", ColumnType.TEXT, nullable=False),
Column("payload", ColumnType.BLOB),
Column("ts", ColumnType.TEXT, default="CURRENT_TIMESTAMP"),
],
)
Migrations
Inline SQL migrations
from sqlite_helper_py import Database, Migration
db = Database("myapp.db")
migrations = [
Migration(
version=1,
name="create_users",
sql="""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT NOT NULL UNIQUE,
name TEXT NOT NULL
);
""",
),
Migration(
version=2,
name="add_users_active",
sql="ALTER TABLE users ADD COLUMN active INTEGER NOT NULL DEFAULT 1;",
),
]
applied = db.migrate(migrations)
print(f"Applied: {applied}") # [1, 2]
Python function migrations
import sqlite3
from sqlite_helper_py import Migration
def seed_data(conn: sqlite3.Connection) -> None:
conn.execute("INSERT INTO config (key, value) VALUES ('version', '1.0')")
def remove_seed(conn: sqlite3.Connection) -> None:
conn.execute("DELETE FROM config WHERE key = 'version'")
Migration(
version=3,
name="seed_config",
up_fn=seed_data,
down_fn=remove_seed,
)
Migration status and rollback
runner = db.migration_runner()
runner.register(*migrations)
for row in runner.status():
state = "✓" if row["applied"] else "○"
print(f" {state} [{row['version']}] {row['name']}")
# Roll back the latest migration
runner.rollback(2)
SQL file migrations
Name files {version}_{name}.sql and point the runner at the directory:
migrations/
0001_create_users.sql
0002_add_index.sql
0003_add_posts.sql
runner = db.migration_runner()
runner.register_sql_directory("migrations/")
runner.migrate()
CRUD Repository
users = db.repository(users_schema)
# Insert / upsert
uid = users.insert({"email": "bob@example.com", "name": "Bob"})
uid2 = users.upsert({"email": "bob@example.com", "name": "Bob Updated"}, ["email"])
# Fetch
user = users.get(uid) # by PK, returns dict or None
user = users.get_or_raise(uid) # raises RecordNotFoundError on miss
user = users.find_one(email="bob@example.com")
# List
all_users = users.all(order_by="name ASC")
active = users.find(active=1)
count = users.count(active=1)
exists = users.exists(email="bob@example.com")
# Pagination
page = users.paginate(page=1, page_size=20, order_by="name ASC")
# {'items': [...], 'page': 1, 'page_size': 20, 'total': 150, 'total_pages': 8}
# Memory-efficient iteration
for user in users.iter_all(batch_size=500):
process(user)
# Update
users.update(uid, {"name": "Robert"})
users.update_where({"active": 0}, name="Deleted")
# Delete
users.delete(uid)
users.delete_where(active=0)
Query Builder
from sqlite_helper_py import QueryBuilder
sql, params = (
QueryBuilder("orders")
.select("id", "total", "status")
.where("status = ?", "pending")
.where("total > ?", 100)
.where_in("user_id", [1, 2, 3])
.order_by("created_at DESC")
.limit(50)
.offset(100)
.build()
)
with db.connection() as conn:
rows = conn.execute(sql, params).fetchall()
API Reference
Database
| Method | Description |
|---|---|
Database(path, pool_size, wal_config, pragmas) |
Open/create database |
.connection() |
Context manager yielding a pooled connection |
.transaction() |
Context manager with auto-commit / rollback |
.enable_wal(config) |
Activate WAL mode, returns WALStatus |
.create_table(schema) |
Execute CREATE TABLE + indexes |
.drop_table(schema) |
Execute DROP TABLE |
.table_exists(name) |
Return bool |
.list_tables() |
Return list of table names |
.repository(schema) |
Return cached Repository |
.migrate(migrations) |
Register + apply migrations |
.migration_runner() |
Return bound MigrationRunner |
.execute(sql, params) |
Raw execute |
.fetchall(sql, params) |
Return list of dicts |
.fetchone(sql, params) |
Return dict or None |
.scalar(sql, params) |
Return first column of first row |
.integrity_check() |
Return True if database is intact |
.database_size_bytes() |
Approximate size in bytes |
.close() |
Release all pooled connections |
Repository
| Method | Description |
|---|---|
.insert(data) |
Insert row, return PK |
.insert_many(rows) |
Bulk insert, return count |
.upsert(data, conflict_columns) |
Insert or update |
.get(pk) |
Fetch by PK or None |
.get_or_raise(pk) |
Fetch by PK or raise RecordNotFoundError |
.find(**criteria) |
List matching rows |
.find_one(**criteria) |
First matching row or None |
.all(order_by, limit, offset) |
All rows |
.count(**criteria) |
Row count |
.exists(**criteria) |
True if any matching row |
.paginate(page, page_size) |
Paginated result dict |
.iter_all(batch_size) |
Iterator over all rows |
.update(pk, data) |
Update by PK |
.update_where(data, **criteria) |
Bulk update |
.delete(pk) |
Delete by PK |
.delete_where(**criteria) |
Bulk delete |
.truncate() |
Delete all rows |
.query() |
Return a QueryBuilder for this table |
License
MIT — see LICENSE.
Vladyslav Zaiets — sarmkadan.com
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file sqlite_helper_py-1.0.0.tar.gz.
File metadata
- Download URL: sqlite_helper_py-1.0.0.tar.gz
- Upload date:
- Size: 21.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
fcb58aab0feb098d3f690f7fd6e970c9f899fc5349d7c719b27af3fd4a6577b8
|
|
| MD5 |
a6c2c025a9007cff267471472e5b1b50
|
|
| BLAKE2b-256 |
388d05aee2de9285ac909764a5155d3942ac458e78d83154fd16ee538890a01b
|
File details
Details for the file sqlite_helper_py-1.0.0-py3-none-any.whl.
File metadata
- Download URL: sqlite_helper_py-1.0.0-py3-none-any.whl
- Upload date:
- Size: 27.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0772db3d5e86333fe9f577a63b3bc9745057033c74bf4fc8467af30c263c8cbf
|
|
| MD5 |
81a8a98e041e772fdccb034a545d9d5c
|
|
| BLAKE2b-256 |
359e0a8fb148b50d2a3fcea21c025b3621b18e00299fa3d0e7db0be7f0e0e7c5
|