
AI Framework for fast integration of Private Data and LLMs, Agent Orchestration Platform


LangXChange Framework


A comprehensive Python framework for LLM operations, vector databases, RAG implementations, Knowledge Graph (GraphRAG) construction, database integration, MCP service management, and local model management.


🌟 Overview

LangXChange is a comprehensive Python framework designed to streamline LLM operations and Retrieval-Augmented Generation (RAG) implementations. It provides unified interfaces for multiple LLM providers, vector databases, document processing, database integration, and local model management.

🚀 Key Features

  • 🤖 Multi-LLM Support: OpenAI, Anthropic, Google GenAI, DeepSeek, Llama, MiniMax
  • 🔍 Vector Databases: ChromaDB, Pinecone, FAISS integration
  • 📄 Document Processing: Universal document loader with multiple formats
  • 🧠 RAG Implementation: Two-stage retrieval with cross-encoder
  • 🕸️ GraphRAG: Knowledge Graph extraction from documents and Neo4j-based retrieval
  • 🏗️ Structured Parser Registry: Extensible domain-specific parsing (e.g., Curriculum, Legal) with zero-dependency base library
  • 🏠 Local LLM: Model downloading, fine-tuning, quantization
  • 💾 Database Integration: MySQL, MongoDB support
  • 🛠️ MCP Services: Standardized interoperability with any MCP server (stdio/SSE)
  • 💰 Cost Tracking: Built-in cost monitoring for API calls
  • ⚠️ Specialized Errors: Unified InsufficientQuotaError across all major providers (OpenAI, Anthropic, Google, DeepSeek, MiniMax)
  • ⚡ Performance: Hybrid Orchestration (Fast-Path for small files), Async operations, Native Batch Processing
  • 🔍 Parallel RetrieverX: Concurrent retrieval from VectorDB, GraphDB, MCP, GCS, and more with RRF fusion
  • 🕵️ Tracing & Observability: Integrated request-level tracing with TraceSpan and programmatic callbacks (v0.9.6+)
  • 🧠 Enhanced Prompting: Strategic RAG grounding with multiple modes (CONTEXTUAL, AUGMENTED, SUMMARIZED) (v0.9.7+)
  • 🕸️ Knowledge Graph: Parallel entity and relationship extraction for graph construction
  • 🚀 Real-time Monitoring: Stream trace spans directly to your frontend via Server-Sent Events (SSE)

🆕 What's New in v0.10.11 - 2026-04-03

  • MongoDB Graph Support: Introduced MongoGraphHelper for using MongoDB as a property graph backend with $graphLookup traversal support.
  • Generic GraphSourceAdapter: Refactored ParallelRetrieverX adapters to support any graph backend (Neo4j, Memgraph, MongoDB) seamlessly.
  • Async Vector Search: Added search_similar_by_vector_async support in GraphSourceAdapter.

What's New in v0.10.10 - 2026-03-29

  • Standardized GraphRAG Support: Unified achat() and chat() interfaces across all LLM helpers (Anthropic, DeepSeek, Google GenAI, OpenAI, MiniMax) for seamless Knowledge Graph construction.

  • Anthropic API Robustness: Fixed 400 Bad Request errors by normalizing system parameters into structured content blocks.
  • Enhanced Provider Consistency: Improved graph_service.py and worker.py to strictly respect external credential modes for agents.

🆕 What's New in v0.10.6

  • Fixed MiniMax 400 Error: Updated MiniMax endpoint and sanitized message content to prevent "chat content is empty" errors.
  • Fixed Typing Imports: Resolved NameError: name 'Callable' is not defined in google_genai_helper.py.

🆕 What's New in v0.10.5

  • Token-Aware Context Truncation: Added manage_context_length to OpenAIHelper for precise window management.
  • Robust RAG Trimming: Enhanced PromptBuilder to strictly respect max_context_length for retrieval results (Vector, Graph, MCP).
  • Enhanced OpenAI Helper: Improved token counting accuracy using tiktoken (cl100k_base fallback for newer models).

🆕 What's New in v0.10.4

  • Neo4j SSL/TLS Support: Added support for encrypted Bolt connections (bolt+s://, bolt+ssc://).
  • Neo4j 6.x Compatibility: Added explicit trust handling for self-signed certificates in newer Neo4j drivers by using TrustAll() and trusted_certificates.
  • Fixed Configuration Mapping: Corrected prioritization of connection_string over host in configuration parsing to prevent protocol loss.

🆕 What's New in v0.10.3

  • Neo4j Config Evolution: Fixed from_dict() to correctly map the host field, improving compatibility with various graph configuration formats.
  • Fixed Credential Overrides: Environment variables now only serve as fallback defaults in Neo4jConfig, ensuring explicitly provided credentials (e.g., from dynamic configs) are never overridden.

What's New in v0.10.0

  • MiniMax Endpoint Fix: Updated the MiniMax API base URL from the retired api.minimax.chat to api.minimax.io (international domain), and the embedding path from text/embedding to embeddings (OpenAI-compatible)
  • MiniMax Cache Bug Fix: Fixed _cache_key() got multiple values for keyword argument 'model' error in achat() caused by duplicate kwargs

📦 Installation

Via PyPI (Recommended)

pip install langxchange

Environment Variables

Create a .env file in your project directory:

# OpenAI
OPENAI_API_KEY=your_openai_key

# MySQL Configuration
MYSQL_HOST=localhost
MYSQL_DB=your_database
MYSQL_USER=your_username
MYSQL_PASSWORD=your_password
MYSQL_PORT=3306
MYSQL_CHARSET=utf8mb4

# ChromaDB
CHROMA_PERSIST_PATH=./chroma_db

# Vector Databases
PINECONE_API_KEY=your_pinecone_key
PINECONE_ENVIRONMENT=your_environment

# Milvus Configuration
MILVUS_HOST=localhost
MILVUS_PORT=19530
MILVUS_API_KEY=your_milvus_token  # Optional for local, required for Zilliz Cloud

# Elasticsearch Configuration
ELASTICSEARCH_HOST=http://localhost:9200
ELASTICSEARCH_API_KEY=your_api_key
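These variables are commonly loaded at startup with python-dotenv (`load_dotenv()`). If you prefer to avoid the extra dependency, a minimal stdlib parser is enough for simple KEY=value files; this sketch is illustrative and not part of LangXChange:

```python
import os

def load_env_file(path: str = ".env") -> None:
    """Minimal .env loader: KEY=value lines, '#' comments, no quoting rules."""
    with open(path) as fh:
        for line in fh:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            # Drop an inline comment, then surrounding whitespace and quotes.
            # (Values that legitimately contain '#' need a real parser.)
            value = value.split("#", 1)[0].strip().strip('"').strip("'")
            os.environ.setdefault(key.strip(), value)
```

`setdefault` means variables already present in the environment win over the file, matching the usual precedence of real .env loaders.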

🚀 Quick Start

import os
from langxchange.openai_helper import EnhancedOpenAIHelper, OpenAIConfig
from langxchange.chroma_helper import EnhancedChromaHelper, ChromaConfig
from langxchange.documentloader import DocumentLoaderHelper, ChunkingStrategy
from langxchange.embeddings import EmbeddingHelper

# Set API key (or use environment variable)
os.environ["OPENAI_API_KEY"] = "your-api-key"

# Configure OpenAI with enhanced settings
openai_config = OpenAIConfig(
    chat_model="gpt-4",
    enable_caching=True,
    enable_cost_tracking=True,
    max_retries=3
)

# Initialize OpenAI client
llm = EnhancedOpenAIHelper(openai_config)

# Configure ChromaDB with enhanced settings
chroma_config = ChromaConfig(
    persist_directory="./chroma_db",
    batch_size=100,
    progress_bar=True
)

# Initialize ChromaDB vector store
chroma = EnhancedChromaHelper(llm, chroma_config)

# Load and process documents with semantic chunking
loader = DocumentLoaderHelper(
    chunking_strategy=ChunkingStrategy.SEMANTIC,
    chunk_size=800,
    preserve_formatting=True
)

# Process documents and store in vector database
documents = list(loader.load("document.pdf"))
chroma.insert_documents(
    collection_name="my_collection",
    documents=[doc.content for doc in documents],
    metadatas=[doc.metadata for doc in documents],
    generate_embeddings=True
)

# Query the vector database
results = chroma.query_collection(
    collection_name="my_collection",
    query_text="What is machine learning?",
    top_k=5
)

### 🔍 Elasticsearch Vector Store (New in v0.9.9)

```python
from langxchange.elasticsearch_helper import EnhancedElasticsearchHelper, ElasticsearchConfig
from langxchange.openai_helper import EnhancedOpenAIHelper

# Initialize LLM helper for embeddings
llm = EnhancedOpenAIHelper()

# Configure Elasticsearch
es_config = ElasticsearchConfig(
    host="https://your-elasticsearch-host.com",
    api_key="your-base64-api-key",  # Enhanced support for API keys
    embedding_dim=1536
)

# Initialize helper
es_helper = EnhancedElasticsearchHelper(llm_helper=llm, config=es_config)

# Store and Query
es_helper.insert_documents(
    collection_name="my_index",  # Equivalent to index_name
    documents=["LangXChange is a comprehensive LLM framework."],
    generate_embeddings=True
)

results = es_helper.query(
    collection_name="my_index",
    query_text="What is LangXChange?",
    top_k=1
)
```

## 📚 Modules

### 🤖 LLM Providers

#### OpenAI Integration (`openai_helper.py`)
```python
from langxchange.openai_helper import EnhancedOpenAIHelper, OpenAIConfig

# Configure OpenAI with enhanced settings
open_ai_config = OpenAIConfig(
    chat_model="gpt-4",
    enable_caching=True,
    enable_cost_tracking=True,
    max_retries=3,
    log_level="INFO"
)

openai = EnhancedOpenAIHelper(open_ai_config)

response = openai.generate(
    prompt="Explain quantum computing in simple terms.",
    system_message="You are a helpful AI assistant."
)

# Cost tracking
cost = openai.get_cost_summary()
print(f"Total cost: ${cost['total_cost']:.4f}")
```

Features:

  • ✅ API key management and validation
  • ✅ Response caching for cost optimization
  • ✅ Cost tracking and reporting
  • ✅ Support for all OpenAI models (GPT-3.5, GPT-4, embeddings)
  • ✅ Batch processing capabilities
  • ✅ Error handling and retry logic

Anthropic Integration (anthropic_helper.py)

import os
from langxchange.anthropic_helper import EnhancedAnthropicHelper, AnthropicConfig

# Set API key (or use environment variable)
os.environ["ANTHROPIC_API_KEY"] = "your-anthropic-key"

# Configure Anthropic with enhanced settings
anthropic_config = AnthropicConfig(
    model="claude-3-sonnet-20240229",
    enable_caching=True,
    enable_cost_tracking=True,
    max_retries=3,
    log_level="INFO"
)

# Initialize Anthropic client
anthropic = EnhancedAnthropicHelper(anthropic_config)

response = anthropic.generate(
    prompt="Analyze the following text for sentiment and key themes.",
    max_tokens=500,
    system_message="You are a helpful AI assistant."
)

# Cost tracking
cost = anthropic.get_cost_summary()
print(f"Total cost: ${cost['total_cost']:.4f}")

Features:
  • ✅ Claude model support (Haiku, Sonnet, Opus, 3.5, 3.7)
  • ✅ Enhanced configuration and caching
  • ✅ Cost tracking and reporting
  • ✅ Context window optimization
  • ✅ Token counting and cost estimation
  • ✅ Streaming responses
  • ✅ Native tool-calling support (OpenAI-compatible formatting)
  • ✅ Message normalization (supports tool_use/tool_result blocks)
  • ✅ Fixed empty response bug for the Anthropic provider
  • ✅ Fixed generator return bug in sync chat calls
  • ✅ Improved message content block handling (supports objects and dicts)
  • ✅ Error handling and retry logic

Google GenAI Integration (google_genai_helper.py) — Updated in v0.5.1

import os
import asyncio
from langxchange.google_genai_helper import EnhancedGoogleGenAIHelper, GoogleGenAIHelper

# Set API key (or use environment variable)
os.environ["GOOGLE_API_KEY"] = "your-google-key"

# ── EnhancedGoogleGenAIHelper (recommended) ──────────────────────────────────
google_genai = EnhancedGoogleGenAIHelper(
    api_key="your-google-key",
    chat_model="gemini-2.0-flash",
    vision_model="gemini-2.0-flash",
    tts_model="gemini-2.0-flash-preview-tts",
    enable_usage_tracking=True,
    enable_context_caching=True
)

# ── Synchronous multi-turn chat ───────────────────────────────────────────────
messages = [
    {"role": "system",    "content": "You are a helpful research assistant."},
    {"role": "user",      "content": "Summarize the following research paper."},
]
response, usage = google_genai.chat(messages, temperature=0.3, max_tokens=1000)
print(response)

# ── Async chat (non-blocking, for FastAPI / async backends) ───────────────────
async def async_example():
    response_text, usage, tool_calls = await google_genai.chat_async(
        messages=messages,
        temperature=0.7,
        max_tokens=2000,
    )
    print(response_text)
    print(f"Tokens used: {usage.get('total_tokens', 0)}")

asyncio.run(async_example())

#### 🕸️ GraphRAG & Knowledge Graph (`kg_builder.py`, `neo4j_helper.py`)

LangXChange supports building and querying Knowledge Graphs for advanced RAG (GraphRAG). This allows for complex relationship-aware retrieval beyond simple vector similarity.

```python
import asyncio
from langxchange.documentloader import DocumentLoaderHelper
from langxchange.openai_helper import EnhancedOpenAIHelper, OpenAIConfig
from langxchange.kg_builder import KnowledgeGraphBuilder
from langxchange.neo4j_helper import Neo4jHelper, Neo4jConfig

async def graph_rag_example():
    # 1. Initialize Helpers
    llm = EnhancedOpenAIHelper(OpenAIConfig(chat_model="gpt-4"))
    neo4j = Neo4jHelper(Neo4jConfig(
        uri="bolt://localhost:7687", 
        username="neo4j", 
        password="password"
    ))
    
    # 2. Build Knowledge Graph from Document
    loader = DocumentLoaderHelper()
    chunks = [c.content for c in loader.load("manual.docx")]
    
    kg_builder = KnowledgeGraphBuilder(llm_helper=llm)
    triples = await kg_builder.process_chunks(chunks)
    
    # 3. Store in Neo4j
    neo4j.connect()
    for t in triples:
        neo4j.merge_node(label=t.subject_type, properties={"name": t.subject}, merge_on="name")
        neo4j.merge_node(label=t.object_type, properties={"name": t.object}, merge_on="name")
        neo4j.merge_relationship(
            source_label=t.subject_type, source_match={"name": t.subject},
            target_label=t.object_type, target_match={"name": t.object},
            rel_type=t.predicate.upper().replace(" ", "_")
        )

    # 4. Perform GraphRAG Query
    query = "How does component X interact with Y?"
    # (Optional: Use LLM to extract keywords for Neo4j search)
    graph_context = "..." # Retrieved via neo4j.execute_read(...)
    response = await llm.achat([{"role": "user", "content": f"Context: {graph_context}\nQuery: {query}"}])
    print(response)

asyncio.run(graph_rag_example())
```

🔍 Parallel RetrieverX (retrieverX.py)

LangXChange features a high-performance Parallel RetrieverX engine that handles concurrent retrieval from multiple heterogeneous sources (VectorDB, Knowledge Graphs, MCP Tools, GCS, and Local Files). It uses Reciprocal Rank Fusion (RRF) to merge results into a unified, ranked context.

import asyncio
from langxchange.retrieverX import ParallelRetrieverX, RetrievalSource
# ... initialize your adapters ...

async def parallel_retrieval_example():
    # 1. Initialize RetrieverX
    retriever = ParallelRetrieverX(max_workers=10)
    
    # 2. Register Sources with Adapters
    retriever.add_source(
        source_type=RetrievalSource.VECTOR,
        adapter=chroma_adapter,  # Instance of VectorSourceAdapter
        weight=1.0
    )
    retriever.add_source(
        source_type=RetrievalSource.GRAPH,
        adapter=graph_adapter,   # Instance of GraphSourceAdapter
        weight=1.5
    )
    
    # 3. Perform Concurrent Retrieval
    results = await retriever.retrieve(
        query="What are the safety protocols for process X?",
        top_k=5
    )
    
    # 4. Access Merged Context
    for res in results:
        print(f"[{res.source}] (Score: {res.score:.2f}) {res.document[:100]}...")
        
asyncio.run(parallel_retrieval_example())

Features:

  • Native Concurrency: Parallel execution of all retrieval tasks using Python's asyncio.
  • RRF Fusion: Built-in Reciprocal Rank Fusion for high-quality merged results.
  • Heterogeneous Sources: Unified adapter interface for VectorDB, GraphDB, MCP, GCS, and Drive.
  • Ranking Optimization: Weighted boosting for specific sources (e.g., GraphDB > VectorDB).
  • Standardized Output: Returns RetrievalResult objects with consistent metadata.
  • Async Support: Full async extraction and database operations.

🍃 MongoDB Graph Support (mongograph_helper.py) — New in v0.10.11

LangXChange now supports using MongoDB as a high-performance property graph database. This is ideal for teams already using MongoDB who want GraphRAG capabilities without managing a separate Neo4j/Memgraph instance.

import asyncio
from langxchange.mongograph_helper import MongoGraphHelper, MongoGraphConfig

async def mongo_graph_example():
    # 1. Configure MongoDB Graph
    config = MongoGraphConfig(
        uri="mongodb://admin:password@localhost:27017/graph_db?authSource=admin",
        node_collection="nodes",
        edge_collection="edges"
    )
    
    # 2. Initialize Helper
    helper = MongoGraphHelper(config)
    await helper.connect_async()
    
    # 3. Store Entities and Relationships
    await helper.merge_node_async(label="Company", properties={"id": "c1", "name": "LangXChange"})
    await helper.merge_node_async(label="Technology", properties={"id": "t1", "name": "MongoDB"})
    
    await helper.merge_relationship_async(
        source_label="Company", source_match={"id": "c1"},
        target_label="Technology", target_match={"id": "t1"},
        rel_type="USES_BACKEND"
    )
    
    # 4. Traversal ($graphLookup)
    neighbors = helper.get_neighbors("c1", direction="out")
    for n in neighbors:
        print(f"Connected to: {n['to_id']} via {n['type']}")

asyncio.run(mongo_graph_example())
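For reference, a `$graphLookup` traversal like the one `get_neighbors` presumably issues looks like the pipeline below. The edge field names (`from_id`, `to_id`, `type`) are assumptions based on the example above, not a documented schema:

```python
# Aggregation pipeline: starting from node "c1" in the node collection,
# follow outgoing edges stored as {from_id, to_id, type} documents
# in the "edges" collection.
pipeline = [
    {"$match": {"id": "c1"}},
    {"$graphLookup": {
        "from": "edges",              # collection holding the relationships
        "startWith": "$id",           # seed the traversal with this node's id
        "connectFromField": "to_id",  # continue the walk from each edge's target
        "connectToField": "from_id",  # ...matching edges whose source equals it
        "as": "outgoing",             # matched edges land in this array field
        "maxDepth": 0,                # 0 = direct neighbors only
    }},
]
```

Raising `maxDepth` turns the same pipeline into a multi-hop traversal, which is what makes `$graphLookup` viable as a property-graph backend.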

🧠 Enhanced Prompt Strategy (prompt_helper.py) — New in v0.9.7

The EnhancedPromptHelper provides advanced strategies for building LLM prompts from retrieval results. It supports multiple grounding modes to optimize how context is presented to the model.

from langxchange.prompt_helper import EnhancedPromptHelper, PromptMode

# Initialize with default mode (BASIC)
prompt_helper = EnhancedPromptHelper(
    llm=openai,
    system_prompt="You are a helpful assistant.",
    default_mode=PromptMode.BASIC
)

# 1. BASIC Mode: Standard prompt without retrieval augmentation
response = prompt_helper.run(user_query="Hello!")

# 2. AUGMENTED Mode: Standard RAG (Context injected into User Query)
response = prompt_helper.run(
    user_query="What is the safety protocol?",
    retrieval_results=results,
    mode=PromptMode.AUGMENTED
)

# 3. CONTEXTUAL Mode: Multi-turn Grounding (Context as Assistant messages)
# Best for maintaining conversational flow and strict grounding
response = prompt_helper.run(
    user_query="Tell me about project X",
    retrieval_results=results,
    mode=PromptMode.CONTEXTUAL
)

# 4. SUMMARIZED Mode: Concise snippets
response = prompt_helper.run(
    user_query="Summarize the main points",
    retrieval_results=results,
    mode=PromptMode.SUMMARIZED,
    max_snippets=2
)

#### 🛡️ Response Validation & Multi-Pass (`prompt_pass`) — **New in v0.9.8**

The `prompt_pass` feature allows for automatic multi-pass validation and refinement of LLM responses. This is critical for ensuring quality, adherence to guidelines, and factual grounding.

```python
from langxchange.prompt_helper import EnhancedPromptHelper

# Initialize helper
prompt_helper = EnhancedPromptHelper(llm=openai, system_prompt="Answer only using the context.")

# Run with 2 validation passes
# Pass 1: Initial generation
# Pass 2: LLM-based validation and optional refinement/re-generation
result = prompt_helper.run(
    user_query="What is the safety protocol?",
    retrieval_results=results,
    prompt_pass=2
)

# Access refined response and validation metadata
print(f"Final Response: {result['response']}")
print(f"Total passes: {result['pass_info']['total_passes']}")
print(f"Validation history: {result['pass_info']['validation_history']}")
```

Features:

  • Multi-Pass Reasoning: Automatically detect and correct response errors.
  • JSON-Based Validation: Precise quality checking using internal LLM-as-a-judge patterns.
  • Custom Criteria: Pass a validation function (bool) for hard constraints.
  • Feedback Loop: Automatically provides feedback to the LLM for refined generation.
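The feedback loop above can be sketched generically. The `llm` callable and validator here are stand-ins, and the pass accounting is illustrative; LangXChange's actual `prompt_pass` internals may differ:

```python
from typing import Callable

def multi_pass(llm: Callable[[str], str],
               query: str,
               validate: Callable[[str], tuple[bool, str]],
               max_passes: int = 2) -> dict:
    """Generate, validate, and regenerate with feedback until a pass succeeds."""
    history = []
    response = llm(query)  # pass 1: initial generation
    for pass_no in range(2, max_passes + 1):
        ok, feedback = validate(response)
        history.append({"pass": pass_no, "ok": ok, "feedback": feedback})
        if ok:
            break
        # Feed the critique back so the next pass can correct the answer
        prompt = f"{query}\n\nPrevious answer: {response}\nFix this issue: {feedback}"
        response = llm(prompt)
    return {"response": response,
            "pass_info": {"total_passes": 1 + len(history),
                          "validation_history": history}}
```

In the real helper the validator is itself an LLM-as-a-judge call; here any `(bool, str)`-returning function works, which is also how a hard custom criterion would plug in.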

**Prompt Modes:**
- ✅ **BASIC**: Direct interaction using only the user query.
- ✅ **AUGMENTED**: Traditional RAG; injects context snippets into the user's message.
- ✅ **CONTEXTUAL**: New strategic mode that presents retrieved context as specialized assistant reference messages. This improves model adherence to provided facts and maintains cleaner user message history.
- ✅ **SUMMARIZED**: Automatically truncates and summarizes retrieval hits for token-efficient processing.

# ── Streaming chat ────────────────────────────────────────────────────────────
async def stream_example():
    async for chunk in google_genai.chat_stream(messages, temperature=0.7):
        print(chunk, end="", flush=True)

asyncio.run(stream_example())

# ── Tool / function calling ───────────────────────────────────────────────────
tools = [{
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get current weather for a city",
        "parameters": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"]
        }
    }
}]

async def tool_example():
    content, usage, tool_calls = await google_genai.chat_async(
        messages=[{"role": "user", "content": "What's the weather in London?"}],
        tools=tools
    )
    if tool_calls:
        for call in tool_calls:
            print(f"Tool: {call['function']['name']}, Args: {call['function']['arguments']}")

asyncio.run(tool_example())

# ── Multi-modal vision processing ─────────────────────────────────────────────
vision_response = google_genai.chat_with_vision(
    text="Analyze this image for key insights",
    image_path="research_chart.png"
)

# ── Text-to-speech generation ─────────────────────────────────────────────────
audio_path = google_genai.text_to_speech(
    text="Welcome to the research presentation",
    voice="Zephyr",
    output_format="wav"
)

# ── Usage tracking ────────────────────────────────────────────────────────────
stats = google_genai.get_usage_statistics()
print(f"Total requests: {stats.chat_requests}")
print(f"Input tokens:   {stats.total_input_tokens}")

# ── Lightweight GoogleGenAIHelper (simple use cases) ─────────────────────────
llm = GoogleGenAIHelper(chat_model="gemini-2.0-flash")
async def simple_chat():
    reply = await llm.chat(messages)
    print(reply)
asyncio.run(simple_chat())

Features (v0.5.1):

  • Native multi-turn chat — messages correctly formatted as types.Content with user/model roles
  • System instruction support — system messages extracted and passed via GenerateContentConfig.system_instruction
  • Async chat (chat_async) — non-blocking, returns (text, usage, tool_calls) tuple
  • Streaming chat (chat_stream) — async generator yielding text chunks in real time
  • Tool / function calling — OpenAI-style tool definitions auto-converted to types.FunctionDeclaration; tool calls returned in OpenAI-compatible format
  • Dual client — both sync genai.Client and async genai.AsyncClient initialized on startup
  • ✅ Gemini 2.0 Flash model support
  • ✅ Multi-modal capabilities (text, images, audio)
  • ✅ Text-to-speech and speech-to-text generation
  • ✅ Context management and usage statistics
  • ✅ Safety filtering and content moderation
  • ✅ Embedding generation with text-embedding-004
  • ✅ Caching and performance optimization

DeepSeek Integration (deepseek_helper.py)

import os
from langxchange.deepseek_helper import EnhancedDeepSeekHelper, ModelType, ContextManagementStrategy

# Set API key (or use environment variable)
os.environ["DEEPSEEK_API_KEY"] = "your-deepseek-key"

# Initialize Enhanced DeepSeek client with advanced configuration
deepseek = EnhancedDeepSeekHelper(
    api_key="your-deepseek-key",
    base_url="https://api.deepseek.com/v1",
    default_model=ModelType.CHAT.value,
    embed_model=ModelType.EMBEDDING.value,
    vision_model=ModelType.VISION.value,
    timeout=30,
    max_retries=3,
    enable_logging=True,
    log_level="INFO",
    max_context_tokens=30000,
    context_strategy=ContextManagementStrategy.SLIDING_WINDOW
)

# Generate text response with enhanced features
response = deepseek.generate(
    prompt="Write a Python function for binary search.",
    max_tokens=500,
    temperature=0.3,
    system_message="You are a helpful coding assistant.",
    context_messages=[{"role": "user", "content": "Previous context"}]
)

# Code generation with syntax highlighting
code_response = deepseek.generate_code(
    prompt="Create a REST API endpoint in Flask",
    language="python",
    include_docs=True,
    error_handling=True
)

# Batch processing
batch_responses = deepseek.batch_generate([
    {"prompt": "Explain recursion", "max_tokens": 200},
    {"prompt": "Define Big O notation", "max_tokens": 200}
], temperature=0.7)

# Usage and cost tracking
cost_summary = deepseek.get_cost_summary()
print(f"Total cost: ${cost_summary['total_cost']:.4f}")
print(f"Total tokens: {cost_summary['total_tokens']}")

Features:

  • ✅ Cost-effective alternative to OpenAI with enhanced features
  • ✅ Multiple model types (chat, embedding, vision)
  • ✅ Advanced context management with sliding window strategy
  • ✅ Code generation with syntax highlighting and documentation
  • ✅ Batch processing capabilities for multiple requests
  • ✅ Streaming support with real-time response handling
  • ✅ Usage tracking and cost monitoring
  • ✅ Error handling and retry logic
  • ✅ Compatible with OpenAI API format
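A sliding-window strategy simply keeps the most recent messages that fit a token budget. This sketch approximates tokens by whitespace word count; the real helper presumably counts with the provider's tokenizer:

```python
def sliding_window(messages: list[dict], max_tokens: int) -> list[dict]:
    """Keep the newest messages whose combined (approximate) token count fits."""
    kept, used = [], 0
    for msg in reversed(messages):          # walk newest-first
        cost = len(msg["content"].split())  # crude token estimate
        if used + cost > max_tokens:
            break                           # budget exhausted: drop older turns
        kept.append(msg)
        used += cost
    return list(reversed(kept))             # restore chronological order
```

A production version would also pin the system message so it is never trimmed out of the window.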

Llama Integration (llama_helper.py)

from langxchange.llama_helper import EnhancedLLaMAHelper, LLaMAConfig
import os

# Set Hugging Face token (or use environment variable)
os.environ["HUGGINGFACE_TOKEN"] = "your-hf-token"

# Configure LLaMA with enhanced settings
llama_config = LLaMAConfig(
    chat_model="meta-llama/Llama-2-7b-chat-hf",
    embed_model="all-MiniLM-L6-v2",
    device="auto",
    max_memory_per_gpu="8GB",
    load_in_8bit=False,
    load_in_4bit=True,
    cache_dir="./llama_cache",
    trust_remote_code=False
)

# Initialize Enhanced LLaMA client
llama = EnhancedLLaMAHelper(config=llama_config)

# Generate text response
response = llama.generate(
    prompt="Explain machine learning fundamentals.",
    temperature=0.7,
    max_tokens=2048,
    system_message="You are a knowledgeable AI assistant specializing in ML.",
    do_sample=True,
    top_p=0.9
)

# Advanced text generation with stopping criteria
advanced_response = llama.generate_advanced(
    prompt="Write a Python class for neural networks",
    stopping_criteria=["def ", "class ", "\n\n"],
    temperature=0.5,
    repetition_penalty=1.1,
    no_repeat_ngram_size=3
)

# Batch text processing
batch_responses = llama.batch_generate([
    {"prompt": "What is deep learning?", "max_tokens": 200},
    {"prompt": "Explain neural networks", "max_tokens": 200},
    {"prompt": "Define backpropagation", "max_tokens": 200}
], temperature=0.7)

# Token counting and optimization
token_count = llama.count_tokens("This is a sample text for token counting.")
print(f"Token count: {token_count}")

# Model performance metrics
metrics = llama.get_model_metrics()
print(f"Memory usage: {metrics['memory_usage_gb']:.2f} GB")
print(f"Inference speed: {metrics['tokens_per_second']:.2f} tokens/sec")

Features:

  • ✅ Local model deployment with Hugging Face integration
  • ✅ Enhanced quantization support (4-bit, 8-bit)
  • ✅ GPU acceleration with memory optimization
  • ✅ Advanced configuration with LLaMAConfig
  • ✅ Multiple quantization modes for different hardware
  • ✅ Custom stopping criteria for precise generation control
  • ✅ Batch processing capabilities
  • ✅ Token counting and performance monitoring
  • ✅ Memory management and optimization
  • ✅ Support for various LLaMA model variants
  • ✅ Hugging Face model hub integration

MiniMax Integration (minimax_helper.py)

from langxchange.minimax_helper import MiniMaxHelper, create_user_message, MiniMaxConfig
import os

# Set API key (or use environment variable)
os.environ["MINIMAX_API_KEY"] = "your-minimax-key"

# Initialize with default config
helper = MiniMaxHelper()

# Simple chat
messages = [create_user_message("Hello, how are you?")]
response = helper.chat(messages)
print(response)

# With custom config
config = MiniMaxConfig(
    api_key="your-api-key",
    api_secret="your-api-secret", # Optional
    group_id="your-group-id",     # Optional
    chat_model="MiniMax-M2",
    enable_caching=True,
)
helper = MiniMaxHelper(config)

# Cost tracking
cost = helper.get_cost_summary()
print(f"Total cost: ${cost['total_cost']:.4f}")

Features:

  • ✅ Chat Completions (Synchronous and Async with streaming support)
  • ✅ Embeddings (Single and batch generation)
  • ✅ Audio (Text-to-speech and speech-to-text capabilities)
  • ✅ Image Generation from text prompts
  • ✅ In-memory TTL-based caching
  • ✅ Automatic retry with exponential backoff
  • ✅ Usage Tracking (Token and cost tracking with model pricing)
  • ✅ Utilities (Helper functions for message creation)
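TTL caching of this kind can be sketched with a plain dict of (expiry, value) pairs; the clock is injected so expiry is testable without sleeping. This is an illustration of the pattern, not LangXChange's internal cache:

```python
import time

class TTLCache:
    """Minimal in-memory cache whose entries expire after ttl seconds."""
    def __init__(self, ttl: float, clock=time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self._store = {}

    def set(self, key, value):
        self._store[key] = (self.clock() + self.ttl, value)

    def get(self, key, default=None):
        entry = self._store.get(key)
        if entry is None:
            return default
        expires_at, value = entry
        if self.clock() >= expires_at:
            del self._store[key]   # lazily evict the expired entry
            return default
        return value
```

Keying such a cache on a hash of (model, messages, parameters) is what makes response caching safe: any change to the request misses the cache.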

Model Context Protocol (MCP) Integration (mcp_helper.py)

import asyncio
from langxchange.mcp_helper import MCPServiceManager

async def main():
    # Initialize manager with JSON config
    manager = MCPServiceManager("mcp_config.json")
    
    # 1. Register functional capabilities for servers with priority
    manager.register_server_capabilities("filesystem", ["files", "local_storage"], priority=10)
    manager.register_server_capabilities("brave_search", ["web_search", "research"], priority=5)
    
    # 2. Intelligent Routing: Resolve server by tool name or capability
    # Automatically selects the best server (health + priority aware)
    server = await manager.select_best_server_for_tool("read_file")
    
    # Direct namespace resolution
    server = manager.resolve_tool_server("filesystem::read_file")
    
    # Name-based resolution with capability hints
    context = {"preferred_capability": "web_search"}
    server = manager.resolve_tool_server("search", context=context)
    
    # 3. Health & Priority Aware Selection
    # Automatically picks the best server based on priority, error rates and latency
    best_server = manager.select_best_server(["server_a", "server_b"])
    
    # 4. Discovery: Fetch all tools with routing metadata
    all_tools = await manager.get_all_tools_with_metadata()
    
    # 5. Call a tool (standard way)
    result = await manager.call_tool(
        server_name="filesystem",
        tool_name="read_file",
        arguments={"path": "data.txt"}
    )
    
    await manager.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

Features:

  • Standardized Interoperability: Connect to any MCP-compliant server (stdio or SSE)
  • Intelligent Routing: Automatic server resolution based on tool names and namespaces (server::tool)
  • Capability-Based Selection: Route tasks to servers based on functional tags (e.g., "web_search", "filesystem")
  • Health & Priority Aware Selection: Automatically prioritizes healthy servers and uses priority scores for optimal routing
  • Tool Registry: Comprehensive metadata registry for all available tools across all connected servers
  • Lifecycle Management: Automatic server startup, health monitoring, and recovery
  • Discovery: Dynamic tool discovery with TTL-based caching
  • Production-Ready: Graceful shutdown, detailed logging, and error handling
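How "health + priority aware" selection might combine those signals (this scoring function is a hypothetical illustration, not the manager's actual formula):

```python
def select_best_server(servers: list[dict]) -> str:
    """Pick the healthy server with the highest priority, breaking ties on
    lower error rate, then lower latency. Each dict carries the server's
    name, healthy flag, priority, error_rate, and latency_ms."""
    healthy = [s for s in servers if s["healthy"]]
    candidates = healthy or servers      # degrade gracefully if none are healthy
    best = min(candidates,
               key=lambda s: (-s["priority"], s["error_rate"], s["latency_ms"]))
    return best["name"]
```

The tuple key encodes the precedence order directly: priority dominates, and error rate only matters between servers of equal priority.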

🛠️ Complete Example: Calculator Server

This example demonstrates a full setup including a mock server, configuration, and execution script.

```python
# mcp_calculator_server.py
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Calculator")

@mcp.tool()
def add(a: int, b: int) -> int:
    """Add two numbers"""
    return a + b

@mcp.tool()
def multiply(a: int, b: int) -> int:
    """Multiply two numbers"""
    return a * b

if __name__ == "__main__":
    mcp.run(transport="stdio")
```
```json
// mcp_test_config.json
{
  "servers": [
    {
      "name": "calculator",
      "transport": "stdio",
      "command": "python3",
      "args": ["mcp_calculator_server.py"]
    }
  ]
}
```
```python
# mcp_test_execution.py
import asyncio
from langxchange.mcp_helper import MCPServiceManager

async def run_test():
    manager = MCPServiceManager("mcp_test_config.json")
    await manager.initialize()
    
    try:
        # Call 'add' tool
        result = await manager.call_tool(
            server_name="calculator",
            tool_name="add",
            arguments={"a": 5, "b": 3}
        )
        print(f"Add Result: {result}")
        
    finally:
        await manager.shutdown()

if __name__ == "__main__":
    asyncio.run(run_test())
```
```text
# Expected Output
INFO:langxchange.mcp:MCPServiceManager initialized
INFO:langxchange.mcp:Started MCP server 'calculator'
Add Result: content=[TextContent(type='text', text='8', ...)] structuredContent={'result': 8}
INFO:langxchange.mcp:Stopped MCP server 'calculator'
```

🤖 Autonomous Agents (EnhancedAgent.py)

LangXChange 0.4.6 introduces a powerful EnhancedLLMAgentHelper for building production-ready autonomous agents. It features dynamic tool discovery, semantic memory, per-tool circuit breakers, and automatic observation summarization.

🚀 Quick Start: Manual Tool Definition

import asyncio
from langxchange.EnhancedAgent import EnhancedLLMAgentHelper
from langxchange.agent_memory_helper import AgentMemoryHelper
from langxchange.openai_helper import EnhancedOpenAIHelper, OpenAIConfig

async def main():
    # 1. Initialize LLM and Memory
    llm = EnhancedOpenAIHelper(OpenAIConfig(chat_model="gpt-4o"))
    memory = AgentMemoryHelper(sqlite_path="agent_memory.db")

    # 2. Define tools with JSON Schema for strict parameter generation
    async def get_weather(params):
        return f"The weather in {params['location']} is sunny, 25°C."

    tools = [{
        "action": "get_weather",
        "description": "Get current weather for a location",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {"type": "string", "description": "City and country"}
            },
            "required": ["location"]
        },
        "func": get_weather
    }]

    # 3. Initialize Agent
    agent = EnhancedLLMAgentHelper(
        llm=llm,
        action_space=tools,
        external_memory_helper=memory,
        debug=True
    )

    # 4. Run autonomously to achieve a goal
    agent.set_goal("What is the weather in London?")
    results = await agent.run_autonomous(max_cycles=5)
    
    for res in results:
        print(f"Thought: {res['thought']}")
        print(f"Action: {res['decision']['action']}")
        print(f"Result: {res['outcome']['result']}")

if __name__ == "__main__":
    asyncio.run(main())

🔌 Native MCP Integration

Instead of manually defining tools, you can pass an MCP configuration. The agent will automatically discover all tools from the configured servers and route calls dynamically.

mcp_config = {
    "servers": [
        {
            "name": "filesystem",
            "transport": "stdio",
            "command": "mcp-server-filesystem",
            "args": ["/home/user/Downloads"]
        },
        {
            "name": "brave-search",
            "transport": "stdio",
            "command": "npx",
            "args": ["-y", "@modelcontextprotocol/server-brave-search"],
            "env": {"BRAVE_API_KEY": "your-key"}
        }
    ]
}

agent = EnhancedLLMAgentHelper(
    llm=llm,
    mcp_config=mcp_config, # Native MCP support
    external_memory_helper=memory
)

# The agent will now have access to all tools from both filesystem and brave-search
agent.set_goal("Find the latest PDF in my Downloads and search for its main topic on the web.")
await agent.run_autonomous()

✨ Key Features

  • Autonomous Loops: Use run_autonomous() for multi-step goal achievement with automatic state management.
  • Dynamic Discovery: Tools are discovered at runtime from MCP servers or via a discovery_callback.
  • Schema-Strict Parameters: Uses JSON Schema hints in prompts to ensure the LLM generates valid parameters.
  • Per-Tool Circuit Breakers: Failures in one tool (e.g., a flaky API) won't crash the entire agent.
  • Auto-Summarization: Large tool outputs are automatically summarized to preserve context window space.
  • Semantic Memory: Integrates with AgentMemoryHelper for long-term history and semantic retrieval.
  • Observability: Built-in Prometheus-style metrics and correlation ID tracing across all operations.
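
The per-tool circuit breaker pattern can be sketched in a few lines. This is the general pattern, not the EnhancedLLMAgentHelper internals; the class and tool names are illustrative:

```python
class CircuitBreaker:
    """Conceptual per-tool circuit breaker: after `max_failures`
    consecutive errors the tool is skipped instead of retried, so
    one flaky tool cannot stall the whole agent loop."""
    def __init__(self, max_failures: int = 3):
        self.max_failures = max_failures
        self.failures = 0

    @property
    def open(self) -> bool:
        return self.failures >= self.max_failures

    def call(self, func, *args):
        if self.open:
            raise RuntimeError("circuit open: tool temporarily disabled")
        try:
            result = func(*args)
            self.failures = 0  # a success resets the counter
            return result
        except Exception:
            self.failures += 1
            raise

breaker = CircuitBreaker(max_failures=2)

def flaky_tool():
    raise TimeoutError("upstream API timed out")

for _ in range(2):
    try:
        breaker.call(flaky_tool)
    except TimeoutError:
        pass

print(breaker.open)  # True: further calls fail fast without hitting the API
```
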

🔍 Vector Database Integration

ChromaDB Integration (chroma_helper.py)

from langxchange.chroma_helper import EnhancedChromaHelper, ChromaConfig

# Configure Chroma with enhanced performance settings
chroma_config = ChromaConfig(
    persist_directory="./chroma_db",
    batch_size=100,
    max_workers=8,
    progress_bar=True
)

chroma = EnhancedChromaHelper(llm, chroma_config)

# Insert documents with metadata
chroma.insert_documents(
    collection_name="my_collection",
    documents=["Document content here"],
    metadatas=[{"source": "file1.txt", "type": "text"}],
    generate_embeddings=True
)

# Query with similarity search
results = chroma.query_collection(
    collection_name="my_collection",
    query_text="What is machine learning?",
    top_k=5
)

Features:

  • ✅ Persistent storage
  • ✅ Metadata filtering
  • ✅ Batch operations
  • ✅ Collection management
  • ✅ Performance optimization

Pinecone Integration (pinecone_helper.py)

import os
from langxchange.pinecone_helper import EnhancedPineconeHelper, PineconeConfig, CloudProvider, MetricType
from langxchange.openai_helper import EnhancedOpenAIHelper, OpenAIConfig

# Set API key (or use environment variable)
os.environ["PINECONE_API_KEY"] = "your-pinecone-key"

# Configure LLM helper for embeddings
openai_config = OpenAIConfig(enable_caching=True)
llm_helper = EnhancedOpenAIHelper(openai_config)

# Configure Pinecone with enhanced settings
pinecone_config = PineconeConfig(
    api_key="your-pinecone-key",
    environment="us-west1-gcp",
    cloud_service=CloudProvider.GCP,
    index_name="my-index",
    dimension=1536,  # OpenAI embedding dimension
    metric=MetricType.COSINE,
    batch_size=100,
    max_workers=10,
    progress_bar=True
)

# Initialize Enhanced Pinecone helper
pinecone = EnhancedPineconeHelper(
    llm_helper=llm_helper,
    config=pinecone_config
)

# Insert documents with automatic embedding generation
documents = [
    "Machine learning is a subset of AI that focuses on algorithms.",
    "Deep learning uses neural networks with multiple layers.",
    "Natural language processing enables computers to understand text."
]

pinecone.insert_documents(
    collection_name="my-index",
    documents=documents,
    metadatas=[{"source": "article1", "topic": "AI"}, 
               {"source": "article2", "topic": "ML"}, 
               {"source": "article3", "topic": "NLP"}],
    generate_embeddings=True,
    namespace="default"
)

# Query for similar vectors with filters
results = pinecone.query(
    vector=None,  # Will auto-generate embedding from query_text
    query_text="What is machine learning?",
    top_k=5,
    filter_metadata={"topic": "AI"},
    include_metadata=True
)

# DataFrame ingestion with batch processing
import pandas as pd

df = pd.DataFrame({
    'text': ['Sample text 1', 'Sample text 2', 'Sample text 3'],
    'category': ['tech', 'science', 'tech']
})

stats = pinecone.ingest_dataframe(
    collection_name="my-index",
    dataframe=df,
    text_column='text',
    metadata_columns=['category'],
    namespace="default"
)

print(f"Inserted {stats['inserted']} documents")
print(f"Skipped {stats['skipped']} documents")

Features:

  • ✅ Cloud-based vector storage with auto-scaling
  • ✅ Automatic embedding generation using LLM helpers
  • ✅ DataFrame ingestion with batch processing
  • ✅ Advanced querying with metadata filters
  • ✅ Performance monitoring and statistics
  • ✅ Enterprise-grade error handling and retries
  • ✅ Memory-efficient operations for large datasets
  • ✅ Namespace management and resource cleanup
  • ✅ Real-time updates with comprehensive logging

Milvus Integration (milvus_helper.py)

from langxchange.milvus_helper import EnhancedMilvusHelper, MilvusConfig
from langxchange.openai_helper import EnhancedOpenAIHelper, OpenAIConfig

# Configure LLM helper for embeddings
openai_config = OpenAIConfig(enable_caching=True)
llm_helper = EnhancedOpenAIHelper(openai_config)

# Configure Milvus with enhanced settings
milvus_config = MilvusConfig(
    host="localhost",
    port="19530",
    api_key="your-milvus-token", # Optional for local
    collection_prefix="lx_",
    embedding_dim=1536,
    batch_size=100,
    progress_bar=True
)

# Initialize Enhanced Milvus client
milvus = EnhancedMilvusHelper(
    llm_helper=llm_helper,
    config=milvus_config
)

# Insert documents with automatic embedding generation
documents = [
    "Milvus is an open-source vector database built for AI applications.",
    "It supports high-performance vector similarity search and analytics.",
    "Milvus can handle billions of vectors with millisecond latency."
]

milvus.insert_documents(
    collection_name="ai_docs",
    documents=documents,
    metadatas=[{"category": "database"}, {"category": "search"}, {"category": "performance"}],
    generate_embeddings=True
)

# Query for similar vectors
results = milvus.query(
    collection_name="ai_docs",
    query_text="What is Milvus?",
    top_k=3
)

for hit in results[0]:
    print(f"Score: {hit.score}")
    print(f"Document: {hit.entity.get('document')}")

Features:

  • ✅ High-performance vector search with HNSW index
  • ✅ Support for local Milvus and Zilliz Cloud (via API key/token)
  • ✅ Automatic collection creation and schema management
  • ✅ Batch insertion and DataFrame ingestion
  • ✅ Metadata filtering and JSON support
  • ✅ Enterprise-grade error handling

Elasticsearch Integration (elasticsearch_helper.py)

from langxchange.elasticsearch_helper import EnhancedElasticsearchHelper, ElasticsearchConfig
from langxchange.openai_helper import EnhancedOpenAIHelper, OpenAIConfig

# Configure LLM helper for embeddings
openai_config = OpenAIConfig(enable_caching=True)
llm_helper = EnhancedOpenAIHelper(openai_config)

# Configure Elasticsearch with enhanced settings
es_config = ElasticsearchConfig(
    host="http://localhost:9200",
    index_prefix="lx_",
    embedding_dim=1536,
    batch_size=100,
    progress_bar=True
)

# Initialize Enhanced Elasticsearch client
es = EnhancedElasticsearchHelper(
    llm_helper=llm_helper,
    config=es_config
)

# Insert documents with automatic embedding generation
documents = [
    "Elasticsearch is a distributed, RESTful search and analytics engine.",
    "It provides a distributed, multitenant-capable full-text search engine.",
    "Elasticsearch is developed in Java and is open source."
]

es.insert_documents(
    collection_name="tech_docs",
    documents=documents,
    metadatas=[{"topic": "search"}, {"topic": "analytics"}, {"topic": "java"}],
    generate_embeddings=True
)

# Query for similar vectors
results = es.query(
    collection_name="tech_docs",
    query_text="What is Elasticsearch?",
    top_k=3
)

for hit in results['hits']['hits']:
    print(f"Score: {hit['_score']}")
    print(f"Document: {hit['_source'].get('document')}")

Features:

  • ✅ High-performance vector search with dense_vector type
  • ✅ Support for script-based cosine similarity scoring
  • ✅ Automatic index creation and mapping management
  • ✅ Batch insertion and DataFrame ingestion
  • ✅ Metadata filtering support
  • ✅ Unified interface consistent with Chroma and Milvus helpers
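
Under the hood, the dense-vector mapping and script-scored query follow standard Elasticsearch DSL. The bodies below show that shape as plain dicts (the form the official Python client accepts); the field names `embedding`/`document` and the toy 3-dimensional vector are illustrative, not the helper's actual internals:

```python
# Index mapping: a dense_vector field for embeddings plus a text field.
mapping = {
    "mappings": {
        "properties": {
            "embedding": {"type": "dense_vector", "dims": 3},
            "document": {"type": "text"},
        }
    }
}

# Script-scored query: cosineSimilarity returns values in [-1, 1],
# so +1.0 keeps Elasticsearch scores non-negative.
query_body = {
    "query": {
        "script_score": {
            "query": {"match_all": {}},
            "script": {
                "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                "params": {"query_vector": [0.1, 0.2, 0.3]},
            },
        }
    }
}

print(query_body["query"]["script_score"]["script"]["source"])
```
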

FAISS Integration (faiss_helper.py)

import os
import pandas as pd
from langxchange.faiss_helper import EnhancedFAISSHelper

# Initialize Enhanced FAISS helper with advanced configuration
faiss = EnhancedFAISSHelper(
    dim=768,  # Vector dimension
    index_type="ivf",  # Use IVF index for better performance on large datasets
    normalize_vectors=True,  # Normalize vectors for cosine similarity
    nlist=100,  # Number of clusters for IVF
    auto_train=True  # Automatically train IVF indices
)

# Insert individual vectors with documents and metadata
documents = [
    "Machine learning is a subset of artificial intelligence.",
    "Deep learning uses neural networks with multiple layers.",
    "Natural language processing enables computers to understand text.",
    "Computer vision allows machines to interpret visual information."
]

metadatas = [
    {"source": "article1", "topic": "ML", "date": "2025-01-15"},
    {"source": "article2", "topic": "DL", "date": "2025-01-16"},
    {"source": "article3", "topic": "NLP", "date": "2025-01-17"},
    {"source": "article4", "topic": "CV", "date": "2025-01-18"}
]

# Generate embeddings (this would come from your embedding model)
embeddings = [
    [0.1, 0.2, 0.3] * 256,  # 768-dimensional embedding
    [0.4, 0.5, 0.6] * 256,
    [0.7, 0.8, 0.9] * 256,
    [0.2, 0.3, 0.4] * 256
]

faiss.insert(
    vectors=embeddings,
    documents=documents,
    metadatas=metadatas
)

# DataFrame integration with batch processing
df = pd.DataFrame({
    'embeddings': embeddings,
    'documents': documents,
    'metadata': metadatas
})

faiss.insert_dataframe(
    dataframe=df,
    embeddings_col="embeddings",
    documents_col="documents", 
    metadata_col="metadata"
)

# Query for similar vectors with comprehensive results
query_vector = [0.1, 0.2, 0.3] * 256  # Sample query embedding
results = faiss.query(
    embedding_vector=query_vector,
    top_k=3,
    include_distances=True  # Include similarity scores
)

print(f"Found {len(results)} similar documents:")
for i, result in enumerate(results, 1):
    print(f"{i}. Score: {result['distance']:.3f}")
    print(f"   Document: {result['document'][:100]}...")
    print(f"   Metadata: {result['metadata']}")
    print()

# Batch querying for multiple queries at once
query_vectors = [
    [0.1, 0.2, 0.3] * 256,
    [0.4, 0.5, 0.6] * 256
]

batch_results = faiss.query_batch(
    embedding_vectors=query_vectors,
    top_k=2,
    include_distances=True
)

print(f"Batch query results: {len(batch_results)} result sets")

# Retrieve documents by ID
doc_results = faiss.get_by_ids(["id_0", "id_1"])
print(f"Retrieved {len(doc_results)} documents by ID")

# Get comprehensive statistics
stats = faiss.get_stats()
print(f"Index Statistics:")
print(f"  Total vectors: {stats['total_vectors']}")
print(f"  Index type: {stats['index_type']}")
print(f"  Dimension: {stats['dimension']}")
print(f"  Is trained: {stats.get('is_trained', 'N/A')}")
print(f"  Number of clusters: {stats.get('nlist', 'N/A')}")

# Persistence - save and load index
index_path = "./faiss_index.bin"
metadata_path = "./faiss_metadata.pkl"

faiss.save(index_path, metadata_path)
print(f"Saved index to {index_path}")

# Load saved index
loaded_faiss = EnhancedFAISSHelper(dim=768, index_type="ivf")
loaded_faiss.load(index_path, metadata_path)
print(f"Loaded index with {loaded_faiss.count()} vectors")

# Index management
index_vector_count = faiss.count()
print(f"Current index contains {index_vector_count} vectors")

# Delete specific document
deleted = faiss.delete_by_id("id_0")
print(f"Document deleted: {deleted}")

# Rebuild index with different configuration
rebuilt_count = faiss.rebuild_index(index_type="hnsw")
print(f"Rebuilt index with {rebuilt_count} vectors using HNSW")

Features:

  • ✅ Multiple index types (Flat, IVF, HNSW) with automatic training
  • ✅ High-performance similarity search with vector normalization
  • ✅ Comprehensive DataFrame integration for pandas workflow
  • ✅ Batch operations for efficient processing
  • ✅ Advanced querying with distance scores and metadata
  • ✅ Persistence and index management
  • ✅ Document retrieval by ID and batch operations
  • ✅ Comprehensive statistics and performance monitoring
  • ✅ Index rebuilding and optimization
  • ✅ Memory-efficient operations with proper validation
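
Why `normalize_vectors=True` gives cosine similarity: on unit-length vectors, a plain inner product equals the cosine of the angle between them, so an inner-product index over normalized vectors ranks by cosine similarity. A quick sanity check in plain Python:

```python
import math

def normalize(v):
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

a, b = [3.0, 4.0], [4.0, 3.0]

# Cosine similarity computed directly from the definition...
cosine = dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))

# ...equals a plain inner product once both vectors are normalized.
ip = dot(normalize(a), normalize(b))

print(round(cosine, 6), round(ip, 6))  # both 0.96
```
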

📄 Document Processing

Document Loader (documentloader.py)

"""
Demo script showing the enhanced DocumentLoaderHelper capabilities
optimized for LLM processing.
"""
import os
import sys
from langxchange.documentloader import DocumentLoaderHelper, ChunkingStrategy, ImageProcessingStrategy

# Initialize Document Loader with different configurations
loader = DocumentLoaderHelper(
    chunk_size=800,
    overlap_size=100,
    chunking_strategy=ChunkingStrategy.SEMANTIC,
    preserve_formatting=True
)

# Test different chunking strategies
strategies = [
    (ChunkingStrategy.CHARACTER, "Character-based chunking"),
    (ChunkingStrategy.SENTENCE, "Sentence-aware chunking"),
    (ChunkingStrategy.PARAGRAPH, "Paragraph-aware chunking"),
    (ChunkingStrategy.SEMANTIC, "Semantic chunking (recommended)"),
]

# Add token-based if available
try:
    import tiktoken
    strategies.append((ChunkingStrategy.TOKEN, "Token-based chunking"))
except ImportError:
    print("Note: tiktoken not available, skipping token-based chunking")

# Process document with specific strategy
file_path = "documents/sample.txt"

for strategy, description in strategies:
    print(f"\n--- {description} ---")
    
    # Initialize loader with strategy
    loader = DocumentLoaderHelper(
        chunk_size=800,
        overlap_size=100,
        chunking_strategy=strategy,
        preserve_formatting=True
    )
    
    try:
        chunks = list(loader.load(file_path))
        
        print(f"Total chunks created: {len(chunks)}")
        print(f"Processing time: {loader.stats['times']['total']:.3f}s")
        
        # Show first few chunks
        for i, chunk in enumerate(chunks[:3]):
            print(f"\nChunk {i+1}:")
            print(f"  Length: {len(chunk.content)} chars")
            if chunk.metadata.token_count:
                print(f"  Tokens: {chunk.metadata.token_count}")
            print(f"  Content preview: {chunk.content[:100]}...")
            
    except Exception as e:
        print(f"Error with {strategy}: {e}")

# Multi-format document processing
loader = DocumentLoaderHelper(
    chunk_size=800,
    chunking_strategy=ChunkingStrategy.SEMANTIC,
    preserve_formatting=True
)

files_to_process = [
    "documents/sample.txt",
    "documents/data.csv",
    "documents/report.pdf"
]

for file_path in files_to_process:
    if os.path.exists(file_path):
        print(f"\n--- Processing: {file_path} ---")
        
        try:
            chunks = list(loader.load(file_path))
            
            print(f"File type: {chunks[0].metadata.file_type}")
            print(f"Total chunks: {len(chunks)}")
            
            # Show metadata for first chunk
            first_chunk = chunks[0]
            print(f"First chunk metadata:")
            print(f"  Source: {first_chunk.metadata.source_file}")
            print(f"  Section: {first_chunk.metadata.section_title}")
            print(f"  Content length: {len(first_chunk.content)} chars")
            
        except Exception as e:
            print(f"Error processing {file_path}: {e}")

# Advanced configuration examples
configs = [
    {
        "name": "Minimal overlap",
        "params": {"chunk_size": 400, "overlap_size": 20, "min_chunk_size": 30}
    },
    {
        "name": "High overlap",
        "params": {"chunk_size": 400, "overlap_size": 100, "min_chunk_size": 50}
    },
    {
        "name": "Preserve formatting",
        "params": {"chunk_size": 400, "preserve_formatting": True}
    },
    {
        "name": "Normalize text",
        "params": {"chunk_size": 400, "preserve_formatting": False}
    }
]

file_path = "documents/sample.txt"

for config in configs:
    print(f"\n--- CONFIG: {config['name']} ---")
    
    loader = DocumentLoaderHelper(
        chunking_strategy=ChunkingStrategy.SEMANTIC,
        **config['params']
    )
    
    try:
        chunks = list(loader.load(file_path))
        stats = loader.get_statistics()
        
        print(f"Chunks created: {len(chunks)}")
        print(f"Avg chunk size: {sum(len(c.content) for c in chunks) / len(chunks):.0f} chars")
        print(f"Processing time: {stats['processing_stats']['times']['total']:.3f}s")
        
    except Exception as e:
        print(f"Error: {e}")

# Image processing support
try:
    from PIL import Image
    pil_available = True
    print("✅ PIL (Pillow) support: Available")
except ImportError:
    pil_available = False
    print("❌ PIL (Pillow) support: Not available")

try:
    import pytesseract
    tesseract_available = True
    print("✅ OCR (pytesseract) support: Available")
except ImportError:
    tesseract_available = False
    print("❌ OCR (pytesseract) support: Not available")

# Image processing strategies
strategies = [
    (ImageProcessingStrategy.OCR_TEXT, "Extract text using OCR"),
    (ImageProcessingStrategy.DESCRIPTION, "Generate image descriptions"),
    (ImageProcessingStrategy.METADATA, "Extract technical metadata"),
    (ImageProcessingStrategy.COMBINED, "All-in-one processing"),
    (ImageProcessingStrategy.VISUAL_ANALYSIS, "Advanced visual analysis")
]

for strategy, description in strategies:
    print(f"  • {strategy.value}: {description}")

# Supported formats
loader = DocumentLoaderHelper()
supported_formats = loader._get_supported_image_formats()
print(f"Supported image formats ({len(supported_formats)}):")
print(f"  {', '.join(supported_formats)}")

# Image processing configuration
image_loader = DocumentLoaderHelper(
    chunk_size=1000,
    image_processing_strategy=ImageProcessingStrategy.COMBINED,
    ocr_language="eng",
    max_image_size=(2048, 2048),
    image_quality_threshold=0.6
)

print(f"\nImage processing configuration:")
print(f"  • Processing strategy: {image_loader.image_processing_strategy.value}")
print(f"  • OCR language: {image_loader.ocr_language}")
print(f"  • Max image size: {image_loader.max_image_size}")
print(f"  • Quality threshold: {image_loader.image_quality_threshold}")

Features:

  • ✅ Multiple formats (PDF, DOCX, TXT, CSV, images)
  • ✅ Advanced chunking strategies (Character, Sentence, Paragraph, Semantic, Token)
  • ✅ Intelligent overlap to preserve context
  • ✅ Metadata tracking for better document understanding
  • ✅ Token counting support (with tiktoken)
  • ✅ Configurable text cleaning and formatting
  • ✅ Parallel processing for better performance
  • ✅ Comprehensive error handling and statistics
  • ✅ Comprehensive image processing (15+ formats)
  • ✅ OCR text extraction with confidence scoring
  • ✅ Automatic image description generation
  • ✅ Technical metadata extraction (EXIF, etc.)
  • ✅ Multi-language OCR support
  • ✅ Configurable image quality thresholds
  • ✅ Metadata extraction
  • ✅ Progress tracking
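
The role of overlap in chunking can be seen in a minimal character-based sketch. The loader's strategies (sentence, paragraph, semantic, token) are more sophisticated; this only illustrates why each chunk repeats the tail of the previous one:

```python
def chunk_text(text: str, chunk_size: int, overlap: int):
    """Character-based chunking with overlap: each chunk starts
    `overlap` characters before the previous one ended, so context
    that spans a boundary survives in at least one chunk."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    step = chunk_size - overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

text = "abcdefghij"
chunks = chunk_text(text, chunk_size=4, overlap=2)
print(chunks)  # ['abcd', 'cdef', 'efgh', 'ghij', 'ij']
```
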

💾 Database Integration

MySQL Integration (mysql_helper.py)

from langxchange.mysql_helper import MySQLHelper

# Initialize MySQL helper (uses environment variables)
mysql = MySQLHelper(
    pool_size=5,
    max_overflow=10,
    pool_timeout=30
)

# Check connection health
health = mysql.health_check()
print(f"Database status: {health['status']}")

# Insert DataFrame to MySQL
result = mysql.insert_dataframe(
    table_name="user_data",
    dataframe=df,
    if_exists="append",
    chunksize=100
)

# Execute parameterized query
df_result = mysql.query(
    "SELECT * FROM users WHERE age > :min_age",
    params={"min_age": 25}
)

# Batch operations
mysql.batch_execute(
    "INSERT INTO users (name, age) VALUES (:name, :age)",
    params_list=[
        {"name": "Alice", "age": 30},
        {"name": "Bob", "age": 25}
    ]
)

Features:

  • ✅ Connection pooling
  • ✅ SQL injection protection
  • ✅ Transaction support
  • ✅ DataFrame integration
  • ✅ Health monitoring
  • ✅ Context manager support

MongoDB Integration (mongo_helper.py)

import os
from langxchange.mongo_helper import EnhancedMongoHelper
import pandas as pd

# Set MongoDB URI (or use environment variable)
os.environ["MONGO_URI"] = "mongodb://localhost:27017"

# Initialize Enhanced MongoDB helper
mongo = EnhancedMongoHelper(
    db_name="my_database",
    collection_name="documents",
    connect_timeout=5000,
    server_selection_timeout=5000
)

# Check connection health
connection_health = mongo.ping()
print(f"MongoDB connection: {'Healthy' if connection_health else 'Failed'}")

# Insert single document
result = mongo.insert_one({
    "title": "Machine Learning Guide",
    "content": "ML is a subset of artificial intelligence...",
    "tags": ["AI", "ML", "Tutorial"],
    "created_at": "2025-11-25"
})
print(f"Inserted document ID: {result.inserted_id}")

# Insert multiple documents
documents = [
    {
        "title": "Deep Learning Basics",
        "content": "Deep learning uses neural networks...",
        "tags": ["DL", "Neural Networks"],
        "author": "AI Researcher"
    },
    {
        "title": "NLP Introduction", 
        "content": "Natural Language Processing enables...",
        "tags": ["NLP", "Language"],
        "author": "Data Scientist"
    }
]

inserted_ids = mongo.insert(documents)
print(f"Inserted {len(inserted_ids)} documents")

# Insert DataFrame to MongoDB
df = pd.DataFrame({
    'title': ['ML Guide', 'DL Tutorial', 'NLP Basics'],
    'content': ['Machine learning concepts', 'Deep learning methods', 'NLP techniques'],
    'category': ['educational', 'advanced', 'beginner'],
    'views': [150, 200, 175]
})

mongo.insert(df)

# Query single document
doc = mongo.find_one({"title": "Machine Learning Guide"})
print(f"Found document: {doc.get('title')}")

# Query with filters and sorting
documents = mongo.query(
    filter_query={"category": "educational", "views": {"$gte": 100}},
    projection={"title": 1, "views": 1, "_id": 0},
    sort=[("views", -1)],
    limit=10
)

print(f"Found {len(documents)} matching documents")

# Count documents in collection
total_docs = mongo.count_documents({"tags": {"$in": ["AI", "ML"]}})
print(f"Total AI/ML documents: {total_docs}")

# Update document
update_result = mongo.update_one(
    filter_query={"title": "Machine Learning Guide"},
    update_query={"$set": {"updated_at": "2025-11-25", "views": 200}}
)
print(f"Modified {update_result.modified_count} documents")

# Delete documents
delete_result = mongo.delete_many({"category": "test"})
print(f"Deleted {delete_result.deleted_count} test documents")

# Create indexes for better performance
mongo.create_index("title", unique=True)
mongo.create_index([("tags", 1), ("category", 1)])

# Close connection
mongo.close()

Features:

  • ✅ Document-based storage with flexible schema
  • ✅ Connection health monitoring and timeout management
  • ✅ Batch operations with multiple document insertion
  • ✅ DataFrame integration for seamless pandas workflow
  • ✅ Advanced querying with filters, projection, and sorting
  • ✅ Index management for performance optimization
  • ✅ CRUD operations with comprehensive error handling
  • ✅ Aggregation pipeline support via aggregate methods
  • ✅ GridFS support for large file storage
  • ✅ Connection pooling and automatic resource management
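
The aggregation support uses standard MongoDB pipeline stages. The stage syntax below is stock MongoDB; passing the pipeline through a PyMongo-style `aggregate` method is an assumption about the helper's API:

```python
# Standard MongoDB aggregation pipeline: filter, group, sort.
pipeline = [
    {"$match": {"views": {"$gte": 100}}},  # keep only popular documents
    {"$group": {                           # total views per category
        "_id": "$category",
        "total_views": {"$sum": "$views"},
        "docs": {"$sum": 1},
    }},
    {"$sort": {"total_views": -1}},        # most-viewed category first
]

# e.g. results = list(mongo.collection.aggregate(pipeline))  # hypothetical call
print([next(iter(stage)) for stage in pipeline])  # ['$match', '$group', '$sort']
```
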

🧠 RAG Implementation

Enhanced Retriever (retrieverX.py)

from langxchange.retrieverX import EnhancedRetrieverX, create_retriever_from_config

# Configure retriever
retriever_config = {
    "vector_db": chroma,
    "embedder": llm,
    "use_rerank": True,
    "rerank_multiplier": 3.0,
    "db_type": "chroma"
}

retriever = create_retriever_from_config(retriever_config)

# Retrieve documents
results = retriever.retrieve(
    query="What are the benefits of machine learning?",
    top_k=5,
    collection_name="my_collection"
)

for result in results:
    print(f"Score: {result.score:.3f}")
    print(f"Document: {result.document[:100]}...")

Features:

  • ✅ Two-stage retrieval
  • ✅ Re-ranking
  • ✅ Score aggregation
  • ✅ Metadata filtering
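
Score aggregation across retrievers is commonly done with Reciprocal Rank Fusion (the RRF mentioned for Parallel RetrieverX). This sketch shows the algorithm itself, not the helper's internal code; the document IDs are illustrative:

```python
def rrf_fuse(rankings, k: int = 60):
    """Reciprocal Rank Fusion: combine several ranked lists of document
    IDs; each list contributes 1 / (k + rank) for every document it
    ranks, and documents are re-sorted by their summed score."""
    scores = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

vector_hits = ["doc_a", "doc_b", "doc_c"]    # e.g. from a vector DB
keyword_hits = ["doc_b", "doc_d", "doc_a"]   # e.g. from keyword search

print(rrf_fuse([vector_hits, keyword_hits]))
# ['doc_b', 'doc_a', 'doc_d', 'doc_c'] -- doc_b wins by ranking high in both lists
```
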

Prompt Helper (prompt_helper.py)

from langxchange.prompt_helper import EnhancedPromptHelper, PromptMode

prompt_helper = EnhancedPromptHelper(
    llm=llm,
    system_prompt="You are a helpful assistant.",
    default_mode=PromptMode.AUGMENTED,
    max_context_length=1500,
    max_snippets=3
)

# Generate RAG response
response = prompt_helper.run(
    user_query="Explain deep learning",
    retrieval_results=retrieval_results,
    mode=PromptMode.AUGMENTED,
    temperature=0.5
)

Features:

  • ✅ Template-based prompts
  • ✅ Context management
  • ✅ Multi-mode support
  • ✅ Token optimization

🏠 Local LLM Management

Local LLM (localllm.py)

from langxchange.localllm import LocalLLM

local_llm = LocalLLM(
    model_path="./models/llama-7b",
    device="cuda",
    precision="fp16"
)

response = local_llm.generate(
    prompt="Explain the concept of neural networks",
    max_tokens=500,
    temperature=0.7
)

Features:

  • ✅ Model downloading
  • ✅ Quantization (4-bit, 8-bit)
  • ✅ GPU/CPU switching
  • ✅ Memory management
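
What 4-bit/8-bit quantization does can be illustrated with a toy symmetric int8 round-trip. This is a conceptual sketch only; real schemes (as used by bitsandbytes-style backends) add per-channel scales, outlier handling, and packed storage:

```python
def quantize_int8(values):
    """Symmetric int8 quantization: map floats into [-127, 127]
    using a single scale factor derived from the largest magnitude."""
    scale = max(abs(v) for v in values) / 127.0
    q = [round(v / scale) for v in values]
    return q, scale

def dequantize(q, scale):
    return [x * scale for x in q]

weights = [0.5, -1.27, 0.002, 1.0]
q, scale = quantize_int8(weights)
restored = dequantize(q, scale)
print(q)         # int8 codes, e.g. [50, -127, 0, 100]
print(restored)  # close to the originals, at 1/4 the storage of float32
```
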

Localize LLM (localizellm.py)

#!/usr/bin/env python3
"""
LocalizeLLM Example Script
Demonstrates key features of the LocalizeLLM class including:
- Model downloading and loading
- Text generation and chat
- Fine-tuning with custom data
- Parameter optimization
- Model management

Author: Langxchange
"""

import os
from langxchange.localizellm import LocalizeLLM, LocalLLMConfig, create_local_llm


def basic_usage_example():
    """Demonstrate basic LocalizeLLM usage."""
    print("=== Basic Usage Example ===")
    
    # Simple configuration using factory function
    config = LocalLLMConfig(
        chat_model="microsoft/DialoGPT-small",  # Smaller model for demo
        local_models_dir="./demo_models",
        learning_rate=1e-4,
        batch_size=1,
        max_epochs=1,
        load_in_8bit=False  # Use False for smaller models
    )
    
    try:
        with LocalizeLLM(config) as llm:
            print("✓ LocalizeLLM initialized successfully")
            
            # Download and load model
            print("Downloading model (this may take a few minutes)...")
            model_path = llm.download_model("microsoft/DialoGPT-small")
            # model_path = llm.download_model("meta-llama/Llama-2-7b-chat-hf")
            # model_path = llm.download_model("microsoft/Phi-3-mini-4k-instruct")
            print(f"✓ Model downloaded to: {model_path}")
            
            llm.load_local_model("microsoft/DialoGPT-small")
            # llm.load_local_model("meta-llama/Llama-2-7b-chat-hf")
            # llm.load_local_model("microsoft/Phi-3-mini-4k-instruct")
            print("✓ Model loaded successfully")
            
            # List available models
            models = llm.list_available_models()
            print(f"✓ Available models: {list(models.keys())}")
            
            # Generate text
            print("\n--- Text Generation ---")
            response = llm.generate_text(
                "What is machine learning?",
                max_new_tokens=200,
                temperature=0.7
            )
            print(f"Generated: {response}")
            
            # Chat example
            print("\n--- Chat Example ---")
            messages = [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": "What is machine learning?"}
            ]
            
            chat_response = llm.chat(messages, max_new_tokens=50)
            print(f"Chat response: {chat_response}")
            
            # Model info
            print("\n--- Model Information ---")
            info = llm.get_model_info()
            print(f"Model parameters: {info.get('model_parameters', 'N/A')}")
            print(f"Device: {info.get('device', 'N/A')}")
            
    except Exception as e:
        print(f"Error in basic usage: {e}")


def fine_tuning_example():
    """Demonstrate fine-tuning capabilities."""
    print("\n=== Fine-tuning Example ===")
    
    # Custom training data (domain-specific examples)
    training_data = [
        "Python is a high-level programming language known for its simplicity.",
        "Machine learning algorithms can learn patterns from data automatically.",
        "Neural networks are inspired by the structure of the human brain.",
        "Data preprocessing is crucial for successful machine learning projects.",
        "Feature engineering helps improve model performance significantly.",
        "Cross-validation helps assess model generalization capability.",
        "Overfitting occurs when a model memorizes training data too closely.",
        "Regularization techniques help prevent overfitting in machine learning.",
    ]
    
    config = LocalLLMConfig(
        chat_model="microsoft/DialoGPT-small",
        local_models_dir="./demo_models",
        learning_rate=5e-5,
        batch_size=1,
        max_epochs=1,
        warmup_steps=10,
        max_length=128,
        lora_rank=8,  # Smaller rank for demo
    )
    
    try:
        with LocalizeLLM(config) as llm:
            # Load the previously downloaded model
            llm.load_local_model("microsoft/DialoGPT-small")
            print("✓ Model loaded for fine-tuning")
            
            # Prepare training data
            print("Preparing training data...")
            train_dataset, val_dataset = llm.prepare_fine_tuning_data(
                training_data, 
                validation_split=0.2
            )
            print(f"✓ Training samples: {len(train_dataset)}")
            print(f"✓ Validation samples: {len(val_dataset)}")
            
            # Test before fine-tuning
            print("\n--- Before Fine-tuning ---")
            before_response = llm.generate_text(
                "What is machine learning?",
                max_new_tokens=30,
                temperature=0.7
            )
            print(f"Response before fine-tuning: {before_response}")
            
            # Fine-tune model
            print("\nStarting fine-tuning (this may take a few minutes)...")
            fine_tuned_path = llm.fine_tune_model(
                train_dataset=train_dataset,
                val_dataset=val_dataset,
                experiment_name="ml_specialist_demo-1",
                use_lora=True
            )
            print(f"✓ Fine-tuning completed: {fine_tuned_path}")
            
            # Test after fine-tuning
            print("\n--- After Fine-tuning ---")
            llm.load_local_model(fine_tuned_path)
            after_response = llm.generate_text(
                "What is machine learning?",
                max_new_tokens=30,
                temperature=0.7
            )
            print(f"Response after fine-tuning: {after_response}")
            
            # Save snapshot
            snapshot_path = llm.save_model_snapshot(
                "ml_demo_snapshot",
                "Fine-tuned model for ML demonstrations"
            )
            print(f"✓ Snapshot saved: {snapshot_path}")
            
    except Exception as e:
        print(f"Error in fine-tuning: {e}")


def parameter_optimization_example():
    """Demonstrate parameter optimization."""
    print("\n=== Parameter Optimization Example ===")
    
    config = LocalLLMConfig(
        chat_model="microsoft/DialoGPT-small",
        local_models_dir="./demo_models"
    )
    
    try:
        with LocalizeLLM(config) as llm:
            llm.load_local_model("microsoft/DialoGPT-small")
            print("✓ Model loaded for parameter optimization")
            
            # Create a small validation dataset
            validation_texts = [
                "The quick brown fox jumps over the lazy dog.",
                "Artificial intelligence is transforming many industries.",
                "Python programming language is widely used in data science.",
            ]
            
            _, val_dataset = llm.prepare_fine_tuning_data(validation_texts)
            
            # Define parameter ranges (keeping it small for demo)
            parameter_ranges = {
                "temperature": [0.5, 0.7, 1.0],
                "top_p": [0.8, 0.9],
                "top_k": [40, 50]
            }
            
            print("Optimizing parameters...")
            best_params = llm.optimize_parameters(
                parameter_ranges=parameter_ranges,
                validation_dataset=val_dataset,
                metric="perplexity"
            )
            
            print(f"✓ Best parameters: {best_params['best_parameters']}")
            print(f"✓ Best perplexity: {best_params['best_score']:.4f}")
            print(f"✓ Combinations tested: {best_params['total_combinations_tested']}")
            
    except Exception as e:
        print(f"Error in parameter optimization: {e}")


def model_management_example():
    """Demonstrate model management features."""
    print("\n=== Model Management Example ===")
    
    config = LocalLLMConfig(local_models_dir="./demo_models")
    
    try:
        with LocalizeLLM(config) as llm:
            # List all available models
            models = llm.list_available_models()
            print(f"Available models: {len(models)}")
            
            for name, info in models.items():
                print(f"  - {name}: {info.size_gb:.2f}GB, "
                      f"Type: {info.model_type}, "
                      f"Fine-tuned: {info.fine_tuned}")
            
            if models:
                # Load first available model
                first_model = list(models.keys())[0]
                llm.load_local_model(first_model)
                print(f"✓ Loaded model: {first_model}")
                
                # Get detailed model info
                info = llm.get_model_info()
                print("\nModel Details:")
                print(f"  Parameters: {info.get('model_parameters', 'N/A')}")
                print(f"  Trainable: {info.get('trainable_parameters', 'N/A')}")
                print(f"  Device: {info.get('device', 'N/A')}")
                print(f"  Vocab size: {info.get('vocab_size', 'N/A')}")
                
                # Demonstrate token counting
                test_text = "This is a test sentence for token counting."
                token_count = llm.count_tokens(test_text)
                print(f"\nToken count for '{test_text}': {token_count}")
                
                # Demonstrate embeddings (if embedding model available)
                try:
                    embedding = llm.get_embedding("Sample text for embedding")
                    print(f"Embedding dimension: {len(embedding)}")
                except Exception as e:
                    print(f"Embedding not available: {e}")
            
    except Exception as e:
        print(f"Error in model management: {e}")


def main():
    """Run all examples."""
    print("🚀 LocalizeLLM Demonstration Script")
    print("=" * 50)
    
    
    try:
        # Check if we have HuggingFace token (optional for public models)
        hf_token = os.getenv("HUGGINGFACE_TOKEN") or os.getenv("HF_TOKEN")
        if not hf_token:
            print("💡 Note: No HuggingFace token found. Using public models only.")
            print("   Set HUGGINGFACE_TOKEN env var to access gated models.")
        
        # Run examples
        basic_usage_example()
        fine_tuning_example()
        # parameter_optimization_example()
        # model_management_example()
        
        print("\n🎉 All examples completed successfully!")
        
    except KeyboardInterrupt:
        print("\n⚠️  Demonstration interrupted by user")
    except Exception as e:
        print(f"\n❌ Error during demonstration: {e}")
        print("💡 Make sure you have the required dependencies installed:")
        print("   pip install torch transformers sentence-transformers datasets accelerate")


if __name__ == "__main__":
    main()

Key Features:

  • ✅ Model downloading and loading from HuggingFace Hub
  • ✅ Text generation with customizable parameters (max_new_tokens, temperature)
  • ✅ Chat-based conversation with structured message handling
  • ✅ Fine-tuning with custom training data using LoRA optimization
  • ✅ Parameter optimization for finding best generation settings
  • ✅ Model management with snapshots and metadata tracking
  • ✅ Multiple model support (DialoGPT, LLaMA, Phi-3, etc.)
  • ✅ GPU/CPU switching with automatic device detection
  • ✅ Memory-efficient quantization support (4-bit, 8-bit)
  • ✅ Token counting and embedding generation
  • ✅ Comprehensive logging and progress tracking
  • ✅ Model caching system for improved performance
  • ✅ Validation dataset preparation for fine-tuning
  • ✅ Experiment tracking and snapshot management

Required Dependencies:

  • torch
  • transformers
  • sentence-transformers
  • datasets
  • accelerate
  • peft (for LoRA fine-tuning)
  • bitsandbytes (for quantization)

🔤 Embeddings & Prompts

Embeddings (embeddings.py)

from langxchange.embeddings import EmbeddingHelper

embedder = EmbeddingHelper(
    model_name="sentence-transformers/all-MiniLM-L6-v2",
    device="cpu"
)

# Generate embeddings
embedding = embedder.embed("This is a sample text.")
similarity = embedder.similarity(text1_embedding, text2_embedding)

Features:

  • ✅ Multiple embedding models
  • ✅ Batch processing
  • ✅ Similarity metrics
  • ✅ GPU acceleration
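The `similarity` call above typically reduces to cosine similarity between two embedding vectors. A dependency-free sketch of that metric (illustrative, not the EmbeddingHelper internals):

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity between two equal-length vectors: dot(a, b) / (|a| * |b|)."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # convention: zero vector has no direction
    return dot / (norm_a * norm_b)

print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # 0.0 (orthogonal)
print(cosine_similarity([1.0, 2.0], [2.0, 4.0]))  # ≈ 1.0 (parallel)
```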

Prompt Helper (prompt_helper.py)

from langxchange.prompt_helper import PromptTemplate

template = PromptTemplate(
    template="Answer the question: {question}\nContext: {context}",
    input_variables=["question", "context"]
)

prompt = template.format(
    question="What is AI?",
    context="AI stands for Artificial Intelligence..."
)

Features:

  • ✅ Template engine
  • ✅ Variable substitution
  • ✅ Validation
  • ✅ Custom functions
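Conceptually, template rendering with validation amounts to checking the declared variables before substituting them. A stdlib-only sketch of the idea (the `MiniTemplate` class here is hypothetical, not the actual PromptTemplate implementation):

```python
class MiniTemplate:
    """Tiny prompt template: declared variables are validated before substitution."""

    def __init__(self, template, input_variables):
        self.template = template
        self.input_variables = set(input_variables)

    def format(self, **kwargs):
        # Validation step: fail loudly if a declared variable is missing.
        missing = self.input_variables - kwargs.keys()
        if missing:
            raise ValueError(f"missing template variables: {sorted(missing)}")
        return self.template.format(**kwargs)

t = MiniTemplate(
    template="Answer the question: {question}\nContext: {context}",
    input_variables=["question", "context"],
)
print(t.format(question="What is AI?", context="AI stands for Artificial Intelligence."))
```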

🔬 Examples

Basic RAG Implementation

import os
from langxchange.openai_helper import EnhancedOpenAIHelper, OpenAIConfig
from langxchange.chroma_helper import EnhancedChromaHelper, ChromaConfig
from langxchange.documentloader import DocumentLoaderHelper, ChunkingStrategy

# Set API key (or use environment variable)
os.environ["OPENAI_API_KEY"] = "your-openai-key"

# Initialize components with enhanced configurations
openai_config = OpenAIConfig(chat_model="gpt-4", enable_cost_tracking=True)
openai = EnhancedOpenAIHelper(openai_config)

chroma_config = ChromaConfig(persist_directory="./chroma", batch_size=100)
chroma = EnhancedChromaHelper(openai, chroma_config)

# Load and process documents
loader = DocumentLoaderHelper(
    chunking_strategy=ChunkingStrategy.SEMANTIC,
    chunk_size=800
)
chunks = list(loader.load("data.csv"))

# Store in vector database
chroma.insert_documents(
    collection_name="knowledge_base",
    documents=[chunk.content for chunk in chunks],
    metadatas=[chunk.metadata for chunk in chunks],
    generate_embeddings=True
)

# Query
results = chroma.query_collection(
    collection_name="knowledge_base",
    query_text="What are the main topics?",
    top_k=5
)

Database + Vector Search

# Store structured data in MySQL
mysql = MySQLHelper()
mysql.insert_dataframe("products", product_df)

# Store unstructured data in vector DB
chroma.insert_documents(
    collection_name="product_descriptions",
    documents=product_df["description"].tolist(),
    generate_embeddings=True
)

# Cross-reference search
sql_results = mysql.query("SELECT * FROM products WHERE category = :cat", {"cat": "electronics"})
vector_results = chroma.query_collection(
    collection_name="product_descriptions",
    query_text="high quality electronics",
    top_k=10
)
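One simple way to combine the two result sets is to intersect them on a shared key while preserving the vector ranking. A hedged sketch (the `product_id` field and result shapes are illustrative assumptions, not the helpers' actual return types):

```python
def cross_reference(sql_rows, vector_hits, key="product_id"):
    """Keep vector hits whose key also appears in the SQL result set,
    preserving the vector ranking order."""
    allowed = {row[key] for row in sql_rows}
    return [hit for hit in vector_hits if hit[key] in allowed]

# Structured filter said products 1 and 3 are in stock;
# vector search ranked products 3, 2, 1 by semantic relevance.
sql_rows = [{"product_id": 1}, {"product_id": 3}]
vector_hits = [
    {"product_id": 3, "score": 0.9},
    {"product_id": 2, "score": 0.8},
    {"product_id": 1, "score": 0.7},
]
print(cross_reference(sql_rows, vector_hits))  # hits for products 3 and 1, in vector order
```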

Multi-Modal RAG

# Process documents with images
loader = DocumentLoaderHelper(
    chunking_strategy=ChunkingStrategy.SEMANTIC,
    image_processing=ImageProcessingStrategy.EXTRACT_TEXT
)

chunks = list(loader.load("document_with_images.pdf"))

# Store text and image metadata
chroma.insert_documents(
    collection_name="multimodal_docs",
    documents=[chunk.content for chunk in chunks],
    metadatas=[
        {
            "has_images": chunk.has_images,
            "image_count": len(chunk.images),
            "page_number": chunk.metadata.get("page", 1)
        } for chunk in chunks
    ],
    generate_embeddings=True
)

🎯 RAG Tutorial

This tutorial demonstrates a complete RAG (Retrieval-Augmented Generation) implementation using the LangXChange toolkit. We'll build a system that processes CSV data, stores it in a vector database, and enables intelligent querying with LLM responses.

Complete RAG Example

#!/usr/bin/env python3
"""
Complete RAG Implementation using LangXChange
This example demonstrates:
- CSV data processing and chunking
- Vector database storage with ChromaDB
- Intelligent retrieval and re-ranking
- LLM-powered response generation
- Cost tracking and performance monitoring
"""

import os
import time
from pathlib import Path
import pandas as pd
from dotenv import load_dotenv

from langxchange.documentloader import DocumentLoaderHelper, ChunkingStrategy
from langxchange.mysql_helper import MySQLHelper
from langxchange.openai_helper import EnhancedOpenAIHelper, OpenAIConfig
from langxchange.chroma_helper import EnhancedChromaHelper, ChromaConfig
from langxchange.retrieverX import create_retriever_from_config
from langxchange.prompt_helper import EnhancedPromptHelper, PromptMode

def main():
    """Complete RAG implementation demonstrating all LangXChange capabilities."""
    
    # Load environment variables
    load_dotenv()
    
    # ─── 1) Configuration ────────────────────────────────────────────────────────
    # Set your API keys (you can also use environment variables)
    os.environ["OPENAI_API_KEY"] = "your-openai-api-key"
    os.environ["CHROMA_PERSIST_PATH"] = "__db/chromadb"
    
    # Configuration constants
    CHROMA_DIR = "__db/chromadb"
    COLLECTION_NAME = "student_info_collection"
    
    # ─── 2) Initialize Components ────────────────────────────────────────────────
    
    # Enhanced OpenAI configuration
    open_ai_config = OpenAIConfig(
        chat_model="gpt-3.5-turbo",
        enable_caching=True,
        enable_cost_tracking=True,
        max_retries=3,
        log_level="INFO"
    )
    
    # Enhanced Chroma configuration for better performance
    chroma_config = ChromaConfig(
        persist_directory=CHROMA_DIR,
        batch_size=100,
        max_workers=8,
        progress_bar=True
    )
    
    # Document loader with optimized settings for CSV data
    loader = DocumentLoaderHelper(
        chunk_size=800,
        chunking_strategy=ChunkingStrategy.SEMANTIC,
        preserve_formatting=True
    )
    
    # Initialize core components
    llm = EnhancedOpenAIHelper(open_ai_config)
    chroma = EnhancedChromaHelper(llm, chroma_config)
    
    # ─── 3) Retriever Configuration ─────────────────────────────────────────────
    retreiver_config = {
        "vector_db": chroma,
        "embedder": llm,
        "reranker_model": None,
        "use_rerank": False,
        "rerank_multiplier": 3.0,
        "db_type": "chroma"
    }
    
    # Prompt helper for RAG responses
    prompt_response = EnhancedPromptHelper(
        llm=llm,
        system_prompt="You are a helpful assistant that answers questions based on the provided context.",
        default_mode=PromptMode.AUGMENTED,
        max_context_length=1500,
        max_snippets=3
    )
    
    # ─── 4) Data Preparation ─────────────────────────────────────────────────────
    
    # Create data directory if it doesn't exist
    Path("data").mkdir(exist_ok=True)
    output_path = "data/student_scores.csv"
    
    # Sample CSV data (in practice, this would be your actual data)
    sample_data = {
        'student_id': range(1, 101),
        'name': [f'Student_{i}' for i in range(1, 101)],
        'math_score': [85 + (i % 15) for i in range(1, 101)],
        'science_score': [80 + (i % 20) for i in range(1, 101)],
        'english_score': [75 + (i % 25) for i in range(1, 101)],
        'grade': ['A' if i > 90 else 'B' if i > 80 else 'C' for i in range(1, 101)]
    }
    
    df = pd.DataFrame(sample_data)
    df.to_csv(output_path, index=False)
    print(f"💾 Sample data created: {output_path} ({len(df)} records)")
    
    # ─── 5) CSV Processing and Chroma Ingestion ─────────────────────────────────
    
    print("🔄 Processing CSV data for Chroma ingestion...")
    
    # Load and process the CSV file
    chunks = list(loader.load(output_path))
    print(f"📝 Generated {len(chunks)} chunks from CSV data")
    print(f"Processing time: {loader.stats['times']['total']:.3f}s")
    
    # Prepare enhanced metadata for better searchability
    enhanced_metadata = []
    documents = []
    
    for i, chunk in enumerate(chunks):
        metadata = {
            "source": Path(output_path).name,
            "chunk_id": i,
            "data_type": "student_data",
            "extraction_date": time.strftime("%Y-%m-%d %H:%M:%S"),
        }
        enhanced_metadata.append(metadata)
        documents.append(str(chunk.content))
    
    # ─── 6) Batch Processing and Ingestion to Chroma ─────────────────────────────
    
    print("⚡ Starting enhanced batch ingestion to ChromaDB...")
    ingest_start = time.perf_counter()
    
    try:
        # Batch process for efficiency
        batch_size = chroma_config.batch_size
        total_chunks = len(documents)
        
        for i in range(0, total_chunks, batch_size):
            batch_end = min(i + batch_size, total_chunks)
            batch_chunks = documents[i:batch_end]
            batch_metadata = enhanced_metadata[i:batch_end]
            
            print(f"📤 Processing batch {i//batch_size + 1}/{(total_chunks + batch_size - 1)//batch_size}")
            
            # Insert batch to Chroma
            chroma.insert_documents(
                collection_name=COLLECTION_NAME,
                documents=batch_chunks,
                metadatas=batch_metadata,
                generate_embeddings=True,
            )
            
            print(f"✅ Batch {i//batch_size + 1} inserted successfully")
        
        # Get final collection stats
        collection_count = chroma.get_collection_count(COLLECTION_NAME)
        
        ingest_time = time.perf_counter() - ingest_start
        print(f"🎉 Ingestion completed successfully!")
        print(f"📊 Collection '{COLLECTION_NAME}' now contains {collection_count} documents")
        print(f"⏱️  Total ingestion time: {ingest_time:.2f} seconds")
        print(f"📈 Processing rate: {collection_count/ingest_time:.2f} documents/second")
        
    except Exception as e:
        print(f"❌ Error during ingestion: {str(e)}")
        raise
    
    # ─── 7) Retrieval System Testing ────────────────────────────────────────────
    
    print("\n🔍 Testing retrieval system...")
    
    # Create retriever from configuration
    retriever = create_retriever_from_config(retreiver_config)
    
    # Test queries
    test_queries = [
        "Select best student with highest scores",
        "What students have A grades?",
        "Show me students with high math scores",
        "Find students who excel in science"
    ]
    
    for query in test_queries:
        print(f"\n🔎 Query: {query}")
        
        # Retrieve documents
        results = retriever.retrieve(
            query=query,
            top_k=3,
            collection_name=COLLECTION_NAME
        )
        
        print(f"📋 Retrieved {len(results)} results:")
        for i, result in enumerate(results, 1):
            print(f"  {i}. Score: {result.score:.3f}")
            print(f"     Content: {result.document[:100]}...")
    
    # ─── 8) RAG Response Generation ─────────────────────────────────────────────
    
    print("\n🧠 Generating RAG responses...")
    
    # Select a query for detailed RAG demonstration
    main_query = "What are the characteristics of top-performing students?"
    
    # Get retrieval results
    retrieval_results = retriever.retrieve(
        query=main_query,
        top_k=2,
        collection_name=COLLECTION_NAME
    )
    
    # Generate RAG response
    response = prompt_response.run(
        user_query=main_query,
        retrieval_results=retrieval_results,
        mode=PromptMode.AUGMENTED,
        temperature=0.5
    )
    
    print(f"\n🤖 RAG Response:")
    print(f"Query: {main_query}")
    print(f"Answer: {response}")
    
    # ─── 9) MySQL Integration Example ───────────────────────────────────────────
    
    print("\n💾 Demonstrating MySQL integration...")
    
    try:
        # Initialize MySQL helper
        mysql = MySQLHelper()
        
        # Check connection health
        health = mysql.health_check()
        print(f"Database status: {health['status']}")
        
        if health['status'] == 'healthy':
            # Store student data in MySQL for structured queries
            result = mysql.insert_dataframe(
                table_name="students",
                dataframe=df,
                if_exists="replace"
            )
            print(f"MySQL: {result}")
            
            # Perform structured query
            structured_results = mysql.query(
                "SELECT * FROM students WHERE math_score > :min_score",
                params={"min_score": 90}
            )
            print(f"Found {len(structured_results)} high-performing students in math")
            
    except Exception as e:
        print(f"MySQL integration skipped: {str(e)}")
    
    # ─── 10) Performance Summary ────────────────────────────────────────────────
    
    print("\n📊 Performance Summary:")
    print(f"• Documents processed: {collection_count}")
    print(f"• Ingestion time: {ingest_time:.2f} seconds")
    print(f"• Average processing rate: {collection_count/ingest_time:.2f} docs/sec")
    
    # Cost tracking (if enabled)
    try:
        cost_summary = llm.get_cost_summary()
        print(f"• API costs: ${cost_summary['total_cost']:.4f}")
        print(f"• Total tokens: {cost_summary['total_tokens']}")
    except Exception:
        print("• Cost tracking not available")
    
    print("\n✅ RAG implementation complete!")

if __name__ == "__main__":
    main()

Key Features Demonstrated

This RAG example showcases:

  1. 📊 Data Processing: CSV parsing and semantic chunking
  2. 🔍 Vector Storage: ChromaDB with batch processing
  3. 🧠 Intelligent Retrieval: Two-stage retrieval with scoring
  4. 🤖 LLM Integration: OpenAI for response generation
  5. 💾 Structured Data: MySQL for relational queries
  6. 📈 Performance Monitoring: Cost tracking and timing
  7. 🔧 Modular Design: Easy to customize and extend

Customization Options

  • Vector Database: Switch between ChromaDB, Pinecone, or FAISS
  • LLM Provider: Use Anthropic, Google, or local models
  • Document Formats: Support for PDF, DOCX, images
  • Chunking Strategies: Semantic, fixed-size, or custom
  • Retrieval Modes: Basic, re-ranked, or hybrid
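For reference, the fixed-size strategy boils down to sliding a window of `chunk_size` characters with a small overlap, so text cut at a chunk boundary also appears at the start of the next chunk. A minimal illustration (the real DocumentLoaderHelper strategies are more sophisticated):

```python
def fixed_size_chunks(text, chunk_size=800, overlap=100):
    """Split text into chunks of at most chunk_size characters,
    each overlapping the previous chunk by `overlap` characters."""
    if chunk_size <= overlap:
        raise ValueError("chunk_size must exceed overlap")
    step = chunk_size - overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunk = text[start:start + chunk_size]
        if chunk:
            chunks.append(chunk)
    return chunks

doc = "abcdefghij" * 100  # 1000 characters
chunks = fixed_size_chunks(doc, chunk_size=300, overlap=50)
print(len(chunks))  # 4 chunks, each sharing 50 characters with its neighbor
```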

Google Drive Integration (google_drive_helper.py)

import os
from langxchange.google_drive_helper import EnhancedGoogleDriveHelper

# Initialize Google Drive helper with enhanced configuration
drive = EnhancedGoogleDriveHelper(
    credentials_path="credentials.json",  # Path to Google OAuth2 credentials file
    token_path="token.pickle",  # Path to store authentication token
    config={
        'max_retries': 5,
        'retry_delay': 2.0,
        'chunk_size': 2 * 1024 * 1024,  # 2MB chunks
        'max_workers': 8,
        'log_level': 'INFO',
        'progress_callback': True
    }
)

# Create folder structure
folder_id = drive.create_folder(
    name="AI Research Documents",
    description="Collection of AI research papers and documents"
)

# Upload single file with progress tracking
file_id = drive.upload_file(
    file_path="research_paper.pdf",
    parent_id=folder_id,
    description="Latest AI research paper",
    progress_callback=lambda current, total: print(f"Upload progress: {(current/total)*100:.1f}%")
)

# Batch upload multiple files
file_paths = ["doc1.pdf", "doc2.docx", "data.xlsx", "notes.txt"]
upload_results = drive.batch_upload_files(
    file_paths=file_paths,
    parent_id=folder_id,
    progress_callback=lambda completed, total: print(f"Batch upload: {completed}/{total} files")
)

# Search and filter files
search_results = drive.search_files(
    query="mimeType='application/pdf' and name contains 'AI'",
    max_results=10,
    fields="id,name,mimeType,size,modifiedTime"
)

# List files in folder with type filtering
pdf_files = drive.list_files_in_folder(
    folder_id=folder_id,
    include_subfolders=True,
    file_types=["application/pdf"]
)

# Download file with progress tracking
drive.download_file(
    file_id=file_id,
    output_path="downloaded_file.pdf",
    progress_callback=lambda current, total: print(f"Download progress: {(current/total)*100:.1f}%")
)

# Manage file permissions and sharing
permission_id = drive.share_file(
    file_id=file_id,
    email="colleague@example.com",
    role="writer",
    type_="user"
)

# Get storage quota information
quota_info = drive.get_storage_quota()
print(f"Storage used: {quota_info['usage']} / {quota_info['limit']} bytes")

# File management operations
# Rename file
new_name = drive.rename_file(file_id=file_id, new_name="Updated_Research_Paper.pdf")

# Move file to different folder
new_folder_id = drive.create_folder("Archive", parent_id=folder_id)
drive.move_file(file_id=file_id, new_parent_id=new_folder_id)

# Copy file
copy_id = drive.copy_file(file_id=file_id, new_name="Backup_Research_Paper.pdf")

# Export Google Docs to different formats
# Export formats include: 'pdf', 'docx', 'odt', 'txt', 'html' for Google Docs
# 'xlsx', 'ods', 'csv', 'tsv' for Google Sheets
# 'pptx', 'odp', 'jpeg', 'png' for Google Slides
drive.export_google_doc(
    file_id=file_id,
    export_format="application/pdf",
    output_path="exported_document.pdf"
)

# Get detailed file metadata
metadata = drive.get_file_metadata(file_id=file_id)
print(f"File: {metadata['name']} ({metadata['size']} bytes)")

# Clean up
drive.close()

Key Features:

  • ✅ Robust error handling with exponential backoff retry mechanisms
  • ✅ Progress tracking for uploads/downloads with configurable callbacks
  • ✅ Batch operations with concurrent file processing
  • ✅ Advanced search capabilities using Google Drive query syntax
  • ✅ File permissions management and sharing controls
  • ✅ Comprehensive logging and configuration management
  • ✅ Export Google Docs, Sheets, and Slides to various formats
  • ✅ Memory-efficient operations with configurable chunk sizes
  • ✅ Support for both individual and batch file operations
  • ✅ Authentication handling with automatic token refresh

Required Dependencies:

  • google-api-python-client
  • google-auth-oauthlib
  • google-auth

Google Cloud Storage Integration (google_cs_helper.py)

import logging
from langxchange.google_cs_helper import EnhancedGoogleCloudStorageHelper

# Initialize GCS helper with enhanced configuration
gcs = EnhancedGoogleCloudStorageHelper(
    credentials_path="service-account.json",  # Path to service account JSON file
    project_id="my-gcp-project",  # GCP project ID
    logger=logging.getLogger("gcs_helper")  # Custom logger
)

# Create bucket with custom configuration
bucket = gcs.create_bucket(
    bucket_name="my-storage-bucket",
    location="US-CENTRAL1",  # Bucket location
    storage_class="STANDARD",  # Storage class
    force_create=True  # Create even if exists
)

# Upload single file with metadata
upload_result = gcs.upload_file(
    bucket_name="my-storage-bucket",
    file_path="research_paper.pdf",
    destination_blob_name="documents/research_paper.pdf",
    metadata={
        "purpose": "research",
        "author": "team",
        "version": "1.0"
    },
    content_type="application/pdf"
)
print(f"Uploaded: {upload_result['public_url']}")

# Batch upload multiple files
file_mappings = [
    ("data1.csv", "data/raw/data1.csv"),
    ("data2.csv", "data/raw/data2.csv"),
    ("config.json", "config/application.json")
]
batch_results = gcs.batch_upload(
    bucket_name="my-storage-bucket",
    file_mappings=file_mappings,
    metadata={"batch": "2025-01-data-import"}
)

# List blobs with filtering and metadata
pdf_blobs = gcs.list_blobs(
    bucket_name="my-storage-bucket",
    prefix="documents/",
    include_metadata=True
)
for blob in pdf_blobs:
    print(f"Found: {blob['name']} ({blob['size']} bytes)")

# Download file with automatic directory creation
download_info = gcs.download_file(
    bucket_name="my-storage-bucket",
    blob_name="documents/research_paper.pdf",
    destination_file_path="downloads/research_paper.pdf",
    create_dirs=True
)
print(f"Downloaded to: {download_info['local_path']}")

# Get detailed blob metadata
metadata = gcs.get_blob_metadata(
    bucket_name="my-storage-bucket",
    blob_name="documents/research_paper.pdf"
)
print(f"Content type: {metadata['content_type']}")
print(f"Size: {metadata['size']} bytes")
print(f"Custom metadata: {metadata['custom_metadata']}")

# Copy blob to different bucket
copy_result = gcs.copy_blob(
    source_bucket="my-storage-bucket",
    source_blob="documents/research_paper.pdf",
    destination_bucket="backup-storage-bucket",
    destination_blob="archive/research_paper.pdf"
)
print(f"Copied: {copy_result['source']} -> {copy_result['destination']}")

# Generate signed URL for temporary access
signed_url = gcs.generate_signed_url(
    bucket_name="my-storage-bucket",
    blob_name="documents/research_paper.pdf",
    expiration_minutes=60,  # URL expires in 1 hour
    method="GET"  # HTTP method
)
print(f"Signed URL: {signed_url}")

# Get comprehensive bucket information
bucket_info = gcs.get_bucket_info("my-storage-bucket")
print(f"Bucket location: {bucket_info['location']}")
print(f"Storage class: {bucket_info['storage_class']}")
print(f"Versioning enabled: {bucket_info['versioning_enabled']}")
print(f"Labels: {bucket_info['labels']}")

# Clean up operations
# Delete specific blob
success = gcs.delete_blob(
    bucket_name="my-storage-bucket",
    blob_name="documents/old_file.txt"
)

# Delete bucket (with force option to clear all blobs)
# gcs.delete_bucket("my-storage-bucket", force=True)

Key Features:

  • ✅ Comprehensive error handling with logging and validation
  • ✅ Support for both service account and application default credentials
  • ✅ Bucket creation with custom configuration (location, storage class)
  • ✅ File upload/download with metadata and content type support
  • ✅ Batch operations for efficient multi-file handling
  • ✅ Blob listing with filtering, prefix matching, and metadata inclusion
  • ✅ Blob copying between buckets and locations
  • ✅ Signed URL generation for secure temporary access
  • ✅ Detailed metadata retrieval for blobs and buckets
  • ✅ Bucket information and lifecycle management
  • ✅ Path validation and automatic directory creation
  • ✅ Proper cleanup operations for resources

Required Dependencies:

  • google-cloud-storage
  • google-auth
  • google-auth-oauthlib
  • google-auth-httplib2

🛠️ Advanced Usage

Custom Embeddings

import logging
from langxchange.embeddings import EmbeddingHelper

# Setup logging
logging.basicConfig(level=logging.INFO)

# Example with SentenceTransformer
try:
    from sentence_transformers import SentenceTransformer

    model = SentenceTransformer('all-MiniLM-L6-v2')
    helper = EmbeddingHelper(
        model,
        batch_size=16,
        max_workers=4,
        return_numpy=True,
    )

    # Test texts
    texts = [
        "This is a test sentence.",
        "Another example text.",
        "Machine learning is fascinating.",
        "",  # Edge case: empty string
        "Final test sentence.",
    ]

    # Generate embeddings
    embeddings = helper.embed(texts)

    print(f"Generated {len([e for e in embeddings if e is not None])}/{len(texts)} embeddings")
    print(f"Embedding dimension: {helper.get_embedding_dimension()}")

except ImportError:
    print("SentenceTransformer not available for testing")

Batch Operations

# Batch query multiple collections
# (retriever1 / retriever2 are previously initialized retriever instances)
results = batch_retrieve(
    retrievers=[retriever1, retriever2],
    query="your question",
    top_k=5
)
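When several retrievers are queried in a batch, their per-retriever rankings still need to be merged into one list. The Parallel RetrieverX described above uses RRF fusion for this; the sketch below shows plain Reciprocal Rank Fusion for illustration only, with `rrf_fuse` as a hypothetical helper, not the framework's internal implementation:

```python
def rrf_fuse(ranked_lists, k=60, top_k=5):
    """Merge best-first ranked lists with Reciprocal Rank Fusion.

    A document's fused score is the sum of 1 / (k + rank) over every
    list it appears in, so items ranked highly by several retrievers
    rise to the top.
    """
    scores = {}
    for results in ranked_lists:
        for rank, doc_id in enumerate(results, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    fused = sorted(scores, key=scores.get, reverse=True)
    return fused[:top_k]

# Two retrievers ranking an overlapping document set
vector_hits = ["doc_a", "doc_b", "doc_c"]
graph_hits = ["doc_b", "doc_d", "doc_a"]
print(rrf_fuse([vector_hits, graph_hits], top_k=3))
# → ['doc_b', 'doc_a', 'doc_d']
```

`doc_b` wins because it is ranked in the top two by both retrievers; the constant `k` (60 is the value from the original RRF paper) damps the influence of any single top rank.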

Custom Prompts

"""
Example usage of the improved PromptHelper class.
Demonstrates different prompt building modes and tool integration.
"""
import os
from langxchange.prompt_helper import EnhancedPromptHelper, PromptMode
from typing import Dict, List, Any
from langxchange.openai_helper import EnhancedOpenAIHelper, OpenAIConfig
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Initialize LLM helper with configuration
open_ai_config = OpenAIConfig(
    chat_model="gpt-3.5-turbo",
    enable_caching=True,
    enable_cost_tracking=True,
    max_retries=3,
    log_level="INFO"
)
llm = EnhancedOpenAIHelper(open_ai_config)

# Initialize PromptHelper with different configurations
helper = EnhancedPromptHelper(
    llm=llm,
    system_prompt="You are a helpful assistant.",
    default_mode=PromptMode.AUGMENTED,
    max_context_length=1500,
    max_snippets=3
)

# Register tools for enhanced functionality
def sample_search_tool(query: str, limit: int = 3) -> List[str]:
    """Sample search function."""
    return [f"Result {i+1} for '{query}'" for i in range(limit)]

helper.register_tool("search", sample_search_tool)

# Sample retrieval results
sample_retrieval_results = [
    {
        "text": "Machine learning is a subset of artificial intelligence that focuses on algorithms that can learn from data.",
        "metadata": {"source": "ML_Basics.pdf", "score": 0.95}
    },
    {
        "text": "Deep learning uses neural networks with multiple layers to model and understand complex patterns.",
        "metadata": {"source": "Deep_Learning_Guide.pdf", "score": 0.87}
    }
]

user_query = "What is machine learning and how does it relate to AI?"

# Test different prompt modes
modes_to_test = [PromptMode.BASIC, PromptMode.AUGMENTED, PromptMode.CONTEXTUAL, PromptMode.SUMMARIZED]

for mode in modes_to_test:
    print(f"\n--- {mode.value.upper()} MODE ---")
    
    # Build prompt with retrieval results
    messages = helper.build_prompt(
        user_query=user_query,
        retrieval_results=sample_retrieval_results,
        mode=mode
    )
    
    print(f"Generated {len(messages)} messages")
    
    # Run with LLM
    response = helper.run(
        user_query=user_query,
        retrieval_results=sample_retrieval_results,
        mode=mode,
        temperature=0.5
    )
    print(f"LLM Response: {response}")

# Tool integration example
print("\n=== Tool Integration ===")

# Use tools with queries
search_result = helper.call_tool("search", "artificial intelligence", limit=2)
print(f"Search tool result: {search_result}")

# Run with multiple tools
tool_calls = [
    {"name": "search", "kwargs": {"query": "machine learning", "limit": 2}}
]

result = helper.run_with_tools(
    user_query=user_query,
    tool_calls=tool_calls,
    retrieval_results=sample_retrieval_results,
    mode=PromptMode.SUMMARIZED
)

print(f"\nRun with tools result:")
print(f"  LLM Response: {result['llm_response']}")
print(f"  Tool Results: {result['tool_results']}")

# Configuration updates
helper.update_config(
    default_mode=PromptMode.CONTEXTUAL,
    max_context_length=2500,
    max_snippets=5
)

# Custom system prompt
custom_response = helper.run(
    user_query="Explain briefly",
    retrieval_results=sample_retrieval_results[:1],
    custom_system_prompt="You are a concise technical expert. Provide brief, accurate answers."
)

print(f"Custom system prompt response: {custom_response}")

# Edge case handling
no_retrieval_response = helper.run(
    user_query="What is 2+2?",
    retrieval_results=None,
    mode=PromptMode.AUGMENTED
)
print(f"No retrieval results: {no_retrieval_response}")

📈 Performance Tips

  1. Use Connection Pooling: Configure pool sizes for database connections
  2. Batch Operations: Process documents in batches for better performance
  3. Cache Results: Enable caching for frequently accessed data
  4. GPU Acceleration: Use GPU for embedding generation when available
  5. Memory Management: Monitor memory usage for large datasets
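Tips 2 and 5 work together: feeding documents to an embedder or loader in fixed-size groups bounds peak memory. A generic batching helper in plain Python (not a LangXChange API) looks like this:

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def batched(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield successive batches of at most batch_size items."""
    batch: List[T] = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # final partial batch

docs = [f"doc_{i}" for i in range(7)]
print([len(b) for b in batched(docs, 3)])
# → [3, 3, 1]
```

Lowering `batch_size` is also the first lever to pull for the memory issues listed under Troubleshooting.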

🐛 Troubleshooting

Common Issues

Connection Errors:

  • Verify API keys and network connectivity
  • Check environment variables
  • Ensure database services are running

Memory Issues:

  • Reduce batch sizes for large datasets
  • Use memory-efficient embedding models
  • Monitor system resources

Performance Issues:

  • Enable progress bars for monitoring
  • Use appropriate chunk sizes
  • Optimize vector database settings
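On the chunk-size point: overly large chunks dilute retrieval relevance, while tiny chunks lose context. A simple character-window chunker with overlap (a hypothetical helper, not the framework's document loader) makes the trade-off concrete:

```python
def chunk_text(text: str, chunk_size: int = 200, overlap: int = 50) -> list:
    """Split text into overlapping character windows.

    Consecutive chunks share `overlap` characters so sentences that
    straddle a boundary still appear whole in at least one chunk.
    """
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    step = chunk_size - overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

sample = "x" * 500
chunks = chunk_text(sample, chunk_size=200, overlap=50)
print(len(chunks), [len(c) for c in chunks])
# → 4 [200, 200, 200, 50]
```

Tuning `chunk_size` against your embedding model's context window, and `overlap` against how often answers span chunk boundaries, is usually the quickest fix for poor retrieval quality.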

Debug Mode

import logging
logging.basicConfig(level=logging.DEBUG)

# All components will now output detailed logs

🤝 Contributing

We welcome contributions!

🕵️ Tracing & Observability

LangXChange v0.9.6+ includes a built-in tracing system designed for production observability. It captures every LLM call, retrieval operation, and tool execution as structured TraceSpan objects.

Quick Start: Logging Traces

The easiest way to see tracing in action is to register a callback that prints span metadata:

from langxchange import set_trace_callback
from langxchange.openai_helper import EnhancedOpenAIHelper, OpenAIConfig

# 1. Define a simple listener
def on_trace(span):
    print(f"DEBUG: {span['name']} took {span['duration_ms']:.2f}ms")
    if "model" in span['metadata']:
        print(f"Model: {span['metadata']['model']}")

# 2. Register it globally
set_trace_callback(on_trace)

# 3. Operations now automatically emit traces
llm = EnhancedOpenAIHelper(OpenAIConfig(api_key="sk-...", allow_tracing=True))
llm.chat([{"role": "user", "content": "What is 2+2?"}])

Callback-based Tracing (Advanced)

You can register a global or context-specific callback to capture spans as they complete. This is ideal for streaming traces to a frontend or logging them to an external system.

from langxchange import set_trace_callback, start_trace

def my_trace_handler(span_data):
    print(f"Span Ended: {span_data['name']} ({span_data['duration_ms']:.2f}ms)")
    # Send to SSE, WebSocket, or database
    
# Register the callback
set_trace_callback(my_trace_handler)

# Start a new trace context
start_trace()

# Your LLM/RAG operations...
# Every finished span will trigger my_trace_handler

TraceSpan Structure

Each span contains hierarchy information (parent/child relationships), timing, status, and metadata:

  • name: Operation name (e.g., openai_achat, vector_source)
  • trace_id: Unique ID for the entire request
  • span_id: Unique ID for this specific operation
  • parent_id: ID of the parent operation (for tree visualization)
  • duration_ms: Precise execution time
  • metadata: Provider-specific details (model name, tokens, etc.)
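Because every span carries a `span_id` and `parent_id`, a flat stream of callback spans can be regrouped into a tree for visualization. A minimal sketch, assuming spans arrive as plain dicts shaped like the fields above:

```python
def build_trace_tree(spans):
    """Index flat spans into roots plus a parent_id -> children map."""
    children = {}
    roots = []
    for span in spans:
        if span["parent_id"] is None:
            roots.append(span)  # top-level operation of the trace
        else:
            children.setdefault(span["parent_id"], []).append(span)
    return roots, children

spans = [
    {"span_id": "s1", "parent_id": None, "name": "rag_query"},
    {"span_id": "s2", "parent_id": "s1", "name": "vector_source"},
    {"span_id": "s3", "parent_id": "s1", "name": "openai_achat"},
]
roots, children = build_trace_tree(spans)
print(roots[0]["name"], [c["name"] for c in children["s1"]])
# → rag_query ['vector_source', 'openai_achat']
```

Collect spans in your `set_trace_callback` handler, then rebuild the tree per `trace_id` when rendering a waterfall view.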

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🙏 Acknowledgments

  • OpenAI for GPT models
  • ChromaDB for vector storage
  • Sentence Transformers for embeddings
  • Community contributors

📞 Support


Made with ❤️ by the LangXChange AI Labs Team

Website

PyPI
