
A psycopg2 helper for running PostgreSQL queries with #if support


Psycopg2Client — Modern PostgreSQL Helper for Python

A lightweight, opinionated wrapper around psycopg2 with built-in support for:

  • Query dictionary management
  • Conditional SQL (#if / #elif / #endif)
  • Bilingual column aliases (en|ko)
  • Simple transaction handling via context manager
  • Safe parameter binding

An alternative to raw psycopg2 that aims for a better developer experience.

Installation

pip install psycopg2-client

Note: psycopg2-client is a custom helper class. See the full source in the repository.

Quick Start

1. Define Queries

qry_dic: dict[str, str] = {}

qry_dic.update(
    {
        "read_user_id_all": """
SELECT  user_id
FROM    t_user
"""
    }
)

qry_dic.update(
    {
        "upsert_user": """
WITH t AS (
    INSERT INTO t_user
        (
            user_id, user_name, user_rank
        )
    VALUES
        (
            %(user_id)s, %(user_name)s, %(user_rank)s
        )
    ON CONFLICT (user_id)
    DO UPDATE
    SET     user_name = %(user_name)s,
            user_rank = %(user_rank)s,
            update_time = NOW()
    RETURNING user_name, user_rank
)
SELECT  user_name, user_rank
FROM    t;
"""
    }
)
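
The queries above use psycopg2's named placeholders, such as %(user_id)s. As a minimal sketch (the helper missing_params is hypothetical and not part of psycopg2-client), a query string can be checked against its parameter dictionary before it is sent to the database:

```python
import re

# Matches psycopg2-style named placeholders such as %(user_id)s.
PLACEHOLDER = re.compile(r"%\((\w+)\)s")


def missing_params(query: str, params: dict) -> set:
    """Return placeholder names used in `query` that `params` does not supply.

    Hypothetical helper for illustration; not part of psycopg2-client.
    """
    return set(PLACEHOLDER.findall(query)) - set(params)


query = (
    "INSERT INTO t_user (user_id, user_name) "
    "VALUES (%(user_id)s, %(user_name)s)"
)
print(missing_params(query, {"user_id": "gildong.hong"}))  # {'user_name'}
```

A check like this only catches missing keys early; the values themselves are still bound by the driver at execution time.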

2. Configure Database Connection

from psycopg2_client.settings import Settings

db_settings = Settings(
    host="127.0.0.1",
    port=5432,
    database="postgres",
    user="postgres",
    password="0000",
    minconn=3,
    maxconn=10,
    connect_timeout=5,
    use_en_ko_column_alias=True,
    use_conditional=True,
    all_query=qry_dic,
    before_read_execute=lambda qry_key, params, qry_str, qry_with_value: print(
        f'READ_ROWS_START, QRY_KEY: "{qry_key}"' f", QRY_WITH_VALUE: {qry_with_value}"
    ),
    after_read_execute=lambda qry_key, duration: print(
        f'READ_ROWS_END, QRY_KEY: "{qry_key}"' f", DURATION: {duration}"
    ),
    before_update_execute=lambda qry_key, params, params_out, qry_str, qry_with_value: print(
        f'UPDATES_START, QRY_KEY: "{qry_key}"' f", QRY_WITH_VALUE: {qry_with_value}"
    ),
    after_update_execute=lambda qry_key, row_count, params_out, duration: print(
        f'UPDATES_END, QRY_KEY: "{qry_key}"' f", DURATION: {duration}"
    ),
)

3. Basic Usage

from psycopg2_client.client import Client

db = Client(db_settings=db_settings)

# Read single row
row = db.read_row("read_user_id_all", {})
print(row)  # {'user_id': 'gildong.hong'}

# Read all rows
rows = db.read_rows("read_user_id_all", {})
print(rows[:2])

Create / Update / Delete Operations

update() — Single CUD Statement

Returns affected row count:

affected = db.update(
    "upsert_user",
    {"user_id": "gildong.hong", "user_name": "홍길동", "user_rank": 1}
)
print("Affected rows:", affected)  # 1

Capture Output Parameters

params_out = {"user_name": "", "user_rank": 0}
db.update(
    "upsert_user",
    {"user_id": "gildong.hong", "user_name": "홍길동", "user_rank": 1},
    params_out=params_out
)
print("Returned name:", params_out["user_name"], params_out["user_rank"])  # 홍길동 1
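
In the example above, the keys of params_out match the columns named in the RETURNING clause. One plausible way to fill such a dictionary is sketched below, assuming the column names come from the cursor (psycopg2 exposes them through cursor.description). The function fill_params_out is hypothetical and may differ from what the library does internally:

```python
def fill_params_out(params_out: dict, columns: list, row: tuple) -> None:
    """Copy returned column values into the matching keys of params_out.

    Hypothetical helper for illustration; not the library's implementation.
    """
    for name, value in zip(columns, row):
        if name in params_out:  # ignore columns the caller did not ask for
            params_out[name] = value


params_out = {"user_name": "", "user_rank": 0}
fill_params_out(params_out, ["user_name", "user_rank"], ("홍길동", 1))
print(params_out)  # {'user_name': '홍길동', 'user_rank': 1}
```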

updates() — Batch Execution

batch = [
    ("upsert_user", {"user_id": "sunja.kim", "user_name": "김순자", "user_rank": 2}),
    ("upsert_user", {"user_id": "malja.kim", "user_name": "김말자", "user_rank": 3}),
]

results = db.updates(batch)
print("Batch results:", results)  # [1, 1]

Transaction Support with with

Automatically commits on success, rolls back on exception:

with Client(db_settings=db_settings) as db:
    new_id = "youngja.lee"
    db.update("upsert_user", {"user_id": new_id, "user_name": "이영자", "user_rank": 4})
    db.update("delete_user", {"user_id": new_id})  # Raises ("delete_user" is not defined above), so the whole block rolls back
    print("This won't print if an error occurs")
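
The commit-or-rollback behaviour can be illustrated without a PostgreSQL server. The sketch below uses the standard-library sqlite3 module as a stand-in; the class Tx is hypothetical and only shows the general context-manager pattern, not the Client implementation:

```python
import sqlite3


class Tx:
    """Hypothetical stand-in: commit on clean exit, roll back on exception."""

    def __init__(self, conn):
        self.conn = conn

    def __enter__(self):
        return self.conn

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self.conn.commit()
        else:
            self.conn.rollback()
        return False  # let the original exception propagate


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t_user (user_id TEXT PRIMARY KEY)")
conn.commit()

try:
    with Tx(conn) as c:
        c.execute("INSERT INTO t_user VALUES ('youngja.lee')")
        c.execute("DELETE FROM no_such_table")  # raises sqlite3.OperationalError
except sqlite3.OperationalError:
    pass

# The INSERT was rolled back together with the failed statement.
remaining = conn.execute("SELECT COUNT(*) FROM t_user").fetchone()[0]
print(remaining)  # 0
```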

Partial CSV Response

Reads rows in batches and sends each batch to the client as soon as it is ready, so the client can show progress while the download is still running.

# Flask
from datetime import datetime

from flask import Flask, Response

app = Flask(__name__)


@app.route("/read-csv-partial")
def read_csv_partial():
    """read csv partial"""

    db_client = Client(db_settings=db_settings)
    filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}.csv"

    return Response(
        db_client.read_csv_partial("read_csv_partial", {}),
        mimetype="text/csv",
        headers={
            # if FE and BE are on different origins,
            # server must expose the Content-Disposition header
            "Access-Control-Expose-Headers": "Content-Disposition",
            "Content-Disposition": f'attachment; filename="{filename}"',
            # Very important for progressive saving in many browsers
            "Cache-Control": "no-cache, no-store, must-revalidate",
            "Pragma": "no-cache",
            "Expires": "0",
            "X-Accel-Buffering": "no",  # Important if using nginx
            "Transfer-Encoding": "chunked",
        },
    )

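# FastAPI
from datetime import datetime

from fastapi import APIRouter
from fastapi.responses import StreamingResponse

router = APIRouter()


@router.get("/read-csv-partial-async")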
async def read_csv_partial_async():
    """read csv partial async"""

    db_client = Client(db_settings=db_settings)
    filename = f"{datetime.now().strftime('%Y%m%d%H%M%S')}.csv"

    return StreamingResponse(
        content=db_client.read_csv_partial_async("read_csv_partial", {}),
        media_type="text/csv",
        headers={
            # if FE and BE are on different origins,
            # server must expose the Content-Disposition header
            "Access-Control-Expose-Headers": "Content-Disposition",
            "Content-Disposition": f'attachment; filename="{filename}"',
            # Very important for progressive saving in many browsers
            "Cache-Control": "no-cache, no-store, must-revalidate",
            "Pragma": "no-cache",
            "Expires": "0",
            "X-Accel-Buffering": "no",  # Important if using nginx
            "Transfer-Encoding": "chunked",
        },
    )
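
Both handlers above pass an iterator to the response object, which sends each yielded piece as it becomes available. The sketch below shows the general idea with a plain generator. The function csv_chunks and its chunk_size parameter are hypothetical; in practice the rows would come from the database in batches (for example via psycopg2's cursor.fetchmany), not from a list:

```python
import csv
import io
from typing import Iterable, Iterator, Sequence


def csv_chunks(
    header: Sequence[str], rows: Iterable[Sequence], chunk_size: int = 2
) -> Iterator[str]:
    """Yield CSV text in chunks of `chunk_size` rows, header included first.

    Hypothetical helper for illustration; not the library's implementation.
    """
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(header)
    count = 0
    for row in rows:
        writer.writerow(row)
        count += 1
        if count % chunk_size == 0:
            yield buf.getvalue()
            buf.seek(0)
            buf.truncate(0)  # reuse the buffer for the next chunk
    if buf.tell():  # flush whatever is left (or the header alone)
        yield buf.getvalue()


chunks = list(
    csv_chunks(["user_id", "user_rank"], [("a", 1), ("b", 2), ("c", 3)])
)
print(len(chunks))  # 2
```

Because the generator never holds more than chunk_size rows of text, memory use stays flat regardless of the result size.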

Bilingual Column Aliases (English ↔ Korean)

Enabled when use_en_ko_column_alias=True and the en argument is passed (not omitted).

qry_dic.update(
    {
        "read_user_alias": """
SELECT  user_id "Id|아이디", user_name "Name|이름", user_rank "Rank|순위"
FROM    t_user
WHERE   user_id = %(user_id)s
"""
    }
)

English mode (en=True)

rows = db.read_rows("read_user_alias", {"user_id": "gildong.hong"}, en=True)
print(rows[0])
# {'Id': 'gildong.hong', 'Name': '홍길동', 'Rank': 1}

Korean mode (en=False)

rows = db.read_rows("read_user_alias", {"user_id": "gildong.hong"}, en=False)
print(rows[0])
# {'아이디': 'gildong.hong', '이름': '홍길동', '순위': 1}
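
The "en|ko" alias convention can be pictured as a simple split on the pipe character. The following is a minimal sketch under that assumption; pick_alias and rename_row are hypothetical names, and the library may resolve aliases differently (for example by rewriting the SQL before execution):

```python
def pick_alias(alias: str, en: bool) -> str:
    """Return the English or Korean half of an "en|ko" alias.

    Hypothetical helper for illustration; not the library's implementation.
    """
    english, sep, korean = alias.partition("|")
    if not sep:  # no "|" present: keep the alias unchanged
        return alias
    return english if en else korean


def rename_row(row: dict, en: bool) -> dict:
    """Apply pick_alias to every key of a result row."""
    return {pick_alias(key, en): value for key, value in row.items()}


row = {"Id|아이디": "gildong.hong", "Name|이름": "홍길동", "Rank|순위": 1}
print(rename_row(row, en=True))
# {'Id': 'gildong.hong', 'Name': '홍길동', 'Rank': 1}
print(rename_row(row, en=False))
# {'아이디': 'gildong.hong', '이름': '홍길동', '순위': 1}
```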

Conditional SQL (#if, #elif, #endif)

Enabled when use_conditional=True

qry_dic.update(
    {
        "read_user_search": """
SELECT  user_id, user_name, user_rank, insert_time, update_time
FROM    t_user
WHERE   1 = 1
#if user_id
        AND user_id = %(user_id)s
#elif user_name
        AND user_name ILIKE %(user_name)s
#elif user_rank
        AND user_rank <= %(user_rank)s
#endif
"""
    }
)

Example: Search by user_id

rows = db.read_rows(
    "read_user_search",
    {"user_id": "gildong.hong", "user_name": "", "user_rank": 0}
)
print([r["user_name"] for r in rows])
# ['홍길동']

Example: Search by user_name (partial match)

rows = db.read_rows(
    "read_user_search",
    {"user_id": "", "user_name": "%김%", "user_rank": 0}
)
print([r["user_name"] for r in rows])
# ['김순자', '김말자']

Example: Search by user_rank (less than or equal)

rows = db.read_rows(
    "read_user_search",
    {"user_id": "", "user_name": "", "user_rank": 3}
)
print([r["user_name"] for r in rows])
# ['홍길동', '김순자', '김말자']
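
The three searches above behave as if the first branch whose parameter is truthy is kept and the others are dropped (empty strings and 0 count as false). The sketch below reproduces that behaviour for bare parameter names only. render_conditional is a hypothetical function, it does not handle nesting or operators, and it is not the library's parser:

```python
def render_conditional(sql: str, params: dict) -> str:
    """Keep the first #if/#elif branch whose parameter is truthy.

    Hypothetical sketch: supports only a bare parameter name per directive.
    """
    out = []
    in_block = taken = False
    emitting = True
    for line in sql.splitlines():
        keyword, _, expr = line.strip().partition(" ")
        if keyword in ("#if", "#elif"):
            name = expr.strip()
            if not name.isidentifier():
                raise ValueError(f"{name!r} is not a parameter name")
            if keyword == "#if":
                if in_block:
                    raise ValueError("nested #if is not handled in this sketch")
                in_block, taken = True, False
            elif not in_block:
                raise ValueError("#elif without #if")
            emitting = not taken and bool(params.get(name))
            taken = taken or emitting
        elif keyword == "#endif":
            if not in_block:
                raise ValueError("#endif without #if")
            in_block, taken, emitting = False, False, True
        elif emitting:
            out.append(line)
    if in_block:
        raise ValueError("missing #endif")
    return "\n".join(out)


sql = "\n".join([
    "SELECT  user_id, user_name",
    "FROM    t_user",
    "WHERE   1 = 1",
    "#if user_id",
    "        AND user_id = %(user_id)s",
    "#elif user_name",
    "        AND user_name ILIKE %(user_name)s",
    "#endif",
])
by_id = render_conditional(sql, {"user_id": "gildong.hong", "user_name": ""})
by_name = render_conditional(sql, {"user_id": "", "user_name": "%김%"})
print(by_name.splitlines()[-1].strip())  # AND user_name ILIKE %(user_name)s
```

Note that the directives only decide which SQL lines survive; the values are still passed separately as bound parameters.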

Logging support

  • before_read_execute is called before a read query is executed
  • after_read_execute is called after a read query is executed
  • before_update_execute is called before a CUD query is executed
  • after_update_execute is called after a CUD query is executed

The print calls in the callbacks can be replaced with a logger.

Example: Use logger to write debug info

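import logging

def get_sql_logger(name="sql"):
    """Return a logger that writes DEBUG records to sql.log."""
    logger = logging.getLogger(name)
    if not logger.handlers:  # attach the file handler only once
        handler = logging.FileHandler("sql.log", encoding="utf-8")
        handler.setFormatter(
            logging.Formatter("%(asctime)s [%(levelname)7s] %(message)s")
        )
        logger.addHandler(handler)
        logger.setLevel(logging.DEBUG)
    return logger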

logger = get_sql_logger()
db_settings.before_read_execute = lambda qry_key, params, qry_str, qry_with_value: logger.debug(
    f'READ_ROWS_START, QRY_KEY: "{qry_key}"' f", QRY_WITH_VALUE: {qry_with_value}"
)
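
The before/after callbacks follow a common hook pattern: call one function, time the work, then call another with the elapsed duration. The sketch below shows that pattern in isolation. run_with_hooks is a hypothetical function, and its before hook takes only the query key, which is simpler than the callback signatures configured in Settings:

```python
import time
from typing import Any, Callable, Optional


def run_with_hooks(
    qry_key: str,
    execute: Callable[[], Any],
    before: Optional[Callable[[str], None]] = None,
    after: Optional[Callable[[str, float], None]] = None,
) -> Any:
    """Run `execute`, calling the optional hooks around it.

    Hypothetical helper for illustration; not the library's implementation.
    """
    if before:
        before(qry_key)
    start = time.perf_counter()
    result = execute()
    duration = time.perf_counter() - start  # seconds spent in execute()
    if after:
        after(qry_key, duration)
    return result


events = []
result = run_with_hooks(
    "read_user_id_all",
    lambda: [{"user_id": "gildong.hong"}],  # stands in for the real query
    before=lambda key: events.append(("start", key)),
    after=lambda key, duration: events.append(("end", key, duration >= 0)),
)
print(events)
# [('start', 'read_user_id_all'), ('end', 'read_user_id_all', True)]
```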

Safety & Security

Q: Is conditional SQL safe from injection?

A: Yes, the directive lines are parsed strictly, and query values are still passed as bound %(name)s parameters.

The #if preprocessor only allows:

  • Parameter names (e.g. user_id)
  • String literals ('active', "pending")
  • Numbers and basic operators
  • Whitespace and comments

Any attempt to inject raw SQL will raise a parsing error before execution.

# This will RAISE an exception "ValueError: 'user_id;' not in ..." (not execute!)
"#if user_id; DROP TABLE t_user; --"

Features Summary

Feature                          Notes
Connection pooling               Via minconn / maxconn
Named queries                    Stored in dictionary
Single-row / multi-row fetch     read_row() / read_rows()
Single / batch CUD operations    update() / updates()
Output parameters                Via params_out dict
Transactions via with            Auto rollback on exception
Partially return CSV             read_csv_partial / read_csv_partial_async
Bilingual column aliases         "Name|이름" syntax
Conditional SQL                  #if / #elif / #endif
Logging support                  before... and after... callables run before and after each execution
SQL injection protection         Strict parsing in conditionals

License

MIT (or as defined in your project)


Made with ❤️ for cleaner, safer PostgreSQL code in Python.
