Utility functions for PostgreSQL
Project description
ry-pg-utils
A Python utility library for PostgreSQL database operations with dynamic table creation, connection management, Protocol Buffer integration, and PostgreSQL LISTEN/NOTIFY support.
Overview
ry-pg-utils provides a robust framework for working with PostgreSQL databases in Python applications. It includes utilities for:
- Database connection management with connection pooling
- Dynamic table creation from Protocol Buffer message definitions
- Thread-safe session management
- Multi-backend support with automatic backend ID tracking
- PostgreSQL LISTEN/NOTIFY triggers and notifications
- Database updater for dynamic configuration via Redis
- Argument parsing for PostgreSQL connection parameters
Features
- Connection Management: Thread-safe PostgreSQL connection pooling with automatic retry logic and health checks
- Dynamic Tables: Automatically create and manage database tables from Protocol Buffer message schemas
- Multi-Backend Support: Track data across multiple backend instances with automatic ID tagging
- Session Management: Context managers for safe database session handling
- Notification System: Built-in PostgreSQL LISTEN/NOTIFY support with triggers and callbacks
- Advanced Configuration: Lazy-loaded, thread-safe config with runtime overrides and import order independence
- Redis Integration: Optional Redis-based database configuration updates
- Type Safety: Full type hints and mypy support
Installation
pip install ry-pg-utils
Dependencies
- Python 3.12+
- PostgreSQL database
- SQLAlchemy 2.0+
- Protocol Buffer support (protobuf)
- psycopg2-binary
- tenacity (for retry logic)
- python-dotenv (for environment variables)
- ryutils (logging utilities)
- ry_redis_bus (optional, for Redis integration)
Configuration
ry-pg-utils uses a powerful, thread-safe configuration system that supports lazy loading, runtime overrides, and import order independence.
Key Features
- Lazy Loading: Configuration loads only when first accessed
- Thread-Safe: All configuration operations use proper locking
- Import Order Independent: Runtime overrides work regardless of when modules are imported
- Environment Variables: Load from
.envfiles automatically - Runtime Overrides: Programmatically set config values at any time
1. Environment Variables
Create a .env file in your project root:
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_DB=mydb
POSTGRES_USER=postgres
POSTGRES_PASSWORD=secret
SSH_HOST=remote.server.com
SSH_PORT=22
SSH_USER=deploy
SSH_KEY_PATH=/path/to/key
2. Runtime Configuration
from ry_pg_utils.config import get_config, set_config
# Get current configuration (lazy-loaded from environment)
config = get_config()
print(config.postgres_host)
print(config.postgres_port)
# Set configuration at runtime (thread-safe, import order independent)
set_config(
postgres_host="new-host",
postgres_port=5433,
postgres_db="production_db",
add_backend_to_tables=True
)
# All modules now get the updated config, regardless of import order
config = get_config()
print(config.postgres_host) # "new-host"
3. Argument Parsing Integration
Perfect for command-line applications that need to override config at startup:
import argparse
from ry_pg_utils.parse_args import add_postgres_db_args
from ry_pg_utils.config import set_config
# Parse arguments
parser = argparse.ArgumentParser()
add_postgres_db_args(parser)
args = parser.parse_args()
# Apply runtime overrides from command-line arguments
set_config(
postgres_host=args.postgres_host,
postgres_port=args.postgres_port,
postgres_db=args.postgres_db,
postgres_user=args.postgres_user,
postgres_password=args.postgres_password,
)
# Now all modules use the command-line provided config
# Import order doesn't matter!
4. Testing Support
from ry_pg_utils.config import reset_config, set_config, has_config_overrides
# Reset config to clean state (useful for test isolation)
reset_config()
# Set test-specific configuration
set_config(postgres_host="test-db", postgres_port=5432)
# Check if overrides have been applied
if has_config_overrides():
print("Custom config is active")
Configuration Options
| Option | Type | Default | Description |
|---|---|---|---|
postgres_host |
str|None | From env | PostgreSQL server hostname |
postgres_port |
int|None | From env | PostgreSQL server port |
postgres_db |
str|None | From env | Database name |
postgres_user |
str|None | From env | Database username |
postgres_password |
str|None | From env | Database password |
ssh_host |
str|None | From env | SSH tunnel hostname |
ssh_port |
int|None | From env | SSH tunnel port |
ssh_user |
str|None | From env | SSH tunnel username |
ssh_key_path |
str|None | From env | Path to SSH private key |
backend_id |
str | POSTGRES_USER or hostname_ip | Unique identifier for this backend instance |
add_backend_to_all |
bool | False | Add backend_id column to all tables |
add_backend_to_tables |
bool | False | Append backend_id to table names |
raise_on_use_before_init |
bool | True | Raise exception if DB used before initialization |
do_publish_db |
bool | True | Enable database publishing features (for Redis integration) |
use_local_db_only |
bool | True | Use only local database connections |
Configuration API
from ry_pg_utils.config import (
get_config, # Get current configuration (lazy-loaded)
set_config, # Set configuration values at runtime
reset_config, # Reset to defaults (reload from environment)
has_config_overrides, # Check if runtime overrides are active
pg_config, # Backward compatibility function
)
# Get configuration
config = get_config()
# Set single or multiple values
set_config(postgres_host="localhost")
set_config(
postgres_host="localhost",
postgres_port=5432,
add_backend_to_tables=True
)
# Check override status
if has_config_overrides():
print("Using custom configuration")
# Reset to environment defaults
reset_config()
Why this matters: The lazy-loading, thread-safe design eliminates race conditions when setting config at runtime. You can safely call set_config() in your main function, and ALL modules—even those imported before the call—will see the updated values. No more import order issues!
Quick Start
1. Initialize Database Connection
from ry_pg_utils.connect import init_database, ManagedSession
# Initialize the database connection
init_database(
db_name="myapp_db",
db_host="localhost",
db_port=5432,
db_user="postgres",
db_password="secret"
)
2. Use Dynamic Tables with Protocol Buffers
from ry_pg_utils.dynamic_table import DynamicTableDb
from your_app.proto import YourMessagePb
# Create a message
message = YourMessagePb()
message.field1 = "value1"
message.field2 = 42
# Log message to database (table created automatically)
DynamicTableDb.log_data_to_db(
msg=message,
db_name="myapp",
channel="my_channel"
)
# Check if data exists
exists = DynamicTableDb.is_in_db(
msg=message,
db_name="myapp",
channel="my_channel",
attr="field1",
value="value1"
)
3. Manual Session Management
from ry_pg_utils.connect import ManagedSession
from sqlalchemy import text
# Use context manager for automatic session cleanup
with ManagedSession(db="myapp_db") as session:
if session:
result = session.execute(text("SELECT * FROM my_table"))
for row in result:
print(row)
Core Components
connect.py - Connection Management
The connection module provides thread-safe database connection and session management:
from ry_pg_utils.connect import (
init_database, # Initialize database connection
init_engine, # Initialize SQLAlchemy engine
ManagedSession, # Context manager for sessions
get_backend_id, # Get current backend ID
set_backend_id, # Set backend ID for thread
get_engine, # Get engine for database
close_engine, # Close database connection
clear_db, # Clear all connections
get_table_name, # Get table name with backend suffix
is_database_initialized, # Check if database is initialized
Base, # SQLAlchemy declarative base
)
Key Features:
- Thread-local backend ID tracking
- Connection pooling with configurable parameters (pool_size, max_overflow, pool_recycle)
- Automatic connection recovery with retry logic (using tenacity)
- Session scoping for thread safety
- Automatic backend_id injection before flush operations
- Pre-ping health checks to validate connections
- Support for auto-importing model modules
dynamic_table.py - Dynamic Table Creation
Automatically create and manage database tables from Protocol Buffer definitions:
from ry_pg_utils.dynamic_table import DynamicTableDb
# Create instance
db = DynamicTableDb(db_name="myapp")
# Add message to database
db.add_message(
channel_name="events",
message_pb=my_protobuf_message,
log_print_failure=True,
verbose=True
)
# Check existence
exists = db.inst_is_in_db(
message_pb=my_protobuf_message,
channel_name="events",
attr="event_id",
value=12345
)
Supported Protocol Buffer Types:
int32,int64,uint32,uint64→ PostgreSQLIntegerfloat,double→ PostgreSQLFloatbool→ PostgreSQLBooleanstring→ PostgreSQLStringbytes→ PostgreSQLLargeBinaryTimestamp(message) → PostgreSQLDateTime
postgres_info.py - Connection Information
Class for PostgreSQL connection parameters:
from ry_pg_utils.postgres_info import PostgresInfo
db_info = PostgresInfo(
db_name="mydb",
host="localhost",
port=5432,
user="postgres",
password="secret"
)
# Check if valid
if not db_info.is_null():
print(db_info) # Prints info with password masked
# Create null instance
null_info = PostgresInfo.null()
parse_args.py - Argument Parsing
Add PostgreSQL arguments to your argument parser:
import argparse
from ry_pg_utils.parse_args import add_postgres_db_args
parser = argparse.ArgumentParser()
add_postgres_db_args(parser)
args = parser.parse_args()
# Access: args.postgres_host, args.postgres_port, etc.
updater.py - Database Configuration Updater
Dynamically update database connections based on configuration messages via Redis:
from ry_pg_utils.updater import DbUpdater
from ry_redis_bus.helpers import RedisInfo
from ryutils.verbose import Verbose
updater = DbUpdater(
redis_info=redis_info,
args=args, # argparse.Namespace with postgres config
verbose=Verbose(True),
logging_error_db_callback=None, # Optional error logging callback
models_module="myapp.models" # Optional: auto-import SQLAlchemy models
)
# Initialize and start listening for configuration updates
updater.init()
# Run the update loop
updater.run() # Blocks, continuously checking for updates
Key Features:
- Automatic database initialization with connection pooling
- Dynamic database switching via Redis messages
- Optional auto-importing of SQLAlchemy models from specified module
- Publish database configuration changes to Redis channels
- Error logging callback support
- Automatic retry logic for database connection failures
notify_trigger.py - PostgreSQL LISTEN/NOTIFY Support
Create database triggers that send notifications on table changes:
from ry_pg_utils.notify_trigger import (
create_notify_trigger,
drop_notify_trigger,
subscribe_to_notifications,
NotificationListener,
)
from ry_pg_utils.connect import get_engine
engine = get_engine("myapp_db")
# Create a trigger that notifies on INSERT/UPDATE/DELETE
create_notify_trigger(
engine=engine,
table_name="users",
channel_name="user_changes",
events=["INSERT", "UPDATE", "DELETE"],
columns=["id", "username", "email"] # Optional: only include specific columns
)
# Subscribe to notifications
def handle_notification(notification):
print(f"Table: {notification['table']}")
print(f"Action: {notification['action']}")
print(f"Data: {notification['data']}")
with subscribe_to_notifications(
engine=engine,
channel_name="user_changes",
callback=handle_notification,
timeout=60.0
) as notifications:
# Notifications are handled in background thread
time.sleep(10)
# Or use the NotificationListener class for long-running listeners
listener = NotificationListener(db_name="myapp_db")
listener.create_listener(
table_name="users",
channel_name="user_changes",
events=["INSERT", "UPDATE"]
)
listener.add_callback("user_changes", handle_notification)
listener.start()
# ... your application code ...
listener.stop()
ipc/channels.py - Redis Communication Channels
Pre-defined Redis channels for database configuration updates (requires ry_redis_bus):
from ry_pg_utils.ipc.channels import (
DATABASE_CHANNEL, # DatabaseConfigPb messages
DATABASE_CONFIG_CHANNEL, # DatabaseSettingsPb messages
DATABASE_NOTIFY_CHANNEL, # DatabaseNotificationPb messages
)
# These channels are used by DbUpdater for dynamic database configuration
Advanced Usage
Multi-Backend Support
When add_backend_to_all is enabled (default: True), all tables automatically get a backend_id column:
from ry_pg_utils.connect import set_backend_id, ManagedSession
from sqlalchemy import text
# Set backend ID for current thread
set_backend_id("backend_1")
# All subsequent operations will include this backend_id
# ManagedSession automatically sets the backend_id if provided
with ManagedSession(db="myapp_db", backend_id="backend_1") as session:
if session:
# New/dirty records automatically get backend_id injected before flush
result = session.execute(text("SELECT * FROM my_table"))
Custom Table Names
When add_backend_to_tables is enabled (default: True), table names are automatically suffixed:
from ry_pg_utils.connect import get_table_name
from ry_pg_utils.config import pg_config
# Returns "events_my_backend" if add_backend_to_tables=True
table_name = get_table_name("events", backend_id="my_backend", verbose=True)
# Configure globally
pg_config.add_backend_to_tables = False # Disable backend suffix
ORM Base Class
Use the pre-configured base class for SQLAlchemy models:
from ry_pg_utils.connect import Base
from sqlalchemy import Column, Integer, String
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(100))
email = Column(String(200))
# If add_backend_to_all=True (default), backend_id column is automatically added
# The backend_id is a String(256) column, nullable=False
Auto-Importing Models
Use the models_module parameter to automatically import all models before table creation:
from ry_pg_utils.connect import init_database
# This will walk through 'myapp.models' and import all submodules
# ensuring all model classes are registered with Base.metadata
init_database(
db_name="myapp_db",
db_host="localhost",
db_port=5432,
db_user="postgres",
db_password="secret",
models_module="myapp.models" # Dot-separated module path
)
Error Handling
The library includes robust error handling with automatic retries:
from ry_pg_utils.connect import ManagedSession
from sqlalchemy import text
with ManagedSession(db="myapp_db") as session:
if session is None:
# Connection failed after retries, handle gracefully
print("Failed to establish database connection")
return
try:
session.execute(text("SELECT * FROM my_table"))
except Exception as e:
# Session will automatically rollback on exception
print(f"Query failed: {e}")
# Retries are built-in:
# - Session operations retry 3 times on OperationalError
# - Exponential backoff: min 4s, max 10s
# - Connection health checks via pool_pre_ping
Type Safety
The library is fully typed and includes a py.typed marker for mypy support:
# Run type checking
mypy your_app.py
Development
Setup Development Environment
# Clone the repository
git clone https://github.com/yourusername/ry-pg-utils.git
cd ry-pg-utils
# Create virtual environment
python -m venv venv-dev
source venv-dev/bin/activate # On Windows: venv-dev\Scripts\activate
# Install dependencies
pip install -r packages/requirements-dev.txt
Running Tests
Tests require a running PostgreSQL instance. Configure test database connection via environment variables or .env file.
# Activate virtual environment
source venv-dev/bin/activate
# Run all tests
make test
# Run specific test module
make test TESTMODULE=connect_test
Code Quality
The project uses several tools for code quality:
# Format code
make format
# Run linting
make lint_full
# Type checking is included in lint_full (uses mypy)
Examples
Complete Application Example
import argparse
from ry_pg_utils.parse_args import add_postgres_db_args
from ry_pg_utils.connect import init_database, ManagedSession
from ry_pg_utils.dynamic_table import DynamicTableDb
from sqlalchemy import text
def parse_args():
parser = argparse.ArgumentParser(description="My Database App")
add_postgres_db_args(parser)
return parser.parse_args()
def main():
args = parse_args()
# Initialize database
init_database(
db_name=args.postgres_db,
db_host=args.postgres_host,
db_port=args.postgres_port,
db_user=args.postgres_user,
db_password=args.postgres_password
)
# Use the database
with ManagedSession(db=args.postgres_db) as session:
if session:
result = session.execute(text("SELECT version()"))
print(f"PostgreSQL version: {result.fetchone()[0]}")
if __name__ == "__main__":
main()
Real-time Notification Example
from ry_pg_utils.connect import init_database, get_engine, Base
from ry_pg_utils.notify_trigger import create_notify_trigger, NotificationListener
from sqlalchemy import Column, Integer, String
import time
# Define a model
class Product(Base):
__tablename__ = 'products'
id = Column(Integer, primary_key=True)
name = Column(String(100))
price = Column(Integer)
# Initialize database
init_database(
db_name="inventory_db",
db_host="localhost",
db_port=5432,
db_user="postgres",
db_password="secret"
)
# Create notification trigger
engine = get_engine("inventory_db")
create_notify_trigger(
engine=engine,
table_name="products",
channel_name="product_updates",
events=["INSERT", "UPDATE", "DELETE"],
columns=["id", "name", "price"]
)
# Set up listener
listener = NotificationListener(db_name="inventory_db")
def on_product_change(notification):
action = notification['action']
data = notification['data']
print(f"Product {action}: {data}")
listener.add_callback("product_updates", on_product_change)
listener.start()
# Your application runs...
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
listener.stop()
License
This project is licensed under the MIT License - see the LICENSE file for details.
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
- Fork the repository
- Create your feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'Add some amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
Author
Ross Yeager - ryeager12@email.com
Changelog
Version 1.1.0 (Current)
Configuration System Improvements:
- Thread-Safe Config: New lazy-loaded, thread-safe configuration singleton pattern
- Runtime Overrides:
set_config()function for programmatic configuration changes - Import Order Independence: Config overrides work regardless of when modules are imported
- Config API: New functions:
get_config(),set_config(),reset_config(),has_config_overrides() - Perfect for CLI Apps: Easy integration with argument parsing for runtime config
- Test Support:
reset_config()for clean test isolation
Breaking Changes:
- Config is now accessed via
get_config()function instead ofpg_configobject - For backward compatibility,
pg_config()function still available (returns same config)
Version 1.0.2
- PostgreSQL LISTEN/NOTIFY support with triggers and notifications
- NotificationListener class for background notification handling
- Automatic connection health checks with pool_pre_ping
- Auto-importing models from specified module paths
- Enhanced retry logic with tenacity
- Improved error handling and connection recovery
Version 1.0.0
- Initial release
- Database connection management with pooling
- Dynamic table creation from Protocol Buffers
- Multi-backend support with automatic ID tagging
- Configuration system with environment variables
- Protocol Buffer integration
- Redis-based database configuration updates
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file ry_pg_utils-2.0.5.tar.gz.
File metadata
- Download URL: ry_pg_utils-2.0.5.tar.gz
- Upload date:
- Size: 32.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
402220cf43a6710e197b58f2d8bbd99704268f33e8a757f1bd2146bc9343170b
|
|
| MD5 |
ef38474ff920b14a3aa3cf4975703b5e
|
|
| BLAKE2b-256 |
84c19319292cd7a71e52445d8ef88d4e06f5b254cf5c6082b21f0b47eecbd639
|
Provenance
The following attestation bundles were made for ry_pg_utils-2.0.5.tar.gz:
Publisher:
python-publish.yml on droneshire/ry-pg-utils
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
ry_pg_utils-2.0.5.tar.gz -
Subject digest:
402220cf43a6710e197b58f2d8bbd99704268f33e8a757f1bd2146bc9343170b - Sigstore transparency entry: 621672543
- Sigstore integration time:
-
Permalink:
droneshire/ry-pg-utils@32dbfc7fb713d566b26a5376d33ecf2fb7b3af8c -
Branch / Tag:
refs/tags/2.0.5 - Owner: https://github.com/droneshire
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
python-publish.yml@32dbfc7fb713d566b26a5376d33ecf2fb7b3af8c -
Trigger Event:
release
-
Statement type:
File details
Details for the file ry_pg_utils-2.0.5-py3-none-any.whl.
File metadata
- Download URL: ry_pg_utils-2.0.5-py3-none-any.whl
- Upload date:
- Size: 29.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
4dbc33b44f6d04f67bfb3417ac353264b687aa2c564823dda38778732f12fdda
|
|
| MD5 |
8d1b64ea106b8226996a077f573cf8c3
|
|
| BLAKE2b-256 |
bf7588b4fe9fddae0a7f069ac0236c405e74c94bd7e640ac96385f7ab69561d5
|
Provenance
The following attestation bundles were made for ry_pg_utils-2.0.5-py3-none-any.whl:
Publisher:
python-publish.yml on droneshire/ry-pg-utils
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
ry_pg_utils-2.0.5-py3-none-any.whl -
Subject digest:
4dbc33b44f6d04f67bfb3417ac353264b687aa2c564823dda38778732f12fdda - Sigstore transparency entry: 621672547
- Sigstore integration time:
-
Permalink:
droneshire/ry-pg-utils@32dbfc7fb713d566b26a5376d33ecf2fb7b3af8c -
Branch / Tag:
refs/tags/2.0.5 - Owner: https://github.com/droneshire
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
python-publish.yml@32dbfc7fb713d566b26a5376d33ecf2fb7b3af8c -
Trigger Event:
release
-
Statement type: