Ash DAL
The library provides a standardized way to connect to a database, along with a base DAO class implementation.
Installation
PyPI
pip install ash-dal
# OR
poetry add ash-dal
From GitHub
To install Ash DAL directly from the GitHub repository, run:
pip install git+https://github.com/meetash/ash-dal.git@main
# OR
poetry add git+https://github.com/meetash/ash-dal.git@main
Usage
Database class
Two options are available: a synchronous or an asynchronous database connection.
Synchronous database
from sqlalchemy import select

from ash_dal import Database, URL
from ash_dal.utils import prepare_ssl_context
from models import User

# SSL context built from the client certificate, client key and server CA files.
ssl_context = prepare_ssl_context(
    ssl_root_dir='/tmp/certs',
    client_cert_path='client-cert.pem',
    client_key_path='client-key.pem',
    server_ca_path='server-ca.pem'
)

# Connection URL of the primary database.
db_url = URL.create(
    drivername="mysql+pymysql",
    username="my_db_user",
    password="S3cret",
    host="127.0.0.1",
    port=3306,
    database="my_db",
)

# Connection URL of the read replica (same host, different port in this example).
read_replica_db_url = URL.create(
    drivername="mysql+pymysql",
    username="my_db_user",
    password="S3cret",
    host="127.0.0.1",
    port=3307,
    database="my_db",
)

DATABASE = Database(
    db_url=db_url,
    read_replica_url=read_replica_db_url,
    ssl_context=ssl_context,
    read_replica_ssl_context=ssl_context
)


def get_users(db: Database):
    with db.session as session:
        # scalars() expects a SELECT statement; all() loads the rows
        # before the session is closed.
        users = session.scalars(select(User)).all()
    return users
Asynchronous database
from sqlalchemy import select

from ash_dal import AsyncDatabase, URL
from ash_dal.utils import prepare_ssl_context
from models import User

# SSL context built from the client certificate, client key and server CA files.
ssl_context = prepare_ssl_context(
    ssl_root_dir='/tmp/certs',
    client_cert_path='client-cert.pem',
    client_key_path='client-key.pem',
    server_ca_path='server-ca.pem'
)

# Connection URL of the primary database (note the async driver).
db_url = URL.create(
    drivername="mysql+aiomysql",
    username="my_db_user",
    password="S3cret",
    host="127.0.0.1",
    port=3306,
    database="my_db",
)

# Connection URL of the read replica (same host, different port in this example).
read_replica_db_url = URL.create(
    drivername="mysql+aiomysql",
    username="my_db_user",
    password="S3cret",
    host="127.0.0.1",
    port=3307,
    database="my_db",
)

DATABASE = AsyncDatabase(
    db_url=db_url,
    read_replica_url=read_replica_db_url,
    ssl_context=ssl_context,
    read_replica_ssl_context=ssl_context
)


async def async_get_users(db: AsyncDatabase):
    async with db.session as session:
        # scalars() expects a SELECT statement; all() loads the rows
        # before the session is closed.
        result = await session.scalars(select(User))
        users = result.all()
    return users
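The two examples above differ mainly in the driver name passed to URL.create. As a rough illustration of what those keyword arguments describe, the standard-library sketch below assembles the conventional `drivername://username:password@host:port/database` connection string. `build_db_url` is a hypothetical helper written for this example and is not part of ash_dal; it also assumes that the library's URL follows the usual SQLAlchemy URL layout.

```python
from urllib.parse import quote


def build_db_url(drivername: str, username: str, password: str,
                 host: str, port: int, database: str) -> str:
    """Hypothetical helper: compose a connection string from its parts."""
    # Percent-encode the credentials so characters such as "@" or "/"
    # cannot be confused with the URL's own separators.
    user = quote(username, safe="")
    secret = quote(password, safe="")
    return f"{drivername}://{user}:{secret}@{host}:{port}/{database}"


# Same credentials, different drivers: pymysql for sync, aiomysql for async.
sync_url = build_db_url("mysql+pymysql", "my_db_user", "S3cret", "127.0.0.1", 3306, "my_db")
async_url = build_db_url("mysql+aiomysql", "my_db_user", "S3cret", "127.0.0.1", 3306, "my_db")

print(sync_url)
print(async_url)
```

In real code, prefer URL.create as shown above, since it takes care of escaping for you.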
DAO Base class
As with the Database classes, the DAO base class comes in synchronous and asynchronous variants.
Synchronous DAO Base class
from dataclasses import dataclass

from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

from ash_dal import BaseDAO, Database, URL


class Base(DeclarativeBase):
    pass


# ORM model that maps to the database table.
class ExampleORMModel(Base):
    __tablename__ = "example_table"

    id: Mapped[int] = mapped_column(autoincrement=True, primary_key=True)
    first_name: Mapped[str] = mapped_column(String(64))
    last_name: Mapped[str] = mapped_column(String(64))
    age: Mapped[int]


# Plain entity returned by the DAO.
@dataclass()
class ExampleEntity:
    id: int
    first_name: str
    last_name: str
    age: int


class ExampleDAO(BaseDAO[ExampleEntity]):
    __entity__ = ExampleEntity
    __model__ = ExampleORMModel


if __name__ == '__main__':
    db = Database(
        db_url=URL.create(
            drivername="mysql+pymysql",
            username="my_db_user",
            password="S3cret",
            host="127.0.0.1",
            port=3306,
            database="my_db",
        )
    )
    dao = ExampleDAO(database=db)
    entity = dao.get_by_pk(pk='some-primary-key')
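To make the split between the ORM model and the entity more concrete, here is a minimal sketch of the DAO idea using only the standard library. It is not the library's BaseDAO: `SketchDAO` is a hypothetical class, sqlite3 stands in for MySQL, and returning None for a missing key is an assumption made for this example.

```python
import sqlite3
from dataclasses import dataclass
from typing import Optional


@dataclass
class ExampleEntity:
    id: int
    first_name: str
    last_name: str
    age: int


class SketchDAO:
    """Hypothetical stand-in for a DAO; not the library's BaseDAO."""

    def __init__(self, connection: sqlite3.Connection) -> None:
        self._connection = connection

    def get_by_pk(self, pk: int) -> Optional[ExampleEntity]:
        row = self._connection.execute(
            "SELECT id, first_name, last_name, age FROM example_table WHERE id = ?",
            (pk,),
        ).fetchone()
        # Convert the raw row into a plain entity so callers never touch
        # database-specific objects; None here is an assumption of this sketch.
        return ExampleEntity(*row) if row is not None else None


connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE example_table ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "first_name TEXT, last_name TEXT, age INTEGER)"
)
connection.execute(
    "INSERT INTO example_table (first_name, last_name, age) VALUES (?, ?, ?)",
    ("Ada", "Lovelace", 36),
)

dao = SketchDAO(connection)
print(dao.get_by_pk(1))
print(dao.get_by_pk(999))
```

The caller only ever sees ExampleEntity instances, which is the same separation that `__entity__` and `__model__` express in the example above.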
Asynchronous DAO Base class
import asyncio
from dataclasses import dataclass

from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

from ash_dal import AsyncBaseDAO, AsyncDatabase, URL


class Base(DeclarativeBase):
    pass


# ORM model that maps to the database table.
class ExampleORMModel(Base):
    __tablename__ = "example_table"

    id: Mapped[int] = mapped_column(autoincrement=True, primary_key=True)
    first_name: Mapped[str] = mapped_column(String(64))
    last_name: Mapped[str] = mapped_column(String(64))
    age: Mapped[int]


# Plain entity returned by the DAO.
@dataclass()
class ExampleEntity:
    id: int
    first_name: str
    last_name: str
    age: int


class ExampleDAO(AsyncBaseDAO[ExampleEntity]):
    __entity__ = ExampleEntity
    __model__ = ExampleORMModel


if __name__ == '__main__':
    db = AsyncDatabase(
        db_url=URL.create(
            drivername="mysql+aiomysql",
            username="my_db_user",
            password="S3cret",
            host="127.0.0.1",
            port=3306,
            database="my_db",
        )
    )
    dao = ExampleDAO(database=db)
    entity = asyncio.run(dao.get_by_pk(pk='some-primary-key'))
BaseDAO methods
The main goal of the BaseDAO class is to provide the CRUD methods commonly needed when building a basic CRUD API.
The BaseDAO class provides the following default methods:
Data fetching methods
- BaseDAO.get_by_pk(pk) - Fetch an entity by its primary key.
    entity = dao.get_by_pk(pk='AWABCD1234')
- BaseDAO.all - Fetch all entities from the database. This might be useful for small tables where you don't actually need pagination (configs, etc.).
    entities = dao.get_all()
- BaseDAO.get_page(page_index, [page_size]) - Fetch a page with entities by page index. If the index is out of range, an empty page will be returned. If page_size is not passed, the default page size (20) will be applied.
    page = dao.get_page(page_index=2, page_size=10)
    for entity in page:
        # Do some stuff with entity
        ...
- BaseDAO.paginate([specification, page_size]) - An iterator that returns pages with entities. A specification can be applied to fetch filtered data.
    for page in dao.paginate(specification={'status': 'notified'}, page_size=15):
        # Do some stuff with page
        ...
- BaseDAO.filter(specification) - Fetch entities from the database by specification. This might be useful for fetching filtered data from small tables where you don't actually need pagination (configs, etc.).
    entities = dao.filter(specification={'labId': 2})
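The paging behaviour described above (a default page size of 20, an empty page for an out-of-range index, and optional filtering by specification) can be sketched over a plain in-memory list. The functions below are hypothetical and are not the library's code. They assume a zero-based page index and that a specification matches when every key equals the corresponding value; neither is confirmed by the descriptions above.

```python
from typing import Any, Iterator, List, Mapping, Optional, Sequence

DEFAULT_PAGE_SIZE = 20  # the default page size stated above

Row = Mapping[str, Any]


def matches(row: Row, specification: Optional[Mapping[str, Any]]) -> bool:
    # Assumption: every key in the specification must equal the row's value.
    return all(row.get(key) == value for key, value in (specification or {}).items())


def get_page(rows: Sequence[Row], page_index: int,
             page_size: int = DEFAULT_PAGE_SIZE) -> List[Row]:
    # Assumption: page_index is zero-based. Out-of-range indexes give an empty page.
    if page_index < 0:
        return []
    start = page_index * page_size
    return list(rows[start:start + page_size])


def paginate(rows: Sequence[Row],
             specification: Optional[Mapping[str, Any]] = None,
             page_size: int = DEFAULT_PAGE_SIZE) -> Iterator[List[Row]]:
    # Filter first, then yield the filtered rows one page at a time.
    filtered = [row for row in rows if matches(row, specification)]
    for start in range(0, len(filtered), page_size):
        yield filtered[start:start + page_size]


rows = [{"id": i, "status": "notified" if i % 2 == 0 else "pending"} for i in range(50)]

print([row["id"] for row in get_page(rows, page_index=2, page_size=10)])
print(get_page(rows, page_index=99))
print([len(page) for page in paginate(rows, {"status": "notified"}, page_size=15)])
```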
Data manipulation methods
- BaseDAO.create(data) - Create an entity in the database based on the passed data. Returns the created entity.
    data = {'foo': 'bar'}
    entity = dao.create(data=data)
- BaseDAO.bulk_create(data) - Create multiple entities within one query. Unlike the previous method, this one doesn't return anything.
    data = [{'foo': 'bar'}, {'foo': 'beer'}]
    dao.bulk_create(data=data)
- BaseDAO.update(specification, update_data) - Patch one or more entities by specification.
    update_data = {'foo': 'bar'}
    is_updated = dao.update(specification={'foo': 'beer'}, update_data=update_data)
- BaseDAO.delete(specification) - Remove one or more entities by specification.
    is_removed = dao.delete(specification={'id': 'some-id'})
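One way to picture how a dictionary specification could drive an update or a delete is to turn it into a parameterized WHERE clause. The sketch below does this with sqlite3 from the standard library. It is an illustration under assumptions, not the library's implementation: `where_clause`, `update`, `delete`, the `example` table and the column whitelist are all hypothetical, and the equality-only, AND-combined matching is assumed.

```python
import sqlite3
from typing import Any, Iterable, List, Mapping, Tuple

ALLOWED_COLUMNS = {"id", "foo"}  # hypothetical whitelist of column names


def check_columns(names: Iterable[str]) -> None:
    # Column names cannot be bound as SQL parameters, so validate them explicitly.
    unknown = set(names) - ALLOWED_COLUMNS
    if unknown:
        raise ValueError(f"Unknown columns: {sorted(unknown)}")


def where_clause(specification: Mapping[str, Any]) -> Tuple[str, List[Any]]:
    if not specification:
        raise ValueError("An empty specification would match every row")
    check_columns(specification)
    clause = " AND ".join(f"{column} = ?" for column in specification)
    return clause, list(specification.values())


def update(conn: sqlite3.Connection, specification: Mapping[str, Any],
           update_data: Mapping[str, Any]) -> bool:
    check_columns(update_data)
    assignments = ", ".join(f"{column} = ?" for column in update_data)
    clause, params = where_clause(specification)
    cursor = conn.execute(
        f"UPDATE example SET {assignments} WHERE {clause}",
        [*update_data.values(), *params],
    )
    return cursor.rowcount > 0  # True when at least one row was patched


def delete(conn: sqlite3.Connection, specification: Mapping[str, Any]) -> bool:
    clause, params = where_clause(specification)
    cursor = conn.execute(f"DELETE FROM example WHERE {clause}", params)
    return cursor.rowcount > 0  # True when at least one row was removed


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE example (id INTEGER PRIMARY KEY, foo TEXT)")
# Bulk insert in one call, similar in spirit to bulk_create.
conn.executemany("INSERT INTO example (foo) VALUES (?)", [("bar",), ("beer",)])

is_updated = update(conn, specification={"foo": "beer"}, update_data={"foo": "bar"})
is_removed = delete(conn, specification={"id": 1})
print(is_updated, is_removed)
```

Rejecting an empty specification is a design choice of this sketch: it prevents an accidental update or delete of the whole table.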
Pagination strategies
By default, the BaseDAO class uses the simplest pagination strategy, which is based on the SQL OFFSET and LIMIT mechanisms. The library also provides a second strategy out of the box, called deferred join pagination. You can learn more about it here.
To change the pagination strategy for your DAO class, redefine the __paginator_factory__ field inside it:
from ash_dal import AsyncBaseDAO, BaseDAO, DeferredJoinPaginator
from ash_dal.utils import DeferredJoinPaginatorFactory, AsyncDeferredJoinPaginator


class ExampleEntity:
    ...


class ExampleORMModel:
    id: int
    ...


class ExampleDAO(BaseDAO[ExampleEntity]):
    __entity__ = ExampleEntity
    __model__ = ExampleORMModel
    __paginator_factory__ = DeferredJoinPaginatorFactory(
        paginator_class=DeferredJoinPaginator,
        pk_field=ExampleORMModel.id,
    )


# OR async: the async DAO base class is paired with the async paginator.
class ExampleAsyncDAO(AsyncBaseDAO[ExampleEntity]):
    __entity__ = ExampleEntity
    __model__ = ExampleORMModel
    __paginator_factory__ = DeferredJoinPaginatorFactory(
        paginator_class=AsyncDeferredJoinPaginator,
        pk_field=ExampleORMModel.id,
    )
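For readers unfamiliar with the two strategies, the sketch below shows the general shape of the SQL behind each, using sqlite3 so that it runs without a MySQL server. It illustrates the technique in general and is not taken from the library's paginators; the table and column names are made up for the example. With deferred join, the offset is applied to a narrow query over primary keys only, and the full rows are fetched by joining back on those keys. This typically helps on large tables where the key query can be answered from an index, although the actual gain depends on the database and schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE example_table (id INTEGER PRIMARY KEY, first_name TEXT, age INTEGER)"
)
conn.executemany(
    "INSERT INTO example_table (first_name, age) VALUES (?, ?)",
    [(f"user-{i}", 20 + i % 30) for i in range(100)],
)

PAGE_SIZE = 10
OFFSET = 40  # skip the first four pages

# Plain offset/limit: the offset is applied to the query that reads full rows.
offset_limit_rows = conn.execute(
    "SELECT id, first_name, age FROM example_table ORDER BY id LIMIT ? OFFSET ?",
    (PAGE_SIZE, OFFSET),
).fetchall()

# Deferred join: page over primary keys only, then join back for the full rows.
deferred_join_rows = conn.execute(
    """
    SELECT t.id, t.first_name, t.age
    FROM example_table AS t
    JOIN (
        SELECT id FROM example_table ORDER BY id LIMIT ? OFFSET ?
    ) AS page ON page.id = t.id
    ORDER BY t.id
    """,
    (PAGE_SIZE, OFFSET),
).fetchall()

# Both strategies return the same page; only the query shape differs.
print(offset_limit_rows == deferred_join_rows)
```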
Custom pagination strategy
You can also define your own pagination strategy. Your paginator class should implement the IPaginator or IAsyncPaginator interface:
import typing as t

from ash_dal import PaginatorPage
from ash_dal.utils.paginator.interface import IPaginator, IAsyncPaginator


class ExampleORM:
    ...


class MyPaginator(IPaginator[ExampleORM]):
    def get_page(self, page_index: int) -> PaginatorPage[ExampleORM]:
        # Do page fetching
        ...

    def paginate(self) -> t.Iterator[PaginatorPage[ExampleORM]]:
        # Do pagination
        ...

    @property
    def size(self) -> int:
        # Get pages count
        return 10


# or async paginator
class MyAsyncPaginator(IAsyncPaginator[ExampleORM]):
    async def get_page(self, page_index: int) -> PaginatorPage[ExampleORM]:
        # Do page fetching asynchronously
        ...

    async def paginate(self) -> t.AsyncIterator[PaginatorPage[ExampleORM]]:
        # Do pagination asynchronously
        ...

    @property
    async def size(self) -> int:
        # Get pages count asynchronously
        return 10
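As a rough idea of what filled-in method bodies might look like, the sketch below implements the same three members (get_page, paginate and size) over an in-memory sequence. It deliberately does not import ash_dal: `Page` is a hypothetical stand-in for PaginatorPage, whose real fields may differ, and `ListPaginator` is a made-up name. A zero-based page index is also assumed.

```python
import math
from dataclasses import dataclass
from typing import Generic, Iterator, Sequence, TypeVar

T = TypeVar("T")


@dataclass
class Page(Generic[T]):
    """Hypothetical stand-in for PaginatorPage; the real class may differ."""
    index: int
    entities: Sequence[T]


class ListPaginator(Generic[T]):
    """Hypothetical paginator over an in-memory sequence."""

    def __init__(self, items: Sequence[T], page_size: int = 20) -> None:
        if page_size < 1:
            raise ValueError("page_size must be positive")
        self._items = items
        self._page_size = page_size

    def get_page(self, page_index: int) -> Page[T]:
        # Out-of-range indexes (including negative ones) give an empty page.
        if page_index < 0:
            return Page(index=page_index, entities=[])
        start = page_index * self._page_size
        return Page(index=page_index, entities=self._items[start:start + self._page_size])

    def paginate(self) -> Iterator[Page[T]]:
        # Yield every page in order, from the first to the last.
        for page_index in range(self.size):
            yield self.get_page(page_index)

    @property
    def size(self) -> int:
        # Number of pages, rounding up so a partial last page is counted.
        return math.ceil(len(self._items) / self._page_size)


paginator = ListPaginator(list(range(45)), page_size=20)
print(paginator.size)
print([len(page.entities) for page in paginator.paginate()])
print(paginator.get_page(7))
```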