Skip to main content

A lightweight and type-safe repository pattern implementation for SQLAlchemy async with Pydantic integration

Project description

Simple Repository AsyncSQLA

A lightweight and type-safe repository pattern implementation for SQLAlchemy async with Pydantic integration.

Features

  • 🚀 Async-first design
  • 🔒 Type-safe CRUD operations
  • 🎯 Easy integration with SQLAlchemy models
  • 📦 Pydantic support out of the box
  • 🛠 Generic repository pattern implementation
  • 📝 Full type hints support

Installation

pip install simple-repo-asyncsqla

Quick Start

Common example

from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

from pydantic import BaseModel, ConfigDict

from simple_repository import crud_factory
from simple_repository.exceptions import NotFoundException

# Define your models
class Base(DeclarativeBase):
    """Declarative base class that the ORM models in this example inherit from."""

class User(Base):
    """SQLAlchemy ORM model mapped to the ``users`` table."""

    __tablename__ = "users"
    
    # Primary-key column.
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]
    email: Mapped[str]
    # Column-level default of True.
    is_active: Mapped[bool] = mapped_column(default=True)

# The domain model may also be a dataclass or a regular class that inherits from
# BaseDomainModel; refer to the DomainModel protocol for details.
class UserDomain(BaseModel):
    """Pydantic domain model declaring the same attributes as the ``User`` ORM model."""

    # Defaults to 0 so an instance can be constructed without an id
    # (the create example builds one from name and email only).
    id: int = 0
    name: str
    email: str
    is_active: bool = True
    
    # from_attributes=True lets Pydantic populate the model by reading attributes
    # from another object instead of requiring a dict.
    model_config = ConfigDict(from_attributes=True)

# The patch schema may also be a dataclass or a regular class that inherits from
# BaseSchema; refer to the Schema protocol for details.
class UserPatch(BaseModel):
    """Partial-update payload for a user; every field is optional and defaults to None."""

    name: str | None = None
    email: str | None = None
    is_active: bool | None = None

# Async engine for a local SQLite file, accessed through the aiosqlite driver.
engine = create_async_engine("sqlite+aiosqlite:///./db.sqlite3")
# Session factory used to open a session for each unit of work.
async_session_factory = async_sessionmaker(engine, expire_on_commit=False)

# Create the repository class for User
UserRepositoryClass = crud_factory(User, UserDomain)
# Create a repository instance. The constructor takes no arguments; a session
# is passed to each operation instead.
user_repo = UserRepositoryClass()

async def example():
    """Walk through the repository's CRUD operations within a single session."""
    # Every operation is called on the repository instance and receives the
    # session as its first argument.
    async with async_session_factory() as session:
        # Create
        new_user = await user_repo.create(
            session, 
            UserDomain(name="John Doe", email="john@example.com")
        )
        
        # Read
        user = await user_repo.get_one(session, new_user.id)

        # Update
        user.name = "John Smith"
        updated = await user_repo.update(session, user)

        # Patch
        data = UserPatch(name="Fredy Smith", email="fredy@example.com")
        patched = await user_repo.patch(session, data, updated.id) # Pass ID for patch

        # List with pagination; the call returns a pair, unpacked here as
        # ``users`` and ``total``.
        users, total = await user_repo.get_all(
            session,
            offset=0,
            limit=10,
            order_by="name",
            desc=True
        )
        
        # Delete
        await user_repo.remove(session, user.id)
        
        # Get exception: the user was removed above, so this lookup on the
        # "name" column is expected to raise NotFoundException.
        try:
            user = await user_repo.get_one(session, new_user.name, column="name")
        except NotFoundException: 
            ...

Important: the SQLAlchemy model and the domain model must declare the same attributes, as in the example below:

class User(Base):
    """SQLAlchemy model; its attribute names must match those of the domain model."""

    __tablename__ = "users"

    # attrs: id, name, email, is_active
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]
    email: Mapped[str]
    is_active: Mapped[bool] = mapped_column(default=True)

class UserDTO(BaseModel):
    """Domain model declaring exactly the same attribute names as the SQLAlchemy ``User`` model."""

    # attrs: id, name, email, is_active
    id: int = 0
    name: str
    email: str
    is_active: bool = True
    
    # from_attributes=True lets Pydantic populate the model by reading attributes
    # from another object instead of requiring a dict.
    model_config = ConfigDict(from_attributes=True)

If the attributes do not match, a DiffAtrrsOnCreateCrud exception is raised when the CRUD class is created.

Custom Repository

Extend the base repository with advanced query methods:

from abc import abstractmethod

from sqlalchemy import select, case, func, text

from simple_repository import crud_factory
from simple_repository.abctract import IAsyncCrud

from .models.user import User
from .domains.user import UserDomain

# Define an interface for the custom operations (optional)
class IUserRepository(IAsyncCrud[User, UserDomain]):
    """Repository interface that adds an analytics query to the base CRUD contract."""

    @abstractmethod
    async def get_user_activity_stats(
        self,
        session,
        min_orders: int = 5,
        days_window: int = 30
    ) -> list[dict]:
        """Return per-user order activity statistics.

        Args:
            session: Session used to execute the query.
            min_orders: Minimum number of orders a user must have to be included.
            days_window: Size, in days, of the window used to count recent orders.

        Returns:
            One dict per qualifying user.
        """

