# ingestao-vetorial-sdk

Official Python SDK for the Ingestão Vetorial API — a vector ingestion and semantic search system with RAG (Retrieval-Augmented Generation) support.

It covers every API resource: collections, documents, upload, semantic search, tags, statistics, ingestion progress, and logs.
The API's list endpoints return `items` and `meta`, but the SDK preserves its previous interface and returns plain lists from those methods, unwrapping `items` automatically.
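Conceptually, the unwrapping is equivalent to this small sketch (`unwrap_items` is illustrative; it is not part of the SDK's public API):

```python
def unwrap_items(payload):
    """Return the 'items' list from a paginated API response
    ({"items": [...], "meta": {...}}), or the payload unchanged
    if it is already a plain list."""
    if isinstance(payload, dict) and "items" in payload:
        return payload["items"]
    return payload
```

So callers keep receiving lists even though the underlying endpoints now return an envelope with pagination metadata.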
## Requirements

- Python >= 3.10
- httpx >= 0.27.0
## Installation

```bash
# From PyPI (after publication)
pip install ingestao-vetorial-sdk

# Local development
pip install -e ./python
```
## Quick start

```python
from ingestao_vetorial_sdk import Client

client = Client(
    base_url="http://localhost:8000",
    api_key="sua_api_key",  # sent as X-API-Key on every request
    timeout=30.0,           # optional, default: 30 s
)

# Create a collection
col = client.create_collection(
    name="Documentos Jurídicos",
    embedding_model="amazon.titan-embed-text-v2:0",
    dimension=1024,
)

# Upload a file
resp = client.upload(
    "contrato.pdf",
    collection_id=col["id"],
    document_type="contract",
    tags=["jurídico", "2024"],
)

# Semantic search
results = client.search(
    "cláusula de rescisão contratual",
    collection_id=col["id"],
    limit=5,
    min_score=0.75,
)
for r in results:
    print(f"[{r['score']:.2f}] {r['document_name']}: {r['content'][:120]}")
```
## Error handling

The SDK propagates `httpx.HTTPStatusError` for 4xx/5xx responses:

```python
import httpx
from ingestao_vetorial_sdk import Client

client = Client("http://localhost:8000", api_key="minha-key")

try:
    doc = client.document("id-inexistente")
except httpx.HTTPStatusError as e:
    print(f"Error {e.response.status_code}: {e.response.text}")
except httpx.RequestError as e:
    # Timeout, connection refused, etc.
    print(f"Network error: {e}")
```
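For transient failures (timeouts, 5xx responses) it can be convenient to wrap calls in a small retry helper. A minimal sketch (`with_retries` is an illustrative helper, not part of the SDK):

```python
import time

def with_retries(fn, *, attempts=3, backoff=1.0, retry_on=(Exception,)):
    """Call fn(), retrying on the given exception types with
    exponential backoff. Illustrative helper, not part of the SDK."""
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: propagate the last error
            time.sleep(backoff * (2 ** attempt))
```

For example: `with_retries(lambda: client.search("rescisão", limit=5), retry_on=(httpx.RequestError,))`.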
## Full reference

### Collections

#### `embedding_models() -> list[dict]`

Lists the available embedding models.

```python
models = client.embedding_models()
# [{"id": "text-embedding-3-small", "provider": "openai", "dimensions": [1536], ...}]
```

#### `collections(*, skip=0, limit=100, logic="and", ...) -> list[dict]`

```python
cols = client.collections(query="jurídico", limit=10)
```

#### `create_collection(name, embedding_model, dimension, chunk_size=1400, chunk_overlap=250, *, description=None, alias=None, is_public=False, user_id=None, project_id=None) -> dict`

```python
col = client.create_collection(
    name="Base de Conhecimento",
    embedding_model="text-embedding-3-small",
    dimension=1536,
    description="Documentos internos da empresa",
)
```

#### `get_collection(collection_id) -> dict`

```python
col = client.get_collection("uuid-da-colecao")
```

#### `update_collection(collection_id, *, name=None, description=None, is_public=None) -> dict`

```python
col = client.update_collection("uuid", name="Novo Nome", is_public=True)
```

#### `delete_collection(collection_id) -> None`

```python
client.delete_collection("uuid-da-colecao")
```

#### `collection_raw(collection_id) -> dict`

Returns the raw Qdrant information for the collection.

#### `collection_documents(collection_id, *, skip=0, limit=100) -> list[dict]`

```python
docs = client.collection_documents("uuid", limit=25)
```
### Documents

#### `documents(*, skip=0, limit=100, collection_id=None) -> list[dict]`

```python
docs = client.documents(collection_id="uuid", limit=50)
```

#### `document(document_id) -> dict`

Returns full details, including versions and metadata.

```python
doc = client.document("uuid-do-doc")
print(doc["versions"])  # list of versions
```

#### `document_chunks(document_id, *, version=None, q=None) -> list[dict]`

When `q` is given, filtering happens server-side over the whole document. The SDK paginates internally until it has gathered all results.

```python
chunks = client.document_chunks("uuid", version=1)
filtered_chunks = client.document_chunks("uuid", version=1, q="cláusula penal")
for c in chunks:
    print(c["content"][:80], "→ tokens:", c["tokens"])
```
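The internal pagination might look like the following sketch (`fetch_all_chunks` is illustrative; the SDK's actual implementation is not shown here):

```python
def fetch_all_chunks(fetch_page, *, page_size=100):
    """Sketch of an exhaustive pagination loop: keep requesting
    pages until a short page signals the end of the data."""
    chunks, skip = [], 0
    while True:
        page = fetch_page(skip=skip, limit=page_size)
        chunks.extend(page)
        if len(page) < page_size:  # short page => no more data
            return chunks
        skip += page_size
```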
The same automatic unwrapping applies to `embedding_models()`, `collections()`, `collection_documents()`, `documents()`, `search()`, `tags()`, `search_tags()`, `recent_activity()`, `top_collections()`, `uploads_per_day()`, `vectors_per_week()`, and `active_jobs()`.

#### `document_markdown(document_id, *, version=None) -> bytes`

```python
md = client.document_markdown("uuid", version=1)
with open("extraido.md", "wb") as f:
    f.write(md)
```

#### `delete_document(document_id) -> None`

#### `reprocess_document(document_id, *, source_version=None, mode="replace", extraction_tool=None) -> dict`

```python
resp = client.reprocess_document("uuid", mode="replace", extraction_tool="pypdf")
print(resp["version"])
```

#### `delete_document_version(document_id, version) -> None`

#### `set_version_active(document_id, version, *, is_active) -> dict`

```python
client.set_version_active("uuid", 2, is_active=True)
```
### Upload

#### `upload(file, collection_id, *, document_type="document", description="", tags=None, custom_fields=None, overwrite_existing=False, embedding_model=None, dimension=None, extraction_tool=None) -> dict`

`file` can be a path (`str`) or any `BinaryIO` object.

```python
# By path
resp = client.upload(
    "relatorio_anual.pdf",
    collection_id="uuid",
    document_type="report",
    tags=["finanças", "2024"],
    custom_fields=[{"key": "departamento", "value": "RH"}],
    overwrite_existing=True,
)
print(resp["document_id"], resp["version"])

# By file object
with open("dados.csv", "rb") as fp:
    resp = client.upload(fp, collection_id="uuid", document_type="dataset")
```

Response:

```python
{
    "success": True,
    "document_id": "uuid",
    "vector_count": 0,  # 0 until ingestion completes (asynchronous)
    "version": 1,
    "message": None,
}
```
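Since ingestion runs asynchronously, a caller that needs the vectors right away can combine `upload()` with `job_progress()` polling. A sketch (`wait_for_ingestion` is an illustrative helper built on the documented SDK methods, not part of the SDK itself):

```python
import time

def wait_for_ingestion(client, document_id, version, *,
                       poll_interval=2.0, timeout=600.0):
    """Poll job_progress() until the job reaches a terminal status,
    then return the final progress dict."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        progress = client.job_progress(document_id, version)
        if progress["status"] in ("completed", "error", "cancelled"):
            return progress
        time.sleep(poll_interval)
    raise TimeoutError(f"ingestion of {document_id} v{version} did not finish")
```

Typical usage: `resp = client.upload("contrato.pdf", collection_id="uuid")`, then `wait_for_ingestion(client, resp["document_id"], resp["version"])`.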
### Semantic search

#### `search(query, collection_id=None, *, limit=10, offset=0, min_score=0.0) -> list[dict]`

```python
results = client.search(
    "procedimentos de rescisão contratual",
    collection_id="uuid",
    limit=5,
    min_score=0.75,
)
for r in results:
    print(f"[{r['score']:.3f}] {r['document_name']} — chunk {r['chunk_index']}")
    print(r["content"][:200])
```
Result fields:

| Field | Type | Description |
|---|---|---|
| `id` | `str` | Chunk ID |
| `score` | `float` | Cosine similarity (0–1) |
| `content` | `str` | Chunk text |
| `document_name` | `str` | Document name |
| `collection_id` | `str` | Collection UUID |
| `collection_name` | `str` | Collection name |
| `chunk_index` | `int` or `None` | Chunk position |
| `metadata` | `dict` | Additional metadata |
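Because each hit is a flat dict with the fields above, post-processing is straightforward. For example, a sketch that groups hits by document, best score first (`group_by_document` is an illustrative helper, not part of the SDK):

```python
from collections import defaultdict

def group_by_document(results):
    """Group search hits by document_name and sort each document's
    chunks by score, best first. Uses only the documented fields."""
    grouped = defaultdict(list)
    for r in results:
        grouped[r["document_name"]].append(r)
    return {
        name: sorted(hits, key=lambda h: h["score"], reverse=True)
        for name, hits in grouped.items()
    }
```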
### Tags

#### `tags(*, skip=0, limit=100) -> list[str]`

```python
all_tags = client.tags()
```

#### `search_tags(q) -> list[str]`

```python
tags = client.search_tags("fin")
```

#### `create_tag(name) -> dict`

```python
tag = client.create_tag("compliance")
print(tag["id"], tag["name"])
```
### Statistics

```python
overview = client.dashboard_overview()
stats = client.dashboard_stats()
# {"total_collections": 4, "total_vectors": 1000, "total_size_mb": 50.5}

activity = client.recent_activity(limit=10)
top = client.top_collections(limit=3)
uploads = client.uploads_per_day(days=30)
vecs = client.vectors_per_week(weeks=12)
```
### Ingestion progress

#### `active_jobs() -> list[dict]`

```python
jobs = client.active_jobs()
for j in jobs:
    print(f"{j['document_name']} — {j['status']} ({j['percent']:.0f}%)")
```

#### `job_progress(document_id, version) -> dict`

```python
import time

while True:
    p = client.job_progress("uuid", 1)
    print(p["status"], p["percent"])
    if p["status"] in ("completed", "error", "cancelled"):
        break
    time.sleep(2)
```

Statuses: `extracting` → `chunking` → `upserting` → `completed` | `error` | `cancelled`

#### `stream_progress() -> stream context manager`

Opens the raw SSE progress endpoint.

```python
with client.stream_progress() as response:
    for line in response.iter_lines():
        print(line)
```
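The raw SSE lines can be turned into structured events with a small parser. A sketch assuming each `data:` field carries a JSON payload (the exact event format is not documented here, so treat this as an assumption):

```python
import json

def iter_sse_events(lines):
    """Yield decoded JSON payloads from SSE 'data:' lines,
    skipping blank keep-alive lines and comments."""
    for line in lines:
        if line.startswith("data:"):
            payload = line[len("data:"):].strip()
            if payload:
                yield json.loads(payload)
```

With the client, this would look like: `for event in iter_sse_events(response.iter_lines()): ...`.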
#### `cancel_ingestion(document_id, version) -> dict`

```python
result = client.cancel_ingestion("uuid", 1)
# {"ok": True}
```
### Logs

#### `logs(*, page=1, page_size=50, order_by="timestamp", order_dir="desc", from_ts=None, to_ts=None, nivel=None, ...) -> dict`

`from_ts` and `to_ts` accept `datetime` objects or ISO-8601 strings.

```python
from datetime import datetime, timedelta, timezone

now = datetime.now(timezone.utc)
yesterday = now - timedelta(days=1)

logs = client.logs(from_ts=yesterday, to_ts=now, nivel="ERROR", page_size=20)
for entry in logs["items"]:
    print(entry["timestamp"], entry["acao"])
```

#### `log_facets() -> dict`

```python
facets = client.log_facets()
print(facets["apps"], facets["endpoints"])
```

#### `log_summary(*, from_ts=None, to_ts=None) -> dict`

```python
summary = client.log_summary()
print(summary["total"], summary["byLevel"])
```

#### `export_logs(*, format="json", ..., limit=10000) -> bytes`

```python
data = client.export_logs(format="csv", nivel="ERROR", limit=500)
with open("erros.csv", "wb") as f:
    f.write(data)
```

#### `ingest_logs(payload, *, log_sink_token=None) -> dict`

```python
result = client.ingest_logs(
    [{"nivel": "INFO", "modulo": "sdk", "acao": "startup", "detalhes": {"app": "external"}}],
    log_sink_token="token-opcional",
)
print(result["accepted"])
```
### AI token usage

#### `token_usage(...) -> dict`

```python
usage = client.token_usage(provider="openai", page_size=20)
print(usage["summary"]["totalTokens"])
```
## Running the tests

```bash
pip install -e ".[dev]"
pytest tests/ -v
```
## License

MIT