Advanced SQLAlchemy extension with connection pooling, rate limiting, and security features
Alchemy-H8: Enhanced Async SQLAlchemy Library
A comprehensive, production-ready async PostgreSQL library built on SQLAlchemy that integrates advanced features:
- ✅ Async SQLAlchemy with strict typing and generics
- ✅ Advanced connection pooling with health monitoring
- ✅ Rate limiting with configurable strategies
- ✅ Read/write splitting with replica support
- ✅ Resilient operations with auto-recovery, backoff, and retries
- ✅ Enhanced security features for database connections
Features
Core Database Features
- Fully Async: Built on SQLAlchemy's async capabilities
- Repository Pattern: Type-safe repository pattern for models
- Unit of Work: Transaction management with ACID guarantees
- Generic Models: Base models with UUID primary keys
- Connection Management: Automated connection tracking and cleanup
Connection Pooling
- Optimized Connection Pool:
  - Configurable pool sizes and overflow settings
  - Connection health monitoring with pre-ping
  - Automatic connection lifecycle management
  - Stale connection detection and cleanup
  - Connection event logging and monitoring
- Read/Write Splitting:
  - Support for primary and read replica databases
  - True round-robin load distribution across replicas
  - Configurable read-only session routing
  - Seamless transaction management across database roles
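The round-robin routing described above can be sketched in a few lines. This is an illustration only, not the library's implementation: the class name `RoundRobinReplicaPicker` and its `url_for` method are hypothetical, and the sketch assumes that writes always go to the primary and that reads fall back to the primary when no replicas are configured.

```python
import itertools
import threading


class RoundRobinReplicaPicker:
    """Hypothetical helper: pick a database URL for each new session."""

    def __init__(self, primary_url: str, replica_urls: list[str]) -> None:
        self._primary_url = primary_url
        self._replica_urls = list(replica_urls)
        # itertools.cycle yields replicas in order, forever.
        self._cycle = itertools.cycle(self._replica_urls) if self._replica_urls else None
        self._lock = threading.Lock()

    def url_for(self, read_only: bool) -> str:
        # Writes, and reads without any replica, go to the primary.
        if not read_only or self._cycle is None:
            return self._primary_url
        # The lock keeps the rotation strict when called from several threads.
        with self._lock:
            return next(self._cycle)


picker = RoundRobinReplicaPicker("primary", ["replica1", "replica2"])
print([picker.url_for(read_only=True) for _ in range(4)])
print(picker.url_for(read_only=False))
```

Each read-only request advances the rotation by one replica, so load is spread evenly regardless of how long individual sessions live.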
Rate Limiting & Backpressure
- Advanced Rate Limiting:
  - Token bucket algorithm implementation
  - Configurable request limits per time window
  - Multiple rate limiting strategies (token bucket, fixed window, leaky bucket)
  - Per-connection rate tracking
  - Graceful handling of rate limit exceeded scenarios
- Backpressure Mechanisms:
  - Connection limiting semaphores
  - Configurable maximum concurrency
  - Progressive backoff for resource contention
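For readers unfamiliar with the two mechanisms, here is a minimal sketch of a token bucket and a semaphore-based concurrency limit. The names `TokenBucket`, `try_acquire`, and `run_with_limit` are hypothetical and are not taken from the library; the sketch assumes that `max_requests` tokens refill evenly over `time_window` seconds.

```python
import asyncio
import time
from typing import Awaitable, Callable


class TokenBucket:
    """Hypothetical token bucket: up to `max_requests` per `time_window` seconds."""

    def __init__(self, max_requests: int, time_window: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.capacity = float(max_requests)
        self.refill_rate = max_requests / time_window  # tokens per second
        self._tokens = self.capacity
        self._clock = clock
        self._last = clock()

    def try_acquire(self, cost: float = 1.0) -> bool:
        # Refill in proportion to elapsed time, capped at the bucket capacity.
        now = self._clock()
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.refill_rate)
        self._last = now
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False


async def run_with_limit(jobs: list[Callable[[], Awaitable]], max_concurrency: int) -> list:
    """Backpressure: run all jobs, but at most `max_concurrency` at the same time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(job: Callable[[], Awaitable]):
        async with semaphore:
            return await job()

    return await asyncio.gather(*(guarded(job) for job in jobs))
```

A caller that receives `False` from `try_acquire` can either reject the request or wait and try again, which is one way to handle the "rate limit exceeded" case gracefully.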
Security Features
- Connection Security:
  - SSL/TLS enforcement options
  - Connection timeout configuration
  - Statement timeout controls
  - Sensitive data masking in logs
- Query Security:
  - SQL injection prevention
  - Parameter sanitization
  - Input validation
  - Secure query execution wrapper
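As one illustration of sensitive data masking in logs, the sketch below hides the password portion of a connection URL before it is logged. The function name `mask_dsn` is hypothetical and this is not the library's own masking code; it only assumes the usual `scheme://user:password@host/db` URL shape.

```python
from urllib.parse import urlsplit, urlunsplit


def mask_dsn(dsn: str, placeholder: str = "***") -> str:
    """Return the DSN with its password replaced by a placeholder."""
    parts = urlsplit(dsn)
    # Split "user:password@host:port" at the last "@" so the host part stays untouched.
    userinfo, at, hostport = parts.netloc.rpartition("@")
    if not at or ":" not in userinfo:
        return dsn  # no password present, nothing to mask
    user = userinfo.split(":", 1)[0]
    masked_netloc = f"{user}:{placeholder}@{hostport}"
    return urlunsplit((parts.scheme, masked_netloc, parts.path, parts.query, parts.fragment))


print(mask_dsn("postgresql+asyncpg://user:pass@localhost:5432/db"))
```

Masking at the point where the URL is formatted for output keeps the real credentials available to the driver while keeping them out of log files.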
Resilience
- Retry Logic:
  - Configurable retry attempts with exponential backoff
  - Intelligent transient error detection
  - Jitter to prevent thundering herd problems
  - Detailed retry attempt logging
- Circuit Breaker Pattern:
  - Automatic detection of database failures
  - Circuit state management (closed, open, half-open)
  - Configurable failure thresholds and recovery timeouts
  - Prevents cascading failures during database outages
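The three circuit states can be made concrete with a small state machine. This is a simplified sketch rather than the library's implementation: the class `CircuitBreaker` and its methods are hypothetical, and it assumes that consecutive failures are counted and that any call is allowed through as a probe once the recovery timeout has elapsed.

```python
import time
from typing import Callable


class CircuitBreaker:
    """Hypothetical circuit breaker with closed, open, and half-open states."""

    def __init__(self, threshold: int = 5, recovery_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.threshold = threshold
        self.recovery_timeout = recovery_timeout
        self.state = "closed"
        self._failures = 0
        self._opened_at = 0.0
        self._clock = clock

    def allow_request(self) -> bool:
        if self.state == "open":
            # After the recovery timeout, let a probe request through.
            if self._clock() - self._opened_at >= self.recovery_timeout:
                self.state = "half-open"
                return True
            return False
        return True

    def record_success(self) -> None:
        # A success closes the circuit and resets the failure count.
        self._failures = 0
        self.state = "closed"

    def record_failure(self) -> None:
        self._failures += 1
        # A failed probe, or too many consecutive failures, opens the circuit.
        if self.state == "half-open" or self._failures >= self.threshold:
            self.state = "open"
            self._opened_at = self._clock()
```

While the circuit is open, callers fail fast instead of queuing more work against a database that is already struggling, which is how the pattern limits cascading failures.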
Quick Start
Installation
pip install alchemy-h8
Basic Usage
import asyncio
from uuid import uuid4

from pydantic import PostgresDsn
from sqlalchemy import Column, String, text
from sqlalchemy.dialects.postgresql import UUID as PostgresUUID
from sqlalchemy.orm import declarative_base

# Import path assumes the PyPI install; from a repository checkout it may be `src.alchemy_h8`.
from alchemy_h8.connection import DBConnectionHandler
from alchemy_h8.config import DBConnectionConfig, RateLimiterStrategy

# Define your models
Base = declarative_base()


class User(Base):
    __tablename__ = "users"

    id = Column(PostgresUUID(as_uuid=True), primary_key=True, default=uuid4)
    name = Column(String, nullable=False)
    email = Column(String, nullable=False, unique=True)


async def main():
    # Configure database connection
    config = DBConnectionConfig(
        url=PostgresDsn.build(
            scheme="postgresql+asyncpg",
            host="localhost",
            port=5432,
            username="postgres",
            password="postgres",
            path="test_db",
        ),
        # Connection pooling settings
        pool_size=5,
        max_overflow=10,
        # Enable read replicas (optional); this example reuses the primary host
        read_replica_hosts=[
            PostgresDsn.build(
                scheme="postgresql+asyncpg",
                host="localhost",
                port=5432,
                username="postgres",
                password="postgres",
                path="test_db",
            )
        ],
        use_read_replicas_for_reads=True,
        # Enable rate limiting: 50 requests per 5-second window
        use_rate_limiter=True,
        rate_limit_max_requests=50,
        rate_limit_time_window=5,
        rate_limit_strategy=RateLimiterStrategy.TOKEN_BUCKET,
        # Retry configuration
        max_retries=3,
        retry_backoff=2,
        # Circuit breaker settings
        use_circuit_breaker=True,
        circuit_breaker_threshold=5,
        circuit_breaker_recovery_timeout=30,
    )

    # Create and initialize DB handler
    db_handler = DBConnectionHandler(config)
    await db_handler.initialize()

    try:
        # Write operations run on the primary (assumes the `users` table already exists)
        async with db_handler.async_session_scope() as session:
            new_user = User(name="Alice Smith", email="alice@example.com")
            session.add(new_user)
            await session.commit()
            print(f"Created user: {new_user.name}")

        # Read operations automatically use replicas
        async with db_handler.async_session_scope(read_only=True) as session:
            # Raw SQL strings must be wrapped in text() for SQLAlchemy to execute them
            result = await session.execute(text("SELECT * FROM users"))
            users = result.all()
            print(f"Found {len(users)} users")
    finally:
        # Clean up resources
        await db_handler.dispose_gracefully()


if __name__ == "__main__":
    asyncio.run(main())
Advanced Configuration
The library supports extensive configuration options:
config = DBConnectionConfig(
    # Required connection settings
    url="postgresql+asyncpg://user:pass@localhost/db",

    # Connection pooling settings
    pool_size=10,                         # Default connection pool size
    max_overflow=20,                      # Maximum overflow connections
    pool_timeout=30,                      # Timeout waiting for connection
    pool_recycle=1800,                    # Recycle connections after 30 minutes

    # Rate limiting settings
    use_rate_limiter=True,                # Enable rate limiting
    rate_limit_max_requests=100,          # Max requests per time window
    rate_limit_time_window=1.0,           # Time window in seconds
    rate_limit_strategy=RateLimiterStrategy.TOKEN_BUCKET,

    # Retry configuration
    max_retries=3,                        # Maximum retry attempts
    retry_backoff=2,                      # Exponential backoff factor
    base_retry_delay=0.1,                 # Base delay before first retry
    max_retry_delay=1.0,                  # Maximum retry delay
    retry_jitter=0.1,                     # Random jitter for retries

    # Circuit breaker
    use_circuit_breaker=True,             # Enable circuit breaker
    circuit_breaker_threshold=5,          # Failures before tripping
    circuit_breaker_recovery_timeout=30,  # Recovery time in seconds

    # Read replicas
    read_replica_hosts=["postgresql+asyncpg://user:pass@replica1/db"],
    use_read_replicas_for_reads=True,
)
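To give a feel for how the retry settings interact, the sketch below computes a delay schedule from the values shown above. It is an assumption about how such parameters are commonly combined, not a description of the library's internals: the function `retry_delay` is hypothetical, attempts are numbered from zero, and `retry_jitter` is treated as a maximum number of extra seconds added at random.

```python
import random
from typing import Callable


def retry_delay(attempt: int, base_delay: float = 0.1, backoff: float = 2.0,
                max_delay: float = 1.0, jitter: float = 0.1,
                rng: Callable[[], float] = random.random) -> float:
    """Delay in seconds before retry number `attempt` (0-based)."""
    # Exponential growth: base * backoff^attempt, capped at max_delay.
    delay = min(max_delay, base_delay * backoff ** attempt)
    # Additive jitter in [0, jitter) spreads out clients that failed together.
    return delay + jitter * rng()


# Schedule without jitter, to show the exponential growth and the cap.
for attempt in range(5):
    print(attempt, retry_delay(attempt, jitter=0.0))
```

Under these assumptions the un-jittered delays grow as 0.1 s, 0.2 s, 0.4 s, 0.8 s and then stay at the 1.0 s cap, so `max_retries=3` would wait roughly 0.7 s in total before giving up.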
Example Code
For a complete working example demonstrating all features, see example.py in the repository.
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
License
This project is licensed under the MIT License - see the LICENSE file for details.