A RAG (Retrieval-Augmented Generation) library for document processing and retrieval.
Insta RAG
Build production‑grade Retrieval‑Augmented Generation in minutes — not months.
Plug‑and‑play RAG that you configure, not hand‑wire.
Insta RAG (a.k.a. insta_rag) is a modular, configuration‑driven Python library for building advanced RAG pipelines. It abstracts document processing, embedding, and hybrid retrieval behind a clean client so you can ship faster — and tune later.
- Semantic Chunking → splits docs on topic boundaries to preserve context.
- Hybrid Retrieval → semantic vectors + BM25 keyword search.
- HyDE Query Transform → synthesizes hypothetical answers to improve recall.
- Reranking → optional integration with SOTA rerankers (e.g., Cohere) to reorder results.
- Pluggable by Design → swap chunkers, embedders, rerankers, and vector DBs.
- Hybrid Storage → keep Qdrant lean for vectors and use MongoDB for cheap, flexible content storage.
- Graph RAG (NEW) → Knowledge graph-based retrieval using Neo4j and Graphiti for structured entity/relationship extraction and discovery.
Contents
- Why Insta RAG
- Quick Start
- Concepts
- Configuration
- Core API
- Convenience "Rack" API
- Decorators (syntactic sugar)
- Advanced Retrieval Recipes
- Graph RAG (NEW)
- FastAPI Example
- CLI (preview)
- Guides & Docs
- Contributing
- Roadmap
- License
Why Insta RAG
Most RAG stacks feel like soldering a radio: a tangle of chunkers, embedders, retrievers, rerankers, and caches. Insta RAG makes it a plug‑and‑play client. Configure once, swap pieces at will, and keep the door open for the latest techniques.
┌──────────┐ ┌────────┐ ┌──────────┐ ┌───────────┐ ┌────────┐
│ Documents├─▶│Chunking │─▶│ Embedding│─▶│ Retrieval │─▶│ Rerank │─▶ Results
└──────────┘ └────────┘ └──────────┘ └───────────┘ └────────┘
^ ^ ^
pluggable pluggable pluggable
Quick Start
1) Install
# Recommended: using uv
uv pip install insta-rag
# Or with pip
pip install insta-rag
2) Minimal example
from insta_rag import RAGClient, RAGConfig, DocumentInput
# Load configuration from environment variables (.env supported)
config = RAGConfig.from_env()
client = RAGClient(config)
# 1) Add documents to a collection
client.add_documents(
    [DocumentInput.from_text("Your first document content.")],
    collection_name="my_docs",
)
# 2) Retrieve relevant information
resp = client.retrieve(
    query="What is this document about?",
    collection_name="my_docs",
)
# Print the top chunk
if resp.chunks:
    print(resp.chunks[0].content)
Tip: Start simple. You can turn on HyDE, hybrid retrieval, and reranking later via config.
Concepts
- Collection: named corpus (e.g., "my_docs").
- Chunker: splits raw docs into semantically coherent chunks.
- Embedder: turns chunks into vectors for semantic lookup.
- Retriever: finds candidates using vector search, BM25, or both.
- Reranker: reorders candidates using a cross‑encoder (optional).
- Rack: shorthand in this project for your knowledge base.
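To make the Chunker concept concrete, here is a minimal, dependency-free sketch of semantic chunking. Word overlap stands in for embedding similarity, and the threshold is arbitrary; this illustrates the idea of splitting on topic boundaries, not Insta RAG's actual chunker:

```python
def _similarity(a: str, b: str) -> float:
    """Jaccard word overlap as a cheap stand-in for embedding cosine similarity."""
    wa, wb = set(a.lower().split()), set(b.lower().split())
    if not wa or not wb:
        return 0.0
    return len(wa & wb) / len(wa | wb)

def semantic_chunk(sentences: list[str], threshold: float = 0.2) -> list[str]:
    """Start a new chunk whenever similarity to the previous sentence drops below threshold."""
    chunks: list[list[str]] = []
    for sent in sentences:
        if chunks and _similarity(chunks[-1][-1], sent) >= threshold:
            chunks[-1].append(sent)  # same topic: extend the current chunk
        else:
            chunks.append([sent])    # topic shift: open a new chunk
    return [" ".join(c) for c in chunks]

sents = [
    "Refunds are issued within 30 days.",
    "Refunds require a receipt within 30 days.",
    "Shipping takes five business days.",
]
print(semantic_chunk(sents))
```

The two refund sentences share vocabulary and land in one chunk; the shipping sentence starts a new one, so retrieval later returns topically coherent spans.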
Configuration
Declare your stack in a .env or environment variables. Common options:
# Vector store
INSTA_RAG_QDRANT_URL=https://your-qdrant:6333
INSTA_RAG_QDRANT_API_KEY=...
# Hybrid storage (optional)
INSTA_RAG_MONGODB_URI=mongodb+srv://...
INSTA_RAG_MONGODB_DB=insta_rag
# Embeddings / LLMs
INSTA_RAG_EMBED_MODEL=text-embedding-3-large
OPENAI_API_KEY=...
# HyDE
INSTA_RAG_HYDE_ENABLED=true
INSTA_RAG_HYDE_MODEL=gpt-4o-mini
# Hybrid retrieval
INSTA_RAG_HYBRID_ENABLED=true
INSTA_RAG_BM25_WEIGHT=0.35
INSTA_RAG_VECTOR_WEIGHT=0.65
# Reranking (optional)
INSTA_RAG_RERANKER=cohere-rerank-3
COHERE_API_KEY=...
# Other
INSTA_RAG_DEFAULT_COLLECTION=my_docs
See Guides & Docs for a full catalog of settings.
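The BM25/vector weights above imply a score-fusion step at retrieval time. As an illustration of how such a weighted blend can work (min-max normalization followed by a weighted sum; not necessarily Insta RAG's exact algorithm):

```python
def fuse_scores(vector: dict[str, float], bm25: dict[str, float],
                vector_weight: float = 0.65, bm25_weight: float = 0.35) -> list[tuple[str, float]]:
    """Min-max normalize each score list onto [0, 1], then blend with the configured weights."""
    def normalize(scores: dict[str, float]) -> dict[str, float]:
        if not scores:
            return {}
        lo, hi = min(scores.values()), max(scores.values())
        span = (hi - lo) or 1.0  # avoid division by zero when all scores are equal
        return {doc: (s - lo) / span for doc, s in scores.items()}

    v, b = normalize(vector), normalize(bm25)
    fused = {doc: vector_weight * v.get(doc, 0.0) + bm25_weight * b.get(doc, 0.0)
             for doc in set(v) | set(b)}
    return sorted(fused.items(), key=lambda kv: kv[1], reverse=True)

# Doc "a" leads on vectors, "b" leads on BM25; the 0.65/0.35 split decides the winner.
ranked = fuse_scores({"a": 0.9, "b": 0.4}, {"b": 12.0, "c": 3.0})
print(ranked[0][0])
```

Normalization matters because raw BM25 scores and cosine similarities live on very different scales; without it, one signal silently dominates regardless of the configured weights.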
Core API
from insta_rag import RAGClient, RAGConfig, DocumentInput
config = RAGConfig.from_env()
client = RAGClient(config)
# Add
docs = [
    DocumentInput.from_text(
        "Payments: To get a refund, contact support within 30 days.",
        metadata={"source": "faq.md"},
    ),
]
client.add_documents(docs, collection_name="my_docs")
# Retrieve
resp = client.retrieve(
    query="How do I get a refund?",
    collection_name="my_docs",
    k=8,              # number of candidates
    use_hyde=True,    # HyDE query transformation
    use_hybrid=True,  # BM25 + vectors
    rerank=True,      # apply reranker if configured
)
for ch in resp.chunks:
    print(f"score={ch.score:.3f}", ch.content[:80])
Convenience “Rack” API
For teams that want ultra‑simple, CRUD‑style operations on the knowledge base, Insta RAG ships a tiny convenience layer that wraps the core client methods. (It’s sugar; you can ignore it.)
from insta_rag import RAGClient, RAGConfig
from insta_rag.rack import Rack # sugar over client.add/update/remove
client = RAGClient(RAGConfig.from_env())
rack = Rack(client, collection="my_docs")
# Push (create)
rack.push(
    id="doc-1",
    text="Return policy: 30-day refunds via support@acme.com",
    metadata={"source": "policy.pdf", "lang": "en"},
)
# Update (replace text)
rack.update(id="doc-1", text="Return policy updated: 45 days.")
# Remove
rack.remove(id="doc-1")
# Ask (retrieve only; you format the answer)
chunks = rack.ask("What is the return window?", k=5)
print(chunks[0].content)
Decorators (syntactic sugar)
Prefer functions over boilerplate? Use decorators to bind a collection and configure retrieval at the call site. These live in insta_rag.decorators and are optional.
from insta_rag import RAGClient, RAGConfig
from insta_rag.decorators import rack, use_retrieval
client = RAGClient(RAGConfig.from_env())
@rack(client, collection="my_docs")  # binds the knowledge base
@use_retrieval(hyde=True, hybrid=True, k=8, rerank=True)
def top_chunk(query, retrieve):
    """`retrieve` is injected: chunks = retrieve(query)"""
    chunks = retrieve(query)
    return chunks[0]

best = top_chunk("Summarize the refund policy")
print(best.content)
The decorator layer is intentionally thin so you can remove it without touching your business logic.
Advanced Retrieval Recipes
1) Metadata filtering
resp = client.retrieve(
    query="refunds",
    collection_name="my_docs",
    filters={"lang": "en", "source": {"$in": ["policy.pdf", "faq.md"]}},
)
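The filter syntax above combines exact-match conditions with `$in` membership. A toy matcher showing the semantics assumed here (this is a sketch, not the library's actual filter engine):

```python
def matches(metadata: dict, filters: dict) -> bool:
    """Equality match by default; a {"$in": [...]} condition accepts any listed value."""
    for key, cond in filters.items():
        value = metadata.get(key)
        if isinstance(cond, dict) and "$in" in cond:
            if value not in cond["$in"]:
                return False
        elif value != cond:
            return False
    return True

meta = {"lang": "en", "source": "faq.md"}
print(matches(meta, {"lang": "en", "source": {"$in": ["policy.pdf", "faq.md"]}}))  # True
print(matches(meta, {"lang": "fr"}))  # False
```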
2) Balanced hybrid retrieval
resp = client.retrieve(
    query="PCI requirements for card storage",
    collection_name="my_docs",
    use_hybrid=True,
    bm25_weight=0.5,
    vector_weight=0.5,
)
3) HyDE + rerank for long‑tail questions
resp = client.retrieve(
    query="Could I still cancel after partial shipment?",
    collection_name="my_docs",
    use_hyde=True,
    rerank=True,
    k=12,
)
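HyDE searches with an embedding of a hypothetical answer instead of the raw question, which often shares more vocabulary with the target passage. A self-contained sketch of that flow, with stubs standing in for the LLM call and the embedder:

```python
def generate_hypothetical(query: str) -> str:
    """Stub standing in for an LLM call that drafts a plausible answer to the query."""
    return "Orders can be cancelled until the final item ships."

def embed(text: str) -> set[str]:
    """Stub embedding: a bag of words instead of a dense vector."""
    return set(text.lower().split())

def search(query_vec: set[str], corpus: list[str]) -> str:
    """Return the document whose bag of words overlaps the query embedding most."""
    return max(corpus, key=lambda doc: len(query_vec & embed(doc)))

corpus = [
    "Cancellations are allowed until the final item ships.",
    "Gift cards never expire.",
]
query = "Could I still cancel after partial shipment?"
# HyDE: search with the hypothetical answer's embedding, not the raw query's
hit = search(embed(generate_hypothetical(query)), corpus)
print(hit)
```

The hypothetical answer shares phrasing ("until the final item ships") with the relevant passage even though the original question does not, which is the recall boost HyDE aims for.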
Graph RAG (NEW)
Graph RAG extracts entities and relationships from documents to build a knowledge graph in Neo4j. Perfect for discovering connections, understanding context, and answering relationship-based questions.
When to Use Graph RAG
- Complex knowledge with many interconnected entities (e.g., organizations, people, locations)
- Need explicit entity/relationship extraction and discovery
- Temporal awareness (when facts became relevant or expired)
- Natural language queries like "Who works at X?" or "What are Y's relationships?"
Quick Start with Graph RAG
1) Configure Neo4j
Add to .env:
# Neo4j Graph Database
NEO4J_URI=bolt://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=your_password
NEO4J_DATABASE=neo4j
# Graphiti (Entity Extraction)
GRAPHITI_LLM_MODEL=gpt-4.1
GRAPHITI_EMBEDDING_MODEL=text-embedding-3-large
2) Basic Usage
import asyncio
from insta_rag.graph_rag import GraphRAGClient
from insta_rag import DocumentInput
async def main():
    # Initialize Graph RAG (async-only)
    client = GraphRAGClient()
    await client.initialize()
    try:
        # Add documents and extract entities/relationships
        docs = [
            DocumentInput.from_text("Alice works at TechCorp as a Senior Engineer"),
            DocumentInput.from_text("TechCorp builds AI products for enterprises"),
        ]
        results = await client.add_documents(
            docs,
            collection_name="company_knowledge",
        )
        print(f"✓ Extracted {results[0].nodes_created} entities, {results[0].edges_created} relationships")

        # Query the knowledge graph
        retrieval = await client.retrieve(
            query="Who works at TechCorp?",
            collection_name="company_knowledge",
            k=10,
        )
        print(f"\n📊 Found {len(retrieval.edges)} facts:")
        for fact in retrieval.edges:
            print(f"  • {fact.fact}")
    finally:
        await client.close()

asyncio.run(main())
3) Using as Context Manager
async with GraphRAGClient() as client:
    # Automatically handles initialization and cleanup
    results = await client.add_documents(docs, "knowledge")
    retrieval = await client.retrieve("query", "knowledge", k=5)
4) Combining Vector RAG + Graph RAG
from insta_rag import RAGClient, RAGConfig
from insta_rag.graph_rag import GraphRAGClient
async def hybrid_retrieval():
    # Both systems can coexist independently
    vector_client = RAGClient(RAGConfig.from_env())  # Sync
    async with GraphRAGClient() as graph_client:     # Async
        # Vector search for semantic similarity
        vector_results = vector_client.retrieve(
            query="AI products",
            collection_name="docs",
            k=10,
        )
        # Graph search for relationships
        graph_results = await graph_client.retrieve(
            query="Who works at TechCorp?",
            collection_name="company",
            k=10,
        )
        # Combine insights
        print(f"Vector results: {len(vector_results.chunks)} chunks")
        print(f"Graph results: {len(graph_results.edges)} facts")
Graph RAG API Reference
| Method | Purpose |
|---|---|
| `await client.initialize()` | Connect to Neo4j and set up indices |
| `await client.add_documents(docs, collection_name)` | Extract entities/relationships and add to graph |
| `await client.retrieve(query, collection_name, k)` | Search graph using hybrid semantic + BM25 |
| `await client.retrieve_with_reranking(query, collection_name, center_node)` | Retrieve with distance-based reranking from center node |
| `await client.get_entity_context(entity_name, collection_name, depth)` | Get an entity and related facts (up to `depth` levels) |
| `await client.close()` | Clean up the Neo4j connection |
Graph RAG vs Vector RAG
| Aspect | Vector RAG | Graph RAG |
|---|---|---|
| Storage | Qdrant vectors | Neo4j graph database |
| Client | `RAGClient` (sync) | `GraphRAGClient` (async) |
| Retrieval | Semantic similarity | Fact/relationship queries |
| Entity extraction | Not explicit | LLM-driven, explicit |
| Use cases | General similarity search | Structured knowledge discovery |
| Best for | Content search | Relationship queries |
Async Document Processing (NEW)
Async Processing with Celery allows you to submit documents for Graph RAG processing without blocking your API, enabling horizontal scaling and better resource management.
When to Use Async Processing
- Processing large documents where entity/relationship extraction takes a long time
- Building responsive APIs that should return immediately with task IDs
- Scaling horizontally with multiple workers
- Monitoring task progress in real-time
- Retrying failed tasks automatically
Quick Start with Celery
1) Configure Redis
Add to .env:
# Redis Configuration (for Celery async task processing)
CELERY_BROKER_URL=redis://default:your_password@your_host:6379/0
CELERY_RESULT_BACKEND=redis://default:your_password@your_host:6379/1
2) Start Workers
# Single worker with 4 concurrent tasks
celery -A insta_rag.celery_app worker -l debug -Q default -c 4
# Or use the library function to start multiple workers
python3 -c "from insta_rag import start_worker_pool; start_worker_pool(num_workers=2, concurrency_per_worker=4)"
3) Submit Documents Asynchronously
import asyncio
from insta_rag.graph_rag import GraphRAGClient
from insta_rag import DocumentInput
async def main():
    async with GraphRAGClient() as client:
        docs = [DocumentInput.from_text("Alice works at TechCorp")]
        # Submit without waiting (returns immediately with a task_id)
        task_id = await client.submit_add_documents_async(
            docs,
            collection_name="company",
        )
        print(f"Task submitted: {task_id}")

        # Check status anytime
        from insta_rag.task_monitoring import get_task_monitoring
        monitor = get_task_monitoring()
        status = monitor.get_task_status(task_id)
        print(f"Task status: {status}")  # PENDING, STARTED, SUCCESS, or FAILURE

asyncio.run(main())
4) Monitor Tasks
from insta_rag.task_monitoring import get_task_monitoring
monitor = get_task_monitoring()
# Get task status
status = monitor.get_task_status(task_id)
# Get task result (when complete)
if status == "SUCCESS":
result = monitor.get_task_result(task_id)
print(result)
# Get queue depth
queue_length = monitor.get_queue_length()
print(f"Pending tasks: {queue_length}")
5) Scale Workers Horizontally
from insta_rag import scale_pool, get_pool_status, auto_scale_if_needed
# Scale to specific number
scale_pool(target_workers=4)
# Get pool status
status = get_pool_status()
print(f"Active workers: {status['active_workers']}")
# Auto-scale based on queue depth
auto_scale_if_needed(queue_depth_threshold=10, min_workers=1, max_workers=8)
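The auto-scaling call above maps queue depth to a worker count. One plausible policy, sketched as a pure function (illustrative only, not the library's actual implementation):

```python
def target_worker_count(queue_depth: int,
                        queue_depth_threshold: int = 10,
                        min_workers: int = 1,
                        max_workers: int = 8) -> int:
    """Aim for one worker per `queue_depth_threshold` pending tasks, clamped to [min, max]."""
    if queue_depth <= 0:
        return min_workers
    desired = -(-queue_depth // queue_depth_threshold)  # ceiling division
    return max(min_workers, min(max_workers, desired))

print(target_worker_count(0))    # 1 (idle: scale down to the floor)
print(target_worker_count(35))   # 4 (ceil(35 / 10))
print(target_worker_count(200))  # 8 (capped at max_workers)
```

Keeping the decision a pure function of queue depth makes the policy easy to test and to swap; the clamping guards against runaway scale-up during a backlog spike.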
FastAPI Integration
from fastapi import FastAPI
from insta_rag import start_worker_pool, stop_worker_pool, DocumentInput
from insta_rag.graph_rag import GraphRAGClient
from insta_rag.task_monitoring import get_task_monitoring
app = FastAPI()
@app.on_event("startup")
async def startup():
    # Auto-start worker pool
    start_worker_pool(num_workers=2, concurrency_per_worker=4)

@app.on_event("shutdown")
async def shutdown():
    # Auto-stop worker pool
    stop_worker_pool()

@app.post("/graph-rag/add-documents")
async def add_documents(documents: list[DocumentInput]):
    """Submit documents for async processing (non-blocking)."""
    async with GraphRAGClient() as client:
        task_id = await client.submit_add_documents_async(
            documents,
            collection_name="documents",
        )
    return {"task_id": task_id, "status": "submitted"}

@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Get the status and result of a task."""
    monitor = get_task_monitoring()
    status = monitor.get_task_status(task_id)
    response = {"task_id": task_id, "status": status}
    if status == "SUCCESS":
        response["result"] = monitor.get_task_result(task_id)
    return response
Architecture
Document Submission (FastAPI)
↓
Celery Task Queue (Redis)
↓
Workers (multiple processes)
↓
Graph RAG Processing
↓
Results stored in Redis
↓
Task Status Polling (FastAPI)
↓
Results Retrieved
FastAPI Example
from fastapi import FastAPI, Query
from insta_rag import RAGClient, RAGConfig
app = FastAPI()
rag = RAGClient(RAGConfig.from_env())
@app.get("/ask")
async def ask(query: str = Query(...), collection: str = "my_docs"):
    resp = rag.retrieve(
        query=query,
        collection_name=collection,
        use_hyde=True,
        use_hybrid=True,
        rerank=True,
    )
    return {
        "matches": [
            {"score": ch.score, "content": ch.content, "metadata": ch.metadata}
            for ch in resp.chunks
        ]
    }
CLI (preview)
Optional add‑on for simple ops. Install with pip install insta-rag[cli].
# Ingest
insta-rag add --collection my_docs ./data/*.pdf
# Update by id
insta-rag update --collection my_docs --id doc-1 --file updated.txt
# Remove by id
insta-rag remove --collection my_docs --id doc-1
# Ask (JSON response)
insta-rag ask --collection my_docs --query "What is the refund window?"
Guides & Docs
- Installation Guide – Python versions, optional extras, uv vs pip
- Quickstart – end‑to‑end in 5 minutes
- Document Management – ingestion patterns, chunking strategies
- Advanced Retrieval – hybrid knobs, HyDE, reranking, filters
- Storage Backends – Qdrant setup, MongoDB sizing tips
Looking for something specific? See the Full Documentation (link your site here).
Contributing
We welcome contributions! Please check out the Contributing Guide for:
- Dev environment setup (uv, poetry, or pip)
- Code quality: ruff, black, mypy, pytest, pre-commit
- Commit conventions: Conventional Commits
- Branching model: main (stable) / develop (active)
- Versioning: SemVer
- PR checklist & CI matrix
Roadmap
Implemented
- Graph RAG – Knowledge graph-based retrieval with Neo4j and Graphiti
- Hybrid Storage – Qdrant vectors + MongoDB content
- Hybrid Retrieval – Semantic + BM25 search
- HyDE & Reranking – Query transformation and SOTA reranking
- Async Processing – Celery + Redis for non-blocking document ingestion and horizontal scaling
Coming Soon (Phase 2+)
- Graph RAG Scoring – Semantic similarity + BM25 for edges
- Built‑in summarization & answer synthesis helpers
- More rerankers (open‑source options)
- CLI GA
- LangChain/LlamaIndex adapters
- Streaming & tracing hooks (OpenTelemetry)
- Native PDF/HTML loaders with auto‑chunk profiles
- Task persistence and recovery
- Advanced scheduling and cron job support
License
This project is licensed under the MIT License.
Shout‑outs
Insta RAG packages the most effective, modern RAG techniques into a clean developer experience. You focus on your product; we keep the rack updated as the ecosystem evolves. Let's rock.