A base repository class for running select/insert/update/delete operations with a dynamic syntax

Project description

pysql-repo is a Python library that applies the session/repository pattern to database access in Python projects. It is built on top of SQLAlchemy, a popular Python SQL toolkit, and provides a more flexible notation for running SQL queries: filters and relationship-loading options are written as Python mappings keyed by model attributes and operators. This is meant to simplify work with SQL databases in Python and to make complex queries easier to write and maintain. The distribution is named pysql-repo; the importable package is pysql_repo.
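
As a rough illustration of the repository pattern itself (not of pysql-repo's API), the sketch below keeps all SQL for one table behind a small class. It uses only the standard library's sqlite3 module; the class name, table name, and methods are hypothetical and chosen for this example.

```python
import sqlite3
from typing import List, Optional, Tuple

Row = Tuple[int, str]


class UserRepository:
    """Hypothetical repository: the only place that knows the SQL for users."""

    def __init__(self, connection: sqlite3.Connection) -> None:
        self._connection = connection
        self._connection.execute(
            "CREATE TABLE IF NOT EXISTS users "
            "(id INTEGER PRIMARY KEY, email TEXT UNIQUE)"
        )

    def create(self, email: str) -> Row:
        cursor = self._connection.execute(
            "INSERT INTO users (email) VALUES (?)", (email,)
        )
        self._connection.commit()
        return (cursor.lastrowid, email)

    def get_by_id(self, user_id: int) -> Optional[Row]:
        # fetchone() returns None when no row matches.
        return self._connection.execute(
            "SELECT id, email FROM users WHERE id = ?", (user_id,)
        ).fetchone()

    def get_all(self) -> List[Row]:
        return self._connection.execute(
            "SELECT id, email FROM users ORDER BY id"
        ).fetchall()


repository = UserRepository(sqlite3.connect(":memory:"))
created = repository.create("alice@example.com")
print(repository.get_by_id(created[0]))
```

Callers work with `create`, `get_by_id`, and `get_all` and never see SQL, which is the separation that the Repository and Service classes shown later provide on top of SQLAlchemy.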

Installing pysql-repo

If you already have Python, install pysql-repo with pip:

pip install pysql_repo
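
One way to confirm that the distribution is visible to the current interpreter is the standard library's `importlib.metadata`. The helper name `installed_version` is an illustrative choice and not part of pysql-repo.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version string, or None if it is not installed."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


# Prints a version string if the install succeeded, otherwise None.
print(installed_version("pysql_repo"))
```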

How to import pysql-repo

To access pysql-repo and its functions, import it in your Python code like this:

from pysql_repo import Database, Repository, Service, with_session, Operators, LoadingTechnique, RelationshipOption

To access pysql-repo and its asyncio functions, import them in your Python code like this:

from pysql_repo.asyncio import AsyncDatabase, AsyncRepository, AsyncService, with_async_session

Reading the example code

# MODULES
from typing import List

# SQLALCHEMY
from sqlalchemy import Boolean, ForeignKey, Integer, String
# declarative_base lives in sqlalchemy.orm; the sqlalchemy.ext.declarative path is deprecated
from sqlalchemy.orm import declarative_base, relationship, Mapped, mapped_column

Base = declarative_base()


class City(Base):
    __tablename__ = "CITY"

    id: Mapped[int] = mapped_column(
        "ID",
        Integer,
        primary_key=True,
        index=True,
    )
    name: Mapped[str] = mapped_column(
        "NAME",
        String,
        index=True,
    )
    state: Mapped[str] = mapped_column(
        "STATE",
        String,
        index=True,
    )

    addresses: Mapped[List["Address"]] = relationship(
        "Address",
        back_populates="city",
        lazy="joined",
        cascade="all, delete-orphan",
    )


class Address(Base):
    __tablename__ = "ADDRESS"

    id: Mapped[int] = mapped_column(
        "ID",
        Integer,
        primary_key=True,
        index=True,
    )
    street: Mapped[str] = mapped_column(
        "STREET",
        String,
        index=True,
    )
    zip_code: Mapped[int] = mapped_column(
        "ZIP_CODE",
        Integer,
        index=True,
    )
    user_id: Mapped[int] = mapped_column(
        "USER_ID",
        Integer,
        ForeignKey("USER.ID"),
    )
    city_id: Mapped[int] = mapped_column(
        "CITY_ID",
        Integer,
        ForeignKey("CITY.ID"),
    )

    user: Mapped["User"] = relationship(
        "User",
        back_populates="addresses",
        lazy="joined",
    )
    city: Mapped["City"] = relationship(
        "City",
        back_populates="addresses",
        lazy="joined",
    )


class User(Base):
    __tablename__ = "USER"

    id: Mapped[int] = mapped_column(
        "ID",
        Integer,
        primary_key=True,
        index=True,
    )
    email: Mapped[str] = mapped_column(
        "EMAIL",
        String,
        unique=True,
        index=True,
    )
    hashed_password: Mapped[str] = mapped_column(
        "HASHED_PASSWORD",
        String,
    )
    full_name: Mapped[str] = mapped_column(
        "FULL_NAME",
        String,
        index=True,
    )
    is_active: Mapped[bool] = mapped_column(
        "IS_ACTIVE",
        Boolean,
        default=True,
    )

    addresses: Mapped[List["Address"]] = relationship(
        "Address",
        back_populates="user",
        lazy="joined",
        cascade="all, delete-orphan",
    )

To create an instance of Database and generate an instance of Session from the factory:

import logging

logging.basicConfig()
logger_db = logging.getLogger('demo')

database = Database(
    databases_config={
        "connection_string": "foo",
    },
    base=Base,
    logger=logger_db,
)

database.create_database()

with database.session_factory() as session:
    session.execute(...)
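
The `with database.session_factory() as session:` form implies that the factory returns a context manager. A minimal sketch of how such a factory could be built is shown below, using sqlite3 from the standard library instead of SQLAlchemy. The function `make_session_factory` and its commit-on-success, rollback-on-error behavior are assumptions made for illustration and are not taken from pysql-repo.

```python
import os
import sqlite3
import tempfile
from contextlib import contextmanager
from typing import Callable, ContextManager, Iterator


def make_session_factory(path: str) -> Callable[[], ContextManager[sqlite3.Connection]]:
    """Hypothetical factory: each call opens a fresh connection to `path`."""

    @contextmanager
    def session_factory() -> Iterator[sqlite3.Connection]:
        connection = sqlite3.connect(path)
        try:
            yield connection
            connection.commit()  # assumed policy: commit when the block succeeds
        except Exception:
            connection.rollback()  # assumed policy: roll back on any error
            raise
        finally:
            connection.close()

    return session_factory


session_factory = make_session_factory(os.path.join(tempfile.mkdtemp(), "demo.db"))

with session_factory() as session:
    session.execute("CREATE TABLE city (name TEXT)")
    session.execute("INSERT INTO city VALUES ('Paris')")

try:
    with session_factory() as session:
        session.execute("INSERT INTO city VALUES ('Lyon')")
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass  # the insert of 'Lyon' was rolled back

with session_factory() as session:
    names = [row[0] for row in session.execute("SELECT name FROM city")]
print(names)
```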

To create a repository, inherit your class from Repository.

# MODULES
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

# SQLALCHEMY
from sqlalchemy import Column
from sqlalchemy.orm import Session

# PYSQL_REPO
from pysql_repo import Operators, Repository, LoadingTechnique, RelationshipOption

# CONTEXTLIB
from contextlib import AbstractContextManager

# MODEL
from tests.repositories.user._base import UserRepositoryBase as _UserRepositoryBase
from tests.models.database.database import Address, User
from tests.models.schemas.user import UserCreate


# Shared base class, imported above as _UserRepositoryBase from tests.repositories.user._base
class UserRepositoryBase:
    @classmethod
    def get_filters(
        cls,
        ids_in: Optional[List[int]] = None,
        ids_not_in: Optional[List[int]] = None,
        emails_iin: Optional[List[str]] = None,
        emails_in: Optional[List[str]] = None,
        emails_not_iin: Optional[List[str]] = None,
        emails_not_in: Optional[List[str]] = None,
        email_ilike: Optional[str] = None,
        email_like: Optional[str] = None,
        email_not_ilike: Optional[str] = None,
        email_not_like: Optional[str] = None,
        email_equal: Optional[str] = None,
        email_iequal: Optional[str] = None,
        email_different: Optional[str] = None,
        email_idifferent: Optional[str] = None,
        zip_codes_in: Optional[List[int]] = None,
        zip_codes_not_in: Optional[List[int]] = None,
        is_active_equal: Optional[bool] = None,
    ) -> Dict[Column, Any]:
        return {
            User.id: {
                Operators.IN: ids_in,
                Operators.NOT_IN: ids_not_in,
            },
            User.email: {
                Operators.IIN: emails_iin,
                Operators.IN: emails_in,
                Operators.NOT_IIN: emails_not_iin,
                Operators.NOT_IN: emails_not_in,
                Operators.ILIKE: email_ilike,
                Operators.LIKE: email_like,
                Operators.NOT_ILIKE: email_not_ilike,
                Operators.NOT_LIKE: email_not_like,
                Operators.EQUAL: email_equal,
                Operators.IEQUAL: email_iequal,
                Operators.DIFFERENT: email_different,
                Operators.IDIFFERENT: email_idifferent,
            },
            User.is_active: {
                Operators.EQUAL: is_active_equal,
            },
            User.addresses: {
                Operators.ANY: {
                    Address.zip_code: {
                        Operators.IN: zip_codes_in,
                        Operators.NOT_IN: zip_codes_not_in,
                    },
                }
            },
        }

    @classmethod
    def get_relationship_options(
        cls,
        load_addresses: bool = False,
        load_city: bool = False,
        zip_codes_not_in: Optional[List[int]] = None,
        zip_codes_in: Optional[List[int]] = None,
    ):
        extra_join_addresses = []
        if zip_codes_not_in:
            extra_join_addresses.append(Address.zip_code.not_in(zip_codes_not_in))
        if zip_codes_in:
            extra_join_addresses.append(Address.zip_code.in_(zip_codes_in))

        return {
            User.addresses: RelationshipOption(
                lazy=LoadingTechnique.JOINED
                if load_addresses
                else LoadingTechnique.NOLOAD,
                added_criteria=extra_join_addresses
                if len(extra_join_addresses) > 0
                else None,
                children={
                    Address.city: RelationshipOption(
                        lazy=LoadingTechnique.JOINED
                        if load_city
                        else LoadingTechnique.NOLOAD,
                    )
                },
            ),
        }


class UserRepository(Repository, _UserRepositoryBase):
    def __init__(
        self,
        session_factory: Callable[..., AbstractContextManager[Session]],
    ) -> None:
        super().__init__(session_factory)

    def get_all(
        self,
        __session__: Session,
        /,
        ids_in: Optional[List[int]] = None,
        ids_not_in: Optional[List[int]] = None,
        emails_iin: Optional[List[str]] = None,
        emails_in: Optional[List[str]] = None,
        emails_not_iin: Optional[List[str]] = None,
        emails_not_in: Optional[List[str]] = None,
        email_ilike: Optional[str] = None,
        email_like: Optional[str] = None,
        email_not_ilike: Optional[str] = None,
        email_not_like: Optional[str] = None,
        email_equal: Optional[str] = None,
        email_iequal: Optional[str] = None,
        email_different: Optional[str] = None,
        email_idifferent: Optional[str] = None,
        zip_codes_in: Optional[List[int]] = None,
        zip_codes_not_in: Optional[List[int]] = None,
        is_active_equal: Optional[bool] = None,
        load_addresses: bool = True,
        load_city: bool = True,
        order_by: Optional[List[str]] = None,
        direction: Optional[List[str]] = None,
    ) -> Sequence[User]:
        users = self._select_all(
            __session__,
            model=User,
            optional_filters=self.get_filters(
                ids_in=ids_in,
                ids_not_in=ids_not_in,
                emails_iin=emails_iin,
                emails_in=emails_in,
                emails_not_iin=emails_not_iin,
                emails_not_in=emails_not_in,
                email_ilike=email_ilike,
                email_like=email_like,
                email_not_ilike=email_not_ilike,
                email_not_like=email_not_like,
                email_equal=email_equal,
                email_iequal=email_iequal,
                email_different=email_different,
                email_idifferent=email_idifferent,
                zip_codes_in=zip_codes_in,
                zip_codes_not_in=zip_codes_not_in,
                is_active_equal=is_active_equal,
            ),
            relationship_options=self.get_relationship_options(
                load_addresses=load_addresses,
                load_city=load_city,
                zip_codes_in=zip_codes_in,
                zip_codes_not_in=zip_codes_not_in,
            ),
            order_by=order_by,
            direction=direction,
        )

        return users

    def get_paginate(
        self,
        __session__: Session,
        /,
        page: int,
        per_page: int,
        ids_in: Optional[List[int]] = None,
        ids_not_in: Optional[List[int]] = None,
        emails_iin: Optional[List[str]] = None,
        emails_in: Optional[List[str]] = None,
        emails_not_iin: Optional[List[str]] = None,
        emails_not_in: Optional[List[str]] = None,
        email_ilike: Optional[str] = None,
        email_like: Optional[str] = None,
        email_not_ilike: Optional[str] = None,
        email_not_like: Optional[str] = None,
        email_equal: Optional[str] = None,
        email_iequal: Optional[str] = None,
        email_different: Optional[str] = None,
        email_idifferent: Optional[str] = None,
        zip_codes_in: Optional[List[int]] = None,
        zip_codes_not_in: Optional[List[int]] = None,
        is_active_equal: Optional[bool] = None,
        load_addresses: bool = True,
        load_city: bool = True,
        order_by: Optional[List[str]] = None,
        direction: Optional[List[str]] = None,
    ) -> Tuple[Sequence[User], str]:
        users, pagination = self._select_paginate(
            __session__,
            model=User,
            optional_filters=self.get_filters(
                ids_in=ids_in,
                ids_not_in=ids_not_in,
                emails_iin=emails_iin,
                emails_in=emails_in,
                emails_not_iin=emails_not_iin,
                emails_not_in=emails_not_in,
                email_ilike=email_ilike,
                email_like=email_like,
                email_not_ilike=email_not_ilike,
                email_not_like=email_not_like,
                email_equal=email_equal,
                email_iequal=email_iequal,
                email_different=email_different,
                email_idifferent=email_idifferent,
                zip_codes_in=zip_codes_in,
                zip_codes_not_in=zip_codes_not_in,
                is_active_equal=is_active_equal,
            ),
            relationship_options=self.get_relationship_options(
                load_addresses=load_addresses,
                load_city=load_city,
                zip_codes_in=zip_codes_in,
                zip_codes_not_in=zip_codes_not_in,
            ),
            order_by=order_by,
            direction=direction,
            page=page,
            per_page=per_page,
        )

        return users, pagination

    def get_by_id(
        self,
        __session__: Session,
        /,
        id: int,
    ) -> Optional[User]:
        user = self._select(
            __session__,
            model=User,
            filters={
                User.id: {
                    Operators.EQUAL: id,
                },
            },
        )

        return user

    def create(
        self,
        __session__: Session,
        /,
        data: UserCreate,
        flush: bool = False,
        commit: bool = True,
    ) -> User:
        user = self._add(
            __session__,
            model=User,
            values={
                User.email.key: data.email,
                User.hashed_password.key: data.hashed_password,
                User.full_name.key: data.full_name,
            },
            flush=flush,
            commit=commit,
        )

        return user

    def create_all(
        self,
        __session__: Session,
        /,
        data: List[UserCreate],
        flush: bool = False,
        commit: bool = True,
    ) -> Sequence[User]:
        users = self._add_all(
            __session__,
            model=User,
            values=[
                {
                    User.email.key: item.email,
                    User.hashed_password.key: item.hashed_password,
                    User.full_name.key: item.full_name,
                }
                for item in data
            ],
            flush=flush,
            commit=commit,
        )

        return users

    def patch_email(
        self,
        __session__: Session,
        /,
        id: int,
        email: str,
        flush: bool = False,
        commit: bool = True,
    ) -> Optional[User]:
        user = self._update(
            __session__,
            model=User,
            values={
                User.email.key: email,
            },
            filters={
                User.id: {
                    Operators.EQUAL: id,
                },
            },
            flush=flush,
            commit=commit,
        )

        return user

    def bulk_patch_email(
        self,
        __session__: Session,
        /,
        data: List[Tuple[int, str]],
        flush: bool = False,
        commit: bool = True,
    ) -> Sequence[User]:
        self._bulk_update(
            __session__,
            model=User,
            values=[
                {
                    User.id.key: id,
                    User.email.key: email,
                }
                for id, email in data
            ],
            flush=flush,
            commit=commit,
        )

        return self._select_all(
            __session__,
            model=User,
            filters={
                User.id: {
                    Operators.IN: [id for id, _ in data],
                },
            },
        )

    def patch_disable(
        self,
        __session__: Session,
        /,
        ids: List[int],
        flush: bool = False,
        commit: bool = True,
    ) -> Sequence[User]:
        users = self._update_all(
            __session__,
            model=User,
            values={
                User.is_active.key: False,
            },
            filters={
                User.id: {
                    Operators.IN: ids,
                },
            },
            flush=flush,
            commit=commit,
        )

        return users

    def delete(
        self,
        __session__: Session,
        /,
        id: int,
        flush: bool = False,
        commit: bool = True,
    ) -> bool:
        is_deleted = self._delete(
            __session__,
            model=User,
            filters={
                User.id: {
                    Operators.EQUAL: id,
                },
            },
            flush=flush,
            commit=commit,
        )

        return is_deleted

    def delete_all(
        self,
        __session__: Session,
        /,
        ids: List[int],
        flush: bool = False,
        commit: bool = True,
    ) -> bool:
        is_deleted = self._delete_all(
            __session__,
            model=User,
            filters={
                User.id: {
                    Operators.IN: ids,
                },
            },
            flush=flush,
            commit=commit,
        )

        return is_deleted
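
The `get_filters` method above returns a nested mapping of attribute, then operator, then operand, and `get_all` passes it as `optional_filters` with most operands left at `None`. The sketch below shows one way such a mapping could be evaluated, on plain dictionaries instead of SQL. It assumes that a `None` operand means "filter not requested", which the parameter name suggests but the text does not state. `Op`, `matches`, and `apply_optional_filters` are hypothetical names, and ILIKE is simplified to a case-insensitive substring test.

```python
from enum import Enum
from typing import Any, Dict, List, Mapping


class Op(Enum):
    """Hypothetical stand-in for a few members of pysql_repo.Operators."""

    EQUAL = "equal"
    IN = "in"
    NOT_IN = "not_in"
    ILIKE = "ilike"


def matches(value: Any, op: Op, operand: Any) -> bool:
    if op is Op.EQUAL:
        return value == operand
    if op is Op.IN:
        return value in operand
    if op is Op.NOT_IN:
        return value not in operand
    if op is Op.ILIKE:
        # Simplification: substring match, no SQL wildcards.
        return str(operand).lower() in str(value).lower()
    raise ValueError(f"unsupported operator: {op}")


def apply_optional_filters(
    rows: List[Dict[str, Any]],
    filters: Mapping[str, Mapping[Op, Any]],
) -> List[Dict[str, Any]]:
    return [
        row
        for row in rows
        if all(
            matches(row[field], op, operand)
            for field, conditions in filters.items()
            for op, operand in conditions.items()
            if operand is not None  # assumption: None disables the filter
        )
    ]


rows = [
    {"id": 1, "email": "Alice@Example.com", "is_active": True},
    {"id": 2, "email": "bob@example.com", "is_active": False},
    {"id": 3, "email": "carol@test.org", "is_active": True},
]
selected = apply_optional_filters(
    rows,
    {
        "id": {Op.IN: None, Op.NOT_IN: [3]},
        "email": {Op.ILIKE: "example"},
        "is_active": {Op.EQUAL: None},
    },
)
print([row["id"] for row in selected])
```

Checking `is not None` rather than truthiness matters here: a filter such as `is_active_equal=False` is a real request and must not be dropped.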

To create a service, inherit your class from Service.

# MODULES
from logging import Logger
from typing import List, Optional, Tuple

# SQLALCHEMY
from sqlalchemy.orm import Session

# PYSQL_REPO
from pysql_repo import Service, with_session

# REPOSITORIES
from tests.repositories.user.user_repository import UserRepository

# MODELS
from tests.models.schemas.user import UserCreate, UserRead


class UserService(Service[UserRepository]):
    def __init__(
        self,
        user_repository: UserRepository,
        logger: Logger,
    ) -> None:
        super().__init__(
            repository=user_repository,
        )
        self._logger = logger

    def get_users(
        self,
        __session__: Session,
        /,
        ids_in: Optional[List[int]] = None,
        ids_not_in: Optional[List[int]] = None,
        emails_iin: Optional[List[str]] = None,
        emails_in: Optional[List[str]] = None,
        emails_not_iin: Optional[List[str]] = None,
        emails_not_in: Optional[List[str]] = None,
        email_ilike: Optional[str] = None,
        email_like: Optional[str] = None,
        email_not_ilike: Optional[str] = None,
        email_not_like: Optional[str] = None,
        email_equal: Optional[str] = None,
        email_iequal: Optional[str] = None,
        email_different: Optional[str] = None,
        email_idifferent: Optional[str] = None,
        zip_codes_in: Optional[List[int]] = None,
        zip_codes_not_in: Optional[List[int]] = None,
        is_active_equal: Optional[bool] = None,
        load_addresses: bool = True,
        load_city: bool = True,
        order_by: Optional[List[str]] = None,
        direction: Optional[List[str]] = None,
    ) -> List[UserRead]:
        users = self._repository.get_all(
            __session__,
            ids_in=ids_in,
            ids_not_in=ids_not_in,
            emails_iin=emails_iin,
            emails_in=emails_in,
            emails_not_iin=emails_not_iin,
            emails_not_in=emails_not_in,
            email_ilike=email_ilike,
            email_like=email_like,
            email_not_ilike=email_not_ilike,
            email_not_like=email_not_like,
            email_equal=email_equal,
            email_iequal=email_iequal,
            email_different=email_different,
            email_idifferent=email_idifferent,
            zip_codes_in=zip_codes_in,
            zip_codes_not_in=zip_codes_not_in,
            is_active_equal=is_active_equal,
            load_addresses=load_addresses,
            load_city=load_city,
            order_by=order_by,
            direction=direction,
        )

        return [UserRead.model_validate(item) for item in users]

    def get_users_paginate(
        self,
        __session__: Session,
        /,
        page: int,
        per_page: int,
        ids_in: Optional[List[int]] = None,
        ids_not_in: Optional[List[int]] = None,
        emails_iin: Optional[List[str]] = None,
        emails_in: Optional[List[str]] = None,
        emails_not_iin: Optional[List[str]] = None,
        emails_not_in: Optional[List[str]] = None,
        email_ilike: Optional[str] = None,
        email_like: Optional[str] = None,
        email_not_ilike: Optional[str] = None,
        email_not_like: Optional[str] = None,
        email_equal: Optional[str] = None,
        email_iequal: Optional[str] = None,
        email_different: Optional[str] = None,
        email_idifferent: Optional[str] = None,
        zip_codes_in: Optional[List[int]] = None,
        zip_codes_not_in: Optional[List[int]] = None,
        is_active_equal: Optional[bool] = None,
        load_addresses: bool = True,
        load_city: bool = True,
        order_by: Optional[List[str]] = None,
        direction: Optional[List[str]] = None,
    ) -> Tuple[List[UserRead], str]:
        users, pagination = self._repository.get_paginate(
            __session__,
            page=page,
            per_page=per_page,
            ids_in=ids_in,
            ids_not_in=ids_not_in,
            emails_iin=emails_iin,
            emails_in=emails_in,
            emails_not_iin=emails_not_iin,
            emails_not_in=emails_not_in,
            email_ilike=email_ilike,
            email_like=email_like,
            email_not_ilike=email_not_ilike,
            email_not_like=email_not_like,
            email_equal=email_equal,
            email_iequal=email_iequal,
            email_different=email_different,
            email_idifferent=email_idifferent,
            zip_codes_in=zip_codes_in,
            zip_codes_not_in=zip_codes_not_in,
            is_active_equal=is_active_equal,
            load_addresses=load_addresses,
            load_city=load_city,
            order_by=order_by,
            direction=direction,
        )

        return [UserRead.model_validate(item) for item in users], pagination

    def get_user_by_id(
        self,
        __session__: Session,
        /,
        id: int,
    ) -> Optional[UserRead]:
        user = self._repository.get_by_id(
            __session__,
            id=id,
        )

        if user is None:
            return None

        return UserRead.model_validate(user)

    def create_user(
        self,
        __session__: Session,
        /,
        data: UserCreate,
    ) -> UserRead:
        user = self._repository.create(
            __session__,
            data=data,
            flush=True,
        )

        return UserRead.model_validate(user)

    def create_users(
        self,
        __session__: Session,
        /,
        data: List[UserCreate],
    ) -> List[UserRead]:
        users = self._repository.create_all(
            __session__,
            data=data,
            flush=True,
        )

        return [UserRead.model_validate(user) for user in users]

    def patch_email(
        self,
        __session__: Session,
        /,
        id: int,
        email: str,
    ) -> UserRead:
        user = self._repository.patch_email(
            __session__,
            id=id,
            email=email,
            flush=True,
        )

        return UserRead.model_validate(user)

    def bulk_patch_email(
        self,
        __session__: Session,
        /,
        data: List[Tuple[int, str]],
    ) -> List[UserRead]:
        users = self._repository.bulk_patch_email(
            __session__,
            data=data,
            flush=True,
        )

        return [UserRead.model_validate(user) for user in users]

    def patch_disable(
        self,
        __session__: Session,
        /,
        ids: List[int],
    ) -> List[UserRead]:
        users = self._repository.patch_disable(
            __session__,
            ids=ids,
            flush=True,
        )

        return [UserRead.model_validate(user) for user in users]

    def delete_by_id(
        self,
        __session__: Session,
        /,
        id: int,
    ) -> bool:
        return self._repository.delete(
            __session__,
            id=id,
            flush=True,
        )

    def delete_by_ids(
        self,
        __session__: Session,
        /,
        ids: List[int],
    ) -> bool:
        return self._repository.delete_all(
            __session__,
            ids=ids,
            flush=True,
        )
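
Every method above takes a positional-only `__session__` argument, and the module imports `with_session`. One plausible reading is that a decorator opens a session and supplies it as that first argument. The sketch below illustrates this idea with a hypothetical `inject_session` decorator and a sqlite3 connection; it does not reproduce pysql-repo's `with_session`, whose exact behavior is not shown here.

```python
import functools
import sqlite3
from contextlib import contextmanager
from typing import Any, Callable, Iterator


@contextmanager
def session_factory() -> Iterator[sqlite3.Connection]:
    connection = sqlite3.connect(":memory:")
    try:
        yield connection
    finally:
        connection.close()


def inject_session(method: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical decorator: open a session and pass it as the first argument."""

    @functools.wraps(method)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        with self._session_factory() as session:
            return method(self, session, *args, **kwargs)

    return wrapper


class PingService:
    def __init__(self, factory: Callable[..., Any]) -> None:
        self._session_factory = factory

    @inject_session
    def ping(self, __session__: sqlite3.Connection, /, value: int) -> int:
        # The caller never passes __session__; the decorator supplies it.
        return __session__.execute("SELECT ? + 1", (value,)).fetchone()[0]


service = PingService(session_factory)
print(service.ping(value=41))
```

Making `__session__` positional-only keeps it out of the keyword namespace, so callers cannot supply or override it by name.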

To create an instance of AsyncDatabase and generate an instance of AsyncSession from the factory:

import logging

logging.basicConfig()
logger_db = logging.getLogger('demo')

database = AsyncDatabase(
    databases_config={
        "connection_string": "foo",
    },
    base=Base,
    logger=logger_db,
)

await database.create_database()

async with database.session_factory() as session:
    await session.execute(...)
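
The `await` and `async with` statements above are only valid inside a coroutine. The sketch below shows the same shape in a form that runs as a script, using `asyncio.run` and a stand-in session. `FakeAsyncSession` and this `session_factory` are hypothetical and only mimic the interface used above; they are not part of pysql-repo or SQLAlchemy.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List


class FakeAsyncSession:
    """Hypothetical stand-in that records what was done with the session."""

    def __init__(self) -> None:
        self.events: List[str] = []

    async def execute(self, statement: str) -> None:
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        self.events.append(f"execute:{statement}")

    async def close(self) -> None:
        self.events.append("close")


@asynccontextmanager
async def session_factory() -> AsyncIterator[FakeAsyncSession]:
    session = FakeAsyncSession()
    try:
        yield session
    finally:
        await session.close()  # always release the session


async def main() -> List[str]:
    async with session_factory() as session:
        await session.execute("select 1")
    return session.events


events = asyncio.run(main())
print(events)
```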

To create an async repository, inherit your class from AsyncRepository.

# MODULES
from typing import Callable, List, Optional, Sequence, Tuple

# SQLALCHEMY
from sqlalchemy.ext.asyncio import AsyncSession

# PYSQL_REPO
from pysql_repo import Operators
from pysql_repo.asyncio import AsyncRepository

# CONTEXTLIB
from contextlib import AbstractAsyncContextManager

# MODEL
from tests.repositories.user._base import UserRepositoryBase as _UserRepositoryBase
from tests.models.database.database import User
from tests.models.schemas.user import UserCreate


class AsyncUserRepository(AsyncRepository, _UserRepositoryBase):
    def __init__(
        self,
        session_factory: Callable[..., AbstractAsyncContextManager[AsyncSession]],
    ) -> None:
        super().__init__(session_factory)

    async def get_all(
        self,
        __session__: AsyncSession,
        /,
        ids_in: Optional[List[int]] = None,
        ids_not_in: Optional[List[int]] = None,
        emails_iin: Optional[List[str]] = None,
        emails_in: Optional[List[str]] = None,
        emails_not_iin: Optional[List[str]] = None,
        emails_not_in: Optional[List[str]] = None,
        email_ilike: Optional[str] = None,
        email_like: Optional[str] = None,
        email_not_ilike: Optional[str] = None,
        email_not_like: Optional[str] = None,
        email_equal: Optional[str] = None,
        email_iequal: Optional[str] = None,
        email_different: Optional[str] = None,
        email_idifferent: Optional[str] = None,
        zip_codes_in: Optional[List[int]] = None,
        zip_codes_not_in: Optional[List[int]] = None,
        is_active_equal: Optional[bool] = None,
        load_addresses: bool = True,
        load_city: bool = True,
        order_by: Optional[List[str]] = None,
        direction: Optional[List[str]] = None,
    ) -> Sequence[User]:
        users = await self._select_all(
            __session__,
            model=User,
            optional_filters=self.get_filters(
                ids_in=ids_in,
                ids_not_in=ids_not_in,
                emails_iin=emails_iin,
                emails_in=emails_in,
                emails_not_iin=emails_not_iin,
                emails_not_in=emails_not_in,
                email_ilike=email_ilike,
                email_like=email_like,
                email_not_ilike=email_not_ilike,
                email_not_like=email_not_like,
                email_equal=email_equal,
                email_iequal=email_iequal,
                email_different=email_different,
                email_idifferent=email_idifferent,
                zip_codes_in=zip_codes_in,
                zip_codes_not_in=zip_codes_not_in,
                is_active_equal=is_active_equal,
            ),
            relationship_options=self.get_relationship_options(
                load_addresses=load_addresses,
                load_city=load_city,
                zip_codes_in=zip_codes_in,
                zip_codes_not_in=zip_codes_not_in,
            ),
            order_by=order_by,
            direction=direction,
        )

        return users

    async def get_paginate(
        self,
        __session__: AsyncSession,
        /,
        page: int,
        per_page: int,
        ids_in: Optional[List[int]] = None,
        ids_not_in: Optional[List[int]] = None,
        emails_iin: Optional[List[str]] = None,
        emails_in: Optional[List[str]] = None,
        emails_not_iin: Optional[List[str]] = None,
        emails_not_in: Optional[List[str]] = None,
        email_ilike: Optional[str] = None,
        email_like: Optional[str] = None,
        email_not_ilike: Optional[str] = None,
        email_not_like: Optional[str] = None,
        email_equal: Optional[str] = None,
        email_iequal: Optional[str] = None,
        email_different: Optional[str] = None,
        email_idifferent: Optional[str] = None,
        zip_codes_in: Optional[List[int]] = None,
        zip_codes_not_in: Optional[List[int]] = None,
        is_active_equal: Optional[bool] = None,
        load_addresses: bool = True,
        load_city: bool = True,
        order_by: Optional[List[str]] = None,
        direction: Optional[List[str]] = None,
    ) -> Tuple[Sequence[User], str]:
        users, pagination = await self._select_paginate(
            __session__,
            model=User,
            optional_filters=self.get_filters(
                ids_in=ids_in,
                ids_not_in=ids_not_in,
                emails_iin=emails_iin,
                emails_in=emails_in,
                emails_not_iin=emails_not_iin,
                emails_not_in=emails_not_in,
                email_ilike=email_ilike,
                email_like=email_like,
                email_not_ilike=email_not_ilike,
                email_not_like=email_not_like,
                email_equal=email_equal,
                email_iequal=email_iequal,
                email_different=email_different,
                email_idifferent=email_idifferent,
                zip_codes_in=zip_codes_in,
                zip_codes_not_in=zip_codes_not_in,
                is_active_equal=is_active_equal,
            ),
            relationship_options=self.get_relationship_options(
                load_addresses=load_addresses,
                load_city=load_city,
                zip_codes_in=zip_codes_in,
                zip_codes_not_in=zip_codes_not_in,
            ),
            order_by=order_by,
            direction=direction,
            page=page,
            per_page=per_page,
        )

        return users, pagination

    async def get_by_id(
        self,
        __session__: AsyncSession,
        /,
        id: int,
    ) -> Optional[User]:
        user = await self._select(
            __session__,
            model=User,
            filters={
                User.id: {
                    Operators.EQUAL: id,
                },
            },
        )

        return user

    async def create(
        self,
        __session__: AsyncSession,
        /,
        data: UserCreate,
        flush: bool = False,
        commit: bool = True,
    ) -> User:
        user = await self._add(
            __session__,
            model=User,
            values={
                User.email.key: data.email,
                User.hashed_password.key: data.hashed_password,
                User.full_name.key: data.full_name,
            },
            flush=flush,
            commit=commit,
        )

        return user

    async def create_all(
        self,
        __session__: AsyncSession,
        /,
        data: List[UserCreate],
        flush: bool = False,
        commit: bool = True,
    ) -> Sequence[User]:
        users = await self._add_all(
            __session__,
            model=User,
            values=[
                {
                    User.email.key: item.email,
                    User.hashed_password.key: item.hashed_password,
                    User.full_name.key: item.full_name,
                }
                for item in data
            ],
            flush=flush,
            commit=commit,
        )

        return users

    async def patch_email(
        self,
        __session__: AsyncSession,
        /,
        id: int,
        email: str,
        flush: bool = False,
        commit: bool = True,
    ) -> Optional[User]:
        user = await self._update(
            __session__,
            model=User,
            values={
                User.email.key: email,
            },
            filters={
                User.id: {
                    Operators.EQUAL: id,
                },
            },
            flush=flush,
            commit=commit,
        )

        return user

    async def bulk_patch_email(
        self,
        __session__: AsyncSession,
        /,
        data: List[Tuple[int, str]],
        flush: bool = False,
        commit: bool = True,
    ) -> Sequence[User]:
        await self._bulk_update(
            __session__,
            model=User,
            values=[
                {
                    User.id.key: id,
                    User.email.key: email,
                }
                for id, email in data
            ],
            flush=flush,
            commit=commit,
        )

        return await self._select_all(
            __session__,
            model=User,
            filters={
                User.id: {
                    Operators.IN: [id for id, _ in data],
                },
            },
        )

    async def patch_disable(
        self,
        __session__: AsyncSession,
        /,
        ids: List[int],
        flush: bool = False,
        commit: bool = True,
    ) -> Sequence[User]:
        users = await self._update_all(
            __session__,
            model=User,
            values={
                User.is_active.key: False,
            },
            filters={
                User.id: {
                    Operators.IN: ids,
                },
            },
            flush=flush,
            commit=commit,
        )

        return users

    async def delete(
        self,
        __session__: AsyncSession,
        /,
        id: int,
        flush: bool = False,
        commit: bool = True,
    ) -> bool:
        is_deleted = await self._delete(
            __session__,
            model=User,
            filters={
                User.id: {
                    Operators.EQUAL: id,
                },
            },
            flush=flush,
            commit=commit,
        )

        return is_deleted

    async def delete_all(
        self,
        __session__: AsyncSession,
        /,
        ids: List[int],
        flush: bool = False,
        commit: bool = True,
    ) -> bool:
        is_deleted = await self._delete_all(
            __session__,
            model=User,
            filters={
                User.id: {
                    Operators.IN: ids,
                },
            },
            flush=flush,
            commit=commit,
        )

        return is_deleted
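
The repository methods above all describe conditions as a nested dictionary: column, then operator, then value. As a rough illustration of that shape only, the sketch below evaluates such a dictionary against plain Python dicts. It is not how pysql_repo builds SQL: the Operators enum here is a hypothetical stand-in with just four members, string keys replace the mapped columns, and it assumes that all conditions are combined with AND.

```python
from enum import Enum
from typing import Any, Callable, Dict, List


class Operators(Enum):
    """Hypothetical stand-in for a few pysql_repo operators."""

    EQUAL = "equal"
    IN = "in"
    NOT_IN = "not_in"
    ILIKE = "ilike"


def _ilike(value: Any, pattern: str) -> bool:
    # Simplification: only the "%text%" form is handled, case-insensitively.
    return pattern.strip("%").lower() in str(value).lower()


_PREDICATES: Dict[Operators, Callable[[Any, Any], bool]] = {
    Operators.EQUAL: lambda value, expected: value == expected,
    Operators.IN: lambda value, expected: value in expected,
    Operators.NOT_IN: lambda value, expected: value not in expected,
    Operators.ILIKE: _ilike,
}

Filters = Dict[str, Dict[Operators, Any]]


def matches(row: Dict[str, Any], filters: Filters) -> bool:
    """Return True when the row satisfies every operator on every column."""
    return all(
        _PREDICATES[operator](row[column], expected)
        for column, conditions in filters.items()
        for operator, expected in conditions.items()
    )


def select_all(rows: List[Dict[str, Any]], filters: Filters) -> List[Dict[str, Any]]:
    """Keep the rows that match the filters (assumed AND semantics)."""
    return [row for row in rows if matches(row, filters)]


USERS = [
    {"id": 1, "email": "Alice@example.com", "is_active": True},
    {"id": 2, "email": "bob@example.com", "is_active": False},
    {"id": 3, "email": "carol@test.org", "is_active": True},
]

active_example_users = select_all(
    USERS,
    {
        "email": {Operators.ILIKE: "%EXAMPLE%"},
        "is_active": {Operators.EQUAL: True},
    },
)
print([user["id"] for user in active_example_users])
```

Keeping the conditions as data rather than as chained method calls is what lets a method such as get_all accept many optional arguments and forward only the ones that were set.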

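To create an async service, inherit your class from AsyncService and pass the repository to its constructor. The service methods then delegate to self._repository and convert the returned models into read schemas.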

# MODULES
from logging import Logger
from typing import List, Optional, Tuple

# SQLALCHEMY
from sqlalchemy.ext.asyncio import AsyncSession

# PYSQL_REPO
from pysql_repo.asyncio import AsyncService, with_async_session

# REPOSITORIES
from tests.repositories.user.async_user_repository import AsyncUserRepository

# MODELS
from tests.models.schemas.user import UserCreate, UserRead


class AsyncUserService(AsyncService[AsyncUserRepository]):
    def __init__(
        self,
        user_repository: AsyncUserRepository,
        logger: Logger,
    ) -> None:
        super().__init__(
            repository=user_repository,
        )
        self._logger = logger

    async def get_users(
        self,
        __session__: AsyncSession,
        /,
        ids_in: Optional[List[int]] = None,
        ids_not_in: Optional[List[int]] = None,
        emails_iin: Optional[List[str]] = None,
        emails_in: Optional[List[str]] = None,
        emails_not_iin: Optional[List[str]] = None,
        emails_not_in: Optional[List[str]] = None,
        email_ilike: Optional[str] = None,
        email_like: Optional[str] = None,
        email_not_ilike: Optional[str] = None,
        email_not_like: Optional[str] = None,
        email_equal: Optional[str] = None,
        email_iequal: Optional[str] = None,
        email_different: Optional[str] = None,
        email_idifferent: Optional[str] = None,
        zip_codes_in: Optional[List[int]] = None,
        zip_codes_not_in: Optional[List[int]] = None,
        is_active_equal: Optional[bool] = None,
        load_addresses: bool = True,
        load_city: bool = True,
        order_by: Optional[List[str]] = None,
        direction: Optional[List[str]] = None,
    ) -> List[UserRead]:
        users = await self._repository.get_all(
            __session__,
            ids_in=ids_in,
            ids_not_in=ids_not_in,
            emails_iin=emails_iin,
            emails_in=emails_in,
            emails_not_iin=emails_not_iin,
            emails_not_in=emails_not_in,
            email_ilike=email_ilike,
            email_like=email_like,
            email_not_ilike=email_not_ilike,
            email_not_like=email_not_like,
            email_equal=email_equal,
            email_iequal=email_iequal,
            email_different=email_different,
            email_idifferent=email_idifferent,
            zip_codes_in=zip_codes_in,
            zip_codes_not_in=zip_codes_not_in,
            is_active_equal=is_active_equal,
            load_addresses=load_addresses,
            load_city=load_city,
            order_by=order_by,
            direction=direction,
        )

        return [UserRead.model_validate(item) for item in users]

    async def get_users_paginate(
        self,
        __session__: AsyncSession,
        /,
        page: int,
        per_page: int,
        ids_in: Optional[List[int]] = None,
        ids_not_in: Optional[List[int]] = None,
        emails_iin: Optional[List[str]] = None,
        emails_in: Optional[List[str]] = None,
        emails_not_iin: Optional[List[str]] = None,
        emails_not_in: Optional[List[str]] = None,
        email_ilike: Optional[str] = None,
        email_like: Optional[str] = None,
        email_not_ilike: Optional[str] = None,
        email_not_like: Optional[str] = None,
        email_equal: Optional[str] = None,
        email_iequal: Optional[str] = None,
        email_different: Optional[str] = None,
        email_idifferent: Optional[str] = None,
        zip_codes_in: Optional[List[int]] = None,
        zip_codes_not_in: Optional[List[int]] = None,
        is_active_equal: Optional[bool] = None,
        load_addresses: bool = True,
        load_city: bool = True,
        order_by: Optional[List[str]] = None,
        direction: Optional[List[str]] = None,
    ) -> Tuple[List[UserRead], str]:
        users, pagination = await self._repository.get_paginate(
            __session__,
            page=page,
            per_page=per_page,
            ids_in=ids_in,
            ids_not_in=ids_not_in,
            emails_iin=emails_iin,
            emails_in=emails_in,
            emails_not_iin=emails_not_iin,
            emails_not_in=emails_not_in,
            email_ilike=email_ilike,
            email_like=email_like,
            email_not_ilike=email_not_ilike,
            email_not_like=email_not_like,
            email_equal=email_equal,
            email_iequal=email_iequal,
            email_different=email_different,
            email_idifferent=email_idifferent,
            zip_codes_in=zip_codes_in,
            zip_codes_not_in=zip_codes_not_in,
            is_active_equal=is_active_equal,
            load_addresses=load_addresses,
            load_city=load_city,
            order_by=order_by,
            direction=direction,
        )

        return [UserRead.model_validate(item) for item in users], pagination

    async def get_user_by_id(
        self,
        __session__: AsyncSession,
        /,
        id: int,
    ) -> Optional[UserRead]:
        user = await self._repository.get_by_id(
            __session__,
            id=id,
        )

        if user is None:
            return None

        return UserRead.model_validate(user)

    async def create_user(
        self,
        __session__: AsyncSession,
        /,
        data: UserCreate,
    ) -> UserRead:
        user = await self._repository.create(
            __session__,
            data=data,
            flush=True,
        )

        return UserRead.model_validate(user)

    async def create_users(
        self,
        __session__: AsyncSession,
        /,
        data: List[UserCreate],
    ) -> List[UserRead]:
        users = await self._repository.create_all(
            __session__,
            data=data,
            flush=True,
        )

        return [UserRead.model_validate(user) for user in users]

    async def patch_email(
        self,
        __session__: AsyncSession,
        /,
        id: int,
        email: str,
    ) -> Optional[UserRead]:
        user = await self._repository.patch_email(
            __session__,
            id=id,
            email=email,
            flush=True,
        )

        # The repository is typed to return Optional[User], so handle None.
        if user is None:
            return None

        return UserRead.model_validate(user)

    async def bulk_patch_email(
        self,
        __session__: AsyncSession,
        /,
        data: List[Tuple[int, str]],
    ) -> List[UserRead]:
        users = await self._repository.bulk_patch_email(
            __session__,
            data=data,
            flush=True,
        )

        return [UserRead.model_validate(user) for user in users]

    async def patch_disable(
        self,
        __session__: AsyncSession,
        /,
        ids: List[int],
    ) -> List[UserRead]:
        users = await self._repository.patch_disable(
            __session__,
            ids=ids,
            flush=True,
        )

        return [UserRead.model_validate(user) for user in users]

    async def delete_by_id(
        self,
        __session__: AsyncSession,
        /,
        id: int,
    ) -> bool:
        return await self._repository.delete(
            __session__,
            id=id,
            flush=True,
        )

    async def delete_by_ids(
        self,
        __session__: AsyncSession,
        /,
        ids: List[int],
    ) -> bool:
        return await self._repository.delete_all(
            __session__,
            ids=ids,
            flush=True,
        )
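
The service above stays thin: it forwards the session and arguments to the repository, then converts each model into a read schema. A minimal, library-free sketch of that layering follows. Everything in it is a stand-in and an assumption: FakeSession replaces AsyncSession, InMemoryUserRepository replaces AsyncUserRepository, and dataclasses with a hypothetical from_model helper replace the Pydantic schemas and model_validate.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class User:
    """Hypothetical stand-in for the ORM model."""

    id: int
    email: str
    hashed_password: str


@dataclass
class UserRead:
    """Hypothetical read schema: only the fields the service may expose."""

    id: int
    email: str

    @classmethod
    def from_model(cls, user: User) -> "UserRead":
        # Plays the role of UserRead.model_validate(user) in the real service.
        return cls(id=user.id, email=user.email)


class FakeSession:
    """Placeholder for AsyncSession; the in-memory repository ignores it."""


class InMemoryUserRepository:
    def __init__(self, users: List[User]) -> None:
        self._users: Dict[int, User] = {user.id: user for user in users}

    async def get_by_id(self, __session__: FakeSession, /, id: int) -> Optional[User]:
        return self._users.get(id)

    async def patch_email(
        self, __session__: FakeSession, /, id: int, email: str
    ) -> Optional[User]:
        user = self._users.get(id)
        if user is not None:
            user.email = email
        return user


class UserService:
    def __init__(self, repository: InMemoryUserRepository) -> None:
        self._repository = repository

    async def get_user_by_id(
        self, __session__: FakeSession, /, id: int
    ) -> Optional[UserRead]:
        user = await self._repository.get_by_id(__session__, id=id)
        return None if user is None else UserRead.from_model(user)

    async def patch_email(
        self, __session__: FakeSession, /, id: int, email: str
    ) -> Optional[UserRead]:
        user = await self._repository.patch_email(__session__, id=id, email=email)
        return None if user is None else UserRead.from_model(user)


async def main() -> List[Optional[UserRead]]:
    repository = InMemoryUserRepository([User(1, "old@example.com", "secret-hash")])
    service = UserService(repository)
    session = FakeSession()

    patched = await service.patch_email(session, id=1, email="new@example.com")
    missing = await service.get_user_by_id(session, id=99)
    return [patched, missing]


results = asyncio.run(main())
print(results)
```

Because the session is the first, positional-only argument, the caller decides which session (and therefore which transaction) a call runs in, while the service never exposes fields such as hashed_password that are absent from the read schema.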

Download files

Source Distribution

pysql-repo-0.7.4.0.tar.gz (28.2 kB)

Built Distribution

pysql_repo-0.7.4.0-py3-none-any.whl (27.1 kB)
