Psycopg2 helper for running PostgreSQL queries with #if support

Psycopg2Client — Modern PostgreSQL Helper for Python

A lightweight, opinionated wrapper around psycopg2 with built-in support for:

  • Query dictionary management
  • Conditional SQL (#if / #elif / #endif)
  • Bilingual column aliases (en|ko)
  • Simple transaction handling via context manager
  • Safe parameter binding

A successor-friendly (easy to hand over) alternative to raw psycopg2 with a better developer experience.

Installation

pip install psycopg2-client

Note: psycopg2-client is a custom helper class. See the full source in the repository.

Quick Start

1. Define Queries

qry_dic.update(
    {
        "upsert_user": """
WITH t AS (
    INSERT INTO t_user
        (
            user_id, user_name
        )
    VALUES
        (
            %(user_id)s, %(user_name)s
        )
    ON CONFLICT (user_id)
    DO UPDATE
    SET     user_name = %(user_name)s,
            update_time = NOW()
    RETURNING user_name
)
SELECT  user_name
FROM    t;
"""
    }
)

2. Configure Database Connection

from psycopg2_client import Psycopg2Client
from psycopg2_client_settings import Psycopg2ClientSettings

db_settings = Psycopg2ClientSettings(
    host="127.0.0.1",
    port=5432,
    database="postgres",
    user="postgres",
    password="0000",
    minconn=3,
    maxconn=10,
    connect_timeout=5,
    use_en_ko_column_alias=True,
    use_conditional=True,
    before_read_execute=lambda qry_type, params, qry_str, qry_with_value: print(
        f'READ_ROWS_START, QRY_TYPE: "{qry_type}"' f", QRY_WITH_VALUE: {qry_with_value}"
    ),
    after_read_execute=lambda qry_type, duration: print(
        f'READ_ROWS_END, QRY_TYPE: "{qry_type}"' f", DURATION: {duration}"
    ),
    before_update_execute=lambda qry_type, params, params_out, qry_str, qry_with_value: print(
        f'UPDATES_START, QRY_TYPE: "{qry_type}"' f", QRY_WITH_VALUE: {qry_with_value}"
    ),
    after_update_execute=lambda qry_type, row_count, params_out, duration: print(
        f'UPDATES_END, QRY_TYPE: "{qry_type}"' f", DURATION: {duration}"
    ),
)

3. Basic Usage

db = Psycopg2Client(db_settings=db_settings)

# Read single row
row = db.read_row("read_user_id_all", {})
print(row)  # {'user_id': 'gildong.hong'}

# Read all rows
rows = db.read_rows("read_user_id_all", {})
print(rows[:2])

Create / Update / Delete Operations

update() — Single CUD Statement

Returns affected row count:

affected = db.update(
    "upsert_user",
    {"user_id": "gildong.hong", "user_name": "홍길동"}
)
print("Affected rows:", affected)  # 1

Capture Output Parameters

params_out = {"user_name": ""}
db.update(
    "upsert_user",
    {"user_id": "gildong.hong", "user_name": "홍길동"},
    params_out=params_out
)
print("Returned name:", params_out["user_name"])  # 홍길동

updates() — Batch Execution

batch = [
    ("upsert_user", {"user_id": "sunja.kim", "user_name": "김순자"}),
    ("upsert_user", {"user_id": "malja.kim", "user_name": "김말자"}),
]

results = db.updates(batch)
print("Batch results:", results)  # [1, 1]

Transaction Support with with

Automatically commits on success, rolls back on exception:

with Psycopg2Client(db_settings=db_settings) as db:
    new_id = "youngja.lee"
    db.update("upsert_user", {"user_id": new_id, "user_name": "이영자"})
    db.update("delete_user", {"user_id": new_id})  # If this raises, the whole block is rolled back
    print("This won't print if error occurs")
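
The commit-on-success, rollback-on-exception behaviour follows the usual context-manager pattern. The sketch below illustrates that pattern only: it uses the standard library's sqlite3 as a stand-in database, and TransactionScope is a hypothetical name, not part of Psycopg2Client or its actual implementation.

```python
import sqlite3


class TransactionScope:
    """Hypothetical helper: commit on success, roll back on any exception."""

    def __init__(self, conn):
        self.conn = conn

    def __enter__(self):
        return self.conn

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self.conn.commit()
        else:
            self.conn.rollback()
        return False  # never swallow the exception


# In-memory SQLite stands in for PostgreSQL so the sketch runs anywhere.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t_user (user_id TEXT PRIMARY KEY, user_name TEXT)")
conn.commit()


def count_users():
    return conn.execute("SELECT COUNT(*) FROM t_user").fetchone()[0]


# Failing block: the insert is undone because an exception escapes the block.
try:
    with TransactionScope(conn) as tx:
        tx.execute("INSERT INTO t_user VALUES (?, ?)", ("youngja.lee", "이영자"))
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass
count_after_failure = count_users()

# Successful block: the insert is committed when the block exits normally.
with TransactionScope(conn) as tx:
    tx.execute("INSERT INTO t_user VALUES (?, ?)", ("youngja.lee", "이영자"))
count_after_success = count_users()
```

Returning False from __exit__ lets the original exception propagate, so the caller still sees the failure after the rollback.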

Partially return CSV

Reads rows in chunks and sends each chunk to the client immediately, so the client can show download progress.

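# Flask (Psycopg2Client and db_settings come from the earlier steps)
from datetime import datetime

from flask import Flask, Response

app = Flask(__name__)  # or reuse the app object you already have


@app.route("/read-csv-partial")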
def read_csv_partial():
    """read csv partial"""

    db_client = Psycopg2Client(db_settings=db_settings)
    filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}.csv"

    return Response(
        db_client.read_csv_partial("read_csv_partial", {}),
        mimetype="text/csv",
        headers={
            # if FE and BE are on different origins,
            # server must expose the Content-Disposition header
            "Access-Control-Expose-Headers": "Content-Disposition",
            "Content-Disposition": f'attachment; filename="{filename}"',
            # Very important for progressive saving in many browsers
            "Cache-Control": "no-cache, no-store, must-revalidate",
            "Pragma": "no-cache",
            "Expires": "0",
            "X-Accel-Buffering": "no",  # Important if using nginx
            "Transfer-Encoding": "chunked",
        },
    )

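# FastAPI (Psycopg2Client and db_settings come from the earlier steps)
from datetime import datetime

from fastapi import APIRouter
from fastapi.responses import StreamingResponse

router = APIRouter()  # or reuse the router you already have


@router.get("/read-csv-partial-async")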
async def read_csv_partial_async():
    """read csv partial async"""

    db_client = Psycopg2Client(db_settings=db_settings)
    filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}.csv"

    return StreamingResponse(
        content=db_client.read_csv_partial_async("read_csv_partial", {}),
        media_type="text/csv",
        headers={
            # if FE and BE are on different origins,
            # server must expose the Content-Disposition header
            "Access-Control-Expose-Headers": "Content-Disposition",
            "Content-Disposition": f'attachment; filename="{filename}"',
            # Very important for progressive saving in many browsers
            "Cache-Control": "no-cache, no-store, must-revalidate",
            "Pragma": "no-cache",
            "Expires": "0",
            "X-Accel-Buffering": "no",  # Important if using nginx
            "Transfer-Encoding": "chunked",
        },
    )
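
Both endpoints hand a generator to the response object, which sends each yielded chunk as soon as it is produced. The sketch below shows what such a chunked CSV generator might look like. iter_csv_chunks and chunk_size are hypothetical names used for illustration; this is not how read_csv_partial is implemented, and the rows are hard-coded rather than fetched from a database.

```python
import csv
import io


def iter_csv_chunks(rows, chunk_size=2):
    """Yield CSV text in chunks of `chunk_size` rows.

    Hypothetical helper: `rows` is a list of dicts sharing the same keys,
    and the header line is included in the first chunk only.
    """
    rows = list(rows)
    if not rows:
        return
    fieldnames = list(rows[0].keys())
    for start in range(0, len(rows), chunk_size):
        buffer = io.StringIO()
        writer = csv.DictWriter(buffer, fieldnames=fieldnames, lineterminator="\n")
        if start == 0:
            writer.writeheader()
        writer.writerows(rows[start:start + chunk_size])
        yield buffer.getvalue()


sample_rows = [
    {"user_id": "gildong.hong", "user_name": "홍길동"},
    {"user_id": "sunja.kim", "user_name": "김순자"},
    {"user_id": "malja.kim", "user_name": "김말자"},
]
chunks = list(iter_csv_chunks(sample_rows, chunk_size=2))
```

Because each chunk is built and yielded separately, the server never needs to hold the whole CSV in memory at once.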

Bilingual Column Aliases (English ↔ Korean)

Enabled when use_en_ko_column_alias=True and the en argument is not omitted.

qry_dic.update(
    {
        "read_user_alias": """
SELECT  user_id "Id|아이디", user_name "Name|이름"
FROM    t_user
WHERE   user_id = %(user_id)s
"""
    }
)

English mode (en=True)

rows = db.read_rows("read_user_alias", {"user_id": "gildong.hong"}, en=True)
print(rows[0])
# {'Id': 'gildong.hong', 'Name': '홍길동'}

Korean mode (en=False)

rows = db.read_rows("read_user_alias", {"user_id": "gildong.hong"}, en=False)
print(rows[0])
# {'아이디': 'gildong.hong', '이름': '홍길동'}
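
To make the alias rule concrete, here is a minimal sketch of how an "English|Korean" column name could be split and one half selected. pick_alias and apply_alias are hypothetical helpers written for this illustration; the library's internal handling may differ.

```python
def pick_alias(column_name, en=True):
    """Return the English or Korean half of an "English|Korean" alias.

    Column names without a "|" are returned unchanged.
    """
    if "|" not in column_name:
        return column_name
    english, korean = column_name.split("|", 1)
    return english if en else korean


def apply_alias(row, en=True):
    """Rename every key of a result row according to the chosen language."""
    return {pick_alias(key, en): value for key, value in row.items()}


# A row as it might come back from the database before alias handling.
raw_row = {"Id|아이디": "gildong.hong", "Name|이름": "홍길동"}
english_row = apply_alias(raw_row, en=True)
korean_row = apply_alias(raw_row, en=False)
```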

Conditional SQL (#if, #elif, #endif)

Enabled when use_conditional=True

qry_dic.update(
    {
        "read_user_search": """
SELECT  user_id, user_name, insert_time, update_time
FROM    t_user
WHERE   1 = 1
#if user_id
        AND user_id = %(user_id)s
#elif user_name
        AND user_name ILIKE %(user_name)s
#endif
"""
    }
)

Example: Search by user_id

rows = db.read_rows(
    "read_user_search",
    {"user_id": "gildong.hong", "user_name": ""}
)
print([r["user_name"] for r in rows])
# ['홍길동']

Example: Search by user_name (partial match)

rows = db.read_rows(
    "read_user_search",
    {"user_id": "", "user_name": "%김%"}
)
print([r["user_name"] for r in rows])
# ['김순자', '김말자']
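
The two searches above differ only in which branch of the query survives preprocessing. The following sketch shows one way such a line-based preprocessor could work, assuming each condition is a single parameter name tested for truthiness and that blocks are not nested. render_conditional is a hypothetical function, not the library's parser.

```python
def _condition_name(directive_line, keyword, params):
    """Extract the parameter name after `keyword` and check that it exists."""
    name = directive_line[len(keyword):].strip()
    if name not in params:
        raise ValueError(f"{name!r} not in params")
    return name


def render_conditional(sql, params):
    """Keep only the lines of the first #if/#elif branch whose parameter is truthy."""
    output = []
    in_block = False      # inside an #if ... #endif block
    branch_taken = False  # a branch of the current block has already matched
    emitting = True       # lines are currently copied to the output
    for line in sql.splitlines():
        stripped = line.strip()
        if stripped.startswith("#if "):
            if in_block:
                raise ValueError("nested #if is not supported in this sketch")
            name = _condition_name(stripped, "#if ", params)
            in_block = True
            branch_taken = emitting = bool(params[name])
        elif stripped.startswith("#elif "):
            if not in_block:
                raise ValueError("#elif without #if")
            name = _condition_name(stripped, "#elif ", params)
            emitting = (not branch_taken) and bool(params[name])
            branch_taken = branch_taken or emitting
        elif stripped == "#endif":
            if not in_block:
                raise ValueError("#endif without #if")
            in_block = False
            emitting = True
        elif emitting:
            output.append(line)
    if in_block:
        raise ValueError("missing #endif")
    return "\n".join(output)


query = """
SELECT  user_id, user_name
FROM    t_user
WHERE   1 = 1
#if user_id
        AND user_id = %(user_id)s
#elif user_name
        AND user_name ILIKE %(user_name)s
#endif
"""
by_id = render_conditional(query, {"user_id": "gildong.hong", "user_name": ""})
by_name = render_conditional(query, {"user_id": "", "user_name": "%김%"})
neither = render_conditional(query, {"user_id": "", "user_name": ""})
```

Only the SQL text changes here. The values themselves are still passed separately as bound parameters, which is why the placeholders such as %(user_id)s remain in the rendered query.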

Logging support

  • before_read_execute: called before a read query is executed
  • after_read_execute: called after a read query is executed
  • before_update_execute: called before a CUD query is executed
  • after_update_execute: called after a CUD query is executed

The print calls in these callbacks can be replaced with a logger.

Example: Use logger to write debug info

import logging

def get_sql_logger(name="sql"):
    logger = logging.getLogger(name)
    if not logger.handlers:
        logging.basicConfig(
            filename="sql.log",
            level=logging.DEBUG,
            format="%(asctime)s [%(levelname)7s] %(message)s",
            encoding="utf-8"
        )
    return logger

logger = get_sql_logger()
db_settings.before_read_execute = lambda qry_type, params, qry_str, qry_with_value: logger.debug(
    f'READ_ROWS_START, QRY_TYPE: "{qry_type}"' f", QRY_WITH_VALUE: {qry_with_value}"
)
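
Named functions are often easier to read and test than lambdas. The sketch below defines a matching pair of read callbacks using the parameter lists shown in the configuration step. The logger name, the ListHandler class, and the sample argument values (including a float duration) are assumptions made so the example runs without a database or log file.

```python
import logging

logger = logging.getLogger("sql_callbacks_demo")
logger.setLevel(logging.DEBUG)


def log_read_start(qry_type, params, qry_str, qry_with_value):
    """Callback with the before_read_execute parameter list."""
    logger.debug(
        'READ_ROWS_START, QRY_TYPE: "%s", QRY_WITH_VALUE: %s', qry_type, qry_with_value
    )


def log_read_end(qry_type, duration):
    """Callback with the after_read_execute parameter list."""
    logger.debug('READ_ROWS_END, QRY_TYPE: "%s", DURATION: %s', qry_type, duration)


# With the settings object from the configuration step, the callbacks would be
# attached like this (assumed to work the same way as the lambda assignment):
#   db_settings.before_read_execute = log_read_start
#   db_settings.after_read_execute = log_read_end


class ListHandler(logging.Handler):
    """Collects messages in memory so the demo needs no log file."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


handler = ListHandler()
logger.addHandler(handler)

# Simulate what the client would do around a read query.
log_read_start("read_user_id_all", {}, "SELECT ...", "SELECT ...")
log_read_end("read_user_id_all", 0.012)
```

Passing the values as logging arguments instead of building an f-string defers formatting until a handler actually emits the record.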

Safety & Security

Q: Is conditional SQL safe from injection?

A: Yes. The conditions are strictly parsed before anything is executed, and values are still bound as parameters.

The #if preprocessor only allows:

  • Parameter names (e.g. user_id)
  • String literals ('active', "pending")
  • Numbers and basic operators
  • Whitespace and comments

Any attempt to inject raw SQL will raise a parsing error before execution.

# This will RAISE an exception "ValueError: 'user_id;' not in ..." (not execute!)
"#if user_id; DROP TABLE t_user; --"

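The allow-list idea can be sketched as a token check: every token in a condition must be a known parameter name, a literal, or a basic operator, and anything else is rejected. validate_condition and ALLOWED_OPERATORS are hypothetical names, and this simplified version splits on whitespace (so string literals containing spaces are not handled). It is an illustration of the approach, not the library's parser.

```python
import re

# Hypothetical allow-list of operators for this sketch.
ALLOWED_OPERATORS = {"==", "!=", "<", "<=", ">", ">=", "and", "or", "not", "(", ")"}
SINGLE_QUOTED = re.compile(r"^'[^']*'$")
DOUBLE_QUOTED = re.compile(r'^"[^"]*"$')
NUMBER = re.compile(r"^\d+(?:\.\d+)?$")


def validate_condition(condition, params):
    """Raise ValueError unless every whitespace-separated token is allowed."""
    for token in condition.split():
        if token in params or token in ALLOWED_OPERATORS:
            continue
        if SINGLE_QUOTED.match(token) or DOUBLE_QUOTED.match(token):
            continue
        if NUMBER.match(token):
            continue
        raise ValueError(f"{token!r} not in allowed tokens")
    return True


params = {"user_id": "gildong.hong"}
ok = validate_condition("user_id == 'active'", params)

try:
    validate_condition("user_id; DROP TABLE t_user; --", params)
    error_message = None
except ValueError as exc:
    error_message = str(exc)
```

The first offending token, 'user_id;', is rejected because it matches neither a parameter name nor any allowed literal or operator, so the rest of the injected text is never considered.
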
Features Summary

Feature                          Notes
-------------------------------  ------------------------------------------------
Connection pooling               Via minconn / maxconn
Named queries                    Stored in dictionary
Single-row / multi-row fetch     read_row() / read_rows()
Single / batch CUD operations    update() / updates()
Output parameters                Via params_out dict
Transactions via with            Auto rollback on exception
Partially return CSV             read_csv_partial / read_csv_partial_async
Bilingual column aliases         "Name|이름" syntax
Conditional SQL                  #if / #elif / #endif
Logging support                  Before and after execute to DB via before... and after... callable function
SQL injection protection         Strict parsing in conditionals

License

MIT (or as defined in your project)


Made with ❤️ for cleaner, safer PostgreSQL code in Python.
