Advanced SQLAlchemy extension with connection pooling, rate limiting, and security features
Project description
Alchemy-H8: Enhanced Async SQLAlchemy Library
A comprehensive, production-ready async PostgreSQL library built on SQLAlchemy that integrates advanced features:
- ✅ Async SQLAlchemy with strict typing and generics
- ✅ Intelligent in-memory caching with tag-based invalidation
- ✅ Query performance analysis and connection pool metrics
- ✅ Resilient operations with auto-recovery, backoff, and retries
- ✅ Advanced security features and connection pooling
- ✅ Transaction integrity with exactly-once execution
Features
Core Features
- Fully Async: Built on SQLAlchemy's async capabilities
- Repository Pattern: Type-safe repository pattern for models
- Unit of Work: Transaction management with ACID guarantees
- Generic Models: Base models with UUID and integer primary keys
- Pagination: Efficient pagination support
Advanced Features
-
Intelligent Caching:
- TTL-based in-memory caching
- Tag-based cache invalidation
- Cache statistics and performance metrics
- Automatic cache invalidation on write operations
-
Performance Monitoring:
- Query execution metrics and timing
- Slow query detection and logging
- Connection pool utilization statistics
- Query plan collection (optional)
-
Resilience:
- Retry logic with configurable backoff strategies
- Circuit breaker pattern to prevent cascading failures
- Rate limiting with token bucket algorithm
- Idempotent operations for exactly-once semantics
-
Security:
- SSL/TLS enforcement
- Connection timeouts and statement timeouts
- Security levels (HIGH/MEDIUM/LOW)
- Sensitive data masking in logs
- SQL injection prevention
Quick Start
Installation
pip install alchemy-h8
Basic Usage
import asyncio
import uuid
from sqlalchemy import Column, String, Integer
from alchemy_h8.base_model import BaseModel, UUIDMixin
from alchemy_h8.integration import create_db_manager
# Define your model
class User(BaseModel, UUIDMixin):
__tablename__ = "users"
username = Column(String(100), nullable=False, unique=True)
email = Column(String(255), nullable=False, unique=True)
age = Column(Integer, nullable=True)
async def main():
# Connect to database
db_manager = await create_db_manager(
"postgresql+asyncpg://user:pass@localhost/dbname"
)
# Get repository for User model
user_repo = await db_manager.get_repository(User)
# Create a user with transaction
user = User(username="john", email="john@example.com", age=30)
async with db_manager.transaction() as session:
await user_repo.add(session, user)
# Retrieve with caching
async with db_manager.transaction(read_only=True) as session:
retrieved_user = await user_repo.get_by_id(session, user.id)
print(f"Retrieved: {retrieved_user.username}")
# Update with cache invalidation
async with db_manager.transaction(
cache_invalidation_tags=[f"users_{user.id}"]
) as session:
user.age = 31
await user_repo.update(session, user)
# Get metrics
metrics = db_manager.get_metrics()
print(f"Cache hits: {metrics['cache_stats']['hits']}")
# Cleanup
await db_manager.disconnect()
if __name__ == "__main__":
asyncio.run(main())
Advanced Configuration
The library provides extensive configuration options for all features:
from alchemy_h8.config import DbLibraryConfig
from alchemy_h8.connection import SecurityLevel
from alchemy_h8.integration import create_db_manager
from alchemy_h8.resilience import BackoffStrategy
# Create custom configuration
config = DbLibraryConfig()
# Connection settings
config.connection.pool_size = 20
config.connection.max_overflow = 10
config.connection.security_level = SecurityLevel.HIGH
config.connection.ssl_required = True
config.connection.statement_timeout_sec = 30
# Caching settings
config.cache.enabled = True
config.cache.default_ttl = 300 # 5 minutes
config.cache.max_size = 10000
# Resilience settings
config.retry.max_attempts = 3
config.retry.base_delay_ms = 100
config.retry.strategy = BackoffStrategy.EXPONENTIAL
config.retry.idempotent_only = True
# Create database manager with custom config
db_manager = await create_db_manager(
"postgresql+asyncpg://user:pass@localhost/dbname",
config=config
)
Idempotent Transactions
Ensure exactly-once execution even with retries by using operation IDs:
# Generate unique operation ID
operation_id = str(uuid.uuid4())
# First attempt - succeeds
async with db_manager.transaction(operation_id=operation_id) as session:
await user_repo.add(session, user)
# Second attempt with same operation_id - no duplicate created
async with db_manager.transaction(operation_id=operation_id) as session:
await user_repo.add(session, user_duplicate) # Won't execute
Metrics and Monitoring
Get detailed performance metrics:
# Get all metrics
metrics = db_manager.get_metrics()
print("Query Metrics:")
print(f"- Total queries: {metrics['query_metrics']['total_queries']}")
print(f"- Average time: {metrics['query_metrics']['average_execution_time_ms']} ms")
print(f"- Slow queries: {metrics['query_metrics']['slow_query_count']}")
print("Transaction Metrics:")
print(f"- Total: {metrics['transaction_metrics']['total_transactions']}")
print(f"- Successful: {metrics['transaction_metrics']['successful_transactions']}")
print(f"- Failed: {metrics['transaction_metrics']['failed_transactions']}")
print(f"- Retried: {metrics['transaction_metrics']['retried_transactions']}")
print("Cache Stats:")
print(f"- Hits: {metrics['cache_stats']['hits']}")
print(f"- Misses: {metrics['cache_stats']['misses']}")
print(f"- Hit ratio: {metrics['cache_stats']['hit_ratio']:.2f}")
# Health check
health = await db_manager.health_check()
print(f"Status: {health['status']}")
print(f"Response time: {health['response_time_ms']} ms")
Circuit Breaker Status
Monitor circuit breaker status to detect service degradation:
cb_status = db_manager.get_circuit_breaker_status()
for name, status in cb_status.items():
print(f"Circuit Breaker '{name}':")
print(f"- State: {status['state']}")
print(f"- Failures: {status['failure_count']}")
print(f"- Last failure: {status['last_failure_time']}")
Testing
Run the test suite:
# Set test database URL
export TEST_DB_URL="postgresql+asyncpg://postgres:postgres@localhost:5432/test_alchemy_h8"
# Run tests
pytest tests/
License
MIT License
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file alchemy_h8-0.1.0.tar.gz.
File metadata
- Download URL: alchemy_h8-0.1.0.tar.gz
- Upload date:
- Size: 56.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
cbfac053dd6c841e5849ba87607a31e880531605764299544f24fbfac1987a3e
|
|
| MD5 |
7bd6d85c593ba31f2856890a07fc1b52
|
|
| BLAKE2b-256 |
4d76d99eddc47cc72d53bb731698281835abce737a44befe1ed3b5c8ff45fa71
|
File details
Details for the file alchemy_h8-0.1.0-py3-none-any.whl.
File metadata
- Download URL: alchemy_h8-0.1.0-py3-none-any.whl
- Upload date:
- Size: 18.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
fefa6f4da6c7378d777d79cdcf3280af584a01907e3b7a9d119617c02bbe7b0e
|
|
| MD5 |
5e7d8bc7ad09cce0582f8a7208cad66d
|
|
| BLAKE2b-256 |
0efb4503967cd0b82bd077f484f59db540c89ae735e765388b0dfe3cb7c07eba
|