LLM Observability Instrumentation SDK
LLM Observability Platform: Core Python Infrastructure
This guide covers the technical architecture and end-user usage for the Python-based observability components.
Table of Contents
- 1. System Architecture
- 2. End-User Usage Guide
  - Installation
  - Auto-Instrumentation (Zero-Code Changes)
  - Remote Management API (REST)
  - Basic Usage: Decorators
  - Advanced Usage: Context Manager
  - Token Counting (Pre-Call Token Counting)
  - Streaming Observability (TTFT & Token Tracking)
  - PII & Injection Scan (Aho-Corasick Redaction)
  - Deterministic Sampling Gate (Modulo 100)
  - MiniLM Embedding (Concurrent & Sampled)
  - Prometheus Metrics & Grafana Dashboard
  - Updating Config Files (Model Prices, PII Patterns, Infra)
  - Docker Deployment
- 3. Implementation Call Chain
1. System Architecture
High-Level Data Flow
This diagram illustrates the lifecycle of a span from application capture to background enrichment. The SDK now includes a REST Management API for remote control and discovery.
┌────────────────┐          ┌──────────────────┐          ┌───────────────────┐
│ User App       │ capture  │ instrumentation  │ queue    │ Cloudflare Queue  │
│ (Python/JS)    ├─────────>│ -sdk             ├─────────>│ (span-enrichment) │
└────────────────┘          └─────────┬────────┘          └─────────┬─────────┘
                                      │                             │
                                      │ REST API (8000)             │ trigger
                                      v                             v
┌────────────────┐          ┌──────────────────┐          ┌───────────────────┐
│ Analytics DB   │ storage  │ Remote Control   │ response │ queue-embedding   │
│ (ClickHouse)   │<─────────┤ (Init/Detect)    │<─────────┤ -worker           │
└────────────────┘          └──────────────────┘          └─────────┬─────────┘
                                                                    │
                                                                    │ HTTP call
                                                                    v
                                                          ┌───────────────────┐
                                                          │ Cloudflare AI     │
                                                          │ (Workers AI API)  │
                                                          └───────────────────┘
Technical Sequence
The SDK integrates with OpenTelemetry (OTEL) for standardized telemetry collection.
sequenceDiagram
autonumber
participant App as Application
participant SDK as instrumentation-sdk (REST)
participant OTEL as OTEL Collector
participant CFQ as Cloudflare Queue
Note over App, SDK: Management Phase
App->>SDK: POST /instrumentation/detect
SDK-->>App: {provider: "openai", model: "gpt-4"}
App->>SDK: POST /instrumentation/init
Note over App, OTEL: Capture Phase
App->>SDK: @llm_observe()
SDK->>OTEL: Export Span (OTLP/gRPC)
SDK->>CFQ: push_to_queue(payload)
2. End-User Usage Guide
The instrumentation-sdk is designed to be developer-friendly, requiring minimal code changes to start capturing observability data.
Installation
pip install instrumentation-sdk
Auto-Instrumentation (Zero-Code Changes)
The fastest way to get observability is to use auto-instrumentation. This patches the underlying HTTP calls of popular LLM clients transparently.
import asyncio

from instrumentation_sdk import init_auto_instrumentation

# Initialize at the start of your application
init_auto_instrumentation()

# Now any call to OpenAI, Anthropic, LiteLLM, or LangChain is tracked automatically
import openai

async def main():
    # await is only valid inside a coroutine, so the call lives in main()
    client = openai.AsyncOpenAI()
    response = await client.chat.completions.create(model="gpt-4o", messages=[...])

asyncio.run(main())
Supported Providers:
- OpenAI: openai.AsyncOpenAI
- Anthropic: anthropic.AsyncAnthropic
- LiteLLM: litellm.acompletion
- LangChain: Any model inheriting from BaseChatModel (via ainvoke)
Remote Management API (REST)
The SDK provides a built-in FastAPI-based management layer for remote orchestration.
| Endpoint | Method | Description |
|---|---|---|
| /instrumentation/init | POST | Remotely initialize auto-instrumentation. |
| /instrumentation/uninstrument | POST | Disable all active instrumentation. |
| /instrumentation/detect | POST | Discovery: Detect provider/model from a sample request body. |
| /instrumentation/test-call | POST | Verification: Trigger a sample LLM call to verify end-to-end tracing. |
| /v1/streaming/test-stream-call | POST | Verification: Trigger a mock streaming call to verify streaming/TTFT. |
| /v1/sampling/should-sample | POST | Verification: Check if a span should be sampled. |
| /v1/embeddings/embed | POST | Verification: Generate MiniLM embeddings for a given text. |
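For callers who prefer Python to curl, the endpoints can be reached with the standard library. The sketch below is illustrative: `build_request` and `post_json` are hypothetical helper names, the base URL assumes the API listens on port 8000 locally, and the response schema is not documented here, so the decoded JSON is returned untouched.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # assumption: API served locally on port 8000


def build_request(path, payload):
    """Build (but do not send) a JSON POST request for a management endpoint."""
    return urllib.request.Request(
        url=BASE_URL + path,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def post_json(path, payload, timeout=5.0):
    """Send the request and decode the JSON response body."""
    with urllib.request.urlopen(build_request(path, payload), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Building a request needs no running server; post_json() does.
req = build_request("/v1/sampling/should-sample", {"span_id": "test-span-id"})
```

With the container running, `post_json("/v1/sampling/should-sample", {"span_id": "test-span-id"})` would perform the call.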
Basic Usage: Decorators
Use the @llm_observe decorator to manually track functions.
from instrumentation_sdk import llm_observe
# (1) Decorate your LLM-calling functions
@llm_observe(service="payment-bot", endpoint="gpt-4o")
def get_llm_response(prompt: str):
    # Your existing LLM logic here
    # status, latency, and span_ids are captured automatically
    return response

# (2) Support for Async functions
@llm_observe(service="search-agent", endpoint="claude-3")
async def get_async_response(prompt: str):
    return await client.completions.create(...)
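To make the behaviour concrete, here is a minimal sketch of how a decorator of this kind could capture status and latency for both sync and async functions. It is not the SDK's code: `observe_sketch` and the `SPANS` list are hypothetical, and the recorded field names are assumptions.

```python
import asyncio
import functools
import inspect
import time
import uuid

SPANS = []  # hypothetical sink; the real SDK reports spans elsewhere


def observe_sketch(service, endpoint):
    """Decorator factory that records status and latency for sync or async functions."""

    def record(start, status):
        SPANS.append({
            "span_id": uuid.uuid4().hex,
            "service": service,
            "endpoint": endpoint,
            "status": status,
            "latency_ms": (time.perf_counter() - start) * 1000.0,
        })

    def decorator(func):
        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                start = time.perf_counter()
                try:
                    result = await func(*args, **kwargs)
                except Exception:
                    record(start, "error")
                    raise
                record(start, "success")
                return result
            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception:
                record(start, "error")
                raise
            record(start, "success")
            return result
        return sync_wrapper

    return decorator


@observe_sketch(service="payment-bot", endpoint="gpt-4o")
def echo(prompt):
    return prompt.upper()


@observe_sketch(service="search-agent", endpoint="claude-3")
async def echo_async(prompt):
    return prompt[::-1]


echo("hi")
asyncio.run(echo_async("abc"))
```

Checking `inspect.iscoroutinefunction` once at decoration time lets a single decorator serve both call styles without the caller choosing a variant.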
Advanced Usage: Context Manager
For callers who need to set metadata mid-call (e.g., after routing to a specific model or determining usage), use the llm_span context manager. It supports both synchronous and asynchronous usage.
from instrumentation_sdk import llm_span
async def my_handler(req):
    # (1) Start a span with initial metadata
    async with llm_span(model="gpt-4o", user_id=req.user_id) as span:
        # (2) Perform your LLM call
        response = await client.chat.completions.create(...)
        # (3) Update metadata mid-call
        span.set_metadata("actual_model", response.model)
        span.set_metadata("prompt_tokens", response.usage.prompt_tokens)
    # Span is automatically reported on exit (even if an error occurs)
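The "reported on exit, even on error" guarantee is the usual try/finally pattern. A minimal async-only sketch follows; `span_sketch`, `SpanSketch`, and `REPORTED` are hypothetical names rather than SDK internals, and the real `llm_span` also supports synchronous use.

```python
import asyncio
import contextlib
import time

REPORTED = []  # hypothetical sink standing in for the SDK reporter


class SpanSketch:
    def __init__(self, **metadata):
        self.metadata = dict(metadata)

    def set_metadata(self, key, value):
        self.metadata[key] = value


@contextlib.asynccontextmanager
async def span_sketch(**metadata):
    span = SpanSketch(**metadata)
    start = time.perf_counter()
    status = "success"
    try:
        yield span
    except Exception:
        status = "error"
        raise
    finally:
        # Runs on normal exit and on error, so the span is always reported.
        REPORTED.append({
            **span.metadata,
            "status": status,
            "latency_ms": (time.perf_counter() - start) * 1000.0,
        })


async def demo():
    async with span_sketch(model="gpt-4o") as span:
        span.set_metadata("actual_model", "gpt-4o-mini")
    try:
        async with span_sketch(model="gpt-4o"):
            raise RuntimeError("provider failed")
    except RuntimeError:
        pass


asyncio.run(demo())
```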
Manual Reporting
If you prefer direct control over the span data, you can use the reporter manually.
from instrumentation_sdk import get_reporter
reporter = get_reporter()
reporter.report({
    "span_id": "unique-id",
    "service_name": "my-service",
    "status": "success",
    "text": "The prompt content to be enriched"
})
Docker Deployment
The instrumentation SDK API ships as a production-ready, self-contained all-in-one observability container. The image bundles the FastAPI application, Grafana, and Tempo, so no external database or visualizer has to be set up separately.
Image Name: chiefj/instrumentation-sdk-api:unstable (or chiefj/instrumentation-sdk-api:latest)
To pull and run the fully integrated all-in-one container locally:
# Pull the standalone image (the unstable tag is used here)
docker pull chiefj/instrumentation-sdk-api:unstable
# Run the unified all-in-one telemetry stack
docker run -d \
  -p 8002:8000 \
  -p 3002:3000 \
  --name instrumentation-api-allinone \
  chiefj/instrumentation-sdk-api:unstable
Once running:
- API Endpoints: Accessible at http://localhost:8002
- Grafana Portal: Accessible at http://localhost:3002 (Tempo is automatically provisioned as a read-only datasource and ready to query!)
For development with hot-reloading, use the provided Docker Compose:
docker compose -f packages/python/instrumentation-sdk/deploy/docker/docker-compose.dev.yaml up instrumentation-api
Token Counting (Pre-Call Token Counting)
The SDK provides automatic pre-call token counting utilizing tiktoken with fallback character-based heuristics for non-OpenAI models. It supports plain text strings, complex chat message list schemas, and OpenAI's tile-based vision token calculation.
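The tiktoken-with-fallback approach can be sketched as below. This is an illustration, not the SDK's implementation: `count_tokens_sketch` and `heuristic_count` are hypothetical names, and the ratio of 4 characters per token is an assumed value for the heuristic. Chat message lists and vision tiles are left out.

```python
import math

CHARS_PER_TOKEN = 4  # assumed ratio for the fallback; the SDK's value may differ


def heuristic_count(text):
    """Estimate the token count from the character length."""
    return max(1, math.ceil(len(text) / CHARS_PER_TOKEN)) if text else 0


def count_tokens_sketch(text, model, use_tiktoken=True):
    """Return (token_count, method), preferring tiktoken when it is usable."""
    if use_tiktoken:
        try:
            import tiktoken  # optional dependency

            encoding = tiktoken.encoding_for_model(model)
            return len(encoding.encode(text)), "tiktoken"
        except Exception:
            # tiktoken is missing, the model is unknown to it,
            # or the encoding could not be loaded: fall through
            pass
    return heuristic_count(text), "heuristic"
```

Returning the method alongside the count mirrors the `(tokens, method)` pair used by `count_tokens`, so downstream consumers can tell exact counts from estimates.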
Direct Token Counting
Use count_tokens to calculate tokens directly:
from instrumentation_sdk import count_tokens
tokens, method = count_tokens("hello world", "gpt-4")
Context Manager with Automated Token Tracking
Use llm_span_with_tokens to automatically record prompt_tokens and token_count_method inside manual spans:
from instrumentation_sdk import llm_span_with_tokens
async def handle_request(req):
    async with llm_span_with_tokens(model="gpt-4", provider="openai", prompt="hello world") as span:
        pass
REST Endpoint
The /v1/token-counting/count REST API endpoint supports counting prompt tokens:
curl -X POST http://localhost:8000/v1/token-counting/count \
-H "Content-Type: application/json" \
-d '{"prompt": "hello world", "model": "gpt-4"}'
Streaming Observability (TTFT & Token Tracking)
The SDK provides specialized utilities for tracking streaming LLM calls. It wraps generators/iterators to:
- Capture the Time-to-First-Token (TTFT) latency when the first chunk is yielded.
- Accumulate the streamed chunks and automatically compute the completion token count (using Tiktoken/heuristics) upon stream completion or cancellation.
- Finalize and report the manual span only when the stream is exhausted, closed, or encounters an exception.
Basic Streaming Usage
Use llm_streaming_span, wrap_stream (for synchronous generators), and wrap_async_stream (for asynchronous generators):
import asyncio

from instrumentation_sdk import llm_streaming_span, wrap_stream, wrap_async_stream

# 1. Synchronous Streaming
with llm_streaming_span(model="gpt-4", provider="openai", prompt="Say hello") as span_ctx:
    raw_generator = ["Hello", " world", "!"]
    wrapped_stream = wrap_stream(raw_generator, span_context=span_ctx, model="gpt-4")
    for chunk in wrapped_stream:
        print(chunk)

# 2. Asynchronous Streaming
async def async_generator():
    yield "Hello"
    yield " world"

async def main():
    # async with / async for are only valid inside a coroutine
    async with llm_streaming_span(model="gpt-4", provider="openai", prompt="Say hello") as span_ctx:
        wrapped_stream = wrap_async_stream(async_generator(), span_context=span_ctx, model="gpt-4")
        async for chunk in wrapped_stream:
            print(chunk)

asyncio.run(main())
Mid-Stream Updates & Abort Resilience
- You can dynamically update span metadata using span_ctx.set_metadata("custom_field", "value") mid-stream.
- If the stream is closed early (via wrapped_stream.close() or .aclose()), the SDK captures and reports all completion tokens generated up to that point.
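The TTFT capture and abort resilience described above come down to a generator wrapper with a finally block. The sketch below is a simplified synchronous stand-in, not the SDK's `wrap_stream`: `wrap_stream_sketch` is a hypothetical name, the span is a plain dict, and the accumulated text would still need to be passed to a token counter.

```python
import time


def wrap_stream_sketch(chunks, span, clock=time.perf_counter):
    """Yield chunks unchanged while recording TTFT and accumulated text in `span`."""
    start = clock()
    collected = []
    try:
        for chunk in chunks:
            if not collected:
                # First chunk: record time-to-first-token
                span["ttft_s"] = clock() - start
            collected.append(chunk)
            yield chunk
    except GeneratorExit:
        # Raised inside the generator when the consumer calls .close()
        span["status"] = "cancelled"
        raise
    except Exception:
        span["status"] = "error"
        raise
    finally:
        # Runs on exhaustion, early close, and errors alike
        span.setdefault("status", "success")
        span["completion_text"] = "".join(collected)
        span["finalized"] = True


full = {}
for _ in wrap_stream_sketch(["Hello", " world", "!"], full):
    pass

partial = {}
stream = wrap_stream_sketch(["A", "B", "C"], partial)
next(stream)    # consume only the first chunk
stream.close()  # early close still finalizes the span
```

Because finalization lives in `finally`, the span is completed exactly once regardless of how the stream ends.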
REST Verification Endpoint
The /v1/streaming/test-stream-call endpoint streams SSE events back to the client while validating end-to-end streaming tracing:
curl -X POST http://localhost:8000/v1/streaming/test-stream-call \
-H "Content-Type: application/json" \
-d '{"provider": "openai", "chunks": ["A", "B", "C"]}'
PII & Injection Scan (Aho-Corasick Redaction)
The SDK features an inline Aho-Corasick trie-based scanner that runs on all prompts inside manual span contexts (LLMSpanContext and LLMSpanWithTokensContext). It intercepts prompts, detects PII and SQL/prompt injection, and updates telemetry accordingly.
Redaction & Interception Behavior
- PII Detected: The prompt and downstream fields (like hashes and embeddings) are completely redacted (None or empty). The custom span attribute llm.pii_detected is set to True.
- Injection Detected: The prompt is preserved, but the custom span attribute llm.injection_attempt is set to True.
- Fail-Safe Execution: Any exception raised inside the scanning engine is caught internally, so client code and FastAPI handlers keep running without crashes.
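For readers unfamiliar with Aho-Corasick, the sketch below shows the technique on literal keywords: a trie with failure links that finds every keyword in a single pass over the text. It is an illustration only. The keyword lists, `build_automaton`, `find_matches`, and `scan_prompt_sketch` are hypothetical, and the SDK loads its own patterns from config.

```python
from collections import deque


def build_automaton(keywords):
    """Build goto/fail/output tables for a set of literal keywords."""
    goto = [{}]       # goto[state][char] -> next state
    fail = [0]        # failure links
    output = [set()]  # keywords recognised on reaching a state
    for word in keywords:
        state = 0
        for ch in word:
            if ch not in goto[state]:
                goto.append({})
                fail.append(0)
                output.append(set())
                goto[state][ch] = len(goto) - 1
            state = goto[state][ch]
        output[state].add(word)

    # Breadth-first pass to fill in failure links (depth-1 states fail to root)
    queue = deque(goto[0].values())
    while queue:
        state = queue.popleft()
        for ch, nxt in goto[state].items():
            queue.append(nxt)
            fallback = fail[state]
            while fallback and ch not in goto[fallback]:
                fallback = fail[fallback]
            fail[nxt] = goto[fallback].get(ch, 0)
            output[nxt] |= output[fail[nxt]]
    return goto, fail, output


def find_matches(text, automaton):
    """Return the set of keywords that occur anywhere in text."""
    goto, fail, output = automaton
    state = 0
    found = set()
    for ch in text:
        while state and ch not in goto[state]:
            state = fail[state]
        state = goto[state].get(ch, 0)
        found |= output[state]
    return found


# Hypothetical keyword lists for illustration only
PII_KEYWORDS = {"ssn:", "@example.com"}
INJECTION_KEYWORDS = {"ignore previous instructions", "drop table"}
AUTOMATON = build_automaton(PII_KEYWORDS | INJECTION_KEYWORDS)


def scan_prompt_sketch(prompt):
    """Return (pii_detected, injection_attempt) and never raise."""
    try:
        hits = find_matches(prompt.lower(), AUTOMATON)
        return bool(hits & PII_KEYWORDS), bool(hits & INJECTION_KEYWORDS)
    except Exception:
        return False, False  # fail-safe: scanner errors must not break the caller
```

The scan cost grows with the prompt length rather than with the number of keywords, which is what makes the approach suitable for inline use.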
Programmatic Scan Usage
You can import and call scan_prompt directly to inspect a prompt:
from instrumentation_sdk import scan_prompt
# Returns (pii_detected: bool, injection_attempt: bool)
pii, inj = scan_prompt("my email is test@example.com")
print(f"PII: {pii}, Injection: {inj}")
REST Scanning Endpoint
The /v1/pii-injection/scan REST API endpoint supports checking prompt contents:
curl -X POST http://localhost:8000/v1/pii-injection/scan \
-H "Content-Type: application/json" \
-d '{"prompt": "my email is user@example.com"}'
Deterministic Sampling Gate (Modulo 100)
The SDK makes a deterministic sampling decision at span creation time. It hashes the span_id with SHA256 and samples the span only when the hash value modulo 100 equals 0, so roughly 1 in 100 spans is sampled and a given span_id always receives the same decision.
- Sampled (is_sampled is True): The span is processed normally, performing prompt hashing and embedding generation.
- Unsampled (is_sampled is False): The span drops/skips both the SHA256 hashing and the MiniLM embedding generation, saving computational resources.
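The gate can be sketched in a few lines. One detail is an assumption here: how the SHA256 digest is turned into a number. The sketch reads the whole digest as a big-endian unsigned integer, and `should_sample_sketch` is a hypothetical name, so its decisions may not match the SDK's `should_sample` for the same span_id.

```python
import hashlib


def should_sample_sketch(span_id, modulus=100):
    """Return True when SHA-256(span_id), read as an integer, is divisible by modulus."""
    digest = hashlib.sha256(span_id.encode("utf-8")).digest()
    # Assumption: the full digest is interpreted as a big-endian unsigned integer
    return int.from_bytes(digest, "big") % modulus == 0


# Count how many of 20,000 synthetic span IDs pass the gate (expected: about 1%)
sampled_count = sum(should_sample_sketch(f"span-{i}") for i in range(20000))
```

Because the decision depends only on the span_id, any service that sees the same ID reaches the same verdict without coordination.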
Programmatic Usage
You can query the sampling logic directly:
from instrumentation_sdk import should_sample
sampled = should_sample("test-span-id")
REST Endpoint
Query the /v1/sampling/should-sample endpoint to check sampling:
curl -X POST http://localhost:8000/v1/sampling/should-sample \
-H "Content-Type: application/json" \
-d '{"span_id": "test-span-id"}'
MiniLM Embedding (Concurrent & Sampled)
The SDK asynchronously calls the embedding-worker HTTP endpoint (POST /embed) to generate a 384-dimensional vector embedding of the prompt text.
- Concurrent Execution: To prevent blocking client requests, the SDK uses asyncio.create_task() to fire the embedding generation concurrently with span finalization.
- Conditionality: The embedding is only generated if the span is sampled (is_sampled is True) and no PII is detected in the prompt (pii_detected is False).
- Timeout and Resilience: The embedding HTTP request has a timeout of 500ms. If the request times out or fails, the SDK falls back to None for the embedding field while the rest of the span details are still successfully emitted.
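The three rules above combine into a small asyncio pattern. The sketch below is illustrative rather than the SDK's code: `embed_or_none`, `finalize_span`, and the two fake embedding coroutines are hypothetical, and the span is a plain dict.

```python
import asyncio

EMBED_TIMEOUT_S = 0.5  # the 500ms budget


async def embed_or_none(text, embed_fn, timeout=EMBED_TIMEOUT_S):
    """Await embed_fn(text); return None on timeout or any other failure."""
    try:
        return await asyncio.wait_for(embed_fn(text), timeout=timeout)
    except Exception:
        return None


async def finalize_span(span, prompt, embed_fn):
    """Start the embedding concurrently, finish other work, then attach the result."""
    task = None
    if span.get("is_sampled") and not span.get("pii_detected"):
        task = asyncio.create_task(embed_or_none(prompt, embed_fn))
    span["status"] = "success"  # other finalization work proceeds meanwhile
    span["embedding"] = await task if task is not None else None
    return span


async def fast_embed(text):
    """Fake embedding call that answers immediately (hypothetical stand-in)."""
    return [0.0] * 384


async def slow_embed(text):
    """Fake embedding call that exceeds the time budget (hypothetical stand-in)."""
    await asyncio.sleep(1.0)
    return [0.0] * 384
```

A timed-out or failed call degrades to `None` for the embedding only; the rest of the span is unaffected.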
Programmatic Usage
from instrumentation_sdk import get_embedding
embedding = await get_embedding("your text here")
REST Endpoint
curl -X POST http://localhost:8000/v1/embeddings/embed \
-H "Content-Type: application/json" \
-d '{"text": "your text here"}'
Prometheus Metrics & Grafana Dashboard
The SDK integrates a Prometheus metrics collection pipeline to track operational metrics for LLM calls (latency, TTFT, token usage, cost, and security violations).
Configuration & Initialization
Initialize the Prometheus metrics scraping endpoint:
curl -X POST http://localhost:8000/v1/metrics/init \
-H "Content-Type: application/json" \
-d '{"port": 9464}'
Metrics Endpoints
- Initialize Pipeline: POST /v1/metrics/init
- Health Check: GET /v1/metrics/health
- Record Single Span Metrics: POST /v1/metrics/record
- Record Batch Spans Metrics: POST /v1/metrics/record-batch
Grafana Dashboard
The dashboard is built-in and automatically provisioned on port 3000 (or 3002 in standalone mode). It includes:
- LLM Latency & TTFT: Histogram distribution of request latency and time-to-first-token.
- Token Usage: Track prompt and completion tokens.
- Cost Analysis: Live cost calculation in micro-USD.
- Security Scans: Record rates of PII exposure and prompt injections.
Updating Config Files (Model Prices, PII Patterns, Infra)
The SDK reads config files once at startup. After any change, a container restart is required (except dashboard JSON files which are hot-reloaded).
Adding or updating a model price
Edit config/model_prices.yaml:
- model: gpt-5
provider: openai
input_price_per_1m: 10.00
output_price_per_1m: 30.00
version: "2026-01-01"
Required fields: model, provider, input_price_per_1m, output_price_per_1m, version.
Prices must be >= 0. Duplicate (model, provider) pairs are rejected by CI.
Then restart:
docker restart instrumentation-sdk-api
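As a worked example of how per-1M prices relate to the micro-USD cost shown on the dashboard: tokens multiplied by USD-per-1M-tokens is already a value in millionths of a dollar. The sketch below assumes that straightforward formula, which may differ from the SDK's exact calculation, and also mirrors the stated validation rules. `cost_micro_usd`, `validate_prices`, and `PRICES` are hypothetical names.

```python
REQUIRED = ("model", "provider", "input_price_per_1m", "output_price_per_1m", "version")

PRICES = {
    # (model, provider) -> USD per 1M tokens, mirroring the YAML entry
    ("gpt-5", "openai"): {"input_price_per_1m": 10.00, "output_price_per_1m": 30.00},
}


def cost_micro_usd(model, provider, prompt_tokens, completion_tokens):
    """Cost in micro-USD: tokens * (USD per 1M tokens) is in millionths of a dollar."""
    price = PRICES[(model, provider)]
    return round(
        prompt_tokens * price["input_price_per_1m"]
        + completion_tokens * price["output_price_per_1m"]
    )


def validate_prices(entries):
    """Return a list of problems: missing fields, negative prices, duplicate pairs."""
    errors, seen = [], set()
    for i, entry in enumerate(entries):
        missing = [field for field in REQUIRED if field not in entry]
        if missing:
            errors.append(f"entry {i}: missing {missing}")
            continue
        if entry["input_price_per_1m"] < 0 or entry["output_price_per_1m"] < 0:
            errors.append(f"entry {i}: negative price")
        key = (entry["model"], entry["provider"])
        if key in seen:
            errors.append(f"entry {i}: duplicate {key}")
        seen.add(key)
    return errors
```

For 1,000 prompt tokens and 500 completion tokens at the prices above, the cost is 1,000 × 10 + 500 × 30 = 25,000 micro-USD, that is $0.025.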
Adding or updating a PII / Injection pattern
Edit config/patterns.yaml:
patterns:
- name: phone_number
regex: "\\b\\d{3}[-.]?\\d{3}[-.]?\\d{4}\\b"
type: PII_STRUCTURAL # or INJECTION_ATTEMPT
Valid type values: PII_STRUCTURAL, INJECTION_ATTEMPT.
The CI validates that every regex compiles and no pattern name is duplicated.
Then restart:
docker restart instrumentation-sdk-api
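Before pushing a pattern change, the same checks the CI is described as running can be approximated locally. This is a sketch, not the CI script: `validate_patterns` is a hypothetical helper, it takes the already-parsed YAML as a dict, and treating name, regex, and type as the required fields is an assumption based on the example entry.

```python
import re

VALID_TYPES = {"PII_STRUCTURAL", "INJECTION_ATTEMPT"}


def validate_patterns(config):
    """Return a list of problems found in a parsed patterns.yaml mapping."""
    errors, seen = [], set()
    for i, pattern in enumerate(config.get("patterns", [])):
        missing = [field for field in ("name", "regex", "type") if field not in pattern]
        if missing:
            errors.append(f"pattern {i}: missing {missing}")
            continue
        if pattern["type"] not in VALID_TYPES:
            errors.append(f"{pattern['name']}: invalid type {pattern['type']!r}")
        if pattern["name"] in seen:
            errors.append(f"{pattern['name']}: duplicate name")
        seen.add(pattern["name"])
        try:
            re.compile(pattern["regex"])
        except re.error as exc:
            errors.append(f"{pattern['name']}: regex does not compile ({exc})")
    return errors


# The phone_number entry as it looks once the YAML has been parsed
config = {
    "patterns": [
        {
            "name": "phone_number",
            "regex": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
            "type": "PII_STRUCTURAL",
        }
    ]
}
phone = re.compile(config["patterns"][0]["regex"])
```

Note that the doubled backslashes in the YAML double-quoted string become single backslashes after parsing, which is why the Python raw string uses `\b` and `\d` directly.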
Updating a Grafana dashboard
Edit any file under build/dashboards/*.json.
No restart required — Grafana polls and hot-reloads dashboards every 30 seconds automatically.
Updating infra configs (Grafana datasource, Prometheus, Tempo)
Edit:
- build/grafana-datasource.yaml: add/change datasources
- build/grafana-dashboard-provider.yaml: change dashboard provider path/folder
- build/prometheus.yml: add scrape targets
- build/tempo-config.yaml: change Tempo storage or OTLP port
After editing, rebuild the image and restart:
DOCKER_PAT=<your-pat> ./scripts/deploy_docker.sh
docker stop instrumentation-sdk-api && docker rm instrumentation-sdk-api
docker pull chiefj/instrumentation-sdk-api:latest
docker run -d -p 8000:8000 -p 3000:3000 -p 4317:4317 -p 9464:9464 \
--name instrumentation-sdk-api chiefj/instrumentation-sdk-api:latest
CI — what runs automatically
The grafana-config-validate.yml workflow triggers only when one of the 10 watched files changes. It validates:
| File | What is checked |
|---|---|
| grafana-datasource.yaml | YAML valid, name/type/url present, Prometheus datasource exists |
| grafana-dashboard-provider.yaml | YAML valid, options.path present |
| prometheus.yml | YAML valid, scrape_configs non-empty |
| tempo-config.yaml | server.http_listen_port, distributor.receivers.otlp, storage.trace.backend |
| dashboards/*.json | JSON valid, title/panels/schemaVersion present, no duplicate UIDs |
| model_prices.yaml | List non-empty, all required fields, prices >= 0, no duplicate pairs |
| patterns.yaml | All required fields, valid type, no duplicate names, regex compiles |
Running the load test locally
cd packages/python/instrumentation-sdk
.venv/bin/python -m pytest tests/performance/ -m performance -v
This sends 1000 spans (100 individual + 10×50 batch) covering all 6 model/provider combos, error ratios, PII flags, and high token counts.
3. Implementation Call Chain
| Pipeline Stage | Method Call | Primary File |
|---|---|---|
| REST API | create_app() | api/rest/v1/app.py |
| Management | init_instrumentation() | api/rest/v1/handlers/instrumentation.py |
| Tracing | instrument_app() | infra/tracing/middleware.py |
| Auto-Capture | init_auto_instrumentation() | features/auto_instrumentation/index.py |
| Decorator | @llm_observe | features/spans/decorator.py |
| Context Manager | llm_span() | features/manual_instrumentation/service.py |
| Orchestration | handle_job() | worker/index.py |
| Logic | enrich_span() | features/enrich_span/service.py |
| Integration | create_embedding() | infra/clients/cloudflare_embeddings.py |
| Identity | stable_embedding_key() | shared/utils/hash.py |
| Token Counting | count_tokens() | features/token_counting/service.py |
| Streaming SDK | wrap_async_stream() | features/streaming/index.py |
| Streaming Logic | finalize_stream() | features/streaming/service.py |
| PII & Injection Scan | scan_prompt() | features/pii_injection_scan/index.py |
| Deterministic Sampling | should_sample() | features/deterministic_sampling/index.py |
| MiniLM Embedding | get_embedding() | features/minilm_embedding/index.py |