Skip to main content

A lightweight PostgreSQL wrapper for Python with connection pooling

Project description

ZNPG v1.2.0 Documentation

A robust, high-level PostgreSQL database abstraction layer built on psycopg3 with connection pooling, query building, and context manager support.

Philosophy:

Database access should be simple, safe, and Pythonic.

ZNPG was born from frustration with psycopg2's verbosity and SQLAlchemy's complexity. We believe database libraries should:

  • Get out of your way with clean, intuitive APIs

  • Protect you from yourself with safe defaults

  • Scale with your needs from scripts to production apps

Built by a developer who got tired of boilerplate. Used by 250+ developers who felt the same.

Features

  • Zero-boilerplate - From import to query in 3 lines

  • Safe by default - No unsafe DELETE/UPDATE without explicit flags

  • Built-in pooling - Connection pooling that just works

  • Full CRUD - High-level operations for 95% of use cases

  • Raw SQL access - Escape hatch when you need full control

  • Type-hinted - Modern Python with complete type hints

  • Production-ready - Connection health checks, stats, and maintenance ops

  • JSON native - Built-in import/export for data portability


Table of Contents


Overview

ZNPG provides a Pythonic interface to PostgreSQL databases with the following features:

  • Connection Pooling: Built-in psycopg_pool integration for efficient connection management
  • Context Managers: Safe resource handling with with statements
  • CRUD Abstractions: High-level methods for common database operations
  • Query Builder Integration: Seamless SQL generation via QueryBuilder class
  • Type Safety: Full type hints and generic support
  • Transaction Support: Atomic operations with automatic rollback on failure
  • JSON Export/Import: Native support for data serialization

Installation

# Optional: Install znpg package
pip install znpg

Dependencies:

  • psycopg (v3.x)
  • psycopg-pool
  • typing (Python 3.8+)

Quick Start

from znpg import Database

# Initialize with connection string
db = Database(min_size=2, max_size=10)
db.url_connect("postgresql://user:pass@localhost:5432/mydb")

# Or use manual connection parameters
db.manual_connect(
    username="user",
    password="pass",
    host="localhost",
    port=5432,
    db_name="mydb"
)

# Simple query
users = db.query("SELECT * FROM users WHERE age > %s", [18])

# Using context manager (recommended)
with Database() as db:
    db.url_connect("postgresql://user:pass@localhost/db")
    result = db.select("users", where={"status": "active"})

Core Classes

Database Class

The primary interface for database operations.

Constructor

Database(
    min_size: int = 1,      # Minimum connections in pool
    max_size: int = 10,     # Maximum connections in pool  
    timeout: int = 30       # Connection timeout in seconds
)

Attributes

Attribute Type Description
pool Optional[ConnectionPool] The underlying connection pool instance
min_size int Minimum number of connections maintained
max_size int Maximum number of connections allowed
is_connected bool Connection status flag
timeout int Connection acquisition timeout

Connection Management

Connection Methods

url_connect(conn_string: str) -> None

Connect using a PostgreSQL connection URI.

db = Database()
db.url_connect("postgresql://admin:secret@db.example.com:5432/production")

manual_connect(username, host, password, db_name, port) -> None

Connect using individual parameters.

db.manual_connect(
    username="postgres",
    host="localhost", 
    password="secure_pass",
    db_name="myapp",
    port=5432
)

get_connection() -> ContextManager

Context manager for raw connection access.

with db.get_connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT version()")
        print(cur.fetchone())

is_healthy() -> bool

Check database connectivity.

if db.is_healthy():
    print("Database connection is active")

stats() -> Dict

Retrieve connection pool statistics.

stats = db.stats()
# Returns: {"size": 5, "available": 3, "used": 2}

close() -> None

Explicitly close the connection pool.

db.close()  # Automatically called when using context managers

CRUD Operations

Read Operations

query(sql: str, params: Optional[List[Any]] = None) -> List[Dict[str, Any]]

Execute raw SQL and return results as list of dictionaries.

# Parameterized query (safe against SQL injection)
results = db.query(
    "SELECT * FROM orders WHERE status = %s AND amount > %s",
    ["pending", 100.00]
)
# Returns: [{"id": 1, "status": "pending", "amount": 150.00}, ...]

fetch_one(sql: str, params: Optional[List[Any]] = None) -> Optional[Dict[str, Any]]

Fetch single record or None.

user = db.fetch_one("SELECT * FROM users WHERE email = %s", ["john@example.com"])
if user:
    print(user["name"])

select(table, columns, where, order_by, limit) -> List[Dict[str, Any]]

High-level SELECT with QueryBuilder.

# Basic select
users = db.select("users")

# With filters
active_users = db.select(
    table="users",
    columns=["id", "name", "email"],
    where={"status": "active", "verified": True},
    order_by=["created_at DESC"],
    limit=10
)

get_by_id(table: str, id_name: str, id: Union[str, int]) -> List[Dict]

Fetch record by primary key.

user = db.get_by_id("users", "user_id", 42)

