Corpus health analytics for RAG pipelines
corpulse
Corpus health analytics for RAG pipelines. Track which documents help, which ones don't, and which ones are just noise.
The Problem
Your vector database grows over time. Documents get added, re-chunked, and updated. Old versions linger. Near-identical content gets indexed twice. Outdated files keep surfacing in results. Without tracking, you're building on top of noise. corpulse surfaces these issues automatically.
What corpulse is (and isn't)
corpulse measures corpus health — which documents are retrieved, how often, and whether users act on them.
It does not measure answer quality, faithfulness, or relevance. For that, see tools like Ragas or DeepEval.
Think of it as a fitness tracker for your document corpus, not a grade on your answers.
Installation
```bash
# Core library
pip install corpulse

# With Qdrant wrapper support
pip install "corpulse[qdrant]"

# From source
pip install "git+https://github.com/arkadyb/corpulse.git"
```
Requires Python 3.10+. The [qdrant] extra installs qdrant-client>=1.7.
Maintainers should use .github/RELEASE_CHECKLIST.md for the first-release flow.
Contributing and Security
Contributions are welcome through reviewed pull requests. See CONTRIBUTING.md before opening a larger change.
Please report suspected vulnerabilities privately using the process in SECURITY.md, not through public issues.
Quickstart: Manual API
```python
from corpulse import Corpulse

corp = Corpulse()  # writes to ./corpulse.db

# After your vector DB search returns results
results = [
    {"doc_id": "abc123", "filename": "guide.md", "score": 0.91},
    {"doc_id": "def456", "filename": "faq.md", "score": 0.87},
]
corp.log_retrieval(results, query="how to install?")

# When the user acts on a result
corp.log_engagement("abc123", event="opened")

# Print the corpus health table
corp.report()
```
report() pretty-prints the table with tabulate if it is installed and falls back to plain text otherwise.
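The fallback described above is a common optional-dependency pattern. The sketch below shows that pattern in isolation; it assumes nothing about how corpulse renders its report, and render_table is a hypothetical helper, not part of the corpulse API.

```python
def render_table(rows: list[dict], headers: list[str]) -> str:
    """Render rows with tabulate when it is installed, else as plain aligned text."""
    try:
        from tabulate import tabulate  # optional dependency
    except ImportError:
        tabulate = None

    table = [[row.get(h, "") for h in headers] for row in rows]
    if tabulate is not None:
        return tabulate(table, headers=headers)

    # Plain-text fallback: pad each column to the width of its widest cell.
    widths = [
        max([len(str(h))] + [len(str(r[i])) for r in table])
        for i, h in enumerate(headers)
    ]
    lines = ["  ".join(str(h).ljust(w) for h, w in zip(headers, widths))]
    lines += ["  ".join(str(c).ljust(w) for c, w in zip(r, widths)) for r in table]
    return "\n".join(lines)


print(render_table(
    [{"doc_id": "abc123", "retrievals": 2}, {"doc_id": "def456", "retrievals": 1}],
    headers=["doc_id", "retrievals"],
))
```

Doing the import inside the function keeps the dependency optional: the module loads either way, and the output format only changes when tabulate is present.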
Quickstart: Qdrant Wrapper
Before (manual instrumentation):
```python
from qdrant_client import QdrantClient
from corpulse import Corpulse

client = QdrantClient(":memory:")
corp = Corpulse()

# Assumes a "docs" collection already exists; replace the vector with your query embedding
result = client.query_points(collection_name="docs", query=[0.1, 0.2, ...], limit=5)

# Must manually extract results and call log_retrieval
records = [
    {"doc_id": str(p.id), "filename": p.payload.get("filename", str(p.id)), "score": p.score}
    for p in result.points
]
corp.log_retrieval(records, query="how to install?")
```
After (automatic via wrapper):
```python
from qdrant_client import QdrantClient
from corpulse import Corpulse, QdrantCorpulseClient

client = QdrantClient(":memory:")
corp = Corpulse()
wrapped = QdrantCorpulseClient(client, corp)

result = wrapped.query_points(collection_name="docs", query=[0.1, 0.2, ...], limit=5)
# log_retrieval() is called automatically; the result is unchanged
```
Async variant:
```python
import asyncio
from qdrant_client import AsyncQdrantClient
from corpulse import Corpulse, AsyncQdrantCorpulseClient

async def main():
    client = AsyncQdrantClient(":memory:")
    corp = Corpulse()
    wrapped = AsyncQdrantCorpulseClient(client, corp)
    result = await wrapped.query_points(
        collection_name="docs", query=[0.1, 0.2, ...], limit=5
    )
    # log_retrieval() called automatically

asyncio.run(main())
```
Constructor parameters:
- payload_id_field: payload key to use as the document ID (default: None, which uses the Qdrant point ID)
- payload_filename_key: payload key for the filename (default: "filename")
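To illustrate what those two parameters control, here is a sketch of mapping one Qdrant point to a corpulse record. It uses a stand-in Point dataclass instead of qdrant-client's scored point so it runs without Qdrant. The filename fallback mirrors the manual example above; falling back to the point ID when payload_id_field is missing from the payload is an assumption, and to_record is hypothetical.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class Point:
    """Stand-in for a Qdrant scored point: id, score, payload."""
    id: Any
    score: float
    payload: Optional[dict] = field(default_factory=dict)


def to_record(
    point: Point,
    payload_id_field: Optional[str] = None,
    payload_filename_key: str = "filename",
) -> dict:
    payload = point.payload or {}
    # Default: use the Qdrant point ID; otherwise read the ID from the payload.
    if payload_id_field is None:
        doc_id = str(point.id)
    else:
        # Assumption: fall back to the point ID if the payload lacks the field.
        doc_id = str(payload.get(payload_id_field, point.id))
    # Same fallback as the manual example: use the point ID when no filename is stored.
    filename = payload.get(payload_filename_key, str(point.id))
    return {"doc_id": doc_id, "filename": filename, "score": point.score}


p = Point(id=7, score=0.91, payload={"source_id": "abc123", "filename": "guide.md"})
print(to_record(p))
print(to_record(p, payload_id_field="source_id"))
```

Reading the ID from the payload is useful when several chunks (several Qdrant points) belong to the same source document and should be counted together.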
Advanced: generic wrapper engine
If a client exposes stable query methods and you can normalize its native
response into Corpulse records, you can use the generic wrap() API instead of
writing a dedicated class:
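```python
from corpulse import Corpulse, WrapMethod, wrap

# client is your existing database client. This recipe assumes its search()
# result exposes .hits as a list of dicts with "id", "name", and "score" keys.
wrapped = wrap(
    client,
    Corpulse(),
    methods={
        "search": WrapMethod(
            normalize=lambda result, args, kwargs: [
                {
                    "doc_id": hit["id"],
                    "filename": hit["name"],
                    "score": hit["score"],
                    "embedding": None,
                }
                for hit in result.hits
            ]
        )
    },
)
```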
This removes most wrapper boilerplate, but each database still needs a normalization recipe for its response shape.
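To show what such a wrapper does conceptually, here is a toy proxy. It is not corpulse's wrap(); ToyWrapper and FakeClient are hypothetical. Calls pass through unchanged, and only the listed methods have their result normalized and handed to a logging callback. That callback stands in for log_retrieval.

```python
from typing import Any, Callable


class ToyWrapper:
    """Forward every attribute to the wrapped client; intercept listed methods."""

    def __init__(self, client: Any, log: Callable[[list], None],
                 methods: dict[str, Callable[[Any, tuple, dict], list]]):
        self._client = client
        self._log = log
        self._methods = methods

    def __getattr__(self, name: str) -> Any:
        attr = getattr(self._client, name)
        normalize = self._methods.get(name)
        if normalize is None or not callable(attr):
            return attr  # untouched pass-through

        def wrapped(*args, **kwargs):
            result = attr(*args, **kwargs)
            self._log(normalize(result, args, kwargs))  # record normalized hits
            return result  # the caller still sees the native response
        return wrapped


class FakeClient:
    def search(self, q):
        return {"hits": [{"id": "abc123", "name": "guide.md", "score": 0.91}]}

    def ping(self):
        return "pong"


logged = []
client = ToyWrapper(
    FakeClient(),
    log=logged.append,
    methods={"search": lambda r, a, k: [
        {"doc_id": h["id"], "filename": h["name"], "score": h["score"]} for h in r["hits"]
    ]},
)
client.search("how to install?")
client.ping()  # not listed, so nothing is logged
print(logged)
```

Only the normalize function is database-specific. That is why each database still needs its own recipe, while the forwarding and logging stay generic.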
Async usage
corpulse ships a fully async interface via AsyncCorpulse. It returns structured data instead of printing, making it ideal for web services and async pipelines.
```python
import asyncio
from corpulse import AsyncCorpulse
from corpulse.backends import AsyncPostgresBackend

async def main():
    backend = await AsyncPostgresBackend.create(
        "postgresql://user:pass@localhost/mydb"
    )
    async with AsyncCorpulse(backend=backend) as corp:
        # Ingest: called after every vector DB query in your RAG pipeline
        await corp.log_retrieval(
            [{"doc_id": "abc123", "filename": "guide.md", "score": 0.91}],
            query="how to install?",
        )
        await corp.log_engagement("abc123", event="opened")

        ghosts = await corp.get_ghosts()
        print(f"Ghost docs: {len(ghosts)}")

        report = await corp.report(window_days=30)
        print(report["summary"])
        print(report["rows"][:3])

        cleanup = await corp.cleanup_report()
        print(cleanup["ghosts"])
        print(cleanup["suspects"])

asyncio.run(main())
```
AsyncCorpulse.report() and AsyncCorpulse.cleanup_report() return dictionaries with structured payloads, so you can log them, send them over HTTP, or render them in your own UI without parsing stdout.
Workload Trace Capture
corpulse can capture append-only RAG request traces for observability and later replay work. Raw query and component content are optional; you can store hashes and references instead.
```python
from corpulse import Corpulse, AsyncCorpulse

corp = Corpulse()
corp.log_rag_request(
    session_id="session-123",
    query="What is the answer?",
    request_id="req-123",
    components=[
        {"type": "system_prompt", "token_count": 12, "refs": None, "content_hash": "sp-1", "metadata": None},
        {"type": "vector_db", "token_count": 42, "refs": [{"doc_id": "abc123"}], "content_hash": "vec-1", "metadata": {"top_k": 5}},
        {"type": "chat_history", "token_count": 18, "refs": [{"turn": 3}], "content_hash": None, "metadata": {"window": 4}},
        {"type": "user_input", "token_count": 9, "refs": None, "content_hash": "ui-1", "metadata": None},
    ],
    timings={"ttft_ms": 210, "tpot_ms": 18, "retrieval_ms": 42},
)

async def log_async(async_corp: AsyncCorpulse) -> None:
    # Hash-only capture: no raw query text is stored
    await async_corp.alog_rag_request(
        session_id="session-123",
        query=None,
        request_id="req-124",
        components=[{"type": "other", "token_count": None, "refs": None, "content_hash": "fallback", "metadata": {"mode": "hash-only"}}],
        timings={"queue_ms": 7, "total_latency_ms": 409},
        timeout=False,
    )
```
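The examples above pass opaque labels such as "sp-1" as content_hash. If you want real hashes so that raw text never leaves your process, SHA-256 over the UTF-8 text is one reasonable choice. This is an assumption: the page does not say which hash function corpulse expects, and hash_content is a hypothetical helper.

```python
import hashlib
from typing import Optional


def hash_content(text: Optional[str]) -> Optional[str]:
    """Return the hex SHA-256 digest of the text, or None when there is no content."""
    if text is None:
        return None
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


system_prompt = "You are a helpful assistant."
component = {
    "type": "system_prompt",
    "token_count": 12,
    "refs": None,
    "content_hash": hash_content(system_prompt),  # store the digest, not the prompt
    "metadata": None,
}
print(component["content_hash"][:12])
```

Because the digest is deterministic, identical prompts across requests produce identical hashes. Hash-based analyses can therefore still group them without access to the text.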
Workload Trace JSONL Import/Export
The JSONL schema version is corpulse.rag_request_trace.v1. Export is privacy-first by default: raw query text and component metadata are omitted unless you opt in.
```python
from corpulse import Corpulse, AsyncCorpulse

corp = Corpulse()
corp.export_rag_request_traces_jsonl("traces.jsonl")
corp.import_rag_request_traces_jsonl("traces.jsonl")

async def round_trip(async_corp: AsyncCorpulse) -> None:
    await async_corp.aexport_rag_request_traces_jsonl("traces.jsonl")
    await async_corp.aimport_rag_request_traces_jsonl("traces.jsonl")
```
Default export keeps the trace portable without raw content:
```json
{"captured_at":1710000000.0,"components":[{"content_hash":"vec-1","metadata":null,"refs":[{"doc_id":"abc123"}],"token_count":42,"type":"vector_db"}],"error":null,"input_token_count":42,"output_token_count":9,"query_hash":"abc123","query_text":null,"request_id":"req-123","schema_version":"corpulse.rag_request_trace.v1","session_id":"session-123","timeout":false,"timings":{"retrieval_ms":42.0,"ttft_ms":210.0}}
```
Use include_raw_text=True and include_component_metadata=True only when you explicitly want to export the raw trace payload.
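The two opt-in flags amount to a redaction step applied to each record before it is written. Here is a sketch of that idea with the standard json module. The helper redact_trace is hypothetical, and the fields it clears (query_text and each component's metadata) are inferred from the sample record above.

```python
import json


def redact_trace(trace: dict, include_raw_text: bool = False,
                 include_component_metadata: bool = False) -> dict:
    """Return a copy of a trace with raw text and component metadata cleared unless opted in."""
    out = dict(trace)
    if not include_raw_text:
        out["query_text"] = None
    if not include_component_metadata:
        # Build new component dicts so the caller's trace is not mutated.
        out["components"] = [{**c, "metadata": None} for c in trace.get("components", [])]
    return out


trace = {
    "request_id": "req-123",
    "query_text": "What is the answer?",
    "query_hash": "abc123",
    "components": [{"type": "vector_db", "content_hash": "vec-1", "metadata": {"top_k": 5}}],
}
# sort_keys and compact separators match the alphabetized, compact sample line above.
line = json.dumps(redact_trace(trace), sort_keys=True, separators=(",", ":"))
print(line)
```

Redacting by default and opting in field by field means a forgotten flag leaks less, not more.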
Import is append-oriented and skips duplicate trace fingerprints by default, so re-importing the same JSONL file does not create duplicate analytics rows.
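Skipping duplicates on re-import only needs a stable fingerprint per record. The page does not say how corpulse computes its fingerprint. The sketch below assumes SHA-256 over the canonical (sorted-key) JSON of each record. import_jsonl_lines is a hypothetical helper that works on in-memory lines rather than a file.

```python
import hashlib
import json


def fingerprint(record: dict) -> str:
    # Canonical JSON: sorted keys and no whitespace, so equal records hash equally.
    canonical = json.dumps(record, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def import_jsonl_lines(lines, store: list, seen: set) -> int:
    """Append new records to store, skipping fingerprints already seen. Returns the count added."""
    added = 0
    for line in lines:
        if not line.strip():
            continue  # tolerate blank lines
        record = json.loads(line)
        fp = fingerprint(record)
        if fp in seen:
            continue
        seen.add(fp)
        store.append(record)
        added += 1
    return added


lines = ['{"request_id":"req-123","timeout":false}', '{"request_id":"req-124","timeout":true}']
store, seen = [], set()
print(import_jsonl_lines(lines, store, seen))  # 2 (first import)
print(import_jsonl_lines(lines, store, seen))  # 0 (re-importing the same lines)
```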
Callable Replay
Replay uses captured or JSONL-imported workload traces and invokes your supplied callable once per trace. corpulse sorts traces by capture time, builds a replay request envelope, records success or failure, and does not store the callable return value.
```python
from corpulse import AsyncCorpulse, Corpulse

corp = Corpulse()

def replay_handler(request):
    # Called once per trace with the replay request envelope
    print(request["request_id"], request["query_hash"])

replay = corp.replay_rag_request_traces(
    replay_handler,
    window_days=30,
    time_scale=None,
)
print(replay["summary"])

async def async_replay_handler(request):
    print(request["request_id"], request["query_hash"])

async def run_async_replay(async_corp: AsyncCorpulse) -> None:
    replay = await async_corp.areplay_rag_request_traces(async_replay_handler)
```
time_scale=None means no sleeping: traces are replayed back to back. time_scale=1.0 replays the captured gaps between traces in real time, and larger values replay faster. You can cap each scheduled delay with max_delay_seconds.
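The scheduling rule above reduces to a small calculation per trace: take the gap since the previous captured trace, divide it by time_scale, and optionally cap it. The sketch covers only that arithmetic. replay_delays is hypothetical, and starting the first trace immediately is an assumption about how the schedule is anchored.

```python
from typing import Optional


def replay_delays(captured_at: list[float], time_scale: Optional[float],
                  max_delay_seconds: Optional[float] = None) -> list[float]:
    """Seconds to sleep before each trace, given capture timestamps in seconds."""
    if time_scale is not None and time_scale <= 0:
        raise ValueError("time_scale must be positive or None")
    times = sorted(captured_at)  # replay runs in capture order
    delays = []
    for i, t in enumerate(times):
        if time_scale is None or i == 0:
            delays.append(0.0)  # no sleeping, or the first trace fires immediately
            continue
        delay = (t - times[i - 1]) / time_scale
        if max_delay_seconds is not None:
            delay = min(delay, max_delay_seconds)
        delays.append(delay)
    return delays


ts = [1710000000.0, 1710000004.0, 1710000064.0]
print(replay_delays(ts, None))       # [0.0, 0.0, 0.0]
print(replay_delays(ts, 1.0))        # [0.0, 4.0, 60.0]
print(replay_delays(ts, 2.0, 10.0))  # [0.0, 2.0, 10.0]
```

The cap matters for traces captured across quiet periods: without it, a replay at time_scale=1.0 would faithfully sit idle through an overnight gap.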
Core corpulse does not ship an OpenAI SDK, HTTP client, or benchmark exporter for replay. Users needing OpenAI-compatible endpoint replay should implement the supplied callable with their own raw prompt/message reconstruction, endpoint client, and result retention policy.
Workload and Serving Reports
Use workload_report() to summarize request volume, throughput, burst windows, token pressure, and component composition. Use serving_report() to inspect TTFT, TPOT, total latency, stage latencies, percentiles, timeout rate, error rate, and slow-request contributors. Use session_report() to inspect conversation-level behavior across captured or JSONL-imported traces.
```python
from corpulse import Corpulse

corp = Corpulse()

workload = corp.workload_report(window_days=30)
print(workload["traffic"])
print(workload["tokens"])
print(workload["components"])

serving = corp.serving_report(window_days=30)
print(serving["ttft_ms"])
print(serving["slow_request_contributors"])

session = corp.session_report(window_days=30)
print(session["summary"])
print(session["sessions"])
print(session["context_reuse"])
```
These reports read the same captured or JSONL-imported traces exposed by get_rag_request_traces(). Session summary covers request count, turns per session, duration, follow-up rate, and history growth; sessions contains per-session timing and token growth details. context_reuse surfaces repeated refs or content hashes within the same session, without semantic matching, cache recommendations, LLM-as-judge, or online inference dependencies.
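To make "repeated refs or content hashes within the same session" concrete, here is a sketch of exact-match counting. It assumes each trace is a dict shaped like the log_rag_request arguments (a session_id plus a components list). find_context_reuse is hypothetical, and its output shape is not corpulse's.

```python
import json
from collections import Counter, defaultdict


def find_context_reuse(traces: list[dict]) -> dict[str, dict[str, int]]:
    """Per session, map each ref or content hash seen more than once to its count."""
    counts: dict[str, Counter] = defaultdict(Counter)
    for trace in traces:
        session = trace["session_id"]
        for comp in trace.get("components", []):
            if comp.get("content_hash"):
                counts[session]["hash:" + comp["content_hash"]] += 1
            for ref in comp.get("refs") or []:
                # Refs are dicts, so serialize them canonically to make them countable.
                counts[session]["ref:" + json.dumps(ref, sort_keys=True)] += 1
    return {
        session: {key: n for key, n in c.items() if n > 1}
        for session, c in counts.items()
    }


traces = [
    {"session_id": "s1", "components": [{"type": "vector_db", "refs": [{"doc_id": "abc123"}], "content_hash": "vec-1"}]},
    {"session_id": "s1", "components": [{"type": "vector_db", "refs": [{"doc_id": "abc123"}], "content_hash": "vec-2"}]},
    {"session_id": "s2", "components": [{"type": "vector_db", "refs": [{"doc_id": "abc123"}], "content_hash": "vec-1"}]},
]
print(find_context_reuse(traces))
```

In this data only session s1 reports a repeat (doc abc123 retrieved twice). Session s2 shares the hash vec-1 with s1, but matches are counted per session, so it is not reuse.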
Generation trace capture
corpulse also supports append-only trace capture for future generation metrics. Use it to store the prompt or query text, the retrieved context references you fed into generation, the final answer text, and optional evaluation labels.
```python
from corpulse import Corpulse

corp = Corpulse()
corp.log_generation_trace(
    prompt_text="Answer the user's question",
    retrieved_context_refs=[{"doc_id": "abc123", "chunk_id": "c-1"}],
    final_answer_text="Here is the response.",
    evaluation_labels=["grounded"],
)
traces = corp.get_generation_traces()
```
Trace records are read-only once written and do not change any existing corpus-health analytics.
What It Measures
- Ghost documents: registered but never retrieved within a time window
- Near-duplicates: embedding pairs above a cosine similarity threshold (requires scikit-learn)
- Obsolete versions: e.g. api-v1.md superseded by api-v2.md
- Stale embeddings: source file updated but embedding not refreshed
- Low-engagement suspects: retrieved often but users rarely act on them
- Mean Reciprocal Rank: a retrieval-order quality proxy based on existing ranks plus engagement overlap
- User Acceptance Rate: share of engagement rows whose event_type is one of opened, clicked, copied, or thumbs_up
- Generation trace capture: append-only prompt/query text, retrieved context refs, final answer text, and optional labels for future generation metrics
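For intuition about the two ranking metrics, here is the textbook form of each on toy data. MRR averages 1/rank of the first engaged document per query, with 0 when nothing was engaged. Acceptance rate divides accepted events by all events. corpulse describes its MRR as a proxy built on stored ranks plus engagement overlap, so its exact aggregation may differ. The function names, input shapes, and the lowercase/strip normalization are assumptions.

```python
ACCEPTED = {"opened", "clicked", "copied", "thumbs_up"}


def mean_reciprocal_rank(queries: list[tuple[list[str], set[str]]]) -> float:
    """queries: (doc_ids in rank order, doc_ids the user engaged with) per query."""
    if not queries:
        return 0.0
    total = 0.0
    for ranked, engaged in queries:
        for rank, doc_id in enumerate(ranked, start=1):
            if doc_id in engaged:
                total += 1.0 / rank  # only the first engaged hit counts
                break
    return total / len(queries)


def acceptance_rate(event_types: list[str]) -> float:
    """Share of events whose normalized type is in the accepted allowlist."""
    if not event_types:
        return 0.0
    accepted = sum(1 for e in event_types if e.strip().lower() in ACCEPTED)
    return accepted / len(event_types)


queries = [
    (["abc123", "def456"], {"abc123"}),  # engaged doc at rank 1 -> 1/1
    (["def456", "abc123"], {"abc123"}),  # engaged doc at rank 2 -> 1/2
    (["ghi789"], set()),                 # nothing engaged -> 0
]
print(mean_reciprocal_rank(queries))  # 0.5
print(acceptance_rate(["opened", "clicked", "thumbs_down", "copied"]))  # 0.75
```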
Configuration
```python
corp = Corpulse(
    db_path="./corpulse.db",     # SQLite database path
    ghost_threshold_days=30,     # Days before flagging as ghost
    duplicate_threshold=0.92,    # Cosine similarity threshold
    stale_threshold_days=14,     # Days of source-vs-embedding lag
    obsolete_pattern=r"v\d+",    # Regex for version detection in filenames
    top_k_report=20,             # Documents shown in report()
)
```
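To see how a pattern like the default obsolete_pattern=r"v\d+" can flag superseded files, here is one plausible grouping rule. Strip the version token to get a base name, then mark every version below the highest in each group as obsolete. The grouping rule and the helper find_obsolete are assumptions for illustration, not corpulse's algorithm. The sketch also assumes the pattern's match contains the version digits.

```python
import re
from collections import defaultdict


def find_obsolete(filenames: list[str], pattern: str = r"v\d+") -> dict[str, str]:
    """Map each superseded filename to the newest filename in its version group."""
    regex = re.compile(pattern)
    groups: dict[str, list[tuple[int, str]]] = defaultdict(list)
    for name in filenames:
        match = regex.search(name)
        if match is None:
            continue  # unversioned files are never obsolete under this rule
        version = int(re.sub(r"\D", "", match.group()))  # "v2" -> 2
        base = name[:match.start()] + name[match.end():]  # "api-v2.md" -> "api-.md"
        groups[base].append((version, name))
    obsolete = {}
    for versions in groups.values():
        versions.sort()  # numeric order, so v10 sorts after v2
        newest = versions[-1][1]
        for _, name in versions[:-1]:
            obsolete[name] = newest
    return obsolete


print(find_obsolete(["api-v1.md", "api-v2.md", "guide-v1.md", "faq.md"]))
```

Parsing the version as an integer avoids the lexicographic trap where "v10" would sort before "v2".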
Analysis Methods
All analysis methods use the configured lookback window. If you do not pass window_days, corpulse uses ghost_threshold_days.
| Method | What it measures | Example use |
|---|---|---|
| get_ghosts() | Documents that were registered but not retrieved during the lookback window. | Find files that exist in the index but never show up in search results, such as an outdated draft that no query surfaces. |
| get_duplicates() | Pairs of documents whose embeddings are above the configured cosine similarity threshold. | Spot near-identical files like api-v1.md and api-v1-copy.md that are both being indexed. |
| get_obsolete() | Older documents that appear to have been superseded by a newer filename version. | Detect versioned docs such as guide-v1.md that should probably be replaced by guide-v2.md. |
| get_stale_embeddings() | Documents whose source file timestamp is newer than the stored embedding timestamp. | Catch a document that was edited yesterday but still has an embedding from last week. |
| get_suspects() | Documents with high retrieval volume but a low engagement rate. | Identify pages that search returns frequently but users rarely open or act on. |
| mean_reciprocal_rank() | A retrieval-order quality proxy based on retrieval rank and whether the document was engaged with. Higher is better. | Check whether the documents users actually interact with tend to appear near the top of results. |
| acceptance_rate() | The share of engagement events whose normalized event_type is in the accepted allowlist: opened, clicked, copied, or thumbs_up. | If you log 80 engagement events and 60 of them are opens, clicks, copies, or thumbs-up, the acceptance rate is 60 / 80 = 0.75. |
| corpus_health() | A summary of corpus noise (ghosts, obsolete docs, stale embeddings, duplicates) plus a bloat warning and a recommendation. | Get a quick “how healthy is my index?” snapshot before deciding whether cleanup is urgent. |
| to_dataframe() | A per-document pandas DataFrame with retrievals, engagements, engagement rate, and status. | Load the full stats into a notebook or BI tool, sort by retrievals, and inspect outliers. |
| report() | A human-readable corpus health report printed to stdout. | Run it in a CLI job or cron task for a quick snapshot without writing custom formatting code. |
| cleanup_report() | A prioritized cleanup payload with ghosts, obsolete docs, stale embeddings, and suspects. | Feed it into a maintenance workflow that decides what to delete, refresh, or review first. |
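get_duplicates() compares embedding pairs against duplicate_threshold (0.92 in the configuration above). The pairwise check can be sketched in plain Python. corpulse itself requires scikit-learn for this; the helpers cosine and near_duplicates and the toy 3-dimensional vectors are hypothetical.

```python
import math
from itertools import combinations


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity of two vectors; 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def near_duplicates(embeddings: dict[str, list[float]], threshold: float = 0.92):
    """Return (doc_a, doc_b, similarity) for every pair above the threshold."""
    pairs = []
    for (a, va), (b, vb) in combinations(embeddings.items(), 2):
        sim = cosine(va, vb)
        if sim > threshold:
            pairs.append((a, b, round(sim, 4)))
    return pairs


docs = {
    "api-v1.md": [0.9, 0.1, 0.0],
    "api-v1-copy.md": [0.88, 0.12, 0.01],
    "faq.md": [0.0, 0.2, 0.98],
}
print(near_duplicates(docs))
```

Only the api-v1.md / api-v1-copy.md pair clears the threshold here. The all-pairs loop is quadratic in corpus size, which is one reason to hand the computation to a vectorized library on real corpora.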
License
MPL 2.0 — see LICENSE for details.
File details
Details for the file corpulse-1.9.3.tar.gz.
File metadata
- Download URL: corpulse-1.9.3.tar.gz
- Upload date:
- Size: 51.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 116f1cd183239dcc278a87694577beda8a45013f0d0d0201d34aa5d89eb7ded9 |
| MD5 | 2591d30c22d78f30db40374a3a6060cf |
| BLAKE2b-256 | d7a64d1714adbe25a8defc7ca1947cdd974480afc245096ae86d0b09e3cd44c8 |
Provenance
The following attestation bundles were made for corpulse-1.9.3.tar.gz:
Publisher: release.yml on arkadyb/corpulse
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: corpulse-1.9.3.tar.gz
- Subject digest: 116f1cd183239dcc278a87694577beda8a45013f0d0d0201d34aa5d89eb7ded9
- Sigstore transparency entry: 1681017919
- Sigstore integration time:
- Permalink: arkadyb/corpulse@cd99be2a62babb1bf396829b6d282e6ae925d1e6
- Branch / Tag: refs/tags/v1.9.3
- Owner: https://github.com/arkadyb
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: release.yml@cd99be2a62babb1bf396829b6d282e6ae925d1e6
- Trigger Event: push
File details
Details for the file corpulse-1.9.3-py3-none-any.whl.
File metadata
- Download URL: corpulse-1.9.3-py3-none-any.whl
- Upload date:
- Size: 62.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 9c6c0848ed0b9d21ace15965047c2652f490e9ddc7d787d3cafd93efc399fc6f |
| MD5 | ffdf12bc3891c1dcafb14a402064ebee |
| BLAKE2b-256 | 1d987396961cddcecf64822ac0755fa32af356612d75f980b3b65493a9bd6812 |
Provenance
The following attestation bundles were made for corpulse-1.9.3-py3-none-any.whl:
Publisher: release.yml on arkadyb/corpulse
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: corpulse-1.9.3-py3-none-any.whl
- Subject digest: 9c6c0848ed0b9d21ace15965047c2652f490e9ddc7d787d3cafd93efc399fc6f
- Sigstore transparency entry: 1681018134
- Sigstore integration time:
- Permalink: arkadyb/corpulse@cd99be2a62babb1bf396829b6d282e6ae925d1e6
- Branch / Tag: refs/tags/v1.9.3
- Owner: https://github.com/arkadyb
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: release.yml@cd99be2a62babb1bf396829b6d282e6ae925d1e6
- Trigger Event: push