Medha
Semantic Memory for AI Data Agents
Reduce LLM latency and costs by caching Text-to-Query generations (SQL, Cypher, GraphQL) with semantic understanding.
What is Medha?
Medha is an asynchronous, high-performance semantic cache library designed specifically for Text-to-Query systems.
Unlike traditional key-value caches that require exact string matches, Medha understands that "Show me the top 5 users" and "List the first five users" are the same question. It intercepts these queries and returns pre-calculated database queries (SQL, Cypher, etc.), bypassing the expensive and slow LLM generation step.
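The comparison behind this is cosine similarity between question embeddings. A toy sketch with hand-made 3-d vectors (not real embeddings) shows why paraphrases score near 1.0 while unrelated questions score low:

```python
import math

def cosine(a, b):
    # Cosine similarity: dot(a, b) / (|a| * |b|)
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb)

# Toy "embeddings": paraphrases land close together, unrelated text far away
top5_users = [0.9, 0.1, 0.2]
first_five_users = [0.88, 0.12, 0.21]
weather_question = [0.1, 0.9, 0.1]

print(round(cosine(top5_users, first_five_users), 3))  # near 1.0: treated as the same question
print(round(cosine(top5_users, weather_question), 3))  # low: cache miss
```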
Why Medha?
- 100x Faster: Return cached queries in milliseconds vs. seconds for LLM generation.
- Cost Efficient: Reduce API calls to OpenAI/Anthropic by 40-60%.
- Agnostic: Works with SQL, Cypher (Neo4j), GraphQL, or any text-based query language.
- Async Native: Built on asyncio for high-concurrency API backends.
- Pluggable: Swap embedders (FastEmbed, OpenAI) and vector backends independently.
The "Waterfall" Architecture
Medha uses a sophisticated multi-tier search strategy to maximize cache hits. If a tier fails, it cascades to the next:
- Tier 0: L1 Memory (LRU)
- Speed: < 1ms
- Exact hash match for identical, repeated questions.
- Tier 1: Template Matching (Intent)
- Speed: ~10ms
- Recognizes patterns like "Show employees in {department}". Extracts parameters and injects them into a cached query template.
- Tier 2 + 3: Exact Vector Match & Semantic Similarity (run in parallel)
- Speed: ~25ms (concurrent, not sequential)
- Exact match uses a high threshold (≥ 0.99); semantic uses a lower one (≥ 0.90). Both vector queries are fired simultaneously via asyncio.gather and the best result is chosen.
- Tier 4: Fuzzy Fallback
- Speed: Variable
- Handles typos and minor string variations using Levenshtein distance.
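The cascade above can be sketched in plain Python. This is a conceptual illustration, not Medha's internals; the tier functions and their return shape are made up for the example, but the thresholds (0.99 / 0.90) and the asyncio.gather step mirror the description:

```python
import asyncio

# Hypothetical tier functions: each returns (score, query) or None on a miss
async def l1_lookup(q):       return None                  # Tier 0 miss
async def template_match(q):  return None                  # Tier 1 miss
async def exact_vector(q):    return (0.991, "SELECT 1")   # Tier 2
async def semantic_vector(q): return (0.93, "SELECT 2")    # Tier 3

async def waterfall(question):
    # Tiers 0 and 1 run sequentially; first hit wins
    for tier in (l1_lookup, template_match):
        hit = await tier(question)
        if hit:
            return hit
    # Tiers 2 + 3 run concurrently, mirroring the asyncio.gather step
    exact, semantic = await asyncio.gather(
        exact_vector(question), semantic_vector(question)
    )
    candidates = []
    if exact and exact[0] >= 0.99:        # strict threshold for exact match
        candidates.append(exact)
    if semantic and semantic[0] >= 0.90:  # looser threshold for semantic match
        candidates.append(semantic)
    if candidates:
        return max(candidates)            # best score wins
    return None                           # Tier 4 (fuzzy) would run here

print(asyncio.run(waterfall("top 5 users")))
```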
Installation
Core (minimal)
pip install medha-archai
Core dependencies: pydantic, pydantic-settings.
Breaking change in 0.3.1:
qdrant-client is no longer a core dependency. Install it explicitly with pip install "medha-archai[qdrant]". The default backend_type is now "memory".
With an embedding provider
# Local embeddings with FastEmbed (recommended for getting started)
pip install "medha-archai[fastembed]"
# OpenAI embeddings
pip install "medha-archai[openai]"
# Cohere Embed v3
pip install "medha-archai[cohere]"
# Google Gemini embeddings
pip install "medha-archai[gemini]"
With a vector backend
# Qdrant (Docker / Cloud)
pip install "medha-archai[qdrant]"
# PostgreSQL + pgvector
pip install "medha-archai[pgvector]"
# Elasticsearch 8.x
pip install "medha-archai[elasticsearch]"
# PostgreSQL + VectorChord
pip install "medha-archai[vectorchord]"
# ChromaDB
pip install "medha-archai[chroma]"
# Weaviate
pip install "medha-archai[weaviate]"
# Redis Stack (vector backend + L1 cache)
pip install "medha-archai[redis]"
# Azure AI Search
pip install "medha-archai[azure-search]"
# LanceDB (embedded / S3 / GCS / Azure Blob)
pip install "medha-archai[lancedb]"
With optional extras
# Fuzzy matching (Tier 4 - Levenshtein distance)
pip install "medha-archai[fuzzy]"
# spaCy NLP for parameter extraction (pre-trained, fixed entity types, ~15 MB model)
pip install "medha-archai[nlp]"
python -m spacy download en_core_web_sm
# GLiNER NLP for zero-shot parameter extraction (uses param names as labels, ~500 MB model)
pip install "medha-archai[gliner]"
# All optional dependencies (excluding ChromaDB for env compatibility)
pip install "medha-archai[all-no-chroma]"
# Everything
pip install "medha-archai[all]"
Install from source
# From GitHub
pip install "medha-archai[all] @ git+https://github.com/ArchAI-Labs/medha.git"
# Development install
git clone https://github.com/ArchAI-Labs/medha.git
cd medha
pip install -e ".[dev,all]"
Quick Start
import asyncio
from medha import Medha
from medha.embeddings.fastembed_adapter import FastEmbedAdapter
async def main():
embedder = FastEmbedAdapter()
cache = Medha(collection_name="text2sql_cache", embedder=embedder)
async with cache:
question = "How many users are active?"
# 1. Search the cache
hit = await cache.search(question)
if hit.strategy.value != "no_match":
print(f"Cache Hit! Strategy: {hit.strategy.value}")
print(f"Query: {hit.generated_query}")
print(f"Confidence: {hit.confidence:.2f}")
else:
print("Cache Miss. Calling LLM...")
generated_sql = "SELECT count(*) FROM users WHERE status = 'active';"
# 2. Store the result for next time
await cache.store(
question=question,
generated_query=generated_sql,
)
print("Stored in cache.")
if __name__ == "__main__":
asyncio.run(main())
Choosing a Backend
| Backend | Extra | Persistence | Best For |
|---|---|---|---|
| memory (default) | (none) | No | Testing, development, CI |
| qdrant | [qdrant] | Yes (Docker/Cloud) | Production, large datasets |
| pgvector | [pgvector] | Yes (PostgreSQL) | Teams already using PostgreSQL |
| vectorchord | [vectorchord] | Yes (PostgreSQL + VectorChord) | High-performance approximate search |
| elasticsearch | [elasticsearch] | Yes (Elasticsearch 8.x) | Teams running the Elastic stack |
| chroma | [chroma] | Optional (ephemeral / disk / HTTP) | Quick experiments, local dev |
| weaviate | [weaviate] | Yes (local / Weaviate Cloud) | Weaviate-native deployments |
| redis | [redis] | Yes (Redis Stack / Sentinel) | Low-latency, Redis-native stacks |
| azure-search | [azure-search] | Yes (Azure AI Search) | Azure-hosted deployments |
| lancedb | [lancedb] | Yes (embedded / S3 / GCS / az) | Serverless, edge, embedded apps |
InMemory Backend (zero dependencies)
from medha import Medha, Settings
from medha.embeddings.fastembed_adapter import FastEmbedAdapter
embedder = FastEmbedAdapter()
settings = Settings(backend_type="memory")
async with Medha(collection_name="my_cache", embedder=embedder, settings=settings) as m:
await m.store("How many users?", "SELECT COUNT(*) FROM users")
hit = await m.search("Count of users")
print(hit.generated_query)
PostgreSQL + pgvector Backend
from medha import Medha, Settings
from medha.embeddings.fastembed_adapter import FastEmbedAdapter
settings = Settings(
backend_type="pgvector",
pg_dsn="postgresql://user:password@localhost:5432/mydb",
)
async with Medha(collection_name="my_cache", embedder=FastEmbedAdapter(), settings=settings) as m:
await m.store("How many users?", "SELECT COUNT(*) FROM users")
hit = await m.search("Count of users")
print(hit.generated_query)
LanceDB Backend (embedded / cloud)
No external server needed for local mode. Supports S3, GCS, and Azure Blob Storage URIs for cloud storage.
from medha import Medha, Settings
from medha.embeddings.fastembed_adapter import FastEmbedAdapter
settings = Settings(
backend_type="lancedb",
lancedb_uri="/tmp/my_lancedb", # local path; use s3://... for cloud
lancedb_metric="cosine", # cosine | l2 | dot
)
async with Medha(collection_name="my_cache", embedder=FastEmbedAdapter(), settings=settings) as m:
await m.store("How many users?", "SELECT COUNT(*) FROM users")
hit = await m.search("Count of users")
print(hit.generated_query)
Elasticsearch Backend
from medha import Medha, Settings
from medha.embeddings.fastembed_adapter import FastEmbedAdapter
settings = Settings(
backend_type="elasticsearch",
es_hosts=["http://localhost:9200"],
es_api_key="your-api-key", # or es_username / es_password
)
async with Medha(collection_name="my_cache", embedder=FastEmbedAdapter(), settings=settings) as m:
await m.store("How many users?", "SELECT COUNT(*) FROM users")
hit = await m.search("Count of users")
print(hit.generated_query)
Redis Stack Backend
from medha import Medha, Settings
from medha.embeddings.fastembed_adapter import FastEmbedAdapter
settings = Settings(
backend_type="redis",
redis_url="redis://localhost:6379/0",
redis_index_algorithm="HNSW", # HNSW | FLAT
)
async with Medha(collection_name="my_cache", embedder=FastEmbedAdapter(), settings=settings) as m:
await m.store("How many users?", "SELECT COUNT(*) FROM users")
hit = await m.search("Count of users")
print(hit.generated_query)
Or via environment variables:
export MEDHA_BACKEND_TYPE=pgvector
export MEDHA_PG_DSN=postgresql://user:password@localhost:5432/mydb
Configuration Examples
Medha is highly configurable. Below are examples covering every major use case.
Basic: Zero-Dependency In-Memory Setup
The simplest setup, perfect for development, testing, and CI. No external services needed.
import asyncio
from medha import Medha, Settings
from medha.embeddings.fastembed_adapter import FastEmbedAdapter
async def main():
# backend_type="memory" — pure-Python backend, zero external dependencies
settings = Settings(backend_type="memory")
embedder = FastEmbedAdapter()
async with Medha(
collection_name="dev_cache",
embedder=embedder,
settings=settings,
) as cache:
await cache.store("List all users", "SELECT * FROM users;")
hit = await cache.search("Show me all the users")
print(hit.generated_query) # SELECT * FROM users;
asyncio.run(main())
Qdrant Docker (Local Persistence)
For persistent caching across restarts using a local Qdrant instance.
# Start Qdrant first
docker run -p 6333:6333 qdrant/qdrant
import asyncio
from medha import Medha, Settings
from medha.embeddings.fastembed_adapter import FastEmbedAdapter
async def main():
settings = Settings(
backend_type="qdrant",
qdrant_mode="docker",
qdrant_host="localhost",
qdrant_port=6333,
)
embedder = FastEmbedAdapter()
async with Medha(
collection_name="persistent_cache",
embedder=embedder,
settings=settings,
) as cache:
await cache.store(
"Total revenue last quarter",
"SELECT SUM(amount) FROM orders WHERE date >= '2024-10-01';",
)
hit = await cache.search("What was last quarter's revenue?")
print(f"{hit.strategy.value}: {hit.generated_query}")
asyncio.run(main())
Qdrant Cloud (Production)
For production deployments using Qdrant Cloud with API key authentication.
import asyncio
from medha import Medha, Settings
from medha.embeddings.openai_adapter import OpenAIAdapter
async def main():
settings = Settings(
backend_type="qdrant",
qdrant_mode="cloud",
qdrant_url="https://your-cluster.cloud.qdrant.io",
qdrant_api_key="your-qdrant-api-key", # stored as SecretStr, never logged
)
embedder = OpenAIAdapter(
model_name="text-embedding-3-small",
api_key="sk-your-openai-key",
)
async with Medha(
collection_name="production_cache",
embedder=embedder,
settings=settings,
) as cache:
await cache.store(
"Get all pending orders",
"SELECT * FROM orders WHERE status = 'pending';",
)
hit = await cache.search("Show pending orders")
print(f"Confidence: {hit.confidence:.2f}")
asyncio.run(main())
Environment Variable Configuration
All settings can be configured via environment variables with the MEDHA_ prefix. No code changes needed.
# .env or shell exports
export MEDHA_QDRANT_MODE=docker
export MEDHA_QDRANT_HOST=qdrant.internal.company.com
export MEDHA_QDRANT_PORT=6333
export MEDHA_SCORE_THRESHOLD_SEMANTIC=0.85
export MEDHA_SCORE_THRESHOLD_EXACT=0.98
export MEDHA_L1_CACHE_MAX_SIZE=5000
export MEDHA_QUERY_LANGUAGE=sql
export MEDHA_ENABLE_QUANTIZATION=true
export MEDHA_ON_DISK=false
export MEDHA_TEMPLATE_FILE=/etc/medha/templates.json
import asyncio
from medha import Medha, Settings
from medha.embeddings.fastembed_adapter import FastEmbedAdapter
async def main():
# Settings automatically loads from MEDHA_* environment variables
settings = Settings()
embedder = FastEmbedAdapter()
async with Medha(
collection_name="my_cache",
embedder=embedder,
settings=settings,
) as cache:
hit = await cache.search("Show me all employees")
print(hit.strategy.value)
asyncio.run(main())
Embedding Providers
FastEmbed (Local, No API Key)
Runs entirely locally using ONNX Runtime. No API key, no network calls, no costs.
from medha.embeddings.fastembed_adapter import FastEmbedAdapter
# Default model (384 dimensions, fast and lightweight)
embedder = FastEmbedAdapter()
# Higher quality model
embedder = FastEmbedAdapter(
model_name="BAAI/bge-base-en-v1.5",
max_length=512,
)
# Custom cache directory for model files
embedder = FastEmbedAdapter(
model_name="sentence-transformers/all-MiniLM-L6-v2",
cache_dir="/opt/models/fastembed",
)
OpenAI Embeddings
Uses OpenAI's embedding API. Requires an API key (via parameter or OPENAI_API_KEY env var).
from medha.embeddings.openai_adapter import OpenAIAdapter
# Default: text-embedding-3-small (1536 dimensions)
embedder = OpenAIAdapter(api_key="sk-your-key")
# High-quality large model (3072 dimensions)
embedder = OpenAIAdapter(
model_name="text-embedding-3-large",
api_key="sk-your-key",
)
# With custom dimensions (only supported by text-embedding-3-* models)
embedder = OpenAIAdapter(
model_name="text-embedding-3-small",
dimensions=512,
api_key="sk-your-key",
)
# API key from environment variable (OPENAI_API_KEY)
embedder = OpenAIAdapter()
Cohere Embeddings
Uses Cohere Embed v3 (cohere.AsyncClientV2). Requires an API key.
from medha.embeddings.cohere_adapter import CohereAdapter
# Default: embed-multilingual-v3.0
embedder = CohereAdapter(api_key="your-cohere-key")
# Explicit model
embedder = CohereAdapter(
api_key="your-cohere-key",
model="embed-english-v3.0",
)
Input types search_query / search_document are selected automatically at embed time.
Gemini Embeddings
Uses Google Gemini embeddings (google-generativeai). Requires an API key.
from medha.embeddings.gemini_adapter import GeminiAdapter
# Default: models/text-embedding-004
embedder = GeminiAdapter(api_key="your-gemini-key")
# With reduced output dimensions (MRL — models/text-embedding-004 only)
embedder = GeminiAdapter(
api_key="your-gemini-key",
model="models/text-embedding-004",
output_dimensionality=512,
)
Task types RETRIEVAL_QUERY / RETRIEVAL_DOCUMENT are selected automatically. Requests are batched in chunks of 100.
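The 100-item batching is plain chunking, which a few lines of stdlib Python can reproduce (a generic sketch, not the adapter's code):

```python
def chunked(items, size=100):
    # Yield successive fixed-size chunks, mirroring the 100-item batching
    for i in range(0, len(items), size):
        yield items[i:i + size]

texts = [f"question {i}" for i in range(250)]
print([len(batch) for batch in chunked(texts, 100)])  # [100, 100, 50]
```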
Custom Embedder
Implement the BaseEmbedder interface to use any embedding provider.
from medha.interfaces import BaseEmbedder
from typing import List
class MyCustomEmbedder(BaseEmbedder):
@property
def dimension(self) -> int:
return 768
@property
def model_name(self) -> str:
return "my-custom-model"
async def aembed(self, text: str) -> List[float]:
# Your embedding logic here
...
async def aembed_batch(self, texts: List[str]) -> List[List[float]]:
# Your batch embedding logic here
...
embedder = MyCustomEmbedder()
Search Threshold Tuning
Fine-tune how aggressively Medha matches questions at each tier.
Strict Matching (High Precision)
Only return cache hits when very confident. Minimizes false positives.
from medha import Settings
settings = Settings(
score_threshold_exact=0.995, # Near-identical vectors only
score_threshold_semantic=0.95, # Very close meaning only
score_threshold_template=0.90, # Template must be a strong match
score_threshold_fuzzy=95.0, # Almost no typos allowed
)
Relaxed Matching (High Recall)
Return more cache hits, accepting slightly lower confidence. Reduces LLM calls.
from medha import Settings
settings = Settings(
score_threshold_exact=0.97,
score_threshold_semantic=0.82,
score_threshold_template=0.75,
score_threshold_fuzzy=75.0,
)
Disable Specific Tiers
from medha import Settings
# Disable L1 in-memory cache (always hit the vector store)
settings = Settings(l1_cache_max_size=0)
# Fuzzy matching is automatically disabled if rapidfuzz is not installed
# To install: pip install "medha-archai[fuzzy]"
Cache Warming
Pre-populate the cache from a file before serving traffic. Supports both JSON array and JSONL formats.
// warm_queries.jsonl — one entry per line
{"question": "How many users are active?", "generated_query": "SELECT COUNT(*) FROM users WHERE status = 'active';"}
{"question": "Total revenue this month", "generated_query": "SELECT SUM(amount) FROM orders WHERE date >= DATE_TRUNC('month', NOW());", "response_summary": "Monthly revenue total"}
import asyncio
from medha import Medha
from medha.embeddings.fastembed_adapter import FastEmbedAdapter
async def main():
async with Medha(
collection_name="my_cache",
embedder=FastEmbedAdapter(),
) as cache:
# Load from JSONL (also accepts JSON array files)
loaded = await cache.warm_from_file("warm_queries.jsonl")
print(f"Warmed {loaded} entries")
# Sync variant
# loaded = cache.warm_from_file_sync("warm_queries.json")
print(cache.stats["warm_loaded"]) # 2
asyncio.run(main())
Required keys per entry: question, generated_query
Optional keys: response_summary, template_id
Internally calls store_batch() — a single embedding round-trip for all entries.
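The JSONL format is one JSON object per line, so the entries list can be built with stdlib json before handing it to store_batch(). A sketch (using an in-memory string in place of the file):

```python
import io
import json

jsonl = io.StringIO(
    '{"question": "How many users are active?", '
    '"generated_query": "SELECT COUNT(*) FROM users WHERE status = \'active\';"}\n'
    '{"question": "Total revenue this month", '
    '"generated_query": "SELECT SUM(amount) FROM orders;", '
    '"response_summary": "Monthly revenue total"}\n'
)

entries = [json.loads(line) for line in jsonl if line.strip()]
# Every entry must carry the two required keys
assert all({"question", "generated_query"} <= entry.keys() for entry in entries)
print(len(entries))  # 2
# entries can now be passed to await cache.store_batch(entries)
```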
Security Settings
Medha 0.2.0 adds three settings to defend against common attack vectors when Medha is exposed to untrusted input.
Input Length Guard — max_question_length
Prevent DoS via oversized question strings. search() returns SearchStrategy.ERROR; store() raises ValueError.
settings = Settings(max_question_length=2048) # default: 8192
File Size Limit — max_file_size_mb
warm_from_file() and load_templates_from_file() reject files larger than
this limit before reading them.
settings = Settings(max_file_size_mb=50) # default: 100 MB
Path Traversal Protection — allowed_file_dir
When set, warm_from_file() and load_templates_from_file() reject any path
that resolves outside the specified directory.
settings = Settings(allowed_file_dir="/app/data")
# warm_from_file("/app/data/../etc/passwd") → ValueError
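The guard can be approximated with pathlib: resolve the path (collapsing `..` and symlinks), then verify containment. This is a sketch of the mechanism, not Medha's implementation; check_path is a hypothetical helper:

```python
from pathlib import Path

def check_path(path: str, allowed_dir: str) -> Path:
    # Resolve symlinks and '..' segments, then verify containment
    resolved = Path(path).resolve()
    base = Path(allowed_dir).resolve()
    if not resolved.is_relative_to(base):  # Python 3.9+
        raise ValueError(f"{path} resolves outside {allowed_dir}")
    return resolved

check_path("/app/data/warm.jsonl", "/app/data")  # ok
try:
    check_path("/app/data/../etc/passwd", "/app/data")
except ValueError as exc:
    print("blocked:", exc)
```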
Distributed L1 Cache (Redis)
By default Medha's L1 cache is in-process. With multiple service instances (horizontal scaling) each process has its own isolated cache. Use RedisL1Cache to share the L1 cache across instances.
pip install "medha-archai[redis]"
from medha import Medha
from medha.l1_cache.redis_adapter import RedisL1Cache
from medha.embeddings.fastembed_adapter import FastEmbedAdapter
# Shared L1 cache — all instances see the same hits
redis_l1 = RedisL1Cache(
url="redis://redis.internal:6379/0",
prefix="myapp:medha:l1", # namespace to avoid key collisions
ttl=3600, # 1-hour TTL per entry (optional)
)
async with Medha(
collection_name="prod_cache",
embedder=FastEmbedAdapter(),
l1_backend=redis_l1,
) as cache:
await cache.store("How many users?", "SELECT COUNT(*) FROM users;")
hit = await cache.search("How many users?")
print(hit.strategy.value) # l1_cache (served from Redis)
Redis eviction: Configure maxmemory-policy allkeys-lru on the Redis server for automatic LRU eviction when memory is full.
Custom L1 Backend
Implement L1CacheBackend to use any fast store (Memcached, DynamoDB DAX, etc.):
from medha.interfaces.l1_cache import L1CacheBackend
from medha.types import CacheHit
from typing import Optional
class MyL1Cache(L1CacheBackend):
async def get(self, key: str) -> Optional[CacheHit]: ...
async def set(self, key: str, value: CacheHit) -> None: ...
async def clear(self) -> None: ...
@property
def size(self) -> int: ...
cache = Medha(..., l1_backend=MyL1Cache())
Persistent Embedding Cache
By default the embedding cache is in-memory and lost on restart. Set embedding_cache_path to persist it across sessions — useful when the same questions recur between deployments.
export MEDHA_EMBEDDING_CACHE_PATH=/var/cache/medha/embeddings.json
from medha import Medha, Settings
from medha.embeddings.fastembed_adapter import FastEmbedAdapter
settings = Settings(
backend_type="qdrant",
qdrant_mode="docker",
embedding_cache_path="/var/cache/medha/embeddings.json",
)
async with Medha(
collection_name="my_cache",
embedder=FastEmbedAdapter(),
settings=settings,
) as cache:
# On start(): embeddings loaded from disk (if file exists)
await cache.store("show active users", "SELECT * FROM users WHERE active = true;")
# On close(): embeddings saved to disk automatically
No extra dependencies — uses stdlib json.
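The load-on-start / save-on-close pattern is simple enough to sketch with stdlib json. This is illustrative only; Medha's actual cache class and file format may differ:

```python
import json
import tempfile
from pathlib import Path

class PersistentEmbeddingCache:
    """Text -> embedding map saved to a JSON file between runs (sketch)."""

    def __init__(self, path):
        self.path = Path(path)
        self.data = {}
        if self.path.exists():            # analogous to loading on start()
            self.data = json.loads(self.path.read_text())

    def get(self, text):
        return self.data.get(text)

    def put(self, text, vector):
        self.data[text] = vector

    def save(self):                       # analogous to saving on close()
        self.path.write_text(json.dumps(self.data))

path = Path(tempfile.gettempdir()) / "medha_embeddings_demo.json"
cache = PersistentEmbeddingCache(path)
cache.put("show active users", [0.1, 0.2, 0.3])
cache.save()

reloaded = PersistentEmbeddingCache(path)  # new process, same file
print(reloaded.get("show active users"))   # [0.1, 0.2, 0.3]
```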
Template Matching
Templates allow Medha to recognize parameterized patterns and generate queries dynamically without an LLM call.
Define Templates in Code
import asyncio
from medha import Medha, QueryTemplate
from medha.embeddings.fastembed_adapter import FastEmbedAdapter
templates = [
QueryTemplate(
intent="top_n_entities",
template_text="Show top {count} {entity}",
query_template="SELECT * FROM {entity} ORDER BY id LIMIT {count}",
parameters=["count", "entity"],
priority=1,
aliases=["List first {count} {entity}", "Get {count} {entity}"],
parameter_patterns={
"count": r"\b(\d+)\b",
"entity": r"\b(users|orders|products|employees)\b",
},
),
QueryTemplate(
intent="filter_by_status",
template_text="Show {entity} with status {status}",
query_template="SELECT * FROM {entity} WHERE status = '{status}'",
parameters=["entity", "status"],
priority=1,
parameter_patterns={
"entity": r"\b(users|orders|products)\b",
"status": r"\b(active|inactive|pending|completed)\b",
},
),
QueryTemplate(
intent="count_by_group",
template_text="Count {entity} by {group}",
query_template="SELECT {group}, COUNT(*) FROM {entity} GROUP BY {group}",
parameters=["entity", "group"],
priority=2,
parameter_patterns={
"entity": r"\b(users|orders|products|employees)\b",
"group": r"\b(department|status|category|region)\b",
},
),
]
async def main():
embedder = FastEmbedAdapter()
async with Medha(
collection_name="template_demo",
embedder=embedder,
templates=templates,
) as cache:
# Template matching with parameter extraction
hit = await cache.search("Show top 10 users")
print(f"Strategy: {hit.strategy.value}")
# template_match
print(f"Query: {hit.generated_query}")
# SELECT * FROM users ORDER BY id LIMIT 10
hit = await cache.search("Show orders with status pending")
print(f"Query: {hit.generated_query}")
# SELECT * FROM orders WHERE status = 'pending'
asyncio.run(main())
Load Templates from a JSON File
[
{
"intent": "top_n_entities",
"template_text": "Show top {count} {entity}",
"query_template": "SELECT * FROM {entity} ORDER BY id LIMIT {count}",
"parameters": ["count", "entity"],
"priority": 1,
"aliases": ["List first {count} {entity}"],
"parameter_patterns": {
"count": "\\b(\\d+)\\b",
"entity": "\\b(users|orders|products)\\b"
}
}
]
from medha import Medha, Settings
from medha.embeddings.fastembed_adapter import FastEmbedAdapter
settings = Settings(template_file="templates.json")
cache = Medha(
collection_name="my_cache",
embedder=FastEmbedAdapter(),
settings=settings,
)
# Templates are loaded automatically during cache.start()
Load Templates at Runtime
async with Medha(
collection_name="my_cache",
embedder=FastEmbedAdapter(),
) as cache:
await cache.load_templates_from_file("templates.json")
# or
await cache.load_templates([QueryTemplate(...), QueryTemplate(...)])
Parameter Extraction (NER)
Template matching requires extracting parameter values (e.g. {department}, {person}) from the user's question. ParameterExtractor applies a cascading strategy:
- Regex — patterns defined in template.parameter_patterns (fastest, most precise)
- GLiNER — zero-shot NER, uses template.parameters directly as entity labels
- spaCy — pre-trained NER with a fixed label set mapped to parameter names
- Heuristics — numbers and capitalized words as last resort
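The first and last steps of that cascade can be sketched in a few lines: try the template's regex for each parameter, then fall back to a number / capitalized-word heuristic. The extract function here is illustrative, not ParameterExtractor itself:

```python
import re

def extract(question, parameters, patterns):
    # Cascade sketch: regex per parameter first, heuristics as last resort
    values = {}
    for name in parameters:
        pattern = patterns.get(name)
        if pattern:                                    # step 1: regex
            match = re.search(pattern, question)
            if match:
                values[name] = match.group(1)
                continue
        # last resort: first number, else first Capitalized word
        match = (re.search(r"\b(\d+)\b", question)
                 or re.search(r"\b([A-Z][a-z]+)\b", question))
        if match:
            values[name] = match.group(1)
    return values

params = extract(
    "Show top 10 users",
    ["count", "entity"],
    {"count": r"\b(\d+)\b", "entity": r"\b(users|orders)\b"},
)
print(params)  # {'count': '10', 'entity': 'users'}
```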
spaCy (pre-trained, fixed labels)
spaCy recognizes standard entity types (PERSON, ORG, CARDINAL) and maps them to parameter names.
from medha.utils.nlp import ParameterExtractor
ext = ParameterExtractor(use_spacy=True)
print(ext.spacy_available) # True if en_core_web_sm is installed
GLiNER (zero-shot, arbitrary labels)
GLiNER receives template.parameters directly as entity labels — no mapping table needed.
It excels with domain-specific entities that spaCy cannot recognize without custom training.
from medha.utils.nlp import ParameterExtractor
# Default model: urchade/gliner_medium-v2.1
ext = ParameterExtractor(use_gliner=True)
# Lighter variant (~250 MB)
ext = ParameterExtractor(use_gliner=True, gliner_model="urchade/gliner_small-v2.1")
print(ext.gliner_available) # True if gliner package is installed
Both enabled (recommended for mixed template sets)
from medha.utils.nlp import ParameterExtractor
from medha.types import QueryTemplate
ext = ParameterExtractor(use_spacy=True, use_gliner=True)
template = QueryTemplate(
intent="org_project_issues",
template_text="Show open issues for {org} on project {project}",
query_template="SELECT * FROM issues WHERE org='{org}' AND project='{project}' AND status='open'",
parameters=["org", "project"],
# No regex needed — GLiNER resolves both from the param names directly
)
params = ext.extract("Show open issues for Acme Corp on project Apollo", template)
# {"org": "Acme Corp", "project": "Apollo"}
query = ext.render_query(template, params)
# SELECT * FROM issues WHERE org='Acme Corp' AND project='Apollo' AND status='open'
| Scenario | Recommended backend |
|---|---|
| Numeric or enum parameters | Regex only (use_spacy=False, use_gliner=False) |
| Standard entities (person, org, number) | spaCy (use_spacy=True) |
| Domain-specific or unpredictable param names | GLiNER (use_gliner=True) |
| Mixed templates in the same app | Both enabled — cascade handles it |
| Edge / resource-constrained deployment | Regex + heuristics only |
Both backends fall back gracefully if the package is not installed.
Batch Operations
store_batch — single embedding round-trip
import asyncio
from medha import Medha
from medha.embeddings.fastembed_adapter import FastEmbedAdapter
entries = [
{"question": "How many users are there?", "generated_query": "SELECT COUNT(*) FROM users;"},
{"question": "List all active orders", "generated_query": "SELECT * FROM orders WHERE status = 'active';"},
{"question": "Average order value", "generated_query": "SELECT AVG(amount) FROM orders;",
"response_summary": "Returns the mean order amount."},
]
async def main():
async with Medha(collection_name="batch_demo", embedder=FastEmbedAdapter()) as cache:
success = await cache.store_batch(entries)
print(f"Batch stored: {success}")
hit = await cache.search("How many users exist?")
print(f"{hit.strategy.value}: {hit.generated_query}")
asyncio.run(main())
store_many — chunked bulk upsert with progress
For large datasets that exceed memory or API-rate limits. Chunking and concurrency are controlled by Settings.batch_size and Settings.batch_embed_concurrency.
from medha import Medha, Settings
from medha.embeddings.fastembed_adapter import FastEmbedAdapter
settings = Settings(
batch_size=200, # entries per embedding chunk
batch_embed_concurrency=4, # concurrent embedding requests
)
async def main():
async with Medha(
collection_name="large_cache",
embedder=FastEmbedAdapter(),
settings=settings,
) as cache:
stored = await cache.store_many(
entries, # list of {question, generated_query, ...} dicts
ttl=86400, # optional per-entry TTL (seconds)
on_progress=lambda done, total: print(f"{done}/{total}"),
)
print(f"Stored {stored} entries")
warm_from_file() and warm_from_dataframe() both delegate to store_many() internally.
Export & Dedup
async with Medha(collection_name="my_cache", embedder=FastEmbedAdapter()) as cache:
# Export all entries to a pandas DataFrame
df = await cache.export_to_dataframe()
print(df.head())
# Remove duplicate entries (same query_hash), keep most-used per group
removed = await cache.dedup_collection()
print(f"Removed {removed} duplicates")
Cache Lifecycle (TTL & Invalidation)
Per-entry TTL
Pass ttl (seconds) to store() or store_many(). Expired entries are excluded from all search results automatically.
import asyncio
from medha import Medha, Settings
from medha.embeddings.fastembed_adapter import FastEmbedAdapter
async def main():
settings = Settings(
backend_type="memory",
default_ttl_seconds=3600, # global default: 1 hour
cleanup_interval_seconds=300, # auto-delete expired entries every 5 min
)
async with Medha(
collection_name="my_cache",
embedder=FastEmbedAdapter(),
settings=settings,
) as cache:
# Per-entry TTL overrides the global default
await cache.store(
"Show live orders",
"SELECT * FROM orders WHERE status = 'live';",
ttl=60, # expires in 60 seconds
)
# Entry with no TTL (immortal regardless of default)
await cache.store(
"Count all users",
"SELECT COUNT(*) FROM users;",
ttl=None,
)
# Manually expire all stale entries in the collection
deleted = await cache.expire()
print(f"Deleted {deleted} expired entries")
asyncio.run(main())
Cache Invalidation
async with Medha(collection_name="my_cache", embedder=FastEmbedAdapter()) as cache:
# Remove a specific entry by exact question text
removed = await cache.invalidate("Show live orders")
print(removed) # True if found and deleted
# Remove all entries sharing the same query hash
count = await cache.invalidate_by_query_hash("abc123...")
# Remove all entries associated with a template intent
count = await cache.invalidate_by_template("employee_lookup")
# Drop and recreate the entire collection
count = await cache.invalidate_collection()
Observability
CacheStats
Medha.stats() returns an immutable CacheStats snapshot with hit/miss rates, percentile latencies, and per-strategy breakdowns.
import asyncio
from medha import Medha
from medha.embeddings.fastembed_adapter import FastEmbedAdapter
async def main():
async with Medha(
collection_name="my_cache",
embedder=FastEmbedAdapter(),
) as cache:
await cache.store("Count all users", "SELECT COUNT(*) FROM users;")
await cache.search("How many users are there?")
await cache.search("Something unrelated")
stats = await cache.stats()
print(f"Hit rate: {stats.hit_rate:.1f}%")
print(f"Total requests: {stats.total_requests}")
print(f"Total hits: {stats.total_hits}")
print(f"Avg latency: {stats.avg_latency_ms:.2f} ms")
print(f"p50 / p95 / p99: {stats.p50_latency_ms:.2f} / "
f"{stats.p95_latency_ms:.2f} / {stats.p99_latency_ms:.2f} ms")
for strategy, s in stats.by_strategy.items():
print(f" {strategy:16s} count={s.count} avg={s.avg_latency_ms:.2f} ms")
# Reset counters
await cache.reset_stats()
asyncio.run(main())
Relevant Settings fields:
settings = Settings(
collect_stats=True, # default: True — disable to save overhead
stats_max_latency_samples=10_000, # FIFO buffer size for percentile calculations
)
Synchronous Usage
Medha provides sync wrappers for environments where asyncio is not available (scripts, notebooks, legacy code).
from medha import Medha
from medha.embeddings.fastembed_adapter import FastEmbedAdapter
# Initialize
embedder = FastEmbedAdapter()
cache = Medha(collection_name="sync_demo", embedder=embedder)
# Must call start manually (no async context manager)
import asyncio
asyncio.run(cache.start())
# Sync search and store
cache.store_sync("List all products", "SELECT * FROM products;")
hit = cache.search_sync("Show me all products")
print(f"{hit.strategy.value}: {hit.generated_query}")
# Warm from file synchronously
loaded = cache.warm_from_file_sync("warm_queries.jsonl")
# Clear caches synchronously
cache.clear_caches_sync()
# Clean up
asyncio.run(cache.close())
Query Language Examples
Medha is query-language agnostic. Here are examples for different query languages.
SQL (Text-to-SQL)
from medha import Medha, Settings
from medha.embeddings.fastembed_adapter import FastEmbedAdapter
settings = Settings(query_language="sql")
async with Medha(
collection_name="text2sql",
embedder=FastEmbedAdapter(),
settings=settings,
) as cache:
await cache.store(
"What are the top 10 products by revenue?",
"SELECT p.name, SUM(o.amount) as revenue FROM products p JOIN orders o ON p.id = o.product_id GROUP BY p.name ORDER BY revenue DESC LIMIT 10;",
)
Cypher (Text-to-Cypher for Neo4j)
settings = Settings(query_language="cypher")
async with Medha(
collection_name="text2cypher",
embedder=FastEmbedAdapter(),
settings=settings,
) as cache:
await cache.store(
"Find friends of Alice",
"MATCH (a:Person {name: 'Alice'})-[:FRIEND]->(f:Person) RETURN f.name",
)
await cache.store(
"Shortest path between Alice and Bob",
"MATCH p = shortestPath((a:Person {name: 'Alice'})-[*]-(b:Person {name: 'Bob'})) RETURN p",
)
GraphQL
settings = Settings(query_language="graphql")
async with Medha(
collection_name="text2graphql",
embedder=FastEmbedAdapter(),
settings=settings,
) as cache:
await cache.store(
"Get user profile with posts",
'{ user(id: "123") { name email posts { title createdAt } } }',
)
Qdrant Performance Tuning
HNSW Index Tuning
Adjust the HNSW index parameters for your workload.
from medha import Settings
# High-throughput production (more memory, faster search)
settings = Settings(
    hnsw_m=32,              # more edges per node (default: 16)
    hnsw_ef_construct=200,  # deeper construction search (default: 100)
)

# Low-memory / edge deployment
settings = Settings(
    hnsw_m=8,
    hnsw_ef_construct=50,
)
Quantization
Reduce memory usage while maintaining search quality.
from medha import Settings
# Scalar quantization (default, ~4x memory reduction)
settings = Settings(
    enable_quantization=True,
    quantization_type="scalar",
    quantization_rescore=True,     # re-score with original vectors
    quantization_always_ram=True,  # keep quantized vectors in RAM
)

# Binary quantization (best for high-dimensional embeddings >= 512d)
settings = Settings(
    enable_quantization=True,
    quantization_type="binary",
    quantization_oversampling=2.0,  # fetch 2x candidates before re-scoring
)

# No quantization (maximum accuracy, more memory)
settings = Settings(enable_quantization=False)
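To make the memory trade-offs concrete: a float32 vector costs 4 bytes per dimension, scalar (int8) quantization 1 byte, and binary quantization 1 bit. A back-of-the-envelope calculation (raw vector storage only; HNSW graph and payload overhead are ignored) for one million 384-dimensional embeddings:

```python
def vector_memory_mb(n_vectors: int, dim: int, bytes_per_dim: float) -> float:
    """Raw vector storage in MB, ignoring index and payload overhead."""
    return n_vectors * dim * bytes_per_dim / 1e6

n, dim = 1_000_000, 384
full = vector_memory_mb(n, dim, 4.0)      # float32
scalar = vector_memory_mb(n, dim, 1.0)    # int8 scalar quantization (~4x smaller)
binary = vector_memory_mb(n, dim, 1 / 8)  # 1 bit per dimension (~32x smaller)
print(f"float32: {full:.0f} MB, scalar: {scalar:.0f} MB, binary: {binary:.0f} MB")
# float32: 1536 MB, scalar: 384 MB, binary: 48 MB
```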
On-Disk Storage
Store original vectors on disk to save RAM. Useful for large caches.
settings = Settings(
    qdrant_mode="docker",
    on_disk=True,                  # vectors stored on disk
    enable_quantization=True,      # quantized copies in RAM for speed
    quantization_always_ram=True,
)
Batch Size Tuning
Control how many entries are upserted per Qdrant API call.
# Large batch inserts (reduce API overhead)
settings = Settings(batch_size=500)
# Small batches (lower memory per call)
settings = Settings(batch_size=50)
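The batching behavior can be pictured as simple chunking: entries are split into slices of at most `batch_size` items and each slice becomes one upsert call. A sketch of that splitting (an illustration, not the library's internals):

```python
from typing import Iterator, Sequence

def chunked(entries: Sequence[dict], batch_size: int) -> Iterator[Sequence[dict]]:
    """Yield consecutive slices of at most batch_size entries."""
    for start in range(0, len(entries), batch_size):
        yield entries[start : start + batch_size]

entries = [{"question": f"q{i}"} for i in range(1050)]
batches = list(chunked(entries, 500))
print([len(b) for b in batches])  # [500, 500, 50]
```

Larger batches mean fewer round-trips to the vector backend at the cost of more memory per call.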
Cache Monitoring
Track cache performance and hit rates at runtime. See the Observability section for CacheStats details.
import asyncio
from medha import Medha
from medha.embeddings.fastembed_adapter import FastEmbedAdapter
async def main():
    async with Medha(
        collection_name="monitored_cache",
        embedder=FastEmbedAdapter(),
    ) as cache:
        await cache.store("Count all users", "SELECT COUNT(*) FROM users;")
        await cache.store("List departments", "SELECT DISTINCT department FROM employees;")
        await cache.search("How many users are there?")
        await cache.search("Show all departments")
        await cache.search("Something completely unrelated")

        # stats() is an async method returning a CacheStats object
        stats = await cache.stats()
        print(f"Total requests: {stats.total_requests}")
        print(f"Hit rate: {stats.hit_rate:.1f}%")
        print(f"Avg latency: {stats.avg_latency_ms:.2f} ms")
        print(f"p95 latency: {stats.p95_latency_ms:.2f} ms")
        for strategy, s in stats.by_strategy.items():
            print(f"  {strategy:16s} count={s.count} avg={s.avg_latency_ms:.2f} ms")

asyncio.run(main())
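The stats fields themselves are simple aggregates. A sketch of how a hit rate and a nearest-rank p95 latency could be derived from raw per-request records (this mirrors, but is not, the library's CacheStats implementation):

```python
import math

# Hypothetical per-request records: (was_hit, latency_ms)
requests = [(True, 0.8), (True, 12.5), (False, 40.0), (True, 9.1), (False, 55.3)]

hits = sum(1 for hit, _ in requests if hit)
hit_rate = 100.0 * hits / len(requests)

latencies = sorted(lat for _, lat in requests)
# Nearest-rank p95: the smallest observation covering 95% of the sample.
rank = math.ceil(0.95 * len(latencies))
p95 = latencies[rank - 1]

print(f"hit rate {hit_rate:.1f}%, p95 {p95:.1f} ms")  # hit rate 60.0%, p95 55.3 ms
```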
Logging
Configure Medha's logging for debugging and monitoring.
from medha import setup_logging
# Basic: INFO level to console
setup_logging(level="INFO")
# Debug mode: see every tier of the waterfall search
setup_logging(level="DEBUG")
# Log to file + console with different levels
setup_logging(
    level="DEBUG",
    log_file="/var/log/medha/cache.log",
    console_level="WARNING",
)

# Custom format
setup_logging(
    level="INFO",
    fmt="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    date_fmt="%Y-%m-%d %H:%M:%S",
)
Full Production Example
A complete configuration combining all features for a production Text-to-SQL system.
import asyncio
from medha import Medha, Settings, QueryTemplate, setup_logging
from medha.embeddings.openai_adapter import OpenAIAdapter
# Configure logging
setup_logging(level="INFO", log_file="medha.log")
# Production settings
settings = Settings(
    # Qdrant Cloud
    backend_type="qdrant",
    qdrant_mode="cloud",
    qdrant_url="https://your-cluster.cloud.qdrant.io",
    qdrant_api_key="your-api-key",  # stored as SecretStr, never logged
    # Query language
    query_language="sql",
    # Tuned thresholds
    score_threshold_exact=0.99,
    score_threshold_semantic=0.88,
    score_threshold_template=0.82,
    score_threshold_fuzzy=80.0,
    # L1 cache
    l1_cache_max_size=5000,
    # HNSW tuning
    hnsw_m=32,
    hnsw_ef_construct=200,
    # Quantization
    enable_quantization=True,
    quantization_type="scalar",
    quantization_rescore=True,
    quantization_always_ram=True,
    # Batch operations
    batch_size=200,
    # Templates from file
    template_file="production_templates.json",
    # Persist embedding cache across restarts
    embedding_cache_path="/var/cache/medha/embeddings.json",
    # Security
    max_question_length=8192,      # reject oversized questions (DoS guard)
    allowed_file_dir="/app/data",  # restrict warm_from_file() to this dir
    max_file_size_mb=100,          # reject files larger than 100 MB
)
# OpenAI embeddings
embedder = OpenAIAdapter(
    model_name="text-embedding-3-small",
    api_key="sk-your-key",
)
# Pre-defined templates
templates = [
    QueryTemplate(
        intent="employee_lookup",
        template_text="Find employees in {department}",
        query_template="SELECT * FROM employees WHERE department = '{department}'",
        parameters=["department"],
        priority=1,
        aliases=[
            "Show {department} employees",
            "Who works in {department}",
            "List {department} team",
        ],
        parameter_patterns={
            "department": r"\b(engineering|sales|marketing|hr|finance|ops)\b",
        },
    ),
]
async def main():
    from medha.l1_cache.redis_adapter import RedisL1Cache

    async with Medha(
        collection_name="production_text2sql",
        embedder=embedder,
        settings=settings,
        templates=templates,
        # Shared L1 cache across all service instances
        l1_backend=RedisL1Cache(url="redis://redis.internal:6379/0", ttl=3600),
    ) as cache:
        # Pre-warm cache from a curated file of known queries
        await cache.warm_from_file("common_queries.jsonl")

        # Or inline with store_batch for dynamic queries
        await cache.store_batch([
            {
                "question": "How many active users?",
                "generated_query": "SELECT COUNT(*) FROM users WHERE status = 'active';",
                "response_summary": "Count of active users",
            },
            {
                "question": "Total revenue this month",
                "generated_query": "SELECT SUM(amount) FROM orders WHERE date >= DATE_TRUNC('month', NOW());",
            },
            {
                "question": "Top customers by order count",
                "generated_query": "SELECT customer_id, COUNT(*) as n FROM orders GROUP BY customer_id ORDER BY n DESC LIMIT 10;",
            },
        ])

        # Search with full waterfall
        hit = await cache.search("Find employees in engineering")
        print(f"Strategy: {hit.strategy.value}")
        print(f"Query: {hit.generated_query}")
        print(f"Confidence: {hit.confidence:.3f}")

        # Monitor performance (stats() is async and returns a CacheStats snapshot)
        stats = await cache.stats()
        print(f"Hit rate: {stats.hit_rate:.1f}%")

asyncio.run(main())
API Reference Summary
Core
| Class / Method | Description |
|---|---|
| `Medha` | Core cache class with waterfall search |
| `Medha.search(question)` | Waterfall search → `CacheHit` |
| `Medha.store(question, query, *, ttl)` | Store a question-query pair with optional TTL |
| `Medha.store_batch(entries)` | Bulk store — single embedding round-trip |
| `Medha.store_many(entries, *, batch_size, on_progress, ttl)` | Chunked bulk upsert with concurrency control |
| `Medha.warm_from_file(path, *, ttl)` | Pre-populate cache from JSON / JSONL file |
| `Medha.warm_from_dataframe(df, *, ttl)` | Pre-populate cache from a pandas DataFrame |
| `Medha.export_to_dataframe(collection_name)` | Export collection to a pandas DataFrame |
| `Medha.dedup_collection(collection_name)` | Remove duplicate entries (same query_hash) |
| `Medha.expire(collection_name)` | Delete all expired entries; returns count |
| `Medha.invalidate(question)` | Remove entry by exact question text; returns bool |
| `Medha.invalidate_by_query_hash(hash)` | Remove all entries with a given query hash |
| `Medha.invalidate_by_template(template_id)` | Remove all entries for a template intent |
| `Medha.invalidate_collection(collection_name)` | Drop and recreate an entire collection |
| `Medha.stats(collection_name)` | Returns a `CacheStats` snapshot (async method) |
| `Medha.reset_stats()` | Reset all in-process statistics counters |
| `Medha.load_templates(templates)` | Load `QueryTemplate` list at runtime |
| `Medha.load_templates_from_file(path)` | Load templates from JSON file |
| `Medha.clear_caches()` | Clear L1 + embedding caches (async) |
| `Medha.search_sync` / `store_sync` / `warm_from_file_sync` / `clear_caches_sync` | Sync wrappers |
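The exact hashing scheme behind the L1 tier and `invalidate_by_query_hash` is not spelled out here. One plausible sketch (an assumption for illustration, not Medha's actual algorithm) hashes the whitespace-normalized, lowercased question, so trivially identical questions collapse to one key while paraphrases are left to the vector tiers:

```python
import hashlib

def question_hash(question: str) -> str:
    """Hypothetical normalization + SHA-256; casing and extra whitespace are ignored."""
    normalized = " ".join(question.lower().split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

# Identical questions (modulo case/whitespace) collapse to one key...
assert question_hash("List all  Products") == question_hash("list all products")
# ...but paraphrases do not — those are what the semantic tiers are for.
assert question_hash("Show me all products") != question_hash("List all products")
```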
Configuration & Types
| Class | Description |
|---|---|
| `Settings` | Pydantic configuration with env var support (`MEDHA_` prefix) |
| `CacheHit` | Search result: `generated_query`, `confidence`, `strategy`, `expires_at` |
| `CacheStats` | Immutable stats snapshot: hit/miss rates, latency percentiles, per-strategy breakdown |
| `StrategyStats` | Per-strategy `count`, `total_latency_ms`, `avg_latency_ms` |
| `QueryTemplate` | Parameterized question-to-query template |
| `CacheEntry` | Stored cache entry with vector and metadata |
| `CacheResult` | Backend search result with score |
| `SearchStrategy` | Enum: `l1_cache`, `template_match`, `exact_match`, `semantic_match`, `fuzzy_match`, `no_match`, `error` |
Interfaces & Backends
| Class | Description |
|---|---|
| `BaseEmbedder` | Abstract interface for embedding providers |
| `L1CacheBackend` | Abstract interface for L1 cache backends |
| `VectorStorageBackend` | Abstract interface for vector storage backends |
| `FastEmbedAdapter` | Local embeddings via FastEmbed (ONNX) |
| `OpenAIAdapter` | OpenAI embedding API adapter |
| `CohereAdapter` | Cohere Embed v3 adapter (`pip install medha-archai[cohere]`) |
| `GeminiAdapter` | Google Gemini adapter (`pip install medha-archai[gemini]`) |
| `InMemoryBackend` | Pure-Python in-process backend, zero deps (`backend_type="memory"`) |
| `QdrantBackend` | Qdrant vector storage (memory / docker / cloud) |
| `PgVectorBackend` | PostgreSQL + pgvector (`pip install medha-archai[pgvector]`) |
| `VectorChordBackend` | PostgreSQL + VectorChord (`pip install medha-archai[vectorchord]`) |
| `ElasticsearchBackend` | Elasticsearch 8.x (`pip install medha-archai[elasticsearch]`) |
| `ChromaBackend` | ChromaDB ephemeral / disk / HTTP (`pip install medha-archai[chroma]`) |
| `WeaviateBackend` | Weaviate local / cloud (`pip install medha-archai[weaviate]`) |
| `RedisVectorBackend` | Redis Stack HNSW/FLAT (`pip install medha-archai[redis]`) |
| `AzureSearchBackend` | Azure AI Search HNSW (`pip install medha-archai[azure-search]`) |
| `LanceDBBackend` | LanceDB embedded / S3 / GCS / az (`pip install medha-archai[lancedb]`) |
| `InMemoryL1Cache` | Default in-process LRU L1 cache |
| `RedisL1Cache` | Redis-backed L1 cache (`pip install medha-archai[redis]`) |
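For intuition, the default in-process L1 tier behaves like a bounded LRU map keyed by question. A minimal sketch using `OrderedDict` (InMemoryL1Cache's real implementation may differ):

```python
from collections import OrderedDict

class TinyLRU:
    """Bounded LRU: reads refresh recency; inserts past capacity evict the oldest entry."""

    def __init__(self, max_size: int) -> None:
        self.max_size = max_size
        self._data: "OrderedDict[str, str]" = OrderedDict()

    def get(self, key: str) -> "str | None":
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key: str, value: str) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.max_size:
            self._data.popitem(last=False)  # evict least recently used

cache = TinyLRU(max_size=2)
cache.put("List all products", "SELECT * FROM products;")
cache.put("Count users", "SELECT COUNT(*) FROM users;")
cache.get("List all products")  # refresh recency
cache.put("List departments", "SELECT DISTINCT department FROM employees;")
print(cache.get("Count users"))  # None — evicted as least recently used
```

The `l1_cache_max_size` setting plays the role of `max_size` here.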
Utilities
| Function / Class | Description |
|---|---|
| `setup_logging()` | Configure the `medha` logger |
| `ParameterExtractor` | NER-based parameter extractor (regex → GLiNER → spaCy → heuristics) |
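The regex stage of that fallback chain can be sketched in a few lines: given a template's `parameter_patterns` (as in the `QueryTemplate` example above), the first matching group per parameter is pulled from the question. This is a simplification of what ParameterExtractor does, not its implementation:

```python
import re

# Patterns in the style of QueryTemplate.parameter_patterns shown earlier.
parameter_patterns = {
    "department": r"\b(engineering|sales|marketing|hr|finance|ops)\b",
}

def extract_parameters(question: str, patterns: "dict[str, str]") -> "dict[str, str]":
    """First regex hit per parameter; the real extractor falls back to NER models."""
    found = {}
    for name, pattern in patterns.items():
        match = re.search(pattern, question, flags=re.IGNORECASE)
        if match:
            found[name] = match.group(1).lower()
    return found

params = extract_parameters("Who works in Engineering?", parameter_patterns)
print(params)  # {'department': 'engineering'}
```

Extracted values would then be substituted into the cached `query_template` to produce the final query.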
Roadmap
- Redis L1 Cache backend (`RedisL1Cache`, `pip install medha[redis]`).
- Cache warming from JSON / JSONL file (`warm_from_file`).
- Per-tier latency stats (`tier_latencies_ms` in `cache.stats`).
- Persistent embedding cache (`MEDHA_EMBEDDING_CACHE_PATH`).
- Parallel execution of Tier 2 (exact) and Tier 3 (semantic).
- `InMemoryBackend` — pure-Python vector backend, zero external deps.
- `PgVectorBackend` — PostgreSQL + pgvector backend.
- `backend_type` setting for declarative backend selection.
- Security hardening: `max_question_length`, `max_file_size_mb`, `allowed_file_dir`, `qdrant_api_key` as `SecretStr`, PostgreSQL identifier validation.
- `ElasticsearchBackend`, `VectorChordBackend`, `ChromaBackend`, `WeaviateBackend`, `RedisVectorBackend`, `AzureSearchBackend`, `LanceDBBackend` — seven new vector backends.
- `CohereAdapter` and `GeminiAdapter` — two new embedding providers.
- TTL support on `store()` / `store_many()` with per-entry and global defaults.
- `expire()` and `invalidate*()` cache lifecycle methods.
- `CacheStats` observability model with hit rate, latency percentiles, and per-strategy breakdown.
- `store_many()`, `export_to_dataframe()`, `dedup_collection()` — batch and management operations.
- `qdrant-client` moved to optional `[qdrant]` extra; default `backend_type` changed to `"memory"`.
- Feedback loop — mark a cache hit as correct/incorrect.
Contributing
We welcome contributions! Please see CONTRIBUTING.md for details on how to set up the dev environment and run tests.
License
This project is licensed under the Apache-2.0 License - see the LICENSE file for details.
Built with ❤️ by ArchAI Labs