Skip to main content

transactional management using sqlalchemy

Project description

Transactional-SQLAlchemy

개요

지원하는 트랜잭션 전파 방식

참조: Transaction Propagation of Spring framework

  • REQUIRED : 이미 트랜잭션이 열린경우 기존의 세션을 사용하거나, 새로운 트랜잭션을 생성
  • REQUIRES_NEW : 기존 트랜잭션을 무시하고 새롭게 생성
  • NESTED : 기존 트랜잭션의 자식 트랜잭션을 생성

기능

트랜잭션 전파 방식 관리

Auto commit or Rollback (트랜잭션 사용 시)

auto session

동기/비동기 함수 모두 지원

사용법

1. transactional + auto session

  1. 패키지 설치
  • ver. sync
pip install transactional-sqlalchemy
  • ver. async
pip install transactional-sqlalchemy[async]
  1. 세션 핸들러 초기화
from transactional_sqlalchemy import init_manager
from sqlalchemy.ext.asyncio import async_scoped_session

async_scoped_session_ = async_scoped_session(
    async_session_factory, scopefunc=asyncio.current_task
)

init_manager(async_scoped_session_)
  1. ITransactionalRepository를 상속하는 클래스 작성
  • repository 레이어의 클래스 작성시, ITransactionalRepository를 상속
  • session이라는 이름의 변수가 있는경우 만들어 두었던 세션을 할당
from transactional_sqlalchemy import ITransactionalRepository, transactional

class PostRepository(ITransactionalRepository):
    @transactional # or @transactional(propagation=Propagation.REQUIRES)
    async def requires(self, post: Post, session: AsyncSession) -> None:
        session.add(post)
        ...

    @transactional(propagation=Propagation.REQUIRES_NEW)
    async def requires_new(self, post: Post, session: AsyncSession) -> None: ...

    @transactional(propagation=Propagation.NESTED)
    async def nested(self, post: Post, session: AsyncSession) -> None: ...

    async def auto_session_allocate(self, session:AsyncSession) -> None: ...

2. auto session without transactional

from transactional_sqlalchemy import ISessionRepository


class PostRepository(ISessionRepository):

    async def create(self, post: Post, *, session: AsyncSession = None) -> None: ...

3. 기본 CRUD Repository 사용

패키지에서 제공하는 기본 CRUD Repository 클래스를 상속하여 빠르게 Repository를 구현할 수 있습니다.

BaseCRUDRepository

기본적인 CRUD 연산을 제공하는 베이스 클래스입니다.

from transactional_sqlalchemy import BaseCRUDRepository
from sqlalchemy.ext.asyncio import AsyncSession
from your_models import User

class UserRepository(BaseCRUDRepository[User]):
    pass  # 기본 CRUD 메서드들이 자동으로 사용 가능

# 사용 예시
user_repo = UserRepository()

# 기본 제공 메서드들
user = await user_repo.find_by_id(1, session=session)
users = await user_repo.find_all(session=session)
saved_user = await user_repo.save(new_user, session=session)
exists = await user_repo.exists_by_id(1, session=session)
count = await user_repo.count(session=session)

제공되는 메서드:

  • find_by_id(id, *, session): ID로 단일 모델 조회
  • find(where=None, *, session): 조건으로 단일 모델 조회
  • find_all(*, pageable=None, where=None, session): 전체 모델 조회 (페이징 지원)
  • find_all_by_id(ids, *, session): 여러 ID로 모델들 조회
  • save(model, *, session): 모델 저장/업데이트 (upsert)
  • exists(where=None, *, session): 모델 존재 여부 확인
  • exists_by_id(id, *, where=None, session): ID로 존재 여부 확인
  • count(*, where=None, session): 모델 개수 조회

BaseCRUDTransactionRepository

BaseCRUDRepository에 자동 트랜잭션 관리 기능이 추가된 클래스입니다.

from transactional_sqlalchemy import BaseCRUDTransactionRepository, Propagation
from your_models import User

class UserTransactionRepository(BaseCRUDTransactionRepository[User]):
    # 모든 메서드에 자동으로 @transactional 데코레이터가 적용됩니다
    pass

