docpipe
Unified document parsing, structured extraction, vector ingestion, and RAG pipeline SDK.
PyPI vs main: PyPI ships v0.4.5. The latest on GitHub main adds docpipe.query() (renamed from docpipe.rag()), optional turbovec, OpenTelemetry/Prometheus observability, /generate, and a richer /health. Install from git until the next release:
pip install "git+https://github.com/thesunnysinha/docpipe.git@main#egg=docpipe-sdk[server,observability]"
Overview
docpipe connects document parsing (Docling / GLM-OCR), LLM-based structured extraction (LangExtract + LangChain), vector ingestion (pgvector), and RAG querying into a single composable pipeline.
Four independent pipelines, composable together:
- Parse — Unstructured docs → parsed text/markdown via Docling or GLM-OCR
- Extract — Text → structured entities via LLM (LangExtract or LangChain)
- Ingest — Parsed chunks → embeddings → your vector DB (LangChain + pgvector)
- RAG — Questions → grounded answers with source citations (6 retrieval strategies)
docpipe never stores your data. It connects to your infrastructure and gets out of the way.
Install
pip install docpipe-sdk # Core only
pip install "docpipe-sdk[docling]" # + Document parsing via Docling (PDF, DOCX, images, ...)
pip install "docpipe-sdk[glm-ocr]" # + Document parsing via GLM-OCR (state-of-the-art OCR)
pip install "docpipe-sdk[langextract]" # + Google LangExtract
pip install "docpipe-sdk[openai]" # + OpenAI embeddings & LLM
pip install "docpipe-sdk[anthropic]" # + Anthropic Claude
pip install "docpipe-sdk[google]" # + Google Gemini
pip install "docpipe-sdk[ollama]" # + Ollama (local models)
pip install "docpipe-sdk[huggingface]" # + HuggingFace embeddings
pip install "docpipe-sdk[pgvector]" # + PostgreSQL vector store (default)
pip install "docpipe-sdk[turbovec]" # + Optional local turbovec file indices
pip install "docpipe-sdk[rag]" # + Hybrid search (BM25 + langchain-classic)
pip install "docpipe-sdk[rerank]" # + Local reranking (FlashRank)
pip install "docpipe-sdk[server]" # + FastAPI server
pip install "docpipe-sdk[observability]" # + OpenTelemetry OTLP export
pip install "docpipe-sdk[http]" # + httpx client (`docpipe.http.DocpipeClient`)
pip install "docpipe-sdk[all]" # All extras except turbovec & huggingface (install those separately)
Install latest main (features not yet on PyPI):
pip install "git+https://github.com/thesunnysinha/docpipe.git@main#egg=docpipe-sdk[all,turbovec,observability,http]"
Quick Start
Parse a document
import docpipe
# Default: Docling parser
doc = docpipe.parse("invoice.pdf")
print(doc.markdown)
print(doc.text)
# GLM-OCR parser (state-of-the-art OCR, best for scanned/image-heavy docs)
doc = docpipe.parse("scanned_report.pdf", parser="glm-ocr")
print(doc.markdown)
Extract structured data
schema = docpipe.ExtractionSchema(
description="Extract invoice line items with amounts",
model_id="gemini-2.5-flash",
)
results = docpipe.extract(doc.text, schema)
for r in results:
    print(r.entity_class, r.text, r.attributes)
Full parse + extract pipeline
result = docpipe.run("invoice.pdf", schema)
print(result.parsed.markdown)
print(result.extractions)
Ingest into your vector DB
config = docpipe.IngestionConfig(
connection_string="postgresql://user:pass@localhost:5432/mydb",
table_name="invoices",
embedding_provider="openai",
embedding_model="text-embedding-3-small",
)
docpipe.ingest("invoice.pdf", config=config)
Incremental ingestion (skip unchanged files)
config = docpipe.IngestionConfig(
...,
incremental=True, # skips files already in the DB by SHA-256 hash
)
docpipe.ingest("invoice.pdf", config=config)
# → Skipped 'invoice.pdf' (unchanged, incremental mode)
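The incremental check can be pictured as a content-hash lookup. The sketch below is an illustration only, not docpipe's implementation: `file_sha256`, `should_skip`, and the in-memory `seen` set are hypothetical names standing in for whatever docpipe records in the database.

```python
import hashlib
import tempfile
from pathlib import Path


def file_sha256(path: Path, chunk_size: int = 1 << 20) -> str:
    """Return the hex SHA-256 digest of a file, read in fixed-size chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for block in iter(lambda: fh.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()


def should_skip(path: Path, seen_hashes: set[str]) -> bool:
    """Hypothetical helper: True when this exact content was already ingested."""
    return file_sha256(path) in seen_hashes


# Demo with a temporary file standing in for invoice.pdf.
with tempfile.TemporaryDirectory() as tmp:
    doc = Path(tmp) / "invoice.pdf"
    doc.write_bytes(b"first version")
    seen = {file_sha256(doc)}            # pretend the first ingest stored this hash
    unchanged = should_skip(doc, seen)   # same bytes: skip
    doc.write_bytes(b"second version")
    changed = should_skip(doc, seen)     # new bytes: ingest again
```

Hashing the content rather than comparing file names or timestamps means a renamed but identical file is still recognised, while an edited file under the same name is re-ingested.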
Optional turbovec backend (local file indices)
By default docpipe uses pgvector in your PostgreSQL database. For standalone or edge deployments where you want a compressed on-disk index (no Postgres for vectors), install the turbovec extra:
pip install "docpipe-sdk[turbovec,openai]" # or your embedding provider extra
Set the backend via environment or per request (vector_backend on ingest/search/RAG bodies):
export DOCPIPE_VECTOR_BACKEND=turbovec
export DOCPIPE_TURBVEC_INDEX_DIR=./.docpipe/indices # default
config = docpipe.IngestionConfig(
connection_string="postgresql://unused", # still accepted; ignored for vectors
table_name="my_library", # used as the on-disk index folder name
embedding_provider="openai",
embedding_model="text-embedding-3-small",
vector_backend="turbovec",
)
docpipe.ingest("invoice.pdf", config=config)
# → writes ./.docpipe/indices/my_library/index.tvim + docstore.json
When to use: local prototypes, air-gapped RAG, or memory-constrained search without running pgvector. Jingo and other production Postgres deployments should keep the default pgvector backend.
RAG — ask questions against your documents
Use docpipe.query() for programmatic RAG (v0.4.5 on PyPI exposed docpipe.rag(), which shadowed the docpipe.rag package — removed on main).
rag_config = docpipe.RAGConfig(
connection_string="postgresql://user:pass@localhost:5432/mydb",
table_name="invoices",
embedding_provider="openai",
embedding_model="text-embedding-3-small",
llm_provider="openai",
llm_model="gpt-4o",
strategy="hyde", # naive | hyde | multi_query | parent_document | hybrid | auto
)
result = docpipe.query("What is the total amount on the invoice?", config=rag_config)
print(result.answer) # grounded answer with inline citations
print(result.sources) # ["invoice.pdf"]
print(result.chunks) # retrieved chunks with scores
Structured RAG output
from pydantic import BaseModel
class InvoiceSummary(BaseModel):
    total: float
    currency: str
    vendor: str
result = docpipe.query(
"Summarize the invoice",
config=docpipe.RAGConfig(..., output_model=InvoiceSummary),
)
summary = result.structured # InvoiceSummary(total=4250.0, currency='USD', vendor='Acme')
With reranking
rag_config = docpipe.RAGConfig(
...,
strategy="naive",
reranker="flashrank", # local, no API key (pip install "docpipe-sdk[rerank]")
rerank_top_n=5,
)
Evaluate RAG quality
from docpipe import EvalConfig, EvalQuestion, EvalPipeline
questions = [
EvalQuestion(
question="What is the invoice total?",
expected_answer="$4,250",
expected_sources=["invoice.pdf"],
),
]
cfg = EvalConfig(rag_config=rag_config, questions=questions,
metrics=["hit_rate", "answer_similarity"])
result = EvalPipeline(cfg).run()
print(result.metrics.hit_rate) # e.g. 0.9 (illustrative; depends on your question set)
print(result.metrics.answer_similarity) # e.g. 0.85
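To make the hit_rate metric concrete, here is a minimal sketch under one assumption: a question counts as a hit when at least one of its expected sources appears among the retrieved sources. docpipe's exact definition may differ, and `RetrievalOutcome` is a hypothetical record type, not part of the SDK.

```python
from dataclasses import dataclass, field


@dataclass
class RetrievalOutcome:
    """Hypothetical record: expected sources vs. what retrieval returned."""
    expected_sources: list[str]
    retrieved_sources: list[str] = field(default_factory=list)


def hit_rate(outcomes: list[RetrievalOutcome]) -> float:
    """Fraction of questions with at least one expected source retrieved."""
    if not outcomes:
        return 0.0
    hits = sum(
        1 for o in outcomes
        if set(o.expected_sources) & set(o.retrieved_sources)
    )
    return hits / len(outcomes)


outcomes = [
    RetrievalOutcome(["invoice.pdf"], ["invoice.pdf", "terms.pdf"]),  # hit
    RetrievalOutcome(["invoice.pdf"], ["terms.pdf"]),                 # miss
]
score = hit_rate(outcomes)
```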
RAG Strategies
| Strategy | How it works | Best for |
|---|---|---|
| naive | Vector similarity search | Well-formed queries, fast responses |
| hyde | LLM generates hypothetical answer → embed → retrieve | Complex / technical queries (highest accuracy) |
| multi_query | Expand into N query variants → union results | Vague or short queries |
| parent_document | Retrieve seed chunks → expand context by source | Long documents, context coherence |
| hybrid | Dense vector + BM25 keyword via EnsembleRetriever | Exact terms, proper nouns, IDs |
| auto | LLM classifies question → dispatches to optimal strategy | Mixed workloads, unknown query types |
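For the hybrid row, LangChain documents EnsembleRetriever as combining retriever results with weighted Reciprocal Rank Fusion. The plain-Python sketch below illustrates that idea only; `rrf_fuse`, the chunk IDs, and the equal weights are assumptions for the example, not docpipe's code or configuration.

```python
from collections import defaultdict


def rrf_fuse(rankings: list[list[str]], weights: list[float], c: int = 60) -> list[str]:
    """Merge ranked ID lists: each list adds weight / (c + rank) to an ID's score."""
    scores: dict[str, float] = defaultdict(float)
    for ranking, weight in zip(rankings, weights):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += weight / (c + rank)
    # Highest fused score first.
    return sorted(scores, key=lambda doc_id: scores[doc_id], reverse=True)


dense = ["chunk-a", "chunk-b", "chunk-c"]   # order from vector similarity
bm25 = ["chunk-c", "chunk-a", "chunk-d"]    # order from keyword search
fused = rrf_fuse([dense, bm25], weights=[0.5, 0.5])
```

Chunks that rank well in both lists rise to the top, which is why hybrid retrieval tends to help with exact terms that dense embeddings alone can miss.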
CLI
# Parse
docpipe parse invoice.pdf --format markdown
# Extract
docpipe extract "some text" --schema schema.yaml --model gemini-2.5-flash
# Ingest (with incremental mode)
docpipe ingest invoice.pdf \
--db "postgresql://..." --table invoices \
--embedding-provider openai --embedding-model text-embedding-3-small \
--incremental
# RAG query
docpipe rag query "What is the total?" \
--db "postgresql://..." --table invoices \
--strategy hyde \
--llm-provider openai --llm-model gpt-4o \
--embedding-provider openai --embedding-model text-embedding-3-small \
--reranker flashrank
# Evaluate RAG quality
docpipe evaluate run \
--questions qa.json \
--db "postgresql://..." --table invoices \
--llm-provider openai --llm-model gpt-4o \
--embedding-provider openai --embedding-model text-embedding-3-small \
--metrics hit_rate,answer_similarity
# Start API server
docpipe serve --port 8000
# List installed plugins
docpipe plugins list
qa.json format for evaluation
[
{
"question": "What is the invoice total?",
"expected_answer": "$4,250",
"expected_sources": ["invoice.pdf"]
}
]
API Server
Start the FastAPI server:
docpipe serve --host 0.0.0.0 --port 8000
Endpoints:
| Method | Path | Description |
|---|---|---|
| GET | /health | Health check, plugins, dependency status |
| GET | /metrics | Prometheus metrics (no auth) |
| POST | /parse | Parse a document |
| POST | /extract | Extract structured data |
| POST | /run | Parse + extract |
| POST | /ingest | Ingest into vector DB |
| DELETE | /ingest | Remove all chunks for a source document |
| POST | /search | Vector similarity search (supports filters) |
| POST | /rag/query | RAG question answering (supports history, filters) |
| POST | /rag/stream | Streaming RAG via Server-Sent Events (SSE) |
| POST | /generate | Plain LLM completion (no retrieval) |
| POST | /evaluate/run | Evaluate RAG quality |
| GET | /plugins | List registered plugins |
Conversation history
Pass prior turns to /rag/query or /rag/stream for multi-turn RAG:
import requests
BASE = "http://localhost:8000"  # docpipe server URL; adjust to your deployment
history = [
    {"role": "user", "content": "What is docpipe?"},
    {"role": "assistant", "content": "docpipe is a document processing SDK..."},
]
response = requests.post(f"{BASE}/rag/query", json={..., "history": history})
Metadata filtering
Filter retrieved chunks by document metadata on /search, /rag/query, and /rag/stream:
requests.post(f"{BASE}/rag/query", json={..., "filters": {"source": "report.pdf"}})
Streaming (SSE)
Stream token-by-token answers from /rag/stream:
import sseclient, requests
resp = requests.post(f"{BASE}/rag/stream", json={...}, stream=True)
for event in sseclient.SSEClient(resp):
    if event.data == "[DONE]":
        break
    if event.event == "metadata":
        continue  # optional: parse usage JSON before [DONE]
    print(event.data, end="", flush=True)
Before data: [DONE], the server may emit a non-breaking metadata event:
event: metadata
data: {"type":"usage","usage":{"input_tokens":123,"output_tokens":45,"total_tokens":168}}
/rag/query includes the same usage object on the JSON body when the provider returns token counts.
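If you prefer not to depend on an SSE client library, the stream can be consumed with a small hand-written parser. The sketch below handles only the subset of the SSE format described above (`event:` and `data:` fields separated by blank lines); `parse_sse` is a hypothetical helper, and the token text in `SAMPLE` is made up for illustration, while the metadata payload is the one shown above.

```python
import json


def parse_sse(raw: str):
    """Yield (event_name, data) pairs from raw SSE text (minimal subset of the spec)."""
    event, data_lines = "message", []
    for line in raw.splitlines() + [""]:
        if line == "":                      # blank line dispatches the event
            if data_lines:
                yield event, "\n".join(data_lines)
            event, data_lines = "message", []
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # The SSE format strips exactly one leading space after the colon.
            data_lines.append(value[1:] if value.startswith(" ") else value)


SAMPLE = (
    "data: The total\n\n"
    "data:  is $4,250\n\n"
    "event: metadata\n"
    'data: {"type":"usage","usage":{"input_tokens":123,"output_tokens":45,"total_tokens":168}}\n\n'
    "data: [DONE]\n\n"
)

tokens, usage = [], None
for name, data in parse_sse(SAMPLE):
    if data == "[DONE]":
        break
    if name == "metadata":
        usage = json.loads(data)["usage"]   # token counts, when the provider reports them
        continue
    tokens.append(data)

answer = "".join(tokens)
```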
Observability
Install optional extras:
pip install "docpipe-sdk[server,observability]"
| Variable | Default | Purpose |
|---|---|---|
| DOCPIPE_OTEL_ENABLED | false | Export traces via OTLP/HTTP |
| DOCPIPE_OTEL_SERVICE_NAME | docpipe | service.name resource |
| DOCPIPE_OTEL_EXPORTER_OTLP_ENDPOINT | - | e.g. http://localhost:4318/v1/traces |
| DOCPIPE_OTEL_EXPORTER_OTLP_HEADERS | - | Optional OTLP auth (key=value, comma-separated) |
| DOCPIPE_OTEL_TRACES_SAMPLER | parentbased_traceidratio | OpenTelemetry sampler name |
| DOCPIPE_OTEL_TRACES_SAMPLER_ARG | 1.0 | Trace sample ratio (0.0–1.0) |
| OTEL_SEMCONV_STABILITY_OPT_IN | - | Set to gen_ai_latest_experimental for GenAI semconv |
| DOCPIPE_LOG_LEVEL | INFO | Logging level |
| DOCPIPE_LOG_FORMAT | text | json for one JSON object per log line |
| DOCPIPE_HEALTH_CHECK_DB | true | SELECT 1 when DOCPIPE_DB_CONNECTION_STRING is set |
| DOCPIPE_HEALTH_CHECK_EMBEDDING | false | Optional embed probe |
| DOCPIPE_VECTOR_BACKEND | pgvector | pgvector or turbovec (server default) |
| DOCPIPE_TURBVEC_INDEX_DIR | .docpipe/indices | On-disk turbovec index root |
| DOCPIPE_TURBVEC_BIT_WIDTH | 4 | turbovec quantization bit width |
| DOCPIPE_ALLOW_PRIVATE_URLS | false | Allow ingest sources on private IPs (Docker/MinIO) |
| DOCPIPE_AUTH_ENABLED | true | HTTP Basic Auth on API routes |
| DOCPIPE_USERNAME / DOCPIPE_PASSWORD | admin / docpipe | Basic Auth credentials |
See .env.example for a full template used by docker compose.
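As a rough picture of how the tracing variables fit together, the sketch below reads them from a mapping and applies the defaults listed in the table. `OtelSettings` and `parse_headers` are hypothetical names, and treating only the string "true" as enabled is an assumption; docpipe's own settings loader may accept other spellings.

```python
from dataclasses import dataclass
from typing import Mapping, Optional


def parse_headers(raw: str) -> dict[str, str]:
    """Parse 'key=value,key2=value2' into a dict, splitting on the first '='."""
    headers: dict[str, str] = {}
    for pair in raw.split(","):
        if not pair.strip():
            continue
        key, sep, value = pair.partition("=")
        if not sep:
            raise ValueError(f"expected key=value, got {pair!r}")
        headers[key.strip()] = value.strip()
    return headers


@dataclass(frozen=True)
class OtelSettings:
    enabled: bool
    service_name: str
    endpoint: Optional[str]
    headers: dict[str, str]
    sample_ratio: float

    @classmethod
    def from_env(cls, env: Mapping[str, str]) -> "OtelSettings":
        ratio = float(env.get("DOCPIPE_OTEL_TRACES_SAMPLER_ARG", "1.0"))
        if not 0.0 <= ratio <= 1.0:
            raise ValueError("sample ratio must be within 0.0 and 1.0")
        return cls(
            enabled=env.get("DOCPIPE_OTEL_ENABLED", "false").strip().lower() == "true",
            service_name=env.get("DOCPIPE_OTEL_SERVICE_NAME", "docpipe"),
            endpoint=env.get("DOCPIPE_OTEL_EXPORTER_OTLP_ENDPOINT"),
            headers=parse_headers(env.get("DOCPIPE_OTEL_EXPORTER_OTLP_HEADERS", "")),
            sample_ratio=ratio,
        )


defaults = OtelSettings.from_env({})
custom = OtelSettings.from_env({
    "DOCPIPE_OTEL_ENABLED": "true",
    "DOCPIPE_OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:4318/v1/traces",
    "DOCPIPE_OTEL_EXPORTER_OTLP_HEADERS": "authorization=Bearer abc123,x-tenant=demo",
    "DOCPIPE_OTEL_TRACES_SAMPLER_ARG": "0.25",
})
```

In a real process you would pass `os.environ` instead of a literal dict; a plain mapping keeps the sketch side-effect free.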
Local OTLP (Jaeger all-in-one):
docker run -d --name jaeger \
-p 16686:16686 -p 4318:4318 \
jaegertracing/all-in-one:latest
export DOCPIPE_OTEL_ENABLED=true
export DOCPIPE_OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318/v1/traces
docpipe serve
Scrape Prometheus at GET /metrics. Error responses increment docpipe_errors_total with error_type and phase labels.
Python HTTP client
pip install "docpipe-sdk[http]"
from docpipe.http import DocpipeClient
with DocpipeClient("http://localhost:8000", username="admin", password="docpipe") as client:
    print(client.health())
    result = client.rag_query({...})
    print(result.get("usage"))
Plain LLM completion
Call any configured LLM provider without retrieval:
response = requests.post(f"{BASE}/generate", json={
"prompt": "Generate a 5-word title for a document about photosynthesis",
"llm_provider": "openai",
"llm_model": "gpt-4o-mini",
"api_key": "sk-...", # optional — falls back to server env var
})
print(response.json()["content"])
| Field | Type | Required | Description |
|---|---|---|---|
| prompt | str | ✓ | The prompt to send to the LLM |
| llm_provider | str | ✓ | Provider name (openai, anthropic, google, ollama) |
| llm_model | str | ✓ | Model name (e.g. gpt-4o-mini, claude-3-5-haiku-latest) |
| api_key | str | - | Per-request API key; overrides server-level env var |
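Because Basic Auth is enabled by default on API routes, a /generate call normally needs credentials alongside the fields above. The standard-library sketch below builds such a request without sending it. `build_generate_request` is a hypothetical helper, and the URL and the admin / docpipe credentials are simply the defaults mentioned elsewhere in this README.

```python
import base64
import json
import urllib.request
from typing import Optional


def build_generate_request(
    base_url: str,
    prompt: str,
    llm_provider: str,
    llm_model: str,
    username: str,
    password: str,
    api_key: Optional[str] = None,
) -> urllib.request.Request:
    """Assemble a POST /generate request with an HTTP Basic Auth header."""
    payload = {"prompt": prompt, "llm_provider": llm_provider, "llm_model": llm_model}
    if api_key is not None:
        payload["api_key"] = api_key  # optional per-request key
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        f"{base_url}/generate",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
        method="POST",
    )


req = build_generate_request(
    "http://localhost:8000",
    "Generate a 5-word title for a document about photosynthesis",
    "openai",
    "gpt-4o-mini",
    username="admin",
    password="docpipe",
)
body = json.loads(req.data)
# urllib.request.urlopen(req) would send it; omitted so the sketch runs offline.
```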
Google (Gemini) embedding models
Google retired models/embedding-001 on the Gemini API (v1beta returns 404 NOT_FOUND).
Use one of these model IDs with embedding_provider="google":
| Model | Notes |
|---|---|
| models/text-embedding-004 | Recommended default for new integrations (768-dim, stable on v1beta) |
| models/gemini-embedding-001 | Newer unified embedding model when you need the latest Google embedding API |
Docpipe returns 502 with structured detail (phase: embedding, plus a hint) when the upstream provider rejects the model or key, instead of a generic 400.
Delete a document
Remove all ingested chunks for a source (exact match) or path fragment (contains):
requests.delete(f"{BASE}/ingest", json={
"connection_string": "postgresql://...",
"table_name": "docs",
"source": "reports/q1.pdf",
})
# Partial source match (e.g. MinIO path prefix)
requests.delete(f"{BASE}/ingest", json={
"connection_string": "postgresql://...",
"table_name": "docs",
"match_mode": "contains",
"source_contains": "reports/",
})
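The difference between the two requests can be illustrated with a small predicate. This is a sketch of the matching semantics as described above, not the server's code: `source_matches` is a hypothetical function, and "exact" as the name of the default mode is an assumption, since the first request simply omits match_mode.

```python
from typing import Optional


def source_matches(
    stored: str,
    *,
    source: Optional[str] = None,
    source_contains: Optional[str] = None,
    match_mode: str = "exact",
) -> bool:
    """Decide whether a stored chunk's source would be selected for deletion."""
    if match_mode == "exact":
        return source is not None and stored == source
    if match_mode == "contains":
        return source_contains is not None and source_contains in stored
    raise ValueError(f"unknown match_mode: {match_mode!r}")


stored_sources = ["reports/q1.pdf", "reports/q2.pdf", "contracts/nda.pdf"]

exact = [s for s in stored_sources if source_matches(s, source="reports/q1.pdf")]
fragment = [
    s for s in stored_sources
    if source_matches(s, match_mode="contains", source_contains="reports/")
]
```

Contains-mode deletes can remove far more than intended when the fragment is short, so it is worth listing the matching sources before issuing the DELETE.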
POST /ingest accepts "incremental": true to skip unchanged sources (see skipped in the response).
POST /rag/query accepts "response_format": {...} (JSON schema) for structured answers when the LLM supports it.
Docker
The official image is published to GitHub Container Registry and updated automatically on every release.
docker pull ghcr.io/thesunnysinha/docpipe:latest
Run the API server
docker run -p 8000:8000 --env-file .env \
ghcr.io/thesunnysinha/docpipe:latest
Parse or ingest a document
# Parse
docker run -v ./data:/data \
ghcr.io/thesunnysinha/docpipe:latest \
parse /data/invoice.pdf --format markdown
# Ingest
docker run --env-file .env -v ./data:/data \
ghcr.io/thesunnysinha/docpipe:latest \
ingest /data/invoice.pdf \
--db "postgresql://..." --table invoices \
--embedding-provider openai --embedding-model text-embedding-3-small
Docker Compose — server + pgvector (zero config)
cp .env.example .env # fill in your API key
docker compose up -d
# docker-compose.yml
services:
  docpipe:
    image: ghcr.io/thesunnysinha/docpipe:latest
    ports:
      - "8000:8000"
    env_file: .env
    volumes:
      - ./data:/data
    depends_on:
      db:
        condition: service_healthy
  db:
    image: pgvector/pgvector:pg16
    environment:
      POSTGRES_USER: docpipe
      POSTGRES_PASSWORD: docpipe
      POSTGRES_DB: docpipe
    volumes:
      - pgdata:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U docpipe"]
      interval: 5s
      retries: 5
volumes:
  pgdata:
A full-stack variant with Adminer (DB UI) is in docker-compose.full.yml.
Available tags
| Tag | Description |
|---|---|
| latest | Most recent build from main |
| 0.4.5, 0.4 | Specific release versions |
| sha-<hash> | Exact commit build |
Jingo sidecar (production pattern)
Jingo runs docpipe as a sidecar on the same Docker network as Django, PostgreSQL (pgvector), and MinIO:
- Backend calls http://docpipe:8000 with HTTP Basic Auth (DocpipeClient or raw REST).
- Each knowledge library maps to a pgvector table (docpipe_<library_uuid>).
- Ingest source is often a MinIO presigned URL; set DOCPIPE_ALLOW_PRIVATE_URLS=true on the docpipe container so Docling can fetch internal URLs.
- Vector backend stays pgvector (shared Postgres), not turbovec.
See Jingo’s docker-compose.yml docpipe service and services/backend/chat/docpipe/client.py.
Plugin System
Register custom parsers or extractors via Python entry points:
# In your package's pyproject.toml
[project.entry-points."docpipe.parsers"]
my_parser = "my_package:MyParser"
[project.entry-points."docpipe.extractors"]
my_extractor = "my_package:MyExtractor"
Implement the BaseParser or BaseExtractor protocol (structural subtyping — no inheritance required):
class MyParser:
    name = "my_parser"
    def parse(self, source: str, **kwargs) -> docpipe.ParsedDocument: ...
    async def aparse(self, source: str, **kwargs) -> docpipe.ParsedDocument: ...
    def is_available(self) -> bool: ...
    def supported_formats(self) -> list[str]: ...
See CONTRIBUTING.md for a full walkthrough.
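As a concrete illustration of structural subtyping, the sketch below defines a plain-text parser that satisfies the method set listed above without inheriting from anything. To stay self-contained it uses stand-ins: `ParsedDocument` here is a local dataclass with only the `text` and `markdown` fields (the real docpipe.ParsedDocument may take other arguments), `ParserProtocol` is a hypothetical mirror of the protocol, and the ".txt" format string is an assumed convention.

```python
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Protocol, runtime_checkable


@dataclass
class ParsedDocument:
    """Stand-in for docpipe.ParsedDocument (assumed fields only)."""
    text: str
    markdown: str


@runtime_checkable
class ParserProtocol(Protocol):
    """Hypothetical mirror of the parser protocol described above."""
    name: str

    def parse(self, source: str, **kwargs) -> ParsedDocument: ...
    def is_available(self) -> bool: ...
    def supported_formats(self) -> list[str]: ...


class PlainTextParser:
    """No base class: it conforms purely by having the right attributes."""
    name = "plain_text"

    def parse(self, source: str, **kwargs) -> ParsedDocument:
        text = Path(source).read_text(encoding="utf-8")
        return ParsedDocument(text=text, markdown=text)

    async def aparse(self, source: str, **kwargs) -> ParsedDocument:
        return self.parse(source, **kwargs)  # simple sync fallback

    def is_available(self) -> bool:
        return True  # no optional dependencies to probe

    def supported_formats(self) -> list[str]:
        return [".txt"]


conforms = isinstance(PlainTextParser(), ParserProtocol)

with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "note.txt"
    sample.write_text("hello docpipe", encoding="utf-8")
    parsed = PlainTextParser().parse(str(sample))
```

In a real plugin you would return docpipe.ParsedDocument and register the class under the docpipe.parsers entry point shown earlier.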
Supported Providers
| Component | Providers |
|---|---|
| Parsing | Docling (PDF, DOCX, XLSX, PPTX, HTML, images), GLM-OCR (state-of-the-art multimodal OCR) |
| Extraction | LangExtract (Google), LangChain with_structured_output |
| Embeddings | OpenAI, Google Gemini, Ollama, HuggingFace |
| Vector store | PostgreSQL + pgvector (default), optional turbovec on-disk indices |
| LLM (RAG) | OpenAI, Anthropic Claude, Google Gemini, Ollama |
| Reranking | FlashRank (local), Cohere |
License
MIT — see LICENSE.