Production-grade async database connection pooling for Netrun Systems services
Project description
netrun-db-pool
Production-grade async database connection pooling for Netrun Systems services.
IMPORTANT: Migration to v2.0.0 (Namespace Structure)
Version 2.0.0 introduces a breaking change in import paths. The package has been migrated to the netrun.db namespace structure for consistency with other Netrun packages.
New Import Path
# Old (v1.x - deprecated but still works with warnings)
from netrun_db_pool import AsyncDatabasePool, PoolConfig
from netrun_db_pool.middleware import get_db_dependency
# New (v2.0+ - recommended)
from netrun.db import AsyncDatabasePool, PoolConfig
from netrun.db.middleware import get_db_dependency
Backwards Compatibility
Version 2.0.0 maintains full backwards compatibility through deprecation shims. Old imports will continue to work but will issue DeprecationWarning. The old import path will be removed in version 3.0.0 (estimated Q2 2026).
Migration Checklist
- Update all imports from
netrun_db_pooltonetrun.db - Update all imports from
netrun_db_pool.middlewaretonetrun.db.middleware - Update
requirements.txtorpyproject.tomlto usenetrun-db-pool>=2.0.0 - Test thoroughly in development environment
- Deploy to production
Features
- AsyncPG Performance: SQLAlchemy 2.0+ with asyncpg driver for maximum PostgreSQL performance
- Multi-Tenant RLS: Built-in Row-Level Security support for SaaS applications
- Health Monitoring: Comprehensive pool metrics and health checks
- FastAPI Integration: Drop-in dependency injection and middleware
- Production Ready: Connection pre-ping, automatic recycling, configurable pooling
- Type Safe: Full Pydantic configuration with environment variable support
Installation
pip install netrun-db-pool
Optional Dependencies
# FastAPI integration
pip install netrun-db-pool[fastapi]
# Development tools
pip install netrun-db-pool[dev]
Quick Start
Basic Usage
from netrun.db import AsyncDatabasePool, PoolConfig
from sqlalchemy import select, text
# Create pool from environment variables
pool = AsyncDatabasePool.from_env()
await pool.initialize()
# Use with context manager
async with pool.get_session() as session:
result = await session.execute(text("SELECT version()"))
print(result.scalar())
FastAPI Integration
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from netrun.db import AsyncDatabasePool
from netrun.db.middleware import get_db_dependency
app = FastAPI()
pool = AsyncDatabasePool.from_env()
# Create dependency
get_db = get_db_dependency(pool)
@app.on_event("startup")
async def startup():
await pool.initialize()
@app.on_event("shutdown")
async def shutdown():
await pool.close()
@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User))
return result.scalars().all()
@app.get("/health/db")
async def db_health():
health = await pool.health_check_detailed()
return health.to_dict()
Multi-Tenant with RLS
from fastapi import FastAPI, Request, Depends
from netrun.db import TenantAwareDatabasePool
from netrun.db.middleware import (
get_tenant_db_dependency,
TenantContextMiddleware,
)
app = FastAPI()
pool = TenantAwareDatabasePool.from_env()
# Extract tenant_id from JWT
def extract_tenant_id(request: Request) -> str:
token = request.headers.get("Authorization", "").replace("Bearer ", "")
claims = decode_jwt(token) # Your JWT decoder
return claims.get("tenant_id")
def get_tenant_id(request: Request) -> str:
return request.state.tenant_id
# Add middleware to extract tenant context
app.add_middleware(
TenantContextMiddleware,
tenant_id_extractor=extract_tenant_id,
)
# Create tenant-aware dependency
get_tenant_db = get_tenant_db_dependency(
pool=pool,
tenant_id_extractor=get_tenant_id,
)
@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_tenant_db)):
# Only current tenant's users are accessible (RLS enforced)
result = await db.execute(select(User))
return result.scalars().all()
PostgreSQL RLS Setup
-- Enable RLS on table
ALTER TABLE users ENABLE ROW LEVEL SECURITY;
-- Create RLS policy
CREATE POLICY tenant_isolation ON users
USING (tenant_id = current_setting('app.current_tenant_id')::uuid);
-- Verify RLS is enabled
SELECT relname, relrowsecurity
FROM pg_class
WHERE relname = 'users';
Configuration
Environment Variables
# Database connection
DATABASE_URL=postgresql+asyncpg://user:pass@localhost/db
# Pool settings
DB_POOL_SIZE=20 # Base pool size
DB_MAX_OVERFLOW=10 # Overflow connections (total max: 30)
DB_POOL_TIMEOUT=30 # Connection acquisition timeout (seconds)
DB_POOL_RECYCLE=3600 # Recycle connections after 1 hour
DB_COMMAND_TIMEOUT=60 # Query timeout (seconds)
# Application settings
DB_APP_NAME=my-service # PostgreSQL application name
DB_ENABLE_JIT=false # Disable JIT for consistent performance
DB_ECHO=false # Enable SQLAlchemy query logging (dev only)
Programmatic Configuration
from netrun.db import AsyncDatabasePool, PoolConfig
config = PoolConfig(
database_url="postgresql+asyncpg://user:pass@localhost/db",
pool_size=20,
max_overflow=10,
pool_recycle=3600,
app_name="my-service",
)
pool = AsyncDatabasePool(config=config)
Health Monitoring
Simple Health Check
is_healthy = await pool.health_check()
print(f"Database healthy: {is_healthy}")
Detailed Health Metrics
health = await pool.health_check_detailed()
print(f"Status: {health.status}")
print(f"Pool utilization: {health.utilization_percent}%")
print(f"Available connections: {health.available_connections}")
print(f"Checked out: {health.checked_out}")
print(f"Latency: {health.latency_ms}ms")
# Convert to dict for JSON response
return health.to_dict()
Health Check Response
{
"healthy": true,
"status": "healthy",
"pool_size": 20,
"checked_in_connections": 18,
"checked_out_connections": 2,
"overflow_connections": 0,
"max_overflow": 10,
"utilization_percent": 6.67,
"available_connections": 28,
"latency_ms": 2.34,
"database": "postgresql",
"timestamp": "2025-11-25T10:30:00Z"
}
Advanced Features
Verify RLS is Enabled
pool = TenantAwareDatabasePool.from_env()
# Check if RLS is enabled for a table
is_protected = await pool.verify_rls_enabled("users")
if not is_protected:
logger.warning("RLS not enabled for users table!")
Get Current Tenant Context
async with pool.get_tenant_session(tenant_id="550e8400-...") as session:
current_tenant = await pool.get_current_tenant_context(session)
print(f"Operating in tenant context: {current_tenant}")
Custom Session Factory
# Access SQLAlchemy engine for advanced use cases
engine = pool.engine
# Create custom session factory
from sqlalchemy.ext.asyncio import async_sessionmaker
custom_factory = async_sessionmaker(
engine,
expire_on_commit=True, # Different from default
autoflush=True,
)
Best Practices
Connection Pool Sizing
# For typical web applications (FastAPI, Django)
pool_size = min(
10 * num_cpu_cores, # 10 connections per core
max_database_connections / num_app_instances,
)
# Example: 4 cores, 100 max DB connections, 2 app instances
# pool_size = min(40, 50) = 40
# max_overflow = pool_size * 0.5 = 20
PostgreSQL Optimizations
config = PoolConfig(
database_url="postgresql+asyncpg://...",
pool_pre_ping=True, # Verify connections before use
pool_recycle=3600, # Recycle hourly (Azure requires < 4 hours)
enable_jit=False, # Disable JIT for consistent query planning
command_timeout=60, # Prevent long-running queries
)
Error Handling
from sqlalchemy.exc import OperationalError, TimeoutError
async with pool.get_session() as session:
try:
result = await session.execute(select(User))
return result.scalars().all()
except OperationalError as e:
logger.error(f"Database connection error: {e}")
raise
except TimeoutError:
logger.error("Query timeout exceeded")
raise
Testing
Unit Tests with SQLite
import pytest
from netrun.db import AsyncDatabasePool, PoolConfig
@pytest.fixture
async def db_pool():
config = PoolConfig(
database_url="sqlite+aiosqlite:///:memory:",
pool_size=5,
echo=True,
)
pool = AsyncDatabasePool(config=config)
await pool.initialize()
yield pool
await pool.close()
@pytest.mark.asyncio
async def test_health_check(db_pool):
health = await db_pool.health_check_detailed()
assert health.healthy is True
assert health.status == "healthy"
Integration Tests with PostgreSQL
# docker-compose.test.yml
services:
postgres:
image: postgres:16
environment:
POSTGRES_PASSWORD: test_password
POSTGRES_DB: test_db
ports:
- "5432:5432"
@pytest.fixture(scope="session")
async def db_pool():
config = PoolConfig(
database_url="postgresql+asyncpg://postgres:test_password@localhost/test_db",
)
pool = AsyncDatabasePool(config=config)
await pool.initialize()
yield pool
await pool.close()
Performance Benchmarks
Based on Netrun Systems internal testing (PostgreSQL on Azure Flexible Server):
| Metric | AsyncDatabasePool | Native asyncpg | psycopg3 |
|---|---|---|---|
| Query Latency (p95) | 8.2ms | 7.1ms | 12.4ms |
| Throughput (queries/sec) | 1,240 | 1,380 | 890 |
| Connection Acquisition | 0.8ms | 0.5ms | 1.2ms |
| Memory per connection | 2.1MB | 1.8MB | 3.4MB |
Conclusion: AsyncDatabasePool provides 95% of native asyncpg performance while adding ORM benefits, health monitoring, and RLS support.
Migration from Existing Pools
From SQLAlchemy async_sessionmaker
# Before
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
engine = create_async_engine(DATABASE_URL, pool_size=20)
AsyncSessionLocal = async_sessionmaker(engine)
async def get_db():
async with AsyncSessionLocal() as session:
yield session
# After
from netrun.db import AsyncDatabasePool
from netrun.db.middleware import get_db_dependency
pool = AsyncDatabasePool.from_env()
get_db = get_db_dependency(pool)
From Native asyncpg
# Before
import asyncpg
pool = await asyncpg.create_pool(
host="localhost",
database="mydb",
user="user",
password="pass",
min_size=10,
max_size=20,
)
async with pool.acquire() as conn:
rows = await conn.fetch("SELECT * FROM users")
# After
from netrun.db import AsyncDatabasePool
pool = AsyncDatabasePool.from_env()
await pool.initialize()
async with pool.get_session() as session:
result = await session.execute(select(User))
users = result.scalars().all()
Contributing
Contributions welcome! Please submit issues and pull requests to: https://github.com/netrunsystems/netrun-db-pool
License
MIT License - see LICENSE file for details.
Support
- Documentation: https://github.com/netrunsystems/netrun-db-pool
- Issues: https://github.com/netrunsystems/netrun-db-pool/issues
- Email: engineering@netrunsystems.com
Credits
Developed by Netrun Systems as part of the Service Consolidation Initiative (2025). Based on production patterns from 8+ services across the Netrun portfolio.
Related Projects:
- Service #03 (Intirkast): Multi-tenant SaaS platform
- Service #71: Unified Database Pool (this project)
- Service #61: Unified Logging
netrun-db-pool v2.0.0 - Production-grade database pooling for the async era.
Breaking Changes in v2.0.0: Migrated to netrun.db namespace. See migration guide above.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file netrun_db_pool-2.0.0.tar.gz.
File metadata
- Download URL: netrun_db_pool-2.0.0.tar.gz
- Upload date:
- Size: 25.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b712a69abb9842b231fbf4ef5a8dc1a3e9fe8f4c5ef022a1a03efc1002d427c6
|
|
| MD5 |
c713db010ead188054919c377945d03a
|
|
| BLAKE2b-256 |
9f93558887c1473a151ba6e80662db975bb90394de36b6ba2cc8e9b2205aae4c
|
File details
Details for the file netrun_db_pool-2.0.0-py3-none-any.whl.
File metadata
- Download URL: netrun_db_pool-2.0.0-py3-none-any.whl
- Upload date:
- Size: 27.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
859869c215f1b0c887c96c249e27de752cc01626be85e2220df0aa731cad0035
|
|
| MD5 |
45e0e5ba5fa71f9a256a20286cda2567
|
|
| BLAKE2b-256 |
6c725d09e6507006e6b837dc45d6b9fc85478032f75183d5017461db5d8a5114
|