A lightweight PostgreSQL wrapper for Python with connection pooling

ZNPG v1.3.0 Documentation

A robust, high-level PostgreSQL database abstraction layer built on psycopg3 with connection pooling, query building, and context manager support.

Philosophy:

Database access should be simple, safe, and Pythonic.

ZNPG was born from frustration with psycopg2's verbosity and SQLAlchemy's complexity. We believe database libraries should:

  • Get out of your way with clean, intuitive APIs
  • Protect you from yourself with safe defaults
  • Scale with your needs from scripts to production apps

Built by a developer who got tired of boilerplate. Used by 250+ developers who felt the same.

What's New in v1.3.0

  • Fixed transaction support — all write methods now accept an optional conn parameter, making atomic multi-operation transactions actually reliable
  • All CRUD methods (insert, update, delete, bulk_insert, select, fetch_one, query, execute) now support conn passthrough

Features

  • Zero-boilerplate — from import to query in 3 lines
  • Safe by default — no unsafe DELETE/UPDATE without explicit flags
  • Built-in pooling — connection pooling that just works
  • Full CRUD — high-level operations for 95% of use cases
  • Raw SQL access — escape hatch when you need full control
  • Type-hinted — modern Python with complete type hints
  • Production-ready — connection health checks, stats, and maintenance ops
  • JSON native — built-in import/export for data portability
  • True atomic transactions — pass conn into any method to run it inside a transaction

Overview

ZNPG provides a Pythonic interface to PostgreSQL databases with the following features:

  • Connection Pooling: Built-in psycopg_pool integration for efficient connection management
  • Context Managers: Safe resource handling with with statements
  • CRUD Abstractions: High-level methods for common database operations
  • Query Builder Integration: Seamless SQL generation via QueryBuilder class
  • Type Safety: Full type hints and generic support
  • Atomic Transactions: Pass conn into any method to guarantee all-or-nothing execution
  • JSON Export/Import: Native support for data serialization

Installation

pip install znpg

Dependencies:

  • psycopg (v3.x)
  • psycopg-pool
  • Python 3.8+

Quick Start

from znpg import Database

# Initialize with connection string
db = Database(min_size=2, max_size=10)
db.url_connect("postgresql://user:pass@localhost:5432/mydb")

# Alternatively, instead of url_connect, use manual connection parameters
db.manual_connect(
    username="user",
    password="pass",
    host="localhost",
    port=5432,
    db_name="mydb"
)

# Simple query
users = db.query("SELECT * FROM users WHERE age > %s", [18])

# Using context manager (recommended)
with Database() as db:
    db.url_connect("postgresql://user:pass@localhost/db")
    result = db.select("users", where={"status": "active"})

Core Classes

Database Class

The primary interface for all database operations.

Constructor

Database(
    min_size: int = 1,      # Minimum connections in pool
    max_size: int = 10,     # Maximum connections in pool
    timeout: int = 30       # Connection timeout in seconds
)

Attributes

| Attribute | Type | Description |
|---|---|---|
| pool | Optional[ConnectionPool] | The underlying connection pool instance |
| min_size | int | Minimum number of connections maintained |
| max_size | int | Maximum number of connections allowed |
| is_connected | bool | Connection status flag |
| timeout | int | Connection acquisition timeout |

Connection Management

url_connect(conn_string: str) -> None

Connect using a PostgreSQL connection URI.

db = Database()
db.url_connect("postgresql://admin:secret@db.example.com:5432/production")

manual_connect(username, host, password, db_name, port) -> None

Connect using individual parameters.

db.manual_connect(
    username="postgres",
    host="localhost",
    password="secure_pass",
    db_name="myapp",
    port=5432
)

get_connection() -> ContextManager

Context manager for raw connection access.

with db.get_connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT version()")
        print(cur.fetchone())

is_healthy() -> bool

Check database connectivity.

if db.is_healthy():
    print("Database connection is active")

stats() -> Dict

Retrieve connection pool statistics.

stats = db.stats()
# Returns: {"size": 5, "available": 3, "used": 2}

close() -> None

Explicitly close the connection pool.

db.close()  # Called automatically when using context managers

CRUD Operations

The conn Parameter

All write methods and most read methods accept an optional conn parameter. When provided, the method runs on that connection instead of grabbing one from the pool. This is how you achieve atomic transactions — see Transaction Support.

# without conn — grabs its own connection, auto-commits
db.insert("users", {"name": "Alice"})

# with conn — runs inside your transaction, no auto-commit
with db.transaction() as conn:
    db.insert("users", {"name": "Alice"}, conn=conn)

Read Operations

query(sql, params, conn) -> List[Dict[str, Any]]

Execute raw SQL and return results as a list of dicts.

results = db.query(
    "SELECT * FROM orders WHERE status = %s AND amount > %s",
    ["pending", 100.00]
)
# Returns: [{"id": 1, "status": "pending", "amount": 150.00}, ...]

fetch_one(sql, params, conn) -> Optional[Dict[str, Any]]

Fetch a single record or None.

user = db.fetch_one("SELECT * FROM users WHERE email = %s", ["john@example.com"])
if user:
    print(user["name"])

select(table, columns, where, order_by, limit, conn) -> List[Dict[str, Any]]

High-level SELECT with QueryBuilder.

# Basic select
users = db.select("users")

# With filters
active_users = db.select(
    table="users",
    columns=["id", "name", "email"],
    where={"status": "active", "verified": True},
    order_by=["created_at DESC"],
    limit=10
)
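
To make the mapping from arguments to SQL concrete, here is a minimal sketch of how a where dict, an order_by list, and a limit could be turned into one parameterized statement. build_select is a hypothetical helper written for this illustration; it is not ZNPG's QueryBuilder, whose actual output may differ.

```python
from typing import Any, Dict, List, Optional, Tuple


def build_select(
    table: str,
    columns: Optional[List[str]] = None,
    where: Optional[Dict[str, Any]] = None,
    order_by: Optional[List[str]] = None,
    limit: Optional[int] = None,
) -> Tuple[str, List[Any]]:
    """Hypothetical helper: return (sql, params) for a simple SELECT."""
    sql = f"SELECT {', '.join(columns) if columns else '*'} FROM {table}"
    params: List[Any] = []
    if where:
        # Equality filters are joined with AND; values travel as parameters.
        sql += " WHERE " + " AND ".join(f"{col} = %s" for col in where)
        params.extend(where.values())
    if order_by:
        sql += " ORDER BY " + ", ".join(order_by)
    if limit is not None:
        sql += " LIMIT %s"
        params.append(limit)
    return sql, params


sql, params = build_select(
    "users",
    columns=["id", "name", "email"],
    where={"status": "active", "verified": True},
    order_by=["created_at DESC"],
    limit=10,
)
print(sql)
print(params)
```

Table and column names are interpolated directly in this sketch, so they should only ever come from trusted code, never from user input.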

get_by_id(table, id_name, id) -> List[Dict]

Fetch a record by primary key.

user = db.get_by_id("users", "user_id", 42)

count(table, where) -> Optional[int]

Count records with optional filtering.

total = db.count("orders")
pending_count = db.count("orders", where={"status": "pending"})

exists(table, where) -> Optional[bool]

Check if any records matching the criteria exist.

has_admin = db.exists("users", {"role": "admin"})

Write Operations

execute(sql, params, conn) -> int

Execute raw SQL (INSERT, UPDATE, DELETE). Returns rowcount.

rows_deleted = db.execute("DELETE FROM logs WHERE created_at < %s", ["2023-01-01"])

insert(table, data, conn) -> bool

Insert a single record.

success = db.insert("users", {
    "name": "Jane Doe",
    "email": "jane@example.com",
    "created_at": "2024-01-15"
})

update(table, data, conditions, allow_all, conn) -> int

Update records with safety checks.

# Safe update with WHERE clause
rows_updated = db.update(
    table="users",
    data={"last_login": "2024-01-15"},
    conditions={"id": 42}
)

# Update all records (requires explicit flag)
db.update("users", {"status": "inactive"}, allow_all=True)
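
The safety check can be pictured as a guard that runs before any SQL is built. The sketch below is an assumption about the general shape of such a guard, not ZNPG's source: build_update is a hypothetical name, and the exact error messages are invented for the example.

```python
from typing import Any, Dict, List, Optional, Tuple


def build_update(
    table: str,
    data: Dict[str, Any],
    conditions: Optional[Dict[str, Any]] = None,
    allow_all: bool = False,
) -> Tuple[str, List[Any]]:
    """Hypothetical helper: refuse an UPDATE with no WHERE unless allow_all is set."""
    if not data:
        raise ValueError("nothing to update")
    if not conditions and not allow_all:
        raise ValueError("refusing to update every row; pass allow_all=True")
    sql = f"UPDATE {table} SET " + ", ".join(f"{col} = %s" for col in data)
    params = list(data.values())
    if conditions:
        sql += " WHERE " + " AND ".join(f"{col} = %s" for col in conditions)
        params.extend(conditions.values())
    return sql, params


print(build_update("users", {"last_login": "2024-01-15"}, {"id": 42}))

# Without conditions and without the flag, the guard stops the statement.
try:
    build_update("users", {"status": "inactive"})
    blocked = False
except ValueError as exc:
    blocked = True
    print(f"blocked: {exc}")
```

Making the dangerous case opt-in means a forgotten argument fails loudly instead of silently rewriting the whole table.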

delete(table, conditions, allow_deleteall, conn) -> int

Delete records with safety checks.

# Safe delete
deleted = db.delete("sessions", {"expired": True})

# Delete all (requires explicit flag)
db.delete("logs", allow_deleteall=True)

bulk_insert(table, data, on_conflict, conn) -> int

Efficient batch insertion.

users = [
    {"name": "Alice", "email": "alice@example.com"},
    {"name": "Bob", "email": "bob@example.com"}
]
inserted = db.bulk_insert("users", users, on_conflict="DO NOTHING")
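
One common way to batch rows is a single multi-row INSERT with the parameters flattened into one list. The sketch below shows that technique under stated assumptions: build_bulk_insert is a hypothetical helper, and whether ZNPG builds one statement like this, or prefixes on_conflict with ON CONFLICT in the same way, is not confirmed by this documentation.

```python
from typing import Any, Dict, List, Optional, Tuple


def build_bulk_insert(
    table: str,
    rows: List[Dict[str, Any]],
    on_conflict: Optional[str] = None,
) -> Tuple[str, List[Any]]:
    """Hypothetical helper: one multi-row INSERT with flattened parameters."""
    if not rows:
        raise ValueError("no rows to insert")
    columns = list(rows[0])
    for row in rows:
        if set(row) != set(columns):
            raise ValueError("every row must have the same keys")
    row_marks = "(" + ", ".join(["%s"] * len(columns)) + ")"
    sql = (
        f"INSERT INTO {table} ({', '.join(columns)}) VALUES "
        + ", ".join([row_marks] * len(rows))
    )
    if on_conflict:
        # Assumption: the caller passes only the action, e.g. "DO NOTHING".
        sql += f" ON CONFLICT {on_conflict}"
    # Values are read in column order so differently ordered dicts still line up.
    params = [row[col] for row in rows for col in columns]
    return sql, params


users = [
    {"name": "Alice", "email": "alice@example.com"},
    {"name": "Bob", "email": "bob@example.com"},
]
bulk_sql, bulk_params = build_bulk_insert("users", users, on_conflict="DO NOTHING")
print(bulk_sql)
print(bulk_params)
```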

Transaction Support

transaction() -> ContextManager

Yields a connection for atomic multi-operation transactions. Automatically commits on success and rolls back on any exception.

v1.3.0 fix: pass the yielded conn into any method to guarantee they all run on the same connection — making rollbacks actually work.

import logging

logger = logging.getLogger(__name__)

# Atomic checkout: sale recorded + inventory updated or neither happens
try:
    with db.transaction() as conn:
        db.insert("sales", {"product_id": "abc", "total": 29.99}, conn=conn)
        db.update("inventory", {"quantity": 11}, {"product_id": "abc"}, conn=conn)
        # both succeed: the transaction commits on exit
except Exception as e:
    # either one fails: both roll back
    logger.error(f"Checkout failed, transaction rolled back: {e}")

Without conn (old broken behavior):

# DON'T do this — each call grabs its own connection, rollback won't affect them
with db.transaction() as conn:
    db.insert("sales", {...})      # connection A
    db.update("inventory", {...})  # connection B — not part of the transaction!

DDL Operations

create_table(table, columns) -> bool

db.create_table("products", {
    "id": "SERIAL PRIMARY KEY",
    "name": "VARCHAR(255) NOT NULL",
    "price": "DECIMAL(10,2)",
    "created_at": "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"
})

drop_table(table, cascade, allow_action) -> bool

db.drop_table("temp_table", allow_action=True)
db.drop_table("orders", cascade=True, allow_action=True)

truncate(table) -> bool

db.truncate("logs")

table_exists(table) -> Optional[bool]

if db.table_exists("migrations"):
    print("Already set up")

get_table_columns(table) -> Optional[List[str]]

columns = db.get_table_columns("users")
# Returns: ['id', 'name', 'email', 'created_at']

create_index(table, columns, unique) -> int

db.create_index("users", ["email"], unique=True)
db.create_index("orders", ["user_id", "created_at"])

vacuum(table, analyze) -> int

db.vacuum()                            # full database
db.vacuum("large_table", analyze=True)

Utility Methods

export_to_json(file, data, indent) -> bool

data = db.select("users")
Database.export_to_json("backup.json", data)

import_from_json(file) -> Union[dict, bool]

data = Database.import_from_json("config.json")
db.bulk_insert("settings", data)
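
Rows read from PostgreSQL often contain Decimal and datetime values, which the standard json module cannot encode on its own. This documentation does not say how export_to_json treats them, so the sketch below shows one possible approach using only the standard library; export_rows and import_rows are hypothetical names.

```python
import json
import os
import tempfile
from datetime import datetime
from decimal import Decimal
from typing import Any, Dict, List


def export_rows(path: str, rows: List[Dict[str, Any]], indent: int = 2) -> None:
    """Write rows as JSON; default=str stringifies types json cannot encode."""
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(rows, fh, indent=indent, default=str)


def import_rows(path: str) -> List[Dict[str, Any]]:
    """Read the rows back; stringified values stay strings."""
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)


rows = [{"id": 1, "price": Decimal("29.99"), "created_at": datetime(2024, 1, 15, 9, 30)}]
path = os.path.join(tempfile.mkdtemp(), "backup.json")
export_rows(path, rows)
restored = import_rows(path)
print(restored)
```

The round trip is lossy by design: prices and timestamps come back as strings, so code that re-inserts them should rely on PostgreSQL's casts or convert them explicitly.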

c_string(username, host, password, db_name, port) -> str

conn_str = Database.c_string("user", "localhost", "pass", "db", 5432)
# Returns: "postgresql://user:pass@localhost:5432/db"
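
Credentials that contain characters such as @, : or / have to be percent-encoded before they are placed in a connection URI. Whether c_string does this is not stated here, so the following is a sketch of a hypothetical equivalent, build_conn_string, that produces the same output for simple values and escapes reserved characters otherwise.

```python
from urllib.parse import quote


def build_conn_string(
    username: str, host: str, password: str, db_name: str, port: int = 5432
) -> str:
    """Hypothetical equivalent of c_string that also escapes reserved characters."""
    user = quote(username, safe="")
    secret = quote(password, safe="")
    database = quote(db_name, safe="")
    return f"postgresql://{user}:{secret}@{host}:{port}/{database}"


plain = build_conn_string("user", "localhost", "pass", "db", 5432)
escaped = build_conn_string("user", "localhost", "p@ss/w:rd", "db", 5432)
print(plain)
print(escaped)
```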

Error Handling

All operations catch psycopg.Error and log via the configured logger. Methods return safe defaults on failure:

| Method | Failure Return |
|---|---|
| select() | [] |
| insert() | False |
| update() | 0 |
| delete() | 0 |
| create_table() | False |
| drop_table() | False |
| truncate() | False |
| table_exists() | None |
| bulk_insert() | 0 |
| get_table_columns() | None |
| get_by_id() | None |
| count() | None |
| exists() | None |

Connection failures are re-raised after logging.
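
Because failures surface as return values, a plain truthiness test can confuse a failed count() (None) with a legitimate result of 0, or a failed exists() (None) with False. The sketch below shows one way a caller might turn the None sentinel back into an exception; require and QueryFailed are hypothetical names, not part of ZNPG.

```python
from typing import Optional, TypeVar

T = TypeVar("T")


class QueryFailed(RuntimeError):
    """Raised when a method returned its None failure sentinel."""


def require(value: Optional[T], operation: str) -> T:
    """Return value unchanged, or raise if it is the None failure sentinel."""
    if value is None:
        raise QueryFailed(f"{operation} failed; check the log for the underlying error")
    return value


# Literal values stand in for db.count(...) and db.exists(...) results.
order_count = require(0, "count(orders)")      # 0 is a valid answer, not a failure
has_admin = require(False, "exists(users)")    # False is a valid answer too

try:
    require(None, "count(orders)")
    failed = False
except QueryFailed:
    failed = True

print(order_count, has_admin, failed)
```

This only helps for methods whose failure value is None; select() returns [] both on failure and for an empty result, so the log is the only way to tell those apart.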


Best Practices

1. Always Use Context Managers

# good
with Database() as db:
    db.url_connect(conn_str)
    data = db.select("users")

# risky — pool may not close if an exception occurs
db = Database()
db.url_connect(conn_str)

2. Use Parameterized Queries

# safe
db.query("SELECT * FROM users WHERE id = %s", [user_id])

# never do this — SQL injection risk
db.query(f"SELECT * FROM users WHERE id = {user_id}")
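
The risk is easy to demonstrate. The sketch below uses the standard library's sqlite3 as a stand-in for PostgreSQL, so it runs without a server; note that sqlite3 writes placeholders as ? where psycopg uses %s.

```python
import sqlite3

# sqlite3 stands in for PostgreSQL; it uses ? placeholders where psycopg uses %s.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [(1, "alice"), (2, "bob")])

user_id = "1 OR 1=1"  # attacker-controlled input

# Interpolated: the input becomes part of the SQL and matches every row.
leaked = conn.execute(f"SELECT name FROM users WHERE id = {user_id}").fetchall()

# Parameterized: the input is compared as a single value and matches nothing.
safe = conn.execute("SELECT name FROM users WHERE id = ?", (user_id,)).fetchall()

print(len(leaked), len(safe))
conn.close()
```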

3. Always Pass conn Inside Transactions

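# inv_conditions is a placeholder WHERE dict, e.g. {"product_id": "abc"};
# update() needs either conditions or allow_all=True
with db.transaction() as conn:
    db.insert("orders", order_data, conn=conn)                      # correct
    db.update("inventory", inv_data, inv_conditions, conn=conn)     # correct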

4. Handle Connection Failures

try:
    db.url_connect(conn_str)
except Exception as e:
    logger.critical(f"Database connection failed: {e}")
    raise SystemExit(1)
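
Since connection failures are re-raised, a service that starts before its database is ready may prefer to retry a few times before giving up. The following is a minimal sketch of retry with exponential backoff; connect_with_retry and its parameters are hypothetical and not part of ZNPG.

```python
import time
from typing import Callable, List


def connect_with_retry(
    connect: Callable[[], None],
    attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Call connect(); on failure wait base_delay * 2**n, retry, re-raise the last error."""
    for attempt in range(attempts):
        try:
            connect()
            return
        except Exception:
            if attempt == attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)


# Demo: a fake connect that fails twice and a fake sleep that records the delays.
calls = {"n": 0}
delays: List[float] = []


def flaky_connect() -> None:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("database not ready")


connect_with_retry(flaky_connect, sleep=delays.append)
print(calls["n"], delays)
```

With ZNPG the callable would be something like lambda: db.url_connect(conn_str); the sleep parameter exists only so the delay can be replaced in tests.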

5. Use Explicit Safety Flags

# raises ValueError — missing WHERE
db.update("users", {"role": "admin"})

# works
db.update("users", {"role": "admin"}, allow_all=True)

6. Monitor Pool Health

stats = db.stats()
if stats["available"] / stats["size"] < 0.2:
    logger.warning("Pool running low on connections")
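
The ratio above divides by stats["size"], which fails if the pool reports a size of 0. A small wrapper can cover that case; pool_is_low is a hypothetical helper that assumes the dict shape shown for stats() earlier in this document.

```python
from typing import Dict


def pool_is_low(stats: Dict[str, int], threshold: float = 0.2) -> bool:
    """True when the share of available connections is below threshold."""
    size = stats.get("size", 0)
    if size <= 0:
        return True  # a pool with no connections counts as exhausted
    return stats.get("available", 0) / size < threshold


print(pool_is_low({"size": 5, "available": 3, "used": 2}))
print(pool_is_low({"size": 10, "available": 1, "used": 9}))
print(pool_is_low({"size": 0, "available": 0, "used": 0}))
```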

API Reference Summary

| Method | Returns | Description |
|---|---|---|
| url_connect(conn_string) | None | Connect via URI |
| manual_connect(...) | None | Connect via params |
| get_connection() | ContextManager | Raw connection access |
| transaction() | ContextManager | Atomic transaction context |
| query(sql, params, conn) | List[Dict] | Raw SQL query |
| execute(sql, params, conn) | int | Raw SQL execution |
| fetch_one(sql, params, conn) | Optional[Dict] | Single record fetch |
| select(..., conn) | List[Dict] | High-level SELECT |
| insert(table, data, conn) | bool | Insert record |
| update(table, data, conditions, allow_all, conn) | int | Update records |
| delete(table, conditions, allow_deleteall, conn) | int | Delete records |
| bulk_insert(table, data, on_conflict, conn) | int | Batch insert |
| create_table(table, columns) | bool | Create table |
| drop_table(table, cascade, allow_action) | bool | Drop table |
| truncate(table) | bool | Truncate table |
| table_exists(table) | Optional[bool] | Check existence |
| get_table_columns(table) | Optional[List[str]] | Get columns |
| get_by_id(table, id_name, id) | List[Dict] | Fetch by PK |
| count(table, where) | Optional[int] | Count records |
| exists(table, where) | Optional[bool] | Check existence |
| create_index(table, columns, unique) | int | Create index |
| vacuum(table, analyze) | int | Run VACUUM |
| is_healthy() | bool | Health check |
| stats() | Dict | Pool statistics |
| close() | None | Close pool |
| export_to_json(file, data, indent) | bool | Static: export JSON |
| import_from_json(file) | Union[dict, bool] | Static: import JSON |
| c_string(...) | str | Static: build conn string |

  • Version: 1.3.0
  • License: MIT
  • Python Support: 3.8+
  • PostgreSQL: 12+
  • Author: ZN-0X

For issues and contributions, visit the repository.
