A comprehensive, professional-grade agentic Retrieval-Augmented Generation (RAG) library: core building blocks for RAG applications

AI Prishtina · Agentic RAG

Python 3.8+ · License: Commercial

Your documents had one job. Now they have an agent.

A Python library for agentic retrieval-augmented generation (RAG) that turns your documents into a self-improving, tool-wielding, multi-agent knowledge system. Ships with 18 LLM providers, 13 vector stores, a cognitive layer that literally critiques its own answers, and enough integrations to automate your entire Tuesday. Current release: v0.1.3.

Why this library?

Most RAG libraries stop at "retrieve then generate." This one keeps going.

It plans multi-step queries, remembers what worked last time, critiques its own answers, fuses knowledge from multiple sources, and — if you let it — sends a Slack message about the results. Think of it as the difference between a search bar and an intern who actually reads the documents.

# TL;DR — three lines to go from "I have documents" to "I have answers"
from agentic_rag import AgenticRAG
rag = AgenticRAG(vector_store=my_store, llm_provider=my_llm, enable_agent=True)
response = await rag.aquery("Summarize Q4 revenue trends", use_tools=True)

Key features

The Brain — Adaptive Cognitive RAG (AC-RAG)

The optional metacognitive layer that makes this library agentic rather than just retrieval-augmented.

Component | What it does | Module
Neural Query Router | Routes queries to the optimal retrieval strategy (rule-based + LLM fallback) | cognitive.query_router
Reflective Agent | Critiques its own answers and iteratively improves them | cognitive.reflective_agent
Hierarchical Memory | Three-tier memory (episodic / semantic / procedural) that learns from interactions | cognitive.hierarchical_memory
Progressive Retrieval | Reformulates queries when initial retrieval quality is low | cognitive.progressive_retrieval
Calibrated Confidence | Platt-scaled confidence scores trained against actual accuracy | cognitive.confidence
Knowledge Fusion | Merges results from multiple sources with learned trust weights | cognitive.knowledge_fusion
Tool Composer | Discovers tools and builds DAG-based execution chains automatically | cognitive.tool_composer
Multi-Agent Orchestrator | Event-driven multi-agent collaboration for complex queries | cognitive.multi_agent
Neural Classifier | Sub-millisecond DistilBERT-based intent classification | cognitive.neural_classifier
Query Decomposer | Breaks complex questions into sub-queries with dependency tracking | cognitive.agentic_components
Query Rewriter | Multi-query, step-back, and sub-question rewriting strategies | cognitive.query_rewriter
Corrective RAG (CRAG) | Evaluate retrieval quality → refine or web-fallback | cognitive.corrective_rag
Self-RAG | Four-checkpoint pipeline: retrieve? relevant? supported? useful? | cognitive.self_rag
Adaptive RAG | Decides whether to retrieve based on query complexity | cognitive.adaptive_rag
Speculative RAG | Parallel draft generation → verification → best pick | cognitive.speculative_rag
ReAct Agent | Interleaved reasoning + acting with automatic tool selection | cognitive.react_agent
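
The Calibrated Confidence row mentions Platt scaling. As a rough illustration of the idea (not the library's implementation), the sketch below fits a sigmoid `p = sigmoid(a * score + b)` to pairs of raw scores and "was the answer actually correct" labels. The function names, the toy data, and the plain gradient-descent fit are all assumptions made for this example.

```python
import math

def platt_fit(scores, labels, lr=0.1, epochs=2000):
    """Fit p = sigmoid(a * score + b) by gradient descent on log loss."""
    a, b = 1.0, 0.0
    n = len(scores)
    for _ in range(epochs):
        grad_a = grad_b = 0.0
        for s, y in zip(scores, labels):
            p = 1.0 / (1.0 + math.exp(-(a * s + b)))
            grad_a += (p - y) * s
            grad_b += (p - y)
        a -= lr * grad_a / n
        b -= lr * grad_b / n
    return a, b

def platt_apply(score, a, b):
    """Map a raw score to a calibrated probability."""
    return 1.0 / (1.0 + math.exp(-(a * score + b)))

# Hypothetical history: raw confidence scores and whether each answer was right.
raw_scores = [0.2, 0.4, 0.5, 0.7, 0.9, 0.95]
was_correct = [0, 0, 1, 0, 1, 1]

a, b = platt_fit(raw_scores, was_correct)
calibrated = [platt_apply(s, a, b) for s in raw_scores]
print([round(p, 2) for p in calibrated])
```

The fitted curve keeps the ranking of the raw scores but rescales them so that they track how often answers at that score were actually correct.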

The Secret Sauce — Novel Retrieval Patterns

Research-backed retrieval techniques that no other PyPI RAG library bundles together. Because why settle for "good enough" when you can have "actually novel"?

Feature | What it does (in plain English) | Module
HyDE | Generates a fake-but-plausible answer, embeds that instead of your question, and retrieves docs that match the hypothetical. It sounds wrong, but it works. | retrieval.hyde
RAPTOR | Builds a hierarchical tree of document summaries at multiple abstraction levels. Retrieves from the right level based on query specificity. Like a Russian nesting doll, but for knowledge. | retrieval.raptor
Citation Grounding | Maps every sentence in the LLM's answer back to specific source chunks with [1] markers. Because "trust me bro" is not a valid citation format. | core.citation
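
To make the HyDE row concrete, here is a toy sketch of the flow: draft a hypothetical answer, embed the draft, and rank documents against it. Everything in it is a stand-in chosen for illustration: `fake_llm` replaces a real model call, and the bag-of-words `embed` replaces a real embedding model. It does not use the library's `retrieval.hyde` module.

```python
import math
from collections import Counter

def embed(text):
    """Toy bag-of-words embedding; a real system would use a neural embedder."""
    return Counter(text.lower().split())

def cosine(u, v):
    """Cosine similarity between two sparse term-count vectors."""
    dot = sum(u[t] * v[t] for t in u)
    norm_u = math.sqrt(sum(x * x for x in u.values()))
    norm_v = math.sqrt(sum(x * x for x in v.values()))
    return dot / (norm_u * norm_v) if norm_u and norm_v else 0.0

def fake_llm(question):
    """Stand-in for an LLM that drafts a plausible (possibly wrong) answer."""
    return "python was created by a dutch programmer in the early 1990s"

def hyde_retrieve(question, corpus, generate=fake_llm, top_k=1):
    hypothetical = generate(question)   # 1. draft a hypothetical answer
    query_vec = embed(hypothetical)     # 2. embed the draft, not the question
    ranked = sorted(corpus, key=lambda d: cosine(query_vec, embed(d)), reverse=True)
    return ranked[:top_k]               # 3. return the closest real documents

corpus = [
    "python was created by guido van rossum in 1991",
    "the zen of python includes beautiful is better than ugly",
]
top = hyde_retrieve("who made python?", corpus)
print(top)
```

The draft answer shares far more vocabulary with the relevant document than the short question does, which is the whole trick.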

The Janitor — Clean Architecture

We just Marie Kondo'd the codebase. Deprecated modules? Thanked them for their service and sent them packing. What remains is a clean, logical structure where:

  • providers/ holds all your LLMs, embeddings, and vector stores (because they provide value)
  • document_processing/ has loaders and chunkers in their own subdirectories (no more flat chaos)
  • Everything else is exactly where your intuition expects it to be

If it doesn't spark joy (or serve a purpose), it's gone. The imports work. The structure makes sense. You're welcome.


The Suit — Production-Grade Infrastructure

Enterprise features that keep your RAG system running in production without embarrassing itself. Or you.

Feature | What it does (in plain English) | Module
Per-Document Access Control | ACL-based document filtering during retrieval. Each document gets read_groups/read_users. Essential for multi-tenant deployments where not everyone should see everything. | core.access_control
Streaming Structured Output | SSE-compatible JSON events (answer chunk, source, confidence, tool_call) for real-time UIs. Your frontend will thank you. | core.structured_stream
Prompt Compression | LLMLingua-inspired context compression. Fits more context into your token budget while preserving relevance. Because LLMs are expensive and tokens don't grow on trees. | core.prompt_compression
Answer Scorer | Automatic answer quality scoring with reference-free and reference-based modes. Integrates with the feedback loop for continuous improvement. It grades your homework so you don't have to. | evaluation.answer_scorer
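
For a feel of what per-document access control means at retrieval time, the sketch below filters retrieved chunks by `read_users` / `read_groups` metadata. The field names come from the table above. The helper names, the result shape, and the deny-by-default rule for untagged documents are assumptions for this example, not the behavior of `core.access_control`.

```python
def can_read(metadata, user_id, user_groups):
    """True if the user, or one of their groups, is listed on the document."""
    allowed_users = set(metadata.get("read_users", ()))
    allowed_groups = set(metadata.get("read_groups", ()))
    return user_id in allowed_users or bool(allowed_groups & set(user_groups))

def filter_by_acl(results, user_id, user_groups):
    """Drop retrieved chunks the caller may not see (untagged docs are denied)."""
    return [r for r in results if can_read(r.get("metadata", {}), user_id, user_groups)]

results = [
    {"content": "Q4 board deck", "metadata": {"read_groups": ["finance"]}},
    {"content": "Team handbook", "metadata": {"read_groups": ["everyone"]}},
    {"content": "Offer letter", "metadata": {"read_users": ["alice"]}},
    {"content": "Untagged note", "metadata": {}},
]

visible = filter_by_acl(results, user_id="bob", user_groups=["everyone", "engineering"])
print([r["content"] for r in visible])
```

Filtering before the chunks reach the prompt matters: anything that makes it into the context can end up in the answer.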

The Muscles — 18 LLM Providers

OpenAI, Anthropic, Cohere, Gemini, Mistral, Ollama, Groq, DeepSeek, xAI/Grok, AWS Bedrock, Azure OpenAI, Together.ai, AI21, Fireworks, Perplexity, llama.cpp, HuggingFace local models, and any OpenAI-compatible API. Swap one line, keep everything else.

The Memory — 13 Vector Stores

ChromaDB, FAISS, Pinecone, Weaviate, Qdrant, pgvector, Milvus, Redis, MongoDB Atlas, Elasticsearch, Vespa, Azure AI Search, and an in-memory store for tests.

The Hands — Core Tools

Category | Tools | Import
Search | WebSearchTool, WebScrapeTool, HTTPRequestTool, APISpecTool, GraphQLTool | tools.search
Calculation | CalculatorTool, StatisticsTool, UnitConverterTool | tools.calculation
Code | CodeExecutorTool (sandboxed Python/JS) | tools.code
File | FileReadTool, FileWriteTool, FileListTool, DocumentLoaderTool | tools.file
Data | JSONTool, SQLTool | tools.data
Utility | VectorStoreTool, KnowledgeGraphTool, DateTimeTool, TextProcessingTool, TextSummarizerTool | tools.utility

The Wardrobe — Contrib Integrations

Optional integrations that live in contrib/ because not every RAG system needs to send a Slack message.

Category | Tools | Import path
Communication | EmailTool, GoogleCalendarTool, SlackTool, DiscordTool, JiraTool | contrib.communication
Productivity | NotionTool, ConfluenceTool, AsanaTool, TrelloTool, LinearTool, ClickUpTool, AirtableTool, MondayTool | contrib.productivity
API Integrations | StripeTool, TwilioTool, SendGridTool, PagerDutyTool, DatadogTool, ZapierTool | contrib.api_integrations
Storage | S3Tool, GCSTool, AzureBlobTool, MemoryTool | contrib.storage
DevOps | GitTool, GitHubTool | contrib.devops
Media | AudioTool, WhisperTranscriptionTool, ImageGenerationTool, ImageEditTool | contrib.media
Database | MongoDBTool, RedisTool, PostgreSQLTool | contrib.database
Infrastructure | TenantManager, ModelCompressor, FederatedCoordinator | contrib.infrastructure

The Librarian — Document Processing

Ingest anything short of a napkin sketch (we're working on it): PDF, DOCX, HTML, Markdown, CSV, Excel, JSON, XML, PowerPoint, EPUB, ZIP archives, Jupyter notebooks, images (OCR), audio (Whisper), and video (frame + audio analysis).

Organization that makes sense:

  • document_processing.loaders.* — 20+ document loaders (PDFLoader, DocxLoader, HTMLLoader, etc.)
  • document_processing.chunkers.* — 10+ chunking strategies (FixedSizeChunker, SemanticChunker, MarkdownChunker, CodeChunker, AgenticChunker, etc.)

No more "where did I put that import?" moments. Everything is exactly where it should be.

The Armor — Production Infrastructure

Component | What it does
SemanticCache | In-memory or Redis: stop paying for the same question twice
CircuitBreaker | Prevent cascading failures when your LLM provider has a bad day
OutputGuardrails | PII redaction, toxicity filtering, content validation
CostTracker | Token usage monitoring with configurable pricing
FeedbackLoop | Collect feedback, track trends, learn from interactions
DocumentVersionStore | Diff-based change tracking with SHA-256 hashing
BatchIngestionPipeline | Async bulk ingestion with back-pressure and retry
ModelCompressor | INT8/FP16 quantization, ONNX export, pruning for edge deployment
FederatedCoordinator | Privacy-preserving FedAvg strategy learning across deployments
OpenTelemetry Tracing | Distributed tracing via get_tracer()
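
The circuit-breaker idea from the table fits in a few lines. This is a generic sketch of the pattern under assumed defaults, not the library's `CircuitBreaker` class: `SimpleCircuitBreaker`, `CircuitOpenError`, and the injectable `clock` are hypothetical names used only here.

```python
import time

class CircuitOpenError(RuntimeError):
    """Raised when a call is skipped because the circuit is open."""

class SimpleCircuitBreaker:
    def __init__(self, failure_threshold=3, reset_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock          # injectable so tests can fake time
        self.failures = 0
        self.opened_at = None       # None means the circuit is closed

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit open; skipping call")
            # Timeout elapsed: fall through and allow one trial call (half-open).
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = self.clock()   # open (or re-open) the circuit
            raise
        # Success closes the circuit and clears the failure count.
        self.failures = 0
        self.opened_at = None
        return result
```

Wrap the provider call, for example `breaker.call(send_request, prompt)` with your own `send_request` function, and failing fast replaces a pile-up of timeouts while the provider is down.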

The Judge — Evaluation & Monitoring

ComprehensiveEvaluator (relevance, faithfulness, answer quality, latency), RAGBenchmark + PerformanceBenchmark suites, and ABTest for statistically rigorous variant comparison. 298+ tests in the suite.

Developer experience

  • Configuration-first: YAML, INI, or env vars — no hardcoded models or thresholds
  • Factory pattern: create_tool("web_search"), create_provider("openai") — config-driven instantiation
  • Type safety: Pydantic v2 models across all public APIs
  • Modular: Swap vector stores, LLM providers, tools, and cognitive components independently
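
To show what config-driven instantiation buys you, here is a minimal registry-based factory. It illustrates the general pattern only. `register`, `create`, and `EchoProvider` are hypothetical names for this sketch and are not the library's `create_tool` / `create_provider` functions.

```python
_REGISTRY = {}

def register(name):
    """Class decorator that makes a component available under a string name."""
    def decorator(cls):
        _REGISTRY[name] = cls
        return cls
    return decorator

def create(name, **options):
    """Instantiate a registered component from its name plus keyword options."""
    try:
        factory = _REGISTRY[name]
    except KeyError:
        known = ", ".join(sorted(_REGISTRY))
        raise ValueError(f"unknown component {name!r}; known: {known}") from None
    return factory(**options)

@register("echo")
class EchoProvider:
    """Toy provider that echoes the prompt back with a prefix."""
    def __init__(self, prefix=""):
        self.prefix = prefix

    def generate(self, prompt):
        return self.prefix + prompt

# The name and options could just as well come from YAML or env vars.
config = {"provider": "echo", "options": {"prefix": "> "}}
provider = create(config["provider"], **config["options"])
print(provider.generate("hello"))
```

Because callers only ever hold a name and a dict of options, swapping implementations becomes a config change rather than a code change.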

Package structure

agentic_rag/
├── base/                   # Abstract base classes (BaseTool, BaseProvider, ...)
├── factories/              # Factory pattern (create_tool, create_provider, ...)
├── core/                   # AgenticRAG, planner, orchestrator, memory, cache, guardrails
├── cognitive/              # AC-RAG: router, reflection, memory, fusion, multi-agent
├── tools/                  # Core tools (search, calculation, code, file, data, utility)
├── contrib/                # Optional integrations
│   ├── communication/      #   Slack, email, calendar, Jira, Discord
│   ├── productivity/       #   Notion, Confluence, Asana, Trello, Linear, ...
│   ├── api_integrations/   #   Stripe, Twilio, SendGrid, PagerDuty, ...
│   ├── storage/            #   S3, GCS, Azure Blob
│   ├── devops/             #   Git, GitHub
│   ├── media/              #   Audio, image generation (DALL-E)
│   ├── database/           #   MongoDB, Redis, PostgreSQL
│   └── infrastructure/     #   Multi-tenancy, model compression, federated learning
├── providers/              # Everything that provides a service
│   ├── llm/                #   18 LLM providers (was: llm/)
│   ├── embeddings/         #   8 embedding providers + cache/utils (was: embeddings/)
│   └── vector_stores/      #   13 vector store backends
├── retrieval/              # Retrievers, rerankers, BM25, ColBERT
├── document_processing/    # Loaders, chunkers, preprocessors (organized in subdirs)
│   ├── loaders/            #   20+ document loaders (pdf, docx, html, etc.)
│   └── chunkers/           #   10+ chunking strategies
├── strategies/             # Pluggable chunking & retrieval strategies
├── graph/                  # Knowledge graph, entity extraction, graph retrieval
├── evaluation/             # Metrics, benchmarks, A/B testing
├── server/                 # FastAPI app (optional)
└── utils/                  # Config, exceptions, logging

Quick start

Installation

# The basics
pip install ai-prishtina-agentic-rag

# I want everything and I want it now
# (extras are quoted so shells like zsh don't treat the brackets as a glob)
pip install "ai-prishtina-agentic-rag[all]"

# I'm a responsible adult who only installs what I need
pip install "ai-prishtina-agentic-rag[openai,chroma]"

# Development (you beautiful contributor, you)
pip install -e ".[dev]"

Use case | Install command
Core library | pip install ai-prishtina-agentic-rag
Single vector backend | pip install "ai-prishtina-agentic-rag[chroma]" (or pinecone, weaviate, faiss)
All vector backends | pip install "ai-prishtina-agentic-rag[vector-all]"
LLM providers | pip install "ai-prishtina-agentic-rag[openai]" (or anthropic, cohere, llm-all)
Document processing | pip install "ai-prishtina-agentic-rag[documents,nlp,multimodal]"
PDF (advanced) | pip install "ai-prishtina-agentic-rag[pdf-advanced]" (docling, unstructured, camelot, kreuzberg)
PDF (tables only) | pip install "ai-prishtina-agentic-rag[pdf-camelot]"
PDF (figures/equations) | pip install "ai-prishtina-agentic-rag[pdf-docling]"
Production API | pip install "ai-prishtina-agentic-rag[server,observability]"

Environment setup

# Required: at least one LLM provider key
export OPENAI_API_KEY=sk-your-key-here

# Optional: more providers, vector stores, tool keys
export ANTHROPIC_API_KEY=sk-ant-...
export PINECONE_API_KEY=...
export SERP_API_KEY=...       # for WebSearchTool
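
Once the variables are exported, reading them in Python keeps keys out of your source files. This is a small standard-library sketch. `require_env` is a hypothetical helper, not something the library ships.

```python
import os

def require_env(name):
    """Return the value of an environment variable or fail with a clear message."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"{name} is not set; export it before starting the app")
    return value

# Optional keys can fall back to None instead of raising.
serp_key = os.environ.get("SERP_API_KEY")
```

Call `require_env("OPENAI_API_KEY")` at startup and pass the result wherever a recipe shows a literal `"sk-..."`.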

The full cookbook

Recipe 1 — Hello, RAG world

The absolute minimum to go from zero to answers. No bells, no whistles, just vibes.

Basic Setup

import asyncio
from agentic_rag import AgenticRAG
from agentic_rag.providers.llm import OpenAIProvider
from agentic_rag.providers.vector_stores import InMemoryVectorStore

async def main():
    store = InMemoryVectorStore()
    llm = OpenAIProvider(api_key="sk-...", model="gpt-4o")

    rag = AgenticRAG(vector_store=store, llm_provider=llm)

    await rag.add_documents([
        {"content": "Python was created by Guido van Rossum in 1991."},
        {"content": "The Zen of Python includes 'Beautiful is better than ugly.'"},
    ])

    response = await rag.aquery("Who created Python and when?")
    print(response.answer)
    # Example output (exact wording depends on the model): Python was created by Guido van Rossum in 1991.

asyncio.run(main())

Load Documents from Files

from agentic_rag.document_processing.loaders import (
    PDFLoader, DocxLoader, TextLoader, MarkdownLoader,
    HTMLLoader, JSONLoader, CSVLoader
)

# PDF documents (basic extraction with PyPDF2)
docs = PDFLoader().load("report.pdf")
await rag.add_documents([{"content": d.content, "metadata": d.metadata} for d in docs])

# Advanced PDF extraction - tables, figures, equations
# Install: pip install ai-prishtina-agentic-rag[pdf-advanced]
docs = PDFLoader(
    extraction_backend="docling",  # or "unstructured", "camelot", "kreuzberg"
    extract_tables=True,
    extract_figures=True,
    extract_equations=True
).load("research_paper.pdf")

# Word documents
docs = DocxLoader().load("contract.docx")
await rag.add_documents([{"content": d.content, "metadata": d.metadata} for d in docs])

# Text files
docs = TextLoader(encoding="utf-8").load("notes.txt")

# Markdown
docs = MarkdownLoader().load("documentation.md")

# HTML (web pages)
docs = HTMLLoader().load("page.html")

# Structured data
json_docs = JSONLoader().load("data.json")
csv_docs = CSVLoader().load("spreadsheet.csv")

Smart Chunking

from agentic_rag.document_processing.chunkers import (
    SemanticChunker, FixedSizeChunker, MarkdownChunker,
    CodeChunker, Language
)

# Semantic chunking (preserves sentence boundaries)
chunker = SemanticChunker(max_chunk_size=1000, overlap=100)
chunks = chunker.chunk(long_document)

# Fixed size with overlap
chunker = FixedSizeChunker(chunk_size=500, overlap=50)

# Markdown-aware (respects headers)
chunker = MarkdownChunker(max_chunk_size=1500, header_split_levels=[1, 2, 3])

# Code-aware chunking
chunker = CodeChunker(language=Language.PYTHON, max_chunk_size=2000)
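
If `chunk_size` and `overlap` feel abstract, this is roughly what fixed-size chunking with overlap does. The sketch counts characters, which is an assumption: the library's `FixedSizeChunker` may count tokens or handle boundaries differently. `fixed_size_chunks` is a hypothetical function written for this example.

```python
def fixed_size_chunks(text, chunk_size=500, overlap=50):
    """Split text into windows of chunk_size characters that share `overlap` characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be >= 0 and smaller than chunk_size")
    step = chunk_size - overlap          # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break                        # the last window already reached the end
    return chunks

print(fixed_size_chunks("abcdefghij", chunk_size=4, overlap=1))
# ['abcd', 'defg', 'ghij']
```

Each chunk repeats the tail of the previous one, so a sentence that straddles a boundary still appears whole in at least one chunk when the overlap is large enough.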

Query with Options

# Basic query
response = await rag.aquery("What is machine learning?")

# With filters
response = await rag.aquery(
    "Q4 revenue",
    filters={"source": "financial_reports", "year": 2024}
)

# With source citations
response = await rag.aquery(
    "Explain neural networks",
    include_citations=True,
    max_sources=5
)
print(response.answer)  # Includes [1], [2] citations
print(response.sources)  # List of source documents

# Control creativity
response = await rag.aquery(
    "Write a poem about AI",
    temperature=0.9,
    max_tokens=500
)

Migration from LlamaIndex/LangChain

from agentic_rag.contrib.migration import LlamaIndexImporter, LangChainImporter

# Import from LlamaIndex
from llama_index.core import VectorStoreIndex
li_index = VectorStoreIndex.from_documents(docs)
importer = LlamaIndexImporter(vector_store=store)
result = await importer.import_vector_store_index(li_index)
print(f"Migrated {result.documents_imported} documents")

# Import from LangChain
from langchain_chroma import Chroma
lc_store = Chroma(persist_directory="./my_chroma_db")
importer = LangChainImporter(vector_store=store)
result = await importer.import_vectorstore(lc_store)
print(f"Migrated {result.documents_imported} documents")

Why you need this: Because every journey starts with a single step, and this is that step. It's the "hello world" of making your documents actually useful instead of just taking up disk space.

Recipe 2 — Agentic mode (let it plan)

When your question is too complex for a single retrieval pass, let the planner break it down.

Basic Agent Setup

from agentic_rag.tools import WebSearchTool, CalculatorTool

rag = AgenticRAG(
    vector_store=store,
    llm_provider=llm,
    enable_agent=True,
    enable_memory=True,
)

rag.register_tool(WebSearchTool(api_key="your-serp-key"))
rag.register_tool(CalculatorTool())

response = await rag.aquery(
    "Find the GDP of France and Germany, then calculate the difference",
    enable_planning=True,
    use_tools=True,
)
print(response.answer)
print("Steps taken:", response.reasoning_steps)
print("Confidence:", response.confidence)

All Available Tools

from agentic_rag.tools import (
    # Search tools
    WebSearchTool,      # Google/Bing search via SERP API
    WebScrapeTool,      # Scrape web pages
    HTTPRequestTool,    # Generic HTTP requests
    APISpecTool,        # Read OpenAPI specs
    GraphQLTool,        # GraphQL queries
    
    # Calculation tools
    CalculatorTool,     # Math expressions
    StatisticsTool,     # Statistical analysis
    UnitConverterTool,  # Unit conversions
    
    # Code execution
    CodeExecutorTool,   # Sandboxed Python/JS execution
    
    # File operations
    FileReadTool,       # Read local files
    FileWriteTool,      # Write files
    FileListTool,       # List directories
    DocumentLoaderTool, # Load docs with auto-detection
    
    # Data processing
    JSONTool,           # JSON operations
    SQLTool,            # SQL queries on data
    
    # Knowledge tools
    VectorStoreTool,    # Direct vector search
    KnowledgeGraphTool, # Graph queries
    DateTimeTool,       # Date/time operations
    TextProcessingTool, # Text transformations
    TextSummarizerTool, # Summarize text
)

import os  # needed for os.getenv below

# Register multiple tools
rag.register_tools([
    WebSearchTool(api_key=os.getenv("SERP_API_KEY")),
    CalculatorTool(),
    CodeExecutorTool(allowed_languages=["python", "javascript"]),
    FileReadTool(base_path="/app/data"),
    JSONTool(),
    DateTimeTool(),
])

Multi-Step Planning

# Complex query requiring multiple steps
response = await rag.aquery(
    """
    Research the top 3 LLM models released in 2024,
    calculate their average parameter count,
    and save the results to a JSON file
    """,
    enable_planning=True,
    use_tools=True,
    max_steps=10,  # Allow up to 10 planning steps
)

# The agent will:
# 1. Search for "top LLM models 2024"
# 2. Extract parameter counts from results
# 3. Use Calculator to compute average
# 4. Use FileWriteTool to save JSON

Tool Chains

from agentic_rag.cognitive import ToolComposer

# Auto-compose tool chains for complex workflows
composer = ToolComposer(rag)
chain = composer.create_chain([
    "web_search",
    "web_scrape",
    "text_summarize",
    "file_write"
])

result = await chain.execute(
    "Find articles about climate change, summarize them, and save to climate_research.txt"
)
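
Under the hood, a tool chain is a dependency graph executed in order. The sketch below shows that idea with the standard library's `graphlib` (Python 3.9+). The step functions are stubs, and `run_chain` is a hypothetical helper for illustration, not the `ToolComposer` implementation.

```python
from graphlib import TopologicalSorter

def run_chain(steps, dependencies, query):
    """Run each step after its dependencies, passing upstream outputs along."""
    outputs = {}
    for name in TopologicalSorter(dependencies).static_order():
        upstream = {dep: outputs[dep] for dep in dependencies.get(name, ())}
        outputs[name] = steps[name](query, upstream)
    return outputs

# Stub tools: each takes the original query plus the outputs it depends on.
steps = {
    "web_search": lambda q, up: [f"url-for-{q}"],
    "web_scrape": lambda q, up: [f"text of {u}" for u in up["web_search"]],
    "text_summarize": lambda q, up: up["web_scrape"][0][:7],
    "file_write": lambda q, up: f"saved {len(up['text_summarize'])} chars",
}

# name -> set of steps that must finish first
dependencies = {
    "web_search": set(),
    "web_scrape": {"web_search"},
    "text_summarize": {"web_scrape"},
    "file_write": {"text_summarize"},
}

outputs = run_chain(steps, dependencies, "climate")
print(outputs["file_write"])
```

A linear chain is the simplest case. The same loop handles branching graphs, and `TopologicalSorter` raises `CycleError` if the dependencies loop back on themselves.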

Why you need this: Because real questions aren't simple. Sometimes you need to search the web, do math, and synthesize results. This is like giving your RAG system a Swiss Army knife and the intelligence to know when to use each tool.

Recipe 3 — The cognitive pipeline (full AC-RAG)

For when you want the system to route, retrieve progressively, reflect, and learn.

Basic Cognitive Query

result = await rag.run_cognitive_query(
    "Compare the economic impact of AI adoption in healthcare vs finance",
    enable_reflection=True,
    enable_progressive_retrieval=True,
)
print(result.answer)
print(f"Confidence: {result.confidence:.2f}")
print(f"Reflections: {result.reflection_count}")

All 15 Cognitive Components

from agentic_rag.cognitive import (
    # Routing & Classification
    NeuralQueryRouter,      # Route queries to optimal strategy
    NeuralQueryClassifier,  # Sub-millisecond intent classification
    
    # Query Processing
    QueryDecomposer,        # Break complex questions into sub-queries
    QueryRewriter,          # Multi-query, step-back, sub-question rewriting
    
    # Retrieval Strategies
    ProgressiveRetriever,   # Reformulate when quality is low
    CorrectiveRAG,          # Evaluate quality → refine or web-fallback
    SelfRAG,                # Four-checkpoint pipeline
    AdaptiveRAG,            # Decide whether to retrieve
    SpeculativeRAG,         # Parallel draft → verify → pick best
    
    # Answer Quality
    ReflectiveAgent,        # Self-critique and iterative improvement
    CalibratedConfidence,   # Platt-scaled confidence scores
    KnowledgeFusion,        # Merge multi-source with trust weights
    
    # Multi-Agent
    MultiAgentOrchestrator, # Event-driven collaboration
    ToolComposer,           # Auto-discover and compose tool chains
    
    # Memory
    HierarchicalMemory,     # Episodic / semantic / procedural
)

# Use individual components
router = NeuralQueryRouter()
decision = router.route("What were Apple's Q3 earnings?")
print(decision.strategy)  # "financial_retrieval"

# Query decomposition
decomposer = QueryDecomposer()
decomposed = decomposer.decompose(
    "Compare Tesla and BMW's EV market share in Europe"
)
for sub in decomposed.sub_queries:
    print(f"  - {sub.query}")  # "Tesla EV market share Europe"
                              # "BMW EV market share Europe"

# Progressive retrieval
progressive = ProgressiveRetriever(vector_store=store)
result = await progressive.retrieve(
    "Quantum computing breakthroughs 2024",
    min_confidence=0.7,
    max_attempts=3
)
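
The control flow behind progressive retrieval is a small retry loop: retrieve, check quality, reformulate, try again. The sketch below assumes you supply your own `retrieve` (returning documents plus a quality score) and `reformulate` callables. `progressive_retrieve` is a hypothetical function that only reuses the `min_confidence` and `max_attempts` parameter names from the example above.

```python
def progressive_retrieve(query, retrieve, reformulate, min_confidence=0.7, max_attempts=3):
    """Retry retrieval with reformulated queries until quality is good enough."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    best_docs, best_score = [], 0.0
    current = query
    for attempt in range(1, max_attempts + 1):
        docs, score = retrieve(current)
        if score > best_score:
            best_docs, best_score = docs, score   # keep the best attempt so far
        if best_score >= min_confidence:
            break
        current = reformulate(current, attempt)   # try a different phrasing
    return best_docs, best_score, attempt

# Stubs standing in for a real retriever and an LLM-based rewriter.
fake_scores = {"qc 2024": 0.4, "qc 2024 breakthroughs": 0.8}
retrieve = lambda q: ([q], fake_scores.get(q, 0.0))
reformulate = lambda q, attempt: q + " breakthroughs"

docs, score, attempts = progressive_retrieve("qc 2024", retrieve, reformulate)
print(docs, score, attempts)
```

Keeping the best result across attempts means a bad reformulation can never make the final answer worse than the first try.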

RAG Pattern Selection

from agentic_rag.cognitive import (
    CorrectiveRAG, SelfRAG, AdaptiveRAG, SpeculativeRAG
)

# Corrective RAG: Fix bad retrieval
crag = CorrectiveRAG(vector_store=store, web_search_tool=web_search)
result = await crag.query("Latest SpaceX Starship launch date")
# If retrieval quality is low, automatically falls back to web search

# Self-RAG: Four checkpoints
self_rag = SelfRAG(vector_store=store)
result = await self_rag.query(
    "Explain transformer architecture",
    checkpoints=["retrieve", "relevant", "supported", "useful"]
)
print(result.checkpoints_passed)  # [True, True, True, True]

# Adaptive RAG: Smart retrieval decisions
adaptive = AdaptiveRAG(vector_store=store)
result = await adaptive.query("What is 2+2?")  # No retrieval needed
result = await adaptive.query("Explain RAPTOR paper")  # Retrieves automatically

# Speculative RAG: Draft and verify
speculative = SpeculativeRAG(llm=llm)
result = await speculative.query(
    "Summarize climate change impacts",
    num_drafts=3,  # Generate 3 parallel drafts
)
print(result.best_draft.confidence)

Reflective Agent with Critique

from agentic_rag.cognitive import ReflectiveAgent

reflective = ReflectiveAgent(llm=llm, max_iterations=3)
result = await reflective.process(
    "What are the main causes of World War I?",
    reflection_focus=["factual_accuracy", "completeness", "source_diversity"]
)

print(f"Answer: {result.answer}")
print(f"Iterations: {result.iteration_count}")
for critique in result.critiques:
    print(f"  Issue: {critique.issue} | Severity: {critique.severity}")

Hierarchical Memory

from agentic_rag.cognitive import HierarchicalMemory

memory = HierarchicalMemory()

# Episodic: Remember past interactions
await memory.episodic.record_interaction(
    query="Python list comprehensions",
    answer="List comprehensions provide...",
    outcome="helpful"
)

# Semantic: Learn facts
await memory.semantic.store_fact(
    subject="Python",
    predicate="created_by",
    object="Guido van Rossum"
)

# Procedural: Learn strategies
await memory.procedural.record_strategy(
    situation="vague_query",
    action="ask_clarifying_question",
    outcome="success"
)

# Query memory
similar = await memory.episodic.find_similar("list comprehension syntax")
print(f"Found {len(similar)} similar past queries")

Why you need this: Because sometimes one pass isn't enough. This is like giving your RAG system a PhD - it questions its own answers, retrieves more when uncertain, and learns from every interaction.

Recipe 4 — Swap your LLM like changing socks (18 providers)

Every provider has the same interface. Swap one line, keep everything else.

All 18 LLM Providers

from agentic_rag.providers.llm import (
    # Cloud Providers
    OpenAIProvider,         # GPT-4o, o1, o3, GPT-4.5
    AnthropicProvider,      # Claude 3.5 / 4 Sonnet, Opus, Haiku
    GeminiProvider,         # Google Gemini 1.5/2.0 Pro/Flash
    CohereProvider,         # Command R/R+
    MistralProvider,        # Mistral Large/Medium/Small
    
    # Fast Inference
    GroqProvider,           # Llama/Mixtral at 500+ tok/sec
    TogetherAIProvider,     # 100+ open models
    FireworksProvider,      # Fast inference for open models
    
    # Specialized
    DeepSeekProvider,       # DeepSeek V3/R1 (reasoning)
    XAIProvider,            # Grok models
    PerplexityProvider,     # Sonar models with citations
    AI21Provider,           # Jamba models
    
    # Enterprise / Cloud
    AzureOpenAIProvider,    # GPT-4 via Azure
    BedrockProvider,        # AWS Bedrock (Claude, Llama, etc.)
    
    # OpenAI-Compatible
    OpenAICompatibleProvider,  # Any OpenAI-compatible API
    
    # Local / Self-Hosted
    OllamaProvider,         # Local models (Llama, Mistral, etc.)
    LocalModelProvider,     # Generic local model wrapper
    LlamaCppProvider,       # llama.cpp backend
)

Quick Examples

# OpenAI - The reliable choice
llm = OpenAIProvider(api_key="sk-...", model="gpt-4o")

# Anthropic - Best for long context
llm = AnthropicProvider(
    api_key="sk-ant-...",
    model="claude-sonnet-4-20250514",
    max_tokens=8192
)

# Google Gemini - Multimodal powerhouse
llm = GeminiProvider(api_key="...", model="gemini-2.0-flash")

# Groq - Speed demon (500+ tok/sec)
llm = GroqProvider(api_key="...", model="llama-3.3-70b-versatile")

# DeepSeek - Best reasoning model
llm = DeepSeekProvider(api_key="...", model="deepseek-r1")

# Ollama - Run locally, pay nothing
llm = OllamaProvider(model="llama3.3", host="http://localhost:11434")

# Azure OpenAI - Enterprise ready
llm = AzureOpenAIProvider(
    api_key="...",
    endpoint="https://myresource.openai.azure.com",
    deployment_name="gpt-4o"
)

# AWS Bedrock - Your AWS bill's new friend
llm = BedrockProvider(
    aws_access_key="...",
    aws_secret_key="...",
    region="us-east-1",
    model="anthropic.claude-3-sonnet-20240229-v1:0"
)

# Any OpenAI-compatible API
llm = OpenAICompatibleProvider(
    base_url="https://api.mystartup.com/v1",
    api_key="...",
    model="custom-model"
)

Using the Factory

from agentic_rag.factories import create_provider

# Create by name (useful for config-driven setups)
llm = create_provider("openai", model="gpt-4o", api_key="...")
llm = create_provider("anthropic", model="claude-sonnet-4-20250514", api_key="...")
llm = create_provider("ollama", model="llama3.3")

# Same interface everywhere
rag = AgenticRAG(vector_store=store, llm_provider=llm)

Why you need this: Because vendor lock-in is the adult version of "you can't sit with us." We believe in playing the field - try OpenAI today, switch to local models tomorrow, your code stays the same.

Recipe 5 — Vector stores (13 options + hybrid retrieval)

All 13 Vector Stores

from agentic_rag.providers.vector_stores import (
    # Local / Development
    InMemoryVectorStore,      # Zero-config, for testing
    ChromaVectorStore,        # Local ChromaDB
    FAISSVectorStore,         # Facebook AI Similarity Search
    
    # Production / Managed
    PineconeVectorStore,      # Managed vector search
    WeaviateVectorStore,      # Vector + semantic search
    QdrantVectorStore,        # High-performance vector DB
    PGVectorStore,            # PostgreSQL + pgvector
    MilvusVectorStore,        # Distributed vector DB
    
    # Cloud / Enterprise
    RedisVectorStore,         # Redis Stack with RediSearch
    MongoDBAtlasVectorStore,  # MongoDB Atlas Vector Search
    ElasticsearchVectorStore, # Elasticsearch dense vectors
    VespaVectorStore,         # Vespa search engine
    AzureAISearchVectorStore, # Azure AI Search
)

Quick Setup Examples

# In-Memory (testing/development)
store = InMemoryVectorStore()

# ChromaDB (local development)
store = ChromaVectorStore(
    collection_name="my_docs",
    persist_directory="./chroma_db"
)

# Pinecone (managed production)
store = PineconeVectorStore(
    api_key="your-key",
    environment="us-west1-gcp",
    index_name="production-index",
    dimension=1536  # Match your embedding model
)

# Weaviate (local or cloud)
store = WeaviateVectorStore(
    host="http://localhost:8080",
    class_name="Documents"
)

# Qdrant (local or cloud)
store = QdrantVectorStore(
    host="localhost",
    port=6333,
    collection_name="docs"
)

# PostgreSQL + pgvector
store = PGVectorStore(
    connection_string="postgresql://user:pass@localhost/db",
    table_name="embeddings",
    dimension=1536
)

# Milvus (distributed)
store = MilvusVectorStore(
    host="localhost",
    port="19530",
    collection_name="documents"
)

# Redis (with RediSearch)
store = RedisVectorStore(
    redis_url="redis://localhost:6379",
    index_name="rag_docs"
)

# MongoDB Atlas
store = MongoDBAtlasVectorStore(
    connection_string="mongodb+srv://...",
    database="rag",
    collection="documents",
    index_name="vector_index"
)

# Elasticsearch
store = ElasticsearchVectorStore(
    hosts=["http://localhost:9200"],
    index_name="rag_documents"
)

# Azure AI Search
store = AzureAISearchVectorStore(
    endpoint="https://search.search.windows.net",
    api_key="...",
    index_name="documents"
)

Hybrid Retrieval (Dense + Sparse)

from agentic_rag.retrieval import HybridRetriever, BM25Retriever

# Combine vector search with BM25 keyword search
hybrid = HybridRetriever(
    vector_store=store,
    dense_weight=0.7,
    sparse_weight=0.3
)

# Or with explicit BM25 (BM25Retriever is already imported above)
bm25 = BM25Retriever(documents=all_docs)
hybrid = HybridRetriever(
    vector_store=store,
    sparse_retriever=bm25,
    dense_weight=0.6,
    sparse_weight=0.4
)

# Use with RAG
rag = AgenticRAG(vector_store=store, llm_provider=llm)
response = await rag.aquery("What is machine learning?", retriever=hybrid)
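
What do `dense_weight` and `sparse_weight` actually weight? One common approach, shown here as an assumption rather than as the library's exact method, is to min-max normalize each retriever's scores and take a weighted sum. `min_max` and `fuse` are hypothetical helpers for this sketch.

```python
def min_max(scores):
    """Rescale a {doc_id: score} mapping to the 0..1 range."""
    if not scores:
        return {}
    lo, hi = min(scores.values()), max(scores.values())
    if hi == lo:
        return {doc: 1.0 for doc in scores}
    return {doc: (s - lo) / (hi - lo) for doc, s in scores.items()}

def fuse(dense, sparse, dense_weight=0.7, sparse_weight=0.3):
    """Weighted sum of normalized dense and sparse scores, best first."""
    d, s = min_max(dense), min_max(sparse)
    fused = {
        doc: dense_weight * d.get(doc, 0.0) + sparse_weight * s.get(doc, 0.0)
        for doc in set(d) | set(s)
    }
    return sorted(fused.items(), key=lambda item: item[1], reverse=True)

dense_scores = {"a": 0.9, "b": 0.5, "c": 0.1}    # e.g. cosine similarities
sparse_scores = {"b": 12.0, "c": 3.0}            # e.g. BM25 scores
ranked = fuse(dense_scores, sparse_scores)
print([doc for doc, _ in ranked])
```

Normalizing first matters because BM25 scores and cosine similarities live on different scales. Without it, the weights would not mean what they appear to mean.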

Using the Factory

from agentic_rag.factories import create_vector_store

# Create by name from config
store = create_vector_store("chroma", collection_name="docs")
store = create_vector_store("pinecone", api_key="...", index_name="prod")
store = create_vector_store("qdrant", host="localhost", port=6333)

Why you need this: Because your use case matters. Local development shouldn't require a PhD in cloud architecture, and production shouldn't run on your laptop. We've got you covered from prototype to scale.

Recipe 6 — Document processing (20+ loaders, 10+ chunkers)

All Document Loaders

from agentic_rag.document_processing.loaders import (
    # Text & Documents
    TextLoader,           # Plain text files (.txt)
    PDFLoader,            # PDF documents (.pdf) - with 5 backends!
    DocxLoader,           # Word documents (.docx)
    MarkdownLoader,       # Markdown files (.md)
    HTMLLoader,           # HTML files (.html, .htm)
    
    # Structured Data
    JSONLoader,           # JSON files (.json)
    CSVLoader,            # CSV files (.csv)
    XMLLoader,            # XML files (.xml)
    
    # Spreadsheets & Presentations
    ExcelLoader,          # Excel files (.xlsx, .xls)
    PowerPointLoader,     # PowerPoint (.pptx)
    
    # Media & Archives
    ImageLoader,          # Images with OCR (.jpg, .png, etc.)
    AudioLoader,          # Audio files with transcription
    VideoLoader,          # Video analysis (frames + audio)
    EPubLoader,           # eBooks (.epub)
    NotebookLoader,       # Jupyter notebooks (.ipynb)
    ArchiveLoader,        # Zip, tar, etc.
    
    # Advanced PDF Loaders (specialized)
    DoclingPDFLoader,     # Full document AI (tables, figures, equations)
    CamelotTableLoader,   # Table extraction specialist
    UnstructuredPDFLoader, # Multi-element extraction
    KreuzbergPDFLoader,   # Modern extraction with math support
)

# Load any document
docs = PDFLoader().load("report.pdf")
docs = DocxLoader().load("contract.docx")
docs = MarkdownLoader().load("docs.md")
docs = HTMLLoader().load("page.html")
docs = JSONLoader().load("data.json")
docs = CSVLoader().load("spreadsheet.csv")

# Excel with sheet selection
excel_docs = ExcelLoader(sheet_name="Revenue").load("financials.xlsx")

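# Image with OCR
# (ImageLoaderConfig is assumed to live alongside VideoLoaderConfig)
from agentic_rag.document_processing.loaders import ImageLoaderConfig

image_docs = ImageLoader(
    ImageLoaderConfig(enable_ocr=True, extract_metadata=True)
).load("diagram.png")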

# Video analysis (extract frames + transcribe audio)
from agentic_rag.document_processing.loaders import VideoLoader, VideoLoaderConfig

video_docs = VideoLoader(
    VideoLoaderConfig(
        extract_frames_every_seconds=5,
        transcribe_audio=True,
        max_frames=20
    )
).load("presentation.mp4")

All Chunking Strategies

from agentic_rag.document_processing.chunkers import (
    # Basic chunkers
    FixedSizeChunker,       # Fixed size with overlap
    SemanticChunker,        # Sentence/paragraph boundaries
    
    # Structure-aware
    MarkdownChunker,        # Respects headers (# ## ###)
    HTMLDOMChunker,         # HTML DOM tree chunking
    PDFLayoutChunker,       # Layout-aware PDF chunking
    CodeChunker,            # AST-aware code chunking
    
    # Specialized
    TableChunker,           # Table structure preservation
    JSONStructureChunker,   # JSON hierarchy chunking
    XMLStructureChunker,    # XML element chunking
    MultimodalChunker,      # Image + text alignment
    AgenticChunker,         # LLM-based semantic chunking
)
from agentic_rag.document_processing.chunkers import Language

# Basic semantic chunking
chunker = SemanticChunker(max_chunk_size=1000, overlap=100)
chunks = chunker.chunk(long_text)

# Fixed size
chunker = FixedSizeChunker(chunk_size=500, overlap=50)

# Markdown with headers
chunker = MarkdownChunker(
    max_chunk_size=1500,
    header_split_levels=[1, 2, 3]  # Split at h1, h2, h3
)

# Code-aware (Python, JS, Java, etc.)
chunker = CodeChunker(
    language=Language.PYTHON,
    max_chunk_size=2000,
    respect_function_boundaries=True
)

# HTML DOM chunking
chunker = HTMLDOMChunker(
    max_chunk_size=1500,
    respect_semantic_tags=True,  # <article>, <section>, etc.
    include_attributes=["id", "class"]
)

# PDF layout-aware
chunker = PDFLayoutChunker(
    max_chunk_size=2000,
    detect_columns=True,
    preserve_page_breaks=False
)

# Tables with context
chunker = TableChunker(
    max_chunk_size=1500,
    keep_headers_with_rows=True,
    output_format="text"  # or "csv", "json"
)
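
To make the chunk_size and overlap parameters concrete, here is a minimal sketch of fixed-size chunking with overlap. It counts characters, which is an assumption: FixedSizeChunker may count tokens or apply other rules. The function name fixed_size_chunks is hypothetical.

```python
from typing import List


def fixed_size_chunks(text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
    """Split text into windows of chunk_size characters that share `overlap` characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    step = chunk_size - overlap  # how far the window advances each time
    chunks: List[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the window already reached the end of the text
    return chunks


demo_chunks = fixed_size_chunks("abcdefghij", chunk_size=4, overlap=1)
print(demo_chunks)
```

The overlap repeats the tail of each chunk at the head of the next, so a sentence cut by a boundary still appears whole in at least one chunk when the overlap is long enough.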

Advanced PDF Extraction (5 Backends)

from agentic_rag.document_processing.loaders import (
    PDFLoader,
    PDFExtractionConfig,
    DoclingPDFLoader, DoclingTableLoader, DoclingFigureLoader, DoclingEquationLoader,
    CamelotTableLoader, CamelotLatticeLoader, CamelotStreamLoader,
    UnstructuredPDFLoader, UnstructuredTextLoader, UnstructuredTableLoader,
    KreuzbergPDFLoader, KreuzbergEquationLoader,
    get_available_backends,  # Check which backends are installed
)

# Check available backends
backends = get_available_backends()
# {'pypdf2': True, 'docling': True, 'camelot': False, ...}

# 1. Universal PDFLoader with backend selection
# Uses PyPDF2 by default (no extra dependencies)
docs = PDFLoader().load("document.pdf")

# Auto-select best backend based on extraction needs
docs = PDFLoader(
    extraction_backend="auto",  # Auto-select based on needs
    extract_tables=True,
    extract_figures=True,
    extract_equations=True
).load("research_paper.pdf")

# Force specific backend
docs = PDFLoader(extraction_backend="docling").load("paper.pdf")      # Best overall
docs = PDFLoader(extraction_backend="unstructured").load("form.pdf") # Multi-element
docs = PDFLoader(extraction_backend="kreuzberg").load("math.pdf")    # Math equations

# 2. Docling - Full Document AI (IBM's state-of-the-art)
# Best for: research papers, complex layouts, tables, figures, equations
# Install: pip install ai-prishtina-agentic-rag[pdf-docling]

# Extract everything
docs = DoclingPDFLoader(
    extraction_config=PDFExtractionConfig(
        extract_tables=True,
        extract_figures=True,
        extract_equations=True
    )
).load("research_paper.pdf")

# Returns list of documents by type:
# - Document(content="Introduction text...", metadata={"type": "text", "page": 1})
# - Document(content="| Revenue | Q1 | Q2 |...", metadata={"type": "table", "page": 3})
# - Document(content="Figure 1: Neural network architecture", metadata={"type": "figure", "page": 2})
# - Document(content="E = mc^2", metadata={"type": "equation", "latex": "E = mc^2"})

# Specialized loaders (already imported above)
tables = await DoclingTableLoader().load("financial_report.pdf")
figures = await DoclingFigureLoader().load("paper_with_charts.pdf")
equations = await DoclingEquationLoader().load("math_paper.pdf")

# 3. Camelot - Table Extraction Specialist
# Best for: financial reports, data tables, structured PDFs
# Install: pip install ai-prishtina-agentic-rag[pdf-camelot]

# Auto-detect table type (lattice vs stream)
tables = await CamelotTableLoader(flavor="auto").load("report.pdf")

# Force lattice mode (for tables with ruling lines)
tables = await CamelotLatticeLoader(line_scale=15).load("financial_report.pdf")

# Force stream mode (for unruled tables using whitespace)
tables = await CamelotStreamLoader(shift_text=["l", "t"]).load("data_table.pdf")

# Access extracted table data
for table in tables:
    df = table.metadata.get("dataframe")  # pandas DataFrame
    csv = table.metadata.get("csv")       # CSV string
    accuracy = table.metadata.get("accuracy")  # Extraction confidence
    print(f"Table on page {table.metadata['page']}: {table.metadata['shape']}")

# 4. Unstructured - Multi-Element Extraction
# Best for: mixed documents, forms, headers/footers, complex layouts
# Install: pip install ai-prishtina-agentic-rag[pdf-unstructured]

# Extract all elements
docs = UnstructuredPDFLoader(
    strategy="hi_res",  # high accuracy (slower)
    extract_tables=True,
    languages=["eng"]
).load("document.pdf")

# Filter by element type
tables = [d for d in docs if d.metadata.get("type") == "table"]
headers = [d for d in docs if d.metadata.get("type") == "header"]
text_blocks = [d for d in docs if d.metadata.get("type") == "text"]

# Fast text-only extraction
docs = UnstructuredTextLoader(strategy="fast").load("document.pdf")

# Table-only extraction
tables = UnstructuredTableLoader().load("report.pdf")

# 5. Kreuzberg - Modern Extraction with Math
# Best for: modern PDFs, mathematical content, academic papers
# Install: pip install ai-prishtina-agentic-rag[pdf-kreuzberg]

# General extraction
docs = KreuzbergPDFLoader().load("document.pdf")

# Math-focused extraction
equations = KreuzbergEquationLoader().load("math_paper.pdf")
for eq in equations:
    latex = eq.metadata.get("latex")
    print(f"Equation: {latex}")

Factory Pattern

from agentic_rag.factories import create_chunker, create_loader, get_loader_for_file

# Config-driven chunking
chunker = create_chunker("semantic", max_chunk_size=1000)
chunker = create_chunker("markdown", max_chunk_size=1500)
chunker = create_chunker("fixed", chunk_size=500)

# Config-driven loading
loader = create_loader("pdf")
loader = create_loader("docx")
loader = create_loader("auto")  # Auto-detect from extension
docs = loader.load("document.pdf")

# Advanced PDF via factory
loader = create_loader("pdf", extraction_backend="docling")
loader = create_loader("pdf_tables", flavor="lattice")
loader = create_loader("pdf_docling_figures")
loader = create_loader("pdf_unstructured", strategy="hi_res")

# Auto-detect loader from file extension
loader = get_loader_for_file("research.pdf", extraction_backend="docling")
loader = get_loader_for_file("data.csv")
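
Auto-detection by file extension is usually a small registry lookup. The sketch below shows that pattern in isolation; it is not the library's implementation, and register_loader, loader_for_file, and load_text are hypothetical names.

```python
from pathlib import Path
from typing import Callable, Dict

# Hypothetical registry: lowercase file extension -> loader function.
_LOADERS: Dict[str, Callable[[str], str]] = {}


def register_loader(*extensions: str):
    """Decorator that registers a loader function for one or more extensions."""
    def decorator(func: Callable[[str], str]) -> Callable[[str], str]:
        for ext in extensions:
            _LOADERS[ext.lower()] = func
        return func
    return decorator


@register_loader(".txt", ".md")
def load_text(path: str) -> str:
    """Read a plain-text or Markdown file as UTF-8."""
    return Path(path).read_text(encoding="utf-8")


def loader_for_file(path: str) -> Callable[[str], str]:
    """Pick a loader from the file extension, case-insensitively."""
    ext = Path(path).suffix.lower()
    try:
        return _LOADERS[ext]
    except KeyError:
        raise ValueError(f"No loader registered for extension {ext!r}") from None


print(loader_for_file("notes.MD").__name__)
```

Raising on an unknown extension, rather than falling back to a text loader, keeps a binary file from being silently indexed as garbage.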

Preprocessing Pipeline

from agentic_rag.document_processing.preprocessors import TextPreprocessor, MetadataExtractor

# Clean and normalize
preprocessor = TextPreprocessor(
    remove_extra_whitespace=True,
    normalize_unicode=True,
    fix_line_breaks=True
)
clean_text = preprocessor.process(raw_text)

# Extract metadata
extractor = MetadataExtractor()
metadata = extractor.extract(
    content=doc_content,
    source_path="report.pdf",
    extract_title=True,
    extract_dates=True,
    extract_entities=True
)

Why you need this: Because real documents come in all shapes and sizes. PDFs, Word docs, Markdown, JSON - we handle them all. Smart chunking means better retrieval, which means better answers. It's that simple.

Recipe 7 — Graph RAG (entity relationships + graph traversal)

Basic Graph Setup

from agentic_rag.graph import KnowledgeGraph, EntityExtractor, GraphBuilder

# Create graph
graph = KnowledgeGraph()

# Choose extraction method
extractor = EntityExtractor(method="spacy")  # or "llm" for better accuracy

# Build from documents
builder = GraphBuilder(graph=graph, extractor=extractor)
await builder.build_from_documents(documents)

# Query with graph
rag = AgenticRAG(vector_store=store, llm_provider=llm, knowledge_graph=graph)
response = await rag.aquery("How is Company X related to Product Y?")

Entity Extraction Methods

from agentic_rag.graph import EntityExtractor, GraphRAGQuery

# Fast local extraction with spaCy
extractor = EntityExtractor(method="spacy")

# LLM-powered extraction (more accurate, slower)
extractor = EntityExtractor(
    method="llm",
    llm_provider=llm,
    entity_types=["PERSON", "ORG", "PRODUCT", "EVENT", "TECHNOLOGY"]
)

# Hybrid approach
extractor = EntityExtractor(
    method="hybrid",
    llm_provider=llm,
    confidence_threshold=0.7
)

Graph Traversal Queries

from agentic_rag.graph import GraphRAGQuery, TraversalStrategy

# Multi-hop traversal
query = GraphRAGQuery(
    start_node="Elon Musk",
    relation_type="founded",
    max_hops=2,
    strategy=TraversalStrategy.BFS
)
path = await graph.traverse(query)
# Returns: Elon Musk -> founded -> Tesla -> produces -> Model S

# Relationship-based retrieval
results = await graph.query(
    "Find all companies ACQUIRED_BY Google since 2020"
)

# Hybrid: Vector + Graph
response = await rag.aquery(
    "Who worked at companies that were later acquired by Meta?",
    use_graph=True,
    graph_depth=3
)
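
For intuition about max_hops and a breadth-first strategy, here is a small self-contained sketch of multi-hop traversal over an adjacency list. It does not use the library's KnowledgeGraph; the toy graph, the bfs_paths function, and the entity names are all hypothetical.

```python
from collections import deque
from typing import Dict, List, Tuple

# Hypothetical toy graph: node -> list of (relation, target) edges.
Graph = Dict[str, List[Tuple[str, str]]]


def bfs_paths(graph: Graph, start: str, max_hops: int = 2) -> List[List[str]]:
    """Return every cycle-free path of 1..max_hops edges from start, shortest first.

    A path is stored as [node, relation, node, relation, node, ...].
    """
    paths: List[List[str]] = []
    queue = deque([[start]])
    while queue:
        path = queue.popleft()
        hops = (len(path) - 1) // 2  # number of edges already in this path
        if hops >= max_hops:
            continue
        visited = set(path[::2])  # nodes sit at the even positions
        for relation, target in graph.get(path[-1], []):
            if target in visited:
                continue  # do not walk back into the current path
            new_path = path + [relation, target]
            paths.append(new_path)
            queue.append(new_path)
    return paths


toy_graph: Graph = {
    "Alice": [("founded", "Acme")],
    "Acme": [("produces", "Widget"), ("founded_by", "Alice")],
    "Widget": [("sold_by", "Shop")],
}
found = bfs_paths(toy_graph, "Alice", max_hops=2)
print(" -> ".join(found[-1]))
```

Because the queue is first-in, first-out, all one-hop paths are produced before any two-hop path, which is the property that distinguishes BFS from a depth-first walk.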

Graph RAG Patterns

from agentic_rag.graph import (
    EntityLinker,       # Link entities across documents
    RelationClassifier, # Classify relationship types
    GraphSummarizer,    # Summarize subgraphs
)

# Link entities (disambiguation)
linker = EntityLinker(graph=graph)
await linker.link_entities(threshold=0.85)
# Links "Apple Inc." and "Apple Company" as the same entity

# Relation classification
classifier = RelationClassifier(llm=llm)
relations = await classifier.classify(
    entity1="Microsoft",
    entity2="OpenAI",
    context="Microsoft invested $10B in OpenAI"
)
print(relations)  # ["investor_of", "partner_of"]

Why you need this: Because knowledge isn't flat. Companies acquire other companies, people work at multiple places, products have dependencies. Graph RAG understands relationships, not just keywords.

Recipe 8 — Streaming (SSE + structured events)

Basic Streaming

# Simple text streaming
async for chunk in rag.astream("Explain quantum computing in detail"):
    print(chunk.content, end="", flush=True)

Structured Streaming Events

from agentic_rag.core import StreamingEvent

# Full event streaming (SSE-compatible)
async for event in rag.astream_structured(
    "What are the latest AI breakthroughs?",
    include_sources=True,
    include_confidence=True
):
    if event.type == "answer_chunk":
        print(event.content, end="")
    elif event.type == "source":
        print(f"\n[Source {event.metadata['index']}]: {event.content[:100]}...")
    elif event.type == "confidence":
        print(f"\n[Confidence: {event.content:.2f}]")
    elif event.type == "tool_call":
        print(f"\n[Using tool: {event.metadata['tool_name']}]")
    elif event.type == "reflection":
        print(f"\n[Reflection: {event.content}]")
    elif event.type == "complete":
        print("\n[Done]")
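
"SSE-compatible" refers to the Server-Sent Events wire format: each event is a block of `field: value` lines terminated by a blank line. The sketch below formats one such frame with the standard library only; format_sse is a hypothetical helper, and the payload is made up for illustration.

```python
import json


def format_sse(event_type: str, data: str) -> str:
    """Format one Server-Sent Events frame.

    Multi-line data must be sent as several `data:` lines; the client joins
    them back together with newlines. A blank line ends the frame.
    """
    lines = [f"event: {event_type}"]
    # `or [""]` keeps a data line even when the payload is empty.
    lines.extend(f"data: {line}" for line in (data.splitlines() or [""]))
    return "\n".join(lines) + "\n\n"


frame = format_sse("answer_chunk", json.dumps({"content": "Hi"}))
print(frame, end="")
```

Note that a frame carrying an `event:` field is dispatched to listeners for that event name in the browser, not to the generic message handler.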

FastAPI SSE Endpoint

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from agentic_rag.server import create_app

app = create_app()

@app.get("/stream")  # EventSource clients can only issue GET requests
async def stream_query(query: str):
    async def event_generator():
        async for event in rag.astream_structured(query):
            yield f"event: {event.type}\ndata: {event.to_json()}\n\n"
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )

# JavaScript client (onmessage only fires for unnamed events, so listen
# for each named event type explicitly):
# const eventSource = new EventSource('/stream?query=...');
# eventSource.addEventListener('answer_chunk', (e) => console.log(JSON.parse(e.data)));

WebSocket Streaming

from fastapi import WebSocket
from agentic_rag.server import WebSocketRAGHandler

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    handler = WebSocketRAGHandler(rag)
    await handler.handle(websocket)
    
# Supports:
# - Bidirectional communication
# - Query cancellation
# - Session persistence
# - Multi-turn conversations

Why you need this: Because users hate waiting. Streaming gives instant feedback, lower perceived latency, and lets you build chat-style interfaces that don't feel like they're loading on dial-up.

Recipe 9 — Evaluation (metrics + A/B testing)

Comprehensive Evaluation

from agentic_rag.evaluation import (
    ComprehensiveEvaluator,
    AnswerRelevanceMetric,
    ContextPrecisionMetric,
    FaithfulnessMetric,
    AnswerCorrectnessMetric,
    LatencyMetric,
    CostMetric
)

evaluator = ComprehensiveEvaluator()

# Evaluate a single response
scores = await evaluator.evaluate(
    query="What is the capital of France?",
    response="Paris is the capital of France.",
    ground_truth="Paris",
    context=["France is a country in Western Europe. Its capital is Paris."],
)

print(f"Answer Relevance: {scores['relevance'].score:.2f}")
print(f"Context Precision: {scores['context_precision'].score:.2f}")
print(f"Faithfulness: {scores['faithfulness'].score:.2f}")
print(f"Correctness: {scores['correctness'].score:.2f}")
print(f"Latency: {scores['latency'].value_ms}ms")
print(f"Cost: ${scores['cost'].value:.4f}")

Reference-Free Evaluation

from agentic_rag.evaluation import ReferenceFreeEvaluator

# When you don't have ground truth
rf_evaluator = ReferenceFreeEvaluator()
scores = await rf_evaluator.evaluate(
    query="Explain neural networks",
    response="Neural networks are computational models inspired by biological brains...",
    context=["Neural networks consist of layers of interconnected nodes..."]
)
# Evaluates: coherence, completeness, relevance without ground truth

Batch Evaluation

from agentic_rag.evaluation import BatchEvaluator, EvaluationDataset

# Load test dataset
dataset = EvaluationDataset.from_json("test_queries.json")
# Format: [{"query": "...", "ground_truth": "...", "context": "..."}, ...]

# Run batch evaluation
batch_eval = BatchEvaluator(rag)
results = await batch_eval.evaluate_dataset(dataset)

# Get summary statistics
print(f"Mean relevance: {results.mean('relevance'):.2f}")
print(f"Mean latency: {results.mean('latency'):.0f}ms")
print(f"P95 latency: {results.percentile('latency', 95):.0f}ms")

# Export detailed results
results.to_csv("evaluation_results.csv")
results.to_html("evaluation_report.html")
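
Summary statistics such as mean and P95 latency are simple to reproduce by hand, which helps when sanity-checking a report. The sketch below uses the nearest-rank definition of a percentile; the library may use a different definition (for example linear interpolation), and the latency values are made up.

```python
import math
from statistics import mean
from typing import Sequence


def percentile(values: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile: the smallest value with at least pct% of data at or below it."""
    if not values:
        raise ValueError("values must not be empty")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(values)
    rank = math.ceil(pct * len(ordered) / 100)  # 1-based rank
    return ordered[rank - 1]


# Hypothetical per-query latencies in milliseconds.
latencies_ms = [120, 95, 310, 150, 200, 180, 90, 400, 130, 160]
print(f"Mean latency: {mean(latencies_ms):.1f}ms")
print(f"P95 latency: {percentile(latencies_ms, 95)}ms")
```

The gap between the mean and the P95 is the reason to report both: a few slow queries barely move the average but dominate the tail.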

A/B Testing

from agentic_rag.evaluation import ABTest, ABTestConfig, Variant

# Configure test
config = ABTestConfig(
    test_name="gpt4_vs_claude_sonnet",
    min_sample_size=100,
    max_sample_size=500,
    primary_metric="answer_correctness",
    confidence_level=0.95
)

ab = ABTest(config)

# Register variants
ab.register_variant("gpt4", Variant(
    pipeline=gpt4_rag,
    name="GPT-4 Pipeline"
))
ab.register_variant("claude", Variant(
    pipeline=claude_rag,
    name="Claude Sonnet Pipeline"
))

# Run test (automatically determines winner)
winner = await ab.run(dataset)
print(f"Winner: {winner.name}")
print(f"Improvement: {winner.improvement_percent:.1f}%")
print(f"P-value: {winner.p_value:.4f}")

# Or manual control
ab.start()
ab.add_result("gpt4", query, response, score)
ab.add_result("claude", query, response, score)
status = ab.get_status()
if status.is_conclusive:
    print(f"Winner: {status.winner}")
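
What "conclusive" means depends on the statistical test behind it, which is not described here. As a rough illustration, the sketch below compares two sets of per-query scores with a two-sided z-test on the difference in means. The normal approximation is an assumption that is only reasonable for larger samples (such as the min_sample_size of 100 above), and the scores are synthetic.

```python
from math import sqrt
from statistics import NormalDist, mean, variance
from typing import Sequence, Tuple


def two_sample_z_test(a: Sequence[float], b: Sequence[float]) -> Tuple[float, float]:
    """Return (mean(b) - mean(a), two-sided p-value) under a normal approximation."""
    diff = mean(b) - mean(a)
    standard_error = sqrt(variance(a) / len(a) + variance(b) / len(b))
    if standard_error == 0:
        # No variation at all: the difference is either exactly zero or certain.
        return diff, 1.0 if diff == 0 else 0.0
    z = diff / standard_error
    p_value = 2 * (1 - NormalDist().cdf(abs(z)))
    return diff, p_value


# Synthetic per-query correctness scores for two pipelines.
scores_a = [0.70 + 0.01 * (i % 5) for i in range(100)]
scores_b = [0.80 + 0.01 * (i % 5) for i in range(100)]

diff, p_value = two_sample_z_test(scores_a, scores_b)
alpha = 1 - 0.95  # matches confidence_level=0.95
print(f"Difference: {diff:+.3f}, significant: {p_value < alpha}")
```

With small samples, a t-test or a permutation test is the safer choice; the z-test is shown only because it needs nothing beyond the standard library.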

Custom Metrics

from agentic_rag.evaluation import BaseMetric, MetricResult

class CustomMetric(BaseMetric):
    """Custom metric for domain-specific evaluation."""
    
    async def evaluate(self, query, response, context, **kwargs) -> MetricResult:
        # Your custom logic
        score = self._calculate_score(response)
        return MetricResult(
            score=score,
            explanation="Custom evaluation reasoning",
            metadata={"custom_field": "your value here"}
        )

# Register and use
evaluator.register_metric("custom", CustomMetric())

Prompt Optimization (DSPy-style)

from agentic_rag.cognitive import PromptOptimizer, OptimizationConfig

# Golden dataset for optimization
golden_examples = [
    {
        "question": "What is RAG?",
        "answer": "Retrieval-Augmented Generation combines information retrieval with text generation..."
    },
    {
        "question": "How does chunking work?",
        "answer": "Chunking splits documents into smaller segments for efficient retrieval..."
    },
]

# Configure optimization
config = OptimizationConfig(
    metric="answer_relevance",  # or "accuracy", "completeness", "semantic_similarity"
    max_iterations=20,
    few_shot_k=3,
)

optimizer = PromptOptimizer(
    llm_provider=llm,
    vector_store=store,
    config=config,
)

# Run optimization
result = await optimizer.optimize(
    base_prompt="Answer the question based on the provided context.",
    examples=golden_examples,
)

print(f"Best prompt: {result.best_prompt.prompt_text}")
print(f"Score: {result.best_prompt.score:.3f}")
print(f"Improvement: +{result.improvement:.3f}")

# Save results
optimizer.save_candidates("optimized_prompts.json", result)

Why you need this: Because "it works" isn't enough. You need to know HOW well it works, whether that expensive model upgrade actually helps, and if your new retrieval strategy is worth the complexity.

Recipe 10 — Production hardening (caching + guardrails + monitoring)

Semantic Caching

from agentic_rag import SemanticCache

# Redis-backed semantic cache
cache = SemanticCache(
    backend="redis",
    redis_url="redis://localhost:6379",
    similarity_threshold=0.95,  # Consider queries similar above 95%
    ttl=3600  # Cache for 1 hour
)

# Or in-memory for single-node
cache = SemanticCache(backend="memory", max_size=10000)

# Use with RAG
rag = AgenticRAG(
    vector_store=store,
    llm_provider=llm,
    cache=cache
)

# Cache stats
print(f"Hit rate: {cache.hit_rate:.2%}")
print(f"Saved tokens: {cache.saved_tokens:,}")
print(f"Saved cost: ${cache.saved_cost:.2f}")
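
A semantic cache returns a stored answer when a new query's embedding is close enough to one seen before. The sketch below shows that lookup with cosine similarity and a threshold; it is an in-memory illustration with hand-written two-dimensional "embeddings", not the library's SemanticCache, and TinySemanticCache is a hypothetical name.

```python
import math
from typing import List, Optional, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class TinySemanticCache:
    """Linear-scan cache keyed by embedding similarity (illustration only)."""

    def __init__(self, similarity_threshold: float = 0.95):
        self.similarity_threshold = similarity_threshold
        self._entries: List[Tuple[List[float], str]] = []
        self.hits = 0
        self.misses = 0

    def put(self, embedding: Sequence[float], answer: str) -> None:
        self._entries.append((list(embedding), answer))

    def get(self, embedding: Sequence[float]) -> Optional[str]:
        best_score, best_answer = 0.0, None
        for stored, answer in self._entries:
            score = cosine_similarity(embedding, stored)
            if score > best_score:
                best_score, best_answer = score, answer
        if best_answer is not None and best_score >= self.similarity_threshold:
            self.hits += 1
            return best_answer
        self.misses += 1
        return None

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0.0


demo_cache = TinySemanticCache(similarity_threshold=0.95)
demo_cache.put([1.0, 0.0], "cached answer")
near = demo_cache.get([0.99, 0.05])  # almost the same direction
far = demo_cache.get([0.0, 1.0])     # orthogonal
print(near, far, f"{demo_cache.hit_rate:.0%}")
```

The threshold is the main tuning knob: set it too low and unrelated questions share an answer, too high and paraphrases miss the cache.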

Circuit Breakers

from agentic_rag import CircuitBreaker

# Protect against LLM failures
breaker = CircuitBreaker(
    name="openai_api",
    failure_threshold=5,      # Open after 5 failures
    recovery_timeout=60,      # Try again after 60s
    half_open_max_calls=3,    # Test with 3 calls when recovering
    success_threshold=2       # Need 2 successes to close
)

# Use with provider
llm = OpenAIProvider(
    api_key="...",
    circuit_breaker=breaker,
    fallback_provider=OllamaProvider(model="llama3.3")  # Local fallback
)

# Monitor breaker state
print(f"State: {breaker.state}")  # CLOSED, OPEN, HALF_OPEN
print(f"Failures: {breaker.failure_count}")
print(f"Last failure: {breaker.last_failure_time}")
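
The three states follow the usual circuit-breaker pattern. The sketch below is a simplified state machine under stated assumptions: it ignores half_open_max_calls, takes an injectable clock so it can be exercised without waiting, and is not the library's CircuitBreaker. SimpleCircuitBreaker is a hypothetical name.

```python
import time
from typing import Callable, Optional


class SimpleCircuitBreaker:
    """CLOSED -> OPEN after repeated failures, OPEN -> HALF_OPEN after a timeout,
    HALF_OPEN -> CLOSED after enough successes (or back to OPEN on any failure)."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        success_threshold: int = 2,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self._clock = clock
        self.state = "CLOSED"
        self.failure_count = 0
        self._half_open_successes = 0
        self._opened_at: Optional[float] = None

    def allow_request(self) -> bool:
        if self.state == "OPEN":
            if self._clock() - self._opened_at >= self.recovery_timeout:
                self.state = "HALF_OPEN"  # let trial calls through
                self._half_open_successes = 0
                return True
            return False
        return True

    def record_success(self) -> None:
        if self.state == "HALF_OPEN":
            self._half_open_successes += 1
            if self._half_open_successes >= self.success_threshold:
                self.state = "CLOSED"
                self.failure_count = 0
        else:
            self.failure_count = 0  # consecutive-failure counter resets

    def record_failure(self) -> None:
        self.failure_count += 1
        if self.state == "HALF_OPEN" or self.failure_count >= self.failure_threshold:
            self.state = "OPEN"
            self._opened_at = self._clock()


fake_now = [0.0]  # mutable fake clock for the demonstration
demo_breaker = SimpleCircuitBreaker(
    failure_threshold=2, recovery_timeout=10, success_threshold=2,
    clock=lambda: fake_now[0],
)
demo_breaker.record_failure()
demo_breaker.record_failure()
print(demo_breaker.state)
```

While the breaker is open, callers fail fast (or switch to a fallback) instead of queueing behind a provider that is already struggling.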

Output Guardrails

from agentic_rag import OutputGuardrails, PIIDetector, ToxicityFilter

# Comprehensive guardrails
guardrails = OutputGuardrails(
    # PII Detection
    enable_pii_detection=True,
    pii_types=["email", "phone", "ssn", "credit_card", "address"],
    
    # Content filtering
    enable_toxicity_filter=True,
    toxicity_threshold=0.7,
    
    # Custom rules
    blocked_patterns=[
        r"\b(password|secret_key|api_key)\s*=\s*['\"][^'\"]+['\"]"
    ],
    
    # Fact-checking (optional)
    enable_fact_check=True,
    fact_checker=fact_checker_service
)

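# Apply to response (wrapped in a helper so the early return is valid;
# the helper name is illustrative)
async def guarded_answer(response):
    safe_response = await guardrails.check(response)
    if not safe_response.is_safe:
        print(f"Blocked: {safe_response.violations}")
        return "I cannot provide that information."
    return response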

Cost Tracking & Budgeting

from agentic_rag import CostTracker, BudgetManager

# Track costs
tracker = CostTracker()
tracker.record_usage(
    model="gpt-4o",
    input_tokens=500,
    output_tokens=200,
    embedding_tokens=1000
)

print(f"Total cost: ${tracker.total_cost:.4f}")
print(f"By model: {tracker.cost_by_model}")
print(f"By day: {tracker.cost_by_day}")

# Budget management
budget = BudgetManager(
    daily_limit=50.0,      # $50/day
    monthly_limit=1000.0,  # $1000/month
    alert_threshold=0.8    # Alert at 80%
)

if budget.would_exceed_limit(expected_cost=2.0):
    # Switch to cheaper model or reject request
    llm = GroqProvider(model="llama-3.3-70b")  # Much cheaper

Rate Limiting

from agentic_rag import RateLimiter

# Token bucket rate limiter
limiter = RateLimiter(
    requests_per_second=10,
    burst_size=20,
    per_user=True  # Track per API key
)

@limiter.limit()
async def query_endpoint(request):
    return await rag.aquery(request.query)

# Or with the standalone rate_limit decorator (its import is not shown here)
@rate_limit(requests_per_minute=60, per_user=True)
async def stream_endpoint(request):
    async for chunk in rag.astream(request.query):
        yield chunk
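
The requests_per_second and burst_size settings map naturally onto a token bucket. The sketch below shows the core of that algorithm with an injectable clock; it is single-threaded, has no per-user tracking, and is not the library's RateLimiter. TokenBucket is a hypothetical name.

```python
import time
from typing import Callable


class TokenBucket:
    """Tokens refill at `rate` per second up to `capacity`; each request spends one."""

    def __init__(self, rate: float, capacity: float, clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self._clock = clock
        self._tokens = float(capacity)  # start full so an initial burst is allowed
        self._last = clock()

    def allow(self, cost: float = 1.0) -> bool:
        now = self._clock()
        elapsed = now - self._last
        self._last = now
        # Refill for the time that has passed, never above capacity.
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False


fake_time = [0.0]  # mutable fake clock for the demonstration
bucket = TokenBucket(rate=10, capacity=20, clock=lambda: fake_time[0])
burst = [bucket.allow() for _ in range(21)]
print(burst.count(True))
```

The capacity bounds how large a burst can be, while the rate bounds sustained throughput; the two are independent knobs.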

Health Checks & Monitoring

from agentic_rag import HealthChecker, MetricsCollector

# Health check
health = HealthChecker(rag)
status = await health.check()
print(f"Vector store: {status.vector_store}")  # healthy / degraded / down
print(f"LLM provider: {status.llm_provider}")
print(f"Cache: {status.cache}")

# Metrics
metrics = MetricsCollector()
metrics.record_latency("query", duration_ms=250)
metrics.record_throughput("requests_per_second", count=10)
metrics.record_error_rate("error_rate", errors=1, total=100)

# Export to Prometheus/Grafana
from agentic_rag.metrics import PrometheusExporter
exporter = PrometheusExporter()
exporter.export(metrics)

Prompt Versioning

from agentic_rag.core import PromptVersioning

# Create version store
versioning = PromptVersioning(storage_dir="./prompt_versions")

# Save initial version
v1 = versioning.save_prompt(
    prompt_id="qa_prompt",
    prompt_text="Answer the question based on the context.",
    author="alice",
    tags=["qa", "baseline"],
    change_notes="Initial version"
)

# Iterate and improve
v2 = versioning.save_prompt(
    prompt_id="qa_prompt",
    prompt_text="Provide a concise answer (max 3 sentences) based on the context.",
    author="alice",
    parent_version=v1.version_id,
    tags=["qa", "improved"],
    change_notes="Added length constraint for conciseness"
)

# Rollback if needed
v3 = versioning.rollback("qa_prompt", v1.version_id)

Why you need this: Because production is where prototypes go to die. Caching saves money, circuit breakers prevent cascading failures, guardrails keep you out of trouble, cost tracking prevents surprise bills, and prompt versioning lets you track what worked.

Recipe 11 — Contrib tools (30+ integrations)

Communication

from agentic_rag.contrib.communication import (
    SlackTool,          # Post to Slack channels
    DiscordTool,        # Discord messages
    EmailTool,          # Send emails via SMTP/SendGrid
    TeamsTool,          # Microsoft Teams
    TelegramTool,       # Telegram bot messages
    WebhookTool,        # Generic webhooks
)

# Slack notifications
slack = SlackTool(token="xoxb-your-token")
await slack.execute(
    channel="#ai-research",
    message=f"New insight: {response.answer[:500]}...",
    blocks=[{
        "type": "section",
        "text": {"type": "mrkdwn", "text": f"*Query:* {query}"}
    }]
)

# Email reports
email = EmailTool(
    provider="sendgrid",  # or "smtp", "aws_ses"
    api_key="..."
)
await email.execute(
    to="team@company.com",
    subject="Daily RAG Summary",
    body=response.answer,
    attachments=["report.pdf"]
)

# Generic webhook
webhook = WebhookTool()
await webhook.execute(
    url="https://hooks.zapier.com/hooks/catch/...",
    payload={"query": query, "answer": response.answer}
)

Productivity

from agentic_rag.contrib.productivity import (
    NotionTool,         # Notion pages/databases
    GoogleDocsTool,     # Google Docs
    TrelloTool,         # Trello cards
    AsanaTool,          # Asana tasks
    MondayTool,         # Monday.com
    LinearTool,         # Linear issues
    JiraTool,           # Jira tickets
    ConfluenceTool,     # Confluence pages
)

# Create Notion page
notion = NotionTool(api_key="secret_...")
page = await notion.execute(
    action="create_page",
    parent_id="workspace-id",
    title="Q4 Financial Analysis",
    content=response.answer,
    properties={
        "Status": "Draft",
        "Tags": ["AI-Generated", "Q4"]
    }
)

# Create Jira ticket
jira = JiraTool(
    server="https://company.atlassian.net",
    username="bot@company.com",
    api_token="..."
)
await jira.execute(
    action="create_issue",
    project="AI",
    summary=f"Research: {query[:100]}",
    description=response.answer,
    issue_type="Task",
    labels=["rag-generated"]
)

Storage & Cloud

from datetime import datetime

from agentic_rag.contrib.storage import (
    S3Tool,             # AWS S3
    GCSTool,            # Google Cloud Storage
    AzureBlobTool,      # Azure Blob Storage
    DropboxTool,        # Dropbox
    GoogleDriveTool,    # Google Drive
    OneDriveTool,       # Microsoft OneDrive
    BoxTool,            # Box.com
)

# S3 upload
s3 = S3Tool(
    bucket="my-rag-outputs",
    region="us-east-1",
    aws_access_key="...",
    aws_secret_key="..."
)
await s3.execute(
    action="upload",
    key=f"reports/{datetime.now():%Y-%m-%d}/analysis.txt",
    body=response.answer,
    metadata={"query": query, "confidence": str(response.confidence)}
)

# Google Drive
drive = GoogleDriveTool(credentials="credentials.json")
file = await drive.execute(
    action="create_document",
    name="Research Summary",
    content=response.answer,
    folder_id="folder-id"
)

Databases

from datetime import datetime

from agentic_rag.contrib.database import (
    PostgresTool,       # PostgreSQL queries
    MySQLTool,          # MySQL queries
    MongoDBTool,        # MongoDB operations
    RedisTool,          # Redis commands
    BigQueryTool,       # Google BigQuery
    SnowflakeTool,      # Snowflake
    ClickHouseTool,     # ClickHouse
    SupabaseTool,       # Supabase
)

# Query database
postgres = PostgresTool(connection_string="postgresql://...")
results = await postgres.execute(
    action="query",
    sql="SELECT * FROM customers WHERE churn_risk > 0.8"
)

# Store in MongoDB
mongo = MongoDBTool(uri="mongodb+srv://...")
await mongo.execute(
    action="insert_one",
    database="rag",
    collection="queries",
    document={
        "query": query,
        "answer": response.answer,
        "timestamp": datetime.now(),
        "confidence": response.confidence
    }
)

Business & Payments

from agentic_rag.contrib.business import (
    StripeTool,         # Stripe payments
    ShopifyTool,        # Shopify operations
    HubSpotTool,        # HubSpot CRM
    SalesforceTool,     # Salesforce
    ZendeskTool,        # Zendesk tickets
    IntercomTool,       # Intercom conversations
    TwilioTool,         # SMS/Voice
)

# Send SMS notification
twilio = TwilioTool(
    account_sid="...",
    auth_token="..."
)
await twilio.execute(
    action="send_sms",
    to="+1234567890",
    body=f"Alert: {response.answer[:100]}..."
)

Search & Discovery

from agentic_rag.contrib.search import (
    AlgoliaTool,        # Algolia search
    ElasticsearchTool,  # Elasticsearch
    MeilisearchTool,    # Meilisearch
    TypesenseTool,      # Typesense
)

# Index documents
algolia = AlgoliaTool(app_id="...", api_key="...")
await algolia.execute(
    action="index",
    index_name="docs",
    objects=[{"objectID": "1", "content": response.answer}]
)

Why you need this: Because insights stuck in your terminal aren't useful. Send them to Slack, save them to Notion, archive them to S3 - make your RAG system a team player, not a hermit.

Recipe 12 — Build your own tool (advanced patterns)

Basic Custom Tool

from agentic_rag.tools.base import BaseTool, ToolResult, ToolParameter
from typing import Dict, Any

class WeatherTool(BaseTool):
    """Because even RAG agents check the weather sometimes."""

    def __init__(self, api_key: str):
        super().__init__(
            name="weather",
            description="Get current weather for a city",
            parameters=[
                ToolParameter(
                    name="city",
                    type="string",
                    description="City name (e.g., 'London', 'New York')",
                    required=True
                ),
                ToolParameter(
                    name="units",
                    type="string",
                    description="Temperature units",
                    enum=["metric", "imperial"],
                    default="metric"
                )
            ],
        )
        self.api_key = api_key

    async def execute(self, city: str, units: str = "metric", **kwargs) -> ToolResult:
        """Execute the weather lookup."""
        try:
            # Your API call here
            data = await self._fetch_weather(city, units)
            return ToolResult(
                success=True,
                result={
                    "temperature": data["temp"],
                    "conditions": data["weather"][0]["description"],
                    "humidity": data["humidity"]
                },
                metadata={"source": "openweather", "cached": False}
            )
        except Exception as e:
            return ToolResult(
                success=False,
                error=str(e),
                error_code="WEATHER_API_ERROR"
            )

    async def _fetch_weather(self, city: str, units: str) -> Dict:
        # Implementation goes here: call your weather API and return its JSON
        raise NotImplementedError

# Register
rag.register_tool(WeatherTool(api_key="your-key"))

Tool with Schema Validation

from pydantic import BaseModel, Field

class DatabaseQueryInput(BaseModel):
    """Input schema for database queries."""
    table: str = Field(..., description="Table name to query")
    columns: list[str] = Field(default=["*"], description="Columns to select")
    where: str = Field(default="", description="WHERE clause (optional)")
    limit: int = Field(default=100, ge=1, le=1000, description="Max results")

class DatabaseTool(BaseTool):
    """Execute safe database queries."""
    
    input_schema = DatabaseQueryInput
    
    async def execute(
        self,
        table: str,
        columns: list[str] = None,  # None means all columns; avoids a shared mutable default
        where: str = "",
        limit: int = 100
    ) -> ToolResult:
        columns = columns or ["*"]
        # Validate table exists
        if table not in self.allowed_tables:
            return ToolResult(
                success=False,
                error=f"Table '{table}' not in allowed list",
                error_code="INVALID_TABLE"
            )
        
        # Safe query execution
        try:
            results = await self.db.fetch(table, columns, where, limit)
            return ToolResult(
                success=True,
                result=results,
                metadata={"row_count": len(results)}
            )
        except Exception as e:
            return ToolResult(
                success=False,
                error=f"Query failed: {str(e)}",
                error_code="QUERY_ERROR"
            )

Tool with Caching

import hashlib

from agentic_rag.utils.cache import Cache

class CachedAPITool(BaseTool):
    """Tool with built-in caching."""

    def __init__(self):
        super().__init__(...)
        self.cache = Cache(ttl=3600)  # 1 hour cache

    async def execute(self, query: str, **kwargs) -> ToolResult:
        # Check cache. A digest is used instead of hash(), which is salted
        # per process for strings and would not match across workers.
        digest = hashlib.sha256(query.encode("utf-8")).hexdigest()
        cache_key = f"api:{digest}"
        if cached := await self.cache.get(cache_key):
            return ToolResult(success=True, result=cached, metadata={"cached": True})

        # Fetch and cache
        result = await self._fetch(query)
        await self.cache.set(cache_key, result)
        return ToolResult(success=True, result=result, metadata={"cached": False})
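
The caching pattern above depends on two details: a key that is identical in every process, and entries that expire. Here is a minimal in-memory sketch of both. `TTLCache` and `make_key` are hypothetical stand-ins for illustration, not the library's `Cache` class, and the injectable `clock` is there only to make expiry easy to test.

```python
import hashlib
import time
from typing import Any, Callable, Dict, Optional, Tuple


def make_key(prefix: str, query: str) -> str:
    """Build a cache key that is identical in every process.

    Python's built-in hash() is salted per interpreter run for strings,
    so a SHA-256 digest is used instead.
    """
    digest = hashlib.sha256(query.encode("utf-8")).hexdigest()
    return f"{prefix}:{digest}"


class TTLCache:
    """Hypothetical in-memory cache whose entries expire after `ttl` seconds."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self._entries: Dict[str, Tuple[float, Any]] = {}

    def set(self, key: str, value: Any) -> None:
        self._entries[key] = (self.clock() + self.ttl, value)

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self.clock() >= expires_at:
            del self._entries[key]  # drop the stale entry on read
            return None
        return value
```

A shared backend such as Redis needs the stable key even more, since several workers read and write the same entries.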

Tool Factory Registration

import os
from typing import Optional

from agentic_rag.factories import register_tool

@register_tool("my_weather")
class WeatherTool(BaseTool):
    """Auto-registered weather tool."""

    def __init__(self, api_key: Optional[str] = None, **kwargs):
        super().__init__(...)
        self.api_key = api_key or os.getenv("WEATHER_API_KEY")

# Create via factory
from agentic_rag.factories import create_tool
tool = create_tool("my_weather", api_key="...")

Multi-Step Tool

import time

class ReportGeneratorTool(BaseTool):
    """Generate comprehensive reports from multiple sources."""

    async def execute(
        self,
        topic: str,
        depth: str = "comprehensive",
        format: str = "markdown"
    ) -> ToolResult:
        steps = []
        started = time.perf_counter()  # measure the real run time

        # Step 1: Web search
        search_results = await self._search_web(topic)
        steps.append(f"Searched web: {len(search_results)} sources")

        # Step 2: Vector store query
        docs = await self._query_documents(topic)
        steps.append(f"Queried docs: {len(docs)} documents")

        # Step 3: Synthesize
        report = await self._generate_report(
            topic, search_results, docs,
            depth=depth, format=format
        )
        steps.append("Generated report")

        return ToolResult(
            success=True,
            result={
                "report": report,
                "sources": len(search_results) + len(docs),
                "format": format
            },
            metadata={
                "steps": steps,
                "duration_ms": round((time.perf_counter() - started) * 1000)
            }
        )

Why you need this: Because your use case is unique. These building blocks let you extend the system however you need: weather APIs, internal databases, custom calculations, whatever.

Recipe 18 — Monitoring Dashboard (analytics & observability)

Real-time web dashboard for monitoring your RAG system. Track queries, latency, success rates, and view recent interactions.

Enable Dashboard

from agentic_rag.server import create_app_with_dashboard

# Create app with dashboard
app = create_app_with_dashboard()

# Dashboard available at http://localhost:8000/dashboard

Record Queries for Analytics

from agentic_rag.server import record_rag_query
import time

# In your query handler
query = "What is machine learning?"
start = time.perf_counter()  # monotonic clock, suited to measuring intervals
response = await rag.aquery(query)
latency = (time.perf_counter() - start) * 1000

# Record for dashboard
record_rag_query(
    query=query,
    response=response.answer,
    latency_ms=latency,
    token_count=150,  # placeholder; pass the real token usage if you track it
    sources=[doc.id for doc in response.sources],
    confidence=response.confidence,
    success=True,
)

Dashboard Features

  • Real-time metrics: Query count, avg latency, success rate
  • Recent queries: View last 50 queries with latency and status
  • Performance trends: Latency and success rate over time
  • Auto-refresh: Updates every 30 seconds
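
The headline metrics in the list above boil down to simple aggregates over recorded queries. Here is a rough sketch of that arithmetic. `QueryRecord` and `summarize` are hypothetical names for illustration, and how the dashboard actually computes its numbers may differ.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class QueryRecord:
    """Hypothetical record of one query: how long it took and whether it worked."""
    latency_ms: float
    success: bool


def summarize(records: List[QueryRecord], recent: int = 50) -> Dict[str, float]:
    """Aggregate the most recent `recent` records into headline metrics."""
    window = records[-recent:]
    if not window:
        return {"query_count": 0, "avg_latency_ms": 0.0, "success_rate": 0.0}
    total_latency = sum(r.latency_ms for r in window)
    successes = sum(1 for r in window if r.success)
    return {
        "query_count": len(window),
        "avg_latency_ms": total_latency / len(window),
        "success_rate": successes / len(window),
    }
```

Averaging over a bounded window keeps the numbers responsive to recent behavior instead of being dominated by old traffic.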

Access URLs:

  • Dashboard UI: /dashboard
  • Metrics API: /api/dashboard/metrics
  • Query history: /api/dashboard/queries

Why you need this: Because flying blind in production is terrifying. See what's happening, spot issues before users do, and have data to optimize.

Recipe 17 — Conversation Memory (multi-turn chat)

Simple conversation memory for multi-turn chat sessions. Unlike complex hierarchical memory, this is purpose-built for maintaining chat context within token limits.

Buffer Memory (Simple)

from agentic_rag.core import ConversationBufferMemory

memory = ConversationBufferMemory(max_token_limit=4000)

# Add messages
memory.add_user_message("What is machine learning?")
memory.add_ai_message("Machine learning is a subset of AI...")
memory.add_user_message("Give me an example")
memory.add_ai_message("Sure! Spam detection is a classic example...")

# Get context for LLM
context = memory.to_list()
# Returns: [
#   {"role": "user", "content": "What is machine learning?"},
#   {"role": "assistant", "content": "Machine learning is a subset of AI..."},
#   ...
# ]

# Check token usage
tokens = memory.get_token_count()  # current size of the buffer, in tokens
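
To make "within token limits" concrete, here is a minimal sketch of trimming a message list to a budget, newest first. It is an illustration, not the library's implementation: `trim_to_budget` is a hypothetical helper, and `count_tokens` is a crude whitespace stand-in for a real tokenizer.

```python
from typing import Callable, Dict, List

Message = Dict[str, str]


def count_tokens(text: str) -> int:
    """Crude stand-in tokenizer: one token per whitespace-separated word."""
    return len(text.split())


def trim_to_budget(
    messages: List[Message],
    max_tokens: int,
    counter: Callable[[str], int] = count_tokens,
) -> List[Message]:
    """Keep the newest messages whose combined token count fits the budget."""
    kept: List[Message] = []
    used = 0
    for message in reversed(messages):  # walk from newest to oldest
        cost = counter(message["content"])
        if used + cost > max_tokens:
            break
        kept.append(message)
        used += cost
    kept.reverse()  # restore chronological order
    return kept
```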

Window Memory (Last N turns)

from agentic_rag.core import ConversationBufferWindowMemory

# Keep only last 5 conversation turns
memory = ConversationBufferWindowMemory(k=5)

# Older messages auto-removed
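
The "last N turns" behavior can be pictured as a bounded queue. Here is a small sketch using `collections.deque`. `WindowBuffer` is a hypothetical class for illustration, and it assumes that one turn means one user message plus one assistant reply.

```python
from collections import deque
from typing import Deque, Dict, List


class WindowBuffer:
    """Hypothetical buffer that keeps only the last `k` user/assistant turns."""

    def __init__(self, k: int):
        # One turn is a user message plus the assistant reply: 2 * k messages.
        self._messages: Deque[Dict[str, str]] = deque(maxlen=2 * k)

    def add(self, role: str, content: str) -> None:
        # Appending to a full deque silently drops the oldest message.
        self._messages.append({"role": role, "content": content})

    def to_list(self) -> List[Dict[str, str]]:
        return list(self._messages)
```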

Summary Memory (Compress old context)

from agentic_rag.core import ConversationSummaryMemory

# Summarizes old messages to save tokens
memory = ConversationSummaryMemory(
    llm_provider=llm,
    max_token_limit=4000,
    summary_token_limit=500
)

# After many messages, early ones become:
# "Summary: User asked about ML; the assistant explained that it is a subset of AI..."

Entity Memory (Track mentioned entities)

from agentic_rag.core import ConversationEntityMemory

# Automatically extracts and remembers entities
memory = ConversationEntityMemory(llm_provider=llm)

memory.add_user_message("Alice is working on the Q4 report.")
memory.add_ai_message("What is Alice's role?")
memory.add_user_message("She's the CTO.")

# Extracted entities
entities = memory.get_entities()
# {"alice": {"name": "Alice", "type": "Person", "role": "CTO"}}

With AgenticRAG

from agentic_rag import AgenticRAG
from agentic_rag.core import ConversationBufferMemory

memory = ConversationBufferMemory()
rag = AgenticRAG(vector_store=store, llm_provider=llm)

async def chat(user_input: str):
    # Get conversation context
    context = memory.to_list()
    
    # Add user message
    memory.add_user_message(user_input)
    
    # Query with context
    response = await rag.aquery(
        user_input,
        context_messages=context
    )
    
    # Store response
    memory.add_ai_message(response.answer)
    
    return response

Why you need this: Because users don't ask one question and leave. They have conversations. This keeps context without exploding your token budget.

Recipe 16 — ReAct Agent (reasoning + acting)

The ReAct pattern: think → act → observe → repeat. Our implementation combines reasoning and tool use in an interleaved loop.

Basic ReAct Agent

from agentic_rag.cognitive import ReActAgent
from agentic_rag.tools import WebSearchTool, CalculatorTool

agent = ReActAgent(
    llm_provider=llm,
    vector_store=store,
    tools=[WebSearchTool(), CalculatorTool()],
    max_iterations=5
)

result = await agent.query(
    "Find GDP of France and Germany, calculate the difference"
)
print(result.answer)
# "The difference between France's GDP ($2.78T) and Germany's GDP ($4.07T) is $1.29T"

# Full reasoning trace
for step in result.reasoning_trace:
    print(f"[{step.step_type}] {step.content}")
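
For readers who want to see the think, act, observe loop without any library in the way, here is a toy sketch. Everything in it is hypothetical: `react_loop` is not the library's `ReActAgent`, and `scripted_policy` stands in for the LLM that would normally decide the next step.

```python
from typing import Callable, Dict, List, Tuple

# A policy reads the trace so far ("think") and returns either
# ("act", tool_name, tool_input) or ("finish", answer, "").
Policy = Callable[[List[Tuple[str, str]]], Tuple[str, str, str]]


def react_loop(
    policy: Policy,
    tools: Dict[str, Callable[[str], str]],
    max_iterations: int = 5,
) -> Tuple[str, List[Tuple[str, str]]]:
    """Run think -> act -> observe until the policy finishes or the budget ends."""
    trace: List[Tuple[str, str]] = []
    for _ in range(max_iterations):
        kind, first, second = policy(trace)
        if kind == "finish":
            trace.append(("answer", first))
            return first, trace
        trace.append(("action", f"{first}({second})"))
        observation = tools[first](second)  # act, then record what came back
        trace.append(("observation", observation))
    return "No answer within the iteration budget.", trace


def scripted_policy(trace: List[Tuple[str, str]]) -> Tuple[str, str, str]:
    """Stand-in for an LLM: look up two numbers, then subtract them."""
    observations = [content for step, content in trace if step == "observation"]
    if len(observations) == 0:
        return ("act", "lookup", "a")
    if len(observations) == 1:
        return ("act", "lookup", "b")
    return ("finish", str(int(observations[1]) - int(observations[0])), "")


numbers = {"a": "3", "b": "10"}
answer, trace = react_loop(scripted_policy, {"lookup": lambda name: numbers[name]})
```

The iteration cap matters: without it, a policy that never decides to finish would loop forever.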

ReAct with Retrieval

# Vector store becomes a retrieval tool automatically
agent = ReActAgent(
    llm_provider=llm,
    vector_store=store,  # Adds implicit "retrieve" tool
    tools=[WebSearchTool()],
    max_iterations=5
)

result = await agent.query(
    "What does our Q4 report say about revenue? Also search for industry benchmarks."
)
# Combines internal documents + web search seamlessly

Why you need this: Sometimes you need to think, search, calculate, think again, and finally answer. ReAct makes this systematic and observable.

Recipe 15 — Parent-child chunking (provenance tracking)

Track which chunks came from which parent document. Essential for citations, expanded context, and hierarchical retrieval.

Chunk with Parent Tracking

from agentic_rag.document_processing.chunkers import (
    ParentChildChunker,
    FixedSizeChunker
)

chunker = ParentChildChunker(
    base_chunker=FixedSizeChunker(chunk_size=500),
    include_parent_metadata=True
)

# Chunk with tracking
parent = chunker.chunk_document(
    text="Long document text...",
    doc_id="doc_001",
    metadata={"source": "report.pdf", "author": "Alice"}
)

# Access children with full provenance
for child in parent.children:
    print(f"Chunk {child.child_index}/{child.total_children}")
    print(f"  Parent: {child.parent_doc_id}")
    print(f"  Citation: {child.to_citation()}")  # [report.pdf, chunk 1/5]
    print(f"  Position: chars {child.char_start}-{child.char_end}")
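
The provenance fields printed above are cheap to compute at chunking time. Here is a stripped-down sketch of fixed-size chunking that records them. `ChildChunk` and `chunk_with_provenance` are hypothetical names, and the 1-based `child_index` is an assumption inferred from the "chunk 1/5" citation format.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ChildChunk:
    """Hypothetical child chunk carrying the provenance fields used above."""
    text: str
    parent_doc_id: str
    child_index: int      # 1-based position among siblings (assumption)
    total_children: int
    char_start: int
    char_end: int         # exclusive end offset in the parent text


def chunk_with_provenance(text: str, doc_id: str, chunk_size: int) -> List[ChildChunk]:
    """Split `text` into fixed-size pieces and remember where each came from."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    starts = range(0, len(text), chunk_size)
    total = len(starts)
    return [
        ChildChunk(
            text=text[start:start + chunk_size],
            parent_doc_id=doc_id,
            child_index=i,
            total_children=total,
            char_start=start,
            char_end=min(start + chunk_size, len(text)),
        )
        for i, start in enumerate(starts, start=1)
    ]
```

Because the offsets index into the parent text, any chunk can be traced back to the exact character span it was cut from.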

Hierarchical Retrieval

from agentic_rag.document_processing.chunkers import HierarchicalRetriever

# Index child chunks
# parent_chunks: list of parent documents, e.g. collected from chunker.chunk_document()
for parent in parent_chunks:
    for child in parent.children:
        await store.add(
            child.text,
            metadata={
                "parent_doc_id": child.parent_doc_id,
                "child_index": child.child_index,
            }
        )

# Retrieve with context expansion
retriever = HierarchicalRetriever(
    vector_store=store,
    parents=parent_chunks
)

chunks = await retriever.retrieve("revenue trends", top_k=3)
expanded = retriever.expand_context(chunks, window_size=1)
# Returns target chunks + neighboring siblings for context
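
Context expansion with a window is mostly index arithmetic. Here is a sketch of the idea under stated assumptions: `expand_with_siblings` is a hypothetical helper, indices are 0-based positions within one parent document, and the library's `expand_context` may work differently.

```python
from typing import List, Sequence


def expand_with_siblings(
    hit_indices: Sequence[int],
    total_children: int,
    window_size: int = 1,
) -> List[int]:
    """Return the sorted, de-duplicated chunk indices to fetch for context.

    Indices are 0-based positions within a single parent document.
    """
    selected = set()
    for index in hit_indices:
        low = max(0, index - window_size)
        high = min(total_children - 1, index + window_size)
        selected.update(range(low, high + 1))
    return sorted(selected)
```

Overlapping windows collapse into one contiguous run, so adjacent hits do not produce duplicate context.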

Why you need this: Because "trust me bro" is not a valid citation. Know exactly which chunk came from which document, and retrieve siblings for expanded context when needed.

Recipe 14 — Bulk document loading (SimpleDirectoryReader)

Load entire directories with automatic file type detection. The missing piece between "I have a folder of documents" and "my RAG system works."

Load Entire Directory

from pathlib import Path

from agentic_rag.document_processing.loaders import SimpleDirectoryReader

reader = SimpleDirectoryReader(
    input_dir="./data",
    recursive=True,
    exclude=["*.tmp", "*.log", "node_modules/*"]
)

# Load with progress tracking
def on_progress(file_path, current, total):
    print(f"Loaded {current}/{total}: {Path(file_path).name}")

documents = await reader.load(progress_callback=on_progress)
print(f"Loaded {len(documents)} documents")

Auto File Type Detection

# Automatically detects and uses correct loader for each extension
# .pdf → PDFLoader, .docx → DocxLoader, .md → MarkdownLoader, etc.
reader = SimpleDirectoryReader("./mixed_documents")
docs = await reader.load()

# Each document has file metadata
doc = docs[0]
print(doc.metadata["file_path"])   # ./mixed_documents/report.pdf
print(doc.metadata["file_ext"])    # .pdf
print(doc.metadata["directory"])   # ./mixed_documents
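
Under the hood, this kind of reader needs two things: glob-style exclusion and a lookup from file extension to loader. Here is a standard-library sketch of both. `LOADER_BY_EXTENSION` and `plan_loading` are hypothetical, and the real loader table and matching rules may differ.

```python
from fnmatch import fnmatch
from pathlib import Path
from typing import Dict, Iterable, List, Tuple

# Hypothetical mapping; the real reader's loader table may differ.
LOADER_BY_EXTENSION: Dict[str, str] = {
    ".pdf": "PDFLoader",
    ".docx": "DocxLoader",
    ".md": "MarkdownLoader",
}


def plan_loading(
    input_dir: str,
    recursive: bool = True,
    exclude: Iterable[str] = (),
) -> List[Tuple[str, str]]:
    """Return (relative_path, loader_name) pairs for every supported file."""
    root = Path(input_dir)
    candidates = root.rglob("*") if recursive else root.glob("*")
    patterns = list(exclude)
    plan = []
    for path in sorted(candidates):
        if not path.is_file():
            continue
        relative = path.relative_to(root).as_posix()
        if any(fnmatch(relative, pattern) for pattern in patterns):
            continue  # matches an exclude pattern
        loader = LOADER_BY_EXTENSION.get(path.suffix.lower())
        if loader is not None:
            plan.append((relative, loader))
    return plan
```

Lower-casing the suffix means `report.PDF` and `report.pdf` pick the same loader.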

With Custom Metadata

from datetime import datetime
from pathlib import Path

def add_source_metadata(file_path: str) -> dict:
    return {
        "department": "engineering",
        "processed_at": datetime.now().isoformat(),
        "filename": Path(file_path).name
    }

reader = SimpleDirectoryReader(
    "./data",
    file_metadata=add_source_metadata
)

Why you need this: Because loading documents one-by-one is tedious and you have better things to do. Point it at a folder, get documents back.

Recipe 13 — FastAPI server (production-ready API)

Basic Server

from agentic_rag.server import create_app

# One-liner production API
app = create_app()

# Run with uvicorn
# uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4

Custom Server Configuration

from agentic_rag.server import create_app, ServerConfig

# Configure components
config = ServerConfig(
    # RAG settings
    vector_store="chroma",
    llm_provider="openai",
    llm_model="gpt-4o",
    enable_agent=True,
    enable_memory=True,
    
    # API settings
    title="My RAG API",
    version="1.0.0",
    docs_url="/docs",
    
    # Security
    api_key_header="X-API-Key",
    require_auth=True,
    
    # Rate limiting
    rate_limit_requests=100,
    rate_limit_window=60,
    
    # CORS
    allowed_origins=["https://app.example.com"],
    
    # Features
    enable_streaming=True,
    enable_caching=True,
    enable_metrics=True
)

app = create_app(config=config)
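
The `rate_limit_requests` and `rate_limit_window` settings describe a "N requests per window" policy. One common way to enforce that is a sliding-window log, sketched below. This illustrates the technique and is not necessarily what the server does internally. `SlidingWindowLimiter` is a hypothetical class, and the `clock` parameter exists only so the behavior can be tested.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Allow at most `max_requests` per client in any `window_seconds` span."""

    def __init__(
        self,
        max_requests: int,
        window_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, client_id: str) -> bool:
        now = self.clock()
        hits = self._hits[client_id]
        # Forget requests that have fallen out of the window.
        while hits and now - hits[0] >= self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False
        hits.append(now)
        return True
```

With several worker processes, each worker would keep its own counts, so a shared store is needed for a global limit.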

API Endpoints Reference

Method | Path | Description | Request Body
GET | /health | Health check | -
GET | /ready | Readiness probe | -
POST | /query | RAG query | {"query": "...", "filters": {}}
POST | /query/cognitive | Full AC-RAG | {"query": "...", "enable_reflection": true}
POST | /stream | SSE streaming | {"query": "..."}
POST | /stream/structured | Structured events | {"query": "...", "include_sources": true}
POST | /ingest | Document upload | multipart/form-data
POST | /ingest/text | Text ingestion | {"text": "...", "metadata": {}}
GET | /documents | List documents | ?limit=10&offset=0
DELETE | /documents/{id} | Delete document | -
GET | /memory/stats | Memory statistics | -
POST | /memory/clear | Clear memory | {"type": "episodic"}
GET | /tools | List available tools | -
POST | /tools/execute | Execute tool | {"tool": "...", "params": {}}
GET | /metrics | Prometheus metrics | -

Query Endpoint Examples

import requests

# Basic query
response = requests.post(
    "http://localhost:8000/query",
    headers={"X-API-Key": "your-key"},
    json={
        "query": "What is machine learning?",
        "temperature": 0.7,
        "max_tokens": 500,
        "include_sources": True
    }
)
result = response.json()
print(result["answer"])
print(result["sources"])  # Source documents
print(result["confidence"])  # 0.0 - 1.0

# Cognitive query with full AC-RAG
response = requests.post(
    "http://localhost:8000/query/cognitive",
    headers={"X-API-Key": "your-key"},
    json={
        "query": "Explain quantum computing applications",
        "enable_reflection": True,
        "enable_progressive_retrieval": True,
        "max_reflections": 3
    }
)
result = response.json()
print(result["answer"])
print(result["reflection_count"])  # Number of self-improvement loops
print(result["retrieval_attempts"])  # Progressive retrieval count

Streaming Endpoints

# Server-Sent Events (SSE)
curl -X POST http://localhost:8000/stream \
  -H "Content-Type: application/json" \
  -d '{"query": "Explain neural networks"}'

# Response: stream of text chunks
# data: {"type": "chunk", "content": "Neural"}
# data: {"type": "chunk", "content": " networks"}
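# data: {"type": "complete"}

// JavaScript client
// Note: EventSource always issues GET requests, so this assumes the server
// also accepts GET on /stream. For the POST endpoint, read the response
// with fetch() and a stream reader instead.
const eventSource = new EventSource('/stream?query=...');
eventSource.onmessage = (event) => {
    const data = JSON.parse(event.data);
    if (data.type === 'chunk') {
        // textContent avoids interpreting model output as HTML
        document.getElementById('output').textContent += data.content;
    } else if (data.type === 'complete') {
        eventSource.close();  // otherwise the browser reconnects automatically
    }
};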
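
For Python clients, the event format shown above is easy to consume by hand. Here is a small parsing sketch, assuming each `data:` line carries one complete JSON object as in the sample output. `iter_sse_events` and `collect_text` are hypothetical helpers, not part of the library.

```python
import json
from typing import Iterable, Iterator


def iter_sse_events(lines: Iterable[str]) -> Iterator[dict]:
    """Yield the JSON payload of each `data:` line in an SSE stream."""
    for raw in lines:
        line = raw.strip()
        if not line.startswith("data:"):
            continue  # skip blank separators, comments, and other fields
        yield json.loads(line[len("data:"):].strip())


def collect_text(lines: Iterable[str]) -> str:
    """Concatenate chunk contents until a `complete` event arrives."""
    parts = []
    for event in iter_sse_events(lines):
        if event.get("type") == "complete":
            break
        if event.get("type") == "chunk":
            parts.append(event["content"])
    return "".join(parts)
```

With `requests`, the lines can come from a response opened with `stream=True`, for example `collect_text(response.iter_lines(decode_unicode=True))`.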

Document Ingestion

# Upload file
with open("document.pdf", "rb") as f:
    response = requests.post(
        "http://localhost:8000/ingest",
        headers={"X-API-Key": "your-key"},
        files={"file": ("document.pdf", f, "application/pdf")},
        data={
            "chunker": "semantic",
            "max_chunk_size": 1000,
            "metadata": '{"source": "upload", "user": "alice"}'
        }
    )
print(response.json()["document_ids"])  # List of inserted doc IDs

# Ingest text directly
response = requests.post(
    "http://localhost:8000/ingest/text",
    json={
        "text": "Your document content here...",
        "metadata": {"source": "api", "title": "My Doc"}
    }
)

Authentication & Security

from agentic_rag.server import create_app

# API key authentication
app = create_app(
    auth_provider="api_key",
    api_keys=["key-1", "key-2", "key-3"]
)

# JWT authentication
app = create_app(
    auth_provider="jwt",
    jwt_secret="your-secret",
    jwt_algorithm="HS256"
)

# Custom auth
from agentic_rag.server import BaseAuthProvider

class CustomAuth(BaseAuthProvider):
    async def authenticate(self, request):
        token = request.headers.get("Authorization")
        # Your validation logic
        return {"user_id": "123", "roles": ["user"]}

app = create_app(auth_provider=CustomAuth())
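
If you write your own key validation inside a custom provider, compare keys in constant time. Here is a minimal sketch of that check using only the standard library. `check_api_key` is a hypothetical helper, and the default header name mirrors the `X-API-Key` header used in the examples above.

```python
import hmac
from typing import Iterable, Mapping, Optional


def check_api_key(
    headers: Mapping[str, str],
    allowed_keys: Iterable[str],
    header_name: str = "X-API-Key",
) -> Optional[str]:
    """Return the matching key, or None when the request is not authorized."""
    # HTTP header names are case-insensitive, so normalize before lookup.
    normalized = {name.lower(): value for name, value in headers.items()}
    presented = normalized.get(header_name.lower())
    if not presented:
        return None
    for key in allowed_keys:
        # compare_digest avoids leaking how many leading characters matched.
        if hmac.compare_digest(presented.encode(), key.encode()):
            return key
    return None
```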

Deployment Configuration

# docker-compose.yml
version: '3.8'
services:
  rag-api:
    image: ai-prishtina-agentic-rag:latest
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - VECTOR_STORE=chroma
      - ENABLE_AGENT=true
      - RATE_LIMIT=100/minute
    volumes:
      - ./data:/app/data
      - ./config.yaml:/app/config.yaml
    command: uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4

  redis:
    image: redis:alpine
    # For caching and session storage

Kubernetes Deployment

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rag-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: rag-api
  template:
    metadata:
      labels:
        app: rag-api
    spec:
      containers:
      - name: api
        image: ai-prishtina-agentic-rag:latest
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: api-secrets
              key: openai-key
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000

Workflow Orchestration UI

from agentic_rag.server import create_app_with_workflow_ui

# Create app with workflow UI
app = create_app_with_workflow_ui()

# Access at http://localhost:8000/workflow/ui

The workflow UI provides:

  • Node Palette: Drag and drop nodes (loader, chunker, retriever, generator, tools)
  • Canvas: Visual pipeline builder with connections
  • Properties Panel: Configure each node's parameters
  • Execution Monitor: Run and debug workflows

Why you need this: Because not everyone wants to write Python. Sometimes you just need an API that works out of the box, with auth, rate limiting, streaming, and a workflow UI already built in.


Test Coverage & Quality

We don't ship code we wouldn't use ourselves. That's why we maintain 99% test coverage across all modules.

Coverage by Module

  • Core functionality: 100% coverage
  • Cognitive layer: 99% coverage
  • Providers (18 LLM, 13 vector stores): 99% coverage
  • Document processing: 99% coverage
  • Contrib integrations: 99% coverage

Test Suite Stats

  • 131 test files (124 unit + 7 integration)
  • 280+ Python files tested
  • Advanced PDF backends: Full coverage for docling, camelot, unstructured, kreuzberg
  • Continuous integration on every commit
  • No regressions - ever

Running Tests

# Run all tests
pytest

# Run PDF loader tests specifically
pytest tests/unit/test_pdf_advanced_loaders.py -v
pytest tests/integration/test_pdf_backends_integration.py -v

# Run with coverage
pytest --cov=agentic_rag --cov-report=html

Why this matters: Because confidence comes from knowing your code works. We test everything from the happy path to the weird edge cases, so you can deploy without surprises.


Architecture

                    ┌─────────────────────────────────────┐
                    │        FastAPI Server / CLI         │
                    │  (Rate Limit · Auth · Guardrails    │
                    │   Multi-Tenancy · Feedback Loop)    │
                    └──────────────┬──────────────────────┘
                                   │
          ┌────────────────────────┼──────────────────────────┐
          │              AC-RAG Cognitive Layer               │
          │  ┌──────────┐ ┌──────────┐ ┌─────────────────┐    │
          │  │  Neural  │ │Reflective│ │  Hierarchical   │    │
          │  │  Router  │ │  Agent   │ │    Memory       │    │
          │  └──────────┘ └──────────┘ └─────────────────┘    │
          │  ┌───────────┐ ┌──────────┐ ┌─────────────────┐   │
          │  │Progressive│ │Calibrated│ │   Knowledge     │   │
          │  │ Retrieval │ │Confidence│ │    Fusion       │   │
          │  └───────────┘ └──────────┘ └─────────────────┘   │
          │  ┌───────────┐ ┌──────────┐ ┌─────────────────┐   │
          │  │   Neural  │ │   Tool   │ │  Multi-Agent    │   │
          │  │ Classifier│ │ Composer │ │   Orchestrator  │   │
          │  └───────────┘ └──────────┘ └─────────────────┘   │
          └─────────────────────┬─────────────────────────────┘
                                │
     ┌──────────────┬───────────┼──────────┬──────────────┐
     ▼              ▼           ▼          ▼              ▼
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐ ┌──────────┐
│ Retrieval│ │   LLM    │ │  Graph   │ │ Tools  │ │  Cache / │
│ (Vector) │ │ Providers│ │   RAG    │ │(Search)│ │ Circuit  │
└──────────┘ └──────────┘ └──────────┘ └────────┘ │ Breaker  │
     │              │           │          │      └──────────┘
     ▼              ▼           ▼          ▼
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────────────────┐
│ Document │ │Evaluation│ │  Config  │ │   Docker / CI/CD   │
│Processing│ │& Metrics │ │(YAML/INI)│ │   Infrastructure   │
└──────────┘ └──────────┘ └──────────┘ └────────────────────┘

For detailed architecture docs: docs/01-architecture.md | AC-RAG vision: docs/vision-ac-rag.md

Configuration

Everything is externalized. No magic numbers hiding in the code. Load from YAML, INI, or env vars — or just use the defaults (they're pretty good).

from agentic_rag.utils.config import Config

cfg = Config.from_file("config.yaml")   # YAML
cfg = Config.from_ini("config.ini")     # INI
cfg = Config.from_env(prefix="AGENTIC_RAG_")  # Environment variables
cfg = Config()                           # Sensible defaults
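
To show what prefix-based loading can look like, here is a standalone sketch that folds prefixed environment variables into nested sections. It is not the library's `Config.from_env`. The `SECTION__KEY` naming with a double underscore is an assumption made for this example, and `config_from_env` is a hypothetical function.

```python
from typing import Any, Dict, Mapping


def config_from_env(environ: Mapping[str, str], prefix: str) -> Dict[str, Dict[str, Any]]:
    """Collect `PREFIX<SECTION>__<KEY>=value` variables into nested dicts.

    The double-underscore separator is an assumption made for this sketch.
    """
    config: Dict[str, Dict[str, Any]] = {}
    for name, raw in environ.items():
        if not name.startswith(prefix):
            continue
        section, sep, key = name[len(prefix):].lower().partition("__")
        if not sep or not section or not key:
            continue  # ignore names that do not follow SECTION__KEY
        config.setdefault(section, {})[key] = _coerce(raw)
    return config


def _coerce(raw: str) -> Any:
    """Turn 'true'/'false' and numeric strings into Python values."""
    lowered = raw.strip().lower()
    if lowered in ("true", "false"):
        return lowered == "true"
    for cast in (int, float):
        try:
            return cast(raw)
        except ValueError:
            pass
    return raw
```

In real use, pass `os.environ` as `environ`. Taking a mapping keeps the function easy to test.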

See config.example.yaml and config.example.ini for complete examples.

Full configuration reference

Configuration Sections

llm — LLM Provider Settings

Parameter | Default | Options | Description
provider | openai | openai, anthropic, cohere, local | LLM provider
model | gpt-3.5-turbo | - | Model name
api_key | null | - | API key (or LLM_API_KEY env var)
base_url | null | - | Custom API base URL
temperature | 0.7 | 0.0–2.0 | Sampling temperature
max_tokens | 1000 | - | Max tokens to generate
timeout | 30 | - | Request timeout (seconds)

vector_store — Vector Database

Parameter | Default | Options | Description
provider | chroma | chroma, pinecone, weaviate, faiss | Backend provider
collection_name | agentic_rag | - | Collection/index name
persist_directory | null | - | Persistence path (Chroma/FAISS)
embedding_model | sentence-transformers/all-MiniLM-L6-v2 | - | HuggingFace embedding model
dimension | 384 | - | Embedding dimension

document_processing — Chunking

Parameter | Default | Options | Description
chunk_size | 1000 | - | Token/character chunk size
chunk_overlap | 200 | - | Overlap between chunks
chunking_strategy | recursive | fixed, semantic, recursive | Chunking algorithm
enable_preprocessing | true | - | Enable text preprocessing

retrieval — Search & Reranking

Parameter | Default | Description
top_k | 5 | Number of results to retrieve
similarity_threshold | 0.7 | Minimum similarity score (0.0–1.0)
enable_reranking | true | Enable cross-encoder reranking
reranker_model | cross-encoder/ms-marco-MiniLM-L-6-v2 | Reranker model name
enable_hybrid_search | false | Combine dense + keyword search
dense_weight | 0.7 | Weight for dense embeddings (0.0–1.0)
sparse_weight | 0.3 | Weight for sparse/BM25 (0.0–1.0)
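
How `dense_weight` and `sparse_weight` might combine is easiest to see in code. The sketch below uses a plain weighted sum, which is one common fusion scheme and not necessarily the one the library uses. `fuse_scores` is a hypothetical function, and both inputs are assumed to be normalized to 0.0-1.0 already.

```python
from typing import Dict, List, Tuple


def fuse_scores(
    dense: Dict[str, float],
    sparse: Dict[str, float],
    dense_weight: float = 0.7,
    sparse_weight: float = 0.3,
    top_k: int = 5,
) -> List[Tuple[str, float]]:
    """Rank documents by a weighted sum of dense and sparse scores.

    Both score maps are assumed to be normalized to the 0.0-1.0 range already;
    a document missing from one map contributes 0.0 for that signal.
    """
    fused = {
        doc_id: dense_weight * dense.get(doc_id, 0.0)
        + sparse_weight * sparse.get(doc_id, 0.0)
        for doc_id in set(dense) | set(sparse)
    }
    # Highest score first; ties broken by document id for stable output.
    ranked = sorted(fused.items(), key=lambda item: (-item[1], item[0]))
    return ranked[:top_k]
```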

agent — Agentic Capabilities

Parameter | Default | Description
enable_planning | true | Enable query planning
max_planning_steps | 5 | Max planning iterations
enable_memory | true | Enable working memory
memory_size | 1000 | Memory capacity (entries)
enable_tools | true | Enable tool integration
available_tools | ["web_search", "calculator"] | List of enabled tool names
tool_timeout | 30 | Tool execution timeout (seconds)

cognitive — AC-RAG Features

Parameter | Default | Description
enable_query_routing | true | Enable neural query routing
use_llm_classification | false | Use LLM vs DistilBERT for routing
routing_confidence_threshold | 0.8 | Router confidence cutoff (0.0–1.0)
rule_confidence_fallback | 0.6 | Fallback threshold when rules fail
enable_reflection | true | Enable reflective agent loop
confidence_threshold | 0.75 | Minimum answer confidence (0.0–1.0)
max_reflections | 3 | Max self-critique iterations
reflection_temperature | 0.3 | Temperature for reflection (0.0–1.0)
critique_context_window | 1500 | Tokens for critique context
regeneration_context_window | 3000 | Tokens for answer regeneration
episodic_max_entries | 100 | Episodic memory size
episodic_ttl_seconds | 3600 | Episodic memory TTL (seconds)
semantic_persist_path | null | Semantic memory storage path
procedural_persist_path | null | Procedural memory storage path
procedural_ema_alpha | 0.3 | EMA learning rate (0.0–1.0)
procedural_recall_threshold | 0.3 | Strategy recall threshold (0.0–1.0)
procedural_maturity_count | 5 | Uses before strategy matures
progressive_max_iterations | 3 | Progressive retrieval iterations
progressive_min_quality | 0.6 | Minimum retrieval quality (0.0–1.0)
progressive_reformulation_temperature | 0.3 | Query reformulation temperature
min_calibration_samples | 30 | Calibration training samples
calibration_learning_rate | 0.01 | Platt scaling learning rate
calibration_epochs | 200 | Calibration training epochs
confidence_length_normalizer | 200 | Length normalization factor
confidence_weights | (map) | Weights for confidence components
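
For intuition about `procedural_ema_alpha` and `procedural_maturity_count`, here is the textbook exponential moving average update alongside a simple maturity check. Whether the library applies exactly this formula is an assumption. `ema_update` and `is_mature` are hypothetical helpers.

```python
def ema_update(previous: float, observed: float, alpha: float = 0.3) -> float:
    """Blend a new observation into a running score.

    Higher alpha reacts faster to new outcomes; lower alpha is smoother.
    """
    if not 0.0 <= alpha <= 1.0:
        raise ValueError("alpha must be in [0.0, 1.0]")
    return alpha * observed + (1.0 - alpha) * previous


def is_mature(use_count: int, maturity_count: int = 5) -> bool:
    """A strategy counts as mature once it has been used often enough."""
    return use_count >= maturity_count
```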

graph — Graph RAG

Parameter | Default | Options | Description
extraction_method | pattern | pattern, spacy, llm | Entity extraction method
spacy_model | en_core_web_sm | - | spaCy model for NER
merge_similar_entities | true | - | Deduplicate similar entities
similarity_threshold | 0.85 | - | Entity merge threshold (0.0–1.0)
max_hops | 2 | - | Max graph traversal depth
graph_top_k | 10 | - | Max graph results

server — FastAPI Server

Parameter | Default | Description
host | 0.0.0.0 | Bind address
port | 8000 | Listen port
workers | 2 | Uvicorn worker processes
cors_origins | ["*"] | Allowed CORS origins
rate_limit_rpm | 60 | Rate limit (requests per minute)
log_level | INFO | Logging level
enable_auth | false | Enable API key/JWT auth
api_keys | [] | List of allowed API keys
jwt_secret | null | JWT signing secret
jwt_algorithm | HS256 | JWT algorithm

cache — Semantic Cache

Parameter | Default | Options | Description
enabled | false | - | Enable caching
backend | memory | memory, redis | Cache backend
redis_url | redis://localhost:6379/0 | - | Redis connection URL
ttl_seconds | 3600 | - | Cache entry TTL (seconds)
similarity_threshold | 0.92 | - | Semantic match threshold (0.0–1.0)
max_entries | 10000 | - | Max cache size
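
A semantic cache answers from a stored entry when a new query is close enough to an old one. Here is a pure-Python sketch of that lookup using cosine similarity and the `similarity_threshold` default from the table. `semantic_lookup` is a hypothetical function, and a real cache would use an index instead of a linear scan.

```python
import math
from typing import List, Optional, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def semantic_lookup(
    query_vector: Sequence[float],
    entries: List[Tuple[Sequence[float], str]],
    similarity_threshold: float = 0.92,
) -> Optional[str]:
    """Return the cached answer closest to the query, if it is close enough."""
    best_score, best_answer = -1.0, None
    for vector, answer in entries:
        score = cosine_similarity(query_vector, vector)
        if score > best_score:
            best_score, best_answer = score, answer
    return best_answer if best_score >= similarity_threshold else None
```

A high threshold trades hit rate for safety: fewer queries are served from cache, but near-misses are less likely to return the wrong answer.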

guardrails — Safety & Output Control

Parameter | Default | Description
enable_pii_detection | false | Enable PII detection/redaction
enable_toxicity_filter | false | Enable toxic content filtering
max_output_length | 5000 | Max output character limit
blocked_terms | [] | List of blocked terms/strings

evaluation — Metrics & Benchmarking

Parameter | Default | Description
enable_evaluation | false | Enable evaluation metrics
metrics | ["relevance", "faithfulness"] | Metrics to compute
log_level | INFO | Evaluation logging level

Environment Variables

# API Keys
OPENAI_API_KEY=your_openai_key
ANTHROPIC_API_KEY=your_anthropic_key
COHERE_API_KEY=your_cohere_key
PINECONE_API_KEY=your_pinecone_key
SERP_API_KEY=your_search_api_key

# Config loading
AGENTIC_RAG_CONFIG_PATH=/path/to/config.yaml

Docker deployment

# One command to rule them all
docker-compose up -d

# Or if you prefer doing things the hard way
docker build -t agentic-rag .
docker run -p 8000:8000 -e OPENAI_API_KEY=sk-... agentic-rag

The server is agentic_rag.server.create_app (see Recipe 13 above). Multi-stage Dockerfile and docker-compose included in examples/docker/.

Testing

298+ tests and growing. Run them all or pick your battles:

# Run everything (grab a coffee)
pytest

# Run with coverage (grab two coffees)
pytest --cov=agentic_rag --cov-report=html

# Pick your battles
pytest tests/unit/test_tools.py             # Core tools
pytest tests/unit/test_cognitive.py         # AC-RAG modules
pytest tests/unit/test_graph*.py            # Graph RAG (all graph tests)
pytest tests/unit/test_chunkers*.py         # All chunking strategies
pytest tests/unit/test_loaders*.py          # Document loaders
pytest tests/unit/test_prompts.py           # Prompt management
pytest tests/integration/                   # Integration tests
pytest tests/unit/test_evaluation.py -v     # Benchmarks

Contributing

We don't bite. Contributions are welcome — from typo fixes to new vector store backends.

git clone https://github.com/albanmaxhuni/ai-prishtina-agentic-rag.git
cd ai-prishtina-agentic-rag
pip install -e ".[dev]"  # quoted so shells like zsh do not expand the brackets
pre-commit install
pytest  # Make sure everything passes before you start

Code quality: Black + isort (formatting), Flake8 (linting), mypy (type checking), pytest (tests). See CONTRIBUTING.md for branch conventions and review expectations.

License

Dual-licensed:

See the LICENSE file for complete details.


Contact and links

Resource | URL
Documentation | ai-prishtina-agentic-rag.readthedocs.io
Issue tracker | GitHub Issues
Discussions | GitHub Discussions
Email | info@albanmaxhuni.com

Maintained by the AI Prishtina project. Built on the shoulders of open-source giants and published RAG research.

Sponsor ongoing development:

· coff.ee/albanmaxhuni

or

· BTC: 3BfwQJ2dNTWDn98H5SggNC47fNX8HeWshP

