LLM-powered incremental data cleaning pipeline that processes massive datasets in chunks and generates Python cleaning functions

Recursive Data Cleaner

LLM-powered incremental data cleaning for massive datasets. Process files in chunks, identify quality issues, and automatically generate Python cleaning functions.

How It Works

  1. Chunk your data (JSONL, CSV, JSON, Parquet, PDF, Word, Excel, XML, and more)
  2. Analyze each chunk with an LLM to identify issues
  3. Generate one cleaning function per issue
  4. Validate functions on holdout data before accepting
  5. Output a ready-to-use cleaning_functions.py

The system maintains a "docstring registry": it feeds the descriptions of already-generated functions back into each prompt, so the LLM knows which issues are solved and avoids duplicate work.
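A minimal sketch of how such a registry might behave, assuming entries are name/docstring pairs and the oldest entries are evicted first once a character budget is exceeded. The names DocstringRegistry, add, and render are hypothetical and are not taken from the package's source.

```python
from collections import deque


class DocstringRegistry:
    """Hypothetical FIFO registry of generated-function docstrings."""

    def __init__(self, budget_chars: int = 8000) -> None:
        self.budget_chars = budget_chars
        self._entries: deque[tuple[str, str]] = deque()

    def render(self) -> str:
        """Return the context block that would be added to a prompt."""
        return "\n".join(f"- {name}: {doc}" for name, doc in self._entries)

    def add(self, name: str, doc: str) -> None:
        """Register a function, evicting the oldest entries if over budget."""
        self._entries.append((name, doc))
        # Always keep at least the newest entry, even if it alone is too long.
        while len(self._entries) > 1 and len(self.render()) > self.budget_chars:
            self._entries.popleft()


registry = DocstringRegistry(budget_chars=60)
registry.add("normalize_phone_numbers", "Normalize phones to E.164.")
registry.add("fix_status_typos", "Fix typos in status field.")
print(registry.render())  # only the newest entry fits in 60 characters
```

Evicting the oldest entry first keeps the prompt bounded, at the cost that the LLM may eventually re-propose a fix it generated long ago.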

Installation

pip install -e .

For Apple Silicon (MLX backend):

pip install -e ".[mlx]"

For document conversion (PDF, Word, Excel, HTML, etc.):

pip install -e ".[markitdown]"

For Parquet files:

pip install -e ".[parquet]"

Quick Start

from recursive_cleaner import DataCleaner
from backends import MLXBackend

# Any LLM with generate(prompt) -> str works
llm = MLXBackend(model_path="your-model")

cleaner = DataCleaner(
    llm_backend=llm,
    file_path="messy_data.jsonl",
    chunk_size=50,
    instructions="""
    - Normalize phone numbers to E.164
    - Fix typos in status field (valid: active, pending, churned)
    - Convert dates to ISO 8601
    """,
)

cleaner.run()  # Generates cleaning_functions.py

Features

Core

  • Chunked Processing: Handle files larger than LLM context windows
  • Incremental Generation: One function per issue, building up a complete solution
  • Docstring Registry: Automatic context management with FIFO eviction
  • AST Validation: All generated code validated before output
  • Error Recovery: Retries with error feedback on parse failures
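As an illustration of the AST validation step, the sketch below checks that a generated snippet parses and defines at least one top-level function. The function name validate_function_source and the exact checks are assumptions; the package may validate more than this.

```python
import ast


def validate_function_source(source: str) -> tuple[bool, str]:
    """Return (ok, error_message) for a piece of generated code."""
    try:
        tree = ast.parse(source)
    except SyntaxError as exc:
        return False, f"SyntaxError: {exc.msg} (line {exc.lineno})"
    has_function = any(isinstance(node, ast.FunctionDef) for node in tree.body)
    if not has_function:
        return False, "No top-level function definition found"
    return True, ""


good = "def strip_names(data):\n    return data\n"
bad = "def strip_names(data)\n    return data\n"  # missing colon
print(validate_function_source(good))
print(validate_function_source(bad))
```

The error message returned on failure is the kind of text that could be fed back to the LLM on a retry.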

Data Quality (v0.4.0+)

  • Holdout Validation: Test functions on unseen 20% of each chunk
  • Sampling Strategies: Sequential, random, or stratified sampling
  • Quality Metrics: Before/after comparison with improvement reports
  • Dependency Resolution: Topological sort for correct function ordering
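Holdout validation could look roughly like the sketch below. It assumes the last 20% of each chunk is held out and that a candidate function is called once per record; both are assumptions, and split_holdout and passes_holdout are hypothetical names.

```python
def split_holdout(chunk: list, holdout_ratio: float = 0.2) -> tuple[list, list]:
    """Split a chunk into (analysis, holdout) parts, holding out the tail."""
    if not 0.0 <= holdout_ratio < 1.0:
        raise ValueError("holdout_ratio must be in [0, 1)")
    holdout_size = int(len(chunk) * holdout_ratio)
    split_at = len(chunk) - holdout_size
    return chunk[:split_at], chunk[split_at:]


def passes_holdout(func, holdout: list) -> bool:
    """Accept a function only if it runs on every held-out record without raising."""
    try:
        for record in holdout:
            func(dict(record))  # shallow copy so the holdout is not mutated
    except Exception:
        return False
    return True


records = [{"status": "active"}] * 8 + [{"status": None}] * 2
analysis, holdout = split_holdout(records, holdout_ratio=0.2)


def upper_status(record):
    record["status"] = record["status"].upper()  # fails when status is None
    return record


print(len(analysis), len(holdout))
print(passes_holdout(upper_status, holdout))
```

Because the LLM never sees the held-out records, a function that only works on the examples in its prompt is more likely to be caught here.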

Optimization (v0.5.0+)

  • Two-Pass Consolidation: Merge redundant functions after generation
  • Early Termination: Stop when LLM detects pattern saturation
  • LLM Agency: Model decides chunk cleanliness and saturation

Security (v0.5.1+)

  • Dangerous Code Detection: AST-based detection of exec, eval, subprocess, network calls
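A sketch of AST-based detection using only the standard library. The deny-lists below are illustrative assumptions; the package's actual rules are not shown in this README.

```python
import ast

# Illustrative deny-lists (assumptions, not the package's real configuration).
DANGEROUS_CALLS = {"exec", "eval", "compile", "__import__"}
DANGEROUS_MODULES = {"subprocess", "socket", "urllib", "requests", "http"}


def find_dangerous_code(source: str) -> list[str]:
    """Return a description of each dangerous construct found in source."""
    findings = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
            if node.func.id in DANGEROUS_CALLS:
                findings.append(f"call to {node.func.id}()")
        elif isinstance(node, ast.Import):
            for alias in node.names:
                if alias.name.split(".")[0] in DANGEROUS_MODULES:
                    findings.append(f"import of {alias.name}")
        elif isinstance(node, ast.ImportFrom):
            if (node.module or "").split(".")[0] in DANGEROUS_MODULES:
                findings.append(f"import from {node.module}")
    return findings


unsafe = "import subprocess\ndef clean(data):\n    return eval(data)\n"
safe = "import re\ndef clean(data):\n    return data\n"
print(find_dangerous_code(unsafe))
print(find_dangerous_code(safe))
```

Static checks like this inspect the code without running it, but a name-based deny-list can be bypassed (for example through getattr), so it is a first filter rather than a sandbox.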

Observability (v0.6.0+)

  • Latency Metrics: Track min/max/avg/total LLM call times
  • Import Consolidation: Deduplicate and merge imports in output
  • Cleaning Reports: Markdown summary with functions, timing, quality delta
  • Dry-Run Mode: Analyze data without generating functions
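Latency tracking can be pictured as a thin wrapper around any backend that exposes generate(prompt). LatencyTracker and EchoBackend are hypothetical names for this sketch, and the min_ms, max_ms, and total_ms keys are assumptions modeled on the avg_ms key used in the progress events.

```python
import time


class EchoBackend:
    """Stand-in backend for the example; returns the prompt unchanged."""

    def generate(self, prompt: str) -> str:
        return prompt


class LatencyTracker:
    """Hypothetical wrapper that times each generate() call of a backend."""

    def __init__(self, backend) -> None:
        self.backend = backend
        self.latencies_ms: list[float] = []

    def generate(self, prompt: str) -> str:
        start = time.perf_counter()
        try:
            return self.backend.generate(prompt)
        finally:
            # Record the latency even when the backend raises.
            self.latencies_ms.append((time.perf_counter() - start) * 1000)

    def stats(self) -> dict[str, float]:
        if not self.latencies_ms:
            return {"min_ms": 0.0, "max_ms": 0.0, "avg_ms": 0.0, "total_ms": 0.0}
        total = sum(self.latencies_ms)
        return {
            "min_ms": min(self.latencies_ms),
            "max_ms": max(self.latencies_ms),
            "avg_ms": total / len(self.latencies_ms),
            "total_ms": total,
        }


tracker = LatencyTracker(EchoBackend())
tracker.generate("first prompt")
tracker.generate("second prompt")
print(tracker.stats())
```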

Format Expansion (v0.7.0+)

  • Markitdown Integration: Convert 20+ formats (PDF, Word, Excel, PowerPoint, HTML, EPUB, etc.) to text
  • Parquet Support: Load parquet files as structured data via pyarrow
  • LLM-Generated Parsers: Auto-generate parsers for XML and unknown formats (auto_parse=True)

Configuration

cleaner = DataCleaner(
    # Required
    llm_backend=llm,
    file_path="data.jsonl",

    # Chunking
    chunk_size=50,              # Items per chunk (or chars for text mode)
    max_iterations=5,           # Max iterations per chunk
    context_budget=8000,        # Max chars for docstring context

    # Validation
    validate_runtime=True,      # Test functions before accepting
    schema_sample_size=10,      # Records for schema inference
    holdout_ratio=0.2,          # Fraction held out for validation

    # Sampling
    sampling_strategy="stratified",  # "sequential", "random", "stratified"
    stratify_field="status",         # Field for stratified sampling

    # Optimization
    optimize=True,              # Consolidate redundant functions
    early_termination=True,     # Stop when patterns saturate
    track_metrics=True,         # Measure before/after quality

    # Observability
    report_path="report.md",    # Markdown report output (None to disable)
    dry_run=False,              # Analyze without generating functions

    # Format Expansion
    auto_parse=False,           # LLM generates parser for unknown formats

    # Progress & State
    on_progress=callback,       # Progress event callback
    state_file="state.json",    # Enable resume on interrupt
)

Progress Events

def on_progress(event):
    match event["type"]:
        case "chunk_start":
            print(f"Chunk {event['chunk_index']}/{event['total_chunks']}")
        case "llm_call":
            print(f"LLM latency: {event['latency_ms']}ms")
        case "function_generated":
            print(f"Generated: {event['function_name']}")
        case "issues_detected":  # dry-run mode
            print(f"Found {len(event['issues'])} issues")
        case "complete":
            stats = event["latency_stats"]
            print(f"Done! Avg latency: {stats['avg_ms']}ms")

Output

The cleaner generates cleaning_functions.py:

# Auto-generated cleaning functions
import re

def normalize_phone_numbers(data):
    """Normalize phone numbers to E.164 format."""
    # ... implementation ...

def fix_status_typos(data):
    """Fix typos in status field."""
    # ... implementation ...

def clean_data(data):
    """Apply all cleaning functions in order."""
    data = normalize_phone_numbers(data)
    data = fix_status_typos(data)
    return data
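The call order inside clean_data matters when one generated function relies on another. The sketch below orders functions with the standard library's graphlib, assuming a dependency exists whenever one function's body calls another generated function by name. How the package actually detects dependencies is not stated here, and order_functions is a hypothetical name.

```python
import ast
from graphlib import TopologicalSorter


def order_functions(source: str) -> list[str]:
    """Return top-level function names so that callees come before callers."""
    tree = ast.parse(source)
    functions = {n.name: n for n in tree.body if isinstance(n, ast.FunctionDef)}
    graph = {}
    for name, node in functions.items():
        called = {
            c.func.id
            for c in ast.walk(node)
            if isinstance(c, ast.Call) and isinstance(c.func, ast.Name)
        }
        # Keep only calls to other generated functions; ignore self-recursion.
        graph[name] = (called & set(functions)) - {name}
    return list(TopologicalSorter(graph).static_order())


generated = '''
def fix_status_typos(data):
    return strip_whitespace(data)

def strip_whitespace(data):
    return data
'''
print(order_functions(generated))
```

TopologicalSorter raises graphlib.CycleError if two functions call each other, which is a useful signal that the generated code needs another pass.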

Custom LLM Backend

Implement the simple protocol:

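class MyBackend:
    def generate(self, prompt: str) -> str:
        # Call your LLM (OpenAI, Anthropic, local, etc.) here
        # and return the completion text as a plain string.
        raise NotImplementedError("Replace with a call to your LLM")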
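Because the protocol is a single method, a test double is easy to write. The sketch below expresses the protocol with typing.Protocol and adds a backend that replays canned responses. LLMBackend, CannedBackend, and ask are hypothetical names for illustration and are not part of the package.

```python
from typing import Protocol


class LLMBackend(Protocol):
    """Structural type: anything with generate(prompt) -> str qualifies."""

    def generate(self, prompt: str) -> str: ...


class CannedBackend:
    """Test double that returns pre-written responses in order."""

    def __init__(self, responses: list[str]) -> None:
        self._responses = list(responses)
        self.prompts: list[str] = []  # every prompt received, for assertions

    def generate(self, prompt: str) -> str:
        self.prompts.append(prompt)
        if not self._responses:
            raise RuntimeError("CannedBackend ran out of responses")
        return self._responses.pop(0)


def ask(backend: LLMBackend, prompt: str) -> str:
    """Works with any object that satisfies the protocol."""
    return backend.generate(prompt)


backend = CannedBackend(["first", "second"])
print(ask(backend, "hello"))
```

A backend like this makes it possible to exercise a pipeline in unit tests without calling a real model.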

Text Mode

For plain text files (for example, text extracted from PDFs or other documents):

cleaner = DataCleaner(
    llm_backend=llm,
    file_path="document.txt",
    chunk_size=4000,  # Characters, not items
    instructions="Fix OCR errors, normalize whitespace",
)

Text mode uses sentence-aware chunking to avoid splitting mid-sentence.
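To illustrate the idea, here is a deliberately naive sentence-aware chunker. It is not the vendored implementation: it assumes sentences end with ., ! or ? followed by whitespace, and chunk_by_sentence is a hypothetical name.

```python
import re


def chunk_by_sentence(text: str, chunk_size: int) -> list[str]:
    """Group whole sentences into chunks of at most chunk_size characters.

    A single sentence longer than chunk_size is kept intact as its own chunk.
    """
    # Naive splitter: break after ., ! or ? when followed by whitespace.
    sentences = [s for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s]
    chunks, current = [], ""
    for sentence in sentences:
        candidate = f"{current} {sentence}" if current else sentence
        if current and len(candidate) > chunk_size:
            chunks.append(current)  # close the chunk at a sentence boundary
            current = sentence
        else:
            current = candidate
    if current:
        chunks.append(current)
    return chunks


text = "One two. Three four. Five six."
print(chunk_by_sentence(text, chunk_size=20))
```

A real splitter also has to cope with abbreviations, decimals, and quoted speech, which this regular expression does not handle.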

Resume on Interrupt

# Start with state file
cleaner = DataCleaner(
    llm_backend=llm,
    file_path="huge_file.jsonl",
    state_file="cleaning_state.json",
)
cleaner.run()

# If interrupted, resume later:
cleaner = DataCleaner.resume("cleaning_state.json", llm)
cleaner.run()
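One way a state file can support resuming is sketched below: progress is written atomically after each chunk and read back on restart. The state layout (next_chunk, functions) and the helper names are assumptions for illustration; the package's actual state format is not documented here.

```python
import json
import os
import tempfile


def save_state(path: str, next_chunk: int, functions: list[dict]) -> None:
    """Write state atomically so an interrupt never leaves a half-written file."""
    state = {"next_chunk": next_chunk, "functions": functions}
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        json.dump(state, handle)
    os.replace(tmp_path, path)  # atomic rename within the same directory


def load_state(path: str) -> dict:
    """Return saved state, or a fresh state if no file exists yet."""
    if not os.path.exists(path):
        return {"next_chunk": 0, "functions": []}
    with open(path, encoding="utf-8") as handle:
        return json.load(handle)


with tempfile.TemporaryDirectory() as workdir:
    state_path = os.path.join(workdir, "cleaning_state.json")
    save_state(state_path, 3, [{"name": "fix_status_typos"}])
    print(load_state(state_path))
```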

Architecture

recursive_cleaner/
├── cleaner.py          # Main DataCleaner class
├── context.py          # Docstring registry with FIFO eviction
├── dependencies.py     # Topological sort for function ordering
├── metrics.py          # Quality metrics before/after
├── optimizer.py        # Two-pass consolidation with LLM agency
├── output.py           # Function file generation + import consolidation
├── parser_generator.py # LLM-generated parsers for unknown formats
├── parsers.py          # Chunking for all formats + sampling
├── prompt.py           # LLM prompt templates
├── report.py           # Markdown report generation
├── response.py         # XML/markdown parsing + agency dataclasses
├── schema.py           # Schema inference
├── validation.py       # Runtime validation + holdout
└── vendor/
    └── chunker.py      # Vendored sentence-aware chunker

Testing

pytest tests/ -v

432 tests covering all features. Test datasets in test_cases/:

  • E-commerce product catalogs
  • Healthcare patient records
  • Financial transaction data

Philosophy

  • Simplicity over extensibility: ~3,000 lines that do one thing well
  • stdlib over dependencies: Only tenacity required
  • Retry over recover: On error, retry with error in prompt
  • Wu wei: Let the LLM make decisions about data it understands
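The "retry over recover" principle can be sketched as a loop that re-prompts with the parse error appended. This version uses a plain loop rather than tenacity to stay self-contained. generate_valid_code, FlakyBackend, and the prompt wording are assumptions for illustration.

```python
import ast


def generate_valid_code(backend, prompt: str, max_attempts: int = 3) -> str:
    """Ask the backend for code, re-prompting with the parse error on failure."""
    current_prompt = prompt
    for _ in range(max_attempts):
        response = backend.generate(current_prompt)
        try:
            ast.parse(response)
            return response
        except SyntaxError as exc:
            # Feed the error back so the model can correct its own output.
            current_prompt = (
                f"{prompt}\n\nYour previous response failed to parse:\n"
                f"{exc.msg} (line {exc.lineno})\nPlease return corrected code."
            )
    raise RuntimeError(f"No valid code after {max_attempts} attempts")


class FlakyBackend:
    """Stand-in backend: first answer is broken, second is valid."""

    def __init__(self) -> None:
        self.prompts: list[str] = []
        self._answers = ["def f(data)\n    return data\n",
                         "def f(data):\n    return data\n"]

    def generate(self, prompt: str) -> str:
        self.prompts.append(prompt)
        return self._answers[min(len(self.prompts) - 1, 1)]


flaky = FlakyBackend()
code = generate_valid_code(flaky, "Write a cleaning function f(data).")
print(len(flaky.prompts))
```

No repair logic is written by hand: the only recovery mechanism is giving the model its own error message and asking again.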

Version History

Version  Features
v0.7.0   Markitdown (20+ formats), Parquet support, LLM-generated parsers
v0.6.0   Latency metrics, import consolidation, cleaning report, dry-run mode
v0.5.1   Dangerous code detection (AST-based security)
v0.5.0   Two-pass optimization, early termination, LLM agency
v0.4.0   Holdout validation, dependency resolution, sampling, quality metrics
v0.3.0   Text mode with sentence-aware chunking
v0.2.0   Runtime validation, schema inference, callbacks, incremental saves
v0.1.0   Core pipeline, chunking, docstring registry

Acknowledgments

  • Sentence-aware text chunking adapted from Chonkie (MIT License)
  • Development assisted by Claude Code

License

MIT
