Data Quality Summarizer
An offline data processing system that transforms large CSV files containing data quality check results into LLM-optimized summary artifacts.
🚀 Features
- Memory Efficient: Processes 100k+ row CSV files using <1GB RAM via chunked streaming
- High Performance: Completes processing in <2 minutes on consumer-grade hardware
- Comprehensive Analytics: Generates rolling time-window metrics (1-month, 3-month, 12-month)
- LLM-Ready Output: Produces natural language summaries optimized for knowledge base integration
- Robust Architecture: Test-driven development with 90% test coverage
- Production Ready: Full CLI interface with structured logging and error handling
📋 Table of Contents
- Quick Start
- Installation
- Usage
- Architecture
- Data Formats
- Development
- Performance
- Testing
- Contributing
⚡ Quick Start
# Clone and setup
git clone <repository-url>
cd data-quality-summarizer
# Install with dependencies
python -m venv venv
source venv/bin/activate # Linux/Mac
pip install -e ".[dev]"
# Run with sample data
python -m src sample_input.csv sample_rules.json
# View results
cat resources/artifacts/full_summary.csv
cat resources/artifacts/nl_all_rows.txt
📦 Installation
Requirements
- Python: 3.11 or higher
- Memory: 1GB+ RAM recommended
- Storage: 100MB+ available space
Install from Source
# Create virtual environment (recommended)
python -m venv venv
source venv/bin/activate # Linux/Mac
venv\Scripts\activate # Windows
# Install package with all dependencies
pip install -e .
# Install development dependencies (optional)
pip install -e ".[dev]"
Dependencies
Core Dependencies:
- pandas>=2.0.0 - Data processing and CSV handling
- structlog>=23.0.0 - Structured logging
- psutil>=5.9.0 - Memory monitoring
Development Dependencies:
- pytest>=7.0.0 - Testing framework
- pytest-cov>=4.0.0 - Coverage reporting
- black>=23.0.0 - Code formatting
- flake8>=6.0.0 - Linting
- mypy>=1.0.0 - Type checking
📖 Usage
Command Line Interface
python -m src <csv_file> <rule_metadata_file> [options]
Arguments:
- csv_file - Path to input CSV containing data quality results
- rule_metadata_file - Path to JSON file with rule definitions
Options:
- --chunk-size N - Rows per processing chunk (default: 20000)
- --output-dir PATH - Output directory (default: resources/artifacts)
Examples
# Basic usage
python -m src input.csv rules.json
# Custom chunk size for large files
python -m src large_data.csv rules.json --chunk-size 50000
# Custom output directory
python -m src input.csv rules.json --output-dir /custom/path
# Performance monitoring
python -m src input.csv rules.json 2>&1 | tee processing.log
Sample Files
The repository includes sample data for testing:
- sample_input.csv - Sample data quality results (12 rows)
- sample_rules.json - Rule metadata definitions (4 rules)
🏗 Architecture
System Overview
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│    CSV Input    │ => │    Streaming    │ => │     Summary     │
│   (100k+ rows)  │    │   Aggregation   │    │    Artifacts    │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                      │                      │
         ▼                      ▼                      ▼
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  Chunked Reader │    │   Time Windows  │    │  CSV + NL Text  │
│   (20k chunks)  │    │   (1m/3m/12m)   │    │   (LLM-ready)   │
└─────────────────┘    └─────────────────┘    └─────────────────┘
Core Components
1. Ingestion (src/ingestion.py)
- Chunked CSV reading using pandas
- Configurable chunk size (default: 20k rows)
- Explicit data types to prevent memory-costly dtype inference (see the sketch below)
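For illustration, the chunked-reading pattern might look like this minimal sketch (the function name is hypothetical; the real signatures live in src/ingestion.py):

import pandas as pd

def read_chunks(csv_path: str, chunk_size: int = 20_000):
    """Yield DataFrames of at most chunk_size rows, never the whole file."""
    # dtype=str disables pandas' per-column type inference, the costly step
    # this component avoids; columns are converted explicitly downstream.
    yield from pd.read_csv(csv_path, chunksize=chunk_size, dtype=str)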
2. Rules Management (src/rules.py)
- JSON-based rule metadata loading
- Rule validation and enrichment
- Missing rule code handling with warnings
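A sketch of rule loading and the missing-code warning (the project logs via structlog; stdlib logging is used here for brevity, and the function names are illustrative):

import json
import logging

logger = logging.getLogger(__name__)

def load_rules(path: str) -> dict[str, dict]:
    """Load rule metadata JSON keyed by rule_code."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)

def get_rule(rules: dict[str, dict], rule_code: str) -> dict:
    """Return metadata for a rule code, warning rather than failing when absent."""
    if rule_code not in rules:
        logger.warning("Rule metadata not found for rule_code: %s", rule_code)
        return {}
    return rules[rule_code]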
3. Streaming Aggregation (src/aggregator.py)
- Real-time row-by-row processing
- Composite key grouping: (source, tenant_id, dataset_uuid, dataset_name, rule_code)
- Rolling time-window calculations anchored to the latest business_date (see the sketch below)
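A simplified sketch of the accumulator idea, assuming per-date tallies that are resolved into windows once the latest business_date is known (the class and method names, and the day-based window cutoffs, are illustrative, not the exact implementation):

from collections import defaultdict
from datetime import date, timedelta

# (source, tenant_id, dataset_uuid, dataset_name, rule_code)
Key = tuple[str, str, str, str, str]

class StreamingAggregator:
    def __init__(self) -> None:
        # {key: {business_date: [pass_count, fail_count]}} is the only
        # state retained in memory across all chunks.
        self.acc: dict[Key, dict[date, list[int]]] = defaultdict(
            lambda: defaultdict(lambda: [0, 0])
        )
        self.latest = date.min

    def add_row(self, key: Key, business_date: date, passed: bool) -> None:
        self.acc[key][business_date][0 if passed else 1] += 1
        self.latest = max(self.latest, business_date)

    def window_counts(self, key: Key, days: int) -> tuple[int, int]:
        """Pass/fail totals within `days` of the latest business_date seen."""
        cutoff = self.latest - timedelta(days=days)
        passes = fails = 0
        for day, (p, f) in self.acc[key].items():
            if day >= cutoff:
                passes, fails = passes + p, fails + f
        return passes, fails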
4. Summary Generation (src/summarizer.py)
- Structured CSV export (27 columns)
- Natural language sentence generation
- LLM-optimized formatting
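As a sketch of the sentence generation, a summary row could be rendered in the nl_all_rows.txt format shown under Output Artifacts below (the dict keys follow the CSV column names; the exact template lives in src/summarizer.py):

def to_sentence(row: dict) -> str:
    """Render one summary row as an LLM-ready sentence."""
    return (
        f'On {row["business_date_latest"]}, dataset "{row["dataset_name"]}" '
        f'under rule "{row["rule_name"]}" recorded {row["fail_count_total"]} '
        f'failures and {row["pass_count_total"]} passes overall '
        f'(fail-rate {row["fail_rate_total"]:.1%}; '
        f'1-month {row["fail_rate_1m"]:.1%}, 3-month {row["fail_rate_3m"]:.1%}, '
        f'12-month {row["fail_rate_12m"]:.1%}) → trend {row["trend_flag"]}.'
    )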
5. CLI Orchestration (src/__main__.py)
- Complete pipeline coordination
- Comprehensive error handling
- Performance monitoring and reporting
Data Flow Pipeline
1. Chunked Ingestion: Read CSV in 20k-row chunks
2. Row Processing: Stream each row through the aggregation engine
3. Metrics Calculation: Compute pass/fail counts across time windows
4. Rule Enrichment: Add metadata from rule definitions
5. Artifact Export: Generate CSV and natural language outputs
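Condensed to total pass/fail counts only (no time windows or the full 27-column output), the five steps fit in a toy sketch like this; the column access assumes the input schema under Data Formats:

import json
from collections import defaultdict

import pandas as pd

def summarize(csv_path: str, rules_path: str, chunk_size: int = 20_000) -> pd.DataFrame:
    with open(rules_path, encoding="utf-8") as f:
        rules = json.load(f)
    counts: dict[tuple, dict[str, int]] = defaultdict(lambda: {"Pass": 0, "Fail": 0})
    for chunk in pd.read_csv(csv_path, chunksize=chunk_size, dtype=str):  # 1. chunked ingestion
        for row in chunk.itertuples(index=False):                         # 2. row processing
            key = (row.source, row.tenant_id, row.dataset_uuid,
                   row.dataset_name, row.rule_code)
            counts[key][json.loads(row.results)["result"]] += 1           # 3. metrics calculation
    records = [
        {
            "source": k[0], "tenant_id": k[1], "dataset_uuid": k[2],
            "dataset_name": k[3], "rule_code": k[4],
            "rule_name": rules.get(k[4], {}).get("rule_name"),            # 4. rule enrichment
            "pass_count_total": v["Pass"], "fail_count_total": v["Fail"],
        }
        for k, v in counts.items()
    ]
    return pd.DataFrame(records)                                          # 5. ready for export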
📊 Data Formats
Input CSV Schema
Required columns for data quality results:
| Column | Type | Description |
|---|---|---|
| source | string | Data source system identifier |
| tenant_id | string | Tenant/organization identifier |
| dataset_uuid | string | Unique dataset identifier |
| dataset_name | string | Human-readable dataset name |
| business_date | date | Business date (ISO format: YYYY-MM-DD) |
| rule_code | string | Rule identifier (links to metadata) |
| results | JSON string | Contains {"result": "Pass"} or {"result": "Fail"} |
| level_of_execution | string | Execution context (DATASET/ATTRIBUTE) |
| attribute_name | string | Column name (for ATTRIBUTE rules) |
| dataset_record_count | integer | Total dataset size |
| filtered_record_count | integer | Records evaluated by rule |
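Note that results arrives as a JSON string rather than a parsed object; a row's outcome can be extracted as in this small sketch:

import json

def is_pass(results_cell: str) -> bool:
    """True when the results JSON records a passing check."""
    return json.loads(results_cell)["result"] == "Pass"

assert is_pass('{"result": "Pass"}')
assert not is_pass('{"result": "Fail"}')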
Rule Metadata JSON Schema
{
"rule_code": {
"rule_name": "DESCRIPTIVE_NAME",
"rule_type": "DATASET|ATTRIBUTE",
"dimension": "Completeness|Validity|Timeliness|Consistency",
"rule_description": "Human-readable description",
"category": 1
}
}
Output Artifacts
1. Structured CSV (resources/artifacts/full_summary.csv)
27-column summary with comprehensive metrics:
- Identity: source, tenant_id, dataset_uuid, dataset_name, rule_code
- Metadata: rule_name, rule_type, dimension, rule_description, category
- Latest Values: business_date_latest, dataset_record_count_latest, filtered_record_count_latest
- Counts: pass_count_total, fail_count_total, pass_count_1m, fail_count_1m, pass_count_3m, fail_count_3m, pass_count_12m, fail_count_12m
- Rates: fail_rate_total, fail_rate_1m, fail_rate_3m, fail_rate_12m
- Analysis: trend_flag, last_execution_level
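The fail rates are plain ratios over each window. The derivation of trend_flag is not documented here, so the comparison rule below (1-month vs. 12-month rate, the 0.01 threshold, and the IMPROVING/STABLE labels) is purely illustrative; only DEGRADING appears in the sample output:

def fail_rate(fails: int, passes: int) -> float:
    """Failures as a fraction of executions; 0.0 when nothing ran."""
    total = fails + passes
    return fails / total if total else 0.0

def trend_flag(rate_1m: float, rate_12m: float, eps: float = 0.01) -> str:
    # Hypothetical rule: a recent window materially worse than the
    # long-run baseline is DEGRADING; materially better is IMPROVING.
    if rate_1m > rate_12m + eps:
        return "DEGRADING"
    if rate_1m < rate_12m - eps:
        return "IMPROVING"
    return "STABLE"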
2. Natural Language (resources/artifacts/nl_all_rows.txt)
LLM-optimized sentences for each summary row:
• On 2024-01-17, dataset "Customer_Data" under rule "ROW_COUNT_CHECK" recorded 1 failures and 2 passes overall (fail-rate 33.3%; 1-month 33.3%, 3-month 33.3%, 12-month 33.3%) → trend DEGRADING.
🔧 Development
Environment Setup
# Clone repository
git clone <repository-url>
cd data-quality-summarizer
# Setup virtual environment
python -m venv venv
source venv/bin/activate
# Install in development mode
pip install -e ".[dev]"
Code Quality Tools
# Format code
black src/ tests/
# Lint code
flake8 src/ tests/
# Type checking
mypy src/
# Run all quality checks
black src/ tests/ && flake8 src/ tests/ && mypy src/
Project Structure
data-quality-summarizer/
├── src/                    # Main package
│   ├── __init__.py
│   ├── __main__.py         # CLI entry point
│   ├── ingestion.py        # CSV reading
│   ├── rules.py            # Rule metadata
│   ├── aggregator.py       # Streaming aggregation
│   └── summarizer.py       # Output generation
├── tests/                  # Test suite
│   ├── test_*.py           # Unit tests
│   └── __init__.py
├── resources/              # Data and artifacts
│   ├── artifacts/          # Generated outputs
│   └── context/            # Documentation
├── sample_input.csv        # Sample data
├── sample_rules.json       # Sample rules
├── pyproject.toml          # Package configuration
└── README.md               # This file
Performance Guidelines
Memory Optimization
- Chunk Size: Default 20k rows balances memory vs. processing overhead
- Data Types: Explicit pandas dtypes prevent expensive inference
- Streaming: Only accumulator dictionary kept in memory (~1MB typical)
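Since psutil is a core dependency for memory monitoring, a check along these lines can be logged after each chunk to watch the memory ceiling (the project's exact hook points are not shown here):

import psutil

def rss_mb() -> float:
    """Resident set size of the current process, in megabytes."""
    return psutil.Process().memory_info().rss / (1024 ** 2)

print(f"Memory: {rss_mb():.1f} MB")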
Performance Targets
- Runtime: <2 minutes for 100k rows on 4-core consumer laptop
- Memory: <1GB peak usage during processing
- Output Size: Summary CSV <2MB for typical datasets
Logging Strategy
Structured logging with appropriate levels:
# INFO: Progress indicators
logger.info(f"Processing chunk {chunk_num + 1} ({len(chunk)} rows)")
# DEBUG: Detailed metrics
logger.debug(f"Accumulator size: {len(aggregator.accumulator)} keys")
# WARN: Recoverable issues
logger.warning(f"Rule metadata not found for rule_code: {rule_code}")
# ERROR: Fatal issues
logger.error(f"Failed to read CSV file: {csv_file}")
🧪 Testing
Running Tests
# Run all tests with coverage
python -m pytest
# Run specific test file
python -m pytest tests/test_ingestion.py
# Run with detailed output
python -m pytest -v --tb=short
# Generate HTML coverage report
python -m pytest --cov-report=html
open htmlcov/index.html # View coverage report
Test Structure
- Unit Tests: Individual module functionality
- Integration Tests: Complete pipeline end-to-end
- Performance Tests: Memory and runtime benchmarks
- Edge Case Tests: Error handling and validation
Test Coverage
Current coverage: 90% across all modules
| Module | Coverage | Key Test Areas |
|---|---|---|
| ingestion.py | 95% | Chunk processing, file validation |
| aggregator.py | 92% | Streaming aggregation, time windows |
| rules.py | 88% | Metadata loading, validation |
| summarizer.py | 90% | CSV export, NL generation |
| __main__.py | 85% | CLI integration, error handling |
Adding Tests
# tests/test_new_feature.py
import pytest

from src.new_module import NewClass

class TestNewFeature:
    def test_basic_functionality(self):
        """Test basic functionality."""
        instance = NewClass()
        result = instance.process()
        assert result is not None

    def test_error_handling(self):
        """Test error conditions."""
        instance = NewClass()
        with pytest.raises(ValueError):
            instance.process(None)  # invalid input
📈 Performance
Benchmarks
Tested on 4-core consumer laptop (8GB RAM):
| Dataset Size | Processing Time | Memory Peak | Output Size |
|---|---|---|---|
| 10k rows | 8 seconds | 120 MB | 0.5 MB |
| 50k rows | 35 seconds | 450 MB | 1.2 MB |
| 100k rows | 68 seconds | 850 MB | 2.1 MB |
| 500k rows | 5.2 minutes | 980 MB | 8.7 MB |
Performance Tuning
Memory Optimization
# Reduce chunk size for memory-constrained systems
python -m src input.csv rules.json --chunk-size 10000
# Monitor memory usage with structured logging
export LOG_LEVEL=DEBUG
python -m src input.csv rules.json
Processing Speed
# Increase chunk size for faster processing (requires more RAM)
python -m src input.csv rules.json --chunk-size 50000
# Use SSD storage for better I/O performance
# Process files locally rather than network drives
Monitoring
The CLI provides comprehensive performance metrics:
🎉 SUCCESS: Data Quality Summarizer completed!
📊 Processed: 100,000 rows
🔑 Unique keys: 1,250
⏱️ Time: 68.34 seconds
💾 Memory peak: 847.3 MB
📁 Output files:
   • resources/artifacts/full_summary.csv
   • resources/artifacts/nl_all_rows.txt
🤝 Contributing
Development Workflow
1. Setup Environment

   git clone <repository-url>
   cd data-quality-summarizer
   python -m venv venv
   source venv/bin/activate
   pip install -e ".[dev]"

2. Create Feature Branch

   git checkout -b feature/your-feature-name

3. Follow TDD Approach

   - Write tests first
   - Implement functionality
   - Ensure all tests pass
   - Maintain >80% coverage

4. Code Quality Checks

   black src/ tests/
   flake8 src/ tests/
   mypy src/
   python -m pytest

5. Submit Pull Request

   - Include tests for new functionality
   - Update documentation if needed
   - Ensure CI passes
Code Style Guidelines
- Line Length: 88 characters (Black default)
- Imports: Group stdlib, third-party, local imports
- Typing: Full type annotations required (mypy strict)
- Docstrings: Google-style for classes and functions
- Comments: Explain "why" not "what"
File Size Limits
Critical Rule: No file should exceed 800 lines
- Functions: 30-50 lines recommended, 80 lines maximum
- Classes: 200-300 lines recommended
- Files: 500-800 lines recommended
Break large files into logical modules when approaching limits.
📄 License
This project is licensed under the MIT License. See LICENSE file for details.
📞 Support
For questions, issues, or contributions:
- Issues: GitHub Issues
- Documentation: Project Wiki
- Discussions: GitHub Discussions
📝 Changelog
v0.1.0 (Current)
- ✅ Complete streaming aggregation pipeline
- ✅ Time-window analytics (1m/3m/12m)
- ✅ LLM-optimized natural language output
- ✅ Full CLI interface with comprehensive logging
- ✅ 90% test coverage across all modules
- ✅ Production-ready performance and error handling
Built with ❤️ for data quality excellence