Python SDK for the NextPlaid ColBERT Search API

next-plaid-client

Python SDK for the NextPlaid ColBERT Search API. Provides synchronous and asynchronous clients for interacting with the next-plaid-api server.

Architecture Overview

┌─────────────────────────────────────────────────────────────────────────────┐
│                           next-plaid-client                                 │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌─────────────────────────┐         ┌─────────────────────────────────┐    │
│  │   NextPlaidClient       │         │   AsyncNextPlaidClient          │    │
│  │     (Synchronous)       │         │     (Asynchronous)              │    │
│  ├─────────────────────────┤         ├─────────────────────────────────┤    │
│  │                         │         │                                 │    │
│  │  httpx.Client           │         │  httpx.AsyncClient              │    │
│  │         ↓               │         │         ↓                       │    │
│  │  Blocking I/O           │         │  asyncio I/O                    │    │
│  │                         │         │                                 │    │
│  └───────────┬─────────────┘         └───────────┬─────────────────────┘    │
│              │                                   │                          │
│              └───────────────┬───────────────────┘                          │
│                              ▼                                              │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                     BaseNextPlaidClient                               │  │
│  │                                                                       │  │
│  │  - URL construction          - Payload preparation                    │  │
│  │  - Response parsing          - Error handling                         │  │
│  │  - Input type detection      - Exception mapping                      │  │
│  │                                                                       │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
│  ┌─────────────────────────┐         ┌─────────────────────────────────┐    │
│  │       Models            │         │       Exceptions                │    │
│  ├─────────────────────────┤         ├─────────────────────────────────┤    │
│  │  IndexConfig            │         │  NextPlaidError (base)          │    │
│  │  IndexInfo              │         │  IndexNotFoundError             │    │
│  │  SearchParams           │         │  IndexExistsError               │    │
│  │  SearchResult           │         │  ValidationError                │    │
│  │  QueryResult            │         │  RateLimitError                 │    │
│  │  HealthResponse         │         │  ModelNotLoadedError            │    │
│  │  RerankResponse         │         │  ConnectionError                │    │
│  │  MetadataResponse       │         │  ServerError                    │    │
│  └─────────────────────────┘         └─────────────────────────────────┘    │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
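The diagram suggests that URL construction, payload preparation, and response parsing live in a shared base class, while each client only contributes its transport. The following is a minimal sketch of that split, not the SDK's actual internals: `_BaseClient`, `SketchSyncClient`, `SketchAsyncClient`, the injected `send` callables, and the `/health` path are all hypothetical names chosen for illustration, and fake transports replace httpx so the example runs without a server.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict, Optional

# Hypothetical transport signatures: (method, url, body) -> raw response bytes.
# The real SDK uses httpx.Client and httpx.AsyncClient instead.
SyncSend = Callable[[str, str, Optional[bytes]], bytes]
AsyncSend = Callable[[str, str, Optional[bytes]], Awaitable[bytes]]


class _BaseClient:
    """Transport-agnostic logic shared by both clients (illustrative only)."""

    def __init__(self, base_url: str) -> None:
        self.base_url = base_url.rstrip("/")

    def _url(self, path: str) -> str:
        # URL construction: join base URL and path with exactly one slash.
        return f"{self.base_url}/{path.lstrip('/')}"

    @staticmethod
    def _encode(payload: Optional[Dict[str, Any]]) -> Optional[bytes]:
        # Payload preparation: serialize a dict to JSON bytes.
        return None if payload is None else json.dumps(payload).encode("utf-8")

    @staticmethod
    def _decode(body: bytes) -> Any:
        # Response parsing: decode JSON bytes into Python objects.
        return json.loads(body.decode("utf-8"))


class SketchSyncClient(_BaseClient):
    def __init__(self, base_url: str, send: SyncSend) -> None:
        super().__init__(base_url)
        self._send = send

    def health(self) -> Any:
        return self._decode(self._send("GET", self._url("/health"), None))


class SketchAsyncClient(_BaseClient):
    def __init__(self, base_url: str, send: AsyncSend) -> None:
        super().__init__(base_url)
        self._send = send

    async def health(self) -> Any:
        return self._decode(await self._send("GET", self._url("/health"), None))


# Fake transports so the sketch runs offline.
def fake_send(method: str, url: str, body: Optional[bytes]) -> bytes:
    return json.dumps({"status": "healthy", "url": url}).encode("utf-8")


async def fake_async_send(method: str, url: str, body: Optional[bytes]) -> bytes:
    return fake_send(method, url, body)


sync_health = SketchSyncClient("http://localhost:8080/", fake_send).health()
async_health = asyncio.run(
    SketchAsyncClient("http://localhost:8080", fake_async_send).health()
)
```

Both clients return the same parsed response because everything except the `send` call is inherited from the base class.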

Features

  • Dual Client Support: Synchronous (NextPlaidClient) and async (AsyncNextPlaidClient) clients
  • Automatic Input Detection: Methods auto-detect text vs embedding inputs
  • Type-Safe Models: Dataclass-based request/response models
  • Exception Hierarchy: Structured exceptions for error handling
  • Context Manager: Automatic resource cleanup via the with statement
  • Connection Pooling: Efficient HTTP connection reuse via httpx

Installation

# From PyPI
pip install next-plaid-client

# From source
pip install git+https://github.com/lightonai/next-plaid.git#subdirectory=next-plaid-api/python-sdk

# For development
pip install -e "next-plaid-api/python-sdk[dev]"

Requirements

  • Python >= 3.8
  • httpx >= 0.24.0

Quick Start

from next_plaid_client import NextPlaidClient, IndexConfig, SearchParams

# Connect to the API
client = NextPlaidClient("http://localhost:8080")

# Check server health
health = client.health()
print(f"Server status: {health.status}")

# Create an index
client.create_index("my_index", IndexConfig(nbits=4))

# Add documents (text - requires model on server)
client.add(
    "my_index",
    ["Paris is the capital of France.", "Berlin is in Germany."],
    metadata=[{"country": "France"}, {"country": "Germany"}]
)

# Search with text queries
results = client.search("my_index", ["What is the capital of France?"])

# Print results
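for result in results.results:
    # metadata may be None; pad it so zip() still yields every hit
    metadata = result.metadata or [None] * len(result.document_ids)
    for doc_id, score, meta in zip(result.document_ids, result.scores, metadata):
        print(f"Document {doc_id}: {score:.4f} - {meta}")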

Client Initialization

Synchronous Client

from next_plaid_client import NextPlaidClient

client = NextPlaidClient(
    base_url="http://localhost:8080",  # API server URL
    timeout=30.0,                       # Request timeout in seconds
    headers={"Authorization": "..."}    # Optional headers
)

# Context manager usage (auto-closes connection)
with NextPlaidClient("http://localhost:8080") as client:
    health = client.health()

Async Client

import asyncio
from next_plaid_client import AsyncNextPlaidClient

async def main():
    async with AsyncNextPlaidClient("http://localhost:8080") as client:
        health = await client.health()
        print(f"Server status: {health.status}")

asyncio.run(main())

API Reference

Health Check

health = client.health()

Returns: HealthResponse

Field               Type                Description
status              str                 Server status ("healthy")
version             str                 API version
loaded_indices      int                 Number of loaded indices
index_dir           str                 Index storage directory
memory_usage_bytes  int                 Memory usage
indices             List[IndexSummary]  Summary of each index

Index Management

List Indices

indices: List[str] = client.list_indices()

Get Index Info

info: IndexInfo = client.get_index("my_index")

Returns: IndexInfo

Field           Type           Description
name            str            Index name
num_documents   int            Document count
num_embeddings  int            Total embeddings
num_partitions  int            IVF partitions
avg_doclen      float          Average tokens per doc
dimension       int            Embedding dimension
has_metadata    bool           Has metadata DB
metadata_count  Optional[int]  Metadata entry count
max_documents   Optional[int]  Document limit

Create Index

client.create_index("my_index", IndexConfig(
    nbits=4,                    # Quantization bits (2 or 4)
    batch_size=50000,           # Documents per chunk
    seed=42,                    # Random seed
    start_from_scratch=999,     # Rebuild threshold
    max_documents=10000         # Max documents (None = unlimited)
))

Parameters:

Parameter           Type           Default  Description
nbits               int            4        Quantization bits (2 or 4)
batch_size          int            50000    Documents per chunk
seed                Optional[int]  None     Random seed
start_from_scratch  int            999      Rebuild threshold
max_documents       Optional[int]  None     Max documents
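
To make the parameter table concrete, here is a small sketch of how a configuration object could be validated and turned into a request body. It uses a local `IndexConfigSketch` dataclass that copies the fields and defaults listed above. The `to_payload` helper, the payload layout (`name` / `config` keys), and the choice to omit `None` values are assumptions for illustration, not the SDK's documented wire format.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional


@dataclass
class IndexConfigSketch:
    # Field names and defaults copied from the parameter table above.
    nbits: int = 4
    batch_size: int = 50000
    seed: Optional[int] = None
    start_from_scratch: int = 999
    max_documents: Optional[int] = None

    def __post_init__(self) -> None:
        # The table lists 2 and 4 as the supported quantization widths.
        if self.nbits not in (2, 4):
            raise ValueError(f"nbits must be 2 or 4, got {self.nbits}")


def to_payload(name: str, config: IndexConfigSketch) -> Dict[str, Any]:
    """Build a JSON-ready dict, leaving out options that are unset (None).

    Hypothetical helper: the key names and the None-dropping are assumptions.
    """
    options = {key: value for key, value in asdict(config).items() if value is not None}
    return {"name": name, "config": options}


payload = to_payload("my_index", IndexConfigSketch(nbits=2, max_documents=10000))
```

Validating `nbits` on the client side surfaces a bad value before any request is sent.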

Update Index Config

client.update_index_config("my_index", max_documents=5000)

Delete Index

client.delete_index("my_index")

Document Operations

Add Documents

The add() method automatically detects input type (text vs embeddings).

# Text documents (requires model on server)
client.add(
    "my_index",
    ["Document 1 text", "Document 2 text"],
    metadata=[{"category": "science"}, {"category": "history"}]
)

# With token pooling (reduces embeddings by 2x)
client.add(
    "my_index",
    ["Long document text..."],
    pool_factor=2
)

# Pre-computed embeddings
client.add(
    "my_index",
    [{"embeddings": [[0.1, 0.2], [0.3, 0.4]]}],  # [num_tokens, dim]
    metadata=[{"title": "Doc 1"}]
)

Parameters:

Parameter    Type                          Description
index_name   str                           Target index name
documents    Union[List[str], List[Dict]]  Text or embeddings
metadata     Optional[List[Dict]]          Metadata per document
pool_factor  Optional[int]                 Token reduction factor

Returns: str (status message; the server accepts the request with HTTP 202 and processes it asynchronously)

Delete Documents

client.delete(
    "my_index",
    condition="category = ? AND year < ?",
    parameters=["outdated", 2020]
)

Parameters:

Parameter   Type                 Description
index_name  str                  Target index
condition   str                  SQL WHERE clause
parameters  Optional[List[Any]]  Query parameters

Search Operations

The search() method automatically detects query type (text vs embeddings).

Text Search (requires model)

results = client.search(
    "my_index",
    ["What is machine learning?", "Neural networks"],
    params=SearchParams(top_k=10)
)

Embedding Search

results = client.search(
    "my_index",
    [[[0.1, 0.2], [0.3, 0.4]]],  # [batch, num_tokens, dim]
    params=SearchParams(top_k=10)
)
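
Embedding queries are nested lists shaped [batch, num_tokens, dim], and a ragged inner list is an easy mistake to make. The sketch below checks that shape before a request is sent. `query_batch_shapes` is a hypothetical helper written for this example and is not part of the SDK.

```python
from typing import List, Sequence, Tuple


def query_batch_shapes(batch: Sequence[Sequence[Sequence[float]]]) -> List[Tuple[int, int]]:
    """Return (num_tokens, dim) for each query in a [batch, num_tokens, dim] nested list.

    Raises ValueError if a query is empty or its token vectors differ in length.
    """
    shapes: List[Tuple[int, int]] = []
    for i, query in enumerate(batch):
        if not query:
            raise ValueError(f"query {i} has no token vectors")
        dim = len(query[0])
        for j, token in enumerate(query):
            if len(token) != dim:
                raise ValueError(
                    f"query {i}, token {j}: expected dim {dim}, got {len(token)}"
                )
        shapes.append((len(query), dim))
    return shapes


# One query made of two 2-dimensional token vectors, as in the example above.
shapes = query_batch_shapes([[[0.1, 0.2], [0.3, 0.4]]])
```

Queries in one batch may have different token counts, which is why the helper reports a shape per query rather than a single tuple.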

Filtered Search

results = client.search(
    "my_index",
    ["machine learning"],
    filter_condition="category = ? AND year > ?",
    filter_parameters=["science", 2020]
)

Subset Search

results = client.search(
    "my_index",
    ["query"],
    subset=[0, 5, 10, 15]  # Only search these document IDs
)

Search Parameters:

SearchParams(
    top_k=10,                      # Results per query (default: 10)
    n_ivf_probe=8,                 # IVF cells to probe (default: 8)
    n_full_scores=4096,            # Re-ranking candidates (default: 4096)
    centroid_score_threshold=0.4   # Pruning threshold (default: 0.4, set to None to disable)
)

Parameter                 Type             Default  Description
top_k                     int              10       Results per query
n_ivf_probe               int              8        IVF cells to probe
n_full_scores             int              4096     Candidates for exact scoring
centroid_score_threshold  Optional[float]  0.4      Centroid pruning threshold (set to None to disable)

Returns: SearchResult

@dataclass
class SearchResult:
    results: List[QueryResult]  # One per query
    num_queries: int

@dataclass
class QueryResult:
    query_id: int
    document_ids: List[int]
    scores: List[float]
    metadata: Optional[List[Optional[Dict]]]
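
Because `metadata` is optional at two levels (the whole list, and each entry), consuming a SearchResult takes a little care. The sketch below flattens a result into one row per hit. It uses local replicas of the two dataclasses above so that it runs without the SDK; `iter_hits` is a hypothetical helper, not an SDK function.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterator, List, Optional, Tuple


# Local replicas of the dataclasses shown above, so the sketch runs offline.
@dataclass
class QueryResult:
    query_id: int
    document_ids: List[int]
    scores: List[float]
    metadata: Optional[List[Optional[Dict[str, Any]]]] = None


@dataclass
class SearchResult:
    results: List[QueryResult]
    num_queries: int


Hit = Tuple[int, int, float, Optional[Dict[str, Any]]]


def iter_hits(result: SearchResult) -> Iterator[Hit]:
    """Yield (query_id, document_id, score, metadata) for every hit."""
    for query in result.results:
        # Pad missing metadata with None so zip() does not silently drop hits.
        metadata = query.metadata or [None] * len(query.document_ids)
        for doc_id, score, meta in zip(query.document_ids, query.scores, metadata):
            yield query.query_id, doc_id, score, meta


example = SearchResult(
    results=[
        QueryResult(0, [3, 7], [12.5, 9.0], [{"country": "France"}, None]),
        QueryResult(1, [7], [4.25]),  # no metadata returned for this query
    ],
    num_queries=2,
)
hits = list(iter_hits(example))
```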

Metadata Operations

Get All Metadata

response: MetadataResponse = client.get_metadata("my_index")
# response.metadata: List[Dict]
# response.count: int

Get Metadata Count

result = client.get_metadata_count("my_index")
# result["count"]: int
# result["has_metadata"]: bool

Query Metadata

result = client.query_metadata(
    "my_index",
    condition="category = ? AND score > ?",
    parameters=["science", 0.5]
)
# result["document_ids"]: List[int]
# result["count"]: int
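
The condition string and the parameters list are kept separate so that values are bound to the ? placeholders rather than pasted into the SQL text. The sketch below shows that binding behaviour with Python's built-in sqlite3 module. It assumes SQLite-style positional placeholders; the `metadata` table name and its columns are invented for this example and say nothing about how the server stores metadata.

```python
import sqlite3

# Hypothetical in-memory table standing in for an index's metadata store.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE metadata (doc_id INTEGER PRIMARY KEY, category TEXT, score REAL)"
)
conn.executemany(
    "INSERT INTO metadata VALUES (?, ?, ?)",
    [(0, "science", 0.9), (1, "history", 0.8), (2, "science", 0.3)],
)

# Same shape as the query_metadata() call above: a WHERE clause plus bound values.
condition = "category = ? AND score > ?"
parameters = ["science", 0.5]

# Only the clause is spliced into the statement; the values travel separately,
# so user-supplied values should always go in `parameters`, never in `condition`.
rows = conn.execute(
    f"SELECT doc_id FROM metadata WHERE {condition} ORDER BY doc_id",
    parameters,
).fetchall()
document_ids = [row[0] for row in rows]
conn.close()
```

Each ? is filled in order from the parameters list, so the two lists must line up one to one.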

Get Metadata by IDs

response = client.get_metadata_by_ids(
    "my_index",
    document_ids=[0, 5, 10],
    limit=100
)

Check Document Existence

result: MetadataCheckResponse = client.check_metadata(
    "my_index",
    document_ids=[0, 1, 2, 999]
)
# result.existing_ids: List[int]
# result.missing_ids: List[int]
# result.existing_count: int
# result.missing_count: int

Text Encoding

Encode texts to ColBERT embeddings (requires model on server).

response: EncodeResponse = client.encode(
    texts=["Hello world", "Test document"],
    input_type="document",  # or "query"
    pool_factor=2           # Optional token reduction
)
# response.embeddings: List[List[List[float]]]  # [batch, num_tokens, dim]
# response.num_texts: int

Parameters:

Parameter    Type           Default     Description
texts        List[str]      required    Texts to encode
input_type   str            "document"  "document" or "query"
pool_factor  Optional[int]  None        Token reduction factor

Reranking

Reorder documents by relevance using ColBERT's MaxSim scoring.

Text Reranking (requires model)

result = client.rerank(
    query="What is the capital of France?",
    documents=[
        "Berlin is the capital of Germany.",
        "Paris is the capital of France.",
        "Tokyo is the largest city in Japan.",
    ]
)

# Results sorted by score (descending)
for r in result.results:
    print(f"Document {r.index}: {r.score:.4f}")
# Document 1: 15.2341  (Paris - most relevant)
# Document 0: 8.1234   (Berlin - somewhat relevant)
# Document 2: 3.4567   (Tokyo - least relevant)

Embedding Reranking

result = client.rerank(
    query=[[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]],  # [num_tokens, dim]
    documents=[
        {"embeddings": [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]},
        {"embeddings": [[0.7, 0.8, 0.9], [0.1, 0.2, 0.3]]},
    ]
)

Returns: RerankResponse

@dataclass
class RerankResponse:
    results: List[RerankResult]  # Sorted by score descending
    num_documents: int

@dataclass
class RerankResult:
    index: int    # Original document index
    score: float  # MaxSim score
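
For intuition about the scores, MaxSim takes each query token vector, finds its best-matching document token vector by dot product, and sums those maxima over the query tokens. The pure-Python sketch below applies that definition to the embeddings from the embedding reranking example. `maxsim` and `rerank_locally` are hypothetical helpers; whether the server normalizes vectors or applies other preprocessing is not stated here, so server scores may differ.

```python
from typing import List, Sequence, Tuple

Matrix = Sequence[Sequence[float]]  # [num_tokens, dim]


def maxsim(query: Matrix, document: Matrix) -> float:
    """Sum, over query tokens, of the largest dot product with any document token."""
    return sum(
        max(sum(q * d for q, d in zip(q_tok, d_tok)) for d_tok in document)
        for q_tok in query
    )


def rerank_locally(query: Matrix, documents: Sequence[Matrix]) -> List[Tuple[int, float]]:
    """Return (original_index, score) pairs sorted by score, highest first."""
    scored = [(index, maxsim(query, doc)) for index, doc in enumerate(documents)]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)


query = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]
documents = [
    [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]],
    [[0.7, 0.8, 0.9], [0.1, 0.2, 0.3]],
]
ranking = rerank_locally(query, documents)
# ranking is approximately [(1, 1.72), (0, 1.09)]
```

The second document ranks first even though the first one is identical to the query, because unnormalized dot products reward larger vector magnitudes.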

Exception Handling

All exceptions inherit from NextPlaidError:

from next_plaid_client import (
    NextPlaidError,
    IndexNotFoundError,
    IndexExistsError,
    ValidationError,
    RateLimitError,
    ModelNotLoadedError,
    ConnectionError,  # note: shadows Python's built-in ConnectionError in this module
    ServerError,
)

try:
    client.get_index("nonexistent")
except IndexNotFoundError as e:
    print(f"Index not found: {e.message}")
    print(f"Error code: {e.code}")
    print(f"HTTP status: {e.status_code}")
except RateLimitError as e:
    print(f"Rate limited: {e.message}")
except ValidationError as e:
    print(f"Invalid request: {e.message}")
except ModelNotLoadedError as e:
    print(f"Model required: {e.message}")
except NextPlaidError as e:
    print(f"API error: {e.message} (code: {e.code})")
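
A common way to react to RateLimitError is to retry with exponential backoff. The sketch below is one possible approach and not a feature of the SDK: `call_with_backoff` is a hypothetical helper, and a local `RateLimitError` class stands in for the SDK's exception so the example runs without a server.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


class RateLimitError(Exception):
    """Local stand-in for next_plaid_client.RateLimitError (offline sketch)."""


def call_with_backoff(
    func: Callable[[], T],
    retries: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func(); on RateLimitError wait base_delay * 2**attempt and try again."""
    for attempt in range(retries + 1):
        try:
            return func()
        except RateLimitError:
            if attempt == retries:
                raise  # out of retries: let the caller handle it
            sleep(base_delay * 2 ** attempt)
    raise ValueError("retries must be >= 0")


# Demo: a call that is rate limited twice, then succeeds.
calls = {"count": 0}


def flaky_search() -> str:
    calls["count"] += 1
    if calls["count"] <= 2:
        raise RateLimitError("rate limit exceeded")
    return "ok"


delays: List[float] = []  # record the waits instead of actually sleeping
outcome = call_with_backoff(flaky_search, sleep=delays.append)
```

In real use, `func` could be a lambda wrapping a client call such as `lambda: client.search("my_index", ["query"])`.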

Exception Hierarchy

Exception            HTTP Status  Description
NextPlaidError       -            Base exception
IndexNotFoundError   404          Index does not exist
IndexExistsError     409          Index already exists
ValidationError      400          Invalid request parameters
RateLimitError       429          Rate limit exceeded
ModelNotLoadedError  503          Encoding requires model
ConnectionError      -            Connection failed
ServerError          5xx          Server error
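
The table can be read as a lookup from HTTP status to exception class. The sketch below expresses that lookup with locally defined stand-in classes. `error_class_for` is a hypothetical helper, and it is an assumption that the status code alone decides the class: the SDK may also inspect the error code in the response body (for example INDEX_NOT_FOUND).

```python
from typing import Dict, Type


# Local stand-ins mirroring the names in the table (not the SDK's classes).
class NextPlaidError(Exception):
    """Base exception."""


class IndexNotFoundError(NextPlaidError):
    pass


class IndexExistsError(NextPlaidError):
    pass


class ValidationError(NextPlaidError):
    pass


class RateLimitError(NextPlaidError):
    pass


class ModelNotLoadedError(NextPlaidError):
    pass


class ServerError(NextPlaidError):
    pass


_EXACT: Dict[int, Type[NextPlaidError]] = {
    400: ValidationError,
    404: IndexNotFoundError,
    409: IndexExistsError,
    429: RateLimitError,
    503: ModelNotLoadedError,
}


def error_class_for(status_code: int) -> Type[NextPlaidError]:
    """Pick an exception class for an HTTP status, most specific match first."""
    if status_code in _EXACT:
        return _EXACT[status_code]
    if 500 <= status_code <= 599:
        return ServerError  # any other 5xx
    return NextPlaidError  # fallback for unmapped statuses


picked = [error_class_for(code).__name__ for code in (404, 503, 500, 418)]
```

Checking the exact codes before the 5xx range matters, since 503 would otherwise be reported as a generic ServerError.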

Exception Attributes

Attribute    Type           Description
message      str            Human-readable error message
code         Optional[str]  Error code (e.g., INDEX_NOT_FOUND)
details      Optional[Any]  Additional error details
status_code  Optional[int]  HTTP status code

Data Models

IndexConfig

@dataclass
class IndexConfig:
    nbits: int = 4                       # Quantization bits
    batch_size: int = 50000              # Documents per chunk
    seed: Optional[int] = None           # Random seed
    start_from_scratch: int = 999        # Rebuild threshold
    max_documents: Optional[int] = None  # Max documents

IndexInfo

@dataclass
class IndexInfo:
    name: str
    num_documents: int
    num_embeddings: int
    num_partitions: int
    avg_doclen: float
    dimension: int
    has_metadata: bool
    metadata_count: Optional[int] = None
    max_documents: Optional[int] = None

SearchParams

@dataclass
class SearchParams:
    top_k: int = 10
    n_ivf_probe: int = 8
    n_full_scores: int = 4096
    centroid_score_threshold: Optional[float] = 0.4  # Default: 0.4, set to None to disable

SearchResult / QueryResult

@dataclass
class SearchResult:
    results: List[QueryResult]
    num_queries: int

@dataclass
class QueryResult:
    query_id: int
    document_ids: List[int]
    scores: List[float]
    metadata: Optional[List[Optional[Dict[str, Any]]]] = None

HealthResponse

@dataclass
class HealthResponse:
    status: str
    version: str
    loaded_indices: int
    index_dir: str
    memory_usage_bytes: int
    indices: List[IndexSummary]

RerankResponse / RerankResult

@dataclass
class RerankResponse:
    results: List[RerankResult]
    num_documents: int

@dataclass
class RerankResult:
    index: int
    score: float

MetadataResponse

@dataclass
class MetadataResponse:
    metadata: List[Dict[str, Any]]
    count: int

EncodeResponse

@dataclass
class EncodeResponse:
    embeddings: List[List[List[float]]]  # [batch, num_tokens, dim]
    num_texts: int

Async Client

The async client exposes the same methods as the synchronous client; each call must be awaited:

import asyncio
from next_plaid_client import AsyncNextPlaidClient, IndexConfig, SearchParams

async def main():
    async with AsyncNextPlaidClient("http://localhost:8080") as client:
        # Health check
        health = await client.health()
        print(f"Server status: {health.status}")

        # Create index
        await client.create_index("my_index", IndexConfig(nbits=4))

        # Add documents
        await client.add(
            "my_index",
            ["Paris is the capital of France."],
            metadata=[{"country": "France"}]
        )

        # Search
        results = await client.search(
            "my_index",
            ["What is the capital of France?"],
            params=SearchParams(top_k=5)
        )

        # Concurrent operations
        results = await asyncio.gather(
            client.search("index1", ["query1"]),
            client.search("index2", ["query2"]),
            client.search("index3", ["query3"]),
        )

asyncio.run(main())
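
When many searches are launched with asyncio.gather, it can help to cap how many run at once. The sketch below does that with asyncio.Semaphore. `fake_search` is a stand-in for `await client.search(index, [query])` so the example runs without a server, and `search_many` and its `limit` parameter are hypothetical names.

```python
import asyncio
from typing import List, Sequence, Tuple


async def fake_search(index: str, query: str) -> str:
    """Stand-in for `await client.search(index, [query])`."""
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    return f"{index}:{query}"


async def search_many(jobs: Sequence[Tuple[str, str]], limit: int = 2) -> List[str]:
    """Run all jobs concurrently, with at most `limit` in flight at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def run(index: str, query: str) -> str:
        async with semaphore:
            return await fake_search(index, query)

    # gather() returns results in the order the awaitables were passed in.
    return await asyncio.gather(*(run(index, query) for index, query in jobs))


outputs = asyncio.run(
    search_many([("index1", "query1"), ("index2", "query2"), ("index3", "query3")])
)
```

Since gather preserves input order, each result can be matched back to its job by position.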

Input Type Detection

The SDK automatically detects whether inputs are text or embeddings:

Documents

# Text input (first item is str) → uses /update_with_encoding
client.add("index", ["text 1", "text 2"])

# Embedding input (first item is dict with 'embeddings') → uses /update
client.add("index", [{"embeddings": [[0.1, 0.2]]}])

Queries

# Text queries (first item is str) → uses /search_with_encoding
client.search("index", ["query text"])

# Embedding queries (nested list) → uses /search
client.search("index", [[[0.1, 0.2], [0.3, 0.4]]])

Rerank

# Text (query is str) → uses /rerank_with_encoding
client.rerank(query="text", documents=["doc1", "doc2"])

# Embeddings (query is list) → uses /rerank
client.rerank(query=[[0.1, 0.2]], documents=[{"embeddings": [[...]]}])
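
The rules above amount to inspecting the first item (or the query itself) and choosing a route. The sketch below restates them as plain functions. The route names come from the comments above; the function names and the error handling for empty or unrecognized input are assumptions, and the SDK's own detection logic may differ.

```python
from typing import Any, Sequence


def document_endpoint(documents: Sequence[Any]) -> str:
    """Choose the route for add(): text documents vs pre-computed embeddings."""
    if not documents:
        raise ValueError("documents must not be empty")
    first = documents[0]
    if isinstance(first, str):
        return "/update_with_encoding"
    if isinstance(first, dict) and "embeddings" in first:
        return "/update"
    raise TypeError("expected a str or a dict with an 'embeddings' key")


def query_endpoint(queries: Sequence[Any]) -> str:
    """Choose the route for search(): text queries vs nested-list embeddings."""
    if not queries:
        raise ValueError("queries must not be empty")
    first = queries[0]
    if isinstance(first, str):
        return "/search_with_encoding"
    if isinstance(first, list):
        return "/search"
    raise TypeError("expected a str or a nested list of floats")


def rerank_endpoint(query: Any) -> str:
    """Choose the route for rerank(): a text query vs an embedding matrix."""
    if isinstance(query, str):
        return "/rerank_with_encoding"
    if isinstance(query, list):
        return "/rerank"
    raise TypeError("expected a str or a list of token vectors")


routes = [
    document_endpoint(["text 1", "text 2"]),
    document_endpoint([{"embeddings": [[0.1, 0.2]]}]),
    query_endpoint(["query text"]),
    query_endpoint([[[0.1, 0.2], [0.3, 0.4]]]),
    rerank_endpoint("text"),
    rerank_endpoint([[0.1, 0.2]]),
]
```

Since only the first item is inspected, a list that mixes text and embeddings would be routed by whatever comes first, so inputs should be kept homogeneous.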

Project Structure

next-plaid-api/python-sdk/
├── pyproject.toml                 # Package configuration
├── README.md                      # This file
├── next_plaid_client/
│   ├── __init__.py               # Public exports
│   ├── _base.py                  # Base client logic
│   ├── client.py                 # Synchronous client
│   ├── async_client.py           # Async client
│   ├── models.py                 # Data models
│   └── exceptions.py             # Exception classes
└── tests/
    └── test_*.py                 # Test files

Dependencies

Package  Version    Purpose
httpx    >= 0.24.0  HTTP client (sync + async)

Development Dependencies

Package         Version    Purpose
pytest          >= 7.0.0   Testing framework
pytest-cov      >= 4.0.0   Coverage reporting
pytest-asyncio  >= 0.21.0  Async test support

Version Compatibility

SDK Version  API Version  Python
0.4.0        0.4.0        >= 3.8

License

Apache-2.0
