# SPOT Contracts Python SDK

Python SDK for the SPOT platform -- API contracts, models, and utilities, generated from the SPOT Platform OpenAPI specifications.
## Installation

```bash
pip install spot-sdk-python
```
## Usage

### Basic Usage

```python
from spot_sdk.analyzer import Email, EmailHeader
from spot_sdk.api_gateway import AnalysisRequest

# Create email header
header = EmailHeader(
    message_id="msg-001",
    subject="Test Email",
    sender="sender@example.com",
    recipients=["recipient@example.com"],
    date="2024-01-01T12:00:00Z",
)

# Create email object
email = Email(
    id="12345",
    headers=header,
    body_text="This is the email content",
)

# Use with API Gateway
request = AnalysisRequest(email=email.dict())
```
## Pydantic Models

All models are Pydantic v2 `BaseModel` subclasses with:

- Type validation - automatic validation of field types
- Serialization - `.dict()` and `.json()` methods (Pydantic v2 also exposes these as `model_dump()` / `model_dump_json()`)
- Field descriptions - documentation from the OpenAPI specs
- IDE support - full type hints and autocompletion
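For instance, a badly typed field fails at construction time and a valid model serializes cleanly -- a minimal sketch, assuming `recipients` is declared as a list of strings as in the example above:

```python
from pydantic import ValidationError

from spot_sdk.analyzer import EmailHeader

# Type validation: a malformed field raises at construction time.
try:
    EmailHeader(
        message_id="msg-002",
        subject="Hello",
        sender="sender@example.com",
        recipients="not-a-list",  # should be a list of strings
        date="2024-01-01T12:00:00Z",
    )
except ValidationError as exc:
    print(exc)  # explains which field failed and why

# Serialization: a valid model round-trips to a plain dict or JSON string.
header = EmailHeader(
    message_id="msg-002",
    subject="Hello",
    sender="sender@example.com",
    recipients=["a@example.com"],
    date="2024-01-01T12:00:00Z",
)
print(header.dict())
print(header.json())
```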
### Example: Email Analysis

```python
import httpx

from spot_sdk.analyzer import Email
from spot_sdk.api_gateway import AnalysisRequest

async def analyze_email(email: Email) -> dict:
    async with httpx.AsyncClient() as client:
        request = AnalysisRequest(email=email.dict())
        response = await client.post(
            "http://localhost:8001/api/v1/analyze",
            json=request.dict(),
        )
        return response.json()

# Usage
email = Email(
    id="sample-123",
    headers={
        "message_id": "msg-sample",
        "subject": "Urgent: Action Required",
        "sender": "suspicious@example.com",
        "recipients": ["target@company.com"],
        "date": "2024-01-15T10:30:00Z",
    },
    body_text="Please click this link immediately...",
)

result = await analyze_email(email)
print(f"Phishing detected: {result['is_phishing']}")
```
## Accessing Previous Stage Results (`analysis_context`)

The orchestrator automatically populates `Email.analysis_context` with the results of all previously completed stages before calling each analyzer. No workflow configuration is required -- analyzers can read whatever they need.
Structure:

```
email.analysis_context = {
    "<stage-name>": {
        "providers": {
            "<provider-id>": { ...free-form data... }
        },
        "analyzers": {
            "<analyzer-id>": { ...AnalyzerResult fields... }
        }
    },
    ...
}
```

- Top-level keys are stage names
- Each stage has a `providers` dict (populated by context providers) and an `analyzers` dict
- Analyzer values expose all `AnalyzerResult` fields (`is_phishing`, `confidence`, `threat_level`, `indicators`, `analyzer_details`, ...)
- Only stages that completed before the current analyzer runs are present
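Concretely, an analyzer running after an `enrichment` stage and a `parallel-analysis` stage might see something like this (all ids and values here are illustrative):

```python
{
    "enrichment": {
        "providers": {
            "employee-dir": {"sender_known": True, "sender_role": "CFO"},
        },
        "analyzers": {},
    },
    "parallel-analysis": {
        "providers": {},
        "analyzers": {
            "analyzer-nlp": {
                "is_phishing": True,
                "confidence": 0.87,
                "threat_level": "high",
                "indicators": ["urgency-language"],
                "analyzer_details": {},
            },
        },
    },
}
```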
### Recommended: use the `email.ctx` helper

The SDK provides an `AnalysisContextReader` (via the `Email.ctx` property) with ergonomic accessors, so you don't have to write manual dict traversal:
```python
from fastapi import FastAPI

from spot_sdk.analyzer import AnalysisResult, Email

app = FastAPI()

@app.post("/internal/analyze")
async def analyze_email(email: Email) -> AnalysisResult:
    ctx = email.ctx  # AnalysisContextReader

    # Get a specific analyzer from any stage (first match)
    nlp = ctx.analyzer("analyzer-nlp")
    if nlp:
        confidence = nlp["confidence"]

    # Get a specific analyzer from a specific stage
    ml = ctx.analyzer("analyzer-ml", stage="parallel-analysis")

    # Get a provider's data
    dept = ctx.provider("employee-dir")
    if dept:
        role = dept.get("role")

    # Iterate all analyzers in a stage
    for aid, result in ctx.analyzers_in("parallel-analysis").items():
        print(aid, result["confidence"])

    # Listing helpers
    ctx.stages()                              # ['enrichment', 'parallel-analysis']
    ctx.analyzer_ids()                        # ['analyzer-nlp', 'analyzer-ml']
    ctx.provider_ids()                        # ['employee-dir', 'threat-feed']
    ctx.analyzer_ids_in("parallel-analysis")  # stage-scoped
    ctx.provider_ids_in("enrichment")
    ctx.has_stage("parallel-analysis")        # True/False

    # Bulk accessors: {stage: {id: data, ...}, ...}
    ctx.all_analyzers()
    ctx.all_providers()
    ...
```
All single accessors (`analyzer()`, `provider()`) return `None` when not found. Bulk accessors return empty dicts. Nothing raises -- missing stages or IDs are treated as normal.
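Because nothing raises, an analyzer can aggregate over whatever happens to be present. A minimal sketch using only the accessors above:

```python
# Collect confidences from every analyzer that ran before us, in any stage.
# Yields an empty list (not an error) when we are first in the workflow.
confidences = [
    result["confidence"]
    for stage in ctx.stages()
    for result in ctx.analyzers_in(stage).values()
]
prior_max = max(confidences, default=0.0)
```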
### Raw access (if you prefer)

You can also read `email.analysis_context` directly as a plain dict:

```python
prev = email.analysis_context.get("parallel-analysis", {})
nlp = prev.get("analyzers", {}).get("analyzer-nlp")
```

Analyzers that don't need previous results can safely ignore this field -- it defaults to an empty dict.
## Plugins

SPOT's umbrella vocabulary for pluggable components is *plugin*. `PluginKind` discriminates between the two kinds today:

```python
from spot_sdk import PluginKind

PluginKind.ANALYZER          # analyzer kind
PluginKind.CONTEXT_PROVIDER  # context provider kind
```

- Analyzers serve `POST /internal/analyze` and return an `AnalysisResult` (a phishing verdict contributing to aggregation).
- Context providers serve `POST /internal/enrich` and return an `EnrichmentResult` whose `data` populates `analysis_context` for downstream analyzers.
Plugins are discovered from OCI image labels prefixed `spot.plugin.*` (see the platform documentation for the label contract) and installed into the platform via the `/api/v1/plugins/*` and `/api/v1/config/plugin/{kind}` APIs.
## Context Providers

Context providers enrich an email with organizational data (employee directory, sender history, threat intelligence, knowledge bases, ...) before analyzers run. They are defined per workflow stage and run in parallel, just like analyzers.

Each provider exposes a single HTTP endpoint:

```
POST /internal/enrich
Content-Type: application/json
```

- Input: `Email` (the same model analyzers receive)
- Output: `EnrichmentResult`
### Implementing a provider

```python
from fastapi import FastAPI

from spot_sdk import Email, EnrichmentResult

app = FastAPI()

@app.post("/internal/enrich")
async def enrich(email: Email) -> EnrichmentResult:
    # Look the sender up in your directory; hard-coded here for brevity.
    sender = email.headers.sender
    return EnrichmentResult(
        provider_id="employee-dir",
        data={
            "sender_known": True,
            "sender_department": "Finance",
            "sender_role": "CFO",
        },
        source="company-ldap",
        confidence=0.95,
    )
```
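Since the provider is a plain FastAPI app, it can be exercised locally with Starlette's test client -- a sketch, reusing an `Email` built as in Basic Usage:

```python
from fastapi.testclient import TestClient

client = TestClient(app)
resp = client.post("/internal/enrich", json=email.dict())  # `email` from Basic Usage
assert resp.json()["provider_id"] == "employee-dir"
```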
The orchestrator merges each provider's `data` dict into `email.analysis_context["<stage-name>"]["providers"]["<provider-id>"]`, where downstream analyzers can read it via `email.ctx.provider(...)`.
## Knowledge Store

The Knowledge Store is the platform's RAG layer: context providers deposit tagged documents, and analyzers fetch them on demand. Neither side references the other; they share only the tag vocabulary.

```python
from spot_sdk import KnowledgeClient, KnowledgeDocument, KnowledgeTag, chunk_text
```
### Ingestion (providers)

Upsert is idempotent on `id`. Use stable, deterministic ids so re-syncs replace rather than duplicate.

```python
import os

kb = KnowledgeClient(
    url=os.environ["SPOT_KNOWLEDGE_URL"],         # injected by installer
    api_key=os.environ["SPOT_INTERNAL_API_KEY"],  # idem
)

await kb.bulk_upsert([
    KnowledgeDocument(
        id=f"employee:{e.email}",
        content=f"{e.name}, {e.title}, {e.department}",
        tags=[KnowledgeTag.EMPLOYEE, *([KnowledgeTag.EXECUTIVE] if e.is_exec else [])],
        metadata={"email": e.email, "title": e.title},
        source="provider-employee-dir",
    )
    for e in employees
])
```
For long content, use `chunk_text(body, max_chars=2000, overlap=200)` and upsert each chunk with `metadata["parent_id"]` set to the source doc id.
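A minimal sketch of that chunking pattern (the parent id, tag, and source names are illustrative, not part of the SDK):

```python
parent_id = "policy-handbook"

await kb.bulk_upsert([
    KnowledgeDocument(
        id=f"{parent_id}:{i}",              # stable, deterministic chunk id
        content=chunk,
        tags=["policy"],                    # assumed tag for illustration
        metadata={"parent_id": parent_id},  # links chunk back to the source doc
        source="provider-policy-sync",
    )
    for i, chunk in enumerate(chunk_text(body, max_chars=2000, overlap=200))
])
```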
### Consumption (analyzers)

```python
kb = KnowledgeClient.for_analysis(email)  # reads SPOT_KNOWLEDGE_URL + email.retrieval_limits
docs = await kb.fetch(tags="employee+executive", text=email.headers.sender, top_k=3)
```
Tag-expression syntax: `a`, `a+b` (AND), `a|b` (OR), `a+b|c` ((a AND b) OR c). `+` binds tighter than `|`. An empty string or `None` means no tag filter.
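A few illustrative queries (with `q` standing for the query text, and tag names assumed):

```python
await kb.fetch(tags="employee+executive", text=q)      # employee AND executive
await kb.fetch(tags="employee|contractor", text=q)     # employee OR contractor
await kb.fetch(tags="employee+executive|vip", text=q)  # (employee AND executive) OR vip
await kb.fetch(tags=None, text=q)                      # no tag filter
```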
Workflow-declared `retrieval_limits` (stage-level caps on `top_k` and `min_score`) are enforced transparently by `for_analysis()`; the analyzer doesn't need to know about them.
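So if a stage caps retrieval at, say, `top_k=5`, an analyzer asking for more is simply clamped -- a sketch (the cap value is illustrative):

```python
kb = KnowledgeClient.for_analysis(email)  # picks up email.retrieval_limits
docs = await kb.fetch(tags="employee", text=email.headers.sender, top_k=50)
# With a stage-level cap of top_k=5, at most 5 documents come back.
assert len(docs) <= 5
```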
### Testing

```python
from spot_sdk.testing.fake_knowledge_client import FakeKnowledgeClient

kb = FakeKnowledgeClient()
await kb.upsert(KnowledgeDocument(id="e:a", content="Alice", tags=["employee"]))
assert (await kb.fetch(tags="employee", text="Alice"))[0].id == "e:a"
```
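The same flow drops naturally into a test suite -- a sketch assuming pytest with pytest-asyncio:

```python
import pytest

from spot_sdk import KnowledgeDocument
from spot_sdk.testing.fake_knowledge_client import FakeKnowledgeClient

@pytest.mark.asyncio
async def test_fetch_by_tag():
    kb = FakeKnowledgeClient()
    await kb.upsert(KnowledgeDocument(id="e:a", content="Alice", tags=["employee"]))
    docs = await kb.fetch(tags="employee", text="Alice")
    assert docs[0].id == "e:a"
```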
## Available Models

### Analyzer Service (`spot_sdk.analyzer`)

- `Email` - email data structure
- `EmailHeader` - email header fields
- `Attachment` - email attachment
- `AnalysisResult` - analysis results
- `AnalysisIndicator` - phishing indicators
- `ThreatLevel` - threat level enum

### API Gateway (`spot_sdk.api_gateway`)

- `AnalysisRequest` - analysis request
- `AnalysisResponse` - analysis response
- `ConfigRequest` - configuration request
- `ConfigResponse` - configuration response

### Workflow (`spot_sdk.workflow`)

- `Workflow`, `WorkflowStage`
- `AnalyzerConfig`, `ContextProviderConfig`
- `RetryConfig`, `FailureStrategy`

### Plugin vocabulary (`spot_sdk.plugin`)

- `PluginKind` - `ANALYZER` | `CONTEXT_PROVIDER`

### Enrichment (`spot_sdk.enrichment`)

- `EnrichmentResult` - returned by context providers from `/internal/enrich`
## Requirements
- Python 3.11+
- pydantic >= 2.0.0
- httpx >= 0.25.0
- typing-extensions >= 4.8.0
## Development

This SDK is automatically generated from the OpenAPI specifications. Do not modify generated files directly -- update the OpenAPI specs instead.
## Version

Current version: 1.2.1

Generated from the SPOT Contracts repository.