Psycopg2 helper for running PostgreSQL queries with #if support

Psycopg2Client — Modern PostgreSQL Helper for Python

A lightweight, opinionated wrapper around psycopg2 with built-in support for:

  • Query dictionary management
  • Conditional SQL (#if / #elif / #endif)
  • Bilingual column aliases (en|ko)
  • Simple transaction handling via context manager
  • Safe parameter binding

A friendlier alternative to raw psycopg2 with a better developer experience.

Installation

pip install psycopg2-client

Note: psycopg2-client is a custom helper class; see the repository for the full source.

Quick Start

1. Define Queries

qry_dic: dict[str, str] = {}

qry_dic.update(
    {
        "read_user_id_all": """
SELECT  user_id
FROM    t_user
"""
    }
)

qry_dic.update(
    {
        "upsert_user": """
WITH t AS (
    INSERT INTO t_user
        (
            user_id, user_name, user_rank
        )
    VALUES
        (
            %(user_id)s, %(user_name)s, %(user_rank)s
        )
    ON CONFLICT (user_id)
    DO UPDATE
    SET     user_name = %(user_name)s,
            user_rank = %(user_rank)s,
            update_time = NOW()
    RETURNING user_name, user_rank
)
SELECT  user_name, user_rank
FROM    t;
"""
    }
)
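
The queries above rely on psycopg2's named %(name)s placeholders, so a key missing from the parameter dict only surfaces when the query runs. A minimal sketch of a pre-flight check is shown below; missing_params is a hypothetical helper written for illustration and is not part of psycopg2-client.

```python
import re

# Matches psycopg2-style named placeholders such as %(user_id)s.
_PLACEHOLDER = re.compile(r"%\((\w+)\)s")


def missing_params(query: str, params: dict) -> set[str]:
    """Return placeholder names used in the query but absent from params."""
    return set(_PLACEHOLDER.findall(query)) - set(params)


query = "INSERT INTO t_user (user_id, user_name) VALUES (%(user_id)s, %(user_name)s)"
print(missing_params(query, {"user_id": "gildong.hong"}))  # {'user_name'}
```

Running a check like this over every entry in qry_dic during tests could catch typos in parameter names before they reach the database.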

2. Configure Database Connection

from psycopg2_client.settings import Settings

db_settings = Settings(
    host="127.0.0.1",
    port=5432,
    database="postgres",
    user="postgres",
    password="0000",
    minconn=3,
    maxconn=10,
    connect_timeout=5,
    use_en_ko_column_alias=True,
    use_conditional=True,
    all_query=qry_dic,
    before_read_execute=lambda qry_key, params, qry_str, qry_with_value: print(
        f'READ_ROWS_START, QRY_KEY: "{qry_key}"' f", QRY_WITH_VALUE: {qry_with_value}"
    ),
    after_read_execute=lambda qry_key, duration: print(
        f'READ_ROWS_END, QRY_KEY: "{qry_key}"' f", DURATION: {duration}"
    ),
    before_update_execute=lambda qry_key, params, params_out, qry_str, qry_with_value: print(
        f'UPDATES_START, QRY_KEY: "{qry_key}"' f", QRY_WITH_VALUE: {qry_with_value}"
    ),
    after_update_execute=lambda qry_key, row_count, params_out, duration: print(
        f'UPDATES_END, QRY_KEY: "{qry_key}"' f", DURATION: {duration}"
    ),
)

3. Basic Usage

from psycopg2_client.client import Client

db = Client(db_settings=db_settings)

# Read single row
row = db.read_row("read_user_id_all", {})
print(row)  # {'user_id': 'gildong.hong'}

# Read all rows
rows = db.read_rows("read_user_id_all", {})
print(rows[:2])

Create / Update / Delete Operations

update() — Single CUD Statement

Returns affected row count:

affected = db.update(
    "upsert_user",
    {"user_id": "gildong.hong", "user_name": "홍길동", "user_rank": 1}
)
print("Affected rows:", affected)  # 1

Capture Output Parameters

params_out = {"user_name": "", "user_rank": 0}
db.update(
    "upsert_user",
    {"user_id": "gildong.hong", "user_name": "홍길동", "user_rank": 1},
    params_out=params_out
)
print("Returned name:", params_out["user_name"], params_out["user_rank"])  # 홍길동 1

updates() — Batch Execution

batch = [
    ("upsert_user", {"user_id": "sunja.kim", "user_name": "김순자", "user_rank": 2}),
    ("upsert_user", {"user_id": "malja.kim", "user_name": "김말자", "user_rank": 3}),
]

results = db.updates(batch)
print("Batch results:", results)  # [1, 1]

Transaction Support with the with Statement

Automatically commits on success, rolls back on exception:

with Client(db_settings=db_settings) as db:
    new_id = "youngja.lee"
    db.update("upsert_user", {"user_id": new_id, "user_name": "이영자", "user_rank": 4})
    db.update("delete_user", {"user_id": new_id})  # Raises if this fails (e.g. "delete_user" is not in qry_dic); the whole block is rolled back
    print("This won't print if error occurs")
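
The commit-or-rollback behaviour described above follows the usual context manager pattern. The sketch below illustrates that pattern with a fake connection so it runs without a database; FakeConnection and Transaction are hypothetical names, and this is not the library's actual implementation.

```python
class FakeConnection:
    """Stand-in for a DB connection that records what was called."""

    def __init__(self):
        self.calls = []

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")


class Transaction:
    """Commit when the with block succeeds, roll back when it raises."""

    def __init__(self, conn):
        self.conn = conn

    def __enter__(self):
        return self.conn

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self.conn.commit()
        else:
            self.conn.rollback()
        return False  # do not swallow the exception


ok_conn = FakeConnection()
with Transaction(ok_conn):
    pass  # all statements succeeded

bad_conn = FakeConnection()
try:
    with Transaction(bad_conn):
        raise KeyError("delete_user")  # simulate a failing statement
except KeyError:
    pass

print(ok_conn.calls, bad_conn.calls)  # ['commit'] ['rollback']
```

Returning False from __exit__ lets the original exception propagate, so the caller still sees why the block failed after the rollback.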

Partially return CSV

Reads rows in chunks and sends each chunk to the client as soon as it is ready, so the client can show download progress.

# Flask
from datetime import datetime

from flask import Flask, Response

app = Flask(__name__)


@app.route("/read-csv-partial")
def read_csv_partial():
    """read csv partial"""

    db_client = Client(db_settings=db_settings)
    filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}.csv"

    return Response(
        db_client.read_csv_partial("read_csv_partial", {}),
        mimetype="text/csv",
        headers={
            # if FE and BE are on different origins,
            # server must expose the Content-Disposition header
            "Access-Control-Expose-Headers": "Content-Disposition",
            "Content-Disposition": f'attachment; filename="{filename}"',
            # Very important for progressive saving in many browsers
            "Cache-Control": "no-cache, no-store, must-revalidate",
            "Pragma": "no-cache",
            "Expires": "0",
            "X-Accel-Buffering": "no",  # Important if using nginx
            "Transfer-Encoding": "chunked",
        },
    )

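# FastAPI
from datetime import datetime

from fastapi import APIRouter
from fastapi.responses import StreamingResponse

router = APIRouter()


@router.get("/read-csv-partial-async")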
async def read_csv_partial_async():
    """read csv partial async"""

    db_client = Client(db_settings=db_settings)
    filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}.csv"

    return StreamingResponse(
        content=db_client.read_csv_partial_async("read_csv_partial", {}),
        media_type="text/csv",
        headers={
            # if FE and BE are on different origins,
            # server must expose the Content-Disposition header
            "Access-Control-Expose-Headers": "Content-Disposition",
            "Content-Disposition": f'attachment; filename="{filename}"',
            # Very important for progressive saving in many browsers
            "Cache-Control": "no-cache, no-store, must-revalidate",
            "Pragma": "no-cache",
            "Expires": "0",
            "X-Accel-Buffering": "no",  # Important if using nginx
            "Transfer-Encoding": "chunked",
        },
    )
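
Both responses above are fed by a generator that produces the CSV piece by piece. How read_csv_partial builds those pieces is not shown here, so the following is only a sketch of the general idea using the standard csv module; csv_chunks and its chunk_size parameter are assumptions made for illustration.

```python
import csv
import io
from typing import Iterable, Iterator


def csv_chunks(header: list[str], rows: Iterable[tuple], chunk_size: int = 2) -> Iterator[str]:
    """Yield CSV text in pieces of at most chunk_size data rows."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(header)
    pending = 0
    for row in rows:
        writer.writerow(row)
        pending += 1
        if pending == chunk_size:
            yield buf.getvalue()
            # Reset the buffer so the next chunk starts empty.
            buf.seek(0)
            buf.truncate(0)
            pending = 0
    if buf.tell():
        yield buf.getvalue()  # header-only output or the final partial chunk


chunks = list(csv_chunks(["user_id", "user_rank"], [("a", 1), ("b", 2), ("c", 3)]))
print(len(chunks))  # 2
```

Because the function is a generator, a web framework can start sending the first chunk while later rows are still being produced.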

Bilingual Column Aliases (English ↔ Korean)

Enabled when use_en_ko_column_alias=True and the en argument is not omitted.

qry_dic.update(
    {
        "read_user_alias": """
SELECT  user_id "Id|아이디", user_name "Name|이름", user_rank "Rank|순위"
FROM    t_user
WHERE   user_id = %(user_id)s
"""
    }
)

English mode (en=True)

rows = db.read_rows("read_user_alias", {"user_id": "gildong.hong"}, en=True)
print(rows[0])
# {'Id': 'gildong.hong', 'Name': '홍길동', 'Rank': 1}

Korean mode (en=False)

rows = db.read_rows("read_user_alias", {"user_id": "gildong.hong"}, en=False)
print(rows[0])
# {'아이디': 'gildong.hong', '이름': '홍길동', '순위': 1}
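
The effect of the en flag can be pictured as choosing one half of each "en|ko" column name. The sketch below shows that idea on a plain dict; pick_alias is a hypothetical function for illustration and may differ from how the library resolves aliases internally.

```python
def pick_alias(row: dict, en: bool) -> dict:
    """Keep the English or Korean half of each "en|ko" key; leave other keys alone."""
    result = {}
    for key, value in row.items():
        if "|" in key:
            en_name, ko_name = key.split("|", 1)
            key = en_name if en else ko_name
        result[key] = value
    return result


raw = {"Id|아이디": "gildong.hong", "Name|이름": "홍길동", "Rank|순위": 1}
print(pick_alias(raw, en=True))   # {'Id': 'gildong.hong', 'Name': '홍길동', 'Rank': 1}
print(pick_alias(raw, en=False))  # {'아이디': 'gildong.hong', '이름': '홍길동', '순위': 1}
```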

Conditional SQL (#if, #elif, #endif)

Enabled when use_conditional=True

qry_dic.update(
    {
        "read_user_search": """
SELECT  user_id, user_name, user_rank, insert_time, update_time
FROM    t_user
WHERE   1 = 1
#if user_id
        AND user_id = %(user_id)s
#elif user_name
        AND user_name ILIKE %(user_name)s
#elif user_rank
        AND user_rank <= %(user_rank)s
#endif
"""
    }
)

Example: Search by user_id

rows = db.read_rows(
    "read_user_search",
    {"user_id": "gildong.hong", "user_name": "", "user_rank": 0}
)
print([r["user_name"] for r in rows])
# ['홍길동']

Example: Search by user_name (partial match)

rows = db.read_rows(
    "read_user_search",
    {"user_id": "", "user_name": "%김%", "user_rank": 0}
)
print([r["user_name"] for r in rows])
# ['김순자', '김말자']

Example: Search by user_rank (rank at or below the given value)

rows = db.read_rows(
    "read_user_search",
    {"user_id": "", "user_name": "", "user_rank": 3}
)
print([r["user_name"] for r in rows])
# ['홍길동', '김순자', '김말자']
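
The three searches above suggest how the directives are resolved: an empty string or 0 skips a branch, and only the first matching branch is kept. The sketch below reproduces that behaviour for bare parameter names only. render_conditional is a hypothetical function, the truthiness rule is inferred from the examples, and the real preprocessor also accepts literals and operators.

```python
def render_conditional(sql: str, params: dict) -> str:
    """Keep only the lines of the first #if/#elif branch whose parameter is truthy."""
    out = []
    in_block = False   # inside an #if ... #endif block
    taken = False      # a branch of the current block has already matched
    active = True      # lines are currently being emitted
    for line in sql.splitlines():
        parts = line.strip().split()
        directive = parts[0] if parts else ""
        if directive in ("#if", "#elif"):
            if len(parts) != 2 or parts[1] not in params:
                raise ValueError(f"bad condition: {line.strip()!r}")
            if directive == "#if":
                if in_block:
                    raise ValueError("nested #if is not supported in this sketch")
                in_block, taken = True, False
            elif not in_block:
                raise ValueError("#elif without #if")
            active = not taken and bool(params[parts[1]])
            taken = taken or active
        elif directive == "#endif":
            if not in_block:
                raise ValueError("#endif without #if")
            in_block, active = False, True
        elif active:
            out.append(line)
    if in_block:
        raise ValueError("missing #endif")
    return "\n".join(out)


sql = """SELECT user_id FROM t_user
WHERE 1 = 1
#if user_id
  AND user_id = %(user_id)s
#elif user_name
  AND user_name ILIKE %(user_name)s
#endif"""
print(render_conditional(sql, {"user_id": "", "user_name": "%김%"}))
```

Only the structure of the SQL text changes here; the values themselves are still passed separately as bound parameters.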

Logging support

  • before_read_execute: called before a read query is executed
  • after_read_execute: called after a read query is executed
  • before_update_execute: called before a CUD query is executed
  • after_update_execute: called after a CUD query is executed

The print calls can be replaced with a logger.

Example: Use logger to write debug info

import logging

def get_sql_logger(name="sql"):
    logger = logging.getLogger(name)
    if not logger.handlers:
        logging.basicConfig(
            filename="sql.log",
            level=logging.DEBUG,
            format="%(asctime)s [%(levelname)7s] %(message)s",
            encoding="utf-8"
        )
    return logger

logger = get_sql_logger()
db_settings.before_read_execute = lambda qry_key, params, qry_str, qry_with_value: logger.debug(
    f'READ_ROWS_START, QRY_KEY: "{qry_key}"' f", QRY_WITH_VALUE: {qry_with_value}"
)
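
Named functions can be easier to test and reuse than lambdas. The sketch below defines two hooks with the same argument lists as the before_read_execute and after_read_execute lambdas shown earlier and checks them with an in-memory handler. The function names, the logger name, and the float passed as duration are assumptions made for illustration.

```python
import logging

logger = logging.getLogger("sql_hooks_sketch")
logger.setLevel(logging.DEBUG)


def log_read_start(qry_key, params, qry_str, qry_with_value):
    """Takes the same four arguments as the before_read_execute lambda."""
    logger.debug('READ_ROWS_START, QRY_KEY: "%s", QRY_WITH_VALUE: %s', qry_key, qry_with_value)


def log_read_end(qry_key, duration):
    """Takes the same two arguments as the after_read_execute lambda."""
    logger.debug('READ_ROWS_END, QRY_KEY: "%s", DURATION: %s', qry_key, duration)


class ListHandler(logging.Handler):
    """Collects messages in memory so the hooks can be checked without a log file."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


handler = ListHandler()
logger.addHandler(handler)
log_read_start("read_user_id_all", {}, "SELECT ...", "SELECT ...")
log_read_end("read_user_id_all", 0.01)
print(handler.messages)
```

Hooks like these could then be assigned in the same way as the lambda above, for example db_settings.before_read_execute = log_read_start.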

Safety & Security

Q: Is conditional SQL safe from injection?

A: Yes. Directive lines are parsed strictly and rejected before any SQL is executed.

The #if preprocessor only allows:

  • Parameter names (e.g. user_id)
  • String literals ('active', "pending")
  • Numbers and basic operators
  • Whitespace and comments

Any attempt to inject raw SQL will raise a parsing error before execution.

# This will RAISE an exception "ValueError: 'user_id;' not in ..." (not execute!)
"#if user_id; DROP TABLE t_user; --"

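The strict parsing described above can be approximated with a small tokenizer that accepts only a whitelist of token kinds. The sketch below is an illustration under assumptions: check_condition is a hypothetical function, it covers comparison operators only, and its error messages differ from the library's.

```python
import re

# One token: identifier, quoted string, number, or a basic comparison operator.
_TOKEN = re.compile(
    r"""\s*(?:
        (?P<name>[A-Za-z_]\w*)
      | (?P<string>'[^']*'|"[^"]*")
      | (?P<number>\d+(?:\.\d+)?)
      | (?P<op>==|!=|<=|>=|<|>)
    )""",
    re.VERBOSE,
)


def check_condition(condition: str, allowed_names: set[str]) -> list[str]:
    """Tokenize a directive condition, raising ValueError on anything unexpected."""
    tokens = []
    pos = 0
    text = condition.rstrip()
    while pos < len(text):
        match = _TOKEN.match(text, pos)
        if match is None:
            raise ValueError(f"unexpected text in condition: {text[pos:]!r}")
        if match.lastgroup == "name" and match.group("name") not in allowed_names:
            raise ValueError(f"{match.group('name')!r} is not a known parameter")
        tokens.append(match.group(match.lastgroup))
        pos = match.end()
    return tokens


print(check_condition("user_rank >= 3", {"user_rank"}))  # ['user_rank', '>=', '3']
```

A condition such as "user_id; DROP TABLE t_user; --" fails at the semicolon, because no whitelisted token can start there.
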
Features Summary

Feature                          Notes
Connection pooling               Via minconn / maxconn
Named queries                    Stored in dictionary
Single-row / multi-row fetch     read_row() / read_rows()
Single / batch CUD operations    update() / updates()
Output parameters                Via params_out dict
Transactions via with            Auto rollback on exception
Partially return CSV             read_csv_partial / read_csv_partial_async
Bilingual column aliases         "Name|이름" syntax
Conditional SQL                  #if / #elif / #endif
Logging support                  Before and after DB execution via before_... and after_... callables
SQL injection protection         Strict parsing in conditionals

License

MIT (or as defined in your project)


Made with ❤️ for cleaner, safer PostgreSQL code in Python.
