DCNR Postgres
A Python library for PostgreSQL database access, schema deployment, and migration management. Built on top of psycopg (v3) and psycopg-pool.
Table of Contents
- Installation
- Quick Start
- Configuration
- Connecting to PostgreSQL
- Data Operations
- Schema Management
- Schema Definition File
- Environment / .env Files
- API Reference
Installation
pip install dcnr-postgres
Requirements:
- Python >= 3.9
- psycopg >= 3
- psycopg-pool
Quick Start
import dcnr.postgres as pg
# 1. Set connection credentials once at application startup
pg.set_configuration(
    host="localhost",
    port=5432,
    dbname="mydb",
    user="myuser",
    password="secret"
)
# 2. Open a connection and insert a row
with pg.get_connection() as conn:
    result = pg.insert_update(conn, "myschema", "mytable", {"name": "Alice", "age": 30})
    print(result)
Configuration
Call set_configuration() once at the start of your application (e.g. in your startup/init module). All subsequent calls to get_connection() and get_shared_connection() will use these credentials automatically.
import dcnr.postgres as pg
pg.set_configuration(
    host="db.example.com",
    port=5432,
    dbname="production",
    user="appuser",
    password="s3cr3t",
    schema="public"  # optional – sets PostgreSQL search_path
)
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| `host` | `str` | `""` | PostgreSQL server hostname or IP |
| `port` | `int` or `str` | `5432` | Server port |
| `dbname` | `str` | `""` | Database name |
| `user` | `str` | `""` | Login user |
| `password` | `str` | `""` | Login password |
| `schema` | `str` | `None` | If set, adds `search_path` to every connection opened with the stored config |
Reading the current configuration
creds = pg.get_configuration() # returns stored dict
creds = pg.get_configuration(schema="reporting") # override search_path for this call only
Connecting to PostgreSQL
Single Connection
get_connection() returns a plain psycopg.Connection. Use it as a context manager so it is closed automatically.
import dcnr.postgres as pg
# Using the globally stored configuration (set via set_configuration)
with pg.get_connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT now()")
        print(cur.fetchone())
# Passing credentials explicitly
creds = {
    "host": "localhost",
    "port": 5432,
    "dbname": "mydb",
    "user": "myuser",
    "password": "secret",
    "schema": "reporting"  # optional search_path override
}
with pg.get_connection(creds) as conn:
    ...
Note: The connection encoding is always forced to UTF8.
Shared Connection Pool
For web applications or any concurrent workload, use the shared ConnectionPool backed by psycopg-pool. The pool is created lazily on first use and closed automatically at process exit.
import dcnr.postgres as pg
# set_configuration() must be called before the first checkout
pg.set_configuration(host="localhost", dbname="mydb", user="u", password="p")
with pg.get_shared_connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT count(1) FROM myschema.orders")
        print(cur.fetchone())
Pool defaults:
| Setting | Value | Description |
|---|---|---|
| `min_size` | 1 | Minimum warm connections kept in the pool |
| `max_size` | 10 | Maximum simultaneous connections |
| `max_idle` | 300 s | Idle connection recycled after this time |
| `max_lifetime` | 3600 s | Connection recycled after this age |
| `timeout` | 30 s | Maximum wait time for a connection checkout |
| `autocommit` | `False` | Explicit transactions by default |
| TCP keepalives | on | Prevents stale connections behind firewalls |
Data Operations
All data-operation functions accept either a psycopg.Connection or a psycopg.Cursor as the first argument.
insert_update
Inserts a new row or updates an existing one in a single call.
# INSERT – no `find` argument means always insert
result = pg.insert_update(conn, "myschema", "users",
    data={"username": "alice", "email": "alice@example.com"})

# UPDATE – rows matching `find` are updated
result = pg.insert_update(conn, "myschema", "users",
    data={"email": "newemail@example.com"},
    find={"username": "alice"})
print(result) # dict with all columns of the affected row (RETURNING *)
Signature:
insert_update(
    conn,               # psycopg.Connection or psycopg.Cursor
    schema_name: str,
    table_name: str,
    data: dict,         # column → value mapping for the write
    find: dict = None,  # column → value mapping for the WHERE clause (UPDATE)
    commit: bool = True,
    cursor_always=True
) -> dict
- When `find` is `None`, an `INSERT … RETURNING *` statement is executed.
- When `find` is provided, an `UPDATE … SET … WHERE … RETURNING *` statement is executed.
- Values in `data` that are themselves `dict` objects are automatically serialised to JSON strings before the query is sent.
- Passing a `Cursor` instead of a `Connection` lets you batch several operations inside one transaction; set `commit=False` to suppress the automatic commit.
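The automatic JSON serialisation of nested `dict` values can be illustrated with a small standalone sketch. This is a `json.dumps`-based reimplementation for illustration only; the library's actual internals may differ.

```python
import json

def prepare_params(data: dict) -> dict:
    """Serialise nested dict values to JSON strings, leaving scalars as-is.

    Illustrative sketch of the behaviour described above, not the
    library's actual code.
    """
    return {
        key: json.dumps(value) if isinstance(value, dict) else value
        for key, value in data.items()
    }

params = prepare_params({"username": "alice", "profile": {"theme": "dark"}})
print(params)
# {'username': 'alice', 'profile': '{"theme": "dark"}'}
```

The nested `profile` dict arrives at PostgreSQL as a JSON string, which fits naturally into a `json`/`jsonb` column.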
insert0 / update0
Convenience wrappers around insert_update that never commit when a cursor is supplied. Intended for use inside manually managed transactions.
with pg.get_connection() as conn:
    with conn.cursor() as cur:
        pg.insert0(cur, "myschema", "orders", {"ref": "ORD-001", "total": 99.9})
        pg.insert0(cur, "myschema", "order_lines", {"order_ref": "ORD-001", "qty": 2})
        conn.commit()  # single commit for the whole transaction

        pg.update0(cur, "myschema", "orders",
            data={"total": 120.0},
            find={"ref": "ORD-001"})
        conn.commit()
update_or_insert
Tries UPDATE first; if no rows matched (rowcount == 0), falls back to INSERT. Does not commit — the caller is responsible for committing.
import psycopg

with pg.get_connection() as conn:
    with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
        result = pg.update_or_insert(
            cur, "myschema", "products",
            data={"name": "Widget", "price": 9.99},
            find={"sku": "W-100"}
        )
        conn.commit()
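The UPDATE-first / INSERT-fallback pattern can be sketched without a database by abstracting the two statements behind callables. The helper names below are hypothetical, purely for illustration; the library operates on real psycopg cursors.

```python
def update_or_insert_sketch(execute_update, execute_insert):
    """Sketch of the UPDATE-first / INSERT-fallback pattern.

    `execute_update` returns the number of rows matched (like cursor
    rowcount); when it is 0, the insert path runs instead.
    Hypothetical helpers for illustration only.
    """
    if execute_update() == 0:
        return execute_insert()
    return "updated"

# No existing row: the update matches nothing, so the insert runs.
print(update_or_insert_sketch(lambda: 0, lambda: "inserted"))  # inserted
# Existing row: the update matches, so no insert happens.
print(update_or_insert_sketch(lambda: 1, lambda: "inserted"))  # updated
```

Note that unlike `INSERT … ON CONFLICT`, this two-step pattern is not atomic; a concurrent writer can insert between the two statements, which is why committing is left to the caller.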
Schema Management
The dcnr.postgres.schema sub-package provides tools for deploying and migrating PostgreSQL schemas defined in YAML files.
deploy_schema
Reads a YAML schema definition file, compares it with the live database, and applies the necessary DDL statements (CREATE TABLE, ALTER TABLE, CREATE INDEX, etc.).
import dcnr.postgres as pg
from dcnr.postgres.schema import deploy_schema
pg.set_configuration(host="localhost", dbname="mydb", user="u", password="p")
conn_data = {
    "host": "localhost",
    "dbname": "mydb",
    "user": "u",
    "password": "p"
}

with pg.get_connection() as conn:
    results = deploy_schema(
        yaml_path="schemas/myapp/definition.yaml",
        conn=conn,
        cdata=conn_data,
        deploy=True,  # True = apply without interactive confirmation
        debug=True    # print each SQL statement as it runs
    )

for msg in results["messages"]:
    print(msg)
Signature:
deploy_schema(
    yaml_path: str,            # path to the YAML schema definition file
    conn: psycopg.Connection,
    cdata: dict,               # connection data (host, dbname, user, password, [port, schema])
    deploy=None,               # None = interactive prompt; True = auto-deploy; False = dry-run
    debug: bool = False
) -> dict                      # {"messages": [...]}
What deploy_schema does:
1. Reads the YAML definition file (see Schema Definition File).
2. Calls `init_schema`, which:
   - prompts to create the PostgreSQL schema if it does not exist,
   - creates any missing sequences,
   - creates any missing functions.
3. Computes a diff between the live database structure and the YAML definition.
4. Reports the number of differences found.
5. If `deploy=True` (or the user types `DEPLOY` at the prompt), executes the DDL statements.
6. Loads initial seed data for tables that declare an `__data` key in the YAML.
schema_scripts_apply
Applies versioned SQL migration scripts from a directory (Flyway-style V<n>_<description>.sql naming). Already-applied files are tracked in a schema_scripts_history table.
from dcnr.postgres.schema import schema_scripts_apply
# set_configuration() must be called first
status, error, traceback = schema_scripts_apply(
    schema_name="myschema",
    migration_scripts_dir="migrations/"
)
if status != 0:
    print(f"Migration failed: {error}")
File naming convention: V<version>_<description>.sql
| File | Applied? |
|---|---|
| `V01_init.sql` | First run |
| `V02_add_index.sql` | Second run |
| `V02_01_fix.sql` | Also valid |
Files are sorted lexicographically, so leading zeros are recommended for correct ordering. Once a file is applied it is never re-run.
Return value: `(status, error_message, traceback_string)`
- `status == 0` → success
- `status == -1` → exception occurred
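The lexicographic ordering rule is easy to verify with plain Python's `sorted()` — zero-padded versions sort as intended, unpadded ones do not. The file names below are illustrative:

```python
# Zero-padded versions sort in the intended order.
# (Note that within the same version, "01" sorts before "add" because
# digits precede letters in ASCII.)
padded = ["V02_add_index.sql", "V10_cleanup.sql", "V01_init.sql", "V02_01_fix.sql"]
print(sorted(padded))
# ['V01_init.sql', 'V02_01_fix.sql', 'V02_add_index.sql', 'V10_cleanup.sql']

# Without padding, "V10" sorts before "V1_" ('0' < '_' in ASCII),
# so version 10 would run first – the wrong order.
unpadded = ["V1_init.sql", "V2_add_index.sql", "V10_cleanup.sql"]
print(sorted(unpadded))
# ['V10_cleanup.sql', 'V1_init.sql', 'V2_add_index.sql']
```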
Reading Database Metadata
These functions query the live database and return Python structures describing the current schema state.
from dcnr.postgres.schema import (
read_database_tables,
read_database_columns,
read_table_columns,
read_routines,
read_triggers,
)
import dcnr.postgres as pg

with pg.get_connection() as conn:
    # List of table names in the schema
    tables = read_database_tables(conn, "myschema")
    # → ["users", "orders", "products"]

    # Full column/constraint/index/FK metadata for every table
    schema_meta = read_database_columns(conn, "myschema")
    # → {"users": {"columns": {...}, "indexes": {...}, "constraints": {...}, "f_keys": [...]}, ...}

    # Column names for a specific table
    cols = read_table_columns(conn, "myschema", "users")
    # → dict_keys(["id", "username", "email"])

    # Stored functions/procedures
    routines = read_routines(conn, "myschema")

    # Triggers
    triggers = read_triggers(conn, "myschema")
Schema Definition File
The structure of the YAML schema definition file is described in the separate document:
That file covers:
- the `schema` metadata block (name, sequences, functions)
- the `tables` block with column definitions, data types, constraints, indexes, foreign keys, triggers, and seed data
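As a rough orientation only, a file with those two blocks might look like the hypothetical sketch below. All key names here are invented for illustration; consult the separate schema definition document for the real layout.

```yaml
# Hypothetical illustration only – the actual key names and structure
# are defined in the separate schema definition document.
schema:
  name: myschema
  sequences: []
  functions: []

tables:
  users:
    columns:
      id: {type: bigint, primary_key: true}
      username: {type: text, not_null: true}
    __data:                      # seed rows loaded on deploy
      - {id: 1, username: admin}
```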
Environment / .env Files
load_env_values() reads a simple KEY=VALUE text file (supporting # comments) and returns a dict. Useful for loading database credentials from a local .env file without an additional dependency.
from dcnr.postgres.schema import load_env_values
import dcnr.postgres as pg
env = load_env_values(".env")
pg.set_configuration(
host=env["DB_HOST"],
port=env.get("DB_PORT", "5432"),
dbname=env["DB_NAME"],
user=env["DB_USER"],
password=env["DB_PASS"]
)
Example .env file:
# Database credentials
DB_HOST=localhost
DB_PORT=5432
DB_NAME=mydb
DB_USER=appuser
DB_PASS=secret
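The KEY=VALUE parsing behaviour described above fits in a few lines of plain Python. This is a hypothetical reimplementation for illustration, not the library's actual code — use `load_env_values()` for real `.env` files:

```python
def parse_env_text(text: str) -> dict:
    """Parse simple KEY=VALUE lines, skipping blanks and `#` comments.

    Hypothetical sketch of the behaviour described above; the library's
    load_env_values() reads from a file path instead.
    """
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values

env = parse_env_text("""\
# Database credentials
DB_HOST=localhost
DB_PORT=5432
""")
print(env)  # {'DB_HOST': 'localhost', 'DB_PORT': '5432'}
```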
API Reference
dcnr.postgres
| Symbol | Description |
|---|---|
| `set_configuration(**kwargs)` | Store global connection credentials |
| `get_configuration(schema=None)` | Return the stored credentials dict |
| `get_connection(creds=None)` | Open and return a `psycopg.Connection` |
| `get_shared_connection()` | Borrow a connection from the shared pool (context manager) |
| `insert_update(conn, schema, table, data, find, commit, cursor_always)` | Insert or update a row |
| `insert0(conn, schema, table, data)` | Insert without auto-commit |
| `update0(conn, schema, table, data, find)` | Update without auto-commit |
| `update_or_insert(curs, schema, table, data, find)` | Update-or-insert without auto-commit |
dcnr.postgres.schema
| Symbol | Description |
|---|---|
| `deploy_schema(yaml_path, conn, cdata, deploy, debug)` | Deploy schema from YAML definition |
| `load_env_values(env_file)` | Read KEY=VALUE env file into a dict |
| `schema_scripts_apply(schema_name, migration_scripts_dir)` | Apply versioned SQL migration scripts |
| `read_database_tables(conn, schema)` | Return list of table names |
| `read_database_columns(conn, schema)` | Return full column/index/FK metadata |
| `read_table_columns(conn, schema, tablename)` | Return column names for one table |
| `read_routines(conn, schema)` | Return list of routine names |
| `read_triggers(conn, schema)` | Return list of (table, trigger) tuples |
| `normalize_datatype_text(dt, charmax, num_precision, num_scale)` | Normalize a PostgreSQL data type string |