Skip to main content

A lightweight and type-safe repository pattern implementation for SQLAlchemy async with Pydantic integration

Project description

Simple Repository AsyncSQLA

A lightweight and type-safe repository pattern implementation for SQLAlchemy async with Pydantic integration.

Features

  • 🚀 Async-first design
  • 🔒 Type-safe CRUD operations
  • 🎯 Easy integration with SQLAlchemy models
  • 📦 Pydantic support out of the box
  • 🛠 Generic repository pattern implementation
  • 📝 Full type hints support

Installation

pip install simple-repo-asyncsqla

Quick Start

Common example

from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

from pydantic import BaseModel, ConfigDict

from simple_repository import crud_factory
from simple_repository.exceptions import NotFoundException

# Define your models
class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]
    email: Mapped[str]
    is_active: Mapped[bool] = mapped_column(default=True)

# you can use dataclass or normal class but see protocol - DomainModel
class UserDTO(BaseModel):
    id: int = 0
    name: str
    email: str
    is_active: bool = True
    
    model_config = ConfigDict(from_attributes=True)

# you can use dataclass or normal class but see protocol - Schema
class UserPatch(BaseModel):
    name: str | None = None
    email: str | None = None
    is_active: bool | None = None

engine = create_async_engine("sqlite+aiosqlite:///./db.sqlite3")
async_session = async_sessionmaker(engine, expire_on_commit=False)

user_crud = crud_factory(User, UserDTO)

async def example():
    async with async_session() as session:
        # Create
        new_user = await user_crud.create(
            session, 
            UserDTO(name="John Doe", email="john@example.com")
        )
        
        # Read
        user = await user_crud.get_one(session, new_user.id)

        # Update
        user.name = "John Smith"
        updated = await user_crud.update(session, user)

        # Patch
        data = UserPatch(name="Fredy Smith", email="fredy@example.com")
        patched = await user_crud.patch(session, data, updated.id)

        # List with pagination
        users, total = await user_crud.get_all(
            session,
            offset=0,
            limit=10,
            order_by="name",
            desc=True
        )
        
        # Delete
        await user_crud.remove(session, user.id)
        
        # Get exception
        try:
            user = await user_crud.get_one(session, new_user.name, column="name")
        except NotFoundException: 
            ...

Important part is sameness attrs in SQLA and in Domain models as in the example

class User(Base):
    __tablename__ = "users"

    # attrs: id, name, email, is_active
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]
    email: Mapped[str]
    is_active: Mapped[bool] = mapped_column(default=True)

class UserDTO(BaseModel):

    # attrs: id, name, email, is_active
    id: int = 0
    name: str
    email: str
    is_active: bool = True
    
    model_config = ConfigDict(from_attributes=True)

If attrs is not the same on create crud will be raise exception - DiffAtrrsOnCreateCrud.

Custom Repository

Extend the base repository with advanced query methods:

from sqlalchemy import select, case, func, text

from simple_repository import crud_factory

from .models.user import User
from .domains.user import UserDTO

class UserRepository(crud_factory(User, UserDTO)):
    """Custom repository with advanced analytics capabilities."""
    
    @classmethod
    async def get_user_activity_stats(
        cls,
        session,
        min_orders: int = 5,
        days_window: int = 30
    ) -> list[dict]:
        current_date = func.current_timestamp()
        window_date = current_date - text(f"interval '{days_window} days'")
        
        orders_stats = (
            select(
                Order.user_id,
                func.count().label('order_count'),
                func.sum(Order.total_amount).label('total_spent'),
                func.avg(Order.total_amount).label('avg_order_value'),
                func.count(case(
                    (Order.created_at > window_date, 1)
                )).label('recent_orders'),
                (func.max(Order.created_at) - func.min(Order.created_at)) /
                    func.nullif(func.count() - 1, 0)
                    .label('avg_order_interval')
            )
            .group_by(Order.user_id)
            .having(func.count() >= min_orders)
            .alias('orders_stats')
        )
        
        query = (
            select(
                cls.sqla_model.id,
                cls.sqla_model.name,
                cls.sqla_model.email,
                orders_stats.c.order_count,
                orders_stats.c.total_spent,
                orders_stats.c.avg_order_value,
                orders_stats.c.recent_orders,
                orders_stats.c.avg_order_interval,
                (
                    orders_stats.c.recent_orders * 0.4 +
                    func.least(orders_stats.c.total_spent / 1000, 10) * 0.3 +
                    (orders_stats.c.order_count * 0.3)
                ).label('engagement_score'),
                func.percent_rank().over(
                    order_by=orders_stats.c.total_spent
                ).label('spending_percentile')
            )
            .join(orders_stats, cls.sqla_model.id == orders_stats.c.user_id)
            .where(cls.sqla_model.is_active == True)
            .order_by(text('engagement_score DESC'))
        )
        
        result = await session.execute(query)
        return list(result.mappings().all())

# Usage example
async def analyze_user_activity(session):
    stats = await UserRepository.get_user_activity_stats(
        session,
        min_orders=5,   
        days_window=30   
    )
    

Error Handling

from simple_repository.exceptions import NotFoundException

from .my_repository import user_crud

async def get_user(session, user_id: int) -> UserDTO:
    try:
        return await user_crud.get_one(session, user_id)
    except NotFoundException:
        raise HTTPException(status_code=404, detail="User not found")

Support operations out the box

  • create
  • create_many
  • get_one
  • get_many
  • get_all
  • patch
  • update
  • remove
  • remove_many
  • count

Contributing

Pull requests are welcome! For major changes, please open an issue first to discuss what you would like to change.

License

This project is licensed under the MIT License.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

simple_repo_asyncsqla-0.1.4.tar.gz (29.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

simple_repo_asyncsqla-0.1.4-py3-none-any.whl (8.4 kB view details)

Uploaded Python 3

File details

Details for the file simple_repo_asyncsqla-0.1.4.tar.gz.

File metadata

File hashes

Hashes for simple_repo_asyncsqla-0.1.4.tar.gz
Algorithm Hash digest
SHA256 695039d27d5bf5d84dab702ba47e79dc15989ebe4cb52ad868da356de524bfaa
MD5 63258975e175c09599b0e8ab07cc1488
BLAKE2b-256 7e8b74b1c729df19b00a9d0fb77807c64300e036719914a3917aa3f60f892b57

See more details on using hashes here.

File details

Details for the file simple_repo_asyncsqla-0.1.4-py3-none-any.whl.

File metadata

File hashes

Hashes for simple_repo_asyncsqla-0.1.4-py3-none-any.whl
Algorithm Hash digest
SHA256 787f8c78b8feafe8dd464f671adbe3f39047b5caad92ebf7c452c072ad84c20e
MD5 0f105c1c163b47e6bc8835d1cf7695cb
BLAKE2b-256 6f10c69926cd34ae1f62949e9a06290e8d63689ca6dd68038a5fbf0bc5b16568

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page