Event-sourced RAG pipeline with time-travel debugging and index versioning

rag-timetravel

Time-travel debugging for the RAG retrieval layer, built on LanceDB's native dataset versioning.

Every retrieval and generation step is recorded as an immutable event, and the vector index is versioned automatically. You can take any past query, reconstruct the exact index version it ran against, re-run retrieval, and diff what was retrieved then versus now. The defensible, deterministic core is the retrieval diff; answer re-generation is a best-effort layer on top (see "What is and isn't deterministic" below).


The problem

When a RAG answer regresses ("why did this get worse than last month?"), the hard question is what changed: the retrieved chunks, the index state at that time, or the generator. Tracing/eval tools (Langfuse, Arize Phoenix, LangSmith, TruLens) record traces and let you compare runs, but they treat the vector index as opaque: they cannot reconstruct the index as it existed at an arbitrary past moment and re-run retrieval against it.

rag-timetravel fills that specific gap. Because it owns ingestion, it can pin retrieval to a historical LanceDB version and show you exactly which chunks a query would have surfaced at any point in its history, with no full-index copies.

What is and isn't deterministic

  • Retrieval replay is faithful. Pinning to a snapshot's LanceDB version reproduces the exact chunk set and scores that existed at that time. This is the core guarantee, and it is tested against a real index.
  • Generation replay is best-effort. Remote models drift and sample non-deterministically, so a replayed answer reflects today's generator over the historical retrieval, not a byte-for-byte reproduction of the old answer. Treat the replayed answer as informational; trust the retrieval diff.

How it works

┌─────────────────────────────────────────────────────────────────┐
│                        RAGPipeline                              │
│                                                                 │
│  ingest(doc) ──► chunk ──► embed ──► LanceDB ──► emit event    │
│                                                                 │
│  query(text) ──► embed ──► search ──► LLM ──► emit 5 events    │
│                                         │                       │
│                                         ▼                       │
│                                   EventStore (SQLite)           │
└─────────────────────────────────────────────────────────────────┘
          │
          │  (at any later time)
          ▼
┌─────────────────────────────────────────────────────────────────┐
│                      ReplayEngine                               │
│                                                                 │
│  1. Load QueryReceived event → get original timestamp T         │
│  2. Find IndexSnapshot with max(ts) where ts <= T               │
│  3. table.checkout(snapshot.lancedb_version) on a fresh handle  │
│     (read-only view, O(1), no data copy)                        │
│  4. Re-run retrieval + generation                               │
│  5. Return ReplayResult                                         │
└─────────────────────────────────────────────────────────────────┘
          │
          ▼
┌─────────────────────────────────────────────────────────────────┐
│                       Comparator                                │
│                                                                 │
│  compare(left_id, right_id) → DiffReport                        │
│    - chunks added / removed / score-delta                       │
│    - answer similarity (embedding cosine + token Jaccard)       │
│    - latency delta                                              │
└─────────────────────────────────────────────────────────────────┘
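The retrieval half of a diff report can be pictured as set arithmetic over chunk ids. The sketch below is only an illustration, not the library's Comparator: `RetrievalDiff`, `diff_retrieval`, and `token_jaccard` are hypothetical names, the sample scores are made up, and it assumes each retrieval is available as a mapping from chunk id to score.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RetrievalDiff:
    """Hypothetical container; the real DiffReport may be shaped differently."""
    added: list[str]               # chunk ids present only in the right-hand run
    removed: list[str]             # chunk ids present only in the left-hand run
    score_delta: dict[str, float]  # right score minus left score for shared chunks


def diff_retrieval(left: dict[str, float], right: dict[str, float]) -> RetrievalDiff:
    """Compare two retrievals given as {chunk_id: score} mappings."""
    shared = left.keys() & right.keys()
    return RetrievalDiff(
        added=sorted(right.keys() - left.keys()),
        removed=sorted(left.keys() - right.keys()),
        score_delta={cid: right[cid] - left[cid] for cid in sorted(shared)},
    )


def token_jaccard(a: str, b: str) -> float:
    """Jaccard similarity over lowercase whitespace tokens (1.0 if both are empty)."""
    ta, tb = set(a.lower().split()), set(b.lower().split())
    if not ta and not tb:
        return 1.0
    return len(ta & tb) / len(ta | tb)


then = {"c1": 0.5, "c2": 0.25}   # illustrative scores from the older run
now = {"c2": 0.75, "c3": 0.5}    # illustrative scores from the newer run
report = diff_retrieval(then, now)
print(report.added, report.removed, report.score_delta)
print(token_jaccard("returns within 30 days", "returns within 60 days"))
```

Token Jaccard is cheap and order-insensitive, which is presumably why the diagram pairs it with an embedding cosine: the two measures disagree in informative ways when an answer is reworded but unchanged in meaning.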

Why LanceDB?

LanceDB's versioned storage creates a new dataset version on every write automatically. A "snapshot" in rag-timetravel is just a metadata record pointing to a specific LanceDB version integer. Checking out a past version is O(1): no data is copied. This is the key primitive that makes cheap time-travel possible.

Why SQLite for the event store?

No extra dependencies (sqlite3 ships with Python's standard library), file-portable, and fast enough for local workloads (< 1 ms per append). The EventStore interface is narrow enough that a Postgres backend could be swapped in later without changing the pipeline.


Installation

pip install rag-timetravel

The base install is lightweight. The default PipelineConfig uses a local sentence-transformers embedder, which (because it pulls in torch) ships as an optional extra:

pip install 'rag-timetravel[local]'    # local embeddings via sentence-transformers

If you use an OpenAI-compatible embedder instead (embedder_model="openai/..." or "ollama/..."), the base install is all you need.

Requirements: Python 3.11+

For local generation (the default config), install Ollama and pull a model:

ollama pull gemma3

Quickstart

Python API

import asyncio
from rag_timetravel import RAGPipeline, PipelineConfig, ReplayEngine, Comparator

async def main():
    # Create a pipeline
    config = PipelineConfig(model="ollama/gemma3", top_k=4)
    pipeline = await RAGPipeline.create("./my_project", config)

    # Ingest documents
    await pipeline.ingest("policy_v1.txt", open("policy_v1.txt").read())
    snap1 = await pipeline.take_snapshot(label="v1-docs")

    # Query
    result = await pipeline.query("What is the return window?")
    print(result.answer)
    print(result.query_id)  # save this

    # Ingest updated documents
    await pipeline.ingest("policy_v2.txt", open("policy_v2.txt").read())
    await pipeline.take_snapshot(label="v2-docs")

    # Replay the original query against the v1 index
    engine = ReplayEngine(pipeline)
    replay = await engine.replay(result.query_id)
    print(replay.answer)  # answer using only v1 docs

    # Diff v1 vs v2 answers
    result_v2 = await pipeline.query("What is the return window?")
    cmp = Comparator(pipeline)
    report = await cmp.compare(result.query_id, result_v2.query_id)
    print(report.to_text())

asyncio.run(main())

CLI

# Ingest a directory of .txt and .md files
rag-timetravel ingest ./docs --project ./my_project

# Take a snapshot
rag-timetravel snapshot --label "after-v1" --project ./my_project

# Query
rag-timetravel query "What is the refund policy?" --project ./my_project

# Replay a past query
rag-timetravel replay --query-id <uuid> --project ./my_project

# Replay any text against the index at a past timestamp
rag-timetravel replay \
  --text "What is the refund policy?" \
  --as-of "2024-12-01T10:00:00" \
  --project ./my_project

# Diff two queries (add --html report.html to also write an HTML report)
rag-timetravel diff --left <uuid> --right <uuid> --project ./my_project

# Show the event trace for a query
rag-timetravel trace --query-id <uuid> --project ./my_project

# List all snapshots
rag-timetravel snapshots --project ./my_project

# Start the REST API
rag-timetravel serve --project ./my_project --port 8000

REST API

rag-timetravel serve --project ./my_project
Method  Path                           Description
POST    /ingest                        Ingest a document
POST    /query                         Run a RAG query
GET     /query/{query_id}              Get event trace for a query
POST    /replay                        Replay a past query against its original index
POST    /replay/as-of                  Replay query text at a historical timestamp
POST    /diff                          Diff two query executions (JSON)
GET     /diff/{left}/{right}/html      Rendered HTML diff report
GET     /snapshots                     List all snapshots
POST    /snapshot                      Take a manual snapshot
GET     /events                        Query the raw event log
GET     /health                        Health check

Interactive docs: http://localhost:8000/docs
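As a rough sketch of calling the read-only endpoints from Python with only the standard library: `endpoint` and `get_json` are hypothetical helpers, and the code assumes the GET routes return JSON. Request bodies for the POST routes are not specified here, so they are left out; the interactive docs are the place to check their schemas.

```python
import json
import urllib.request
from urllib.parse import quote

BASE_URL = "http://localhost:8000"  # port taken from the `serve` example


def endpoint(*parts: str, base: str = BASE_URL) -> str:
    """Join path segments onto the base URL, percent-encoding each segment."""
    return base.rstrip("/") + "/" + "/".join(quote(p, safe="") for p in parts)


def get_json(url: str, timeout: float = 5.0):
    """GET a URL and decode the body as JSON (assumes a JSON response)."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Building URLs needs no running server; "left-id" and "right-id" are placeholders.
print(endpoint("health"))
print(endpoint("diff", "left-id", "right-id", "html"))
```

With the server running, `get_json(endpoint("snapshots"))` would fetch the snapshot list, and `get_json(endpoint("query", query_id))` the event trace for a saved query id.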


Configuration

from rag_timetravel import PipelineConfig

config = PipelineConfig(
    embedder_model="all-MiniLM-L6-v2",   # local sentence-transformers model
    model="ollama/gemma3",                # litellm model string
    top_k=4,                              # chunks to retrieve per query
    chunk_size=512,                       # words per chunk
    chunk_overlap=64,                     # word overlap between chunks
    auto_snapshot_every=10,               # snapshot after every N docs (0=off)
    table_name="documents",               # LanceDB table name
)
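To show how `chunk_size` and `chunk_overlap` interact, here is a minimal word-window chunker. It is an assumption about the general technique, not the library's chunker: `chunk_words` is a hypothetical name, and it splits on whitespace only.

```python
def chunk_words(text: str, chunk_size: int = 512, chunk_overlap: int = 64) -> list[str]:
    """Split text into windows of `chunk_size` words that share `chunk_overlap` words."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    words = text.split()
    step = chunk_size - chunk_overlap  # how far the window advances each time
    chunks: list[str] = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # this window already reached the end of the text
    return chunks


sample = " ".join(f"w{i}" for i in range(10))
print(chunk_words(sample, chunk_size=4, chunk_overlap=1))
```

With the defaults shown in the config, each window would advance by 512 - 64 = 448 words, so a larger overlap means more chunks (and more index writes) for the same document.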

Using OpenAI

import os

from rag_timetravel import PipelineConfig

os.environ["OPENAI_API_KEY"] = "sk-..."  # or export it in your shell instead

config = PipelineConfig(
    embedder_model="openai/text-embedding-3-small",
    model="gpt-4o",
)

Using a custom embedder

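import asyncio

from rag_timetravel import PipelineConfig, RAGPipeline
from rag_timetravel.index.embedder import Embedder  # the interface MyEmbedder mirrors

class MyEmbedder:
    @property
    def dim(self) -> int:
        # dimensionality of the vectors returned by embed()
        return 768

    async def embed(self, texts: list[str]) -> list[list[float]]:
        # your implementation: return one vector of length `dim` per input text
        ...

async def main():
    config = PipelineConfig()
    pipeline = await RAGPipeline.create("./project", config, embedder=MyEmbedder())

asyncio.run(main())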
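For experiments that should not download a model, a deterministic stand-in with the same two members (`dim` and an async `embed`) can be handy. The `HashEmbedder` below is a hypothetical toy, not something shipped by the library, and its vectors carry no semantic meaning; it only hashes tokens into buckets so that equal texts yield equal vectors.

```python
import asyncio
import hashlib
import math


class HashEmbedder:
    """Deterministic toy embedder: no model download, no semantic meaning."""

    def __init__(self, dim: int = 8) -> None:
        self._dim = dim

    @property
    def dim(self) -> int:
        return self._dim

    async def embed(self, texts: list[str]) -> list[list[float]]:
        return [self._embed_one(t) for t in texts]

    def _embed_one(self, text: str) -> list[float]:
        # Count tokens per hash bucket, then L2-normalise the counts.
        vec = [0.0] * self._dim
        for token in text.lower().split():
            digest = hashlib.sha256(token.encode("utf-8")).digest()
            bucket = int.from_bytes(digest[:4], "big") % self._dim
            vec[bucket] += 1.0
        norm = math.sqrt(sum(x * x for x in vec))
        return [x / norm for x in vec] if norm else vec


vectors = asyncio.run(HashEmbedder(dim=8).embed(["return window", "return window"]))
print(len(vectors[0]), vectors[0] == vectors[1])
```

Using `hashlib` rather than the built-in `hash()` keeps the output stable across interpreter runs, which matters if the vectors are written to an index and compared later.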

Project structure

my_project/
├── events.db          # SQLite event store (append-only log)
└── lancedb/           # LanceDB vector index (versioned automatically)
    └── documents.lance/

The event store and index are self-contained in a single directory. Copy or rsync the directory to back up the full history.


Event types

Event                 When                                             Key payload fields
query.received        Query enters the pipeline                        text, pipeline_config_id
retrieval.executed    Vector search completes                          chunk_ids, scores, latency_ms
context.assembled     Chunks merged into prompt                        chunk_ids, token_count
generation.executed   LLM produces answer                              answer, model, token counts
query.completed       Full round-trip done                             total_latency_ms
document.ingested     Document written to index                        doc_id, source, chunk_count
index.snapshot_taken  Snapshot recorded                                snapshot_id, lancedb_version
replay.*              Replay events (same structure, separate prefix)
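A trace for one query can be folded into a flat summary by walking its events in order. The sketch below makes two assumptions that are not confirmed above: each event is a dict shaped like `{"type": ..., "payload": ...}`, and replay events are named by prepending `replay.` to the original type. `summarize_trace` is a hypothetical helper and the sample values are invented.

```python
REPLAY_PREFIX = "replay."  # assumed naming: "replay." + the original event type


def summarize_trace(events: list[dict]) -> dict:
    """Collapse one query's events into a flat summary dict.

    Each event is assumed to look like {"type": str, "payload": dict}.
    """
    summary: dict = {"is_replay": False}
    for event in events:
        etype, payload = event["type"], event["payload"]
        if etype.startswith(REPLAY_PREFIX):
            summary["is_replay"] = True
            etype = etype[len(REPLAY_PREFIX):]
        if etype == "query.received":
            summary["text"] = payload["text"]
        elif etype == "retrieval.executed":
            summary["chunk_ids"] = payload["chunk_ids"]
            summary["retrieval_latency_ms"] = payload["latency_ms"]
        elif etype == "generation.executed":
            summary["model"] = payload["model"]
        elif etype == "query.completed":
            summary["total_latency_ms"] = payload["total_latency_ms"]
    return summary


trace = [
    {"type": "query.received", "payload": {"text": "What is the return window?"}},
    {"type": "retrieval.executed",
     "payload": {"chunk_ids": ["c1", "c2"], "scores": [0.9, 0.7], "latency_ms": 12}},
    {"type": "query.completed", "payload": {"total_latency_ms": 840}},
]
print(summarize_trace(trace))
```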

Development

git clone https://github.com/Ar-maan05/rag-timetravel
cd rag-timetravel
pip install -e ".[dev]"

# Run tests: they exercise a real local LanceDB but mock the LLM and use a
# stub embedder, so no GPU, model download, or API key is needed.
pytest tests/

# Lint and type-check
ruff check src/rag_timetravel/
mypy src/rag_timetravel/

Roadmap

  • SQLite event store
  • LanceDB index versioning via checkout
  • Query, replay, diff (JSON + HTML report)
  • CLI + FastAPI server
  • PostgreSQL event store backend
  • Snapshot scheduling (cron-style)
  • bisect command: binary search across snapshots to find when an answer changed
  • Web UI for browsing event traces
  • MCP server wrapper

Future Explorations

  • Advanced Retrieval Replay: Hybrid search (vector + BM25) and re-ranker versioning
  • Visual Diffing & Analytics: Ingested document diffs and embedding drift visualization
  • Cloud Storage Backends: Support LanceDB checkouts directly from S3/GCS
  • Observability Integrations: Export time-travel traces to OpenTelemetry, Langfuse, or Phoenix

License

MIT

