A lightweight, flexible and type-safe repository pattern implementation for SQLAlchemy async with Pydantic integration
Project description
Simple Repository AsyncSQLA
A lightweight, flexible, and type-safe repository pattern implementation for SQLAlchemy Async, featuring seamless integration with Pydantic, Dataclasses, and standard Python objects.
Features
- 🚀 Async-first design
- 🔒 Type-safe CRUD operations
- ⚗ Robust SQLAlchemy Async support
- 📦 Out-of-the-box support for Pydantic (v2), Dataclasses, and Dicts
- 🛠 Extensible mapping logic (
to_repr/to_inner) - 📝 Full type hints support
- 🛡 Attribute protection via Metaclasses
Installation
pip install simple-repo-asyncsqla
Update
More control for model converting
Now you must control converting proccess
class UserRepository(crud_factory(UserSQLA, UserDomain, UserCreate, UserPatch)):
# Define your custom methods here
def to_repr(self, obj: SA) -> DM:
return self.domain_model.model_validate(obj)
def to_inner(self, data: CS | DM | PS) -> dict:
return data.model_dump(exclude_unset=True)
More supported types
Use dict as domain model
class UserRepository(crud_factory(UserSQLA, dict, UserCreate, UserPatch)):
# Define your custom methods here
def to_repr(self, obj: SA) -> DM:
return {
"id": obj.id,
"name": obj.name,
"email": obj.email,
"is_active": obj.is_active
# your fileds
}
def to_inner(self, data: CS | DM | PS) -> dict:
if isinstance(data, dict):
return data
return data.model_dump(exclude_unset=True)
Or use SQLAlchemy model as domain model
class UserRepository(crud_factory(UserSQLA, UserSQLA, UserCreate, UserPatch)):
# Define your custom methods here
def to_repr(self, obj: SA) -> DM:
return obj
def to_inner(self, data: CS | DM | PS) -> dict:
if isinstance(data, UserSQLA):
return {
"id": data.id,
"name": data.name,
"email": data.email,
"is_active": data.is_active
# your fileds
}
return data.model_dump(exclude_unset=True)
Or dict for any schema
class UserRepository(crud_factory(UserSQLA, dict, dict, dict)):
# Define your custom methods here
def to_repr(self, obj: SA) -> DM:
return {
"id": obj.id,
"name": obj.name,
"email": obj.email,
"is_active": obj.is_active
# your fileds
}
def to_inner(self, data: CS | DM | PS) -> dict:
return data
Quick Start
Basic Example
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from pydantic import BaseModel, ConfigDict
from simple_repository import crud_factory
from simple_repository.exceptions import NotFoundException
# Define SQLAlchemy model
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
name: Mapped[str]
email: Mapped[str]
is_active: Mapped[bool] = mapped_column(default=True)
# Define domain model
class UserDomain(BaseModel):
id: int
name: str
email: str
is_active: bool
model_config = ConfigDict(from_attributes=True)
# Define schemas for create/update operations
class UserCreate(BaseModel):
name: str
email: str
is_active: bool | None = None
class UserPatch(BaseModel):
name: str | None = None
email: str | None = None
is_active: bool | None = None
# Setup async database
engine = create_async_engine("sqlite+aiosqlite:///./db.sqlite3")
async_session_factory = async_sessionmaker(engine, expire_on_commit=False)
# Create repository class
class UserRepository(crud_factory(User, UserDomain, UserCreate, UserPatch)):
def to_repr(self, obj: SA) -> DM:
return self.domain_model.model_validate(obj)
def to_inner(self, data: CS | DM | PS) -> dict:
return data.model_dump(exclude_unset=True)
# Use in your async application
async def main():
user_repo = UserRepository()
async with async_session_factory() as session:
# Create - single record
new_user = await user_repo.create(
session,
UserCreate(name="John Doe", email="john@example.com")
)
print(f"Created user: {new_user}")
# Create many - multiple records
users_data = [
UserCreate(name="Jane Doe", email="jane@example.com"),
UserCreate(name="Bob Smith", email="bob@example.com"),
]
created_users = await user_repo.create_many(session, users_data)
print(f"Created {len(created_users)} users")
# Get one - by primary key
user = await user_repo.get_one(session, new_user.id)
print(f"Retrieved user: {user}")
# Get many - multiple records by IDs
user_ids = [new_user.id, created_users[0].id]
many_users = await user_repo.get_many(session, user_ids)
print(f"Retrieved {len(many_users)} users by IDs")
# Get all - with pagination
all_users, total = await user_repo.get_all(
session,
offset=0,
limit=10,
order_by="name"
)
print(f"Total users: {total}, Retrieved: {len(all_users)}")
# Count - get total count
count = await user_repo.count(session)
print(f"Total count: {count}")
# Update - replace entire record
user.name = "John Smith"
updated_user = await user_repo.update(session, user)
print(f"Updated user: {updated_user}")
# Patch - partial update
patch_data = UserPatch(email="john.smith@example.com")
patched_user = await user_repo.patch(session, patch_data, new_user.id)
print(f"Patched user: {patched_user}")
# Remove - delete single record
await user_repo.remove(session, created_users[1].id)
print(f"Removed 1 user")
# Remove many - delete multiple records
remove_ids = [created_users[0].id]
removed_count = await user_repo.remove_many(session, remove_ids)
print(f"Removed {removed_count} users")
# Handle not found exception
try:
await user_repo.get_one(session, 9999)
except NotFoundException:
print("User not found (expected)")
Custom Repository
Extend the base repository with advanced query methods:
from sqlalchemy import select, case, func, text
from sqlalchemy.ext.asyncio import AsyncSession
from simple_repository import crud_factory
from simple_repository.abctract import IAsyncCrud
from .models.user import User as UserSQLA
from .domains.user import UserDomain
from .schemes.user import UserCreate, UserPatch
from .db import async_session_factory
# Define interface for custom operations OR don't
class IUserRepository(IAsyncCrud[UserSQLA, UserDomain, UserCreate, UserPatch]):
@abstractmethod
async def get_user_activity_stats(
self,
session: AsyncSession,
min_orders: int = 5,
days_window: int = 30
) -> list[dict]:
pass
# Inherit from the interface (if defined) and from the class created by the factory
class UserRepository(IUserRepository, crud_factory(UserSQLA, UserDomain, UserCreate, UserPatch)):
def to_inner(self, data: CS | DM | PS) -> dict:
return data.model_dump(exclude_unset=True)
def to_repr(self, object: SA) -> DM:
return self.domain_model.model_validate(object)
"""Custom repository with advanced analytical capabilities."""
async def get_user_activity_stats(
self,
session: AsyncSession,
min_orders: int = 5,
days_window: int = 30
) -> list[dict]:
current_date = func.current_timestamp()
window_date = current_date - text(f"interval '{days_window} days'")
orders_stats = (
select(
Order.user_id,
func.count().label('order_count'),
func.sum(Order.total_amount).label('total_spent'),
func.avg(Order.total_amount).label('avg_order_value'),
func.count(case(
(Order.created_at > window_date, 1)
)).label('recent_orders'),
(func.max(Order.created_at) - func.min(Order.created_at)) /
func.nullif(func.count() - 1, 0)
.label('avg_order_interval')
)
.group_by(Order.user_id)
.having(func.count() >= min_orders)
.alias('orders_stats')
)
query = (
select(
self.sqla_model.id,
self.sqla_model.name,
self.sqla_model.email,
orders_stats.c.order_count,
orders_stats.c.total_spent,
orders_stats.c.avg_order_value,
orders_stats.c.recent_orders,
orders_stats.c.avg_order_interval,
(
orders_stats.c.recent_orders * 0.4 +
func.least(orders_stats.c.total_spent / 1000, 10) * 0.3 +
(orders_stats.c.order_count * 0.3)
).label('engagement_score'),
func.percent_rank().over(
order_by=orders_stats.c.total_spent
).label('spending_percentile')
)
.join(orders_stats, self.sqla_model.id == orders_stats.c.user_id)
.where(self.sqla_model.is_active == True)
.order_by(text('engagement_score DESC'))
)
result = await session.execute(query)
return list(result.mappings().all())
# Usage example
async def analyze_user_activity():
user_repo_instance = UserRepository()
with async_session_factory() as session:
stats = await user_repo_instance.get_user_activity_stats(
session,
min_orders=5,
days_window=30
)
return stats
Error Handling
from fastapi import HTTPException, APIRouter
from simple_repository.exceptions import NotFoundException
from sqlalchemy.ext.asyncio import AsyncSession
from .db import async_session_factory
from .my_repository import user_crud
from .domains.user import UserDomain
router = APIRouter("/user")
@router.get("/{user_id}")
async def get_user(
session: Annotated[AsyncSession, Depends(async_session_factory)],
user_id: int,
) -> UserDomain:
try:
return await user_crud.get_one(session, user_id)
except NotFoundException:
raise HTTPException(status_code=404, detail="User not found")
Out-of-the-box Supported Operations
- create
- create_many
- get_one
- get_many
- get_all
- patch
- update
- remove
- remove_many
- count
Class Attribute Protection
The library now uses the FrozenClassAttributesMeta metaclass to prevent accidental or unintended reassignment of sqla_model and domain_model attributes at the class level after they have been set by crud_factory. This ensures the stability and predictability of repository behavior.
Attempting to modify these attributes after class creation will result in an AttributeError.
# Example:
MyUserRepoClass = crud_factory(UserSQLA, UserDomain)
try:
MyUserRepoClass.sqla_model = SomeOtherSQLA # This will raise an AttributeError
MyUserRepoClass.domain_model = SomeOtherDomain # This also will raise an AttributeError
except AttributeError as e:
print(f"Error: {e}")
Contributing
Pull requests are welcome! For major changes, please open an issue first to discuss what you would like to change.
License
This project is licensed under the MIT License.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file simple_repo_asyncsqla-2.2.0.tar.gz.
File metadata
- Download URL: simple_repo_asyncsqla-2.2.0.tar.gz
- Upload date:
- Size: 35.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.1 {"installer":{"name":"uv","version":"0.11.1","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
4481111fdfc44a417373aaa5ba7b4db57221a14866eec5270849384cc1c724c6
|
|
| MD5 |
f02fcf7af99f7ac37e669ecbe6e500f7
|
|
| BLAKE2b-256 |
7fdc12a2f4c0e1b445e5e1f2717f9b2ec633d2b7023b0147dee9d045a7a17508
|
File details
Details for the file simple_repo_asyncsqla-2.2.0-py3-none-any.whl.
File metadata
- Download URL: simple_repo_asyncsqla-2.2.0-py3-none-any.whl
- Upload date:
- Size: 12.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.1 {"installer":{"name":"uv","version":"0.11.1","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
778fee0dc5668a4009857c3f4515a77276886ef4cb27c34763a7e718f2415e8e
|
|
| MD5 |
cc1344df9075d1abe51083f777162916
|
|
| BLAKE2b-256 |
9f9463380e936b9f2d8e097ff2d7dde17afe4e98bce63041e8841d61d0f44e54
|