Skip to main content

Resilient SQLAlchemy operation runner with retry, backoff, SAVEPOINTs, and per-operation timeouts.

Project description

dbop-runner

Resilient SQLAlchemy operation runner Retry, backoff, SAVEPOINT handling, and backend-specific timeouts — all while using your existing SQLAlchemy sessions.

  • Works with Session and AsyncSession
  • Retries transient faults (deadlocks, lock timeouts, disconnects)
  • Uses SAVEPOINTs when already inside a transaction
  • Supports per-operation lock_timeout and statement_timeout
  • Backends: PostgreSQL, MySQL/MariaDB, SQLite
  • Typed, small, and framework-agnostic (FastAPI/Gunicorn friendly)

Why

Typical DB code either:

  1. Does not retry (deadlocks bubble up and fail the request), or
  2. Retries blindly and risks partial commits within a transaction.

dbop-runner provides a simple, safe abstraction:

Run a single operation function against an existing SQLAlchemy session with retries. When already in a transaction, it retries inside a SAVEPOINT so your outer work remains intact. You still decide when to commit or rollback.


Installation

# Minimal
pip install dbop-runner

# With dev tools
pip install "dbop-runner[dev]"

# With drivers used in integration tests
pip install "dbop-runner[postgres,mysql]"

Supported environments:

  • SQLAlchemy 2.x
  • PostgreSQL: psycopg (sync), asyncpg (async)
  • MySQL/MariaDB: pymysql (sync), aiomysql (async)
  • SQLite / aiosqlite (for tests)

Quick Start (Async)

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy import text
from dbop_runner import DBOpRunner

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db", pool_pre_ping=True)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
runner = DBOpRunner(max_retries=5, default_lock_timeout_s=5)

async def update_user(sess: AsyncSession):
    await sess.execute(text("UPDATE users SET seen_at = now() WHERE id=:id"), {"id": 42})

async def main():
    async with SessionLocal() as sess:
        # Standalone run
        await runner.run_async(sess, update_user, name="update-user")

        # Under a managed transaction
        async with sess.begin():
            await runner.run_async(sess, update_user, name="update-user")

Quick Start (Sync)

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, Session
from dbop_runner import DBOpRunner

engine = create_engine("postgresql+psycopg://user:pass@localhost/db", pool_pre_ping=True, future=True)
SessionLocal = sessionmaker(engine, expire_on_commit=False)
runner = DBOpRunner(max_retries=3, default_lock_timeout_s=5)

def rename(sess: Session):
    sess.execute(text("UPDATE projects SET name=:n WHERE id=:id"), {"id": 1, "n": "Demo"})

with SessionLocal() as sess, sess.begin():
    runner.run(sess, rename, name="rename-project")

How It Works

  • You own the outer transaction (with sess.begin(): ...) — dbop-runner never commits for you.
  • Each operation runs inside a SAVEPOINT using begin_nested().
  • Transient errors (deadlocks, timeouts) rollback only the inner step and retry cleanly.
  • If no transaction is open, the runner opens one automatically.

This pattern fits production systems where multiple steps form one atomic unit of work.


Common Patterns

Use dbop-runner whenever you want to run database operations safely and predictably — especially when you’re dealing with transient issues like deadlocks, lock waits, or query timeouts. It gives you a consistent retry and timeout layer across PostgreSQL, MySQL/MariaDB, and SQLite — without having to change your ORM or connection code.

Batch of steps

Run multiple operations as a transaction, letting dbop-runner manage per-step retries:

with sess.begin():
    runner.run(sess, step_a, name="step-a")
    runner.run(sess, step_b, name="step-b")

Ideal when each step might temporarily fail (e.g., due to lock contention), but you still want the entire batch to commit atomically once all succeed.


Read-only operation

For queries or analytics tasks that don’t modify data:

rows = runner.run(sess, lambda s: s.execute(text("SELECT ...")), name="read-x", read_only=True)

Enables safer parallel read access while enforcing retry and timeout policies.


Conditional abort

Abort gracefully within a transaction when business logic requires it:

with sess.begin():
    runner.run(sess, step_a, name="step-a")
    if should_abort:
        raise RuntimeError("abort")
    runner.run(sess, step_b, name="step-b")

If an exception occurs, dbop-runner automatically rolls back and logs context for debugging.


Per-operation timeouts

Fine-tune lock and statement timeouts per call instead of globally:

runner.run(sess, step_fn, name="step", lock_timeout_s=3, stmt_timeout_s=10)

This is perfect for isolating slow or high-contention queries without affecting the rest of the workload.

Backend Timeout Parameters
PostgreSQL SET LOCAL lock_timeout, SET LOCAL statement_timeout
MySQL/MariaDB innodb_lock_wait_timeout, MAX_EXECUTION_TIME
SQLite connect_args={"timeout": …}

Logging

DBOpRunner emits structured log records.

Example failure:

{
  "message": "update-user failed",
  "dialect": "postgresql",
  "attempt": 0,
  "transient": True,
  "sync": False,
  "exception": "DBAPIError('...')"
}

Inject a custom logger:

runner = DBOpRunner(logger=my_logger)

