
Silent data watcher — tells you what your pipeline did to your data.


The silent data watcher. Decorates your pipeline functions and tells you exactly what happened to your data — row counts, schema drift, null changes, memory usage, join explosions — automatically, with zero config.



The problem

You run a data pipeline. The output looks wrong. Your only clue:

Input:  1,000,000 rows
Output:   263,979 rows

Which step dropped the rows? Was it a filter, a null drop, or a bad join? You have no idea without adding print statements everywhere and re-running the whole thing.

watcher answers that — automatically.


Install

pip install dfwatcher                 # core only (pandas)
pip install "dfwatcher[rich]"         # + coloured terminal output
pip install "dfwatcher[full]"         # + Rich + psutil memory tracking

The distribution is published as dfwatcher; the import name is watcher.

Quickstart

import pandas as pd
from watcher import watch, session

raw = pd.DataFrame({
    "customer_id": [1, 2, 3, 4],
    "status": ["active", "inactive", "active", None]
})

orders = pd.DataFrame({
    "customer_id": [1, 3],
    "amount": [250.0, 150.0]
})

@watch
def clean(df):
    return df.dropna()

@watch
def merge_orders(df):
    return df.merge(orders, on="customer_id", how="left")

@watch
def filter_active(df):
    return df[df["status"] == "active"]

# Run the steps inside a session; the summary table prints when the block exits.
if __name__ == "__main__":
    with session("nightly ETL") as s:
        df = clean(raw)
        df = merge_orders(df)
        df = filter_active(df)

# =====================================
# More examples    : examples/
# Syntax and usage : docs/usage.md
# =====================================

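Output, printed automatically with no extra code. The sample below comes from a 1,000,000-row dataset, so its row counts and columns differ from the four-row toy frames above: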

──────────────────────── watcher · nightly ETL ─────────────────────────
clean()          1,000,000 → 964,203   ▼  -35,797 rows  (-3.6%)   12.3 ms
  nulls -35,797  status  (35,797 → 0)

merge_orders()     964,203 → 1,069,104  ▲ +104,901 rows (+10.9%)  ⚠  41.1 ms
  columns added : +tier
  💥 join explosion · duplication ratio 10.9%
  key column     top value    repeat count
  customer_id    9182               184
  customer_id    3310                97

filter_active() 1,069,104 → 631,822   ▼ -437,282 rows (-40.9%)   18.7 ms

╭──────────────── watcher · nightly ETL · summary ───────────────────╮
│  step            rows in    rows out      Δ rows   time (ms)       │
│  clean         1,000,000     964,203     -35,797       12.3        │
│  merge_orders    964,203   1,069,104    +104,901       41.1        │
│  filter_active 1,069,104     631,822    -437,282       18.7        │
│                                                                    │
│  total  1,000,000 → 631,822  (-368,178 rows)  72.1 ms              │
╰────────────────────────────────────────────────────────────────────╯

Documentation


For advanced pipeline patterns and debugging workflows, see the full documentation.

Features

Row tracking

Every decorated function shows rows before → after, the signed diff, percentage change, and elapsed time. Nothing is hidden, nothing needs configuring.

drop_nulls()  1,000,000 → 921,330  ▼ -78,670 rows (-7.9%)  68.5 ms
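Conceptually, row tracking only needs the row count before and after the wrapped call plus a timer. The sketch below is a hypothetical, dependency-free stand-in (`track_rows` and `format_change` are illustrative names, not part of watcher's API); it works on anything that supports `len()`, including pandas DataFrames, and formats a line in the same style as the output above.

```python
import functools
import time


def format_change(rows_in: int, rows_out: int) -> str:
    """Render 'before → after  ▼ -diff rows (-pct%)' in the style shown above."""
    diff = rows_out - rows_in
    if diff == 0:
        return f"{rows_in:,} → {rows_out:,}  ● +0 rows"
    pct = diff / rows_in * 100 if rows_in else 0.0
    arrow = "▲" if diff > 0 else "▼"
    return f"{rows_in:,} → {rows_out:,}  {arrow} {diff:+,} rows ({pct:+.1f}%)"


def track_rows(func):
    """Hypothetical minimal @watch: row counts and elapsed time only."""
    @functools.wraps(func)
    def wrapper(data, *args, **kwargs):
        rows_in = len(data)
        start = time.perf_counter()
        result = func(data, *args, **kwargs)
        elapsed_ms = (time.perf_counter() - start) * 1000
        line = f"{func.__name__}()  {format_change(rows_in, len(result))}  {elapsed_ms:.1f} ms"
        wrapper.last_line = line  # keep the most recent report for inspection
        print(line)
        return result

    wrapper.last_line = None
    return wrapper
```

The wrapper never inspects the data beyond `len()`, which is why this style of instrumentation needs no configuration.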

Null-count deltas

Per-column null counts are compared before and after each step. The worst offenders are shown first.

drop_nulls()  1,000,000 → 921,330  ▼ -78,670 rows (-7.9%)
  nulls -2,477  status   (2,477 → 0)
  nulls -1,448  revenue  (1,448 → 0)
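A dependency-free sketch of the before/after comparison, assuming columns arrive as a plain `dict` of lists (with pandas, `df.isna().sum()` gives the per-column counts directly). `count_nulls` and `null_deltas` are illustrative names, not watcher functions, and the "worst first" ordering here is by absolute change.

```python
import math


def count_nulls(columns: dict[str, list]) -> dict[str, int]:
    """Count None/NaN values per column (a stand-in for DataFrame.isna().sum())."""
    def is_null(value) -> bool:
        return value is None or (isinstance(value, float) and math.isnan(value))

    return {name: sum(is_null(v) for v in values) for name, values in columns.items()}


def null_deltas(before: dict[str, list], after: dict[str, list]) -> list[tuple[str, int, int, int]]:
    """Return (column, delta, nulls_before, nulls_after) for changed columns, worst first."""
    nb, na = count_nulls(before), count_nulls(after)
    changed = [
        (col, na[col] - nb[col], nb[col], na[col])
        for col in nb.keys() & na.keys()  # only columns present on both sides
        if nb[col] != na[col]
    ]
    # Largest absolute change first; column name breaks ties deterministically.
    return sorted(changed, key=lambda row: (-abs(row[1]), row[0]))
```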

Schema drift

Columns added or removed between steps are detected and reported immediately.

add_revenue_band()  582,246 → 582,246  ● +0 rows
  columns added   : +revenue_band

drop_temp_columns() 582,246 → 582,246  ● +0 rows
  columns removed : -created_at
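Schema drift reduces to a set difference on column names. The sketch below is an illustrative helper (not watcher's code) that keeps the original column order so the report reads naturally; with pandas you would pass `list(df.columns)` from each side of the step.

```python
def schema_drift(before: list[str], after: list[str]) -> dict[str, list[str]]:
    """Columns added and removed between two steps, in their original order."""
    before_set, after_set = set(before), set(after)
    return {
        "added": [c for c in after if c not in before_set],
        "removed": [c for c in before if c not in after_set],
    }


def format_drift(drift: dict[str, list[str]]) -> list[str]:
    """Render the drift in the same layout as the report lines above."""
    lines = []
    if drift["added"]:
        lines.append("columns added   : " + ", ".join("+" + c for c in drift["added"]))
    if drift["removed"]:
        lines.append("columns removed : " + ", ".join("-" + c for c in drift["removed"]))
    return lines
```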

Dtype change detection

If a step changes a column's dtype, whether by widening (int32 → int64) or by coercing to a looser type (float64 → object), watcher flags it.

coerce_step()  10,000 → 10,000  ● +0 rows
  dtype change : customer_id  int64 → object
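A sketch of dtype comparison, assuming each side is summarised as a `{column: dtype_name}` mapping (with pandas, `{c: str(t) for c, t in df.dtypes.items()}`). The widening/narrowing/coercion labels are this example's own classification, not necessarily the terms watcher uses internally.

```python
import re

# Sized numeric dtypes such as int32, uint8, float64.
_SIZED = re.compile(r"^(int|uint|float)(\d+)$")


def dtype_changes(before: dict[str, str], after: dict[str, str]) -> dict[str, tuple[str, str, str]]:
    """Map column -> (old dtype, new dtype, kind) for columns whose dtype changed."""
    changes = {}
    for col, old in before.items():
        new = after.get(col)
        if new is None or new == old:
            continue  # column dropped (that is schema drift) or unchanged
        m_old, m_new = _SIZED.match(old), _SIZED.match(new)
        if m_old and m_new and m_old.group(1) == m_new.group(1):
            # Same numeric family: compare bit widths.
            kind = "widening" if int(m_new.group(2)) > int(m_old.group(2)) else "narrowing"
        else:
            kind = "coercion"  # family changed, e.g. int64 -> object
        changes[col] = (old, new, kind)
    return changes
```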

Join explosion detection

When a merge fans out unexpectedly, watcher tells you which key column caused it, which values are duplicated, and how many times — not just that rows were gained.

merge_orders()  10,000 → 20,000  ▲ +10,000 rows (+100.0%) ⚠ 💥 join explosion
  columns added : +tier
  join explosion · duplication ratio 100.0%
  key column     top value    repeat count
  customer_id    72                    30
  customer_id    383                   30
  customer_id    1034                  28
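The mechanics behind a fan-out are easy to reproduce: in a left join, each left row is emitted once per matching right row (or once if unmatched). The hypothetical `explain_fan_out` below works on plain key lists and reports the predicted output size, the row-gain ratio, and the right-side keys that repeat most. It is an illustration of the idea, not watcher's implementation; with pandas, `orders["customer_id"].value_counts()` gives the same right-side counts.

```python
from collections import Counter


def explain_fan_out(left_keys: list, right_keys: list, top: int = 3):
    """Predict left-join row growth and list the repeated right-side keys that cause it."""
    right_counts = Counter(right_keys)
    # Each left row matches every right row with the same key, or survives once if unmatched.
    rows_out = sum(max(1, right_counts.get(k, 0)) for k in left_keys)
    rows_in = len(left_keys)
    ratio = (rows_out - rows_in) / rows_in if rows_in else 0.0
    left_present = set(left_keys)
    # Only keys that actually occur on the left can multiply rows.
    repeated = [(k, n) for k, n in right_counts.most_common() if n > 1 and k in left_present]
    return rows_out, ratio, repeated[:top]
```

If a key is supposed to be unique on the right, pandas' own `df.merge(orders, on="customer_id", validate="many_to_one")` raises `pandas.errors.MergeError` as soon as it is not, which complements watcher's after-the-fact report.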

Threshold guards

Turn watcher into a data contract enforcer. Set soft warnings or hard stops on row gain or loss.

@watch(
    warn_on_loss=0.05,    # ⚠  warn  if > 5 %  rows lost
    raise_on_loss=0.20,   # ✗  raise if > 20 % rows lost
    warn_on_gain=0.10,    # ⚠  warn  if > 10 % rows gained
    raise_on_gain=1.00,   # ✗  raise if rows more than double
)
def merge_orders(df):
    return df.merge(orders, on="customer_id", how="left")
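The threshold semantics can be sketched as a single check on the fractional row change. This is an assumption-laden model, not watcher's code: it reads the comments above literally (strict `>` comparisons), checks hard stops before soft warnings, skips empty inputs, and uses a local `ThresholdBreach` class in place of `ThresholdExceeded`.

```python
import warnings


class ThresholdBreach(Exception):
    """Stand-in for watcher.exceptions.ThresholdExceeded in this sketch."""


def check_thresholds(rows_in, rows_out, *, warn_on_loss=None, raise_on_loss=None,
                     warn_on_gain=None, raise_on_gain=None):
    """Apply loss/gain limits given as fractions of the input row count."""
    if rows_in == 0:
        return  # no baseline to compare against (an assumption of this sketch)
    change = (rows_out - rows_in) / rows_in
    loss, gain = max(0.0, -change), max(0.0, change)
    # Hard stops first, so a breach never downgrades to a mere warning.
    if raise_on_loss is not None and loss > raise_on_loss:
        raise ThresholdBreach(f"lost {loss:.1%} of rows (limit {raise_on_loss:.0%})")
    if raise_on_gain is not None and gain > raise_on_gain:
        raise ThresholdBreach(f"gained {gain:.1%} rows (limit {raise_on_gain:.0%})")
    if warn_on_loss is not None and loss > warn_on_loss:
        warnings.warn(f"lost {loss:.1%} of rows (warn at {warn_on_loss:.0%})", stacklevel=2)
    if warn_on_gain is not None and gain > warn_on_gain:
        warnings.warn(f"gained {gain:.1%} rows (warn at {warn_on_gain:.0%})", stacklevel=2)
```

Under this reading, `raise_on_gain=1.00` lets a step exactly double its rows and stops it only beyond that.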

Catching exceptions in CI:

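import logging

from watcher.exceptions import ThresholdExceeded

logger = logging.getLogger(__name__)

try:
    result = pipeline(df)          # your own @watch-decorated pipeline
except ThresholdExceeded as exc:
    logger.error("Data contract violated: %s", exc)
    raise                          # re-raise so the CI job fails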

Memory tracking

# Pick one mode per decorated function:
@watch(track_memory="rss")    # process RSS via psutil; captures NumPy/pandas C allocations
@watch(track_memory="peak")   # Python-heap peak via tracemalloc; no psutil needed
@watch(track_memory="off")    # disabled; zero overhead for production pipelines
@watch(track_memory=True)     # alias for "rss"
@watch(track_memory=False)    # alias for "off"

Example output with RSS tracking on a 1M-row allocation:

big_allocation()  1,000,000 → 1,000,000  ● +0 rows  56.2 ms  mem +38.5 MB (rss)
  columns added : +col1, +col2, +col3, +col4, +col5
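The "peak" mode relies on the standard-library `tracemalloc` module. The helper below is a hypothetical sketch of that kind of measurement (not watcher's code): it resets the traced peak, runs the step, and reports the peak above the starting point in MiB. The "rss" mode presumably samples something like `psutil.Process().memory_info().rss` before and after instead, which covers the whole process rather than only allocations reported to Python's tracer.

```python
import tracemalloc


def peak_python_mib(func, *args, **kwargs):
    """Run func; return (result, peak traced Python memory during the call in MiB)."""
    already_tracing = tracemalloc.is_tracing()
    if not already_tracing:
        tracemalloc.start()
    tracemalloc.reset_peak()  # Python 3.9+: measure only this call's peak
    baseline, _ = tracemalloc.get_traced_memory()
    try:
        result = func(*args, **kwargs)
        _, peak = tracemalloc.get_traced_memory()
    finally:
        if not already_tracing:
            tracemalloc.stop()  # leave tracing as we found it
    return result, (peak - baseline) / (1024 * 1024)
```

Tracing slows allocation-heavy code noticeably, which is one reason an "off" mode is worth having in production.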

Session grouping

Group multiple steps into one named pipeline run. Get a full summary table and a machine-readable dict for CI assertions.

with session("user churn model — daily run") as s:
    df = clean(df)
    df = merge(df)
    df = score(df)

summary = s.summary()
assert summary["total_rows_out"] > 500_000, "Too many rows dropped!"
print(summary["total_elapsed_s"])

summary() returns:

{
    "name": "user churn model — daily run",
    "steps": [
        {"func": "clean",  "rows_in": 1000000, "rows_out": 964203, "diff": -35797, ...},
        {"func": "merge",  ...},
        {"func": "score",  ...},
    ],
    "total_rows_in":        1000000,
    "total_rows_out":        631822,
    "total_elapsed_s":        0.072,
    "total_memory_delta_mb":  +38.5,
}

Custom handlers

Swap or extend the output layer without touching your pipeline code. Every step fires on_step() on all registered handlers.

from watcher import register_handler, deregister_handler
from watcher.handlers import HandlerBase
from watcher.core import StepResult
import json

class JSONLogHandler(HandlerBase):
    def __init__(self):
        self.log = []

    def on_step(self, step: StepResult):
        self.log.append({
            "step":     step.func_name,
            "rows_in":  step.rows_in,
            "rows_out": step.rows_out,
            "diff":     step.row_diff,
            "ms":       round(step.elapsed_s * 1000, 2),
        })

handler = JSONLogHandler()
register_handler(handler)

# ... run your pipeline ...

deregister_handler(handler)
print(json.dumps(handler.log, indent=2))

API reference

@watch

@watch(
    label:         str   | None = None,          # custom step name shown in output
    warn_on_loss:  float | None = None,          # soft warning threshold (0.0–1.0)
    raise_on_loss: float | None = None,          # hard stop threshold   (0.0–1.0)
    warn_on_gain:  float | None = None,          # soft warning on row gain
    raise_on_gain: float | None = None,          # hard stop on row gain
    track_memory:  bool | str | MemoryMode = "rss",
    verbose:       bool = True,                  # False = silent, step still tracked in session
)

Can be used bare (@watch) or with arguments (@watch(warn_on_loss=0.05)).
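Supporting both `@watch` and `@watch(...)` is a standard Python decorator pattern: when used bare, the decorator receives the function directly; when called, it receives only keyword arguments and must return the real decorator. `watch_sketch` below is a hypothetical illustration of that pattern, not watcher's source. It also validates arguments eagerly, which is the behaviour the `ConfigurationError` entry describes (here a plain `ValueError`).

```python
import functools


def watch_sketch(func=None, *, label=None, warn_on_loss=None):
    """Hypothetical decorator usable bare (@watch_sketch) or called (@watch_sketch(...))."""
    # Validate up front so bad arguments fail at decoration time, not mid-pipeline.
    if warn_on_loss is not None and not 0.0 <= warn_on_loss <= 1.0:
        raise ValueError(f"warn_on_loss must be within 0.0-1.0, got {warn_on_loss!r}")

    def decorate(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            return f(*args, **kwargs)  # a real version would measure the step here

        wrapper.watch_label = label or f.__name__
        wrapper.watch_config = {"warn_on_loss": warn_on_loss}
        return wrapper

    if callable(func):
        return decorate(func)  # bare form: Python passed the function itself
    return decorate            # called form: hand back the actual decorator
```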


session(name)

Context manager. Groups @watch steps into one named pipeline run and prints a summary table on exit. Access .summary() on the session object for machine-readable results.


MemoryMode

Value           Meaning
"rss" / True    Process RSS via psutil; captures NumPy, pandas, and Arrow C allocations
"peak"          Python-heap peak via tracemalloc; no extra dependencies
"off" / False   Disabled; zero overhead
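The alias column above implies a small normalisation step from `bool | str | MemoryMode` to a single enum value. A hypothetical mirror of that table (not watcher's own class) might look like this; note that the boolean checks must come before the enum lookup, since `True` is not a valid member value.

```python
from enum import Enum


class MemoryMode(str, Enum):
    """Hypothetical mirror of the table above, for illustration only."""
    RSS = "rss"
    PEAK = "peak"
    OFF = "off"

    @classmethod
    def coerce(cls, value):
        """Accept True/False aliases, mode strings, or existing members."""
        if value is True:
            return cls.RSS
        if value is False:
            return cls.OFF
        return cls(value)  # raises ValueError for unknown strings
```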

StepResult attributes

Attribute           Type         Description
func_name           str          Decorated function name (or label)
rows_in             int          Row count before the step
rows_out            int          Row count after the step
row_diff            int          Signed difference (rows_out - rows_in)
row_diff_pct        float        Fractional change relative to input
lost_rows           bool         True when rows were dropped
gained_rows         bool         True when rows were added
is_join_explosion   bool         True when a fan-out was detected
elapsed_s           float        Wall-clock time in seconds
memory_delta_mb     float        Memory change in MB
memory_mode         MemoryMode   Which memory strategy was used
warned              bool         True when a warn_on_* threshold fired
stats               StepStats    Full column-level stats (nulls, dtypes, schema drift)
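Several of these attributes are derived from `rows_in` and `rows_out`. The dataclass below is a hypothetical subset of `StepResult` that makes those relationships explicit; returning 0.0 for `row_diff_pct` on an empty input is an assumption of this sketch.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StepRecord:
    """Hypothetical subset of StepResult showing how the derived fields relate."""
    func_name: str
    rows_in: int
    rows_out: int
    elapsed_s: float

    @property
    def row_diff(self) -> int:
        return self.rows_out - self.rows_in

    @property
    def row_diff_pct(self) -> float:
        # Fraction of the input, e.g. -0.036 for a 3.6 % drop; 0.0 when there was no input.
        return self.row_diff / self.rows_in if self.rows_in else 0.0

    @property
    def lost_rows(self) -> bool:
        return self.row_diff < 0

    @property
    def gained_rows(self) -> bool:
        return self.row_diff > 0
```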

Exceptions

Exception            When
ThresholdExceeded    A raise_on_* threshold is breached (hard stop)
WatcherWarning       A warn_on_* threshold is breached (soft; the pipeline continues)
ConfigurationError   Invalid decorator arguments at decoration time
BackendError         A backend adapter failed at runtime

All exceptions inherit from WatcherError so you can catch the entire family with one clause.
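A sketch of the hierarchy described above, showing why one `except WatcherError` clause is enough. Only the class names and the shared `WatcherError` base come from the table; the docstrings, the extra `UserWarning` base on `WatcherWarning`, and the `failure_kind` helper are assumptions for illustration.

```python
import warnings


class WatcherError(Exception):
    """Root of the family: one `except WatcherError` clause catches everything below."""


class ThresholdExceeded(WatcherError):
    """A raise_on_* limit was breached."""


class ConfigurationError(WatcherError):
    """Bad decorator arguments, detected at decoration time."""


class BackendError(WatcherError):
    """A backend adapter failed at runtime."""


class WatcherWarning(WatcherError, UserWarning):
    """Assumption: also a Warning subclass, so warnings.warn() can emit it."""


def failure_kind(step):
    """Run a pipeline step and report which watcher error (if any) stopped it."""
    try:
        step()
    except WatcherError as exc:
        return type(exc).__name__
    return None
```

If `WatcherWarning` really is emitted through the `warnings` module, `warnings.simplefilter("error", WatcherWarning)` in a CI job would promote soft thresholds to hard failures.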


HandlerBase

Method                      Called when
on_session_start(session)   A session() block opens
on_step(step)               A decorated function completes
on_session_end(session)     A session() block closes

Examples

python examples/basic_pipeline.py     # 1M-row e-commerce ETL with session summary
python examples/threshold_demo.py     # all four threshold modes demonstrated

Development

git clone https://github.com/Abineshabee/watcher
cd watcher
pip install -e ".[dev]"
pytest tests/ -v --cov=watcher

CI runs on Python 3.10–3.13 across Ubuntu, Windows, and macOS on every push.


Roadmap

  • Polars backend
  • DuckDB backend
  • Notebook / HTML renderer
  • JSON handler for structured logging pipelines
  • watcher.config — global defaults without decorator arguments

License

MIT — see LICENSE.



Download files


Source Distribution

dfwatcher-0.1.0.tar.gz (43.7 kB)

Uploaded Source

Built Distribution


dfwatcher-0.1.0-py3-none-any.whl (31.6 kB)

Uploaded Python 3

File details

Details for the file dfwatcher-0.1.0.tar.gz.

File metadata

  • Download URL: dfwatcher-0.1.0.tar.gz
  • Upload date:
  • Size: 43.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for dfwatcher-0.1.0.tar.gz
Algorithm Hash digest
SHA256 c8acd59615b14179ff75bb09531a4631a1eb01857424a238165749cf9908ddd9
MD5 15550e08090270ba2564c1ce558ae7ba
BLAKE2b-256 233107f8692764ecccd75dd6f58c5d9c0e2c375a384737f17e991d8f7f618371


File details

Details for the file dfwatcher-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: dfwatcher-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 31.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.7

File hashes

Hashes for dfwatcher-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 7687f5b9f1f95928ce5862c7fb4611977e0e0e7540880cc9b3aea07c9a3ca668
MD5 c318f805dc66ee0db37d8e53692f420a
BLAKE2b-256 a45a1f51150cea29f4e7d2f5bacb855f86a491e8abdabca9d208545253b62dc7

