Skip to main content

Advanced SQLAlchemy extension with connection pooling, rate limiting, and security features

Project description

Alchemy-H8: Enhanced Async SQLAlchemy Library

A comprehensive, production-ready async PostgreSQL library built on SQLAlchemy that integrates advanced features:

  • ✅ Async SQLAlchemy with strict typing and generics
  • ✅ Advanced connection pooling with health monitoring
  • ✅ Rate limiting with configurable strategies
  • ✅ Read/write splitting with replica support
  • ✅ Resilient operations with auto-recovery, backoff, and retries
  • ✅ Enhanced security features for database connections

Features

Core Database Features

  • Fully Async: Built on SQLAlchemy's async capabilities
  • Repository Pattern: Type-safe repository pattern for models
  • Unit of Work: Transaction management with ACID guarantees
  • Generic Models: Base models with UUID primary keys
  • Connection Management: Automated connection tracking and cleanup

Connection Pooling

  • Optimized Connection Pool:

    • Configurable pool sizes and overflow settings
    • Connection health monitoring with pre-ping
    • Automatic connection lifecycle management
    • Stale connection detection and cleanup
    • Connection event logging and monitoring
  • Read/Write Splitting:

    • Support for primary and read replica databases
    • True round-robin load distribution across replicas
    • Configurable read-only session routing
    • Seamless transaction management across database roles

Rate Limiting & Backpressure

  • Advanced Rate Limiting:

    • Token bucket algorithm implementation
    • Configurable request limits per time window
    • Multiple rate limiting strategies (token bucket, fixed window, leaky bucket)
    • Per-connection rate tracking
    • Graceful handling of rate limit exceeded scenarios
  • Backpressure Mechanisms:

    • Connection limiting semaphores
    • Configurable maximum concurrency
    • Progressive backoff for resource contention

Security Features

  • Connection Security:

    • SSL/TLS enforcement options
    • Connection timeout configuration
    • Statement timeout controls
    • Sensitive data masking in logs
  • Query Security:

    • SQL injection prevention
    • Parameter sanitization
    • Input validation
    • Secure query execution wrapper

Resilience

  • Retry Logic:

    • Configurable retry attempts with exponential backoff
    • Intelligent transient error detection
    • Jitter to prevent thundering herd problems
    • Detailed retry attempt logging
  • Circuit Breaker Pattern:

    • Automatic detection of database failures
    • Circuit state management (closed, open, half-open)
    • Configurable failure thresholds and recovery timeouts
    • Prevents cascading failures during database outages

Quick Start

Installation

pip install alchemy-h8

Basic Usage

import asyncio
from pydantic import PostgresDsn
from sqlalchemy import Column, String
from sqlalchemy.dialects.postgresql import UUID as PostgresUUID
from sqlalchemy.orm import declarative_base
from uuid import uuid4

from src.alchemy_h8.connection import DBConnectionHandler
from src.alchemy_h8.config import DBConnectionConfig, RateLimiterStrategy

# Define your models
Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(PostgresUUID(as_uuid=True), primary_key=True, default=uuid4)
    name = Column(String, nullable=False)
    email = Column(String, nullable=False, unique=True)

