Skip to main content

PostgreSQL and SQLAlchemy Tools

Project description

db-retry

A Python library providing robust retry mechanisms, connection utilities, and transaction helpers for PostgreSQL and SQLAlchemy applications.

Features

  • Retry Decorators: Automatic retry logic for transient database errors
  • Connection Factories: Robust connection handling with multi-host support
  • DSN Utilities: Flexible Data Source Name parsing and manipulation
  • Transaction Helpers: Simplified transaction management with automatic cleanup

Installation

Using uv

uv add db-retry

Using pip

pip install db-retry

ORM-Based Usage Examples

1. Database Operations with Automatic Retry

Protect your database operations from transient failures using ORM models:

import asyncio
import sqlalchemy as sa
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from db_retry import postgres_retry


class User(DeclarativeBase):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(sa.String())
    email: Mapped[str] = mapped_column(sa.String(), index=True)


# Apply retry logic to ORM operations
@postgres_retry
async def get_user_by_email(session: AsyncSession, email: str) -> User:
    return await session.scalar(
        sa.select(User).where(User.email == email)
    )


async def main():
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/mydb")
    async with AsyncSession(engine) as session:
        # Automatically retries on connection failures or serialization errors
        user = await get_user_by_email(session, "john.doe@example.com")
        if user:
            print(f"Found user: {user.name}")


asyncio.run(main())

2. High Availability Database Connections

Set up resilient database connections with multiple fallback hosts:

import sqlalchemy as sa
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from db_retry import build_connection_factory, build_db_dsn

# Configure multiple database hosts for high availability
multi_host_dsn = (
    "postgresql://user:password@/"
    "myapp_db?"
    "host=primary-db:5432&"
    "host=secondary-db:5432&"
    "host=backup-db:5432"
)

# Build production-ready DSN
dsn = build_db_dsn(
    db_dsn=multi_host_dsn,
    database_name="production_database",
    drivername="postgresql+asyncpg"
)

# Create connection factory with timeout
connection_factory = build_connection_factory(
    url=dsn,
    timeout=5.0  # 5 second connection timeout
)

# Engine will automatically try different hosts on failure
engine = create_async_engine(dsn, async_creator=connection_factory)

3. Simplified Transaction Management

Handle database transactions with automatic cleanup using ORM:

import dataclasses
import datetime
import typing

from schemas import AnalyticsEventCreate, AnalyticsEvent
from db_retry import Transaction, postgres_retry

from your_service_name.database.tables import EventsTable
from your_service_name.producers.analytics_service_events_producer import AnalyticsEventsProducer
from your_service_name.repositories.events_repository import EventsRepository
from your_service_name.settings import settings


@dataclasses.dataclass(kw_only=True, frozen=True, slots=True)
class CreateEventUseCase:
    events_repository: EventsRepository
    transaction: Transaction
    analytics_events_producer: AnalyticsEventsProducer

    @postgres_retry
    async def __call__(
            self,
            event_create_data: AnalyticsEventCreate,
    ) -> AnalyticsEvent:
        async with self.transaction:
            model: typing.Final = EventsTable(
                **event_create_data.model_dump(),
                created_at=datetime.datetime.now(tz=settings.common.default_timezone),
            )
            saved_event: typing.Final[EventsTable] = await self.events_repository.create(model)
            event: typing.Final = AnalyticsEvent.model_validate(saved_event)
            await self.analytics_events_producer.send_message(event)
            await self.transaction.commit()
            return event

4. Serializable Transactions for Consistency

Use serializable isolation level to prevent race conditions with ORM:

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from db_retry import Transaction


async def main():
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/mydb")

    async with AsyncSession(engine) as session:
        strict_transaction = Transaction(
            session=session,
            isolation_level="SERIALIZABLE",
        )
        # use strict_transaction where needed

Configuration

The library can be configured using environment variables:

Variable Description Default
DB_RETRY_RETRIES_NUMBER Number of retry attempts for database operations 3

Example:

export DB_UTILS_RETRIES_NUMBER=5

API Reference

Retry Decorator

  • @postgres_retry - Decorator for async functions that should retry on database errors

Connection Utilities

  • build_connection_factory(url, timeout) - Creates a connection factory for multi-host setups
  • build_db_dsn(db_dsn, database_name, use_replica=False, drivername="postgresql") - Builds a DSN with specified parameters
  • is_dsn_multihost(db_dsn) - Checks if a DSN contains multiple hosts

Transaction Helper

  • Transaction(session, isolation_level=None) - Context manager for simplified transaction handling

Requirements

  • Python 3.13+
  • SQLAlchemy with asyncio support
  • asyncpg PostgreSQL driver
  • tenacity for retry logic

License

This project is licensed under the MIT License - see the LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

db_retry-0.3.0.tar.gz (5.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

db_retry-0.3.0-py3-none-any.whl (7.1 kB view details)

Uploaded Python 3

File details

Details for the file db_retry-0.3.0.tar.gz.

File metadata

  • Download URL: db_retry-0.3.0.tar.gz
  • Upload date:
  • Size: 5.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.18 {"installer":{"name":"uv","version":"0.9.18","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for db_retry-0.3.0.tar.gz
Algorithm Hash digest
SHA256 b10814e94162df20bd94a6983922b05396ba2feb29fac74051b24e0113cdf3fb
MD5 4c6f00f1c3113be6aa6e07752f19b35d
BLAKE2b-256 96a9c5d52d56366a7d58de26d0f286c724e267d631c1efbd2037c1c26ca0ee2e

See more details on using hashes here.

File details

Details for the file db_retry-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: db_retry-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 7.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.18 {"installer":{"name":"uv","version":"0.9.18","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for db_retry-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 87f53ad9b7142c258900ffdc643041326aa1e30dee6e1945fb4ca013e48d38e6
MD5 afe4a59048ef38fedebfc536e96cf271
BLAKE2b-256 9dc7c2bf9ec2295f04ff530b365016fbc062aadcf94ae19b311ee148ecbb00ed

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page