
Offline data quality summarizer for LLM knowledge base integration


Data Quality Summarizer

An offline data processing system that transforms large CSV files containing data quality check results into LLM-optimized summary artifacts.


🚀 Features

  • Memory Efficient: Processes 100k+ row CSV files using <1GB RAM via chunked streaming
  • High Performance: Processes a 100k-row file in <2 minutes on a 4-core consumer laptop
  • Comprehensive Analytics: Generates rolling time-window metrics (1-month, 3-month, 12-month)
  • LLM-Ready Output: Produces natural language summaries optimized for knowledge base integration
  • Robust Architecture: Test-driven development with 90% test coverage
  • Production Ready: Full CLI interface with structured logging and error handling

⚡ Quick Start

# Clone and setup
git clone <repository-url>
cd data-quality-summarizer

# Install with dependencies
python -m venv venv
source venv/bin/activate  # Linux/Mac
pip install -e ".[dev]"

# Run with sample data
python -m src sample_input.csv sample_rules.json

# View results
cat resources/artifacts/full_summary.csv
cat resources/artifacts/nl_all_rows.txt

🛠 Installation

Requirements

  • Python: 3.11 or higher
  • Memory: 1GB+ RAM recommended
  • Storage: 100MB+ available space

Install from Source

# Create virtual environment (recommended)
python -m venv venv
source venv/bin/activate  # Linux/Mac
venv\Scripts\activate     # Windows

# Install package with all dependencies
pip install -e .

# Install development dependencies (optional)
pip install -e ".[dev]"

Dependencies

Core Dependencies:

  • pandas>=2.0.0 - Data processing and CSV handling
  • structlog>=23.0.0 - Structured logging
  • psutil>=5.9.0 - Memory monitoring

Development Dependencies:

  • pytest>=7.0.0 - Testing framework
  • pytest-cov>=4.0.0 - Coverage reporting
  • black>=23.0.0 - Code formatting
  • flake8>=6.0.0 - Linting
  • mypy>=1.0.0 - Type checking

📖 Usage

Command Line Interface

python -m src <csv_file> <rule_metadata_file> [options]

Arguments:

  • csv_file - Path to input CSV containing data quality results
  • rule_metadata_file - Path to JSON file with rule definitions

Options:

  • --chunk-size N - Rows per processing chunk (default: 20000)
  • --output-dir PATH - Output directory (default: resources/artifacts)
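
The interface above could be declared with the standard library's argparse roughly as follows. This is a sketch under assumptions: the function name build_parser and the help strings are illustrative, and the package's actual __main__.py may be organized differently. Only the two positional arguments, the two options, and their defaults come from this README.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical parser mirroring the documented arguments and defaults."""
    parser = argparse.ArgumentParser(
        prog="python -m src",
        description="Summarize data quality check results.",
    )
    parser.add_argument("csv_file", help="Input CSV with data quality results")
    parser.add_argument("rule_metadata_file", help="JSON file with rule definitions")
    parser.add_argument("--chunk-size", type=int, default=20000,
                        help="Rows per processing chunk")
    parser.add_argument("--output-dir", default="resources/artifacts",
                        help="Directory for generated artifacts")
    return parser


# Parse an explicit argument list instead of sys.argv so the sketch runs anywhere.
args = build_parser().parse_args(["input.csv", "rules.json", "--chunk-size", "50000"])
print(args.chunk_size, args.output_dir)  # 50000 resources/artifacts
```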

Examples

# Basic usage
python -m src input.csv rules.json

# Custom chunk size for large files
python -m src large_data.csv rules.json --chunk-size 50000

# Custom output directory
python -m src input.csv rules.json --output-dir /custom/path

# Performance monitoring
python -m src input.csv rules.json 2>&1 | tee processing.log

Sample Files

The repository includes sample data for testing:

  • sample_input.csv - Sample data quality results (12 rows)
  • sample_rules.json - Rule metadata definitions (4 rules)

🏗 Architecture

System Overview

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   CSV Input     │ => │  Streaming       │ => │   Summary       │
│  (100k+ rows)   │    │  Aggregation     │    │  Artifacts      │
└─────────────────┘    └──────────────────┘    └─────────────────┘
        │                        │                        │
        ▼                        ▼                        ▼
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│ Chunked Reader  │    │ Time Windows     │    │ CSV + NL Text   │
│ (20k chunks)    │    │ (1m/3m/12m)      │    │ (LLM-ready)     │
└─────────────────┘    └──────────────────┘    └─────────────────┘

Core Components

1. Ingestion (src/ingestion.py)

  • Chunked CSV reading using pandas
  • Configurable chunk size (default: 20k rows)
  • Explicit dtypes to avoid expensive, memory-hungry type inference
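
As a rough illustration of the ingestion bullets, chunked reading with pandas might look like the sketch below. The helper name read_chunks is hypothetical, and passing dtype=str for every column is an assumption about how type inference is avoided; the real module may declare per-column dtypes instead.

```python
import io
from typing import Iterator

import pandas as pd


def read_chunks(source, chunk_size: int = 20000) -> Iterator[pd.DataFrame]:
    """Yield DataFrames of at most chunk_size rows (hypothetical helper)."""
    # dtype=str skips per-column type inference; values are converted later
    # only where needed (an assumption, not the package's confirmed approach).
    yield from pd.read_csv(source, chunksize=chunk_size, dtype=str)


# A tiny in-memory CSV stands in for a real file path.
sample = io.StringIO("source,rule_code\nsysA,101\nsysA,102\nsysB,101\n")
sizes = [len(chunk) for chunk in read_chunks(sample, chunk_size=2)]
print(sizes)  # [2, 1]
```

Because only one chunk is materialized at a time, peak memory depends on the chunk size rather than on the total file size.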

2. Rules Management (src/rules.py)

  • JSON-based rule metadata loading
  • Rule validation and enrichment
  • Missing rule code handling with warnings
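
A minimal sketch of the loading and enrichment behavior described above, assuming the metadata file is a JSON object keyed by rule code. The function names load_rules and enrich are hypothetical, the rule code "101" is made up for the example, and the standard logging module stands in for structlog to keep the example dependency-free.

```python
import json
import logging
from typing import Any

logger = logging.getLogger("rules_sketch")

RULE_FIELDS = ("rule_name", "rule_type", "dimension", "rule_description", "category")


def load_rules(text: str) -> dict[str, dict[str, Any]]:
    """Parse rule metadata keyed by rule code (keys normalized to strings)."""
    return {str(code): meta for code, meta in json.loads(text).items()}


def enrich(row: dict[str, Any], rules: dict[str, dict[str, Any]]) -> dict[str, Any]:
    """Return the row plus rule metadata; warn and leave fields empty if unknown."""
    meta = rules.get(str(row["rule_code"]))
    if meta is None:
        logger.warning("Rule metadata not found for rule_code: %s", row["rule_code"])
        meta = {}
    return {**row, **{field: meta.get(field) for field in RULE_FIELDS}}


rules = load_rules(
    '{"101": {"rule_name": "ROW_COUNT_CHECK", "rule_type": "DATASET", '
    '"dimension": "Completeness", "rule_description": "Row count", "category": 1}}'
)
print(enrich({"rule_code": "101"}, rules)["rule_name"])  # ROW_COUNT_CHECK
print(enrich({"rule_code": "999"}, rules)["rule_name"])  # None (a warning is logged)
```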

3. Streaming Aggregation (src/aggregator.py)

  • Real-time row-by-row processing
  • Composite key grouping: (source, tenant_id, dataset_uuid, dataset_name, rule_code)
  • Rolling time window calculations from latest business_date
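
One way to picture the aggregation step is an accumulator dictionary keyed by the composite key, with the time windows resolved only once the latest business_date for a key is known. The sketch below is illustrative rather than the package's implementation: the class name Accumulator is hypothetical, and the window lengths of 30, 90, and 365 days are assumptions, since this README does not define how months are measured.

```python
from collections import defaultdict
from datetime import date, timedelta

# Assumed window lengths in days; the package may define months differently.
WINDOWS = {"1m": 30, "3m": 90, "12m": 365}
KEY_FIELDS = ("source", "tenant_id", "dataset_uuid", "dataset_name", "rule_code")


class Accumulator:
    """Hypothetical streaming accumulator: per key, per date pass/fail counts."""

    def __init__(self) -> None:
        # key -> business_date -> [pass_count, fail_count]
        self.counts: dict[tuple, dict[date, list[int]]] = defaultdict(
            lambda: defaultdict(lambda: [0, 0])
        )

    def add(self, row: dict, passed: bool) -> None:
        """Fold one input row into the counts for its composite key."""
        key = tuple(row[field] for field in KEY_FIELDS)
        day = date.fromisoformat(row["business_date"])
        self.counts[key][day][0 if passed else 1] += 1

    def summarize(self, key: tuple) -> dict:
        """Compute window metrics relative to the key's latest business_date."""
        per_day = self.counts[key]
        latest = max(per_day)
        out = {"business_date_latest": latest}
        for label, days in WINDOWS.items():
            cutoff = latest - timedelta(days=days)
            passes = sum(p for d, (p, _) in per_day.items() if d > cutoff)
            fails = sum(f for d, (_, f) in per_day.items() if d > cutoff)
            total = passes + fails
            out[f"pass_count_{label}"] = passes
            out[f"fail_count_{label}"] = fails
            out[f"fail_rate_{label}"] = fails / total if total else 0.0
        return out


acc = Accumulator()
base = {"source": "sysA", "tenant_id": "t1", "dataset_uuid": "u1",
        "dataset_name": "Customer_Data", "rule_code": "101"}
acc.add({**base, "business_date": "2024-01-17"}, passed=False)
acc.add({**base, "business_date": "2024-01-10"}, passed=True)
acc.add({**base, "business_date": "2023-10-01"}, passed=True)
summary = acc.summarize(tuple(base[field] for field in KEY_FIELDS))
print(summary["fail_rate_1m"], summary["pass_count_12m"])  # 0.5 2
```

Keeping counts per date rather than per row means memory grows with the number of distinct keys and dates, not with the number of input rows.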

4. Summary Generation (src/summarizer.py)

  • Structured CSV export (27 columns)
  • Natural language sentence generation
  • LLM-optimized formatting

5. CLI Orchestration (src/__main__.py)

  • Complete pipeline coordination
  • Comprehensive error handling
  • Performance monitoring and reporting

Data Flow Pipeline

  1. Chunked Ingestion: Read CSV in 20k-row chunks
  2. Row Processing: Stream each row through aggregation engine
  3. Metrics Calculation: Compute pass/fail counts across time windows
  4. Rule Enrichment: Add metadata from rule definitions
  5. Artifact Export: Generate CSV and natural language outputs

📊 Data Formats

Input CSV Schema

Required columns for data quality results:

| Column | Type | Description |
| --- | --- | --- |
| source | string | Data source system identifier |
| tenant_id | string | Tenant/organization identifier |
| dataset_uuid | string | Unique dataset identifier |
| dataset_name | string | Human-readable dataset name |
| business_date | date | Business date (ISO format: YYYY-MM-DD) |
| rule_code | string | Rule identifier (links to metadata) |
| results | JSON string | Contains {"result": "Pass"} or {"result": "Fail"} |
| level_of_execution | string | Execution context (DATASET/ATTRIBUTE) |
| attribute_name | string | Column name (for ATTRIBUTE rules) |
| dataset_record_count | integer | Total dataset size |
| filtered_record_count | integer | Records evaluated by rule |
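
Since the results column holds a JSON string, each cell has to be decoded before it can be counted. A minimal sketch, assuming the cell always contains a "result" key; the helper name is_pass is hypothetical, and raising on unexpected values is a choice made for this example, not documented package behavior.

```python
import json


def is_pass(results_cell: str) -> bool:
    """Return True for {"result": "Pass"} and False for {"result": "Fail"}."""
    outcome = json.loads(results_cell)["result"]
    if outcome not in ("Pass", "Fail"):
        raise ValueError(f"Unexpected result value: {outcome!r}")
    return outcome == "Pass"


print(is_pass('{"result": "Pass"}'))  # True
print(is_pass('{"result": "Fail"}'))  # False
```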

Rule Metadata JSON Schema

{
  "rule_code": {
    "rule_name": "DESCRIPTIVE_NAME",
    "rule_type": "DATASET|ATTRIBUTE", 
    "dimension": "Completeness|Validity|Timeliness|Consistency",
    "rule_description": "Human-readable description",
    "category": 1
  }
}

Output Artifacts

1. Structured CSV (resources/artifacts/full_summary.csv)

27-column summary with comprehensive metrics:

  • Identity: source, tenant_id, dataset_uuid, dataset_name, rule_code
  • Metadata: rule_name, rule_type, dimension, rule_description, category
  • Latest Values: business_date_latest, dataset_record_count_latest, filtered_record_count_latest
  • Counts: pass_count_total, fail_count_total, pass_count_1m, fail_count_1m, pass_count_3m, fail_count_3m, pass_count_12m, fail_count_12m
  • Rates: fail_rate_total, fail_rate_1m, fail_rate_3m, fail_rate_12m
  • Analysis: trend_flag, last_execution_level
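
The column groups above add up to 27 names. The sketch below writes them with the standard csv module as a quick way to check that count; the column order follows the listing above and may differ from the actual file, and the helper name write_summary is hypothetical.

```python
import csv
import io

# Column names as listed above; the real file's column order is an assumption.
COLUMNS = [
    "source", "tenant_id", "dataset_uuid", "dataset_name", "rule_code",
    "rule_name", "rule_type", "dimension", "rule_description", "category",
    "business_date_latest", "dataset_record_count_latest",
    "filtered_record_count_latest",
    "pass_count_total", "fail_count_total", "pass_count_1m", "fail_count_1m",
    "pass_count_3m", "fail_count_3m", "pass_count_12m", "fail_count_12m",
    "fail_rate_total", "fail_rate_1m", "fail_rate_3m", "fail_rate_12m",
    "trend_flag", "last_execution_level",
]


def write_summary(rows: list[dict], handle) -> None:
    """Write summary rows with a fixed header; missing fields become empty cells."""
    writer = csv.DictWriter(handle, fieldnames=COLUMNS, restval="",
                            extrasaction="ignore")
    writer.writeheader()
    writer.writerows(rows)


buffer = io.StringIO()
write_summary([{"source": "sysA", "rule_code": "101", "fail_rate_total": 0.333}], buffer)
print(len(COLUMNS))  # 27
```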

2. Natural Language (resources/artifacts/nl_all_rows.txt)

LLM-optimized sentences for each summary row:

• On 2024-01-17, dataset "Customer_Data" under rule "ROW_COUNT_CHECK" recorded 1 failures and 2 passes overall (fail-rate 33.3%; 1-month 33.3%, 3-month 33.3%, 12-month 33.3%) — trend DEGRADING.
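
A sentence of this shape can be produced from one summary row with a plain string template. The sketch below is an assumption about how that could be done, not the package's code: the function name to_sentence is hypothetical, the rates are assumed to be stored as fractions between 0 and 1, and the bullet and dash are written as Unicode escapes so the example stays ASCII-only.

```python
def to_sentence(row: dict) -> str:
    """Render one summary row in the sentence shape of the sample above."""

    def pct(rate: float) -> str:
        # Assumes rates are fractions (0.333...), shown with one decimal place.
        return f"{rate * 100:.1f}%"

    template = (
        "\u2022 On {date}, dataset \"{dataset}\" under rule \"{rule}\" recorded "
        "{fails} failures and {passes} passes overall "
        "(fail-rate {total}; 1-month {m1}, 3-month {m3}, 12-month {m12}) "
        "\u2014 trend {trend}."
    )
    return template.format(
        date=row["business_date_latest"],
        dataset=row["dataset_name"],
        rule=row["rule_name"],
        fails=row["fail_count_total"],
        passes=row["pass_count_total"],
        total=pct(row["fail_rate_total"]),
        m1=pct(row["fail_rate_1m"]),
        m3=pct(row["fail_rate_3m"]),
        m12=pct(row["fail_rate_12m"]),
        trend=row["trend_flag"],
    )


row = {
    "business_date_latest": "2024-01-17", "dataset_name": "Customer_Data",
    "rule_name": "ROW_COUNT_CHECK", "fail_count_total": 1, "pass_count_total": 2,
    "fail_rate_total": 1 / 3, "fail_rate_1m": 1 / 3, "fail_rate_3m": 1 / 3,
    "fail_rate_12m": 1 / 3, "trend_flag": "DEGRADING",
}
sentence = to_sentence(row)
print(sentence)
```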

🔧 Development

Environment Setup

# Clone repository
git clone <repository-url>
cd data-quality-summarizer

# Setup virtual environment
python -m venv venv
source venv/bin/activate

# Install in development mode
pip install -e ".[dev]"

Code Quality Tools

# Format code
black src/ tests/

# Lint code  
flake8 src/ tests/

# Type checking
mypy src/

# Run all quality checks
black src/ tests/ && flake8 src/ tests/ && mypy src/

Project Structure

data-quality-summarizer/
├── src/                    # Main package
│   ├── __init__.py
│   ├── __main__.py        # CLI entry point
│   ├── ingestion.py       # CSV reading
│   ├── rules.py          # Rule metadata
│   ├── aggregator.py     # Streaming aggregation
│   └── summarizer.py     # Output generation
├── tests/                 # Test suite
│   ├── test_*.py         # Unit tests
│   └── __init__.py
├── resources/            # Data and artifacts
│   ├── artifacts/        # Generated outputs
│   └── context/         # Documentation
├── sample_input.csv      # Sample data
├── sample_rules.json     # Sample rules
├── pyproject.toml        # Package configuration
└── README.md            # This file

Performance Guidelines

Memory Optimization

  • Chunk Size: Default 20k rows balances memory vs. processing overhead
  • Data Types: Explicit pandas dtypes prevent expensive inference
  • Streaming: Only accumulator dictionary kept in memory (~1MB typical)

Performance Targets

  • Runtime: <2 minutes for 100k rows on 4-core consumer laptop
  • Memory: <1GB peak usage during processing
  • Output Size: Summary CSV <2MB for typical datasets

Logging Strategy

Structured logging with appropriate levels:

# INFO: Progress indicators
logger.info(f"Processing chunk {chunk_num + 1} ({len(chunk)} rows)")

# DEBUG: Detailed metrics  
logger.debug(f"Accumulator size: {len(aggregator.accumulator)} keys")

# WARN: Recoverable issues
logger.warning(f"Rule metadata not found for rule_code: {rule_code}")

# ERROR: Fatal issues
logger.error(f"Failed to read CSV file: {csv_file}")

🧪 Testing

Running Tests

# Run all tests with coverage
python -m pytest

# Run specific test file
python -m pytest tests/test_ingestion.py

# Run with detailed output
python -m pytest -v --tb=short

# Generate HTML coverage report (--cov names the package to measure)
python -m pytest --cov=src --cov-report=html
open htmlcov/index.html  # View coverage report (macOS; use xdg-open on Linux)

Test Structure

  • Unit Tests: Individual module functionality
  • Integration Tests: Complete pipeline end-to-end
  • Performance Tests: Memory and runtime benchmarks
  • Edge Case Tests: Error handling and validation

Test Coverage

Current coverage: 90% across all modules

| Module | Coverage | Key Test Areas |
| --- | --- | --- |
| ingestion.py | 95% | Chunk processing, file validation |
| aggregator.py | 92% | Streaming aggregation, time windows |
| rules.py | 88% | Metadata loading, validation |
| summarizer.py | 90% | CSV export, NL generation |
| __main__.py | 85% | CLI integration, error handling |

Adding Tests

# tests/test_new_feature.py
import pytest
from src.new_module import NewClass  # placeholder module and class names


class TestNewFeature:
    def test_basic_functionality(self):
        """Test basic functionality."""
        instance = NewClass()
        result = instance.process()
        assert result is not None

    def test_error_handling(self):
        """Test error conditions."""
        instance = NewClass()
        invalid_input = None  # placeholder: substitute a value process() must reject
        with pytest.raises(ValueError):
            instance.process(invalid_input)

📈 Performance

Benchmarks

Tested on 4-core consumer laptop (8GB RAM):

| Dataset Size | Processing Time | Memory Peak | Output Size |
| --- | --- | --- | --- |
| 10k rows | 8 seconds | 120 MB | 0.5 MB |
| 50k rows | 35 seconds | 450 MB | 1.2 MB |
| 100k rows | 68 seconds | 850 MB | 2.1 MB |
| 500k rows | 5.2 minutes | 980 MB | 8.7 MB |
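
To compare the rows above on a common scale, the timings can be converted to throughput. The short calculation below uses only the figures from the benchmark table, with 5.2 minutes taken as 312 seconds.

```python
# (rows, seconds) from the benchmark table; 5.2 minutes = 312 seconds.
benchmarks = [(10_000, 8), (50_000, 35), (100_000, 68), (500_000, 312)]

throughput = [rows / seconds for rows, seconds in benchmarks]
for (rows, _), rate in zip(benchmarks, throughput):
    print(f"{rows:>7,} rows: {rate:,.0f} rows/s")
```

Throughput stays between roughly 1,250 and 1,600 rows per second across all four sizes, so runtime grows approximately linearly with input size, while peak memory rises only from 850 MB to 980 MB between 100k and 500k rows.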

Performance Tuning

Memory Optimization

# Reduce chunk size for memory-constrained systems
python -m src input.csv rules.json --chunk-size 10000

# Monitor memory usage with structured logging
export LOG_LEVEL=DEBUG
python -m src input.csv rules.json

Processing Speed

# Increase chunk size for faster processing (requires more RAM)
python -m src input.csv rules.json --chunk-size 50000

# Use SSD storage for better I/O performance
# Process files locally rather than network drives

Monitoring

The CLI provides comprehensive performance metrics:

🎉 SUCCESS: Data Quality Summarizer completed!
   📊 Processed: 100,000 rows
   🔑 Unique keys: 1,250
   ⏱️  Time: 68.34 seconds
   💾 Memory peak: 847.3 MB
   📁 Output files:
      • resources/artifacts/full_summary.csv
      • resources/artifacts/nl_all_rows.txt

🤝 Contributing

Development Workflow

  1. Setup Environment

    git clone <repository-url>
    cd data-quality-summarizer
    python -m venv venv
    source venv/bin/activate
    pip install -e ".[dev]"
    
  2. Create Feature Branch

    git checkout -b feature/your-feature-name
    
  3. Follow TDD Approach

    • Write tests first
    • Implement functionality
    • Ensure all tests pass
    • Maintain >80% coverage

  4. Code Quality Checks

    black src/ tests/
    flake8 src/ tests/
    mypy src/
    python -m pytest
    
  5. Submit Pull Request

    • Include tests for new functionality
    • Update documentation if needed
    • Ensure CI passes

Code Style Guidelines

  • Line Length: 88 characters (Black default)
  • Imports: Group stdlib, third-party, local imports
  • Typing: Full type annotations required (mypy strict)
  • Docstrings: Google-style for classes and functions
  • Comments: Explain "why" not "what"

File Size Limits

Critical Rule: No file should exceed 800 lines

  • Functions: 30-50 lines recommended, 80 lines maximum
  • Classes: 200-300 lines recommended
  • Files: 500-800 lines recommended

Break large files into logical modules when approaching limits.

📄 License

This project is licensed under the MIT License. See LICENSE file for details.

📞 Support

For questions, issues, or contributions:

🔄 Changelog

v0.1.0 (Current)

  • ✅ Complete streaming aggregation pipeline
  • ✅ Time-window analytics (1m/3m/12m)
  • ✅ LLM-optimized natural language output
  • ✅ Full CLI interface with comprehensive logging
  • ✅ 90% test coverage across all modules
  • ✅ Production-ready performance and error handling

Built with ❤️ for data quality excellence
