DB-agnostic operation runner with retries/backoff and pluggable scopes/classifiers
Project description
dbop-core
DB-agnostic retry runner for Python database operations.
You bring the driver or ORM — dbop-core gives you:
- Retries with backoff and jitter
- Attempt scopes (transaction / SAVEPOINT wrappers)
- Per-attempt hooks (e.g. set timeouts, apply metadata)
- Transient error classification
- Optional OTLP traces + metrics via OpenTelemetry
Lightweight and composable — the core doesn’t know your driver.
Adapters live under contrib/ (SQLAlchemy, psycopg, asyncpg, aiomysql, aiosqlite, generic DB-API).
When to Use
Use dbop-core when you need resilience for a single logical DB step:
- Occasional deadlocks or lock wait timeouts
- Slow statements that risk blocking your pool
- SAVEPOINT-style retries inside an outer transaction
- Per-attempt timeouts without rewriting your app logic
It’s not a migration tool or pooler — just a precise execution runner for safe retries.
Features
- ✅ Async-first core API, works with sync and async drivers
- 🔁 Retry policy: max retries, exponential backoff, jitter, caps
- 🧩 Attempt scopes: pluggable context managers (transaction/savepoint)
- ⚙️ Per-attempt hooks: run custom setup (timeouts, instrumentation)
- 🧠 Transient classifier: decide whether an exception should retry
- 📈 Optional OTLP observability:
  - Traces (spans per operation + attempt)
  - Metrics (counters + histograms) via OpenTelemetry
Installation
pip install dbop-core
# optional extras for contrib adapters
pip install "dbop-core[sqlalchemy]"
pip install "dbop-core[psycopg]"
pip install "dbop-core[asyncpg]"
pip install "dbop-core[aiomysql]"
pip install "dbop-core[aiosqlite]"
# optional OTEL support (traces + metrics)
pip install "dbop-core[otel]"
Compatibility: Python 3.9 – 3.13
Quickstart
import asyncio
from dbop_core.core import execute, RetryPolicy

async def op(x):
    return x * 2

async def main():
    result = await execute(op, args=(21,), policy=RetryPolicy())
    assert result == 42

asyncio.run(main())
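If the operation raises a transient error, execute retries it under the policy. A minimal sketch of that behavior (TimeoutError stands in for a real driver error; it is not a dbop-core type):

import asyncio
from dbop_core.core import execute, RetryPolicy

attempts = {"n": 0}

async def flaky():
    # Fails twice, then succeeds; the retry policy absorbs the failures.
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise TimeoutError("transient blip")
    return "ok"

async def main():
    result = await execute(
        flaky,
        retry_on=(TimeoutError,),
        policy=RetryPolicy(max_retries=3, initial_delay=0.05),
    )
    assert result == "ok"

asyncio.run(main())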
Core API (Essentials)
await execute(
    op,                        # callable: sync or async
    args=(), kwargs=None,
    retry_on=(Exception,),     # types to retry
    classifier=None,           # fn(exc) -> bool; True = retry
    raises=True,               # if False, return default on final failure
    default=None,
    policy=RetryPolicy(),      # backoff/jitter settings
    attempt_scope=None,        # sync AttemptScope
    attempt_scope_async=None,  # async AttemptScope
    pre_attempt=None,          # async setup hook
    read_only=False,           # passed to scopes
    overall_timeout_s=None,    # per-attempt timeout
)
Semantics
- Only exceptions in retry_on are candidates for retry.
- If classifier is provided, it takes precedence per exception (True -> retry, False -> stop).
- overall_timeout_s cancels the attempt; if raises=False, you get default.
- pre_attempt is always async — even for sync drivers (wrap your sync setup with async def pre(): ...).
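Putting these rules together, a hedged sketch (flaky_update and the deadlock-message check are illustrative placeholders, not part of the library):

from dbop_core.core import execute, RetryPolicy

def flaky_update():
    ...  # your DB call; may raise on deadlock

def my_classifier(exc: Exception) -> bool:
    # True = retry this attempt, False = fail immediately,
    # even if the exception type matched retry_on.
    return "deadlock" in str(exc).lower()

result = await execute(
    flaky_update,
    retry_on=(Exception,),     # broad candidate set...
    classifier=my_classifier,  # ...narrowed per exception by the classifier
    raises=False,              # on final failure, don't raise...
    default=None,              # ...return this instead
    policy=RetryPolicy(max_retries=3, initial_delay=0.05),
)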
Execution Flow (Conceptual Diagram)
Below is a simplified view of what happens inside execute() during retries.
┌──────────────────────────────────────────────────────────────┐
│ execute() lifecycle │
└──────────────────────────────────────────────────────────────┘
│
▼
[1] start execute()
│
│
▼
[2] initialize RetryPolicy
- max_retries, delay, jitter, etc.
- retry_on exception types
│
▼
[3] for each attempt (1..N):
│
├─► [3.1] pre_attempt()
│ (async setup hook)
│ e.g., apply_timeouts, reset state
│
├─► [3.2] attempt_scope / attempt_scope_async
│ (transaction or SAVEPOINT wrapper)
│
├─► [3.3] call op(*args, **kwargs)
│ (sync or async function)
│
├─► [3.4] if success -> return result
│
├─► [3.5] if exception:
│ ├─ check type in retry_on
│ ├─ run classifier(exc)
│ ├─ if transient -> sleep(backoff) -> retry
│ └─ else -> re-raise (or return default)
│
▼
[4] if all retries failed:
- return default (if raises=False)
- or raise last exception
Key concepts:
- attempt_scope isolates one DB operation (transaction or savepoint). If the attempt fails, the scope rolls back and prepares for retry.
- pre_attempt runs before each try — perfect for timeouts, instrumentation, or context tagging.
- RetryPolicy determines how long to wait and how many times to retry.
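The contrib adapters ship ready-made scopes. If you need a custom one, a minimal sketch, assuming a scope is an ordinary context manager wrapping a single attempt (shown here over a plain sqlite3 connection):

import sqlite3
from contextlib import contextmanager

conn = sqlite3.connect(":memory:")

@contextmanager
def my_attempt_scope(conn, read_only=False):
    # One attempt = one transaction: commit on success,
    # roll back on failure so the next attempt starts clean.
    try:
        yield
        conn.commit()
    except Exception:
        conn.rollback()
        raise

# Wired in the same way as the contrib scopes:
# await execute(op, attempt_scope=lambda r=False: my_attempt_scope(conn, read_only=r), ...)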
Design Philosophy
Database operations often need fine-grained resilience — but frameworks usually give you an all-or-nothing approach:
- Retry at the HTTP or ORM layer (too coarse).
- Manual retry loops around transactions (too error-prone).
- Connection poolers that retry implicitly (too opaque).
dbop-core exists to make retries explicit, minimal, and driver-agnostic.
It focuses on one unit of work — one statement, one transaction, one savepoint — and lets you decide:
- ✅ When to retry (classifier, retry_on)
- ✅ How to retry (RetryPolicy, exponential backoff + jitter)
- ✅ Where to isolate (attempt_scope / attempt_scope_async)
- ✅ What to prepare before each try (pre_attempt hook)
Everything else — connection pooling, ORM sessions, schema migration — stays out of scope.
This separation keeps dbop-core composable, transparent, and safe to embed anywhere in your stack — from raw DB-API connections to async SQLAlchemy sessions or FastAPI background tasks.
In short:
dbop-core doesn’t manage your database. It helps you survive it.
Execution modes:
| Driver Type | Scope used | Hook type | Example Adapter |
|---|---|---|---|
| Sync | attempt_scope | apply_timeouts_sync() | DB-API, SQLAlchemy |
| Async | attempt_scope_async | apply_timeouts_async() | asyncpg, psycopg, aiomysql, aiosqlite |
Contrib Adapters
| Adapter | Sync/Async | Backend | File |
|---|---|---|---|
| DB-API (generic) | Sync | Postgres/MySQL/SQLite | contrib/dbapi_adapter.py |
| SQLAlchemy (Session) | Sync | Any | contrib/sqlalchemy_adapter.py |
| SQLAlchemy (AsyncSession) | Async | Any | contrib/sqlalchemy_adapter_async.py |
| psycopg 3 | Async | Postgres | contrib/psycopg_adapter.py |
| asyncpg | Async | Postgres | contrib/asyncpg_adapter.py |
| aiomysql | Async | MySQL/MariaDB | contrib/aiomysql_adapter.py |
| aiosqlite | Async | SQLite | contrib/aiosqlite_adapter.py |
SQLAlchemy (Sync Example)
import asyncio
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from dbop_core.core import execute, RetryPolicy
from dbop_core.contrib.sqlalchemy_adapter import attempt_scope_sync

engine = create_engine("sqlite+pysqlite:///:memory:")
Session = sessionmaker(bind=engine)

def setup(sess):
    sess.execute(text("CREATE TABLE IF NOT EXISTS kv(k TEXT PRIMARY KEY, v TEXT)"))

def put(sess, k, v):
    sess.execute(text("INSERT OR REPLACE INTO kv VALUES (:k,:v)"), {"k": k, "v": v})

def get(sess, k):
    return sess.execute(text("SELECT v FROM kv WHERE k=:k"), {"k": k}).scalar()

async def main():
    pol = RetryPolicy(max_retries=3, initial_delay=0.05, max_delay=0.2)
    with Session() as sess:
        with sess.begin():
            setup(sess)
        with sess.begin():
            await execute(
                lambda: put(sess, "hello", "world"),
                attempt_scope=lambda r=False: attempt_scope_sync(sess, read_only=r),
                policy=pol,
            )
        with sess.begin():
            val = await execute(
                lambda: get(sess, "hello"),
                attempt_scope=lambda r=False: attempt_scope_sync(sess, read_only=r),
                policy=pol,
                read_only=True,
            )
            print(val)

asyncio.run(main())
psycopg (Postgres, Async)
import asyncio
from functools import partial
from psycopg import AsyncConnection
from dbop_core.core import execute, RetryPolicy
from dbop_core.classify import dbapi_classifier
from dbop_core.contrib.psycopg_adapter import attempt_scope_async, apply_timeouts_async

DSN = "postgresql://postgres:postgres@localhost:5432/dbop"

async def pre(conn):  # per-attempt setup
    await apply_timeouts_async(conn, lock_timeout_s=3, stmt_timeout_s=10)

async def run():
    async with AsyncConnection.connect(DSN) as conn:
        pol = RetryPolicy(max_retries=5, initial_delay=0.05, max_delay=0.5)
        await execute(
            lambda: conn.execute("INSERT INTO items(name) VALUES ('gamma') ON CONFLICT DO NOTHING"),
            classifier=dbapi_classifier,
            attempt_scope_async=lambda r=False: attempt_scope_async(conn, read_only=r),
            pre_attempt=partial(pre, conn),
            policy=pol,
        )

        async def count_items():
            # conn.execute returns a cursor; fetch the scalar result.
            cur = await conn.execute("SELECT COUNT(*) FROM items")
            return (await cur.fetchone())[0]

        count = await execute(
            count_items,
            classifier=dbapi_classifier,
            attempt_scope_async=lambda r=False: attempt_scope_async(conn, read_only=r),
            pre_attempt=partial(pre, conn),
            policy=pol,
            read_only=True,
        )
        print("count:", count)

asyncio.run(run())
Generic DB-API (Sync, e.g. SQLite)
import asyncio, sqlite3
from dbop_core.core import execute, RetryPolicy
from dbop_core.contrib.dbapi_adapter import attempt_scope_sync, apply_timeouts_sync

conn = sqlite3.connect(":memory:")

def create():
    conn.execute("CREATE TABLE IF NOT EXISTS t(x INT)")

def insert():
    conn.execute("INSERT INTO t(x) VALUES (1)")

def count():
    return conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]

async def pre():
    apply_timeouts_sync(conn, backend="sqlite", lock_timeout_s=3)

async def main():
    create()
    pol = RetryPolicy(max_retries=2, initial_delay=0.05, max_delay=0.2)
    await execute(
        insert,
        attempt_scope=lambda r=False: attempt_scope_sync(conn, read_only=r, backend="sqlite"),
        pre_attempt=pre,
        policy=pol,
    )
    n = await execute(
        count,
        attempt_scope=lambda r=False: attempt_scope_sync(conn, read_only=r, backend="sqlite"),
        pre_attempt=pre,
        policy=pol,
        read_only=True,
    )
    print("rows:", n)

asyncio.run(main())
Timeout Mapping (per attempt)
| Backend | Mechanism |
|---|---|
| PostgreSQL | SET LOCAL lock_timeout, SET LOCAL statement_timeout |
| MySQL/MariaDB | innodb_lock_wait_timeout, MAX_EXECUTION_TIME (best-effort) |
| SQLite | PRAGMA busy_timeout (connection-level) |
Use your adapter’s apply_timeouts_* in pre_attempt().
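If no contrib adapter fits your driver, the same mapping can be applied by hand. A sketch for Postgres, assuming a psycopg AsyncConnection named conn as in the example above (SET LOCAL only takes effect inside the attempt's transaction, so pair it with an attempt scope):

async def pre():
    # Applies the Postgres row of the table above, scoped to the
    # current transaction only.
    await conn.execute("SET LOCAL lock_timeout = '3s'")
    await conn.execute("SET LOCAL statement_timeout = '10s'")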
Transient Classification
dbapi_classifier detects common transient patterns:
| Backend | Typical Transient Codes / Messages |
|---|---|
| Postgres | 40P01 (deadlock), 55P03 (lock not available) |
| MySQL/MariaDB | 1213, 1205, connection lost |
| SQLite | database is locked |
| Generic | Operational/timeouts from DB-API |
You can always plug in your own classifier: classifier(exc) -> bool.
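For example, a sketch that extends the built-in heuristics with an app-specific rule (the extra message check is illustrative, not a library constant):

from dbop_core.classify import dbapi_classifier

def my_classifier(exc: Exception) -> bool:
    # Keep the built-in transient detection, plus one custom pattern.
    return dbapi_classifier(exc) or "connection reset" in str(exc).lower()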
Examples
cd examples
cp env.example .env # configure DSNs
# SQLite (local)
make install-sqlite && make run-sqlite
# Postgres (Docker)
make pg-up && make install-psycopg && make run-psycopg
make install-asyncpg && make run-asyncpg
make pg-down
# MySQL (Docker)
make mysql-up && make install-mysql && make run-mysql
make mysql-down
# OTEL (Collector + Jaeger + Prometheus + Grafana)
make otel-up
make otel-smoke-local-http # or make otel-smoke-local-grpc
More details in examples/README.md.
Observability (OTLP / OpenTelemetry)
dbop-core optionally emits OpenTelemetry traces and metrics for every execution:
Traces
When OTEL is enabled, execute_traced_optional():
- Creates a root span for each logical DB operation.
- Wraps each retry in an attempt span.
- Adds attributes such as:
  - dbop.max_retries, dbop.initial_delay, dbop.max_delay, dbop.jitter
  - dbop.outcome (success/error)
  - db.system, db.name, db.user, db.statement
- Emits dbop.pre_attempt events for each attempt.
Metrics
The OTEL metrics layer exposes (via the Collector → Prometheus):
- dbop_dbop_operations_total — number of DB operations
- dbop_dbop_attempts_total — number of attempts (including retries)
- dbop_dbop_operation_duration_seconds — latency histogram
Metrics are labelled with backend + outcome attributes, so you can break down:
- Read-only vs non-read-only
- Success vs failure
- Per-DB system / database
Enabling OTEL
Install extras:
pip install "dbop-core[otel]"
Minimal env setup:
export DBOP_OTEL_ENABLED=1
export DBOP_OTEL_EXPORTER=http # or grpc
# typical HTTP endpoints (with an OTEL Collector running locally)
export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://127.0.0.1:4318/v1/traces
export OTEL_EXPORTER_OTLP_METRICS_ENDPOINT=http://127.0.0.1:4318/v1/metrics
Then use init_tracer / init_metrics and execute_traced_optional() (see docs/OTEL.md).
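A rough sketch of that wiring follows; the module paths and argument-free signatures below are assumptions, so treat docs/OTEL.md as authoritative:

from dbop_core.core import RetryPolicy
# Module paths assumed from the otel_setup / otel_runtime structure
# mentioned under "Learn more"; check docs/OTEL.md for the real API.
from dbop_core.otel_setup import init_tracer, init_metrics
from dbop_core.otel_runtime import execute_traced_optional

init_tracer()    # assumed to read the DBOP_OTEL_* / OTEL_EXPORTER_OTLP_* env vars
init_metrics()

async def op():
    ...  # your DB call

result = await execute_traced_optional(op, policy=RetryPolicy())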
Learn more
See docs/OTEL.md for:
- Design & structure (otel_setup, otel_runtime)
- Env variable matrix
- Example docker-compose for Collector + Jaeger + Prometheus + Grafana
- The OTEL smoke demo (examples/otel-smoke/)
Changelog
See CHANGELOG.md
License
MIT
Download files
Source Distribution
Built Distribution
File details
Details for the file dbop_core-1.1.0.tar.gz.
File metadata
- Download URL: dbop_core-1.1.0.tar.gz
- Upload date:
- Size: 31.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 53629c8fc9c675a9d2d06c361ac2251ff61cf45572b2800b34ec1c65782652e2 |
| MD5 | 7e4362e2f1c1119ee75b826f9f71df4a |
| BLAKE2b-256 | a8a0c5f2e71950eba474409b0845a028000c30d64a1d3675e3701814c3db5c0f |
File details
Details for the file dbop_core-1.1.0-py3-none-any.whl.
File metadata
- Download URL: dbop_core-1.1.0-py3-none-any.whl
- Upload date:
- Size: 21.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 7f7fa46d46fcb7e2da79789c5271e748e55fdc6685df8f3340e17bf9e5c023bb |
| MD5 | 9994cc5bfa7edb8c299199c62f8edf22 |
| BLAKE2b-256 | e0ccb8f369ed9a4c5ff7dc48d11ebaef69000f3c215a6a1e69fed6be420fb1f6 |