Event-sourced RAG pipeline with time-travel debugging and index versioning
rag-timetravel
Time-travel debugging for the RAG retrieval layer, built on LanceDB's native dataset versioning.
Every retrieval and generation step is recorded as an immutable event, and the vector index is versioned automatically. You can take any past query, reconstruct the exact index version it ran against, re-run retrieval, and diff what was retrieved then versus now. The defensible, deterministic core is the retrieval diff; answer re-generation is a best-effort layer on top (see "What is and isn't deterministic" below).
The problem
When a RAG answer regresses ("why did this get worse than last month?"), the hard question is what changed: the retrieved chunks, the index state at that time, or the generator. Tracing/eval tools (Langfuse, Arize Phoenix, LangSmith, TruLens) record traces and let you compare runs, but they treat the vector index as opaque: they cannot reconstruct the index as it existed at an arbitrary past moment and re-run retrieval against it.
rag-timetravel fills that specific gap. Because it owns ingestion, it can pin retrieval to a historical LanceDB version and show you exactly which chunks a query would have surfaced at any point in its history, with no full-index copies.
What is and isn't deterministic
- Retrieval replay is faithful. Pinning to a snapshot's LanceDB version reproduces the exact chunk set and scores that existed at that time. This is the core guarantee, and it is tested against a real index.
- Generation replay is best-effort. Remote models drift and sample non-deterministically, so a replayed answer reflects today's generator over the historical retrieval, not a byte-for-byte reproduction of the old answer. Treat the replayed answer as informational; trust the retrieval diff.
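Because the retrieval diff is the part to trust, it helps to see how small it is. The following is a minimal sketch, not the library's Comparator: it assumes each retrieval is available as a `{chunk_id: score}` mapping, and the names `RetrievalDiff` and `diff_retrieval` are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RetrievalDiff:
    added: list[str]               # chunk ids only in the right-hand run
    removed: list[str]             # chunk ids only in the left-hand run
    score_delta: dict[str, float]  # right score minus left score, shared chunks only


def diff_retrieval(left: dict[str, float], right: dict[str, float]) -> RetrievalDiff:
    """Compare two retrievals given as {chunk_id: score} mappings (assumed shape)."""
    added = sorted(right.keys() - left.keys())
    removed = sorted(left.keys() - right.keys())
    shared = sorted(left.keys() & right.keys())
    return RetrievalDiff(
        added=added,
        removed=removed,
        score_delta={cid: right[cid] - left[cid] for cid in shared},
    )


# Illustrative data only: "then" is a historical retrieval, "now" a current one.
then = {"c1": 0.91, "c2": 0.80, "c3": 0.42}
now = {"c1": 0.88, "c3": 0.42, "c7": 0.95}
report = diff_retrieval(then, now)
print(report.added, report.removed, report.score_delta)
```

Nothing in this comparison depends on the generator, which is why it stays reproducible even when replayed answers drift.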
How it works
┌─────────────────────────────────────────────────────────────────┐
│  RAGPipeline                                                    │
│                                                                 │
│  ingest(doc) ──► chunk ──► embed ──► LanceDB ──► emit event     │
│                                                                 │
│  query(text) ──► embed ──► search ──► LLM ──► emit 5 events     │
│                                                    │            │
│                                                    ▼            │
│                                           EventStore (SQLite)   │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 │  (at any later time)
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│  ReplayEngine                                                   │
│                                                                 │
│  1. Load QueryReceived event → get original timestamp T         │
│  2. Find IndexSnapshot with max(ts) where ts <= T               │
│  3. table.checkout(snapshot.lancedb_version) on a fresh handle  │
│     (read-only view, O(1), no data copy)                        │
│  4. Re-run retrieval + generation                               │
│  5. Return ReplayResult                                         │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│  Comparator                                                     │
│                                                                 │
│  compare(left_id, right_id) → DiffReport                        │
│    - chunks added / removed / score-delta                       │
│    - answer similarity (embedding cosine + token Jaccard)       │
│    - latency delta                                              │
└─────────────────────────────────────────────────────────────────┘
Why LanceDB?
LanceDB's versioned storage creates a new dataset version on every write automatically. A "snapshot" in rag-timetravel is just a metadata record pointing to a specific LanceDB version integer. Checking out a past version is O(1): no data is copied. This is the key primitive that makes cheap time-travel possible.
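To see why pointing at an old version can be cheap, here is a toy model of append-only versioning. It is an illustration of the idea only, not LanceDB's implementation or API: the `VersionedTable` class and its methods are hypothetical, and it assumes each version is a small manifest listing immutable data fragments.

```python
class VersionedTable:
    """Toy append-only table where each version is a manifest of fragment indexes."""

    def __init__(self) -> None:
        self._fragments: list[tuple[str, ...]] = []    # immutable row batches
        self._manifests: list[tuple[int, ...]] = [()]  # manifest for version 0 is empty

    @property
    def version(self) -> int:
        return len(self._manifests) - 1

    def add(self, rows: list[str]) -> int:
        """Write one new fragment and one new manifest; older manifests are untouched."""
        self._fragments.append(tuple(rows))
        self._manifests.append(self._manifests[-1] + (len(self._fragments) - 1,))
        return self.version

    def rows_at(self, version: int) -> list[str]:
        """Read the table as of `version` by following that version's manifest."""
        return [row for frag in self._manifests[version] for row in self._fragments[frag]]


table = VersionedTable()
v1 = table.add(["chunk-a", "chunk-b"])
v2 = table.add(["chunk-c"])
print(v1, table.rows_at(v1))
print(v2, table.rows_at(v2))
```

In this model a snapshot record only needs to store the integer `v1`; selecting an old version is a manifest lookup, and the row data written later is simply never visited.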
Why SQLite for the event store?
Zero dependencies, file-portable, and fast enough for local workloads (< 1 ms per append). The EventStore interface is narrow enough to swap in a Postgres backend later with no changes to the pipeline.
Installation
pip install rag-timetravel
The base install is lightweight. The default PipelineConfig uses a local
sentence-transformers embedder, which (because it pulls in torch) ships as an
optional extra:
pip install 'rag-timetravel[local]' # local embeddings via sentence-transformers
If you use an OpenAI-compatible embedder instead (embedder_model="openai/..."
or "ollama/..."), the base install is all you need.
Requirements: Python 3.11+
For local generation (default config): install Ollama and pull a model:
ollama pull gemma3
Quickstart
Python API
import asyncio
from rag_timetravel import RAGPipeline, PipelineConfig, ReplayEngine, Comparator

async def main():
    # Create a pipeline
    config = PipelineConfig(model="ollama/gemma3", top_k=4)
    pipeline = await RAGPipeline.create("./my_project", config)

    # Ingest documents
    await pipeline.ingest("policy_v1.txt", open("policy_v1.txt").read())
    snap1 = await pipeline.take_snapshot(label="v1-docs")

    # Query
    result = await pipeline.query("What is the return window?")
    print(result.answer)
    print(result.query_id)  # save this

    # Ingest updated documents
    await pipeline.ingest("policy_v2.txt", open("policy_v2.txt").read())
    await pipeline.take_snapshot(label="v2-docs")

    # Replay the original query against the v1 index
    engine = ReplayEngine(pipeline)
    replay = await engine.replay(result.query_id)
    print(replay.answer)  # answer using only v1 docs

    # Diff v1 vs v2 answers
    result_v2 = await pipeline.query("What is the return window?")
    cmp = Comparator(pipeline)
    report = await cmp.compare(result.query_id, result_v2.query_id)
    print(report.to_text())

asyncio.run(main())
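The report printed at the end of this example includes an answer-similarity figure, described under "How it works" as embedding cosine plus token Jaccard. The sketch below shows both metrics in plain Python. The function names and the tokenization rule are assumptions, and how the library combines the two numbers is not specified here, so they are shown separately.

```python
import math
import re


def token_jaccard(a: str, b: str) -> float:
    """Jaccard overlap of lowercase word tokens (tokenization is an assumption)."""
    ta = set(re.findall(r"\w+", a.lower()))
    tb = set(re.findall(r"\w+", b.lower()))
    if not ta and not tb:
        return 1.0  # two empty answers are treated as identical
    return len(ta & tb) / len(ta | tb)


def cosine(u: list[float], v: list[float]) -> float:
    """Cosine similarity of two equal-length vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(u, v))
    norm = math.sqrt(sum(x * x for x in u)) * math.sqrt(sum(y * y for y in v))
    return dot / norm if norm else 0.0


old_answer = "Returns are accepted within 30 days"
new_answer = "Returns are accepted within 60 days"
print(token_jaccard(old_answer, new_answer))
```

The two metrics catch different changes: the pair above shares most of its tokens, so Jaccard stays high even though the one token that differs is the one that matters. That is one more reason to read the similarity score alongside the chunk diff rather than instead of it.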
CLI
# Ingest a directory of .txt and .md files
rag-timetravel ingest ./docs --project ./my_project
# Take a snapshot
rag-timetravel snapshot --label "after-v1" --project ./my_project
# Query
rag-timetravel query "What is the refund policy?" --project ./my_project
# Replay a past query
rag-timetravel replay --query-id <uuid> --project ./my_project
# Replay any text against the index at a past timestamp
rag-timetravel replay \
--text "What is the refund policy?" \
--as-of "2024-12-01T10:00:00" \
--project ./my_project
# Diff two queries (add --html report.html to also write an HTML report)
rag-timetravel diff --left <uuid> --right <uuid> --project ./my_project
# Show the event trace for a query
rag-timetravel trace --query-id <uuid> --project ./my_project
# List all snapshots
rag-timetravel snapshots --project ./my_project
# Start the REST API
rag-timetravel serve --project ./my_project --port 8000
REST API
rag-timetravel serve --project ./my_project
| Method | Path | Description |
|---|---|---|
| POST | /ingest | Ingest a document |
| POST | /query | Run a RAG query |
| GET | /query/{query_id} | Get event trace for a query |
| POST | /replay | Replay a past query against its original index |
| POST | /replay/as-of | Replay query text at a historical timestamp |
| POST | /diff | Diff two query executions (JSON) |
| GET | /diff/{left}/{right}/html | Rendered HTML diff report |
| GET | /snapshots | List all snapshots |
| POST | /snapshot | Take a manual snapshot |
| GET | /events | Query the raw event log |
| GET | /health | Health check |
Interactive docs: http://localhost:8000/docs
Configuration
from rag_timetravel import PipelineConfig
config = PipelineConfig(
    embedder_model="all-MiniLM-L6-v2",  # local sentence-transformers model
    model="ollama/gemma3",              # litellm model string
    top_k=4,                            # chunks to retrieve per query
    chunk_size=512,                     # words per chunk
    chunk_overlap=64,                   # word overlap between chunks
    auto_snapshot_every=10,             # snapshot after every N docs (0=off)
    table_name="documents",             # LanceDB table name
)
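To make `chunk_size` and `chunk_overlap` concrete, here is a minimal sketch of word-based chunking with overlap. It illustrates what the two settings mean and is not the library's chunker: `chunk_words` is a hypothetical name, and splitting on whitespace is an assumption.

```python
def chunk_words(text: str, chunk_size: int = 512, chunk_overlap: int = 64) -> list[str]:
    """Split text into windows of chunk_size words that share chunk_overlap words."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must satisfy 0 <= chunk_overlap < chunk_size")

    words = text.split()
    step = chunk_size - chunk_overlap  # how far each window advances
    chunks: list[str] = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # this window already reached the end of the text
    return chunks


# A synthetic 1000-word document, for illustration only.
document = " ".join(f"w{i}" for i in range(1000))
chunks = chunk_words(document)
print(len(chunks), [c.split()[0] for c in chunks])
```

With the defaults shown, each window advances 512 - 64 = 448 words, so neighboring chunks repeat 64 words and a sentence that straddles a boundary appears whole in at least one of them.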
Using OpenAI
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
config = PipelineConfig(
    embedder_model="openai/text-embedding-3-small",
    model="gpt-4o",
)
Using a custom embedder
from rag_timetravel import RAGPipeline
from rag_timetravel.index.embedder import Embedder  # the interface MyEmbedder satisfies

class MyEmbedder:
    @property
    def dim(self) -> int:
        return 768

    async def embed(self, texts: list[str]) -> list[list[float]]:
        # your implementation
        ...

# Inside an async function, with config defined as above:
pipeline = await RAGPipeline.create("./project", config, embedder=MyEmbedder())
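As a concrete instance of that shape, here is a deterministic toy embedder with a `dim` property and an async `embed` method. It is a sketch for experiments and tests only: `HashEmbedder` is a hypothetical class, and hashing tokens into buckets carries no semantic meaning, so it is not a substitute for a real embedding model.

```python
import asyncio
import hashlib
import math


class HashEmbedder:
    """Deterministic toy embedder: hashes each token into one of `dim` buckets."""

    def __init__(self, dim: int = 64) -> None:
        self._dim = dim

    @property
    def dim(self) -> int:
        return self._dim

    async def embed(self, texts: list[str]) -> list[list[float]]:
        return [self._embed_one(text) for text in texts]

    def _embed_one(self, text: str) -> list[float]:
        vec = [0.0] * self._dim
        for token in text.lower().split():
            digest = hashlib.sha256(token.encode("utf-8")).digest()
            bucket = int.from_bytes(digest[:4], "big") % self._dim
            vec[bucket] += 1.0
        # L2-normalize so cosine similarity reduces to a dot product.
        norm = math.sqrt(sum(x * x for x in vec))
        return [x / norm for x in vec] if norm else vec


vectors = asyncio.run(HashEmbedder(dim=8).embed(["return window", "return window", ""]))
print(len(vectors), len(vectors[0]))
```

Because the output depends only on the input text, the same query always produces the same vector, which is the property that matters most when testing retrieval replay.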
Project structure
my_project/
├── events.db       # SQLite event store (append-only log)
└── lancedb/        # LanceDB vector index (versioned automatically)
    └── documents.lance/
The event store and index are self-contained in a single directory. Copy or rsync the directory to back up the full history.
Event types
| Event | When | Key payload fields |
|---|---|---|
| query.received | Query enters the pipeline | text, pipeline_config_id |
| retrieval.executed | Vector search completes | chunk_ids, scores, latency_ms |
| context.assembled | Chunks merged into prompt | chunk_ids, token_count |
| generation.executed | LLM produces answer | answer, model, token counts |
| query.completed | Full round-trip done | total_latency_ms |
| document.ingested | Document written to index | doc_id, source, chunk_count |
| index.snapshot_taken | Snapshot recorded | snapshot_id, lancedb_version |
| replay.* | Replay events (same structure, separate prefix) | |
Development
git clone https://github.com/Ar-maan05/rag-timetravel
cd rag-timetravel
pip install -e ".[dev]"
# Run tests: they exercise a real local LanceDB but mock the LLM and use a
# stub embedder, so no GPU, model download, or API key is needed.
pytest tests/
# Lint and type-check
ruff check rag_timetravel/
mypy rag_timetravel/
Roadmap
- SQLite event store
- LanceDB index versioning via checkout
- Query, replay, diff (JSON + HTML report)
- CLI + FastAPI server
- PostgreSQL event store backend
- Snapshot scheduling (cron-style)
- bisect command: binary search across snapshots to find when an answer changed
- Web UI for browsing event traces
- MCP server wrapper
Future Explorations
- Advanced Retrieval Replay: Hybrid search (vector + BM25) and re-ranker versioning
- Visual Diffing & Analytics: Ingested document diffs and embedding drift visualization
- Cloud Storage Backends: Support LanceDB checkouts directly from S3/GCS
- Observability Integrations: Export time-travel traces to OpenTelemetry, Langfuse, or Phoenix
License
MIT