Skip to main content

A lightweight, flexible and type-safe repository pattern implementation for SQLAlchemy async with Pydantic integration

Project description

Simple Repository AsyncSQLA

A lightweight, flexible, and type-safe repository pattern implementation for SQLAlchemy Async, featuring seamless integration with Pydantic, Dataclasses, and standard Python objects.

Features

  • 🚀 Async-first design
  • 🔒 Type-safe CRUD operations
  • ⚗ Robust SQLAlchemy Async support
  • 📦 Out-of-the-box support for Pydantic (v2), Dataclasses, and Dicts
  • 🛠 Extensible mapping logic (to_repr / to_inner)
  • 📝 Full type hints support
  • 🛡 Attribute protection via Metaclasses

Installation

pip install simple-repo-asyncsqla

Update

More control for model converting

Now you must control converting proccess

class UserRepository(crud_factory(UserSQLA, UserDomain, UserCreate, UserPatch)):
    # Define your custom methods here
    def to_repr(self, obj: SA) -> DM:
        return self.domain_model.model_validate(obj)

    def to_inner(self, data: CS | DM | PS) -> dict:
        return data.model_dump(exclude_unset=True)

More supported types

Use dict as domain model

class UserRepository(crud_factory(UserSQLA, dict, UserCreate, UserPatch)):
    # Define your custom methods here
    def to_repr(self, obj: SA) -> DM:
        return {
            "id": obj.id,
            "name": obj.name,
            "email": obj.email,
            "is_active": obj.is_active
            # your fileds
        }

    def to_inner(self, data: CS | DM | PS) -> dict:
        if isinstance(data, dict):
            return data
        return data.model_dump(exclude_unset=True)

Or use SQLAlchemy model as domain model

class UserRepository(crud_factory(UserSQLA, UserSQLA, UserCreate, UserPatch)):
    # Define your custom methods here
    def to_repr(self, obj: SA) -> DM:
        return obj

    def to_inner(self, data: CS | DM | PS) -> dict:
        if isinstance(data, UserSQLA):
            return {
                "id": data.id,
                "name": data.name,
                "email": data.email,
                "is_active": data.is_active
                # your fileds
            }
        return data.model_dump(exclude_unset=True)

Or dict for any schema

class UserRepository(crud_factory(UserSQLA, dict, dict, dict)):
    # Define your custom methods here
    def to_repr(self, obj: SA) -> DM:
        return {
            "id": obj.id,
            "name": obj.name,
            "email": obj.email,
            "is_active": obj.is_active
            # your fileds
        }

    def to_inner(self, data: CS | DM | PS) -> dict:
        
        return data

Quick Start

Basic Example

from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

from pydantic import BaseModel, ConfigDict

from simple_repository import crud_factory
from simple_repository.exceptions import NotFoundException

# Define SQLAlchemy model
class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    name: Mapped[str]
    email: Mapped[str]
    is_active: Mapped[bool] = mapped_column(default=True)

# Define domain model
class UserDomain(BaseModel):
    id: int
    name: str
    email: str
    is_active: bool
    
    model_config = ConfigDict(from_attributes=True)

# Define schemas for create/update operations
class UserCreate(BaseModel):
    name: str
    email: str
    is_active: bool | None = None

class UserPatch(BaseModel):
    name: str | None = None
    email: str | None = None
    is_active: bool | None = None

# Setup async database
engine = create_async_engine("sqlite+aiosqlite:///./db.sqlite3")
async_session_factory = async_sessionmaker(engine, expire_on_commit=False)

# Create repository class
class UserRepository(crud_factory(User, UserDomain, UserCreate, UserPatch)):
    def to_repr(self, obj: SA) -> DM:
        return self.domain_model.model_validate(obj)

    def to_inner(self, data: CS | DM | PS) -> dict:
        return data.model_dump(exclude_unset=True)


