Client library for SPT Neo RAG API
Project description
SPT Neo RAG Client
A Python client library for interacting with the [SPT Neo RAG API]( - Please update this link), a modern retrieval-augmented generation system.
This client provides both asynchronous (async/await) methods and corresponding synchronous wrappers for ease of use in different application contexts.
Installation
# Using pip
pip install spt-neo-rag-client
# Or using uv
uv pip install spt-neo-rag-client
Authentication
The client supports two authentication methods:
-
API Key (Recommended for service accounts/scripts):
from spt_neo_rag_client import NeoRagClient client = NeoRagClient( base_url="https://your-api-base-url.com", api_key="your-spt-api-key" ) # Ready to make authenticated requests
-
Username/Password (Suitable for user-facing applications):
import asyncio from spt_neo_rag_client import NeoRagClient async def main(): client = NeoRagClient(base_url="https://your-api-base-url.com") try: token_info = await client.login(username="user@example.com", password="your_password") print(f"Login successful! Token expires at: {token_info.expires_at}") # Client is now authenticated for subsequent requests # Example: Get current user user = await client.get_current_user() print(f"Authenticated as: {user.name}") # Remember to close the client when done await client.close() except Exception as e: print(f"Authentication failed: {e}") # Run the async function asyncio.run(main())
Synchronous Login:
from spt_neo_rag_client import NeoRagClient client = NeoRagClient(base_url="https://your-api-base-url.com") try: token_info = client.login_sync(username="user@example.com", password="your_password") print(f"Login successful! Token expires at: {token_info.expires_at}") user = client.get_current_user_sync() print(f"Authenticated as: {user.name}") except Exception as e: print(f"Authentication failed: {e}")
Usage
The client provides access to different API endpoint groups via attributes. Each group contains methods for interacting with specific resources.
client.admin: Admin operationsclient.api_keys: API key management for the current userclient.bm25: BM25 index operations for enhanced text searchclient.documents: Document managementclient.health: Health checksclient.knowledge_bases: Knowledge base managementclient.queries: Query executionclient.structured_schemas: Structured schema managementclient.tasks: Background task managementclient.teams: Team managementclient.users: User management (most require admin privileges)
Async and Sync Methods
All API interaction methods have both an asynchronous version (e.g., await client.knowledge_bases.list_knowledge_bases()) and a synchronous wrapper (e.g., client.knowledge_bases.list_knowledge_bases_sync()). Use the version that best fits your application's architecture.
Note: The examples below primarily use the async/await syntax. To use the synchronous version, simply call the corresponding method ending in _sync.
Examples
(Assume client is an initialized and authenticated NeoRagClient instance)
Knowledge Bases (client.knowledge_bases)
import asyncio
from uuid import UUID
from spt_neo_rag_client import NeoRagClient, KnowledgeBaseCreate, KnowledgeBaseUpdate, KnowledgeBaseConfigUpdate
# --- Replace with your actual client setup ---
async def get_authenticated_client() -> NeoRagClient:
# Placeholder: Initialize and authenticate your client here
client = NeoRagClient(base_url="...")
await client.login("...")
# or client = NeoRagClient(base_url="...", api_key="...")
return client
# ----------------------------------------------
async def kb_examples():
client = await get_authenticated_client()
try:
# List knowledge bases
kbs = await client.knowledge_bases.list_knowledge_bases(limit=10)
print(f"Found {len(kbs)} knowledge bases.")
# Create a new knowledge base
new_kb_data = KnowledgeBaseCreate(
name="My Async KB",
description="An example knowledge base",
kb_type="default", # Or other type if available
credentials=["team-alpha"], # Restrict access
config={"embedding_model": "text-embedding-ada-002"}
)
created_kb = await client.knowledge_bases.create_knowledge_base(new_kb_data)
print(f"Created KB: {created_kb.id} - {created_kb.name}")
kb_id = created_kb.id
# Get KB count
count_response = await client.knowledge_bases.count_knowledge_bases()
print(f"Total KBs: {count_response.count}")
# Get a specific KB
kb = await client.knowledge_bases.get_knowledge_base(kb_id)
print(f"Retrieved KB: {kb.name} with {kb.documents_count} documents")
# Update KB details
update_data = KnowledgeBaseUpdate(description="Updated description")
updated_kb = await client.knowledge_bases.update_knowledge_base(kb_id, update_data)
print(f"Updated KB description: {updated_kb.description}")
# Update KB config
config_update = KnowledgeBaseConfigUpdate(config={"chunk_size": 512})
configured_kb = await client.knowledge_bases.update_knowledge_base_config(kb_id, config_update)
print(f"Updated KB config: {configured_kb.config}")
# Get available embedding models
embedding_models = await client.knowledge_bases.get_embedding_models(provider="openai")
print(f"\nAvailable embedding models for OpenAI: {embedding_models}")
# Delete the KB
delete_status = await client.knowledge_bases.delete_knowledge_base(kb_id)
print(f"Delete status: {delete_status}")
finally:
await client.close()
# Run example
# asyncio.run(kb_examples())
BM25 Index Operations (client.bm25)
import asyncio
from uuid import UUID
from spt_neo_rag_client import NeoRagClient
# --- Replace with your actual client setup ---
async def get_authenticated_client() -> NeoRagClient:
# Placeholder: Initialize and authenticate your client here
client = NeoRagClient(base_url="...")
await client.login("...")
# or client = NeoRagClient(base_url="...", api_key="...")
return client
# ----------------------------------------------
# Assume kb_id is a valid UUID from an existing KB with documents
kb_id = "your-knowledge-base-id"
async def bm25_examples():
client = await get_authenticated_client()
try:
# Check if BM25 is available for the knowledge base
availability = await client.bm25.check_bm25_availability(kb_id)
print(f"BM25 availability: {availability}")
# Get current BM25 index status
try:
status = await client.bm25.get_bm25_index_status(kb_id)
print(f"BM25 index status: {status['status']}")
print(f"Document count: {status['document_count']}")
print(f"Vocabulary size: {status['vocabulary_size']}")
except Exception as e:
print(f"No existing BM25 index: {e}")
# Build/rebuild BM25 index with custom parameters
print("Building BM25 index...")
build_response = await client.bm25.build_bm25_index(
kb_id,
k1=1.2, # Term frequency saturation parameter
b=0.75 # Field length normalization parameter
)
print(f"Build task started: {build_response['task_id']}")
# Wait a moment and check status again
await asyncio.sleep(5)
status = await client.bm25.get_bm25_index_status(kb_id)
print(f"Updated status: {status['status']}")
# Delete BM25 index if needed
# delete_response = await client.bm25.delete_bm25_index(kb_id)
# print(f"Delete response: {delete_response}")
finally:
await client.close()
# Run example
# asyncio.run(bm25_examples())
Documents (client.documents)
import asyncio
import io
from uuid import UUID
from spt_neo_rag_client import NeoRagClient, DocumentUpdate
# --- Replace with your actual client setup ---
async def get_authenticated_client() -> NeoRagClient:
# Placeholder: Initialize and authenticate your client here
client = NeoRagClient(base_url="...")
await client.login("...")
return client
# ----------------------------------------------
# Assume kb_id is a valid UUID from an existing KB
kb_id = UUID("your-knowledge-base-id")
async def doc_examples():
client = await get_authenticated_client()
doc_id = None
try:
# Get available processors and chunking strategies
processors = await client.documents.get_processors()
strategies = await client.documents.get_chunking_strategies()
print(f"Available Processors: {processors}")
print(f"Available Strategies: {strategies}")
# Create a dummy file in memory
dummy_content = b"This is the content of my test document."
file_object = io.BytesIO(dummy_content)
file_name = "test_document.txt"
# Upload a document
print(f"Uploading {file_name}...")
uploaded_doc = await client.documents.upload_document(
file=file_object,
file_name=file_name,
name="My Test Document",
knowledge_base_id=kb_id,
description="A simple test document",
metadata={"category": "testing"},
credentials=["team-alpha"] # Optional access control
# Add structured extraction params if needed:
# extract_structured_content=True,
# structured_document_type=StructuredDocumentType.GENERIC
)
doc_id = uploaded_doc.id
print(f"Uploaded Document: {doc_id} - Status: {uploaded_doc.status}")
# List documents in the KB
docs = await client.documents.list_documents(knowledge_base_id=kb_id)
print(f"Found {len(docs)} documents in KB {kb_id}.")
# Get document count
count_response = await client.documents.count_documents(knowledge_base_id=kb_id)
print(f"Total documents in KB: {count_response.count}")
# Get the specific document
doc = await client.documents.get_document(doc_id)
print(f"Retrieved Document: {doc.name}, Status: {doc.status}")
# Get document content (raw bytes)
content = await client.documents.get_document_content(doc_id)
print(f"Retrieved content (first 20 bytes): {content[:20]}...")
# Update the document
update_data = DocumentUpdate(description="Updated document description")
updated_doc = await client.documents.update_document(doc_id, update_data)
print(f"Updated document description: {updated_doc.description}")
# Get document chunks (wait briefly for processing if just uploaded)
await asyncio.sleep(5) # Allow time for initial processing
chunks = await client.documents.get_document_chunks(doc_id, limit=5)
print(f"Found {len(chunks)} chunks for document {doc_id}.")
# Get chunk count
chunk_count_response = await client.documents.count_document_chunks(doc_id)
print(f"Total chunks for document: {chunk_count_response.count}")
# Get page images (if applicable, e.g., for PDFs)
# images = await client.documents.get_document_page_images(doc_id)
# print(f"Found {len(images)} page images.")
except Exception as e:
print(f"An error occurred: {e}")
finally:
# Clean up: Delete the document if it was created
if doc_id:
try:
print(f"Deleting document {doc_id}...")
delete_status = await client.documents.delete_document(doc_id)
print(f"Document delete status: {delete_status}")
except Exception as del_e:
print(f"Error deleting document {doc_id}: {del_e}")
await client.close()
# Run example
# asyncio.run(doc_examples())
Queries (client.queries)
import asyncio
from uuid import UUID
from spt_neo_rag_client import NeoRagClient
# --- Replace with your actual client setup ---
async def get_authenticated_client() -> NeoRagClient:
# Placeholder: Initialize and authenticate your client here
client = NeoRagClient(base_url="...")
await client.login("...")
return client
# ----------------------------------------------
# Assume kb_id is a valid UUID from an existing KB with relevant documents
kb_id = UUID("your-knowledge-base-id")
async def query_examples():
client = await get_authenticated_client()
try:
# Get available query strategies
strategies_response = await client.queries.get_query_strategies()
print(f"Available query strategies: {strategies_response.strategies}")
# Execute a standard query
query_text = "What is the main topic of the test document?"
response = await client.queries.query(
query=query_text,
knowledge_base_ids=[kb_id],
num_results=3,
credentials=["team-alpha"] # Must match document/KB credentials
)
print(f"\nQuery: {query_text}")
print(f"Answer: {response.result.answer}")
if response.result.sources:
print("Sources:")
for source in response.result.sources:
print(f" - Doc: {source.document_name}, Score: {source.relevance_score:.2f}")
# Stream a query response
print(f"\nStreaming Query: {query_text}")
full_streamed_answer = ""
async for chunk_data in client.queries.stream_query(
query=query_text,
knowledge_base_ids=[kb_id],
credentials=["team-alpha"]
):
chunk_text = chunk_data.get("chunk", "")
print(chunk_text, end="", flush=True)
full_streamed_answer += chunk_text
if chunk_data.get("done"):
print("\n--- Stream Finished ---")
# Final chunk might contain sources
if chunk_data.get("sources"):
print("Sources (from stream final chunk):")
for source in chunk_data["sources"]:
# Access source details like source["document_name"], etc.
print(f" - Doc ID: {source.get('document_id')}")
print(f"\nFull Streamed Answer: {full_streamed_answer}")
finally:
await client.close()
# Run example
# asyncio.run(query_examples())
API Keys (client.api_keys)
import asyncio
from spt_neo_rag_client import NeoRagClient, ApiKeyCreate, ApiKeyUpdate
# --- Replace with your actual client setup ---
async def get_authenticated_client() -> NeoRagClient:
# Placeholder: Initialize and authenticate your client here
client = NeoRagClient(base_url="...")
await client.login("...")
return client
# ----------------------------------------------
async def api_key_examples():
client = await get_authenticated_client()
new_key_id = None
try:
# Create a new API key
create_data = ApiKeyCreate(
name="My Script Key",
scopes="query,read", # Scopes control permissions
expires_in_days=90
)
key_response = await client.api_keys.create_api_key(create_data)
new_key_id = key_response.id
print(f"Created API Key: {key_response.name} (ID: {new_key_id})")
print(f"!!! IMPORTANT: Save the key value now: {key_response.api_key} !!!")
# List API keys
keys = await client.api_keys.list_api_keys()
print(f"\nFound {len(keys)} API keys for this user.")
for key in keys:
print(f" - ID: {key.id}, Name: {key.name}, Active: {key.is_active}")
# Get the created key
retrieved_key = await client.api_keys.get_api_key(new_key_id)
print(f"\nRetrieved Key: {retrieved_key.name}, Expires: {retrieved_key.expires_at}")
# Update the key (e.g., deactivate it)
update_data = ApiKeyUpdate(is_active=False)
updated_key = await client.api_keys.update_api_key(new_key_id, update_data)
print(f"\nUpdated Key: {updated_key.name}, Active: {updated_key.is_active}")
except Exception as e:
print(f"An error occurred: {e}")
finally:
# Clean up: Delete the created key
if new_key_id:
try:
print(f"\nDeleting API key {new_key_id}...")
delete_status = await client.api_keys.delete_api_key(new_key_id)
print(f"API key delete status: {delete_status}")
except Exception as del_e:
print(f"Error deleting API key {new_key_id}: {del_e}")
await client.close()
# Run example
# asyncio.run(api_key_examples())
Tasks (client.tasks)
import asyncio
from uuid import UUID
from spt_neo_rag_client import NeoRagClient
# --- Replace with your actual client setup ---
async def get_authenticated_client() -> NeoRagClient:
# Placeholder: Initialize and authenticate your client here
client = NeoRagClient(base_url="...")
await client.login("...")
return client
# ----------------------------------------------
# Assume doc_id is a valid UUID from an existing document
doc_id = UUID("your-document-id")
async def task_examples():
client = await get_authenticated_client()
try:
# List recent tasks
tasks = await client.tasks.list_tasks()
print(f"Found {len(tasks)} recent tasks.")
if tasks:
task_id_to_check = tasks[0].id
print(f"Checking status of task: {task_id_to_check}")
# Get a specific task status
task_status = await client.tasks.get_task(task_id_to_check)
print(f"Task {task_status.id} status: {task_status.status}, Progress: {task_status.progress * 100:.1f}%")
# Try to cancel the task (might fail if already completed)
try:
cancel_status = await client.tasks.delete_task(task_id_to_check)
print(f"Task cancellation request status: {cancel_status}")
except Exception as cancel_e:
print(f"Could not cancel task {task_id_to_check}: {cancel_e}")
# List all tasks
all_tasks = await client.tasks.list_tasks()
print(f"Found {len(all_tasks)} total tasks.")
# Get tasks related to a specific document
doc_tasks = await client.tasks.get_document_tasks(doc_id)
print(f"\nFound {len(doc_tasks)} tasks related to document {doc_id}.")
for task in doc_tasks:
print(f" - Task {task.id}: Type={task.task_type}, Status={task.status}")
finally:
await client.close()
# Run example
# asyncio.run(task_examples())
Teams (client.teams)
import asyncio
from uuid import UUID
from spt_neo_rag_client import NeoRagClient, TeamCreate, TeamUpdate
# --- Replace with your actual client setup ---
async def get_authenticated_client() -> NeoRagClient:
# Placeholder: Initialize and authenticate your client here
client = NeoRagClient(base_url="...")
await client.login("...")
return client
# ----------------------------------------------
# Assume user_id_to_add and kb_id_to_share are valid UUIDs
user_id_to_add = UUID("uuid-of-another-user")
kb_id_to_share = UUID("uuid-of-a-knowledge-base")
async def team_examples():
client = await get_authenticated_client()
team_id = None
try:
# Create a team
create_data = TeamCreate(name="Alpha Squad", description="The A Team")
team = await client.teams.create_team(create_data)
team_id = team.id
print(f"Created Team: {team.name} (ID: {team_id})")
# List teams
teams = await client.teams.list_teams()
print(f"\nFound {len(teams)} teams.")
# Get team details
detailed_team = await client.teams.get_team(team_id)
print(f"\nTeam '{detailed_team.name}' details:")
print(f" Members ({len(detailed_team.users)}): {', '.join([u.name for u in detailed_team.users])}")
print(f" Shared KBs ({len(detailed_team.knowledge_bases)}): {', '.join([kb.name for kb in detailed_team.knowledge_bases])}")
# Add a user to the team (requires owner permission)
try:
updated_team = await client.teams.add_user_to_team(team_id, user_id_to_add)
print(f"\nUser {user_id_to_add} added to team {team_id}.")
except Exception as add_e:
print(f"\nCould not add user {user_id_to_add}: {add_e}")
# Share a KB with the team (requires owner permission)
try:
share_status = await client.teams.share_kb_with_team(team_id, kb_id_to_share)
print(f"\nKB {kb_id_to_share} shared with team {team_id}: {share_status}")
except Exception as share_e:
print(f"\nCould not share KB {kb_id_to_share}: {share_e}")
# List shared KBs
shared_kbs = await client.teams.list_shared_kbs_for_team(team_id)
print(f"\nKBs shared with team {team_id}: {[kb.name for kb in shared_kbs]}")
except Exception as e:
print(f"An error occurred: {e}")
finally:
# Clean up: Delete the team if created
if team_id:
try:
print(f"\nDeleting team {team_id}...")
delete_status = await client.teams.delete_team(team_id)
print(f"Team delete status: {delete_status}")
except Exception as del_e:
print(f"Error deleting team {team_id}: {del_e}")
await client.close()
# Run example
# asyncio.run(team_examples())
Structured Schemas (client.structured_schemas)
import asyncio
from uuid import UUID
from spt_neo_rag_client import NeoRagClient, StructuredSchemaCreateRequest, StructuredSchemaUpdateRequest
# --- Replace with your actual client setup ---
async def get_authenticated_client() -> NeoRagClient:
# Placeholder: Initialize and authenticate your client here
client = NeoRagClient(base_url="...")
await client.login("...")
return client
# ----------------------------------------------
async def schema_examples():
client = await get_authenticated_client()
schema_id = None
try:
# Create a schema
schema_def = {
"type": "object",
"properties": {
"invoice_id": {"type": "string"},
"total_amount": {"type": "number"},
"due_date": {"type": "string", "format": "date"}
},
"required": ["invoice_id", "total_amount"]
}
create_req = StructuredSchemaCreateRequest(
name="InvoiceSchema",
description="Schema for invoice documents",
openapi_schema=schema_def
)
schema = await client.structured_schemas.create_schema(create_req)
schema_id = schema.id
print(f"Created Schema: {schema.name} (ID: {schema_id})")
# List schemas
schemas = await client.structured_schemas.list_schemas()
print(f"\nFound {len(schemas)} schemas.")
# Get the schema
retrieved_schema = await client.structured_schemas.get_schema(schema_id)
print(f"\nRetrieved schema: {retrieved_schema.name}")
# print(f"Definition: {retrieved_schema.schema_definition}")
# Update the schema
update_req = StructuredSchemaUpdateRequest(description="Updated schema for invoices")
updated_schema = await client.structured_schemas.update_schema(schema_id, update_req)
print(f"\nUpdated schema description: {updated_schema.description}")
except Exception as e:
print(f"An error occurred: {e}")
finally:
# Clean up: Delete the schema
if schema_id:
try:
print(f"\nDeleting schema {schema_id}...")
delete_status = await client.structured_schemas.delete_schema(schema_id)
print(f"Schema delete status: {delete_status}")
except Exception as del_e:
print(f"Error deleting schema {schema_id}: {del_e}")
await client.close()
# Run example
# asyncio.run(schema_examples())
Users (client.users - Requires Admin)
import asyncio
from uuid import UUID
from spt_neo_rag_client import NeoRagClient, UserCreate, UserUpdate
# --- Replace with your actual client setup (MUST be an ADMIN user) ---
async def get_admin_client() -> NeoRagClient:
# Placeholder: Initialize and authenticate your ADMIN client here
client = NeoRagClient(base_url="...")
await client.login("admin@example.com", "admin_password")
return client
# ----------------------------------------------------------------------
async def user_management_examples():
# WARNING: These operations modify users and require admin privileges.
client = await get_admin_client()
new_user_id = None
try:
# List users
users = await client.users.list_users(limit=5, is_active=True)
print(f"Found {len(users)} active users.")
for user in users:
print(f" - {user.name} ({user.email}), ID: {user.id}")
# Create a new user
create_data = UserCreate(
email="test.user@example.com",
password="a-strong-password",
name="Test User (Managed)"
)
new_user = await client.users.create_user(create_data)
new_user_id = new_user.id
print(f"\nCreated User: {new_user.name} (ID: {new_user_id})")
# Get the new user
retrieved_user = await client.users.get_user(new_user_id)
print(f"\nRetrieved User: {retrieved_user.name}, Active: {retrieved_user.is_active}")
# Update the user (e.g., deactivate)
update_data = UserUpdate(is_active=False)
updated_user = await client.users.update_user(new_user_id, update_data)
print(f"\nUpdated User: {updated_user.name}, Active: {updated_user.is_active}")
# Update current admin user's details (example - use with caution)
# current_admin = await client.get_current_user()
# admin_update = UserUpdate(name="Admin User Updated")
# updated_admin = await client.users.update_current_user(admin_update)
# print(f"\nUpdated current admin name: {updated_admin.name}")
except Exception as e:
print(f"An error occurred: {e}")
finally:
# Clean up: Delete the created user
if new_user_id:
try:
print(f"\nDeleting user {new_user_id}...")
delete_status = await client.users.delete_user(new_user_id)
print(f"User delete status: {delete_status}")
except Exception as del_e:
print(f"Error deleting user {new_user_id}: {del_e}")
await client.close()
# Run example (Use with caution - requires admin)
# asyncio.run(user_management_examples())
Admin (client.admin - Requires Admin)
import asyncio
from spt_neo_rag_client import NeoRagClient
# --- Replace with your actual client setup (MUST be an ADMIN user) ---
async def get_admin_client() -> NeoRagClient:
# Placeholder: Initialize and authenticate your ADMIN client here
client = NeoRagClient(base_url="...")
await client.login("admin@example.com", "admin_password")
return client
# ----------------------------------------------------------------------
async def admin_examples():
# WARNING: Requires admin privileges.
client = await get_admin_client()
try:
# Get detailed system health
health = await client.admin.get_health_check()
print("Admin Health Check:", health)
# Get system statistics for last 7 days
stats = await client.admin.get_system_statistics(days=7)
print("\nSystem Statistics (last 7 days):", stats)
# Get token usage for the last day
token_usage = await client.admin.get_token_usage(period="day")
print("\nToken Usage (last day):", token_usage)
# Get license status
license_info = await client.admin.get_license_status()
print("\nLicense Status:", license_info)
finally:
await client.close()
# Run example (Requires admin)
# asyncio.run(admin_examples())
Health (client.health)
import asyncio
from spt_neo_rag_client import NeoRagClient
# --- Replace with your actual client setup ---
async def get_authenticated_client() -> NeoRagClient:
# Placeholder: Initialize and authenticate your client here
client = NeoRagClient(base_url="...")
await client.login("...") # Or use API key
return client
# ----------------------------------------------
async def health_examples():
client = await get_authenticated_client()
try:
# Simple health check
simple_health = await client.health.get_health()
print("Simple Health:", simple_health)
# Detailed health check
detailed_health = await client.health.get_detailed_health()
print("\nDetailed Health:", detailed_health)
print(f" DB Status: {detailed_health.components.get('database', {}).get('status')}")
# Version check
version_info = await client.health.get_version()
print("\nVersion Info:", version_info)
# Database health check
db_health = await client.health.database_health()
print("\nDatabase Health:", db_health)
# Pool health check
pool_health = await client.health.pool_health()
print("\nPool Health:", pool_health)
finally:
await client.close()
# Run example
# asyncio.run(health_examples())
Access Control with Credentials
The Neo RAG API includes a credential-based access control system that allows you to restrict access to knowledge bases and documents.
How Credentials Work
- Default Access: By default, resources are created with
["ALL"]credentials, meaning they're accessible to all users who can authenticate to the API. - Restricted Access: You can specify custom credentials (e.g.,
["team1", "finance"]) when creating or updating a knowledge base or document to limit who can access it. - Inheritance: If a document doesn't have specific credentials set, it inherits the credentials from its parent knowledge base.
- Access Rules: A user making a query must provide at least one matching credential in their
credentialslist to access a knowledge base or document that has restricted credentials set. Documents/KBs with["ALL"]are always accessible if the user provides["ALL"]or any specific credential.
Setting Credentials
You can set credentials when creating or updating resources:
# Create a knowledge base with restricted access
kb = await client.knowledge_bases.create_knowledge_base(
KnowledgeBaseCreate(
name="Finance Knowledge Base",
description="Financial documents",
credentials=["finance", "executives"]
)
)
# Upload a document inheriting KB credentials (assuming kb_id = kb.id)
# await client.documents.upload_document(..., knowledge_base_id=kb_id, credentials=None)
# Upload a document with specific credentials
# await client.documents.upload_document(..., knowledge_base_id=kb_id, credentials=["finance-q2"])
# Update a document's access control
update_data = DocumentUpdate(credentials=["finance"])
doc = await client.documents.update_document(document_id="...", update_data=update_data)
Querying with Credentials
When querying, provide the user's credentials to access restricted resources:
# Query with specific credentials to access finance/exec documents
response = await client.queries.query(
knowledge_base_ids=[kb.id],
query="What are our Q2 financial results?",
credentials=["finance"] # User only needs one matching credential
)
# Querying with ["ALL"] will only access unrestricted resources
# or resources explicitly marked with ["ALL"]
response_all = await client.queries.query(
knowledge_base_ids=[kb.id],
query="General information",
credentials=["ALL"] # Might not see finance/exec documents
)
If the provided credentials don't grant access to any relevant documents or the knowledge base itself, the query might return no sources or raise an error depending on the API configuration.
Error Handling
The client raises specific exceptions for different error conditions:
NeoRagApiError: Base exception for API errors (like 4xx or 5xx responses). Containsstatus_codeanddetailattributes.ConfigurationError: For issues like missing authentication.NetworkError: For connection problems (subclass ofhttpx.RequestError).
from spt_neo_rag_client import NeoRagClient
from spt_neo_rag_client.exceptions import NeoRagApiError, ConfigurationError, NetworkError
from uuid import UUID
import asyncio
# --- Replace with your actual client setup ---
async def get_authenticated_client() -> NeoRagClient:
# Placeholder: Initialize and authenticate your client here
client = NeoRagClient(base_url="...")
await client.login("...")
return client
# ----------------------------------------------
async def error_handling_example():
client = await get_authenticated_client()
try:
# Example: Try to get a non-existent knowledge base
invalid_kb_id = UUID("00000000-0000-0000-0000-000000000000")
kb = await client.knowledge_bases.get_knowledge_base(invalid_kb_id)
except NeoRagApiError as e:
print(f"API Error: {e.status_code}")
print(f"Detail: {e.detail}")
# Example: Handle specific errors
if e.status_code == 404:
print("Resource not found.")
elif e.status_code == 403:
print("Permission denied.")
except NetworkError as e:
print(f"Network Error: Could not connect to the API. {e}")
except ConfigurationError as e:
print(f"Configuration Error: {e}")
except Exception as e:
# Catch any other unexpected errors
print(f"An unexpected error occurred: {e}")
finally:
await client.close()
# Run example
# asyncio.run(error_handling_example())
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file spt_neo_rag_client-0.2.1.tar.gz.
File metadata
- Download URL: spt_neo_rag_client-0.2.1.tar.gz
- Upload date:
- Size: 93.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.6
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
3f8796636223731066ebdd3f9d1523e85de0f9790ab3bc62f9271973b16b296e
|
|
| MD5 |
c1c8b4cc44c2b4a05500c6f4c8415a4d
|
|
| BLAKE2b-256 |
40421466483ce6c8b994ce3e591423284df86f76e04ef76c2d2c20869ffefb0e
|
File details
Details for the file spt_neo_rag_client-0.2.1-py3-none-any.whl.
File metadata
- Download URL: spt_neo_rag_client-0.2.1-py3-none-any.whl
- Upload date:
- Size: 43.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.6
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
692656ff1901279e5eeea9566f0ced0e0d984fd7bac871610b019b4f75b91d92
|
|
| MD5 |
873ee5cdb50d8390fec28174aaabf101
|
|
| BLAKE2b-256 |
d3ad7345acf566d50f2d201b37e4fa25f7ea62421317077de3ea9ea61b8e4f02
|