Caching tool for python
Project description
Caching tool for python, working with Redis single instance and Redis cluster mode
Features
- ✅ Automatic reconnection: Redis client handles reconnection automatically
- ✅ Graceful degradation: Falls back to in-memory cache when Redis is unavailable
- ✅ No crashes: All operations handle errors gracefully
- ✅ Async & Sync support: Works with both async and sync functions
- ✅ Redis Cluster support: Works with single Redis instance and Redis Cluster
- ✅ Custom encoders/decoders: Support for custom serialization
Installation
$ pip install cache-house
or with poetry
poetry add cache-house
Quick Start
Cache decorator works with both async and sync functions. The library automatically handles Redis reconnection and falls back to in-memory cache when Redis is unavailable.
from cache_house.backends import RedisFactory
from cache_house.cache import cache
import asyncio
# Initialize Redis with fallback enabled (recommended for production)
RedisFactory.init(fallback_to_memory=True)
@cache() # default expire time is 180 seconds
async def test_cache(a: int, b: int):
print("async cached - this only prints on cache miss")
return [a, b]
@cache()
def test_cache_1(a: int, b: int):
print("cached - this only prints on cache miss")
return [a, b]
if __name__ == "__main__":
print("First call (cache miss):")
print(test_cache_1(3, 4))
print("\nSecond call (cache hit):")
print(test_cache_1(3, 4)) # This will use cache, print won't execute
print("\nAsync function:")
print(asyncio.run(test_cache(1, 2)))
print(asyncio.run(test_cache(1, 2))) # Cached
Output:
First call (cache miss):
cached - this only prints on cache miss
[3, 4]
Second call (cache hit):
[3, 4] # No print - served from cache
Async function:
async cached - this only prints on cache miss
[1, 2]
[1, 2] # Cached
Check stored cache keys:
➜ $ rdcli KEYS "*"
1) cachehouse:main:8f65aed1010f0062a783c83eb430aca0
2) cachehouse:main:f665833ea64e4fc32653df794257ca06
Setup Redis cache instance
You can pass all redis-py arguments to RedisFactory.init method and additional arguments:
def RedisFactory.init(
host: str = "localhost",
port: int = 6379,
encoder: Callable[..., Any] = ...,
decoder: Callable[..., Any] = ...,
namespace: str = ...,
key_prefix: str = ...,
key_builder: Callable[..., Any] = ...,
password: str = ...,
db: int = ...,
cluster_mode: bool = False, # Force cluster mode (skip auto-detection)
autodetect_cluster: bool = True, # Auto-detect if Redis is running in cluster mode
fallback_to_memory: bool = True, # Enable in-memory fallback when Redis is unavailable
**redis_kwargs
)
Cluster auto-detection
By default (autodetect_cluster=True), RedisFactory.init will:
- Try to send
CLUSTER INFOto the target Redis node - If the command succeeds → cluster mode is detected, and
RedisClusterCacheis used internally - If the command fails with a Redis error → standalone mode is assumed, and
RedisCacheis used
This means you can usually just call:
from cache_house.backends import RedisFactory
RedisFactory.init(
host="localhost",
port=6379,
fallback_to_memory=True,
# autodetect_cluster=True by default
)
and cache-house will automatically choose the correct backend (standalone or cluster) based on the Redis server configuration.
Explicit modes (optional)
- Force standalone Redis (no detection):
RedisFactory.init(
host="localhost",
port=6379,
cluster_mode=False,
autodetect_cluster=False, # Always use standalone RedisCache
)
- Force Redis Cluster (no detection):
RedisFactory.init(
host="localhost",
port=6379,
cluster_mode=True, # Always use RedisClusterCache
autodetect_cluster=False, # Optional, explicit
)
Best Practice: Initialize Redis with fallback enabled
from cache_house.backends import RedisFactory
# Initialize with fallback to memory cache (default: True)
# Your application will continue working even if Redis is temporarily unavailable
RedisFactory.init(
host="localhost",
port=6379,
password="your_password", # Optional
db=0,
fallback_to_memory=True # Falls back to in-memory cache when Redis is down
)
Custom encoder and decoder
from cache_house.backends import RedisFactory
import json
def custom_encoder(data):
return json.dumps(data)
def custom_decoder(data):
return json.loads(data)
RedisFactory.init(
encoder=custom_encoder,
decoder=custom_decoder,
fallback_to_memory=True
)
Default encoder and decoder is pickle module.
Setup Redis Cluster cache instance
All manipulation with RedisCache is the same with RedisClusterCache
from cache_house.backends import RedisFactory
from cache_house.cache import cache
# Initialize Redis Cluster with fallback enabled
RedisFactory.init(
cluster_mode=True,
startup_nodes=[
{"host": "127.0.0.1", "port": "7000"},
{"host": "127.0.0.1", "port": "7001"},
],
fallback_to_memory=True # Falls back to in-memory cache when cluster is unavailable
)
@cache()
async def test_cache(a: int, b: int):
print("cached")
return [a, b]
Redis Cluster parameters (all redis-py cluster arguments are supported):
RedisFactory.init(
cluster_mode=True,
startup_nodes=[{"host": "127.0.0.1", "port": "7000"}],
cluster_error_retry_attempts: int = 3,
require_full_coverage: bool = True,
skip_full_coverage_check: bool = False,
reinitialize_steps: int = 10,
read_from_replicas: bool = False,
fallback_to_memory: bool = True,
**redis_kwargs
)
Setup cache instance with FastAPI
Best Practice: Initialize Redis in startup event with fallback enabled. Your application will continue working even if Redis is temporarily unavailable.
import logging
import uvicorn
from fastapi.applications import FastAPI
from cache_house.backends import RedisFactory
from cache_house.cache import cache
app = FastAPI()
@app.on_event("startup")
async def startup():
# Initialize with fallback - app won't crash if Redis is unavailable
RedisFactory.init(
host="localhost",
port=6379,
fallback_to_memory=True # Enable in-memory fallback
)
print("App started - Redis cache initialized")
@app.on_event("shutdown")
async def shutdown():
# Gracefully close connections
RedisFactory.close_connections()
print("App shutdown - Redis connections closed")
@app.get("/notcached")
async def test_route():
print("notcached")
return {"hello": "world"}
@app.get("/cached")
@cache(expire=60) # Cache for 60 seconds
async def test_route():
print("cached") # This print only runs on cache miss
return {"hello": "world"}
@app.get("/cached-with-custom-expire")
@cache(expire=300, namespace="api") # Cache for 5 minutes with custom namespace
async def expensive_operation():
# Simulate expensive operation
import time
time.sleep(1)
return {"result": "expensive computation"}
if __name__ == "__main__":
uvicorn.run(app, port=8033)
Cache decorator options
You can set expire time (seconds or timedelta), namespace, and key prefix in the cache decorator:
from datetime import timedelta
from cache_house.cache import cache
# Using seconds
@cache(expire=30, namespace="app", key_prefix="test")
async def test_cache(a: int, b: int):
print("cached")
return [a, b]
# Using timedelta
@cache(expire=timedelta(minutes=5), namespace="app", key_prefix="test")
def test_cache_sync(a: int, b: int):
print("cached")
return [a, b]
if __name__ == "__main__":
print(asyncio.run(test_cache(1, 2)))
print(test_cache_sync(3, 4))
Check stored cache:
rdcli KEYS "*"
1) test:app:f665833ea64e4fc32653df794257ca06
Understanding Namespaces and Key Builders
Namespaces
Namespaces help organize your cache keys and make it easier to manage different parts of your application. The default namespace is "main".
Key Format: {prefix}:{namespace}:{hash}
Example with different namespaces:
from cache_house.backends import RedisFactory
from cache_house.cache import cache
RedisFactory.init(fallback_to_memory=True)
# API endpoints namespace
@cache(expire=60, namespace="api")
def get_user(user_id: int):
return {"id": user_id, "name": f"User {user_id}"}
# Database queries namespace
@cache(expire=300, namespace="database")
def get_user_posts(user_id: int):
return [{"id": 1, "title": "Post 1"}]
# Configuration namespace
@cache(expire=3600, namespace="config")
def get_app_config():
return {"setting": "value"}
# Default namespace (if not specified)
@cache(expire=180)
def default_function():
return "default"
Cache keys will be:
cachehouse:api:abc123...
cachehouse:database:def456...
cachehouse:config:ghi789...
cachehouse:main:jkl012... # default namespace
Benefits of using namespaces:
- Organization: Group related cache entries together
- Easy cleanup: Clear all keys in a specific namespace
- Multi-tenancy: Separate cache for different applications/services
- Debugging: Easier to identify cache keys in Redis
Global Namespace Configuration
You can set a default namespace for all cache operations:
from cache_house.backends import RedisFactory
# Set default namespace for all cache operations
RedisFactory.init(
namespace="myapp", # All cache keys will use "myapp" namespace by default
key_prefix="app", # Change default prefix from "cachehouse" to "app"
fallback_to_memory=True
)
# This will use "myapp" namespace
@cache(expire=60)
def my_function():
return "data"
# Override namespace for specific function
@cache(expire=60, namespace="special")
def special_function():
return "special data"
Resulting keys:
app:myapp:abc123... # default namespace
app:special:def456... # overridden namespace
Key Prefix
The key prefix is the first part of every cache key. Default is "cachehouse".
# Global prefix
RedisFactory.init(key_prefix="myapp", namespace="v1")
# Per-decorator prefix (overrides global)
@cache(expire=60, key_prefix="api", namespace="users")
def get_user(id: int):
return {"id": id}
Key format: {key_prefix}:{namespace}:{hash}
Custom Key Builder
You can create your own key builder function for complete control over cache key generation:
import hashlib
from cache_house.backends import RedisFactory
from cache_house.cache import cache
def custom_key_builder(module, name, args, kwargs, prefix="cachehouse", namespace="main"):
"""
Custom key builder function
Args:
module: Function's module name
name: Function name
args: Function positional arguments
kwargs: Function keyword arguments
prefix: Key prefix
namespace: Namespace
"""
# Example: Create a more readable key
# Format: prefix:namespace:module.function:arg1:arg2:kwarg1=value1
key_parts = [prefix, namespace, f"{module}.{name}"]
# Add positional arguments
for arg in args:
key_parts.append(str(arg))
# Add keyword arguments
for k, v in sorted(kwargs.items()):
key_parts.append(f"{k}={v}")
# Join and create hash for long keys
key_string = ":".join(key_parts)
if len(key_string) > 200: # Redis key length limit
key_hash = hashlib.md5(key_string.encode()).hexdigest()
return f"{prefix}:{namespace}:{key_hash}"
return key_string
# Use custom key builder globally
RedisFactory.init(
key_builder=custom_key_builder,
namespace="custom",
fallback_to_memory=True
)
@cache(expire=60)
def my_function(a: int, b: int, name: str = "test"):
return {"result": a + b, "name": name}
Or use custom key builder per decorator:
def simple_key_builder(module, name, args, kwargs, prefix="cache", namespace="app"):
# Simple key: just use function name and first argument
first_arg = args[0] if args else "default"
return f"{prefix}:{namespace}:{name}:{first_arg}"
@cache(expire=60, key_builder=simple_key_builder, namespace="simple")
def get_item(item_id: int):
return {"id": item_id}
Key builder function signature:
def key_builder(
module: str, # Function's module (e.g., "__main__" or "myapp.services")
name: str, # Function name
args: tuple, # Positional arguments
kwargs: dict, # Keyword arguments
prefix: str, # Key prefix
namespace: str # Namespace
) -> str:
# Return the cache key as a string
return "your:custom:key:format"
Clearing Cache by Namespace
You can clear all cache keys in a specific namespace:
from cache_house.backends import RedisCache
# Clear all keys in a namespace
RedisCache.clear_keys("cachehouse:api") # Clears all keys starting with "cachehouse:api"
# Or with custom prefix
RedisCache.clear_keys("myapp:database") # Clears all keys in "database" namespace
Example: Clear cache for a specific namespace
from cache_house.backends import RedisFactory, RedisCache
RedisFactory.init(namespace="myapp", fallback_to_memory=True)
# Cache some data
@cache(expire=300, namespace="users")
def get_user(id: int):
return {"id": id}
@cache(expire=300, namespace="posts")
def get_post(id: int):
return {"id": id}
# Later, clear only "users" namespace
RedisCache.clear_keys("cachehouse:users") # Only clears users cache
# Posts cache remains intact
Best Practices for Namespaces
-
Use descriptive namespaces:
@cache(namespace="api.users") # Good @cache(namespace="x") # Bad - not descriptive
-
Organize by feature or service:
namespace="api.users" namespace="api.products" namespace="database.queries" namespace="external.api"
-
Use consistent naming:
# Good - consistent pattern namespace="v1.api" namespace="v1.database" namespace="v2.api"
-
Set global namespace for multi-tenant apps:
# Different namespace per tenant tenant_id = get_current_tenant() RedisFactory.init(namespace=f"tenant_{tenant_id}")
-
Use namespaces for cache invalidation:
# When user data changes, clear user namespace def update_user(user_id): # ... update logic ... RedisCache.clear_keys("cachehouse:users") # Clear all user cache
Custom encoder and decoder in decorator
If your function works with non-standard data types, you can pass custom encoder and decoder functions to the cache decorator:
import asyncio
import json
from cache_house.backends import RedisFactory
from cache_house.cache import cache
RedisFactory.init(fallback_to_memory=True)
def custom_encoder(data):
return json.dumps(data)
def custom_decoder(data):
return json.loads(data)
@cache(expire=30, encoder=custom_encoder, decoder=custom_decoder, namespace="custom")
async def test_cache(a: int, b: int):
print("async cached")
return {"a": a, "b": b}
@cache(expire=30)
def test_cache_1(a: int, b: int):
print("cached")
return [a, b]
if __name__ == "__main__":
print(asyncio.run(test_cache(1, 2)))
print(test_cache_1(3, 4))
Check stored cache:
rdcli KEYS "*"
1) cachehouse:main:8f65aed1010f0062a783c83eb430aca0
2) cachehouse:custom:f665833ea64e4fc32653df794257ca06
Error Handling and Resilience
cache-house is designed to be resilient and won't crash your application:
Automatic Reconnection
Redis client handles reconnection automatically. You don't need to manage connections manually.
In-Memory Fallback
When Redis is unavailable, cache operations automatically fall back to in-memory cache:
from cache_house.backends import RedisFactory
from cache_house.cache import cache
# Initialize with fallback enabled (default: True)
RedisFactory.init(
host="localhost",
port=6379,
fallback_to_memory=True # Falls back to in-memory cache when Redis is down
)
@cache(expire=60)
def expensive_operation(data):
# This function will work even if Redis is unavailable
# Results will be cached in memory temporarily
return process_data(data)
Graceful Error Handling
All cache operations handle errors gracefully:
from cache_house.backends import RedisFactory
from cache_house.cache import cache
# Even if Redis is not initialized, your code won't crash
cache_instance = RedisFactory.get_instance()
if cache_instance is None:
print("Cache not available, but app continues running")
@cache(expire=60)
def my_function():
# If Redis fails, this function still executes normally
# Cache errors are logged but don't crash the app
return expensive_computation()
Best Practices
-
Always enable fallback for production:
RedisFactory.init(fallback_to_memory=True)
-
Handle cache as optional:
@cache(expire=60) def my_function(): # Function works with or without cache return compute_result()
-
Use appropriate expiration times:
@cache(expire=300) # 5 minutes for stable data def get_stable_data(): return fetch_data() @cache(expire=30) # 30 seconds for frequently changing data def get_dynamic_data(): return fetch_data()
-
Close connections on shutdown (e.g., in FastAPI):
@app.on_event("shutdown") async def shutdown(): RedisFactory.close_connections()
Complete Example: Production-Ready Setup
Here's a complete example showing best practices for using cache-house in production:
import asyncio
import logging
from datetime import timedelta
from cache_house.backends import RedisFactory
from cache_house.cache import cache
# Configure logging to see cache operations
logging.basicConfig(level=logging.INFO)
# Initialize Redis with fallback enabled
# Your app will work even if Redis is temporarily unavailable
RedisFactory.init(
host="localhost",
port=6379,
password=None, # Set if your Redis requires authentication
db=0,
fallback_to_memory=True, # Enable in-memory fallback
# You can pass any redis-py connection arguments here
socket_connect_timeout=5,
socket_timeout=5,
)
# Example 1: Cache expensive computation
@cache(expire=300) # Cache for 5 minutes
def expensive_computation(n: int):
"""This expensive operation will be cached"""
result = sum(i * i for i in range(n))
print(f"Computed result for {n}: {result}")
return result
# Example 2: Cache API response
@cache(expire=60, namespace="api") # Cache for 1 minute with namespace
async def fetch_user_data(user_id: int):
"""Simulate API call - will be cached"""
print(f"Fetching user {user_id} from API...")
await asyncio.sleep(0.1) # Simulate network delay
return {"user_id": user_id, "name": f"User {user_id}"}
# Example 3: Cache with custom expiration
@cache(expire=timedelta(hours=1), namespace="long_term")
def get_configuration():
"""Configuration that changes rarely"""
print("Loading configuration...")
return {"setting1": "value1", "setting2": "value2"}
# Example 4: Cache database query result
@cache(expire=180, namespace="database")
async def get_user_posts(user_id: int):
"""Simulate database query"""
print(f"Querying database for user {user_id} posts...")
await asyncio.sleep(0.05)
return [{"id": 1, "title": "Post 1"}, {"id": 2, "title": "Post 2"}]
async def main():
print("=== Example 1: Expensive computation ===")
print(expensive_computation(1000000)) # First call - computes
print(expensive_computation(1000000)) # Second call - from cache
print("\n=== Example 2: API response caching ===")
print(await fetch_user_data(1)) # First call - fetches
print(await fetch_user_data(1)) # Second call - from cache
print("\n=== Example 3: Configuration caching ===")
print(get_configuration()) # First call - loads
print(get_configuration()) # Second call - from cache
print("\n=== Example 4: Database query caching ===")
print(await get_user_posts(1)) # First call - queries
print(await get_user_posts(1)) # Second call - from cache
# Clean up
RedisFactory.close_connections()
if __name__ == "__main__":
asyncio.run(main())
Output:
INFO:cache_house.backends.redis_backend:redis initialized (Redis will handle reconnections automatically)
=== Example 1: Expensive computation ===
Computed result for 1000000: 333333333333500000
Computed result for 1000000: 333333333333500000
=== Example 2: API response caching ===
Fetching user 1 from API...
{'user_id': 1, 'name': 'User 1'}
{'user_id': 1, 'name': 'User 1'}
...
Note: If Redis is unavailable, all operations will still work using the in-memory fallback cache. Your application won't crash!
All examples work with both Redis Cluster and single Redis instance.
Contributing
Free to open issue and send PR
cache-house supports Python >= 3.10
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file cache_house-1.0.0.tar.gz.
File metadata
- Download URL: cache_house-1.0.0.tar.gz
- Upload date:
- Size: 17.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.2.1 CPython/3.13.9 Darwin/25.1.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7fd24bdc3a77f8e612fe1c46851d722c1bbaf1f8e8df30b1442c17ae7748f87a
|
|
| MD5 |
9e29b61d3c29c2ce77e04bf6b2312e69
|
|
| BLAKE2b-256 |
5bb6834e6ac8456857a9df9e7cb726153e6b3b048cf3ee3a89503f143a8e7818
|
File details
Details for the file cache_house-1.0.0-py3-none-any.whl.
File metadata
- Download URL: cache_house-1.0.0-py3-none-any.whl
- Upload date:
- Size: 15.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.2.1 CPython/3.13.9 Darwin/25.1.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1f9c3166b6edbce6782dfd6e97192c17288c741d51635d6228162bdef7c743a4
|
|
| MD5 |
1197451e8cff4977abe9d356c918c474
|
|
| BLAKE2b-256 |
7987e6358d9685869dd5776435f8c78a6a106364f1175b33b790467a361a20ac
|