Skip to main content

A project to have a base repository class to perform select/insert/update/delete with dynamic syntax

Project description

The session-repository library is a Python library that is designed to use the session/repository pattern to interact with databases in Python projects. It provides a more flexible notation for running SQL queries and is built on top of SQLAlchemy, a popular Python SQL toolkit. With session-repository, users can write SQL queries using a new, more intuitive syntax, simplifying the process of working with SQL databases in Python and making it easier to write and maintain complex queries.

Installing Session-Repository

To install session-repository, if you already have Python, you can install with:

pip install session-repository

How to import Session-Repository

To access session-repository and its functions import it in your Python code like this:

from session_repository import SessionRepository, SessionService, with_session, Operators, LoadingTechnique
from session_repository.utils import RelationshipOption

Reading the example code

To create a repository, you just have to inherit your class from SessionRepository.

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class City(Base):
    __tablename__ = "CITY"

    id = Column(
        "ID",
        Integer,
        primary_key=True,
        index=True,
    )
    name = Column(
        "NAME",
        String,
        index=True,
    )
    state = Column(
        "STATE",
        String,
        index=True,
    )

    addresses = relationship(
        "Address",
        back_populates="city",
    )


class Address(Base):
    __tablename__ = "ADDRESS"

    id = Column(
        "ID",
        Integer,
        primary_key=True,
        index=True,
    )
    street = Column(
        "STREET",
        String,
        index=True,
    )
    zip_code = Column(
        "ZIP_CODE",
        Integer,
        index=True,
    )
    user_id = Column(
        "USER_ID",
        Integer,
        ForeignKey("USER.ID"),
    )
    city_id = Column(
        "CITY_ID",
        Integer,
        ForeignKey("CITY.ID"),
    )

    user = relationship(
        "User",
        back_populates="addresses",
    )
    city = relationship(
        "City",
        back_populates="addresses",
    )


class User(Base):
    __tablename__ = "USER"

    id = Column(
        "ID",
        Integer,
        primary_key=True,
        index=True,
    )
    email = Column(
        "EMAIL",
        String,
        unique=True,
        index=True,
    )
    hashed_password = Column(
        "HASHED_PASSWORD",
        String,
    )
    full_name = Column(
        "FULL_NAME",
        String,
        index=True,
    )
    is_active = Column(
        "IS_ACTIVE",
        Boolean,
        default=True,
    )

    addresses = relationship(
        "Address",
        back_populates="user",
    )


class UserRepository(SessionRepository):
    def __init__(
        self,
        session_factory: Callable[..., AbstractContextManager[Session]],
    ) -> None:
        super().__init__(session_factory)

    @classmethod
    def __get_filters(
        cls,
        ids: Optional[List[int]] = None,
    ):
        return {
            User.id: {
                Operators.IN: ids,
            }
        }

    @classmethod
    def __get_relationship_options(cls):
        return {
            User.adresses: RelationshipOption(
                lazy=LoadingTechnique.JOINED,
                children={
                    Adress.city: RelationshipOption(
                        lazy=LoadingTechnique.LAZY
                    )
                },
            ),
        }

    def get_all(
        self,
        ids: Optional[List[int]] = None,
        order_by: Optional[List[str]] = None,
        direction: Optional[List[str]] = None,
        current_session: Optional[Session] = None,
    ) -> List[User]:
        users = self._select_all(
            current_session=current_session,
            model=User,
            optional_filters=self.__get_filters(
                ids=ids;
            ),
            relationship_options=self.__get_relationship_options(),
            order_by=order_by,
            direction=direction,
        )

        return users

    def get_paginate(
        self,
        page: int,
        per_page: int,
        ids: Optional[List[int]] = None,
        order_by: Optional[List[str]] = None,
        direction: Optional[List[str]] = None,
        current_session: Optional[Session] = None,
    ) -> Tuple[List[User], str]:
        users, pagination = self._select_paginate(
            page=page,
            per_page=per_page,
            model=User,
            optional_filters=self.__get_filters(
                ids=ids,
            ),
            relationship_options=self.__get_relationship_options(),
            order_by=order_by,
            direction=direction,
            current_session=current_session,
        )

        return users, pagination

    def get_by_id(
        self,
        id: int,
        current_session: Optional[Session] = None,
    ) -> Optional[User]:
        user = self._select(
            current_session=current_session,
            model=User,
            filters={
                User.id: {
                    Operators.EQUAL: id,
                },
            },
        )

        return user

    def get_by_email(
        self,
        email: str,
        current_session: Optional[Session] = None,
    ) -> Optional[User]:
        user = self._select(
            current_session=current_session,
            model=User,
            filters={
                User.email: {
                    Operators.IEQUAL: email,
                },
            },
        )

        return user

    def create(
        self,
        data: UserCreateSchema,
        flush: bool = False,
        commit: bool = True,
        current_session: Optional[Session] = None,
    ) -> User:
        user = self._add(
            data=User(
                **{
                    User.email.key: data.email,
                    User.hashed_password.key: data.hashed_password,
                    User.full_name.key: data.full_name,
                }
            ),
            flush=flush,
            commit=commit,
            current_session=current_session,
        )

        return user

    def patch_active(
        self,
        id: int,
        is_active: bool,
        flush: bool = False,
        commit: bool = True,
        current_session: Optional[Session] = None,
    ) -> User:
        user = self._update(
            current_session=current_session,
            model=User,
            values={
                User.is_active.key: is_active,
            },
            filters={
                User.id: {
                    Operators.EQUAL: id,
                },
            },
            flush=flush,
            commit=commit,
        )

        return user

    def delete(
        self,
        id: int,
        flush: bool = False,
        commit: bool = True,
        current_session: Optional[Session] = None,
    ) -> bool:
        is_deleted = self._delete(
            model=User,
            filters={
                User.id: {
                    Operators.EQUAL: id,
                },
            },
            flush=flush,
            commit=commit,
            current_session=current_session,
        )

        return is_deleted

