Zero-dependency data observability with row-level audit trails and schema drift detection
CleanCore 🔍
Stop shipping unobserved data.
cleancore is a zero-dependency Python library that adds automatic audit trails and schema drift detection to your data pipelines.
Think of it like Git for your Data Rows — every change is tracked, every type mismatch is caught, before it breaks your production models.
Why CleanCore?
Data pipelines fail silently. A column quietly changes from int to str. A price field becomes None. Your model trains on garbage — and you never know why.
CleanCore wraps your existing pipeline functions and automatically:
- Records what changed (row-level diff)
- Detects type drift (int → str, float → None)
- Prints a clean dashboard after every run
- Exports a full JSON audit trail
No config. No new infrastructure. Just one decorator.
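To make "row-level diff" concrete, here is a minimal sketch in plain Python that compares two snapshots of the same rows. It is not CleanCore's implementation: the `diff_rows` helper is hypothetical, and matching rows by position is an assumption made for illustration.

```python
from typing import Any


def diff_rows(before: list[dict[str, Any]], after: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return one entry per changed field, matching rows by position (an assumption)."""
    changes = []
    for index, (old, new) in enumerate(zip(before, after)):
        # Look at every column that appears in either version of the row.
        for column in sorted(old.keys() | new.keys()):
            # Note: a missing key and a None value are treated alike in this sketch.
            if old.get(column) != new.get(column):
                changes.append({
                    "row": index,
                    "column": column,
                    "from": old.get(column),
                    "to": new.get(column),
                })
    return changes


before = [{"id": 1, "email": "alice@example.com"}]
after = [{"id": 1, "email": "***@***"}]
changes = diff_rows(before, after)
print(changes)
```

A real audit trail would likely also need a stable row key, because a filter step changes row positions.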
Features
| Feature | Description |
|---|---|
| @audit_trail | Decorator: wraps any function, records before/after diff |
| Schema Sentinel | Detects type drift (int → str) and null regressions (float → NoneType) |
| Big Data Engine | Chunk-based processing (10k rows/batch) — no memory crashes |
| JSON Export | Full audit trail saved to file for compliance or debugging |
| CLI Tool | cleancore report, cleancore validate from terminal |
| Zero Dependencies | Pure Python — no pandas, numpy, or anything required |
| Pandas / Polars | Optional support — works automatically if installed |
Installation
pip install cleancore
That's it. No extra dependencies needed.
Quick Start
Basic usage — 3 lines
from cleancore import audit_trail, ProvenaLogger

@audit_trail(rule_id="MASK_PII")
def clean_emails(data):
    for row in data:
        row['email'] = "***@***"
    return data

my_data = [
    {"id": 1, "email": "alice@example.com"},
    {"id": 2, "email": "bob@example.com"},
]

with ProvenaLogger("My_Pipeline") as logger:
    result = clean_emails(my_data, provena_logger=logger)
Output — printed automatically when the with block ends:
+--------------------------------------------------------------+
| PIPELINE START | My_Pipeline | MODE=LAZY |
+--------------------------------------------------------------+
...
[SUMMARY] Steps=1 | In=2 Out=2 Delta=0 | Wall=1.2ms
STEP RULE IN OUT MODIFIED ms
------------------------------------------------------------------------
clean_emails MASK_PII 2 2 2 1.1
Schema Sentinel — Type Drift Detection
CleanCore automatically catches when a column's type changes between steps.
from cleancore import audit_trail, ProvenaLogger

@audit_trail(rule_id="TYPE_BUG")
def process(data):
    for row in data:
        row['age'] = str(row['age'])  # Bug: int accidentally cast to str
    return data

data = [{"id": 1, "age": 25}, {"id": 2, "age": 30}]

with ProvenaLogger("Compliance_Pipeline") as logger:
    process(data, provena_logger=logger)
Output:
[SCHEMA] Schema Sentinel
----------------------------------------
COLUMN KIND FROM TO DETECTED IN
------------------------------------------------------------------------
age [WARN] int str process
No more silent type bugs.
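One detail worth noting: a function like `process` mutates its rows in place, so any wrapper that wants to compare types has to capture them before the call. The sketch below shows that ordering in plain Python. The `column_types` helper is hypothetical and reads only the first row, which is a simplification and not a description of how Schema Sentinel works internally.

```python
def column_types(rows):
    """Map each column to the type name of its value in the first row (a simplification)."""
    if not rows:
        return {}
    return {key: type(value).__name__ for key, value in rows[0].items()}


def process(data):
    for row in data:
        row["age"] = str(row["age"])  # same bug as in the example: int cast to str
    return data


data = [{"id": 1, "age": 25}, {"id": 2, "age": 30}]

types_before = column_types(data)   # captured BEFORE the in-place mutation
result = process(data)
types_after = column_types(result)

# Keep only the columns whose type name changed between the two snapshots.
drifted = {
    col: (types_before[col], types_after[col])
    for col in types_before
    if col in types_after and types_before[col] != types_after[col]
}
print(drifted)  # {'age': ('int', 'str')}
```

If `types_before` were computed after calling `process`, both snapshots would show `str` and the drift would go unnoticed.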
Multiple Steps in a Pipeline
from cleancore import audit_trail, ProvenaLogger

@audit_trail(rule_id="FILTER_INACTIVE")
def remove_inactive(data):
    return [row for row in data if row['active']]

@audit_trail(rule_id="MASK_PII")
def mask_emails(data):
    for row in data:
        row['email'] = "***@***"
    return data

@audit_trail(rule_id="NORMALIZE_SALARY")
def normalize(data):
    for row in data:
        row['salary'] = round(row['salary'] / 1000, 2)
    return data

employees = [
    {"id": 1, "email": "ali@co.com", "salary": 55000, "active": True},
    {"id": 2, "email": "sara@co.com", "salary": 62000, "active": False},
    {"id": 3, "email": "ahmed@co.com", "salary": 48000, "active": True},
]

with ProvenaLogger("HR_Pipeline") as logger:
    step1 = remove_inactive(employees, provena_logger=logger)
    step2 = mask_emails(step1, provena_logger=logger)
    step3 = normalize(step2, provena_logger=logger)
Save Audit Trail to JSON
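from cleancore import ProvenaLogger

# clean_emails and my_data are defined in the Quick Start example
with ProvenaLogger("Production_Pipeline") as logger:
    result = clean_emails(my_data, provena_logger=logger)
    # Save full audit log to file
    logger.export_json("audit_2024.json")
    # → [EXPORT] audit_2024.json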
The JSON file contains every step, every schema diff, timestamps, row counts, and duration.
Works with Pandas
import pandas as pd
from cleancore import audit_trail, ProvenaLogger

df = pd.DataFrame({
    'name': ['Ali', 'Sara', 'Ahmed'],
    'salary': [50000, 60000, 55000]
})

@audit_trail(rule_id="SALARY_BUMP")
def give_raise(df):
    df['salary'] = df['salary'] + 5000
    return df

with ProvenaLogger("HR_Pipeline") as logger:
    result = give_raise(df, provena_logger=logger)
No extra setup needed — CleanCore detects pandas automatically.
Big Data — 100k+ Rows
CleanCore processes large datasets in 10,000-row chunks to prevent memory crashes:
from cleancore import audit_trail, ProvenaLogger

@audit_trail(rule_id="LARGE_FILTER", chunk_size=10_000)
def filter_data(data):
    return [row for row in data if row['value'] > 0.5]

# Generator: memory-safe, no full load
large_data = ({"id": i, "value": i / 100000} for i in range(100_000))

with ProvenaLogger("Big_Data_Pipeline") as logger:
    result = filter_data(large_data, provena_logger=logger)
    print(f"Processed: {len(result)} rows")
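For readers curious what chunk-based processing looks like in general, the following plain-Python sketch splits any iterable into fixed-size batches with `itertools.islice`. The `iter_chunks` helper is hypothetical and only illustrates the technique; it is not taken from CleanCore's source.

```python
from itertools import islice


def iter_chunks(rows, chunk_size=10_000):
    """Yield lists of at most chunk_size rows from any iterable, including generators."""
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


large_data = ({"id": i, "value": i / 100_000} for i in range(100_000))

kept = []
for chunk in iter_chunks(large_data, chunk_size=10_000):
    # Only one chunk of input rows is held in memory at a time.
    kept.extend(row for row in chunk if row["value"] > 0.5)

print(len(kept))  # 49999
```

The filtered output still accumulates in `kept`, so memory use depends on how many rows survive the filter, not on the input size.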
CLI Tool
After installing, you get the cleancore command:
# Pretty-print an audit JSON file
cleancore report audit_2024.json
# Validate — exits with code 1 if critical drift found (use in CI/CD)
cleancore validate audit_2024.json
# Raw JSON dump
cleancore dump audit_2024.json
# Check version
cleancore --version
Use cleancore validate in your GitHub Actions to fail builds when data drift is detected.
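Because `cleancore validate` signals critical drift through its exit code, a CI step only has to propagate that code. Below is a small sketch of a Python wrapper that does so. The `run_gate` function and its `command` parameter are hypothetical; the only CleanCore behavior relied on is the exit code described above.

```python
import subprocess


def run_gate(audit_path, command=("cleancore", "validate")):
    """Run the validator on audit_path and return its exit code (0 means no critical drift).

    Raises FileNotFoundError if the command is not installed on PATH.
    """
    completed = subprocess.run([*command, audit_path])
    return completed.returncode


# In a CI script one would typically finish with:
#     import sys
#     sys.exit(run_gate("audit_2024.json"))
```

Keeping the command configurable makes the wrapper testable without CleanCore installed, since any executable that returns a known exit code can stand in for it.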
API Reference
@audit_trail(rule_id, chunk_size)
Decorator that records input/output diff for any function.
| Parameter | Type | Default | Description |
|---|---|---|---|
| rule_id | str | function name | Label for this rule in the audit log |
| chunk_size | int | 10_000 | Rows per batch for large datasets |
The decorated function receives an extra keyword argument provena_logger at call time — pass your ProvenaLogger instance there.
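The pattern of a decorator accepting an extra keyword at call time can be sketched in plain Python as follows. Everything here is hypothetical: `audited` stands in for `@audit_trail`, `step_log` stands in for `provena_logger`, and a plain list plays the role of the logger. It shows the mechanism only, not CleanCore's code.

```python
import functools


def audited(rule_id=None):
    """Hypothetical decorator that accepts an extra `step_log` keyword at call time."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(data, *args, step_log=None, **kwargs):
            rows_in = len(data)
            # step_log is consumed by the wrapper and never forwarded to func.
            result = func(data, *args, **kwargs)
            if step_log is not None:
                step_log.append({
                    "step": func.__name__,
                    "rule": rule_id or func.__name__,  # default label is the function name
                    "in": rows_in,
                    "out": len(result),
                })
            return result
        return wrapper
    return decorator


@audited(rule_id="FILTER_INACTIVE")
def remove_inactive(data):
    return [row for row in data if row["active"]]


log = []
rows = [{"id": 1, "active": True}, {"id": 2, "active": False}]
remove_inactive(rows, step_log=log)
print(log)
```

Because the wrapper consumes the keyword, the wrapped function keeps its original one-argument signature and can still be called without a logger.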
ProvenaLogger(name, auto_report, lazy)
Context manager that collects all step logs and prints the dashboard.
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | required | Pipeline name shown in the report |
| auto_report | bool | True | Print dashboard automatically on exit |
| lazy | bool | True | Label in report header (LAZY / EAGER) |
Methods:
- logger.export_json("path.json"): save the full audit trail to a file
- logger.get_reporter(): get a ProvenaReporter instance for custom rendering
detect_drift(schema_before, schema_after)
Compare two schemas and return drifted columns.
from cleancore import detect_drift
before = {"age": "int", "price": "float"}
after = {"age": "str", "price": "NoneType"}
drifts = detect_drift(before, after)
# {"age": {"from": "int", "to": "str", "kind": "type_drift"},
# "price": {"from": "float", "to": "NoneType", "kind": "nullified"}}
Drift kinds: type_drift, nullified, imputed, added, dropped
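The README names five drift kinds but does not define them, so the sketch below is one plausible reading, written as a standalone function. The rules are assumptions: `nullified` means a column became `NoneType`, `imputed` means it moved from `NoneType` to a concrete type, and `added` / `dropped` mean the column exists in only one schema. `classify_drift` is a hypothetical name and the `None` placeholders for missing columns are illustrative.

```python
def classify_drift(before, after):
    """Hypothetical classifier for the five drift kinds (rules are assumptions)."""
    drifts = {}
    for column in sorted(before.keys() | after.keys()):
        old, new = before.get(column), after.get(column)
        if old == new:
            continue
        if old is None:
            kind = "added"        # column only exists in the new schema
        elif new is None:
            kind = "dropped"      # column only exists in the old schema
        elif new == "NoneType":
            kind = "nullified"    # concrete type became null
        elif old == "NoneType":
            kind = "imputed"      # null became a concrete type
        else:
            kind = "type_drift"   # one concrete type replaced another
        drifts[column] = {"from": old, "to": new, "kind": kind}
    return drifts


before = {"age": "int", "price": "float", "note": "NoneType", "legacy": "str"}
after = {"age": "str", "price": "NoneType", "note": "str", "created": "str"}
for column, drift in classify_drift(before, after).items():
    print(column, drift["kind"])
```

For the two-column example above, this sketch yields the same `type_drift` and `nullified` results that the README shows for `detect_drift`.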
infer_schema(rows)
Infer column types from a list of dicts.
from cleancore import infer_schema
rows = [{"id": 1, "name": "Ali", "score": 9.5}]
infer_schema(rows)
# {"id": "int", "name": "str", "score": "float"}
Contributing
CleanCore is open-source and contributions are welcome!
GitHub: github.com/Sidra-009/cleancore-python-library
To contribute:
- Fork the repo
- Create a branch: git checkout -b feature/my-feature
- Make your changes and add tests in tests/
- Run tests: pytest tests/ -v
- Open a Pull Request
License
MIT License — see LICENSE for details.
Built by Sidra Saqlain · Published on PyPI