
Zero-dependency data observability with row-level audit trails and schema drift detection


CleanCore 🔍

Stop shipping unobserved data.

cleancore is a zero-dependency Python library that adds automatic audit trails and schema drift detection to your data pipelines.

Think of it like Git for your Data Rows — every change is tracked, every type mismatch is caught, before it breaks your production models.



Why CleanCore?

Data pipelines fail silently. A column quietly changes from int to str. A price field becomes None. Your model trains on garbage — and you never know why.

CleanCore wraps your existing pipeline functions and automatically:

  • Records what changed (row-level diff)
  • Detects type drift (int → str, float → None)
  • Prints a clean dashboard after every run
  • Exports a full JSON audit trail

No config. No new infrastructure. Just one decorator.


Features

Feature            Description
-----------------  ----------------------------------------------------------------------
@audit_trail       Decorator that wraps any function and records a before/after diff
Schema Sentinel    Detects type drift (int → str) and null regressions (float → NoneType)
Big Data Engine    Chunk-based processing (10k rows per batch) to keep memory use bounded
JSON Export        Full audit trail saved to a file for compliance or debugging
CLI Tool           cleancore report and cleancore validate from the terminal
Zero Dependencies  Pure Python; pandas, numpy, and other packages are not required
Pandas / Polars    Optional support that works automatically if either is installed

Installation

pip install cleancore

That's it. No extra dependencies needed.


Quick Start

Basic usage — 3 lines

from cleancore import audit_trail, ProvenaLogger

@audit_trail(rule_id="MASK_PII")
def clean_emails(data):
    for row in data:
        row['email'] = "***@***"
    return data

my_data = [
    {"id": 1, "email": "alice@example.com"},
    {"id": 2, "email": "bob@example.com"},
]

with ProvenaLogger("My_Pipeline") as logger:
    result = clean_emails(my_data, provena_logger=logger)

Output — printed automatically when the with block ends:

+--------------------------------------------------------------+
| PIPELINE START | My_Pipeline | MODE=LAZY                    |
+--------------------------------------------------------------+
...
  [SUMMARY]  Steps=1  |  In=2  Out=2  Delta=0  |  Wall=1.2ms

  STEP                 RULE             IN      OUT    MODIFIED       ms
  ------------------------------------------------------------------------
  clean_emails         MASK_PII          2        2           2      1.1

Schema Sentinel — Type Drift Detection

CleanCore automatically catches when a column's type changes between steps.

from cleancore import audit_trail, ProvenaLogger

@audit_trail(rule_id="TYPE_BUG")
def process(data):
    for row in data:
        row['age'] = str(row['age'])   # Bug: int accidentally cast to str
    return data

data = [{"id": 1, "age": 25}, {"id": 2, "age": 30}]

with ProvenaLogger("Compliance_Pipeline") as logger:
    process(data, provena_logger=logger)

Output:

  [SCHEMA]  Schema Sentinel
  ----------------------------------------
  COLUMN               KIND         FROM         TO           DETECTED IN
  ------------------------------------------------------------------------
  age                  [WARN]       int          str          process

No more silent type bugs.


Multiple Steps in a Pipeline

from cleancore import audit_trail, ProvenaLogger

@audit_trail(rule_id="FILTER_INACTIVE")
def remove_inactive(data):
    return [row for row in data if row['active']]

@audit_trail(rule_id="MASK_PII")
def mask_emails(data):
    for row in data:
        row['email'] = "***@***"
    return data

@audit_trail(rule_id="NORMALIZE_SALARY")
def normalize(data):
    for row in data:
        row['salary'] = round(row['salary'] / 1000, 2)
    return data

employees = [
    {"id": 1, "email": "ali@co.com",   "salary": 55000, "active": True},
    {"id": 2, "email": "sara@co.com",  "salary": 62000, "active": False},
    {"id": 3, "email": "ahmed@co.com", "salary": 48000, "active": True},
]

with ProvenaLogger("HR_Pipeline") as logger:
    step1 = remove_inactive(employees,   provena_logger=logger)
    step2 = mask_emails(step1,           provena_logger=logger)
    step3 = normalize(step2,             provena_logger=logger)

Save Audit Trail to JSON

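from cleancore import ProvenaLogger

# clean_emails and my_data are defined in the Quick Start example above
with ProvenaLogger("Production_Pipeline") as logger:
    result = clean_emails(my_data, provena_logger=logger)

# Save the full audit log to a file
logger.export_json("audit_2024.json")
# → [EXPORT] audit_2024.json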

The JSON file contains every step, every schema diff, timestamps, row counts, and duration.


Works with Pandas

import pandas as pd
from cleancore import audit_trail, ProvenaLogger

df = pd.DataFrame({
    'name':   ['Ali', 'Sara', 'Ahmed'],
    'salary': [50000, 60000, 55000]
})

@audit_trail(rule_id="SALARY_BUMP")
def give_raise(df):
    df['salary'] = df['salary'] + 5000
    return df

with ProvenaLogger("HR_Pipeline") as logger:
    result = give_raise(df, provena_logger=logger)

No extra setup needed — CleanCore detects pandas automatically.


Big Data — 100k+ Rows

CleanCore processes large datasets in chunks of 10,000 rows by default (configurable via chunk_size), so the input does not have to be loaded into memory all at once:

from cleancore import audit_trail, ProvenaLogger

@audit_trail(rule_id="LARGE_FILTER", chunk_size=10_000)
def filter_data(data):
    return [row for row in data if row['value'] > 0.5]

# Generator — memory-safe, no full load
large_data = ({"id": i, "value": i / 100000} for i in range(100_000))

with ProvenaLogger("Big_Data_Pipeline") as logger:
    result = filter_data(large_data, provena_logger=logger)

print(f"Processed: {len(result)} rows")
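
To make the chunking idea concrete, here is a minimal pure-Python sketch of batch-wise filtering over a generator. It is not CleanCore's internal code; iter_chunks and filter_in_chunks are hypothetical helper names chosen for this illustration, and the only thing taken from the example above is the data and the 10,000-row batch size.

```python
from itertools import islice

def iter_chunks(rows, chunk_size=10_000):
    """Yield lists of at most chunk_size rows from any iterable."""
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk

def filter_in_chunks(rows, predicate, chunk_size=10_000):
    """Apply predicate one chunk at a time and collect the rows that pass."""
    kept = []
    for chunk in iter_chunks(rows, chunk_size):
        kept.extend(row for row in chunk if predicate(row))
    return kept

# Same generator as in the example above: rows are produced on demand.
large_data = ({"id": i, "value": i / 100000} for i in range(100_000))

result = filter_in_chunks(large_data, lambda row: row["value"] > 0.5)
print(f"Processed: {len(result)} rows")  # Processed: 49999 rows
```

Only one chunk of input is held at a time, but the kept rows still accumulate in a single list, so the saving applies to the input side, not to the output.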

CLI Tool

After installing, you get the cleancore command:

# Pretty-print an audit JSON file
cleancore report audit_2024.json

# Validate — exits with code 1 if critical drift found (use in CI/CD)
cleancore validate audit_2024.json

# Raw JSON dump
cleancore dump audit_2024.json

# Check version
cleancore --version

Use cleancore validate in your GitHub Actions workflow to fail the build when critical drift is detected.
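
If your CI step is itself a Python script, the exit code can be forwarded with the standard library. The sketch below is an assumption about how you might wire this up, not part of CleanCore: run_gate is a hypothetical helper, and a stand-in command is used so the snippet runs without an audit file. In a real job the command would be the documented one, ["cleancore", "validate", "audit_2024.json"].

```python
import subprocess
import sys

def run_gate(command):
    """Run a command and return its exit code (0 means the gate passed)."""
    completed = subprocess.run(command)
    return completed.returncode

# Stand-in for a failing validation: a child Python process that exits with 1.
# In CI, replace this with ["cleancore", "validate", "audit_2024.json"].
failing_command = [sys.executable, "-c", "import sys; sys.exit(1)"]

exit_code = run_gate(failing_command)
if exit_code != 0:
    print(f"validation exited with code {exit_code}; the build should fail")
```

A CI script would finish with sys.exit(exit_code) so that the job status reflects the validation result.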


API Reference

@audit_trail(rule_id, chunk_size)

Decorator that records input/output diff for any function.

Parameter   Type  Default        Description
----------  ----  -------------  ------------------------------------
rule_id     str   function name  Label for this rule in the audit log
chunk_size  int   10_000         Rows per batch for large datasets

The decorated function accepts an extra keyword argument, provena_logger, at call time. Pass your ProvenaLogger instance there.
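
The pattern of a wrapper that consumes an extra keyword argument and records a before/after diff can be sketched in a few lines. This is an illustration of the general technique, not CleanCore's implementation: simple_audit, the log keyword, and the record layout are all hypothetical, and rows are matched by an "id" key purely as a simplifying assumption.

```python
import copy
import functools

def simple_audit(rule_id=None):
    """Hypothetical stand-in for an audit decorator; not CleanCore's code."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(data, *args, log=None, **kwargs):
            # Snapshot rows first: the wrapped function may mutate them in place.
            # Assumption: every row has a unique "id" key.
            before = {row["id"]: copy.deepcopy(row) for row in data}
            result = func(data, *args, **kwargs)  # "log" is not forwarded
            modified = sum(
                1 for row in result
                if row["id"] in before and row != before[row["id"]]
            )
            record = {
                "step": func.__name__,
                "rule": rule_id or func.__name__,  # default: function name
                "in": len(before),
                "out": len(result),
                "modified": modified,
            }
            if log is not None:
                log.append(record)
            return result
        return wrapper
    return decorator

@simple_audit(rule_id="MASK_PII")
def clean_emails(data):
    for row in data:
        row["email"] = "***@***"
    return data

log = []
clean_emails(
    [{"id": 1, "email": "alice@example.com"},
     {"id": 2, "email": "bob@example.com"}],
    log=log,
)
print(log)
# [{'step': 'clean_emails', 'rule': 'MASK_PII', 'in': 2, 'out': 2, 'modified': 2}]
```

The wrapped function never sees the log argument, which is why existing pipeline functions need no signature change.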


ProvenaLogger(name, auto_report, lazy)

Context manager that collects all step logs and prints the dashboard.

Parameter    Type  Default   Description
-----------  ----  --------  -------------------------------------
name         str   required  Pipeline name shown in the report
auto_report  bool  True      Print dashboard automatically on exit
lazy         bool  True      Label in report header (LAZY / EAGER)

Methods:

  • logger.export_json("path.json") — save full audit trail to file
  • logger.get_reporter() — get a ProvenaReporter instance for custom rendering
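
For readers unfamiliar with the pattern, the following sketch shows how a context manager can collect step records and print a summary when the with block ends. MiniLogger, its record method, and the summary format are hypothetical and exist only for this illustration; they say nothing about how ProvenaLogger is built internally.

```python
import time

class MiniLogger:
    """Hypothetical collector; illustrates the context-manager pattern only."""

    def __init__(self, name, auto_report=True):
        self.name = name
        self.auto_report = auto_report
        self.steps = []
        self.wall_ms = None
        self._start = None

    def __enter__(self):
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.wall_ms = (time.perf_counter() - self._start) * 1000
        if self.auto_report:
            print(self.summary())
        return False  # never swallow exceptions raised inside the block

    def record(self, step, rows_in, rows_out):
        self.steps.append({"step": step, "in": rows_in, "out": rows_out})

    def summary(self):
        if not self.steps:
            return f"{self.name}: no steps recorded"
        first, last = self.steps[0], self.steps[-1]
        return (f"{self.name}: Steps={len(self.steps)} "
                f"In={first['in']} Out={last['out']}")

with MiniLogger("HR_Pipeline") as logger:
    logger.record("remove_inactive", 3, 2)
    logger.record("mask_emails", 2, 2)
# prints: HR_Pipeline: Steps=2 In=3 Out=2
```

Because the object outlives the with block, methods can still be called on it afterwards, which is the same shape as calling export_json after the block has ended.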

detect_drift(schema_before, schema_after)

Compare two schemas and return drifted columns.

from cleancore import detect_drift

before = {"age": "int", "price": "float"}
after  = {"age": "str", "price": "NoneType"}

drifts = detect_drift(before, after)
# {"age": {"from": "int", "to": "str", "kind": "type_drift"},
#  "price": {"from": "float", "to": "NoneType", "kind": "nullified"}}

Drift kinds: type_drift, nullified, imputed, added, dropped
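
The README shows outputs only for type_drift and nullified. The sketch below reproduces that documented output and fills in the other three kinds under assumed meanings: imputed as NoneType becoming a concrete type, added as a column present only in the new schema, and dropped as a column present only in the old one. classify_drift is a hypothetical name, and the None placeholders for missing columns are also an assumption, so treat this as an illustration and not as the library's logic.

```python
def classify_drift(schema_before, schema_after):
    """Hypothetical re-implementation for illustration only."""
    drifts = {}
    for column, old in schema_before.items():
        if column not in schema_after:
            # Assumed meaning of "dropped": column missing from the new schema.
            drifts[column] = {"from": old, "to": None, "kind": "dropped"}
            continue
        new = schema_after[column]
        if new == old:
            continue  # unchanged columns are not reported
        if new == "NoneType":
            kind = "nullified"
        elif old == "NoneType":
            kind = "imputed"   # assumed meaning: null became a real type
        else:
            kind = "type_drift"
        drifts[column] = {"from": old, "to": new, "kind": kind}
    for column, new in schema_after.items():
        if column not in schema_before:
            # Assumed meaning of "added": column missing from the old schema.
            drifts[column] = {"from": None, "to": new, "kind": "added"}
    return drifts

before = {"age": "int", "price": "float"}
after = {"age": "str", "price": "NoneType"}
print(classify_drift(before, after))
# {'age': {'from': 'int', 'to': 'str', 'kind': 'type_drift'},
#  'price': {'from': 'float', 'to': 'NoneType', 'kind': 'nullified'}}
```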


infer_schema(rows)

Infer column types from a list of dicts.

from cleancore import infer_schema

rows = [{"id": 1, "name": "Ali", "score": 9.5}]
infer_schema(rows)
# {"id": "int", "name": "str", "score": "float"}
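
Schema inference of this kind can be approximated with type(value).__name__. The sketch below is a stand-alone illustration, not the library's code: sketch_infer_schema is a hypothetical name, and the rule for columns with mixed values (keep the first non-null type seen) is an assumption, since the README does not say how infer_schema resolves them.

```python
def sketch_infer_schema(rows):
    """Map each column to the type name of its first non-null value."""
    schema = {}
    for row in rows:
        for column, value in row.items():
            type_name = type(value).__name__
            # Record a type the first time a column is seen, and replace a
            # provisional "NoneType" once a real value shows up.
            if column not in schema or schema[column] == "NoneType":
                schema[column] = type_name
    return schema

rows = [{"id": 1, "name": "Ali", "score": 9.5}]
print(sketch_infer_schema(rows))
# {'id': 'int', 'name': 'str', 'score': 'float'}
```

Two schemas produced this way, one from the rows before a step and one from the rows after, are exactly the kind of input that a drift comparison needs.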

Contributing

CleanCore is open-source and contributions are welcome!

GitHub: github.com/Sidra-009/cleancore-python-library

To contribute:

  1. Fork the repo
  2. Create a branch: git checkout -b feature/my-feature
  3. Make your changes and add tests in tests/
  4. Run tests: pytest tests/ -v
  5. Open a Pull Request

License

MIT License — see LICENSE for details.


Built by Sidra Saqlain · Published on PyPI
