An integration package created by LOGYCA that connects to Postgres and standardizes connections and dependency injection in synchronous or asynchronous mode. Tested with FastAPI and with console/worker scripts.
LOGYCA public libraries
About us
LOGYCA public libraries: standard methods for connecting to Postgres
- Transversal (cross-cutting) libraries: standard methods for connecting to Postgres with yield-based dependency injection, for use by microservices such as APIs, workers, or scripts.
Source code | Package (PyPI) | Samples
"pip install" dependency check
The project that uses this library must choose the required libraries and versions itself. This package validates that they are already installed before it is installed.
To install the logyca_postgres package while checking only the SQLAlchemy prerequisite, without validating any Postgres connection driver, use the following command:
# Checks that the SQLAlchemy dependency is installed
pip install logyca_postgres
To install the package and also validate the asynchronous or synchronous Postgres connection driver, use one of the following commands (the quotes keep shells such as zsh from interpreting the brackets):
# Checks that the asyncpg driver dependency is installed
pip install "logyca_postgres[async]"
# Checks that the psycopg2 driver dependency is installed
pip install "logyca_postgres[sync-psycopg2]"
# Checks that the psycopg2-binary driver dependency is installed
pip install "logyca_postgres[sync-psycopg2-binary]"
# Checks that the asyncpg and psycopg2-binary driver dependencies are installed
pip install "logyca_postgres[async-sync-psycopg2]"
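Before picking one of the extras, it can help to see which prerequisites are already present in the environment. The following is a minimal sketch that uses only the standard library. The helper name `installed_prerequisites` is hypothetical and not part of logyca_postgres; the module names are the import names of SQLAlchemy, asyncpg, and psycopg2.

```python
from importlib.util import find_spec

# Import names of the prerequisites mentioned above.
PREREQUISITES = ("sqlalchemy", "asyncpg", "psycopg2")


def installed_prerequisites(modules=PREREQUISITES):
    """Map each top-level module name to True if it can be imported."""
    return {name: find_spec(name) is not None for name in modules}


if __name__ == "__main__":
    for name, present in installed_prerequisites().items():
        print(f"{name}: {'installed' if present else 'missing'}")
```

Both psycopg2 and psycopg2-binary are imported as `psycopg2`, so this check cannot tell the two distributions apart.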
Semantic Versioning
logyca <MAJOR>.<MINOR>.<PATCH>
- MAJOR: version when you make incompatible API changes
- MINOR: version when you add functionality in a backwards compatible manner
- PATCH: version when you make backwards compatible bug fixes
Definitions for releasing versions
- https://peps.python.org/pep-0440/
- X.YaN (Alpha release): Identify and fix early-stage bugs. Not suitable for production use.
- X.YbN (Beta release): Stabilize and refine features. Address reported bugs. Prepare for official release.
- X.YrcN (Release candidate): Final version before official release. Assumes all major features are complete and stable. Recommended for testing in non-critical environments.
- X.Y (Final release/Stable/Production): Completed, stable version ready for use in production. Full release for public use.
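The ordering implied by these release stages can be sketched with a small sort key. This is an illustration only: it handles just the forms listed above (X.Y or X.Y.Z with an optional aN, bN, or rcN suffix), and the names `version_key` and `is_prerelease` are hypothetical. For the full PEP 440 rules, the `packaging` library provides `packaging.version.Version`.

```python
import re

# Subset of PEP 440: X.Y[.Z] with an optional aN, bN, or rcN suffix.
_VERSION_RE = re.compile(r"^(\d+)\.(\d+)(?:\.(\d+))?(?:(a|b|rc)(\d+))?$")

# Pre-releases sort before the final release of the same X.Y.Z.
_STAGE_ORDER = {"a": 0, "b": 1, "rc": 2, None: 3}


def version_key(version):
    """Return a tuple that sorts versions in release order."""
    match = _VERSION_RE.match(version)
    if match is None:
        raise ValueError(f"unsupported version string: {version!r}")
    major, minor, patch, stage, number = match.groups()
    return (int(major), int(minor), int(patch or 0),
            _STAGE_ORDER[stage], int(number or 0))


def is_prerelease(version):
    """True for alpha, beta, and release-candidate versions."""
    return version_key(version)[3] < _STAGE_ORDER[None]


if __name__ == "__main__":
    releases = ["0.1.0", "0.0.1rc1", "0.1.5", "0.1.0b2", "0.1.0a1"]
    print(sorted(releases, key=version_key))
    # → ['0.0.1rc1', '0.1.0a1', '0.1.0b2', '0.1.0', '0.1.5']
```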
Make multiple connections to different engines
To configure dependency injection for a connection to another engine, create a new object that also follows the singleton pattern.
Do the same for each additional engine.
The library declares its engine with a singleton metaclass, "class SyncConnEngine(metaclass=Singleton):", so the class can be instantiated only once. To connect to another engine, create a class that inherits from it; the subclass gets its own configuration instance.
Example:
# "settings" and "App.Settings" stand for the application's own configuration objects.
class SyncConnEngineX(SyncConnEngine):
    def __init__(self, url_connection, server_settings):
        super().__init__(url_connection, server_settings)

sync_session_x=SyncConnEngineX(
    url_connection=SyncConnEngineX.build_url_connection(user=settings.DB_USER_X,password=settings.DB_PASS_X,host=settings.DB_HOST_X,port=settings.DB_PORT_X,database=settings.DB_NAME_X,ssl_enable=settings.DB_SSL_X),
    server_settings=SyncConnEngineX.server_settings(pool_size=5,max_overflow=1,pool_recycle=10800,application_name=f"{App.Settings.NAME} - SyncConnEngineX")
)
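The library's own `Singleton` metaclass is not shown here. The sketch below is a common way to write one, offered under that assumption, to illustrate why a subclass is needed for a second configuration: instances are cached per class, so a subclass gets its own cache entry. `Engine` and `EngineX` are hypothetical stand-ins for `SyncConnEngine` and its subclass.

```python
class Singleton(type):
    """Metaclass that keeps one instance per class (hypothetical sketch)."""

    _instances = {}

    def __call__(cls, *args, **kwargs):
        # The cache is keyed by class, so a subclass gets its own entry.
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class Engine(metaclass=Singleton):
    def __init__(self, url):
        self.url = url


class EngineX(Engine):
    """Second configuration, for example another database server."""


if __name__ == "__main__":
    first = Engine("postgresql://host-a/db")
    again = Engine("postgresql://ignored/db")  # returns the cached instance
    other = EngineX("postgresql://host-b/db")
    print(first is again)  # True
    print(other is first)  # False
    print(again.url)       # postgresql://host-a/db
```

With this kind of metaclass, a second call with different arguments silently returns the first instance, which is why each additional engine needs its own subclass.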
Asynchronous mode
FastAPI
from fastapi import FastAPI, Depends, HTTPException
from logyca_postgres import AsyncConnEngine, commit_rollback_async, check_connection_async
from sqlalchemy import text as text_to_sql
from sqlalchemy.ext.asyncio import AsyncSession
import os

DB_USER=os.getenv('DB_USER','postgres')
DB_PASS=os.getenv('DB_PASS','xxx')
DB_HOST=os.getenv('DB_HOST','localhost')
DB_PORT=int(os.getenv('DB_PORT','5432'))  # environment values are strings; keep the port an int
DB_NAME=os.getenv('DB_NAME','test')
ssl_enable_like_local_docker_container=False

app = FastAPI()

conn_async_session=AsyncConnEngine(
    url_connection=AsyncConnEngine.build_url_connection(user=DB_USER,password=DB_PASS,host=DB_HOST,port=DB_PORT,database=DB_NAME,ssl_enable=ssl_enable_like_local_docker_container),
    server_settings=AsyncConnEngine.server_settings(pool_size=5,max_overflow=1,pool_recycle=10800,application_name="MyApp - AsyncConnEngine")
)

# After the first query, the connection pool (pool_size) remains open until the application is stopped.

@app.get("/simulated_query_async/")
async def read_item(async_session:AsyncSession = Depends(conn_async_session)):
    try:
        status, date_time_check_conn = await check_connection_async(async_session)
        if status:
            query = text_to_sql("SELECT now();")
            result = await async_session.execute(query)
            simulated_query = result.scalar_one_or_none()
            await commit_rollback_async(async_session)
            return {"date_time_check_conn": date_time_check_conn, "simulated_query": simulated_query}
        else:
            # 503: the database is unavailable, not a missing resource
            raise HTTPException(status_code=503, detail="async_session connect db error...")
    except HTTPException:
        # Do not re-wrap the HTTPException raised above
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"error: {e}")
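The example passes credentials from environment variables to `build_url_connection`. How the library assembles the URL is not shown here. The sketch below only illustrates the general shape of a SQLAlchemy-style Postgres URL and why credentials should be percent-encoded; `build_postgres_url` is a hypothetical helper, not part of logyca_postgres, and it leaves out SSL options.

```python
from urllib.parse import quote_plus


def build_postgres_url(user, password, host, port, database, driver="asyncpg"):
    """Assemble a SQLAlchemy-style URL (hypothetical helper, not the library's code)."""
    # Percent-encode credentials so characters such as '@' or '/' cannot break the URL.
    credentials = f"{quote_plus(user)}:{quote_plus(password)}"
    # Environment variables arrive as strings, so normalize the port to an integer.
    return f"postgresql+{driver}://{credentials}@{host}:{int(port)}/{database}"


if __name__ == "__main__":
    print(build_postgres_url("postgres", "p@ss/word", "localhost", "5432", "test"))
    # → postgresql+asyncpg://postgres:p%40ss%2Fword@localhost:5432/test
```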
Worker or script
from logyca_postgres import AsyncConnEngine, commit_rollback_async, check_connection_async
from sqlalchemy import text as text_to_sql
from sqlalchemy.ext.asyncio import AsyncSession
import asyncio
import os

DB_USER=os.getenv('DB_USER','postgres')
DB_PASS=os.getenv('DB_PASS','xxx')
DB_HOST=os.getenv('DB_HOST','localhost')
DB_PORT=int(os.getenv('DB_PORT','5432'))  # environment values are strings; keep the port an int
DB_NAME=os.getenv('DB_NAME','test')
ssl_enable_like_local_docker_container=False

conn_async_session=AsyncConnEngine(
    url_connection=AsyncConnEngine.build_url_connection(user=DB_USER,password=DB_PASS,host=DB_HOST,port=DB_PORT,database=DB_NAME,ssl_enable=ssl_enable_like_local_docker_container),
    server_settings=AsyncConnEngine.server_settings(pool_size=5,max_overflow=1,pool_recycle=10800,application_name="MyApp - AsyncConnEngine")
)

# After the first query, the connection pool (pool_size) remains open until the application is stopped
# or the engine is terminated with close_engine().

async def methods(async_session:AsyncSession):
    status, date_time_check_conn = await check_connection_async(async_session)
    if status:
        query = text_to_sql("SELECT now();")
        result = await async_session.execute(query)
        simulated_query = result.scalar_one_or_none()
        await commit_rollback_async(async_session)
        print(f"date_time_check_conn={date_time_check_conn},simulated_query={simulated_query}")
    else:
        print("async_session connect db error...")

async def main():
    # get_async_session() is a generator, so the session is consumed with "async for"
    async for async_session in conn_async_session.get_async_session():
        await methods(async_session)
    await conn_async_session.close_engine()

if __name__ == "__main__":
    asyncio.run(main())
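The script consumes `get_async_session()` with `async for` because a yield-based dependency is a generator: it hands out one session, then resumes after the loop body to clean up. The sketch below shows that mechanism with the standard library only. `FakeSession` and `get_session` are hypothetical stand-ins, not the library's classes, and the real generator may do more (for example, rollback handling).

```python
import asyncio


class FakeSession:
    """Stand-in for a database session (hypothetical, for illustration only)."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


async def get_session():
    """Yield one session and always close it when the caller is done."""
    session = FakeSession()
    try:
        yield session
    finally:
        await session.close()


async def main():
    seen = []
    # The loop body runs once; the generator's finally block runs when the loop ends.
    async for session in get_session():
        seen.append(session)
        print("inside loop, closed =", session.closed)  # False
    print("after loop, closed =", seen[0].closed)       # True
    return seen[0]


if __name__ == "__main__":
    asyncio.run(main())
```

FastAPI's `Depends` drives the same kind of generator for each request, which is why one definition can serve both the API and the script.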
Synchronous mode
FastAPI
from fastapi import FastAPI, Depends, HTTPException
from logyca_postgres import SyncConnEngine, commit_rollback_sync, check_connection_sync
from sqlalchemy import text as text_to_sql
from sqlalchemy.orm.session import Session
import os

DB_USER=os.getenv('DB_USER','postgres')
DB_PASS=os.getenv('DB_PASS','xxx')
DB_HOST=os.getenv('DB_HOST','localhost')
DB_PORT=int(os.getenv('DB_PORT','5432'))  # environment values are strings; keep the port an int
DB_NAME=os.getenv('DB_NAME','test')
ssl_enable_like_local_docker_container=False

app = FastAPI()

conn_sync_session=SyncConnEngine(
    url_connection=SyncConnEngine.build_url_connection(user=DB_USER,password=DB_PASS,host=DB_HOST,port=DB_PORT,database=DB_NAME,ssl_enable=ssl_enable_like_local_docker_container),
    server_settings=SyncConnEngine.server_settings(pool_size=5,max_overflow=1,pool_recycle=10800,application_name="MyApp - SyncConnEngine")
)

# After the first query, the connection pool (pool_size) remains open until the application is stopped.

@app.get("/simulated_query_sync/")
def read_item(sync_session:Session = Depends(conn_sync_session)):
    try:
        status, date_time_check_conn = check_connection_sync(sync_session)
        if status:
            query = text_to_sql("SELECT now();")
            result = sync_session.execute(query)
            simulated_query = result.fetchone()[0]
            commit_rollback_sync(sync_session)
            return {"date_time_check_conn": date_time_check_conn, "simulated_query": simulated_query}
        else:
            # 503: the database is unavailable, not a missing resource
            raise HTTPException(status_code=503, detail="sync_session connect db error...")
    except HTTPException:
        # Do not re-wrap the HTTPException raised above
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"error: {e}")
Worker or script
from logyca_postgres import SyncConnEngine, commit_rollback_sync, check_connection_sync
from sqlalchemy import text as text_to_sql
from sqlalchemy.orm.session import Session
import os

DB_USER=os.getenv('DB_USER','postgres')
DB_PASS=os.getenv('DB_PASS','***')
DB_HOST=os.getenv('DB_HOST','localhost')
DB_PORT=int(os.getenv('DB_PORT','5432'))  # environment values are strings; keep the port an int
DB_NAME=os.getenv('DB_NAME','test')
ssl_enable_like_local_docker_container=False

conn_sync_session=SyncConnEngine(
    url_connection=SyncConnEngine.build_url_connection(user=DB_USER,password=DB_PASS,host=DB_HOST,port=DB_PORT,database=DB_NAME,ssl_enable=ssl_enable_like_local_docker_container),
    server_settings=SyncConnEngine.server_settings(pool_size=5,max_overflow=1,pool_recycle=10800,application_name="MyApp - SyncConnEngine")
)

# After the first query, the connection pool (pool_size) remains open until the application is stopped
# or the engine is terminated with close_engine().

def methods(sync_session: Session):
    status, date_time_check_conn = check_connection_sync(sync_session)
    if status:
        query = text_to_sql("SELECT now();")
        result = sync_session.execute(query)
        simulated_query = result.fetchone()[0]
        commit_rollback_sync(sync_session)
        print(f"date_time_check_conn={date_time_check_conn},simulated_query={simulated_query}")
    else:
        print("sync_session connect db error...")

def main():
    # get_sync_session() is a generator, so the session is consumed with "for"
    for sync_session in conn_sync_session.get_sync_session():
        methods(sync_session)
    conn_sync_session.close_engine()

if __name__ == "__main__":
    main()
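The examples call `commit_rollback_sync` after executing a query. Its implementation is not shown here; judging by the name, one plausible reading is that it commits and rolls back if the commit fails. The sketch below shows that general pattern with the standard library `sqlite3` module in place of Postgres. `commit_or_rollback` and `insert_name` are hypothetical helpers, and the library's actual behavior may differ.

```python
import sqlite3


def commit_or_rollback(connection):
    """Commit the open transaction; roll back and re-raise if the commit fails."""
    try:
        connection.commit()
    except Exception:
        connection.rollback()
        raise


def insert_name(connection, name):
    """Insert one row and finish the transaction with commit_or_rollback."""
    connection.execute("INSERT INTO people (name) VALUES (?)", (name,))
    commit_or_rollback(connection)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE people (name TEXT NOT NULL)")
    insert_name(conn, "Ada")
    print(conn.execute("SELECT count(*) FROM people").fetchone()[0])  # 1
    conn.close()
```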
Changelog
All notable changes to this project will be documented in this file.
The format is based on Keep a Changelog, and this project adheres to Semantic Versioning.
Types of changes
- Added for new features.
- Changed for changes in existing functionality.
- Deprecated for soon-to-be removed features.
- Removed for now removed features.
- Fixed for any bug fixes.
- Security in case of vulnerabilities.
[0.0.1rc1] - 2024-04-22
Added
- First tests using pypi.org in the development environment.
- New functionality for asynchronous and synchronous connections to PostgreSQL databases.
- The functionality can be used in FastAPI or in workers such as Azure Functions.
- Usage examples were added to the function docstrings.
- The samples folder of this library contains complete working examples.
[0.1.0] - 2024-05-21
Added
- Completion of testing and launch into production.
[0.1.2] - 2024-05-23
Added
- Documentation integrated with GitHub.
[0.1.3] - 2024-05-30
Fixed
- Static messages are removed, since they do not cover errors globally.
[0.1.4] - 2024-07-03
Fixed
- PostgreSQL sync connection: fixed the SSL mode name.
[0.1.5] - 2024-07-05
Fixed
- README update.