LLM-powered incremental data cleaning pipeline that processes massive datasets in chunks and generates Python cleaning functions

Recursive Data Cleaner

LLM-powered incremental data cleaning for massive datasets. Process files in chunks, identify quality issues, and automatically generate Python cleaning functions.

How It Works

  1. Chunk your data (JSONL, CSV, JSON, Parquet, PDF, Word, Excel, XML, and more)
  2. Analyze each chunk with an LLM to identify issues
  3. Generate one cleaning function per issue
  4. Validate functions on holdout data before accepting
  5. Output a ready-to-use cleaning_functions.py

The system maintains a "docstring registry" - feeding generated function descriptions back into prompts so the LLM knows what's already solved and avoids duplicate work.
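The registry idea can be sketched as a small class that keeps function summaries within a character budget and drops the oldest entries first. This is a minimal illustration, not the code in context.py: the class name `DocstringRegistry`, its methods, and the rendered line format are assumptions made for the example.

```python
from collections import deque


class DocstringRegistry:
    """Hypothetical registry: keeps summaries within a character budget."""

    def __init__(self, budget_chars: int = 8000) -> None:
        self.budget_chars = budget_chars
        self._entries: deque[tuple[str, str]] = deque()

    def add(self, name: str, docstring: str) -> None:
        self._entries.append((name, docstring.strip()))
        # FIFO eviction: drop the oldest entries until the rendered text fits.
        # The newest entry is always kept, even if it alone exceeds the budget.
        while len(self._entries) > 1 and len(self.render()) > self.budget_chars:
            self._entries.popleft()

    def render(self) -> str:
        """Text block that would be spliced into the next prompt."""
        return "\n".join(f"- {name}: {doc}" for name, doc in self._entries)


registry = DocstringRegistry(budget_chars=80)
registry.add("normalize_phone_numbers", "Normalize phone numbers to E.164.")
registry.add("fix_status_typos", "Fix typos in status field.")
print(registry.render())
```

With the deliberately small budget used here, adding the second entry pushes the first one out, which is the trade-off FIFO eviction makes: recent functions stay visible to the LLM while older ones may be forgotten.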

Installation

pip install -e .

For Apple Silicon (MLX backend):

pip install -e ".[mlx]"

For document conversion (PDF, Word, Excel, HTML, etc.):

pip install -e ".[markitdown]"

For Parquet files:

pip install -e ".[parquet]"

For Terminal UI (Rich dashboard):

pip install -e ".[tui]"

Quick Start

from recursive_cleaner import DataCleaner
from backends import MLXBackend

# Any LLM with generate(prompt) -> str works
llm = MLXBackend(model_path="your-model")

cleaner = DataCleaner(
    llm_backend=llm,
    file_path="messy_data.jsonl",
    chunk_size=50,
    instructions="""
    - Normalize phone numbers to E.164
    - Fix typos in status field (valid: active, pending, churned)
    - Convert dates to ISO 8601
    """,
)

cleaner.run()  # Generates cleaning_functions.py

Features

Core

  • Chunked Processing: Handle files larger than LLM context windows
  • Incremental Generation: One function per issue, building up a complete solution
  • Docstring Registry: Automatic context management with FIFO eviction
  • AST Validation: All generated code validated before output
  • Error Recovery: Retries with error feedback on parse failures
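As a rough sketch of what AST validation with retry feedback might look like, the function below parses generated source with the standard `ast` module and returns a message that could be fed back into the next prompt. The function name `validate_generated_code` and the docstring requirement are assumptions for illustration; the library's actual checks may differ.

```python
import ast


def validate_generated_code(source: str) -> tuple[bool, str]:
    """Return (ok, message); the message can be appended to a retry prompt."""
    try:
        tree = ast.parse(source)
    except SyntaxError as exc:
        return False, f"SyntaxError on line {exc.lineno}: {exc.msg}"

    functions = [node for node in tree.body if isinstance(node, ast.FunctionDef)]
    if not functions:
        return False, "No top-level function definition found"

    # Assumed rule: every function needs a docstring, since the registry relies on them.
    missing = [fn.name for fn in functions if ast.get_docstring(fn) is None]
    if missing:
        return False, f"Missing docstring: {', '.join(missing)}"
    return True, "ok"


good = 'def strip_names(data):\n    """Trim whitespace from names."""\n    return data\n'
print(validate_generated_code(good))
print(validate_generated_code("def broken(:"))
```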

Data Quality (v0.4.0+)

  • Holdout Validation: Test functions on unseen 20% of each chunk
  • Sampling Strategies: Sequential, random, or stratified sampling
  • Quality Metrics: Before/after comparison with improvement reports
  • Dependency Resolution: Topological sort for correct function ordering
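To illustrate the dependency-resolution bullet, here is one way to order generated functions so that helpers come before the functions that call them. It uses `ast` to find calls and the standard library's `graphlib.TopologicalSorter` (Python 3.9+). The helper name `order_functions` and the sample function names are hypothetical, and dependencies.py may implement the sort differently.

```python
import ast
from graphlib import TopologicalSorter


def order_functions(sources: dict[str, str]) -> list[str]:
    """Order function names so each one comes after the functions it calls."""
    known = set(sources)
    graph: dict[str, set[str]] = {}
    for name, source in sources.items():
        called = {
            node.func.id
            for node in ast.walk(ast.parse(source))
            if isinstance(node, ast.Call) and isinstance(node.func, ast.Name)
        }
        # Keep only calls to other generated functions; ignore builtins and self-calls.
        graph[name] = (called & known) - {name}
    # static_order() yields dependencies first and raises CycleError on cycles.
    return list(TopologicalSorter(graph).static_order())


sources = {
    "clean_record": "def clean_record(r):\n    return strip_whitespace(r)\n",
    "strip_whitespace": "def strip_whitespace(r):\n    return r\n",
}
print(order_functions(sources))
```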

Optimization (v0.5.0+)

  • Two-Pass Consolidation: Merge redundant functions after generation
  • Early Termination: Stop when LLM detects pattern saturation
  • LLM Agency: Model decides chunk cleanliness and saturation

Security (v0.5.1+)

  • Dangerous Code Detection: AST-based detection of exec, eval, subprocess, network calls
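A minimal sketch of AST-based detection follows. It walks the parsed tree and flags calls and imports from small deny lists. The lists and the function name `find_dangerous_code` are assumptions chosen for the example, not the rules the library ships with, and a check like this reduces risk rather than making generated code safe to run unreviewed.

```python
import ast

# Assumed deny lists for illustration only.
DANGEROUS_CALLS = {"exec", "eval", "compile", "__import__"}
DANGEROUS_MODULES = {"subprocess", "socket", "urllib", "http", "requests"}


def find_dangerous_code(source: str) -> list[str]:
    """Return human-readable findings; an empty list means nothing was flagged."""
    findings = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
            if node.func.id in DANGEROUS_CALLS:
                findings.append(f"line {node.lineno}: call to {node.func.id}()")
        elif isinstance(node, ast.Import):
            for alias in node.names:
                if alias.name.split(".")[0] in DANGEROUS_MODULES:
                    findings.append(f"line {node.lineno}: import {alias.name}")
        elif isinstance(node, ast.ImportFrom):
            if (node.module or "").split(".")[0] in DANGEROUS_MODULES:
                findings.append(f"line {node.lineno}: from {node.module} import ...")
    return findings


suspicious = "import subprocess\n\ndef clean(data):\n    return eval(data)\n"
print(find_dangerous_code(suspicious))
```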

Observability (v0.6.0)

  • Latency Metrics: Track min/max/avg/total LLM call times
  • Import Consolidation: Deduplicate and merge imports in output
  • Cleaning Reports: Markdown summary with functions, timing, quality delta
  • Dry-Run Mode: Analyze data without generating functions
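Latency tracking of this kind can be approximated by wrapping any backend and timing each `generate()` call. In the sketch below, the `avg_ms` key mirrors the progress-event example in this README; the wrapper class `LatencyTracker`, the stub backend, and the remaining key names are assumptions.

```python
import time


class LatencyTracker:
    """Hypothetical wrapper that records wall-clock time per generate() call."""

    def __init__(self, backend) -> None:
        self.backend = backend
        self.samples_ms: list[float] = []

    def generate(self, prompt: str) -> str:
        start = time.perf_counter()
        try:
            return self.backend.generate(prompt)
        finally:
            # Record the sample even if the backend raises.
            self.samples_ms.append((time.perf_counter() - start) * 1000)

    def stats(self) -> dict[str, float]:
        samples = self.samples_ms
        if not samples:
            return {"count": 0, "min_ms": 0.0, "max_ms": 0.0, "avg_ms": 0.0, "total_ms": 0.0}
        total = sum(samples)
        return {
            "count": len(samples),
            "min_ms": min(samples),
            "max_ms": max(samples),
            "avg_ms": total / len(samples),
            "total_ms": total,
        }


class StubBackend:
    """Stand-in for a real LLM backend."""

    def generate(self, prompt: str) -> str:
        return "ok"


tracker = LatencyTracker(StubBackend())
tracker.generate("first prompt")
tracker.generate("second prompt")
print(tracker.stats())
```

Because the wrapper exposes the same `generate(prompt) -> str` method, it could be passed anywhere a backend is expected.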

Format Expansion (v0.7.0)

  • Markitdown Integration: Convert 20+ formats (PDF, Word, Excel, PowerPoint, HTML, EPUB, etc.) to text
  • Parquet Support: Load parquet files as structured data via pyarrow
  • LLM-Generated Parsers: Auto-generate parsers for XML and unknown formats (auto_parse=True)

Terminal UI (v0.8.0)

  • Mission Control Dashboard: Rich-based live terminal UI with retro aesthetic
  • Real-time Progress: Animated progress bars, chunk/iteration counters
  • Transmission Log: Parsed LLM responses showing issues detected and functions being generated
  • Token Estimation: Track estimated input/output tokens across the run
  • Graceful Fallback: Works without Rich installed (falls back to callbacks)

Configuration

cleaner = DataCleaner(
    # Required
    llm_backend=llm,
    file_path="data.jsonl",

    # Chunking
    chunk_size=50,              # Items per chunk (or chars for text mode)
    max_iterations=5,           # Max iterations per chunk
    context_budget=8000,        # Max chars for docstring context

    # Validation
    validate_runtime=True,      # Test functions before accepting
    schema_sample_size=10,      # Records for schema inference
    holdout_ratio=0.2,          # Fraction held out for validation

    # Sampling
    sampling_strategy="stratified",  # "sequential", "random", "stratified"
    stratify_field="status",         # Field for stratified sampling

    # Optimization
    optimize=True,              # Consolidate redundant functions
    early_termination=True,     # Stop when patterns saturate
    track_metrics=True,         # Measure before/after quality

    # Observability
    report_path="report.md",    # Markdown report output (None to disable)
    dry_run=False,              # Analyze without generating functions

    # Format Expansion
    auto_parse=False,           # LLM generates parser for unknown formats

    # Terminal UI
    tui=True,                   # Enable Rich dashboard (requires [tui] extra)

    # Progress & State
    on_progress=callback,       # Progress event callback (any callable taking an event dict)
    state_file="state.json",    # Enable resume on interrupt
)
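To make `holdout_ratio` concrete, the sketch below splits one chunk into an analysis part and a held-out part. With `chunk_size=50` and `holdout_ratio=0.2`, that gives 40 records for analysis and 10 for validation. The helper `split_holdout`, the seeded shuffle, and the rounding rule are assumptions; the library may select its holdout records differently.

```python
import random


def split_holdout(chunk: list, holdout_ratio: float = 0.2, seed: int = 0):
    """Split a chunk into (analysis, holdout) lists without mutating the input."""
    if not 0 <= holdout_ratio < 1:
        raise ValueError("holdout_ratio must be in [0, 1)")
    shuffled = chunk[:]
    # Seeded shuffle so the same chunk always yields the same split.
    random.Random(seed).shuffle(shuffled)
    n_holdout = int(len(shuffled) * holdout_ratio)
    return shuffled[n_holdout:], shuffled[:n_holdout]


records = list(range(50))
analysis, holdout = split_holdout(records, holdout_ratio=0.2)
print(len(analysis), len(holdout))
```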

Progress Events

# match/case requires Python 3.10 or newer
def on_progress(event):
    match event["type"]:
        case "chunk_start":
            print(f"Chunk {event['chunk_index']}/{event['total_chunks']}")
        case "llm_call":
            print(f"LLM latency: {event['latency_ms']}ms")
        case "function_generated":
            print(f"Generated: {event['function_name']}")
        case "issues_detected":  # dry-run mode
            print(f"Found {len(event['issues'])} issues")
        case "complete":
            stats = event["latency_stats"]
            print(f"Done! Avg latency: {stats['avg_ms']}ms")

Output

The cleaner generates cleaning_functions.py:

# Auto-generated cleaning functions
import re

def normalize_phone_numbers(data):
    """Normalize phone numbers to E.164 format."""
    # ... implementation ...

def fix_status_typos(data):
    """Fix typos in status field."""
    # ... implementation ...

def clean_data(data):
    """Apply all cleaning functions in order."""
    data = normalize_phone_numbers(data)
    data = fix_status_typos(data)
    return data

Custom LLM Backend

Implement the simple protocol:

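class MyBackend:
    def generate(self, prompt: str) -> str:
        # Call your LLM (OpenAI, Anthropic, local, etc.) and return its text output.
        # Placeholder: replace the line below with your client call.
        raise NotImplementedError("call your LLM here and return the completion text")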
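One way to express this contract in type-checked code is a `typing.Protocol`, paired with a canned backend that is handy for tests. Both `LLMBackend` and `CannedBackend` are hypothetical names for this sketch; the package is not known to export either, and the canned response string is arbitrary.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class LLMBackend(Protocol):
    """Structural type: anything with generate(prompt) -> str qualifies."""

    def generate(self, prompt: str) -> str: ...


class CannedBackend:
    """Returns a fixed response and records the prompts it received."""

    def __init__(self, response: str) -> None:
        self.response = response
        self.prompts: list[str] = []

    def generate(self, prompt: str) -> str:
        self.prompts.append(prompt)
        return self.response


backend = CannedBackend("no issues found")
print(isinstance(backend, LLMBackend))
print(backend.generate("Analyze this chunk"))
```

Note that `isinstance` against a runtime-checkable protocol only confirms the method exists; it does not verify the signature or return type.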

Text Mode

For plain text files (for example, text extracted from PDFs or other documents):

cleaner = DataCleaner(
    llm_backend=llm,
    file_path="document.txt",
    chunk_size=4000,  # Characters, not items
    instructions="Fix OCR errors, normalize whitespace",
)

Text mode uses sentence-aware chunking to avoid splitting mid-sentence.
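The general idea of sentence-aware chunking can be sketched with a naive regex splitter that packs whole sentences into chunks of roughly `chunk_size` characters. This is a simplified stand-in, not the vendored Chonkie-derived chunker: the function name `chunk_text` and the splitting rule are assumptions, and abbreviations such as "e.g." will confuse it.

```python
import re


def chunk_text(text: str, chunk_size: int = 4000) -> list[str]:
    """Pack whole sentences into chunks of at most chunk_size characters.

    A single sentence longer than chunk_size is kept intact as its own chunk.
    """
    # Naive sentence boundary: whitespace that follows '.', '!' or '?'.
    sentences = re.split(r"(?<=[.!?])\s+", text.strip())
    chunks: list[str] = []
    current = ""
    for sentence in sentences:
        candidate = f"{current} {sentence}".strip()
        if current and len(candidate) > chunk_size:
            # Adding this sentence would overflow, so close the current chunk.
            chunks.append(current)
            current = sentence
        else:
            current = candidate
    if current:
        chunks.append(current)
    return chunks


print(chunk_text("One two. Three four. Five six.", chunk_size=20))
```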

Resume on Interrupt

# Start with state file
cleaner = DataCleaner(
    llm_backend=llm,
    file_path="huge_file.jsonl",
    state_file="cleaning_state.json",
)
cleaner.run()

# If interrupted, resume later:
cleaner = DataCleaner.resume("cleaning_state.json", llm)
cleaner.run()
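A resumable run depends on the state file never being left half-written when the process is interrupted. The sketch below shows one common approach: write to a temporary file, then swap it into place atomically. The helpers `save_state` and `load_state` and the state fields shown are purely illustrative; the actual contents of the library's state file are not documented here.

```python
import json
import os
import tempfile


def save_state(path: str, state: dict) -> None:
    """Write state atomically: temp file in the same directory, then replace."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        json.dump(state, handle)
    # os.replace is atomic when source and destination share a filesystem.
    os.replace(tmp_path, path)


def load_state(path: str) -> dict:
    """Read a previously saved state file."""
    with open(path, encoding="utf-8") as handle:
        return json.load(handle)


with tempfile.TemporaryDirectory() as workdir:
    state_path = os.path.join(workdir, "cleaning_state.json")
    # Hypothetical fields: which chunk to resume from and what was generated so far.
    save_state(state_path, {"next_chunk": 3, "functions": ["fix_status_typos"]})
    restored = load_state(state_path)
    print(restored)
```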

Architecture

recursive_cleaner/
├── cleaner.py          # Main DataCleaner class
├── context.py          # Docstring registry with FIFO eviction
├── dependencies.py     # Topological sort for function ordering
├── metrics.py          # Quality metrics before/after
├── optimizer.py        # Two-pass consolidation with LLM agency
├── output.py           # Function file generation + import consolidation
├── parser_generator.py # LLM-generated parsers for unknown formats
├── parsers.py          # Chunking for all formats + sampling
├── prompt.py           # LLM prompt templates
├── report.py           # Markdown report generation
├── response.py         # XML/markdown parsing + agency dataclasses
├── schema.py           # Schema inference
├── tui.py              # Rich terminal dashboard
├── validation.py       # Runtime validation + holdout
└── vendor/
    └── chunker.py      # Vendored sentence-aware chunker

Testing

pytest tests/ -v

465 tests covering all features. Test datasets in test_cases/:

  • E-commerce product catalogs
  • Healthcare patient records
  • Financial transaction data

Philosophy

  • Simplicity over extensibility: ~3,000 lines that do one thing well
  • stdlib over dependencies: Only tenacity required
  • Retry over recover: On error, retry with error in prompt
  • Wu wei: Let the LLM make decisions about data it understands

Version History

Version  Features
v0.8.0   Terminal UI with Rich dashboard, mission control aesthetic, transmission log
v0.7.0   Markitdown (20+ formats), Parquet support, LLM-generated parsers
v0.6.0   Latency metrics, import consolidation, cleaning report, dry-run mode
v0.5.1   Dangerous code detection (AST-based security)
v0.5.0   Two-pass optimization, early termination, LLM agency
v0.4.0   Holdout validation, dependency resolution, sampling, quality metrics
v0.3.0   Text mode with sentence-aware chunking
v0.2.0   Runtime validation, schema inference, callbacks, incremental saves
v0.1.0   Core pipeline, chunking, docstring registry

Acknowledgments

  • Sentence-aware text chunking adapted from Chonkie (MIT License)
  • Development assisted by Claude Code

License

MIT