count(table: str, where: Optional[Dict] = None) -> Optional[int]

Count records with optional filtering.

total = db.count("orders")
pending_count = db.count("orders", where={"status": "pending"})

exists(table: str, where: Dict[str, Any]) -> Optional[bool]

Check if records matching criteria exist.

has_admin = db.exists("users", {"role": "admin"})

Write Operations

execute(sql: str, params: Optional[List[Any]] = None) -> int

Execute non-query SQL (INSERT, UPDATE, DELETE). Returns rowcount.

rows_deleted = db.execute("DELETE FROM logs WHERE created_at < %s", ["2023-01-01"])

insert(table: str, data: Dict[str, Any]) -> bool

Insert single record.

success = db.insert("users", {
    "name": "Jane Doe",
    "email": "jane@example.com",
    "created_at": "2024-01-15"
})

update(table: str, data: Dict, conditions: Optional[Dict] = None, allow_all: bool = False) -> int

Update records with safety checks.

# Safe update with WHERE clause
rows_updated = db.update(
    table="users",
    data={"last_login": "2024-01-15"},
    conditions={"id": 42}
)

# DANGEROUS: Update all records (requires explicit flag)
db.update("users", {"status": "inactive"}, allow_all=True)

delete(table: str, conditions: Optional[Dict] = None, allow_deleteall: bool = False) -> int

Delete records with safety checks.

# Safe delete
deleted = db.delete("sessions", {"expired": True})

# DANGEROUS: Delete all (requires explicit flag)
db.delete("logs", allow_deleteall=True)

bulk_insert(table: str, data: List[Dict], on_conflict: Optional[str] = None) -> int

Efficient batch insertion.

users = [
    {"name": "Alice", "email": "alice@example.com"},
    {"name": "Bob", "email": "bob@example.com"}
]
inserted = db.bulk_insert("users", users, on_conflict="DO NOTHING")

DDL Operations

create_table(table: str, columns: Optional[Dict[str, str]] = None) -> bool

Create new table.

db.create_table("products", {
    "id": "SERIAL PRIMARY KEY",
    "name": "VARCHAR(255) NOT NULL",
    "price": "DECIMAL(10,2)",
    "created_at": "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"
})

drop_table(table: str, cascade: bool = False, allow_action: bool = False) -> bool

Drop table with safety checks.

db.drop_table("temp_table", allow_action=True)
db.drop_table("orders", cascade=True, allow_action=True)  # Cascades to dependencies

truncate(table: str) -> bool

Truncate table (remove all data, keep structure).

db.truncate("logs")

table_exists(table: str) -> Optional[bool]

Check table existence.

if db.table_exists("migrations"):
    print("Migrations table already created")

get_table_columns(table: str) -> Optional[List[str]]

Retrieve column names for a table.

columns = db.get_table_columns("users")
# Returns: ['id', 'name', 'email', 'created_at']

create_index(table: str, columns: List[str], unique: bool = False) -> int

Create database index.

db.create_index("users", ["email"], unique=True)
db.create_index("orders", ["user_id", "created_at"])

vacuum(table: Optional[str] = None, analyze: bool = True) -> int

Run PostgreSQL VACUUM for maintenance.

db.vacuum()  # Full database
db.vacuum("large_table", analyze=True)

Transaction Support

transaction() -> ContextManager

Explicit transaction management with automatic commit/rollback.

try:
    with db.transaction() as conn:
        # All operations use same connection
        db.insert("accounts", {"user_id": 1, "balance": 100})
        db.insert("transactions", {"account_id": 1, "amount": 100})
        # Automatically commits if no exception
except Exception as e:
    # Automatically rolled back on exception
    logger.error(f"Transaction failed: {e}")

Note: The transaction() method yields a connection, but the current implementation doesn't override internal methods to use this connection. For true transactional safety with the high-level methods, extend the class or use get_connection() directly:

with db.get_connection() as conn:
    try:
        with conn.cursor() as cur:
            cur.execute("INSERT INTO accounts ...")
            cur.execute("INSERT INTO transactions ...")
        conn.commit()
    except Exception:
        conn.rollback()
        raise

Utility Methods

JSON Serialization

export_to_json(file: str, data: dict, indent: int = 4) -> bool

Static method to export data to JSON file.

data = db.select("users")
Database.export_to_json("backup.json", data)

import_from_json(file: str) -> Union[dict, bool]

Static method to import data from JSON file.

data = Database.import_from_json("config.json")
db.bulk_insert("settings", data)

Connection String Builder

c_string(username, host, password, db_name, port) -> str

Static method to generate connection string.

conn_str = Database.c_string("user", "localhost", "pass", "db", 5432)
# Returns: "postgresql://user:pass@localhost:5432/db"

Query Builder Integration

The Database class relies on a QueryBuilder class (not shown) to construct SQL. Expected interface:

