Skip to main content

Production-grade document chunking library for RAG systems - Rust-powered Python library

Project description

Krira Augment

High-Performance Rust Chunking Engine for RAG Pipelines

PyPI version License: MIT Rust

Process gigabytes of text in seconds. 40x faster than LangChain with O(1) memory usage.


Performance Benchmarks

Dataset Size LangChain/Pandas Krira (Rust) Speedup
100 MB ~45 sec 0.8 sec 56x
1 GB ~8.0 min 12.0 sec 40x
10 GB Timeout / OOM 2.1 min Stable

Memory stays constant (O(1)) regardless of file size.


Installation

pip install krira-augment

Complete Example: OpenAI + Pinecone

from krira_augment import Pipeline, PipelineConfig
import json
import openai
import pinecone

# API Keys
OPENAI_API_KEY = "sk-..."        # https://platform.openai.com/api-keys
PINECONE_API_KEY = "pcone-..."   # https://app.pinecone.io/
PINECONE_INDEX_NAME = "my-rag"

# Step 1: Chunk the file
config = PipelineConfig(chunk_size=512, chunk_overlap=50)
pipeline = Pipeline(config=config)
result = pipeline.process("sample.csv", output_path="chunks.jsonl")

print(f"Chunks Created: {result.chunks_created}")
print(f"Execution Time: {result.execution_time:.2f}s")
print(f"Throughput: {result.mb_per_second:.2f} MB/s")
print(f"Preview: {result.preview_chunks[:3]}")

# Step 2: Embed and store
openai.api_key = OPENAI_API_KEY
pinecone.init(api_key=PINECONE_API_KEY)
index = pinecone.Index(PINECONE_INDEX_NAME)

with open("chunks.jsonl", "r") as f:
    for line_num, line in enumerate(f, 1):
        chunk = json.loads(line)
        
        response = openai.Embedding.create(
            input=chunk["text"],
            model="text-embedding-3-small"
        )
        embedding = response["data"][0]["embedding"]
        
        index.upsert(vectors=[(f"chunk_{line_num}", embedding, chunk.get("metadata", {}))])
        
        if line_num % 100 == 0:
            print(f"Processed {line_num} chunks...")

print("Done! All chunks embedded and stored in Pinecone.")

Other Integrations

After running Step 1 (chunking), replace Step 2 with any of these integrations:

OpenAI + Qdrant

import openai
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct

openai.api_key = "sk-..."
qdrant = QdrantClient(url="https://xyz.qdrant.io", api_key="qdrant-...")

with open("chunks.jsonl", "r") as f:
    for line_num, line in enumerate(f, 1):
        chunk = json.loads(line)
        response = openai.Embedding.create(input=chunk["text"], model="text-embedding-3-small")
        embedding = response["data"][0]["embedding"]
        qdrant.upsert(collection_name="my-chunks", points=[PointStruct(id=line_num, vector=embedding, payload=chunk.get("metadata", {}))])
        
        if line_num % 100 == 0:
            print(f"Processed {line_num} chunks...")

OpenAI + Weaviate

import openai
import weaviate

openai.api_key = "sk-..."
client = weaviate.Client(
    url="https://xyz.weaviate.network",
    auth_client_secret=weaviate.AuthApiKey(api_key="weaviate-...")
)

with open("chunks.jsonl", "r") as f:
    for line_num, line in enumerate(f, 1):
        chunk = json.loads(line)
        response = openai.Embedding.create(input=chunk["text"], model="text-embedding-3-small")
        embedding = response["data"][0]["embedding"]
        client.data_object.create(
            data_object={"text": chunk["text"], "metadata": chunk.get("metadata", {})},
            class_name="Chunk",
            vector=embedding
        )
        
        if line_num % 100 == 0:
            print(f"Processed {line_num} chunks...")

Cohere + Pinecone

import cohere
import pinecone

co = cohere.Client("co-...")
pinecone.init(api_key="pcone-...")
index = pinecone.Index("my-rag")

with open("chunks.jsonl", "r") as f:
    for line_num, line in enumerate(f, 1):
        chunk = json.loads(line)
        response = co.embed(texts=[chunk["text"]], model="embed-english-v3.0")
        embedding = response.embeddings[0]
        index.upsert(vectors=[(f"chunk_{line_num}", embedding, chunk.get("metadata", {}))])
        
        if line_num % 100 == 0:
            print(f"Processed {line_num} chunks...")

Cohere + Qdrant

import cohere
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct

co = cohere.Client("co-...")
qdrant = QdrantClient(url="https://xyz.qdrant.io", api_key="qdrant-...")

