Skip to main content

Production-grade async database connection pooling for Netrun Systems services

Project description

netrun-db-pool

Production-grade async database connection pooling for Netrun Systems services.

IMPORTANT: Migration to v2.0.0 (Namespace Structure)

Version 2.0.0 introduces a breaking change in import paths. The package has been migrated to the netrun.db namespace structure for consistency with other Netrun packages.

New Import Path

# Old (v1.x - deprecated but still works with warnings)
from netrun_db_pool import AsyncDatabasePool, PoolConfig
from netrun_db_pool.middleware import get_db_dependency

# New (v2.0+ - recommended)
from netrun.db import AsyncDatabasePool, PoolConfig
from netrun.db.middleware import get_db_dependency

Backwards Compatibility

Version 2.0.0 maintains full backwards compatibility through deprecation shims. Old imports will continue to work but will issue DeprecationWarning. The old import path will be removed in version 3.0.0 (estimated Q2 2026).

Migration Checklist

  • Update all imports from netrun_db_pool to netrun.db
  • Update all imports from netrun_db_pool.middleware to netrun.db.middleware
  • Update requirements.txt or pyproject.toml to use netrun-db-pool>=2.0.0
  • Test thoroughly in development environment
  • Deploy to production

Features

  • AsyncPG Performance: SQLAlchemy 2.0+ with asyncpg driver for maximum PostgreSQL performance
  • Multi-Tenant RLS: Built-in Row-Level Security support for SaaS applications
  • Health Monitoring: Comprehensive pool metrics and health checks
  • FastAPI Integration: Drop-in dependency injection and middleware
  • Production Ready: Connection pre-ping, automatic recycling, configurable pooling
  • Type Safe: Full Pydantic configuration with environment variable support

Installation

pip install netrun-db-pool

Optional Dependencies

# FastAPI integration
pip install netrun-db-pool[fastapi]

# Development tools
pip install netrun-db-pool[dev]

Quick Start

Basic Usage

from netrun.db import AsyncDatabasePool, PoolConfig
from sqlalchemy import select, text

# Create pool from environment variables
pool = AsyncDatabasePool.from_env()
await pool.initialize()

# Use with context manager
async with pool.get_session() as session:
    result = await session.execute(text("SELECT version()"))
    print(result.scalar())

FastAPI Integration

from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from netrun.db import AsyncDatabasePool
from netrun.db.middleware import get_db_dependency

app = FastAPI()
pool = AsyncDatabasePool.from_env()

# Create dependency
get_db = get_db_dependency(pool)

@app.on_event("startup")
async def startup():
    await pool.initialize()

@app.on_event("shutdown")
async def shutdown():
    await pool.close()

@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User))
    return result.scalars().all()

@app.get("/health/db")
async def db_health():
    health = await pool.health_check_detailed()
    return health.to_dict()

Multi-Tenant with RLS

from fastapi import FastAPI, Request, Depends
from netrun.db import TenantAwareDatabasePool
from netrun.db.middleware import (
    get_tenant_db_dependency,
    TenantContextMiddleware,
)

app = FastAPI()
pool = TenantAwareDatabasePool.from_env()

# Extract tenant_id from JWT
def extract_tenant_id(request: Request) -> str:
    token = request.headers.get("Authorization", "").replace("Bearer ", "")
    claims = decode_jwt(token)  # Your JWT decoder
    return claims.get("tenant_id")

def get_tenant_id(request: Request) -> str:
    return request.state.tenant_id

# Add middleware to extract tenant context
app.add_middleware(
    TenantContextMiddleware,
    tenant_id_extractor=extract_tenant_id,
)

# Create tenant-aware dependency
get_tenant_db = get_tenant_db_dependency(
    pool=pool,
    tenant_id_extractor=get_tenant_id,
)

@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_tenant_db)):
    # Only current tenant's users are accessible (RLS enforced)
    result = await db.execute(select(User))
    return result.scalars().all()

PostgreSQL RLS Setup

-- Enable RLS on table
ALTER TABLE users ENABLE ROW LEVEL SECURITY;

-- Create RLS policy
CREATE POLICY tenant_isolation ON users
    USING (tenant_id = current_setting('app.current_tenant_id')::uuid);

-- Verify RLS is enabled
SELECT relname, relrowsecurity
FROM pg_class
WHERE relname = 'users';

