dbop-runner
Resilient SQLAlchemy operation runner: retry, backoff, SAVEPOINT handling, and backend-specific timeouts, all while using your existing SQLAlchemy sessions.
- Works with `Session` and `AsyncSession`
- Retries transient faults (deadlocks, lock timeouts, disconnects)
- Uses SAVEPOINTs when already inside a transaction
- Supports per-operation `lock_timeout` and `statement_timeout`
- Backends: PostgreSQL, MySQL/MariaDB, SQLite
- Typed, small, and framework-agnostic (FastAPI/Gunicorn friendly)
Why
Typical DB code either:
- Does not retry (deadlocks bubble up and fail the request), or
- Retries blindly and risks partial commits within a transaction.
dbop-runner provides a simple, safe abstraction:
Run a single operation function against an existing SQLAlchemy session with retries. When already in a transaction, it retries inside a SAVEPOINT so your outer work remains intact. You still decide when to commit or rollback.
Installation
```bash
# Minimal
pip install dbop-runner

# With dev tools
pip install "dbop-runner[dev]"

# With drivers used in integration tests
pip install "dbop-runner[postgres,mysql]"
```
Supported environments:
- SQLAlchemy 2.x
- PostgreSQL: `psycopg` (sync), `asyncpg` (async)
- MySQL/MariaDB: `pymysql` (sync), `aiomysql` (async)
- SQLite / `aiosqlite` (for tests)
Quick Start (Async)
```python
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy import text

from dbop_runner import DBOpRunner

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db", pool_pre_ping=True)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)

runner = DBOpRunner(max_retries=5, default_lock_timeout_s=5)

async def update_user(sess: AsyncSession):
    await sess.execute(text("UPDATE users SET seen_at = now() WHERE id=:id"), {"id": 42})

async def main():
    async with SessionLocal() as sess:
        # Standalone run
        await runner.run_async(sess, update_user, name="update-user")

        # Under a managed transaction
        async with sess.begin():
            await runner.run_async(sess, update_user, name="update-user")
```
Quick Start (Sync)
```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, Session

from dbop_runner import DBOpRunner

engine = create_engine("postgresql+psycopg://user:pass@localhost/db", pool_pre_ping=True, future=True)
SessionLocal = sessionmaker(engine, expire_on_commit=False)

runner = DBOpRunner(max_retries=3, default_lock_timeout_s=5)

def rename(sess: Session):
    sess.execute(text("UPDATE projects SET name=:n WHERE id=:id"), {"id": 1, "n": "Demo"})

with SessionLocal() as sess, sess.begin():
    runner.run(sess, rename, name="rename-project")
```
How It Works
- You own the outer transaction (`with sess.begin(): ...`); `dbop-runner` never commits for you.
- Each operation runs inside a SAVEPOINT via `begin_nested()`.
- Transient errors (deadlocks, timeouts) roll back only the inner step and retry cleanly.
- If no transaction is open, the runner opens one automatically.
This pattern fits production systems where multiple steps form one atomic unit of work.
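The retry-inside-SAVEPOINT flow described above can be sketched as follows. This is an illustrative sketch, not the library's actual implementation: `run_with_savepoint_retry`, its parameters, and the `is_transient` hook are hypothetical names, and `sess` only needs a SQLAlchemy-style `begin_nested()` method.

```python
import time

def run_with_savepoint_retry(sess, op, *, max_retries=5, initial_delay=0.1,
                             max_delay=1.0, is_transient=lambda exc: True):
    """Retry `op` inside a SAVEPOINT so a failed attempt rolls back
    only its own work, leaving the outer transaction intact."""
    delay = initial_delay
    for attempt in range(max_retries + 1):
        sp = sess.begin_nested()          # SAVEPOINT
        try:
            result = op(sess)
            sp.commit()                   # RELEASE SAVEPOINT
            return result
        except Exception as exc:
            sp.rollback()                 # ROLLBACK TO SAVEPOINT; outer work survives
            if attempt == max_retries or not is_transient(exc):
                raise
            time.sleep(delay)             # exponential backoff, capped at max_delay
            delay = min(delay * 2, max_delay)
```

The key property: a transient failure never poisons the enclosing transaction, because only the inner SAVEPOINT is rolled back before the next attempt.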
Common Patterns
Use dbop-runner whenever you want database operations to run safely and predictably, especially in the face of transient failures such as deadlocks, lock waits, or query timeouts. It adds a consistent retry and timeout layer across PostgreSQL, MySQL/MariaDB, and SQLite without requiring changes to your ORM or connection code.
Batch of steps
Run multiple operations as a transaction, letting dbop-runner manage per-step retries:
```python
with sess.begin():
    runner.run(sess, step_a, name="step-a")
    runner.run(sess, step_b, name="step-b")
```
Ideal when each step might temporarily fail (e.g., due to lock contention), but you still want the entire batch to commit atomically once all succeed.
Read-only operation
For queries or analytics tasks that don’t modify data:
```python
rows = runner.run(sess, lambda s: s.execute(text("SELECT ...")), name="read-x", read_only=True)
```
Enables safer parallel read access while enforcing retry and timeout policies.
Conditional abort
Abort gracefully within a transaction when business logic requires it:
```python
with sess.begin():
    runner.run(sess, step_a, name="step-a")
    if should_abort:
        raise RuntimeError("abort")
    runner.run(sess, step_b, name="step-b")
```
If an exception occurs, dbop-runner automatically rolls back and logs context for debugging.
Per-operation timeouts
Fine-tune lock and statement timeouts per call instead of globally:
```python
runner.run(sess, step_fn, name="step", lock_timeout_s=3, stmt_timeout_s=10)
```
This is perfect for isolating slow or high-contention queries without affecting the rest of the workload.
| Backend | Timeout parameters |
|---|---|
| PostgreSQL | `SET LOCAL lock_timeout`, `SET LOCAL statement_timeout` |
| MySQL/MariaDB | `innodb_lock_wait_timeout`, `MAX_EXECUTION_TIME` |
| SQLite | `connect_args={"timeout": …}` |
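As an illustration of the table above, here is a hedged sketch of the SQL a runner could emit per backend. The actual statements dbop-runner issues may differ; `timeout_statements` is a hypothetical helper, not part of the library's API.

```python
def timeout_statements(dialect: str, lock_timeout_s: int, stmt_timeout_s: int) -> list:
    """Sketch of per-backend timeout SQL matching the table above."""
    if dialect == "postgresql":
        # SET LOCAL scopes the setting to the current transaction only.
        return [
            f"SET LOCAL lock_timeout = '{lock_timeout_s * 1000}ms'",
            f"SET LOCAL statement_timeout = '{stmt_timeout_s * 1000}ms'",
        ]
    if dialect in ("mysql", "mariadb"):
        # innodb_lock_wait_timeout is set per session;
        # MAX_EXECUTION_TIME is applied per SELECT via an optimizer hint.
        return [f"SET SESSION innodb_lock_wait_timeout = {lock_timeout_s}"]
    # SQLite has no per-statement timeout; use connect_args={"timeout": ...} instead.
    return []
```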
Logging
DBOpRunner emits structured log records.
Example failure:
```python
{
    "message": "update-user failed",
    "dialect": "postgresql",
    "attempt": 0,
    "transient": True,
    "sync": False,
    "exception": "DBAPIError('...')"
}
```
Inject a custom logger:
```python
runner = DBOpRunner(logger=my_logger)
```
Log levels:
- `INFO`: final "done" record
- `WARNING`: transient, retryable error
- `ERROR`: non-retryable error
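If you want the injected logger to emit JSON lines, a minimal stdlib setup could look like this. The `db_context` attribute and field names here are assumptions for illustration, not the library's exact record layout.

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each record as one JSON line (field names are illustrative)."""
    def format(self, record):
        payload = {"level": record.levelname, "message": record.getMessage()}
        # Merge any structured context attached to the record (assumed attribute).
        payload.update(getattr(record, "db_context", {}))
        return json.dumps(payload)

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
my_logger = logging.getLogger("dbop")
my_logger.addHandler(handler)
my_logger.setLevel(logging.INFO)

# runner = DBOpRunner(logger=my_logger)  # then inject it as shown above
```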
Transient Error Detection
| Backend | Retryable conditions |
|---|---|
| PostgreSQL | deadlock (`40P01`), lock unavailable (`55P03`), `OperationalError` |
| MySQL/MariaDB | deadlock (`1213`), lock wait timeout (`1205`), connection errors (`2006`/`2013`) |
| SQLite | `"database is locked"`, generic `OperationalError` |
Everything else is non-transient (no retry).
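A rough classifier mirroring the table above might look like the sketch below. This is illustrative only: `looks_transient` is a hypothetical function, and the library's real detection also inspects SQLAlchemy exception types such as `OperationalError`, not just error codes.

```python
from typing import Optional

def looks_transient(dialect: str, code: Optional[str], message: str) -> bool:
    """Return True for error codes/messages the table above marks retryable."""
    if dialect == "postgresql":
        return code in ("40P01", "55P03")            # deadlock, lock not available
    if dialect in ("mysql", "mariadb"):
        return code in ("1213", "1205", "2006", "2013")  # deadlock, lock wait, disconnects
    if dialect == "sqlite":
        return "database is locked" in message
    return False
```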
API Overview
```python
class DBOpRunner:
    def __init__(
        self,
        *,
        max_retries: int = 5,
        initial_delay: float = 0.1,
        max_delay: float = 1.0,
        default_lock_timeout_s: int | None = 10,
        default_stmt_timeout_s: int | None = None,
        default_raises: bool = True,
        logger: logging.Logger | None = None,
    ): ...

    def run(...): ...
    async def run_async(...): ...
```
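The `initial_delay`/`max_delay` defaults above suggest a capped exponential backoff between attempts. A sketch of that schedule follows; this is an assumption about the internals (the library may compute delays differently, e.g. with jitter), and `backoff_delays` is not part of its API.

```python
def backoff_delays(max_retries=5, initial_delay=0.1, max_delay=1.0):
    """List the wait inserted before each retry: doubling, capped at max_delay."""
    delay, out = initial_delay, []
    for _ in range(max_retries):
        out.append(delay)
        delay = min(delay * 2, max_delay)
    return out
```

With the defaults, retries wait 0.1 s, 0.2 s, 0.4 s, 0.8 s, then 1.0 s.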
Recipes
Read and write in one transaction
```python
def read(sess):
    return sess.execute(text("SELECT id FROM users WHERE state=:s"), {"s": "active"}).scalars().all()

def write(sess):
    sess.execute(text("UPDATE users SET state='active' WHERE id=:id"), {"id": 7})

with sess.begin():
    ids = runner.run(sess, read, name="list-active", read_only=True, stmt_timeout_s=5)
    runner.run(sess, write, name="activate-user", lock_timeout_s=3)
```
Async batch
```python
async def step_a(s): await s.execute(text("UPDATE t1 SET val='a' WHERE id=1"))
async def step_b(s): await s.execute(text("UPDATE t2 SET val='b' WHERE id=1"))

async with async_sess.begin():
    await runner.run_async(async_sess, step_a, name="step-a")
    await runner.run_async(async_sess, step_b, name="step-b")
```
Testing
Unit (SQLite)
```bash
make test
make cov
```
Integration (Docker)
Install deps (includes drivers):
```bash
make install-all
```
One-shot runs
```bash
make integration-pg      # bring up Postgres, run PG tests, keep container up
make integration-mysql   # bring up MySQL, run MySQL tests, keep container up
make integration-mariadb # bring up MariaDB, run MariaDB tests, keep container up
make integration-all     # run PG -> MySQL -> MariaDB tests sequentially
```
Granular control
```bash
make integration-up           # start Postgres
make integration-test-pg      # run Postgres tests
make integration-down         # stop and clean up

make integration-up-mysql     # start MySQL
make integration-test-mysql   # run MySQL tests
make integration-down         # stop and clean up

make integration-up-mariadb   # start MariaDB
make integration-test-mariadb # run MariaDB tests
make integration-down         # stop and clean up
```
Logs (tail/follow)
```bash
make integration-logs         # last 200 lines for all services
make integration-logs-pg      # follow Postgres logs
make integration-logs-mysql   # follow MySQL logs
make integration-logs-mariadb # follow MariaDB logs
```
Local Development
1. Install Dependencies
```bash
# Using uv (fast)
uv venv .venv
uv pip install -e '.[dev,postgres,mysql]'
```

Or with pip:

```bash
python -m venv .venv
. .venv/bin/activate
pip install -e '.[dev,postgres,mysql]'
```
2. Create .env (ignored by Git)
```
TEST_SYNC_DB_URL=sqlite:///./.pytest-sqlite.db
TEST_ASYNC_DB_URL=sqlite+aiosqlite:///./.pytest-sqlite.db
PYTHONPATH=src
```
Keep `.env` ignored and commit `env.example` instead.
3. Run Tests
```bash
make test
make cov
```
Migration Examples
Examples of migrating from explicit commit/rollback flows to DBOpRunner are available in the full documentation; they also double as a small proof-of-concept environment for experimentation.
Compatibility
- Python 3.9 – 3.13
- SQLAlchemy 2.x
- PostgreSQL, MySQL/MariaDB, SQLite
Changelog
See CHANGELOG.md
License
MIT © 2025–present. See LICENSE for full terms.
Support and Contact
For questions or issues, open an issue or discussion at https://github.com/yokha/dbop-runner
Developed by Youssef Khaya.