Skip to main content

Async PostgreSQL client with connection pooling, query builder, and DAO pattern

Project description

adc-aiopg

Async-библиотека для PostgreSQL на базе asyncpg. Предоставляет пул соединений с бинарными кодеками, query builder поверх SQLAlchemy, generic DAO с фильтрами, систему версионирования таблиц и интеграцию с Alembic.

Установка

pip install git+https://github.com/ascet-dev/adc-aiopg.git@main

Быстрый старт

import asyncio
from adc_aiopg import create_db_pool, PGPoolManager

async def main():
    pool = await create_db_pool("postgresql://user:pass@localhost/mydb")
    pm = PGPoolManager(pool)

    async with pm.transaction() as conn:
        rows = await pm.fetch(
            select(users).where(users.c.active == True)
        )

    await pool.close()

asyncio.run(main())

API

create_db_pool

Создает asyncpg.Pool с предустановленными бинарными кодеками для jsonb, timestamp, timestamptz (через ujson).

from adc_aiopg import create_db_pool

pool = await create_db_pool(
    dsn="postgresql://user:pass@localhost/mydb",
    min_size=5,
    max_size=20,
)

compile_query

Компилирует SQLAlchemy-выражение в строку с позиционными параметрами $1, $2, ... для asyncpg.

from adc_aiopg import compile_query
from sqlalchemy import select

sql, params = compile_query(select(users).where(users.c.id == 42))
# sql: "SELECT ... WHERE users.id = $1"
# params: [42]

PGPoolManager

Обертка над пулом. Принимает SQLAlchemy-выражения, компилирует и выполняет. Результаты возвращаются как dict.

from adc_aiopg import PGPoolManager

pm = PGPoolManager(pool)

# Получить все строки
rows = await pm.fetch(select(users))

# Одна строка
row = await pm.fetchrow(select(users).where(users.c.id == 1))

# Скалярное значение
count = await pm.fetchval(select(func.count()).select_from(users))

# Контекстные менеджеры
async with pm.connection() as conn:
    ...

async with pm.transaction() as conn:
    ...

PGDataAccessObject[T]

Generic DAO поверх SQLModel. Полный CRUD с типизированными результатами. Хранит конфигурацию таблицы: model, metadata, table_name, table_model.

from sqlmodel import Field
from adc_aiopg import PGDataAccessObject
from adc_aiopg.types import Base

class User(Base):
    id: int = Field(primary_key=True)
    name: str
    email: str
    active: bool = True

# Быстрое создание DAO из модели
UserDAO = PGDataAccessObject.from_model(User, "users", metadata=meta)
user_dao = UserDAO(db_pool=pool)

# CRUD
user = await user_dao.create(name="Alice", email="alice@example.com")
user = await user_dao.get_by_id(1)
users = await user_dao.search(active=True)
user = await user_dao.update_by_id(1, name="Alice Smith")
await user_dao.delete_by_id(1)

# Soft delete (ставит archived=True)
await user_dao.archive_by_id(1)

Кастомный DAO

Наследуйтесь от PGDataAccessObject и добавляйте свои методы:

class UsersDAO(PGDataAccessObject[User], table_name="users"):
    model = User

    async def find_by_email(self, email: str) -> User | None:
        results = await self.search(email=email)
        return results[0] if results else None

Если модель уже является таблицей (table=True), bind() использует её как есть:

class User(Base, table=True):
    __tablename__ = "users"
    id: int = Field(primary_key=True)
    name: str

class UsersDAO(PGDataAccessObject[User]):
    model = User  # уже selectable — bind() не создаёт обёртку

Фильтры

DAO поддерживает суффиксные фильтры в search() и других методах:

# Сравнение
users = await dao.search(age_gt=18)          # age > 18
users = await dao.search(age_ge=18)          # age >= 18
users = await dao.search(age_lt=65)          # age < 65
users = await dao.search(age_le=65)          # age <= 65
users = await dao.search(status_ne="banned") # status != 'banned'

# Списки
users = await dao.search(role_in=["admin", "moderator"])
users = await dao.search(role_notin=["banned"])

# NULL
users = await dao.search(deleted_at_is=None)
users = await dao.search(deleted_at_isnot=None)

# LIKE / ILIKE
users = await dao.search(name_ilike="%alice%")

Пагинация

from adc_aiopg.types import Paginated

result: Paginated[User] = await dao.paginated_search(
    limit=20, offset=0, active=True
)
# result.items: list[User]
# result.pagination: Pagination(total=150, limit=20, offset=0)

PostgresAccessLayer + TableDescriptor

