Production-ready text cleaning, deduplication, and vectorization pipeline with C++ acceleration

VecClean Documentation

VecClean is a low-latency text cleaning, deduplication, and vectorization pipeline for processing documents at scale. It pairs a Python API with C++-accelerated internals to raise throughput in retrieval-augmented generation (RAG) pipelines.

Installation

From PyPI (Development Version)

pip install vecclean

Quick Start

Basic Text Processing

import asyncio
from vecclean import Pipeline

async def main():
    # Initialize the pipeline with default configuration
    pipeline = Pipeline()
    
    # Process a simple text
    sample_text = """
    This is a sample document for testing.
    It contains multiple sentences that need processing.
    
    Some sentences might have    extra   whitespace.
    Others might contain HTML tags like <b>bold text</b>.
    """
    
    # Process the text
    result = await pipeline.process_text(sample_text)
    
    # Access the processed chunks
    for i, chunk in enumerate(result.chunks):
        print(f"Chunk {i}: {chunk.text}")
        print(f"Hash: {chunk.text_hash}")
        print(f"Word count: {chunk.word_count}")
        print(f"Embedding shape: {chunk.embedding.shape if chunk.embedding is not None else 'None'}")
        print("---")

# Run the async function
asyncio.run(main())

Processing Files

import asyncio
import json

from vecclean import Pipeline

async def process_files():
    pipeline = Pipeline()

    # Process multiple files
    files = [
        "document1.pdf",
        "document2.docx",
        "document3.txt"
    ]

    result = await pipeline.process_files(files)

    print(f"Processed {len(result.chunks)} chunks from {len(files)} files")
    print(f"Processing time: {result.stats.total_processing_time:.2f} seconds")

    # Save results to JSON
    with open("processed_chunks.json", "w", encoding="utf-8") as f:
        json.dump(result.to_dict(), f, indent=2)

asyncio.run(process_files())

Configuration

Custom Configuration

from vecclean import Pipeline, Config

# Create custom configuration
config = Config(
    chunking={
        "chunk_size": 512,
        "chunk_overlap": 50,
        "strategy": "sentence"
    },
    cleaning={
        "normalize_whitespace": True,
        "strip_html_tags": True,
        "remove_stopwords": True
    },
    dedup={
        "sentence_dedup": True,
        "chunk_dedup": True,
        "similarity_threshold": 0.85
    },
    embedding={
        "model_name": "all-MiniLM-L6-v2",
        "device": "auto"
    }
)

# Initialize pipeline with custom config
pipeline = Pipeline(config)

Configuration Options

Chunking Configuration

  • chunk_size: Maximum size of each chunk (default: 512)
  • chunk_overlap: Overlap between chunks (default: 50)
  • strategy: Chunking strategy - "sentence", "token", "recursive" (default: "sentence")
  • min_chunk_size: Minimum chunk size (default: 100)
  • max_chunk_size: Maximum chunk size (default: 1000)
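
To make the interaction between chunk_size and chunk_overlap concrete, here is a minimal sketch of sentence-based chunking with overlap. It is not VecClean's implementation: the function name chunk_sentences, the naive regex sentence splitter, and the choice to measure sizes in characters are all assumptions made for illustration.

```python
import re
from typing import List


def chunk_sentences(text: str, chunk_size: int = 512, chunk_overlap: int = 50) -> List[str]:
    """Greedily pack sentences into chunks; sizes are in characters (an assumption)."""
    # Naive splitter: break after '.', '!' or '?' when followed by whitespace.
    sentences = [s for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s]
    chunks: List[str] = []
    current = ""
    for sentence in sentences:
        candidate = f"{current} {sentence}".strip()
        if current and len(candidate) > chunk_size:
            chunks.append(current)
            # Carry the tail of the finished chunk forward as overlap.
            # The tail is cut by character count, so it may start mid-word.
            tail = current[-chunk_overlap:] if chunk_overlap > 0 else ""
            candidate = f"{tail} {sentence}".strip()
        current = candidate
    if current:
        chunks.append(current)
    return chunks
```

A larger chunk_overlap repeats more context across neighbouring chunks, which tends to help retrieval at the cost of more stored text. A single sentence longer than chunk_size is kept whole in this sketch rather than being split.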

Cleaning Configuration

  • normalize_unicode: Unicode normalization form (default: "NFC")
  • normalize_whitespace: Normalize whitespace (default: True)
  • standardize_punctuation: Standardize punctuation (default: True)
  • strip_html_tags: Remove HTML tags (default: True)
  • remove_stopwords: Remove stopwords (default: True)
  • min_text_length: Minimum text length (default: 10)
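
The cleaning options above can be pictured as a sequence of small text transforms. The following is a rough sketch using only the Python standard library; the function clean_text, its parameter names, and the tiny STOPWORDS set are hypothetical and do not reflect how VecClean's C++ backend performs these steps.

```python
import html
import re
import unicodedata

# Tiny illustrative stopword list; a real list would be much longer.
STOPWORDS = {"a", "an", "the", "is", "of", "and"}


def clean_text(
    text: str,
    unicode_form: str = "NFC",
    strip_html_tags: bool = True,
    normalize_whitespace: bool = True,
    remove_stopwords: bool = False,
) -> str:
    # Unicode normalization makes visually identical strings compare equal.
    text = unicodedata.normalize(unicode_form, text)
    if strip_html_tags:
        # Regex tag stripping is a simplification; malformed HTML needs a parser.
        text = re.sub(r"<[^>]+>", " ", text)
        text = html.unescape(text)
    if remove_stopwords:
        text = " ".join(w for w in text.split() if w.lower() not in STOPWORDS)
    if normalize_whitespace:
        # Collapse runs of whitespace (including newlines) into single spaces.
        text = re.sub(r"\s+", " ", text).strip()
    return text
```

Stopword removal is off by default in this sketch because it changes the text that gets embedded; whether that helps depends on the embedding model and the use case.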

Deduplication Configuration

  • sentence_dedup: Enable sentence-level deduplication (default: True)
  • chunk_dedup: Enable chunk-level deduplication (default: True)
  • similarity_threshold: Similarity threshold for deduplication (default: 0.85)
  • hash_algorithm: Hash algorithm for deduplication (default: "xxhash")
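
As an illustration of how hash-based and threshold-based deduplication can work together, the sketch below first drops exact duplicates by hash and then drops near-duplicates whose similarity meets similarity_threshold. Two substitutions are made here and should not be read as VecClean's behaviour: hashlib.blake2b from the standard library stands in for xxhash, and word-level Jaccard similarity stands in for whatever similarity measure the library actually uses.

```python
import hashlib
from typing import List, Set


def text_hash(text: str) -> str:
    # blake2b stands in for xxhash, which is a third-party package.
    return hashlib.blake2b(text.encode("utf-8"), digest_size=8).hexdigest()


def jaccard(a: str, b: str) -> float:
    """Word-set overlap: |A ∩ B| / |A ∪ B|."""
    sa, sb = set(a.lower().split()), set(b.lower().split())
    if not sa and not sb:
        return 1.0
    return len(sa & sb) / len(sa | sb)


def deduplicate(chunks: List[str], similarity_threshold: float = 0.85) -> List[str]:
    seen: Set[str] = set()
    kept: List[str] = []
    for chunk in chunks:
        digest = text_hash(chunk)
        if digest in seen:
            continue  # exact duplicate, caught cheaply by the hash
        if any(jaccard(chunk, k) >= similarity_threshold for k in kept):
            continue  # near duplicate, caught by the similarity threshold
        seen.add(digest)
        kept.append(chunk)
    return kept
```

Lowering similarity_threshold removes more near-duplicates but raises the risk of discarding chunks that differ in meaningful ways. The pairwise comparison in this sketch is quadratic in the number of kept chunks, so it only suits small inputs.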

Embedding Configuration

  • model_name: Embedding model name (default: "all-MiniLM-L6-v2")
  • device: Device for embedding generation (default: "auto")
  • batch_size: Batch size for embedding generation (default: 32)
  • cache_embeddings: Cache embeddings (default: True)
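
The batch_size and cache_embeddings options describe a common pattern: embed only texts that have not been seen before, and send them to the model in fixed-size batches. The sketch below shows that pattern with a caller-supplied embed_batch function; the names embed_with_cache and iter_batches, and the use of a plain dict as the cache, are assumptions for illustration rather than VecClean internals.

```python
from typing import Callable, Dict, Iterator, List, Sequence

Vector = List[float]


def iter_batches(items: Sequence[str], batch_size: int) -> Iterator[Sequence[str]]:
    # Yield consecutive slices of at most batch_size items.
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def embed_with_cache(
    texts: Sequence[str],
    embed_batch: Callable[[List[str]], List[Vector]],
    cache: Dict[str, Vector],
    batch_size: int = 32,
) -> List[Vector]:
    # dict.fromkeys removes repeats within this call while keeping order.
    missing = [t for t in dict.fromkeys(texts) if t not in cache]
    for batch in iter_batches(missing, batch_size):
        for text, vector in zip(batch, embed_batch(list(batch))):
            cache[text] = vector
    # Every requested text is now in the cache; return vectors in input order.
    return [cache[t] for t in texts]
```

Smaller batches lower peak memory use, which is the trade-off behind the "reduce batch size" advice for memory issues; larger batches usually make better use of a GPU.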

API Reference

Pipeline Class

The main entry point for text processing.

Methods

process_text(text: str) -> ProcessingResult

Process a single text string.

result = await pipeline.process_text("Your text here")

process_files(files: List[str]) -> ProcessingResult

Process multiple files.

result = await pipeline.process_files(["file1.pdf", "file2.txt"])

process_documents(documents: List[Document]) -> ProcessingResult

Process a list of document objects.

from vecclean.core.types import Document

documents = [
    Document(content="Text 1", metadata={"source": "file1.txt"}),
    Document(content="Text 2", metadata={"source": "file2.txt"})
]
result = await pipeline.process_documents(documents)

ProcessingResult

The result object containing processed chunks and metadata.

Attributes

  • chunks: List of CleanedChunk objects
  • stats: Processing statistics
  • status: Processing status (COMPLETED, FAILED, etc.)
  • errors: List of error messages
  • warnings: List of warning messages

Methods

  • to_dict(): Convert to dictionary for serialization
  • to_json(): Convert to JSON string

CleanedChunk

Represents a processed text chunk.

Attributes

  • text: The cleaned text content
  • text_hash: Hash of the text for deduplication
  • embedding: Vector embedding (numpy array)
  • chunk_index: Index of the chunk in the document
  • start_char: Starting character position
  • end_char: Ending character position
  • word_count: Number of words in the chunk
  • char_count: Number of characters in the chunk

Supported File Formats

VecClean supports processing various file formats:

  • PDF (.pdf) - Text extraction with metadata
  • Word Documents (.docx) - Text and formatting
  • PowerPoint (.pptx) - Text from slides
  • Text Files (.txt) - Plain text
  • HTML (.html, .htm) - Web content
  • Email (.eml) - Email messages
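
Before handing a mixed directory listing to process_files, it can help to separate paths with a supported extension from the rest. The helper below is a small sketch built from the extension list above; split_by_support and SUPPORTED_SUFFIXES are hypothetical names, not part of the VecClean API.

```python
from pathlib import Path
from typing import Iterable, List, Tuple

# Extensions taken from the supported-formats list in this document.
SUPPORTED_SUFFIXES = {".pdf", ".docx", ".pptx", ".txt", ".html", ".htm", ".eml"}


def split_by_support(paths: Iterable[str]) -> Tuple[List[str], List[str]]:
    """Return (supported, unsupported) paths, matching extensions case-insensitively."""
    supported: List[str] = []
    unsupported: List[str] = []
    for p in paths:
        if Path(p).suffix.lower() in SUPPORTED_SUFFIXES:
            supported.append(p)
        else:
            unsupported.append(p)
    return supported, unsupported
```

The supported list can then be passed to pipeline.process_files, while the unsupported list can be logged or reported to the user.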

Performance Features

C++ Acceleration

VecClean includes C++ optimizations for high-performance text processing:

  • SIMD-optimized text cleaning
  • Parallel processing with work-stealing thread pools
  • Memory-efficient streaming for large files

Async Processing

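The processing methods are coroutines, so independent inputs can be awaited concurrently:

import asyncio
from vecclean import Pipeline

async def main():
    pipeline = Pipeline()
    texts = ["First text", "Second text", "Third text"]

    # Process multiple texts concurrently
    tasks = [pipeline.process_text(text) for text in texts]
    results = await asyncio.gather(*tasks)
    print(f"Processed {len(results)} texts")

asyncio.run(main())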
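
When the number of inputs is large, launching every coroutine at once can exhaust memory. One common way to cap concurrency is an asyncio.Semaphore. The sketch below is generic asyncio code; gather_bounded is a hypothetical helper, not a VecClean function, and it accepts any coroutine function as worker.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Sequence


async def gather_bounded(
    items: Sequence[Any],
    worker: Callable[[Any], Awaitable[Any]],
    limit: int = 4,
) -> List[Any]:
    """Run worker(item) for every item with at most `limit` running at once."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(item: Any) -> Any:
        # Each task waits here until one of the `limit` slots is free.
        async with semaphore:
            return await worker(item)

    # gather preserves input order in its result list.
    return await asyncio.gather(*(run_one(item) for item in items))
```

Inside an async function this could be used as results = await gather_bounded(texts, pipeline.process_text, limit=8), assuming process_text is safe to call concurrently on a single Pipeline instance.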

Error Handling

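# Assumption: ProcessingStatus is importable from vecclean.core.types, like Document
from vecclean.core.types import ProcessingStatus

# Run inside an async function, with `pipeline` and `text` already defined
try:
    result = await pipeline.process_text(text)
    if result.status == ProcessingStatus.COMPLETED:
        print(f"Successfully processed {len(result.chunks)} chunks")
    else:
        # Failures reported by the pipeline arrive in result.errors
        print(f"Processing failed: {result.errors}")
except Exception as e:
    # Unexpected failures raised as exceptions
    print(f"Error during processing: {e}")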

Integration Examples

With FastAPI

from fastapi import FastAPI, UploadFile, File
from vecclean import Pipeline

app = FastAPI()
pipeline = Pipeline()

@app.post("/process-text")
async def process_text(text: str):
    result = await pipeline.process_text(text)
    return {
        "chunks": [chunk.to_dict() for chunk in result.chunks],
        "stats": result.stats.to_dict()
    }

@app.post("/process-file")
async def process_file(file: UploadFile = File(...)):
    # The upload is decoded as UTF-8 text, so this endpoint suits plain-text files only
    content = await file.read()
    result = await pipeline.process_text(content.decode("utf-8"))
    return {"chunks": len(result.chunks)}

With LangChain

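import asyncio
from typing import List

from langchain.text_splitter import TextSplitter
from vecclean import Pipeline

class VecCleanTextSplitter(TextSplitter):
    def __init__(self, pipeline: Pipeline, **kwargs):
        # Let the base class set up its own attributes
        super().__init__(**kwargs)
        self.pipeline = pipeline

    def split_text(self, text: str) -> List[str]:
        # TextSplitter.split_text is synchronous, so run the coroutine to completion here.
        # asyncio.run cannot be called from inside an already running event loop.
        result = asyncio.run(self.pipeline.process_text(text))
        return [chunk.text for chunk in result.chunks]

# Usage
pipeline = Pipeline()
splitter = VecCleanTextSplitter(pipeline)
chunks = splitter.split_text("Your long text here")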

Best Practices

  1. Batch Processing: Process multiple documents together for better performance
  2. Memory Management: Use streaming for very large files
  3. Configuration: Tune chunking parameters based on your use case
  4. Error Handling: Always check the processing status and handle errors
  5. Caching: Enable embedding caching for repeated processing

Troubleshooting

Common Issues

  1. C++ Backend Not Available: VecClean falls back to the Python implementation
  2. Memory Issues: Reduce the batch size or use streaming for large files
  3. Slow Processing: Check whether the C++ backend is enabled, and consider using a GPU for embeddings

Debug Mode

Enable debug logging to see detailed processing information:

import logging

from vecclean import Pipeline

logging.basicConfig(level=logging.DEBUG)

pipeline = Pipeline()
# Processing will now show detailed logs

License

VecClean is licensed under the MIT License. See the LICENSE file for details.
