
Simple SQLite sync/async wrapper with migration support for use in ad-hoc scripts


ScriptDB

ScriptDB is a tiny wrapper around SQLite with built‑in migration support. It can be used asynchronously or synchronously. ScriptDB is designed for small integration scripts and ETL jobs where using an external database would be unnecessary. The project aims to provide a pleasant developer experience while keeping the API minimal.

Features

  • Async and sync – choose between the async aiosqlite backend or the synchronous stdlib sqlite3 backend.
  • Migrations – declare migrations as SQL snippet(s) or Python callables and let ScriptDB apply them once.
  • Lightweight – no server to run and no complicated setup; perfect for throw‑away scripts or small tools.
  • WAL by default – connections use SQLite's write-ahead logging mode; disable with use_wal=False if rollback journals are required.

Composite primary keys are not supported; each table must have a single-column primary key.
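The WAL default corresponds to SQLite's journal_mode pragma. You can see the effect with the stdlib sqlite3 module alone (this sketch does not use ScriptDB itself):

```python
import os
import sqlite3
import tempfile

# WAL requires a file-backed database; in-memory databases ignore it.
path = os.path.join(tempfile.mkdtemp(), "app.db")
conn = sqlite3.connect(path)
conn.execute("PRAGMA journal_mode=WAL")  # roughly what use_wal=True amounts to
mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
print(mode)  # wal
conn.close()
```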

Requirements

ScriptDB requires SQLite 3.24.0 or newer. Most modern Python builds ship with a recent-enough SQLite; on older distributions (e.g., Ubuntu 18.04) installing scriptdb[pysqlite] bundles a modern SQLite build. If your environment only provides an older SQLite, the compatibility layer will emit a warning and upsert_one/upsert_many will raise a clear error until you upgrade.
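You can check which SQLite your Python build links against before deciding whether you need the extra (a stdlib-only sketch):

```python
import sqlite3

# ScriptDB's native upserts need SQLite 3.24.0+ (ON CONFLICT support).
print(sqlite3.sqlite_version)  # e.g. "3.45.1"
major, minor = sqlite3.sqlite_version_info[:2]
needs_fallback = (major, minor) < (3, 24)
```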

If pysqlite still does not help in your environment, you can open the database with legacy_sqlite_support=True. In that mode ScriptDB emulates upserts using older SQLite-compatible SQL. It keeps working on legacy SQLite builds, but upsert_one and upsert_many will be slower than on modern SQLite.

Installation

To use the synchronous implementation:

pip install scriptdb

To use the asynchronous version (installs aiosqlite):

pip install scriptdb[async]

To bundle a modern SQLite build via pysqlite3 for legacy systems:

pip install scriptdb[pysqlite]

If you must stay on an older SQLite build, you can also enable the legacy upsert fallback:

from scriptdb import SyncBaseDB


class MyDB(SyncBaseDB):
    def migrations(self):
        return []


with MyDB.open("app.db", legacy_sqlite_support=True) as db:
    db.upsert_one("items", {"id": 1, "value": "x"})

This compatibility mode is slower because upserts are emulated without ON CONFLICT ... DO UPDATE.
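For reference, the native upsert statement that legacy mode has to emulate looks like this in raw SQL, sketched here with the stdlib sqlite3 module (which on modern Python links a new-enough SQLite):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items(id INTEGER PRIMARY KEY, value TEXT)")
conn.execute("INSERT INTO items(id, value) VALUES(1, 'x')")
# A single statement either inserts or updates the conflicting row.
conn.execute(
    "INSERT INTO items(id, value) VALUES(1, 'y') "
    "ON CONFLICT(id) DO UPDATE SET value = excluded.value"
)
value = conn.execute("SELECT value FROM items WHERE id = 1").fetchone()[0]
print(value)  # y
conn.close()
```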

Sync or Async

Both the asynchronous and synchronous interfaces expose the same API. The only difference is whether methods are coroutines (AsyncBaseDB and AsyncCacheDB) or regular blocking functions (SyncBaseDB and SyncCacheDB). Import AsyncBaseDB/AsyncCacheDB from scriptdb.asyncdb for asynchronous usage or SyncBaseDB/SyncCacheDB from scriptdb.syncdb for synchronous usage. For convenience, each module also exposes BaseDB and CacheDB aliases pointing to the respective implementations.

If you need both versions of the same schema, scriptdb.conversion can build a class that reuses an existing set of migrations without duplicating any code. Migrations must already match the target style: synchronous migrations for SyncBaseDB subclasses and asynchronous migrations for AsyncBaseDB subclasses. Conversion only works for SQL-based migrations (strings or Builder objects); callable function migrations cannot be converted because their async/sync behavior is implementation-specific.

from scriptdb.conversion import async_from_sync, sync_from_async
from scriptdb.syncdb import SyncBaseDB


class SyncEventsDB(SyncBaseDB):
    def migrations(self):
        return [
            {
                "name": "create_events",
                "sql": "CREATE TABLE events(id INTEGER PRIMARY KEY, payload TEXT)",
            },
        ]


# Generate an async counterpart that shares migrations
AsyncEventsDB = async_from_sync(SyncEventsDB)

# Or build a sync wrapper around an async definition
SyncEventsFromAsync = sync_from_async(AsyncEventsDB)

Asynchronous quick start

Create a subclass of AsyncBaseDB and provide a list of migrations:

from scriptdb import AsyncBaseDB

class MyDB(AsyncBaseDB):
    def migrations(self):
        return [
            {
                "name": "create_links",
                "sql": """
                    CREATE TABLE links(
                        resource_id INTEGER PRIMARY KEY,
                        referrer_url TEXT,
                        url TEXT,
                        status INTEGER,
                        progress INTEGER,
                        is_done INTEGER,
                        content BLOB
                    )
                """,
            },
            {
                "name": "add_created_idx",
                # run multiple statements sequentially
                "sqls": [
                    "ALTER TABLE links ADD COLUMN created_at TEXT",  # new column
                    "CREATE INDEX idx_links_created_at ON links(created_at)",  # index
                ],
            },
        ]

You can bundle multiple statements into a single migration entry by separating them with semicolons; ScriptDB will execute them sequentially using SQLite's executescript:

{
    "name": "backfill_created_flags",
    "sql": """
        ALTER TABLE links ADD COLUMN created_flag INTEGER DEFAULT 0;
        UPDATE links SET created_flag = 1 WHERE created_at IS NOT NULL;
    """,
}
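The executescript behavior can be seen with the stdlib module directly; a minimal stand-alone sketch of the same backfill:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE links(created_at TEXT)")
conn.execute("INSERT INTO links(created_at) VALUES('2024-01-01')")
conn.execute("INSERT INTO links(created_at) VALUES(NULL)")
# executescript runs each semicolon-separated statement in order.
conn.executescript("""
    ALTER TABLE links ADD COLUMN created_flag INTEGER DEFAULT 0;
    UPDATE links SET created_flag = 1 WHERE created_at IS NOT NULL;
""")
flags = [
    r[0]
    for r in conn.execute(
        "SELECT created_flag FROM links ORDER BY created_at IS NULL"
    )
]
print(flags)  # [1, 0]
conn.close()
```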
With the migrations in place, open the database and run queries:

async def main():
    async with MyDB.open("app.db") as db:  # WAL journaling is enabled by default
        await db.execute(
            "INSERT INTO links(url, status, progress, is_done) VALUES(?,?,?,?)",
            ("https://example.com/data", 0, 0, 0),
        )
        row = await db.query_one("SELECT url FROM links")
        print(row["url"])  # -> https://example.com/data

    # Manual open/close without a context manager
    db = await MyDB.open("app.db")
    try:
        await db.execute(
            "INSERT INTO links(url, status, progress, is_done) VALUES(?,?,?,?)",
            ("https://example.com/other", 0, 0, 0),
        )
    finally:
        await db.close()

    # Daemonize the aiosqlite worker thread to avoid hanging on exit
    async with MyDB.open("app.db", daemonize_thread=True) as db:
        await db.execute("SELECT 1")

    db = await MyDB.open("app.db", daemonize_thread=True)
    try:
        await db.execute("SELECT 1")
    finally:
        await db.close()

Always close the database with close() or use the async with context manager as shown above. If you call MyDB.open() without a context manager, remember to await db.close() when finished; leaving a database open may keep background tasks alive and prevent your application from exiting cleanly.

Daemonizable aiosqlite

aiosqlite runs a worker thread to execute SQLite operations. There has been an ongoing debate in the aiosqlite project about whether this thread should be a daemon. A non-daemon worker can keep the Python process alive even after all tasks have finished. ScriptDB ships with the internal daemonizable_aiosqlite module that wraps aiosqlite.connect and allows this worker thread to be marked as daemon.

The test suite in this repository relies on this module; without it, lingering threads would prevent tests from completing. To enable daemon mode in your application, pass daemonize_thread=True when opening the database as shown above. Use this option only if your program hangs on exit, as daemon threads can be terminated abruptly, potentially losing in-flight work.

Synchronous quick start

Create a subclass of SyncBaseDB for blocking use:

from scriptdb import SyncBaseDB

class MyDB(SyncBaseDB):
    def migrations(self):
        return [
            {
                "name": "create_links",
                "sql": """
                    CREATE TABLE links(
                        resource_id INTEGER PRIMARY KEY,
                        referrer_url TEXT,
                        url TEXT,
                        status INTEGER,
                        progress INTEGER,
                        is_done INTEGER,
                        content BLOB
                    )
                """,
            },
            {
                "name": "add_created_idx",
                "sqls": [
                    "ALTER TABLE links ADD COLUMN created_at TEXT",  # new column
                    "CREATE INDEX idx_links_created_at ON links(created_at)",  # index
                ],
            },
        ]

with MyDB.open("app.db") as db:  # WAL journaling is enabled by default
    db.execute(
        "INSERT INTO links(url, status, progress, is_done) VALUES(?,?,?,?)",
        ("https://example.com/data", 0, 0, 0),
    )
    row = db.query_one("SELECT url FROM links")
    print(row["url"])  # -> https://example.com/data

# Manual open/close without a context manager
db = MyDB.open("app.db")
try:
    db.execute(
        "INSERT INTO links(url, status, progress, is_done) VALUES(?,?,?,?)",
        ("https://example.com/other", 0, 0, 0),
    )
finally:
    db.close()

Always close the database connection with close() or use the with context manager as shown above. Leaving a database open may keep background tasks alive and prevent your application from exiting cleanly.

Usage examples

The AsyncBaseDB API supports migrations and offers helpers for common operations and background tasks:

from scriptdb import AsyncBaseDB, run_every_seconds, run_every_queries

class MyDB(AsyncBaseDB):
    def migrations(self):
        return [
            {
                "name": "init",
                "sql": """
                    CREATE TABLE links(
                        resource_id INTEGER PRIMARY KEY,
                        referrer_url TEXT,
                        url TEXT,
                        status INTEGER,
                        progress INTEGER,
                        is_done INTEGER,
                        content BLOB
                    )
                """,
            },
            {"name": "idx_status", "sql": "CREATE INDEX idx_links_status ON links(status)"},
            {"name": "create_meta", "sql": "CREATE TABLE meta(key TEXT PRIMARY KEY, value TEXT)"},
        ]

    # Periodically remove finished links
    @run_every_seconds(60)
    async def cleanup(self):
        await self.execute("DELETE FROM links WHERE is_done = 1")

    # Write a checkpoint every 100 executed queries
    @run_every_queries(100)
    async def checkpoint(self):
        await self.execute("PRAGMA wal_checkpoint")

async def main():
    async with MyDB.open("app.db") as db:  # pass use_wal=False to disable WAL

        # Insert many links at once
        await db.execute_many(
            "INSERT INTO links(url) VALUES(?)",
            [("https://a",), ("https://b",), ("https://c",)],
        )

        # Fetch all URLs
        rows = await db.query_many("SELECT url FROM links")
        print([r["url"] for r in rows])

        # Stream links one by one
        async for row in db.query_many_gen("SELECT url FROM links"):
            print(row["url"])

Helper methods

AsyncBaseDB and SyncBaseDB include convenience helpers for common insert, update and delete operations:

# Insert one record and get its primary key
pk = await db.insert_one("links", {"url": "https://a"})

# Insert many records
await db.insert_many("links", [{"url": "https://b"}, {"url": "https://c"}])

# Upsert a single record
await db.upsert_one("links", {"resource_id": pk, "status": 200})

# Upsert many records
await db.upsert_many(
    "links",
    [
        {"resource_id": 1, "status": 200},
        {"resource_id": 2, "status": 404},
    ],
)

# Update selected columns in a record
await db.update_one("links", pk, {"progress": 50})

# Delete records
await db.delete_one("links", pk)
await db.delete_many("links", "status = ?", (404,))

Transactions

Group multiple statements into a single unit of work with the built-in transaction context managers. They automatically call BEGIN, COMMIT, and ROLLBACK for you:

# Async example
async with db.transaction():
    await db.execute("INSERT INTO links(url) VALUES(?)", ("https://example",))
    await db.execute("UPDATE links SET status=? WHERE url=?", (200, "https://example"))

# Sync example
with db.transaction():
    db.execute("INSERT INTO links(url) VALUES(?)", ("https://example",))
    db.execute("UPDATE links SET status=? WHERE url=?", (200, "https://example"))

If you need full control, you can also call begin(), commit(), and rollback() directly on AsyncBaseDB and SyncBaseDB instances. Nested transactions are not supported—calling begin() or transaction() while a transaction is already active raises a RuntimeError.
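What the context manager automates can be spelled out with the stdlib sqlite3 module; the try/except pattern below mirrors what db.transaction() does for you (a sketch, not ScriptDB's actual implementation):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.isolation_level = None  # manage transactions explicitly
conn.execute("CREATE TABLE links(url TEXT, status INTEGER)")
try:
    conn.execute("BEGIN")
    conn.execute("INSERT INTO links(url) VALUES(?)", ("https://example",))
    conn.execute("UPDATE links SET status=? WHERE url=?", (200, "https://example"))
    conn.execute("COMMIT")
except Exception:
    # Any failure rolls the whole unit of work back.
    conn.execute("ROLLBACK")
    raise
count = conn.execute("SELECT COUNT(*) FROM links").fetchone()[0]
print(count)  # 1
conn.close()
```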

Query helpers

The library also offers helpers for common read patterns:

# Get a single value
count = await db.query_scalar("SELECT COUNT(*) FROM links")

# Get a list from the first column of each row
ids = await db.query_column("SELECT resource_id FROM links ORDER BY resource_id")

# Build dictionaries from rows
# Use primary key automatically
records = await db.query_dict("SELECT * FROM links")

# Explicit column names for key and value
urls = await db.query_dict(
    "SELECT resource_id, url FROM links", key="resource_id", value="url"
)

# Callables for custom key and value
status_by_url = await db.query_dict(
    "SELECT * FROM links",
    key=lambda r: r["url"],
    value=lambda r: r["status"],
)

Controlling the row factory

AsyncBaseDB and SyncBaseDB return sqlite3.Row instances by default. Pass row_factory=dict to either the constructor or the open() helper to receive plain dictionaries instead.

# Async example
async with MyDB.open("app.db", row_factory=dict) as db:
    row = await db.query_one("SELECT * FROM links LIMIT 1")
    assert isinstance(row, dict)

# Sync example (using your SyncBaseDB subclass)
with MySyncDB.open("app.db", row_factory=dict) as db:
    row = db.query_one("SELECT * FROM links LIMIT 1")
    assert isinstance(row, dict)

The choice affects every method that previously returned sqlite3.Row instances (query_one, query_many, query_many_gen, and the default values from query_dict) as well as helpers like query_scalar and query_column. This makes it easy to integrate ScriptDB with codebases that prefer working with JSON-serialisable dictionaries instead of custom row objects.

Post-processing query results

query_one, query_many, and query_many_gen accept an optional postprocess_func callback. When provided, ScriptDB calls it for each returned row so you can transform data right after reading it from SQLite. The same callback can also be passed to query_scalar, query_column, and query_dict; in those cases it must return a row-like object compatible with the downstream helper.

import json
from datetime import datetime

from scriptdb import SyncBaseDB

class MyDb(SyncBaseDB):
    def migrations(self):
        return [
            {
                "name": "create_records",
                # "table" is a reserved word in SQLite, so use a regular name
                "sql": "CREATE TABLE records(id INTEGER PRIMARY KEY, data TEXT, dt TEXT)",
            }
        ]

data = {"answer": 42}
dt = datetime.now()

with MyDb.open("./file.db", row_factory=dict) as db:
    db.insert_one("records", {"id": 1, "data": json.dumps(data), "dt": dt.isoformat()})
    results = db.query_many(
        "SELECT * FROM records",
        postprocess_func=lambda row: {
            **row,
            "data": json.loads(row["data"]),
            "dt": datetime.fromisoformat(row["dt"]),
        },
    )

The same callback works with the async API; here MyAsyncDb is an AsyncBaseDB subclass with the same migration:

async with MyAsyncDb.open("./file.db", row_factory=dict) as db:
    row = await db.query_one(
        "SELECT * FROM records WHERE id = ?",
        (1,),
        postprocess_func=lambda r: {**r, "data": json.loads(r["data"])},
    )

Useful implementations

CacheDB

AsyncCacheDB and SyncCacheDB provide a simple key‑value store with optional expiration.

from scriptdb import AsyncCacheDB

async def main():
    async with AsyncCacheDB.open("cache.db") as cache:
        await cache.set("answer", b"42", expire_sec=60)
        if await cache.is_set("answer"):
            print("cached!")
        print(await cache.get("answer"))  # b"42"
The synchronous version mirrors this:

from scriptdb import SyncCacheDB

with SyncCacheDB.open("cache.db") as cache:
    cache.set("answer", b"42", expire_sec=60)
    if cache.is_set("answer"):
        print("cached!")
    print(cache.get("answer"))  # b"42"

A value without expire_sec will be kept indefinitely. Use is_set to check for keys without retrieving their values. To easily cache function results, use the cache decorator method from a cache instance:

import asyncio
from scriptdb import AsyncCacheDB

async def main():
    async with AsyncCacheDB.open("cache.db") as cache:

        @cache.cache(expire_sec=30)
        async def slow():
            await asyncio.sleep(1)
            return 1

        await slow()

Subsequent calls within 30 seconds will return the cached result without executing the function. You can supply key_func to control how the cache key is generated.
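ScriptDB's exact key_func contract is not documented here, so the stdlib-only sketch below only illustrates the general idea: the callback maps a call's arguments to a cache key, letting unrelated arguments share one cached entry.

```python
import functools

def cached_with_key(key_func):
    """Toy memoizer illustrating the key_func idea (not ScriptDB's API)."""
    def decorator(fn):
        store = {}
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            key = key_func(*args, **kwargs)  # the callback decides the key
            if key not in store:
                store[key] = fn(*args, **kwargs)
            return store[key]
        return wrapper
    return decorator

@cached_with_key(lambda user_id, **_: f"user:{user_id}")
def fetch_profile(user_id, verbose=False):
    return {"id": user_id}

first = fetch_profile(1)
second = fetch_profile(1, verbose=True)  # verbose is ignored by the key
print(first is second)  # True
```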

RAM key index

Both cache implementations can track cache keys in memory for faster existence checks. Pass cache_keys_in_ram=True to the constructor or to .open() to enable this mode:

async with AsyncCacheDB.open("cache.db", cache_keys_in_ram=True) as cache:
    await cache.set("answer", b"42")
    if await cache.is_set("answer"):
        ...
The synchronous version behaves the same:

with SyncCacheDB.open("cache.db", cache_keys_in_ram=True) as cache:
    cache.set("answer", b"42")
    if cache.get("missing", "?") == "?":
        ...

When the RAM index is active, is_set no longer queries SQLite and get skips disk access for missing keys by consulting the in-memory map first. Because this map reflects only the current process, avoid multiprocessing and do not modify the same database from other processes or threads — doing so would desynchronize the cached keys. Storing every key in memory also increases RAM usage proportionally to the number of cached entries, so plan for that overhead before enabling the mode on very large caches.

Simple DDL query builder

If you often forget SQLite syntax, ScriptDB includes a small helper to build CREATE TABLE, ALTER TABLE, and DROP TABLE statements programmatically. Unlike raw SQL, there is no syntax to memorize. Just type Builder. and let your IDE suggest the available operations.

Warning: the builder quotes identifiers to mitigate SQL injection, but options that accept raw SQL snippets (such as check= expressions) are not sanitized. Never pass untrusted user data to these parameters.

from scriptdb import Builder
from datetime import datetime

# The following examples build SQL strings; they do not execute them.
# Builder objects can be rendered either by calling .done() or by
# passing them to str(...) as shown below. Supported column types are
# int, str, float, bytes, bool, date, and datetime (date/time values
# are stored as TEXT in SQLite).

# Build query to create a table with several columns
create_sql = str(
    Builder.create_table("users")
    .primary_key("id", int)
    .add_field("username", str, not_null=True)
    .add_field("email", str)
)

# Build query to add new columns
add_cols_sql = (
    Builder.alter_table("users")
    .add_column("age", int, default=0)
    .add_column("created_at", int)
    .done()  # could also render with str(...) instead of .done()
)

# Build query to remove an old column
drop_col_sql = str(Builder.alter_table("users").drop_column("email"))

# Build query to create an index
index_sql = str(
    Builder.create_index("users", on="username", name="idx_users_username")
)

# Build query to drop the table when finished
drop_sql = str(Builder.drop_table("users"))

# Infer column definitions from a representative dictionary
sample_row = {
    "id": 1,  # becomes INTEGER PRIMARY KEY AUTOINCREMENT
    "username": "alice",  # TEXT
    "is_active": True,  # INTEGER
}
inferred_sql = (
    Builder.create_table_from_dict("users", sample_row)
    .add_field("created_at", datetime)  # chaining works like with create_table
    .done()
)

These builders are convenient for defining migrations in *BaseDB subclasses: they can be passed directly without calling done() because ScriptDB will automatically convert them to SQL strings.

from scriptdb import SyncBaseDB, Builder

class MyDB(SyncBaseDB):
    def migrations(self):
        return [
            {
                "name": "create_users",
                "sql": (
                    Builder.create_table("users")
                    .primary_key("id", int)
                    .add_field("username", str, not_null=True)
                    .add_field("email", str)
                ),
            },
            {
                "name": "add_fields",
                "sqls": [
                    Builder.alter_table("users")
                    .add_column("age", int, default=0)
                    .add_column("created_at", int),
                    "UPDATE users SET age = 0 WHERE age IS NULL;",
                ],
            },
            {
                "name": "remove_email",
                "sql": Builder.alter_table("users").drop_column("email"),
            },
            {
                "name": "index_username",
                "sql": Builder.create_index("users", on="username", name="idx_users_username"),
            },
            {
                "name": "drop_users",
                "sql": Builder.drop_table("users"),
            },
        ]

Running tests

make test

Run linters and type checks with:

make lint

Contributing

Issues and pull requests are welcome. Please run the tests before submitting changes.

License

This project is licensed under the terms of the MIT license. See LICENSE for details.

Development

Clone the repository and create a virtual environment:

git clone https://github.com/MihanEntalpo/ScriptDB.git
cd ScriptDB
python -m venv venv
source venv/bin/activate
pip install -e .[async,test]

If you don't have make, install it first, e.g. sudo apt-get install make.

Before committing, ensure code passes the linters, type checks, and tests with coverage:

make lint
make test

AI Usage disclaimer

  • The package was initially created with the help of OpenAI Codex.
  • All algorithms, functionality, and logic were devised by a human.
  • The human supervised and reviewed every function and method generated by Codex.
  • Some parts were manually corrected, as it is often difficult to obtain sane edits from AI.
  • Although some code was made by an LLM, this is not vibe-coding; you can trust this code as if I had written it myself.
