AI Framework for fast integration of Private Data and LLMs, and an Agent Orchestration Platform
Project description
LangXChange Framework
A comprehensive Python Framework for LLM operations, vector databases, RAG implementations, Knowledge Graph (GraphRAG) construction, database integration, MCP service management, and local model management
Installation • Quick Start • Documentation • Examples • RAG Tutorial
🌟 Overview
LangXChange is a powerful, comprehensive Python Framework designed to streamline LLM operations and Retrieval-Augmented Generation (RAG) implementations. It provides unified interfaces for multiple LLM providers, vector databases, document processing, database integration, and local model management.
🚀 Key Features
- 🤖 Multi-LLM Support: OpenAI, Anthropic, Google GenAI, DeepSeek, Llama, MiniMax
- 🔍 Vector Databases: ChromaDB, Pinecone, FAISS integration
- 📄 Document Processing: Universal document loader with multiple formats
- 🧠 RAG Implementation: Two-stage retrieval with cross-encoder reranking
- 🕸️ GraphRAG: Knowledge Graph extraction from documents and Neo4j-based retrieval
- 🏗️ Structured Parser Registry: Extensible domain-specific parsing (e.g., Curriculum, Legal) with zero-dependency base library
- 🏠 Local LLM: Model downloading, fine-tuning, quantization
- 💾 Database Integration: MySQL, MongoDB support
- 🛠️ MCP Services: Standardized interoperability with any MCP server (stdio/SSE)
- 💰 Cost Tracking: Built-in cost monitoring for API calls
- ⚠️ Specialized Errors: Unified `InsufficientQuotaError` across all major providers (OpenAI, Anthropic, Google, DeepSeek, MiniMax)
- ⚡ Performance: Hybrid Orchestration (Fast-Path for small files), async operations, native batch processing
- 🔍 Parallel RetrieverX: Concurrent retrieval from VectorDB, GraphDB, MCP, GCS, and more with RRF fusion
- 🕵️ Tracing & Observability: Integrated request-level tracing with `TraceSpan` and programmatic callbacks (v0.9.6+)
- 🧠 Enhanced Prompting: Strategic RAG grounding with multiple modes (`CONTEXTUAL`, `AUGMENTED`, `SUMMARIZED`) (v0.9.7+)
- 🕸️ Knowledge Graph: Parallel entity and relationship extraction for graph construction
- 🚀 Real-time Monitoring: Stream trace spans directly to your frontend via Server-Sent Events (SSE)
🆕 What's New in [v0.10.11] - 2026-04-03
- ✅ MongoDB Graph Support: Introduced `MongoGraphHelper` for using MongoDB as a property graph backend with `$graphLookup` traversal support.
- ✅ Generic GraphSourceAdapter: Refactored `ParallelRetrieverX` adapters to support any graph backend (Neo4j, Memgraph, MongoDB) seamlessly.
- ✅ Async Vector Search: Added `search_similar_by_vector_async` support in `GraphSourceAdapter`.
🆕 What's New in [v0.10.10] - 2026-03-29
- ✅ Standardized GraphRAG Support: Unified `achat()` and `chat()` interfaces across all LLM helpers (Anthropic, DeepSeek, Google GenAI, OpenAI, MiniMax) for seamless Knowledge Graph construction.
- ✅ Anthropic API Robustness: Fixed `400 Bad Request` errors by normalizing `system` parameters into structured content blocks.
- ✅ Enhanced Provider Consistency: Improved `graph_service.py` and `worker.py` to strictly respect `external` credential modes for agents.
🆕 What's New in v0.10.6
- ✅ Fixed MiniMax 400 Error: Updated MiniMax endpoint and sanitized message content to prevent "chat content is empty" errors.
- ✅ Fixed Typing Imports: Resolved `NameError: name 'Callable' is not defined` in `google_genai_helper.py`.
🆕 What's New in v0.10.5
- ✅ Token-Aware Context Truncation: Added `manage_context_length` to `OpenAIHelper` for precise window management.
- ✅ Robust RAG Trimming: Enhanced `PromptBuilder` to strictly respect `max_context_length` for retrieval results (Vector, Graph, MCP).
- ✅ Enhanced OpenAI Helper: Improved token-counting accuracy using `tiktoken` (with a `cl100k_base` fallback for newer models).
🆕 What's New in v0.10.4
- ✅ Neo4j SSL/TLS Support: Added support for encrypted Bolt connections (`bolt+s://`, `bolt+ssc://`).
- ✅ Neo4j 6.x Compatibility: Added explicit trust handling for self-signed certificates in newer Neo4j drivers using `TrustAll()` and `trusted_certificates`.
- ✅ Fixed Configuration Mapping: Corrected prioritization of `connection_string` over `host` in configuration parsing to prevent protocol loss.
🆕 What's New in v0.10.3
- ✅ Neo4j Config Evolution: Fixed `from_dict()` to correctly map the `host` field, improving compatibility with various graph configuration formats.
- ✅ Fixed Credential Overrides: Environment variables now serve only as fallback defaults in `Neo4jConfig`, ensuring explicitly provided credentials (e.g., from dynamic configs) are never overridden.
What's New in v0.10.0
- ✅ MiniMax Endpoint Fix: Updated the MiniMax API base URL from the retired `api.minimax.chat` to `api.minimax.io` (international domain) and the embedding path from `text/embedding` to `embeddings` (OpenAI-compatible)
- ✅ MiniMax Cache Bug Fix: Fixed the `_cache_key() got multiple values for keyword argument 'model'` error in `achat()` caused by duplicate kwargs
📦 Installation
Via PyPI (Recommended)
```shell
pip install langxchange
```
Environment Variables
Create a .env file in your project directory:
```shell
# OpenAI
OPENAI_API_KEY=your_openai_key

# MySQL Configuration
MYSQL_HOST=localhost
MYSQL_DB=your_database
MYSQL_USER=your_username
MYSQL_PASSWORD=your_password
MYSQL_PORT=3306
MYSQL_CHARSET=utf8mb4

# ChromaDB
CHROMA_PERSIST_PATH=./chroma_db

# Vector Databases
PINECONE_API_KEY=your_pinecone_key
PINECONE_ENVIRONMENT=your_environment

# Milvus Configuration
MILVUS_HOST=localhost
MILVUS_PORT=19530
MILVUS_API_KEY=your_milvus_token  # Optional for local, required for Zilliz Cloud

# Elasticsearch Configuration
ELASTICSEARCH_HOST=http://localhost:9200
ELASTICSEARCH_API_KEY=your_api_key
```
🚀 Quick Start
```python
import os
from langxchange.openai_helper import EnhancedOpenAIHelper, OpenAIConfig
from langxchange.chroma_helper import EnhancedChromaHelper, ChromaConfig
from langxchange.documentloader import DocumentLoaderHelper, ChunkingStrategy
from langxchange.embeddings import EmbeddingHelper

# Set API key (or use environment variable)
os.environ["OPENAI_API_KEY"] = "your-api-key"

# Configure OpenAI with enhanced settings
openai_config = OpenAIConfig(
    chat_model="gpt-4",
    enable_caching=True,
    enable_cost_tracking=True,
    max_retries=3
)

# Initialize OpenAI client
llm = EnhancedOpenAIHelper(openai_config)

# Configure ChromaDB with enhanced settings
chroma_config = ChromaConfig(
    persist_directory="./chroma_db",
    batch_size=100,
    progress_bar=True
)

# Initialize ChromaDB vector store
chroma = EnhancedChromaHelper(llm, chroma_config)

# Load and process documents with semantic chunking
loader = DocumentLoaderHelper(
    chunking_strategy=ChunkingStrategy.SEMANTIC,
    chunk_size=800,
    preserve_formatting=True
)

# Process documents and store in vector database
documents = list(loader.load("document.pdf"))
chroma.insert_documents(
    collection_name="my_collection",
    documents=[doc.content for doc in documents],
    metadatas=[doc.metadata for doc in documents],
    generate_embeddings=True
)

# Query the vector database
results = chroma.query_collection(
    collection_name="my_collection",
    query_text="What is machine learning?",
    top_k=5
)
```
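`ChunkingStrategy.SEMANTIC` is handled inside the framework's loader, but the underlying idea of splitting with overlap can be sketched in plain Python. This is an illustration only; here `chunk_size` counts characters rather than tokens:

```python
def chunk_text(text: str, chunk_size: int = 800, overlap: int = 100) -> list[str]:
    """Split text into fixed-size chunks that overlap so context spans boundaries."""
    if chunk_size <= overlap:
        raise ValueError("chunk_size must exceed overlap")
    chunks, start = [], 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        start += chunk_size - overlap  # step forward, keeping `overlap` chars shared
    return chunks
```

The overlap ensures a sentence cut at a chunk boundary still appears whole in the neighboring chunk, which improves retrieval recall.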
### 🔍 Elasticsearch Vector Store (New in v0.9.9)
```python
from langxchange.elasticsearch_helper import EnhancedElasticsearchHelper, ElasticsearchConfig
from langxchange.openai_helper import EnhancedOpenAIHelper
# Initialize LLM helper for embeddings
llm = EnhancedOpenAIHelper()
# Configure Elasticsearch
es_config = ElasticsearchConfig(
host="https://your-elasticsearch-host.com",
api_key="your-base64-api-key", # Enhanced support for API keys
embedding_dim=1536
)
# Initialize helper
es_helper = EnhancedElasticsearchHelper(llm_helper=llm, config=es_config)
# Store and Query
es_helper.insert_documents(
collection_name="my_index", # Equivalent to index_name
documents=["LangXChange is a comprehensive LLM framework."],
generate_embeddings=True
)
results = es_helper.query(
collection_name="my_index",
query_text="What is LangXChange?",
top_k=1
)
```
## 📚 Modules
### 🤖 LLM Providers
#### OpenAI Integration (`openai_helper.py`)
```python
from langxchange.openai_helper import EnhancedOpenAIHelper, OpenAIConfig
# Configure OpenAI with enhanced settings
open_ai_config = OpenAIConfig(
chat_model="gpt-4",
enable_caching=True,
enable_cost_tracking=True,
max_retries=3,
log_level="INFO"
)
openai = EnhancedOpenAIHelper(open_ai_config)
response = openai.generate(
prompt="Explain quantum computing in simple terms.",
system_message="You are a helpful AI assistant."
)
# Cost tracking
cost = openai.get_cost_summary()
print(f"Total cost: ${cost['total_cost']:.4f}")
```
Features:
- ✅ API key management and validation
- ✅ Response caching for cost optimization
- ✅ Cost tracking and reporting
- ✅ Support for all OpenAI models (GPT-3.5, GPT-4, embeddings)
- ✅ Batch processing capabilities
- ✅ Error handling and retry logic
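Cost tracking of this kind amounts to accumulating token usage against a per-model price table. A standalone sketch of the pattern (the prices below are placeholders, not real OpenAI rates, and this is not the helper's actual implementation):

```python
class CostTracker:
    """Accumulate API cost from token counts using per-1K-token prices."""
    # Placeholder prices (USD per 1K tokens) — consult your provider's price sheet.
    PRICES = {"gpt-4": {"input": 0.03, "output": 0.06}}

    def __init__(self):
        self.total_cost = 0.0
        self.total_tokens = 0

    def record(self, model: str, input_tokens: int, output_tokens: int) -> float:
        p = self.PRICES[model]
        cost = input_tokens / 1000 * p["input"] + output_tokens / 1000 * p["output"]
        self.total_cost += cost
        self.total_tokens += input_tokens + output_tokens
        return cost
```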
Anthropic Integration (anthropic_helper.py)
import os
from langxchange.anthropic_helper import EnhancedAnthropicHelper, AnthropicConfig
# Set API key (or use environment variable)
os.environ["ANTHROPIC_API_KEY"] = "your-anthropic-key"
# Configure Anthropic with enhanced settings
anthropic_config = AnthropicConfig(
model="claude-3-sonnet-20240229",
enable_caching=True,
enable_cost_tracking=True,
max_retries=3,
log_level="INFO"
)
# Initialize Anthropic client
anthropic = EnhancedAnthropicHelper(anthropic_config)
response = anthropic.generate(
prompt="Analyze the following text for sentiment and key themes.",
max_tokens=500,
system_message="You are a helpful AI assistant."
)
# Cost tracking
cost = anthropic.get_cost_summary()
print(f"Total cost: ${cost['total_cost']:.4f}")
- ✅ Claude model support (Haiku, Sonnet, Opus, 3.5, 3.7)
- ✅ Enhanced configuration and caching
- ✅ Cost tracking and reporting
- ✅ Context window optimization
- ✅ Token counting and cost estimation
- ✅ Streaming responses
- ✅ Native Tool-Calling Support (OpenAI-compatible formatting)
- ✅ Message Normalization (supports tool_use/tool_result blocks)
- ✅ Fixed empty response bug for Anthropic provider
- ✅ Fixed generator return bug in sync chat calls
- ✅ Improved message content block handling (supports objects and dicts)
- ✅ Error handling and retry logic
Google GenAI Integration (google_genai_helper.py) — Updated in v0.5.1
import os
import asyncio
from langxchange.google_genai_helper import EnhancedGoogleGenAIHelper, GoogleGenAIHelper
# Set API key (or use environment variable)
os.environ["GOOGLE_API_KEY"] = "your-google-key"
# ── EnhancedGoogleGenAIHelper (recommended) ──────────────────────────────────
google_genai = EnhancedGoogleGenAIHelper(
api_key="your-google-key",
chat_model="gemini-2.0-flash",
vision_model="gemini-2.0-flash",
tts_model="gemini-2.0-flash-preview-tts",
enable_usage_tracking=True,
enable_context_caching=True
)
# ── Synchronous multi-turn chat ───────────────────────────────────────────────
messages = [
{"role": "system", "content": "You are a helpful research assistant."},
{"role": "user", "content": "Summarize the following research paper."},
]
response, usage = google_genai.chat(messages, temperature=0.3, max_tokens=1000)
print(response)
# ── Async chat (non-blocking, for FastAPI / async backends) ───────────────────
async def async_example():
response_text, usage, tool_calls = await google_genai.chat_async(
messages=messages,
temperature=0.7,
max_tokens=2000,
)
print(response_text)
print(f"Tokens used: {usage.get('total_tokens', 0)}")
asyncio.run(async_example())
#### 🕸️ GraphRAG & Knowledge Graph (`kg_builder.py`, `neo4j_helper.py`)
LangXChange supports building and querying Knowledge Graphs for advanced RAG (GraphRAG). This allows for complex relationship-aware retrieval beyond simple vector similarity.
```python
import asyncio
from langxchange.documentloader import DocumentLoaderHelper
from langxchange.openai_helper import EnhancedOpenAIHelper, OpenAIConfig
from langxchange.kg_builder import KnowledgeGraphBuilder
from langxchange.neo4j_helper import Neo4jHelper, Neo4jConfig
async def graph_rag_example():
# 1. Initialize Helpers
llm = EnhancedOpenAIHelper(OpenAIConfig(chat_model="gpt-4"))
neo4j = Neo4jHelper(Neo4jConfig(
uri="bolt://localhost:7687",
username="neo4j",
password="password"
))
# 2. Build Knowledge Graph from Document
loader = DocumentLoaderHelper()
chunks = [c.content for c in loader.load("manual.docx")]
kg_builder = KnowledgeGraphBuilder(llm_helper=llm)
triples = await kg_builder.process_chunks(chunks)
# 3. Store in Neo4j
neo4j.connect()
for t in triples:
neo4j.merge_node(label=t.subject_type, properties={"name": t.subject}, merge_on="name")
neo4j.merge_node(label=t.object_type, properties={"name": t.object}, merge_on="name")
neo4j.merge_relationship(
source_label=t.subject_type, source_match={"name": t.subject},
target_label=t.object_type, target_match={"name": t.object},
rel_type=t.predicate.upper().replace(" ", "_")
)
# 4. Perform GraphRAG Query
query = "How does component X interact with Y?"
# (Optional: Use LLM to extract keywords for Neo4j search)
graph_context = "..." # Retrieved via neo4j.execute_read(...)
response = await llm.achat([{"role": "user", "content": f"Context: {graph_context}\nQuery: {query}"}])
print(response)
asyncio.run(graph_rag_example())
```
🔍 Parallel RetrieverX (retrieverX.py)
LangXChange features a high-performance Parallel RetrieverX engine that handles concurrent retrieval from multiple heterogeneous sources (VectorDB, Knowledge Graphs, MCP Tools, GCS, and Local Files). It uses Reciprocal Rank Fusion (RRF) to merge results into a unified, ranked context.
import asyncio
from langxchange.retrieverX import ParallelRetrieverX, RetrievalSource
# ... initialize your adapters ...
async def parallel_retrieval_example():
# 1. Initialize RetrieverX
retriever = ParallelRetrieverX(max_workers=10)
# 2. Register Sources with Adapters
retriever.add_source(
source_type=RetrievalSource.VECTOR,
adapter=chroma_adapter, # Instance of VectorSourceAdapter
weight=1.0
)
retriever.add_source(
source_type=RetrievalSource.GRAPH,
adapter=graph_adapter, # Instance of GraphSourceAdapter
weight=1.5
)
# 3. Perform Concurrent Retrieval
results = await retriever.retrieve(
query="What are the safety protocols for process X?",
top_k=5
)
# 4. Access Merged Context
for res in results:
print(f"[{res.source}] (Score: {res.score:.2f}) {res.document[:100]}...")
asyncio.run(parallel_retrieval_example())
Features:
- ✅ Native Concurrency: Parallel execution of all retrieval tasks using Python's `asyncio`.
- ✅ RRF Fusion: Built-in Reciprocal Rank Fusion for high-quality merged results.
- ✅ Heterogeneous Sources: Unified adapter interface for VectorDB, GraphDB, MCP, GCS, and Drive.
- ✅ Ranking Optimization: Weighted boosting for specific sources (e.g., GraphDB > VectorDB).
- ✅ Standardized Output: Returns `RetrievalResult` objects with consistent metadata.
- ✅ Async Support: Full async extraction and database operations.
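Reciprocal Rank Fusion itself is a simple, well-known formula: each document's fused score is the sum of 1/(k + rank) over every ranked list it appears in. A standalone sketch, independent of the `ParallelRetrieverX` internals:

```python
def rrf_fuse(rankings: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """Merge ranked result lists with Reciprocal Rank Fusion (score = sum 1/(k + rank))."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)
```

The constant `k` (60 is the value from the original RRF paper) damps the influence of top ranks so that agreement across sources outweighs a single source's top hit.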
🍃 MongoDB Graph Support (mongograph_helper.py) — New in v0.10.11
LangXChange now supports using MongoDB as a high-performance property graph database. This is ideal for teams already using MongoDB who want GraphRAG capabilities without managing a separate Neo4j/Memgraph instance.
import asyncio
from langxchange.mongograph_helper import MongoGraphHelper, MongoGraphConfig
async def mongo_graph_example():
# 1. Configure MongoDB Graph
config = MongoGraphConfig(
uri="mongodb://admin:password@localhost:27017/graph_db?authSource=admin",
node_collection="nodes",
edge_collection="edges"
)
# 2. Initialize Helper
helper = MongoGraphHelper(config)
await helper.connect_async()
# 3. Store Entities and Relationships
await helper.merge_node_async(label="Company", properties={"id": "c1", "name": "LangXChange"})
await helper.merge_node_async(label="Technology", properties={"id": "t1", "name": "MongoDB"})
await helper.merge_relationship_async(
source_label="Company", source_match={"id": "c1"},
target_label="Technology", target_match={"id": "t1"},
rel_type="USES_BACKEND"
)
# 4. Traversal ($graphLookup)
neighbors = helper.get_neighbors("c1", direction="out")
for n in neighbors:
print(f"Connected to: {n['to_id']} via {n['type']}")
asyncio.run(mongo_graph_example())
🧠 Enhanced Prompt Strategy (prompt_helper.py) — New in v0.9.7
The EnhancedPromptHelper provides advanced strategies for building LLM prompts from retrieval results. It supports multiple grounding modes to optimize how context is presented to the model.
from langxchange.prompt_helper import EnhancedPromptHelper, PromptMode
# Initialize with default mode (BASIC)
prompt_helper = EnhancedPromptHelper(
llm=openai,
system_prompt="You are a helpful assistant.",
default_mode=PromptMode.BASIC
)
# 1. BASIC Mode: Standard prompt without retrieval augmentation
response = prompt_helper.run(user_query="Hello!")
# 2. AUGMENTED Mode: Standard RAG (Context injected into User Query)
response = prompt_helper.run(
user_query="What is the safety protocol?",
retrieval_results=results,
mode=PromptMode.AUGMENTED
)
# 3. CONTEXTUAL Mode: Multi-turn Grounding (Context as Assistant messages)
# Best for maintaining conversational flow and strict grounding
response = prompt_helper.run(
user_query="Tell me about project X",
retrieval_results=results,
mode=PromptMode.CONTEXTUAL
)
# 4. SUMMARIZED Mode: Concise snippets
response = prompt_helper.run(
user_query="Summarize the main points",
retrieval_results=results,
mode=PromptMode.SUMMARIZED,
max_snippets=2
)
#### 🛡️ Response Validation & Multi-Pass (`prompt_pass`) — **New in v0.9.8**
The `prompt_pass` feature allows for automatic multi-pass validation and refinement of LLM responses. This is critical for ensuring quality, adherence to guidelines, and factual grounding.
```python
from langxchange.prompt_helper import EnhancedPromptHelper
# Initialize helper
prompt_helper = EnhancedPromptHelper(llm=openai, system_prompt="Answer only using the context.")
# Run with 2 validation passes
# Pass 1: Initial generation
# Pass 2: LLM-based validation and optional refinement/re-generation
result = prompt_helper.run(
user_query="What is the safety protocol?",
retrieval_results=results,
prompt_pass=2
)
# Access refined response and validation metadata
print(f"Final Response: {result['response']}")
print(f"Total passes: {result['pass_info']['total_passes']}")
print(f"Validation history: {result['pass_info']['validation_history']}")
```
Features:
- ✅ Multi-Pass Reasoning: Automatically detect and correct response errors.
- ✅ JSON-Based Validation: Precise quality checking using internal LLM-as-a-judge patterns.
- ✅ Custom Criteria: Pass a validation function (returning `bool`) for hard constraints.
- ✅ Feedback Loop: Automatically provides feedback to the LLM for refined generation.
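The multi-pass flow can be sketched generically as generate → validate → regenerate with feedback. This standalone sketch stubs the LLM with an arbitrary callable and is not the actual `EnhancedPromptHelper` implementation:

```python
from typing import Callable

def run_with_passes(
    generate: Callable[[str], str],
    validate: Callable[[str], bool],
    query: str,
    max_passes: int = 2,
) -> dict:
    """Generate a response, re-prompting with feedback until it validates."""
    history = []
    prompt, response = query, ""
    for pass_no in range(1, max_passes + 1):
        response = generate(prompt)
        ok = validate(response)
        history.append({"pass": pass_no, "valid": ok})
        if ok:
            break
        # Feed the failure back so the next pass can correct it.
        prompt = f"{query}\n\nPrevious answer was rejected; revise it:\n{response}"
    return {
        "response": response,
        "pass_info": {"total_passes": len(history), "validation_history": history},
    }
```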
**Prompt Modes:**
- ✅ **BASIC**: Direct interaction using only the user query.
- ✅ **AUGMENTED**: Traditional RAG; injects context snippets into the user's message.
- ✅ **CONTEXTUAL**: New strategic mode that presents retrieved context as specialized assistant reference messages. This improves model adherence to provided facts and maintains cleaner user message history.
- ✅ **SUMMARIZED**: Automatically truncates and summarizes retrieval hits for token-efficient processing.
The examples below continue the Google GenAI helper usage shown earlier.

# ── Streaming chat ────────────────────────────────────────────────────────────
async def stream_example():
async for chunk in google_genai.chat_stream(messages, temperature=0.7):
print(chunk, end="", flush=True)
asyncio.run(stream_example())
# ── Tool / function calling ───────────────────────────────────────────────────
tools = [{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get current weather for a city",
"parameters": {
"type": "object",
"properties": {"city": {"type": "string"}},
"required": ["city"]
}
}
}]
async def tool_example():
content, usage, tool_calls = await google_genai.chat_async(
messages=[{"role": "user", "content": "What's the weather in London?"}],
tools=tools
)
if tool_calls:
for call in tool_calls:
print(f"Tool: {call['function']['name']}, Args: {call['function']['arguments']}")
asyncio.run(tool_example())
# ── Multi-modal vision processing ─────────────────────────────────────────────
vision_response = google_genai.chat_with_vision(
text="Analyze this image for key insights",
image_path="research_chart.png"
)
# ── Text-to-speech generation ─────────────────────────────────────────────────
audio_path = google_genai.text_to_speech(
text="Welcome to the research presentation",
voice="Zephyr",
output_format="wav"
)
# ── Usage tracking ────────────────────────────────────────────────────────────
stats = google_genai.get_usage_statistics()
print(f"Total requests: {stats.chat_requests}")
print(f"Input tokens: {stats.total_input_tokens}")
# ── Lightweight GoogleGenAIHelper (simple use cases) ─────────────────────────
llm = GoogleGenAIHelper(chat_model="gemini-2.0-flash")
async def simple_chat():
reply = await llm.chat(messages)
print(reply)
asyncio.run(simple_chat())
Features (v0.5.1):
- ✅ Native multi-turn chat — messages correctly formatted as `types.Content` with `user`/`model` roles
- ✅ System instruction support — `system` messages extracted and passed via `GenerateContentConfig.system_instruction`
- ✅ Async chat (`chat_async`) — non-blocking, returns a `(text, usage, tool_calls)` tuple
- ✅ Streaming chat (`chat_stream`) — async generator yielding text chunks in real time
- ✅ Tool / function calling — OpenAI-style tool definitions auto-converted to `types.FunctionDeclaration`; tool calls returned in OpenAI-compatible format
- ✅ Dual client — both sync `genai.Client` and async `genai.AsyncClient` initialized on startup
- ✅ Gemini 2.0 Flash model support
- ✅ Multi-modal capabilities (text, images, audio)
- ✅ Text-to-speech and speech-to-text generation
- ✅ Context management and usage statistics
- ✅ Safety filtering and content moderation
- ✅ Embedding generation with `text-embedding-004`
- ✅ Caching and performance optimization
DeepSeek Integration (deepseek_helper.py)
import os
from langxchange.deepseek_helper import EnhancedDeepSeekHelper, ModelType, ContextManagementStrategy
# Set API key (or use environment variable)
os.environ["DEEPSEEK_API_KEY"] = "your-deepseek-key"
# Initialize Enhanced DeepSeek client with advanced configuration
deepseek = EnhancedDeepSeekHelper(
api_key="your-deepseek-key",
base_url="https://api.deepseek.com/v1",
default_model=ModelType.CHAT.value,
embed_model=ModelType.EMBEDDING.value,
vision_model=ModelType.VISION.value,
timeout=30,
max_retries=3,
enable_logging=True,
log_level="INFO",
max_context_tokens=30000,
context_strategy=ContextManagementStrategy.SLIDING_WINDOW
)
# Generate text response with enhanced features
response = deepseek.generate(
prompt="Write a Python function for binary search.",
max_tokens=500,
temperature=0.3,
system_message="You are a helpful coding assistant.",
context_messages=[{"role": "user", "content": "Previous context"}]
)
# Code generation with syntax highlighting
code_response = deepseek.generate_code(
prompt="Create a REST API endpoint in Flask",
language="python",
include_docs=True,
error_handling=True
)
# Batch processing
batch_responses = deepseek.batch_generate([
{"prompt": "Explain recursion", "max_tokens": 200},
{"prompt": "Define Big O notation", "max_tokens": 200}
], temperature=0.7)
# Usage and cost tracking
cost_summary = deepseek.get_cost_summary()
print(f"Total cost: ${cost_summary['total_cost']:.4f}")
print(f"Total tokens: {cost_summary['total_tokens']}")
Features:
- ✅ Cost-effective alternative to OpenAI with enhanced features
- ✅ Multiple model types (chat, embedding, vision)
- ✅ Advanced context management with sliding window strategy
- ✅ Code generation with syntax highlighting and documentation
- ✅ Batch processing capabilities for multiple requests
- ✅ Streaming support with real-time response handling
- ✅ Usage tracking and cost monitoring
- ✅ Error handling and retry logic
- ✅ Compatible with OpenAI API format
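A sliding-window strategy simply drops the oldest non-system messages until the conversation fits a token budget. A rough standalone sketch (using ~4 characters per token as a crude estimate; the real helper counts tokens properly):

```python
def sliding_window(messages: list[dict], max_tokens: int) -> list[dict]:
    """Drop oldest non-system messages until the estimated token count fits."""
    est = lambda m: len(m["content"]) // 4 + 4  # crude chars/4 estimate + per-message overhead
    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]
    while rest and sum(map(est, system + rest)) > max_tokens:
        rest.pop(0)  # evict the oldest turn first; system prompt is always kept
    return system + rest
```

Keeping the system prompt pinned while evicting from the front preserves instructions and the most recent conversational context.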
Llama Integration (llama_helper.py)
from langxchange.llama_helper import EnhancedLLaMAHelper, LLaMAConfig
import os
# Set Hugging Face token (or use environment variable)
os.environ["HUGGINGFACE_TOKEN"] = "your-hf-token"
# Configure LLaMA with enhanced settings
llama_config = LLaMAConfig(
chat_model="meta-llama/Llama-2-7b-chat-hf",
embed_model="all-MiniLM-L6-v2",
device="auto",
max_memory_per_gpu="8GB",
load_in_8bit=False,
load_in_4bit=True,
cache_dir="./llama_cache",
trust_remote_code=False
)
# Initialize Enhanced LLaMA client
llama = EnhancedLLaMAHelper(config=llama_config)
# Generate text response
response = llama.generate(
prompt="Explain machine learning fundamentals.",
temperature=0.7,
max_tokens=2048,
system_message="You are a knowledgeable AI assistant specializing in ML.",
do_sample=True,
top_p=0.9
)
# Advanced text generation with stopping criteria
advanced_response = llama.generate_advanced(
prompt="Write a Python class for neural networks",
stopping_criteria=["def ", "class ", "\n\n"],
temperature=0.5,
repetition_penalty=1.1,
no_repeat_ngram_size=3
)
# Batch text processing
batch_responses = llama.batch_generate([
{"prompt": "What is deep learning?", "max_tokens": 200},
{"prompt": "Explain neural networks", "max_tokens": 200},
{"prompt": "Define backpropagation", "max_tokens": 200}
], temperature=0.7)
# Token counting and optimization
token_count = llama.count_tokens("This is a sample text for token counting.")
print(f"Token count: {token_count}")
# Model performance metrics
metrics = llama.get_model_metrics()
print(f"Memory usage: {metrics['memory_usage_gb']:.2f} GB")
print(f"Inference speed: {metrics['tokens_per_second']:.2f} tokens/sec")
Features:
- ✅ Local model deployment with Hugging Face integration
- ✅ Enhanced quantization support (4-bit, 8-bit)
- ✅ GPU acceleration with memory optimization
- ✅ Advanced configuration with LLaMAConfig
- ✅ Multiple quantization modes for different hardware
- ✅ Custom stopping criteria for precise generation control
- ✅ Batch processing capabilities
- ✅ Token counting and performance monitoring
- ✅ Memory management and optimization
- ✅ Support for various LLaMA model variants
- ✅ Hugging Face model hub integration
MiniMax Integration (minimax_helper.py)
from langxchange.minimax_helper import MiniMaxHelper, create_user_message, MiniMaxConfig
import os
# Set API key (or use environment variable)
os.environ["MINIMAX_API_KEY"] = "your-minimax-key"
# Initialize with default config
helper = MiniMaxHelper()
# Simple chat
messages = [create_user_message("Hello, how are you?")]
response = helper.chat(messages)
print(response)
# With custom config
config = MiniMaxConfig(
api_key="your-api-key",
api_secret="your-api-secret", # Optional
group_id="your-group-id", # Optional
chat_model="MiniMax-M2",
enable_caching=True,
)
helper = MiniMaxHelper(config)
# Cost tracking
cost = helper.get_cost_summary()
print(f"Total cost: ${cost['total_cost']:.4f}")
Features:
- ✅ Chat Completions (Synchronous and Async with streaming support)
- ✅ Embeddings (Single and batch generation)
- ✅ Audio (Text-to-speech and speech-to-text capabilities)
- ✅ Image Generation from text prompts
- ✅ In-memory TTL-based caching
- ✅ Automatic retry with exponential backoff
- ✅ Usage Tracking (Token and cost tracking with model pricing)
- ✅ Utilities (Helper functions for message creation)
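Retry with exponential backoff is a standard pattern; a minimal standalone sketch of what such a wrapper does (not `MiniMaxHelper`'s actual code — the injectable `sleep` parameter here is just for testability):

```python
import time

def with_retries(fn, max_retries: int = 3, base_delay: float = 0.5, sleep=time.sleep):
    """Call fn(), retrying on exception with exponentially growing delays."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the last error
            sleep(base_delay * (2 ** attempt))  # 0.5s, 1s, 2s, ...
```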
Model Context Protocol (MCP) Integration (mcp_helper.py)
import asyncio
from langxchange.mcp_helper import MCPServiceManager
async def main():
# Initialize manager with JSON config
manager = MCPServiceManager("mcp_config.json")
# 1. Register functional capabilities for servers with priority
manager.register_server_capabilities("filesystem", ["files", "local_storage"], priority=10)
manager.register_server_capabilities("brave_search", ["web_search", "research"], priority=5)
# 2. Intelligent Routing: Resolve server by tool name or capability
# Automatically selects the best server (health + priority aware)
server = await manager.select_best_server_for_tool("read_file")
# Direct namespace resolution
server = manager.resolve_tool_server("filesystem::read_file")
# Name-based resolution with capability hints
context = {"preferred_capability": "web_search"}
server = manager.resolve_tool_server("search", context=context)
# 3. Health & Priority Aware Selection
# Automatically picks the best server based on priority, error rates and latency
best_server = manager.select_best_server(["server_a", "server_b"])
# 4. Discovery: Fetch all tools with routing metadata
all_tools = await manager.get_all_tools_with_metadata()
# 5. Call a tool (standard way)
result = await manager.call_tool(
server_name="filesystem",
tool_name="read_file",
arguments={"path": "data.txt"}
)
await manager.shutdown()
if __name__ == "__main__":
asyncio.run(main())
Features:
- ✅ Standardized Interoperability: Connect to any MCP-compliant server (stdio or SSE)
- ✅ Intelligent Routing: Automatic server resolution based on tool names and namespaces (`server::tool`)
- ✅ Capability-Based Selection: Route tasks to servers based on functional tags (e.g., "web_search", "filesystem")
- ✅ Health & Priority Aware Selection: Automatically prioritizes healthy servers and uses priority scores for optimal routing
- ✅ Tool Registry: Comprehensive metadata registry for all available tools across all connected servers
- ✅ Lifecycle Management: Automatic server startup, health monitoring, and recovery
- ✅ Discovery: Dynamic tool discovery with TTL-based caching
- ✅ Production-Ready: Graceful shutdown, detailed logging, and error handling
🛠️ Complete Example: Calculator Server
This example demonstrates a full setup including a mock server, configuration, and execution script.
```python
# mcp_calculator_server.py
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("Calculator")
@mcp.tool()
def add(a: int, b: int) -> int:
"""Add two numbers"""
return a + b
@mcp.tool()
def multiply(a: int, b: int) -> int:
"""Multiply two numbers"""
return a * b
if __name__ == "__main__":
mcp.run(transport="stdio")
```
```json
// mcp_test_config.json
{
"servers": [
{
"name": "calculator",
"transport": "stdio",
"command": "python3",
"args": ["mcp_calculator_server.py"]
}
]
}
```
```python
# mcp_test_execution.py
import asyncio
from langxchange.mcp_helper import MCPServiceManager
async def run_test():
manager = MCPServiceManager("mcp_test_config.json")
await manager.initialize()
try:
# Call 'add' tool
result = await manager.call_tool(
server_name="calculator",
tool_name="add",
arguments={"a": 5, "b": 3}
)
print(f"Add Result: {result}")
finally:
await manager.shutdown()
if __name__ == "__main__":
asyncio.run(run_test())
```
```text
# Expected Output
INFO:langxchange.mcp:MCPServiceManager initialized
INFO:langxchange.mcp:Started MCP server 'calculator'
Add Result: content=[TextContent(type='text', text='8', ...)] structuredContent={'result': 8}
INFO:langxchange.mcp:Stopped MCP server 'calculator'
```
🤖 Autonomous Agents (EnhancedAgent.py)
LangXChange 0.4.6 introduces a powerful EnhancedLLMAgentHelper for building production-ready autonomous agents. It features dynamic tool discovery, semantic memory, per-tool circuit breakers, and automatic observation summarization.
🚀 Quick Start: Manual Tool Definition
import asyncio
from langxchange.EnhancedAgent import EnhancedLLMAgentHelper
from langxchange.agent_memory_helper import AgentMemoryHelper
from langxchange.openai_helper import EnhancedOpenAIHelper, OpenAIConfig
async def main():
# 1. Initialize LLM and Memory
llm = EnhancedOpenAIHelper(OpenAIConfig(chat_model="gpt-4o"))
memory = AgentMemoryHelper(sqlite_path="agent_memory.db")
# 2. Define tools with JSON Schema for strict parameter generation
async def get_weather(params):
return f"The weather in {params['location']} is sunny, 25°C."
tools = [{
"action": "get_weather",
"description": "Get current weather for a location",
"parameters": {
"type": "object",
"properties": {
"location": {"type": "string", "description": "City and country"}
},
"required": ["location"]
},
"func": get_weather
}]
# 3. Initialize Agent
agent = EnhancedLLMAgentHelper(
llm=llm,
action_space=tools,
external_memory_helper=memory,
debug=True
)
# 4. Run autonomously to achieve a goal
agent.set_goal("What is the weather in London?")
results = await agent.run_autonomous(max_cycles=5)
for res in results:
print(f"Thought: {res['thought']}")
print(f"Action: {res['decision']['action']}")
print(f"Result: {res['outcome']['result']}")
if __name__ == "__main__":
asyncio.run(main())
🔌 Native MCP Integration
Instead of manually defining tools, you can pass an MCP configuration. The agent will automatically discover all tools from the configured servers and route calls dynamically.
mcp_config = {
"servers": [
{
"name": "filesystem",
"transport": "stdio",
"command": "mcp-server-filesystem",
"args": ["/home/user/Downloads"]
},
{
"name": "brave-search",
"transport": "stdio",
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-brave-search"],
"env": {"BRAVE_API_KEY": "your-key"}
}
]
}
agent = EnhancedLLMAgentHelper(
llm=llm,
mcp_config=mcp_config, # Native MCP support
external_memory_helper=memory
)
# The agent will now have access to all tools from both filesystem and brave-search
agent.set_goal("Find the latest PDF in my Downloads and search for its main topic on the web.")
await agent.run_autonomous()
✨ Key Features
- ✅ Autonomous Loops: Use `run_autonomous()` for multi-step goal achievement with automatic state management.
- ✅ Dynamic Discovery: Tools are discovered at runtime from MCP servers or via a `discovery_callback`.
- ✅ Schema-Strict Parameters: Uses JSON Schema hints in prompts to ensure the LLM generates valid parameters.
- ✅ Per-Tool Circuit Breakers: Failures in one tool (e.g., a flaky API) won't crash the entire agent.
- ✅ Auto-Summarization: Large tool outputs are automatically summarized to preserve context window space.
- ✅ Semantic Memory: Integrates with `AgentMemoryHelper` for long-term history and semantic retrieval.
- ✅ Observability: Built-in Prometheus-style metrics and correlation ID tracing across all operations.
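The per-tool circuit breaker idea can be sketched in a few lines. This is an illustration of the pattern, not the helper's internal implementation; the class name and failure threshold below are invented for the example:

```python
class ToolCircuitBreaker:
    """Opens after `max_failures` consecutive errors; while open, the
    agent skips the tool instead of crashing the whole run."""

    def __init__(self, max_failures: int = 3):
        self.max_failures = max_failures
        self.failures = 0

    @property
    def open(self) -> bool:
        return self.failures >= self.max_failures

    def call(self, func, *args):
        if self.open:
            raise RuntimeError("circuit open: tool disabled")
        try:
            result = func(*args)
        except Exception:
            self.failures += 1  # count the failure, then re-raise
            raise
        self.failures = 0  # any success resets the counter
        return result
```

Each tool gets its own breaker instance, so a flaky search API cannot take an unrelated filesystem tool down with it.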
🔍 Vector Database Integration
ChromaDB Integration (chroma_helper.py)
from langxchange.chroma_helper import EnhancedChromaHelper, ChromaConfig
# Configure Chroma with enhanced performance settings
chroma_config = ChromaConfig(
persist_directory="./chroma_db",
batch_size=100,
max_workers=8,
progress_bar=True
)
chroma = EnhancedChromaHelper(llm, chroma_config)
# Insert documents with metadata
chroma.insert_documents(
collection_name="my_collection",
documents=["Document content here"],
metadatas=[{"source": "file1.txt", "type": "text"}],
generate_embeddings=True
)
# Query with similarity search
results = chroma.query_collection(
collection_name="my_collection",
query_text="What is machine learning?",
top_k=5
)
Features:
- ✅ Persistent storage
- ✅ Metadata filtering
- ✅ Batch operations
- ✅ Collection management
- ✅ Performance optimization
Pinecone Integration (pinecone_helper.py)
import os
from langxchange.pinecone_helper import EnhancedPineconeHelper, PineconeConfig, CloudProvider, MetricType
from langxchange.openai_helper import EnhancedOpenAIHelper, OpenAIConfig
# Set API key (or use environment variable)
os.environ["PINECONE_API_KEY"] = "your-pinecone-key"
# Configure LLM helper for embeddings
openai_config = OpenAIConfig(enable_caching=True)
llm_helper = EnhancedOpenAIHelper(openai_config)
# Configure Pinecone with enhanced settings
pinecone_config = PineconeConfig(
api_key="your-pinecone-key",
environment="us-west1-gcp",
cloud_service=CloudProvider.GCP,
index_name="my-index",
dimension=1536, # OpenAI embedding dimension
metric=MetricType.COSINE,
batch_size=100,
max_workers=10,
progress_bar=True
)
# Initialize Enhanced Pinecone helper
pinecone = EnhancedPineconeHelper(
llm_helper=llm_helper,
config=pinecone_config
)
# Insert documents with automatic embedding generation
documents = [
"Machine learning is a subset of AI that focuses on algorithms.",
"Deep learning uses neural networks with multiple layers.",
"Natural language processing enables computers to understand text."
]
pinecone.insert_documents(
collection_name="my-index",
documents=documents,
metadatas=[{"source": "article1", "topic": "AI"},
{"source": "article2", "topic": "ML"},
{"source": "article3", "topic": "NLP"}],
generate_embeddings=True,
namespace="default"
)
# Query for similar vectors with filters
results = pinecone.query(
vector=None, # Will auto-generate embedding from query_text
query_text="What is machine learning?",
top_k=5,
filter_metadata={"topic": "AI"},
include_metadata=True
)
# DataFrame ingestion with batch processing
import pandas as pd
df = pd.DataFrame({
'text': ['Sample text 1', 'Sample text 2', 'Sample text 3'],
'category': ['tech', 'science', 'tech']
})
stats = pinecone.ingest_dataframe(
collection_name="my-index",
dataframe=df,
text_column='text',
metadata_columns=['category'],
namespace="default"
)
print(f"Inserted {stats['inserted']} documents")
print(f"Skipped {stats['skipped']} documents")
Features:
- ✅ Cloud-based vector storage with auto-scaling
- ✅ Automatic embedding generation using LLM helpers
- ✅ DataFrame ingestion with batch processing
- ✅ Advanced querying with metadata filters
- ✅ Performance monitoring and statistics
- ✅ Enterprise-grade error handling and retries
- ✅ Memory-efficient operations for large datasets
- ✅ Namespace management and resource cleanup
- ✅ Real-time updates with comprehensive logging
Milvus Integration (milvus_helper.py)
from langxchange.milvus_helper import EnhancedMilvusHelper, MilvusConfig
from langxchange.openai_helper import EnhancedOpenAIHelper, OpenAIConfig
# Configure LLM helper for embeddings
openai_config = OpenAIConfig(enable_caching=True)
llm_helper = EnhancedOpenAIHelper(openai_config)
# Configure Milvus with enhanced settings
milvus_config = MilvusConfig(
host="localhost",
port="19530",
api_key="your-milvus-token", # Optional for local
collection_prefix="lx_",
embedding_dim=1536,
batch_size=100,
progress_bar=True
)
# Initialize Enhanced Milvus client
milvus = EnhancedMilvusHelper(
llm_helper=llm_helper,
config=milvus_config
)
# Insert documents with automatic embedding generation
documents = [
"Milvus is an open-source vector database built for AI applications.",
"It supports high-performance vector similarity search and analytics.",
"Milvus can handle billions of vectors with millisecond latency."
]
milvus.insert_documents(
collection_name="ai_docs",
documents=documents,
metadatas=[{"category": "database"}, {"category": "search"}, {"category": "performance"}],
generate_embeddings=True
)
# Query for similar vectors
results = milvus.query(
collection_name="ai_docs",
query_text="What is Milvus?",
top_k=3
)
for hit in results[0]:
print(f"Score: {hit.score}")
print(f"Document: {hit.entity.get('document')}")
Features:
- ✅ High-performance vector search with HNSW index
- ✅ Support for local Milvus and Zilliz Cloud (via API key/token)
- ✅ Automatic collection creation and schema management
- ✅ Batch insertion and DataFrame ingestion
- ✅ Metadata filtering and JSON support
- ✅ Enterprise-grade error handling
Elasticsearch Integration (elasticsearch_helper.py)
from langxchange.elasticsearch_helper import EnhancedElasticsearchHelper, ElasticsearchConfig
from langxchange.openai_helper import EnhancedOpenAIHelper, OpenAIConfig
# Configure LLM helper for embeddings
openai_config = OpenAIConfig(enable_caching=True)
llm_helper = EnhancedOpenAIHelper(openai_config)
# Configure Elasticsearch with enhanced settings
es_config = ElasticsearchConfig(
host="http://localhost:9200",
index_prefix="lx_",
embedding_dim=1536,
batch_size=100,
progress_bar=True
)
# Initialize Enhanced Elasticsearch client
es = EnhancedElasticsearchHelper(
llm_helper=llm_helper,
config=es_config
)
# Insert documents with automatic embedding generation
documents = [
"Elasticsearch is a distributed, RESTful search and analytics engine.",
"It provides a distributed, multitenant-capable full-text search engine.",
"Elasticsearch is developed in Java and is open source."
]
es.insert_documents(
collection_name="tech_docs",
documents=documents,
metadatas=[{"topic": "search"}, {"topic": "analytics"}, {"topic": "java"}],
generate_embeddings=True
)
# Query for similar vectors
results = es.query(
collection_name="tech_docs",
query_text="What is Elasticsearch?",
top_k=3
)
for hit in results['hits']['hits']:
print(f"Score: {hit['_score']}")
print(f"Document: {hit['_source'].get('document')}")
Features:
- ✅ High-performance vector search with the `dense_vector` field type
- ✅ Support for script-based cosine similarity scoring
- ✅ Automatic index creation and mapping management
- ✅ Batch insertion and DataFrame ingestion
- ✅ Metadata filtering support
- ✅ Unified interface consistent with Chroma and Milvus helpers
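Script-based cosine scoring in Elasticsearch is done with a `script_score` query over a `dense_vector` field. A query body roughly equivalent to what such a helper might send looks like this (the `embedding` field name is an assumption; the helper's actual mapping may differ):

```python
def cosine_query_body(query_vector, size=3):
    """Build an Elasticsearch script_score query body for cosine ranking."""
    return {
        "size": size,
        "query": {
            "script_score": {
                "query": {"match_all": {}},
                "script": {
                    # +1.0 keeps scores non-negative, as Elasticsearch requires
                    "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                    "params": {"query_vector": query_vector},
                },
            }
        },
    }
```

The body would then be passed to the index's `_search` endpoint (or the Python client's `search(body=...)`).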
FAISS Integration (faiss_helper.py)
import os
import pandas as pd
from langxchange.faiss_helper import EnhancedFAISSHelper
# Initialize Enhanced FAISS helper with advanced configuration
faiss = EnhancedFAISSHelper(
dim=768, # Vector dimension
index_type="ivf", # Use IVF index for better performance on large datasets
normalize_vectors=True, # Normalize vectors for cosine similarity
nlist=100, # Number of clusters for IVF
auto_train=True # Automatically train IVF indices
)
# Insert individual vectors with documents and metadata
documents = [
"Machine learning is a subset of artificial intelligence.",
"Deep learning uses neural networks with multiple layers.",
"Natural language processing enables computers to understand text.",
"Computer vision allows machines to interpret visual information."
]
metadatas = [
{"source": "article1", "topic": "ML", "date": "2025-01-15"},
{"source": "article2", "topic": "DL", "date": "2025-01-16"},
{"source": "article3", "topic": "NLP", "date": "2025-01-17"},
{"source": "article4", "topic": "CV", "date": "2025-01-18"}
]
# Generate embeddings (this would come from your embedding model)
embeddings = [
[0.1, 0.2, 0.3] * 256, # 768-dimensional embedding
[0.4, 0.5, 0.6] * 256,
[0.7, 0.8, 0.9] * 256,
[0.2, 0.3, 0.4] * 256
]
faiss.insert(
vectors=embeddings,
documents=documents,
metadatas=metadatas
)
# DataFrame integration with batch processing
df = pd.DataFrame({
'embeddings': embeddings,
'documents': documents,
'metadata': metadatas
})
faiss.insert_dataframe(
dataframe=df,
embeddings_col="embeddings",
documents_col="documents",
metadata_col="metadata"
)
# Query for similar vectors with comprehensive results
query_vector = [0.1, 0.2, 0.3] * 256 # Sample query embedding
results = faiss.query(
embedding_vector=query_vector,
top_k=3,
include_distances=True # Include similarity scores
)
print(f"Found {len(results)} similar documents:")
for i, result in enumerate(results, 1):
print(f"{i}. Score: {result['distance']:.3f}")
print(f" Document: {result['document'][:100]}...")
print(f" Metadata: {result['metadata']}")
print()
# Batch querying for multiple queries at once
query_vectors = [
[0.1, 0.2, 0.3] * 256,
[0.4, 0.5, 0.6] * 256
]
batch_results = faiss.query_batch(
embedding_vectors=query_vectors,
top_k=2,
include_distances=True
)
print(f"Batch query results: {len(batch_results)} result sets")
# Retrieve documents by ID
doc_results = faiss.get_by_ids(["id_0", "id_1"])
print(f"Retrieved {len(doc_results)} documents by ID")
# Get comprehensive statistics
stats = faiss.get_stats()
print(f"Index Statistics:")
print(f" Total vectors: {stats['total_vectors']}")
print(f" Index type: {stats['index_type']}")
print(f" Dimension: {stats['dimension']}")
print(f" Is trained: {stats.get('is_trained', 'N/A')}")
print(f" Number of clusters: {stats.get('nlist', 'N/A')}")
# Persistence - save and load index
index_path = "./faiss_index.bin"
metadata_path = "./faiss_metadata.pkl"
faiss.save(index_path, metadata_path)
print(f"Saved index to {index_path}")
# Load saved index
loaded_faiss = EnhancedFAISSHelper(dim=768, index_type="ivf")
loaded_faiss.load(index_path, metadata_path)
print(f"Loaded index with {loaded_faiss.count()} vectors")
# Index management
index_vector_count = faiss.count()
print(f"Current index contains {index_vector_count} vectors")
# Delete specific document
deleted = faiss.delete_by_id("id_0")
print(f"Document deleted: {deleted}")
# Rebuild index with different configuration
rebuilt_count = faiss.rebuild_index(index_type="hnsw")
print(f"Rebuilt index with {rebuilt_count} vectors using HNSW")
Features:
- ✅ Multiple index types (Flat, IVF, HNSW) with automatic training
- ✅ High-performance similarity search with vector normalization
- ✅ Comprehensive DataFrame integration for pandas workflow
- ✅ Batch operations for efficient processing
- ✅ Advanced querying with distance scores and metadata
- ✅ Persistence and index management
- ✅ Document retrieval by ID and batch operations
- ✅ Comprehensive statistics and performance monitoring
- ✅ Index rebuilding and optimization
- ✅ Memory-efficient operations with proper validation
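The IVF index used above trades a little recall for a lot of speed: vectors are bucketed under the nearest of `nlist` centroids, and a query scans only the `nprobe` closest buckets instead of everything. A toy pure-Python version of that idea (an illustration of the concept, not FAISS internals):

```python
import math
import random


def dist(a, b):
    """Euclidean distance between two vectors."""
    return math.dist(a, b)


def ivf_build(vectors, nlist, seed=0):
    """Pick nlist centroids (here: random vectors) and bucket every
    vector id under its nearest centroid."""
    rng = random.Random(seed)
    centroids = rng.sample(vectors, nlist)
    buckets = {i: [] for i in range(nlist)}
    for vid, v in enumerate(vectors):
        best = min(range(nlist), key=lambda i: dist(v, centroids[i]))
        buckets[best].append(vid)
    return centroids, buckets


def ivf_search(query, vectors, centroids, buckets, nprobe=1):
    """Scan only the nprobe buckets whose centroids are closest to the
    query, then return the nearest vector id among those candidates."""
    order = sorted(range(len(centroids)), key=lambda i: dist(query, centroids[i]))
    candidates = [vid for i in order[:nprobe] for vid in buckets[i]]
    return min(candidates, key=lambda vid: dist(query, vectors[vid]))
```

Raising `nprobe` scans more buckets, recovering recall at the cost of speed; real IVF training uses k-means centroids rather than random picks.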
📄 Document Processing
Document Loader (documentloader.py)
"""
Demo script showing the enhanced DocumentLoaderHelper capabilities
optimized for LLM processing.
"""
import os
import sys
from langxchange.documentloader import DocumentLoaderHelper, ChunkingStrategy, ImageProcessingStrategy
# Initialize Document Loader with different configurations
loader = DocumentLoaderHelper(
chunk_size=800,
overlap_size=100,
chunking_strategy=ChunkingStrategy.SEMANTIC,
preserve_formatting=True
)
# Test different chunking strategies
strategies = [
(ChunkingStrategy.CHARACTER, "Character-based chunking"),
(ChunkingStrategy.SENTENCE, "Sentence-aware chunking"),
(ChunkingStrategy.PARAGRAPH, "Paragraph-aware chunking"),
(ChunkingStrategy.SEMANTIC, "Semantic chunking (recommended)"),
]
# Add token-based if available
try:
import tiktoken
strategies.append((ChunkingStrategy.TOKEN, "Token-based chunking"))
except ImportError:
print("Note: tiktoken not available, skipping token-based chunking")
# Process document with specific strategy
file_path = "documents/sample.txt"
for strategy, description in strategies:
print(f"\n--- {description} ---")
# Initialize loader with strategy
loader = DocumentLoaderHelper(
chunk_size=800,
overlap_size=100,
chunking_strategy=strategy,
preserve_formatting=True
)
try:
chunks = list(loader.load(file_path))
print(f"Total chunks created: {len(chunks)}")
print(f"Processing time: {loader.stats['times']['total']:.3f}s")
# Show first few chunks
for i, chunk in enumerate(chunks[:3]):
print(f"\nChunk {i+1}:")
print(f" Length: {len(chunk.content)} chars")
if chunk.metadata.token_count:
print(f" Tokens: {chunk.metadata.token_count}")
print(f" Content preview: {chunk.content[:100]}...")
except Exception as e:
print(f"Error with {strategy}: {e}")
# Multi-format document processing
loader = DocumentLoaderHelper(
chunk_size=800,
chunking_strategy=ChunkingStrategy.SEMANTIC,
preserve_formatting=True
)
files_to_process = [
"documents/sample.txt",
"documents/data.csv",
"documents/report.pdf"
]
for file_path in files_to_process:
if os.path.exists(file_path):
print(f"\n--- Processing: {file_path} ---")
try:
chunks = list(loader.load(file_path))
print(f"File type: {chunks[0].metadata.file_type}")
print(f"Total chunks: {len(chunks)}")
# Show metadata for first chunk
first_chunk = chunks[0]
print(f"First chunk metadata:")
print(f" Source: {first_chunk.metadata.source_file}")
print(f" Section: {first_chunk.metadata.section_title}")
print(f" Content length: {len(first_chunk.content)} chars")
except Exception as e:
print(f"Error processing {file_path}: {e}")
# Advanced configuration examples
configs = [
{
"name": "Minimal overlap",
"params": {"chunk_size": 400, "overlap_size": 20, "min_chunk_size": 30}
},
{
"name": "High overlap",
"params": {"chunk_size": 400, "overlap_size": 100, "min_chunk_size": 50}
},
{
"name": "Preserve formatting",
"params": {"chunk_size": 400, "preserve_formatting": True}
},
{
"name": "Normalize text",
"params": {"chunk_size": 400, "preserve_formatting": False}
}
]
file_path = "documents/sample.txt"
for config in configs:
print(f"\n--- CONFIG: {config['name']} ---")
loader = DocumentLoaderHelper(
chunking_strategy=ChunkingStrategy.SEMANTIC,
**config['params']
)
try:
chunks = list(loader.load(file_path))
stats = loader.get_statistics()
print(f"Chunks created: {len(chunks)}")
print(f"Avg chunk size: {sum(len(c.content) for c in chunks) / len(chunks):.0f} chars")
print(f"Processing time: {stats['processing_stats']['times']['total']:.3f}s")
except Exception as e:
print(f"Error: {e}")
# Image processing support
try:
from PIL import Image
pil_available = True
print("✅ PIL (Pillow) support: Available")
except ImportError:
pil_available = False
print("❌ PIL (Pillow) support: Not available")
try:
import pytesseract
tesseract_available = True
print("✅ OCR (pytesseract) support: Available")
except ImportError:
tesseract_available = False
print("❌ OCR (pytesseract) support: Not available")
# Image processing strategies
strategies = [
(ImageProcessingStrategy.OCR_TEXT, "Extract text using OCR"),
(ImageProcessingStrategy.DESCRIPTION, "Generate image descriptions"),
(ImageProcessingStrategy.METADATA, "Extract technical metadata"),
(ImageProcessingStrategy.COMBINED, "All-in-one processing"),
(ImageProcessingStrategy.VISUAL_ANALYSIS, "Advanced visual analysis")
]
for strategy, description in strategies:
print(f" • {strategy.value}: {description}")
# Supported formats
loader = DocumentLoaderHelper()
supported_formats = loader._get_supported_image_formats()
print(f"Supported image formats ({len(supported_formats)}):")
print(f" {', '.join(supported_formats)}")
# Image processing configuration
image_loader = DocumentLoaderHelper(
chunk_size=1000,
image_processing_strategy=ImageProcessingStrategy.COMBINED,
ocr_language="eng",
max_image_size=(2048, 2048),
image_quality_threshold=0.6
)
print(f"\nImage processing configuration:")
print(f" • Processing strategy: {image_loader.image_processing_strategy.value}")
print(f" • OCR language: {image_loader.ocr_language}")
print(f" • Max image size: {image_loader.max_image_size}")
print(f" • Quality threshold: {image_loader.image_quality_threshold}")
Features:
- ✅ Multiple formats (PDF, DOCX, TXT, CSV, images)
- ✅ Advanced chunking strategies (Character, Sentence, Paragraph, Semantic, Token)
- ✅ Intelligent overlap to preserve context
- ✅ Metadata tracking for better document understanding
- ✅ Token counting support (with tiktoken)
- ✅ Configurable text cleaning and formatting
- ✅ Parallel processing for better performance
- ✅ Comprehensive error handling and statistics
- ✅ Comprehensive image processing (15+ formats)
- ✅ OCR text extraction with confidence scoring
- ✅ Automatic image description generation
- ✅ Technical metadata extraction (EXIF, etc.)
- ✅ Multi-language OCR support
- ✅ Configurable image quality thresholds
- ✅ Metadata extraction
- ✅ Progress tracking
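The `overlap_size` parameter exists so that text split at a chunk boundary still appears intact at the start of the next chunk. A minimal character-level version of that logic (a deliberate simplification of what the loader does; the real strategies also respect sentences, paragraphs, and tokens):

```python
def chunk_text(text: str, chunk_size: int, overlap: int):
    """Yield fixed-size windows that step forward by chunk_size - overlap,
    so each chunk repeats the last `overlap` characters of the previous one.
    Trailing windows may be shorter than chunk_size."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    step = chunk_size - overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]
```

With `chunk_size=4, overlap=2`, the string `"abcdefghij"` becomes `["abcd", "cdef", "efgh", "ghij", "ij"]`; note how each chunk begins with the tail of its predecessor, preserving context across boundaries.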
💾 Database Integration
MySQL Integration (mysql_helper.py)
from langxchange.mysql_helper import MySQLHelper
# Initialize MySQL helper (uses environment variables)
mysql = MySQLHelper(
pool_size=5,
max_overflow=10,
pool_timeout=30
)
# Check connection health
health = mysql.health_check()
print(f"Database status: {health['status']}")
# Insert DataFrame to MySQL
result = mysql.insert_dataframe(
table_name="user_data",
dataframe=df,
if_exists="append",
chunksize=100
)
# Execute parameterized query
df_result = mysql.query(
"SELECT * FROM users WHERE age > :min_age",
params={"min_age": 25}
)
# Batch operations
mysql.batch_execute(
"INSERT INTO users (name, age) VALUES (:name, :age)",
params_list=[
{"name": "Alice", "age": 30},
{"name": "Bob", " age": 25}
]
)
Features:
- ✅ Connection pooling
- ✅ SQL injection protection
- ✅ Transaction support
- ✅ DataFrame integration
- ✅ Health monitoring
- ✅ Context manager support
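The `:name` placeholders above are what make these queries injection-safe: values are bound by the driver rather than spliced into the SQL string. The same pattern, shown with the stdlib `sqlite3` module so it runs without a MySQL server:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, age INTEGER)")

# executemany mirrors batch_execute: one statement, many parameter sets
conn.executemany(
    "INSERT INTO users (name, age) VALUES (:name, :age)",
    [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}],
)

# A malicious value stays inert data instead of becoming SQL
rows = conn.execute(
    "SELECT name FROM users WHERE name = :name",
    {"name": "Alice'; DROP TABLE users; --"},
).fetchall()
print(rows)  # [] — no match, and the table survives intact
```

The same `:param` style works unchanged in the helper's `query()` and `batch_execute()` calls.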
MongoDB Integration (mongo_helper.py)
import os
from langxchange.mongo_helper import EnhancedMongoHelper
import pandas as pd
# Set MongoDB URI (or use environment variable)
os.environ["MONGO_URI"] = "mongodb://localhost:27017"
# Initialize Enhanced MongoDB helper
mongo = EnhancedMongoHelper(
db_name="my_database",
collection_name="documents",
connect_timeout=5000,
server_selection_timeout=5000
)
# Check connection health
connection_health = mongo.ping()
print(f"MongoDB connection: {'Healthy' if connection_health else 'Failed'}")
# Insert single document
result = mongo.insert_one({
"title": "Machine Learning Guide",
"content": "ML is a subset of artificial intelligence...",
"tags": ["AI", "ML", "Tutorial"],
"created_at": "2025-11-25"
})
print(f"Inserted document ID: {result.inserted_id}")
# Insert multiple documents
documents = [
{
"title": "Deep Learning Basics",
"content": "Deep learning uses neural networks...",
"tags": ["DL", "Neural Networks"],
"author": "AI Researcher"
},
{
"title": "NLP Introduction",
"content": "Natural Language Processing enables...",
"tags": ["NLP", "Language"],
"author": "Data Scientist"
}
]
inserted_ids = mongo.insert(documents)
print(f"Inserted {len(inserted_ids)} documents")
# Insert DataFrame to MongoDB
df = pd.DataFrame({
'title': ['ML Guide', 'DL Tutorial', 'NLP Basics'],
'content': ['Machine learning concepts', 'Deep learning methods', 'NLP techniques'],
'category': ['educational', 'advanced', 'beginner'],
'views': [150, 200, 175]
})
mongo.insert(df)
# Query single document
doc = mongo.find_one({"title": "Machine Learning Guide"})
print(f"Found document: {doc.get('title')}")
# Query with filters and sorting
documents = mongo.query(
filter_query={"category": "educational", "views": {"$gte": 100}},
projection={"title": 1, "views": 1, "_id": 0},
sort=[("views", -1)],
limit=10
)
print(f"Found {len(documents)} matching documents")
# Count documents in collection
total_docs = mongo.count_documents({"tags": {"$in": ["AI", "ML"]}})
print(f"Total AI/ML documents: {total_docs}")
# Update document
update_result = mongo.update_one(
filter_query={"title": "Machine Learning Guide"},
update_query={"$set": {"updated_at": "2025-11-25", "views": 200}}
)
print(f"Modified {update_result.modified_count} documents")
# Delete documents
delete_result = mongo.delete_many({"category": "test"})
print(f"Deleted {delete_result.deleted_count} test documents")
# Create indexes for better performance
mongo.create_index("title", unique=True)
mongo.create_index([("tags", 1), ("category", 1)])
# Close connection
mongo.close()
Features:
- ✅ Document-based storage with flexible schema
- ✅ Connection health monitoring and timeout management
- ✅ Batch operations with multiple document insertion
- ✅ DataFrame integration for seamless pandas workflow
- ✅ Advanced querying with filters, projection, and sorting
- ✅ Index management for performance optimization
- ✅ CRUD operations with comprehensive error handling
- ✅ Aggregation pipeline support (when using aggregate methods)
- ✅ GridFS support for large file storage
- ✅ Connection pooling and automatic resource management
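Aggregation pipelines (mentioned above) are plain lists of stage documents executed left to right. A pipeline that averages views per category over the sample data might look like this (the stage contents are illustrative, built from the fields used earlier in this section):

```python
# Filter, then group, then sort — each stage feeds the next.
pipeline = [
    {"$match": {"views": {"$gte": 100}}},
    {"$group": {"_id": "$category", "avg_views": {"$avg": "$views"}}},
    {"$sort": {"avg_views": -1}},
]
# Run it against the helper's underlying collection:
#   results = collection.aggregate(pipeline)
```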
🧠 RAG Implementation
Enhanced Retriever (retrieverX.py)
from langxchange.retrieverX import EnhancedRetrieverX, create_retriever_from_config
# Configure retriever
retriever_config = {
"vector_db": chroma,
"embedder": llm,
"use_rerank": True,
"rerank_multiplier": 3.0,
"db_type": "chroma"
}
retriever = create_retriever_from_config(retriever_config)
# Retrieve documents
results = retriever.retrieve(
query="What are the benefits of machine learning?",
top_k=5,
collection_name="my_collection"
)
for result in results:
print(f"Score: {result.score:.3f}")
print(f"Document: {result.document[:100]}...")
Features:
- ✅ Two-stage retrieval
- ✅ Re-ranking
- ✅ Score aggregation
- ✅ Metadata filtering
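Two-stage retrieval with `rerank_multiplier=3.0` means: fetch roughly 3 × `top_k` candidates with the fast vector search, then let a slower, more accurate re-ranker (e.g. a cross-encoder) pick the final `top_k`. Schematically, with both scoring functions left abstract (this is a sketch of the idea, not the helper's code):

```python
def two_stage_retrieve(query, candidates, fast_score, rerank_score,
                       top_k=5, multiplier=3.0):
    """Stage 1: cheap score over all candidates to build a shortlist.
    Stage 2: expensive score over the shortlist only."""
    shortlist = sorted(candidates, key=lambda d: fast_score(query, d),
                       reverse=True)[:int(top_k * multiplier)]
    return sorted(shortlist, key=lambda d: rerank_score(query, d),
                  reverse=True)[:top_k]
```

The payoff is that the expensive re-ranker runs on `top_k * multiplier` documents instead of the whole collection.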
Prompt Helper (prompt_helper.py)
from langxchange.prompt_helper import EnhancedPromptHelper, PromptMode
prompt_helper = EnhancedPromptHelper(
llm=llm,
system_prompt="You are a helpful assistant.",
default_mode=PromptMode.AUGMENTED,
max_context_length=1500,
max_snippets=3
)
# Generate RAG response
response = prompt_helper.run(
user_query="Explain deep learning",
retrieval_results=retrieval_results,
mode=PromptMode.AUGMENTED,
temperature=0.5
)
Features:
- ✅ Template-based prompts
- ✅ Context management
- ✅ Multi-mode support
- ✅ Token optimization
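The `max_context_length` and `max_snippets` settings bound how much retrieved text reaches the prompt. The selection logic amounts to something like this sketch (a simplification; the real helper also applies templates and the chosen `PromptMode`):

```python
def build_context(snippets, max_snippets=3, max_context_length=1500):
    """Keep the top snippets (assumed pre-sorted by relevance) until
    either the snippet count or the character budget runs out."""
    picked, used = [], 0
    for text in snippets[:max_snippets]:
        if used + len(text) > max_context_length:
            break  # budget exhausted; drop the remaining snippets
        picked.append(text)
        used += len(text)
    return "\n\n".join(picked)
```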
🏠 Local LLM Management
Local LLM (localllm.py)
from langxchange.localllm import LocalLLM
local_llm = LocalLLM(
model_path="./models/llama-7b",
device="cuda",
precision="fp16"
)
response = local_llm.generate(
prompt="Explain the concept of neural networks",
max_tokens=500,
temperature=0.7
)
Features:
- ✅ Model downloading
- ✅ Quantization (4-bit, 8-bit)
- ✅ GPU/CPU switching
- ✅ Memory management
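Why quantization matters is mostly memory: weight storage shrinks roughly in proportion to bits per parameter. A back-of-the-envelope calculation for a 7B-parameter model (weights only; activations and KV cache are extra):

```python
def model_memory_gb(n_params: float, bits_per_param: int) -> float:
    """Approximate weight memory in GiB: params * bits / 8 bytes."""
    return n_params * bits_per_param / 8 / 1024**3

n = 7e9
for label, bits in [("fp16", 16), ("8-bit", 8), ("4-bit", 4)]:
    print(f"{label}: ~{model_memory_gb(n, bits):.1f} GB")
# fp16 ≈ 13.0 GB, 8-bit ≈ 6.5 GB, 4-bit ≈ 3.3 GB
```

This is why 4-bit quantization lets a 7B model fit on consumer GPUs that fp16 would overflow.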
Localize LLM (localizellm.py)
#!/usr/bin/env python3
"""
LocalizeLLM Example Script
Demonstrates key features of the LocalizeLLM class including:
- Model downloading and loading
- Text generation and chat
- Fine-tuning with custom data
- Parameter optimization
- Model management
Author: Langxchange
"""
import os
from langxchange.localizellm import LocalizeLLM, LocalLLMConfig, create_local_llm
def basic_usage_example():
"""Demonstrate basic LocalizeLLM usage."""
print("=== Basic Usage Example ===")
# Simple configuration using factory function
config = LocalLLMConfig(
chat_model="microsoft/DialoGPT-small", # Smaller model for demo
local_models_dir="./demo_models",
learning_rate=1e-4,
batch_size=1,
max_epochs=1,
load_in_8bit=False # Use False for smaller models
)
try:
with LocalizeLLM(config) as llm:
print("✓ LocalizeLLM initialized successfully")
# Download and load model
print("Downloading model (this may take a few minutes)...")
model_path = llm.download_model("microsoft/DialoGPT-small") #microsoft/Phi-3-mini-4k-instruct"
# model_path = llm.download_model("meta-llama/Llama-2-7b-chat-hf")
# model_path = llm.download_model("microsoft/Phi-3-mini-4k-instruct")
print(f"✓ Model downloaded to: {model_path}")
llm.load_local_model("microsoft/DialoGPT-small")
# llm.load_local_model("meta-llama/Llama-2-7b-chat-hf")
# llm.load_local_model("microsoft/Phi-3-mini-4k-instruct")
print("✓ Model loaded successfully")
# List available models
models = llm.list_available_models()
print(f"✓ Available models: {list(models.keys())}")
# Generate text
print("\n--- Text Generation ---")
response = llm.generate_text(
"What is machine learning?",
max_new_tokens=200,
temperature=0.7
)
print(f"Generated: {response}")
# Chat example
print("\n--- Chat Example ---")
messages = [
{"role": "user", "system": "You are a helpfull assistant"},
{"role": "user", "content": "What is machine learning?"}
]
chat_response = llm.chat(messages, max_new_tokens=50)
print(f"Chat response: {chat_response}")
# Model info
print("\n--- Model Information ---")
info = llm.get_model_info()
print(f"Model parameters: {info.get('model_parameters', 'N/A')}")
print(f"Device: {info.get('device', 'N/A')}")
except Exception as e:
print(f"Error in basic usage: {e}")
def fine_tuning_example():
"""Demonstrate fine-tuning capabilities."""
print("\n=== Fine-tuning Example ===")
# Custom training data (domain-specific examples)
training_data = [
"Python is a high-level programming language known for its simplicity.",
"Machine learning algorithms can learn patterns from data automatically.",
"Neural networks are inspired by the structure of the human brain.",
"Data preprocessing is crucial for successful machine learning projects.",
"Feature engineering helps improve model performance significantly.",
"Cross-validation helps assess model generalization capability.",
"Overfitting occurs when a model memorizes training data too closely.",
"Regularization techniques help prevent overfitting in machine learning.",
]
config = LocalLLMConfig(
chat_model="microsoft/DialoGPT-small",
local_models_dir="./demo_models",
learning_rate=5e-5,
batch_size=1,
max_epochs=1,
warmup_steps=10,
max_length=128,
lora_rank=8, # Smaller rank for demo
)
try:
with LocalizeLLM(config) as llm:
# Load the previously downloaded model
llm.load_local_model("microsoft/DialoGPT-small")
print("✓ Model loaded for fine-tuning")
# Prepare training data
print("Preparing training data...")
train_dataset, val_dataset = llm.prepare_fine_tuning_data(
training_data,
validation_split=0.2
)
print(f"✓ Training samples: {len(train_dataset)}")
print(f"✓ Validation samples: {len(val_dataset)}")
# Test before fine-tuning
print("\n--- Before Fine-tuning ---")
before_response = llm.generate_text(
"What is machine learning?",
max_new_tokens=30,
temperature=0.7
)
print(f"Response before fine-tuning: {before_response}")
# Fine-tune model
print("\nStarting fine-tuning (this may take a few minutes)...")
fine_tuned_path = llm.fine_tune_model(
train_dataset=train_dataset,
val_dataset=val_dataset,
experiment_name="ml_specialist_demo-1",
use_lora=True
)
print(f"✓ Fine-tuning completed: {fine_tuned_path}")
# Test after fine-tuning
print("\n--- After Fine-tuning ---")
# llm.load_local_model(fine_tuned_path)
after_response = llm.generate_text(
"What is machine learning?",
max_new_tokens=30,
temperature=0.7
)
print(f"Response after fine-tuning: {after_response}")
# Save snapshot
snapshot_path = llm.save_model_snapshot(
"ml_demo_snapshot",
"Fine-tuned model for ML demonstrations"
)
print(f"✓ Snapshot saved: {snapshot_path}")
except Exception as e:
print(f"Error in fine-tuning: {e}")
def parameter_optimization_example():
"""Demonstrate parameter optimization."""
print("\n=== Parameter Optimization Example ===")
config = LocalLLMConfig(
chat_model="microsoft/DialoGPT-small",
local_models_dir="./demo_models"
)
try:
with LocalizeLLM(config) as llm:
llm.load_local_model("microsoft/DialoGPT-small")
print("✓ Model loaded for parameter optimization")
# Create a small validation dataset
validation_texts = [
"The quick brown fox jumps over the lazy dog.",
"Artificial intelligence is transforming many industries.",
"Python programming language is widely used in data science.",
]
_, val_dataset = llm.prepare_fine_tuning_data(validation_texts)
# Define parameter ranges (keeping it small for demo)
parameter_ranges = {
"temperature": [0.5, 0.7, 1.0],
"top_p": [0.8, 0.9],
"top_k": [40, 50]
}
print("Optimizing parameters...")
best_params = llm.optimize_parameters(
parameter_ranges=parameter_ranges,
validation_dataset=val_dataset,
metric="perplexity"
)
print(f"✓ Best parameters: {best_params['best_parameters']}")
print(f"✓ Best perplexity: {best_params['best_score']:.4f}")
print(f"✓ Combinations tested: {best_params['total_combinations_tested']}")
except Exception as e:
print(f"Error in parameter optimization: {e}")
def model_management_example():
"""Demonstrate model management features."""
print("\n=== Model Management Example ===")
config = LocalLLMConfig(local_models_dir="./demo_models")
try:
with LocalizeLLM(config) as llm:
# List all available models
models = llm.list_available_models()
print(f"Available models: {len(models)}")
for name, info in models.items():
print(f" - {name}: {info.size_gb:.2f}GB, "
f"Type: {info.model_type}, "
f"Fine-tuned: {info.fine_tuned}")
if models:
# Load first available model
first_model = list(models.keys())[0]
llm.load_local_model(first_model)
print(f"✓ Loaded model: {first_model}")
# Get detailed model info
info = llm.get_model_info()
print(f"\nModel Details:")
print(f" Parameters: {info.get('model_parameters', 'N/A'):,}")
print(f" Trainable: {info.get('trainable_parameters', 'N/A'):,}")
print(f" Device: {info.get('device', 'N/A')}")
print(f" Vocab size: {info.get('vocab_size', 'N/A'):,}")
# Demonstrate token counting
test_text = "This is a test sentence for token counting."
token_count = llm.count_tokens(test_text)
print(f"\nToken count for '{test_text}': {token_count}")
# Demonstrate embeddings (if embedding model available)
try:
embedding = llm.get_embedding("Sample text for embedding")
print(f"Embedding dimension: {len(embedding)}")
except Exception as e:
print(f"Embedding not available: {e}")
except Exception as e:
print(f"Error in model management: {e}")
def main():
"""Run all examples."""
print("🚀 LocalizeLLM Demonstration Script")
print("=" * 50)
try:
# Check if we have HuggingFace token (optional for public models)
hf_token = os.getenv("HUGGINGFACE_TOKEN") or os.getenv("HF_TOKEN")
if not hf_token:
print("💡 Note: No HuggingFace token found. Using public models only.")
print(" Set HUGGINGFACE_TOKEN env var to access gated models.")
# Run examples
basic_usage_example()
fine_tuning_example()
# parameter_optimization_example()
# model_management_example()
print("\n🎉 All examples completed successfully!")
except KeyboardInterrupt:
print("\n⚠️ Demonstration interrupted by user")
except Exception as e:
print(f"\n❌ Error during demonstration: {e}")
print("💡 Make sure you have the required dependencies installed:")
print(" pip install torch transformers sentence-transformers datasets accelerate")
if __name__ == "__main__":
main()
Key Features:
- ✅ Model downloading and loading from HuggingFace Hub
- ✅ Text generation with customizable parameters (max_new_tokens, temperature)
- ✅ Chat-based conversation with structured message handling
- ✅ Fine-tuning with custom training data using LoRA optimization
- ✅ Parameter optimization for finding best generation settings
- ✅ Model management with snapshots and metadata tracking
- ✅ Multiple model support (DialoGPT, LLaMA, Phi-3, etc.)
- ✅ GPU/CPU switching with automatic device detection
- ✅ Memory-efficient quantization support (4-bit, 8-bit)
- ✅ Token counting and embedding generation
- ✅ Comprehensive logging and progress tracking
- ✅ Model caching system for improved performance
- ✅ Validation dataset preparation for fine-tuning
- ✅ Experiment tracking and snapshot management
Required Dependencies:
- torch
- transformers
- sentence-transformers
- datasets
- accelerate
- peft (for LoRA fine-tuning)
- bitsandbytes (for quantization)
🔤 Embeddings & Prompts
Embeddings (embeddings.py)
from langxchange.embeddings import EmbeddingHelper
embedder = EmbeddingHelper(
model_name="sentence-transformers/all-MiniLM-L6-v2",
device="cpu"
)
# Generate embeddings
embedding = embedder.embed("This is a sample text.")
# Compare two texts via their embeddings
text1_embedding = embedder.embed("Dogs are loyal companions.")
text2_embedding = embedder.embed("Cats are independent animals.")
similarity = embedder.similarity(text1_embedding, text2_embedding)
Features:
- ✅ Multiple embedding models
- ✅ Batch processing
- ✅ Similarity metrics
- ✅ GPU acceleration
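The `similarity` metric above is typically cosine similarity; here is a dependency-free sketch of that computation (illustrative only, not the helper's internal code):

```python
from math import sqrt

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two equal-length embedding vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = sqrt(sum(x * x for x in a))
    norm_b = sqrt(sum(x * x for x in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # guard against zero vectors (e.g., empty input text)
    return dot / (norm_a * norm_b)

print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # identical direction -> 1.0
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # orthogonal -> 0.0
```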
Prompt Helper (prompt_helper.py)
from langxchange.prompt_helper import PromptTemplate
template = PromptTemplate(
template="Answer the question: {question}\nContext: {context}",
input_variables=["question", "context"]
)
prompt = template.format(
question="What is AI?",
context="AI stands for Artificial Intelligence..."
)
Features:
- ✅ Template engine
- ✅ Variable substitution
- ✅ Validation
- ✅ Custom functions
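Template rendering boils down to validated variable substitution; a minimal standalone sketch of the idea (`render` is a hypothetical helper, not the library's implementation):

```python
import string

def render(template: str, variables: dict[str, str]) -> str:
    """Substitute {placeholders}, failing fast on missing variables."""
    required = {name for _, name, _, _ in string.Formatter().parse(template) if name}
    missing = required - variables.keys()
    if missing:
        raise ValueError(f"Missing template variables: {sorted(missing)}")
    return template.format(**variables)

prompt = render(
    "Answer the question: {question}\nContext: {context}",
    {"question": "What is AI?", "context": "AI stands for Artificial Intelligence..."},
)
```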
🔬 Examples
Basic RAG Implementation
import os
from langxchange.openai_helper import EnhancedOpenAIHelper, OpenAIConfig
from langxchange.chroma_helper import EnhancedChromaHelper, ChromaConfig
from langxchange.documentloader import DocumentLoaderHelper, ChunkingStrategy
# Set API key (or use environment variable)
os.environ["OPENAI_API_KEY"] = "your-openai-key"
# Initialize components with enhanced configurations
openai_config = OpenAIConfig(model="gpt-4", enable_cost_tracking=True)
openai = EnhancedOpenAIHelper(openai_config)
chroma_config = ChromaConfig(persist_directory="./chroma", batch_size=100)
chroma = EnhancedChromaHelper(openai, chroma_config)
# Load and process documents
loader = DocumentLoaderHelper(
chunking_strategy=ChunkingStrategy.SEMANTIC,
chunk_size=800
)
chunks = list(loader.load("data.csv"))
# Store in vector database
chroma.insert_documents(
collection_name="knowledge_base",
documents=[chunk.content for chunk in chunks],
metadatas=[chunk.metadata for chunk in chunks],
generate_embeddings=True
)
# Query
results = chroma.query_collection(
collection_name="knowledge_base",
query_text="What are the main topics?",
top_k=5
)
Database + Vector Search
# Store structured data in MySQL
mysql = MySQLHelper()
mysql.insert_dataframe("products", product_df)
# Store unstructured data in vector DB
chroma.insert_documents(
collection_name="product_descriptions",
documents=product_df["description"].tolist(),
generate_embeddings=True
)
# Cross-reference search
sql_results = mysql.query("SELECT * FROM products WHERE category = :cat", {"cat": "electronics"})
vector_results = chroma.query_collection(
collection_name="product_descriptions",
query_text="high quality electronics",
top_k=10
)
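The two result sets can then be joined in application code. A hypothetical sketch (the `id` column and `product_id` metadata key are assumptions for illustration, not the helpers' actual schema):

```python
def cross_reference(sql_rows, vector_hits, key="id", meta_key="product_id"):
    """Keep vector hits whose metadata points at a row returned by SQL."""
    sql_ids = {row[key] for row in sql_rows}
    return [hit for hit in vector_hits if hit["metadata"].get(meta_key) in sql_ids]

# Illustrative data shaped like the two queries above
sql_rows = [{"id": 1, "name": "Headphones"}, {"id": 2, "name": "Monitor"}]
vector_hits = [
    {"document": "Great sound quality", "metadata": {"product_id": 1}},
    {"document": "Comfortable chair", "metadata": {"product_id": 7}},
]
matched = cross_reference(sql_rows, vector_hits)  # only the headphones hit remains
```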
Multi-Modal RAG
# Process documents with images
loader = DocumentLoaderHelper(
chunking_strategy=ChunkingStrategy.SEMANTIC,
image_processing=ImageProcessingStrategy.EXTRACT_TEXT
)
chunks = list(loader.load("document_with_images.pdf"))
# Store text and image metadata
chroma.insert_documents(
collection_name="multimodal_docs",
documents=[chunk.content for chunk in chunks],
metadatas=[
{
"has_images": chunk.has_images,
"image_count": len(chunk.images),
"page_number": chunk.metadata.get("page", 1)
} for chunk in chunks
],
generate_embeddings=True
)
🎯 RAG Tutorial
This tutorial demonstrates a complete RAG (Retrieval-Augmented Generation) implementation using the LangXChange toolkit. We'll build a system that processes CSV data, stores it in a vector database, and enables intelligent querying with LLM responses.
Complete RAG Example
#!/usr/bin/env python3
"""
Complete RAG Implementation using LangXChange
This example demonstrates:
- CSV data processing and chunking
- Vector database storage with ChromaDB
- Intelligent retrieval and re-ranking
- LLM-powered response generation
- Cost tracking and performance monitoring
"""
import os
import time
from pathlib import Path
import pandas as pd
from dotenv import load_dotenv
from langxchange.documentloader import DocumentLoaderHelper, ChunkingStrategy, ImageProcessingStrategy
from langxchange.embeddings import EmbeddingHelper
from langxchange.mysql_helper import MySQLHelper
from langxchange.openai_helper import EnhancedOpenAIHelper, OpenAIConfig
from langxchange.chroma_helper import EnhancedChromaHelper, ChromaConfig
from langxchange.localllm import LocalLLM
from langxchange.retrieverX import EnhancedRetrieverX, RetrievalResult, create_retriever_from_config, batch_retrieve
from langxchange.prompt_helper import EnhancedPromptHelper, PromptMode
from langxchange.localizellm import LocalizeLLM, LocalLLMConfig, create_local_llm
def main():
"""Complete RAG implementation demonstrating all LangXChange capabilities."""
# Load environment variables
load_dotenv()
# ─── 1) Configuration ────────────────────────────────────────────────────────
# Set your API keys (you can also use environment variables)
os.environ["OPENAI_API_KEY"] = "your-openai-api-key"
os.environ["CHROMA_PERSIST_PATH"] = "__db/chromadb"
# Configuration constants
CHROMA_DIR = "__db/chromadb"
COLLECTION_NAME = "student_info_collection"
# ─── 2) Initialize Components ────────────────────────────────────────────────
# Enhanced OpenAI configuration
open_ai_config = OpenAIConfig(
chat_model="gpt-3.5-turbo",
enable_caching=True,
enable_cost_tracking=True,
max_retries=3,
log_level="INFO"
)
# Enhanced Chroma configuration for better performance
chroma_config = ChromaConfig(
persist_directory=CHROMA_DIR,
batch_size=100,
max_workers=8,
progress_bar=True
)
# Document loader with optimized settings for CSV data
loader = DocumentLoaderHelper(
chunk_size=800,
chunking_strategy=ChunkingStrategy.SEMANTIC,
preserve_formatting=True
)
# Initialize core components
llm = EnhancedOpenAIHelper(open_ai_config)
chroma = EnhancedChromaHelper(llm, chroma_config)
# ─── 3) Retriever Configuration ─────────────────────────────────────────────
retreiver_config = {
"vector_db": chroma,
"embedder": llm,
"reranker_model": None,
"use_rerank": False,
"rerank_multiplier": 3.0,
"db_type": "chroma"
}
# Prompt helper for RAG responses
prompt_response = EnhancedPromptHelper(
llm=llm,
system_prompt="You are a helpful assistant that answers questions based on the provided context.",
default_mode=PromptMode.AUGMENTED,
max_context_length=1500,
max_snippets=3
)
# ─── 4) Data Preparation ─────────────────────────────────────────────────────
# Create data directory if it doesn't exist
Path("data").mkdir(exist_ok=True)
output_path = "data/student_scores.csv"
# Sample CSV data (in practice, this would be your actual data)
sample_data = {
'student_id': range(1, 101),
'name': [f'Student_{i}' for i in range(1, 101)],
'math_score': [85 + (i % 15) for i in range(1, 101)],
'science_score': [80 + (i % 20) for i in range(1, 101)],
'english_score': [75 + (i % 25) for i in range(1, 101)],
'grade': ['A' if i > 90 else 'B' if i > 80 else 'C' for i in range(1, 101)]
}
df = pd.DataFrame(sample_data)
df.to_csv(output_path, index=False)
print(f"💾 Sample data created: {output_path} ({len(df)} records)")
# ─── 5) CSV Processing and Chroma Ingestion ─────────────────────────────────
print("🔄 Processing CSV data for Chroma ingestion...")
# Load and process the CSV file
chunks = list(loader.load(output_path))
print(f"📝 Generated {len(chunks)} chunks from CSV data")
print(f"Processing time: {loader.stats['times']['total']:.3f}s")
# Prepare enhanced metadata for better searchability
enhanced_metadata = []
documents = []
for i, chunk in enumerate(chunks):
metadata = {
"source": Path(output_path).name,
"chunk_id": i,
"data_type": "student_data",
"extraction_date": time.strftime("%Y-%m-%d %H:%M:%S"),
}
enhanced_metadata.append(metadata)
documents.append(str(chunk.content))
# ─── 6) Batch Processing and Ingestion to Chroma ─────────────────────────────
print("⚡ Starting enhanced batch ingestion to ChromaDB...")
ingest_start = time.perf_counter()
try:
# Batch process for efficiency
batch_size = chroma_config.batch_size
total_chunks = len(documents)
for i in range(0, total_chunks, batch_size):
batch_end = min(i + batch_size, total_chunks)
batch_chunks = documents[i:batch_end]
batch_metadata = enhanced_metadata[i:batch_end]
print(f"📤 Processing batch {i//batch_size + 1}/{(total_chunks + batch_size - 1)//batch_size}")
# Insert batch to Chroma
chroma.insert_documents(
collection_name=COLLECTION_NAME,
documents=batch_chunks,
metadatas=batch_metadata,
generate_embeddings=True,
)
print(f"✅ Batch {i//batch_size + 1} inserted successfully")
# Get final collection stats
collection_count = chroma.get_collection_count(COLLECTION_NAME)
ingest_time = time.perf_counter() - ingest_start
print(f"🎉 Ingestion completed successfully!")
print(f"📊 Collection '{COLLECTION_NAME}' now contains {collection_count} documents")
print(f"⏱️ Total ingestion time: {ingest_time:.2f} seconds")
print(f"📈 Processing rate: {collection_count/ingest_time:.2f} documents/second")
except Exception as e:
print(f"❌ Error during ingestion: {str(e)}")
raise
# ─── 7) Retrieval System Testing ────────────────────────────────────────────
print("\n🔍 Testing retrieval system...")
# Create retriever from configuration
retriever = create_retriever_from_config(retreiver_config)
# Test queries
test_queries = [
"Select best student with highest scores",
"What students have A grades?",
"Show me students with high math scores",
"Find students who excel in science"
]
for query in test_queries:
print(f"\n🔎 Query: {query}")
# Retrieve documents
results = retriever.retrieve(
query=query,
top_k=3,
collection_name=COLLECTION_NAME
)
print(f"📋 Retrieved {len(results)} results:")
for i, result in enumerate(results, 1):
print(f" {i}. Score: {result.score:.3f}")
print(f" Content: {result.document[:100]}...")
# ─── 8) RAG Response Generation ─────────────────────────────────────────────
print("\n🧠 Generating RAG responses...")
# Select a query for detailed RAG demonstration
main_query = "What are the characteristics of top-performing students?"
# Get retrieval results
retrieval_results = retriever.retrieve(
query=main_query,
top_k=2,
collection_name=COLLECTION_NAME
)
# Generate RAG response
response = prompt_response.run(
user_query=main_query,
retrieval_results=retrieval_results,
mode=PromptMode.AUGMENTED,
temperature=0.5
)
print(f"\n🤖 RAG Response:")
print(f"Query: {main_query}")
print(f"Answer: {response}")
# ─── 9) MySQL Integration Example ───────────────────────────────────────────
print("\n💾 Demonstrating MySQL integration...")
try:
# Initialize MySQL helper
mysql = MySQLHelper()
# Check connection health
health = mysql.health_check()
print(f"Database status: {health['status']}")
if health['status'] == 'healthy':
# Store student data in MySQL for structured queries
result = mysql.insert_dataframe(
table_name="students",
dataframe=df,
if_exists="replace"
)
print(f"MySQL: {result}")
# Perform structured query
structured_results = mysql.query(
"SELECT * FROM students WHERE math_score > :min_score",
params={"min_score": 90}
)
print(f"Found {len(structured_results)} high-performing students in math")
except Exception as e:
print(f"MySQL integration skipped: {str(e)}")
# ─── 10) Performance Summary ────────────────────────────────────────────────
print("\n📊 Performance Summary:")
print(f"• Documents processed: {collection_count}")
print(f"• Ingestion time: {ingest_time:.2f} seconds")
print(f"• Average processing rate: {collection_count/ingest_time:.2f} docs/sec")
# Cost tracking (if enabled)
try:
cost_summary = llm.get_cost_summary()
print(f"• API costs: ${cost_summary['total_cost']:.4f}")
print(f"• Total tokens: {cost_summary['total_tokens']}")
except Exception:
print("• Cost tracking not available")
print("\n✅ RAG implementation complete!")
if __name__ == "__main__":
main()
Key Features Demonstrated
This RAG example showcases:
- 📊 Data Processing: CSV parsing and semantic chunking
- 🔍 Vector Storage: ChromaDB with batch processing
- 🧠 Intelligent Retrieval: Two-stage retrieval with scoring
- 🤖 LLM Integration: OpenAI for response generation
- 💾 Structured Data: MySQL for relational queries
- 📈 Performance Monitoring: Cost tracking and timing
- 🔧 Modular Design: Easy to customize and extend
Customization Options
- Vector Database: Switch between ChromaDB, Pinecone, or FAISS
- LLM Provider: Use Anthropic, Google, or local models
- Document Formats: Support for PDF, DOCX, images
- Chunking Strategies: Semantic, fixed-size, or custom
- Retrieval Modes: Basic, re-ranked, or hybrid
Google Drive Integration (google_drive_helper.py)
import os
from langxchange.google_drive_helper import EnhancedGoogleDriveHelper
# Initialize Google Drive helper with enhanced configuration
drive = EnhancedGoogleDriveHelper(
credentials_path="credentials.json", # Path to Google OAuth2 credentials file
token_path="token.pickle", # Path to store authentication token
config={
'max_retries': 5,
'retry_delay': 2.0,
'chunk_size': 2 * 1024 * 1024, # 2MB chunks
'max_workers': 8,
'log_level': 'INFO',
'progress_callback': True
}
)
# Create folder structure
folder_id = drive.create_folder(
name="AI Research Documents",
description="Collection of AI research papers and documents"
)
# Upload single file with progress tracking
file_id = drive.upload_file(
file_path="research_paper.pdf",
parent_id=folder_id,
description="Latest AI research paper",
progress_callback=lambda current, total: print(f"Upload progress: {(current/total)*100:.1f}%")
)
# Batch upload multiple files
file_paths = ["doc1.pdf", "doc2.docx", "data.xlsx", "notes.txt"]
upload_results = drive.batch_upload_files(
file_paths=file_paths,
parent_id=folder_id,
progress_callback=lambda completed, total: print(f"Batch upload: {completed}/{total} files")
)
# Search and filter files
search_results = drive.search_files(
query="mimeType='application/pdf' and name contains 'AI'",
max_results=10,
fields="id,name,mimeType,size,modifiedTime"
)
# List files in folder with type filtering
pdf_files = drive.list_files_in_folder(
folder_id=folder_id,
include_subfolders=True,
file_types=["application/pdf"]
)
# Download file with progress tracking
drive.download_file(
file_id=file_id,
output_path="downloaded_file.pdf",
progress_callback=lambda current, total: print(f"Download progress: {(current/total)*100:.1f}%")
)
# Manage file permissions and sharing
permission_id = drive.share_file(
file_id=file_id,
email="colleague@example.com",
role="writer",
type_="user"
)
# Get storage quota information
quota_info = drive.get_storage_quota()
print(f"Storage used: {quota_info['usage']} / {quota_info['limit']} bytes")
# File management operations
# Rename file
new_name = drive.rename_file(file_id=file_id, new_name="Updated_Research_Paper.pdf")
# Move file to different folder
new_folder_id = drive.create_folder("Archive", parent_id=folder_id)
drive.move_file(file_id=file_id, new_parent_id=new_folder_id)
# Copy file
copy_id = drive.copy_file(file_id=file_id, new_name="Backup_Research_Paper.pdf")
# Export Google Docs to different formats
# Export formats include: 'pdf', 'docx', 'odt', 'txt', 'html' for Google Docs
# 'xlsx', 'ods', 'csv', 'tsv' for Google Sheets
# 'pptx', 'odp', 'jpeg', 'png' for Google Slides
drive.export_google_doc(
file_id=file_id,
export_format="application/pdf",
output_path="exported_document.pdf"
)
# Get detailed file metadata
metadata = drive.get_file_metadata(file_id=file_id)
print(f"File: {metadata['name']} ({metadata['size']} bytes)")
# Clean up
drive.close()
Key Features:
- ✅ Robust error handling with exponential backoff retry mechanisms
- ✅ Progress tracking for uploads/downloads with configurable callbacks
- ✅ Batch operations with concurrent file processing
- ✅ Advanced search capabilities using Google Drive query syntax
- ✅ File permissions management and sharing controls
- ✅ Comprehensive logging and configuration management
- ✅ Export Google Docs, Sheets, and Slides to various formats
- ✅ Memory-efficient operations with configurable chunk sizes
- ✅ Support for both individual and batch file operations
- ✅ Authentication handling with automatic token refresh
Required Dependencies:
- google-api-python-client
- google-auth-oauthlib
- google-auth
Google Cloud Storage Integration (google_cs_helper.py)
import logging
from langxchange.google_cs_helper import EnhancedGoogleCloudStorageHelper
# Initialize GCS helper with enhanced configuration
gcs = EnhancedGoogleCloudStorageHelper(
credentials_path="service-account.json", # Path to service account JSON file
project_id="my-gcp-project", # GCP project ID
logger=logging.getLogger("gcs_helper") # Custom logger
)
# Create bucket with custom configuration
bucket = gcs.create_bucket(
bucket_name="my-storage-bucket",
location="US-CENTRAL1", # Bucket location
storage_class="STANDARD", # Storage class
force_create=True # Create even if exists
)
# Upload single file with metadata
upload_result = gcs.upload_file(
bucket_name="my-storage-bucket",
file_path="research_paper.pdf",
destination_blob_name="documents/research_paper.pdf",
metadata={
"purpose": "research",
"author": "team",
"version": "1.0"
},
content_type="application/pdf"
)
print(f"Uploaded: {upload_result['public_url']}")
# Batch upload multiple files
file_mappings = [
("data1.csv", "data/raw/data1.csv"),
("data2.csv", "data/raw/data2.csv"),
("config.json", "config/application.json")
]
batch_results = gcs.batch_upload(
bucket_name="my-storage-bucket",
file_mappings=file_mappings,
metadata={"batch": "2025-01-data-import"}
)
# List blobs with filtering and metadata
pdf_blobs = gcs.list_blobs(
bucket_name="my-storage-bucket",
prefix="documents/",
include_metadata=True
)
for blob in pdf_blobs:
print(f"Found: {blob['name']} ({blob['size']} bytes)")
# Download file with automatic directory creation
download_info = gcs.download_file(
bucket_name="my-storage-bucket",
blob_name="documents/research_paper.pdf",
destination_file_path="downloads/research_paper.pdf",
create_dirs=True
)
print(f"Downloaded to: {download_info['local_path']}")
# Get detailed blob metadata
metadata = gcs.get_blob_metadata(
bucket_name="my-storage-bucket",
blob_name="documents/research_paper.pdf"
)
print(f"Content type: {metadata['content_type']}")
print(f"Size: {metadata['size']} bytes")
print(f"Custom metadata: {metadata['custom_metadata']}")
# Copy blob to different bucket
copy_result = gcs.copy_blob(
source_bucket="my-storage-bucket",
source_blob="documents/research_paper.pdf",
destination_bucket="backup-storage-bucket",
destination_blob="archive/research_paper.pdf"
)
print(f"Copied: {copy_result['source']} -> {copy_result['destination']}")
# Generate signed URL for temporary access
signed_url = gcs.generate_signed_url(
bucket_name="my-storage-bucket",
blob_name="documents/research_paper.pdf",
expiration_minutes=60, # URL expires in 1 hour
method="GET" # HTTP method
)
print(f"Signed URL: {signed_url}")
# Get comprehensive bucket information
bucket_info = gcs.get_bucket_info("my-storage-bucket")
print(f"Bucket location: {bucket_info['location']}")
print(f"Storage class: {bucket_info['storage_class']}")
print(f"Versioning enabled: {bucket_info['versioning_enabled']}")
print(f"Labels: {bucket_info['labels']}")
# Clean up operations
# Delete specific blob
success = gcs.delete_blob(
bucket_name="my-storage-bucket",
blob_name="documents/old_file.txt"
)
# Delete bucket (with force option to clear all blobs)
# gcs.delete_bucket("my-storage-bucket", force=True)
Key Features:
- ✅ Comprehensive error handling with logging and validation
- ✅ Support for both service account and application default credentials
- ✅ Bucket creation with custom configuration (location, storage class)
- ✅ File upload/download with metadata and content type support
- ✅ Batch operations for efficient multi-file handling
- ✅ Blob listing with filtering, prefix matching, and metadata inclusion
- ✅ Blob copying between buckets and locations
- ✅ Signed URL generation for secure temporary access
- ✅ Detailed metadata retrieval for blobs and buckets
- ✅ Bucket information and lifecycle management
- ✅ Path validation and automatic directory creation
- ✅ Proper cleanup operations for resources
Required Dependencies:
- google-cloud-storage
- google-auth
- google-auth-oauthlib
- google-auth-httplib2
🛠️ Advanced Usage
Custom Embeddings
import logging
from langxchange.embeddings import EmbeddingHelper
# Setup logging
logging.basicConfig(level=logging.INFO)
# Example with SentenceTransformer
try:
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('all-MiniLM-L6-v2')
helper = EmbeddingHelper(
model,
batch_size=16,
max_workers=4,
return_numpy=True
)
# Test texts
texts = [
"This is a test sentence.",
"Another example text.",
"Machine learning is fascinating.",
"", # Edge case: empty string
"Final test sentence."
]
# Generate embeddings
embeddings = helper.embed(texts)
print(f"Generated {len([e for e in embeddings if e is not None])}/{len(texts)} embeddings")
print(f"Embedding dimension: {helper.get_embedding_dimension()}")
except ImportError:
print("SentenceTransformer not available for testing")
Batch Operations
# Batch query multiple collections
results = batch_retrieve(
retrievers=[retriever1, retriever2],
query="your question",
top_k=5
)
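When results come back from several retrievers, the framework's Parallel RetrieverX fuses ranked lists with RRF; a standalone sketch of reciprocal rank fusion shows the idea (illustrative, not the library's code):

```python
def rrf_fuse(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Reciprocal Rank Fusion: score(d) = sum over lists of 1 / (k + rank)."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first
    return sorted(scores, key=scores.get, reverse=True)

fused = rrf_fuse([["a", "b", "c"], ["b", "a", "d"]])
# "a" and "b" appear near the top of both lists, so they outrank "c" and "d"
```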
Custom Prompts
"""
Example usage of the improved PromptHelper class.
Demonstrates different prompt building modes and tool integration.
"""
import os
from langxchange.prompt_helper import EnhancedPromptHelper, PromptMode
from typing import Dict, List, Any
from langxchange.openai_helper import EnhancedOpenAIHelper, OpenAIConfig
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Initialize LLM helper with configuration
open_ai_config = OpenAIConfig(
chat_model="gpt-3.5-turbo",
enable_caching=True,
enable_cost_tracking=True,
max_retries=3,
log_level="INFO"
)
llm = EnhancedOpenAIHelper(open_ai_config)
# Initialize PromptHelper with different configurations
helper = EnhancedPromptHelper(
llm=llm,
system_prompt="You are a helpful assistant.",
default_mode=PromptMode.AUGMENTED,
max_context_length=1500,
max_snippets=3
)
# Register tools for enhanced functionality
def sample_search_tool(query: str, limit: int = 3) -> List[str]:
"""Sample search function."""
return [f"Result {i+1} for '{query}'" for i in range(limit)]
helper.register_tool("search", sample_search_tool)
# Sample retrieval results
sample_retrieval_results = [
{
"text": "Machine learning is a subset of artificial intelligence that focuses on algorithms that can learn from data.",
"metadata": {"source": "ML_Basics.pdf", "score": 0.95}
},
{
"text": "Deep learning uses neural networks with multiple layers to model and understand complex patterns.",
"metadata": {"source": "Deep_Learning_Guide.pdf", "score": 0.87}
}
]
user_query = "What is machine learning and how does it relate to AI?"
# Test different prompt modes
modes_to_test = [PromptMode.BASIC, PromptMode.AUGMENTED, PromptMode.CONTEXTUAL, PromptMode.SUMMARIZED]
for mode in modes_to_test:
print(f"\n--- {mode.value.upper()} MODE ---")
# Build prompt with retrieval results
messages = helper.build_prompt(
user_query=user_query,
retrieval_results=sample_retrieval_results,
mode=mode
)
print(f"Generated {len(messages)} messages")
# Run with LLM
response = helper.run(
user_query=user_query,
retrieval_results=sample_retrieval_results,
mode=mode,
temperature=0.5
)
print(f"LLM Response: {response}")
# Tool integration example
print("\n=== Tool Integration ===")
# Use tools with queries
search_result = helper.call_tool("search", "artificial intelligence", limit=2)
print(f"Search tool result: {search_result}")
# Run with multiple tools
tool_calls = [
{"name": "search", "kwargs": {"query": "machine learning", "limit": 2}}
]
result = helper.run_with_tools(
user_query=user_query,
tool_calls=tool_calls,
retrieval_results=sample_retrieval_results,
mode=PromptMode.SUMMARIZED
)
print(f"\nRun with tools result:")
print(f" LLM Response: {result['llm_response']}")
print(f" Tool Results: {result['tool_results']}")
# Configuration updates
helper.update_config(
default_mode=PromptMode.CONTEXTUAL,
max_context_length=2500,
max_snippets=5
)
# Custom system prompt
custom_response = helper.run(
user_query="Explain briefly",
retrieval_results=sample_retrieval_results[:1],
custom_system_prompt="You are a concise technical expert. Provide brief, accurate answers."
)
print(f"Custom system prompt response: {custom_response}")
# Edge case handling
no_retrieval_response = helper.run(
user_query="What is 2+2?",
retrieval_results=None,
mode=PromptMode.AUGMENTED
)
print(f"No retrieval results: {no_retrieval_response}")
📈 Performance Tips
- Use Connection Pooling: Configure pool sizes for database connections
- Batch Operations: Process documents in batches for better performance
- Cache Results: Enable caching for frequently accessed data
- GPU Acceleration: Use GPU for embedding generation when available
- Memory Management: Monitor memory usage for large datasets
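The batching tip amounts to slicing your corpus before insertion; a generic helper, independent of any LangXChange API:

```python
from typing import Iterator, List, TypeVar

T = TypeVar("T")

def batched(items: List[T], batch_size: int) -> Iterator[List[T]]:
    """Yield successive fixed-size slices of a list."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]

# e.g. feed each slice to chroma.insert_documents(...)
batches = list(batched(list(range(250)), 100))  # slice sizes: 100, 100, 50
```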
🐛 Troubleshooting
Common Issues
Connection Errors:
- Verify API keys and network connectivity
- Check environment variables
- Ensure database services are running
Memory Issues:
- Reduce batch sizes for large datasets
- Use memory-efficient embedding models
- Monitor system resources
Performance Issues:
- Enable progress bars for monitoring
- Use appropriate chunk sizes
- Optimize vector database settings
Debug Mode
import logging
logging.basicConfig(level=logging.DEBUG)
# All components will now output detailed logs
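To keep DEBUG output focused, you can scope verbosity to a single logger instead of the root logger (the logger name `langxchange` is an assumption here — check the names your components actually emit):

```python
import logging

# Quiet default for third-party libraries
logging.basicConfig(level=logging.WARNING)
# Verbose output for the framework's logger only (name assumed)
logging.getLogger("langxchange").setLevel(logging.DEBUG)
```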
🤝 Contributing
We welcome contributions!
🕵️ Tracing & Observability
LangXChange v0.9.6+ includes a built-in tracing system designed for production observability. It captures every LLM call, retrieval operation, and tool execution as structured TraceSpan objects.
Quick Start: Logging Traces
The easiest way to see tracing in action is to register a callback that prints span metadata:
from langxchange import set_trace_callback
from langxchange.openai_helper import EnhancedOpenAIHelper, OpenAIConfig
# 1. Define a simple listener
def on_trace(span):
print(f"DEBUG: {span['name']} took {span['duration_ms']:.2f}ms")
if "model" in span['metadata']:
print(f"Model: {span['metadata']['model']}")
# 2. Register it globally
set_trace_callback(on_trace)
# 3. Operations now automatically emit traces
llm = EnhancedOpenAIHelper(OpenAIConfig(api_key="sk-...", allow_tracing=True))
llm.chat([{"role": "user", "content": "What is 2+2?"}])
Callback-based Tracing (Advanced)
You can register a global or context-specific callback to capture spans as they complete. This is ideal for streaming traces to a frontend or logging them to an external system.
from langxchange import set_trace_callback, start_trace
def my_trace_handler(span_data):
print(f"Span Ended: {span_data['name']} ({span_data['duration_ms']:.2f}ms)")
# Send to SSE, WebSocket, or database
# Register the callback
set_trace_callback(my_trace_handler)
# Start a new trace context
start_trace()
# Your LLM/RAG operations...
# Every finished span will trigger my_trace_handler
TraceSpan Structure
Each span contains hierarchy information (parent/child relationships), timing, status, and metadata:
- `name`: Operation name (e.g., `openai_achat`, `vector_source`)
- `trace_id`: Unique ID for the entire request
- `span_id`: Unique ID for this specific operation
- `parent_id`: ID of the parent operation (for tree visualization)
- `duration_ms`: Precise execution time in milliseconds
- `metadata`: Provider-specific details (model name, tokens, etc.)
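Given those fields, a trace callback can aggregate timing per operation; a minimal sketch over the documented keys (the span payloads below are fabricated samples):

```python
from collections import defaultdict

durations: dict[str, list[float]] = defaultdict(list)

def collect(span: dict) -> None:
    """Trace callback: bucket span durations by operation name."""
    durations[span["name"]].append(span["duration_ms"])

# Fabricated spans shaped like the structure described above
for span in [
    {"name": "openai_achat", "trace_id": "t1", "span_id": "s1",
     "parent_id": None, "duration_ms": 812.4, "metadata": {"model": "gpt-4"}},
    {"name": "vector_source", "trace_id": "t1", "span_id": "s2",
     "parent_id": "s1", "duration_ms": 35.1, "metadata": {}},
    {"name": "openai_achat", "trace_id": "t2", "span_id": "s3",
     "parent_id": None, "duration_ms": 640.0, "metadata": {"model": "gpt-4"}},
]:
    collect(span)

avg = {name: sum(ms) / len(ms) for name, ms in durations.items()}
```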
📄 License
This project is licensed under the MIT License - see the LICENSE file for details.
🙏 Acknowledgments
- OpenAI for GPT models
- ChromaDB for vector storage
- Sentence Transformers for embeddings
- Community contributors
📞 Support
- 📧 Email: support@langxchange.ai
Project details
Download files
Download the file for your platform.
Source Distribution
Built Distribution
File details
Details for the file langxchange-0.10.12.tar.gz.
File metadata
- Download URL: langxchange-0.10.12.tar.gz
- Upload date:
- Size: 279.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `40ad196222e9621c7b40750eb44b127e48520811b45a1f1de642d6e78de8a559` |
| MD5 | `55811958e4a1589b373eb2c0a36be64e` |
| BLAKE2b-256 | `ec9a28290bb400390c42cdd8e294788a101bf42b2b02af08349485fe02e27bda` |
File details
Details for the file langxchange-0.10.12-py3-none-any.whl.
File metadata
- Download URL: langxchange-0.10.12-py3-none-any.whl
- Upload date:
- Size: 229.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `04d1ecf63631f93ed69f73daea09978725da92ac76216f1969604913eda8abf7` |
| MD5 | `2bc7526943018f3c08f43be0c23c8dd2` |
| BLAKE2b-256 | `d62c66201feb5e4b78d12c1b82b4e62ce93bd2d57303448d790cacf3e0c3808a` |