A comprehensive, professional-grade agentic Retrieval-Augmented Generation (RAG) library: core building blocks for RAG applications
AI Prishtina · Agentic RAG
Your documents had one job. Now they have an agent.
A Python library for agentic retrieval-augmented generation (RAG) that turns your documents into a self-improving, tool-wielding, multi-agent knowledge system. Ships with 18 LLM providers, 13 vector stores, a cognitive layer that literally critiques its own answers, and enough integrations to automate your entire Tuesday. Current release: v0.1.3.
Table of contents
- Why this library?
- Key features
- AC-RAG Cognitive Layer
- Novel Retrieval Patterns
- Clean Architecture
- Production Infrastructure
- 18 LLM Providers
- 13 Vector Stores
- Core Tools
- Contrib Integrations
- Package structure
- Quick start
- Installation
- Basic usage
- The full cookbook
- Recipe 1 — Hello, RAG world
- Recipe 2 — Agentic mode
- Recipe 3 — Cognitive pipeline
- Recipe 4 — LLM providers (18 options)
- Recipe 5 — Vector stores (13 options)
- Recipe 6 — Document processing
- Recipe 7 — Graph RAG
- Recipe 8 — Streaming
- Recipe 9 — Evaluation
- Recipe 10 — Production hardening
- Recipe 11 — Contrib tools
- Recipe 12 — Custom tools
- Recipe 13 — FastAPI server
- Recipe 14 — Bulk document loading
- Recipe 15 — Parent-child chunking
- Recipe 16 — ReAct Agent
- Recipe 17 — Conversation memory
- Recipe 18 — Monitoring dashboard
Why this library?
Most RAG libraries stop at "retrieve then generate." This one keeps going.
It plans multi-step queries, remembers what worked last time, critiques its own answers, fuses knowledge from multiple sources, and — if you let it — sends a Slack message about the results. Think of it as the difference between a search bar and an intern who actually reads the documents.
# TL;DR — three lines to go from "I have documents" to "I have answers"
from agentic_rag import AgenticRAG
rag = AgenticRAG(vector_store=my_store, llm_provider=my_llm, enable_agent=True)
response = await rag.aquery("Summarize Q4 revenue trends", use_tools=True)
Key features
The Brain — Adaptive Cognitive RAG (AC-RAG)
The optional metacognitive layer that makes this library agentic rather than just retrieval-augmented.
| Component | What it does | Module |
|---|---|---|
| Neural Query Router | Routes queries to the optimal retrieval strategy (rule-based + LLM fallback) | cognitive.query_router |
| Reflective Agent | Critiques its own answers and iteratively improves them | cognitive.reflective_agent |
| Hierarchical Memory | Three-tier memory (episodic / semantic / procedural) that learns from interactions | cognitive.hierarchical_memory |
| Progressive Retrieval | Reformulates queries when initial retrieval quality is low | cognitive.progressive_retrieval |
| Calibrated Confidence | Platt-scaled confidence scores trained against actual accuracy | cognitive.confidence |
| Knowledge Fusion | Merges results from multiple sources with learned trust weights | cognitive.knowledge_fusion |
| Tool Composer | Discovers tools and builds DAG-based execution chains automatically | cognitive.tool_composer |
| Multi-Agent Orchestrator | Event-driven multi-agent collaboration for complex queries | cognitive.multi_agent |
| Neural Classifier | Sub-millisecond DistilBERT-based intent classification | cognitive.neural_classifier |
| Query Decomposer | Breaks complex questions into sub-queries with dependency tracking | cognitive.agentic_components |
| Query Rewriter | Multi-query, step-back, and sub-question rewriting strategies | cognitive.query_rewriter |
| Corrective RAG (CRAG) | Evaluate retrieval quality → refine or web-fallback | cognitive.corrective_rag |
| Self-RAG | Four-checkpoint pipeline: retrieve? relevant? supported? useful? | cognitive.self_rag |
| Adaptive RAG | Decides whether to retrieve based on query complexity | cognitive.adaptive_rag |
| Speculative RAG | Parallel draft generation → verification → best pick | cognitive.speculative_rag |
| ReAct Agent | Interleaved reasoning + acting with automatic tool selection | cognitive.react_agent |
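The Calibrated Confidence row mentions Platt scaling. As a rough illustration of that idea (not the library's implementation), the sketch below fits `p = sigmoid(a * score + b)` to a handful of made-up raw scores and correctness labels with plain gradient descent. The names `fit_platt` and `calibrate`, and the sample data, are assumptions made for this example.

```python
import math


def sigmoid(z: float) -> float:
    return 1.0 / (1.0 + math.exp(-z))


def fit_platt(scores, labels, lr=0.1, epochs=2000):
    """Fit p = sigmoid(a * score + b) by gradient descent on log loss."""
    a, b = 1.0, 0.0
    n = len(scores)
    for _ in range(epochs):
        grad_a = grad_b = 0.0
        for s, y in zip(scores, labels):
            err = sigmoid(a * s + b) - y  # derivative of log loss w.r.t. the logit
            grad_a += err * s
            grad_b += err
        a -= lr * grad_a / n
        b -= lr * grad_b / n
    return a, b


def calibrate(score: float, a: float, b: float) -> float:
    """Map a raw confidence score to a calibrated probability."""
    return sigmoid(a * score + b)


# Hypothetical data: raw model scores and whether the answer was actually correct.
raw_scores = [0.2, 0.4, 0.5, 0.7, 0.9, 0.95]
was_correct = [0, 0, 0, 1, 1, 1]

a, b = fit_platt(raw_scores, was_correct)
print(f"calibrated(0.2)  = {calibrate(0.2, a, b):.2f}")
print(f"calibrated(0.95) = {calibrate(0.95, a, b):.2f}")
```

In practice the two parameters would be fitted on held-out feedback data rather than six hand-picked points.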
The Secret Sauce — Novel Retrieval Patterns
Research-backed retrieval techniques that no other PyPI RAG library bundles together. Because why settle for "good enough" when you can have "actually novel"?
| Feature | What it does (in plain English) | Module |
|---|---|---|
| HyDE | Generates a fake-but-plausible answer, embeds that instead of your question, and retrieves docs that match the hypothetical. It sounds wrong, but it works. | retrieval.hyde |
| RAPTOR | Builds a hierarchical tree of document summaries at multiple abstraction levels. Retrieves from the right level based on query specificity. Like a Russian nesting doll, but for knowledge. | retrieval.raptor |
| Citation Grounding | Maps every sentence in the LLM's answer back to specific source chunks with [1] markers. Because "trust me bro" is not a valid citation format. | core.citation |
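The HyDE row is easier to follow with a toy example. The sketch below is not the `retrieval.hyde` module; it assumes a bag-of-words "embedding" and a hard-coded `fake_llm` stand-in so that it runs without any model or API key. All function names here are hypothetical.

```python
import math
from collections import Counter


def embed(text: str) -> Counter:
    """Toy bag-of-words embedding; a real system would call an embedding model."""
    return Counter(text.lower().split())


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a)
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def fake_llm(question: str) -> str:
    """Stand-in for an LLM that drafts a plausible (possibly wrong) answer."""
    return "python was created by guido van rossum in the early 1990s"


def hyde_retrieve(question, docs, generate=fake_llm, top_k=1):
    """Embed a hypothetical answer instead of the question, then rank docs."""
    hypothetical = generate(question)
    query_vec = embed(hypothetical)
    ranked = sorted(docs, key=lambda d: cosine(query_vec, embed(d)), reverse=True)
    return ranked[:top_k]


docs = [
    "guido van rossum created python and released it in 1991",
    "the eiffel tower is in paris",
]
# The question shares almost no words with the right document,
# but the hypothetical answer does.
top = hyde_retrieve("who made the snake language?", docs)
print(top)
```

The point of the design is that the hypothetical answer lives in "answer space", so it tends to sit closer to real answers than a short question does.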
The Janitor — Clean Architecture
We just Marie Kondo'd the codebase. Deprecated modules? Thanked them for their service and sent them packing. What remains is a clean, logical structure where:
- providers/ holds all your LLMs, embeddings, and vector stores (because they provide value)
- document_processing/ has loaders and chunkers in their own subdirectories (no more flat chaos)
- Everything else is exactly where your intuition expects it to be
If it doesn't spark joy (or serve a purpose), it's gone. The imports work. The structure makes sense. You're welcome.
The Suit — Production-Grade Infrastructure
Enterprise features that keep your RAG system running in production without embarrassing itself. Or you.
| Feature | What it does (in plain English) | Module |
|---|---|---|
| Per-Document Access Control | ACL-based document filtering during retrieval. Each document gets read_groups/read_users. Essential for multi-tenant deployments where not everyone should see everything. | core.access_control |
| Streaming Structured Output | SSE-compatible JSON events (answer chunk, source, confidence, tool_call) for real-time UIs. Your frontend will thank you. | core.structured_stream |
| Prompt Compression | LLMLingua-inspired context compression. Fits more context into your token budget while preserving relevance. Because LLMs are expensive and tokens don't grow on trees. | core.prompt_compression |
| Answer Scorer | Automatic answer quality scoring with reference-free and reference-based modes. Integrates with the feedback loop for continuous improvement. It grades your homework so you don't have to. | evaluation.answer_scorer |
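To illustrate the per-document access control idea from the table, here is a minimal sketch of filtering retrieved documents by `read_groups` and `read_users`. It is an assumption-laden toy, not `core.access_control`: the `Doc` class and both helper functions are hypothetical, and the sketch denies access when a document lists no readers at all.

```python
from dataclasses import dataclass, field


@dataclass
class Doc:
    content: str
    read_groups: set = field(default_factory=set)
    read_users: set = field(default_factory=set)


def can_read(doc: Doc, user: str, groups) -> bool:
    """Allow access if the user is listed directly or shares a group with the doc."""
    return user in doc.read_users or bool(doc.read_groups & set(groups))


def filter_by_acl(docs, user: str, groups):
    """Drop retrieved documents the caller is not allowed to see."""
    return [d for d in docs if can_read(d, user, groups)]


retrieved = [
    Doc("Q4 revenue summary", read_groups={"finance"}),
    Doc("Public FAQ", read_groups={"everyone"}),
    Doc("Draft contract", read_users={"alice"}),
]

visible = filter_by_acl(retrieved, user="bob", groups=["everyone", "engineering"])
print([d.content for d in visible])
```

Applying the filter after retrieval is the simplest option; pushing the same condition into the vector store query as a metadata filter avoids wasting top-k slots on documents that will be discarded.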
The Muscles — 18 LLM Providers
OpenAI, Anthropic, Cohere, Gemini, Mistral, Ollama, Groq, DeepSeek, xAI/Grok, AWS Bedrock, Azure OpenAI, Together.ai, AI21, Fireworks, Perplexity, llama.cpp, HuggingFace local models, and any OpenAI-compatible API. Swap one line, keep everything else.
The Memory — 13 Vector Stores
ChromaDB, FAISS, Pinecone, Weaviate, Qdrant, pgvector, Milvus, Redis, MongoDB Atlas, Elasticsearch, Vespa, Azure AI Search, and an in-memory store for tests.
The Hands — Core Tools
| Category | Tools | Import |
|---|---|---|
| Search | WebSearchTool, WebScrapeTool, HTTPRequestTool, APISpecTool, GraphQLTool | tools.search |
| Calculation | CalculatorTool, StatisticsTool, UnitConverterTool | tools.calculation |
| Code | CodeExecutorTool (sandboxed Python/JS) | tools.code |
| File | FileReadTool, FileWriteTool, FileListTool, DocumentLoaderTool | tools.file |
| Data | JSONTool, SQLTool | tools.data |
| Utility | VectorStoreTool, KnowledgeGraphTool, DateTimeTool, TextProcessingTool, TextSummarizerTool | tools.utility |
The Wardrobe — Contrib Integrations
Optional integrations that live in contrib/ because not every RAG system needs to send a Slack message.
| Category | Tools | Import path |
|---|---|---|
| Communication | EmailTool, GoogleCalendarTool, SlackTool, DiscordTool, JiraTool | contrib.communication |
| Productivity | NotionTool, ConfluenceTool, AsanaTool, TrelloTool, LinearTool, ClickUpTool, AirtableTool, MondayTool | contrib.productivity |
| API Integrations | StripeTool, TwilioTool, SendGridTool, PagerDutyTool, DatadogTool, ZapierTool | contrib.api_integrations |
| Storage | S3Tool, GCSTool, AzureBlobTool, MemoryTool | contrib.storage |
| DevOps | GitTool, GitHubTool | contrib.devops |
| Media | AudioTool, WhisperTranscriptionTool, ImageGenerationTool, ImageEditTool | contrib.media |
| Database | MongoDBTool, RedisTool, PostgreSQLTool | contrib.database |
| Infrastructure | TenantManager, ModelCompressor, FederatedCoordinator | contrib.infrastructure |
The Librarian — Document Processing
Ingest anything short of a napkin sketch (we're working on it): PDF, DOCX, HTML, Markdown, CSV, Excel, JSON, XML, PowerPoint, EPUB, ZIP archives, Jupyter notebooks, images (OCR), audio (Whisper), and video (frame + audio analysis).
Organization that makes sense:
- document_processing.loaders.*: 20+ document loaders (PDFLoader, DocxLoader, HTMLLoader, etc.)
- document_processing.chunkers.*: 10+ chunking strategies (FixedSizeChunker, SemanticChunker, MarkdownChunker, CodeChunker, AgenticChunker, etc.)
No more "where did I put that import?" moments. Everything is exactly where it should be.
The Armor — Production Infrastructure
| Component | What it does |
|---|---|
| SemanticCache | In-memory or Redis; stop paying for the same question twice |
| CircuitBreaker | Prevent cascading failures when your LLM provider has a bad day |
| OutputGuardrails | PII redaction, toxicity filtering, content validation |
| CostTracker | Token usage monitoring with configurable pricing |
| FeedbackLoop | Collect feedback, track trends, learn from interactions |
| DocumentVersionStore | Diff-based change tracking with SHA-256 hashing |
| BatchIngestionPipeline | Async bulk ingestion with back-pressure and retry |
| ModelCompressor | INT8/FP16 quantization, ONNX export, pruning for edge deployment |
| FederatedCoordinator | Privacy-preserving FedAvg strategy learning across deployments |
| OpenTelemetry Tracing | Distributed tracing via get_tracer() |
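For readers unfamiliar with the circuit breaker pattern named in the table, the following is a minimal, generic sketch of it. It does not reproduce the library's `CircuitBreaker` class; `SimpleCircuitBreaker`, its parameters, and the injected clock are all assumptions chosen to keep the example deterministic.

```python
import time


class CircuitOpenError(RuntimeError):
    """Raised when a call is skipped because the circuit is open."""


class SimpleCircuitBreaker:
    def __init__(self, failure_threshold=3, reset_after=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_after:
                raise CircuitOpenError("circuit open; skipping call")
            # Cool-down elapsed: half-open, so one more failure re-opens the circuit.
            self.opened_at = None
            self.failures = self.failure_threshold - 1
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        self.failures = 0
        return result


# Demo with a fake clock so no real waiting is needed.
now = [0.0]
breaker = SimpleCircuitBreaker(failure_threshold=2, reset_after=10.0, clock=lambda: now[0])


def flaky():
    raise ConnectionError("provider down")


outcomes = []
for _ in range(3):
    try:
        breaker.call(flaky)
    except CircuitOpenError:
        outcomes.append("skipped")
    except ConnectionError:
        outcomes.append("failed")

now[0] = 11.0  # pretend the cool-down has passed
outcomes.append(breaker.call(lambda: "ok"))
print(outcomes)
```

After two real failures the third call is rejected without touching the provider, which is what keeps one slow dependency from stalling every request behind it.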
The Judge — Evaluation & Monitoring
ComprehensiveEvaluator (relevance, faithfulness, answer quality, latency), RAGBenchmark + PerformanceBenchmark suites, and ABTest for statistically rigorous variant comparison. 298+ tests in the suite.
Developer experience
- Configuration-first: YAML, INI, or env vars — no hardcoded models or thresholds
- Factory pattern: create_tool("web_search"), create_provider("openai") for config-driven instantiation
- Type safety: Pydantic v2 models across all public APIs
- Modular: Swap vector stores, LLM providers, tools, and cognitive components independently
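The factory pattern in the list above can be pictured as a name-to-class registry. The sketch below shows one common way to build such a registry; it is not how `agentic_rag.factories` is implemented, and `register_provider`, `make_provider`, and `EchoProvider` are hypothetical names used only for illustration.

```python
from typing import Callable, Dict

# Registry mapping a config-friendly name to a provider class.
_PROVIDERS: Dict[str, Callable[..., object]] = {}


def register_provider(name: str):
    """Class decorator that records a provider under a string key."""
    def decorator(cls):
        _PROVIDERS[name] = cls
        return cls
    return decorator


@register_provider("echo")
class EchoProvider:
    """Toy provider that simply echoes the prompt back."""

    def __init__(self, model: str = "echo-1"):
        self.model = model

    def generate(self, prompt: str) -> str:
        return f"[{self.model}] {prompt}"


def make_provider(name: str, **kwargs):
    """Instantiate a registered provider from a name plus keyword config."""
    try:
        factory = _PROVIDERS[name]
    except KeyError:
        known = ", ".join(sorted(_PROVIDERS))
        raise ValueError(f"unknown provider {name!r}; known: {known}") from None
    return factory(**kwargs)


# The name and kwargs could come straight from YAML, INI, or env vars.
config = {"provider": "echo", "options": {"model": "echo-2"}}
llm = make_provider(config["provider"], **config["options"])
print(llm.generate("hello"))
```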
Package structure
agentic_rag/
├── base/ # Abstract base classes (BaseTool, BaseProvider, ...)
├── factories/ # Factory pattern (create_tool, create_provider, ...)
├── core/ # AgenticRAG, planner, orchestrator, memory, cache, guardrails
├── cognitive/ # AC-RAG: router, reflection, memory, fusion, multi-agent
├── tools/ # Core tools (search, calculation, code, file, data, utility)
├── contrib/ # Optional integrations
│ ├── communication/ # Slack, email, calendar, Jira, Discord
│ ├── productivity/ # Notion, Confluence, Asana, Trello, Linear, ...
│ ├── api_integrations/ # Stripe, Twilio, SendGrid, PagerDuty, ...
│ ├── storage/ # S3, GCS, Azure Blob
│ ├── devops/ # Git, GitHub
│ ├── media/ # Audio, image generation (DALL-E)
│ ├── database/ # MongoDB, Redis, PostgreSQL
│ └── infrastructure/ # Multi-tenancy, model compression, federated learning
├── providers/ # Everything that provides a service
│ ├── llm/ # 18 LLM providers (was: llm/)
│ ├── embeddings/ # 8 embedding providers + cache/utils (was: embeddings/)
│ └── vector_stores/ # 13 vector store backends
├── retrieval/ # Retrievers, rerankers, BM25, ColBERT
├── document_processing/ # Loaders, chunkers, preprocessors (organized in subdirs)
│ ├── loaders/ # 20+ document loaders (pdf, docx, html, etc.)
│ └── chunkers/ # 10+ chunking strategies
├── strategies/ # Pluggable chunking & retrieval strategies
├── graph/ # Knowledge graph, entity extraction, graph retrieval
├── evaluation/ # Metrics, benchmarks, A/B testing
├── server/ # FastAPI app (optional)
└── utils/ # Config, exceptions, logging
Quick start
Installation
# The basics
pip install ai-prishtina-agentic-rag
# I want everything and I want it now
pip install ai-prishtina-agentic-rag[all]
# I'm a responsible adult who only installs what I need
pip install ai-prishtina-agentic-rag[openai,chroma]
# Development (you beautiful contributor, you)
pip install -e .[dev]
| Use case | Install command |
|---|---|
| Core library | pip install ai-prishtina-agentic-rag |
| Single vector backend | pip install ai-prishtina-agentic-rag[chroma] (or pinecone, weaviate, faiss) |
| All vector backends | pip install ai-prishtina-agentic-rag[vector-all] |
| LLM providers | pip install ai-prishtina-agentic-rag[openai] (or anthropic, cohere, llm-all) |
| Document processing | pip install ai-prishtina-agentic-rag[documents,nlp,multimodal] |
| PDF (advanced) | pip install ai-prishtina-agentic-rag[pdf-advanced] (docling, unstructured, camelot, kreuzberg) |
| PDF (tables only) | pip install ai-prishtina-agentic-rag[pdf-camelot] |
| PDF (figures/equations) | pip install ai-prishtina-agentic-rag[pdf-docling] |
| Production API | pip install ai-prishtina-agentic-rag[server,observability] |
Environment setup
# Required: at least one LLM provider key
export OPENAI_API_KEY=sk-your-key-here
# Optional: more providers, vector stores, tool keys
export ANTHROPIC_API_KEY=sk-ant-...
export PINECONE_API_KEY=...
export SERP_API_KEY=... # for WebSearchTool
The full cookbook
Recipe 1 — Hello, RAG world
The absolute minimum to go from zero to answers. No bells, no whistles, just vibes.
Basic Setup
import asyncio
from agentic_rag import AgenticRAG
from agentic_rag.providers.llm import OpenAIProvider
from agentic_rag.providers.vector_stores import InMemoryVectorStore
async def main():
    store = InMemoryVectorStore()
    llm = OpenAIProvider(api_key="sk-...", model="gpt-4o")
    rag = AgenticRAG(vector_store=store, llm_provider=llm)
    await rag.add_documents([
        {"content": "Python was created by Guido van Rossum in 1991."},
        {"content": "The Zen of Python includes 'Beautiful is better than ugly.'"},
    ])
    response = await rag.aquery("Who created Python and when?")
    print(response.answer)
    # Output: Python was created by Guido van Rossum in 1991.

asyncio.run(main())
Load Documents from Files
from agentic_rag.document_processing.loaders import (
PDFLoader, DocxLoader, TextLoader, MarkdownLoader,
HTMLLoader, JSONLoader, CSVLoader
)
# PDF documents (basic extraction with PyPDF2)
docs = PDFLoader().load("report.pdf")
await rag.add_documents([{"content": d.content, "metadata": d.metadata} for d in docs])
# Advanced PDF extraction - tables, figures, equations
# Install: pip install ai-prishtina-agentic-rag[pdf-advanced]
docs = PDFLoader(
extraction_backend="docling", # or "unstructured", "camelot", "kreuzberg"
extract_tables=True,
extract_figures=True,
extract_equations=True
).load("research_paper.pdf")
# Word documents
docs = DocxLoader().load("contract.docx")
await rag.add_documents([{"content": d.content, "metadata": d.metadata} for d in docs])
# Text files
docs = TextLoader(encoding="utf-8").load("notes.txt")
# Markdown
docs = MarkdownLoader().load("documentation.md")
# HTML (web pages)
docs = HTMLLoader().load("page.html")
# Structured data
json_docs = JSONLoader().load("data.json")
csv_docs = CSVLoader().load("spreadsheet.csv")
Smart Chunking
from agentic_rag.document_processing.chunkers import (
SemanticChunker, FixedSizeChunker, MarkdownChunker,
CodeChunker, Language
)
# Semantic chunking (preserves sentence boundaries)
chunker = SemanticChunker(max_chunk_size=1000, overlap=100)
chunks = chunker.chunk(long_document)
# Fixed size with overlap
chunker = FixedSizeChunker(chunk_size=500, overlap=50)
# Markdown-aware (respects headers)
chunker = MarkdownChunker(max_chunk_size=1500, header_split_levels=[1, 2, 3])
# Code-aware chunking
chunker = CodeChunker(language=Language.PYTHON, max_chunk_size=2000)
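To make the `chunk_size` and `overlap` parameters above concrete, here is a small character-based sketch of fixed-size chunking with overlap. It is a generic illustration rather than the `FixedSizeChunker` implementation, and `fixed_size_chunks` is a hypothetical helper.

```python
def fixed_size_chunks(text: str, chunk_size: int = 500, overlap: int = 50):
    """Split text into windows of chunk_size characters that share `overlap` characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    step = chunk_size - overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the window already reached the end of the text
    return chunks


print(fixed_size_chunks("abcdefghij", chunk_size=4, overlap=1))
```

Each chunk repeats the last `overlap` characters of the previous one, so a sentence that straddles a boundary still appears intact in at least one chunk when the overlap is large enough.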
Query with Options
# Basic query
response = await rag.aquery("What is machine learning?")
# With filters
response = await rag.aquery(
"Q4 revenue",
filters={"source": "financial_reports", "year": 2024}
)
# With source citations
response = await rag.aquery(
"Explain neural networks",
include_citations=True,
max_sources=5
)
print(response.answer) # Includes [1], [2] citations
print(response.sources) # List of source documents
# Control creativity
response = await rag.aquery(
"Write a poem about AI",
temperature=0.9,
max_tokens=500
)
Migration from LlamaIndex/LangChain
from agentic_rag.contrib.migration import LlamaIndexImporter, LangChainImporter
# Import from LlamaIndex
from llama_index.core import VectorStoreIndex
li_index = VectorStoreIndex.from_documents(docs)
importer = LlamaIndexImporter(vector_store=store)
result = await importer.import_vector_store_index(li_index)
print(f"Migrated {result.documents_imported} documents")
# Import from LangChain
from langchain_chroma import Chroma
lc_store = Chroma(persist_directory="./my_chroma_db")
importer = LangChainImporter(vector_store=store)
result = await importer.import_vectorstore(lc_store)
print(f"Migrated {result.documents_imported} documents")
Why you need this: Because every journey starts with a single step, and this is that step. It's the "hello world" of making your documents actually useful instead of just taking up disk space.
Recipe 2 — Agentic mode (let it plan)
When your question is too complex for a single retrieval pass, let the planner break it down.
Basic Agent Setup
from agentic_rag.tools import WebSearchTool, CalculatorTool
rag = AgenticRAG(
vector_store=store,
llm_provider=llm,
enable_agent=True,
enable_memory=True,
)
rag.register_tool(WebSearchTool(api_key="your-serp-key"))
rag.register_tool(CalculatorTool())
response = await rag.aquery(
"Find the GDP of France and Germany, then calculate the difference",
enable_planning=True,
use_tools=True,
)
print(response.answer)
print("Steps taken:", response.reasoning_steps)
print("Confidence:", response.confidence)
All Available Tools
import os  # needed for os.getenv below

from agentic_rag.tools import (
# Search tools
WebSearchTool, # Google/Bing search via SERP API
WebScrapeTool, # Scrape web pages
HTTPRequestTool, # Generic HTTP requests
APISpecTool, # Read OpenAPI specs
GraphQLTool, # GraphQL queries
# Calculation tools
CalculatorTool, # Math expressions
StatisticsTool, # Statistical analysis
UnitConverterTool, # Unit conversions
# Code execution
CodeExecutorTool, # Sandboxed Python/JS execution
# File operations
FileReadTool, # Read local files
FileWriteTool, # Write files
FileListTool, # List directories
DocumentLoaderTool, # Load docs with auto-detection
# Data processing
JSONTool, # JSON operations
SQLTool, # SQL queries on data
# Knowledge tools
VectorStoreTool, # Direct vector search
KnowledgeGraphTool, # Graph queries
DateTimeTool, # Date/time operations
TextProcessingTool, # Text transformations
TextSummarizerTool, # Summarize text
)
# Register multiple tools
rag.register_tools([
WebSearchTool(api_key=os.getenv("SERP_API_KEY")),
CalculatorTool(),
CodeExecutorTool(allowed_languages=["python", "javascript"]),
FileReadTool(base_path="/app/data"),
JSONTool(),
DateTimeTool(),
])
Multi-Step Planning
# Complex query requiring multiple steps
response = await rag.aquery(
"""
Research the top 3 LLM models released in 2024,
calculate their average parameter count,
and save the results to a JSON file
""",
enable_planning=True,
use_tools=True,
max_steps=10, # Allow up to 10 planning steps
)
# The agent will:
# 1. Search for "top LLM models 2024"
# 2. Extract parameter counts from results
# 3. Use Calculator to compute average
# 4. Use FileWriteTool to save JSON
Tool Chains
from agentic_rag.cognitive import ToolComposer
# Auto-compose tool chains for complex workflows
composer = ToolComposer(rag)
chain = composer.create_chain([
"web_search",
"web_scrape",
"text_summarize",
"file_write"
])
result = await chain.execute(
"Find articles about climate change, summarize them, and save to climate_research.txt"
)
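The feature table describes tool chains as DAG-based. As a hedged sketch of what that can mean, the example below orders tools with the standard library's `graphlib.TopologicalSorter` and feeds each tool the output of its dependencies. The `run_chain` helper and the toy lambda tools are assumptions for illustration and say nothing about how `ToolComposer` works internally.

```python
from graphlib import TopologicalSorter  # Python 3.9+


def run_chain(tools, dependencies, initial_input):
    """Run tools in dependency order; each tool receives its upstream outputs."""
    results = {}
    for name in TopologicalSorter(dependencies).static_order():
        upstream = [results[dep] for dep in sorted(dependencies.get(name, ()))]
        # Tools without dependencies start from the user's input.
        results[name] = tools[name](upstream or [initial_input])
    return results


# Each key depends on the tools in its value set.
dependencies = {
    "web_scrape": {"web_search"},
    "text_summarize": {"web_scrape"},
    "file_write": {"text_summarize"},
}

# Toy stand-ins that just wrap their input so the data flow is visible.
tools = {
    "web_search": lambda inputs: f"search({inputs[0]})",
    "web_scrape": lambda inputs: f"scrape({inputs[0]})",
    "text_summarize": lambda inputs: f"summarize({inputs[0]})",
    "file_write": lambda inputs: f"write({inputs[0]})",
}

order = list(TopologicalSorter(dependencies).static_order())
results = run_chain(tools, dependencies, "climate")
print(order)
print(results["file_write"])
```

A graph rather than a flat list matters once two tools can run independently, for example two searches feeding one summarizer.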
Why you need this: Because real questions aren't simple. Sometimes you need to search the web, do math, and synthesize results. This is like giving your RAG system a Swiss Army knife and the intelligence to know when to use each tool.
Recipe 3 — The cognitive pipeline (full AC-RAG)
For when you want the system to route, retrieve progressively, reflect, and learn.
Basic Cognitive Query
result = await rag.run_cognitive_query(
"Compare the economic impact of AI adoption in healthcare vs finance",
enable_reflection=True,
enable_progressive_retrieval=True,
)
print(result.answer)
print(f"Confidence: {result.confidence:.2f}")
print(f"Reflections: {result.reflection_count}")
All 15 Cognitive Components
from agentic_rag.cognitive import (
# Routing & Classification
NeuralQueryRouter, # Route queries to optimal strategy
NeuralQueryClassifier, # Sub-millisecond intent classification
# Query Processing
QueryDecomposer, # Break complex questions into sub-queries
QueryRewriter, # Multi-query, step-back, sub-question rewriting
# Retrieval Strategies
ProgressiveRetriever, # Reformulate when quality is low
CorrectiveRAG, # Evaluate quality → refine or web-fallback
SelfRAG, # Four-checkpoint pipeline
AdaptiveRAG, # Decide whether to retrieve
SpeculativeRAG, # Parallel draft → verify → pick best
# Answer Quality
ReflectiveAgent, # Self-critique and iterative improvement
CalibratedConfidence, # Platt-scaled confidence scores
KnowledgeFusion, # Merge multi-source with trust weights
# Multi-Agent
MultiAgentOrchestrator, # Event-driven collaboration
ToolComposer, # Auto-discover and compose tool chains
# Memory
HierarchicalMemory, # Episodic / semantic / procedural
)
# Use individual components
router = NeuralQueryRouter()
decision = router.route("What were Apple's Q3 earnings?")
print(decision.strategy) # "financial_retrieval"
# Query decomposition
decomposer = QueryDecomposer()
decomposed = decomposer.decompose(
"Compare Tesla and BMW's EV market share in Europe"
)
for sub in decomposed.sub_queries:
    print(f" - {sub.query}")  # "Tesla EV market share Europe"
    # "BMW EV market share Europe"
# Progressive retrieval
progressive = ProgressiveRetriever(vector_store=store)
result = await progressive.retrieve(
"Quantum computing breakthroughs 2024",
min_confidence=0.7,
max_attempts=3
)
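The `min_confidence` and `max_attempts` arguments above suggest a retry loop. A minimal sketch of such a loop follows, assuming a `search` callable that returns `(docs, score)` and a `reformulate` callable that rewrites the query. Everything here, including the toy search and the canned rewrites, is hypothetical and only mirrors the idea, not `ProgressiveRetriever` itself.

```python
def progressive_retrieve(query, search, reformulate, min_confidence=0.7, max_attempts=3):
    """Retry retrieval with reformulated queries until the score is good enough."""
    best_docs, best_score = [], 0.0
    current = query
    attempts = 0
    for attempts in range(1, max_attempts + 1):
        docs, score = search(current)
        if score > best_score:
            best_docs, best_score = docs, score
        if best_score >= min_confidence or attempts == max_attempts:
            break
        current = reformulate(current, attempts)
    return best_docs, best_score, attempts


# Toy corpus and scorer: the score is the fraction of query words found in the doc.
DOC = "google reported a quantum error correction milestone in 2024"


def toy_search(query):
    tokens = query.lower().split()
    if not tokens:
        return [], 0.0
    hits = sum(token in DOC.split() for token in tokens)
    return [DOC], hits / len(tokens)


# Canned rewrites standing in for an LLM-based reformulation step.
REWRITES = ["quantum computing 2024", "quantum error correction 2024"]


def toy_reformulate(query, attempt):
    return REWRITES[min(attempt, len(REWRITES)) - 1]


docs, score, attempts = progressive_retrieve(
    "Quantum computing breakthroughs 2024", toy_search, toy_reformulate
)
print(score, attempts)
```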
RAG Pattern Selection
from agentic_rag.cognitive import (
CorrectiveRAG, SelfRAG, AdaptiveRAG, SpeculativeRAG
)
# Corrective RAG: Fix bad retrieval
crag = CorrectiveRAG(vector_store=store, web_search_tool=web_search)
result = await crag.query("Latest SpaceX Starship launch date")
# If retrieval quality is low, automatically falls back to web search
# Self-RAG: Four checkpoints
self_rag = SelfRAG(vector_store=store)
result = await self_rag.query(
"Explain transformer architecture",
checkpoints=["retrieve", "relevant", "supported", "useful"]
)
print(result.checkpoints_passed) # [True, True, True, True]
# Adaptive RAG: Smart retrieval decisions
adaptive = AdaptiveRAG(vector_store=store)
result = await adaptive.query("What is 2+2?") # No retrieval needed
result = await adaptive.query("Explain RAPTOR paper") # Retrieves automatically
# Speculative RAG: Draft and verify
speculative = SpeculativeRAG(llm=llm)
result = await speculative.query(
"Summarize climate change impacts",
num_drafts=3, # Generate 3 parallel drafts
)
print(result.best_draft.confidence)
Reflective Agent with Critique
from agentic_rag.cognitive import ReflectiveAgent
reflective = ReflectiveAgent(llm=llm, max_iterations=3)
result = await reflective.process(
"What are the main causes of World War I?",
reflection_focus=["factual_accuracy", "completeness", "source_diversity"]
)
print(f"Answer: {result.answer}")
print(f"Iterations: {result.iteration_count}")
for critique in result.critiques:
    print(f" Issue: {critique.issue} | Severity: {critique.severity}")
Hierarchical Memory
from agentic_rag.cognitive import HierarchicalMemory
memory = HierarchicalMemory()
# Episodic: Remember past interactions
await memory.episodic.record_interaction(
query="Python list comprehensions",
answer="List comprehensions provide...",
outcome="helpful"
)
# Semantic: Learn facts
await memory.semantic.store_fact(
subject="Python",
predicate="created_by",
object="Guido van Rossum"
)
# Procedural: Learn strategies
await memory.procedural.record_strategy(
situation="vague_query",
action="ask_clarifying_question",
outcome="success"
)
# Query memory
similar = await memory.episodic.find_similar("list comprehension syntax")
print(f"Found {len(similar)} similar past queries")
Why you need this: Because sometimes one pass isn't enough. This is like giving your RAG system a PhD - it questions its own answers, retrieves more when uncertain, and learns from every interaction.
Recipe 4 — Swap your LLM like changing socks (18 providers)
Every provider has the same interface. Swap one line, keep everything else.
All 18 LLM Providers
from agentic_rag.providers.llm import (
# Cloud Providers
OpenAIProvider, # GPT-4o, o1, o3, GPT-4.5
AnthropicProvider, # Claude 3.5 / 4 Sonnet, Opus, Haiku
GeminiProvider, # Google Gemini 1.5/2.0 Pro/Flash
CohereProvider, # Command R/R+
MistralProvider, # Mistral Large/Medium/Small
# Fast Inference
GroqProvider, # Llama/Mixtral at 500+ tok/sec
TogetherAIProvider, # 100+ open models
FireworksProvider, # Fast inference for open models
# Specialized
DeepSeekProvider, # DeepSeek V3/R1 (reasoning)
XAIProvider, # Grok models
PerplexityProvider, # Sonar models with citations
AI21Provider, # Jamba models
# Enterprise / Cloud
AzureOpenAIProvider, # GPT-4 via Azure
BedrockProvider, # AWS Bedrock (Claude, Llama, etc.)
# OpenAI-Compatible
OpenAICompatibleProvider, # Any OpenAI-compatible API
# Local / Self-Hosted
OllamaProvider, # Local models (Llama, Mistral, etc.)
LocalModelProvider, # Generic local model wrapper
LlamaCppProvider, # llama.cpp backend
)
Quick Examples
# OpenAI - The reliable choice
llm = OpenAIProvider(api_key="sk-...", model="gpt-4o")
# Anthropic - Best for long context
llm = AnthropicProvider(
api_key="sk-ant-...",
model="claude-sonnet-4-20250514",
max_tokens=8192
)
# Google Gemini - Multimodal powerhouse
llm = GeminiProvider(api_key="...", model="gemini-2.0-flash")
# Groq - Speed demon (500+ tok/sec)
llm = GroqProvider(api_key="...", model="llama-3.3-70b-versatile")
# DeepSeek - Best reasoning model
llm = DeepSeekProvider(api_key="...", model="deepseek-r1")
# Ollama - Run locally, pay nothing
llm = OllamaProvider(model="llama3.3", host="http://localhost:11434")
# Azure OpenAI - Enterprise ready
llm = AzureOpenAIProvider(
api_key="...",
endpoint="https://myresource.openai.azure.com",
deployment_name="gpt-4o"
)
# AWS Bedrock - Your AWS bill's new friend
llm = BedrockProvider(
aws_access_key="...",
aws_secret_key="...",
region="us-east-1",
model="anthropic.claude-3-sonnet-20240229-v1:0"
)
# Any OpenAI-compatible API
llm = OpenAICompatibleProvider(
base_url="https://api.mystartup.com/v1",
api_key="...",
model="custom-model"
)
Using the Factory
from agentic_rag.factories import create_provider
# Create by name (useful for config-driven setups)
llm = create_provider("openai", model="gpt-4o", api_key="...")
llm = create_provider("anthropic", model="claude-sonnet-4", api_key="...")
llm = create_provider("ollama", model="llama3.3")
# Same interface everywhere
rag = AgenticRAG(vector_store=store, llm_provider=llm)
Why you need this: Because vendor lock-in is the adult version of "you can't sit with us." We believe in playing the field - try OpenAI today, switch to local models tomorrow, your code stays the same.
Recipe 5 — Vector stores (13 options + hybrid retrieval)
All 13 Vector Stores
from agentic_rag.providers.vector_stores import (
# Local / Development
InMemoryVectorStore, # Zero-config, for testing
ChromaVectorStore, # Local ChromaDB
FAISSVectorStore, # Facebook AI Similarity Search
# Production / Managed
PineconeVectorStore, # Managed vector search
WeaviateVectorStore, # Vector + semantic search
QdrantVectorStore, # High-performance vector DB
PGVectorStore, # PostgreSQL + pgvector
MilvusVectorStore, # Distributed vector DB
# Cloud / Enterprise
RedisVectorStore, # Redis Stack with RediSearch
MongoDBAtlasVectorStore, # MongoDB Atlas Vector Search
ElasticsearchVectorStore, # Elasticsearch dense vectors
VespaVectorStore, # Vespa search engine
AzureAISearchVectorStore, # Azure AI Search
)
Quick Setup Examples
# In-Memory (testing/development)
store = InMemoryVectorStore()
# ChromaDB (local development)
store = ChromaVectorStore(
collection_name="my_docs",
persist_directory="./chroma_db"
)
# Pinecone (managed production)
store = PineconeVectorStore(
api_key="your-key",
environment="us-west1-gcp",
index_name="production-index",
dimension=1536 # Match your embedding model
)
# Weaviate (local or cloud)
store = WeaviateVectorStore(
host="http://localhost:8080",
class_name="Documents"
)
# Qdrant (local or cloud)
store = QdrantVectorStore(
host="localhost",
port=6333,
collection_name="docs"
)
# PostgreSQL + pgvector
store = PGVectorStore(
connection_string="postgresql://user:pass@localhost/db",
table_name="embeddings",
dimension=1536
)
# Milvus (distributed)
store = MilvusVectorStore(
host="localhost",
port="19530",
collection_name="documents"
)
# Redis (with RediSearch)
store = RedisVectorStore(
redis_url="redis://localhost:6379",
index_name="rag_docs"
)
# MongoDB Atlas
store = MongoDBAtlasVectorStore(
connection_string="mongodb+srv://...",
database="rag",
collection="documents",
index_name="vector_index"
)
# Elasticsearch
store = ElasticsearchVectorStore(
hosts=["http://localhost:9200"],
index_name="rag_documents"
)
# Azure AI Search
store = AzureAISearchVectorStore(
endpoint="https://search.search.windows.net",
api_key="...",
index_name="documents"
)
Hybrid Retrieval (Dense + Sparse)
from agentic_rag.retrieval import HybridRetriever, BM25Retriever
# Combine vector search with BM25 keyword search
hybrid = HybridRetriever(
vector_store=store,
dense_weight=0.7,
sparse_weight=0.3
)
# Or with explicit BM25 (BM25Retriever is already imported above)
bm25 = BM25Retriever(documents=all_docs)
hybrid = HybridRetriever(
vector_store=store,
sparse_retriever=bm25,
dense_weight=0.6,
sparse_weight=0.4
)
# Use with RAG
rag = AgenticRAG(vector_store=store, llm_provider=llm)
response = await rag.aquery("What is machine learning?", retriever=hybrid)
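One plausible reading of `dense_weight` and `sparse_weight` is a weighted sum of normalized scores. The sketch below shows that fusion scheme on made-up scores; it is an assumption about the general technique, not a description of what `HybridRetriever` does internally, and `min_max` and `fuse` are hypothetical helpers.

```python
def min_max(scores):
    """Rescale scores to [0, 1] so dense and sparse scores are comparable."""
    if not scores:
        return {}
    lo, hi = min(scores.values()), max(scores.values())
    if hi == lo:
        return {doc_id: 1.0 for doc_id in scores}
    return {doc_id: (value - lo) / (hi - lo) for doc_id, value in scores.items()}


def fuse(dense, sparse, dense_weight=0.7, sparse_weight=0.3):
    """Weighted sum of normalized scores; a doc missing from one list scores 0 there."""
    dense_n, sparse_n = min_max(dense), min_max(sparse)
    doc_ids = set(dense_n) | set(sparse_n)
    fused = {
        doc_id: dense_weight * dense_n.get(doc_id, 0.0)
        + sparse_weight * sparse_n.get(doc_id, 0.0)
        for doc_id in doc_ids
    }
    return sorted(fused.items(), key=lambda item: item[1], reverse=True)


# Made-up scores: cosine similarities on the left, BM25 scores on the right.
dense_scores = {"a": 0.9, "b": 0.8, "c": 0.1}
sparse_scores = {"b": 12.0, "c": 3.0, "d": 7.5}

ranking = fuse(dense_scores, sparse_scores)
print([doc_id for doc_id, _ in ranking])
```

Document "b" wins because it ranks well in both lists, even though "a" has the best dense score. Reciprocal rank fusion is a common alternative when raw scores are hard to normalize.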
Using the Factory
from agentic_rag.factories import create_vector_store
# Create by name from config
store = create_vector_store("chroma", collection_name="docs")
store = create_vector_store("pinecone", api_key="...", index_name="prod")
store = create_vector_store("qdrant", host="localhost", port=6333)
Why you need this: Because your use case matters. Local development shouldn't require a PhD in cloud architecture, and production shouldn't run on your laptop. We've got you covered from prototype to scale.
Recipe 6 — Document processing (20+ loaders, 10+ chunkers)
All Document Loaders
from agentic_rag.document_processing.loaders import (
# Text & Documents
TextLoader, # Plain text files (.txt)
PDFLoader, # PDF documents (.pdf) - with 5 backends!
DocxLoader, # Word documents (.docx)
MarkdownLoader, # Markdown files (.md)
HTMLLoader, # HTML files (.html, .htm)
# Structured Data
JSONLoader, # JSON files (.json)
CSVLoader, # CSV files (.csv)
XMLLoader, # XML files (.xml)
# Spreadsheets & Presentations
ExcelLoader, # Excel files (.xlsx, .xls)
PowerPointLoader, # PowerPoint (.pptx)
# Media & Archives
ImageLoader, # Images with OCR (.jpg, .png, etc.)
AudioLoader, # Audio files with transcription
VideoLoader, # Video analysis (frames + audio)
EPubLoader, # eBooks (.epub)
NotebookLoader, # Jupyter notebooks (.ipynb)
ArchiveLoader, # Zip, tar, etc.
# Advanced PDF Loaders (specialized)
DoclingPDFLoader, # Full document AI (tables, figures, equations)
CamelotTableLoader, # Table extraction specialist
UnstructuredPDFLoader, # Multi-element extraction
KreuzbergPDFLoader, # Modern extraction with math support
)
# Load any document
docs = PDFLoader().load("report.pdf")
docs = DocxLoader().load("contract.docx")
docs = MarkdownLoader().load("docs.md")
docs = HTMLLoader().load("page.html")
docs = JSONLoader().load("data.json")
docs = CSVLoader().load("spreadsheet.csv")
# Excel with sheet selection
excel_docs = ExcelLoader(sheet_name="Revenue").load("financials.xlsx")
# Image with OCR
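# ImageLoaderConfig is assumed to be exported next to ImageLoader, like VideoLoaderConfig below
from agentic_rag.document_processing.loaders import ImageLoaderConfig
image_docs = ImageLoader(
    ImageLoaderConfig(enable_ocr=True, extract_metadata=True)
).load("diagram.png")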
# Video analysis (extract frames + transcribe audio)
from agentic_rag.document_processing.loaders import VideoLoader, VideoLoaderConfig
video_docs = VideoLoader(
VideoLoaderConfig(
extract_frames_every_seconds=5,
transcribe_audio=True,
max_frames=20
)
).load("presentation.mp4")
All Chunking Strategies
from agentic_rag.document_processing.chunkers import (
# Basic chunkers
FixedSizeChunker, # Fixed size with overlap
SemanticChunker, # Sentence/paragraph boundaries
# Structure-aware
MarkdownChunker, # Respects headers (# ## ###)
HTMLDOMChunker, # HTML DOM tree chunking
PDFLayoutChunker, # Layout-aware PDF chunking
CodeChunker, # AST-aware code chunking
# Specialized
TableChunker, # Table structure preservation
JSONStructureChunker, # JSON hierarchy chunking
XMLStructureChunker, # XML element chunking
MultimodalChunker, # Image + text alignment
AgenticChunker, # LLM-based semantic chunking
)
from agentic_rag.document_processing.chunkers import Language
# Basic semantic chunking
chunker = SemanticChunker(max_chunk_size=1000, overlap=100)
chunks = chunker.chunk(long_text)
# Fixed size
chunker = FixedSizeChunker(chunk_size=500, overlap=50)
# Markdown with headers
chunker = MarkdownChunker(
max_chunk_size=1500,
header_split_levels=[1, 2, 3] # Split at h1, h2, h3
)
# Code-aware (Python, JS, Java, etc.)
chunker = CodeChunker(
language=Language.PYTHON,
max_chunk_size=2000,
respect_function_boundaries=True
)
# HTML DOM chunking
chunker = HTMLDOMChunker(
max_chunk_size=1500,
respect_semantic_tags=True, # <article>, <section>, etc.
include_attributes=["id", "class"]
)
# PDF layout-aware
chunker = PDFLayoutChunker(
max_chunk_size=2000,
detect_columns=True,
preserve_page_breaks=False
)
# Tables with context
chunker = TableChunker(
max_chunk_size=1500,
keep_headers_with_rows=True,
output_format="text" # or "csv", "json"
)
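The chunk_size and overlap parameters are easiest to understand on a tiny example. The sketch below is a library-free illustration of fixed-size chunking with overlap; it counts characters (the library's chunkers may count tokens instead), and `fixed_size_chunks` is a hypothetical helper, not part of agentic_rag.

```python
from typing import List


def fixed_size_chunks(text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
    """Split text into windows of chunk_size characters that share `overlap` characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
    step = chunk_size - overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reached the end of the text
    return chunks


sample = "abcdefghijklmnopqrstuvwxyz"
chunks = fixed_size_chunks(sample, chunk_size=10, overlap=3)
print(chunks)
```

Each chunk starts with the last three characters of the previous one, so a sentence cut at a boundary still appears whole in at least one chunk when the overlap is large enough.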
Advanced PDF Extraction (5 Backends)
from agentic_rag.document_processing.loaders import (
PDFLoader,
PDFExtractionConfig,
DoclingPDFLoader, DoclingTableLoader, DoclingFigureLoader, DoclingEquationLoader,
CamelotTableLoader, CamelotLatticeLoader, CamelotStreamLoader,
UnstructuredPDFLoader, UnstructuredTableLoader, UnstructuredTextLoader,
KreuzbergPDFLoader, KreuzbergEquationLoader,
get_available_backends, # Check which backends are installed
)
# Check available backends
backends = get_available_backends()
# {'pypdf2': True, 'docling': True, 'camelot': False, ...}
# 1. Universal PDFLoader with backend selection
# Uses PyPDF2 by default (no extra dependencies)
docs = PDFLoader().load("document.pdf")
# Auto-select best backend based on extraction needs
docs = PDFLoader(
extraction_backend="auto", # Auto-select based on needs
extract_tables=True,
extract_figures=True,
extract_equations=True
).load("research_paper.pdf")
# Force specific backend
docs = PDFLoader(extraction_backend="docling").load("paper.pdf") # Best overall
docs = PDFLoader(extraction_backend="unstructured").load("form.pdf") # Multi-element
docs = PDFLoader(extraction_backend="kreuzberg").load("math.pdf") # Math equations
# 2. Docling - Full Document AI (IBM's state-of-the-art)
# Best for: research papers, complex layouts, tables, figures, equations
# Install: pip install ai-prishtina-agentic-rag[pdf-docling]
# Extract everything
docs = DoclingPDFLoader(
extraction_config=PDFExtractionConfig(
extract_tables=True,
extract_figures=True,
extract_equations=True
)
).load("research_paper.pdf")
# Returns list of documents by type:
# - Document(content="Introduction text...", metadata={"type": "text", "page": 1})
# - Document(content="| Revenue | Q1 | Q2 |...", metadata={"type": "table", "page": 3})
# - Document(content="Figure 1: Neural network architecture", metadata={"type": "figure", "page": 2})
# - Document(content="E = mc^2", metadata={"type": "equation", "latex": "E = mc^2"})
# Specialized loaders
from agentic_rag.document_processing.loaders import DoclingTableLoader
tables = await DoclingTableLoader().load("financial_report.pdf")
figures = await DoclingFigureLoader().load("paper_with_charts.pdf")
equations = await DoclingEquationLoader().load("math_paper.pdf")
# 3. Camelot - Table Extraction Specialist
# Best for: financial reports, data tables, structured PDFs
# Install: pip install ai-prishtina-agentic-rag[pdf-camelot]
# Auto-detect table type (lattice vs stream)
tables = await CamelotTableLoader(flavor="auto").load("report.pdf")
# Force lattice mode (for tables with ruling lines)
tables = await CamelotLatticeLoader(line_scale=15).load("financial_report.pdf")
# Force stream mode (for unruled tables using whitespace)
tables = await CamelotStreamLoader(shift_text=["l", "t"]).load("data_table.pdf")
# Access extracted table data
for table in tables:
    df = table.metadata.get("dataframe")  # pandas DataFrame
    csv = table.metadata.get("csv")  # CSV string
    accuracy = table.metadata.get("accuracy")  # Extraction confidence
    print(f"Table on page {table.metadata['page']}: {table.metadata['shape']}")
# 4. Unstructured - Multi-Element Extraction
# Best for: mixed documents, forms, headers/footers, complex layouts
# Install: pip install ai-prishtina-agentic-rag[pdf-unstructured]
# Extract all elements
docs = UnstructuredPDFLoader(
strategy="hi_res", # high accuracy (slower)
extract_tables=True,
languages=["eng"]
).load("document.pdf")
# Filter by element type
tables = [d for d in docs if d.metadata.get("type") == "table"]
headers = [d for d in docs if d.metadata.get("type") == "header"]
text_blocks = [d for d in docs if d.metadata.get("type") == "text"]
# Fast text-only extraction
docs = UnstructuredTextLoader(strategy="fast").load("document.pdf")
# Table-only extraction
tables = UnstructuredTableLoader().load("report.pdf")
# 5. Kreuzberg - Modern Extraction with Math
# Best for: modern PDFs, mathematical content, academic papers
# Install: pip install ai-prishtina-agentic-rag[pdf-kreuzberg]
# General extraction
docs = KreuzbergPDFLoader().load("document.pdf")
# Math-focused extraction
equations = KreuzbergEquationLoader().load("math_paper.pdf")
for eq in equations:
    latex = eq.metadata.get("latex")
    print(f"Equation: {latex}")
Factory Pattern
from agentic_rag.factories import create_chunker, create_loader, get_loader_for_file
# Config-driven chunking
chunker = create_chunker("semantic", max_chunk_size=1000)
chunker = create_chunker("markdown", max_chunk_size=1500)
chunker = create_chunker("fixed", chunk_size=500)
# Config-driven loading
loader = create_loader("pdf")
loader = create_loader("docx")
loader = create_loader("auto") # Auto-detect from extension
docs = loader.load("document.pdf")
# Advanced PDF via factory
loader = create_loader("pdf", extraction_backend="docling")
loader = create_loader("pdf_tables", flavor="lattice")
loader = create_loader("pdf_docling_figures")
loader = create_loader("pdf_unstructured", strategy="hi_res")
# Auto-detect loader from file extension
loader = get_loader_for_file("research.pdf", extraction_backend="docling")
loader = get_loader_for_file("data.csv")
Preprocessing Pipeline
from agentic_rag.document_processing.preprocessors import TextPreprocessor, MetadataExtractor
# Clean and normalize
preprocessor = TextPreprocessor(
remove_extra_whitespace=True,
normalize_unicode=True,
fix_line_breaks=True
)
clean_text = preprocessor.process(raw_text)
# Extract metadata
extractor = MetadataExtractor()
metadata = extractor.extract(
content=doc_content,
source_path="report.pdf",
extract_title=True,
extract_dates=True,
extract_entities=True
)
Why you need this: Because real documents come in all shapes and sizes. PDFs, Word docs, Markdown, JSON - we handle them all. Smart chunking means better retrieval, which means better answers. It's that simple.
Recipe 7 — Graph RAG (entity relationships + graph traversal)
Basic Graph Setup
from agentic_rag.graph import KnowledgeGraph, EntityExtractor, GraphBuilder
# Create graph
graph = KnowledgeGraph()
# Choose extraction method
extractor = EntityExtractor(method="spacy") # or "llm" for better accuracy
# Build from documents
builder = GraphBuilder(graph=graph, extractor=extractor)
await builder.build_from_documents(documents)
# Query with graph
rag = AgenticRAG(vector_store=store, llm_provider=llm, knowledge_graph=graph)
response = await rag.aquery("How is Company X related to Product Y?")
Entity Extraction Methods
from agentic_rag.graph import EntityExtractor, GraphRAGQuery
# Fast rule-based extraction
extractor = EntityExtractor(method="spacy")
# LLM-powered extraction (more accurate, slower)
extractor = EntityExtractor(
method="llm",
llm_provider=llm,
entity_types=["PERSON", "ORG", "PRODUCT", "EVENT", "TECHNOLOGY"]
)
# Hybrid approach
extractor = EntityExtractor(
method="hybrid",
llm_provider=llm,
confidence_threshold=0.7
)
Graph Traversal Queries
from agentic_rag.graph import GraphRAGQuery, TraversalStrategy
# Multi-hop traversal
query = GraphRAGQuery(
start_node="Elon Musk",
relation_type="founded",
max_hops=2,
strategy=TraversalStrategy.BFS
)
path = await graph.traverse(query)
# Returns: Elon Musk -> founded -> Tesla -> produces -> Model S
# Relationship-based retrieval
results = await graph.query(
"Find all companies ACQUIRED_BY Google since 2020"
)
# Hybrid: Vector + Graph
response = await rag.aquery(
"Who worked at companies that were later acquired by Meta?",
use_graph=True,
graph_depth=3
)
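The max_hops and BFS settings above can be pictured with a plain-Python traversal. This is a minimal sketch over an adjacency-list dictionary, not the library's KnowledgeGraph; `bfs_paths` and `toy_graph` are hypothetical names, and the toy edges simply mirror the example path in the comment above.

```python
from collections import deque
from typing import Dict, List, Tuple

Edge = Tuple[str, str]  # (relation, target node)


def bfs_paths(graph: Dict[str, List[Edge]], start: str, max_hops: int) -> List[List[str]]:
    """Return every path of 1..max_hops edges from start, shortest paths first."""
    paths: List[List[str]] = []
    queue = deque([(start, [start], 0)])
    while queue:
        node, path, hops = queue.popleft()
        if hops == max_hops:
            continue  # hop budget used up on this branch
        for relation, target in graph.get(node, []):
            if target in path[::2]:  # nodes sit at even positions; skip cycles
                continue
            new_path = path + [relation, target]
            paths.append(new_path)
            queue.append((target, new_path, hops + 1))
    return paths


toy_graph = {
    "Elon Musk": [("founded", "Tesla")],
    "Tesla": [("produces", "Model S")],
    "Model S": [("competes_with", "Lucid Air")],
}
paths = bfs_paths(toy_graph, "Elon Musk", max_hops=2)
for path in paths:
    print(" -> ".join(path))
```

With max_hops=2 the traversal stops at Model S and never reaches the third edge, which is how a hop limit keeps graph queries bounded.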
Graph RAG Patterns
from agentic_rag.graph import (
EntityLinker, # Link entities across documents
RelationClassifier, # Classify relationship types
GraphSummarizer, # Summarize subgraphs
)
# Link entities (disambiguation)
linker = EntityLinker(graph=graph)
await linker.link_entities(threshold=0.85)
# Links "Apple Inc." and "Apple Company" as the same entity
# Relation classification
classifier = RelationClassifier(llm=llm)
relations = await classifier.classify(
entity1="Microsoft",
entity2="OpenAI",
context="Microsoft invested $10B in OpenAI"
)
print(relations) # ["investor_of", "partner_of"]
Why you need this: Because knowledge isn't flat. Companies acquire other companies, people work at multiple places, products have dependencies. Graph RAG understands relationships, not just keywords.
Recipe 8 — Streaming (SSE + structured events)
Basic Streaming
# Simple text streaming
async for chunk in rag.astream("Explain quantum computing in detail"):
    print(chunk.content, end="", flush=True)
Structured Streaming Events
from agentic_rag.core import StreamingEvent
# Full event streaming (SSE-compatible)
async for event in rag.astream_structured(
    "What are the latest AI breakthroughs?",
    include_sources=True,
    include_confidence=True
):
    if event.type == "answer_chunk":
        print(event.content, end="")
    elif event.type == "source":
        print(f"\n[Source {event.metadata['index']}]: {event.content[:100]}...")
    elif event.type == "confidence":
        print(f"\n[Confidence: {event.content:.2f}]")
    elif event.type == "tool_call":
        print(f"\n[Using tool: {event.metadata['tool_name']}]")
    elif event.type == "reflection":
        print(f"\n[Reflection: {event.content}]")
    elif event.type == "complete":
        print("\n[Done]")
FastAPI SSE Endpoint
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from agentic_rag.server import create_app
app = create_app()
# GET rather than POST: the browser EventSource API only issues GET requests
@app.get("/stream")
async def stream_query(query: str):
    async def event_generator():
        async for event in rag.astream_structured(query):
            yield f"event: {event.type}\ndata: {event.to_json()}\n\n"
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )
# JavaScript client:
# const eventSource = new EventSource('/stream?query=...');
# Frames that carry an "event:" name go to addEventListener, not onmessage:
# eventSource.addEventListener('answer_chunk', (e) => console.log(JSON.parse(e.data)));
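The endpoint above writes Server-Sent Events frames by hand, so it helps to see the wire format in isolation: an optional `event:` line, one `data:` line per line of payload, and a blank line to end the frame. The sketch below is library-free; `format_sse` is a hypothetical helper, not part of agentic_rag or FastAPI.

```python
import json
from typing import Any, Optional


def format_sse(data: Any, event: Optional[str] = None) -> str:
    """Serialize one SSE frame; non-string payloads are JSON-encoded first."""
    payload = data if isinstance(data, str) else json.dumps(data)
    lines = []
    if event:
        lines.append(f"event: {event}")
    # A payload containing newlines must be sent as several data: lines
    for line in payload.splitlines() or [""]:
        lines.append(f"data: {line}")
    return "\n".join(lines) + "\n\n"  # the blank line terminates the frame


frame = format_sse({"content": "Hello"}, event="answer_chunk")
multi = format_sse("line one\nline two")
print(frame, end="")
print(multi, end="")
```

Splitting on newlines matters because a raw newline inside a single `data:` line would otherwise be read by the client as the end of that field.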
WebSocket Streaming
from fastapi import WebSocket
from agentic_rag.server import WebSocketRAGHandler
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    handler = WebSocketRAGHandler(rag)
    await handler.handle(websocket)
# Supports:
# - Bidirectional communication
# - Query cancellation
# - Session persistence
# - Multi-turn conversations
Why you need this: Because users hate waiting. Streaming gives instant feedback, lower perceived latency, and lets you build chat-style interfaces that don't feel like they're loading on dial-up.
Recipe 9 — Evaluation (metrics + A/B testing)
Comprehensive Evaluation
from agentic_rag.evaluation import (
ComprehensiveEvaluator,
AnswerRelevanceMetric,
ContextPrecisionMetric,
FaithfulnessMetric,
AnswerCorrectnessMetric,
LatencyMetric,
CostMetric
)
evaluator = ComprehensiveEvaluator()
# Evaluate a single response
scores = await evaluator.evaluate(
query="What is the capital of France?",
response="Paris is the capital of France.",
ground_truth="Paris",
context=["France is a country in Western Europe. Its capital is Paris."],
)
print(f"Answer Relevance: {scores['relevance'].score:.2f}")
print(f"Context Precision: {scores['context_precision'].score:.2f}")
print(f"Faithfulness: {scores['faithfulness'].score:.2f}")
print(f"Correctness: {scores['correctness'].score:.2f}")
print(f"Latency: {scores['latency'].value_ms}ms")
print(f"Cost: ${scores['cost'].value:.4f}")
Reference-Free Evaluation
from agentic_rag.evaluation import ReferenceFreeEvaluator
# When you don't have ground truth
rf_evaluator = ReferenceFreeEvaluator()
scores = await rf_evaluator.evaluate(
query="Explain neural networks",
response="Neural networks are computational models inspired by biological brains...",
context=["Neural networks consist of layers of interconnected nodes..."]
)
# Evaluates: coherence, completeness, relevance without ground truth
Batch Evaluation
from agentic_rag.evaluation import BatchEvaluator, EvaluationDataset
# Load test dataset
dataset = EvaluationDataset.from_json("test_queries.json")
# Format: [{"query": "...", "ground_truth": "...", "context": "..."}, ...]
# Run batch evaluation
batch_eval = BatchEvaluator(rag)
results = await batch_eval.evaluate_dataset(dataset)
# Get summary statistics
print(f"Mean relevance: {results.mean('relevance'):.2f}")
print(f"Mean latency: {results.mean('latency'):.0f}ms")
print(f"P95 latency: {results.percentile('latency', 95):.0f}ms")
# Export detailed results
results.to_csv("evaluation_results.csv")
results.to_html("evaluation_report.html")
A/B Testing
from agentic_rag.evaluation import ABTest, ABTestConfig, Variant
# Configure test
config = ABTestConfig(
test_name="gpt4_vs_claude_sonnet",
min_sample_size=100,
max_sample_size=500,
primary_metric="answer_correctness",
confidence_level=0.95
)
ab = ABTest(config)
# Register variants
ab.register_variant("gpt4", Variant(
pipeline=gpt4_rag,
name="GPT-4 Pipeline"
))
ab.register_variant("claude", Variant(
pipeline=claude_rag,
name="Claude Sonnet Pipeline"
))
# Run test (automatically determines winner)
winner = await ab.run(dataset)
print(f"Winner: {winner.name}")
print(f"Improvement: {winner.improvement_percent:.1f}%")
print(f"P-value: {winner.p_value:.4f}")
# Or manual control
ab.start()
ab.add_result("gpt4", query, response, score)
ab.add_result("claude", query, response, score)
status = ab.get_status()
if status.is_conclusive:
    print(f"Winner: {status.winner}")
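For intuition about what "conclusive" means at confidence_level=0.95, here is a minimal sketch of a two-sided z-test on per-query scores, using only the standard library. This is one common way to compare two variants and is an assumption on our part: the library's ABTest may use a different test. `two_sample_z_test` and the synthetic scores are hypothetical.

```python
from math import sqrt
from statistics import NormalDist, mean, variance
from typing import Sequence, Tuple


def two_sample_z_test(a: Sequence[float], b: Sequence[float]) -> Tuple[float, float]:
    """Two-sided z-test for a difference in mean score between variants a and b."""
    standard_error = sqrt(variance(a) / len(a) + variance(b) / len(b))
    if standard_error == 0:
        raise ValueError("both samples are constant; the test is undefined")
    z = (mean(b) - mean(a)) / standard_error
    p_value = 2 * (1 - NormalDist().cdf(abs(z)))
    return z, p_value


# Synthetic per-query scores, for illustration only
scores_a = [0.70 + 0.01 * (i % 5) for i in range(100)]
scores_b = [0.75 + 0.01 * (i % 5) for i in range(100)]
z, p_value = two_sample_z_test(scores_a, scores_b)
is_conclusive = p_value < 1 - 0.95  # mirrors confidence_level=0.95
print(f"z={z:.2f}, conclusive={is_conclusive}")
```

The normal approximation is only reasonable with a few dozen samples per variant or more, which is one reason a min_sample_size setting is useful.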
Custom Metrics
from agentic_rag.evaluation import BaseMetric, MetricResult
class CustomMetric(BaseMetric):
    """Custom metric for domain-specific evaluation."""
    async def evaluate(self, query, response, context, **kwargs) -> MetricResult:
        # Your custom logic (implement _calculate_score yourself)
        score = self._calculate_score(response)
        return MetricResult(
            score=score,
            explanation="Custom evaluation reasoning",
            metadata={"custom_field": "value"}
        )
# Register and use
evaluator.register_metric("custom", CustomMetric())
Prompt Optimization (DSPy-style)
from agentic_rag.cognitive import PromptOptimizer, OptimizationConfig
# Golden dataset for optimization
golden_examples = [
{
"question": "What is RAG?",
"answer": "Retrieval-Augmented Generation combines information retrieval with text generation..."
},
{
"question": "How does chunking work?",
"answer": "Chunking splits documents into smaller segments for efficient retrieval..."
},
]
# Configure optimization
config = OptimizationConfig(
metric="answer_relevance", # or "accuracy", "completeness", "semantic_similarity"
max_iterations=20,
few_shot_k=3,
)
optimizer = PromptOptimizer(
llm_provider=llm,
vector_store=store,
config=config,
)
# Run optimization
result = await optimizer.optimize(
base_prompt="Answer the question based on the provided context.",
examples=golden_examples,
)
print(f"Best prompt: {result.best_prompt.prompt_text}")
print(f"Score: {result.best_prompt.score:.3f}")
print(f"Improvement: +{result.improvement:.3f}")
# Save results
optimizer.save_candidates("optimized_prompts.json", result)
Why you need this: Because "it works" isn't enough. You need to know HOW well it works, whether that expensive model upgrade actually helps, and if your new retrieval strategy is worth the complexity.
Recipe 10 — Production hardening (caching + guardrails + monitoring)
Semantic Caching
from agentic_rag import SemanticCache
# Redis-backed semantic cache
cache = SemanticCache(
backend="redis",
redis_url="redis://localhost:6379",
similarity_threshold=0.95, # Consider queries similar above 95%
ttl=3600 # Cache for 1 hour
)
# Or in-memory for single-node
cache = SemanticCache(backend="memory", max_size=10000)
# Use with RAG
rag = AgenticRAG(
vector_store=store,
llm_provider=llm,
cache=cache
)
# Cache stats
print(f"Hit rate: {cache.hit_rate:.2%}")
print(f"Saved tokens: {cache.saved_tokens:,}")
print(f"Saved cost: ${cache.saved_cost:.2f}")
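The similarity_threshold setting decides when two differently worded queries count as the same question. The sketch below shows the idea with cosine similarity over toy two-dimensional embeddings; it is a library-free illustration, and `TinySemanticCache` is a hypothetical class, not the library's SemanticCache.

```python
import math
from typing import List, Optional, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class TinySemanticCache:
    """Linear-scan cache that returns the closest stored answer above a threshold."""

    def __init__(self, similarity_threshold: float = 0.95):
        self.similarity_threshold = similarity_threshold
        self._entries: List[Tuple[Sequence[float], str]] = []
        self.hits = 0
        self.misses = 0

    def set(self, embedding: Sequence[float], answer: str) -> None:
        self._entries.append((embedding, answer))

    def get(self, embedding: Sequence[float]) -> Optional[str]:
        best_score, best_answer = 0.0, None
        for cached_embedding, answer in self._entries:
            score = cosine_similarity(embedding, cached_embedding)
            if score > best_score:
                best_score, best_answer = score, answer
        if best_answer is not None and best_score >= self.similarity_threshold:
            self.hits += 1
            return best_answer
        self.misses += 1
        return None

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0.0


demo_cache = TinySemanticCache(similarity_threshold=0.95)
demo_cache.set([1.0, 0.0], "Machine learning is a subset of AI...")
near = demo_cache.get([0.99, 0.05])  # almost the same direction
far = demo_cache.get([0.0, 1.0])     # orthogonal, unrelated query
print(near is not None, far is None, demo_cache.hit_rate)
```

A real cache would use an approximate nearest-neighbor index rather than a linear scan, and would add expiry along the lines of the ttl option above.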
Circuit Breakers
from agentic_rag import CircuitBreaker
# Protect against LLM failures
breaker = CircuitBreaker(
name="openai_api",
failure_threshold=5, # Open after 5 failures
recovery_timeout=60, # Try again after 60s
half_open_max_calls=3, # Test with 3 calls when recovering
success_threshold=2 # Need 2 successes to close
)
# Use with provider
llm = OpenAIProvider(
api_key="...",
circuit_breaker=breaker,
fallback_provider=OllamaProvider(model="llama3.3") # Local fallback
)
# Monitor breaker state
print(f"State: {breaker.state}") # CLOSED, OPEN, HALF_OPEN
print(f"Failures: {breaker.failure_count}")
print(f"Last failure: {breaker.last_failure_time}")
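The three states printed above follow a small state machine. Below is a minimal sketch of it, assuming the usual circuit-breaker semantics: trip to OPEN after failure_threshold consecutive failures, allow a probe after recovery_timeout, and close again after success_threshold successful probes. `TinyCircuitBreaker` is hypothetical and leaves out half_open_max_calls; the injectable clock exists only so the example runs without sleeping.

```python
import time
from typing import Callable


class TinyCircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        success_threshold: int = 2,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self._clock = clock
        self.state = "CLOSED"
        self.failure_count = 0
        self._successes = 0
        self._opened_at = 0.0

    def allow_request(self) -> bool:
        if self.state == "OPEN":
            if self._clock() - self._opened_at >= self.recovery_timeout:
                self.state = "HALF_OPEN"  # let probe calls through
                self._successes = 0
                return True
            return False
        return True

    def record_success(self) -> None:
        if self.state == "HALF_OPEN":
            self._successes += 1
            if self._successes >= self.success_threshold:
                self.state = "CLOSED"
                self.failure_count = 0
        else:
            self.failure_count = 0  # only consecutive failures count

    def record_failure(self) -> None:
        if self.state == "HALF_OPEN":
            self._trip()  # a failed probe reopens the circuit immediately
            return
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self._trip()

    def _trip(self) -> None:
        self.state = "OPEN"
        self._opened_at = self._clock()


now = [0.0]  # fake clock so the demo is deterministic
demo_breaker = TinyCircuitBreaker(failure_threshold=2, clock=lambda: now[0])
demo_breaker.record_failure()
demo_breaker.record_failure()
state_after_failures = demo_breaker.state
blocked = not demo_breaker.allow_request()
now[0] = 61.0
probe_allowed = demo_breaker.allow_request()
demo_breaker.record_success()
demo_breaker.record_success()
final_state = demo_breaker.state
print(state_after_failures, blocked, probe_allowed, final_state)
```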
Output Guardrails
from agentic_rag import OutputGuardrails, PIIDetector, ToxicityFilter
# Comprehensive guardrails
guardrails = OutputGuardrails(
# PII Detection
enable_pii_detection=True,
pii_types=["email", "phone", "ssn", "credit_card", "address"],
# Content filtering
enable_toxicity_filter=True,
toxicity_threshold=0.7,
# Custom rules
blocked_patterns=[
r"\b(password|secret_key|api_key)\s*=\s*['\"][^'\"]+['\"]"
],
# Fact-checking (optional)
enable_fact_check=True,
fact_checker=fact_checker_service
)
# Apply to response (inside your request handler)
safe_response = await guardrails.check(response)
if not safe_response.is_safe:
    print(f"Blocked: {safe_response.violations}")
    return "I cannot provide that information."
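As a rough picture of what pattern-based PII detection involves, here is a minimal regex sketch covering two of the types listed above. The patterns are deliberately simple (a basic email shape and a US-style SSN) and will miss many real-world formats; `PII_PATTERNS` and `redact_pii` are hypothetical and say nothing about how OutputGuardrails is implemented.

```python
import re
from typing import List, Tuple

# Simplified patterns, for illustration only
PII_PATTERNS = {
    "email": re.compile(r"\b[\w.+-]+@[\w-]+(?:\.[\w-]+)+\b"),
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
}


def redact_pii(text: str) -> Tuple[str, List[str]]:
    """Replace each match with a placeholder and report which PII types were found."""
    violations = []
    for pii_type, pattern in PII_PATTERNS.items():
        if pattern.search(text):
            violations.append(pii_type)
            text = pattern.sub(f"[REDACTED_{pii_type.upper()}]", text)
    return text, violations


clean, found = redact_pii("Contact alice@example.com, SSN 123-45-6789.")
print(clean)
print(found)
```

Redacting instead of blocking is a design choice: it keeps the rest of the answer usable, while the violations list can still be logged or used to reject the response outright.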
Cost Tracking & Budgeting
from agentic_rag import CostTracker, BudgetManager
# Track costs
tracker = CostTracker()
tracker.record_usage(
model="gpt-4o",
input_tokens=500,
output_tokens=200,
embedding_tokens=1000
)
print(f"Total cost: ${tracker.total_cost:.4f}")
print(f"By model: {tracker.cost_by_model}")
print(f"By day: {tracker.cost_by_day}")
# Budget management
budget = BudgetManager(
daily_limit=50.0, # $50/day
monthly_limit=1000.0, # $1000/month
alert_threshold=0.8 # Alert at 80%
)
if budget.would_exceed_limit(expected_cost=2.0):
    # Switch to cheaper model or reject request
    llm = GroqProvider(model="llama-3.3-70b")  # Much cheaper
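The budget check above boils down to simple arithmetic on running spend. The sketch below shows one plausible reading of daily_limit and alert_threshold; `TinyBudget` is a hypothetical class, and the library's BudgetManager may track spend and reset periods differently.

```python
class TinyBudget:
    """Tracks spend for one day against a hard limit and a soft alert threshold."""

    def __init__(self, daily_limit: float, alert_threshold: float = 0.8):
        self.daily_limit = daily_limit
        self.alert_threshold = alert_threshold
        self.spent_today = 0.0

    def record(self, cost: float) -> None:
        self.spent_today += cost

    def would_exceed_limit(self, expected_cost: float) -> bool:
        # Check before making the call, so the request can be rerouted or rejected
        return self.spent_today + expected_cost > self.daily_limit

    def should_alert(self) -> bool:
        return self.spent_today >= self.alert_threshold * self.daily_limit


demo_budget = TinyBudget(daily_limit=50.0, alert_threshold=0.8)
demo_budget.record(41.0)
print(demo_budget.should_alert())            # past 80% of the daily limit
print(demo_budget.would_exceed_limit(2.0))   # 43.0 still fits
print(demo_budget.would_exceed_limit(10.0))  # 51.0 would not
```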
Rate Limiting
from agentic_rag import RateLimiter
# Token bucket rate limiter
limiter = RateLimiter(
requests_per_second=10,
burst_size=20,
per_user=True # Track per API key
)
@limiter.limit()
async def query_endpoint(request):
    return await rag.aquery(request.query)
# Or with the standalone rate_limit decorator
@rate_limit(requests_per_minute=60, per_user=True)
async def stream_endpoint(request):
    async for chunk in rag.astream(request.query):
        yield chunk
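The comment above names the token bucket algorithm, which is what makes requests_per_second and burst_size two separate knobs: the bucket holds at most burst_size tokens and refills at the steady rate. Here is a minimal single-user sketch; `TokenBucket` is a hypothetical class with an injectable clock for deterministic demos, not the library's RateLimiter.

```python
import time
from typing import Callable


class TokenBucket:
    def __init__(self, rate: float, burst_size: int, clock: Callable[[], float] = time.monotonic):
        self.rate = rate                  # tokens added per second
        self.capacity = float(burst_size) # maximum tokens the bucket can hold
        self._tokens = float(burst_size)  # start full so an initial burst is allowed
        self._clock = clock
        self._last = clock()

    def allow(self) -> bool:
        """Refill based on elapsed time, then spend one token if available."""
        now = self._clock()
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now
        if self._tokens >= 1.0:
            self._tokens -= 1.0
            return True
        return False


now = [0.0]  # fake clock
bucket = TokenBucket(rate=10, burst_size=20, clock=lambda: now[0])
burst = [bucket.allow() for _ in range(21)]  # 20 pass, the 21st is rejected
now[0] = 0.5                                 # half a second later: 5 tokens refilled
refill = [bucket.allow() for _ in range(6)]
print(burst.count(True), refill.count(True))
```

Per-user limiting would keep one such bucket per API key, for example in a dictionary keyed by user id.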
Health Checks & Monitoring
from agentic_rag import HealthChecker, MetricsCollector
# Health check
health = HealthChecker(rag)
status = await health.check()
print(f"Vector store: {status.vector_store}") # healthy / degraded / down
print(f"LLM provider: {status.llm_provider}")
print(f"Cache: {status.cache}")
# Metrics
metrics = MetricsCollector()
metrics.record_latency("query", duration_ms=250)
metrics.record_throughput("requests_per_second", count=10)
metrics.record_error_rate("error_rate", errors=1, total=100)
# Export to Prometheus/Grafana
from agentic_rag.metrics import PrometheusExporter
exporter = PrometheusExporter()
exporter.export(metrics)
Prompt Versioning
from agentic_rag.core import PromptVersioning
# Create version store
versioning = PromptVersioning(storage_dir="./prompt_versions")
# Save initial version
v1 = versioning.save_prompt(
prompt_id="qa_prompt",
prompt_text="Answer the question based on the context.",
author="alice",
tags=["qa", "baseline"],
change_notes="Initial version"
)
# Iterate and improve
v2 = versioning.save_prompt(
prompt_id="qa_prompt",
prompt_text="Provide a concise answer (max 3 sentences) based on the context.",
author="alice",
parent_version=v1.version_id,
tags=["qa", "improved"],
change_notes="Added length constraint for conciseness"
)
# Rollback if needed
v3 = versioning.rollback("qa_prompt", "v1")
Why you need this: Because production is where prototypes go to die. Caching saves money, circuit breakers prevent cascading failures, guardrails keep you out of trouble, cost tracking prevents surprise bills, and prompt versioning lets you track what worked.
Recipe 11 — Contrib tools (30+ integrations)
Communication
from agentic_rag.contrib.communication import (
SlackTool, # Post to Slack channels
DiscordTool, # Discord messages
EmailTool, # Send emails via SMTP/SendGrid
TeamsTool, # Microsoft Teams
TelegramTool, # Telegram bot messages
WebhookTool, # Generic webhooks
)
# Slack notifications
slack = SlackTool(token="xoxb-your-token")
await slack.execute(
channel="#ai-research",
message=f"New insight: {response.answer[:500]}...",
blocks=[{
"type": "section",
"text": {"type": "mrkdwn", "text": f"*Query:* {query}"}
}]
)
# Email reports
email = EmailTool(
provider="sendgrid", # or "smtp", "aws_ses"
api_key="..."
)
await email.execute(
to="team@company.com",
subject="Daily RAG Summary",
body=response.answer,
attachments=["report.pdf"]
)
# Generic webhook
webhook = WebhookTool()
await webhook.execute(
url="https://hooks.zapier.com/hooks/catch/...",
payload={"query": query, "answer": response.answer}
)
Productivity
from agentic_rag.contrib.productivity import (
NotionTool, # Notion pages/databases
GoogleDocsTool, # Google Docs
TrelloTool, # Trello cards
AsanaTool, # Asana tasks
MondayTool, # Monday.com
LinearTool, # Linear issues
JiraTool, # Jira tickets
ConfluenceTool, # Confluence pages
)
# Create Notion page
notion = NotionTool(api_key="secret_...")
page = await notion.execute(
action="create_page",
parent_id="workspace-id",
title="Q4 Financial Analysis",
content=response.answer,
properties={
"Status": "Draft",
"Tags": ["AI-Generated", "Q4"]
}
)
# Create Jira ticket
jira = JiraTool(
server="https://company.atlassian.net",
username="bot@company.com",
api_token="..."
)
await jira.execute(
action="create_issue",
project="AI",
summary=f"Research: {query[:100]}",
description=response.answer,
issue_type="Task",
labels=["rag-generated"]
)
Storage & Cloud
from datetime import datetime
from agentic_rag.contrib.storage import (
S3Tool, # AWS S3
GCSTool, # Google Cloud Storage
AzureBlobTool, # Azure Blob Storage
DropboxTool, # Dropbox
GoogleDriveTool, # Google Drive
OneDriveTool, # Microsoft OneDrive
BoxTool, # Box.com
)
# S3 upload
s3 = S3Tool(
bucket="my-rag-outputs",
region="us-east-1",
aws_access_key="...",
aws_secret_key="..."
)
await s3.execute(
action="upload",
key=f"reports/{datetime.now():%Y-%m-%d}/analysis.txt",
body=response.answer,
metadata={"query": query, "confidence": str(response.confidence)}
)
# Google Drive
drive = GoogleDriveTool(credentials="credentials.json")
file = await drive.execute(
action="create_document",
name="Research Summary",
content=response.answer,
folder_id="folder-id"
)
Databases
from datetime import datetime
from agentic_rag.contrib.database import (
PostgresTool, # PostgreSQL queries
MySQLTool, # MySQL queries
MongoDBTool, # MongoDB operations
RedisTool, # Redis commands
BigQueryTool, # Google BigQuery
SnowflakeTool, # Snowflake
ClickHouseTool, # ClickHouse
SupabaseTool, # Supabase
)
# Query database
postgres = PostgresTool(connection_string="postgresql://...")
results = await postgres.execute(
action="query",
sql="SELECT * FROM customers WHERE churn_risk > 0.8"
)
# Store in MongoDB
mongo = MongoDBTool(uri="mongodb+srv://...")
await mongo.execute(
action="insert_one",
database="rag",
collection="queries",
document={
"query": query,
"answer": response.answer,
"timestamp": datetime.now(),
"confidence": response.confidence
}
)
Business & Payments
from agentic_rag.contrib.business import (
StripeTool, # Stripe payments
ShopifyTool, # Shopify operations
HubSpotTool, # HubSpot CRM
SalesforceTool, # Salesforce
ZendeskTool, # Zendesk tickets
IntercomTool, # Intercom conversations
TwilioTool, # SMS/Voice
)
# Send SMS notification
twilio = TwilioTool(
account_sid="...",
auth_token="..."
)
await twilio.execute(
action="send_sms",
to="+1234567890",
body=f"Alert: {response.answer[:100]}..."
)
Search & Discovery
from agentic_rag.contrib.search import (
AlgoliaTool, # Algolia search
ElasticsearchTool, # Elasticsearch
MeilisearchTool, # Meilisearch
TypesenseTool, # Typesense
)
# Index documents
algolia = AlgoliaTool(app_id="...", api_key="...")
await algolia.execute(
action="index",
index_name="docs",
objects=[{"objectID": "1", "content": response.answer}]
)
Why you need this: Because insights stuck in your terminal aren't useful. Send them to Slack, save them to Notion, archive them to S3 - make your RAG system a team player, not a hermit.
Recipe 12 — Build your own tool (advanced patterns)
Basic Custom Tool
from agentic_rag.tools.base import BaseTool, ToolResult, ToolParameter
from typing import Dict, Any
class WeatherTool(BaseTool):
    """Because even RAG agents check the weather sometimes."""
    def __init__(self, api_key: str):
        super().__init__(
            name="weather",
            description="Get current weather for a city",
            parameters=[
                ToolParameter(
                    name="city",
                    type="string",
                    description="City name (e.g., 'London', 'New York')",
                    required=True
                ),
                ToolParameter(
                    name="units",
                    type="string",
                    description="Temperature units",
                    enum=["metric", "imperial"],
                    default="metric"
                )
            ],
        )
        self.api_key = api_key
    async def execute(self, city: str, units: str = "metric", **kwargs) -> ToolResult:
        """Execute the weather lookup."""
        try:
            # Your API call here
            data = await self._fetch_weather(city, units)
            return ToolResult(
                success=True,
                result={
                    "temperature": data["temp"],
                    "conditions": data["weather"][0]["description"],
                    "humidity": data["humidity"]
                },
                metadata={"source": "openweather", "cached": False}
            )
        except Exception as e:
            return ToolResult(
                success=False,
                error=str(e),
                error_code="WEATHER_API_ERROR"
            )
    async def _fetch_weather(self, city: str, units: str) -> Dict:
        # Implementation
        pass
# Register
rag.register_tool(WeatherTool(api_key="your-key"))
Tool with Schema Validation
from typing import Optional
from pydantic import BaseModel, Field
class DatabaseQueryInput(BaseModel):
    """Input schema for database queries."""
    table: str = Field(..., description="Table name to query")
    columns: list[str] = Field(default=["*"], description="Columns to select")
    where: str = Field(default="", description="WHERE clause (optional)")
    limit: int = Field(default=100, ge=1, le=1000, description="Max results")
class DatabaseTool(BaseTool):
    """Execute safe database queries."""
    input_schema = DatabaseQueryInput
    async def execute(
        self,
        table: str,
        columns: Optional[list[str]] = None,  # None instead of a shared mutable default
        where: str = "",
        limit: int = 100
    ) -> ToolResult:
        columns = columns or ["*"]
        # Validate table exists
        if table not in self.allowed_tables:
            return ToolResult(
                success=False,
                error=f"Table '{table}' not in allowed list",
                error_code="INVALID_TABLE"
            )
        # Safe query execution
        try:
            results = await self.db.fetch(table, columns, where, limit)
            return ToolResult(
                success=True,
                result=results,
                metadata={"row_count": len(results)}
            )
        except Exception as e:
            return ToolResult(
                success=False,
                error=f"Query failed: {str(e)}",
                error_code="QUERY_ERROR"
            )
Tool with Caching
import hashlib
from agentic_rag.utils.cache import Cache
class CachedAPITool(BaseTool):
    """Tool with built-in caching."""
    def __init__(self):
        super().__init__(...)
        self.cache = Cache(ttl=3600)  # 1 hour cache
    async def execute(self, query: str, **kwargs) -> ToolResult:
        # Check cache; sha256 gives a key that is stable across processes,
        # whereas the built-in hash() of a string changes between interpreter runs
        cache_key = f"api:{hashlib.sha256(query.encode('utf-8')).hexdigest()}"
        if cached := await self.cache.get(cache_key):
            return ToolResult(success=True, result=cached, cached=True)
        # Fetch and cache
        result = await self._fetch(query)
        await self.cache.set(cache_key, result)
        return ToolResult(success=True, result=result, cached=False)
Tool Factory Registration
import os
from agentic_rag.factories import register_tool
@register_tool("my_weather")
class WeatherTool(BaseTool):
    """Auto-registered weather tool."""
    def __init__(self, api_key: str = None, **kwargs):
        super().__init__(...)
        self.api_key = api_key or os.getenv("WEATHER_API_KEY")
# Create via factory
from agentic_rag.factories import create_tool
tool = create_tool("my_weather", api_key="...")
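Decorator-based registration like the above is usually a thin wrapper around a dictionary that maps names to classes. The sketch below shows that pattern in isolation; `register_demo_tool`, `create_demo_tool`, and `EchoTool` are hypothetical names chosen to avoid confusion with the library's own register_tool and create_tool, whose internals may differ.

```python
from typing import Any, Callable, Dict

_DEMO_REGISTRY: Dict[str, type] = {}


def register_demo_tool(name: str) -> Callable[[type], type]:
    """Class decorator that records the class under `name` and returns it unchanged."""
    def decorator(cls: type) -> type:
        if name in _DEMO_REGISTRY:
            raise ValueError(f"Tool {name!r} is already registered")
        _DEMO_REGISTRY[name] = cls
        return cls
    return decorator


def create_demo_tool(name: str, **kwargs: Any) -> Any:
    """Look up a registered class by name and instantiate it with kwargs."""
    try:
        cls = _DEMO_REGISTRY[name]
    except KeyError:
        known = ", ".join(sorted(_DEMO_REGISTRY)) or "none"
        raise ValueError(f"Unknown tool {name!r}; registered: {known}") from None
    return cls(**kwargs)


@register_demo_tool("echo")
class EchoTool:
    def __init__(self, prefix: str = ""):
        self.prefix = prefix

    def execute(self, text: str) -> str:
        return self.prefix + text


demo_tool = create_demo_tool("echo", prefix="> ")
print(demo_tool.execute("hi"))
```

The payoff is that tools can be selected from configuration by name, without the calling code importing each tool class.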
Multi-Step Tool
class ReportGeneratorTool(BaseTool):
    """Generate comprehensive reports from multiple sources."""
    async def execute(
        self,
        topic: str,
        depth: str = "comprehensive",
        format: str = "markdown"
    ) -> ToolResult:
        steps = []
        # Step 1: Web search
        search_results = await self._search_web(topic)
        steps.append(f"Searched web: {len(search_results)} sources")
        # Step 2: Vector store query
        docs = await self._query_documents(topic)
        steps.append(f"Queried docs: {len(docs)} documents")
        # Step 3: Synthesize
        report = await self._generate_report(
            topic, search_results, docs,
            depth=depth, format=format
        )
        steps.append("Generated report")
        return ToolResult(
            success=True,
            result={
                "report": report,
                "sources": len(search_results) + len(docs),
                "format": format
            },
            metadata={"steps": steps, "duration_ms": 2500}
        )
Why you need this: Because your use case is unique. We give you the building blocks to extend the system however you need - weather APIs, internal databases, custom calculations, whatever.
Recipe 18 — Monitoring Dashboard (analytics & observability)
Real-time web dashboard for monitoring your RAG system. Track queries, latency, success rates, and view recent interactions.
Enable Dashboard
from agentic_rag.server import create_app_with_dashboard
# Create app with dashboard
app = create_app_with_dashboard()
# Dashboard available at http://localhost:8000/dashboard
Record Queries for Analytics
from agentic_rag.server import record_rag_query
import time
# In your query handler
start = time.time()
response = await rag.aquery("What is machine learning?")
latency = (time.time() - start) * 1000
# Record for dashboard
record_rag_query(
query="What is machine learning?",
response=response.answer,
latency_ms=latency,
token_count=150,
sources=[doc.id for doc in response.sources],
confidence=response.confidence,
success=True,
)
Dashboard Features
- Real-time metrics: Query count, avg latency, success rate
- Recent queries: View last 50 queries with latency and status
- Performance trends: Latency and success rate over time
- Auto-refresh: Updates every 30 seconds
Access URLs:
- Dashboard UI: /dashboard
- Metrics API: /api/dashboard/metrics
- Query history: /api/dashboard/queries
Why you need this: Because flying blind in production is terrifying. See what's happening, spot issues before users do, and have data to optimize.
Recipe 17 — Conversation Memory (multi-turn chat)
Simple conversation memory for multi-turn chat sessions. Unlike complex hierarchical memory, this is purpose-built for maintaining chat context within token limits.
Buffer Memory (Simple)
from agentic_rag.core import ConversationBufferMemory
memory = ConversationBufferMemory(max_token_limit=4000)
# Add messages
memory.add_user_message("What is machine learning?")
memory.add_ai_message("Machine learning is a subset of AI...")
memory.add_user_message("Give me an example")
memory.add_ai_message("Sure! Spam detection is a classic example...")
# Get context for LLM
context = memory.to_list()
# Returns: [
# {"role": "user", "content": "What is machine learning?"},
# {"role": "assistant", "content": "Machine learning is a subset of AI..."},
# ...
# ]
# Check token usage
tokens = memory.get_token_count()  # Tokens currently held in memory
Window Memory (Last N turns)
from agentic_rag.core import ConversationBufferWindowMemory
# Keep only last 5 conversation turns
memory = ConversationBufferWindowMemory(k=5)
# Older messages auto-removed
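The idea behind window memory can be sketched with a bounded deque. `WindowMemorySketch` is a hypothetical stand-in and assumes one turn means one user message plus the assistant reply; the library's class may count turns differently.

```python
from collections import deque


class WindowMemorySketch:
    """Hypothetical stand-in that keeps only the last k user/assistant turns."""

    def __init__(self, k):
        # Assumption: one turn = one user message plus the reply that follows it.
        self.turns = deque(maxlen=k)

    def add_turn(self, user, assistant):
        self.turns.append((user, assistant))

    def to_list(self):
        messages = []
        for user, assistant in self.turns:
            messages.append({"role": "user", "content": user})
            messages.append({"role": "assistant", "content": assistant})
        return messages


window = WindowMemorySketch(k=2)
for i in range(1, 4):
    window.add_turn(f"question {i}", f"answer {i}")
window_messages = window.to_list()
```

After three turns with `k=2`, the first turn has been dropped and only the last two remain.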
Summary Memory (Compress old context)
from agentic_rag.core import ConversationSummaryMemory
# Summarizes old messages to save tokens
memory = ConversationSummaryMemory(
llm_provider=llm,
max_token_limit=4000,
summary_token_limit=500
)
# After many messages, early ones become:
# "Summary: User asked about ML, AI explained it's a subset of AI..."
Entity Memory (Track mentioned entities)
from agentic_rag.core import ConversationEntityMemory
# Automatically extracts and remembers entities
memory = ConversationEntityMemory(llm_provider=llm)
memory.add_user_message("Alice is working on the Q4 report.")
memory.add_ai_message("What is Alice's role?")
memory.add_user_message("She's the CTO.")
# Extracted entities
entities = memory.get_entities()
# {"alice": {"name": "Alice", "type": "Person", "role": "CTO"}}
With AgenticRAG
from agentic_rag import AgenticRAG
from agentic_rag.core import ConversationBufferMemory
memory = ConversationBufferMemory()
rag = AgenticRAG(vector_store=store, llm_provider=llm)
async def chat(user_input: str):
    # Get conversation context (history before this turn)
    context = memory.to_list()
    # Add user message
    memory.add_user_message(user_input)
    # Query with context
    response = await rag.aquery(
        user_input,
        context_messages=context
    )
    # Store response
    memory.add_ai_message(response.answer)
    return response
Why you need this: Because users don't ask one question and leave. They have conversations. This keeps context without exploding your token budget.
Recipe 16 — ReAct Agent (reasoning + acting)
The ReAct pattern: think → act → observe → repeat. Our implementation combines reasoning and tool use in an interleaved loop.
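The think, act, observe loop can be sketched without any LLM. In the sketch below, `scripted_policy` is a hypothetical stand-in for the model and `calculator` is a toy tool; both are assumptions for illustration and not part of the library.

```python
def calculator(expression):
    """Toy tool: handles only 'a - b' with float operands."""
    left, right = expression.split("-")
    return f"{float(left) - float(right):.2f}"


def scripted_policy(history):
    """Stand-in for an LLM: decides the next step from the observations so far."""
    observations = [entry for entry in history if entry[0] == "observe"]
    if not observations:
        return {"thought": "I need the difference.",
                "action": "calculator", "input": "4.07 - 2.78"}
    return {"thought": "I have the number.",
            "final": f"Difference: {observations[-1][1]}"}


def react_loop(policy, tools, max_iterations=5):
    history = []
    for _ in range(max_iterations):
        step = policy(history)
        history.append(("think", step["thought"]))
        if "final" in step:
            return step["final"], history
        history.append(("act", f'{step["action"]}({step["input"]})'))
        history.append(("observe", tools[step["action"]](step["input"])))
    return None, history  # iteration budget exhausted without an answer


answer, trace = react_loop(scripted_policy, {"calculator": calculator})
```

The `max_iterations` cap plays the same role as in the agent constructor: it bounds the loop when the policy never produces a final answer.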
Basic ReAct Agent
from agentic_rag.cognitive import ReActAgent
from agentic_rag.tools import WebSearchTool, CalculatorTool
agent = ReActAgent(
llm_provider=llm,
vector_store=store,
tools=[WebSearchTool(), CalculatorTool()],
max_iterations=5
)
result = await agent.query(
"Find GDP of France and Germany, calculate the difference"
)
print(result.answer)
# "The difference between France's GDP ($2.78T) and Germany's GDP ($4.07T) is $1.29T"
# Full reasoning trace
for step in result.reasoning_trace:
    print(f"[{step.step_type}] {step.content}")
ReAct with Retrieval
# Vector store becomes a retrieval tool automatically
agent = ReActAgent(
llm_provider=llm,
vector_store=store, # Adds implicit "retrieve" tool
tools=[WebSearchTool()],
max_iterations=5
)
result = await agent.query(
"What does our Q4 report say about revenue? Also search for industry benchmarks."
)
# Combines internal documents + web search seamlessly
Why you need this: Sometimes you need to think, search, calculate, think again, and finally answer. ReAct makes this systematic and observable.
Recipe 15 — Parent-child chunking (provenance tracking)
Track which chunks came from which parent document. Essential for citations, expanded context, and hierarchical retrieval.
Chunk with Parent Tracking
from agentic_rag.document_processing.chunkers import (
ParentChildChunker,
FixedSizeChunker
)
chunker = ParentChildChunker(
base_chunker=FixedSizeChunker(chunk_size=500),
include_parent_metadata=True
)
# Chunk with tracking
parent = chunker.chunk_document(
text="Long document text...",
doc_id="doc_001",
metadata={"source": "report.pdf", "author": "Alice"}
)
# Access children with full provenance
for child in parent.children:
    print(f"Chunk {child.child_index}/{child.total_children}")
    print(f" Parent: {child.parent_doc_id}")
    print(f" Citation: {child.to_citation()}")  # [report.pdf, chunk 1/5]
    print(f" Position: chars {child.char_start}-{child.char_end}")
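As a rough illustration of what provenance tracking involves, the sketch below splits text into fixed-size pieces and records the parent ID, sibling position, and character offsets for each piece. `ChildChunk` and `chunk_with_provenance` are hypothetical names, and `to_citation` here takes the source name as an argument, which differs from the library call shown above.

```python
from dataclasses import dataclass


@dataclass
class ChildChunk:
    text: str
    parent_doc_id: str
    child_index: int      # 1-based position among siblings
    total_children: int
    char_start: int
    char_end: int         # exclusive end offset in the parent text

    def to_citation(self, source):
        return f"[{source}, chunk {self.child_index}/{self.total_children}]"


def chunk_with_provenance(text, doc_id, chunk_size):
    starts = range(0, len(text), chunk_size)
    total = len(starts)
    return [
        ChildChunk(
            text=text[start:start + chunk_size],
            parent_doc_id=doc_id,
            child_index=i,
            total_children=total,
            char_start=start,
            char_end=min(start + chunk_size, len(text)),
        )
        for i, start in enumerate(starts, start=1)
    ]


source_text = "abcdefghij" * 3
children = chunk_with_provenance(source_text, doc_id="doc_001", chunk_size=12)
```

Because every child stores its offsets, the parent text can be reassembled and any chunk can be traced back to its exact location.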
Hierarchical Retrieval
from agentic_rag.document_processing.chunkers import HierarchicalRetriever
# parent_chunks: the parents returned by chunker.chunk_document(...) above
parent_chunks = [parent]
# Index child chunks
for parent in parent_chunks:
    for child in parent.children:
        await store.add(
            child.text,
            metadata={
                "parent_doc_id": child.parent_doc_id,
                "child_index": child.child_index,
            }
        )
# Retrieve with context expansion
retriever = HierarchicalRetriever(
vector_store=store,
parents=parent_chunks
)
chunks = await retriever.retrieve("revenue trends", top_k=3)
expanded = retriever.expand_context(chunks, window_size=1)
# Returns target chunks + neighboring siblings for context
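Context expansion with a sibling window can be pictured as index arithmetic over the children of one parent. The helper below is a hypothetical sketch that works on 0-based chunk indices; it is not how `expand_context` is necessarily implemented.

```python
def expand_with_siblings(hit_indices, total_children, window_size=1):
    """Return sorted, de-duplicated chunk indices covering each hit and its neighbours."""
    expanded = set()
    for index in hit_indices:
        low = max(0, index - window_size)
        high = min(total_children - 1, index + window_size)
        expanded.update(range(low, high + 1))
    return sorted(expanded)


expanded_indices = expand_with_siblings([0, 4], total_children=6, window_size=1)
```

Clamping at the document boundaries and de-duplicating overlaps keeps the expanded context from repeating text when two hits are adjacent.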
Why you need this: Because "trust me bro" is not a valid citation. Know exactly which chunk came from which document, and retrieve siblings for expanded context when needed.
Recipe 14 — Bulk document loading (SimpleDirectoryReader)
Load entire directories with automatic file type detection. The missing piece between "I have a folder of documents" and "my RAG system works."
Load Entire Directory
from pathlib import Path
from agentic_rag.document_processing.loaders import SimpleDirectoryReader
reader = SimpleDirectoryReader(
    input_dir="./data",
    recursive=True,
    exclude=["*.tmp", "*.log", "node_modules/*"]
)
# Load with progress tracking
def on_progress(file_path, current, total):
    print(f"Loaded {current}/{total}: {Path(file_path).name}")
documents = await reader.load(progress_callback=on_progress)
print(f"Loaded {len(documents)} documents")
Auto File Type Detection
# Automatically detects and uses correct loader for each extension
# .pdf → PDFLoader, .docx → DocxLoader, .md → MarkdownLoader, etc.
reader = SimpleDirectoryReader("./mixed_documents")
docs = await reader.load()
# Each document has file metadata
doc = docs[0]
print(doc.metadata["file_path"]) # ./mixed_documents/report.pdf
print(doc.metadata["file_ext"]) # .pdf
print(doc.metadata["directory"]) # ./mixed_documents
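Extension-based dispatch combined with exclude patterns can be sketched with the standard library alone. `LOADER_BY_EXT` and `plan_directory_load` are hypothetical; the mapping only mirrors the loader names mentioned in the comment above, and the function returns loader names rather than loading anything.

```python
import fnmatch
from pathlib import Path

# Hypothetical extension-to-loader mapping, following the names mentioned above.
LOADER_BY_EXT = {".pdf": "PDFLoader", ".docx": "DocxLoader", ".md": "MarkdownLoader"}


def plan_directory_load(input_dir, recursive=True, exclude=()):
    """Return (relative_path, loader_name) pairs for supported, non-excluded files."""
    root = Path(input_dir)
    candidates = root.rglob("*") if recursive else root.glob("*")
    plan = []
    for path in sorted(candidates):
        if not path.is_file():
            continue
        relative = path.relative_to(root).as_posix()
        # fnmatch's "*" also matches "/", so "node_modules/*" covers nested files.
        if any(fnmatch.fnmatch(relative, pattern) for pattern in exclude):
            continue
        loader = LOADER_BY_EXT.get(path.suffix.lower())
        if loader is not None:
            plan.append((relative, loader))
    return plan
```

Calling `plan_directory_load("./data", exclude=["*.log", "node_modules/*"])` would list each supported file with the loader chosen for it, skipping anything that matches an exclude pattern or has an unknown extension.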
With Custom Metadata
from datetime import datetime
from pathlib import Path
def add_source_metadata(file_path: str) -> dict:
    return {
        "department": "engineering",
        "processed_at": datetime.now().isoformat(),
        "filename": Path(file_path).name
    }
reader = SimpleDirectoryReader(
"./data",
file_metadata=add_source_metadata
)
Why you need this: Because loading documents one-by-one is tedious and you have better things to do. Point it at a folder, get documents back.
Recipe 13 — FastAPI server (production-ready API)
Basic Server
from agentic_rag.server import create_app
# One-liner production API
app = create_app()
# Run with uvicorn
# uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4
Custom Server Configuration
from agentic_rag.server import create_app, ServerConfig
from agentic_rag import AgenticRAG
# Configure components
config = ServerConfig(
# RAG settings
vector_store="chroma",
llm_provider="openai",
llm_model="gpt-4o",
enable_agent=True,
enable_memory=True,
# API settings
title="My RAG API",
version="1.0.0",
docs_url="/docs",
# Security
api_key_header="X-API-Key",
require_auth=True,
# Rate limiting
rate_limit_requests=100,
rate_limit_window=60,
# CORS
allowed_origins=["https://app.example.com"],
# Features
enable_streaming=True,
enable_caching=True,
enable_metrics=True
)
app = create_app(config=config)
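The `rate_limit_requests` and `rate_limit_window` settings describe a request budget per time window. How the server enforces that budget is not shown here; one common approach is a sliding window per client, sketched below. `SlidingWindowLimiter` is a hypothetical name, and the timestamps are passed in explicitly to keep the example deterministic.

```python
from collections import deque


class SlidingWindowLimiter:
    """Hypothetical limiter: at most max_requests per window_seconds, per client."""

    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.hits = {}  # client id -> deque of request timestamps

    def allow(self, client_id, now):
        timestamps = self.hits.setdefault(client_id, deque())
        # Drop timestamps that have fallen out of the window.
        while timestamps and now - timestamps[0] >= self.window_seconds:
            timestamps.popleft()
        if len(timestamps) >= self.max_requests:
            return False
        timestamps.append(now)
        return True


limiter = SlidingWindowLimiter(max_requests=2, window_seconds=60)
decisions = [limiter.allow("key-1", now=t) for t in (0, 10, 20, 61)]
```

The third request is rejected because two requests already sit inside the 60-second window; the fourth is allowed once the first has aged out.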
API Endpoints Reference
| Method | Path | Description | Request Body |
|---|---|---|---|
| GET | /health | Health check | - |
| GET | /ready | Readiness probe | - |
| POST | /query | RAG query | {"query": "...", "filters": {}} |
| POST | /query/cognitive | Full AC-RAG | {"query": "...", "enable_reflection": true} |
| POST | /stream | SSE streaming | {"query": "..."} |
| POST | /stream/structured | Structured events | {"query": "...", "include_sources": true} |
| POST | /ingest | Document upload | multipart/form-data |
| POST | /ingest/text | Text ingestion | {"text": "...", "metadata": {}} |
| GET | /documents | List documents | ?limit=10&offset=0 |
| DELETE | /documents/{id} | Delete document | - |
| GET | /memory/stats | Memory statistics | - |
| POST | /memory/clear | Clear memory | {"type": "episodic"} |
| GET | /tools | List available tools | - |
| POST | /tools/execute | Execute tool | {"tool": "...", "params": {}} |
| GET | /metrics | Prometheus metrics | - |
Query Endpoint Examples
import requests
# Basic query
response = requests.post(
"http://localhost:8000/query",
headers={"X-API-Key": "your-key"},
json={
"query": "What is machine learning?",
"temperature": 0.7,
"max_tokens": 500,
"include_sources": True
}
)
result = response.json()
print(result["answer"])
print(result["sources"]) # Source documents
print(result["confidence"]) # 0.0 - 1.0
# Cognitive query with full AC-RAG
response = requests.post(
"http://localhost:8000/query/cognitive",
json={
"query": "Explain quantum computing applications",
"enable_reflection": True,
"enable_progressive_retrieval": True,
"max_reflections": 3
}
)
result = response.json()
print(result["answer"])
print(result["reflection_count"]) # Number of self-improvement loops
print(result["retrieval_attempts"]) # Progressive retrieval count
Streaming Endpoints
# Server-Sent Events (SSE)
curl -X POST http://localhost:8000/stream \
-H "Content-Type: application/json" \
-d '{"query": "Explain neural networks"}'
# Response: stream of text chunks
# data: {"type": "chunk", "content": "Neural"}
# data: {"type": "chunk", "content": " networks"}
# data: {"type": "complete"}
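// JavaScript client
// Note: EventSource always issues GET requests, so this assumes the server
// also accepts GET on /stream (the table above lists it as POST).
const eventSource = new EventSource('/stream?query=...');
eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  if (data.type === 'chunk') {
    // textContent avoids interpreting model output as HTML
    document.getElementById('output').textContent += data.content;
  } else if (data.type === 'complete') {
    eventSource.close(); // stop the browser from auto-reconnecting
  }
};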
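For a Python consumer, the `data:` lines shown in the curl output can be parsed with the standard library. This sketch assumes the event shapes shown above (`chunk` and `complete`) and works on any iterable of text lines; `iter_sse_chunks` is a hypothetical helper, not a library function.

```python
import json


def iter_sse_chunks(lines):
    """Yield text chunks from 'data: {...}' lines until a 'complete' event arrives."""
    for raw in lines:
        line = raw.strip()
        if not line.startswith("data:"):
            continue  # skip blank separators and other SSE fields
        event = json.loads(line[len("data:"):])
        if event.get("type") == "complete":
            return
        if event.get("type") == "chunk":
            yield event["content"]


sample_stream = [
    'data: {"type": "chunk", "content": "Neural"}',
    "",
    'data: {"type": "chunk", "content": " networks"}',
    "",
    'data: {"type": "complete"}',
]
streamed_text = "".join(iter_sse_chunks(sample_stream))
```

With an HTTP client that exposes the response body line by line, the same generator can be fed directly from the live stream.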
Document Ingestion
# Upload file
with open("document.pdf", "rb") as f:
    response = requests.post(
        "http://localhost:8000/ingest",
        headers={"X-API-Key": "your-key"},
        files={"file": ("document.pdf", f, "application/pdf")},
        data={
            "chunker": "semantic",
            "max_chunk_size": 1000,
            "metadata": '{"source": "upload", "user": "alice"}'
        }
    )
print(response.json()["document_ids"])  # List of inserted doc IDs
# Ingest text directly
response = requests.post(
"http://localhost:8000/ingest/text",
json={
"text": "Your document content here...",
"metadata": {"source": "api", "title": "My Doc"}
}
)
Authentication & Security
from agentic_rag.server import create_app
# API key authentication
app = create_app(
    auth_provider="api_key",
    api_keys=["key-1", "key-2", "key-3"]
)
# JWT authentication
app = create_app(
    auth_provider="jwt",
    jwt_secret="your-secret",  # load from an environment variable or secret store in production
    jwt_algorithm="HS256"
)
# Custom auth
from agentic_rag.server import BaseAuthProvider
class CustomAuth(BaseAuthProvider):
    async def authenticate(self, request):
        token = request.headers.get("Authorization")
        # Your validation logic goes here; reject the request if the token is invalid
        return {"user_id": "123", "roles": ["user"]}
app = create_app(auth_provider=CustomAuth())
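If you write your own validation logic for API keys, a constant-time comparison is a sensible default. The helper below is a hypothetical sketch using `hmac.compare_digest` from the standard library; it says nothing about how the built-in `api_key` provider compares keys.

```python
import hmac


def is_valid_api_key(presented, allowed_keys):
    """Check a presented key against every allowed key without early exit."""
    if not presented:
        return False
    matched = False
    for key in allowed_keys:
        # compare_digest takes time independent of where the inputs differ.
        if hmac.compare_digest(presented.encode(), key.encode()):
            matched = True
    return matched


allowed = ["key-1", "key-2", "key-3"]
accepted = is_valid_api_key("key-2", allowed)
rejected = is_valid_api_key("key-9", allowed)
```

Looping over all keys instead of returning on the first match avoids leaking, through timing, which position in the list matched.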
Deployment Configuration
# docker-compose.yml
version: '3.8'
services:
  rag-api:
    image: ai-prishtina-agentic-rag:latest
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - VECTOR_STORE=chroma
      - ENABLE_AGENT=true
      - RATE_LIMIT=100/minute
    volumes:
      - ./data:/app/data
      - ./config.yaml:/app/config.yaml
    command: uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4
  redis:
    image: redis:alpine
    # For caching and session storage
Kubernetes Deployment
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rag-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: rag-api
  template:
    metadata:
      labels:
        app: rag-api
    spec:
      containers:
        - name: api
          image: ai-prishtina-agentic-rag:latest
          ports:
            - containerPort: 8000
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: api-secrets
                  key: openai-key
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
Workflow Orchestration UI
from agentic_rag.server import create_app_with_workflow_ui
# Create app with workflow UI
app = create_app_with_workflow_ui()
# Access at http://localhost:8000/workflow/ui
The workflow UI provides:
- Node Palette: Drag and drop nodes (loader, chunker, retriever, generator, tools)
- Canvas: Visual pipeline builder with connections
- Properties Panel: Configure each node's parameters
- Execution Monitor: Run and debug workflows
Why you need this: Because not everyone wants to write Python. Sometimes you just need an API that works out of the box - with auth, rate limiting, streaming, workflow UI, and all the bells and whistles already baked in.
Test Coverage & Quality
We don't ship code we wouldn't use ourselves. That's why we maintain 99% test coverage across all modules.
Coverage by Module
- Core functionality: 100% coverage
- Cognitive layer: 99% coverage
- Providers (18 LLM, 13 vector stores): 99% coverage
- Document processing: 99% coverage
- Contrib integrations: 99% coverage
Test Suite Stats
- 131 test files (124 unit + 7 integration)
- 280+ Python files tested
- Advanced PDF backends: Full coverage for docling, camelot, unstructured, kreuzberg
- Continuous integration on every commit
- No regressions - ever
Running Tests
# Run all tests
pytest
# Run PDF loader tests specifically
pytest tests/unit/test_pdf_advanced_loaders.py -v
pytest tests/integration/test_pdf_backends_integration.py -v
# Run with coverage
pytest --cov=agentic_rag --cov-report=html
Why this matters: Because confidence comes from knowing your code works. We test everything from the happy path to the weird edge cases, so you can deploy with confidence.
Architecture
┌─────────────────────────────────────┐
│ FastAPI Server / CLI │
│ (Rate Limit · Auth · Guardrails │
│ Multi-Tenancy · Feedback Loop) │
└──────────────┬──────────────────────┘
│
┌────────────────────────┼──────────────────────────┐
│ AC-RAG Cognitive Layer │
│ ┌──────────┐ ┌──────────┐ ┌─────────────────┐ │
│ │ Neural │ │Reflective│ │ Hierarchical │ │
│ │ Router │ │ Agent │ │ Memory │ │
│ └──────────┘ └──────────┘ └─────────────────┘ │
│ ┌───────────┐ ┌──────────┐ ┌─────────────────┐ │
│ │Progressive│ │Calibrated│ │ Knowledge │ │
│ │ Retrieval │ │Confidence│ │ Fusion │ │
│ └───────────┘ └──────────┘ └─────────────────┘ │
│ ┌───────────┐ ┌──────────┐ ┌─────────────────┐ │
│ │ Neural │ │ Tool │ │ Multi-Agent │ │
│ │ Classifier│ │ Composer │ │ Orchestrator │ │
│ └───────────┘ └──────────┘ └─────────────────┘ │
└─────────────────────┬─────────────────────────────┘
│
┌──────────────┬───────────┼──────────┬──────────────┐
▼ ▼ ▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐ ┌──────────┐
│ Retrieval│ │ LLM │ │ Graph │ │ Tools │ │ Cache / │
│ (Vector) │ │ Providers│ │ RAG │ │(Search)│ │ Circuit │
└──────────┘ └──────────┘ └──────────┘ └────────┘ │ Breaker │
│ │ │ │ └──────────┘
▼ ▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────────────────┐
│ Document │ │Evaluation│ │ Config │ │ Docker / CI/CD │
│Processing│ │& Metrics │ │(YAML/INI)│ │ Infrastructure │
└──────────┘ └──────────┘ └──────────┘ └────────────────────┘
For detailed architecture docs: docs/01-architecture.md | AC-RAG vision: docs/vision-ac-rag.md
Configuration
Everything is externalized. No magic numbers hiding in the code. Load from YAML, INI, or env vars — or just use the defaults (they're pretty good).
from agentic_rag.utils.config import Config
cfg = Config.from_file("config.yaml") # YAML
cfg = Config.from_ini("config.ini") # INI
cfg = Config.from_env(prefix="AGENTIC_RAG_") # Environment variables
cfg = Config() # Sensible defaults
See config.example.yaml and config.example.ini for complete examples.
Full configuration reference
Configuration Sections
llm — LLM Provider Settings
| Parameter | Default | Options | Description |
|---|---|---|---|
| provider | openai | openai, anthropic, cohere, local | LLM provider |
| model | gpt-3.5-turbo | - | Model name |
| api_key | null | - | API key (or LLM_API_KEY env var) |
| base_url | null | - | Custom API base URL |
| temperature | 0.7 | 0.0–2.0 | Sampling temperature |
| max_tokens | 1000 | - | Max tokens to generate |
| timeout | 30 | - | Request timeout (seconds) |
vector_store — Vector Database
| Parameter | Default | Options | Description |
|---|---|---|---|
| provider | chroma | chroma, pinecone, weaviate, faiss | Backend provider |
| collection_name | agentic_rag | - | Collection/index name |
| persist_directory | null | - | Persistence path (Chroma/FAISS) |
| embedding_model | sentence-transformers/all-MiniLM-L6-v2 | - | HuggingFace embedding model |
| dimension | 384 | - | Embedding dimension |
document_processing — Chunking
| Parameter | Default | Options | Description |
|---|---|---|---|
| chunk_size | 1000 | - | Token/character chunk size |
| chunk_overlap | 200 | - | Overlap between chunks |
| chunking_strategy | recursive | fixed, semantic, recursive | Chunking algorithm |
| enable_preprocessing | true | - | Enable text preprocessing |
retrieval — Search & Reranking
| Parameter | Default | Description |
|---|---|---|
| top_k | 5 | Number of results to retrieve |
| similarity_threshold | 0.7 | Minimum similarity score (0.0–1.0) |
| enable_reranking | true | Enable cross-encoder reranking |
| reranker_model | cross-encoder/ms-marco-MiniLM-L-6-v2 | Reranker model name |
| enable_hybrid_search | false | Combine dense + keyword search |
| dense_weight | 0.7 | Weight for dense embeddings (0.0–1.0) |
| sparse_weight | 0.3 | Weight for sparse/BM25 (0.0–1.0) |
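To show how `dense_weight` and `sparse_weight` could interact, here is a sketch of weighted score fusion. It assumes min-max normalisation of each score list before the weighted sum, which is a common choice but not confirmed as the library's method; `fuse_scores` is a hypothetical function.

```python
def fuse_scores(dense, sparse, dense_weight=0.7, sparse_weight=0.3):
    """Weighted sum of min-max normalised scores; a missing score counts as 0."""

    def normalise(scores):
        if not scores:
            return {}
        low, high = min(scores.values()), max(scores.values())
        if high == low:
            return {doc: 1.0 for doc in scores}
        return {doc: (value - low) / (high - low) for doc, value in scores.items()}

    dense_n, sparse_n = normalise(dense), normalise(sparse)
    fused = {
        doc: dense_weight * dense_n.get(doc, 0.0) + sparse_weight * sparse_n.get(doc, 0.0)
        for doc in set(dense_n) | set(sparse_n)
    }
    return sorted(fused.items(), key=lambda item: item[1], reverse=True)


ranking = fuse_scores(
    dense={"a": 0.9, "b": 0.5, "c": 0.1},
    sparse={"b": 12.0, "c": 4.0, "d": 8.0},
)
```

Normalising first matters because cosine similarities and BM25 scores live on different scales; without it the sparse scores would dominate regardless of the weights.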
agent — Agentic Capabilities
| Parameter | Default | Description |
|---|---|---|
| enable_planning | true | Enable query planning |
| max_planning_steps | 5 | Max planning iterations |
| enable_memory | true | Enable working memory |
| memory_size | 1000 | Memory capacity (entries) |
| enable_tools | true | Enable tool integration |
| available_tools | ["web_search", "calculator"] | List of enabled tool names |
| tool_timeout | 30 | Tool execution timeout (seconds) |
cognitive — AC-RAG Features
| Parameter | Default | Description |
|---|---|---|
| enable_query_routing | true | Enable neural query routing |
| use_llm_classification | false | Use LLM vs DistilBERT for routing |
| routing_confidence_threshold | 0.8 | Router confidence cutoff (0.0–1.0) |
| rule_confidence_fallback | 0.6 | Fallback threshold when rules fail |
| enable_reflection | true | Enable reflective agent loop |
| confidence_threshold | 0.75 | Minimum answer confidence (0.0–1.0) |
| max_reflections | 3 | Max self-critique iterations |
| reflection_temperature | 0.3 | Temperature for reflection (0.0–1.0) |
| critique_context_window | 1500 | Tokens for critique context |
| regeneration_context_window | 3000 | Tokens for answer regeneration |
| episodic_max_entries | 100 | Episodic memory size |
| episodic_ttl_seconds | 3600 | Episodic memory TTL (seconds) |
| semantic_persist_path | null | Semantic memory storage path |
| procedural_persist_path | null | Procedural memory storage path |
| procedural_ema_alpha | 0.3 | EMA learning rate (0.0–1.0) |
| procedural_recall_threshold | 0.3 | Strategy recall threshold (0.0–1.0) |
| procedural_maturity_count | 5 | Uses before strategy matures |
| progressive_max_iterations | 3 | Progressive retrieval iterations |
| progressive_min_quality | 0.6 | Minimum retrieval quality (0.0–1.0) |
| progressive_reformulation_temperature | 0.3 | Query reformulation temperature |
| min_calibration_samples | 30 | Calibration training samples |
| calibration_learning_rate | 0.01 | Platt scaling learning rate |
| calibration_epochs | 200 | Calibration training epochs |
| confidence_length_normalizer | 200 | Length normalization factor |
| confidence_weights | (map) | Weights for confidence components |
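The `procedural_ema_alpha` parameter is described as an EMA learning rate. Assuming the standard exponential moving average update (the exact formula the library uses is not stated here), a strategy's running success score would evolve as in this sketch; `ema_update` is a hypothetical helper.

```python
def ema_update(previous, observed, alpha=0.3):
    """Standard EMA: move a fraction alpha of the way toward the new observation."""
    return alpha * observed + (1.0 - alpha) * previous


score = 0.5
history = []
for outcome in (1.0, 1.0, 0.0):  # two successes, then one failure
    score = ema_update(score, outcome)
    history.append(score)
```

A higher alpha makes the score react faster to recent outcomes; a lower alpha smooths out noise at the cost of slower adaptation.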
graph — Graph RAG
| Parameter | Default | Options | Description |
|---|---|---|---|
| extraction_method | pattern | pattern, spacy, llm | Entity extraction method |
| spacy_model | en_core_web_sm | - | spaCy model for NER |
| merge_similar_entities | true | - | Deduplicate similar entities |
| similarity_threshold | 0.85 | - | Entity merge threshold (0.0–1.0) |
| max_hops | 2 | - | Max graph traversal depth |
| graph_top_k | 10 | - | Max graph results |
server — FastAPI Server
| Parameter | Default | Description |
|---|---|---|
| host | 0.0.0.0 | Bind address |
| port | 8000 | Listen port |
| workers | 2 | Uvicorn worker processes |
| cors_origins | ["*"] | Allowed CORS origins |
| rate_limit_rpm | 60 | Rate limit (requests per minute) |
| log_level | INFO | Logging level |
| enable_auth | false | Enable API key/JWT auth |
| api_keys | [] | List of allowed API keys |
| jwt_secret | null | JWT signing secret |
| jwt_algorithm | HS256 | JWT algorithm |
cache — Semantic Cache
| Parameter | Default | Options | Description |
|---|---|---|---|
| enabled | false | - | Enable caching |
| backend | memory | memory, redis | Cache backend |
| redis_url | redis://localhost:6379/0 | - | Redis connection URL |
| ttl_seconds | 3600 | - | Cache entry TTL (seconds) |
| similarity_threshold | 0.92 | - | Semantic match threshold (0.0–1.0) |
| max_entries | 10000 | - | Max cache size |
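A semantic cache returns a stored answer when a new query's embedding is close enough to a cached one. The sketch below assumes cosine similarity and a linear scan, with toy two-dimensional embeddings and explicit timestamps; `SemanticCacheSketch` is hypothetical and ignores `max_entries` for brevity.

```python
import math


def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class SemanticCacheSketch:
    """Hypothetical cache keyed by embedding similarity instead of exact text."""

    def __init__(self, similarity_threshold=0.92, ttl_seconds=3600):
        self.similarity_threshold = similarity_threshold
        self.ttl_seconds = ttl_seconds
        self.entries = []  # (embedding, answer, stored_at)

    def put(self, embedding, answer, now):
        self.entries.append((embedding, answer, now))

    def get(self, embedding, now):
        # Expire old entries first, then return the best match above the threshold.
        self.entries = [e for e in self.entries if now - e[2] < self.ttl_seconds]
        best = max(self.entries, key=lambda e: cosine(embedding, e[0]), default=None)
        if best is not None and cosine(embedding, best[0]) >= self.similarity_threshold:
            return best[1]
        return None


cache = SemanticCacheSketch()
cache.put([1.0, 0.0], "cached answer", now=0)
near_hit = cache.get([0.99, 0.05], now=10)
far_miss = cache.get([0.0, 1.0], now=10)
expired = cache.get([1.0, 0.0], now=4000)
```

A high threshold such as 0.92 trades hit rate for safety: paraphrases of the same question tend to hit, while merely related questions fall through to a fresh query.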
guardrails — Safety & Output Control
| Parameter | Default | Description |
|---|---|---|
| enable_pii_detection | false | Enable PII detection/redaction |
| enable_toxicity_filter | false | Enable toxic content filtering |
| max_output_length | 5000 | Max output character limit |
| blocked_terms | [] | List of blocked terms/strings |
evaluation — Metrics & Benchmarking
| Parameter | Default | Description |
|---|---|---|
| enable_evaluation | false | Enable evaluation metrics |
| metrics | ["relevance", "faithfulness"] | Metrics to compute |
| log_level | INFO | Evaluation logging level |
Environment Variables
# API Keys
OPENAI_API_KEY=your_openai_key
ANTHROPIC_API_KEY=your_anthropic_key
COHERE_API_KEY=your_cohere_key
PINECONE_API_KEY=your_pinecone_key
SERP_API_KEY=your_search_api_key
# Config loading
AGENTIC_RAG_CONFIG_PATH=/path/to/config.yaml
Docker deployment
# One command to rule them all
docker-compose up -d
# Or if you prefer doing things the hard way
docker build -t agentic-rag .
docker run -p 8000:8000 -e OPENAI_API_KEY=sk-... agentic-rag
The server is agentic_rag.server.create_app (see Recipe 13 above). Multi-stage Dockerfile and docker-compose included in examples/docker/.
Testing
298+ tests and growing. Run them all or pick your battles:
# Run everything (grab a coffee)
pytest
# Run with coverage (grab two coffees)
pytest --cov=agentic_rag --cov-report=html
# Pick your battles
pytest tests/unit/test_tools.py # Core tools
pytest tests/unit/test_cognitive.py # AC-RAG modules
pytest tests/unit/test_graph*.py # Graph RAG (all graph tests)
pytest tests/unit/test_chunkers*.py # All chunking strategies
pytest tests/unit/test_loaders*.py # Document loaders
pytest tests/unit/test_prompts.py # Prompt management
pytest tests/integration/ # Integration tests
pytest tests/unit/test_evaluation.py -v # Benchmarks
Contributing
We don't bite. Contributions are welcome — from typo fixes to new vector store backends.
git clone https://github.com/albanmaxhuni/ai-prishtina-agentic-rag.git
cd ai-prishtina-agentic-rag
pip install -e ".[dev]"  # quotes keep shells like zsh from expanding the brackets
pre-commit install
pytest # Make sure everything passes before you start
Code quality: Black + isort (formatting), Flake8 (linting), mypy (type checking), pytest (tests). See CONTRIBUTING.md for branch conventions and review expectations.
License
Dual-licensed:
- AGPLv3+ — Free for open source. Copyleft applies. Network use requires source disclosure.
- Commercial — For proprietary use without copyleft obligations. Contact info@albanmaxhuni.com or alban.q.maxhuni@gmail.com
See the LICENSE for complete details.
Contact and links
| Resource | URL |
|---|---|
| Documentation | ai-prishtina-agentic-rag.readthedocs.io |
| Issue tracker | GitHub Issues |
| Discussions | GitHub Discussions |
| Email | info@albanmaxhuni.com |
Maintained by the AI Prishtina project. Built on the shoulders of open-source giants and published RAG research.
Sponsor ongoing development:
- BTC: 3BfwQJ2dNTWDn98H5SggNC47fNX8HeWshP