# Use in your async application
async def main():
    user_repo = UserRepository()
    
    async with async_session_factory() as session:
        # Create - single record
        new_user = await user_repo.create(
            session,
            UserCreate(name="John Doe", email="john@example.com")
        )
        print(f"Created user: {new_user}")
        
        # Create many - multiple records
        users_data = [
            UserCreate(name="Jane Doe", email="jane@example.com"),
            UserCreate(name="Bob Smith", email="bob@example.com"),
        ]
        created_users = await user_repo.create_many(session, users_data)
        print(f"Created {len(created_users)} users")
        
        # Get one - by primary key
        user = await user_repo.get_one(session, new_user.id)
        print(f"Retrieved user: {user}")
        
        # Get many - multiple records by IDs
        user_ids = [new_user.id, created_users[0].id]
        many_users = await user_repo.get_many(session, user_ids)
        print(f"Retrieved {len(many_users)} users by IDs")
        
        # Get all - with pagination
        all_users, total = await user_repo.get_all(
            session,
            offset=0,
            limit=10,
            order_by="name"
        )
        print(f"Total users: {total}, Retrieved: {len(all_users)}")
        
        # Count - get total count
        count = await user_repo.count(session)
        print(f"Total count: {count}")
        
        # Update - replace entire record
        user.name = "John Smith"
        updated_user = await user_repo.update(session, user)
        print(f"Updated user: {updated_user}")
        
        # Patch - partial update
        patch_data = UserPatch(email="john.smith@example.com")
        patched_user = await user_repo.patch(session, patch_data, new_user.id)
        print(f"Patched user: {patched_user}")
        
        # Remove - delete single record
        await user_repo.remove(session, created_users[1].id)
        print(f"Removed 1 user")
        
        # Remove many - delete multiple records
        remove_ids = [created_users[0].id]
        removed_count = await user_repo.remove_many(session, remove_ids)
        print(f"Removed {removed_count} users")
        
        # Handle not found exception
        try:
            await user_repo.get_one(session, 9999)
        except NotFoundException:
            print("User not found (expected)")

Custom Repository

Extend the base repository with advanced query methods:

from sqlalchemy import select, case, func, text
from sqlalchemy.ext.asyncio import AsyncSession

from simple_repository import crud_factory
from simple_repository.abctract import IAsyncCrud

from .models.user import User as UserSQLA
from .domains.user import UserDomain
from .schemes.user import UserCreate, UserPatch
from .db import async_session_factory

# Define interface for custom operations OR don't
class IUserRepository(IAsyncCrud[UserSQLA, UserDomain, UserCreate, UserPatch]):
    @abstractmethod
    async def get_user_activity_stats(
        self,
        session: AsyncSession,
        min_orders: int = 5,
        days_window: int = 30
    ) -> list[dict]:
        pass

# Inherit from the interface (if defined) and from the class created by the factory
class UserRepository(IUserRepository, crud_factory(UserSQLA, UserDomain, UserCreate, UserPatch)):
    def to_inner(self, data: CS | DM | PS) -> dict:
        return data.model_dump(exclude_unset=True)

    def to_repr(self, object: SA) -> DM:
        return self.domain_model.model_validate(object)


    """Custom repository with advanced analytical capabilities."""
    async def get_user_activity_stats(
        self,
        session: AsyncSession,
        min_orders: int = 5,
        days_window: int = 30
    ) -> list[dict]:
        current_date = func.current_timestamp()
        window_date = current_date - text(f"interval '{days_window} days'")
        
        orders_stats = (
            select(
                Order.user_id,
                func.count().label('order_count'),
                func.sum(Order.total_amount).label('total_spent'),
                func.avg(Order.total_amount).label('avg_order_value'),
                func.count(case(
                    (Order.created_at > window_date, 1)
                )).label('recent_orders'),
                (func.max(Order.created_at) - func.min(Order.created_at)) /
                    func.nullif(func.count() - 1, 0)
                    .label('avg_order_interval')
            )
            .group_by(Order.user_id)
            .having(func.count() >= min_orders)
            .alias('orders_stats')
        )
        
        query = (
            select(
                self.sqla_model.id,
                self.sqla_model.name,
                self.sqla_model.email,
                orders_stats.c.order_count,
                orders_stats.c.total_spent,
                orders_stats.c.avg_order_value,
                orders_stats.c.recent_orders,
                orders_stats.c.avg_order_interval,
                (
                    orders_stats.c.recent_orders * 0.4 +
                    func.least(orders_stats.c.total_spent / 1000, 10) * 0.3 +
                    (orders_stats.c.order_count * 0.3)
                ).label('engagement_score'),
                func.percent_rank().over(
                    order_by=orders_stats.c.total_spent
                ).label('spending_percentile')
            )
            .join(orders_stats, self.sqla_model.id == orders_stats.c.user_id)
            .where(self.sqla_model.is_active == True)
            .order_by(text('engagement_score DESC'))
        )
        
        result = await session.execute(query)
        return list(result.mappings().all())

