
Psycopg2 helper for running PostgreSQL queries with #if support


Psycopg2Client — Modern PostgreSQL Helper for Python

A lightweight, opinionated wrapper around psycopg2 with built-in support for:

  • Query dictionary management
  • Conditional SQL (#if / #elif / #endif)
  • Bilingual column aliases (en|ko)
  • Simple transaction handling via context manager
  • Safe parameter binding

An alternative to raw psycopg2 that aims for a better developer experience.

Installation

pip install psycopg2-client

Note: psycopg2-client is a custom helper class that wraps psycopg2. See the repository for the full source.

Quick Start

1. Define Queries

qry_dic.update(
    {
        "upsert_user": """
WITH t AS (
    INSERT INTO t_user
        (
            user_id, user_name
        )
    VALUES
        (
            %(user_id)s, %(user_name)s
        )
    ON CONFLICT (user_id)
    DO UPDATE
    SET     user_name = %(user_name)s,
            update_time = NOW()
    RETURNING user_name
)
SELECT  user_name
FROM    t;
"""
    }
)
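
Queries use psycopg2's named placeholders such as %(user_id)s, so every placeholder needs a matching key in the parameter dictionary. As a minimal sketch, the hypothetical helpers below (placeholder_names and missing_params are not part of psycopg2-client) list the placeholders a query expects and report any that a parameter dictionary lacks:

```python
import re

# Hypothetical helpers, not part of psycopg2-client.
# Matches named pyformat placeholders such as %(user_id)s.
_PLACEHOLDER = re.compile(r"%\((\w+)\)s")


def placeholder_names(sql):
    """Return the set of parameter names referenced by the query text."""
    return set(_PLACEHOLDER.findall(sql))


def missing_params(sql, params):
    """Return the placeholder names that have no key in params."""
    return placeholder_names(sql) - set(params)


sql = """
INSERT INTO t_user (user_id, user_name)
VALUES (%(user_id)s, %(user_name)s)
ON CONFLICT (user_id)
DO UPDATE SET user_name = %(user_name)s
"""

print(sorted(placeholder_names(sql)))                      # ['user_id', 'user_name']
print(missing_params(sql, {"user_id": "gildong.hong"}))    # {'user_name'}
```

Running a check like this before calling the database can turn a driver-level error into a clearer message.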

2. Configure Database Connection

from psycopg2_client.psycopg2_client_settings import Psycopg2ClientSettings

db_settings = Psycopg2ClientSettings(
    host="127.0.0.1",
    port=5432,
    database="postgres",
    user="postgres",
    password="0000",
    minconn=3,
    maxconn=10,
    connect_timeout=5,
    use_en_ko_column_alias=True,
    use_conditional=True,
    before_read_execute=lambda qry_type, params, qry_str, qry_with_value: print(
        f'READ_ROWS_START, QRY_TYPE: "{qry_type}"' f", QRY_WITH_VALUE: {qry_with_value}"
    ),
    after_read_execute=lambda qry_type, duration: print(
        f'READ_ROWS_END, QRY_TYPE: "{qry_type}"' f", DURATION: {duration}"
    ),
    before_update_execute=lambda qry_type, params, params_out, qry_str, qry_with_value: print(
        f'UPDATES_START, QRY_TYPE: "{qry_type}"' f", QRY_WITH_VALUE: {qry_with_value}"
    ),
    after_update_execute=lambda qry_type, row_count, params_out, duration: print(
        f'UPDATES_END, QRY_TYPE: "{qry_type}"' f", DURATION: {duration}"
    ),
)
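
The example above hardcodes the password. One possible alternative, sketched here under assumptions, is to read the connection values from the standard libpq environment variables (PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD). The function settings_kwargs_from_env is hypothetical, and the sketch only builds a dictionary of the keyword arguments shown above; it does not import the library:

```python
import os


def settings_kwargs_from_env(env=None):
    """Collect connection keyword arguments from libpq-style variables.

    Hypothetical helper, not part of psycopg2-client. Defaults mirror the
    values used in the example above; the password has no default so a
    missing PGPASSWORD fails loudly with KeyError.
    """
    env = os.environ if env is None else env
    return {
        "host": env.get("PGHOST", "127.0.0.1"),
        "port": int(env.get("PGPORT", "5432")),
        "database": env.get("PGDATABASE", "postgres"),
        "user": env.get("PGUSER", "postgres"),
        "password": env["PGPASSWORD"],
        "minconn": 3,
        "maxconn": 10,
        "connect_timeout": 5,
    }


# Demonstrated with an explicit mapping instead of the real environment.
kwargs = settings_kwargs_from_env({"PGPASSWORD": "s3cret", "PGPORT": "6543"})
print(kwargs["host"], kwargs["port"])  # 127.0.0.1 6543

# Assumed usage, together with the remaining options from the example above:
# db_settings = Psycopg2ClientSettings(**kwargs, use_conditional=True, ...)
```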

3. Basic Usage

from psycopg2_client.psycopg2_client import Psycopg2Client

db = Psycopg2Client(db_settings=db_settings)

# Read single row
row = db.read_row("read_user_id_all", {})
print(row)  # {'user_id': 'gildong.hong'}

# Read all rows
rows = db.read_rows("read_user_id_all", {})
print(rows[:2])

Create / Update / Delete Operations

update() — Single CUD Statement

Returns the affected row count:

affected = db.update(
    "upsert_user",
    {"user_id": "gildong.hong", "user_name": "홍길동"}
)
print("Affected rows:", affected)  # 1

Capture Output Parameters

params_out = {"user_name": ""}
db.update(
    "upsert_user",
    {"user_id": "gildong.hong", "user_name": "홍길동"},
    params_out=params_out
)
print("Returned name:", params_out["user_name"])  # 홍길동

updates() — Batch Execution

batch = [
    ("upsert_user", {"user_id": "sunja.kim", "user_name": "김순자"}),
    ("upsert_user", {"user_id": "malja.kim", "user_name": "김말자"}),
]

results = db.updates(batch)
print("Batch results:", results)  # [1, 1]
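
The batch is a list of (query name, parameters) tuples, so it can be generated from existing data. A small sketch, where build_upsert_batch is a hypothetical helper and the input is assumed to be (user_id, user_name) pairs:

```python
def build_upsert_batch(users):
    """Turn (user_id, user_name) pairs into the list shape passed to updates().

    Hypothetical helper, not part of psycopg2-client.
    """
    return [
        ("upsert_user", {"user_id": user_id, "user_name": user_name})
        for user_id, user_name in users
    ]


batch = build_upsert_batch([("sunja.kim", "김순자"), ("malja.kim", "김말자")])
print(len(batch))  # 2

# Assumed usage with the client from the example above:
# results = db.updates(batch)
```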

Transaction Support with with

Automatically commits on success, rolls back on exception:

with Psycopg2Client(db_settings=db_settings) as db:
    new_id = "youngja.lee"
    db.update("upsert_user", {"user_id": new_id, "user_name": "이영자"})
    db.update("delete_user", {"user_id": new_id})  # If this statement fails, the whole block is rolled back
    print("This won't print if error occurs")
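
The commit-or-rollback behaviour described above follows the usual context manager pattern. The sketch below illustrates that pattern with a stand-in connection; Transaction and FakeConnection are hypothetical names, and this is not the library's actual implementation. It also assumes the original exception is re-raised after the rollback:

```python
class FakeConnection:
    """Stand-in for a database connection that records the calls it receives."""

    def __init__(self):
        self.calls = []

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")


class Transaction:
    """Commit when the block exits cleanly, roll back when it raises."""

    def __init__(self, conn):
        self.conn = conn

    def __enter__(self):
        return self.conn

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self.conn.commit()
        else:
            self.conn.rollback()
        return False  # do not swallow the exception


ok = FakeConnection()
with Transaction(ok):
    pass
print(ok.calls)  # ['commit']

failed = FakeConnection()
try:
    with Transaction(failed):
        raise RuntimeError("statement failed")
except RuntimeError:
    pass
print(failed.calls)  # ['rollback']
```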

Partially return CSV

Read rows in chunks and send each chunk to the client as soon as it is ready, so the client can show download progress.

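# Flask (assumes `app` is your flask.Flask application)
from datetime import datetime

from flask import Response
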
@app.route("/read-csv-partial")
def read_csv_partial():
    """read csv partial"""

    db_client = Psycopg2Client(db_settings=db_settings)
    filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}.csv"

    return Response(
        db_client.read_csv_partial("read_csv_partial", {}),
        mimetype="text/csv",
        headers={
            # if FE and BE are on different origins,
            # server must expose the Content-Disposition header
            "Access-Control-Expose-Headers": "Content-Disposition",
            "Content-Disposition": f'attachment; filename="{filename}"',
            # Very important for progressive saving in many browsers
            "Cache-Control": "no-cache, no-store, must-revalidate",
            "Pragma": "no-cache",
            "Expires": "0",
            "X-Accel-Buffering": "no",  # Important if using nginx
            "Transfer-Encoding": "chunked",
        },
    )

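# FastAPI (assumes `router` is your fastapi.APIRouter)
from datetime import datetime

from fastapi.responses import StreamingResponse
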
@router.get("/read-csv-partial-async")
async def read_csv_partial_async():
    """read csv partial async"""

    db_client = Psycopg2Client(db_settings=db_settings)
    filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}.csv"

    return StreamingResponse(
        content=db_client.read_csv_partial_async("read_csv_partial", {}),
        media_type="text/csv",
        headers={
            # if FE and BE are on different origins,
            # server must expose the Content-Disposition header
            "Access-Control-Expose-Headers": "Content-Disposition",
            "Content-Disposition": f'attachment; filename="{filename}"',
            # Very important for progressive saving in many browsers
            "Cache-Control": "no-cache, no-store, must-revalidate",
            "Pragma": "no-cache",
            "Expires": "0",
            "X-Accel-Buffering": "no",  # Important if using nginx
            "Transfer-Encoding": "chunked",
        },
    )
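
Both frameworks stream whatever a generator yields, which is what makes partial delivery possible. The following is a rough sketch of such a generator, not the library's read_csv_partial implementation; iter_csv_chunks and its chunk_rows parameter are hypothetical:

```python
import csv
import io


def iter_csv_chunks(rows, fieldnames, chunk_rows=1000):
    """Yield CSV text in chunks of at most chunk_rows data rows.

    The header line is included in the first chunk.
    """
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames)
    writer.writeheader()
    pending = 0
    for row in rows:
        writer.writerow(row)
        pending += 1
        if pending >= chunk_rows:
            yield buffer.getvalue()
            # Reset the buffer so the next chunk starts empty.
            buffer.seek(0)
            buffer.truncate(0)
            pending = 0
    # Flush the final partial chunk, or the header alone for an empty result.
    if buffer.tell():
        yield buffer.getvalue()


rows = [{"user_id": "a"}, {"user_id": "b"}, {"user_id": "c"}]
for chunk in iter_csv_chunks(rows, ["user_id"], chunk_rows=2):
    print(repr(chunk))
```

A generator like this can be passed directly as the response body in either framework, as in the examples above.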

Bilingual Column Aliases (English ↔ Korean)

Enabled when use_en_ko_column_alias=True and the en argument is passed (not omitted).

qry_dic.update(
    {
        "read_user_alias": """
SELECT  user_id "Id|아이디", user_name "Name|이름"
FROM    t_user
WHERE   user_id = %(user_id)s
"""
    }
)

English mode (en=True)

rows = db.read_rows("read_user_alias", {"user_id": "gildong.hong"}, en=True)
print(rows[0])
# {'Id': 'gildong.hong', 'Name': '홍길동'}

Korean mode (en=False)

rows = db.read_rows("read_user_alias", {"user_id": "gildong.hong"}, en=False)
print(rows[0])
# {'아이디': 'gildong.hong', '이름': '홍길동'}
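
Conceptually, each "en|ko" alias is split at the | and one half becomes the dictionary key. The sketch below shows that idea on an already fetched row; resolve_alias and resolve_row are hypothetical helpers, and the library may well resolve aliases differently (for example, by rewriting the SQL before execution):

```python
def resolve_alias(column_name, en):
    """Pick the English or Korean half of an "en|ko" alias."""
    if "|" not in column_name:
        return column_name  # plain column name, nothing to choose
    english, korean = column_name.split("|", 1)
    return english if en else korean


def resolve_row(row, en):
    """Apply resolve_alias to every key of a row dictionary."""
    return {resolve_alias(key, en): value for key, value in row.items()}


raw = {"Id|아이디": "gildong.hong", "Name|이름": "홍길동"}
print(resolve_row(raw, en=True))   # {'Id': 'gildong.hong', 'Name': '홍길동'}
print(resolve_row(raw, en=False))  # {'아이디': 'gildong.hong', '이름': '홍길동'}
```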

Conditional SQL (#if, #elif, #endif)

Enabled when use_conditional=True

qry_dic.update(
    {
        "read_user_search": """
SELECT  user_id, user_name, insert_time, update_time
FROM    t_user
WHERE   1 = 1
#if user_id
        AND user_id = %(user_id)s
#elif user_name
        AND user_name ILIKE %(user_name)s
#endif
"""
    }
)

Example: Search by user_id

rows = db.read_rows(
    "read_user_search",
    {"user_id": "gildong.hong", "user_name": ""}
)
print([r["user_name"] for r in rows])
# ['홍길동']

Example: Search by user_name (partial match)

rows = db.read_rows(
    "read_user_search",
    {"user_id": "", "user_name": "%김%"}
)
print([r["user_name"] for r in rows])
# ['김순자', '김말자']
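
In both examples the branch whose parameter is non-empty is the one that ends up in the final SQL. A deliberately simplified sketch of that behaviour follows; render_conditional is hypothetical and is not the library's parser. It assumes each condition is a single parameter name tested for truthiness, and it handles neither nesting nor operators:

```python
def render_conditional(sql, params):
    """Keep only the lines of the first #if / #elif branch whose parameter is truthy."""
    output = []
    in_block = False      # inside an #if ... #endif region
    branch_taken = False  # a branch of the current region already matched
    emitting = True       # whether ordinary lines are currently kept
    for line in sql.splitlines():
        stripped = line.strip()
        if stripped.startswith("#if "):
            in_block = True
            branch_taken = bool(params.get(stripped[4:].strip()))
            emitting = branch_taken
        elif stripped.startswith("#elif ") and in_block:
            matched = not branch_taken and bool(params.get(stripped[6:].strip()))
            emitting = matched
            branch_taken = branch_taken or matched
        elif stripped == "#endif" and in_block:
            in_block = False
            emitting = True
        elif emitting:
            output.append(line)
    return "\n".join(output)


SQL = """SELECT  user_id, user_name
FROM    t_user
WHERE   1 = 1
#if user_id
        AND user_id = %(user_id)s
#elif user_name
        AND user_name ILIKE %(user_name)s
#endif"""

print(render_conditional(SQL, {"user_id": "", "user_name": "%김%"}))
```

The rendered text still contains %(name)s placeholders, so the values themselves are bound by the driver rather than pasted into the SQL.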

Logging support

  • before_read_execute: called before a read query is executed
  • after_read_execute: called after a read query is executed
  • before_update_execute: called before a CUD query is executed
  • after_update_execute: called after a CUD query is executed

The print calls in these callbacks can be replaced with a logger.

Example: use a logger to write debug info

import logging

def get_sql_logger(name="sql"):
    logger = logging.getLogger(name)
    if not logger.handlers:
        logging.basicConfig(
            filename="sql.log",
            level=logging.DEBUG,
            format="%(asctime)s [%(levelname)7s] %(message)s",
            encoding="utf-8"
        )
    return logger

logger = get_sql_logger()
db_settings.before_read_execute = lambda qry_type, params, qry_str, qry_with_value: logger.debug(
    f'READ_ROWS_START, QRY_TYPE: "{qry_type}"' f", QRY_WITH_VALUE: {qry_with_value}"
)
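
Because the callbacks are ordinary callables, they can do more than log every query. As one possible sketch, the hypothetical factory below builds an after_read_execute callback that logs only slow reads. It uses the (qry_type, duration) signature shown in the settings example and assumes duration is a number comparable with the threshold; its unit is whatever the library passes:

```python
import logging

logger = logging.getLogger("sql")


def make_slow_read_hook(logger, threshold):
    """Build an after_read_execute callback that logs only slow reads.

    Hypothetical helper, not part of psycopg2-client.
    """

    def after_read_execute(qry_type, duration):
        if duration >= threshold:
            logger.warning(
                'SLOW_READ, QRY_TYPE: "%s", DURATION: %s', qry_type, duration
            )

    return after_read_execute


slow_read_hook = make_slow_read_hook(logger, threshold=0.5)
slow_read_hook("read_user_id_all", 0.1)  # below the threshold: nothing is logged
slow_read_hook("read_user_id_all", 0.9)  # at or above the threshold: one warning

# Assumed usage, following the attribute assignment shown above:
# db_settings.after_read_execute = slow_read_hook
```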

Safety & Security

Q: Is conditional SQL safe from injection?

A: Yes. Conditions are parsed strictly before the query is built, so a directive cannot carry raw SQL.

The #if preprocessor only allows:

  • Parameter names (e.g. user_id)
  • String literals ('active', "pending")
  • Numbers and basic operators
  • Whitespace and comments

Any attempt to inject raw SQL will raise a parsing error before execution.

# Raises "ValueError: 'user_id;' not in ..." before any SQL is executed
"#if user_id; DROP TABLE t_user; --"

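The whitelist idea can be illustrated with a small tokenizer that rejects anything outside the allowed token kinds. This is only a sketch under assumptions, not the library's parser: validate_condition is hypothetical, the operator set is a guess at "basic operators", and the error messages differ from the library's:

```python
import re

# One token: quoted string, number, identifier, or a basic operator/parenthesis.
_TOKEN = re.compile(
    r"""\s*(?:'[^']*'|"[^"]*"|\d+(?:\.\d+)?|[A-Za-z_]\w*|==|!=|<=|>=|[<>()])"""
)
_IDENTIFIER = re.compile(r"[A-Za-z_]\w*")
_KEYWORDS = {"and", "or", "not"}


def validate_condition(expression, params):
    """Raise ValueError unless every token in the condition is whitelisted."""
    expression = expression.strip()
    position = 0
    while position < len(expression):
        match = _TOKEN.match(expression, position)
        if match is None:
            rest = expression[position:].strip()
            raise ValueError(f"unexpected text in condition: {rest!r}")
        token = match.group().strip()
        is_name = _IDENTIFIER.fullmatch(token) is not None
        if is_name and token not in _KEYWORDS and token not in params:
            raise ValueError(f"{token!r} is not a known parameter")
        position = match.end()


validate_condition("user_id", {"user_id": "gildong.hong"})      # passes silently
validate_condition("status == 'active'", {"status": "active"})  # passes silently

try:
    validate_condition("user_id; DROP TABLE t_user; --", {"user_id": "x"})
except ValueError as error:
    print("rejected:", error)
```

Since the semicolon matches none of the allowed token kinds, the check fails before any SQL text is assembled.
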
Features Summary

Feature                        Notes
-----------------------------  ------------------------------------------------
Connection pooling             Via minconn / maxconn
Named queries                  Stored in a dictionary
Single-row / multi-row fetch   read_row() / read_rows()
Single / batch CUD operations  update() / updates()
Output parameters              Via params_out dict
Transactions via with          Auto rollback on exception
Partially return CSV           read_csv_partial / read_csv_partial_async
Bilingual column aliases       "Name|이름" syntax
Conditional SQL                #if / #elif / #endif
Logging support                before_... / after_... callbacks around each DB execute
SQL injection protection       Strict parsing in conditionals

License

MIT (or as defined in your project)


Made with ❤️ for cleaner, safer PostgreSQL code in Python.

Project details

Release 1.0.5 (psycopg2_client-1.0.5.tar.gz and psycopg2_client-1.0.5-py3-none-any.whl), published from the doctorgu/psycopg2-client repository.