PostgreSQL schema management

DCNR Postgres

A Python library for PostgreSQL database access, schema deployment, and migration management. Built on top of psycopg (v3) and psycopg-pool.


Table of Contents

  1. Installation
  2. Quick Start
  3. Configuration
  4. Connecting to PostgreSQL
  5. Data Operations
  6. Schema Management
  7. Schema Definition File
  8. Environment / .env Files
  9. API Reference

Installation

pip install dcnr-postgres

Requirements:

  • Python >= 3.9
  • psycopg >= 3
  • psycopg-pool

Quick Start

import dcnr.postgres as pg

# 1. Set connection credentials once at application startup
pg.set_configuration(
    host="localhost",
    port=5432,
    dbname="mydb",
    user="myuser",
    password="secret"
)

# 2. Open a connection and insert a row
with pg.get_connection() as conn:
    result = pg.insert_update(conn, "myschema", "mytable", {"name": "Alice", "age": 30})
    print(result)

Configuration

Call set_configuration() once at the start of your application (e.g. in your startup/init module). All subsequent calls to get_connection() and get_shared_connection() will use these credentials automatically.

import dcnr.postgres as pg

pg.set_configuration(
    host="db.example.com",
    port=5432,
    dbname="production",
    user="appuser",
    password="s3cr3t",
    schema="public"       # optional – sets PostgreSQL search_path
)

Parameters

Parameter Type Default Description
host str "" PostgreSQL server hostname or IP
port int or str 5432 Server port
dbname str "" Database name
user str "" Login user
password str "" Login password
schema str None If set, adds search_path to every connection opened with the stored config

Reading the current configuration

creds = pg.get_configuration()                   # returns stored dict
creds = pg.get_configuration(schema="reporting") # override search_path for this call only

Connecting to PostgreSQL

Single Connection

get_connection() returns a plain psycopg.Connection. Use it as a context manager so it is closed automatically.

import dcnr.postgres as pg

# Using the globally stored configuration (set via set_configuration)
with pg.get_connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT now()")
        print(cur.fetchone())

# Passing credentials explicitly
creds = {
    "host": "localhost",
    "port": 5432,
    "dbname": "mydb",
    "user": "myuser",
    "password": "secret",
    "schema": "reporting"   # optional search_path override
}
with pg.get_connection(creds) as conn:
    ...

Note: The connection encoding is always forced to UTF8.

Shared Connection Pool

For web applications or any concurrent workload, use the shared ConnectionPool backed by psycopg-pool. The pool is created lazily on first use and closed automatically at process exit.

import dcnr.postgres as pg

# set_configuration() must be called before the first checkout
pg.set_configuration(host="localhost", dbname="mydb", user="u", password="p")

with pg.get_shared_connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT count(1) FROM myschema.orders")
        print(cur.fetchone())

Pool defaults:

Setting Value Description
min_size 1 Minimum warm connections kept in pool
max_size 10 Maximum simultaneous connections
max_idle 300 s Idle connection recycled after this time
max_lifetime 3600 s Connection recycled after this age
timeout 30 s Max wait time for a connection checkout
autocommit False Explicit transactions by default
TCP keepalives on Prevents stale connections on firewalls
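
These defaults correspond roughly to the following psycopg-pool configuration. This is an illustrative sketch of equivalent settings, not the library's actual construction code; the conninfo values are placeholders.

```python
from psycopg_pool import ConnectionPool

# Illustrative sketch only — equivalent psycopg-pool settings, not the
# library's internal construction code. Conninfo values are placeholders.
pool = ConnectionPool(
    conninfo="host=localhost dbname=mydb user=u password=p keepalives=1",
    open=False,            # opened lazily, as on first checkout
    min_size=1,            # minimum warm connections kept in the pool
    max_size=10,           # maximum simultaneous connections
    max_idle=300,          # seconds before an idle connection is recycled
    max_lifetime=3600,     # seconds before a connection is recycled by age
    timeout=30,            # max wait for a connection checkout
    kwargs={"autocommit": False},   # explicit transactions by default
)
```
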

Data Operations

All data-operation functions accept either a psycopg.Connection or a psycopg.Cursor as the first argument.
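
One simple way to support both argument types is to check for a `cursor()` factory, which connections expose and cursors do not. The helper below is an illustrative sketch, not the library's actual dispatch logic.

```python
def as_cursor(conn_or_cursor):
    """Return (cursor, owned): a cursor to use, and whether we created it.

    Illustrative sketch only — not the library's actual dispatch code.
    psycopg connections have a .cursor() factory; cursors do not.
    """
    if hasattr(conn_or_cursor, "cursor"):
        return conn_or_cursor.cursor(), True   # we own it: close after use
    return conn_or_cursor, False               # caller-owned cursor


# Works with any object shaped like a connection:
class FakeConn:
    def cursor(self):
        return "a-cursor"

cur, owned = as_cursor(FakeConn())
print(cur, owned)   # → a-cursor True
```
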

insert_update

Inserts a new row or updates an existing one in a single call.

# INSERT – no `find` argument means always insert
result = pg.insert_update(conn, "myschema", "users",
    data={"username": "alice", "email": "alice@example.com"})

# UPDATE – rows matching `find` are updated
result = pg.insert_update(conn, "myschema", "users",
    data={"email": "newemail@example.com"},
    find={"username": "alice"})

print(result)  # dict with all columns of the affected row (RETURNING *)

Signature:

insert_update(
    conn,                      # psycopg.Connection or psycopg.Cursor
    schema_name: str,
    table_name: str,
    data: dict,                # column → value mapping for the write
    find: dict = None,         # column → value mapping for the WHERE clause (UPDATE)
    commit: bool = True,
    cursor_always: bool = True
) -> dict

  • When find is None an INSERT … RETURNING * statement is executed.
  • When find is provided an UPDATE … SET … WHERE … RETURNING * statement is executed.
  • dict values that are themselves dict objects are automatically serialised to JSON strings before the query is sent.
  • Passing a Cursor instead of a Connection lets you batch several operations inside one transaction; set commit=False to suppress the automatic commit.
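
The automatic JSON serialisation of nested dict values can be illustrated like this. This is a sketch of the documented behaviour using plain `json.dumps`, not the library's internal code.

```python
import json

row = {"name": "Alice", "profile": {"theme": "dark", "lang": "en"}}

# Nested dict values become JSON strings before the query is sent
prepared = {k: json.dumps(v) if isinstance(v, dict) else v
            for k, v in row.items()}
print(prepared)
# → {'name': 'Alice', 'profile': '{"theme": "dark", "lang": "en"}'}
```
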

insert0 / update0

Convenience wrappers around insert_update that never commit when a cursor is supplied. Intended for use inside manually managed transactions.

with pg.get_connection() as conn:
    with conn.cursor() as cur:
        pg.insert0(cur, "myschema", "orders", {"ref": "ORD-001", "total": 99.9})
        pg.insert0(cur, "myschema", "order_lines", {"order_ref": "ORD-001", "qty": 2})
        pg.update0(cur, "myschema", "orders",
            data={"total": 120.0},
            find={"ref": "ORD-001"})
        conn.commit()   # single commit for the whole transaction

update_or_insert

Tries UPDATE first; if no rows matched (rowcount == 0), falls back to INSERT. Does not commit — the caller is responsible for committing.

import psycopg.rows

with pg.get_connection() as conn:
    with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
        result = pg.update_or_insert(
            cur, "myschema", "products",
            data={"name": "Widget", "price": 9.99},
            find={"sku": "W-100"}
        )
        conn.commit()

Schema Management

The dcnr.postgres.schema sub-package provides tools for deploying and migrating PostgreSQL schemas defined in YAML files.

deploy_schema

Reads a YAML schema definition file, compares it with the live database, and applies the necessary DDL statements (CREATE TABLE, ALTER TABLE, CREATE INDEX, etc.).

import dcnr.postgres as pg
from dcnr.postgres.schema import deploy_schema

pg.set_configuration(host="localhost", dbname="mydb", user="u", password="p")

conn_data = {
    "host": "localhost",
    "dbname": "mydb",
    "user": "u",
    "password": "p"
}

with pg.get_connection() as conn:
    results = deploy_schema(
        yaml_path="schemas/myapp/definition.yaml",
        conn=conn,
        cdata=conn_data,
        deploy=True,    # True = apply without interactive confirmation
        debug=True      # print each SQL statement as it runs
    )

for msg in results["messages"]:
    print(msg)

Signature:

deploy_schema(
    yaml_path: str,          # path to the YAML schema definition file
    conn: psycopg.Connection,
    cdata: dict,             # connection data (host, dbname, user, password, [port, schema])
    deploy=None,             # None = interactive prompt; True = auto-deploy; False = dry-run
    debug: bool = False
) -> dict                    # {"messages": [...]}

What deploy_schema does:

  1. Reads the YAML definition file (see Schema Definition File).
  2. Calls init_schema which:
    • Prompts to create the PostgreSQL schema if it does not exist.
    • Creates any missing sequences.
    • Creates any missing functions.
  3. Computes a diff between the live database structure and the YAML definition.
  4. Reports the number of differences found.
  5. If deploy=True (or the user types DEPLOY at the prompt), executes the DDL statements.
  6. Loads initial seed data for tables that declare an __data key in the YAML.

schema_scripts_apply

Applies versioned SQL migration scripts from a directory (Flyway-style V<n>_<description>.sql naming). Already-applied files are tracked in a schema_scripts_history table.

from dcnr.postgres.schema import schema_scripts_apply

# set_configuration() must be called first
status, error, tb = schema_scripts_apply(   # tb: traceback string on failure
    schema_name="myschema",
    migration_scripts_dir="migrations/"
)

if status != 0:
    print(f"Migration failed: {error}")

File naming convention: V<version>_<description>.sql

File Applied?
V01_init.sql First run
V02_add_index.sql Second run
V02_01_fix.sql Also valid

Files are sorted lexicographically, so leading zeros are recommended for correct ordering. Once a file is applied it is never re-run.
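
The lexicographic ordering pitfall is easy to demonstrate in plain Python; without zero-padding, `V10` sorts before `V2`. The file names below are hypothetical.

```python
# Without leading zeros, lexicographic order diverges from numeric order:
unpadded = ["V1_init.sql", "V10_cleanup.sql", "V2_add_index.sql"]
print(sorted(unpadded))
# → ['V10_cleanup.sql', 'V1_init.sql', 'V2_add_index.sql']   (V10 runs before V2!)

# With zero-padded versions the two orders agree:
padded = ["V01_init.sql", "V10_cleanup.sql", "V02_add_index.sql"]
print(sorted(padded))
# → ['V01_init.sql', 'V02_add_index.sql', 'V10_cleanup.sql']
```
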

Return value: (status, error_message, traceback_string)

  • status == 0 → success
  • status == -1 → exception occurred

Reading Database Metadata

These functions query the live database and return Python structures describing the current schema state.

import dcnr.postgres as pg
from dcnr.postgres.schema import (
    read_database_tables,
    read_database_columns,
    read_table_columns,
    read_routines,
    read_triggers,
)

with pg.get_connection() as conn:

    # List of table names in the schema
    tables = read_database_tables(conn, "myschema")
    # → ["users", "orders", "products"]

    # Full column/constraint/index/FK metadata for every table
    schema_meta = read_database_columns(conn, "myschema")
    # → {"users": {"columns": {...}, "indexes": {...}, "constraints": {...}, "f_keys": [...]}, ...}

    # Column names for a specific table
    cols = read_table_columns(conn, "myschema", "users")
    # → dict_keys(["id", "username", "email"])

    # Stored functions/procedures
    routines = read_routines(conn, "myschema")

    # Triggers
    triggers = read_triggers(conn, "myschema")

Schema Definition File

The structure of the YAML schema definition file is described in the separate document:

📄 SCHEMA_DEFINITION.md

That file covers:

  • schema metadata block (name, sequences, functions)
  • tables block with column definitions, data types, constraints, indexes, foreign keys, triggers, and seed data

Environment / .env Files

load_env_values() reads a simple KEY=VALUE text file (supporting # comments) and returns a dict. Useful for loading database credentials from a local .env file without an additional dependency.

from dcnr.postgres.schema import load_env_values
import dcnr.postgres as pg

env = load_env_values(".env")

pg.set_configuration(
    host=env["DB_HOST"],
    port=env.get("DB_PORT", "5432"),
    dbname=env["DB_NAME"],
    user=env["DB_USER"],
    password=env["DB_PASS"]
)

Example .env file:

# Database credentials
DB_HOST=localhost
DB_PORT=5432
DB_NAME=mydb
DB_USER=appuser
DB_PASS=secret
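
For reference, the documented parsing behaviour (KEY=VALUE lines, `#` comments, no extra dependency) can be approximated in a few lines of plain Python. This is an illustrative sketch, not the actual `load_env_values()` implementation.

```python
def parse_env_text(text: str) -> dict:
    """Approximate KEY=VALUE parsing with '#' comment support.

    Illustrative sketch — not the library's load_env_values().
    """
    values = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue                      # skip blanks and comment lines
        key, sep, value = line.partition("=")
        if sep:                           # ignore lines without '='
            values[key.strip()] = value.strip()
    return values

sample = "# Database credentials\nDB_HOST=localhost\nDB_PORT=5432\n"
print(parse_env_text(sample))
# → {'DB_HOST': 'localhost', 'DB_PORT': '5432'}
```
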

API Reference

dcnr.postgres

Symbol Description
set_configuration(**kwargs) Store global connection credentials
get_configuration(schema=None) Return the stored credentials dict
get_connection(creds=None) Open and return a psycopg.Connection
get_shared_connection() Borrow a connection from the shared pool (context manager)
insert_update(conn, schema, table, data, find, commit, cursor_always) Insert or update a row
insert0(conn, schema, table, data) Insert without auto-commit
update0(conn, schema, table, data, find) Update without auto-commit
update_or_insert(curs, schema, table, data, find) Update-or-insert without auto-commit

dcnr.postgres.schema

Symbol Description
deploy_schema(yaml_path, conn, cdata, deploy, debug) Deploy schema from YAML definition
load_env_values(env_file) Read KEY=VALUE env file into a dict
schema_scripts_apply(schema_name, migration_scripts_dir) Apply versioned SQL migration scripts
read_database_tables(conn, schema) Return list of table names
read_database_columns(conn, schema) Return full column/index/FK metadata
read_table_columns(conn, schema, tablename) Return column names for one table
read_routines(conn, schema) Return list of routine names
read_triggers(conn, schema) Return list of (table, trigger) tuples
normalize_datatype_text(dt, charmax, num_precision, num_scale) Normalize a PostgreSQL data type string
