Silent data watcher — tells you what your pipeline did to your data.
The silent data watcher. Decorates your pipeline functions and tells you exactly what happened to your data — row counts, schema drift, null changes, memory usage, join explosions — automatically, with zero config.
The problem
You run a data pipeline. The output looks wrong. Your only clue:
Input: 1,000,000 rows
Output: 263,979 rows
Which step dropped the rows? Was it a filter, a null drop, or a bad join? You have no idea without adding print statements everywhere and re-running the whole thing.
watcher answers that — automatically.
Install
pip install watcher # core only (pandas)
pip install "watcher[rich]" # + coloured terminal output
pip install "watcher[full]" # + Rich + psutil memory tracking
Quickstart
import pandas as pd
from watcher import watch, session

raw = pd.DataFrame({
    "customer_id": [1, 2, 3, 4],
    "status": ["active", "inactive", "active", None],
})
orders = pd.DataFrame({
    "customer_id": [1, 3],
    "amount": [250.0, 150.0],
})

@watch
def clean(df):
    return df.dropna()

@watch
def merge_orders(df):
    return df.merge(orders, on="customer_id", how="left")

@watch
def filter_active(df):
    return df[df["status"] == "active"]

# Run the steps inside a session to see the watcher summary.
if __name__ == "__main__":
    with session("nightly ETL") as s:
        df = clean(raw)
        df = merge_orders(df)
        df = filter_active(df)

# =====================================
# For more examples    : examples/
# For syntax and usage : docs/usage.md
# =====================================
Output, printed automatically with no extra code (the sample figures below are from a 1M-row dataset, not the four-row frames above):
──────────────────────── watcher · nightly ETL ─────────────────────────
clean()          1,000,000 → 964,203    ▼ -35,797 rows (-3.6%)       12.3 ms
    nulls -35,797  status (35,797 → 0)
merge_orders()   964,203 → 1,069,104    ▲ +104,901 rows (+10.9%) ⚠   41.1 ms
    columns added : +tier
    💥 join explosion · duplication ratio 10.9%
    key column     top value   repeat count
    customer_id    9182        184
    customer_id    3310        97
filter_active()  1,069,104 → 631,822    ▼ -437,282 rows (-40.9%)     18.7 ms
╭──────────────── watcher · nightly ETL · summary ───────────────────╮
│ step rows in rows out Δ rows time (ms) │
│ clean 1,000,000 964,203 -35,797 12.3 │
│ merge_orders 964,203 1,069,104 +104,901 41.1 │
│ filter_active 1,069,104 631,822 -437,282 18.7 │
│ │
│ total 1,000,000 → 631,822 (-368,178 rows) 72.1 ms │
╰────────────────────────────────────────────────────────────────────╯
Documentation
For advanced pipeline patterns and debugging workflows, see the full documentation.
Features
Row tracking
Every decorated function shows rows before → after, the signed diff, percentage change, and elapsed time. Nothing is hidden, nothing needs configuring.
drop_nulls() 1,000,000 → 921,330 ▼ -78,670 rows (-7.9%) 68.5 ms
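As a rough illustration of what such a decorator measures, the sketch below wraps a function, takes `len()` of its first argument and of its return value, and times the call. It is not watcher's implementation: `track_rows` and its `last` attribute are hypothetical names, and plain lists stand in for DataFrames.

```python
import functools
import time


def track_rows(func):
    """Hypothetical stand-in for a row-tracking decorator (not watcher's code)."""

    @functools.wraps(func)
    def wrapper(data, *args, **kwargs):
        rows_in = len(data)
        start = time.perf_counter()
        result = func(data, *args, **kwargs)
        elapsed_ms = (time.perf_counter() - start) * 1000
        rows_out = len(result)
        diff = rows_out - rows_in
        # Guard against division by zero for empty inputs.
        pct = (diff / rows_in * 100) if rows_in else 0.0
        # Keep the last measurement around so callers can inspect it.
        wrapper.last = {"rows_in": rows_in, "rows_out": rows_out, "diff": diff, "pct": pct}
        print(f"{func.__name__}() {rows_in:,} → {rows_out:,} {diff:+,} rows ({pct:+.1f}%) {elapsed_ms:.1f} ms")
        return result

    return wrapper


@track_rows
def drop_negative(values):
    return [v for v in values if v >= 0]


kept = drop_negative([5, -1, 3, -7, 2])
```

The same idea carries over to DataFrames, since `len(df)` returns the row count.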
Null-count deltas
Per-column null counts are compared before and after each step. The worst offenders are shown first.
drop_nulls() 1,000,000 → 921,330 ▼ -78,670 rows (-7.9%)
    nulls -2,477  status  (2,477 → 0)
    nulls -1,448  revenue (1,448 → 0)
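A before/after null comparison of this kind can be reproduced by hand with plain pandas. The sketch below is an illustration, not watcher's code; `null_deltas` is a hypothetical helper that only looks at columns present in both frames.

```python
import pandas as pd


def null_deltas(before: pd.DataFrame, after: pd.DataFrame) -> pd.Series:
    """Per-column change in null count, largest reduction first (illustrative helper)."""
    shared = before.columns.intersection(after.columns)
    delta = after[shared].isna().sum() - before[shared].isna().sum()
    # Keep only columns whose null count changed, most negative first.
    return delta[delta != 0].sort_values()


before = pd.DataFrame({
    "status": ["active", None, None, "inactive"],
    "revenue": [10.0, 20.0, None, 40.0],
    "id": [1, 2, 3, 4],
})
after = before.dropna()

deltas = null_deltas(before, after)
print(deltas)
```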
Schema drift
Columns added or removed between steps are detected and reported immediately.
add_revenue_band() 582,246 → 582,246 ● +0 rows
    columns added : +revenue_band
drop_temp_columns() 582,246 → 582,246 ● +0 rows
    columns removed : -created_at
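The underlying check amounts to comparing column labels before and after a step. A minimal sketch in plain pandas follows; `schema_drift` is a hypothetical helper, not part of watcher.

```python
import pandas as pd


def schema_drift(before: pd.DataFrame, after: pd.DataFrame) -> dict:
    """Columns added or removed between two frames (illustrative helper)."""
    added = [c for c in after.columns if c not in before.columns]
    removed = [c for c in before.columns if c not in after.columns]
    return {"added": added, "removed": removed}


df = pd.DataFrame({"revenue": [120.0, 15.0], "created_at": ["2024-01-01", "2024-01-02"]})
stepped = df.assign(revenue_band=["high", "low"]).drop(columns=["created_at"])

drift = schema_drift(df, stepped)
print(drift)
```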
Dtype change detection
If a step changes a column's dtype, whether widening (int32 → int64) or falling back to a generic type (float64 → object), watcher flags it.
coerce_step() 10,000 → 10,000 ● +0 rows
    dtype change : customer_id int64 → object
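To see where such a report could come from, the sketch below compares `DataFrame.dtypes` for the columns two frames share. `dtype_changes` is a hypothetical helper written for this illustration only.

```python
import pandas as pd


def dtype_changes(before: pd.DataFrame, after: pd.DataFrame) -> dict:
    """Map column -> (old dtype, new dtype) for shared columns whose dtype changed."""
    changes = {}
    for col in before.columns.intersection(after.columns):
        old, new = before[col].dtype, after[col].dtype
        if old != new:
            changes[col] = (str(old), str(new))
    return changes


df = pd.DataFrame({"customer_id": [1, 2, 3], "amount": [9.5, 3.0, 7.25]})
coerced = df.astype({"customer_id": object})  # simulate an accidental coercion

changes = dtype_changes(df, coerced)
print(changes)
```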
Join explosion detection
When a merge fans out unexpectedly, watcher tells you which key column caused it, which values are duplicated, and how many times — not just that rows were gained.
merge_orders() 10,000 → 20,000 ▲ +10,000 rows (+100.0%) ⚠ 💥 join explosion
    columns added : +tier
    join explosion · duplication ratio 100.0%
    key column     top value   repeat count
    customer_id    72          30
    customer_id    383         30
    customer_id    1034        28
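The same diagnosis can be done manually in pandas, which may help when reading the report above. This sketch uses small made-up frames: it counts repeated keys on the right-hand side with `value_counts()`, then shows pandas' own `validate="many_to_one"` option, which refuses the merge when right-hand keys are not unique. How watcher computes its duplication ratio is not shown here.

```python
import pandas as pd

left = pd.DataFrame({"customer_id": [1, 2, 3]})
right = pd.DataFrame({
    "customer_id": [1, 1, 1, 2, 3, 3],
    "tier": ["a", "b", "c", "a", "a", "b"],
})

# Keys that occur more than once on the right side fan out the left rows.
key_counts = right["customer_id"].value_counts()
repeated = key_counts[key_counts > 1]
print(repeated)

merged = left.merge(right, on="customer_id", how="left")
print(f"{len(left)} → {len(merged)} rows")

# pandas can also refuse the fan-out up front: validate="many_to_one"
# raises MergeError when the right-hand keys are not unique.
try:
    left.merge(right, on="customer_id", how="left", validate="many_to_one")
except pd.errors.MergeError as exc:
    print(f"merge rejected: {exc}")
```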
Threshold guards
Turn watcher into a data contract enforcer. Set soft warnings or hard stops on row gain or loss.
@watch(
    warn_on_loss=0.05,   # ⚠ warn if > 5 % rows lost
    raise_on_loss=0.20,  # ✗ raise if > 20 % rows lost
    warn_on_gain=0.10,   # ⚠ warn if > 10 % rows gained
    raise_on_gain=1.00,  # ✗ raise if rows more than double
)
def merge_orders(df):
    return df.merge(orders, on="customer_id", how="left")
Catching exceptions in CI:
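import logging

from watcher.exceptions import ThresholdExceeded

logger = logging.getLogger(__name__)

try:
    result = pipeline(df)  # pipeline: your chain of @watch-decorated steps
except ThresholdExceeded as exc:
    logger.error("Data contract violated: %s", exc)
    raise  # re-raise so the CI job fails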
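The thresholds are fractions of the input row count, so the decision for a loss threshold reduces to simple arithmetic. The sketch below is an assumption-laden illustration, not watcher's code: `check_loss` is a hypothetical function, and it assumes a threshold fires only when the loss is strictly greater than it, as the "> 5 %" comments suggest. The row counts are taken from the sample outputs in this README.

```python
def check_loss(rows_in: int, rows_out: int, warn_on_loss=None, raise_on_loss=None) -> str:
    """Classify a step as "ok", "warn" or "raise" from its fractional row loss.

    Illustrative only: assumes thresholds are fractions of the input row count
    and that a threshold fires when the loss is strictly greater than it.
    """
    if rows_in == 0:
        return "ok"  # nothing to lose
    loss = max(rows_in - rows_out, 0) / rows_in
    # The hard stop takes priority over the soft warning.
    if raise_on_loss is not None and loss > raise_on_loss:
        return "raise"
    if warn_on_loss is not None and loss > warn_on_loss:
        return "warn"
    return "ok"


print(check_loss(1_000_000, 964_203, warn_on_loss=0.05, raise_on_loss=0.20))  # 3.6 % lost
print(check_loss(1_000_000, 921_330, warn_on_loss=0.05, raise_on_loss=0.20))  # 7.9 % lost
print(check_loss(1_069_104, 631_822, warn_on_loss=0.05, raise_on_loss=0.20))  # 40.9 % lost
```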
Memory tracking
@watch(track_memory="rss") # process RSS via psutil — captures NumPy/pandas C allocations
@watch(track_memory="peak") # Python-heap peak via tracemalloc — no psutil needed
@watch(track_memory="off") # disabled — zero overhead for production pipelines
@watch(track_memory=True) # alias for "rss"
@watch(track_memory=False) # alias for "off"
Example output with RSS tracking on a 1M-row allocation:
big_allocation() 1,000,000 → 1,000,000 ● +0 rows 56.2 ms mem +38.5 MB (rss)
    columns added : +col1, +col2, +col3, +col4, +col5
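For a feel of what the "peak" mode reports, the standard-library `tracemalloc` module can measure peak traced memory around a single call. This is a minimal sketch, not watcher's implementation; `measure_peak` and `build_list` are hypothetical names.

```python
import tracemalloc


def measure_peak(func, *args, **kwargs):
    """Run func and return (result, peak traced memory in MB) using tracemalloc."""
    tracemalloc.start()
    try:
        result = func(*args, **kwargs)
        _current, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return result, peak / 1_000_000


def build_list(n):
    return list(range(n))


values, peak_mb = measure_peak(build_list, 200_000)
print(f"peak ≈ {peak_mb:.1f} MB for {len(values):,} items")
```

Tracing adds overhead to every allocation while it is active, which is one reason a pipeline might prefer "rss" or "off".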
Session grouping
Group multiple steps into one named pipeline run. Get a full summary table and a machine-readable dict for CI assertions.
with session("user churn model — daily run") as s:
    df = clean(df)
    df = merge(df)
    df = score(df)

summary = s.summary()
assert summary["total_rows_out"] > 500_000, "Too many rows dropped!"
print(summary["total_elapsed_s"])
summary() returns:
{
    "name": "user churn model — daily run",
    "steps": [
        {"func": "clean", "rows_in": 1000000, "rows_out": 964203, "diff": -35797, ...},
        {"func": "merge", ...},
        {"func": "score", ...},
    ],
    "total_rows_in": 1000000,
    "total_rows_out": 631822,
    "total_elapsed_s": 0.072,
    "total_memory_delta_mb": +38.5,
}
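Because the summary is a plain dict, CI checks can go beyond a single total. The sketch below works on a hand-written dict in the documented shape, filled with the Quickstart figures, and picks out the step with the largest row loss. `worst_step` is a hypothetical helper, and only keys shown above are used.

```python
def worst_step(summary: dict) -> dict:
    """Return the step entry with the most negative row diff."""
    return min(summary["steps"], key=lambda step: step["diff"])


# Hand-written dict in the documented shape, using the Quickstart figures.
summary = {
    "name": "nightly ETL",
    "steps": [
        {"func": "clean", "rows_in": 1_000_000, "rows_out": 964_203, "diff": -35_797},
        {"func": "merge_orders", "rows_in": 964_203, "rows_out": 1_069_104, "diff": 104_901},
        {"func": "filter_active", "rows_in": 1_069_104, "rows_out": 631_822, "diff": -437_282},
    ],
    "total_rows_in": 1_000_000,
    "total_rows_out": 631_822,
}

culprit = worst_step(summary)
retained = summary["total_rows_out"] / summary["total_rows_in"]
print(f"largest drop: {culprit['func']} ({culprit['diff']:,} rows); {retained:.1%} of rows retained")
```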
Custom handlers
Swap or extend the output layer without touching your pipeline code. Every step fires on_step() on all registered handlers.
from watcher import register_handler, deregister_handler
from watcher.handlers import HandlerBase
from watcher.core import StepResult
import json

class JSONLogHandler(HandlerBase):
    def __init__(self):
        self.log = []

    def on_step(self, step: StepResult):
        self.log.append({
            "step": step.func_name,
            "rows_in": step.rows_in,
            "rows_out": step.rows_out,
            "diff": step.row_diff,
            "ms": round(step.elapsed_s * 1000, 2),
        })

handler = JSONLogHandler()
register_handler(handler)
# ... run your pipeline ...
deregister_handler(handler)
print(json.dumps(handler.log, indent=2))
API reference
@watch
@watch(
    label: str | None = None,            # custom step name shown in output
    warn_on_loss: float | None = None,   # soft warning threshold (0.0–1.0)
    raise_on_loss: float | None = None,  # hard stop threshold (0.0–1.0)
    warn_on_gain: float | None = None,   # soft warning on row gain
    raise_on_gain: float | None = None,  # hard stop on row gain
    track_memory: bool | str | MemoryMode = "rss",
    verbose: bool = True,                # False = silent, step still tracked in session
)
Can be used bare (@watch) or with arguments (@watch(warn_on_loss=0.05)).
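For readers curious how one decorator can accept both forms, a common Python pattern is sketched below. It is a generic illustration, not watcher's source: `watch_like` and its `calls` attribute are hypothetical, and only a `label` argument is modelled.

```python
import functools


def watch_like(func=None, *, label=None):
    """Hypothetical decorator usable as @watch_like or @watch_like(label=...)."""

    def decorate(target):
        name = label or target.__name__

        @functools.wraps(target)
        def wrapper(*args, **kwargs):
            wrapper.calls.append(name)  # record which step ran
            return target(*args, **kwargs)

        wrapper.calls = []
        return wrapper

    # Bare use: Python passes the function directly as the first argument.
    if func is not None:
        return decorate(func)
    # Parameterised use: return the real decorator for Python to apply next.
    return decorate


@watch_like
def clean(rows):
    return [r for r in rows if r is not None]


@watch_like(label="dedupe step")
def dedupe(rows):
    return sorted(set(rows))


cleaned = clean([1, None, 2])
unique = dedupe([2, 1, 2])
```

Making the options keyword-only (the `*` in the signature) keeps the two call forms unambiguous.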
session(name)
Context manager. Groups @watch steps into one named pipeline run and prints a summary table on exit. Access .summary() on the session object for machine-readable results.
MemoryMode
| Value | Meaning |
|---|---|
| "rss" / True | Process RSS via psutil; captures NumPy, pandas, Arrow C allocations |
| "peak" | Python-heap peak via tracemalloc; no extra dependencies |
| "off" / False | Disabled; zero overhead |
StepResult attributes
| Attribute | Type | Description |
|---|---|---|
| func_name | str | Decorated function name (or label) |
| rows_in | int | Row count before the step |
| rows_out | int | Row count after the step |
| row_diff | int | Signed difference (rows_out - rows_in) |
| row_diff_pct | float | Fractional change relative to input |
| lost_rows | bool | True when rows were dropped |
| gained_rows | bool | True when rows were added |
| is_join_explosion | bool | True when a fan-out was detected |
| elapsed_s | float | Wall-clock time in seconds |
| memory_delta_mb | float | Memory change in MB |
| memory_mode | MemoryMode | Which memory strategy was used |
| warned | bool | True when a warn_on_* threshold fired |
| stats | StepStats | Full column-level stats (nulls, dtypes, schema drift) |
Exceptions
| Exception | When |
|---|---|
| ThresholdExceeded | A raise_on_* threshold is breached (hard stop) |
| WatcherWarning | A warn_on_* threshold is breached (soft; the pipeline continues) |
| ConfigurationError | Invalid decorator arguments at decoration time |
| BackendError | A backend adapter failed at runtime |
All exceptions inherit from WatcherError so you can catch the entire family with one clause.
HandlerBase
| Method | Called when |
|---|---|
| on_session_start(session) | A session() block opens |
| on_step(step) | A decorated function completes |
| on_session_end(session) | A session() block closes |
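As an illustration of the three hooks working together, the sketch below collects steps that exceed a time budget and reports them when the session ends. `SlowStepHandler` is hypothetical, it reads only the documented `func_name` and `elapsed_s` attributes, and it falls back to a stand-in base class so the example runs without watcher installed. The hooks are invoked by hand here with stub objects; in real use they would be called by watcher once the handler is registered.

```python
from types import SimpleNamespace

try:
    from watcher.handlers import HandlerBase
except ImportError:
    # Stand-in so the sketch runs without watcher installed (assumption:
    # the real base class provides no-op versions of these three hooks).
    class HandlerBase:
        def on_session_start(self, session): pass
        def on_step(self, step): pass
        def on_session_end(self, session): pass


class SlowStepHandler(HandlerBase):
    """Hypothetical handler that collects steps slower than a time budget."""

    def __init__(self, budget_s: float):
        self.budget_s = budget_s
        self.slow = []

    def on_session_start(self, session):
        self.slow.clear()  # start each session with an empty list

    def on_step(self, step):
        if step.elapsed_s > self.budget_s:
            self.slow.append((step.func_name, step.elapsed_s))

    def on_session_end(self, session):
        for name, seconds in self.slow:
            print(f"slow step: {name} took {seconds * 1000:.1f} ms")


# Drive the hooks manually with stub step objects (timings from the Quickstart output).
handler = SlowStepHandler(budget_s=0.030)
handler.on_session_start(None)
handler.on_step(SimpleNamespace(func_name="clean", elapsed_s=0.0123))
handler.on_step(SimpleNamespace(func_name="merge_orders", elapsed_s=0.0411))
handler.on_session_end(None)
```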
Examples
python examples/basic_pipeline.py # 1M-row e-commerce ETL with session summary
python examples/threshold_demo.py # all four threshold modes demonstrated
Development
git clone https://github.com/Abineshabee/watcher
cd watcher
pip install -e ".[dev]"
pytest tests/ -v --cov=watcher
CI runs on Python 3.10–3.13 across Ubuntu, Windows, and macOS on every push.
Roadmap
- Polars backend
- DuckDB backend
- Notebook / HTML renderer
- JSON handler for structured logging pipelines
- watcher.config: global defaults without decorator arguments
License
MIT — see LICENSE.