Corpus health analytics for RAG pipelines

corpulse

Corpus health analytics for RAG pipelines. Track which documents help, which ones don't, and which ones are just noise.


The Problem

Your vector database grows over time. Documents get added, re-chunked, and updated. Old versions linger. Near-identical content gets indexed twice. Outdated files keep surfacing in results. Without tracking, you're building on top of noise. corpulse surfaces these issues automatically.


What corpulse is (and isn't)

corpulse measures corpus health — which documents are retrieved, how often, and whether users act on them.

It does not measure answer quality, faithfulness, or relevance. For that, see tools like Ragas or DeepEval.

Think of it as a fitness tracker for your document corpus, not a grade on your answers.


Installation

# Core library
pip install corpulse

# With Qdrant wrapper support
pip install "corpulse[qdrant]"

# From source
pip install "git+https://github.com/arkadyb/corpulse.git"

Requires Python 3.10+. The [qdrant] extra installs qdrant-client>=1.7.

Maintainers should use .github/RELEASE_CHECKLIST.md for the first-release flow.


Contributing and Security

Contributions are welcome through reviewed pull requests. See CONTRIBUTING.md before opening a larger change.

Please report suspected vulnerabilities privately using the process in SECURITY.md, not through public issues.


Quickstart: Manual API

from corpulse import Corpulse

corp = Corpulse()  # writes to ./corpulse.db

# After your vector DB search returns results
results = [
    {"doc_id": "abc123", "filename": "guide.md", "score": 0.91},
    {"doc_id": "def456", "filename": "faq.md",   "score": 0.87},
]
corp.log_retrieval(results, query="how to install?")

# When user acts on a result
corp.log_engagement("abc123", event="opened")

# Print corpus health table
corp.report()

report() pretty-prints the table with tabulate if it is installed and falls back to plain text otherwise.


Quickstart: Qdrant Wrapper

Before (manual instrumentation):

from qdrant_client import QdrantClient
from corpulse import Corpulse

client = QdrantClient(":memory:")
corp = Corpulse()

result = client.query_points(collection_name="docs", query=[0.1, 0.2, ...], limit=5)
# Must manually extract results and call log_retrieval
records = [
    {"doc_id": str(p.id), "filename": p.payload.get("filename", str(p.id)), "score": p.score}
    for p in result.points
]
corp.log_retrieval(records, query="how to install?")

After (automatic via wrapper):

from qdrant_client import QdrantClient
from corpulse import Corpulse, QdrantCorpulseClient

client = QdrantClient(":memory:")
corp = Corpulse()
wrapped = QdrantCorpulseClient(client, corp)

result = wrapped.query_points(collection_name="docs", query=[0.1, 0.2, ...], limit=5)
# log_retrieval() called automatically — result is unchanged

Async variant:

import asyncio
from qdrant_client import AsyncQdrantClient
from corpulse import Corpulse, AsyncQdrantCorpulseClient

async def main():
    client = AsyncQdrantClient(":memory:")
    corp = Corpulse()
    wrapped = AsyncQdrantCorpulseClient(client, corp)

    result = await wrapped.query_points(
        collection_name="docs", query=[0.1, 0.2, ...], limit=5
    )
    # log_retrieval() called automatically

asyncio.run(main())

Constructor parameters (QdrantCorpulseClient and AsyncQdrantCorpulseClient):

  • payload_id_field — payload key to use as document ID (default: None, uses Qdrant point ID)
  • payload_filename_key — payload key for filename (default: "filename")

Advanced: generic wrapper engine

If a client exposes stable query methods and you can normalize its native response into Corpulse records, you can use the generic wrap() API instead of writing a dedicated class:

from corpulse import Corpulse, WrapMethod, wrap

wrapped = wrap(
    client,
    Corpulse(),
    methods={
        "search": WrapMethod(
            normalize=lambda result, args, kwargs: [
                {
                    "doc_id": hit["id"],
                    "filename": hit["name"],
                    "score": hit["score"],
                    "embedding": None,
                }
                for hit in result.hits
            ]
        )
    },
)

This removes most wrapper boilerplate, but each database still needs a normalization recipe for its response shape.
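
For intuition, the proxy idea behind such a wrapper can be sketched in a few lines. This is not corpulse's implementation: `LoggingProxy`, `ToyClient`, and the `log` callback are hypothetical names, and the sketch only shows how a normalize callable with the `(result, args, kwargs)` shape can feed a logger while the native response passes through unchanged.

```python
from typing import Any, Callable


class LoggingProxy:
    """Forward attribute access to `client`, logging results of selected methods."""

    def __init__(
        self,
        client: Any,
        log: Callable[[list], None],
        normalizers: dict[str, Callable[[Any, tuple, dict], list]],
    ):
        self._client = client
        self._log = log
        self._normalizers = normalizers

    def __getattr__(self, name: str) -> Any:
        attr = getattr(self._client, name)
        normalize = self._normalizers.get(name)
        if normalize is None:
            return attr  # methods without a recipe pass straight through

        def wrapped(*args, **kwargs):
            result = attr(*args, **kwargs)
            self._log(normalize(result, args, kwargs))  # side effect only
            return result  # caller sees the native response unchanged

        return wrapped


class ToyClient:
    """Hypothetical vector DB client used only for this demo."""

    def search(self, text):
        return {"hits": [{"id": "abc123", "name": "guide.md", "score": 0.91}]}

    def ping(self):
        return "pong"


logged = []
proxy = LoggingProxy(
    ToyClient(),
    log=logged.append,
    normalizers={
        "search": lambda result, args, kwargs: [
            {"doc_id": h["id"], "filename": h["name"], "score": h["score"]}
            for h in result["hits"]
        ]
    },
)
response = proxy.search("how to install?")
print(logged)
```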


Async usage

corpulse ships a fully async interface via AsyncCorpulse. Its reporting methods return structured data instead of printing, which suits web services and async pipelines.

import asyncio
from corpulse import AsyncCorpulse
from corpulse.backends import AsyncPostgresBackend

async def main():
    backend = await AsyncPostgresBackend.create(
        "postgresql://user:pass@localhost/mydb"
    )
    async with AsyncCorpulse(backend=backend) as corp:
        # Ingest: called after every vector DB query in your RAG pipeline
        await corp.log_retrieval(
            [{"doc_id": "abc123", "filename": "guide.md", "score": 0.91}],
            query="how to install?",
        )
        await corp.log_engagement("abc123", event="opened")

        ghosts = await corp.get_ghosts()
        print(f"Ghost docs: {len(ghosts)}")

        report = await corp.report(window_days=30)
        print(report["summary"])
        print(report["rows"][:3])

        cleanup = await corp.cleanup_report()
        print(cleanup["ghosts"])
        print(cleanup["suspects"])

asyncio.run(main())

AsyncCorpulse.report() and AsyncCorpulse.cleanup_report() return dictionaries with structured payloads, so you can log them, send them over HTTP, or render them in your own UI without parsing stdout.

Workload Trace Capture

corpulse can capture append-only RAG request traces for observability and later replay work. Raw query and component content are optional; you can store hashes and references instead.

from corpulse import Corpulse, AsyncCorpulse

corp = Corpulse()
corp.log_rag_request(
    session_id="session-123",
    query="What is the answer?",
    request_id="req-123",
    components=[
        {"type": "system_prompt", "token_count": 12, "refs": None, "content_hash": "sp-1", "metadata": None},
        {"type": "vector_db", "token_count": 42, "refs": [{"doc_id": "abc123"}], "content_hash": "vec-1", "metadata": {"top_k": 5}},
        {"type": "chat_history", "token_count": 18, "refs": [{"turn": 3}], "content_hash": None, "metadata": {"window": 4}},
        {"type": "user_input", "token_count": 9, "refs": None, "content_hash": "ui-1", "metadata": None},
    ],
    timings={"ttft_ms": 210, "tpot_ms": 18, "retrieval_ms": 42},
)

async def log_async(async_corp: AsyncCorpulse) -> None:
    await async_corp.alog_rag_request(
        session_id="session-123",
        query=None,
        request_id="req-124",
        components=[{"type": "other", "token_count": None, "refs": None, "content_hash": "fallback", "metadata": {"mode": "hash-only"}}],
        timings={"queue_ms": 7, "total_latency_ms": 409},
        timeout=False,
    )
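
If you prefer to store hashes rather than raw content, one way to build the component dictionaries is sketched below. The README does not say how content_hash values should be computed, so the truncated SHA-256 digest, the whitespace token count, and the helper names `content_hash` and `hashed_component` are all assumptions for illustration.

```python
import hashlib


def content_hash(text: str, length: int = 16) -> str:
    """Return a short, stable SHA-256 hex digest for a piece of text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()[:length]


def hashed_component(kind: str, text: str, refs=None) -> dict:
    """Build a component dict that carries a hash but not the raw text."""
    return {
        "type": kind,
        "token_count": len(text.split()),  # crude whitespace count, not a real tokenizer
        "refs": refs,
        "content_hash": content_hash(text),
        "metadata": None,
    }


component = hashed_component("system_prompt", "You are a helpful assistant.")
print(component)
```

The resulting dictionary has the same keys as the components passed to log_rag_request() above, so identical prompts map to identical hashes without the text ever being stored.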

Workload Trace JSONL Import/Export

The JSONL schema version is corpulse.rag_request_trace.v1. Export is privacy-first by default: raw query text and component metadata are omitted unless you opt in.

from corpulse import Corpulse, AsyncCorpulse

corp = Corpulse()
corp.export_rag_request_traces_jsonl("traces.jsonl")
corp.import_rag_request_traces_jsonl("traces.jsonl")

async def round_trip(async_corp: AsyncCorpulse) -> None:
    await async_corp.aexport_rag_request_traces_jsonl("traces.jsonl")
    await async_corp.aimport_rag_request_traces_jsonl("traces.jsonl")

Default export keeps the trace portable without raw content:

{"captured_at":1710000000.0,"components":[{"content_hash":"vec-1","metadata":null,"refs":[{"doc_id":"abc123"}],"token_count":42,"type":"vector_db"}],"error":null,"input_token_count":42,"output_token_count":9,"query_hash":"abc123","query_text":null,"request_id":"req-123","schema_version":"corpulse.rag_request_trace.v1","session_id":"session-123","timeout":false,"timings":{"retrieval_ms":42.0,"ttft_ms":210.0}}

Use include_raw_text=True and include_component_metadata=True only when you explicitly want to export the raw trace payload.

Import is append-oriented and skips duplicate trace fingerprints by default, so re-importing the same JSONL file does not create duplicate analytics rows.
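
The idea of skipping duplicate fingerprints can be illustrated with a small standalone sketch. How corpulse actually derives a trace fingerprint is not documented here; hashing a canonical JSON form with SHA-256 is an assumption, and `trace_fingerprint` and `import_jsonl_lines` are hypothetical helpers.

```python
import hashlib
import json


def trace_fingerprint(trace: dict) -> str:
    """Hash a canonical JSON form so key order does not affect the result."""
    canonical = json.dumps(trace, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def import_jsonl_lines(lines, store: list, seen: set) -> int:
    """Append unseen traces to `store`; return how many were added."""
    added = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue
        trace = json.loads(line)
        fingerprint = trace_fingerprint(trace)
        if fingerprint in seen:
            continue  # duplicate: skip instead of appending again
        seen.add(fingerprint)
        store.append(trace)
        added += 1
    return added


lines = [
    '{"request_id": "req-123", "session_id": "session-123"}',
    '{"session_id": "session-123", "request_id": "req-123"}',  # same trace, different key order
    '{"request_id": "req-124", "session_id": "session-123"}',
]
store, seen = [], set()
first = import_jsonl_lines(lines, store, seen)
second = import_jsonl_lines(lines, store, seen)  # re-import adds nothing
print(first, second, len(store))
```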

Callable Replay

Replay uses captured or JSONL-imported workload traces and invokes your supplied callable once per trace. corpulse sorts traces by capture time, builds a replay request envelope, records success or failure, and does not store the callable return value.

from corpulse import Corpulse

corp = Corpulse()

def replay_handler(request):
    print(request["request_id"], request["query_hash"])

replay = corp.replay_rag_request_traces(
    replay_handler,
    window_days=30,
    time_scale=None,
)
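print(replay["summary"])

# Async variant: pass a coroutine function to an AsyncCorpulse instance.
async def async_replay_handler(request):
    print(request["request_id"], request["query_hash"])

async def replay_async(async_corp):
    # await is only valid inside a coroutine, so wrap the call
    return await async_corp.areplay_rag_request_traces(async_replay_handler)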

time_scale=None means no sleeping. time_scale=1.0 replays captured deltas in real time, and larger values replay faster. You can cap each scheduled delay with max_delay_seconds.
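
The scheduling rule can be sketched as a pure function. This is an interpretation of the description above rather than corpulse's code: it assumes each captured gap is divided by time_scale (so larger values mean shorter waits) and that max_delay_seconds is applied after scaling. `replay_delays` is a hypothetical helper.

```python
from typing import Optional


def replay_delays(
    captured_at: list[float],
    time_scale: Optional[float] = None,
    max_delay_seconds: Optional[float] = None,
) -> list[float]:
    """Return the sleep (in seconds) before each trace, in capture order."""
    if time_scale is not None and time_scale <= 0:
        raise ValueError("time_scale must be positive or None")
    times = sorted(captured_at)
    if not times:
        return []
    delays = [0.0]  # the first trace is dispatched immediately
    for prev, cur in zip(times, times[1:]):
        if time_scale is None:
            delay = 0.0  # no sleeping at all
        else:
            delay = (cur - prev) / time_scale  # assumed: larger scale, shorter wait
        if max_delay_seconds is not None:
            delay = min(delay, max_delay_seconds)
        delays.append(delay)
    return delays


stamps = [100.0, 102.0, 112.0]  # captured gaps of 2 s and 10 s
print(replay_delays(stamps))                                         # no sleeping
print(replay_delays(stamps, time_scale=1.0))                         # real time
print(replay_delays(stamps, time_scale=2.0))                         # twice as fast
print(replay_delays(stamps, time_scale=1.0, max_delay_seconds=3.0))  # long gaps capped
```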

Core corpulse does not ship an OpenAI SDK, HTTP client, or benchmark exporter for replay. Users needing OpenAI-compatible endpoint replay should implement the supplied callable with their own raw prompt/message reconstruction, endpoint client, and result retention policy.

Workload and Serving Reports

Use workload_report() to summarize request volume, throughput, burst windows, token pressure, and component composition. Use serving_report() to inspect TTFT, TPOT, total latency, stage latencies, percentiles, timeout rate, error rate, and slow-request contributors. Use session_report() to inspect conversation-level behavior across captured or JSONL-imported traces.

from corpulse import Corpulse

corp = Corpulse()

workload = corp.workload_report(window_days=30)
print(workload["traffic"])
print(workload["tokens"])
print(workload["components"])

serving = corp.serving_report(window_days=30)
print(serving["ttft_ms"])
print(serving["slow_request_contributors"])

session = corp.session_report(window_days=30)
print(session["summary"])
print(session["sessions"])
print(session["context_reuse"])

These reports read the same captured or JSONL-imported traces exposed by get_rag_request_traces(). Session summary covers request count, turns per session, duration, follow-up rate, and history growth; sessions contains per-session timing and token growth details. context_reuse surfaces repeated refs or content hashes within the same session, without semantic matching, cache recommendations, LLM-as-judge, or online inference dependencies.
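
As a rough illustration of exact-match context reuse, the sketch below counts content hashes that repeat within a session. The output shape is an assumption (the real context_reuse payload is not documented here), and `context_reuse_by_hash` is a hypothetical helper that looks only at content_hash, not at refs.

```python
from collections import Counter, defaultdict


def context_reuse_by_hash(traces: list[dict]) -> dict[str, dict[str, int]]:
    """Map session_id -> {content_hash: count} for hashes seen more than once."""
    per_session: dict[str, Counter] = defaultdict(Counter)
    for trace in traces:
        for component in trace.get("components", []):
            digest = component.get("content_hash")
            if digest is not None:
                per_session[trace["session_id"]][digest] += 1
    # Keep only sessions that actually repeat something.
    return {
        session: {h: n for h, n in counts.items() if n > 1}
        for session, counts in per_session.items()
        if any(n > 1 for n in counts.values())
    }


traces = [
    {"session_id": "s1", "components": [{"content_hash": "sp-1"}, {"content_hash": "vec-1"}]},
    {"session_id": "s1", "components": [{"content_hash": "sp-1"}, {"content_hash": "vec-2"}]},
    {"session_id": "s2", "components": [{"content_hash": "sp-1"}]},
]
reuse = context_reuse_by_hash(traces)
print(reuse)
```

Because matching is by exact hash, the same system prompt in two different sessions is not counted as reuse; only repeats inside one session are.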

Generation trace capture

corpulse also supports append-only trace capture for future generation metrics. Use it to store the prompt or query text, the retrieved context references you fed into generation, the final answer text, and optional evaluation labels.

from corpulse import Corpulse

corp = Corpulse()
corp.log_generation_trace(
    prompt_text="Answer the user's question",
    retrieved_context_refs=[{"doc_id": "abc123", "chunk_id": "c-1"}],
    final_answer_text="Here is the response.",
    evaluation_labels=["grounded"],
)

traces = corp.get_generation_traces()

Trace records are read-only once written and do not change any existing corpus-health analytics.


What It Measures

  • Ghost documents — registered but never retrieved within a time window
  • Near-duplicates — embedding pairs above a cosine similarity threshold (requires scikit-learn)
  • Obsolete versions — e.g. api-v1.md superseded by api-v2.md
  • Stale embeddings — source file updated but embedding not refreshed
  • Low-engagement suspects — retrieved often but users rarely act on them
  • Mean Reciprocal Rank — retrieval-order quality proxy based on existing ranks plus engagement overlap
  • User Acceptance Rate — share of engagement rows whose event_type is one of opened, clicked, copied, or thumbs_up
  • Generation trace capture — append-only prompt/query text, retrieved context refs, final answer text, and optional labels for future generation metrics
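
To show what "embedding pairs above a cosine similarity threshold" means in practice, here is a dependency-free sketch. corpulse itself relies on scikit-learn for this check; the pure-Python pairwise loop below is a swapped-in illustration of the threshold rule, and `cosine` and `near_duplicates` are hypothetical helpers.

```python
import math
from itertools import combinations


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def near_duplicates(embeddings: dict[str, list[float]], threshold: float = 0.92):
    """Return (doc_a, doc_b, similarity) for every pair above the threshold."""
    pairs = []
    for (id_a, vec_a), (id_b, vec_b) in combinations(embeddings.items(), 2):
        similarity = cosine(vec_a, vec_b)
        if similarity > threshold:
            pairs.append((id_a, id_b, round(similarity, 4)))
    return pairs


embeddings = {
    "api-v1.md": [1.0, 0.0, 0.0],
    "api-v1-copy.md": [0.99, 0.05, 0.0],  # almost the same direction as api-v1.md
    "faq.md": [0.0, 1.0, 0.0],
}
pairs = near_duplicates(embeddings)
print(pairs)
```

The loop compares every pair, so it scales quadratically with corpus size; it is meant to explain the rule, not to process a large index.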

Configuration

corp = Corpulse(
    db_path="./corpulse.db",          # SQLite database path
    ghost_threshold_days=30,         # Days before flagging as ghost
    duplicate_threshold=0.92,        # Cosine similarity threshold
    stale_threshold_days=14,         # Days of source-vs-embedding lag
    obsolete_pattern=r"v\d+",        # Regex for version detection in filenames
    top_k_report=20,                 # Documents shown in report()
)
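
As an illustration of how a pattern like r"v\d+" can drive obsolete-version detection, the sketch below groups filenames that differ only in their version token and flags everything except the highest version. The grouping rule and the helper name `find_obsolete` are assumptions; corpulse's own matching logic may differ.

```python
import re
from collections import defaultdict


def find_obsolete(filenames: list[str], pattern: str = r"v\d+") -> dict[str, str]:
    """Map each superseded filename to the newest version of the same family."""
    version_re = re.compile(pattern)
    families: dict[str, list[tuple[int, str]]] = defaultdict(list)
    for name in filenames:
        match = version_re.search(name)
        if match is None:
            continue  # unversioned files are never flagged
        digits = re.search(r"\d+", match.group())
        if digits is None:
            continue  # the pattern matched something without a number
        # Family key: the filename with the version token blanked out.
        family = name[:match.start()] + "{v}" + name[match.end():]
        families[family].append((int(digits.group()), name))
    obsolete = {}
    for versions in families.values():
        newest = max(versions)[1]
        for _, name in versions:
            if name != newest:
                obsolete[name] = newest
    return obsolete


result = find_obsolete(["api-v1.md", "api-v2.md", "guide-v1.md", "guide-v3.md", "faq.md"])
print(result)
```

Note that a loose pattern such as r"v\d+" also matches names like dev2.md, so a stricter expression may be worth configuring for corpora with such filenames.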

Analysis Methods

All analysis methods use the configured lookback window. If you do not pass window_days, corpulse uses ghost_threshold_days.
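
The window fallback can be pictured with ghost detection as the example. The sketch below is not corpulse's code: `find_ghosts` is a hypothetical helper, and the constant simply mirrors the ghost_threshold_days value shown in the Configuration example.

```python
import time
from typing import Optional

GHOST_THRESHOLD_DAYS = 30  # mirrors ghost_threshold_days from the Configuration example


def find_ghosts(
    registered: set[str],
    retrievals: list[tuple[str, float]],  # (doc_id, unix timestamp)
    window_days: Optional[int] = None,
    now: Optional[float] = None,
) -> set[str]:
    """Return registered doc IDs with no retrieval inside the lookback window."""
    # Fall back to the configured threshold when no window is passed.
    days = GHOST_THRESHOLD_DAYS if window_days is None else window_days
    now = time.time() if now is None else now
    cutoff = now - days * 86_400
    recently_retrieved = {doc_id for doc_id, ts in retrievals if ts >= cutoff}
    return registered - recently_retrieved


now = 1_710_000_000.0
day = 86_400
registered = {"abc123", "def456", "ghi789"}
retrievals = [("abc123", now - 5 * day), ("def456", now - 45 * day)]

default_ghosts = find_ghosts(registered, retrievals, now=now)                # 30-day window
wide_ghosts = find_ghosts(registered, retrievals, window_days=60, now=now)   # 60-day window
print(sorted(default_ghosts), sorted(wide_ghosts))
```

Widening the window shrinks the ghost list, because a retrieval from 45 days ago counts again once the lookback reaches 60 days.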

| Method | What it measures | Example use |
| --- | --- | --- |
| get_ghosts() | Documents that were registered but not retrieved during the lookback window. | Find files that exist in the index but never show up in search, such as a stale draft nobody clicks. |
| get_duplicates() | Pairs of documents whose embeddings are above the configured cosine similarity threshold. | Spot near-identical files like api-v1.md and api-v1-copy.md that are both being indexed. |
| get_obsolete() | Older documents that appear to have been superseded by a newer filename version. | Detect versioned docs such as guide-v1.md that should probably be replaced by guide-v2.md. |
| get_stale_embeddings() | Documents whose source file timestamp is newer than the stored embedding timestamp. | Catch a document that was edited yesterday but still has an embedding from last week. |
| get_suspects() | Documents with high retrieval volume but low engagement rate. | Identify pages that are frequently returned by search but rarely opened or acted on. |
| mean_reciprocal_rank() | A retrieval-order quality proxy based on retrieval rank and whether the document was engaged with. Higher is better. | Use it to check whether documents that users actually interact with tend to appear near the top of results. |
| acceptance_rate() | The share of engagement events whose normalized event_type is in the accepted allowlist: opened, clicked, copied, or thumbs_up. | If you log 80 total engagement events and 60 are opens/clicks/copies/thumbs-up, the acceptance rate is 0.75. |
| corpus_health() | A summary of corpus noise: ghosts, obsolete docs, stale embeddings, duplicates, plus a bloat warning and recommendation. | Get a quick "how healthy is my index?" snapshot before deciding whether cleanup is urgent. |
| to_dataframe() | A per-document pandas DataFrame with retrievals, engagements, engagement rate, and status. | Load the full stats into a notebook or BI tool to sort by retrievals and inspect outliers. |
| report() | A human-readable corpus health report printed to stdout. | Run it in a CLI job or cron task to print a quick snapshot without writing custom formatting code. |
| cleanup_report() | A prioritized cleanup payload with ghosts, obsolete docs, stale embeddings, and suspects. | Feed it into a maintenance workflow that decides what to delete, refresh, or review first. |
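
The two ratio metrics can be worked through by hand. The sketch below follows the acceptance-rate definition given above and uses the textbook form of Mean Reciprocal Rank with engagement as the relevance signal; corpulse's exact MRR formula is not spelled out here, so treat that part as an assumption. The function names shadow the corpulse methods only for readability, and thumbs_down is a hypothetical non-accepted event type.

```python
ACCEPTED_EVENTS = {"opened", "clicked", "copied", "thumbs_up"}


def acceptance_rate(events: list[str]) -> float:
    """Share of engagement events whose normalized type is in the allowlist."""
    if not events:
        return 0.0
    # Assumption: normalization means trimming whitespace and lowercasing.
    accepted = sum(1 for e in events if e.strip().lower() in ACCEPTED_EVENTS)
    return accepted / len(events)


def mean_reciprocal_rank(queries: list[tuple[list[str], set[str]]]) -> float:
    """Average 1/rank of the first engaged document per query (0 if none)."""
    if not queries:
        return 0.0
    total = 0.0
    for ranked_doc_ids, engaged in queries:
        for rank, doc_id in enumerate(ranked_doc_ids, start=1):
            if doc_id in engaged:
                total += 1.0 / rank
                break
    return total / len(queries)


# 60 accepted events out of 80, matching the example in the table.
events = ["opened"] * 30 + ["clicked"] * 20 + ["copied"] * 10 + ["thumbs_down"] * 20
rate = acceptance_rate(events)
print(rate)

queries = [
    (["abc123", "def456"], {"abc123"}),  # engaged doc at rank 1
    (["def456", "abc123"], {"abc123"}),  # engaged doc at rank 2
    (["def456", "ghi789"], set()),       # nothing engaged
]
mrr = mean_reciprocal_rank(queries)
print(mrr)
```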

License

MPL 2.0 — see LICENSE for details.
