stampede

Multi-tier async caching with request coalescing, thundering herd prevention, and semantic similarity matching for Python.

Built for LLM-heavy backends where duplicate and near-duplicate requests are expensive.

What it does

Request 1 ─┐
Request 2 ─┼──▶ Single Execution ──▶ Result shared by all
Request 3 ─┘

Without stampede, 3 identical requests = 3 LLM calls = 3x cost. With stampede, 3 identical requests = 1 LLM call = 1x cost.

Install

# Core (zero deps, pure asyncio)
pip install stampede

# With Redis distributed caching
pip install "stampede[redis]"

# With semantic similarity caching (pgvector)
pip install "stampede[semantic]"

# Everything
pip install "stampede[all]"

Quick Start

Request Coalescing (in-flight dedup)

Multiple concurrent calls with the same key share a single execution:

from stampede import coalesce

@coalesce(ttl=60)
async def generate_course(topic: str) -> str:
    return await llm.generate(topic)  # Only called once for concurrent identical requests
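
A quick sketch of the effect (assuming llm is your own client, as in the snippet above): three concurrent calls with the same argument collapse into one execution.

import asyncio

async def main():
    # All three awaitables share one in-flight execution of generate_course;
    # every caller receives the result of the single llm.generate call.
    results = await asyncio.gather(
        generate_course("python"),
        generate_course("python"),
        generate_course("python"),
    )
    assert results[0] == results[1] == results[2]

asyncio.run(main())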

Content-Based Coalescing

For LLM workloads where exact-match keys are too strict:

from stampede import coalesce_by_content

@coalesce_by_content(ttl=300, content_fields=["topic", "context"])
async def generate_content(topic: str, context: str, user_id: str) -> str:
    # user_id excluded from cache key — different users share results
    return await llm.generate(topic, context)
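
For example (same llm placeholder as above), two different users asking for the same topic and context share one generation:

import asyncio

async def demo():
    # user_id differs, but it is excluded from the cache key,
    # so the two calls coalesce into a single llm.generate call.
    a, b = await asyncio.gather(
        generate_content("rust", "beginner course", user_id="alice"),
        generate_content("rust", "beginner course", user_id="bob"),
    )
    assert a == b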

TTL Cache with Thundering Herd Prevention

from stampede import cached, async_cached, Cache, ValkeyBackend

# Decorator API
@cached(ttl=300)
def fetch_data(key: str) -> dict:
    return expensive_computation(key)

@async_cached(ttl=3600)
async def search(query: str) -> list:
    return await api_call(query)

# Programmatic API with Redis backend
backend = ValkeyBackend.from_url("redis://localhost:6379/0")
cache = Cache(namespace="my_service", backend=backend)
cache.get_or_set("key", lambda: compute(), ttl=300)  # Atomic, stampede-safe
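
A sketch of that stampede-safe claim (assuming get_or_set blocks concurrent callers for the same key, as described): eight racing threads should produce a single compute() execution.

from concurrent.futures import ThreadPoolExecutor

def compute() -> dict:  # stand-in for your expensive work
    return {"value": 42}

with ThreadPoolExecutor(max_workers=8) as executor:
    # All eight workers request the same key; only one should run compute().
    results = list(
        executor.map(lambda _: cache.get_or_set("key", lambda: compute(), ttl=300), range(8))
    )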

Distributed Coalescing (multi-instance)

Cross-instance dedup for Cloud Run, Kubernetes, and other multi-instance deployments:

from stampede import distributed_coalesce
import redis

redis_client = redis.from_url("redis://localhost:6379/0")

@distributed_coalesce(ttl=60, redis_client=redis_client)
async def generate(query: str) -> str:
    return await llm.generate(query)

Semantic Cache (pgvector)

Cache by meaning, not exact text. "What courses for engineering?" hits the same cache entry as "Recommend engineering courses":

from stampede import semantic_coalesce

@semantic_coalesce(
    ttl=300,
    threshold=0.92,       # Cosine similarity threshold
    embed_fn=my_embed,    # async (str) -> list[float]
    pool=my_pg_pool,      # asyncpg.Pool with pgvector
)
async def answer(query: str) -> str:
    return await llm.generate(query)

Tiered lookup: Redis exact hash (<1ms) → PostgreSQL exact hash → pgvector HNSW similarity → cache miss.
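
A minimal sketch of the two dependencies the decorator takes. The OpenAI client and model name below are illustrative assumptions, not part of stampede; any async str -> list[float] function works as embed_fn, as long as its output dimension matches your vector column.

import asyncpg
from openai import AsyncOpenAI

client = AsyncOpenAI()  # assumes OPENAI_API_KEY is set

async def my_embed(text: str) -> list[float]:
    # text-embedding-3-small returns 1536 dims, matching vector(1536)
    # in the schema below.
    resp = await client.embeddings.create(model="text-embedding-3-small", input=text)
    return resp.data[0].embedding

async def make_pool() -> asyncpg.Pool:
    # Pool against a database with the pgvector extension enabled and the
    # semantic_cache table from the schema section below.
    return await asyncpg.create_pool("postgresql://localhost:5432/mydb")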

Architecture

┌─────────────────────────────────────────────────────────┐
│                      stampede                           │
├──────────────┬──────────────┬──────────────┬────────────┤
│  coalesce    │    cache     │ distributed  │  semantic  │
│              │              │              │            │
│ In-flight    │ Memory +     │ Redis locks  │ pgvector   │
│ dedup        │ Redis/Valkey │ + polling    │ + Redis    │
│ TTL cache    │ Lua scripts  │ Lua scripts  │ hot-path   │
│ Stats        │ Thundering   │ Cross-inst.  │ HNSW       │
│              │ herd prev.   │ fallback     │ Embeddings │
└──────────────┴──────────────┴──────────────┴────────────┘

Observability

Every component tracks stats:

from stampede import get_all_coalescer_stats

stats = get_all_coalescer_stats()
# {'skills': {'requests': 1000, 'cache_hits': 450, 'coalesce_hits': 200,
#             'executions': 350, 'savings_rate': '65.0%', ...}}
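
A small sketch (not a stampede feature) that logs these numbers periodically, so a dropping savings_rate shows up in your dashboards:

import asyncio
import logging

from stampede import get_all_coalescer_stats

async def report_stats(interval: float = 60.0) -> None:
    # Dump per-coalescer stats to the application log every interval seconds.
    while True:
        await asyncio.sleep(interval)
        for name, stats in get_all_coalescer_stats().items():
            logging.getLogger("stampede.stats").info("%s: %s", name, stats)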

Configuration

import stampede

# Optional: plug in OpenTelemetry tracing
stampede.configure(tracer=my_otel_tracer)
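
For example, with the standard OpenTelemetry API installed (an assumption; any tracer object the library accepts works):

import stampede
from opentelemetry import trace

# get_tracer returns a no-op tracer unless an SDK provider is configured.
stampede.configure(tracer=trace.get_tracer("my_service"))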

Optional Dependencies

Extra      Package   What it enables
blake3     blake3    3-5x faster hashing (falls back to SHA256)
orjson     orjson    10-50x faster JSON serialization
redis      redis     Distributed cache + coalescing + thundering herd Lua scripts
semantic   asyncpg   Semantic similarity caching with pgvector

Semantic Cache Schema

If using the semantic cache, create this table (or add to your migrations):

CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE semantic_cache (
    id BIGSERIAL PRIMARY KEY,
    namespace TEXT NOT NULL,
    query_hash TEXT NOT NULL,
    query_normalized TEXT NOT NULL,
    embedding vector(1536) NOT NULL,
    response TEXT NOT NULL,
    hits INTEGER DEFAULT 0,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    expires_at TIMESTAMPTZ,
    UNIQUE(namespace, query_hash)
);

CREATE INDEX ON semantic_cache USING hnsw (embedding vector_cosine_ops);
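
The schema stores expires_at, but nothing shown here prunes expired rows automatically, so a periodic cleanup job is a reasonable companion; a sketch with asyncpg:

import asyncio
import asyncpg

async def prune_expired(pool: asyncpg.Pool, interval: float = 3600.0) -> None:
    # Hypothetical housekeeping loop: delete semantic cache rows past expiry.
    while True:
        await pool.execute(
            "DELETE FROM semantic_cache "
            "WHERE expires_at IS NOT NULL AND expires_at < NOW()"
        )
        await asyncio.sleep(interval)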

License

MIT