Configuration

Environment Variables

# Database connection
DATABASE_URL=postgresql+asyncpg://user:pass@localhost/db

# Pool settings
DB_POOL_SIZE=20                # Base pool size
DB_MAX_OVERFLOW=10             # Overflow connections (total max: 30)
DB_POOL_TIMEOUT=30             # Connection acquisition timeout (seconds)
DB_POOL_RECYCLE=3600           # Recycle connections after 1 hour
DB_COMMAND_TIMEOUT=60          # Query timeout (seconds)

# Application settings
DB_APP_NAME=my-service         # PostgreSQL application name
DB_ENABLE_JIT=false            # Disable JIT for consistent performance
DB_ECHO=false                  # Enable SQLAlchemy query logging (dev only)

Programmatic Configuration

from netrun.db import AsyncDatabasePool, PoolConfig

config = PoolConfig(
    database_url="postgresql+asyncpg://user:pass@localhost/db",
    pool_size=20,
    max_overflow=10,
    pool_recycle=3600,
    app_name="my-service",
)

pool = AsyncDatabasePool(config=config)

Health Monitoring

Simple Health Check

is_healthy = await pool.health_check()
print(f"Database healthy: {is_healthy}")

Detailed Health Metrics

health = await pool.health_check_detailed()
print(f"Status: {health.status}")
print(f"Pool utilization: {health.utilization_percent}%")
print(f"Available connections: {health.available_connections}")
print(f"Checked out: {health.checked_out}")
print(f"Latency: {health.latency_ms}ms")

# Convert to dict for JSON response
return health.to_dict()

Health Check Response

{
  "healthy": true,
  "status": "healthy",
  "pool_size": 20,
  "checked_in_connections": 18,
  "checked_out_connections": 2,
  "overflow_connections": 0,
  "max_overflow": 10,
  "utilization_percent": 6.67,
  "available_connections": 28,
  "latency_ms": 2.34,
  "database": "postgresql",
  "timestamp": "2025-11-25T10:30:00Z"
}

Advanced Features

Verify RLS is Enabled

pool = TenantAwareDatabasePool.from_env()

# Check if RLS is enabled for a table
is_protected = await pool.verify_rls_enabled("users")
if not is_protected:
    logger.warning("RLS not enabled for users table!")

Get Current Tenant Context

async with pool.get_tenant_session(tenant_id="550e8400-...") as session:
    current_tenant = await pool.get_current_tenant_context(session)
    print(f"Operating in tenant context: {current_tenant}")

Custom Session Factory

# Access SQLAlchemy engine for advanced use cases
engine = pool.engine

# Create custom session factory
from sqlalchemy.ext.asyncio import async_sessionmaker

custom_factory = async_sessionmaker(
    engine,
    expire_on_commit=True,  # Different from default
    autoflush=True,
)

Best Practices

Connection Pool Sizing

# For typical web applications (FastAPI, Django)
pool_size = min(
    10 * num_cpu_cores,  # 10 connections per core
    max_database_connections / num_app_instances,
)

# Example: 4 cores, 100 max DB connections, 2 app instances
# pool_size = min(40, 50) = 40
# max_overflow = pool_size * 0.5 = 20

PostgreSQL Optimizations

config = PoolConfig(
    database_url="postgresql+asyncpg://...",
    pool_pre_ping=True,      # Verify connections before use
    pool_recycle=3600,       # Recycle hourly (Azure requires < 4 hours)
    enable_jit=False,        # Disable JIT for consistent query planning
    command_timeout=60,      # Prevent long-running queries
)

Error Handling

from sqlalchemy.exc import OperationalError, TimeoutError

async with pool.get_session() as session:
    try:
        result = await session.execute(select(User))
        return result.scalars().all()
    except OperationalError as e:
        logger.error(f"Database connection error: {e}")
        raise
    except TimeoutError:
        logger.error("Query timeout exceeded")
        raise

Testing

Unit Tests with SQLite

import pytest
from netrun.db import AsyncDatabasePool, PoolConfig

@pytest.fixture
async def db_pool():
    config = PoolConfig(
        database_url="sqlite+aiosqlite:///:memory:",
        pool_size=5,
        echo=True,
    )
    pool = AsyncDatabasePool(config=config)
    await pool.initialize()
    yield pool
    await pool.close()

