# advanced-caching
Production-ready caching library for Python with TTL, stale-while-revalidate (SWR), and background refresh.
Type-safe, fast, thread-safe, async-friendly, and framework-agnostic.
Issues & feature requests: please open a new issue on the project's issue tracker.
## Table of Contents

- Installation
- Quick Start
- Metrics & Monitoring
- Key Templates
- Storage Backends
- BGCache (Background)
- API Reference
- Testing & Benchmarks
- Use Cases
- Comparison
- Contributing
- License
## Installation

```bash
uv pip install advanced-caching                    # core (includes InMemoryMetrics)
uv pip install "advanced-caching[redis]"           # Redis support
uv pip install "advanced-caching[opentelemetry]"   # OpenTelemetry metrics
uv pip install "advanced-caching[gcp-monitoring]"  # GCP Cloud Monitoring
uv pip install "advanced-caching[all-metrics]"     # all metrics exporters
# pip works too
```
## Quick Start

```python
from advanced_caching import TTLCache, SWRCache, BGCache

# Sync function
@TTLCache.cached("user:{}", ttl=300)
def get_user(user_id: int) -> dict:
    return db.fetch(user_id)

# Async function (works natively)
@TTLCache.cached("user:{}", ttl=300)
async def get_user_async(user_id: int) -> dict:
    return await db.fetch(user_id)

# Stale-while-revalidate (sync)
@SWRCache.cached("product:{}", ttl=60, stale_ttl=30)
def get_product(product_id: int) -> dict:
    return api.fetch_product(product_id)

# Stale-while-revalidate (async)
@SWRCache.cached("async:product:{}", ttl=60, stale_ttl=30)
async def get_product_async(product_id: int) -> dict:
    return await api.fetch_product(product_id)

# Background refresh (sync)
@BGCache.register_loader("inventory", interval_seconds=300)
def load_inventory() -> list[dict]:
    return warehouse_api.get_all_items()

# Background refresh (async)
@BGCache.register_loader("inventory_async", interval_seconds=300)
async def load_inventory_async() -> list[dict]:
    return await warehouse_api.get_all_items()

# Configured cache (reusable backend):
# create a decorator pre-wired with a specific cache (e.g., Redis)
RedisTTL = TTLCache.configure(cache=RedisCache(redis_client))

@RedisTTL.cached("user:{}", ttl=300)
async def get_user_redis(user_id: int):
    return await db.fetch(user_id)
```
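As a mental model for the SWR semantics above (serve fresh within `ttl`, serve stale while refreshing for a further `stale_ttl`, then miss), here is a minimal standalone sketch. It is illustrative only, not the library's implementation; in particular, the real SWRCache refreshes in the background rather than inline.

```python
import time

class MiniSWR:
    """Toy stale-while-revalidate cache, illustrative only.

    Entries are fresh for `ttl` seconds, then servable-but-stale for a
    further `stale_ttl` seconds, after which they miss entirely.
    """

    def __init__(self, ttl: float, stale_ttl: float):
        self.ttl = ttl
        self.stale_ttl = stale_ttl
        self._store: dict[str, tuple[float, object]] = {}  # key -> (stored_at, value)

    def get(self, key: str, loader):
        now = time.monotonic()
        entry = self._store.get(key)
        if entry is not None:
            age = now - entry[0]
            if age < self.ttl:
                return entry[1], "fresh"
            if age < self.ttl + self.stale_ttl:
                # Serve the stale value immediately while refreshing; the real
                # SWRCache does this refresh in the background, not inline.
                self._store[key] = (now, loader())
                return entry[1], "stale"
        value = loader()
        self._store[key] = (now, value)
        return value, "miss"
```

The payoff of SWR is the "stale" branch: callers keep getting an answer instantly even while the underlying loader is being re-run.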
## Metrics & Monitoring

Optional, high-performance metrics with <1% overhead for production monitoring.

```python
from advanced_caching import TTLCache
from advanced_caching.metrics import InMemoryMetrics

# Create a metrics collector (no external dependencies)
metrics = InMemoryMetrics()

# Use it with any decorator
@TTLCache.cached("user:{id}", ttl=60, metrics=metrics)
def get_user(id: int):
    return {"id": id, "name": "Alice"}

# Query metrics via the API
stats = metrics.get_stats()
# Returns: hit_rate, latency percentiles (p50/p95/p99),
# errors, memory usage, background refresh stats
```
Built-in collectors:
- InMemoryMetrics: Zero dependencies, perfect for API queries
- NullMetrics: Zero overhead when metrics disabled (default)
Exporters (optional):
- OpenTelemetry: OTLP, Jaeger, Zipkin, Prometheus
- GCP Cloud Monitoring: Google Cloud Platform
Custom exporters: See Custom Exporters Guide for Prometheus, StatsD, and Datadog implementations.
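To make the reported numbers concrete, here is a toy collector showing how a hit rate and nearest-rank latency percentiles can be computed. This is a sketch of the idea only; `InMemoryMetrics` is the real implementation and its internals and exact stats may differ.

```python
class MiniMetrics:
    """Toy metrics collector, illustrative only (not the InMemoryMetrics API)."""

    def __init__(self):
        self.hits = 0
        self.misses = 0
        self.latencies_ms: list[float] = []

    def record(self, hit: bool, latency_ms: float) -> None:
        if hit:
            self.hits += 1
        else:
            self.misses += 1
        self.latencies_ms.append(latency_ms)

    def get_stats(self) -> dict:
        total = self.hits + self.misses
        xs = sorted(self.latencies_ms)

        def pct(p: float):
            # Nearest-rank style percentile over the sorted samples
            return xs[min(len(xs) - 1, int(p / 100 * len(xs)))] if xs else None

        return {
            "hit_rate": self.hits / total if total else 0.0,
            "p50": pct(50),
            "p95": pct(95),
            "p99": pct(99),
        }
```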
## Key Templates

The library supports smart key generation that handles both positional and keyword arguments seamlessly.

- Positional placeholder: `"user:{}"` uses the first argument, whether passed positionally or as a keyword.
  - Example: `get_user(123)` or `get_user(user_id=123)` -> `"user:123"`
- Named placeholder: `"user:{user_id}"` resolves `user_id` from keyword arguments or from positional arguments (by inspecting the function signature).
  - Example: `def get_user(user_id): ...` called as `get_user(123)` -> `"user:123"`
- Custom function: for complex logic, pass a callable.
  - Example (args/kwargs with default values): `key=lambda *a, **k: f"user:{k.get('user_id', a[0])}"`
  - Example (functions without defaults): `key=lambda user_id: f"user:{user_id}"`
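The signature-aware resolution described above can be approximated with `inspect.signature`. This is an assumed reconstruction of the behavior, not the library's actual algorithm, and `build_key` is a hypothetical helper:

```python
import inspect

def build_key(template: str, func, *args, **kwargs) -> str:
    """Sketch of template-style key resolution: bind the call against the
    function's signature so named placeholders work for positional calls too."""
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()
    if "{}" in template:
        # Positional placeholder: use the first bound argument
        first = next(iter(bound.arguments.values()))
        return template.format(first)
    # Named placeholder: resolve by parameter name
    return template.format(**bound.arguments)

def get_user(user_id: int): ...
```

With this helper, `build_key("user:{user_id}", get_user, 123)` and `build_key("user:{user_id}", get_user, user_id=123)` produce the same key.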
## Storage Backends
- InMemCache (default): Fast, process-local
- RedisCache: Distributed in-memory
- HybridCache: L1 (memory) + L2 (Redis)
- ChainCache: Arbitrary multi-level chain (e.g., InMem -> Redis -> S3/GCS)
- S3Cache: Object storage backend (AWS)
- GCSCache: Object storage backend (Google Cloud)
- LocalFileCache: Filesystem-backed cache (per-host)
### InMemCache

Thread-safe in-memory cache with TTL.

```python
from advanced_caching import InMemCache

cache = InMemCache()
cache.set("key", "value", ttl=60)
cache.get("key")
cache.delete("key")
cache.exists("key")
cache.set_if_not_exists("key", "value", ttl=60)
cache.cleanup_expired()
```
### RedisCache & Serializers

```python
import redis
from advanced_caching import RedisCache, JsonSerializer

client = redis.Redis(host="localhost", port=6379)
cache = RedisCache(client, prefix="app:")
json_cache = RedisCache(client, prefix="app:json:", serializer="json")
custom_json = RedisCache(client, prefix="app:json2:", serializer=JsonSerializer())
```
### HybridCache (L1 + L2)

Two-level cache:

- L1: in-memory
- L2: Redis

Simple setup:

```python
import redis
from advanced_caching import HybridCache, TTLCache

client = redis.Redis()
hybrid = HybridCache.from_redis(client, prefix="app:", l1_ttl=60)

@TTLCache.cached("user:{}", ttl=300, cache=hybrid)
def get_user(user_id: int):
    return {"id": user_id}
```
Manual wiring:

```python
from advanced_caching import HybridCache, InMemCache, RedisCache

l1 = InMemCache()
l2 = RedisCache(client, prefix="app:")

# l2_ttl defaults to l1_ttl * 2 if not specified
hybrid = HybridCache(l1_cache=l1, l2_cache=l2, l1_ttl=60)

# Explicit l2_ttl for longer L2 persistence
hybrid_long_l2 = HybridCache(l1_cache=l1, l2_cache=l2, l1_ttl=60, l2_ttl=3600)
```
TTL behavior:

- `l1_ttl`: how long data stays in the fast L1 memory cache
- `l2_ttl`: how long data persists in L2 (Redis); defaults to `l1_ttl * 2`
- When data expires from L1 but still exists in L2, it is automatically repopulated into L1
With BGCache using a lambda factory

For lazy initialization (e.g., a deferred Redis connection):

```python
from advanced_caching import BGCache, HybridCache, InMemCache, RedisCache

def get_redis_cache():
    """Lazy Redis connection factory."""
    import redis
    client = redis.Redis(host="localhost", port=6379)
    return RedisCache(client, prefix="app:")

@BGCache.register_loader(
    "config_map",
    interval_seconds=3600,
    run_immediately=True,
    cache=lambda: HybridCache(
        l1_cache=InMemCache(),
        l2_cache=get_redis_cache(),
        l1_ttl=3600,
        l2_ttl=86400,  # L2 persists longer than L1
    ),
)
def load_config_map() -> dict[str, dict]:
    return {"db": {"host": "localhost"}, "cache": {"ttl": 300}}

# Access nested data
db_host = load_config_map().get("db", {}).get("host")
```
### ChainCache (multi-level)

```python
from advanced_caching import InMemCache, RedisCache, S3Cache, ChainCache

chain = ChainCache([
    (InMemCache(), 60),                  # L1 fast
    (RedisCache(redis_client), 300),     # L2 distributed
    (S3Cache(bucket="my-cache"), 3600),  # L3 durable
])

# Write-through all levels (per-level TTL caps applied)
chain.set("user:123", {"name": "Ana"}, ttl=900)

# Read-through with promotion to faster levels
user = chain.get("user:123")
```
Notes:

- Provide per-level TTL caps in the tuple; if `None`, the passed `ttl` is used.
- `set_if_not_exists` delegates atomicity to the deepest level and backfills upper levels on success.
- `get`/`get_entry` promote hits upward for hotter reads.
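The promotion behavior in the notes can be sketched with plain dicts standing in for the levels. This is illustrative only; TTL caps and serialization are omitted:

```python
class MiniChain:
    """Toy multi-level chain showing read-through with promotion (illustrative only)."""

    def __init__(self, levels: int):
        self.levels: list[dict] = [{} for _ in range(levels)]  # index 0 is fastest

    def set(self, key, value):
        # Write-through every level
        for level in self.levels:
            level[key] = value

    def get(self, key):
        for i, level in enumerate(self.levels):
            if key in level:
                # Promote the hit into all faster levels above it
                for upper in self.levels[:i]:
                    upper[key] = level[key]
                return level[key]
        return None
```

After a hit in a slow level, subsequent reads of the same key are served from the fastest level.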
### Object Storage Backends (S3/GCS)

Store large cached objects cheaply in AWS S3 or Google Cloud Storage. Supports compression and metadata-based TTL checks to minimize costs. See the full documentation for best practices.

```python
from advanced_caching import S3Cache, GCSCache

user_cache = S3Cache(
    bucket="my-cache-bucket",
    prefix="users/",
    serializer="json",
    compress=True,
    dedupe_writes=True,  # optional: skip uploads when content is unchanged (adds a HEAD request)
)

gcs_cache = GCSCache(
    bucket="my-cache-bucket",
    prefix="users/",
    serializer="json",
    compress=True,
    dedupe_writes=True,  # optional: skip uploads when content is unchanged (adds a metadata check)
)
```
### RedisCache dedupe_writes

`RedisCache(..., dedupe_writes=True)` compares the serialized payload to the stored value; if unchanged, it skips rewriting and only refreshes the TTL when one is provided.
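The underlying idea is simple to sketch with an in-memory stand-in. This is illustrative only; the real backend compares against the value stored in Redis and still refreshes the TTL:

```python
import pickle

class DedupeStore:
    """Toy dedupe_writes sketch: compare the serialized payload with what is
    stored and skip the rewrite when it is identical (illustrative only)."""

    def __init__(self):
        self._blobs: dict[str, bytes] = {}
        self.writes = 0  # counts actual rewrites, for demonstration

    def set(self, key, value) -> bool:
        blob = pickle.dumps(value)
        if self._blobs.get(key) == blob:
            # Unchanged content: skip the write (a real backend would still refresh TTL)
            return False
        self._blobs[key] = blob
        self.writes += 1
        return True
```

The win is on hot keys whose value rarely changes: repeated `set` calls stop paying the write cost.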
### LocalFileCache (filesystem)

```python
from advanced_caching import LocalFileCache

cache = LocalFileCache("/var/tmp/ac-cache", dedupe_writes=True)
cache.set("user:123", {"name": "Ana"}, ttl=300)
user = cache.get("user:123")
```

Notes: one file per key; atomic writes; optional compression and dedupe to skip rewriting identical content.
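The atomic-write note typically refers to the write-then-rename pattern. Here is a generic sketch of that pattern, not the library's code; `atomic_write_json` is a hypothetical helper:

```python
import json
import os
import tempfile

def atomic_write_json(path: str, value) -> None:
    """Write to a temp file, then rename over the target, so readers never
    observe a partially written file (generic pattern, illustrative only)."""
    directory = os.path.dirname(path) or "."
    # Create the temp file in the same directory so the rename stays on one filesystem
    fd, tmp = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(value, f)
        os.replace(tmp, path)  # atomic rename on POSIX and Windows
    except BaseException:
        os.unlink(tmp)
        raise
```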
### Custom Storage

Implement your own storage backend by following the CacheStorage protocol:

```python
from typing import Any
from advanced_caching import CacheStorage, CacheEntry

class MyCustomStorage:
    """Custom cache storage implementation."""

    def get(self, key: str) -> Any | None:
        """Retrieve value by key, or None if not found/expired."""
        ...

    def get_entry(self, key: str) -> CacheEntry | None:
        """Retrieve the full cache entry with metadata."""
        ...

    def set(self, key: str, value: Any, ttl: int | None = None) -> None:
        """Store a value with an optional TTL in seconds."""
        ...

    def set_if_not_exists(self, key: str, value: Any, ttl: int | None = None) -> bool:
        """Atomic set-if-not-exists. Returns True if set, False if the key exists."""
        ...

    def delete(self, key: str) -> None:
        """Remove a key from storage."""
        ...

    def exists(self, key: str) -> bool:
        """Check whether a key exists and is not expired."""
        ...

# Validate the implementation
from advanced_caching import validate_cache_storage
validate_cache_storage(MyCustomStorage())

# Use it with decorators
@TTLCache.cached("user:{id}", ttl=60, cache=MyCustomStorage())
def get_user(id: int):
    return {"id": id}
```
Exposing metrics: to track cache operations in your custom storage, wrap it with InstrumentedStorage:

```python
from advanced_caching.storage import InstrumentedStorage
from advanced_caching.metrics import InMemoryMetrics

# Create a metrics collector
metrics = InMemoryMetrics()

# Wrap your custom storage
instrumented = InstrumentedStorage(
    storage=MyCustomStorage(),
    metrics=metrics,
    cache_name="my_custom_cache",
)

# Use the instrumented storage
@TTLCache.cached("user:{id}", ttl=60, cache=instrumented)
def get_user(id: int):
    return {"id": id}

# Query metrics
stats = metrics.get_stats()
# Includes: hits, misses, latency, errors, memory usage for "my_custom_cache"
```
InstrumentedStorage automatically tracks:
- All cache operations (get, set, delete)
- Operation latency (p50/p95/p99 percentiles)
- Errors with exception types
- Memory usage (if your storage supports it)
See Metrics Documentation for details.
## BGCache (Background)

Single-writer/multi-reader pattern with background refresh and optional independent reader caches.

```python
from advanced_caching import BGCache, InMemCache

# Writer: enforced single registration per key; refreshes the cache on a schedule
@BGCache.register_writer(
    "daily_config",
    interval_seconds=300,   # refresh every 5 minutes
    ttl=None,               # defaults to interval * 2
    run_immediately=True,
    cache=InMemCache(),     # or RedisCache / ChainCache
)
def load_config():
    return expensive_fetch()

# Readers: read-only; keep a local cache warm by pulling from the writer's cache
reader = BGCache.get_reader(
    "daily_config",
    interval_seconds=60,    # periodically pull from the source cache into the local cache
    ttl=None,               # local cache TTL defaults to interval * 2
    run_immediately=True,
    cache=InMemCache(),     # local cache for this process
)

# Usage
cfg = reader()  # returns the value from the local cache; on a miss, pulls once from the source cache
```
Notes:

- `register_writer` enforces one writer per key globally; it raises on a duplicate registration.
- `interval_seconds <= 0` disables scheduling; the wrapper still writes on demand on misses.
- `run_immediately=True` triggers an initial refresh if the cache is empty.
- `get_reader` creates a read-only accessor backed by its own cache; it pulls from the provided cache (usually the writer's cache) and optionally keeps it warm on a schedule.
- Use `cache=` on readers to override the local cache backend (e.g., InMemCache in each process) while sourcing data from the writer's cache backend.
See docs/bgcache.md for a production-grade example with Redis/ChainCache, error handling, and reader-local caches.
## API Reference

Decorators:

- `TTLCache.cached(key, ttl, cache=None)`
- `SWRCache.cached(key, ttl, stale_ttl=0, cache=None)`
- `BGCache.register_loader(key, interval_seconds, ttl=None, run_immediately=True)`

Storages:

- `InMemCache()`
- `RedisCache(redis_client, prefix="", serializer="pickle"|"json"|custom)`
- `HybridCache(l1_cache, l2_cache, l1_ttl=60, l2_ttl=None)` (`l2_ttl` defaults to `l1_ttl * 2`)

Utilities:

- `CacheEntry`
- `CacheStorage`
- `validate_cache_storage()`
## Testing & Benchmarks

```bash
uv run pytest -q
uv run python tests/benchmark.py
```
## Use Cases
- Web & API caching (FastAPI, Flask, Django)
- Database query caching
- SWR for upstream APIs
- Background refresh for configs & datasets
- Distributed caching with Redis
- Hybrid L1/L2 hot-path optimization
## Comparison
| Feature | advanced-caching | lru_cache | cachetools | Redis | Memcached |
|---|---|---|---|---|---|
| TTL | ✅ | ❌ | ✅ | ✅ | ✅ |
| SWR | ✅ | ❌ | ❌ | Manual | Manual |
| Background refresh | ✅ | ❌ | ❌ | Manual | Manual |
| Custom backends | ✅ (InMem/Redis/S3/GCS/Chain) | ❌ | ❌ | N/A | N/A |
| Distributed | ✅ (Redis, ChainCache) | ❌ | ❌ | ✅ | ✅ |
| Multi-level chain | ✅ (ChainCache) | ❌ | ❌ | Manual | Manual |
| Dedupe writes | ✅ (Redis/S3/GCS opt-in) | ❌ | ❌ | Manual | Manual |
| Async support | ✅ | ❌ | ❌ | ✅ | ✅ |
| Type hints | ✅ | ✅ | ✅ | ❌ | ❌ |
## Contributing

- Fork the repo
- Create a feature branch
- Add tests
- Run `uv run pytest`
- Open a pull request
## License
MIT License – see LICENSE.