Persistence layer utilities for mir project
Project description
mir-persistence-layer-utils
Utilities for the persistence layer when working with the mir project. The official PyPi package can be found here.
Dependencies
- pymongo
- motor
- bcrypt
- pydantic
- httpx
- hurry.filesize
- python-dotenv
- psutil
Installation
Installing the utilities via pip
pip install navalmartin-mir-db-utils
For a specific version you can use
pip install navalmartin-mir-db-utils==x.x.x
You can uninstall the project via
pip uninstall navalmartin-mir-db-utils
How to use
Create a session
from dotenv import load_dotenv
from navalmartin_mir_db_utils.dbs import MongoDBSession
# laod configuration variables
# using the default .env
# load the MONGODB_URL
load_dotenv()
# assume that the MONGODB_NAME is not loaded
# so we need to set it manuall
session = MongoDBSession(db_name="my-db-name")
Execute simple queries
You can use the session to execute simple queries as shown below
import asyncio
import bson
from navalmartin_mir_db_utils.dbs.mongodb_session import MongoDBSession
from navalmartin_mir_db_utils.crud.mongodb_crud_utils import ReadEntityCRUDAPI
IMAGES_COLLECTION_TO_READ="YOUR_COLLECTION_NAME"
async def query_db(mongodb_session: MongoDBSession, criteria: dict,
projection: dict,
collection_name: str):
query_result = ReadEntityCRUDAPI.do_find(criteria=criteria, projection=projection,
db_session=mongodb_session,
collection_name=collection_name)
images = [img async for img in query_result]
return images
def main():
MONGODB_URL = "YOUR_MONGODB_URL"
MONGO_DB_NAME_FROM = "YOUR_MONGODB_NAME"
mir_db_session_from = MongoDBSession(mongodb_url=MONGODB_URL,
db_name=MONGO_DB_NAME_FROM)
result = asyncio.run(query_db(mongodb_session=mir_db_session_from,
criteria={'survey_idx': bson.ObjectId('63ad64252c853ee163fc6a63')},
projection={'original_filename': 1},
collection_name=IMAGES_COLLECTION_TO_READ))
print(result)
if __name__ == '__main__':
main()
Simple models
import datetime
from navalmartin_mir_db_utils.schemata import IndexedItemDataViewBase, UserDataViewBase
class MyIndexedItem(IndexedItemDataViewBase):
pass
if __name__ == '__main__':
mdb_json = {'_id': '123456',
'created_at': datetime.datetime.utcnow(),
'updated_at': datetime.datetime.utcnow()}
my_indexed_item = MyIndexedItem.build_from_mongodb_json(mdb_json=mdb_json)
print(f"MyIndexedItem {my_indexed_item}")
print(f"MyIndexedItem fields set {my_indexed_item.__fields_set__}")
user_data_json = {'_id': '123456',
'created_at': datetime.datetime.utcnow(),
'updated_at': datetime.datetime.utcnow(),
"name": "Alex",
"surname": "Giavaras",
"email": "alex@someemail.com"}
user = UserDataViewBase.build_from_mongodb_json(mdb_json=user_data_json,
access_token="1236",
refresh_token="69878")
print(f"User {user}")
print(f"User fields set {user.__fields_set__}")
You can also run transactions.
from typing import Any
import bson
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern
from pymongo.read_preferences import ReadPreference
import asyncio
from navalmartin_mir_db_utils.dbs.mongodb_session import MongoDBSession
from navalmartin_mir_db_utils.transanctions import run_transaction
from navalmartin_mir_db_utils.transanctions.decorators import with_transaction
IMAGES_COLLECTION_TO_READ = "YOUR_COLLECTION_NAME"
MONGODB_URL = "YOUR_MONGODB_URL"
MONGO_DB_NAME_FROM = "YOUR_MONGODB_NAME"
wc_majority = WriteConcern("majority", wtimeout=1000)
read_concern = ReadConcern("local")
async def read_images_callback(session: Any, kwargs: dict):
db_name = kwargs['db_name']
survey_idx = kwargs['survey_idx']
projection = kwargs['projection']
db = session.client.get_database(db_name)
images_collection = db[IMAGES_COLLECTION_TO_READ]
images = images_collection.find({'survey_idx': bson.ObjectId(survey_idx)},
projection=projection,
session=session)
return images
async def transaction_result_handler(transaction_result: Any):
images = [img async for img in transaction_result]
return images
@with_transaction
async def execute_function(mir_db_session: MongoDBSession):
return await run_transaction(mdb_session=mir_db_session,
async_callback=read_images_callback,
callback_args=callback_args,
max_commit_time_ms=None,
read_concern=read_concern,
write_concern=wc_majority,
read_preference=ReadPreference.PRIMARY,
with_log=True,
transaction_result_handler=transaction_result_handler)
if __name__ == '__main__':
mir_db_session = MongoDBSession(mongodb_url=MONGODB_URL,
db_name=MONGO_DB_NAME_FROM)
callback_args = {'db_name': 'mir_db',
'survey_idx': '63ad64252c853ee163fc6a63',
'projection': {'original_filename': 1}}
transaction_result = asyncio.run(execute_function(mdb_session=mir_db_session))
print(transaction_result)
There is also a decorator available to run a transaction
from typing import Any
import bson
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern
from pymongo.read_preferences import ReadPreference
import asyncio
from navalmartin_mir_db_utils.dbs.mongodb_session import MongoDBSession
from navalmartin_mir_db_utils.transanctions.decorators import use_async_transaction
IMAGES_COLLECTION_TO_READ = 'YOUR_COLLECTION_NAME'
MONGODB_URL = "YOUR_MONGODB_URL"
MONGO_DB_NAME_FROM = "YOUR_MONGODB_NAME"
wc_majority = WriteConcern("majority", wtimeout=1000)
read_concern = ReadConcern("local")
callback_args = {'db_name': 'mir_db',
'survey_idx': '63ad64252c853ee163fc6a63',
'projection': {'original_filename': 1}}
async def read_images_callback(session: Any, kwargs: dict):
db_name = kwargs['db_name']
survey_idx = kwargs['survey_idx']
projection = kwargs['projection']
db = session.client.get_database(db_name)
images_collection = db[IMAGES_COLLECTION_TO_READ]
images = images_collection.find({'survey_idx': bson.ObjectId(survey_idx)},
projection=projection,
session=session)
return images
async def transaction_result_handler(transaction_result: Any):
images = [img async for img in transaction_result]
return images
@use_async_transaction(async_callback=read_images_callback,
callback_args=callback_args,
mdb_session=MongoDBSession(mongodb_url=MONGODB_URL, db_name=MONGO_DB_NAME_FROM),
write_concern=wc_majority,
read_concern=read_concern,
read_preference=ReadPreference.PRIMARY,
max_commit_time_ms=None,
with_log=True,
with_transaction_result=True,
transaction_result_handler=transaction_result_handler)
async def query_db(mongodb_session: MongoDBSession, **kwargs):
transaction_result = kwargs['transaction_result']
return transaction_result
if __name__ == '__main__':
mir_db_session_from = MongoDBSession(mongodb_url=MONGODB_URL,
db_name=MONGO_DB_NAME_FROM)
print("Running transaction as decorator...")
transaction_result = asyncio.run(query_db(mongodb_session=mir_db_session_from))
print(transaction_result)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Close
Hashes for navalmartin_mir_db_utils-0.0.13.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | 5e83620b6cb0b76eec674501ff4b080895ae95fbbba43e7d391fb10d7ed28d4d |
|
MD5 | 0dcee59fc746764c1786c5e5d6a6a4a1 |
|
BLAKE2b-256 | 8d0f8df5f5e6c9067712e3fd23cb23befa1c98c233f3967df0969937fe568b98 |
Close
Hashes for navalmartin_mir_db_utils-0.0.13-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | c9cf3a8f2f02d8353398f40eaac76c33eff88db50b0b7eec2a9d9b8c50ffdde3 |
|
MD5 | e583b75ec66aa1c3ea56ae0c1a9ef04a |
|
BLAKE2b-256 | 79b69abdcb630f668c4b21190272b3007a617b564085b6ed55dc5a79a5a31cb9 |