A Python package for observing traces of your LLM application.
ObserveLLM
A powerful observability library for AI/ML applications that provides comprehensive tracing and monitoring capabilities using Langfuse.
Installation
Install the package from PyPI using:
pip install observeLLM
Note: It is recommended to use the latest version for optimal performance.
Quick Start
1. Initialize Langfuse Client
First, initialize the Langfuse client at your application startup:
from observe_traces import LangfuseInitializer, request_context, trace_api_call
from observe_traces import llm_tracing, llm_streaming_tracing, embedding_tracing, vectordb_tracing, reranking_tracing, general_tracing
from observe_traces import ObservabilityService
# Initialize Langfuse client
LangfuseInitializer.initialize(
langfuse_public_key='your_langfuse_public_key',
langfuse_secret_key='your_langfuse_secret_key',
langfuse_host='your_host_url', # e.g., 'http://localhost:3000'
release='app_version', # e.g., '1.0.0'
environment='your_environment' # e.g., 'development', 'production'
)
# Optional: Close Langfuse client when shutting down
LangfuseInitializer.close()
2. FastAPI Middleware Setup
Add the unified middleware to your FastAPI application in main.py or your entry point:
from fastapi import FastAPI, Request
from observe_traces import unified_middleware, trace_api_call
app = FastAPI()
@app.middleware("http")
async def set_request_context_middleware(request: Request, call_next):
session_id = request.headers.get("X-Request-ID")
# Capture request body for trace input (optional)
body = None
if request.method in ["POST", "PUT", "PATCH"]:
try:
body = await request.json()
except Exception:
# If body can't be parsed as JSON, you can capture it as text or skip
pass
metadata = {
"sessionId": session_id,
"environment": "development",
"serviceName": "observeLLM",
"apiEndpoint": request.url.path,
"user": request.headers.get("X-User-Email"),
**(body if isinstance(body, dict) else {}),
}
# Optional: Define custom route names
route_mapping = {
"/api/chat": "Chat Generation",
"/api/embeddings": "Text Embedding",
"/api/rerank": "Document Reranking"
}
# Optional: Define tags for categorization and filtering
tag_mapping = {
"/api/chat": ["production", "llm", "chat"],
"/api/embeddings": ["production", "embedding", "search"],
"/api/rerank": ["production", "reranking", "search"],
"/api/health": ["monitoring", "health"]
}
# Optional: Include/exclude routes from tracing
include_routes = ["/api/chat", "/api/embeddings", "/api/rerank"]
exclude_routes = ["/health", "/metrics", "/docs"]
# Prepare input data for trace (can be request body, query params, or custom data)
trace_input = {
"method": request.method,
"path": request.url.path,
"query_params": dict(request.query_params),
"headers": dict(request.headers),
"body": body
}
return await unified_middleware(
request,
call_next,
metadata=metadata,
route_mapping=route_mapping,
tag_mapping=tag_mapping,
include_routes=include_routes,
exclude_routes=exclude_routes,
input=trace_input
)
New Input Capture Feature:
input: Captures input data for traces (can be any JSON object)
- Useful for tracking request payloads, query parameters, headers, etc.
- Helps with debugging and understanding what data was provided to each trace
- Example:
{"method": "POST", "body": {"query": "Hello"}, "headers": {...}}
Enhanced Features:
tag_mapping: Maps route paths to lists of tags for categorization
- Tags help organize and filter traces in the Langfuse UI
- Useful for grouping traces by environment, service type, or functionality
- Example:
{"/api/chat": ["production", "llm"], "/api/embeddings": ["production", "embedding"]}
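For intuition, the route filtering, naming, and tagging behavior described above can be sketched as plain dictionary lookups. This is an illustrative simplification, not the library's actual middleware internals:

```python
# Simplified sketch of how a middleware might decide whether to trace a
# request and which trace name/tags to attach. Illustrative only - the
# real unified_middleware's internals may differ.

def resolve_trace_config(path, route_mapping, tag_mapping,
                         include_routes=None, exclude_routes=None):
    """Return (should_trace, trace_name, tags) for a request path."""
    if exclude_routes and path in exclude_routes:
        return False, None, []
    if include_routes and path not in include_routes:
        return False, None, []
    name = route_mapping.get(path, path)  # fall back to the raw path
    tags = tag_mapping.get(path, [])
    return True, name, tags

route_mapping = {"/api/chat": "Chat Generation"}
tag_mapping = {"/api/chat": ["production", "llm", "chat"]}

print(resolve_trace_config("/api/chat", route_mapping, tag_mapping,
                           include_routes=["/api/chat"],
                           exclude_routes=["/health"]))
# → (True, 'Chat Generation', ['production', 'llm', 'chat'])
```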
Usage Methods
ObserveLLM provides two ways to use the tracing decorators:
Method 1: Direct Decorator Functions
Use the imported decorator functions directly:
from observe_traces import llm_tracing, embedding_tracing, vectordb_tracing, reranking_tracing
@llm_tracing(provider='openai')
async def my_llm_function():
# Your implementation
pass
Method 2: ObservabilityService Class
Create an ObservabilityService instance and use its methods:
from observe_traces import ObservabilityService
# Create service instance
observability_service = ObservabilityService()
# Use as decorator methods
@observability_service.llm_tracing(provider='openai')
async def my_llm_function():
# Your implementation
pass
Both methods provide identical functionality and can be used interchangeably.
Variable Mapping
All tracing decorators support an optional variable_mapping parameter that allows you to map the expected parameter names to your actual function parameter names. This is useful when your function parameters don't match the decorator's expected names.
Important: The mapping direction is expected_parameter_name: your_parameter_name
LLM Tracing Expected Parameters
For both llm_tracing and llm_streaming_tracing decorators, the following parameters are expected:
- model_name (required) - The model name/identifier (fallback: model)
- system_prompt (optional) - System instructions for the model
- chat_messages (required) - The conversation messages/user prompt (fallback: user_prompt)
- operation_name (optional) - Custom name for the operation (used in trace naming)
- max_tokens (optional) - Maximum tokens to generate
- temperature (optional) - Sampling temperature
- tools (optional) - Available tools/functions for the model
Additional decorator parameters:
- metadata_config (optional) - List of metadata keys to include in traces. If None, includes all metadata.
- is_sdk (optional) - Boolean indicating SDK mode (True) vs standard mode (False, default). Supported for both llm_tracing and llm_streaming_tracing decorators.
Note: The decorator will automatically try fallback parameter names if the primary ones are not found. For example, if model_name is not provided, it will look for model. If chat_messages is not found, it will look for user_prompt.
from observe_traces import ObservabilityService
observability_service = ObservabilityService()
@observability_service.llm_tracing(
provider='openai',
variable_mapping={
'model_name': 'model', # Maps decorator's 'model_name' to your 'model' parameter
'chat_messages': 'user_prompt', # Maps decorator's 'chat_messages' to your 'user_prompt' parameter
'system_prompt': 'system_msg', # Maps decorator's 'system_prompt' to your 'system_msg' parameter
'operation_name': 'task_name', # Maps decorator's 'operation_name' to your 'task_name' parameter
'max_tokens': 'max_length', # Maps decorator's 'max_tokens' to your 'max_length' parameter
'temperature': 'temp', # Maps decorator's 'temperature' to your 'temp' parameter
'tools': 'available_tools' # Maps decorator's 'tools' to your 'available_tools' parameter
}
)
async def my_custom_llm_function(model, system_msg, user_prompt, task_name=None, max_length=None, temp=None, available_tools=None, **kwargs):
# Your implementation here
pass
# Example for streaming LLM with metadata filtering
@observability_service.llm_streaming_tracing(
provider='anthropic',
variable_mapping={
'model_name': 'model',
'chat_messages': 'messages',
'system_prompt': 'system',
'operation_name': 'stream_name',
'max_tokens': 'max_output_tokens',
'temperature': 'temp_setting',
'tools': 'function_tools'
},
metadata_config=['model', 'provider', 'timeTaken', 'totalCost'],
is_sdk=False
)
async def my_custom_streaming_function(model, system, messages, stream_name=None, max_output_tokens=None, temp_setting=None, function_tools=None, **kwargs):
# Your streaming implementation here with filtered metadata
pass
Note: If you don't provide variable mapping, your function parameters must match the expected parameter names exactly. For example:
@observability_service.llm_tracing(provider='openai')
async def standard_llm_function(model_name, system_prompt, chat_messages, operation_name=None, max_tokens=None, temperature=None, tools=None, **kwargs):
# Function parameters match expected names exactly
pass
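The mapping-plus-fallback resolution described above can be sketched as a small lookup function. This is a hypothetical illustration of the documented behavior (explicit mapping first, then the expected name, then the fallback name), not the library's actual implementation:

```python
# Hypothetical sketch of resolving an expected parameter name through a
# variable_mapping with documented fallbacks. Illustrative only.

FALLBACKS = {"model_name": "model", "chat_messages": "user_prompt"}

def resolve_param(expected, kwargs, variable_mapping=None):
    """Look up an expected decorator parameter in the caller's kwargs."""
    mapping = variable_mapping or {}
    # 1. Explicit mapping: expected_parameter_name -> your_parameter_name
    if expected in mapping and mapping[expected] in kwargs:
        return kwargs[mapping[expected]]
    # 2. Exact match on the expected name
    if expected in kwargs:
        return kwargs[expected]
    # 3. Documented fallback name (e.g. model_name -> model)
    fallback = FALLBACKS.get(expected)
    if fallback and fallback in kwargs:
        return kwargs[fallback]
    return None

kwargs = {"model": "gpt-4o", "user_prompt": [{"role": "user", "content": "hi"}]}
print(resolve_param("model_name", kwargs))  # → gpt-4o (via fallback)
print(resolve_param("model_name", {"llm": "x"}, {"model_name": "llm"}))  # → x (via mapping)
```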
Other Decorator Mappings
The mapping works for all decorator types:
- LLM Tracing: Maps model_name, system_prompt, chat_messages, operation_name, max_tokens, temperature, tools
- Embedding Tracing: Maps model_name, inputs, texts, etc.
- Vector DB Tracing: Maps namespace, query, index_host, top_k, etc.
- Reranking Tracing: Maps model_name, query, documents, top_n, etc.
Tracing Decorators
ObserveLLM provides six powerful decorators and utility functions to enable comprehensive tracing for different AI/ML components:
1. LLM Tracing
from observe_traces import llm_tracing
# OR
from observe_traces import ObservabilityService
observability_service = ObservabilityService()
@llm_tracing(provider='openai') # Direct function
# OR
@observability_service.llm_tracing(provider='openai') # Service method
async def llm_api_calling_function(
model_name: str, # Required: e.g., 'gpt-3.5-turbo'
system_prompt: str, # Optional: System instructions
chat_messages: list, # Required: Conversation history
operation_name: str = None, # Optional: Custom operation name for tracing
max_tokens: int = None, # Optional: Maximum tokens to generate
temperature: float = None, # Optional: Sampling temperature
tools: list = None, # Optional: Available tools/functions
**kwargs # Additional parameters
):
# Your LLM API calling logic here
# Returns either:
# 1. Tuple of (response_data, raw_response)
# 2. Raw response object
# Example with metadata filtering
@llm_tracing(
provider='openai',
metadata_config=['maxTokens', 'temperature', 'totalCost'] # Only include specific metadata
)
async def cost_focused_llm_function(model_name, chat_messages, **kwargs):
# Only 'maxTokens', 'temperature', and 'totalCost' will be included in trace metadata
pass
# SDK Mode Support (OpenAI and Anthropic)
@llm_tracing(provider='openai', is_sdk=True) # SDK mode for OpenAI
async def openai_sdk_function(model_name, chat_messages, **kwargs):
# Use OpenAI SDK directly - returns complete SDK response object
from openai import AsyncOpenAI
client = AsyncOpenAI()
response = await client.chat.completions.create(
model=model_name,
messages=chat_messages,
tools=kwargs.get('tools', [])
)
return response # Return SDK response object directly
@llm_tracing(provider='anthropic', is_sdk=True) # SDK mode for Anthropic
async def anthropic_sdk_function(model_name, chat_messages, **kwargs):
# Use Anthropic SDK directly - returns complete SDK response object
from anthropic import AsyncAnthropic
client = AsyncAnthropic()
response = await client.messages.create(
model=model_name,
messages=chat_messages,
tools=kwargs.get('tools', [])
)
return response # Return SDK response object directly
@llm_tracing(provider='openai', is_sdk=False) # Standard mode (default)
async def openai_standard_function(model_name, chat_messages, **kwargs):
# Use raw HTTP requests - returns tuple (response_text, raw_json_response)
import os
import httpx
api_key = os.environ["OPENAI_API_KEY"]
async with httpx.AsyncClient() as client:
response = await client.post(
"https://api.openai.com/v1/chat/completions",
json={"model": model_name, "messages": chat_messages},
headers={"Authorization": f"Bearer {api_key}"}
)
raw_json = response.json()
text = raw_json["choices"][0]["message"]["content"]
return text, raw_json # Return tuple
is_sdk Parameter for LLM Tracing:
The is_sdk parameter determines how the decorator processes function returns:
- is_sdk=False (default): Standard mode - expects raw HTTP API responses, typically returned as a tuple (response_text, raw_json_response)
- is_sdk=True: SDK mode - expects complete SDK response objects (e.g., OpenAI ChatCompletion or Anthropic Message objects)
SDK Mode Benefits:
- Direct extraction of token usage, tool calls, and metadata from SDK objects
- Enhanced tool call support with comprehensive metadata
- Simplified integration with official SDKs
- Automatic handling of complex response structures
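To make the "direct extraction" idea concrete, here is a sketch of pulling token usage and tool-call metadata out of an OpenAI-style chat completion represented as a plain dict. The field names follow OpenAI's response shape; the decorator's real extraction logic is internal to the library:

```python
# Sketch: extracting token usage and tool-call metadata from an
# OpenAI-style chat completion dict. Illustrative only.

def extract_sdk_metadata(response: dict) -> dict:
    usage = response.get("usage", {})
    message = response["choices"][0]["message"]
    tool_calls = message.get("tool_calls") or []
    return {
        "inputTokens": usage.get("prompt_tokens", 0),
        "outputTokens": usage.get("completion_tokens", 0),
        "hasToolCalls": bool(tool_calls),
        "toolCallCount": len(tool_calls),
    }

response = {
    "choices": [{"message": {"content": "", "tool_calls": [
        {"id": "call_1", "function": {"name": "get_weather"}}]}}],
    "usage": {"prompt_tokens": 12, "completion_tokens": 7},
}
print(extract_sdk_metadata(response))
# → {'inputTokens': 12, 'outputTokens': 7, 'hasToolCalls': True, 'toolCallCount': 1}
```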
Supported LLM Providers:
- OpenAI (GPT-3.5, GPT-4, GPT-4o, etc.) - SDK mode supported
- Anthropic (Claude models) - SDK mode supported
- Groq
- Custom providers can be added using register_provider()
2. LLM Streaming Tracing
from observe_traces import llm_streaming_tracing
# OR
from observe_traces import ObservabilityService
observability_service = ObservabilityService()
import json
@llm_streaming_tracing(provider='anthropic', is_sdk=False) # Direct function
# OR
@observability_service.llm_streaming_tracing(provider='anthropic', is_sdk=False) # Service method
async def llm_streaming_function(
model_name: str, # Required: e.g., 'claude-3-opus-20240229'
system_prompt: str, # Optional: System instructions
chat_messages: list, # Required: Conversation history
operation_name: str = None, # Optional: Custom operation name for tracing
max_tokens: int = None, # Optional: Maximum tokens to generate
temperature: float = None, # Optional: Sampling temperature
tools: list = None, # Optional: Available tools/functions
**kwargs # Additional parameters
):
# Your streaming LLM API calling logic here
# Should be an async generator that yields specific formatted lines:
# 1. For streaming response chunks:
# yield f"data: {json.dumps({'type': 'data', 'data': chunk_text})}"
# Example:
# yield 'data: {"type": "data", "data": "Hello"}'
# 2. For token usage information:
# yield f"tokens: {json.dumps({'data': {'input': input_tokens, 'output': output_tokens}})}"
# Example:
# yield 'tokens: {"data": {"input": 10, "output": 5}}'
# 3. Any other lines that should be passed through unchanged
# The decorator will:
# - Collect all response chunks to build the complete response
# - Track token usage throughout the stream
# - Calculate costs based on token usage
# - Create a trace in Langfuse with the complete response and metrics
# SDK Mode for Complete Response Objects
@llm_streaming_tracing(provider='anthropic', is_sdk=True)
async def llm_sdk_function(
model_name: str,
chat_messages: list,
**kwargs
):
# Your LLM SDK calling logic here that returns a complete response object
# Example SDK response structure:
# {
# "id": "msg_01...",
# "content": [
# {"type": "text", "text": "Response content"},
# {"type": "tool_use", "id": "toolu_01...", "name": "tool_name", "input": {...}}
# ],
# "usage": {"input_tokens": 10, "output_tokens": 20},
# "stop_reason": "end_turn"
# }
return complete_response_object
# Streaming with metadata filtering
@llm_streaming_tracing(
provider='anthropic',
is_sdk=False,
metadata_config=['provider', 'model', 'totalCost', 'hasToolCalls']
)
async def focused_streaming_function(model_name, chat_messages, **kwargs):
# Only specified metadata fields will be included in the trace
async for chunk in streaming_api_call():
yield chunk
is_sdk Parameter:
The is_sdk parameter determines how the decorator handles the function's return value:
- is_sdk=False (default): Streaming mode - expects an async generator that yields formatted chunks
- is_sdk=True: SDK mode - expects an async generator that yields streaming data and special final message events
Streaming Mode (is_sdk=False):
- Function must be an async generator yielding chunks
- Processes streaming events in real-time
- Collects chunks to build complete response
- Parses token usage from streaming events
SDK Mode (is_sdk=True):
- Function must be an async generator that yields streaming data in real-time
- Yields streaming text chunks during the LLM response
- Yields a special anthropic_final_message event at the end with complete response data
- The decorator automatically detects this special event and extracts trace data from it
- Combines real-time streaming with comprehensive tracing
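The chunk and token line formats documented for streaming mode can be accumulated with straightforward string parsing. The following is a simplified sketch of that accumulation, not the decorator's actual stream processor:

```python
import json

# Sketch: accumulating the documented streaming line formats
# ('data: {...}' chunks and a 'tokens: {...}' usage line) into a
# complete response and token counts. Illustrative only.

def accumulate_stream(lines):
    chunks, tokens = [], {"input": 0, "output": 0}
    for line in lines:
        if line.startswith("data: "):
            payload = json.loads(line[len("data: "):])
            if payload.get("type") == "data":
                chunks.append(payload["data"])
        elif line.startswith("tokens: "):
            payload = json.loads(line[len("tokens: "):])
            tokens = payload["data"]
        # any other lines pass through unchanged
    return "".join(chunks), tokens

lines = [
    'data: {"type": "data", "data": "Hello"}',
    'data: {"type": "data", "data": " world"}',
    'tokens: {"data": {"input": 10, "output": 5}}',
]
print(accumulate_stream(lines))
# → ('Hello world', {'input': 10, 'output': 5})
```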
SDK Mode Implementation Example
import json
from anthropic import AsyncAnthropic
@llm_streaming_tracing(provider='anthropic', is_sdk=True)
async def sdk_streaming_function(model_name, chat_messages, system_prompt=None, **kwargs):
"""
SDK Mode function that yields streaming data and final message for tracing.
This function:
1. Streams text chunks in real-time using the Anthropic SDK
2. Yields a special anthropic_final_message event with complete response data
3. The decorator automatically processes this event for comprehensive tracing
"""
client = AsyncAnthropic(api_key="your-api-key")
try:
# Stream the response using Anthropic SDK
async with client.messages.stream(
model=model_name,
max_tokens=kwargs.get('max_tokens', 1024),
system=system_prompt,
messages=chat_messages,
temperature=kwargs.get('temperature', 0.7),
tools=kwargs.get('tools', []) # Include tools if provided
) as stream:
# Yield streaming text chunks in real-time
async for text in stream.text_stream:
yield f"data: {json.dumps({'type': 'text_chunk', 'text': text})}\n\n"
# Get the final message with complete response data
final_message = await stream.get_final_message()
# Convert to dict for the special event
final_message_dict = final_message.model_dump()
# Yield the special anthropic_final_message event
# The decorator will detect this and extract tracing data
yield f"data: {json.dumps({'type': 'anthropic_final_message', 'data': final_message_dict})}\n\n"
# Optional: yield completion event
yield f"data: {json.dumps({'type': 'stream_complete', 'message': 'SDK streaming completed'})}\n\n"
except Exception as e:
yield f"data: {json.dumps({'type': 'error', 'error': str(e)})}\n\n"
# Usage in FastAPI endpoint
@app.post("/stream/sdk-mode")
async def stream_with_sdk_mode(request: StreamingRequest):
async def generate():
async for chunk in sdk_streaming_function(
model_name=request.model,
system_prompt=request.system_prompt,
chat_messages=[{"role": msg.role, "content": msg.content} for msg in request.messages],
max_tokens=request.max_tokens,
temperature=0.7,
tools=[{"name": "weather", "description": "Get weather info"}] # Optional tools
):
yield chunk
return StreamingResponse(
generate(),
media_type="text/plain",
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
)
Key Benefits of SDK Mode (is_sdk=True):
- Real-time streaming: Users see text appearing as it's generated
- Comprehensive tracing: Complete response data, token usage, costs, and tool calls are captured
- Tool call support: Handles complex responses with tool calls and multiple content blocks
- Error handling: Proper error propagation while maintaining streaming capability
- Automatic processing: The decorator handles trace creation from the final message event
Context Window Tool Tracking
The llm_streaming_tracing decorator supports two additional tool-related metadata features for generation observations:
currentStreamToolNames
Automatically added to every streaming generation's metadata. Contains a comma-separated string of tool names that the LLM is invoking in the current response stream.
Example metadata output when the LLM calls two tools:
{
"currentStreamToolNames": "searchTool,codeInterpreter"
}
If the LLM doesn't call any tools in the current stream, the value will be an empty string "".
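Deriving this value from an Anthropic-style response can be sketched as a filter over the content blocks. A simplified illustration, not the decorator's internal code:

```python
# Sketch: building currentStreamToolNames from Anthropic-style content
# blocks by joining the names of tool_use blocks. Illustrative only.

def current_stream_tool_names(content_blocks):
    names = [b["name"] for b in content_blocks if b.get("type") == "tool_use"]
    return ",".join(names)

blocks = [
    {"type": "text", "text": "Let me check."},
    {"type": "tool_use", "id": "toolu_1", "name": "searchTool", "input": {}},
    {"type": "tool_use", "id": "toolu_2", "name": "codeInterpreter", "input": {}},
]
print(current_stream_tool_names(blocks))  # → searchTool,codeInterpreter
print(repr(current_stream_tool_names([{"type": "text", "text": "hi"}])))  # → ''
```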
trackContextWindowTools
An optional decorator parameter that lets you define tool names to look for in the input messages (context window). For each tool name, you specify a metadata key that will be set to true or false depending on whether that tool was invoked in the conversation history.
Structure:
track_context_window_tools = {
"<toolNameToFind>": "<metadataKeyToSet>",
}
- Key: The exact tool name as it appears in Anthropic tool_use content blocks (the "name" field)
- Value: The metadata key name to set as a boolean in the generation observation
Usage:
@llm_streaming_tracing(
provider="anthropic",
variable_mapping={
"model_name": "model",
"system_prompt": "system",
"chat_messages": "messages",
"operation_name": "operation_name",
"max_tokens": "max_tokens",
"temperature": "temperature",
"tools": "tools",
},
metadata_config=["model", "provider", "currentStreamToolNames", "hasSearchToolInContext", "hasCodeInterpreterInContext"],
is_sdk=True,
track_context_window_tools={
"searchTool": "hasSearchToolInContext",
"codeInterpreter": "hasCodeInterpreterInContext",
"webBrowser": "hasWebBrowserInContext",
}
)
async def my_streaming_function(self, **params):
# Your streaming implementation
pass
Resulting metadata on the generation observation:
{
"currentStreamToolNames": "searchTool",
"hasSearchToolInContext": true,
"hasCodeInterpreterInContext": false,
"hasWebBrowserInContext": true
}
How detection works (Anthropic): The decorator scans all assistant role messages in the input chat_messages list for tool_use content blocks. If a message contains {"type": "tool_use", "name": "searchTool", ...}, then hasSearchToolInContext is set to true. This detects tools that were actually invoked by the LLM in earlier conversation turns, not just tools that were available.
Exception safety: All context window tool tracking is wrapped inside the existing tracing error handlers. If anything goes wrong during detection, it's logged and the parent service function continues unaffected.
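The detection described above can be sketched as a scan over assistant messages. This is an illustrative reconstruction of the documented behavior, not the library's actual detection code:

```python
# Sketch: scan assistant messages for tool_use content blocks and set the
# configured metadata keys to True/False. Illustrative only.

def detect_context_window_tools(chat_messages, track_context_window_tools):
    invoked = set()
    for msg in chat_messages:
        if msg.get("role") != "assistant":
            continue
        content = msg.get("content")
        if not isinstance(content, list):
            continue  # plain-string content has no tool_use blocks
        for block in content:
            if isinstance(block, dict) and block.get("type") == "tool_use":
                invoked.add(block.get("name"))
    return {meta_key: (tool in invoked)
            for tool, meta_key in track_context_window_tools.items()}

messages = [
    {"role": "user", "content": "search this"},
    {"role": "assistant", "content": [
        {"type": "tool_use", "name": "searchTool", "input": {}}]},
]
print(detect_context_window_tools(messages, {
    "searchTool": "hasSearchToolInContext",
    "codeInterpreter": "hasCodeInterpreterInContext",
}))
# → {'hasSearchToolInContext': True, 'hasCodeInterpreterInContext': False}
```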
Metadata Configuration
All tracing decorators support an optional metadata_config parameter that allows you to control which metadata fields are included in your traces. This feature provides fine-grained control over trace payloads and helps focus on specific metrics.
Usage
# Include only specific metadata fields
@llm_tracing(provider='openai', metadata_config=['maxTokens', 'temperature', 'totalCost'])
async def focused_llm_function(model_name, chat_messages, **kwargs):
pass
# Include all metadata (default behavior)
@llm_tracing(provider='openai') # metadata_config=None
async def full_metadata_function(model_name, chat_messages, **kwargs):
pass
# Include no metadata
@llm_tracing(provider='openai', metadata_config=[])
async def minimal_metadata_function(model_name, chat_messages, **kwargs):
pass
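The three cases above (None, a list of keys, and an empty list) amount to a simple dictionary filter. A minimal sketch of that semantics, not the library's implementation:

```python
# Sketch of metadata_config semantics: None keeps all fields, [] keeps
# none, a list keeps only the named keys. Illustrative only.

def filter_metadata(metadata, metadata_config=None):
    if metadata_config is None:
        return dict(metadata)  # default: include everything
    return {k: v for k, v in metadata.items() if k in metadata_config}

metadata = {"model": "gpt-4o", "temperature": 0.2, "totalCost": 0.0031}
print(filter_metadata(metadata))                 # all three fields
print(filter_metadata(metadata, []))             # → {}
print(filter_metadata(metadata, ["totalCost"]))  # → {'totalCost': 0.0031}
```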
Available Metadata Fields
LLM Tracing (llm_tracing and llm_streaming_tracing):
- model - Model name/identifier
- provider - LLM provider name
- maxTokens - Maximum tokens to generate
- temperature - Sampling temperature
- tool - Available tools/functions
- timeTaken - Response time in seconds
- inputTokens - Number of input tokens
- outputTokens - Number of output tokens
- inputCost - Cost for input tokens
- outputCost - Cost for output tokens
- totalCost - Total cost for the request
- hasToolCalls - Whether tool calls were made
- toolCallCount - Number of tool calls
- sdkMode - Whether SDK mode was used (streaming only)
- currentStreamHasToolCalls - Tool calls in current stream (streaming only)
- currentStreamToolCallCount - Tool call count in current stream (streaming only)
- currentStreamToolNames - Comma-separated tool names invoked in the current stream (streaming only)
- originalResponse - Full raw response from the LLM; use this field wisely
- Dynamic boolean keys from the trackContextWindowTools config (streaming only), and many more
Embedding Tracing (embedding_tracing):
- provider - Embedding provider name
- model_name - Model name/identifier
- input count - Number of input texts
- cost - Total cost for embeddings
- token usage - Number of tokens used
- price - Detailed pricing information
- embedding_dimensions - Dimensionality of embeddings
- timestamp - Timestamp of the operation
Vector DB Tracing (vectordb_tracing):
- operation_type - Type of operation (read/write)
- provider - Vector DB provider name
- cost - Operation cost
- read_units - Number of read units consumed
- index_host - Vector database host
- namespace - Vector database namespace
- top_k - Number of results requested (read operations)
- upserted_vectors - Number of vectors upserted (write operations)
Reranking Tracing (reranking_tracing):
- provider - Reranking provider name
- model_name - Model name/identifier
- output_count - Number of documents processed
- cost - Total cost for reranking
- token usage - Number of tokens used
- timestamp - Timestamp of the operation
- top_n - Number of top results requested
Common Use Cases
# Cost tracking focus
@llm_tracing(
provider='openai',
metadata_config=['inputCost', 'outputCost', 'totalCost', 'inputTokens', 'outputTokens']
)
async def cost_monitoring_function(model_name, chat_messages, **kwargs):
pass
# Performance tracking focus
@llm_tracing(
provider='anthropic',
metadata_config=['timeTaken', 'inputTokens', 'outputTokens', 'model']
)
async def performance_monitoring_function(model_name, chat_messages, **kwargs):
pass
# Tool usage tracking
@llm_streaming_tracing(
provider='anthropic',
metadata_config=['hasToolCalls', 'toolCallCount', 'currentStreamHasToolCalls', 'currentStreamToolNames', 'tool'],
track_context_window_tools={
'searchTool': 'hasSearchToolInContext',
'codeInterpreter': 'hasCodeInterpreterInContext',
}
)
async def tool_monitoring_function(model_name, chat_messages, tools, **kwargs):
pass
# Minimal metadata for compliance
@embedding_tracing(
provider='openai',
metadata_config=['provider', 'model_name']
)
async def compliance_focused_embedding(model_name, inputs, **kwargs):
pass
Benefits
- Reduced payload size: Include only necessary metadata to minimize trace size
- Focused monitoring: Track specific metrics relevant to your use case
- Performance optimization: Smaller payloads improve query and dashboard performance
- Compliance support: Exclude sensitive metadata fields when required
- Cost optimization: Reduce storage and bandwidth costs for traces
3. Embedding Tracing
from observe_traces import embedding_tracing
# OR
from observe_traces import ObservabilityService
observability_service = ObservabilityService()
@embedding_tracing(provider='openai') # Direct function
# OR
@observability_service.embedding_tracing(provider='openai') # Service method
async def embedding_generation_function(
model_name: str, # e.g., 'text-embedding-ada-002'
inputs: list, # List of texts to embed
**kwargs # Additional parameters
):
# Your embedding API calling logic here
# Returns either:
# 1. Tuple of (embeddings, raw_response)
# 2. Raw response object
Supported Embedding Providers:
- OpenAI
- Pinecone
- Cohere
- Jina
- VoyageAI
- Custom providers can be added using register_embedding_provider()
4. Vector Database Tracing
from observe_traces import vectordb_tracing
# OR
from observe_traces import ObservabilityService
observability_service = ObservabilityService()
# For write operations
@vectordb_tracing(provider='pinecone', operation_type='write') # Direct function
# OR
@observability_service.vectordb_tracing(provider='pinecone', operation_type='write') # Service method
async def vectordb_write_function(
index_host: str,
vectors: list,
namespace: str
):
# Your vector DB write logic here
# Returns raw response object
# For read operations
@vectordb_tracing(provider='pinecone', operation_type='read') # Direct function
# OR
@observability_service.vectordb_tracing(provider='pinecone', operation_type='read') # Service method
async def vectordb_read_function(
index_host: str,
namespace: str,
top_k: int,
query: str,
query_vector_embeds: list,
query_sparse_embeds: dict = None,
include_metadata: bool = True,
filter_dict: dict = None
):
# Your vector DB read logic here
# Returns raw response object
Supported Vector DB Providers:
- Pinecone
- Custom providers can be added by extending the provider configurations
5. API Call Tracing
from observe_traces import trace_api_call
from fastapi import Request
@app.get("/some-endpoint")
async def example_endpoint(request: Request):
# Your API logic here
input_data = {"param1": "value1", "param2": "value2"}
# Perform some operation
result = some_function(input_data)
# Log the API call within the request trace
span_id = trace_api_call(
request=request,
name="Example API Call",
input_data=input_data,
output_data=result,
metadata={"additional_info": "some value"}
)
return result
This function allows you to create spans within existing traces to track API calls with:
- Complete input/output data
- Custom metadata
- Integration with the request tracing system
6. Reranking Tracing
from observe_traces import reranking_tracing
# OR
from observe_traces import ObservabilityService
observability_service = ObservabilityService()
@reranking_tracing(provider='cohere') # Direct function
# OR
@observability_service.reranking_tracing(provider='cohere') # Service method
async def reranking_function(
model_name: str,
query: str,
documents: list,
top_n: int,
**kwargs
):
# Your reranking API calling logic here
# Returns either:
# 1. Tuple of (rerank_results, raw_response)
# 2. Raw response object
Supported Reranking Providers:
- Cohere
- Pinecone
- Jina
- VoyageAI
- Custom providers can be added using register_reranking_provider()
7. General Tracing
from observe_traces import general_tracing
# OR
from observe_traces import ObservabilityService
observability_service = ObservabilityService()
@general_tracing() # Direct function
# OR
@observability_service.general_tracing() # Service method
async def any_function(
param1: Any, # Any function parameters
param2: Any, # The decorator is completely agnostic
**kwargs # Additional parameters
):
# Your function logic here
# Returns any value or None
The general_tracing decorator is a powerful, agnostic tracing solution that can trace any Python function regardless of its purpose. It automatically captures:
- Function arguments as input
- Return values as output (or captured results for functions without return values)
- Execution timing and metadata
- Parent-child relationships for nested function calls
- Error handling and exception information
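The capture behavior listed above can be illustrated with a bare-bones tracing decorator that records arguments, the return value, and timing into a local list. This is a sketch of the general idea only; the real general_tracing decorator reports spans to Langfuse and handles much more:

```python
import asyncio
import functools
import time

# Minimal sketch of an agnostic tracing decorator in the spirit of
# general_tracing. Illustrative only - records to a local list instead
# of creating Langfuse spans.

TRACES = []

def traced(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        record = {"functionName": func.__name__,
                  "input": {"args": args, "kwargs": kwargs}}
        try:
            result = await func(*args, **kwargs)
            record["output"] = result  # return value captured as output
            return result
        finally:
            record["timeTaken"] = time.perf_counter() - start
            TRACES.append(record)
    return wrapper

@traced
async def double(x):
    return x * 2

print(asyncio.run(double(21)))        # → 42
print(TRACES[0]["functionName"])      # → double
```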
Key Features:
Case 1: Normal Functions with Return Values
from observe_traces import general_tracing
@general_tracing()
async def process_data(data: dict, operation: str) -> dict:
"""Function that returns a result."""
processed = {"operation": operation, "result": data["value"] * 2}
return processed
# Usage in an endpoint
@app.post("/process")
async def process_endpoint(request: ProcessRequest):
result = await process_data(request.data, "multiply")
return {"success": True, "result": result}
Case 2: Functions Without Return Values (using capture_result)
from observe_traces import general_tracing, capture_result
@general_tracing()
async def log_operation(user_id: str, action: str) -> None:
"""Function that doesn't return anything but captures results."""
log_entry = {
"user_id": user_id,
"action": action,
"timestamp": datetime.now().isoformat(),
"status": "completed"
}
# Store in database (no return value)
database.insert_log(log_entry)
# Capture the result for tracing
capture_result(log_entry)
# Usage in an endpoint
@app.post("/log")
async def log_endpoint(request: LogRequest):
await log_operation(request.user_id, request.action)
return {"success": True, "message": "Action logged"}
Case 3: Custom Span Names
@general_tracing(name="Data Processing Pipeline")
async def complex_data_processing(input_data: list) -> dict:
"""Function with custom span name instead of function name."""
# Your processing logic
return {"processed_count": len(input_data)}
Case 4: Metadata Filtering
@general_tracing(metadata_config=["functionName", "timeTaken", "hasReturn", "argumentCount"])
async def optimized_function(large_data: dict) -> dict:
"""Function with filtered metadata to reduce trace payload size."""
# Only specified metadata fields will be included in the trace
return {"status": "processed"}
Available Metadata Fields:
- functionName - Name of the traced function
- timeTaken - Execution time in seconds
- hasReturn - Whether the function returns a value
- hasCapturedResult - Whether capture_result() was used
- argumentCount - Number of function arguments
- isAsync - Whether the function is async
- module - Function's module name
- hasError - Whether an error occurred (error cases only)
- errorType - Type of error (error cases only)
Case 5: Nested Functions (Parent-Child Relationships)
from datetime import datetime
from observe_traces import capture_result, general_tracing

@general_tracing(name="Main Workflow")
async def main_workflow(task_id: str) -> dict:
    """Parent function that calls multiple child functions."""
    # Step 1: Validate input
    validation_result = await validate_input(task_id)
    # Step 2: Process data
    processing_result = await process_task_data(task_id, validation_result)
    # Step 3: Generate report
    await generate_task_report(task_id, processing_result)
    return {
        "task_id": task_id,
        "status": "completed",
        "validation": validation_result,
        "processing": processing_result
    }

@general_tracing(name="Input Validation")
async def validate_input(task_id: str) -> dict:
    """Child function - automatically nested under the parent span."""
    return {"task_id": task_id, "valid": True}

@general_tracing(name="Data Processing")
async def process_task_data(task_id: str, validation: dict) -> dict:
    """Child function - automatically nested under the parent span."""
    return {"task_id": task_id, "processed_items": 42}

@general_tracing(name="Report Generation")
async def generate_task_report(task_id: str, data: dict) -> None:
    """Child function without a return value."""
    report = {"task_id": task_id, "summary": data, "generated_at": datetime.now()}
    capture_result(report)
This creates a hierarchical trace structure in Langfuse:
📊 Trace: Main Workflow
├── 🟦 Main Workflow (parent span)
│ ├── 🟦 Input Validation (child span)
│ ├── 🟦 Data Processing (child span)
│ └── 🟦 Report Generation (child span)
Case 6: ObservabilityService Class Usage
from observe_traces import ObservabilityService

# Create a service instance
observability_service = ObservabilityService()

@observability_service.general_tracing(name="Service Method")
async def service_function(data: list) -> dict:
    """Using ObservabilityService instead of the direct decorator."""
    processed = [item * 2 for item in data]
    return {"processed": processed, "count": len(processed)}
Case 7: Error Handling
@general_tracing(name="Error Prone Function")
async def risky_operation(data: dict) -> dict:
    """Function that might raise errors - automatically traced."""
    if not data.get("valid"):
        raise ValueError("Invalid data provided")
    return {"status": "success", "data": data}

# Errors are automatically captured in the span metadata and output
Complete FastAPI Example:
from datetime import datetime

from fastapi import FastAPI, Request
from pydantic import BaseModel
from observe_traces import (
    LangfuseInitializer,
    general_tracing,
    capture_result,
    unified_middleware
)

app = FastAPI()

# Initialize Langfuse
LangfuseInitializer.initialize(
    langfuse_public_key="your_key",
    langfuse_secret_key="your_secret",
    langfuse_host="your_host"
)

# Request model for the endpoint below (example fields inferred from usage)
class OrderRequest(BaseModel):
    id: str
    total: float
    items: list

# Add middleware for tracing context
@app.middleware("http")
async def tracing_middleware(request: Request, call_next):
    metadata = {
        "sessionId": request.headers.get("X-Session-ID", "default"),
        "user": request.headers.get("X-User-Email", "anonymous"),
        "environment": "production"
    }
    return await unified_middleware(request, call_next, metadata=metadata)

# Traced business logic functions
@general_tracing(name="Order Validation")
async def validate_order(order_data: dict) -> dict:
    # Validation logic
    return {"valid": True, "order_id": order_data["id"]}

@general_tracing(name="Payment Processing")
async def process_payment(order_id: str, amount: float) -> dict:
    # Payment logic
    return {"transaction_id": "txn_123", "status": "completed"}

@general_tracing(name="Inventory Update")
async def update_inventory(order_data: dict) -> None:
    # Inventory logic (no return value)
    inventory_update = {
        "items_updated": len(order_data["items"]),
        "timestamp": datetime.now().isoformat()
    }
    capture_result(inventory_update)

@general_tracing(name="Complete Order Workflow")
async def complete_order(order_data: dict) -> dict:
    """Main workflow with nested function calls."""
    # Step 1: Validate the order
    validation = await validate_order(order_data)
    # Step 2: Process the payment
    payment = await process_payment(order_data["id"], order_data["total"])
    # Step 3: Update inventory
    await update_inventory(order_data)
    return {
        "order_id": order_data["id"],
        "status": "completed",
        "validation": validation,
        "payment": payment
    }

# API endpoint
@app.post("/orders/complete")
async def complete_order_endpoint(order_request: OrderRequest):
    result = await complete_order(order_request.dict())
    return {"success": True, "result": result}
Important Requirements:
⚠️ The general tracing decorator requires the unified middleware to work properly:
- Middleware Setup: You must use unified_middleware in your FastAPI application
- HTTP Requests: Tracing only works via HTTP endpoints, not direct function calls
- Request Headers: Include X-Session-ID and X-User-Email for better tracing context
Benefits of General Tracing:
- ✅ Universal Compatibility: Works with any Python function
- ✅ Automatic Nesting: Preserves parent-child relationships
- ✅ Flexible Output Capture: Supports both return values and captured results
- ✅ Performance Monitoring: Automatic timing and metadata collection
- ✅ Error Tracking: Comprehensive error information capture
- ✅ Payload Optimization: Configurable metadata filtering
- ✅ Easy Integration: Works alongside existing LLM, embedding, and vector DB tracers
Custom Provider Registration
You can register custom providers using either approach:
Using Direct Functions
from observe_traces import register_provider, register_embedding_provider, register_reranking_provider
# Register a custom LLM provider
register_provider(
    provider_name="my_custom_llm",
    token_parser=my_token_parser_function,
    response_extractor=my_response_extractor_function
)

# Register a custom embedding provider
register_embedding_provider(
    provider_name="my_custom_embedding",
    token_parser=my_token_parser_function,
    price_calculator=my_price_calculator_function,
    embeddings_extractor=my_embeddings_extractor_function
)
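The helper functions passed to the registration calls are ones you supply. Here is a minimal sketch of what they might look like, assuming an OpenAI-style response payload; the response keys and the flat per-1K-token rate are assumptions to adjust for your API and pricing:

```python
from typing import Any, Dict

def my_token_parser_function(response_data: Dict[str, Any]) -> Dict[str, int]:
    # Map the provider's usage block onto the token fields the tracer expects
    usage = response_data.get("usage", {})
    return {
        "prompt_tokens": usage.get("prompt_tokens", 0),
        "completion_tokens": usage.get("completion_tokens", 0),
        "total_tokens": usage.get("total_tokens", 0),
    }

def my_response_extractor_function(response_data: Dict[str, Any]) -> str:
    # Pull the generated text out of the raw response
    choices = response_data.get("choices", [])
    return choices[0]["message"]["content"] if choices else ""

def my_price_calculator_function(tokens: Dict[str, int], model_name: str) -> Dict[str, float]:
    # Example flat rate per 1K tokens; substitute your real pricing
    rate_per_1k = 0.0001
    total = tokens.get("total_tokens", 0) / 1000 * rate_per_1k
    return {"total": total}
```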
Using ObservabilityService
from observe_traces import ObservabilityService
observability_service = ObservabilityService()
# Register custom providers
observability_service.register_llm_provider("my_custom_llm", my_custom_provider_instance)
observability_service.register_embedding_provider("my_custom_embedding", my_embedding_provider_instance)
observability_service.register_vectordb_provider("my_custom_vectordb", my_vectordb_provider_instance)
observability_service.register_reranking_provider("my_custom_reranking", my_reranking_provider_instance)
Creating Custom Providers with Base Classes
For maximum customization, you can extend the base provider classes:
from typing import Any, Dict, List
from observe_traces import ObservabilityService, LLMProvider, EmbeddingProvider

class MyCustomLLMProvider(LLMProvider):
    """Custom LLM provider implementation."""

    def __init__(self):
        super().__init__("my-custom-llm", self._extract_response)

    def parse_tokens(self, response_data: Dict[str, Any]) -> Dict[str, int]:
        """Parse tokens from your API response."""
        return {
            "prompt_tokens": response_data.get("input_tokens", 0),
            "completion_tokens": response_data.get("output_tokens", 0),
            "total_tokens": response_data.get("total_tokens", 0)
        }

    def calculate_cost(self, tokens_data: Dict[str, int], model_name: str) -> Dict[str, float]:
        """Calculate cost based on your pricing model."""
        input_cost = tokens_data.get("prompt_tokens", 0) * 0.00001  # Your pricing
        output_cost = tokens_data.get("completion_tokens", 0) * 0.00002
        return {
            "input": input_cost,
            "output": output_cost,
            "total": input_cost + output_cost
        }

    def _extract_response(self, data: Dict[str, Any]) -> str:
        """Extract the response text from your API response."""
        return data.get("response", "")

class MyCustomEmbeddingProvider(EmbeddingProvider):
    """Custom embedding provider implementation."""

    def __init__(self):
        super().__init__("my-custom-embedding", self._extract_embeddings)

    def parse_tokens(self, response_data: Dict[str, Any]) -> Dict[str, int]:
        """Parse token usage from the embedding response."""
        return {
            "total_tokens": response_data.get("usage", {}).get("tokens", 0)
        }

    def calculate_cost(self, tokens_data: Dict[str, int], model_name: str) -> Dict[str, float]:
        """Calculate the embedding cost."""
        tokens = tokens_data.get("total_tokens", 0)
        cost = tokens * 0.0001  # Your embedding pricing
        return {"total": cost}

    def _extract_embeddings(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Extract embeddings from your API response."""
        return data.get("embeddings", [])

# Register the custom providers
observability_service = ObservabilityService()
observability_service.register_llm_provider("my-custom-llm", MyCustomLLMProvider())
observability_service.register_embedding_provider("my-custom-embedding", MyCustomEmbeddingProvider())

# Use with decorators
@observability_service.llm_tracing("my-custom-llm")
async def my_custom_llm_function(model: str, prompt: str):
    # Your custom LLM API call
    return {"response": "Generated text", "input_tokens": 25, "output_tokens": 15}

@observability_service.embedding_tracing("my-custom-embedding")
async def my_custom_embedding_function(model: str, texts: List[str]):
    # Your custom embedding API call
    return {"embeddings": [{"values": [0.1, 0.2, 0.3]}], "usage": {"tokens": 10}}
Features
- Automatic Request Tracing: Unique trace IDs for each request
- Comprehensive Metadata: Track user info, endpoints, and custom metadata
- Cost Tracking: Automatic calculation of token usage and costs based on provider-specific pricing
- Performance Monitoring: Response time measurements for all operations
- Multi-Provider Support: Works with various AI/ML providers
- Flexible Integration: Supports both tuple returns and single response objects
- Context Management: Maintains request state throughout the request lifecycle
- Streaming Support: Comprehensive tracing for streaming LLM responses
- Custom Provider Support: Easy registration of new providers
- API Call Tracing: Create spans within existing traces to log API calls with input/output data
- General Purpose Tracing: Universal decorator for tracing any Python function with automatic nesting support
Prerequisites
- Self-Hosted Langfuse: You must have a Langfuse instance running. Configure:
  - langfuse_host: Your Langfuse server URL
  - langfuse_public_key: Your public API key
  - langfuse_secret_key: Your secret API key
- FastAPI Application: The middleware is designed for FastAPI applications
Best Practices
- Error Handling: The decorators automatically handle exceptions while maintaining trace context
- Metadata: Include relevant metadata in your middleware for better observability
- Resource Cleanup: Call LangfuseInitializer.close() when shutting down your application
- Context Variables: The system uses context variables to maintain request state
- Provider Registration: Use the appropriate registration functions to add custom providers
- Token Cost Tracking: Ensure your provider configurations include accurate pricing information
- Streaming Support: Follow the specified format for streaming responses to ensure proper tracing
Note
The tracing system uses context variables to maintain request state throughout the request lifecycle. It's essential to define your methods using the specified parameters for consistency and compatibility. The decorators handle both tuple returns (response data + raw response) and single raw response returns, making them flexible for different API implementations.
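To illustrate the two accepted return shapes, here is a hypothetical normalizer showing how a tracer might separate parsed response data from the raw response; split_result is an illustrative name, not part of the library:

```python
from typing import Any, Tuple

def split_result(result: Any) -> Tuple[Any, Any]:
    # A 2-tuple is treated as (response_data, raw_response);
    # any other value is the raw response, used for both roles
    if isinstance(result, tuple) and len(result) == 2:
        return result
    return result, result
```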
Token Cost Validation
To ensure your application doesn't fail at runtime due to missing model pricing information, validate all of your models at application startup. The get_token_costs utility function retrieves pricing information for a specific model/provider pair.
Here's an example of how to validate all your models at application startup:
import logging

from fastapi import FastAPI
from observe_traces import get_token_costs

app = FastAPI()

# Dictionary of all providers and models used in your application
USED_MODELS = {
    "openai": ["gpt-4", "gpt-4-turbo", "gpt-3.5-turbo", "text-embedding-ada-002"],
    "anthropic": ["claude-3-opus-20240229", "claude-3-sonnet-20240229", "claude-3-haiku-20240307"],
    "groq": ["llama-3-8b-8192", "mixtral-8x7b-32768"],
    "cohere": ["command", "command-light", "embed-english-v3.0"],
    "pinecone": ["rerank-v1"]
}

@app.on_event("startup")
async def validate_models():
    """Validate all models at application startup to fail fast if pricing info is missing."""
    missing_models = []
    for provider, models in USED_MODELS.items():
        for model in models:
            try:
                # Raises ValueError if the model/provider pair is not found
                cost_info = get_token_costs(model, provider)
                logging.info(f"Validated model {model} for provider {provider}: {cost_info}")
            except ValueError as e:
                missing_models.append(f"{provider}/{model}")
                logging.error(f"Error validating model {model} for provider {provider}: {str(e)}")
    if missing_models:
        # Fail fast - prevent application startup if any model lacks pricing info
        error_msg = f"Missing pricing information for models: {', '.join(missing_models)}"
        logging.critical(error_msg)
        raise RuntimeError(error_msg)
    logging.info("All models validated successfully")
This pattern ensures:
- Fail Fast: Your application will fail at startup if any required model is missing pricing information, rather than failing during a user request
- Complete Coverage: All provider/model combinations are validated in one place
- Better Error Messages: Clear error messages identify which models are missing
- Runtime Safety: No unexpected errors during request processing due to missing model information
For models not included in the default pricing configuration, you can extend the configuration or handle exceptions appropriately in your implementation.
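One way to handle such models is a thin fallback wrapper around the lookup. A sketch, assuming get_token_costs raises ValueError for unknown model/provider pairs as shown above; FALLBACK_PRICES and safe_token_costs are hypothetical names, and the lookup function is passed in as a parameter:

```python
from typing import Callable, Dict

# Hypothetical local price table for models missing from the default config
FALLBACK_PRICES: Dict[str, Dict[str, float]] = {
    "my-internal-model": {"input": 0.0, "output": 0.0, "total": 0.0},
}

def safe_token_costs(lookup: Callable[[str, str], Dict[str, float]],
                     model: str, provider: str) -> Dict[str, float]:
    # Try the library lookup first; fall back to the local table on ValueError
    try:
        return lookup(model, provider)
    except ValueError:
        if model in FALLBACK_PRICES:
            return FALLBACK_PRICES[model]
        raise
```

Call it as `safe_token_costs(get_token_costs, model, provider)` so unknown internal models resolve locally while genuinely missing models still fail fast.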