@pytest.mark.asyncio
async def test_health_check(db_pool):
    health = await db_pool.health_check_detailed()
    assert health.healthy is True
    assert health.status == "healthy"

Integration Tests with PostgreSQL

# docker-compose.test.yml
services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_PASSWORD: test_password
      POSTGRES_DB: test_db
    ports:
      - "5432:5432"
@pytest.fixture(scope="session")
async def db_pool():
    config = PoolConfig(
        database_url="postgresql+asyncpg://postgres:test_password@localhost/test_db",
    )
    pool = AsyncDatabasePool(config=config)
    await pool.initialize()
    yield pool
    await pool.close()

Performance Benchmarks

Based on Netrun Systems internal testing (PostgreSQL on Azure Flexible Server):

Metric AsyncDatabasePool Native asyncpg psycopg3
Query Latency (p95) 8.2ms 7.1ms 12.4ms
Throughput (queries/sec) 1,240 1,380 890
Connection Acquisition 0.8ms 0.5ms 1.2ms
Memory per connection 2.1MB 1.8MB 3.4MB

Conclusion: AsyncDatabasePool provides 95% of native asyncpg performance while adding ORM benefits, health monitoring, and RLS support.

Migration from Existing Pools

From SQLAlchemy async_sessionmaker

# Before
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

engine = create_async_engine(DATABASE_URL, pool_size=20)
AsyncSessionLocal = async_sessionmaker(engine)

async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

# After
from netrun.db import AsyncDatabasePool
from netrun.db.middleware import get_db_dependency

pool = AsyncDatabasePool.from_env()
get_db = get_db_dependency(pool)

From Native asyncpg

# Before
import asyncpg

pool = await asyncpg.create_pool(
    host="localhost",
    database="mydb",
    user="user",
    password="pass",
    min_size=10,
    max_size=20,
)

async with pool.acquire() as conn:
    rows = await conn.fetch("SELECT * FROM users")

# After
from netrun.db import AsyncDatabasePool

pool = AsyncDatabasePool.from_env()
await pool.initialize()

async with pool.get_session() as session:
    result = await session.execute(select(User))
    users = result.scalars().all()

Contributing

Contributions welcome! Please submit issues and pull requests to: https://github.com/netrunsystems/netrun-db-pool

License

MIT License - see LICENSE file for details.

Support

Credits

Developed by Netrun Systems as part of the Service Consolidation Initiative (2025). Based on production patterns from 8+ services across the Netrun portfolio.

Related Projects:

  • Service #03 (Intirkast): Multi-tenant SaaS platform
  • Service #71: Unified Database Pool (this project)
  • Service #61: Unified Logging

netrun-db-pool v2.0.0 - Production-grade database pooling for the async era.

Breaking Changes in v2.0.0: Migrated to netrun.db namespace. See migration guide above.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

netrun_db_pool-2.0.0.tar.gz (25.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

netrun_db_pool-2.0.0-py3-none-any.whl (27.9 kB view details)

Uploaded Python 3

File details

Details for the file netrun_db_pool-2.0.0.tar.gz.

File metadata

  • Download URL: netrun_db_pool-2.0.0.tar.gz
  • Upload date:
  • Size: 25.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.12

File hashes

Hashes for netrun_db_pool-2.0.0.tar.gz
Algorithm Hash digest
SHA256 b712a69abb9842b231fbf4ef5a8dc1a3e9fe8f4c5ef022a1a03efc1002d427c6
MD5 c713db010ead188054919c377945d03a
BLAKE2b-256 9f93558887c1473a151ba6e80662db975bb90394de36b6ba2cc8e9b2205aae4c

See more details on using hashes here.

File details

Details for the file netrun_db_pool-2.0.0-py3-none-any.whl.

File metadata

  • Download URL: netrun_db_pool-2.0.0-py3-none-any.whl
  • Upload date:
  • Size: 27.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.12

File hashes

Hashes for netrun_db_pool-2.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 859869c215f1b0c887c96c249e27de752cc01626be85e2220df0aa731cad0035
MD5 45e0e5ba5fa71f9a256a20286cda2567
BLAKE2b-256 6c725d09e6507006e6b837dc45d6b9fc85478032f75183d5017461db5d8a5114

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page