Log levels:

  • INFO — final “done” record
  • WARNING — transient, retryable error
  • ERROR — non-retryable error

Transient Error Detection

Backend Retryable conditions
PostgreSQL deadlock (40P01), lock unavailable (55P03), OperationalError
MySQL/MariaDB deadlock (1213), lock wait timeout (1205), connection errors (2006/2013)
SQLite "database is locked", generic OperationalError

Everything else is non-transient (no retry).


API Overview

class DBOpRunner:
    def __init__(
        *,
        max_retries: int = 5,
        initial_delay: float = 0.1,
        max_delay: float = 1.0,
        default_lock_timeout_s: int | None = 10,
        default_stmt_timeout_s: int | None = None,
        default_raises: bool = True,
        logger: logging.Logger | None = None,
    )

    def run(...): ...
    async def run_async(...): ...

Recipes

Read and write in one transaction

def read(sess):
    return sess.execute(text("SELECT id FROM users WHERE state=:s"), {"s": "active"}).scalars().all()

def write(sess):
    sess.execute(text("UPDATE users SET state='active' WHERE id=:id"), {"id": 7})

with sess.begin():
    ids = runner.run(sess, read, name="list-active", read_only=True, stmt_timeout_s=5)
    runner.run(sess, write, name="activate-user", lock_timeout_s=3)

Async batch

async def step_a(s): await s.execute(text("UPDATE t1 SET val='a' WHERE id=1"))
async def step_b(s): await s.execute(text("UPDATE t2 SET val='b' WHERE id=1"))

async with async_sess.begin():
    await runner.run_async(async_sess, step_a, name="step-a")
    await runner.run_async(async_sess, step_b, name="step-b")

Testing

Unit (SQLite)

make test
make cov

Integration (Docker)

Install deps (includes drivers):

make install-all

One-shot runs

make integration-pg        # bring up Postgres, run PG tests, keep container up
make integration-mysql     # bring up MySQL, run MySQL tests, keep container up
make integration-mariadb   # bring up MariaDB, run MariaDB tests, keep container up
make integration-all       # run PG -> MySQL -> MariaDB tests sequentially

Granular control

make integration-up            # start Postgres
make integration-test-pg       # run Postgres tests
make integration-down          # stop and clean up

make integration-up-mysql      # start MySQL
make integration-test-mysql    # run MySQL tests
make integration-down          # stop and clean up

make integration-up-mariadb    # start MariaDB
make integration-test-mariadb  # run MariaDB tests
make integration-down          # stop and clean up

Logs (tail/follow)

make integration-logs        # last 200 lines for all services
make integration-logs-pg     # follow Postgres logs
make integration-logs-mysql  # follow MySQL logs
make integration-logs-mariadb# follow MariaDB logs

Local Development

1. Install Dependencies

# Using uv (fast)
uv venv .venv
uv pip install -e '.[dev,postgres,mysql]'

Or with pip:

python -m venv .venv
. .venv/bin/activate
pip install -e '.[dev,postgres,mysql]'

2. Create .env (ignored by Git)

TEST_SYNC_DB_URL=sqlite:///./.pytest-sqlite.db
TEST_ASYNC_DB_URL=sqlite+aiosqlite:///./.pytest-sqlite.db
PYTHONPATH=src

Keep .env ignored and commit env.example instead.

3. Run Tests

make test
make cov

Migration Examples

Examples of migrating from explicit commit/rollback flows to DBOpRunner are available in the full documentation. Examples can be used as a small poc environment for trials.


Compatibility

  • Python 3.9 – 3.13
  • SQLAlchemy 2.x
  • PostgreSQL, MySQL/MariaDB, SQLite

Changelog

See CHANGELOG.md


License

MIT © 2025–present. See LICENSE for full terms.


Support and Contact

For questions or issues, open an issue or discussion at https://github.com/yokha/dbop-runner


Developed by Youssef Khaya LinkedIn

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dbop_runner-1.0.2.tar.gz (14.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dbop_runner-1.0.2-py3-none-any.whl (9.7 kB view details)

Uploaded Python 3

File details

Details for the file dbop_runner-1.0.2.tar.gz.

File metadata

  • Download URL: dbop_runner-1.0.2.tar.gz
  • Upload date:
  • Size: 14.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for dbop_runner-1.0.2.tar.gz
Algorithm Hash digest
SHA256 95af6ed6fb7a88a05f94acf1b6765f646a692f9fbc4f16f0238b04415a066d65
MD5 f3580712df270c7102f2eb360bb1948f
BLAKE2b-256 e2960964143ce6cd2cd0f07f6bec31841368690e8d7eca65d96688e874e0013a

See more details on using hashes here.

File details

Details for the file dbop_runner-1.0.2-py3-none-any.whl.

File metadata

  • Download URL: dbop_runner-1.0.2-py3-none-any.whl
  • Upload date:
  • Size: 9.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for dbop_runner-1.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 4ee5d57e2415d61eac092413d816463f0a050cbf4aa8cbf3a3957de57d136f9d
MD5 0ca777a4301e8188a15f30976bf2c82b
BLAKE2b-256 159ede8295f7c4ecda65a73873cd555e05abd84c836a24c17c4e195699ad3bad

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page