Silent data watcher — tells you what your pipeline did to your data.

The silent data watcher. Decorates your pipeline functions and tells you exactly what happened to your data — row counts, schema drift, null changes, memory usage, join explosions — automatically, with zero config.


The problem

You run a data pipeline. The output looks wrong. Your only clue:

Input:  1,000,000 rows
Output:   263,979 rows

Which step dropped the rows? Was it a filter, a null drop, or a bad join? You have no idea without adding print statements everywhere and re-running the whole thing.

watcher answers that — automatically.


Install

pip install dfwatcher                 # core only (pandas)
pip install "dfwatcher[rich]"         # + coloured terminal output
pip install "dfwatcher[full]"         # + Rich + psutil memory tracking

Quickstart

import pandas as pd
from watcher import watch, session

raw = pd.DataFrame({
    "customer_id": [1, 2, 3, 4],
    "status": ["active", "inactive", "active", None]
})

orders = pd.DataFrame({
    "customer_id": [1, 3],
    "amount": [250.0, 150.0]
})

@watch
def clean(df):
    return df.dropna()

@watch
def merge_orders(df):
    return df.merge(orders, on="customer_id", how="left")

@watch
def filter_active(df):
    return df[df["status"] == "active"]

# Run the steps inside a session to get the watcher summary.
if __name__ == "__main__":
    with session("nightly ETL") as s:
        df = clean(raw)
        df = merge_orders(df)
        df = filter_active(df)

# =====================================
# For more examples    : examples/
# For syntax and usage : docs/usage.md
# =====================================

Output is printed automatically, with no extra code. The sample below is from a 1,000,000-row run, not the four-row frames above:

──────────────────────── watcher · nightly ETL ─────────────────────────
clean()          1,000,000 → 964,203   ▼  -35,797 rows  (-3.6%)   12.3 ms
  nulls -35,797  status  (35,797 → 0)

merge_orders()     964,203 → 1,069,104  ▲ +104,901 rows (+10.9%)  ⚠  41.1 ms
  columns added : +tier
  💥 join explosion · duplication ratio 10.9%
  key column     top value    repeat count
  customer_id    9182               184
  customer_id    3310                97

filter_active() 1,069,104 → 631,822   ▼ -437,282 rows (-40.9%)   18.7 ms

╭──────────────── watcher · nightly ETL · summary ───────────────────╮
│  step            rows in    rows out      Δ rows   time (ms)       │
│  clean         1,000,000     964,203     -35,797       12.3        │
│  merge_orders    964,203   1,069,104    +104,901       41.1        │
│  filter_active 1,069,104     631,822    -437,282       18.7        │
│                                                                    │
│  total  1,000,000 → 631,822  (-368,178 rows)  72.1 ms              │
╰────────────────────────────────────────────────────────────────────╯

Documentation


For advanced pipeline patterns and debugging workflows, see the full documentation.

Features

Row tracking

Every decorated function shows rows before → after, the signed diff, percentage change, and elapsed time. Nothing is hidden, nothing needs configuring.

drop_nulls()  1,000,000 → 921,330  ▼ -78,670 rows (-7.9%)  68.5 ms
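As a rough illustration of what a row tracker like this records, the sketch below wraps a function, measures len() before and after, and derives the signed diff, percentage change, and elapsed time. The names track_rows and last are hypothetical and are not part of watcher; plain lists stand in for DataFrames so the block runs on the standard library alone.

```python
import functools
import time


def track_rows(func):
    """Hypothetical stand-in for a row-tracking decorator (not watcher's code)."""

    @functools.wraps(func)
    def wrapper(data, *args, **kwargs):
        rows_in = len(data)
        start = time.perf_counter()
        result = func(data, *args, **kwargs)
        elapsed_ms = (time.perf_counter() - start) * 1000
        rows_out = len(result)
        diff = rows_out - rows_in
        # Percentage change relative to the input; guard against empty input.
        pct = (diff / rows_in * 100) if rows_in else 0.0
        wrapper.last = {
            "rows_in": rows_in,
            "rows_out": rows_out,
            "diff": diff,
            "pct": pct,
            "elapsed_ms": elapsed_ms,
        }
        print(
            f"{func.__name__}()  {rows_in:,} -> {rows_out:,}  "
            f"{diff:+,} rows ({pct:+.1f}%)  {elapsed_ms:.1f} ms"
        )
        return result

    wrapper.last = None
    return wrapper


@track_rows
def drop_nulls(rows):
    return [r for r in rows if r is not None]


cleaned = drop_nulls([1, None, 3, None, 5])
```

The decorated function keeps its original signature and return value, which is what lets a tracker of this kind be added or removed without touching the pipeline logic.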

Null-count deltas

Per-column null counts are compared before and after each step. The worst offenders are shown first.

drop_nulls()  1,000,000 → 921,330  ▼ -78,670 rows (-7.9%)
  nulls -2,477  status   (2,477 → 0)
  nulls -1,448  revenue  (1,448 → 0)
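A per-column null delta of this kind can be reproduced by hand in pandas. The sketch below is one possible way to compute it, not watcher's implementation; null_deltas is a hypothetical helper name and the sample frame is made up for illustration.

```python
import pandas as pd


def null_deltas(before: pd.DataFrame, after: pd.DataFrame) -> pd.Series:
    """Per-column change in null count, largest absolute change first."""
    # Only columns present on both sides can be compared.
    shared = [c for c in before.columns if c in after.columns]
    delta = after[shared].isna().sum() - before[shared].isna().sum()
    delta = delta[delta != 0]  # hide columns whose null count did not move
    order = delta.abs().sort_values(ascending=False).index
    return delta.reindex(order)


before = pd.DataFrame({
    "status": ["active", None, None, "inactive"],
    "revenue": [10.0, 20.0, None, 40.0],
    "customer_id": [1, 2, 3, 4],
})
after = before.dropna()
deltas = null_deltas(before, after)
print(deltas)
```

Here status loses two nulls and revenue loses one, so status is listed first; customer_id never had nulls and is omitted.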

Schema drift

Columns added or removed between steps are detected and reported immediately.

add_revenue_band()  582,246 → 582,246  ● +0 rows
  columns added   : +revenue_band

drop_temp_columns() 582,246 → 582,246  ● +0 rows
  columns removed : -created_at
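Detecting added and removed columns comes down to comparing the column lists before and after a step. The following is a minimal sketch of that comparison, assuming pandas; schema_drift is a hypothetical helper, not a watcher function.

```python
import pandas as pd


def schema_drift(before: pd.DataFrame, after: pd.DataFrame):
    """Return (added, removed) column names, preserving column order."""
    before_cols, after_cols = list(before.columns), list(after.columns)
    added = [c for c in after_cols if c not in before_cols]
    removed = [c for c in before_cols if c not in after_cols]
    return added, removed


df = pd.DataFrame({
    "revenue": [120.0, 15.0],
    "created_at": ["2024-01-01", "2024-01-02"],
})
banded = df.assign(revenue_band=["high", "low"])
trimmed = banded.drop(columns=["created_at"])

print(schema_drift(df, banded))
print(schema_drift(banded, trimmed))
```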

Dtype change detection

If a step changes a column's dtype, whether widening (int32 → int64) or narrowing (float64 → object), watcher flags it.

coerce_step()  10,000 → 10,000  ● +0 rows
  dtype change : customer_id  int64 → object
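A dtype check can be sketched the same way: compare the dtype of every column that exists on both sides of a step. This is an illustrative pandas snippet under that assumption; dtype_changes is a hypothetical name and says nothing about how watcher does it internally.

```python
import pandas as pd


def dtype_changes(before: pd.DataFrame, after: pd.DataFrame) -> dict:
    """Map column name -> (old dtype, new dtype) for columns whose dtype changed."""
    changes = {}
    for col in before.columns.intersection(after.columns):
        old, new = before[col].dtype, after[col].dtype
        if old != new:
            changes[col] = (str(old), str(new))
    return changes


df = pd.DataFrame({
    # Explicit dtype so the example behaves the same on every platform.
    "customer_id": pd.Series([1, 2, 3], dtype="int64"),
    "amount": [1.5, 2.5, 3.5],
})
coerced = df.assign(customer_id=df["customer_id"].astype(object))
changes = dtype_changes(df, coerced)
print(changes)
```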

Join explosion detection

When a merge fans out unexpectedly, watcher tells you which key column caused it, which values are duplicated, and how many times — not just that rows were gained.

merge_orders()  10,000 → 20,000  ▲ +10,000 rows (+100.0%) ⚠ 💥 join explosion
  columns added : +tier
  join explosion · duplication ratio 100.0%
  key column     top value    repeat count
  customer_id    72                    30
  customer_id    383                   30
  customer_id    1034                  28
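To see where a fan-out comes from, it helps to count how often each key value repeats on the right-hand side of the merge. The sketch below does that in pandas. explain_fan_out is a hypothetical helper, the left-side keys are assumed to be unique, and the ratio is computed as extra rows divided by input rows, which matches the figures shown above but is an assumption about how watcher defines it.

```python
import pandas as pd


def explain_fan_out(left: pd.DataFrame, right: pd.DataFrame, key: str, top: int = 3) -> pd.Series:
    """Right-side key values that repeat and also occur on the left, most frequent first."""
    counts = right[key].value_counts()  # sorted by count, descending
    repeated = counts[counts > 1]
    hits = repeated[repeated.index.isin(left[key])]
    return hits.head(top)


customers = pd.DataFrame({"customer_id": [1, 2, 3]})
orders = pd.DataFrame({
    "customer_id": [1, 1, 1, 2, 3, 3],
    "amount": [5, 6, 7, 8, 9, 10],
})

merged = customers.merge(orders, on="customer_id", how="left")
ratio = (len(merged) - len(customers)) / len(customers)
culprits = explain_fan_out(customers, orders, "customer_id")

print(f"{len(customers)} -> {len(merged)} rows, duplication ratio {ratio:.1%}")
print(culprits)
```

When a merge is expected to be one-to-one or many-to-one, pandas can also enforce that directly through the validate argument of DataFrame.merge.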

Threshold guards

Turn watcher into a data contract enforcer. Set soft warnings or hard stops on row gain or loss.

@watch(
    warn_on_loss=0.05,    # ⚠  warn  if > 5 %  rows lost
    raise_on_loss=0.20,   # ✗  raise if > 20 % rows lost
    warn_on_gain=0.10,    # ⚠  warn  if > 10 % rows gained
    raise_on_gain=1.00,   # ✗  raise if rows more than double
)
def merge_orders(df):
    return df.merge(orders, on="customer_id", how="left")

Catching exceptions in CI:

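import logging

from watcher.exceptions import ThresholdExceeded

logger = logging.getLogger(__name__)

try:
    result = pipeline(df)  # pipeline: your chain of @watch-decorated steps
except ThresholdExceeded as exc:
    # A raise_on_* threshold was breached: log it, then re-raise so the CI job fails.
    logger.error("Data contract violated: %s", exc)
    raise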
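The threshold arithmetic itself is simple enough to sketch. The block below treats each threshold as a fraction of the input row count and uses a strict greater-than comparison, following the comments in the decorator example above. check_thresholds and ThresholdBreach are hypothetical local names and are not the library's own function or exception.

```python
class ThresholdBreach(Exception):
    """Hypothetical local exception; not the library's ThresholdExceeded."""


def check_thresholds(rows_in, rows_out, warn_on_loss=None, raise_on_loss=None,
                     warn_on_gain=None, raise_on_gain=None):
    """Return 'ok' or 'warn'; raise ThresholdBreach on a hard-stop breach."""
    if rows_in == 0:
        return "ok"  # no baseline to compare against
    change = (rows_out - rows_in) / rows_in
    loss, gain = max(-change, 0.0), max(change, 0.0)

    # Hard stops are checked first so they win over soft warnings.
    if raise_on_loss is not None and loss > raise_on_loss:
        raise ThresholdBreach(f"lost {loss:.1%} of rows (limit {raise_on_loss:.1%})")
    if raise_on_gain is not None and gain > raise_on_gain:
        raise ThresholdBreach(f"gained {gain:.1%} rows (limit {raise_on_gain:.1%})")

    if warn_on_loss is not None and loss > warn_on_loss:
        return "warn"
    if warn_on_gain is not None and gain > warn_on_gain:
        return "warn"
    return "ok"


print(check_thresholds(1000, 960, warn_on_loss=0.05, raise_on_loss=0.20))
print(check_thresholds(1000, 900, warn_on_loss=0.05, raise_on_loss=0.20))
try:
    check_thresholds(1000, 700, warn_on_loss=0.05, raise_on_loss=0.20)
except ThresholdBreach as exc:
    print("hard stop:", exc)
```

With raise_on_gain=1.00, a step that exactly doubles the row count only warns under this sketch, because the gain equals the limit but does not exceed it.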

Memory tracking

# Pick one mode per decorated function:
@watch(track_memory="rss")    # process RSS via psutil; captures NumPy/pandas C allocations
@watch(track_memory="peak")   # Python-heap peak via tracemalloc; no psutil needed
@watch(track_memory="off")    # disabled; zero overhead for production pipelines
@watch(track_memory=True)     # alias for "rss"
@watch(track_memory=False)    # alias for "off"

Example output with RSS tracking on a 1M-row allocation:

big_allocation()  1,000,000 → 1,000,000  ● +0 rows  56.2 ms  mem +38.5 MB (rss)
  columns added : +col1, +col2, +col3, +col4, +col5
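For a sense of what a "peak"-style measurement involves, the standard-library tracemalloc module can report the peak traced allocation during a call. The snippet below is only a sketch of that technique; peak_heap_mb is a hypothetical helper and does not reflect how watcher wires tracemalloc in.

```python
import tracemalloc


def peak_heap_mb(func, *args, **kwargs):
    """Run func and return (result, peak traced allocation in MB)."""
    tracemalloc.start()
    try:
        result = func(*args, **kwargs)
        _current, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return result, peak / (1024 * 1024)


def big_allocation(n):
    return list(range(n))


values, peak_mb = peak_heap_mb(big_allocation, 200_000)
print(f"big_allocation()  {len(values):,} items  peak +{peak_mb:.1f} MB (peak)")
```

Tracing allocations adds overhead of its own, which is one reason a pipeline might switch memory tracking off in production.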

Session grouping

Group multiple steps into one named pipeline run. Get a full summary table and a machine-readable dict for CI assertions.

with session("user churn model — daily run") as s:
    df = clean(df)
    df = merge(df)
    df = score(df)

summary = s.summary()
assert summary["total_rows_out"] > 500_000, "Too many rows dropped!"
print(summary["total_elapsed_s"])

summary() returns:

{
    "name": "user churn model — daily run",
    "steps": [
        {"func": "clean",  "rows_in": 1000000, "rows_out": 964203, "diff": -35797, ...},
        {"func": "merge",  ...},
        {"func": "score",  ...},
    ],
    "total_rows_in":        1000000,
    "total_rows_out":        631822,
    "total_elapsed_s":        0.072,
    "total_memory_delta_mb":  +38.5,
}
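The totals in that dict can be derived from the per-step records. The sketch below assumes the session totals are the first step's input, the last step's output, and the sum of step timings, which is consistent with the quickstart summary table. summarise is a hypothetical function, and the step figures are copied from that table.

```python
def summarise(name, steps):
    """Fold per-step records into session-level totals (illustrative only)."""
    return {
        "name": name,
        "steps": steps,
        "total_rows_in": steps[0]["rows_in"] if steps else 0,
        "total_rows_out": steps[-1]["rows_out"] if steps else 0,
        "total_elapsed_s": round(sum(s["elapsed_s"] for s in steps), 4),
    }


steps = [
    {"func": "clean", "rows_in": 1_000_000, "rows_out": 964_203, "elapsed_s": 0.0123},
    {"func": "merge_orders", "rows_in": 964_203, "rows_out": 1_069_104, "elapsed_s": 0.0411},
    {"func": "filter_active", "rows_in": 1_069_104, "rows_out": 631_822, "elapsed_s": 0.0187},
]

summary = summarise("nightly ETL", steps)
net = summary["total_rows_out"] - summary["total_rows_in"]
print(f'{summary["total_rows_in"]:,} -> {summary["total_rows_out"]:,}  ({net:+,} rows)')
```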

Custom handlers

Swap or extend the output layer without touching your pipeline code. Every step fires on_step() on all registered handlers.

from watcher import register_handler, deregister_handler
from watcher.handlers import HandlerBase
from watcher.core import StepResult
import json

class JSONLogHandler(HandlerBase):
    def __init__(self):
        self.log = []

    def on_step(self, step: StepResult):
        self.log.append({
            "step":     step.func_name,
            "rows_in":  step.rows_in,
            "rows_out": step.rows_out,
            "diff":     step.row_diff,
            "ms":       round(step.elapsed_s * 1000, 2),
        })

handler = JSONLogHandler()
register_handler(handler)

# ... run your pipeline ...

deregister_handler(handler)
print(json.dumps(handler.log, indent=2))

API reference

@watch

@watch(
    label:         str   | None = None,          # custom step name shown in output
    warn_on_loss:  float | None = None,          # soft warning threshold (0.0–1.0)
    raise_on_loss: float | None = None,          # hard stop threshold   (0.0–1.0)
    warn_on_gain:  float | None = None,          # soft warning on row gain
    raise_on_gain: float | None = None,          # hard stop on row gain
    track_memory:  bool | str | MemoryMode = "rss",
    verbose:       bool = True,                  # False = silent, step still tracked in session
)

Can be used bare (@watch) or with arguments (@watch(warn_on_loss=0.05)).
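Supporting both forms is a common decorator pattern in Python: the decorator accepts the function as an optional first positional argument and keyword-only options after it. The sketch below shows the pattern in general terms; watch_like is a hypothetical name and this is not watcher's source.

```python
import functools


def watch_like(func=None, *, label=None):
    """Decorator usable as @watch_like or @watch_like(label="...")."""

    def decorate(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            return f(*args, **kwargs)

        wrapper.label = label or f.__name__
        return wrapper

    if func is not None:
        # Bare usage: Python passed the decorated function directly.
        return decorate(func)
    # Called usage: return the real decorator for Python to apply next.
    return decorate


@watch_like
def clean(rows):
    return [r for r in rows if r]


@watch_like(label="dedupe step")
def dedupe(rows):
    return sorted(set(rows))


print(clean.label, "|", dedupe.label)
```

Making the options keyword-only avoids the ambiguity of a single positional argument that could be either the function or a setting.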


session(name)

Context manager. Groups @watch steps into one named pipeline run and prints a summary table on exit. Access .summary() on the session object for machine-readable results.


MemoryMode

Value            Meaning
"rss" / True     Process RSS via psutil; captures NumPy, pandas, and Arrow C allocations
"peak"           Python-heap peak via tracemalloc; no extra dependencies
"off" / False    Disabled; zero overhead

StepResult attributes

Attribute           Type         Description
func_name           str          Decorated function name (or label)
rows_in             int          Row count before the step
rows_out            int          Row count after the step
row_diff            int          Signed difference (rows_out - rows_in)
row_diff_pct        float        Fractional change relative to input
lost_rows           bool         True when rows were dropped
gained_rows         bool         True when rows were added
is_join_explosion   bool         True when a fan-out was detected
elapsed_s           float        Wall-clock time in seconds
memory_delta_mb     float        Memory change in MB
memory_mode         MemoryMode   Which memory strategy was used
warned              bool         True when a warn_on_* threshold fired
stats               StepStats    Full column-level stats (nulls, dtypes, schema drift)

Exceptions

Exception            When
ThresholdExceeded    A raise_on_* threshold is breached (hard stop)
WatcherWarning       A warn_on_* threshold is breached (soft; pipeline continues)
ConfigurationError   Invalid decorator arguments at decoration time
BackendError         A backend adapter failed at runtime

All exceptions inherit from WatcherError so you can catch the entire family with one clause.
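The benefit of a shared base class can be shown with a few stand-in classes. The Demo-prefixed names below are hypothetical local classes that only mirror the hierarchy described here; they are not imported from watcher.

```python
class DemoWatcherError(Exception):
    """Stand-in for a library-wide base exception."""


class DemoThresholdExceeded(DemoWatcherError):
    """Stand-in for a hard-stop threshold breach."""


class DemoConfigurationError(DemoWatcherError):
    """Stand-in for invalid decorator arguments."""


def run(step):
    """Run a step and report any error from the family with one except clause."""
    try:
        step()
    except DemoWatcherError as exc:
        return f"{type(exc).__name__}: {exc}"
    return "ok"


def breach():
    raise DemoThresholdExceeded("30.0% of rows lost")


def bad_config():
    raise DemoConfigurationError("warn_on_loss must be between 0.0 and 1.0")


print(run(breach))
print(run(bad_config))
print(run(lambda: None))
```

Errors outside the family, such as a KeyError raised by the pipeline itself, are not caught by that clause and still propagate.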


HandlerBase

Method                      Called when
on_session_start(session)   A session() block opens
on_step(step)               A decorated function completes
on_session_end(session)     A session() block closes
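The order in which these hooks fire can be illustrated with a toy dispatcher. Everything in the sketch below (RecordingHandler, the handlers list, dispatch, run_session) is hypothetical scaffolding written for this example; it imitates the lifecycle listed above rather than reproducing watcher's registry, and plain strings stand in for the session and step objects.

```python
class RecordingHandler:
    """Hypothetical handler that records lifecycle events in call order."""

    def __init__(self):
        self.events = []

    def on_session_start(self, session):
        self.events.append(("session_start", session))

    def on_step(self, step):
        self.events.append(("step", step))

    def on_session_end(self, session):
        self.events.append(("session_end", session))


handlers = []  # stand-in for an internal handler registry


def dispatch(hook, payload):
    """Call the named hook on every registered handler."""
    for handler in handlers:
        getattr(handler, hook)(payload)


def run_session(name, step_names):
    dispatch("on_session_start", name)
    for step_name in step_names:
        dispatch("on_step", step_name)
    dispatch("on_session_end", name)


recorder = RecordingHandler()
handlers.append(recorder)
run_session("nightly ETL", ["clean", "merge_orders"])
handlers.remove(recorder)

for event in recorder.events:
    print(event)
```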

Examples

python examples/basic_pipeline.py     # 1M-row e-commerce ETL with session summary
python examples/threshold_demo.py     # all four threshold modes demonstrated

Development

git clone https://github.com/Abineshabee/watcher
cd watcher
pip install -e ".[dev]"
pytest tests/ -v --cov=watcher

CI runs on Python 3.10–3.13 across Ubuntu, Windows, and macOS on every push.


Roadmap

  • Polars backend
  • DuckDB backend
  • Notebook / HTML renderer
  • JSON handler for structured logging pipelines
  • watcher.config — global defaults without decorator arguments

License

MIT — see LICENSE.
