# lakesense

Intelligent ML data observability for the lakehouse — sketch-based profiling with LLM interpretation.

lakesense profiles your datasets using mergeable probabilistic sketches (MinHash, HyperLogLog, KLL) and deterministic column profiles, builds dynamic baselines per job, and uses an LLM agent to investigate and explain drift signals — with pluggable alerting and storage.
## Why lakesense?
Existing tools stop at drift detection — they tell you a number changed. lakesense adds an interpretation layer: a two-tier pipeline that runs heuristic rules on every job, escalates to an LLM for nuanced interpretation on warn/alert, and fires an investigative agent only when something is actually wrong.
Key properties:
- Probabilistic sketches — MinHash, HLL, KLL for O(1) memory profiling with mergeable baselines
- Full column profiling — null rates, int ranges, categorical distributions, boolean ratios, string lengths, schema drift
- Distributed compute — Spark provider for distributed sketch computation via `mapInPandas`
- Zero-infra quickstart — Parquet backend, or native lakehouse via Iceberg backend
- Plugin architecture — bring your own storage, alerting, and agent tools
- Tier 2 Investigative Agent — on critical data drift, an LLM agent automatically traces DataHub lineage and Slack for root causes
- Two-tier cost control — heuristics always run free; LLM only invoked on warn/alert; expensive agent only on alert
- No-network mode — works 100% locally using heuristic rules when no API key is set
## Quickstart

```bash
pip install lakesense
```
```python
import asyncio
from datetime import datetime, timezone

import pandas as pd

from lakesense.core import SketchFramework
from lakesense.storage.parquet import ParquetBackend
from lakesense.sketches.providers.pandas import PandasProvider
from lakesense.sketches.merge import BaselineConfig

# run_ts = the data interval this run covers (e.g. Airflow's data_interval_end).
# The baseline window queries historical sketches using run_ts as the upper bound,
# so each run needs a distinct timestamp to see prior runs as history.
run_ts = datetime(2026, 3, 30, tzinfo=timezone.utc)

# 1. Compute sketches from your data
df = pd.read_parquet("features/latest.parquet")
provider = PandasProvider()
records = provider.sketch(
    data=df,
    dataset_id="user_features",
    job_id="train_job_42",
    text_columns=["description"],
    id_columns=["user_id"],
    numeric_columns=["session_count", "revenue"],
    run_ts=run_ts,
)

# 2. Persist sketches for baseline building
storage = ParquetBackend("./sketches")
asyncio.run(storage.write_sketches(records))

# 3. Run the interpretation pipeline
framework = SketchFramework(storage=storage)
result = asyncio.run(framework.run({
    "dataset_id": "user_features",
    "job_id": "train_job_42",
    "sketch_records": records,
    "data_interval_end": run_ts,
    "baseline_config": BaselineConfig(dataset_id="user_features", window_days=7),
}))

print(result.severity)  # ok | warn | alert
print(result.summary)   # "Jaccard similarity dropped 34% vs 7-day baseline..."
```
Heuristic rules run on every job (free, instant). Set `OPENAI_API_KEY` or `ANTHROPIC_API_KEY` to add LLM-powered interpretation — the LLM is only invoked when heuristics flag warn/alert, so healthy runs never incur an API call.
The framework auto-detects your provider from the environment. To use a specific provider:
```python
from lakesense.interpreter.providers.anthropic_provider import AnthropicProvider
from lakesense.interpreter.providers.openai_provider import OpenAIProvider

# Anthropic (default model: claude-sonnet-4-6)
framework = SketchFramework(storage=storage, llm_provider=AnthropicProvider())

# OpenAI (default model: gpt-4o)
framework = SketchFramework(storage=storage, llm_provider=OpenAIProvider())

# Custom model + token budget
framework = SketchFramework(
    storage=storage,
    llm_provider=AnthropicProvider(model="claude-sonnet-4-6", max_tokens=8192),
)
```
Run the full quickstart example (no API key needed):

```bash
pip install lakesense[duckdb]
python examples/quickstart.py
```
## Architecture

```text
Every run  → Tier 1: sketch compute + baseline merge + LLM interpret → severity + summary
warn/alert → Tier 2: plugins (investigative agent, Slack, PagerDuty) → root cause + action
```
### Tier 1 — base interpretation (always runs)
- Compute sketches (MinHash, HLL, KLL) and column profiles from the dataset
- Merge historical sketches into a baseline (rolling window, snapshot, or EWMA)
- Compute drift signals (Jaccard delta, cardinality ratio, quantile shifts, null rate, schema drift)
- Run heuristic rules — if severity is `ok`, return immediately (no LLM cost)
- On `warn`/`alert` — call the LLM for nuanced interpretation + summary (the LLM can upgrade severity but not downgrade below the heuristic floor)
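The gating logic above can be sketched in a few lines. This is an illustration of the cost-control idea only: the threshold values, signal names, and function names below are hypothetical, not lakesense's internals.

```python
from enum import IntEnum

class Severity(IntEnum):
    OK = 0
    WARN = 1
    ALERT = 2

def heuristic_severity(signals: dict) -> Severity:
    # Free, deterministic rules that run on every job (thresholds are made up)
    if signals.get("jaccard_delta", 0.0) > 0.3:
        return Severity.ALERT
    if signals.get("null_rate_delta", 0.0) > 0.05:
        return Severity.WARN
    return Severity.OK

def llm_severity(signals: dict) -> Severity:
    return Severity.WARN  # stub standing in for the (paid) LLM judgment

def interpret(signals: dict) -> Severity:
    floor = heuristic_severity(signals)
    if floor == Severity.OK:
        return floor  # healthy run: no LLM call, no API cost
    # The LLM may upgrade severity but never drop below the heuristic floor
    return max(floor, llm_severity(signals))
```

The `max(floor, ...)` is the key invariant: the LLM can only ever make an incident more severe, never quietly dismiss one that heuristics flagged.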
### Tier 2 — plugin chain (on warn/alert only)
Plugins run in registration order, each receiving the result enriched by prior plugins:
```python
from lakesense.core import SketchFramework
from lakesense.plugins.agent import InvestigativeAgentPlugin
from lakesense.plugins.slack import SlackAlertPlugin
from lakesense.plugins.tools.datahub import DataHubLineageTool, DataHubSearchTool
from lakesense.plugins.tools.slack import SlackIncidentSearchTool
# Module path assumed to mirror lakesense.storage.parquet
from lakesense.storage.iceberg import IcebergBackend

# Configure agent tools — the LLM calls these during its ReAct loop
datahub = DataHubLineageTool(endpoint="https://my-datahub.local", token="...")
datahub_search = DataHubSearchTool(endpoint="https://my-datahub.local", token="...")
slack_search = SlackIncidentSearchTool(token="xoxb-your-slack-token")

framework = (
    SketchFramework(storage=IcebergBackend(catalog_name="default", sketches_table="lakesense.sketches"))
    # Tier 2 agent — traces lineage + searches Slack for root cause
    .register(InvestigativeAgentPlugin(tools=[
        datahub.get_upstream_lineage,
        datahub.get_downstream_lineage,
        datahub_search.search_datahub_dataset,
        slack_search.search_slack_incidents,
    ]))
    # Slack alerting — posts enriched alerts after the agent investigates
    .register(SlackAlertPlugin(webhook="https://hooks.slack.com/services/..."))
)
```
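The chain semantics can be modeled in a few lines of plain Python. The class names below are hypothetical stand-ins, not lakesense's actual API; the point is the ordering contract: each plugin receives the result as enriched by the plugins registered before it.

```python
import asyncio

class Result:
    def __init__(self, severity: str):
        self.severity = severity
        self.metadata: dict = {}

class Plugin:
    def should_run(self, result: Result) -> bool:
        return result.severity in ("warn", "alert")

    async def run(self, result: Result) -> Result:
        return result

class Enricher(Plugin):
    """Stands in for the investigative agent: attaches a root cause."""
    async def run(self, result: Result) -> Result:
        result.metadata["root_cause"] = "upstream schema change"
        return result

class Alerter(Plugin):
    """Stands in for Slack alerting: sees what earlier plugins attached."""
    async def run(self, result: Result) -> Result:
        result.metadata["alert_sent"] = "root_cause" in result.metadata
        return result

async def run_chain(result: Result, plugins: list) -> Result:
    # Registration order matters: later plugins see earlier plugins' output
    for plugin in plugins:
        if plugin.should_run(result):
            result = await plugin.run(result)
    return result

enriched = asyncio.run(run_chain(Result("alert"), [Enricher(), Alerter()]))
print(enriched.metadata)  # the Alerter saw the Enricher's root_cause
```

This is why the Slack plugin is registered after the agent in the example above: the alert it posts includes the agent's findings.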
## Sketch providers

| Provider | Use case | Install |
|---|---|---|
| `PandasProvider` | Single-machine, local dev | `pip install lakesense` |
| `SparkProvider` | Distributed compute via `mapInPandas` | `pip install lakesense[spark]` |
| `StreamingProvider` | Incremental / micro-batch | `pip install lakesense` |
## LLM providers

| Provider | Default model | Install |
|---|---|---|
| `AnthropicProvider` | `claude-sonnet-4-6` | `pip install lakesense[anthropic]` |
| `OpenAIProvider` | `gpt-4o` | `pip install lakesense[openai]` |
Both providers implement the `LLMProvider` interface (`analyze` for Tier 1 interpretation, `act_and_reason` for the Tier 2 ReAct agent loop). The framework auto-resolves the provider from your environment if not explicitly set.
## Sketch types
| Sketch | Use case | Merge cost |
|---|---|---|
| MinHash (Theta) | Text/set similarity, near-duplicate detection | O(num_perm) |
| HyperLogLog | Cardinality estimation (unique users, items) | O(registers) |
| KLL | Quantile estimation, distribution shape shifts | approx via sorted sample |
| Profile | Deterministic column metrics (nulls, ranges, categoricals) | scalar comparison |
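To make the "mergeable" property concrete, here is a toy MinHash in pure Python. This is an illustration only, not lakesense's implementation: merging two signatures element-wise yields exactly the signature of the union of the two sets, which is why per-run sketches can be combined into a baseline without re-reading raw data.

```python
import hashlib

NUM_PERM = 64  # number of hash permutations; more permutations = tighter estimates

def _hash(token: str, seed: int) -> int:
    digest = hashlib.sha256(f"{seed}:{token}".encode()).digest()
    return int.from_bytes(digest[:8], "big")

def minhash(tokens) -> list:
    # Signature slot s = minimum of the seed-s hash over the token set
    return [min(_hash(t, s) for t in tokens) for s in range(NUM_PERM)]

def merge(sig_a: list, sig_b: list) -> list:
    # Element-wise min equals the signature of the union of the two sets:
    # this is exactly what makes MinHash baselines mergeable
    return [min(a, b) for a, b in zip(sig_a, sig_b)]

def jaccard(sig_a: list, sig_b: list) -> float:
    # Fraction of matching slots estimates the true Jaccard similarity
    return sum(a == b for a, b in zip(sig_a, sig_b)) / NUM_PERM

day1 = minhash({"alice", "bob", "carol", "dave"})
day2 = minhash({"alice", "bob", "carol", "erin"})
baseline = merge(day1, day2)  # union baseline, built without raw data

print(jaccard(day1, day2))                               # estimates the true Jaccard of 3/5
print(jaccard(minhash({"mallory", "trent"}), baseline))  # disjoint set: near 0
```

Production sketches (e.g. Apache DataSketches' theta and KLL families) follow the same merge-then-compare pattern with stronger accuracy guarantees.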
## Storage backends

| Backend | Use case | Install |
|---|---|---|
| `ParquetBackend` | Zero-infra, local dev | `pip install lakesense` |
| `DuckDBBackend` | Local + SQL queries | `pip install lakesense[duckdb]` |
| `IcebergBackend` | Production lakehouse, native timestamps | `pip install lakesense[iceberg]` |
## Agent tools

| Tool | Purpose | Install |
|---|---|---|
| `DataHubLineageTool` | Upstream/downstream lineage tracing | `pip install lakesense[datahub]` |
| `DataHubSearchTool` | Resolve dataset names to DataHub URNs | `pip install lakesense[datahub]` |
| `SlackIncidentSearchTool` | Search Slack for correlated incidents | `pip install lakesense[slack]` |
## Baseline strategies

```python
from lakesense.sketches.merge import BaselineConfig, BaselineStrategy

# Rolling window — merge all runs in the last N days
BaselineConfig(dataset_id="ds", strategy=BaselineStrategy.ROLLING_WINDOW, window_days=7)

# Snapshot — pin a known-good run as reference
BaselineConfig(
    dataset_id="ds",
    strategy=BaselineStrategy.SNAPSHOT,
    snapshot_id="2024-01-15T00:00:00+00:00",
)

# EWMA — exponentially weight recent runs more
BaselineConfig(dataset_id="ds", strategy=BaselineStrategy.EWMA, decay_factor=0.85)
```
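For intuition on the EWMA strategy, one common weighting form assigns each historical run a weight proportional to `decay_factor ** age` and normalizes. This is an illustration of the idea; lakesense's exact weighting scheme may differ.

```python
def ewma_weights(ages_days: list, decay_factor: float = 0.85) -> list:
    # Weight each run by decay_factor ** age, then normalize to sum to 1
    raw = [decay_factor ** age for age in ages_days]
    total = sum(raw)
    return [w / total for w in raw]

weights = ewma_weights([0, 1, 2, 3])
print(weights)  # the most recent run (age 0) gets the largest weight
```

With `decay_factor=0.85`, a run from a week ago contributes roughly a third of what today's run does, so the baseline tracks gradual drift instead of treating all history equally.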
## Writing a custom plugin

```python
from lakesense.core import SketchPlugin, InterpretationResult, Severity

class PagerDutyPlugin(SketchPlugin):
    def __init__(self, routing_key: str):
        self._key = routing_key

    def should_run(self, result: InterpretationResult) -> bool:
        # Only page on critical alerts, after the investigative agent has enriched them
        return result.severity == Severity.ALERT and result.is_agent_enriched()

    async def run(self, result: InterpretationResult) -> InterpretationResult:
        await self._page(result)  # your PagerDuty Events API call goes here
        result.metadata["pagerduty"] = "paged"
        return result
```
## Roadmap
- v0.1 — core sketches, column profiles, Parquet + DuckDB storage, Tier 1 LLM interpret, Spark provider ✅
- v0.2 — provider-agnostic LLM interface (Anthropic + OpenAI), investigative agent with ReAct loop, DataHub lineage + search tools, Slack incident search tool, IcebergBackend with native timestamps ✅
- v0.3 — DeltaLake Backend, Airflow operator, OpenLineage support
- v0.4 — JIRA plugin, column-level lineage
## Contributing

See CONTRIBUTING.md. PRs welcome — especially new storage backends and plugins.

```bash
pip install -e ".[dev]"
pytest tests/unit/
ruff check .
mypy lakesense/
```
## License

Apache 2.0 — see LICENSE.