# Usage example
async def analyze_user_activity():
    user_repo_instance = UserRepository()
    with async_session_factory() as session:
        stats = await user_repo_instance.get_user_activity_stats(
            session,
            min_orders=5,   
            days_window=30   
        )
        return stats
    

Error Handling

from fastapi import HTTPException, APIRouter
from simple_repository.exceptions import NotFoundException
from sqlalchemy.ext.asyncio import AsyncSession

from .db import async_session_factory
from .my_repository import user_crud
from .domains.user import UserDomain

router = APIRouter("/user")

@router.get("/{user_id}")
async def get_user(
    session: Annotated[AsyncSession, Depends(async_session_factory)], 
    user_id: int,
) -> UserDomain:
    try:
        return await user_crud.get_one(session, user_id)
    except NotFoundException:
        raise HTTPException(status_code=404, detail="User not found")

Out-of-the-box Supported Operations

  • create
  • create_many
  • get_one
  • get_many
  • get_all
  • patch
  • update
  • remove
  • remove_many
  • count

Class Attribute Protection

The library now uses the FrozenClassAttributesMeta metaclass to prevent accidental or unintended reassignment of sqla_model and domain_model attributes at the class level after they have been set by crud_factory. This ensures the stability and predictability of repository behavior.

Attempting to modify these attributes after class creation will result in an AttributeError.

# Example:
MyUserRepoClass = crud_factory(UserSQLA, UserDomain)
try:
    MyUserRepoClass.sqla_model = SomeOtherSQLA # This will raise an AttributeError
    MyUserRepoClass.domain_model = SomeOtherDomain # This also will raise an AttributeError
except AttributeError as e:
    print(f"Error: {e}")

Contributing

Pull requests are welcome! For major changes, please open an issue first to discuss what you would like to change.

License

This project is licensed under the MIT License.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

simple_repo_asyncsqla-2.2.0.tar.gz (35.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

simple_repo_asyncsqla-2.2.0-py3-none-any.whl (12.4 kB view details)

Uploaded Python 3

File details

Details for the file simple_repo_asyncsqla-2.2.0.tar.gz.

File metadata

  • Download URL: simple_repo_asyncsqla-2.2.0.tar.gz
  • Upload date:
  • Size: 35.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.1 {"installer":{"name":"uv","version":"0.11.1","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for simple_repo_asyncsqla-2.2.0.tar.gz
Algorithm Hash digest
SHA256 4481111fdfc44a417373aaa5ba7b4db57221a14866eec5270849384cc1c724c6
MD5 f02fcf7af99f7ac37e669ecbe6e500f7
BLAKE2b-256 7fdc12a2f4c0e1b445e5e1f2717f9b2ec633d2b7023b0147dee9d045a7a17508

See more details on using hashes here.

File details

Details for the file simple_repo_asyncsqla-2.2.0-py3-none-any.whl.

File metadata

  • Download URL: simple_repo_asyncsqla-2.2.0-py3-none-any.whl
  • Upload date:
  • Size: 12.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.1 {"installer":{"name":"uv","version":"0.11.1","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for simple_repo_asyncsqla-2.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 778fee0dc5668a4009857c3f4515a77276886ef4cb27c34763a7e718f2415e8e
MD5 cc1344df9075d1abe51083f777162916
BLAKE2b-256 9f9463380e936b9f2d8e097ff2d7dde17afe4e98bce63041e8841d61d0f44e54

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page