Skip to main content

An integration package created by the company LOGYCA that connects Postgres and is used to standardize connections and dependency injection in synchronous or asynchronous mode. Tested in fastapi and in console/worker scripts.

Project description

Logyca

LOGYCA public libraries

Package version Python


About us


LOGYCA public libraries: Standard methods to connect to postgres

  • Traversal libraries: Standard methods to connect to postgres with dependency injection using yield to be used by microservices such as API(s), Workers or scripts.

Source code | Package (PyPI) | Samples


"pip install" dependency check

The user must select the required libraries and versions for the project that uses this library, which validates that they are pre-installed in order to be installed.

To install the libraries of the logyca postgres package verifying the SQLAlchemy prerequisite without validating connection drivers to postgres, use the following command:

# Check SQLAlchemy dependency that is installed
pip install logyca_postgres

To install the logyca postgres package libraries and validate the postgres asynchronous or synchronous connection driver, use the following command:

# Check asyncpg driver dependency that is installed
pip install logyca_postgres[async]
# Check psycopg2 driver dependency that is installed
pip install logyca_postgres[sync-psycopg2]
# Check psycopg2-binary driver dependency that is installed
pip install logyca_postgres[sync-psycopg2-binary]
# Check asyncpg+psycopg2-binary driver dependency that is installed
pip install logyca_postgres[async-sync-psycopg2]

Semantic Versioning

logyca < MAJOR >.< MINOR >.< PATCH >

  • MAJOR: version when you make incompatible API changes
  • MINOR: version when you add functionality in a backwards compatible manner
  • PATCH: version when you make backwards compatible bug fixes

Definitions for releasing versions

  • https://peps.python.org/pep-0440/

    • X.YaN (Alpha release): Identify and fix early-stage bugs. Not suitable for production use.
    • X.YbN (Beta release): Stabilize and refine features. Address reported bugs. Prepare for official release.
    • X.YrcN (Release candidate): Final version before official release. Assumes all major features are complete and stable. Recommended for testing in non-critical environments.
    • X.Y (Final release/Stable/Production): Completed, stable version ready for use in production. Full release for public use.

Example of concepts that use a library with a singleton pattern and connection to multiple engines with yield dependency injection

The library uses a singleton pattern "class SyncConnEngine(metaclass=Singleton):", where the class is allowed to be instantiated only once. You can create another connection to another engine but you must create an inherited class in order to create a new configuration instance.

Example: class SyncConnEngineX(SyncConnEngine): def init(self, url_connection,server_settings): super().init(url_connection,server_settings) sync_session_x=SyncConnEngineX( url_connection=SyncConnEngine.build_url_connection(user=settings.DB_USER_X,password=settings.DB_PASS_X,host=settings.DB_HOST_X,port=settings.DB_PORT_X,database=settings.DB_NAME_X,ssl_enable=settings.DB_SSL_X), server_settings=SyncConnEngine.server_settings(pool_size=5,max_overflow=1,pool_recycle=10800,application_name=f"{App.Settings.NAME} - SyncConnEngineX") )

Asynchronous mode

FastAPI

from fastapi import FastAPI, Depends, HTTPException
from logyca_postgres import AsyncConnEngine, commit_rollback_async, check_connection_async
from sqlalchemy import text as text_to_sql
from sqlalchemy.ext.asyncio import AsyncSession
import os

DB_USER=os.getenv('DB_USER','postgres')
DB_PASS=os.getenv('DB_PASS','xxx')
DB_HOST=os.getenv('DB_HOST','localhost')
DB_PORT=os.getenv('DB_PORT',5432)
DB_NAME=os.getenv('DB_NAME','test')
ssl_enable_like_local_docker_container=False

app = FastAPI()

conn_async_session=AsyncConnEngine(
    url_connection=AsyncConnEngine.build_url_connection(user=DB_USER,password=DB_PASS,host=DB_HOST,port=DB_PORT,database=DB_NAME,ssl_enable=ssl_enable_like_local_docker_container),
    server_settings=AsyncConnEngine.server_settings(pool_size=5,max_overflow=1,pool_recycle=10800,application_name="MyApp - AsyncConnEngine")
    )

'''
The connection pool (pool_size) after the first query will remain open until the application is stopped.
'''

@app.get("/simulated_query_async/")
async def read_item(async_session:AsyncSession = Depends(conn_async_session)):
    try:
        status, date_time_check_conn = await check_connection_async(async_session)
        if(status):
            query = text_to_sql("SELECT now();")
            result = await async_session.execute(query)
            simulated_query = result.scalar_one_or_none()
            await commit_rollback_async(async_session)
            return {"date_time_check_conn": date_time_check_conn, "simulated_query": simulated_query}
        else:
            raise HTTPException(status_code=404, detail="async_session connect db error...")
    except Exception as e:
        raise HTTPException(status_code=404, detail=f"error: {e}")

Worker or script

from logyca_postgres import AsyncConnEngine, commit_rollback_async, check_connection_async
from sqlalchemy import text as text_to_sql
from sqlalchemy.ext.asyncio import AsyncSession
import asyncio
import os

DB_USER=os.getenv('DB_USER','postgres')
DB_PASS=os.getenv('DB_PASS','xxx')
DB_HOST=os.getenv('DB_HOST','localhost')
DB_PORT=os.getenv('DB_PORT',5432)
DB_NAME=os.getenv('DB_NAME','test')
ssl_enable_like_local_docker_container=False