# 사용 예시
user_repo = UserTransactionRepository()

# 트랜잭션이 자동으로 관리됩니다
user = await user_repo.find_by_id(1)  # session 매개변수 불필요
saved_user = await user_repo.save(new_user)  # 자동 커밋/롤백

조건부 조회와 페이징

from sqlalchemy import and_
from transactional_sqlalchemy import Pageable

# where 조건 사용
active_users = await user_repo.find_all(
    where=and_(User.is_active == True, User.age >= 18),
    session=session
)

# 페이징 사용
pageable = Pageable(offset=0, limit=10)
users_page = await user_repo.find_all(
    pageable=pageable,
    session=session
)

# 조건부 개수 조회
adult_count = await user_repo.count(
    where=User.age >= 18,
    session=session
)

커스텀 메서드 추가

class UserRepository(BaseCRUDTransactionRepository[User]):

    @transactional(propagation=Propagation.REQUIRES)
    async def find_by_email(self, email: str, *, session: AsyncSession) -> User | None:
        return await self.find(where=User.email == email, session=session)

    @transactional(propagation=Propagation.REQUIRES)
    async def create_user_with_profile(self, user_data: dict, profile_data: dict, *, session: AsyncSession) -> User:
        # 복잡한 비즈니스 로직
        user = User(**user_data)
        saved_user = await self.save(user, session=session)

        profile = UserProfile(user_id=saved_user.id, **profile_data)
        session.add(profile)

        return saved_user

고급 사용 예시

from sqlalchemy import or_, desc
from transactional_sqlalchemy import BaseCRUDTransactionRepository, Propagation

class UserService(BaseCRUDTransactionRepository[User]):

    @transactional(propagation=Propagation.REQUIRES)
    async def search_users(self, keyword: str, *, session: AsyncSession) -> list[User]:
        """이름 또는 이메일로 사용자 검색"""
        return await self.find_all(
            where=or_(
                User.name.ilike(f"%{keyword}%"),
                User.email.ilike(f"%{keyword}%")
            ),
            session=session
        )

    @transactional(propagation=Propagation.REQUIRES)
    async def get_user_stats(self, *, session: AsyncSession) -> dict:
        """사용자 통계 조회"""
        total_users = await self.count(session=session)
        active_users = await self.count(where=User.is_active == True, session=session)

        return {
            "total": total_users,
            "active": active_users,
            "inactive": total_users - active_users
        }

    @transactional(propagation=Propagation.REQUIRES_NEW)
    async def deactivate_user(self, user_id: int, *, session: AsyncSession) -> User:
        """사용자 비활성화 (새로운 트랜잭션)"""
        user = await self.find_by_id(user_id, session=session)
        if not user:
            raise ValueError("User not found")

        user.is_active = False
        return await self.save(user, session=session)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

transactional_sqlalchemy-0.1.8.tar.gz (33.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

transactional_sqlalchemy-0.1.8-py3-none-any.whl (26.8 kB view details)

Uploaded Python 3

File details

Details for the file transactional_sqlalchemy-0.1.8.tar.gz.

File metadata

File hashes

Hashes for transactional_sqlalchemy-0.1.8.tar.gz
Algorithm Hash digest
SHA256 ce92a9e3262b9f4dacba212a9ec34e3733b27573e4803387dbf88e98f7d1138b
MD5 5262daeb1f5344503cb1e7777f27a471
BLAKE2b-256 fb77343cacd72687c48fa8e6807c7fe33716b5ff18d7e3c7ab4a6990c3c2fc3e

See more details on using hashes here.

File details

Details for the file transactional_sqlalchemy-0.1.8-py3-none-any.whl.

File metadata

File hashes

Hashes for transactional_sqlalchemy-0.1.8-py3-none-any.whl
Algorithm Hash digest
SHA256 bfb1a757b7947e90dd2b7094723631ea5db5139787d51fc67bc4dbbc8d90ecbb
MD5 95c36ffda44d700c1449f4797ee6e355
BLAKE2b-256 c8ff1f30daae399027ff6ab9fca1b1312574cbd31679791c8dfa01c5d31ca2a8

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page