
Ash DAL

The library provides a standardized way to connect to a database, along with a base DAO class implementation.

Installation

PyPI

pip install ash-dal

# OR

poetry add ash-dal

From GitHub

To install Ash DAL directly from the GitHub repository, run:

pip install git+https://github.com/meetash/ash-dal.git@main

# OR

poetry add git+https://github.com/meetash/ash-dal.git@main

Usage

Database class

There are two options: a synchronous or an asynchronous database connection.

Synchronous database

from sqlalchemy import select

from ash_dal import Database, URL
from ash_dal.utils import prepare_ssl_context
from models import User


ssl_context = prepare_ssl_context(
    ssl_root_dir='/tmp/certs',
    client_cert_path='client-cert.pem',
    client_key_path='client-key.pem',
    server_ca_path='server-ca.pem'
)

db_url = URL.create(
    drivername="mysql+pymysql",
    username="my_db_user",
    password="S3cret",
    host="127.0.0.1",
    port=3306,
    database="my_db",
)

read_replica_db_url = URL.create(
    drivername="mysql+pymysql",
    username="my_db_user",
    password="S3cret",
    host="127.0.0.1",
    port=3307,
    database="my_db",
)


DATABASE = Database(
    db_url=db_url,
    read_replica_url=read_replica_db_url,
    ssl_context=ssl_context,
    read_replica_ssl_context=ssl_context,
)


def get_users(db: Database):
    with db.session as session:
        # Materialize the results before the session is closed
        users = session.scalars(select(User)).all()
    return users

Asynchronous database

from sqlalchemy import select

from ash_dal import AsyncDatabase, URL
from ash_dal.utils import prepare_ssl_context
from models import User


ssl_context = prepare_ssl_context(
    ssl_root_dir='/tmp/certs',
    client_cert_path='client-cert.pem',
    client_key_path='client-key.pem',
    server_ca_path='server-ca.pem'
)

db_url = URL.create(
    drivername="mysql+aiomysql",
    username="my_db_user",
    password="S3cret",
    host="127.0.0.1",
    port=3306,
    database="my_db",
)

read_replica_db_url = URL.create(
    drivername="mysql+aiomysql",
    username="my_db_user",
    password="S3cret",
    host="127.0.0.1",
    port=3307,
    database="my_db",
)


DATABASE = AsyncDatabase(
    db_url=db_url,
    read_replica_url=read_replica_db_url,
    ssl_context=ssl_context,
    read_replica_ssl_context=ssl_context,
)


async def async_get_users(db: AsyncDatabase):
    async with db.session as session:
        # Materialize the results before the session is closed
        result = await session.scalars(select(User))
        users = result.all()
    return users

DAO Base class

As with the sync/async Database classes, the DAO base class comes in two variations.

Synchronous DAO Base class

from dataclasses import dataclass

from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from ash_dal import BaseDAO, Database, URL


class Base(DeclarativeBase):
    pass


class ExampleORMModel(Base):
    __tablename__ = "example_table"

    id: Mapped[int] = mapped_column(autoincrement=True, primary_key=True)
    first_name: Mapped[str] = mapped_column(String(64))
    last_name: Mapped[str] = mapped_column(String(64))
    age: Mapped[int]


@dataclass()
class ExampleEntity:
    id: int
    first_name: str
    last_name: str
    age: int


class ExampleDAO(BaseDAO[ExampleEntity]):
    __entity__ = ExampleEntity
    __model__ = ExampleORMModel


if __name__ == '__main__':
    db = Database(
        db_url=URL.create(
            drivername="mysql+pymysql",
            username="my_db_user",
            password="S3cret",
            host="127.0.0.1",
            port=3306,
            database="my_db",
        )
    )
    dao = ExampleDAO(database=db)

    entity = dao.get_by_pk(pk=1)

Asynchronous DAO Base class

from dataclasses import dataclass

from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from ash_dal import AsyncBaseDAO, AsyncDatabase, URL
import asyncio


class Base(DeclarativeBase):
    pass


class ExampleORMModel(Base):
    __tablename__ = "example_table"

    id: Mapped[int] = mapped_column(autoincrement=True, primary_key=True)
    first_name: Mapped[str] = mapped_column(String(64))
    last_name: Mapped[str] = mapped_column(String(64))
    age: Mapped[int]


@dataclass()
class ExampleEntity:
    id: int
    first_name: str
    last_name: str
    age: int


class ExampleDAO(AsyncBaseDAO[ExampleEntity]):
    __entity__ = ExampleEntity
    __model__ = ExampleORMModel


if __name__ == '__main__':
    db = AsyncDatabase(
        db_url=URL.create(
            drivername="mysql+aiomysql",
            username="my_db_user",
            password="S3cret",
            host="127.0.0.1",
            port=3306,
            database="my_db",
        )
    )
    dao = ExampleDAO(database=db)
    entity = asyncio.run(dao.get_by_pk(pk=1))

BaseDAO methods

The main goal of BaseDAO class is to provide CRUD methods that could be useful while building a basic CRUD API. BaseDAO class provides the following default methods:

Data fetching methods

  • BaseDAO.get_by_pk(pk) - Using this method you can fetch an entity by its primary key.
    entity = dao.get_by_pk(pk='AWABCD1234')
    
  • BaseDAO.get_all() - Using this method you can fetch all entities from the database. It might be useful for fetching data from small tables where you don't actually need pagination (configs, etc.)
    entities = dao.get_all()
    
  • BaseDAO.get_page(page_index, [page_size]) - Fetch a page of entities by page index. If the index is out of range, an empty page is returned. If page_size is not passed, the default page size (20) is applied.
    page = dao.get_page(page_index=2, page_size=10)
    for entity in page:
        # Do some stuff with entity
        ...
    
  • BaseDAO.paginate([specification, page_size]) - An iterator that yields pages of entities. A specification can be applied to fetch filtered data.
    for page in dao.paginate(specification={'status': 'notified'}, page_size=15):
        # Do some stuff with page
        ...
    
  • BaseDAO.filter(specification) - Fetch entities from the database by specification. It might be useful for fetching filtered data from small tables where you don't actually need pagination (configs, etc.)
    entities = dao.filter(specification={'labId': 2})
    
  • BaseDAO.stream([specification, batch_size]) - Stream entities from database one by one. Useful when processing large datasets without loading all records into memory.
    for entity in dao.stream(specification={'status': 'notified'}, batch_size=100):
        # Do some stuff with entity
        ...
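In the methods above, a specification is a plain dict of column/value pairs applied as equality filters. The idea can be sketched with the standard library's sqlite3 (an illustration only, not the library's internals; the table and data are made up):

```python
import sqlite3


def filter_by_spec(conn, table, specification):
    # Build a parameterized WHERE clause from a column -> value dict
    clauses = " AND ".join(f"{column} = ?" for column in specification)
    query = f"SELECT * FROM {table} WHERE {clauses}"
    return conn.execute(query, tuple(specification.values())).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE example_table (id INTEGER, status TEXT)")
conn.executemany(
    "INSERT INTO example_table VALUES (?, ?)",
    [(1, "notified"), (2, "pending"), (3, "notified")],
)

rows = filter_by_spec(conn, "example_table", {"status": "notified"})
```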
    

Data manipulation methods

  • BaseDAO.create(data) - Create an entity in the database based on the passed data. Returns the created entity.
    data = {'foo': 'bar'}
    entity = dao.create(data=data)
    
  • BaseDAO.bulk_create(data) - Create multiple entities within one query. Unlike create, this method doesn't return anything.
    data = [{'foo': 'bar'}, {'foo': 'beer'}]
    dao.bulk_create(data=data)
    
  • BaseDAO.update(specification, update_data) - Update the entities that match the specification.
    update_data = {'foo': 'bar'}
    is_updated = dao.update(specification={'foo': 'beer'}, update_data=update_data)
    
  • BaseDAO.delete(specification) - Remove the entities that match the specification.
    is_removed = dao.delete(specification={'id': 'some-id'})
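In SQL terms, bulk_create maps onto a single multi-row INSERT, and update reports whether any rows matched the specification. A stdlib sqlite3 sketch of those semantics (not the library's internals; the table and column names are made up):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE example_table (id INTEGER, foo TEXT)")

# bulk_create: many rows in a single statement, nothing returned
data = [{"id": 1, "foo": "bar"}, {"id": 2, "foo": "beer"}]
conn.executemany("INSERT INTO example_table VALUES (:id, :foo)", data)

# update by specification: report whether any row was changed
cursor = conn.execute(
    "UPDATE example_table SET foo = ? WHERE foo = ?", ("bar", "beer")
)
is_updated = cursor.rowcount > 0
```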
    

Read-only replicas

This library automatically routes read queries (SELECT/WITH) to the slave (read replica) database and everything else to the master. This behavior assumes that sa.text() queries do not contain multiple semicolon-separated statements such as SELECT 1; TRUNCATE some_table;.

If you execute a write in a session, subsequent reads in that session switch to the master in order to maintain read-after-write consistency.

If you want to manually route a query, pass an execution hint:

from sqlalchemy import text

with DATABASE.session as session:
    result = session.execute(text("CALL some_read_only_procedure()").execution_options(use_slave=True))

You can also force a query to use master with the use_master hint.
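The routing rule can be summarized as a small decision function. This is a conceptual sketch, not the library's internal code; only the use_master/use_slave hint names come from the documentation above:

```python
def pick_engine(statement, hints=None, wrote_in_session=False):
    """Return which engine ("master" or "replica") a statement should run on."""
    hints = hints or {}
    # Explicit hints win over the automatic rule
    if hints.get("use_master"):
        return "master"
    if hints.get("use_slave"):
        return "replica"
    is_read = statement.lstrip().upper().startswith(("SELECT", "WITH"))
    # Read-after-write consistency: once the session has written,
    # subsequent reads stay on the master.
    if is_read and not wrote_in_session:
        return "replica"
    return "master"
```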

Pagination strategies

Three pagination strategies are offered out of the box:

  1. Offset/limit pagination is the default for normal queries in the BaseDAO class.
  2. Cursor pagination is the default for streaming queries in the BaseDAO class.
  3. Deferred join pagination is offered as a potential performance improvement over offset pagination when paginating deeply.
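The point of deferred join pagination is that the OFFSET scan runs over the primary key alone (a narrow, index-friendly subquery), and only the selected keys are joined back to fetch full rows. The two query shapes can be compared with the standard library's sqlite3 (an illustration only; the actual SQL the library emits may differ):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE example_table (id INTEGER PRIMARY KEY, payload TEXT)")
conn.executemany(
    "INSERT INTO example_table VALUES (?, ?)",
    [(i, f"row-{i}") for i in range(1, 101)],
)

page_size, page_index = 10, 5  # zero-based page index

# Plain offset/limit pagination: scans and discards full rows.
offset_page = conn.execute(
    "SELECT id, payload FROM example_table ORDER BY id LIMIT ? OFFSET ?",
    (page_size, page_index * page_size),
).fetchall()

# Deferred join: paginate over the PK only, then join back for the payload.
deferred_page = conn.execute(
    """
    SELECT t.id, t.payload
    FROM example_table AS t
    JOIN (SELECT id FROM example_table ORDER BY id LIMIT ? OFFSET ?) AS page
      ON t.id = page.id
    ORDER BY t.id
    """,
    (page_size, page_index * page_size),
).fetchall()
```

Both queries return the same page; the deferred join variant simply moves the expensive OFFSET work onto the primary-key index.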

If you want to change the pagination strategy for your DAO class, re-define the __paginator_factory__ field inside your DAO class:

from ash_dal import AsyncBaseDAO, BaseDAO, DeferredJoinPaginator
from ash_dal.utils import AsyncDeferredJoinPaginator, DeferredJoinPaginatorFactory

class ExampleEntity:
    ...

class ExampleORMModel:
    id: int
    ...

class ExampleDAO(BaseDAO[ExampleEntity]):
    __entity__ = ExampleEntity
    __model__ = ExampleORMModel
    __paginator_factory__ = DeferredJoinPaginatorFactory(
        paginator_class=DeferredJoinPaginator,
        pk_field=ExampleORMModel.id,
    )

# OR async

class ExampleAsyncDAO(AsyncBaseDAO[ExampleEntity]):
    __entity__ = ExampleEntity
    __model__ = ExampleORMModel
    __paginator_factory__ = DeferredJoinPaginatorFactory(
        paginator_class=AsyncDeferredJoinPaginator,
        pk_field=ExampleORMModel.id,
    )

Custom pagination strategy

You can also define your own pagination strategy. Be aware that your paginator class must implement the IPaginator or IAsyncPaginator interface:

from ash_dal.utils.paginator.interface import IPaginator, IAsyncPaginator
from ash_dal import PaginatorPage
import typing as t

class ExampleORM:
    ...

class MyPaginator(IPaginator[ExampleORM]):
    def get_page(self, page_index: int) -> PaginatorPage[ExampleORM]:
        # Do page fetching
        ...

    def paginate(self) -> t.Iterator[PaginatorPage[ExampleORM]]:
        # Do pagination
        ...

    @property
    def size(self) -> int:
        # Get pages count
        return 10

# or async paginator

class MyAsyncPaginator(IAsyncPaginator[ExampleORM]):
    async def get_page(self, page_index: int) -> PaginatorPage[ExampleORM]:
        # Do page fetching asynchronously
        ...

    async def paginate(self) -> t.AsyncIterator[PaginatorPage[ExampleORM]]:
        # Do pagination asynchronously
        ...

    @property
    async def size(self) -> int:
        # Get pages count asynchronously
        return 10
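To get a feel for the contract these interfaces describe, here is a self-contained offset-style paginator over an in-memory sequence (plain Python with no ash_dal imports; real paginators wrap ORM queries, and PaginatorPage is replaced here by a plain tuple):

```python
import math
import typing as t


class ListPaginator:
    """Offset-style paginator over an in-memory sequence (illustrative)."""

    def __init__(self, items: t.Sequence, page_size: int = 20):
        self._items = items
        self._page_size = page_size

    def get_page(self, page_index: int) -> tuple:
        # Out-of-range indexes naturally yield an empty page
        start = page_index * self._page_size
        return tuple(self._items[start:start + self._page_size])

    def paginate(self) -> t.Iterator[tuple]:
        for index in range(self.size):
            yield self.get_page(index)

    @property
    def size(self) -> int:
        # Total number of pages
        return math.ceil(len(self._items) / self._page_size)


paginator = ListPaginator(range(45), page_size=10)
```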
