A powerful sync/async caching library for Python with tag-based invalidation and Redis backend.

Cachine

A simple, powerful caching library for Python that makes your applications faster.

Stop waiting for slow API calls, database queries, and expensive computations. Cache the results and reuse them instantly.

Why Cachine?

  • ⚡ 2-10x faster responses - Cache expensive operations and serve results in microseconds
  • 🎯 Dead simple - One decorator is all you need: @cached(cache, ttl=60)
  • 🔧 Start small, scale big - Begin with in-memory, upgrade to Redis when you need distributed caching
  • 🚀 Production-ready - Encryption, compression, metrics, clustering, and high availability out of the box
  • 🐍 Modern Python - Full type hints, sync & async support, Python 3.9+

Perfect for: API response caching, database query caching, expensive computations, rate limiting, session storage.

Backends: In-memory (fastest, non-persistent) • SQLite (persistent, zero-config) • Redis (distributed, HA).



What is Caching?

Caching stores the results of expensive operations so you don't have to repeat them.

Without caching:

def get_user(user_id):
    return database.query(f"SELECT * FROM users WHERE id={user_id}")  # 50ms every time ❌

With caching:

@cached(cache, ttl=300)
def get_user(user_id):
    return database.query(f"SELECT * FROM users WHERE id={user_id}")  # 50ms first time, <1ms after ✅

First call: fetches from the database (50ms). Cached calls: return from the cache (<1ms), about 50x faster!


Installation

Basic installation (in-memory caching):

pip install cachine

With Redis (for distributed caching):

pip install cachine redis

With SQLite (sync uses the stdlib sqlite3, no extra install needed; async needs aiosqlite):

pip install 'cachine[sqlite]'   # installs aiosqlite for AsyncSQLiteCache

Optional extras:

pip install msgpack       # Fast binary serialization
pip install cryptography  # Encryption middleware

Requirements: Python 3.9+, redis-py 4.0+ (optional), aiosqlite 0.19+ (optional, async SQLite)


Quickstart

Step 1: Your First Cache

Start with simple get/set operations:

from cachine import InMemoryCache

# Create a cache
cache = InMemoryCache()

# Store a value (expires after 60 seconds)
cache.set("user:123", {"name": "Alice", "role": "admin"}, ttl=60)

# Retrieve it
user = cache.get("user:123")
print(user)  # {'name': 'Alice', 'role': 'admin'}

# Check if it exists
if cache.exists("user:123"):
    print("User is cached!")

# Remove it
cache.delete("user:123")

Step 2: Cache Expensive Functions

The @cached decorator automatically caches function results:

from cachine import InMemoryCache
from cachine.decorators import cached
import time

cache = InMemoryCache()

@cached(cache=cache, ttl=60)
def expensive_computation(x):
    print(f"Computing {x}...")
    time.sleep(2)  # Simulate slow operation
    return x * 2

# First call: takes 2 seconds
result = expensive_computation(21)  # Prints "Computing 21..." and waits
# => 42

# Subsequent calls: instant! (returns from cache)
result = expensive_computation(21)  # Returns immediately
# => 42 (no print, no wait)

Step 3: Scale to Redis

Share cache across multiple servers. Pick one of three construction styles:

from cachine import RedisCache
from cachine.decorators import cached
from cachine.serializers import JSONSerializer

# (1) Kwargs โ€” simplest for a single host
cache = RedisCache(host="localhost", port=6379, namespace="myapp", serializer=JSONSerializer())

# (2) URL โ€” great when the connection string comes from an env var
# cache = RedisCache.from_url("redis://localhost:6379/0", namespace="myapp", serializer=JSONSerializer())

# (3) Config object โ€” for cluster / sentinel / advanced tuning (see "Choosing the Right Backend")

@cached(cache=cache, ttl=300)  # Cache for 5 minutes
def get_user(user_id):
    # Now cached across ALL your servers!
    return database.query_user(user_id)

user = get_user(123)

Step 4: Async Support

Full async/await support. Every backend has an async twin:

import asyncio
from cachine import AsyncRedisCache
from cachine.decorators import cached

cache = AsyncRedisCache(host="localhost", port=6379, namespace="myapp")
# or: AsyncRedisCache.from_url("redis://localhost:6379/0", namespace="myapp")

@cached(cache=cache, ttl=60)
async def fetch_data(item_id):
    await asyncio.sleep(1)  # pretend this is an HTTP call
    return {"id": item_id, "data": "..."}

async def main():
    result = await fetch_data(1)  # Slow first time
    result = await fetch_data(1)  # Fast from cache

asyncio.run(main())

Common Use Cases

๐ŸŒ Caching API Responses

import requests

@cached(cache=cache, ttl=300)  # Cache for 5 minutes
def fetch_weather(city):
    response = requests.get(f"https://api.weather.com/forecast/{city}")
    return response.json()

# First call: hits the API (slow)
weather = fetch_weather("London")

# Next 5 minutes: instant responses from cache
weather = fetch_weather("London")  # ⚡ Fast!

๐Ÿ—„๏ธ Caching Database Queries

from sqlalchemy.orm import Session

@cached(cache=cache, ttl=3600, tags=lambda user_id: [f"user:{user_id}"])
def get_user_profile(user_id: int):
    db: Session = get_db()
    user = db.query(User).filter_by(id=user_id).first()
    return user.to_dict() if user else None

# Cache for 1 hour, invalidate with tags
profile = get_user_profile(123)

# When user updates, invalidate their cache
cache.invalidate_tags(["user:123"])

🔄 Preventing Duplicate Work (Singleflight)

When many requests come in at once, only compute once:

@cached(cache=cache, ttl=60, singleflight=True)
def generate_report():
    # Only ONE server generates this, even if 1000 requests come in
    # Others wait and get the same result
    time.sleep(10)  # Expensive operation
    return create_monthly_report()

# 1000 concurrent requests = 1 computation

📊 Rate Limiting with Counters

from datetime import timedelta

def check_rate_limit(user_id: str, max_requests: int = 100):
    key = f"ratelimit:{user_id}"

    # Increment counter, set TTL on first request
    count = cache.incr(key, delta=1, ttl_if_new=60)

    if count > max_requests:
        raise Exception(f"Rate limit exceeded: {count}/{max_requests}")

    return count

# Allow 100 requests per minute per user
check_rate_limit("user123", max_requests=100)

🔖 Tag-Based Invalidation

Invalidate related cache entries together:

@cached(
    cache=cache,
    ttl=3600,
    tags=lambda user_id: ["users", f"user:{user_id}"]
)
def get_user(user_id):
    return fetch_from_db(user_id)

@cached(
    cache=cache,
    ttl=3600,
    tags=lambda user_id: ["users", f"user:{user_id}"]
)
def get_user_orders(user_id):
    return fetch_orders_from_db(user_id)

# Both functions tagged with "users" and "user:123"
user = get_user(123)
orders = get_user_orders(123)

# Invalidate ALL user-related cache at once
removed = cache.invalidate_tags(["user:123"])
# Both get_user(123) and get_user_orders(123) are now cleared

Choosing the Right Backend

📦 InMemoryCache

When to use:

  • ✅ Single server/process application
  • ✅ Maximum performance needed (<1μs access)
  • ✅ Cache can be lost on restart (transient data)
  • ✅ Limited memory usage

When NOT to use:

  • โŒ Multiple servers need to share cache
  • โŒ Cache must survive restarts
  • โŒ Cache size > available RAM

Example:

from cachine import InMemoryCache
from cachine.strategies import LRUEviction

cache = InMemoryCache(
    max_size=10000,           # Limit to 10k entries
    eviction_policy=LRUEviction(),  # Evict least-recently-used
    namespace="myapp"         # Prefix all keys
)

cache.set("key", "value", ttl=300)

💾 SQLiteCache (Persistent, Zero-Config)

Persistent cache stored in a single file; it fills the gap between InMemoryCache (non-persistent) and RedisCache (needs a server).

When to use:

  • ✅ CLI tools, desktop apps, Jupyter notebooks
  • ✅ Need cache to survive restarts, without running Redis
  • ✅ Single-VM services, dev/test environments
  • ✅ Concurrent readers/writers in the same process (WAL mode by default)

When NOT to use:

  • โŒ Cache must be shared across machines (use Redis)
  • โŒ Very high write contention across many processes (WAL helps, but Redis scales better)

Example (sync):

from cachine import SQLiteCache
from cachine.serializers import JSONSerializer

cache = SQLiteCache(
    database="/tmp/cache.db",       # or ":memory:" for ephemeral
    namespace="myapp",
    serializer=JSONSerializer(),
)

cache.set("user:1", {"id": 1, "name": "Alice"}, ttl=300)
cache.get("user:1")  # {'id': 1, 'name': 'Alice'}

# URL form also works:
# cache = SQLiteCache.from_url("sqlite:///tmp/cache.db", namespace="myapp")

Example (async, requires aiosqlite):

from cachine import AsyncSQLiteCache

cache = AsyncSQLiteCache(database="/tmp/cache.db", namespace="myapp")

await cache.set("k", b"v", ttl=60)
await cache.get("k")  # b'v'

Built-in features:

  • WAL journal mode for concurrent readers/writers
  • Atomic counters via BEGIN IMMEDIATE transactions (incr / decr with ttl_if_new)
  • Tag-based invalidation (add_tags / invalidate_tags)
  • Lazy TTL expiration on read
  • Namespace isolation (multiple caches can share the same file)
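
The atomic-counter feature can be illustrated with the stdlib sqlite3 module directly. This is a concept sketch only: the table name and schema below are hypothetical, not Cachine's actual internals.

```python
import sqlite3

# Illustrative schema; Cachine's real table layout may differ.
conn = sqlite3.connect(":memory:", isolation_level=None)  # autocommit; we manage transactions
conn.execute("CREATE TABLE counters (key TEXT PRIMARY KEY, value INTEGER)")

def incr(key: str, delta: int = 1) -> int:
    # BEGIN IMMEDIATE takes the write lock up front, so concurrent
    # writers serialize instead of racing on read-modify-write.
    conn.execute("BEGIN IMMEDIATE")
    try:
        conn.execute(
            "INSERT INTO counters (key, value) VALUES (?, ?) "
            "ON CONFLICT(key) DO UPDATE SET value = value + excluded.value",
            (key, delta),
        )
        (value,) = conn.execute(
            "SELECT value FROM counters WHERE key = ?", (key,)
        ).fetchone()
        conn.execute("COMMIT")
        return value
    except Exception:
        conn.execute("ROLLBACK")
        raise

print(incr("hits"))      # 1
print(incr("hits", 4))   # 5
```

The upsert plus explicit write transaction is what makes the increment atomic even with multiple connections to the same file.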

🔴 RedisCache (Single Instance)

When to use:

  • ✅ Multiple servers need shared cache
  • ✅ Cache should survive restarts
  • ✅ Simple deployment (single Redis server)

When NOT to use:

  • โŒ Need high availability (use Sentinel)
  • โŒ Cache size > single server RAM (use Cluster)

Example:

from cachine import RedisCache
from cachine.serializers import JSONSerializer

cache = RedisCache(
    host="localhost",
    port=6379,
    db=0,
    password="your-password",  # Optional
    namespace="myapp",
    serializer=JSONSerializer()
)

🔴🔴🔴 Redis Cluster

When to use:

  • ✅ Need horizontal scaling
  • ✅ High availability required
  • ✅ Cache size > single server RAM
  • ✅ Production workloads

Example:

from cachine.backends.redis import RedisCache
from cachine.models.redis_config import RedisClusterConfig, RedisNodeConfig

config = RedisClusterConfig(
    nodes=[
        RedisNodeConfig(host="redis1.example.com", port=7000),
        RedisNodeConfig(host="redis2.example.com", port=7001),
        RedisNodeConfig(host="redis3.example.com", port=7002),
    ]
)

cache = RedisCache(config, namespace="myapp")

๐Ÿ›ก๏ธ Redis Sentinel (High Availability)

When to use:

  • ✅ Need automatic failover
  • ✅ Master goes down → automatic promotion
  • ✅ Production reliability critical

Example:

from cachine.backends.redis import RedisCache
from cachine.models.redis_config import RedisSentinelConfig

config = RedisSentinelConfig(
    service_name="mymaster",
    sentinels=[
        ("sentinel1.example.com", 26379),
        ("sentinel2.example.com", 26379),
        ("sentinel3.example.com", 26379),
    ]
)

cache = RedisCache(config, namespace="myapp")

Advanced Decorator Features

Custom Key Generation

Control exactly how cache keys are created:

from cachine.decorators.cached import KeyContext

def my_key_builder(ctx: KeyContext, user_id: int, include_details: bool = False):
    # ctx.full_name = "mymodule.myfunction"
    # ctx.version = "v2" (if specified)
    return f"user:{user_id}:details={include_details}:version={ctx.version}"

@cached(
    cache=cache,
    ttl=300,
    key_builder=my_key_builder,
    version="v2"  # Change version to invalidate all old cache
)
def get_user(user_id: int, include_details: bool = False):
    return fetch_user_data(user_id, include_details)

Stale-While-Revalidate (SWR)

Serve stale data while refreshing in background:

@cached(
    cache=cache,
    ttl=60,         # Fresh for 60 seconds
    stale_ttl=120   # Serve stale for additional 60s while refreshing
)
def get_dashboard_data():
    # Users ALWAYS get fast response:
    # - Within 60s: fresh data
    # - 60-120s: stale data + background refresh started
    # - After 120s: cache miss, compute new
    return expensive_dashboard_computation()

Conditional Caching

Cache only when certain conditions are met:

@cached(
    cache=cache,
    ttl=300,
    condition=lambda result: result is not None and result.get("status") == "success"
)
def fetch_api_data(endpoint):
    response = requests.get(endpoint)
    # Only cache successful responses
    return response.json()

Don't Cache None

@cached(cache=cache, ttl=60, cache_none=False)
def find_user(email):
    user = database.find_by_email(email)
    return user  # None is NOT cached, forces fresh lookup

Add Jitter to Prevent Stampedes

@cached(
    cache=cache,
    ttl=60,
    jitter=10  # Adds 0-10 seconds randomly to TTL
)
def popular_data():
    # If 1000 entries expire at the same time → 1000 cache misses
    # With jitter: they expire at different times
    return expensive_operation()

Middleware (Optional Power Features)

Add compression, encryption, or metrics by wrapping your cache:

📊 Track Cache Performance

from cachine import CacheBuilder, InMemoryCache
from cachine.middleware import MetricsMiddleware

# Build an in-memory cache wrapped with metrics middleware
cache = (
    CacheBuilder(InMemoryCache())
    .add_middleware(MetricsMiddleware)
    .build()
)

# Use cache normally
cache.set("key1", "value1")
cache.get("key1")
cache.get("key2")  # Miss

# Check performance
stats = cache.get_stats()
print(stats)
# {
#   'hits': 1,
#   'misses': 1,
#   'hit_rate': 0.5,
#   'errors': 0,
#   'avg_latency_ms': 0.023
# }

Async usage:

from cachine import AsyncCacheBuilder, AsyncRedisCache
from cachine.middleware import AsyncMetricsMiddleware

cache = (
    AsyncCacheBuilder(AsyncRedisCache.from_url("redis://localhost:6379/0"))
    .add_middleware(AsyncMetricsMiddleware)
    .build()
)

async def main():
    await cache.set("k", "v")
    await cache.get("k")
    await cache.get("missing", default=None)
    print(cache.get_stats())

๐Ÿ—œ๏ธ Compress Large Values

from cachine import CacheBuilder, InMemoryCache
from cachine.middleware import CompressionMiddleware
from cachine.serializers import JSONSerializer

# Configure serializer on the base cache; middleware picks it up automatically.
cache = (
    CacheBuilder(InMemoryCache(serializer=JSONSerializer()))
    .add_middleware(CompressionMiddleware, algorithm="gzip", min_size=1024)
    .build()
)

# Large values automatically compressed
large_json = {"data": "x" * 10000}
cache.set("big_data", large_json)

# Automatically decompressed on get
result = cache.get("big_data")

๐Ÿ” Encrypt Sensitive Data

from cachine import CacheBuilder, InMemoryCache
from cachine.middleware import EncryptionMiddleware

cache = (
    CacheBuilder(InMemoryCache())
    .add_middleware(
        EncryptionMiddleware,
        key="your-32-character-secret-key!!",  # Keep this secret!
        key_id="v1",  # For key rotation
    )
    .build()
)

# Data encrypted at rest
cache.set("user_ssn", "123-45-6789")
cache.set("api_key", "secret-api-key-xyz")

# Automatically decrypted on get
ssn = cache.get("user_ssn")  # "123-45-6789"

🛟 Fail-Open (Keep Running if Redis Fails)

Ensure your app still works when Redis is down. Wrap caches with a fail-open middleware; reads return defaults and writes become no-ops during outages.

Sync:

from cachine import CacheBuilder, RedisCache
from cachine.decorators import cached
from cachine.middleware import FailOpenMiddleware

cache = (
    CacheBuilder(RedisCache.from_url("redis://localhost:6379/0", namespace="myapp"))
    .add_middleware(FailOpenMiddleware)
    .build()
)

@cached(cache=cache, ttl=60)
def compute(x):
    return x * 2  # still runs even if Redis errors

Async:

from cachine import AsyncCacheBuilder, AsyncRedisCache
from cachine.decorators import cached
from cachine.middleware import AsyncFailOpenMiddleware

cache = (
    AsyncCacheBuilder(AsyncRedisCache.from_url("redis://localhost:6379/0", namespace="myapp"))
    .add_middleware(AsyncFailOpenMiddleware)
    .build()
)

@cached(cache=cache, ttl=60)
async def fetch(uid):
    return {"id": uid}

🔗 Stack Multiple Middleware

Order matters! Stack from inside-out. Builders let you add multiple middleware layers clearly:

from cachine import CacheBuilder, InMemoryCache
from cachine.middleware import CompressionMiddleware, EncryptionMiddleware, MetricsMiddleware
from cachine.serializers import JSONSerializer

cache = (
    CacheBuilder(InMemoryCache(namespace="secure", serializer=JSONSerializer()))
    # Layer 1 (inner): Compress before encryption
    .add_middleware(CompressionMiddleware, algorithm="gzip", min_size=128)
    # Layer 2: Encrypt compressed data
    .add_middleware(EncryptionMiddleware, key="your-secret-key-here!!", key_id="v1")
    # Layer 3 (outer): Metrics tracks everything
    .add_middleware(MetricsMiddleware)
    .build()
)

# Now you have: Metrics → Encryption → Compression → InMemory
cache.set("sensitive_data", {"secret": "data"})
value = cache.get("sensitive_data")
print(cache.get_stats())  # See metrics

🧱 Build Caches Fluently (Builder)

CacheBuilder and AsyncCacheBuilder compose middleware chains. Pass the backend (sync or async) as the constructor argument and stack layers:

Sync:

from cachine import CacheBuilder, RedisCache
from cachine.middleware import MetricsMiddleware

cache = (
    CacheBuilder(RedisCache.from_url("redis://localhost:6379/0", namespace="myapp"))
    .add_middleware(MetricsMiddleware)  # first added = inner; last = outer
    .build()
)

Async:

from cachine import AsyncCacheBuilder, AsyncRedisCache
from cachine.middleware import AsyncMetricsMiddleware, MetricsMiddleware

# Sync middleware classes are auto-mapped to their async counterparts
# when the builder recognises them (Metrics, FailOpen).
acache = (
    AsyncCacheBuilder(AsyncRedisCache.from_url("redis://localhost:6379/0", namespace="myapp"))
    .add_middleware(MetricsMiddleware)        # mapped to AsyncMetricsMiddleware
    # .add_middleware(AsyncMetricsMiddleware) # explicit async class also works
    .build()
)

Use the builder lazily by passing a factory:

from cachine import CacheBuilder, RedisCache
from cachine.decorators import cached

def _factory():
    return RedisCache.from_url("redis://localhost:6379/0")

builder = CacheBuilder(_factory)

@cached(cache=builder.as_factory(), ttl=60)
def compute(x):
    return x * 2

Serializers

Convert Python objects to bytes for Redis storage:

from cachine import RedisCache
from cachine.serializers import JSONSerializer, MsgPackSerializer, PickleSerializer

# JSON: Safe, human-readable, limited types
cache = RedisCache(host="localhost", serializer=JSONSerializer())
cache.set("data", {"a": 1, "b": [2, 3]})

# MsgPack: Fast, compact, binary
cache = RedisCache(host="localhost", serializer=MsgPackSerializer())
cache.set("data", {"complex": "object"})

# Pickle: All Python types, but UNSAFE for untrusted data
cache = RedisCache(host="localhost", serializer=PickleSerializer())
cache.set("data", {"any": "python object"})

Comparison:

Serializer  Speed   Size    Safe?   Types Supported
JSON        Medium  Large   ✅ Yes   Basic (dict, list, str, int, float, bool, None)
MsgPack     Fast    Small   ✅ Yes   Similar to JSON, plus bytes and datetime
Pickle      Medium  Medium  ❌ No*   All Python objects

*Pickle can execute arbitrary code during deserialization. Only use with trusted data.


Configuration & Factory

Constructing from URLs

Each backend exposes a from_url classmethod; use it when your connection string comes from an env var or secret manager. The URL scheme uniquely identifies the backend, so code stays grep-able:

from cachine import RedisCache, AsyncRedisCache, SQLiteCache, AsyncSQLiteCache

# Sync Redis
cache = RedisCache.from_url("redis://localhost:6379/0", namespace="myapp")

# Async Redis
async_cache = AsyncRedisCache.from_url("redis://localhost:6379/0", namespace="myapp")

# With authentication
cache = RedisCache.from_url("redis://user:password@localhost:6379/0", namespace="myapp")

# With SSL/TLS
cache = RedisCache.from_url("rediss://localhost:6379/0", namespace="myapp")

# Redis Cluster
cache = RedisCache.from_url("redis://node1:7000,node2:7001,node3:7002", namespace="myapp")

# Redis Sentinel
cache = RedisCache.from_url(
    "redis+sentinel://mymaster/0?sentinels=s1:26379,s2:26379",
    namespace="myapp",
)

# SQLite (file-backed, persistent)
cache = SQLiteCache.from_url("sqlite:///tmp/cache.db", namespace="myapp")

# SQLite (in-memory, ephemeral)
cache = SQLiteCache.from_url("sqlite:///:memory:", namespace="myapp")

# SQLite with tuning parameters
cache = SQLiteCache.from_url(
    "sqlite:///tmp/cache.db?timeout=10&busy_timeout=3000&journal_mode=WAL&synchronous=NORMAL",
    namespace="myapp",
)

# Async SQLite mirrors the sync API
async_cache = AsyncSQLiteCache.from_url("sqlite:///tmp/cache.db", namespace="myapp")

URL Parameters

Configure connection behaviour via URL query parameters:

from cachine import RedisCache

# Timeout configuration
cache = RedisCache.from_url(
    "redis://localhost:6379/0?"
    "socket_timeout=5.0&"              # Read/write timeout (seconds)
    "socket_connect_timeout=2.0&"      # Initial connection timeout
    "retry_on_timeout=true&"           # Retry on timeout
    "decode_responses=true",           # Decode Redis responses to str
    namespace="myapp",
)

# Cluster with SSL and timeouts
cache = RedisCache.from_url(
    "rediss://user:pass@node1:7000,node2:7001?"
    "socket_timeout=10&"
    "retry_on_timeout=1",
    namespace="myapp",
)

Glossary

Key Terms

Cache Hit: When requested data is found in cache (fast ✅)

Cache Miss: When requested data is NOT in cache and must be fetched from the source (slow ❌)

TTL (Time To Live): How long cached data stays fresh before expiring (in seconds)

Namespace: Prefix for all cache keys to prevent collisions (e.g., "prod:" vs "dev:")

Advanced Terms

Cache-Aside Pattern: Your code checks cache first, fetches from source on miss, then stores in cache
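
In code, the pattern is just three steps; a minimal sketch with a plain dict standing in for the cache backend:

```python
cache = {}  # stands in for any cache backend

def load_user(user_id):  # pretend this is the slow source of truth
    return {"id": user_id, "name": f"user-{user_id}"}

def get_user(user_id):
    key = f"user:{user_id}"
    value = cache.get(key)          # 1. check the cache first
    if value is None:
        value = load_user(user_id)  # 2. on a miss, fetch from the source
        cache[key] = value          # 3. store it for the next caller
    return value

get_user(1)  # miss: loads from the source, then caches
get_user(1)  # hit: served straight from the dict
```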

Singleflight: When multiple requests arrive for the same uncached key, only one computation runs. Others wait and share the result. Prevents "thundering herd".
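
A process-local sketch of the idea using a per-key lock (Cachine's actual implementation may differ):

```python
import threading

_results = {}
_locks = {}
_locks_guard = threading.Lock()
calls = 0

def expensive(key):
    global calls
    calls += 1          # count how many times we actually compute
    return key.upper()

def singleflight(key):
    with _locks_guard:  # one lock object per key
        lock = _locks.setdefault(key, threading.Lock())
    with lock:          # only the first thread computes...
        if key not in _results:
            _results[key] = expensive(key)
        return _results[key]  # ...the rest wait and reuse its result

threads = [threading.Thread(target=singleflight, args=("report",)) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(calls)  # 1 — ten concurrent callers, one computation
```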

Stale-While-Revalidate (SWR): Serve slightly old cached data while refreshing it in the background. Users always get fast responses.

Jitter: Random delay (0 to N seconds) added to TTL so cache entries don't all expire at the exact same time.
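
The computation behind this is tiny; a sketch of how a jittered expiry could be derived (the helper name is illustrative):

```python
import random

def jittered_ttl(ttl: int, jitter: int) -> float:
    # Spread expirations across [ttl, ttl + jitter] so entries written
    # together don't all expire in the same instant.
    return ttl + random.uniform(0, jitter)

ttls = [jittered_ttl(60, 10) for _ in range(5)]  # five values between 60 and 70
```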

Tag-Based Invalidation: Group related cache entries with tags (like "user:123" or "products"), then invalidate all entries with a tag at once.

Eviction Policy: When cache is full, which entries to remove? LRU = remove least-recently-used, LFU = remove least-frequently-used.
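
LRU is commonly built on an ordered map; a stdlib sketch of the policy (not Cachine's implementation):

```python
from collections import OrderedDict

class LRU:
    def __init__(self, max_size: int):
        self.max_size = max_size
        self.data = OrderedDict()

    def get(self, key):
        if key not in self.data:
            return None
        self.data.move_to_end(key)         # mark as most-recently-used
        return self.data[key]

    def set(self, key, value):
        self.data[key] = value
        self.data.move_to_end(key)
        if len(self.data) > self.max_size:
            self.data.popitem(last=False)  # evict the least-recently-used

lru = LRU(2)
lru.set("a", 1)
lru.set("b", 2)
lru.get("a")           # "a" is now most recent
lru.set("c", 3)        # evicts "b", the least-recently-used
print(list(lru.data))  # ['a', 'c']
```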

Serialization: Converting Python objects to bytes for storage (and back). Required for Redis.
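
For example, a JSON serializer is just an encode/decode pair over bytes (hypothetical helper names, mirroring what any serializer must do):

```python
import json

def dumps(obj) -> bytes:
    return json.dumps(obj).encode("utf-8")  # object → bytes for storage

def loads(raw: bytes):
    return json.loads(raw.decode("utf-8"))  # bytes → object on retrieval

stored = dumps({"id": 1, "name": "Alice"})
print(type(stored))   # <class 'bytes'> — what the backend actually holds
print(loads(stored))  # {'id': 1, 'name': 'Alice'}
```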

Middleware: Wrapper that adds functionality (compression, encryption, metrics) without changing cache API.


Performance Tips

1. Prevent Thundering Herd

Problem: 1000 requests hit an expired cache at once → 1000 database queries

Solutions:

# Solution A: Add jitter
@cached(cache=cache, ttl=60, jitter=10)  # Expires between 60-70 seconds

# Solution B: Stale-while-revalidate
@cached(cache=cache, ttl=60, stale_ttl=120)  # Serve stale, refresh in background

# Solution C: Singleflight
@cached(cache=cache, ttl=60, singleflight=True)  # Only one computes

2. Choose Right Serializer

# Small data, human-readable: JSON
cache = RedisCache(serializer=JSONSerializer())

# Large data, need speed: MsgPack
cache = RedisCache(serializer=MsgPackSerializer())  # 2-3x faster than JSON

# Complex Python objects (trusted): Pickle
cache = RedisCache(serializer=PickleSerializer())  # Supports all Python types

3. Compress Only Large Data

from cachine import CacheBuilder
from cachine.middleware import CompressionMiddleware

cache = (
    CacheBuilder(base_cache)
    .add_middleware(CompressionMiddleware, algorithm="gzip", min_size=1024)  # Only compress > 1KB
    .build()
)

4. Set Appropriate TTL

# Frequently changing data: Short TTL
@cached(cache=cache, ttl=60)  # 1 minute

# Rarely changing data: Long TTL
@cached(cache=cache, ttl=86400)  # 24 hours

# Static data: Very long TTL
@cached(cache=cache, ttl=604800)  # 1 week

5. Use Namespaces

# Development
dev_cache = InMemoryCache(namespace="dev")

# Production
prod_cache = RedisCache(host="prod-redis", namespace="prod")

# Easy to clear: cache.clear() only affects your namespace

Troubleshooting

Redis Connection Issues

# Test connection
if cache.healthy():
    print("✅ Connected to Redis")
else:
    print("❌ Cannot connect to Redis")

# Full health check
health = cache.health()
print(health)  # {'healthy': True, 'latency_ms': 1.2, 'backend': 'redis'}

# Tune client timeouts (sync/async)
from cachine import RedisCache, AsyncRedisCache

rc = RedisCache(host="localhost", socket_timeout=2.5, socket_connect_timeout=1.0, retry_on_timeout=True)
arc = AsyncRedisCache(host="localhost", socket_timeout=2.5, socket_connect_timeout=1.0, retry_on_timeout=True)

Serialization Errors

Error: JSONDecodeError or PickleError

Solution: configure a single serializer at the cache level and don't switch serializers mid-flight. The old per-call serializer= kwarg on get/set is deprecated:

# โŒ Don't switch serializers for the same key
from cachine import RedisCache
from cachine.serializers import JSONSerializer, PickleSerializer

json_cache = RedisCache(host="localhost", serializer=JSONSerializer())
pickle_cache = RedisCache(host="localhost", serializer=PickleSerializer())

json_cache.set("key", data)
pickle_cache.get("key")  # Error: encoded with JSON, decoded with Pickle

# ✅ Pick one serializer per cache instance and stick with it
cache = RedisCache(host="localhost", serializer=JSONSerializer())
cache.set("key", data)
cache.get("key")

Missing Dependencies

# ModuleNotFoundError: No module named 'redis'
pip install redis

# ModuleNotFoundError: No module named 'cryptography'
pip install cryptography

# ModuleNotFoundError: No module named 'msgpack'
pip install msgpack

Cache Not Clearing

# Requires namespace OR all=True
cache = InMemoryCache(namespace="myapp")
cache.clear()  # ✅ Works: only clears keys in "myapp"

cache = InMemoryCache()  # No namespace
cache.clear()  # โŒ Raises error (safety check)
cache.clear(all=True)  # โœ… Clears EVERYTHING

API Reference

Cache Operations

# Get/Set
cache.get(key, default=None)
cache.set(key, value, ttl=None)
cache.delete(key)
cache.exists(key)
cache.clear()

# TTL Management
cache.ttl(key)                    # Get remaining TTL in seconds
cache.expire(key, ttl=60)         # Set new TTL (seconds or timedelta)
cache.expire_at(key, when=datetime)  # Set absolute expiration
cache.persist(key)                # Remove TTL (never expires)
cache.touch(key, ttl=None)        # Update last access time, optionally set TTL

# Counters
cache.incr(key, delta=1, ttl_if_new=None)
cache.decr(key, delta=1)

# Tags
cache.invalidate_tags(tags)       # Remove all entries with these tags
cache.add_tags(key, tags)         # Add tags to existing entry

# Utility
cache.get_or_set(key, factory, ttl=None)  # Get cached or compute & cache
cache.health()                    # Health status dict
cache.healthy()                   # Boolean shortcut
cache.close()                     # Close connections

Decorator Parameters

@cached(
    cache,                  # Cache instance (required)
    ttl=None,              # Seconds to cache (int or timedelta)
    key_builder=None,      # Custom key function
    condition=None,        # Cache only if condition(result) is True
    cache_none=False,      # Cache None results?
    jitter=None,           # Random 0-N seconds added to TTL
    stale_ttl=None,        # Stale-while-revalidate window
    singleflight=False,    # Prevent duplicate computations
    tags=None,             # Function to generate tags from args
    tags_from_result=None, # Function to generate tags from result
    version=None,          # Version string for cache busting
)

Examples & Recipes

Check out real-world examples:


Contributing

We welcome contributions! Please see CONTRIBUTING.md for:

  • Development setup
  • Running tests
  • Code style guidelines
  • Pull request process

License

Apache-2.0 OR MIT - choose whichever works best for your project.

See LICENSE-APACHE and LICENSE-MIT for details.


Acknowledgments

Built with โค๏ธ using:


Support & Community


Happy caching! 🚀