with open("chunks.jsonl", "r") as f:
    for line_num, line in enumerate(f, 1):
        chunk = json.loads(line)
        response = co.embed(texts=[chunk["text"]], model="embed-english-v3.0")
        embedding = response.embeddings[0]
        qdrant.upsert(
            collection_name="my-chunks",
            points=[PointStruct(id=line_num, vector=embedding, payload=chunk.get("metadata", {}))]
        )
        
        if line_num % 100 == 0:
            print(f"Processed {line_num} chunks...")

Local (Sentence Transformers) + ChromaDB (FREE)

from sentence_transformers import SentenceTransformer
import chromadb
import json

model = SentenceTransformer('all-MiniLM-L6-v2')
client = chromadb.Client()
collection = client.create_collection("my_chunks")

with open("chunks.jsonl", "r") as f:
    for line_num, line in enumerate(f, 1):
        chunk = json.loads(line)
        embedding = model.encode(chunk["text"])
        collection.add(
            ids=[f"chunk_{line_num}"],
            embeddings=[embedding.tolist()],
            metadatas=[chunk.get("metadata", {})],
            documents=[chunk["text"]]
        )
        
        if line_num % 100 == 0:
            print(f"Processed {line_num} chunks...")

Hugging Face + FAISS (FREE)

from transformers import AutoTokenizer, AutoModel
import torch
import faiss
import numpy as np
import json

tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
index = faiss.IndexFlatL2(384)

embeddings_list = []
with open("chunks.jsonl", "r") as f:
    for line_num, line in enumerate(f, 1):
        chunk = json.loads(line)
        inputs = tokenizer(chunk["text"], return_tensors="pt", truncation=True, max_length=512)
        with torch.no_grad():
            outputs = model(**inputs)
            embedding = outputs.last_hidden_state.mean(dim=1).squeeze().numpy()
        embeddings_list.append(embedding)
        
        if line_num % 100 == 0:
            print(f"Processed {line_num} chunks...")

embeddings_array = np.array(embeddings_list).astype('float32')
index.add(embeddings_array)
faiss.write_index(index, "my_vectors.index")
print("Done! Vectors saved to my_vectors.index")

Streaming Mode (No Files)

Process chunks without saving to disk - maximum efficiency for real-time pipelines:

Complete Example: OpenAI + Pinecone (Streaming)

from krira_augment import Pipeline, PipelineConfig
import openai
import pinecone

# API Keys
OPENAI_API_KEY = "sk-..."        # https://platform.openai.com/api-keys
PINECONE_API_KEY = "pcone-..."   # https://app.pinecone.io/
PINECONE_INDEX_NAME = "my-rag"

# Initialize
openai.api_key = OPENAI_API_KEY
pinecone.init(api_key=PINECONE_API_KEY)
index = pinecone.Index(PINECONE_INDEX_NAME)

# Configure pipeline
config = PipelineConfig(chunk_size=512, chunk_overlap=50)
pipeline = Pipeline(config=config)

# Stream and embed (no file created)
chunk_count = 0
print("Starting streaming pipeline...")

for chunk in pipeline.process_stream("data.csv"):
    chunk_count += 1
    
    # Embed
    response = openai.Embedding.create(
        input=chunk["text"],
        model="text-embedding-3-small"
    )
    embedding = response["data"][0]["embedding"]
    
    # Store immediately
    index.upsert(vectors=[(
        f"chunk_{chunk_count}",
        embedding,
        chunk["metadata"]
    )])
    
    # Progress
    if chunk_count % 100 == 0:
        print(f"Processed {chunk_count} chunks...")

print(f"Done! Embedded {chunk_count} chunks. No intermediate file created.")

Other Streaming Integrations

Replace the embedding/storage logic with any of these:

OpenAI + Qdrant (Streaming)

from krira_augment import Pipeline, PipelineConfig
import openai
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct

# Initialize
openai.api_key = "sk-..."
qdrant = QdrantClient(url="https://xyz.qdrant.io", api_key="qdrant-...")

# Configure and stream
config = PipelineConfig(chunk_size=512, chunk_overlap=50)
pipeline = Pipeline(config=config)

chunk_count = 0
for chunk in pipeline.process_stream("data.csv"):
    chunk_count += 1
    
    # Embed
    response = openai.Embedding.create(input=chunk["text"], model="text-embedding-3-small")
    embedding = response["data"][0]["embedding"]
    
    # Store
    qdrant.upsert(
        collection_name="my-chunks",
        points=[PointStruct(id=chunk_count, vector=embedding, payload=chunk["metadata"])]
    )
    
    if chunk_count % 100 == 0:
        print(f"Processed {chunk_count} chunks...")

print(f"Done! {chunk_count} chunks embedded.")

OpenAI + Weaviate (Streaming)

from krira_augment import Pipeline, PipelineConfig
import openai
import weaviate

# Initialize
openai.api_key = "sk-..."
client = weaviate.Client(
    url="https://xyz.weaviate.network",
    auth_client_secret=weaviate.AuthApiKey(api_key="weaviate-...")
)