Декларативный слой доступа к БД. Группирует несколько DAO в одном объекте. Передаёт metadata всем DAO и вызывает bind() при определении класса (для миграций).

from adc_aiopg import PostgresAccessLayer, PGDataAccessObject, TableDescriptor
from sqlalchemy import MetaData

meta = MetaData()

class User(Base):
    id: int = Field(primary_key=True)
    name: str

class Post(Base):
    id: int = Field(primary_key=True)
    title: str
    author_id: int

# Простые DAO из моделей
class DB(PostgresAccessLayer, metadata=meta):
    users = TableDescriptor(PGDataAccessObject.from_model(User, "users"))
    posts = TableDescriptor(PGDataAccessObject.from_model(Post, "posts"))

# Кастомный DAO с дополнительными методами
class UsersDAO(PGDataAccessObject[User], table_name="users"):
    model = User

    async def find_by_email(self, email: str) -> User | None:
        results = await self.search(email=email)
        return results[0] if results else None

class DB(PostgresAccessLayer, metadata=meta):
    users: UsersDAO = TableDescriptor(UsersDAO)
    posts = TableDescriptor(PGDataAccessObject.from_model(Post, "posts"))

# Использование
db = DB(pool)
user = await db.users.find_by_email("alice@example.com")
posts = await db.posts.search(author_id=1)

# Произвольные запросы через PGPoolManager
count = await db.pm.fetchval(select(func.count()).select_from(users))

TableDescriptor — минимальный дескриптор, принимает DAO-класс. Если table_name не задан на DAO, используется имя атрибута как fallback.

Версионирование таблиц

Создает shadow-таблицу {table}_log и PostgreSQL-триггер для автоматического логирования изменений.

from adc_aiopg import declare_version_table

VersionedUser = declare_version_table(User)
# Создается таблица users_log с теми же колонками
# Триггер автоматически пишет в _log при INSERT/UPDATE/DELETE

Alembic-интеграция

В env.py вашего Alembic-проекта:

from adc_aiopg.alembic_env import run_alembic
from myapp.models import Base

run_alembic(
    sqlalchemy_url="postgresql://user:pass@localhost/mydb",
    target_metadata=Base.metadata,
)

Автоматически:

  • Нумерует миграции инкрементально (0001, 0002, ...)
  • Добавляет CREATE/DROP TRIGGER для версионированных таблиц

Типы

Base

Базовый класс для моделей (наследует SQLModel):

from adc_aiopg.types import Base

class User(Base, table=True):
    __tablename__ = "users"
    id: int = Field(primary_key=True)
    name: str

# Проекции
UserPartial = User.partial()       # все поля Optional
UserNames = User.only("id", "name")  # только указанные поля
UserNoEmail = User.exclude("email")  # все кроме указанных

sqla_enum

Хелпер для enum-полей с поддержкой SQLAlchemy:

from adc_aiopg.enum import sqla_enum

class Role(str, Enum):
    admin = "admin"
    user = "user"

class User(Base, table=True):
    role: Role = sqla_enum(Role)

Требования

  • Python >= 3.8
  • asyncpg >= 0.27.0
  • sqlalchemy >= 2.0.0
  • sqlmodel >= 0.0.8
  • pydantic >= 2.0.0
  • alembic >= 1.11.0
  • ujson >= 5.10.0

Лицензия

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

adc_aiopg-1.0.0.tar.gz (18.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

adc_aiopg-1.0.0-py3-none-any.whl (15.2 kB view details)

Uploaded Python 3

File details

Details for the file adc_aiopg-1.0.0.tar.gz.

File metadata

  • Download URL: adc_aiopg-1.0.0.tar.gz
  • Upload date:
  • Size: 18.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.2

File hashes

Hashes for adc_aiopg-1.0.0.tar.gz
Algorithm Hash digest
SHA256 e07600a6dd2caa7d025be0ae654ab73e5b69cdd7b5ee01ed3632c4c2d62fed33
MD5 774660ee416c6c490fb35b5be758c5da
BLAKE2b-256 b4d0d6b29509c5561b7cb350a3f33bc4464178299155ede16e75ef506b2a7b17

See more details on using hashes here.

File details

Details for the file adc_aiopg-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: adc_aiopg-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 15.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.2

File hashes

Hashes for adc_aiopg-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 ab44d104198f5efe23d550df2f5b9f0253de5984a9294b876569a87e90d756f5
MD5 b6236f6bf03e355c3c7c39bfbd786f0b
BLAKE2b-256 ac83053da74884bbd5625361197d4e3283dc7ac7ad0af6961f5f2a012471d21c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page