Recursive Data Cleaner
LLM-powered incremental data cleaning for massive datasets. Process files in chunks, identify quality issues, and automatically generate Python cleaning functions.
How It Works
- Chunk your data (JSONL, CSV, JSON, or text)
- Analyze each chunk with an LLM to identify issues
- Generate one cleaning function per issue
- Validate functions on holdout data before accepting
- Output a ready-to-use cleaning_functions.py
The system maintains a "docstring registry": it feeds the descriptions of already-generated functions back into each prompt, so the LLM knows what is already solved and avoids duplicate work.
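As a rough illustration of the registry idea, here is a minimal sketch of a character-budgeted store with FIFO eviction. The class name `DocstringRegistry`, its methods, and the rendered format are assumptions made for this example, not the package's actual implementation in context.py.

```python
from collections import OrderedDict


class DocstringRegistry:
    """Hypothetical registry: keeps docstrings within a character budget."""

    def __init__(self, budget_chars: int = 8000) -> None:
        self.budget_chars = budget_chars
        self._entries = OrderedDict()  # function name -> docstring, oldest first

    def add(self, name: str, docstring: str) -> None:
        # Re-adding a name replaces the old entry and makes it the newest.
        self._entries.pop(name, None)
        self._entries[name] = docstring
        # FIFO eviction: drop the oldest entries until the rendered text fits.
        # The newest entry is always kept, even if it alone exceeds the budget.
        while len(self._entries) > 1 and len(self.render()) > self.budget_chars:
            self._entries.popitem(last=False)

    def render(self) -> str:
        """Format the registry as text to embed in the next prompt."""
        return "\n".join(f"- {name}: {doc}" for name, doc in self._entries.items())


# A deliberately small budget so that the second entry forces an eviction.
registry = DocstringRegistry(budget_chars=80)
registry.add("normalize_phone_numbers", "Normalize phone numbers to E.164.")
registry.add("fix_status_typos", "Fix typos in status field.")
context = registry.render()
```

With the tiny budget used here, the older `normalize_phone_numbers` entry is evicted and only `fix_status_typos` remains in `context`.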
Installation
pip install -e .
For Apple Silicon (MLX backend):
pip install -e ".[mlx]"
Quick Start
from recursive_cleaner import DataCleaner
from backends import MLXBackend

# Any LLM with generate(prompt) -> str works
llm = MLXBackend(model_path="your-model")

cleaner = DataCleaner(
    llm_backend=llm,
    file_path="messy_data.jsonl",
    chunk_size=50,
    instructions="""
    - Normalize phone numbers to E.164
    - Fix typos in status field (valid: active, pending, churned)
    - Convert dates to ISO 8601
    """,
)

cleaner.run()  # Generates cleaning_functions.py
Features
Core
- Chunked Processing: Handle files larger than LLM context windows
- Incremental Generation: One function per issue, building up a complete solution
- Docstring Registry: Automatic context management with FIFO eviction
- AST Validation: All generated code validated before output
- Error Recovery: Retries with error feedback on parse failures
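The AST validation and error-recovery items above could be combined roughly as follows. This is a sketch under assumptions: `validate_source`, `generate_valid_code`, the retry prompt wording, and the `FlakyBackend` stand-in are all invented for illustration; only the `generate(prompt) -> str` protocol comes from this README.

```python
import ast
from typing import Optional


def validate_source(source: str) -> Optional[str]:
    """Return None if the source parses as Python, else a short error message."""
    try:
        ast.parse(source)
    except SyntaxError as exc:
        return f"SyntaxError on line {exc.lineno}: {exc.msg}"
    return None


def generate_valid_code(llm, prompt: str, max_attempts: int = 3) -> str:
    """Ask the LLM for code, retrying with the parse error added to the prompt."""
    current_prompt = prompt
    for _ in range(max_attempts):
        source = llm.generate(current_prompt)
        error = validate_source(source)
        if error is None:
            return source
        # Feed the error back so the next attempt can correct it.
        current_prompt = (
            f"{prompt}\n\nYour previous answer failed to parse:\n{error}\n"
            "Please return corrected code."
        )
    raise ValueError(f"No valid code after {max_attempts} attempts")


class FlakyBackend:
    """Stand-in backend: returns broken code first, then valid code."""

    def __init__(self) -> None:
        self.prompts = []

    def generate(self, prompt: str) -> str:
        self.prompts.append(prompt)
        if len(self.prompts) == 1:
            return "def broken(:\n    pass"
        return "def fixed(data):\n    return data"


backend = FlakyBackend()
code = generate_valid_code(backend, "Write a cleaning function.")
```

Here the first reply fails to parse, the second prompt carries the error message, and the second reply is accepted.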
Data Quality (v0.4.0+)
- Holdout Validation: Test functions on unseen 20% of each chunk
- Sampling Strategies: Sequential, random, or stratified sampling
- Quality Metrics: Before/after comparison with improvement reports
- Dependency Resolution: Topological sort for correct function ordering
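To make holdout validation concrete, the sketch below splits a chunk 80/20 and accepts a candidate function only if it runs on every held-out record. The helper names, the seeded shuffle, and the assumption that a function takes one record at a time are illustrative choices, not the behaviour of validation.py.

```python
import random


def split_holdout(records, holdout_ratio=0.2, seed=0):
    """Split a chunk into (train, holdout) without mutating the input."""
    shuffled = list(records)
    random.Random(seed).shuffle(shuffled)
    n_holdout = int(len(shuffled) * holdout_ratio)
    return shuffled[n_holdout:], shuffled[:n_holdout]


def passes_holdout(func, holdout):
    """Return True only if func runs on every held-out record without raising."""
    try:
        for record in holdout:
            func(dict(record))  # copy so the candidate cannot mutate the holdout
    except Exception:
        return False
    return True


# A chunk of 50 records yields 40 for generation and 10 for validation.
chunk = [{"id": i, "status": "Active"} for i in range(50)]
train, holdout = split_holdout(chunk)

accepted = passes_holdout(lambda r: r["status"].lower(), holdout)
rejected = passes_holdout(lambda r: r["missing_field"], holdout)
```

The second candidate raises `KeyError` on unseen data, so it would be rejected before reaching the output file.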
Optimization (v0.5.0+)
- Two-Pass Consolidation: Merge redundant functions after generation
- Early Termination: Stop when LLM detects pattern saturation
- LLM Agency: Model decides chunk cleanliness and saturation
Security (v0.5.1+)
- Dangerous Code Detection: AST-based detection of exec, eval, subprocess, network calls
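An AST-based check of this kind might look like the sketch below. The deny-lists and the function name `find_dangerous_code` are assumptions for illustration; the package's real rules may be broader or differently structured.

```python
import ast

# Illustrative deny-lists (assumed, not the package's actual lists).
DANGEROUS_CALLS = {"exec", "eval"}
DANGEROUS_MODULES = {"subprocess", "socket", "urllib", "http", "requests"}


def find_dangerous_code(source: str) -> list:
    """Return human-readable findings for risky calls and imports, by line."""
    findings = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
            if node.func.id in DANGEROUS_CALLS:
                findings.append((node.lineno, f"call to {node.func.id}()"))
        elif isinstance(node, ast.Import):
            for alias in node.names:
                if alias.name.split(".")[0] in DANGEROUS_MODULES:
                    findings.append((node.lineno, f"import of {alias.name}"))
        elif isinstance(node, ast.ImportFrom):
            module = node.module or ""
            if module.split(".")[0] in DANGEROUS_MODULES:
                findings.append((node.lineno, f"import from {module}"))
    return [f"line {lineno}: {message}" for lineno, message in sorted(findings)]


safe = "def clean(data):\n    return data.strip()\n"
unsafe = "import subprocess\n\ndef clean(data):\n    return eval(data)\n"

safe_findings = find_dangerous_code(safe)
unsafe_findings = find_dangerous_code(unsafe)
```

This sketch only catches direct names such as `eval(...)`; attribute-based calls (for example through `builtins`) would need additional rules.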
Observability (v0.6.0)
- Latency Metrics: Track min/max/avg/total LLM call times
- Import Consolidation: Deduplicate and merge imports in output
- Cleaning Reports: Markdown summary with functions, timing, quality delta
- Dry-Run Mode: Analyze data without generating functions
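Latency tracking can be pictured as a thin wrapper around any backend. In this sketch, `TimedBackend` and `EchoBackend` are hypothetical names; `avg_ms` matches the key shown in the progress-event example in this README, while the other key names are assumed by analogy.

```python
import time


class TimedBackend:
    """Wrap any backend with generate(prompt) -> str and record call latency."""

    def __init__(self, backend) -> None:
        self.backend = backend
        self.latencies_ms = []

    def generate(self, prompt: str) -> str:
        start = time.perf_counter()
        try:
            return self.backend.generate(prompt)
        finally:
            # Record the latency even if the underlying call raises.
            self.latencies_ms.append((time.perf_counter() - start) * 1000)

    def stats(self) -> dict:
        """Summarize min/max/avg/total latency in milliseconds."""
        if not self.latencies_ms:
            return {"count": 0, "min_ms": 0.0, "max_ms": 0.0, "avg_ms": 0.0, "total_ms": 0.0}
        total = sum(self.latencies_ms)
        return {
            "count": len(self.latencies_ms),
            "min_ms": min(self.latencies_ms),
            "max_ms": max(self.latencies_ms),
            "avg_ms": total / len(self.latencies_ms),
            "total_ms": total,
        }


class EchoBackend:
    """Stand-in backend used only for this example."""

    def generate(self, prompt: str) -> str:
        return prompt


timed = TimedBackend(EchoBackend())
for text in ("a", "b", "c"):
    timed.generate(text)
latency_stats = timed.stats()
```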
Configuration
cleaner = DataCleaner(
    # Required
    llm_backend=llm,
    file_path="data.jsonl",

    # Chunking
    chunk_size=50,            # Items per chunk (or chars for text mode)
    max_iterations=5,         # Max iterations per chunk
    context_budget=8000,      # Max chars for docstring context

    # Validation
    validate_runtime=True,    # Test functions before accepting
    schema_sample_size=10,    # Records for schema inference
    holdout_ratio=0.2,        # Fraction held out for validation

    # Sampling
    sampling_strategy="stratified",  # "sequential", "random", "stratified"
    stratify_field="status",         # Field for stratified sampling

    # Optimization
    optimize=True,            # Consolidate redundant functions
    early_termination=True,   # Stop when patterns saturate
    track_metrics=True,       # Measure before/after quality

    # Observability
    report_path="report.md",  # Markdown report output (None to disable)
    dry_run=False,            # Analyze without generating functions

    # Progress & State
    on_progress=callback,     # Progress event callback
    state_file="state.json",  # Enable resume on interrupt
)
Progress Events
def on_progress(event):
    match event["type"]:
        case "chunk_start":
            print(f"Chunk {event['chunk_index']}/{event['total_chunks']}")
        case "llm_call":
            print(f"LLM latency: {event['latency_ms']}ms")
        case "function_generated":
            print(f"Generated: {event['function_name']}")
        case "issues_detected":  # dry-run mode
            print(f"Found {len(event['issues'])} issues")
        case "complete":
            stats = event["latency_stats"]
            print(f"Done! Avg latency: {stats['avg_ms']}ms")
Output
The cleaner generates cleaning_functions.py:
# Auto-generated cleaning functions
import re


def normalize_phone_numbers(data):
    """Normalize phone numbers to E.164 format."""
    # ... implementation ...


def fix_status_typos(data):
    """Fix typos in status field."""
    # ... implementation ...


def clean_data(data):
    """Apply all cleaning functions in order."""
    data = normalize_phone_numbers(data)
    data = fix_status_typos(data)
    return data
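The order in which `clean_data` applies functions matters when one function relies on another having run first. A minimal sketch of such ordering with the standard library's `graphlib` follows; the dependency map and the `normalize_whitespace` function are hypothetical, and how the package actually discovers dependencies is not shown here.

```python
from graphlib import TopologicalSorter


def order_functions(dependencies: dict) -> list:
    """Return function names so each comes after the functions it depends on.

    `dependencies` maps a function name to the set of names that must run first.
    A circular dependency raises graphlib.CycleError.
    """
    return list(TopologicalSorter(dependencies).static_order())


# Hypothetical example: status typos are easier to fix once whitespace is normalized.
dependencies = {
    "fix_status_typos": {"normalize_whitespace"},
    "normalize_phone_numbers": set(),
    "normalize_whitespace": set(),
}
ordered = order_functions(dependencies)
```

The relative order of independent functions is not significant; only the constraint that `normalize_whitespace` precedes `fix_status_typos` is guaranteed.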
Custom LLM Backend
Implement the simple protocol:
class MyBackend:
    def generate(self, prompt: str) -> str:
        # Call your LLM (OpenAI, Anthropic, local, etc.)
        return response
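Because the protocol is a single method, an adapter can turn any prompt-to-text callable into a backend. The sketch below is one way to do that; `LLMBackend` and `CallableBackend` are hypothetical names introduced for this example and are not part of the package.

```python
from typing import Callable, Protocol, runtime_checkable


@runtime_checkable
class LLMBackend(Protocol):
    """Structural type for anything exposing generate(prompt) -> str."""

    def generate(self, prompt: str) -> str: ...


class CallableBackend:
    """Adapt any prompt -> text callable to the generate() protocol."""

    def __init__(self, fn: Callable[[str], str]) -> None:
        self._fn = fn

    def generate(self, prompt: str) -> str:
        return self._fn(prompt)


# A trivial callable stands in for a real API client here.
echo_backend = CallableBackend(lambda prompt: f"echo: {prompt}")
reply = echo_backend.generate("hello")
```

In practice the lambda would be replaced by a function that calls an API client and returns the reply text.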
Text Mode
For plain text files (for example, text extracted from PDFs or other documents):
cleaner = DataCleaner(
    llm_backend=llm,
    file_path="document.txt",
    chunk_size=4000,  # Characters, not items
    instructions="Fix OCR errors, normalize whitespace",
)
Text mode uses sentence-aware chunking to avoid splitting mid-sentence.
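To show what sentence-aware chunking means in practice, here is a simplified regex-based sketch. It is not the vendored chunker: `chunk_by_sentence` is a hypothetical helper, and real sentence detection handles abbreviations and other edge cases that this split ignores.

```python
import re


def chunk_by_sentence(text: str, chunk_size: int) -> list:
    """Pack whole sentences into chunks of at most chunk_size characters.

    A single sentence longer than chunk_size is kept whole rather than split.
    """
    # Split after sentence-ending punctuation that is followed by whitespace.
    sentences = re.split(r"(?<=[.!?])\s+", text.strip())
    chunks, current = [], ""
    for sentence in sentences:
        if not sentence:
            continue
        candidate = f"{current} {sentence}" if current else sentence
        if len(candidate) <= chunk_size or not current:
            current = candidate
        else:
            # Adding this sentence would overflow, so close the current chunk.
            chunks.append(current)
            current = sentence
    if current:
        chunks.append(current)
    return chunks


text = "First sentence. Second one! Third? Fourth sentence here."
chunks = chunk_by_sentence(text, chunk_size=30)
# chunks == ["First sentence. Second one!", "Third? Fourth sentence here."]
```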
Resume on Interrupt
# Start with state file
cleaner = DataCleaner(
    llm_backend=llm,
    file_path="huge_file.jsonl",
    state_file="cleaning_state.json",
)
cleaner.run()

# If interrupted, resume later:
cleaner = DataCleaner.resume("cleaning_state.json", llm)
cleaner.run()
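The general checkpoint-and-resume pattern behind a state file can be sketched as below. The state layout (a single `next_chunk` counter) and the helper names are assumptions for illustration only; the package's real state file format is not documented here and likely stores more.

```python
import json
import os
import tempfile


def save_state(path, state):
    """Write state atomically so an interrupt never leaves a half-written file."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        json.dump(state, handle)
    os.replace(tmp_path, path)  # atomic rename over the previous checkpoint


def load_state(path):
    with open(path, encoding="utf-8") as handle:
        return json.load(handle)


def process_chunks(chunks, state_path, handle_chunk):
    """Process chunks in order, skipping those completed in an earlier run."""
    if os.path.exists(state_path):
        state = load_state(state_path)
    else:
        state = {"next_chunk": 0}  # hypothetical minimal state
    for index in range(state["next_chunk"], len(chunks)):
        handle_chunk(chunks[index])
        # Checkpoint only after the chunk has been fully handled.
        state["next_chunk"] = index + 1
        save_state(state_path, state)
```

Calling `process_chunks` again with the same state path picks up at the first chunk that was not completed, which is the behaviour the resume example above relies on.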
Architecture
recursive_cleaner/
├── cleaner.py # Main DataCleaner class (~580 lines)
├── context.py # Docstring registry with FIFO eviction
├── dependencies.py # Topological sort for function ordering
├── metrics.py # Quality metrics before/after
├── optimizer.py # Two-pass consolidation with LLM agency
├── output.py # Function file generation + import consolidation
├── parsers.py # Chunking for JSONL/CSV/JSON/text + sampling
├── prompt.py # LLM prompt templates
├── report.py # Markdown report generation
├── response.py # XML/markdown parsing + agency dataclasses
├── schema.py # Schema inference
├── validation.py # Runtime validation + holdout
└── vendor/
    └── chunker.py # Vendored sentence-aware chunker
Testing
pytest tests/ -v
The suite contains 392 tests covering all features. Test datasets live in test_cases/:
- E-commerce product catalogs
- Healthcare patient records
- Financial transaction data
Philosophy
- Simplicity over extensibility: ~3,000 lines that do one thing well
- stdlib over dependencies: Only tenacity required
- Retry over recover: On error, retry with error in prompt
- Wu wei: Let the LLM make decisions about data it understands
Version History
| Version | Features |
|---|---|
| v0.6.0 | Latency metrics, import consolidation, cleaning report, dry-run mode |
| v0.5.1 | Dangerous code detection (AST-based security) |
| v0.5.0 | Two-pass optimization, early termination, LLM agency |
| v0.4.0 | Holdout validation, dependency resolution, sampling, quality metrics |
| v0.3.0 | Text mode with sentence-aware chunking |
| v0.2.0 | Runtime validation, schema inference, callbacks, incremental saves |
| v0.1.0 | Core pipeline, chunking, docstring registry |
Acknowledgments
- Sentence-aware text chunking adapted from Chonkie (MIT License)
- Development assisted by Claude Code
License
MIT