# Configure and stream
config = PipelineConfig(chunk_size=512, chunk_overlap=50)
pipeline = Pipeline(config=config)

chunk_count = 0
for chunk in pipeline.process_stream("data.csv"):
    chunk_count += 1
    
    # Embed
    response = openai.Embedding.create(input=chunk["text"], model="text-embedding-3-small")
    embedding = response["data"][0]["embedding"]
    
    # Store
    client.data_object.create(
        data_object={"text": chunk["text"], "metadata": chunk["metadata"]},
        class_name="Chunk",
        vector=embedding
    )
    
    if chunk_count % 100 == 0:
        print(f"Processed {chunk_count} chunks...")

print(f"Done! {chunk_count} chunks embedded.")

Cohere + Pinecone (Streaming)

from krira_augment import Pipeline, PipelineConfig
import cohere
import pinecone

# Initialize
co = cohere.Client("co-...")
pinecone.init(api_key="pcone-...")
index = pinecone.Index("my-rag")

# Configure and stream
config = PipelineConfig(chunk_size=512, chunk_overlap=50)
pipeline = Pipeline(config=config)

chunk_count = 0
for chunk in pipeline.process_stream("data.csv"):
    chunk_count += 1
    
    # Embed
    response = co.embed(texts=[chunk["text"]], model="embed-english-v3.0")
    embedding = response.embeddings[0]
    
    # Store
    index.upsert(vectors=[(f"chunk_{chunk_count}", embedding, chunk["metadata"])])
    
    if chunk_count % 100 == 0:
        print(f"Processed {chunk_count} chunks...")

print(f"Done! {chunk_count} chunks embedded.")

Cohere + Qdrant (Streaming)

from krira_augment import Pipeline, PipelineConfig
import cohere
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct

# Initialize
co = cohere.Client("co-...")
qdrant = QdrantClient(url="https://xyz.qdrant.io", api_key="qdrant-...")

# Configure and stream
config = PipelineConfig(chunk_size=512, chunk_overlap=50)
pipeline = Pipeline(config=config)

chunk_count = 0
for chunk in pipeline.process_stream("data.csv"):
    chunk_count += 1
    
    # Embed
    response = co.embed(texts=[chunk["text"]], model="embed-english-v3.0")
    embedding = response.embeddings[0]
    
    # Store
    qdrant.upsert(
        collection_name="my-chunks",
        points=[PointStruct(id=chunk_count, vector=embedding, payload=chunk["metadata"])]
    )
    
    if chunk_count % 100 == 0:
        print(f"Processed {chunk_count} chunks...")

print(f"Done! {chunk_count} chunks embedded.")

Local (Sentence Transformers) + ChromaDB (Streaming, FREE)

from krira_augment import Pipeline, PipelineConfig
from sentence_transformers import SentenceTransformer
import chromadb

# Initialize (no API keys needed)
model = SentenceTransformer('all-MiniLM-L6-v2')
client = chromadb.Client()
collection = client.create_collection("my_chunks")

# Configure and stream
config = PipelineConfig(chunk_size=512, chunk_overlap=50)
pipeline = Pipeline(config=config)

chunk_count = 0
for chunk in pipeline.process_stream("data.csv"):
    chunk_count += 1
    
    # Embed locally (free, runs on your machine)
    embedding = model.encode(chunk["text"])
    
    # Store locally
    collection.add(
        ids=[f"chunk_{chunk_count}"],
        embeddings=[embedding.tolist()],
        metadatas=[chunk["metadata"]],
        documents=[chunk["text"]]
    )
    
    if chunk_count % 100 == 0:
        print(f"Processed {chunk_count} chunks...")

print(f"Done! {chunk_count} chunks embedded. All local, no API costs.")

Hugging Face + FAISS (Streaming, FREE)

from krira_augment import Pipeline, PipelineConfig
from transformers import AutoTokenizer, AutoModel
import torch
import faiss
import numpy as np

# Initialize (no API keys needed)
tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
index = faiss.IndexFlatL2(384)

# Configure and stream
config = PipelineConfig(chunk_size=512, chunk_overlap=50)
pipeline = Pipeline(config=config)

chunk_count = 0
embeddings_batch = []
BATCH_SIZE = 100  # Process in batches for efficiency

for chunk in pipeline.process_stream("data.csv"):
    chunk_count += 1
    
    # Embed locally
    inputs = tokenizer(chunk["text"], return_tensors="pt", truncation=True, max_length=512)
    with torch.no_grad():
        outputs = model(**inputs)
        embedding = outputs.last_hidden_state.mean(dim=1).squeeze().numpy()
    
    embeddings_batch.append(embedding)
    
    # Add to FAISS in batches
    if len(embeddings_batch) >= BATCH_SIZE:
        embeddings_array = np.array(embeddings_batch).astype('float32')
        index.add(embeddings_array)
        embeddings_batch = []
        print(f"Processed {chunk_count} chunks...")

