Advanced SQLAlchemy extension with connection pooling, rate limiting, and security features

Alchemy-H8: Enhanced Async SQLAlchemy Library

A production-ready async PostgreSQL library built on SQLAlchemy that integrates the following features:

  • ✅ Async SQLAlchemy with strict typing and generics
  • ✅ Intelligent in-memory caching with tag-based invalidation
  • ✅ Query performance analysis and connection pool metrics
  • ✅ Resilient operations with auto-recovery, backoff, and retries
  • ✅ Security controls (SSL/TLS enforcement, timeouts, log masking) and connection pooling
  • ✅ Transaction integrity with idempotent, exactly-once execution via operation IDs

Features

Core Features

  • Fully Async: Built on SQLAlchemy's async capabilities
  • Repository Pattern: Type-safe repository pattern for models
  • Unit of Work: Transaction management with ACID guarantees
  • Generic Models: Base models with UUID and integer primary keys
  • Pagination: Efficient pagination support

Advanced Features

  • Intelligent Caching:

    • TTL-based in-memory caching
    • Tag-based cache invalidation
    • Cache statistics and performance metrics
    • Automatic cache invalidation on write operations
  • Performance Monitoring:

    • Query execution metrics and timing
    • Slow query detection and logging
    • Connection pool utilization statistics
    • Query plan collection (optional)
  • Resilience:

    • Retry logic with configurable backoff strategies
    • Circuit breaker pattern to prevent cascading failures
    • Rate limiting with token bucket algorithm
    • Idempotent operations for exactly-once semantics
  • Security:

    • SSL/TLS enforcement
    • Connection timeouts and statement timeouts
    • Security levels (HIGH/MEDIUM/LOW)
    • Sensitive data masking in logs
    • SQL injection prevention
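
The token bucket algorithm named under Resilience can be pictured with a minimal sketch. The class below is hypothetical and is not the alchemy_h8 implementation; the names TokenBucket, rate, capacity, and try_acquire are assumptions chosen for illustration. Tokens refill at a fixed rate up to a burst capacity, and a request proceeds only if a token is available.

```python
import time
from typing import Callable


class TokenBucket:
    """Hypothetical token bucket; not the alchemy_h8 class."""

    def __init__(self, rate: float, capacity: int,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.rate = rate            # tokens added per second
        self.capacity = capacity    # maximum burst size
        self._clock = clock         # injectable so the sketch can be tested
        self._tokens = float(capacity)
        self._last = clock()

    def try_acquire(self, tokens: int = 1) -> bool:
        """Take `tokens` if available; return False instead of blocking."""
        now = self._clock()
        # Refill in proportion to elapsed time, capped at capacity.
        self._tokens = min(self.capacity,
                           self._tokens + (now - self._last) * self.rate)
        self._last = now
        if self._tokens >= tokens:
            self._tokens -= tokens
            return True
        return False
```

A caller that receives False can wait and try again or reject the request; which of these the library does is not stated here.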

Quick Start

Installation

pip install alchemy-h8

Basic Usage

import asyncio
import uuid
from sqlalchemy import Column, String, Integer
from alchemy_h8.base_model import BaseModel, UUIDMixin
from alchemy_h8.integration import create_db_manager

# Define your model
class User(BaseModel, UUIDMixin):
    __tablename__ = "users"
    
    username = Column(String(100), nullable=False, unique=True)
    email = Column(String(255), nullable=False, unique=True)
    age = Column(Integer, nullable=True)

async def main():
    # Connect to database
    db_manager = await create_db_manager(
        "postgresql+asyncpg://user:pass@localhost/dbname"
    )
    
    # Get repository for User model
    user_repo = await db_manager.get_repository(User)
    
    # Create a user with transaction
    user = User(username="john", email="john@example.com", age=30)
    
    async with db_manager.transaction() as session:
        await user_repo.add(session, user)
    
    # Retrieve with caching
    async with db_manager.transaction(read_only=True) as session:
        retrieved_user = await user_repo.get_by_id(session, user.id)
        print(f"Retrieved: {retrieved_user.username}")
    
    # Update with cache invalidation
    async with db_manager.transaction(
        cache_invalidation_tags=[f"users_{user.id}"]
    ) as session:
        user.age = 31
        await user_repo.update(session, user)
    
    # Get metrics
    metrics = db_manager.get_metrics()
    print(f"Cache hits: {metrics['cache_stats']['hits']}")
    
    # Cleanup
    await db_manager.disconnect()

if __name__ == "__main__":
    asyncio.run(main())
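
To make the cache_invalidation_tags step above more concrete, here is a minimal sketch of a TTL cache with tag-based invalidation. It is an illustration under assumptions, not the library's cache: TaggedTTLCache and its set, get, and invalidate_tags methods are hypothetical names. Each entry stores an expiry time, and each tag maps to the keys it covers, so invalidating a tag drops every entry stored under it.

```python
import time
from typing import Any, Callable, Dict, Iterable, Optional, Set, Tuple


class TaggedTTLCache:
    """Hypothetical in-memory cache; not the alchemy_h8 class."""

    def __init__(self, default_ttl: float = 300.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._default_ttl = default_ttl
        self._clock = clock
        self._entries: Dict[str, Tuple[Any, float]] = {}  # key -> (value, expires_at)
        self._tags: Dict[str, Set[str]] = {}               # tag -> keys
        self.hits = 0
        self.misses = 0

    def set(self, key: str, value: Any, tags: Iterable[str] = (),
            ttl: Optional[float] = None) -> None:
        expires_at = self._clock() + (self._default_ttl if ttl is None else ttl)
        self._entries[key] = (value, expires_at)
        for tag in tags:
            self._tags.setdefault(tag, set()).add(key)

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None or entry[1] <= self._clock():
            self._entries.pop(key, None)  # drop expired entries lazily
            self.misses += 1
            return None
        self.hits += 1
        return entry[0]

    def invalidate_tags(self, tags: Iterable[str]) -> None:
        # Remove every entry that was stored under any of the given tags.
        for tag in tags:
            for key in self._tags.pop(tag, set()):
                self._entries.pop(key, None)
```

In this sketch, an update that invalidates the tag "users_1" forces the next read of that user to miss the cache and go back to the database.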

Advanced Configuration

Connection, cache, and retry behavior are configured through a DbLibraryConfig object passed to create_db_manager:

from alchemy_h8.config import DbLibraryConfig
from alchemy_h8.connection import SecurityLevel
from alchemy_h8.integration import create_db_manager
from alchemy_h8.resilience import BackoffStrategy

# Create custom configuration
config = DbLibraryConfig()

# Connection settings
config.connection.pool_size = 20
config.connection.max_overflow = 10
config.connection.security_level = SecurityLevel.HIGH
config.connection.ssl_required = True
config.connection.statement_timeout_sec = 30

# Caching settings
config.cache.enabled = True
config.cache.default_ttl = 300  # 5 minutes
config.cache.max_size = 10000

# Resilience settings
config.retry.max_attempts = 3
config.retry.base_delay_ms = 100
config.retry.strategy = BackoffStrategy.EXPONENTIAL
config.retry.idempotent_only = True

# Create database manager with custom config.
# Note: `await` must run inside an async function, such as main() in Basic Usage.
db_manager = await create_db_manager(
    "postgresql+asyncpg://user:pass@localhost/dbname",
    config=config
)
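
The retry settings above (max_attempts, base_delay_ms, an exponential strategy) suggest a loop along the following lines. This is a sketch under assumptions rather than the library's retry code: backoff_delay_ms and retry_async are hypothetical helpers, and the cap of 10 seconds and the use of full jitter are illustrative choices.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


def backoff_delay_ms(attempt: int, base_delay_ms: int = 100,
                     max_delay_ms: int = 10_000) -> int:
    """Delay before retry number `attempt` (1-based): base * 2^(attempt-1), capped."""
    return min(max_delay_ms, base_delay_ms * 2 ** (attempt - 1))


async def retry_async(operation: Callable[[], Awaitable[T]],
                      max_attempts: int = 3,
                      base_delay_ms: int = 100,
                      jitter: bool = True,
                      sleep: Callable[[float], Awaitable[None]] = asyncio.sleep) -> T:
    """Hypothetical helper: await `operation`, retrying on any exception."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            delay_ms = float(backoff_delay_ms(attempt, base_delay_ms))
            if jitter:
                # Full jitter: pick a random delay up to the computed bound.
                delay_ms = random.uniform(0, delay_ms)
            await sleep(delay_ms / 1000)
    raise AssertionError("unreachable")
```

With the defaults, the upper bounds on the delays are 100 ms, 200 ms, 400 ms, and so on. A production version would normally retry only errors known to be transient, which is presumably the purpose of the idempotent_only setting.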

Idempotent Transactions

Pass an operation_id to a transaction so that a retry with the same ID is skipped instead of being executed a second time:

# Runs inside main() from Basic Usage: uuid is imported, and
# db_manager, user_repo, and user are already defined.

# Generate unique operation ID
operation_id = str(uuid.uuid4())

# First attempt - succeeds
async with db_manager.transaction(operation_id=operation_id) as session:
    await user_repo.add(session, user)

# Second attempt with same operation_id - no duplicate created
user_duplicate = User(username="john", email="john@example.com", age=30)
async with db_manager.transaction(operation_id=operation_id) as session:
    await user_repo.add(session, user_duplicate)  # Won't execute
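
The idea behind operation IDs can be sketched without a database. The registry below is hypothetical and only illustrates the technique; OperationRegistry and run_once are assumed names, and how alchemy_h8 stores completed IDs is not described here. An operation runs only if its ID has not completed before; otherwise the recorded result is returned.

```python
import asyncio
import uuid
from typing import Any, Awaitable, Callable, Dict


class OperationRegistry:
    """Hypothetical in-memory record of completed operation IDs."""

    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}
        self._lock = asyncio.Lock()

    async def run_once(self, operation_id: str,
                       operation: Callable[[], Awaitable[Any]]) -> Any:
        # The lock keeps two concurrent tasks from running the same ID twice.
        async with self._lock:
            if operation_id in self._results:
                return self._results[operation_id]  # already done: skip
            result = await operation()
            # Recorded only after success, so a failed attempt can be retried.
            self._results[operation_id] = result
            return result
```

An in-memory record like this is lost on restart. A durable variant would typically store the ID in the same database transaction as the write, so that the record and the write succeed or fail together.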

Metrics and Monitoring

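get_metrics() returns a dictionary of query, transaction, and cache statistics, and health_check() reports database status and response time: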

# Get all metrics
metrics = db_manager.get_metrics()

print("Query Metrics:")
print(f"- Total queries: {metrics['query_metrics']['total_queries']}")
print(f"- Average time: {metrics['query_metrics']['average_execution_time_ms']} ms")
print(f"- Slow queries: {metrics['query_metrics']['slow_query_count']}")

print("Transaction Metrics:")
print(f"- Total: {metrics['transaction_metrics']['total_transactions']}")
print(f"- Successful: {metrics['transaction_metrics']['successful_transactions']}")
print(f"- Failed: {metrics['transaction_metrics']['failed_transactions']}")
print(f"- Retried: {metrics['transaction_metrics']['retried_transactions']}")

print("Cache Stats:")
print(f"- Hits: {metrics['cache_stats']['hits']}")
print(f"- Misses: {metrics['cache_stats']['misses']}")
print(f"- Hit ratio: {metrics['cache_stats']['hit_ratio']:.2f}")

# Health check
health = await db_manager.health_check()
print(f"Status: {health['status']}")
print(f"Response time: {health['response_time_ms']} ms")
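
Figures such as total_queries, average_execution_time_ms, and slow_query_count could be gathered by timing each statement, as in the sketch below. It is illustrative only: QueryMetrics, record, timed, and the 500 ms threshold are assumptions, not the library's collector or its defaults.

```python
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Iterator, List


@dataclass
class QueryMetrics:
    """Hypothetical collector; not the alchemy_h8 class."""

    slow_threshold_ms: float = 500.0  # assumed threshold for a "slow" query
    total_queries: int = 0
    total_time_ms: float = 0.0
    slow_queries: List[str] = field(default_factory=list)

    @property
    def average_execution_time_ms(self) -> float:
        return self.total_time_ms / self.total_queries if self.total_queries else 0.0

    def record(self, statement: str, elapsed_ms: float) -> None:
        self.total_queries += 1
        self.total_time_ms += elapsed_ms
        if elapsed_ms >= self.slow_threshold_ms:
            self.slow_queries.append(statement)

    @contextmanager
    def timed(self, statement: str) -> Iterator[None]:
        # Time the wrapped block and record it even if the block raises.
        start = time.perf_counter()
        try:
            yield
        finally:
            self.record(statement, (time.perf_counter() - start) * 1000)
```

Wrapping a query in `with metrics.timed("SELECT ..."):` records its duration, and statements at or above the threshold are kept for logging.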

Circuit Breaker Status

Monitor circuit breaker status to detect service degradation:

cb_status = db_manager.get_circuit_breaker_status()

for name, status in cb_status.items():
    print(f"Circuit Breaker '{name}':")
    print(f"- State: {status['state']}")
    print(f"- Failures: {status['failure_count']}")
    print(f"- Last failure: {status['last_failure_time']}")
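
The state, failure_count, and last_failure_time fields printed above correspond to the usual circuit breaker state machine, sketched here under assumptions. CircuitBreaker, its thresholds, and the state names "closed", "open", and "half_open" are hypothetical; the strings the library actually reports may differ.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Hypothetical circuit breaker; not the alchemy_h8 class."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout  # seconds before a trial call
        self._clock = clock
        self.failure_count = 0
        self.last_failure_time: Optional[float] = None
        self._opened = False

    @property
    def state(self) -> str:
        if not self._opened:
            return "closed"
        assert self.last_failure_time is not None
        if self._clock() - self.last_failure_time >= self.recovery_timeout:
            return "half_open"  # timeout elapsed: allow a trial call
        return "open"

    def allow_request(self) -> bool:
        return self.state != "open"

    def record_success(self) -> None:
        # A successful call (including a half-open trial) closes the circuit.
        self.failure_count = 0
        self._opened = False

    def record_failure(self) -> None:
        self.failure_count += 1
        # Refreshing the timestamp re-opens a half-open circuit immediately.
        self.last_failure_time = self._clock()
        if self.failure_count >= self.failure_threshold:
            self._opened = True
```

While the circuit is open, callers fail fast instead of waiting on a database that is already struggling, which is how the pattern limits cascading failures.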

Testing

Run the test suite:

# Set test database URL
export TEST_DB_URL="postgresql+asyncpg://postgres:postgres@localhost:5432/test_alchemy_h8"

# Run tests
pytest tests/

License

MIT License
