
Cachine

A simple, powerful caching library for Python that makes your applications faster.

Stop waiting for slow API calls, database queries, and expensive computations. Cache the results and reuse them instantly.

Why Cachine?

  • ⚡ 2-10x faster responses - Cache expensive operations and serve results in microseconds
  • 🎯 Dead simple - One decorator is all you need: @cached(cache, ttl=60)
  • 🔧 Start small, scale big - Begin with in-memory, upgrade to Redis when you need distributed caching
  • 🚀 Production-ready - Encryption, compression, metrics, clustering, and high availability out of the box
  • 🐍 Modern Python - Full type hints, sync & async support, Python 3.9+

Perfect for: API response caching, database query caching, expensive computations, rate limiting, session storage.


What is Caching?

Caching stores the results of expensive operations so you don't have to repeat them.

Without caching:

def get_user(user_id):
    return database.query(f"SELECT * FROM users WHERE id={user_id}")  # 50ms every time ❌

With caching:

@cached(cache, ttl=300)
def get_user(user_id):
    return database.query(f"SELECT * FROM users WHERE id={user_id}")  # 50ms first time, <1ms after ✅

First call: fetches from the database (50ms).
Cached calls: return from the cache (<1ms) - 50x faster!


Installation

Basic installation (in-memory caching):

pip install cachine

With Redis (for distributed caching):

pip install cachine redis

Optional extras:

pip install msgpack       # Fast binary serialization
pip install cryptography  # Encryption middleware

Requirements: Python 3.9+, redis-py 4.0+ (optional)


Quickstart

Step 1: Your First Cache

Start with simple get/set operations:

from cachine import InMemoryCache

# Create a cache
cache = InMemoryCache()

# Store a value (expires after 60 seconds)
cache.set("user:123", {"name": "Alice", "role": "admin"}, ttl=60)

# Retrieve it
user = cache.get("user:123")
print(user)  # {'name': 'Alice', 'role': 'admin'}

# Check if it exists
if cache.exists("user:123"):
    print("User is cached!")

# Remove it
cache.delete("user:123")

Step 2: Cache Expensive Functions

The @cached decorator automatically caches function results:

from cachine import InMemoryCache, cached
import time

cache = InMemoryCache()

@cached(cache=cache, ttl=60)
def expensive_computation(x):
    print(f"Computing {x}...")
    time.sleep(2)  # Simulate slow operation
    return x * 2

# First call: takes 2 seconds
result = expensive_computation(21)  # Prints "Computing 21..." and waits
# => 42

# Subsequent calls: instant! (returns from cache)
result = expensive_computation(21)  # Returns immediately
# => 42 (no print, no wait)

Step 3: Scale to Redis

Share cache across multiple servers with Redis:

from cachine import cached
from cachine import RedisCache
from cachine.serializers import JSONSerializer

# Create Redis cache
cache = RedisCache(
    host="localhost",
    port=6379,
    namespace="myapp",  # Prefix all keys with "myapp:"
    serializer=JSONSerializer()
)

@cached(cache=cache, ttl=300)  # Cache for 5 minutes
def get_user(user_id):
    # Now cached across ALL your servers!
    return database.query_user(user_id)

user = get_user(123)

Step 4: Async Support

Full async/await support for async applications:

import asyncio
from cachine.decorators import cached
from cachine import AsyncRedisCache

cache = AsyncRedisCache(host="localhost", namespace="myapp")

@cached(cache=cache, ttl=60)
async def fetch_data(item_id):
    # Simulate async API call
    await asyncio.sleep(1)
    return {"id": item_id, "data": "..."}

async def main():
    result = await fetch_data(1)  # Slow first time
    result = await fetch_data(1)  # Fast from cache

asyncio.run(main())

Common Use Cases

๐ŸŒ Caching API Responses

import requests

@cached(cache=cache, ttl=300)  # Cache for 5 minutes
def fetch_weather(city):
    response = requests.get(f"https://api.weather.com/forecast/{city}")
    return response.json()

# First call: hits the API (slow)
weather = fetch_weather("London")

# Next 5 minutes: instant responses from cache
weather = fetch_weather("London")  # ⚡ Fast!

๐Ÿ—„๏ธ Caching Database Queries

from sqlalchemy.orm import Session

@cached(cache=cache, ttl=3600, tags=lambda user_id: [f"user:{user_id}"])
def get_user_profile(user_id: int):
    db: Session = get_db()
    user = db.query(User).filter_by(id=user_id).first()
    return user.to_dict() if user else None

# Cache for 1 hour, invalidate with tags
profile = get_user_profile(123)

# When user updates, invalidate their cache
cache.invalidate_tags(["user:123"])

🔄 Preventing Duplicate Work (Singleflight)

When many requests come in at once, only compute once:

@cached(cache=cache, ttl=60, singleflight=True)
def generate_report():
    # Only ONE server generates this, even if 1000 requests come in
    # Others wait and get the same result
    time.sleep(10)  # Expensive operation
    return create_monthly_report()

# 1000 concurrent requests = 1 computation

📊 Rate Limiting with Counters

def check_rate_limit(user_id: str, max_requests: int = 100):
    key = f"ratelimit:{user_id}"

    # Increment counter, set TTL on first request
    count = cache.incr(key, delta=1, ttl_on_create=60)

    if count > max_requests:
        raise Exception(f"Rate limit exceeded: {count}/{max_requests}")

    return count

# Allow 100 requests per minute per user
check_rate_limit("user123", max_requests=100)

🔖 Tag-Based Invalidation

Invalidate related cache entries together:

@cached(
    cache=cache,
    ttl=3600,
    tags=lambda user_id: ["users", f"user:{user_id}"]
)
def get_user(user_id):
    return fetch_from_db(user_id)

@cached(
    cache=cache,
    ttl=3600,
    tags=lambda user_id: ["users", f"user:{user_id}"]
)
def get_user_orders(user_id):
    return fetch_orders_from_db(user_id)

# Both functions tagged with "users" and "user:123"
user = get_user(123)
orders = get_user_orders(123)

# Invalidate ALL user-related cache at once
removed = cache.invalidate_tags(["user:123"])
# Both get_user(123) and get_user_orders(123) are now cleared

Choosing the Right Backend

📦 InMemoryCache

When to use:

  • ✅ Single server/process application
  • ✅ Maximum performance needed (<1μs access)
  • ✅ Cache can be lost on restart (transient data)
  • ✅ Limited memory usage

When NOT to use:

  • โŒ Multiple servers need to share cache
  • โŒ Cache must survive restarts
  • โŒ Cache size > available RAM

Example:

from cachine import InMemoryCache
from cachine.strategies import LRUEviction

cache = InMemoryCache(
    max_size=10000,           # Limit to 10k entries
    eviction_policy=LRUEviction(),  # Evict least-recently-used
    namespace="myapp"         # Prefix all keys
)

cache.set("key", "value", ttl=300)

🔴 RedisCache (Single Instance)

When to use:

  • ✅ Multiple servers need shared cache
  • ✅ Cache should survive restarts
  • ✅ Simple deployment (single Redis server)

When NOT to use:

  • โŒ Need high availability (use Sentinel)
  • โŒ Cache size > single server RAM (use Cluster)

Example:

from cachine import RedisCache
from cachine.serializers import JSONSerializer

cache = RedisCache(
    host="localhost",
    port=6379,
    db=0,
    password="your-password",  # Optional
    namespace="myapp",
    serializer=JSONSerializer()
)

🔴🔴🔴 Redis Cluster

When to use:

  • ✅ Need horizontal scaling
  • ✅ High availability required
  • ✅ Cache size > single server RAM
  • ✅ Production workloads

Example:

from cachine.backends.redis.cluster import RedisClusterCache

cache = RedisClusterCache(
    nodes=[
        {"host": "redis1.example.com", "port": 7000},
        {"host": "redis2.example.com", "port": 7001},
        {"host": "redis3.example.com", "port": 7002},
    ],
    namespace="myapp"
)

๐Ÿ›ก๏ธ Redis Sentinel (High Availability)

When to use:

  • ✅ Need automatic failover
  • ✅ Master goes down → automatic promotion
  • ✅ Production reliability critical

Example:

from cachine.backends.redis.sentinel import RedisSentinelCache

cache = RedisSentinelCache(
    sentinels=[
        ("sentinel1.example.com", 26379),
        ("sentinel2.example.com", 26379),
        ("sentinel3.example.com", 26379),
    ],
    service_name="mymaster",
    namespace="myapp"
)

Advanced Decorator Features

Custom Key Generation

Control exactly how cache keys are created:

from cachine.decorators.cached import KeyContext

def my_key_builder(ctx: KeyContext, user_id: int, include_details: bool = False):
    # ctx.full_name = "mymodule.myfunction"
    # ctx.version = "v2" (if specified)
    return f"user:{user_id}:details={include_details}:version={ctx.version}"

@cached(
    cache=cache,
    ttl=300,
    key_builder=my_key_builder,
    version="v2"  # Change version to invalidate all old cache
)
def get_user(user_id: int, include_details: bool = False):
    return fetch_user_data(user_id, include_details)

Stale-While-Revalidate (SWR)

Serve stale data while refreshing in background:

@cached(
    cache=cache,
    ttl=60,         # Fresh for 60 seconds
    stale_ttl=120   # Serve stale for additional 60s while refreshing
)
def get_dashboard_data():
    # Users ALWAYS get fast response:
    # - Within 60s: fresh data
    # - 60-120s: stale data + background refresh started
    # - After 120s: cache miss, compute new
    return expensive_dashboard_computation()

Conditional Caching

Cache only when certain conditions are met:

@cached(
    cache=cache,
    ttl=300,
    condition=lambda result: result is not None and result.get("status") == "success"
)
def fetch_api_data(endpoint):
    response = requests.get(endpoint)
    # Only cache successful responses
    return response.json()

Don't Cache None

@cached(cache=cache, ttl=60, cache_none=False)
def find_user(email):
    user = database.find_by_email(email)
    return user  # None is NOT cached, forces fresh lookup

Add Jitter to Prevent Stampedes

@cached(
    cache=cache,
    ttl=60,
    jitter=10  # Adds 0-10 seconds randomly to TTL
)
def popular_data():
    # If 1000 entries expire at the same time → 1000 cache misses
    # With jitter, they expire at different times
    return expensive_operation()

Middleware (Optional Power Features)

Add compression, encryption, or metrics by wrapping your cache:

📊 Track Cache Performance

from cachine import InMemoryCache
from cachine.middleware import MetricsMiddleware

base = InMemoryCache()
cache = MetricsMiddleware(base)

# Use cache normally
cache.set("key1", "value1")
cache.get("key1")
cache.get("key2")  # Miss

# Check performance
stats = cache.get_stats()
print(stats)
# {
#   'hits': 1,
#   'misses': 1,
#   'hit_rate': 0.5,
#   'errors': 0,
#   'avg_latency_ms': 0.023
# }

๐Ÿ—œ๏ธ Compress Large Values

from cachine import InMemoryCache
from cachine.middleware import CompressionMiddleware
from cachine.serializers import JSONSerializer

base = InMemoryCache()
cache = CompressionMiddleware(
    base,
    algorithm="gzip",  # or "zlib"
    min_size=1024      # Only compress values > 1KB
)

# Large values automatically compressed
large_json = {"data": "x" * 10000}
cache.set("big_data", large_json, serializer=JSONSerializer())

# Automatically decompressed on get
result = cache.get("big_data", serializer=JSONSerializer())

๐Ÿ” Encrypt Sensitive Data

from cachine import InMemoryCache
from cachine.middleware import EncryptionMiddleware

base = InMemoryCache()
cache = EncryptionMiddleware(
    base,
    key="your-32-character-secret-key!!",  # Keep this secret!
    key_id="v1"  # For key rotation
)

# Data encrypted at rest
cache.set("user_ssn", "123-45-6789")
cache.set("api_key", "secret-api-key-xyz")

# Automatically decrypted on get
ssn = cache.get("user_ssn")  # "123-45-6789"

🔗 Stack Multiple Middleware

Order matters! Stack from inside-out:

from cachine import InMemoryCache
from cachine.middleware import CompressionMiddleware, EncryptionMiddleware, MetricsMiddleware
from cachine.serializers import JSONSerializer

# Layer 1: Base cache
base = InMemoryCache(namespace="secure")

# Layer 2: Compress (first, before encryption)
cache = CompressionMiddleware(base, algorithm="gzip", min_size=128)

# Layer 3: Encrypt (second, encrypts compressed data)
cache = EncryptionMiddleware(cache, key="your-secret-key-here!!", key_id="v1")

# Layer 4: Metrics (outer layer, tracks everything)
cache = MetricsMiddleware(cache)

# Now you have: Metrics → Encryption → Compression → InMemory
# Data flow on SET: value → compress → encrypt → store
# Data flow on GET: fetch → decrypt → decompress → return

cache.set("sensitive_data", {"secret": "data"}, serializer=JSONSerializer())
value = cache.get("sensitive_data", serializer=JSONSerializer())
print(cache.get_stats())  # See metrics

Serializers

Convert Python objects to bytes for Redis storage:

from cachine.serializers import JSONSerializer, PickleSerializer, MsgPackSerializer
from cachine import RedisCache

# JSON: Safe, human-readable, limited types
cache = RedisCache(host="localhost", serializer=JSONSerializer())
cache.set("data", {"a": 1, "b": [2, 3]})

# MsgPack: Fast, compact, binary
cache = RedisCache(host="localhost", serializer=MsgPackSerializer())
cache.set("data", {"complex": "object"})

# Pickle: All Python types, but UNSAFE for untrusted data
cache = RedisCache(host="localhost", serializer=PickleSerializer())
cache.set("data", any_python_object)

Comparison:

Serializer   Speed    Size     Safe?    Types Supported
JSON         Medium   Large    ✅ Yes   Basic (dict, list, str, int, float, bool, None)
MsgPack      Fast     Small    ✅ Yes   Similar to JSON + bytes, datetime
Pickle       Medium   Medium   ❌ No*   All Python objects

*Pickle can execute arbitrary code during deserialization. Only use with trusted data.


Configuration & Factory

From Dictionary

from cachine import create_cache

config = {
    "backend": "redis",
    "host": "localhost",
    "port": 6379,
    "db": 0,
    "namespace": "myapp",
    "password": "optional-password"
}

# Create sync cache
cache = create_cache(config, mode="sync")

# Create async cache
cache = create_cache(config, mode="async")

From Environment Variables

# .env file
CACHE_BACKEND=redis
CACHE_HOST=localhost
CACHE_PORT=6379
CACHE_DB=0
CACHE_PASSWORD=your-password
CACHE_SSL=false
CACHE_NAMESPACE=myapp

from cachine import create_cache

# Reads environment variables
cache = create_cache.from_env(mode="sync")

Glossary

Key Terms

Cache Hit: When requested data is found in the cache (fast ✅)
Cache Miss: When requested data is NOT in the cache and must be fetched from the source (slow ❌)
TTL (Time To Live): How long cached data stays fresh before expiring (in seconds)
Namespace: A prefix for all cache keys that prevents collisions (e.g., "prod:" vs "dev:")

Advanced Terms

Cache-Aside Pattern: Your code checks cache first, fetches from source on miss, then stores in cache
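
In code, cache-aside looks like the sketch below. This uses a plain dict rather than cachine's API, and `fetch_user` stands in for the real data source:

```python
import time

def cache_aside_get(cache: dict, key, fetch, ttl=60):
    """Check the cache first; on a miss (or expiry), fetch and store."""
    entry = cache.get(key)
    if entry is not None and entry[1] > time.monotonic():
        return entry[0]                      # hit: serve from cache
    value = fetch()                          # miss: go to the source
    cache[key] = (value, time.monotonic() + ttl)
    return value

store, calls = {}, []

def fetch_user():
    calls.append(1)                          # count trips to the "database"
    return {"name": "Alice"}

cache_aside_get(store, "user:1", fetch_user)
cache_aside_get(store, "user:1", fetch_user)
print(len(calls))  # 1 - the second call never touched the source
```

This is exactly what the @cached decorator automates for you: the key building, the freshness check, and the store-on-miss.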

Singleflight: When multiple requests arrive for the same uncached key, only one computation runs. Others wait and share the result. Prevents "thundering herd".
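
A minimal thread-based sketch of the mechanism (illustrative only, not cachine's implementation):

```python
import threading
import time

_results, _inflight = {}, {}
_lock = threading.Lock()

def singleflight(key, compute):
    """First caller computes; concurrent callers wait and share the result."""
    with _lock:
        if key in _results:
            return _results[key]
        event = _inflight.get(key)
        if event is None:                     # we are the leader
            event = _inflight[key] = threading.Event()
            leader = True
        else:                                 # someone else is computing
            leader = False
    if leader:
        value = compute()
        with _lock:
            _results[key] = value
            del _inflight[key]
        event.set()
        return value
    event.wait()
    with _lock:
        return _results[key]

calls = []
def slow_report():
    calls.append(1)
    time.sleep(0.05)                          # expensive work
    return "report"

threads = [threading.Thread(target=singleflight, args=("r", slow_report))
           for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(len(calls))  # 1 - ten concurrent callers, one computation
```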

Stale-While-Revalidate (SWR): Serve slightly old cached data while refreshing it in the background. Users always get fast responses.

Jitter: Random delay (0 to N seconds) added to TTL so cache entries don't all expire at the exact same time.
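
The calculation is simply the base TTL plus a random offset (a sketch of the idea; cachine's exact formula may differ):

```python
import random

def ttl_with_jitter(ttl, jitter):
    # Spread expirations over [ttl, ttl + jitter] seconds
    return ttl + random.uniform(0, jitter)

print(ttl_with_jitter(60, 10))  # somewhere between 60 and 70
```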

Tag-Based Invalidation: Group related cache entries with tags (like "user:123" or "products"), then invalidate all entries with a tag at once.

Eviction Policy: When cache is full, which entries to remove? LRU = remove least-recently-used, LFU = remove least-frequently-used.
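
LRU can be sketched in a few lines with collections.OrderedDict (an illustration of the idea, not cachine's LRUEviction class):

```python
from collections import OrderedDict

class TinyLRU:
    """Illustrative LRU cache: evicts the least-recently-used key when full."""
    def __init__(self, max_size):
        self.max_size = max_size
        self._data = OrderedDict()

    def get(self, key, default=None):
        if key not in self._data:
            return default
        self._data.move_to_end(key)           # mark as recently used
        return self._data[key]

    def set(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.max_size:
            self._data.popitem(last=False)    # drop the oldest entry

lru = TinyLRU(max_size=2)
lru.set("a", 1); lru.set("b", 2)
lru.get("a")          # "a" is now the most recently used
lru.set("c", 3)       # evicts "b", the least recently used
print(lru.get("b"))   # None
```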

Serialization: Converting Python objects to bytes for storage (and back). Required for Redis.

Middleware: Wrapper that adds functionality (compression, encryption, metrics) without changing cache API.


Performance Tips

1. Prevent Thundering Herd

Problem: 1000 requests hit an expired cache at once → 1000 database queries

Solutions:

# Solution A: Add jitter
@cached(cache=cache, ttl=60, jitter=10)  # Expires between 60-70 seconds

# Solution B: Stale-while-revalidate
@cached(cache=cache, ttl=60, stale_ttl=120)  # Serve stale, refresh in background

# Solution C: Singleflight
@cached(cache=cache, ttl=60, singleflight=True)  # Only one computes

2. Choose Right Serializer

# Small data, human-readable: JSON
cache = RedisCache(serializer=JSONSerializer())

# Large data, need speed: MsgPack
cache = RedisCache(serializer=MsgPackSerializer())  # 2-3x faster than JSON

# Complex Python objects (trusted): Pickle
cache = RedisCache(serializer=PickleSerializer())  # Supports all Python types

3. Compress Only Large Data

cache = CompressionMiddleware(
    base_cache,
    algorithm="gzip",
    min_size=1024  # Only compress > 1KB (avoid overhead on small values)
)

4. Set Appropriate TTL

# Frequently changing data: Short TTL
@cached(cache=cache, ttl=60)  # 1 minute

# Rarely changing data: Long TTL
@cached(cache=cache, ttl=86400)  # 24 hours

# Static data: Very long TTL
@cached(cache=cache, ttl=604800)  # 1 week

5. Use Namespaces

# Development
dev_cache = InMemoryCache(namespace="dev")

# Production
prod_cache = RedisCache(host="prod-redis", namespace="prod")

# Easy to clear: cache.clear() only affects your namespace

Troubleshooting

Redis Connection Issues

# Test connection
if cache.ping_ok():
    print("✅ Connected to Redis")
else:
    print("❌ Cannot connect to Redis")

# Full health check
health = cache.ping()
print(health)  # {'healthy': True, 'latency_ms': 1.2, 'backend': 'redis'}

Serialization Errors

Error: JSONDecodeError or PickleError

Solution: Make sure you use the same serializer for get/set:

# โŒ Wrong: Different serializers
cache.set("key", data, serializer=JSONSerializer())
result = cache.get("key", serializer=PickleSerializer())  # Error!

# ✅ Correct: Same serializer
cache.set("key", data, serializer=JSONSerializer())
result = cache.get("key", serializer=JSONSerializer())

Missing Dependencies

# ModuleNotFoundError: No module named 'redis'
pip install redis

# ModuleNotFoundError: No module named 'cryptography'
pip install cryptography

# ModuleNotFoundError: No module named 'msgpack'
pip install msgpack

Cache Not Clearing

# Requires namespace OR dangerously_clear_all=True
cache = InMemoryCache(namespace="myapp")
cache.clear()  # ✅ Works

cache = InMemoryCache()  # No namespace
cache.clear()  # ❌ Raises an error (safety check)
cache.clear(dangerously_clear_all=True)  # ✅ Works, but clears EVERYTHING

API Reference

Cache Operations

# Get/Set
cache.get(key, default=None)
cache.set(key, value, ttl=None)
cache.delete(key)
cache.exists(key)
cache.clear()

# TTL Management
cache.ttl(key)                    # Get remaining TTL in seconds
cache.expire(key, ttl=60)         # Set new TTL (seconds or timedelta)
cache.expire_at(key, when=datetime)  # Set absolute expiration
cache.persist(key)                # Remove TTL (never expires)
cache.touch(key, ttl=None)        # Update last access time, optionally set TTL

# Counters
cache.incr(key, delta=1, ttl_on_create=None)
cache.decr(key, delta=1)

# Tags
cache.invalidate_tags(tags)       # Remove all entries with these tags
cache.add_tags(key, tags)         # Add tags to existing entry

# Utility
cache.get_or_set(key, factory, ttl=None)  # Get cached or compute & cache
cache.ping()                      # Health check
cache.ping_ok()                   # Boolean health check
cache.close()                     # Close connections

Decorator Parameters

@cached(
    cache,                  # Cache instance (required)
    ttl=None,              # Seconds to cache (int or timedelta)
    key_builder=None,      # Custom key function
    condition=None,        # Cache only if condition(result) is True
    cache_none=False,      # Cache None results?
    jitter=None,           # Random 0-N seconds added to TTL
    stale_ttl=None,        # Stale-while-revalidate window
    singleflight=False,    # Prevent duplicate computations
    tags=None,             # Function to generate tags from args
    tags_from_result=None, # Function to generate tags from result
    version=None,          # Version string for cache busting
)

Examples & Recipes

Check out real-world examples:


Contributing

We welcome contributions! Please see CONTRIBUTING.md for:

  • Development setup
  • Running tests
  • Code style guidelines
  • Pull request process

License

Apache-2.0 OR MIT - choose whichever works best for your project.

See LICENSE-APACHE and LICENSE-MIT for details.


Acknowledgments

Built with ❤️ using:


Support & Community


Happy caching! 🚀
