Transaction management using SQLAlchemy
Project description
Transactional-SQLAlchemy
개요
지원하는 트랜잭션 전파 방식
참조: Transaction Propagation of Spring framework
- REQUIRED: 이미 트랜잭션이 열린 경우 기존 세션을 사용하고, 그렇지 않으면 새로운 트랜잭션을 생성
- REQUIRES_NEW: 기존 트랜잭션을 무시하고 새로운 트랜잭션을 생성
- NESTED: 기존 트랜잭션의 자식 트랜잭션을 생성
기능
트랜잭션 전파 방식 관리
Auto commit or Rollback (트랜잭션 사용 시)
auto session
동기/비동기 함수 모두 지원
사용법
1. transactional + auto session
- 패키지 설치
- ver. sync
pip install transactional-sqlalchemy
- ver. async
pip install transactional-sqlalchemy[async]
- 세션 핸들러 초기화
from transactional_sqlalchemy import init_manager
from sqlalchemy.ext.asyncio import async_scoped_session
async_scoped_session_ = async_scoped_session(
async_session_factory, scopefunc=asyncio.current_task
)
init_manager(async_scoped_session_)
- ITransactionalRepository를 상속하는 클래스 작성
- repository 레이어의 클래스 작성 시, ITransactionalRepository를 상속
메서드에 session이라는 이름의 매개변수가 있는 경우, 만들어 두었던 세션을 자동으로 할당
from transactional_sqlalchemy import ITransactionalRepository, transactional
class PostRepository(ITransactionalRepository):
@transactional # or @transactional(propagation=Propagation.REQUIRES)
async def requires(self, post: Post, session: AsyncSession) -> None:
session.add(post)
...
@transactional(propagation=Propagation.REQUIRES_NEW)
async def requires_new(self, post: Post, session: AsyncSession) -> None: ...
@transactional(propagation=Propagation.NESTED)
async def nested(self, post: Post, session: AsyncSession) -> None: ...
async def auto_session_allocate(self, session:AsyncSession) -> None: ...
2. auto session without transactional
from transactional_sqlalchemy import ISessionRepository
class PostRepository(ISessionRepository):
async def create(self, post: Post, *, session: AsyncSession = None) -> None: ...
3. 기본 CRUD Repository 사용
패키지에서 제공하는 기본 CRUD Repository 클래스를 상속하여 빠르게 Repository를 구현할 수 있습니다.
BaseCRUDRepository
기본적인 CRUD 연산을 제공하는 베이스 클래스입니다.
from transactional_sqlalchemy import BaseCRUDRepository
from sqlalchemy.ext.asyncio import AsyncSession
from your_models import User
class UserRepository(BaseCRUDRepository[User]):
pass # 기본 CRUD 메서드들이 자동으로 사용 가능
# 사용 예시
user_repo = UserRepository()
# 기본 제공 메서드들
user = await user_repo.find_by_id(1, session=session)
users = await user_repo.find_all(session=session)
saved_user = await user_repo.save(new_user, session=session)
exists = await user_repo.exists_by_id(1, session=session)
count = await user_repo.count(session=session)
제공되는 메서드:
- find_by_id(id, *, session): ID로 단일 모델 조회
- find(where=None, *, session): 조건으로 단일 모델 조회
- find_all(*, pageable=None, where=None, session): 전체 모델 조회 (페이징 지원)
- find_all_by_id(ids, *, session): 여러 ID로 모델들 조회
- save(model, *, session): 모델 저장/업데이트 (upsert)
- exists(where=None, *, session): 모델 존재 여부 확인
- exists_by_id(id, *, where=None, session): ID로 존재 여부 확인
- count(*, where=None, session): 모델 개수 조회
BaseCRUDTransactionRepository
BaseCRUDRepository에 자동 트랜잭션 관리 기능이 추가된 클래스입니다.
from transactional_sqlalchemy import BaseCRUDTransactionRepository, Propagation
from your_models import User
class UserTransactionRepository(BaseCRUDTransactionRepository[User]):
# 모든 메서드에 자동으로 @transactional 데코레이터가 적용됩니다
pass
# 사용 예시
user_repo = UserTransactionRepository()
# 트랜잭션이 자동으로 관리됩니다
user = await user_repo.find_by_id(1) # session 매개변수 불필요
saved_user = await user_repo.save(new_user) # 자동 커밋/롤백
조건부 조회와 페이징
from sqlalchemy import and_
from transactional_sqlalchemy import Pageable
# where 조건 사용
active_users = await user_repo.find_all(
where=and_(User.is_active == True, User.age >= 18),
session=session
)
# 페이징 사용
pageable = Pageable(offset=0, limit=10)
users_page = await user_repo.find_all(
pageable=pageable,
session=session
)
# 조건부 개수 조회
adult_count = await user_repo.count(
where=User.age >= 18,
session=session
)
커스텀 메서드 추가
class UserRepository(BaseCRUDTransactionRepository[User]):
@transactional(propagation=Propagation.REQUIRES)
async def find_by_email(self, email: str, *, session: AsyncSession) -> User | None:
return await self.find(where=User.email == email, session=session)
@transactional(propagation=Propagation.REQUIRES)
async def create_user_with_profile(self, user_data: dict, profile_data: dict, *, session: AsyncSession) -> User:
# 복잡한 비즈니스 로직
user = User(**user_data)
saved_user = await self.save(user, session=session)
profile = UserProfile(user_id=saved_user.id, **profile_data)
session.add(profile)
return saved_user
고급 사용 예시
from sqlalchemy import or_, desc
from transactional_sqlalchemy import BaseCRUDTransactionRepository, Propagation
class UserService(BaseCRUDTransactionRepository[User]):
@transactional(propagation=Propagation.REQUIRES)
async def search_users(self, keyword: str, *, session: AsyncSession) -> list[User]:
"""이름 또는 이메일로 사용자 검색"""
return await self.find_all(
where=or_(
User.name.ilike(f"%{keyword}%"),
User.email.ilike(f"%{keyword}%")
),
session=session
)
@transactional(propagation=Propagation.REQUIRES)
async def get_user_stats(self, *, session: AsyncSession) -> dict:
"""사용자 통계 조회"""
total_users = await self.count(session=session)
active_users = await self.count(where=User.is_active == True, session=session)
return {
"total": total_users,
"active": active_users,
"inactive": total_users - active_users
}
@transactional(propagation=Propagation.REQUIRES_NEW)
async def deactivate_user(self, user_id: int, *, session: AsyncSession) -> User:
"""사용자 비활성화 (새로운 트랜잭션)"""
user = await self.find_by_id(user_id, session=session)
if not user:
raise ValueError("User not found")
user.is_active = False
return await self.save(user, session=session)
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file transactional_sqlalchemy-0.1.8.tar.gz.
File metadata
- Download URL: transactional_sqlalchemy-0.1.8.tar.gz
- Upload date:
- Size: 33.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 | ce92a9e3262b9f4dacba212a9ec34e3733b27573e4803387dbf88e98f7d1138b | |
| MD5 | 5262daeb1f5344503cb1e7777f27a471 | |
| BLAKE2b-256 | fb77343cacd72687c48fa8e6807c7fe33716b5ff18d7e3c7ab4a6990c3c2fc3e | |
File details
Details for the file transactional_sqlalchemy-0.1.8-py3-none-any.whl.
File metadata
- Download URL: transactional_sqlalchemy-0.1.8-py3-none-any.whl
- Upload date:
- Size: 26.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 | bfb1a757b7947e90dd2b7094723631ea5db5139787d51fc67bc4dbbc8d90ecbb | |
| MD5 | 95c36ffda44d700c1449f4797ee6e355 | |
| BLAKE2b-256 | c8ff1f30daae399027ff6ab9fca1b1312574cbd31679791c8dfa01c5d31ca2a8 | |