# Add remaining embeddings
if embeddings_batch:
    embeddings_array = np.array(embeddings_batch).astype('float32')
    index.add(embeddings_array)

# Save index
faiss.write_index(index, "my_vectors.index")
print(f"Done! {chunk_count} chunks embedded and saved to my_vectors.index")

Streaming Mode Advantages

Feature File-Based Streaming
Disk I/O Creates chunks.jsonl None
Memory Usage O(1) constant O(1) constant
Speed Chunking + Embedding Overlapped (faster)
Use Case Large files, batch processing Real-time, no storage
Flexibility Can re-process chunks Single pass only

When to Use Streaming vs File-Based

Use Streaming When:

  • You want maximum speed (no disk writes)
  • You don't need to save chunks for later
  • You're building real-time pipelines
  • You have limited disk space

Use File-Based When:

  • You want to inspect/debug chunks
  • You need to re-process with different embeddings
  • You want to share chunks with your team
  • You're experimenting with different models

Error Handling (Production Ready)

from krira_augment import Pipeline, PipelineConfig
import openai
import pinecone
import time

openai.api_key = "sk-..."
pinecone.init(api_key="pcone-...")
index = pinecone.Index("my-rag")

config = PipelineConfig(chunk_size=512, chunk_overlap=50)
pipeline = Pipeline(config=config)

chunk_count = 0
error_count = 0

for chunk in pipeline.process_stream("data.csv"):
    chunk_count += 1
    
    try:
        # Embed
        response = openai.Embedding.create(input=chunk["text"], model="text-embedding-3-small")
        embedding = response["data"][0]["embedding"]
        
        # Store
        index.upsert(vectors=[(f"chunk_{chunk_count}", embedding, chunk["metadata"])])
        
    except Exception as e:
        error_count += 1
        print(f"Error on chunk {chunk_count}: {e}")
        
        # Retry logic
        if "rate_limit" in str(e).lower():
            print("Rate limited, waiting 60 seconds...")
            time.sleep(60)
            # Retry (add your retry logic here)
    
    if chunk_count % 100 == 0:
        print(f"Processed {chunk_count} chunks, {error_count} errors")

print(f"Done! {chunk_count} chunks processed, {error_count} errors")

Supported Formats

Format Extension Method
CSV .csv Direct processing
Text .txt Direct processing
JSONL .jsonl Direct processing
JSON .json Auto-flattening
PDF .pdf pdfplumber extraction
Word .docx python-docx extraction
Excel .xlsx openpyxl extraction
XML .xml ElementTree parsing
URLs http:// BeautifulSoup scraping

Provider Comparison

Embedding Vector Store Cost API Keys Streaming Support
OpenAI Pinecone Paid 2 ✅ Yes
OpenAI Qdrant Paid 2 ✅ Yes
OpenAI Weaviate Paid 2 ✅ Yes
Cohere Pinecone Paid 2 ✅ Yes
Cohere Qdrant Paid 2 ✅ Yes
SentenceTransformers ChromaDB FREE 0 ✅ Yes
Hugging Face FAISS FREE 0 ✅ Yes

API Keys Setup

Get your keys from:


Development

  1. Clone the repo
  2. Install Maturin
    pip install maturin
    
  3. Build and Install locally
    python -m build
    pip install dist/*.whl --force-reinstall
    

License

MIT License. (c) 2025 Krira Labs.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

krira_augment-2.1.0.tar.gz (164.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

krira_augment-2.1.0-cp313-cp313-win_amd64.whl (716.6 kB view details)

Uploaded CPython 3.13Windows x86-64

File details

Details for the file krira_augment-2.1.0.tar.gz.

File metadata

  • Download URL: krira_augment-2.1.0.tar.gz
  • Upload date:
  • Size: 164.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for krira_augment-2.1.0.tar.gz
Algorithm Hash digest
SHA256 433167615b69bd651814f41aa650c59c31df7aa1cc725cef93a82523145c00e2
MD5 959fb985d9b1116ce8ffab05fb8d9705
BLAKE2b-256 41d2fc97d63396fe8e8092220d225bf7b7fac3dc772d350f6828211a661bfbb8

See more details on using hashes here.

File details

Details for the file krira_augment-2.1.0-cp313-cp313-win_amd64.whl.

File metadata

File hashes

Hashes for krira_augment-2.1.0-cp313-cp313-win_amd64.whl
Algorithm Hash digest
SHA256 6716929232f1d363a7c7c151b8d661a29a99e97cb14e10de3c9fbd36589b8c25
MD5 177a6f4eaba83aeac2a08a13709d1b0b
BLAKE2b-256 1920a23e7d0a197b23f43009ea80d4ab269e1f05dcc9d7f9aa5824daec961060

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page