A lightweight PostgreSQL wrapper for Python with connection pooling
Project description
znpg
A lightweight PostgreSQL wrapper for Python with connection pooling and a clean API.
Why znpg?
Working with psycopg directly involves a lot of boilerplate. znpg removes the repetition while maintaining the flexibility of raw SQL when you need it.
Key features:
- Built-in connection pooling
- Simple CRUD operations
- Bulk insert support
- Transaction management
- SQL injection protection
- Type hints throughout
Installation
pip install znpg
Quick Start
from znpg import Database
# Connect to database
db = Database()
db.url_connect("postgresql://user:password@localhost:5432/dbname")
# Create table
db.create_table('users', {
'id': 'SERIAL PRIMARY KEY',
'name': 'VARCHAR(100) NOT NULL',
'email': 'VARCHAR(255) UNIQUE',
'created_at': 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP'
})
# Insert data
db.insert('users', {
'name': 'John Doe',
'email': 'john@example.com'
})
# Query data
users = db.select('users', where={'name': 'John Doe'})
print(users) # [{'id': 1, 'name': 'John Doe', 'email': 'john@example.com', ...}]
# Close connection
db.close()
Using Context Manager
The recommended way to use znpg is with a context manager, which automatically handles connection cleanup:
from znpg import Database
with Database() as db:
db.url_connect("postgresql://user:password@localhost:5432/dbname")
# Your database operations
users = db.select('users')
# Connection pool automatically closed
Core Operations
Connecting
Using URL string:
db = Database()
db.url_connect("postgresql://user:password@localhost:5432/dbname")
Using individual parameters:
db = Database()
db.manual_connect(
username="user",
password="password",
host="localhost",
db_name="dbname",
port=5432
)
Selecting Data
Select all:
users = db.select('users')
Select with conditions:
users = db.select('users', where={'active': True})
Select specific columns:
users = db.select('users', columns=['name', 'email'])
With ordering and limit:
users = db.select('users',
where={'active': True},
order_by='created_at DESC',
limit=10
)
Inserting Data
Single row:
db.insert('users', {
'name': 'Jane Smith',
'email': 'jane@example.com'
})
Multiple rows (bulk insert):
db.bulk_insert('users', [
{'name': 'Alice', 'email': 'alice@example.com'},
{'name': 'Bob', 'email': 'bob@example.com'},
{'name': 'Charlie', 'email': 'charlie@example.com'}
])
Bulk insert is significantly faster for large datasets.
Updating Data
Update with conditions:
db.update('users',
data={'active': False},
conditions={'email': 'john@example.com'}
)
Update all rows (requires explicit permission):
db.update('users',
data={'verified': True},
allow_all=True
)
Deleting Data
Delete with conditions:
db.delete('users', conditions={'active': False})
Delete all (requires explicit permission):
db.delete('users', allow_deleteall=True)
Table Management
Create Table
db.create_table('products', {
'id': 'SERIAL PRIMARY KEY',
'name': 'VARCHAR(200) NOT NULL',
'price': 'DECIMAL(10, 2)',
'stock': 'INTEGER DEFAULT 0',
'created_at': 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP'
})
Drop Table
db.drop_table('old_table', allow_action=True)
With cascade:
db.drop_table('parent_table', cascade=True, allow_action=True)
Check if Table Exists
if db.table_exists('users'):
print("Table exists")
Get Table Columns
columns = db.get_table_columns('users')
print(columns) # ['id', 'name', 'email', 'created_at']
Truncate Table
db.truncate('logs')
Utility Methods
Count Rows
total_users = db.count('users')
active_users = db.count('users', where={'active': True})
Check if Record Exists
exists = db.exists('users', {'email': 'john@example.com'})
Get by ID
user = db.get_by_id('users', 'id', 123)
Transactions
For operations that need to be atomic:
with db.transaction() as conn:
cursor = conn.cursor()
cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
# Automatically commits on success, rolls back on error
Raw SQL
When you need full control, you can execute raw SQL:
# Query with results
results = db.query("SELECT * FROM users WHERE age > %s", [18])
# Execute without results
rows_affected = db.execute("DELETE FROM logs WHERE created_at < %s", ['2023-01-01'])
# Fetch single row
user = db.fetch_one("SELECT * FROM users WHERE id = %s", [123])
Safety Features
znpg includes safety checks for destructive operations:
UPDATE without WHERE clause:
# This will raise ValueError
db.update('users', {'active': False})
# Must explicitly allow
db.update('users', {'active': False}, allow_all=True)
DELETE without WHERE clause:
# This will raise ValueError
db.delete('users')
# Must explicitly allow
db.delete('users', allow_deleteall=True)
DROP TABLE requires confirmation:
# This will raise AuthorizationError
db.drop_table('important_table')
# Must explicitly allow
db.drop_table('important_table', allow_action=True)
Connection Pooling
znpg uses connection pooling by default (1-10 connections). This means:
- Connections are reused across operations
- Better performance under load
- Automatic connection management
- Thread-safe operations
You don't need to manage connections manually - the pool handles everything.
Requirements
- Python 3.7+
- psycopg 3.0+
- psycopg-pool 3.0+
Performance
Bulk insert performance test (69 rows):
- Traditional loop insert: ~15-20 seconds
- znpg bulk_insert: <5 seconds
For data pipelines and ETL operations, bulk_insert provides significant performance improvements.
Error Handling
All methods include error handling and return sensible defaults:
# Returns empty list on error
users = db.select('nonexistent_table') # []
# Returns False on error
success = db.insert('users', {'invalid': 'data'}) # False
# Returns 0 on error
count = db.count('nonexistent_table') # None
Errors are printed to console for debugging.
License
MIT License - see LICENSE file for details.
Contributing
Contributions are welcome. Please open an issue first to discuss proposed changes.
Author
Built by Zain, a 17-year-old developer from Pakistan.
Changelog
Version 1.0.0
- Initial release
- Core CRUD operations
- Connection pooling
- Bulk insert support
- Table management
- Transaction support
- Safety checks for destructive operations
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file znpg-1.1.1.tar.gz.
File metadata
- Download URL: znpg-1.1.1.tar.gz
- Upload date:
- Size: 10.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.2
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e19f6c077be9b328050aa0e4177b4d0b5e47a92fd8e40c6dc9653449a8afa19e
|
|
| MD5 |
b27698b5cfd50145c3d4f6397675e858
|
|
| BLAKE2b-256 |
b12da21c8e1278a0802e57d0a578761ce75ab6b8f0ee526ac608e474400c11b5
|
File details
Details for the file znpg-1.1.1-py3-none-any.whl.
File metadata
- Download URL: znpg-1.1.1-py3-none-any.whl
- Upload date:
- Size: 8.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.2
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
371f9fa9d93056e67121d931a598809dc69a4bea0c848262984f56864dc4989e
|
|
| MD5 |
931f7ccd318c8d7c4e7a0d8187dc6f35
|
|
| BLAKE2b-256 |
32456a8064a331afe087808747ec9e61ff6e847c4b40c550f784eced41b6dbd5
|