# Inherit from the interface (if defined) and from the class created by the factory
class UserRepository(IUserRepository, crud_factory(User, UserDomain)):
    """Custom repository with advanced analytical capabilities."""

    @classmethod
    async def get_user_activity_stats(
        cls,
        session,
        min_orders: int = 5,
        days_window: int = 30
    ) -> list[dict]:
        """Return order statistics and an engagement score for active users.

        Args:
            session: Session used to execute the query.
            min_orders: Minimum number of orders a user must have to be included.
            days_window: Size, in days, of the window used to count recent orders.

        Returns:
            A list of row mappings, one per active user with at least
            ``min_orders`` orders, ordered by ``engagement_score`` descending.
        """
        current_date = func.current_timestamp()
        # int() ensures that only a number is ever interpolated into the SQL fragment.
        window_date = current_date - text(f"interval '{int(days_window)} days'")

        # NOTE(review): ``Order`` is not imported in this snippet; it must be the
        # ORM model for orders, exposing user_id, total_amount and created_at.
        orders_stats = (
            select(
                Order.user_id,
                func.count().label('order_count'),
                func.sum(Order.total_amount).label('total_spent'),
                func.avg(Order.total_amount).label('avg_order_value'),
                # case() without an else yields NULL for older orders, and
                # count() skips NULLs, so only orders inside the window count.
                func.count(case(
                    (Order.created_at > window_date, 1)
                )).label('recent_orders'),
                # The parentheses make the label apply to the whole quotient;
                # without them ``.label`` binds only to ``func.nullif(...)``.
                # nullif() turns a zero divisor into NULL for single-order users.
                (
                    (func.max(Order.created_at) - func.min(Order.created_at))
                    / func.nullif(func.count() - 1, 0)
                ).label('avg_order_interval'),
            )
            .group_by(Order.user_id)
            .having(func.count() >= min_orders)
            .alias('orders_stats')
        )

        query = (
            select(
                cls.sqla_model.id,
                cls.sqla_model.name,
                cls.sqla_model.email,
                orders_stats.c.order_count,
                orders_stats.c.total_spent,
                orders_stats.c.avg_order_value,
                orders_stats.c.recent_orders,
                orders_stats.c.avg_order_interval,
                # Weighted score: recent orders, capped spending, total order count.
                (
                    orders_stats.c.recent_orders * 0.4 +
                    func.least(orders_stats.c.total_spent / 1000, 10) * 0.3 +
                    (orders_stats.c.order_count * 0.3)
                ).label('engagement_score'),
                func.percent_rank().over(
                    order_by=orders_stats.c.total_spent
                ).label('spending_percentile')
            )
            .join(orders_stats, cls.sqla_model.id == orders_stats.c.user_id)
            .where(cls.sqla_model.is_active.is_(True))
            .order_by(text('engagement_score DESC'))
        )

        result = await session.execute(query)
        return list(result.mappings().all())

# Usage example
async def analyze_user_activity(session):
    """Fetch activity statistics for users with at least 5 orders, using a 30-day window."""
    repository = UserRepository()
    return await repository.get_user_activity_stats(
        session,
        min_orders=5,
        days_window=30,
    )
    

Error Handling

from fastapi import HTTPException
from simple_repository.exceptions import NotFoundException

from .my_repository import user_crud

async def get_user(session, user_id: int) -> UserDTO:
    """Return the user with the given id, mapping a missing row to HTTP 404.

    Args:
        session: Session passed through to the repository.
        user_id: Identifier of the user to load.

    Raises:
        HTTPException: With status code 404 when the repository raises
            NotFoundException.
    """
    try:
        return await user_crud.get_one(session, user_id)
    except NotFoundException as exc:
        # Chain explicitly so the traceback shows the lookup failure as the cause.
        raise HTTPException(status_code=404, detail="User not found") from exc

Out-of-the-box Supported Operations

  • create
  • create_many
  • get_one
  • get_many
  • get_all
  • patch
  • update
  • remove
  • remove_many
  • count

Class Attribute Protection

The library now uses the FrozenClassAttributesMeta metaclass to prevent accidental or unintended reassignment of sqla_model and domain_model attributes at the class level after they have been set by crud_factory. This ensures the stability and predictability of repository behavior.

Attempting to modify these attributes after class creation will result in an AttributeError.

# Example: reassigning a protected class attribute raises AttributeError
MyUserRepoClass = crud_factory(UserSQLA, UserDomain)
try:
    MyUserRepoClass.sqla_model = SomeOtherSQLA # This will raise an AttributeError
    # Not reached in this snippet because the line above already raised;
    # executed on its own, this assignment raises AttributeError as well.
    MyUserRepoClass.domain_model = SomeOtherDomain # This also will raise an AttributeError
except AttributeError as e:
    print(f"Error: {e}")

Contributing

Pull requests are welcome! For major changes, please open an issue first to discuss what you would like to change.

License

This project is licensed under the MIT License.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

simple_repo_asyncsqla-1.0.0.tar.gz (31.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

simple_repo_asyncsqla-1.0.0-py3-none-any.whl (11.6 kB view details)

Uploaded Python 3

File details

Details for the file simple_repo_asyncsqla-1.0.0.tar.gz.

File metadata

File hashes

Hashes for simple_repo_asyncsqla-1.0.0.tar.gz
Algorithm Hash digest
SHA256 217dc838a6000040220af444c810de6635c997527ac92c571bab4a55d282a102
MD5 f7cd1378118a58731dfaaffe2530acdf
BLAKE2b-256 caa40e36cda68846fbb4b6fe779735779b4835847580a85c403d5a1444afc85d

See more details on using hashes here.

File details

Details for the file simple_repo_asyncsqla-1.0.0-py3-none-any.whl.

File metadata

File hashes

Hashes for simple_repo_asyncsqla-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 87b3799954be5f3c407287ad5fe68ce03b83ceee3ac799740ef12d8f7b9d0915
MD5 9c70ad4bc872ab10d90e12238a9321db
BLAKE2b-256 243da4273cca70d8959466f60d681e4583283499ed307a637cd6693b7c46bdd2

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page