async def main():
    # Configure database connection
    config = DBConnectionConfig(
        url=PostgresDsn.build(
            scheme="postgresql+asyncpg",
            host="localhost",
            port=5432,
            username="postgres",
            password="postgres",
            path="test_db"
        ),
        # Connection pooling settings
        pool_size=5,
        max_overflow=10,
        
        # Enable read replicas (optional)
        read_replica_hosts=[
            PostgresDsn.build(
                scheme="postgresql+asyncpg",
                host="localhost",
                port=5432,
                username="postgres",
                password="postgres",
                path="test_db"
            )
        ],
        use_read_replicas_for_reads=True,
        
        # Enable rate limiting
        use_rate_limiter=True,
        rate_limit_max_requests=50,
        rate_limit_time_window=5,
        rate_limit_strategy=RateLimiterStrategy.TOKEN_BUCKET,
        
        # Retry configuration
        max_retries=3,
        retry_backoff=2,
        
        # Circuit breaker settings
        use_circuit_breaker=True,
        circuit_breaker_threshold=5,
        circuit_breaker_recovery_timeout=30,
    )
    
    # Create and initialize DB handler
    db_handler = DBConnectionHandler(config)
    await db_handler.initialize()
    
    try:
        # Create a new user with write operations on primary
        async with db_handler.async_session_scope() as session:
            # Create a new user
            new_user = User(name="Alice Smith", email="alice@example.com")
            session.add(new_user)
            await session.commit()
            print(f"Created user: {new_user.name}")
        
        # Read operations automatically use replicas
        async with db_handler.async_session_scope(read_only=True) as session:
            # Query with automatic read replica routing
            result = await session.execute("SELECT * FROM users")
            users = result.all()
            print(f"Found {len(users)} users")
            
    finally:
        # Clean up resources
        await db_handler.dispose_gracefully()

if __name__ == "__main__":
    asyncio.run(main())

Advanced Configuration

The library supports extensive configuration options:

config = DBConnectionConfig(
    # Required connection settings
    url="postgresql+asyncpg://user:pass@localhost/db",
    
    # Connection pooling settings
    pool_size=10,                  # Default connection pool size
    max_overflow=20,               # Maximum overflow connections
    pool_timeout=30,               # Timeout waiting for connection
    pool_recycle=1800,             # Recycle connections after 30 minutes
    
    # Rate limiting settings
    use_rate_limiter=True,         # Enable rate limiting
    rate_limit_max_requests=100,   # Max requests per time window
    rate_limit_time_window=1.0,    # Time window in seconds
    rate_limit_strategy=RateLimiterStrategy.TOKEN_BUCKET,
    
    # Retry configuration
    max_retries=3,                 # Maximum retry attempts
    retry_backoff=2,               # Exponential backoff factor
    base_retry_delay=0.1,          # Base delay before first retry
    max_retry_delay=1.0,           # Maximum retry delay
    retry_jitter=0.1,              # Random jitter for retries
    
    # Circuit breaker
    use_circuit_breaker=True,      # Enable circuit breaker
    circuit_breaker_threshold=5,   # Failures before tripping
    circuit_breaker_recovery_timeout=30, # Recovery time in seconds
    
    # Read replicas
    read_replica_hosts=["postgresql+asyncpg://user:pass@replica1/db"],
    use_read_replicas_for_reads=True,
)

Example Code

For a complete working example demonstrating all features, see example.py in the repository.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

alchemy_h8-0.1.1.tar.gz (56.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

alchemy_h8-0.1.1-py3-none-any.whl (18.9 kB view details)

Uploaded Python 3

File details

Details for the file alchemy_h8-0.1.1.tar.gz.

File metadata

  • Download URL: alchemy_h8-0.1.1.tar.gz
  • Upload date:
  • Size: 56.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.11.9

File hashes

Hashes for alchemy_h8-0.1.1.tar.gz
Algorithm Hash digest
SHA256 9991cd48afb640dcbf0b24f87cd049de2695ac9a4dc54fbbc0cf8adf38126830
MD5 1f6b064575ca9b32c5b6c99b4ebc1f2f
BLAKE2b-256 a82ce77fed10d8f85cb130d8247008789c48a8eda10b514215168931cee2d002

See more details on using hashes here.

File details

Details for the file alchemy_h8-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: alchemy_h8-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 18.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.11.9

File hashes

Hashes for alchemy_h8-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 76aaa890d4782ca100676426a5453bd98947b97e867c7f87044965830b4fd46f
MD5 ec0fd929b12e3c89d5c403b306cd1f2c
BLAKE2b-256 0414437e4b50abc2de97ed06bd3deb1896212440a5fd9225c3d49f59ef612a21

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page