To create a service, you just have to inherit your class from SessionService.

T = TypeVar("T", bound=UserReadSchema)

class UserService(SessionService[UserRepository]):

    def __init__(
            self,
            repository: UserRepository,
            logger: Logger,
    ) -> None:
        super().__init__(
            repository=repository,
            logger=logger,
        )


    @with_session()
    def get_users(
        self,
        ids: Optional[List[int]] = None,
        order_by: Optional[List[str]] = None,
        direction: Optional[List[str]] = None,
        schema: Type[T] = UserReadSchema,
        current_session: Optional[Session] = None,
    ) -> List[T]:
        users = self._repository.get_all(
            ids=ids,
            order_by=order_by,
            direction=direction,
            current_session=current_session,
        )

        return [schema.model_validate(user) for user in users]

    @with_session()
    def get_users_paginate(
        self,
        page: int,
        per_page: int,
        user_permissions: List[str],
        ids: Optional[List[int]] = None,
        order_by: Optional[List[str]] = None,
        direction: Optional[List[str]] = None,
        schema: Type[T] = RecipeInspectionFviReadSchema,
        current_session: Optional[Session] = None,
    ) -> Tuple[List[T], str]:
        users, pagination = self._repository.get_paginate(
            page=page,
            per_page=per_page,
            ids=ids,
            order_by=order_by,
            direction=direction,
            current_session=current_session,
        )

        return [schema.model_validate(user) for user in users], pagination


    @with_session()
    def get_user_by_id(
        self,
        id: int,
        schema: Type[T] = UserReadSchema,
        current_session: Optional[Session] = None,
    ) -> T:
        user = self._repository.get_by_id(
            id=id,
            current_session=current_session,
        )

        if user is None:
            raise ValueError("User not found")

        return schema.model_validate(user)

    @with_session()
    async def create_user(
        self,
        data: UserCreateSchema,
        commit: bool = True,
        schema: Type[T] = UserReadSchema,
        current_session: Optional[Session] = None,
    ) -> T:
        user = self._repository.get_by_email(
            email=data.email,
            current_session=current_session,
        )

        if user is not None:
            self._logger.error(
                "Unable to create new user beacuse email alrady used bu another one"
            )

            raise ValueError(
                "User already exists with same email"
            )

        user = self._repository.create(
            data=data,
            flush=True,
            commit=False,
            current_session=current_session,
        )

        return schema.model_validate(user)


    @with_session()
    async def delete_user(
        self,
        id: int,
        commit: bool = True,
        current_session: Optional[Session] = None,
    ) -> bool:
        current_user = self._repository.get_by_id(
            id=id,
            current_session=current_session,
        )

        if current_user is None:
            raise ValueError(f"No user with {id=}")

        is_deleted = self._repository.delete(
            id=id,
            flush=True,
            commit=False,
            current_session=current_session,
        )

        if commit:
            current_session.commit()

        self._logger.info(
            f"User was successfully deleted"
        )

        return is_deleted

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

session-repository-0.4.5.4.tar.gz (9.8 kB view details)

Uploaded Source

Built Distribution

session_repository-0.4.5.4-py3-none-any.whl (9.4 kB view details)

Uploaded Python 3

File details

Details for the file session-repository-0.4.5.4.tar.gz.

File metadata

  • Download URL: session-repository-0.4.5.4.tar.gz
  • Upload date:
  • Size: 9.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.9.18

File hashes

Hashes for session-repository-0.4.5.4.tar.gz
Algorithm Hash digest
SHA256 8cfed470e3672d6ab426345ee0b6a3b84a9602780b8fe746f137ecfc2b442841
MD5 4780cd296fd6723efe5097678c27b865
BLAKE2b-256 0b421d16b1f40f0c69d19f4cf0b00014f32f5f5d87ee79f11a0fcd3538520068

See more details on using hashes here.

File details

Details for the file session_repository-0.4.5.4-py3-none-any.whl.

File metadata

File hashes

Hashes for session_repository-0.4.5.4-py3-none-any.whl
Algorithm Hash digest
SHA256 b5e3e5f6ad76a555a746022eda4b7acfd1afbf577b927fe5746cdf80112b3459
MD5 04fa0e6bd8b6633c250be3c15dd5f7ff
BLAKE2b-256 f623940ce34f60e08c06dbb631b80dfaf142e96b65f2ee0d1e63f41c75c3aaf9

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page