conn_async_session=AsyncConnEngine(
    url_connection=AsyncConnEngine.build_url_connection(user=DB_USER,password=DB_PASS,host=DB_HOST,port=DB_PORT,database=DB_NAME,ssl_enable=ssl_enable_like_local_docker_container),
    server_settings=AsyncConnEngine.server_settings(pool_size=5,max_overflow=1,pool_recycle=10800,application_name="MyApp - AsyncConnEngine")
            )

'''
The connection pool (pool_size) after the first query will remain open until the application is stopped or the engine is terminated: close_engine().
'''

async def methods(async_session:AsyncSession):
    status, date_time_check_conn = await check_connection_async(async_session)
    if(status):
        query = text_to_sql("SELECT now();")
        result = await async_session.execute(query)
        simulated_query = result.scalar_one_or_none()
        await commit_rollback_async(async_session)
        print(f"date_time_check_conn={date_time_check_conn},simulated_query={simulated_query}")
    else:
        print("async_session connect db error...")
async def main():
    async for async_session in conn_async_session.get_async_session():
        await methods(async_session)
    await conn_async_session.close_engine()

if __name__ == "__main__":
    asyncio.run(main())

synchronous mode

FastAPI

from fastapi import FastAPI, Depends, HTTPException
from logyca_postgres import SyncConnEngine, commit_rollback_sync, check_connection_sync
from sqlalchemy.orm.session import Session
import os
from sqlalchemy import text as text_to_sql

DB_USER=os.getenv('DB_USER','postgres')
DB_PASS=os.getenv('DB_PASS','xxx')
DB_HOST=os.getenv('DB_HOST','localhost')
DB_PORT=os.getenv('DB_PORT',5432)
DB_NAME=os.getenv('DB_NAME','test')
ssl_enable_like_local_docker_container=False

app = FastAPI()

conn_sync_session=SyncConnEngine(
    url_connection=SyncConnEngine.build_url_connection(user=DB_USER,password=DB_PASS,host=DB_HOST,port=DB_PORT,database=DB_NAME,ssl_enable=ssl_enable_like_local_docker_container),
    server_settings=SyncConnEngine.server_settings(pool_size=5,max_overflow=1,pool_recycle=10800,application_name="MyApp - AsyncConnEngine")
    )

'''
The connection pool (pool_size) after the first query will remain open until the application is stopped.
'''

@app.get("/simulated_query_sync/")
def read_item(sync_session:Session = Depends(conn_sync_session)):
    try:
        status, date_time_check_conn = check_connection_sync(sync_session)
        if(status):
            query = text_to_sql("SELECT now();")
            result = sync_session.execute(query)
            simulated_query = result.fetchone()[0]
            commit_rollback_sync(sync_session)
            return {"date_time_check_conn": date_time_check_conn, "simulated_query": simulated_query}
        else:
            raise HTTPException(status_code=404, detail="async_session connect db error...")
    except Exception as e:
        raise HTTPException(status_code=404, detail=f"error: {e}")

Worker or script

from logyca_postgres import SyncConnEngine, commit_rollback_sync, check_connection_sync
from sqlalchemy import text as text_to_sql
from sqlalchemy.orm.session import Session
import os

DB_USER=os.getenv('DB_USER','postgres')
DB_PASS=os.getenv('DB_PASS','***')
DB_HOST=os.getenv('DB_HOST','localhost')
DB_PORT=os.getenv('DB_PORT',5432)
DB_NAME=os.getenv('DB_NAME','test')
ssl_enable_like_local_docker_container=False

conn_sync_session=SyncConnEngine(
    url_connection=SyncConnEngine.build_url_connection(user=DB_USER,password=DB_PASS,host=DB_HOST,port=DB_PORT,database=DB_NAME,ssl_enable=ssl_enable_like_local_docker_container),
    server_settings=SyncConnEngine.server_settings(pool_size=5,max_overflow=1,pool_recycle=10800,application_name="MyApp - SyncConnEngine")
            )

'''
The connection pool (pool_size) after the first query will remain open until the application is stopped or the engine is terminated: close_engine().
'''

def methods(sync_session: Session):
    status, date_time_check_conn = check_connection_sync(sync_session)
    if(status):
        query = text_to_sql("SELECT now();")
        result = sync_session.execute(query)
        simulated_query = result.fetchone()[0]
        commit_rollback_sync(sync_session)
        print(f"date_time_check_conn={date_time_check_conn},simulated_query={simulated_query}")
    else:
        print("sync_session connect db error...")
def main():
    for sync_session in conn_sync_session.get_sync_session():
        methods(sync_session)
    conn_sync_session.close_engine()            


if __name__ == "__main__":
    main()

Changelog

All notable changes to this project will be documented in this file.

The format is based on Keep a Changelog, and this project adheres to Semantic Versioning.

Types of changes

  • Added for new features.
  • Changed for changes in existing functionality.
  • Deprecated for soon-to-be removed features.
  • Removed for now removed features.
  • Fixed for any bug fixes.
  • Security in case of vulnerabilities.

[0.0.1rc1] - 2024-04-22

Added

  • First tests using pypi.org in develop environment.
  • New functionality of asynchronous and synchronous connections to postgresql databases.
  • Functionalities can be used in fastapi or workers like Azure Functions.
  • Examples of use are added to the documentation of the functions in docstring
  • In the samples folder of this library, there are complete working examples of using the code.

[0.1.0] - 2024-05-21

Added

  • Completion of testing and launch into production.

[0.1.2] - 2024-05-23

Added

  • Documentation integrated with github

[0.1.3] - 2024-05-30

Fixed

  • Static messages are removed, since they do not cover errors globally.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

logyca_postgres-0.1.3.tar.gz (11.9 kB view hashes)

Uploaded Source

Built Distribution

logyca_postgres-0.1.3-py3-none-any.whl (13.0 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page