next-plaid-client
Python SDK for the NextPlaid ColBERT Search API. Provides synchronous and asynchronous clients for interacting with the next-plaid-api server.
Architecture Overview
┌─────────────────────────────────────────────────────────────────────────────┐
│ next-plaid-client │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────┐ ┌─────────────────────────────────┐ │
│ │ NextPlaidClient │ │ AsyncNextPlaidClient │ │
│ │ (Synchronous) │ │ (Asynchronous) │ │
│ ├─────────────────────────┤ ├─────────────────────────────────┤ │
│ │ │ │ │ │
│ │ httpx.Client │ │ httpx.AsyncClient │ │
│ │ ↓ │ │ ↓ │ │
│ │ Blocking I/O │ │ asyncio I/O │ │
│ │ │ │ │ │
│ └───────────┬─────────────┘ └───────────┬─────────────────────┘ │
│ │ │ │
│ └───────────────┬───────────────────┘ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ BaseNextPlaidClient │ │
│ │ │ │
│ │ - URL construction - Payload preparation │ │
│ │ - Response parsing - Error handling │ │
│ │ - Input type detection - Exception mapping │ │
│ │ │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────┐ ┌─────────────────────────────────┐ │
│ │ Models │ │ Exceptions │ │
│ ├─────────────────────────┤ ├─────────────────────────────────┤ │
│ │ IndexConfig │ │ NextPlaidError (base) │ │
│ │ IndexInfo │ │ IndexNotFoundError │ │
│ │ SearchParams │ │ IndexExistsError │ │
│ │ SearchResult │ │ ValidationError │ │
│ │ QueryResult │ │ RateLimitError │ │
│ │ HealthResponse │ │ ModelNotLoadedError │ │
│ │ RerankResponse │ │ ConnectionError │ │
│ │ MetadataResponse │ │ ServerError │ │
│ └─────────────────────────┘ └─────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Features
- Dual Client Support: Synchronous (NextPlaidClient) and async (AsyncNextPlaidClient) clients
- Automatic Input Detection: Methods auto-detect text vs embedding inputs
- Type-Safe Models: Dataclass-based request/response models
- Exception Hierarchy: Structured exceptions for error handling
- Context Manager: Automatic resource cleanup with the with statement
- Connection Pooling: Efficient HTTP connection reuse via httpx
Installation
# From PyPI
pip install next-plaid-client
# From source
pip install git+https://github.com/lightonai/next-plaid.git#subdirectory=next-plaid-api/python-sdk
# For development
pip install -e "next-plaid-api/python-sdk[dev]"
Requirements
- Python >= 3.8
- httpx >= 0.24.0
Quick Start
from next_plaid_client import NextPlaidClient, IndexConfig, SearchParams

# Connect to the API
client = NextPlaidClient("http://localhost:8080")

# Check server health
health = client.health()
print(f"Server status: {health.status}")

# Create an index
client.create_index("my_index", IndexConfig(nbits=4))

# Add documents (text - requires model on server)
client.add(
    "my_index",
    ["Paris is the capital of France.", "Berlin is in Germany."],
    metadata=[{"country": "France"}, {"country": "Germany"}]
)

# Search with text queries
results = client.search("my_index", ["What is the capital of France?"])

# Print results
for result in results.results:
    for doc_id, score, meta in zip(result.document_ids, result.scores, result.metadata or []):
        print(f"Document {doc_id}: {score:.4f} - {meta}")
Client Initialization
Synchronous Client
from next_plaid_client import NextPlaidClient
client = NextPlaidClient(
    base_url="http://localhost:8080",  # API server URL
    timeout=30.0,                      # Request timeout in seconds
    headers={"Authorization": "..."}   # Optional headers
)

# Context manager usage (auto-closes connection)
with NextPlaidClient("http://localhost:8080") as client:
    health = client.health()
Async Client
import asyncio
from next_plaid_client import AsyncNextPlaidClient
async def main():
    async with AsyncNextPlaidClient("http://localhost:8080") as client:
        health = await client.health()
        print(f"Server status: {health.status}")

asyncio.run(main())
API Reference
Health Check
health = client.health()
Returns: HealthResponse
| Field | Type | Description |
|---|---|---|
| status | str | Server status ("healthy") |
| version | str | API version |
| loaded_indices | int | Number of loaded indices |
| index_dir | str | Index storage directory |
| memory_usage_bytes | int | Memory usage in bytes |
| indices | List[IndexSummary] | Summary of each index |
Index Management
List Indices
indices: List[str] = client.list_indices()
Get Index Info
info: IndexInfo = client.get_index("my_index")
Returns: IndexInfo
| Field | Type | Description |
|---|---|---|
| name | str | Index name |
| num_documents | int | Document count |
| num_embeddings | int | Total embeddings |
| num_partitions | int | IVF partitions |
| avg_doclen | float | Average tokens per document |
| dimension | int | Embedding dimension |
| has_metadata | bool | Has metadata DB |
| metadata_count | Optional[int] | Metadata entry count |
| max_documents | Optional[int] | Document limit |
Create Index
client.create_index("my_index", IndexConfig(
    nbits=4,                 # Quantization bits (2 or 4)
    batch_size=50000,        # Documents per chunk
    seed=42,                 # Random seed
    start_from_scratch=999,  # Rebuild threshold
    max_documents=10000      # Max documents (None = unlimited)
))
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| nbits | int | 4 | Quantization bits (2 or 4) |
| batch_size | int | 50000 | Documents per chunk |
| seed | Optional[int] | None | Random seed |
| start_from_scratch | int | 999 | Rebuild threshold |
| max_documents | Optional[int] | None | Max documents |
Update Index Config
client.update_index_config("my_index", max_documents=5000)
Delete Index
client.delete_index("my_index")
Document Operations
Add Documents
The add() method automatically detects input type (text vs embeddings).
# Text documents (requires model on server)
client.add(
    "my_index",
    ["Document 1 text", "Document 2 text"],
    metadata=[{"category": "science"}, {"category": "history"}]
)

# With token pooling (reduces embeddings by 2x)
client.add(
    "my_index",
    ["Long document text..."],
    pool_factor=2
)

# Pre-computed embeddings
client.add(
    "my_index",
    [{"embeddings": [[0.1, 0.2], [0.3, 0.4]]}],  # [num_tokens, dim]
    metadata=[{"title": "Doc 1"}]
)
Parameters:
| Parameter | Type | Description |
|---|---|---|
| index_name | str | Target index name |
| documents | Union[List[str], List[Dict]] | Text or embeddings |
| metadata | Optional[List[Dict]] | Metadata per document |
| pool_factor | Optional[int] | Token reduction factor |

Returns: str (status message; the server accepts the update asynchronously and responds with HTTP 202)
Delete Documents
client.delete(
    "my_index",
    condition="category = ? AND year < ?",
    parameters=["outdated", 2020]
)
Parameters:
| Parameter | Type | Description |
|---|---|---|
| index_name | str | Target index |
| condition | str | SQL WHERE clause |
| parameters | Optional[List[Any]] | Query parameters |
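Conditions use `?` placeholders with a separate parameter list, which keeps values out of the SQL string. As a sketch, a dict of equality filters could be turned into a condition/parameter pair like this (the `build_equals_condition` helper is illustrative, not part of the SDK):

```python
from typing import Any, Dict, List, Tuple

def build_equals_condition(filters: Dict[str, Any]) -> Tuple[str, List[Any]]:
    """Build a parameterized WHERE clause from equality filters.

    Column names come from your own schema, so only pass trusted keys;
    values are always bound via `?` placeholders.
    """
    condition = " AND ".join(f"{column} = ?" for column in filters)
    parameters = list(filters.values())
    return condition, parameters

condition, params = build_equals_condition({"category": "outdated", "year": 2020})
# condition == "category = ? AND year = ?"
# params == ["outdated", 2020]
# Then: client.delete("my_index", condition=condition, parameters=params)
```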
Search Operations
The search() method automatically detects query type (text vs embeddings).
Text Search (requires model)
results = client.search(
    "my_index",
    ["What is machine learning?", "Neural networks"],
    params=SearchParams(top_k=10)
)
Embedding Search
results = client.search(
    "my_index",
    [[[0.1, 0.2], [0.3, 0.4]]],  # [batch, num_tokens, dim]
    params=SearchParams(top_k=10)
)
Filtered Search
results = client.search(
    "my_index",
    ["machine learning"],
    filter_condition="category = ? AND year > ?",
    filter_parameters=["science", 2020]
)
Subset Search
results = client.search(
    "my_index",
    ["query"],
    subset=[0, 5, 10, 15]  # Only search these document IDs
)
Search Parameters:
SearchParams(
    top_k=10,                     # Results per query (default: 10)
    n_ivf_probe=8,                # IVF cells to probe (default: 8)
    n_full_scores=4096,           # Re-ranking candidates (default: 4096)
    centroid_score_threshold=0.4  # Pruning threshold (default: 0.4; set to None to disable)
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| top_k | int | 10 | Results per query |
| n_ivf_probe | int | 8 | IVF cells to probe |
| n_full_scores | int | 4096 | Candidates for exact scoring |
| centroid_score_threshold | Optional[float] | 0.4 | Centroid pruning threshold (set to None to disable) |
Returns: SearchResult
@dataclass
class SearchResult:
    results: List[QueryResult]  # One per query
    num_queries: int

@dataclass
class QueryResult:
    query_id: int
    document_ids: List[int]
    scores: List[float]
    metadata: Optional[List[Optional[Dict]]]
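When results feed into downstream code, it is often handy to flatten them into one row per hit. A minimal sketch using stand-in dataclasses that mirror the shapes above (the `flatten_results` helper is illustrative, not part of the SDK):

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

@dataclass
class QueryResult:
    query_id: int
    document_ids: List[int]
    scores: List[float]
    metadata: Optional[List[Optional[Dict]]] = None

@dataclass
class SearchResult:
    results: List[QueryResult]
    num_queries: int

def flatten_results(search_result: SearchResult) -> List[Tuple[int, int, float]]:
    """Flatten per-query hits into (query_id, document_id, score) rows."""
    rows = []
    for query in search_result.results:
        for doc_id, score in zip(query.document_ids, query.scores):
            rows.append((query.query_id, doc_id, score))
    return rows

result = SearchResult(
    results=[QueryResult(query_id=0, document_ids=[3, 7], scores=[12.5, 9.1])],
    num_queries=1,
)
rows = flatten_results(result)
# rows == [(0, 3, 12.5), (0, 7, 9.1)]
```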
Metadata Operations
Get All Metadata
response: MetadataResponse = client.get_metadata("my_index")
# response.metadata: List[Dict]
# response.count: int
Get Metadata Count
result = client.get_metadata_count("my_index")
# result["count"]: int
# result["has_metadata"]: bool
Query Metadata
result = client.query_metadata(
    "my_index",
    condition="category = ? AND score > ?",
    parameters=["science", 0.5]
)
# result["document_ids"]: List[int]
# result["count"]: int
Get Metadata by IDs
response = client.get_metadata_by_ids(
    "my_index",
    document_ids=[0, 5, 10],
    limit=100
)
Check Document Existence
result: MetadataCheckResponse = client.check_metadata(
    "my_index",
    document_ids=[0, 1, 2, 999]
)
# result.existing_ids: List[int]
# result.missing_ids: List[int]
# result.existing_count: int
# result.missing_count: int
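check_metadata makes it easy to skip documents that are already indexed before calling add(). A sketch of the filtering step, assuming you track your own document IDs alongside their texts (the client call itself is shown only as a comment):

```python
from typing import Dict, List, Tuple

def split_new_documents(
    docs_by_id: Dict[int, str], existing_ids: List[int]
) -> Tuple[List[int], List[str]]:
    """Keep only documents whose IDs the server reported as not yet indexed."""
    existing = set(existing_ids)
    new_ids = [doc_id for doc_id in docs_by_id if doc_id not in existing]
    return new_ids, [docs_by_id[doc_id] for doc_id in new_ids]

docs = {0: "already indexed", 1: "also indexed", 7: "new doc"}
# In practice:
# existing = client.check_metadata("my_index", document_ids=list(docs)).existing_ids
new_ids, new_texts = split_new_documents(docs, existing_ids=[0, 1])
# new_ids == [7]; new_texts == ["new doc"]
```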
Text Encoding
Encode texts to ColBERT embeddings (requires model on server).
response: EncodeResponse = client.encode(
    texts=["Hello world", "Test document"],
    input_type="document",  # or "query"
    pool_factor=2           # Optional token reduction
)
# response.embeddings: List[List[List[float]]]  # [batch, num_tokens, dim]
# response.num_texts: int
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| texts | List[str] | required | Texts to encode |
| input_type | str | "document" | "document" or "query" |
| pool_factor | Optional[int] | None | Token reduction factor |
Reranking
Reorder documents by relevance using ColBERT's MaxSim scoring.
Text Reranking (requires model)
result = client.rerank(
    query="What is the capital of France?",
    documents=[
        "Berlin is the capital of Germany.",
        "Paris is the capital of France.",
        "Tokyo is the largest city in Japan.",
    ]
)

# Results sorted by score (descending)
for r in result.results:
    print(f"Document {r.index}: {r.score:.4f}")
# Document 1: 15.2341 (Paris - most relevant)
# Document 0: 8.1234 (Berlin - somewhat relevant)
# Document 2: 3.4567 (Tokyo - least relevant)
Embedding Reranking
result = client.rerank(
    query=[[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]],  # [num_tokens, dim]
    documents=[
        {"embeddings": [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]},
        {"embeddings": [[0.7, 0.8, 0.9], [0.1, 0.2, 0.3]]},
    ]
)
Returns: RerankResponse
@dataclass
class RerankResponse:
    results: List[RerankResult]  # Sorted by score descending
    num_documents: int

@dataclass
class RerankResult:
    index: int    # Original document index
    score: float  # MaxSim score
Exception Handling
All exceptions inherit from NextPlaidError:
from next_plaid_client import (
    NextPlaidError,
    IndexNotFoundError,
    IndexExistsError,
    ValidationError,
    RateLimitError,
    ModelNotLoadedError,
    ConnectionError,
    ServerError,
)

try:
    client.get_index("nonexistent")
except IndexNotFoundError as e:
    print(f"Index not found: {e.message}")
    print(f"Error code: {e.code}")
    print(f"HTTP status: {e.status_code}")
except RateLimitError as e:
    print(f"Rate limited: {e.message}")
except ValidationError as e:
    print(f"Invalid request: {e.message}")
except ModelNotLoadedError as e:
    print(f"Model required: {e.message}")
except NextPlaidError as e:
    print(f"API error: {e.message} (code: {e.code})")
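RateLimitError (HTTP 429) is usually transient, so a small retry wrapper can help. A generic sketch (the backoff policy is an assumption, not SDK behavior; `retry_on` is a hypothetical helper):

```python
import time
from typing import Callable, Type, TypeVar

T = TypeVar("T")

def retry_on(
    exc_type: Type[Exception],
    call: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 0.5,
) -> T:
    """Retry `call` on `exc_type` with exponential backoff; re-raise after the last attempt."""
    for attempt in range(attempts):
        try:
            return call()
        except exc_type:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))
    raise AssertionError("unreachable")

# Usage against the client (RateLimitError imported from next_plaid_client):
# results = retry_on(RateLimitError, lambda: client.search("my_index", ["query"]))
```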
Exception Hierarchy
| Exception | HTTP Status | Description |
|---|---|---|
| NextPlaidError | - | Base exception |
| IndexNotFoundError | 404 | Index does not exist |
| IndexExistsError | 409 | Index already exists |
| ValidationError | 400 | Invalid request parameters |
| RateLimitError | 429 | Rate limit exceeded |
| ModelNotLoadedError | 503 | Encoding requires a loaded model |
| ConnectionError | - | Connection failed |
| ServerError | 5xx | Server error |
Exception Attributes
| Attribute | Type | Description |
|---|---|---|
| message | str | Human-readable error message |
| code | Optional[str] | Error code (e.g., INDEX_NOT_FOUND) |
| details | Optional[Any] | Additional error details |
| status_code | Optional[int] | HTTP status code |
Data Models
IndexConfig
@dataclass
class IndexConfig:
    nbits: int = 4                       # Quantization bits
    batch_size: int = 50000              # Documents per chunk
    seed: Optional[int] = None           # Random seed
    start_from_scratch: int = 999        # Rebuild threshold
    max_documents: Optional[int] = None  # Max documents
IndexInfo
@dataclass
class IndexInfo:
    name: str
    num_documents: int
    num_embeddings: int
    num_partitions: int
    avg_doclen: float
    dimension: int
    has_metadata: bool
    metadata_count: Optional[int] = None
    max_documents: Optional[int] = None
SearchParams
@dataclass
class SearchParams:
    top_k: int = 10
    n_ivf_probe: int = 8
    n_full_scores: int = 4096
    centroid_score_threshold: Optional[float] = 0.4  # Set to None to disable
SearchResult / QueryResult
@dataclass
class SearchResult:
    results: List[QueryResult]
    num_queries: int

@dataclass
class QueryResult:
    query_id: int
    document_ids: List[int]
    scores: List[float]
    metadata: Optional[List[Optional[Dict[str, Any]]]] = None
HealthResponse
@dataclass
class HealthResponse:
    status: str
    version: str
    loaded_indices: int
    index_dir: str
    memory_usage_bytes: int
    indices: List[IndexSummary]
RerankResponse / RerankResult
@dataclass
class RerankResponse:
    results: List[RerankResult]
    num_documents: int

@dataclass
class RerankResult:
    index: int
    score: float
MetadataResponse
@dataclass
class MetadataResponse:
    metadata: List[Dict[str, Any]]
    count: int
EncodeResponse
@dataclass
class EncodeResponse:
    embeddings: List[List[List[float]]]  # [batch, num_tokens, dim]
    num_texts: int
Async Client
The async client provides identical methods with await:
import asyncio
from next_plaid_client import AsyncNextPlaidClient, IndexConfig, SearchParams
async def main():
    async with AsyncNextPlaidClient("http://localhost:8080") as client:
        # Health check
        health = await client.health()
        print(f"Server status: {health.status}")

        # Create index
        await client.create_index("my_index", IndexConfig(nbits=4))

        # Add documents
        await client.add(
            "my_index",
            ["Paris is the capital of France."],
            metadata=[{"country": "France"}]
        )

        # Search
        results = await client.search(
            "my_index",
            ["What is the capital of France?"],
            params=SearchParams(top_k=5)
        )

        # Concurrent operations
        results = await asyncio.gather(
            client.search("index1", ["query1"]),
            client.search("index2", ["query2"]),
            client.search("index3", ["query3"]),
        )

asyncio.run(main())
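Unbounded gather can overwhelm the server when the query list is large. A sketch of bounding concurrency with a semaphore (the `fetch` coroutine is a stand-in for any client.search call; `gather_bounded` is not part of the SDK):

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")

async def gather_bounded(
    tasks: List[Callable[[], Awaitable[T]]], limit: int = 4
) -> List[T]:
    """Run coroutine factories concurrently, at most `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def run(factory: Callable[[], Awaitable[T]]) -> T:
        async with semaphore:
            return await factory()

    # gather preserves input order regardless of completion order
    return await asyncio.gather(*(run(f) for f in tasks))

async def demo() -> List[str]:
    # Stand-in for lambdas like: lambda: client.search("my_index", [q])
    async def fetch(query: str) -> str:
        await asyncio.sleep(0)
        return f"results for {query}"

    return await gather_bounded([lambda q=q: fetch(q) for q in ["a", "b", "c"]], limit=2)

print(asyncio.run(demo()))
# ['results for a', 'results for b', 'results for c']
```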
Input Type Detection
The SDK automatically detects whether inputs are text or embeddings:
Documents
# Text input (first item is str) → uses /update_with_encoding
client.add("index", ["text 1", "text 2"])
# Embedding input (first item is dict with 'embeddings') → uses /update
client.add("index", [{"embeddings": [[0.1, 0.2]]}])
Queries
# Text queries (first item is str) → uses /search_with_encoding
client.search("index", ["query text"])
# Embedding queries (nested list) → uses /search
client.search("index", [[[0.1, 0.2], [0.3, 0.4]]])
Rerank
# Text (query is str) → uses /rerank_with_encoding
client.rerank(query="text", documents=["doc1", "doc2"])
# Embeddings (query is list) → uses /rerank
client.rerank(query=[[0.1, 0.2]], documents=[{"embeddings": [[...]]}])
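The dispatch rule above boils down to inspecting the first item and picking an endpoint. A minimal sketch of that logic for documents (illustrative only; the SDK's actual checks may be stricter):

```python
from typing import Any, List

def detect_document_kind(documents: List[Any]) -> str:
    """Classify a document batch as 'text' or 'embeddings' from its first item."""
    first = documents[0]
    if isinstance(first, str):
        return "text"        # would route to /update_with_encoding
    if isinstance(first, dict) and "embeddings" in first:
        return "embeddings"  # would route to /update
    raise TypeError(f"Unsupported document type: {type(first).__name__}")

assert detect_document_kind(["some text"]) == "text"
assert detect_document_kind([{"embeddings": [[0.1, 0.2]]}]) == "embeddings"
```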
Project Structure
next-plaid-api/python-sdk/
├── pyproject.toml # Package configuration
├── README.md # This file
├── next_plaid_client/
│ ├── __init__.py # Public exports
│ ├── _base.py # Base client logic
│ ├── client.py # Synchronous client
│ ├── async_client.py # Async client
│ ├── models.py # Data models
│ └── exceptions.py # Exception classes
└── tests/
└── test_*.py # Test files
Dependencies
| Package | Version | Purpose |
|---|---|---|
| httpx | >= 0.24.0 | HTTP client (sync + async) |
Development Dependencies
| Package | Version | Purpose |
|---|---|---|
| pytest | >= 7.0.0 | Testing framework |
| pytest-cov | >= 4.0.0 | Coverage reporting |
| pytest-asyncio | >= 0.21.0 | Async test support |
Version Compatibility
| SDK Version | API Version | Python |
|---|---|---|
| 0.4.0 | 0.4.0 | >= 3.8 |
License
Apache-2.0