Method Purpose
build_select_query(table, columns, where, order_by, limit) Generate SELECT SQL
build_insert_query(table, data) Generate INSERT SQL
build_update_query(table, data, conditions) Generate UPDATE SQL
build_delete_query(table, conditions) Generate DELETE SQL
build_createtable_query(table, columns) Generate CREATE TABLE SQL
build_droptable_query(table, cascade, allow) Generate DROP TABLE SQL
build_truncate_query(table) Generate TRUNCATE SQL
build_findtable_query() Generate table existence check
build_bulk_insert(table, data, on_conflict) Generate batch INSERT
build_allcolumns_query() Generate column metadata query
build_getby_id(table, id_name) Generate primary key lookup
build_count_query(table, where) Generate COUNT SQL
build_exists_query(table, where) Generate EXISTS SQL
build_create_index(table, columns, unique) Generate CREATE INDEX
build_vacuum(table, analyze) Generate VACUUM SQL

Error Handling

All database operations catch psycopg.Error and log via the configured logger. Methods return safe defaults on failure:

Method Failure Return
select() []
insert() False
update() 0
delete() 0
create_table() False
drop_table() False
truncate() False
table_exists() None
bulk_insert() 0
get_table_columns() None
get_by_id() None
count() None
exists() None

Critical errors (connection failures) are re-raised after logging.


Best Practices

1. Always Use Context Managers

# Good
with Database() as db:
    db.url_connect(conn_str)
    data = db.select("users")

# Avoid
db = Database()
db.url_connect(conn_str)
# If exception occurs here, pool may not close

2. Use Parameterized Queries

# Safe
db.query("SELECT * FROM users WHERE id = %s", [user_id])

# NEVER do this (SQL Injection risk)
db.query(f"SELECT * FROM users WHERE id = {user_id}")

3. Handle Connection Failures

try:
    db.url_connect(conn_str)
except Exception as e:
    logger.critical(f"Database connection failed: {e}")
    raise SystemExit(1)

4. Use Explicit Safety Flags

# This raises ValueError
db.update("users", {"role": "admin"})  # Missing WHERE

# This works
db.update("users", {"role": "admin"}, allow_all=True)

5. Monitor Pool Health

stats = db.stats()
if stats["available"] / stats["size"] < 0.2:
    logger.warning("Database pool running low on connections")

API Reference Summary

Class: Database

Method Returns Description
__init__(min_size, max_size, timeout) Database Constructor
url_connect(conn_string) None Connect via URI
manual_connect(...) None Connect via params
get_connection() ContextManager Raw connection access
query(sql, params) List[Dict] Raw SQL query
execute(sql, params) int Raw SQL execution
fetch_one(sql, params) Optional[Dict] Single record fetch
select(...) List[Dict] High-level SELECT
insert(table, data) bool Insert record
update(table, data, conditions, allow_all) int Update records
delete(table, conditions, allow_deleteall) int Delete records
bulk_insert(table, data, on_conflict) int Batch insert
create_table(table, columns) bool Create table
drop_table(table, cascade, allow_action) bool Drop table
truncate(table) bool Truncate table
table_exists(table) Optional[bool] Check existence
get_table_columns(table) Optional[List[str]] Get columns
get_by_id(table, id_name, id) List[Dict] Fetch by PK
count(table, where) Optional[int] Count records
exists(table, where) Optional[bool] Check existence
create_index(table, columns, unique) int Create index
vacuum(table, analyze) int Run VACUUM
is_healthy() bool Health check
stats() Dict Pool statistics
transaction() ContextManager Transaction context
close() None Close pool
export_to_json(file, data, indent) bool Static: Export JSON
import_from_json(file) Union[dict, bool] Static: Import JSON
c_string(...) str Static: Build conn string

Version: 1.2.0
License: MIT
Python Support: 3.8+
PostgreSQL: 12+
Author: ZN-0X

For issues and contributions, visit the project repository.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

znpg-1.2.0.tar.gz (18.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

znpg-1.2.0-py3-none-any.whl (13.9 kB view details)

Uploaded Python 3

File details

Details for the file znpg-1.2.0.tar.gz.

File metadata

  • Download URL: znpg-1.2.0.tar.gz
  • Upload date:
  • Size: 18.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.2

File hashes

Hashes for znpg-1.2.0.tar.gz
Algorithm Hash digest
SHA256 b94dcd1a9507eaff36bf80189131c180708b709f45f0174b0640bb14c35f3953
MD5 611effa2f5a9257cbae17fa4caad45de
BLAKE2b-256 9d1de78f86614e0142b0345c435467c1567859e51ee71478fc20a9477d561d4c

See more details on using hashes here.

File details

Details for the file znpg-1.2.0-py3-none-any.whl.

File metadata

  • Download URL: znpg-1.2.0-py3-none-any.whl
  • Upload date:
  • Size: 13.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.2

File hashes

Hashes for znpg-1.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 91a51aba5fad35b6513cab035ac12a5639dc192ef22502242f3be9b6291bb6bb
MD5 fb62deb62567fa6d8af590469322e9e7
BLAKE2b-256 211b36022c16a954f1500a00e40e88c9e5d667f9238bcb60237e9573b8534cde

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page