LLMOps Observability SDK: decorators + SQS dispatch with compression

A production-grade Python SDK for LLM observability with SQS-based event streaming for decoupled, scalable observability pipelines. Automatically captures traces, spans, token usage, costs, and metadata from your LLM applications.

🎯 Key Features

  • SQS Event Streaming: Batch events to AWS SQS with automatic spillover recovery
  • 🎨 Simple Decorators: @track_function, @track_llm_call, @track_llm_agent, and @track_llm_tool for instant instrumentation
  • Distributed Tracing: @track_external_service (parent) and @resume_external_service (child) for cross-service trace correlation
  • 🔄 Sync & Async Support: Works with both synchronous and asynchronous functions
  • 🤖 Provider Agnostic: Compatible with any LLM provider (AWS Bedrock, OpenAI, Anthropic, etc.)
  • 🪆 Hierarchical Tracing: Automatic parent-child span relationships with proper nesting, including multi-service spans
  • 💰 Cost Tracking: Built-in token usage and cost calculation for AWS Bedrock models
  • 🔍 Smart Capture: Optionally capture function locals and self for detailed debugging
  • 📊 Size Management: Automatic truncation and compression to keep payloads within SQS limits (1 MB) and avoid oversized-message failures
  • 🛡️ Production-Ready: Daemon workers, batch processing, clean shutdown handling
  • 🌍 Auto-Injection: Environment and project_id automatically added to every span
  • 🌐 Multi-Service Merge: Child segments from downstream services automatically merged with parent trace before fanout to observability platforms

📦 Installation

# From source (development)
cd llmops-observability_sdk
pip install -e .

# Or with uv
uv sync

⚙️ Configuration

Environment Variables

Create a .env file in your application directory:

# Project Configuration (Required - Auto-injected into every span)
PROJECT_ID=my_project          # Your project identifier
ENV=uat                        # Environment: development, staging, uat, production

# AWS SQS Configuration (Required for trace streaming)
AWS_SQS_URL=https://sqs.us-east-1.amazonaws.com/123456789/my-queue
AWS_PROFILE=default            # AWS profile name
AWS_REGION=us-east-1           # AWS region

# Model Configuration (Optional)
MODEL_ID=anthropic.claude-3-5-sonnet-20241022-v2:0  # Default model for cost calculation

Key Configuration Values:

  • PROJECT_ID: Unique identifier for your project. Auto-injected into every span's metadata.
  • ENV: Environment name (development/staging/uat/production). Auto-injected into every span's metadata.
  • AWS_SQS_URL: SQS queue URL for sending traces to Lambda processor.

💡 Important: The SDK automatically injects the project ID and environment into every span's metadata. You don't need to add them to decorators manually.
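As an illustration, a minimal startup check for the required settings listed above might look like this. This helper is hypothetical (not part of the SDK); the setting names come from the .env example:

```python
# Hypothetical helper (not part of the SDK): verify the required settings
# from the configuration section are present before starting the app.
REQUIRED = ("PROJECT_ID", "ENV", "AWS_SQS_URL")

def missing_config(env) -> list:
    """Return the names of required settings that are unset or empty."""
    return [name for name in REQUIRED if not env.get(name)]

# Example: AWS_SQS_URL has not been set yet
print(missing_config({"PROJECT_ID": "my_project", "ENV": "uat"}))
```

In a real app you would pass `os.environ` instead of a literal dict.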

🚀 Quick Start

1. Basic Usage with Auto-Configuration

from llmops_observability import (
    TraceManager, 
    track_function, 
    track_llm_call, 
    track_llm_agent, 
    track_llm_tool,
    track_external_service,
    resume_external_service,
    get_injected_headers,
)

# Start a trace - the project ID and environment are auto-loaded from your configuration
TraceManager.start_trace(
    name="rag_pipeline_example",
    user_id="user_123",
    session_id="session_456",
    metadata={"version": "1.0.0"},
    tags=["example", "rag"]
)

# Track regular functions
@track_function()
def process_input(user_query: str):
    return {"query": user_query, "processed": True}

# Track LLM calls with automatic cost calculation
@track_llm_call(model="anthropic.claude-3-sonnet-20240229-v1:0")
def call_llm(prompt: str):
    response = bedrock_client.invoke_model(...)  # Your LLM call
    return response

# Track calls to downstream services (automatically injects trace context)
@track_external_service(service_name="user_service", transport="http")
def fetch_user_profile(user_id: str):
    headers = get_injected_headers()
    response = requests.get(f"{USER_SERVICE_URL}/profile/{user_id}", headers=headers)
    return response.json()

# Execute your workflow
result = process_input("What is Python?")
answer = call_llm("Context: ...\n\nQuestion: What is Python?")
profile = fetch_user_profile("user_123")

# Finalize and send to SQS (optional parameters)
TraceManager.finalize_and_send(
    trace_name="rag_pipeline_example",
    trace_input={"user_msg": "What is Python?"},
    trace_output={"bot_response": answer}
)

📌 Universal API Call Instrumentation (End-to-End)

This enables outbound HTTP telemetry for a wide range of API call styles:

  • requests (sync)
  • httpx.Client (sync)
  • httpx.AsyncClient (async)
  • urllib.request.urlopen (sync, stdlib)
  • FastAPI/Starlette inbound HTTP + outbound child calls in handlers/services

What this solves

When your application calls external APIs (model gateway, internal REST, third-party tools), the SDK now captures those as child spans with one common schema and redaction behavior, independent of which Python client is used.

Scope and constraints

  • ✅ Sync + async outbound support
  • ✅ Redaction for sensitive headers/query/body keys
  • ✅ Parent-child correlation with active trace/span context
  • ✅ Works with FastAPI and non-FastAPI apps
  • ⚠️ Shell curl is not a Python HTTP client, so it is not monkeypatched by the SDK

API surface added for this substory

Use these public helpers:

  • enable_outbound_http_instrumentation(...)
  • disable_outbound_http_instrumentation()
  • outbound_instrumentation_status()
  • enable_requests_instrumentation()
  • enable_httpx_instrumentation(sync=True, async_client=True)
  • enable_urllib_instrumentation()

End-to-end architecture

  1. App starts trace with TraceManager.start_trace(...)
  2. Outbound instrumentation wraps HTTP client calls
  3. Wrapper records method/url/status/latency/error + redacted payload details
  4. Span is added to the active trace with span.kind=client
  5. TraceManager.finalize_and_send(...) emits the full trace to SQS/Lambda pipeline

FastAPI integration pattern (recommended)

Use both:

  • LLMOpsASGIMiddleware for inbound request lifecycle
  • outbound instrumentation for client calls made by your route/service code
from fastapi import FastAPI
import httpx

from llmops_observability import (
    LLMOpsASGIMiddleware,
    TraceManager,
    enable_outbound_http_instrumentation,
)

app = FastAPI()
app.add_middleware(LLMOpsASGIMiddleware)

enable_outbound_http_instrumentation(
    requests_lib=True,
    httpx_lib=True,
    urllib_lib=True,
    httpx_sync=True,
    httpx_async=True,
)

@app.get("/proxy")
async def proxy():
    TraceManager.start_trace(name="proxy_call")
    async with httpx.AsyncClient() as client:
        resp = await client.get("https://api.example.com/data?token=secret", timeout=10.0)
    TraceManager.finalize_and_send(
        trace_name="proxy_call",
        trace_input={},
        trace_output={"status_code": resp.status_code},
    )
    return {"status_code": resp.status_code}

Local validation (simple script)

A dedicated local test script is provided:

  • scripts/local_outbound_http_demo.py

Run it from repo root:

pip install -e ".[http]"
python scripts/local_outbound_http_demo.py

What you should see:

  • instrumentation status for requests, httpx, urllib
  • 4 outbound spans printed (requests, httpx, httpx-async, urllib)
  • redacted query parameter values (e.g., token=[REDACTED])

Optional public endpoint test:

python scripts/local_outbound_http_demo.py --url https://httpbin.org/get

Acceptance checklist for outbound

  • enable_outbound_http_instrumentation() returns expected backend status
  • Sync calls create spans (requests/httpx.Client/urllib)
  • Async calls create spans (httpx.AsyncClient)
  • Metadata includes http.client.library
  • Sensitive values are redacted in URL/headers/body preview
  • Outbound spans are correlated under active trace context
  • LLMOpsASGIMiddleware behavior remains intact for inbound tracing

2. Automatic Environment & Project Injection

Every span automatically gets environment and project_id in metadata:

# Set environment variables
os.environ["LLMOPS_PROJECT_ID"] = "new_test"
os.environ["ENV"] = "uat"

# Start trace
TraceManager.start_trace("my_operation")

# Every @track_function, @track_llm_call, @track_llm_agent, and @track_llm_tool span will automatically have:
# span.metadata = {
#     "environment": "uat",
#     "project_id": "new_test",
#     # ... other metadata ...
# }

No manual injection needed! The SDK automatically adds these to every span.

3. Nested Spans (Parent-Child Relationships)

@track_function()
def parent_function():
    # This creates a parent span
    child_result = child_function()
    return child_result

@track_function()
def child_function():
    # This automatically becomes a child of parent_function
    grandchild_result = grandchild_function()
    return grandchild_result

@track_function()
def grandchild_function():
    # This becomes a child of child_function
    return "result"

# Proper hierarchy maintained in Langfuse/NewRelic/S3

4. Outbound HTTP instrumentation (many clients, sync + async)

Use this when your app calls external HTTP APIs and you want each call as a child span with the same redaction and metadata shape, regardless of client library.

Supported clients (opt-in monkeypatching):

Library          Sync             Async                  Notes
requests         Session.request  -                      Common sync client
httpx            Client.request   AsyncClient.request    Install: pip install 'llmops-observability[http]' or pip install httpx
urllib.request   urlopen          -                      Stdlib; no extra dependency

FastAPI / Starlette: use LLMOpsASGIMiddleware for incoming HTTP to your app. For outgoing calls from route handlers or services, enable one of the clients above (httpx is typical for async apps).

Shell curl: not a Python API, so it is not monkeypatched. Use httpx or requests in code, or wrap subprocess calls yourself if you must shell out.

One-shot setup (recommended):

from llmops_observability import (
    TraceManager,
    enable_outbound_http_instrumentation,
)

# Enables requests + urllib always; enables httpx sync+async when httpx is installed
enable_outbound_http_instrumentation(
    requests_lib=True,
    httpx_lib=True,
    urllib_lib=True,
    httpx_sync=True,
    httpx_async=True,
)

Or enable individually: enable_requests_instrumentation(), enable_httpx_instrumentation(sync=..., async_client=...), enable_urllib_instrumentation(). Disable all with disable_outbound_http_instrumentation().

requests example:

from llmops_observability import TraceManager, enable_requests_instrumentation
import requests

enable_requests_instrumentation()

TraceManager.start_trace(name="external_call_flow")

response = requests.get(
    "https://example.com/search?q=test&api_key=secret",
    headers={"Authorization": "Bearer super-secret-token"},
    timeout=5,
)

TraceManager.finalize_and_send(
    trace_name="external_call_flow",
    trace_input={"query": "test"},
    trace_output={"status_code": response.status_code},
)

httpx async example:

import httpx
from llmops_observability import TraceManager, enable_httpx_instrumentation

enable_httpx_instrumentation(sync=True, async_client=True)
TraceManager.start_trace(name="async_http")

async def main():
    async with httpx.AsyncClient() as client:
        r = await client.get("https://api.example.com/v1/data", timeout=10.0)
    TraceManager.finalize_and_send(trace_name="async_http", trace_input={}, trace_output={"status": r.status_code})

# asyncio.run(main())

Captured outbound metadata includes (among others): method, url (redacted query params), status_code, latency_ms, http.client.library (requests, httpx, httpx-async, urllib), http.call_mode (sync or async), error (if any).

Quick filter snippet (demo-friendly):

spans = TraceManager._active.get("spans", [])  # private attribute; for demo/debugging only
outbound_spans = [s for s in spans if s.metadata and s.metadata.get("outbound_http")]
sync_spans = [s for s in outbound_spans if s.metadata.get("http.call_mode") == "sync"]
async_spans = [s for s in outbound_spans if s.metadata.get("http.call_mode") == "async"]

print("outbound total:", len(outbound_spans))
print("sync outbound:", len(sync_spans))
print("async outbound:", len(async_spans))

Default redaction behavior:

  • Redacts sensitive query/header/body keys (examples: token, password, api_key, authorization).
  • Request body is captured as a redacted/truncated summary (safe preview), not full raw content.
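The query-parameter part of this policy can be sketched with the standard library. This is an illustration of the behavior described above, not the SDK's internal implementation; the key names are the examples from this section:

```python
from urllib.parse import urlsplit, urlunsplit, parse_qsl, urlencode

# Key names are illustrative examples; the SDK's actual list may differ.
SENSITIVE_KEYS = {"token", "password", "api_key", "authorization"}

def redact_url(url: str) -> str:
    """Replace values of sensitive query parameters with [REDACTED]."""
    parts = urlsplit(url)
    query = [
        (k, "[REDACTED]" if k.lower() in SENSITIVE_KEYS else v)
        for k, v in parse_qsl(parts.query, keep_blank_values=True)
    ]
    return urlunsplit(parts._replace(query=urlencode(query, safe="[]")))

print(redact_url("https://example.com/search?q=test&api_key=secret"))
# https://example.com/search?q=test&api_key=[REDACTED]
```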

Correlation:

  • When a trace/span is active, outbound call spans are automatically linked as children.
  • Inbound ASGI middleware behavior remains unchanged.

5. Response evaluation telemetry (score / label / rationale)

The SDK supports two evaluation paths:

  1. Decorator-first (recommended): enable evaluations directly on @track_llm_call
  2. Manual API: call record_evaluation(...) yourself for custom/offline checks

Customer quick start (recommended)

Minimal tracing (no evaluation):

from llmops_observability import TraceManager, track_llm_call

TraceManager.start_trace(name="chat_basic")

@track_llm_call()
def call_model(prompt: str):
    return {"output": {"message": {"content": [{"text": "answer"}]}}}

Enable built-in evaluation with one parameter:

from llmops_observability import TraceManager, track_llm_call

TraceManager.start_trace(name="chat_with_eval")

@track_llm_call(evals=["correctness"])
def call_model(prompt: str):
    return {"output": {"message": {"content": [{"text": "Paris is the capital of France."}]}}}

Advanced usage with sampling (cost/latency control):

from llmops_observability import TraceManager, track_llm_call

TraceManager.start_trace(name="chat_with_eval_sampling")

@track_llm_call(
    model="anthropic.claude-3-sonnet-20240229-v1:0",
    evals=["correctness"],
    sample_rate=0.2,  # evaluate ~20% of requests
    evaluation_metadata={"policy_version": "v1"},
)
def call_model(prompt: str):
    return {"output": {"message": {"content": [{"text": "answer"}]}}}

What happens automatically

  • The SDK runs internal LLM-as-a-judge logic behind the scenes (no customer boilerplate).
  • Each evaluation emits an evaluation child span with:
    • score (numeric)
    • label (categorical)
    • rationale (text)
  • Evaluation spans are attached to the generation span via parent_span_id / evaluated_span_id.
  • Evaluation runs non-blocking in the background.
  • If evaluation fails, your main function still succeeds (failure is isolated and logged).
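The failure-isolation behavior above can be sketched as follows. This is an assumed design for illustration, not the SDK's source: the evaluation runs on a background thread and any error is handed to a logger rather than raised into the caller:

```python
import threading

def run_evaluation_in_background(evaluate, on_error):
    """Run evaluate() on a daemon thread; route failures to on_error."""
    def _worker():
        try:
            evaluate()
        except Exception as exc:  # isolate: report, never re-raise
            on_error(exc)
    thread = threading.Thread(target=_worker, daemon=True)
    thread.start()
    return thread

def failing_judge():
    raise RuntimeError("judge call failed")

captured = []
run_evaluation_in_background(failing_judge, captured.append).join()
print([type(e).__name__ for e in captured])  # ['RuntimeError']
```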

Backward compatibility

evaluation=True still works as an alias for the default criterion:

@track_llm_call(evaluation=True)
def call_model(prompt: str):
    ...

Equivalent to:

@track_llm_call(evals=["correctness"])
def call_model(prompt: str):
    ...

Manual evaluation API (for custom/offline checks)

Use record_evaluation(...) when you need explicit control (for example: rule engines, offline batch scoring, non-LLM downstream checks):

from llmops_observability import TraceManager, record_evaluation, track_function

TraceManager.start_trace(name="rest_downstream_quality")

@track_function()
def call_status_service() -> dict:
    # ... your HTTP call ...
    healthy = True
    record_evaluation(
        name="status_service_eval",
        score=1.0 if healthy else 0.0,
        label="healthy" if healthy else "unhealthy",
        rationale="HTTP and payload checks passed.",
        evaluator_metadata={"method": "rules_v1"},
    )
    return {"ok": healthy}

Pattern: downstream non-LLM service (REST) quality — evaluate inside the client span

Wrap the HTTP call in @track_function, run your quality rules on the JSON/status, then call record_evaluation(...) before return so the evaluation nests under that downstream span.

Note: this example uses requests (install with pip install requests, or pip install -e ".[test]" from this repo). Swap for httpx, stdlib urllib, or your internal HTTP client.

import requests
from llmops_observability import TraceManager, record_evaluation, track_function

TraceManager.start_trace(name="rest_downstream_quality")

@track_function()
def call_status_service() -> dict:
    resp = requests.get("https://api.example.com/v1/status", timeout=10.0)
    data = resp.json() if resp.content else {}
    healthy = resp.status_code == 200 and data.get("ok") is True

    record_evaluation(
        name="status_service_eval",
        score=1.0 if healthy else 0.0,
        label="healthy" if healthy else "unhealthy",
        rationale=f"HTTP {resp.status_code}, body.ok={data.get('ok')}",
        evaluator_metadata={"service": "example_status", "client": "requests"},
    )
    return data

call_status_service()
TraceManager.finalize_and_send(trace_name="rest_downstream_quality", trace_input={}, trace_output={})

  • Span type: evaluation (see EVALUATION_SPAN_TYPE).
  • Payload: score, label, rationale, trace_id, and optional parent_span_id / evaluated_span_id are stored under input_data.evaluation.
  • Evaluator metadata: use evaluator_metadata={...} for judge/model details; merged into span metadata as metadata.evaluator.
  • Validation: malformed payloads raise clear ValueError messages (for example: invalid score, empty label/rationale, wrong metadata types, missing score+label+rationale).
  • No trace: record_evaluation returns None when there is no active/finalized trace.
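The validation rules above can be sketched like this. This is assumed behavior for illustration only; the SDK's actual checks and error messages may differ:

```python
def validate_evaluation(score=None, label=None, rationale=None):
    """Raise ValueError for malformed evaluation payloads (illustrative)."""
    if score is None and label is None and rationale is None:
        raise ValueError("provide at least one of score, label, rationale")
    if score is not None and not isinstance(score, (int, float)):
        raise ValueError("score must be numeric")
    for field_name, value in (("label", label), ("rationale", rationale)):
        if value is not None and (not isinstance(value, str) or not value.strip()):
            raise ValueError(f"{field_name} must be a non-empty string")

validate_evaluation(score=1.0, label="healthy")  # OK
try:
    validate_evaluation(score="high")            # invalid: non-numeric score
except ValueError as exc:
    print(exc)
```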

Async example:

import asyncio
from llmops_observability import TraceManager, record_evaluation

async def evaluate_http_response(resp: dict) -> None:
    # Works for non-LLM downstream responses too.
    record_evaluation(
        score=1.0 if resp.get("status_code") == 200 else 0.0,
        label="pass" if resp.get("status_code") == 200 else "fail",
        rationale=f"HTTP status was {resp.get('status_code')}",
        evaluated_span_id=resp.get("span_id"),
        evaluator_metadata={"type": "rules", "name": "http_health_check"},
    )

TraceManager.start_trace(name="async_eval")
asyncio.run(evaluate_http_response({"status_code": 200, "span_id": "http_span_1"}))
TraceManager.finalize_and_send(trace_name="async_eval", trace_input={}, trace_output={})

6. Agentic crawl / search telemetry (Tavily / Scrapy-style workflows)

Model long-running crawl or search sessions with:

  • a session span (crawl_session) for start/end + aggregate counters
  • per-page spans (crawl_page) for fetches and URL/status/failure details
  • per-event spans (crawl_event) for non-page workflow steps (search request, rerank, parse, etc.)

URLs and query payloads are redacted using the same policy as outbound HTTP metadata redaction.

from llmops_observability import (
    TraceManager,
    crawl_session,
    record_crawl_page,
    record_crawl_event,
)

TraceManager.start_trace(name="research_agent")

with crawl_session(provider="scrapy", query="climate data", metadata={"run": "1"}):
    record_crawl_event(
        event_type="search_request",
        query={"q": "climate data", "api_key": "secret"},
        status="ok",
        duration_ms=15,
    )
    record_crawl_page(
        url="https://docs.example.com/guide?token=secret",
        query={"q": "climate data", "token": "secret"},
        status_code=200,
        latency_ms=120,
    )
    record_crawl_page(
        url="https://docs.example.com/missing",
        status_code=404,
        error="not found",
        latency_ms=40,
    )
    record_crawl_event(
        event_type="result_parse",
        status="failed",
        failure_reason="invalid html fragment",
        duration_ms=12,
    )

TraceManager.finalize_and_send(trace_name="research_agent", trace_input={}, trace_output={})

  • Span types:
    • crawl_session: aggregate summary (crawl.pages_ok, crawl.pages_fail, crawl.events_total, crawl.events_fail, duration)
    • crawl_page: URL/query/status/failure/duration for each fetched page
    • crawl_event: generic step telemetry for search/crawl stages that are not page fetches
  • API: start_crawl_session / record_crawl_page / record_crawl_event / end_crawl_session, or the crawl_session(...) context manager.
  • Stack: Starting a session pushes a synthetic id on the span stack so page spans nest under it until end_crawl_session.
  • Partial failures: failed pages/events increment failure counters while session completion telemetry is still emitted.
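The aggregate counters listed above could be computed along these lines. This is a minimal sketch of the session summary, not SDK code; field names follow the metadata keys shown above:

```python
def summarize_session(pages, events):
    """Aggregate per-page and per-event records into session counters."""
    return {
        "crawl.pages_ok": sum(1 for p in pages if p.get("status_code") == 200),
        "crawl.pages_fail": sum(1 for p in pages if p.get("status_code") != 200),
        "crawl.events_total": len(events),
        "crawl.events_fail": sum(1 for e in events if e.get("status") == "failed"),
    }

summary = summarize_session(
    pages=[{"status_code": 200}, {"status_code": 404}],
    events=[{"status": "ok"}, {"status": "failed"}],
)
print(summary["crawl.pages_fail"], summary["crawl.events_fail"])  # 1 1
```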

7. Transaction lifecycle telemetry (start/end + unique span id)

Use this API when you need an explicit transaction boundary (for example, "transaction logger Lambda" as a child call).

import urllib.request
from llmops_observability import (
    TraceManager,
    start_transaction,
    end_transaction,
)

TraceManager.start_trace(name="transaction_flow")
txid = start_transaction(
    name="logger_lambda_transaction",
    metadata={"component": "transaction_logger"},
    input_data={"record_count": 3},
)

if txid:
    try:
        # Any outbound HTTP span recorded here becomes child of the transaction
        urllib.request.urlopen("https://api.example.com/lambda/log?token=secret", timeout=5)
        end_transaction(txid, status="success", output_data={"lambda_status": "accepted"})
    except Exception as exc:
        end_transaction(txid, status="error", error=str(exc))
        raise

TraceManager.finalize_and_send(trace_name="transaction_flow", trace_input={}, trace_output={})

Context manager / decorator helpers:

from llmops_observability import transaction, track_transaction

with transaction(name="tx_with_context", metadata={"source": "orders"}):
    pass

@track_transaction(name="tx_decorated")
def run_business_step():
    return "ok"

Contract and guard rails:

  • start_transaction(...) returns a unique span id (uuid4().hex) or None when tracing is unavailable.
  • end_transaction(transaction_id, ...) returns True on success, False for unknown/already-ended ids (double-end guard).
  • get_open_transaction_ids() helps detect missing-end situations in long-running workflows.
  • Transaction span type is transaction (TRANSACTION_SPAN_TYPE) and is correlated using the active parent stack at start time.
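The id contract and double-end guard above can be sketched as follows (not SDK source; a minimal model of the described behavior):

```python
import uuid

_open_transactions = set()

def start_transaction_demo() -> str:
    """Return a unique transaction id and mark it open."""
    txid = uuid.uuid4().hex
    _open_transactions.add(txid)
    return txid

def end_transaction_demo(txid: str) -> bool:
    """True on first end; False for unknown or already-ended ids."""
    if txid not in _open_transactions:
        return False  # unknown id, or double-end
    _open_transactions.discard(txid)
    return True

txid = start_transaction_demo()
print(end_transaction_demo(txid))  # True
print(end_transaction_demo(txid))  # False (double-end guard)
```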

📊 Data Flow Architecture

┌─────────────────────┐
│   Your LLM App      │
│  (with decorators)  │
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│  TraceManager       │
│  (collects spans)   │
│  + Auto-injects:    │
│    - environment    │
│    - project_id     │
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│  SQS Batch Workers  │
│  (compress & send)  │
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│  AWS SQS Queue      │
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│  Lambda Processor   │
│  (parallel routing) │
└─────┬──────┬────┬───┘
      │      │    │
      ▼      ▼    ▼
  Langfuse  S3  NewRelic

🔗 Distributed Tracing (Multi-Service Correlation)

Track requests across multiple microservices and correlate them into a single unified trace.

Use Case

When your FastAPI app calls a downstream microservice (e.g., user_service), both emit trace segments to the same SQS queue. The EKS processor merges them into one complete trace before fanout to Langfuse/NewRelic/S3.

Parent Service: @track_external_service

Mark a function that calls a downstream service. Automatically generates trace context headers to inject into the HTTP request:

from fastapi import FastAPI
from llmops_observability import TraceManager, track_external_service, get_injected_headers
import httpx

app = FastAPI()

@app.post("/generate")
@track_external_service(service_name="user_service", transport="http")
async def generate(prompt: str):
    # get_injected_headers() returns trace context to forward
    headers = get_injected_headers()
    
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "http://user_service:8002/profile/123",
            headers=headers,  # Forward the trace context
            timeout=5,
        )
    
    # Parent trace automatically marks: has_external_segments=true
    # (only if downstream call succeeds)
    return {"response": response.json()}

# Lambda or manual invocation (generate is async, so run it explicitly)
import asyncio

TraceManager.start_trace(name="generate", user_id="alice")
result = asyncio.run(generate("My prompt"))
TraceManager.finalize_and_send(
    trace_name="generate",
    trace_input={"prompt": "My prompt"},
    trace_output=result,
)

Key behavior:

  • Only marks has_external_segments=true if the downstream call succeeds
  • If the downstream service is unavailable (connection refused, timeout), has_external_segments stays false and the parent processes immediately
  • Prevents orphaned parent traces waiting for child segments that will never arrive

Child Service: @resume_external_service

Consume the trace context from parent headers and emit a child segment:

from fastapi import FastAPI
from llmops_observability import track_function, resume_external_service

app = FastAPI()

@app.get("/profile/{user_id}")
@resume_external_service()  # Extracts traceparent + x-llmops-trace-id
@track_function()
async def get_user_profile(user_id: str):
    # This function emits a child segment with the parent's trace_id
    profile = await db.fetch_user(user_id)
    return {"profile": profile}

Key behavior:

  • Automatically extracts traceparent and x-llmops-trace-id headers from request
  • Emits a child segment to SQS (marked is_child_segment=true)
  • Child segment is staged in S3 under child_traces/{trace_id}/ for later merge
  • ASGI middleware skips auto-tracing (no duplicate envelope) when distributed headers are present

Helper: get_injected_headers()

Call inside a @track_external_service function to get headers to forward:

from llmops_observability import get_injected_headers

@track_external_service(service_name="downstream")
def call_api():
    headers = get_injected_headers()  # Returns {'traceparent': '...', 'x-llmops-trace-id': '...'}
    # Pass to your HTTP client
    requests.get("http://api.local:8002/endpoint", headers=headers)

EKS Processor Flow

The Lambda handler in otel_handler.py:

  1. Decompresses incoming messages from SQS
  2. Checks if message is child (is_child_segment=true) or parent (has_external_segments=true)
  3. Stages child segments in S3: s3://bucket/child_traces/{trace_id}/{segment_id}.json
  4. Merges parent + all child segments (fetched from S3) into one trace
  5. Fans out merged trace to Langfuse, NewRelic, S3 with complete span hierarchy

Timeout & Cleanup:

  • If parent arrives without children after 60 seconds (sweep), it's treated as a standalone trace
  • Orphaned children that arrive after parent has been processed are queued for late replay
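The merge/standalone decision above can be modeled like this. This is an illustrative sketch of the described behavior, not the actual otel_handler.py logic:

```python
SWEEP_TIMEOUT_S = 60  # sweep window from the description above

def merge_decision(parent_age_s: float, staged_children: int) -> str:
    """Decide what to do with a parent trace awaiting child segments."""
    if staged_children > 0:
        return "merge"        # parent + staged child segments -> one trace
    if parent_age_s >= SWEEP_TIMEOUT_S:
        return "standalone"   # sweep expired: process parent alone
    return "wait"             # child segments may still arrive

print(merge_decision(parent_age_s=61, staged_children=0))  # standalone
print(merge_decision(parent_age_s=5, staged_children=2))   # merge
```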

End-to-End Example

FastAPI (parent request):

# POST /generate
@app.post("/generate")
@track_external_service(service_name="user_service")
async def generate(prompt: str):
    headers = get_injected_headers()
    async with httpx.AsyncClient() as client:
        resp = await client.get("http://user_service:8002/profile/123", headers=headers)
    return {"result": resp.json()}

Downstream Microservice (child):

# GET /profile/{user_id}
@app.get("/profile/{user_id}")
@resume_external_service()
async def get_profile(user_id: str):
    return {"name": "Alice", "tier": "premium"}

Result in Langfuse:

  • Single trace trace_id=... with name "generate"
  • Parent spans from FastAPI + child spans from user_service merged together
  • Full hierarchy preserved (parent → child operations visible)

🎨 Decorator Reference

@track_function - Complete Guide

Basic Usage (No Parameters)

@track_function()
def process_data(input_data):
    # Automatically captures:
    # - Function name as span name
    # - Function arguments (args, kwargs)
    # - Return value
    # - Execution time
    # - Environment and project_id (auto-injected)
    return {"processed": input_data}

Parameter: name (Custom Span Name)

@track_function(name="custom_span_name")
def my_function():
    # Span will appear as "custom_span_name" instead of "my_function"
    return result

# Use case: Make span names more descriptive in traces
@track_function(name="fetch_user_profile_from_db")
def get_user(user_id):
    return db.query(user_id)

Parameter: metadata (Add Custom Metadata)

@track_function(metadata={"service": "auth", "priority": "high"})
def authenticate_user(username, password):
    # Span metadata will include:
    # {
    #     "service": "auth",
    #     "priority": "high",
    #     "environment": "uat",      # auto-injected
    #     "project_id": "new_test"   # auto-injected
    # }
    return auth_result

# Use case: Tag spans with business context
@track_function(metadata={
    "database": "postgres",
    "table": "users",
    "operation": "read"
})
def query_users(filters):
    return db.execute(query)

Parameter: capture_locals=True (Capture All Local Variables)

@track_function(capture_locals=True)
def process_payment(amount, currency):
    user_id = "user_123"
    transaction_id = generate_id()
    tax = amount * 0.1
    total = amount + tax
    
    # All local variables captured in span.input_data.locals:
    # {
    #     "user_id": "user_123",
    #     "transaction_id": "txn_abc",
    #     "tax": 10.0,
    #     "total": 110.0,
    #     "amount": 100.0,
    #     "currency": "USD"
    # }
    
    return {"total": total}

# ⚠️ Warning: Can capture large amounts of data. Use for debugging only.

Parameter: capture_locals=["var1", "var2"] (Capture Specific Variables)

@track_function(capture_locals=["user_id", "total"])
def process_payment(amount, currency):
    user_id = "user_123"
    transaction_id = generate_id()
    tax = amount * 0.1
    total = amount + tax
    
    # Only specified variables captured in span.input_data.locals:
    # {
    #     "user_id": "user_123",
    #     "total": 110.0
    # }
    # Note: transaction_id, tax, amount, currency are NOT captured
    
    return {"total": total}

# ✅ Recommended: Capture only what you need for debugging

Parameter: capture_self=True (Capture self in Class Methods)

class PaymentProcessor:
    def __init__(self, merchant_id):
        self.merchant_id = merchant_id
        self.fee_rate = 0.029
    
    @track_function(capture_self=True)
    def process(self, amount):
        # Captures self attributes in span.input_data.self:
        # {
        #     "merchant_id": "merch_123",
        #     "fee_rate": 0.029
        # }
        fee = amount * self.fee_rate
        return amount - fee

# Use case: Debug class state during execution
class DatabaseConnection:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.connected = False
    
    @track_function(capture_self=True, capture_locals=["query"])
    def execute(self, query):
        # Captures both self and specific locals
        result = self._run_query(query)
        return result

Combined Parameters Example

@track_function(
    name="complex_data_pipeline",
    metadata={"stage": "preprocessing", "version": "2.0"},
    capture_locals=["processed_count", "errors"],
    capture_self=False
)
def pipeline_stage(data):
    processed_count = 0
    errors = []
    temp_cache = {}  # Not captured
    
    for item in data:
        try:
            process_item(item)
            processed_count += 1
        except Exception as e:
            errors.append(str(e))
    
    return {"count": processed_count, "errors": errors}

@track_llm_call - Complete Guide

Basic Usage (No Parameters)

@track_llm_call()
def call_llm(prompt):
    # Automatically captures:
    # - Function arguments (prompt)
    # - LLM response
    # - Execution time
    # - Span type = "generation"
    # - Environment and project_id (auto-injected)
    response = bedrock_client.invoke_model(...)
    return response

Parameter: name (Custom Span Name)

@track_llm_call(name="bedrock_claude_sonnet")
def call_llm(prompt):
    # Span appears as "bedrock_claude_sonnet" instead of "call_llm"
    response = bedrock_client.invoke_model(...)
    return response

# Use case: Distinguish between different LLM providers/models
@track_llm_call(name="openai_gpt4_turbo")
def call_openai(prompt):
    return openai.chat.completions.create(...)

@track_llm_call(name="anthropic_claude_opus")
def call_anthropic(prompt):
    return anthropic.messages.create(...)

Parameter: model (For Cost Calculation)

@track_llm_call(model="anthropic.claude-3-sonnet-20240229-v1:0")
def call_bedrock(prompt):
    # SDK automatically calculates cost based on:
    # - Token usage from response
    # - Model pricing for Claude 3 Sonnet
    # 
    # Captured in span:
    # - usage.input_tokens
    # - usage.output_tokens
    # - cost.input_cost
    # - cost.output_cost
    # - cost.total_cost
    
    response = bedrock_client.invoke_model(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        body=json.dumps({"prompt": prompt})
    )
    return response

# Supported AWS Bedrock models (see pricing.py):
# - anthropic.claude-3-5-sonnet-20241022-v2:0
# - anthropic.claude-3-sonnet-20240229-v1:0
# - anthropic.claude-3-haiku-20240307-v1:0
# - anthropic.claude-3-opus-20240229-v1:0
# - And more...

Parameter: metadata (Add Custom Metadata)

@track_llm_call(
    model="anthropic.claude-3-sonnet-20240229-v1:0",
    metadata={
        "temperature": 0.7,
        "max_tokens": 1000,
        "use_case": "code_generation"
    }
)
def generate_code(prompt):
    # Span metadata includes:
    # {
    #     "temperature": 0.7,
    #     "max_tokens": 1000,
    #     "use_case": "code_generation",
    #     "environment": "uat",      # auto-injected
    #     "project_id": "new_test"   # auto-injected
    # }
    return llm_response

Parameter: capture_locals=True (Capture All Locals)

@track_llm_call(
    model="anthropic.claude-3-sonnet-20240229-v1:0",
    capture_locals=True
)
def enhanced_llm_call(user_query, context_docs):
    # Build prompt with context
    formatted_context = format_documents(context_docs)
    system_prompt = "You are a helpful assistant."
    final_prompt = f"{system_prompt}\n\nContext: {formatted_context}\n\nQuestion: {user_query}"
    
    # All locals captured:
    # {
    #     "user_query": "What is Python?",
    #     "context_docs": [...],
    #     "formatted_context": "...",
    #     "system_prompt": "You are a helpful assistant.",
    #     "final_prompt": "..."
    # }
    
    response = bedrock_client.invoke_model(...)
    return response

Parameter: capture_locals=["prompt", "temperature"] (Specific Variables)

@track_llm_call(
    model="anthropic.claude-3-sonnet-20240229-v1:0",
    capture_locals=["final_prompt", "model_config"]
)
def call_with_config(user_input):
    model_config = {"temperature": 0.7, "max_tokens": 1000}
    system_message = "You are an AI assistant."  # NOT captured
    final_prompt = f"{system_message}\n\n{user_input}"
    temp_cache = {}  # NOT captured
    
    # Only captures:
    # {
    #     "final_prompt": "...",
    #     "model_config": {"temperature": 0.7, "max_tokens": 1000}
    # }
    
    response = call_llm(final_prompt, **model_config)
    return response

Parameter: capture_self=True (For Class Methods)

class LLMOrchestrator:
    def __init__(self, model_id, api_key):
        self.model_id = model_id
        self.api_key = api_key
        self.request_count = 0
        self.total_cost = 0.0
    
    @track_llm_call(
        model="anthropic.claude-3-sonnet-20240229-v1:0",
        capture_self=True
    )
    def call_llm(self, prompt):
        # Captures self attributes:
        # {
        #     "model_id": "anthropic.claude-3-sonnet-20240229-v1:0",
        #     "request_count": 5,
        #     "total_cost": 0.042
        # }
        # Note: api_key might be captured - be careful with secrets!
        
        self.request_count += 1
        response = self._invoke_model(prompt)
        return response

Combined Parameters Example (Production Pattern)

class ChatbotService:
    def __init__(self, model_id):
        self.model_id = model_id
        self.system_prompt = "You are a helpful chatbot."
    
    @track_llm_call(
        name="chatbot_generation",
        model="anthropic.claude-3-sonnet-20240229-v1:0",
        metadata={
            "service": "customer_support",
            "model_version": "v2.0",
            "priority": "high"
        },
        capture_locals=["full_prompt", "temperature"],
        capture_self=False  # Don't capture self to avoid secrets
    )
    def generate_response(self, user_message, conversation_history):
        temperature = 0.7
        full_prompt = self._build_prompt(user_message, conversation_history)
        cache_key = hash(full_prompt)  # Not captured
        
        response = bedrock_client.invoke_model(
            modelId=self.model_id,
            body=json.dumps({
                "prompt": full_prompt,
                "temperature": temperature,
                "max_tokens": 1000
            })
        )
        return response

Quick Reference Table

Parameter      | @track_function | @track_llm_call | Type              | Description
name           | ✅              | ✅              | str               | Custom span name
metadata       | ✅              | ✅              | Dict[str, Any]    | Additional metadata
capture_locals | ✅              | ✅              | bool or List[str] | Capture local variables
capture_self   | ✅              | ✅              | bool              | Capture self in methods
model          | —               | ✅              | str               | Model ID for cost calculation

Best Practices

✅ DO

# Capture specific variables for debugging
@track_function(capture_locals=["error_code", "retry_count"])

# Use metadata for business context
@track_function(metadata={"team": "payments", "priority": "critical"})

# Specify model for accurate cost tracking
@track_llm_call(model="anthropic.claude-3-sonnet-20240229-v1:0")

# Use descriptive names
@track_llm_call(name="rag_retrieval_claude")

❌ DON'T

# Don't capture all locals in production (too much data)
@track_function(capture_locals=True)  # Only for debugging!

# Don't capture self if it contains secrets
@track_function(capture_self=True)  # Check for API keys first!

# Don't use generic names
@track_function(name="function_1")  # Not helpful

# Don't forget model for LLM calls
@track_llm_call()  # Missing model = no cost calculation

📈 What Gets Captured

Trace-Level Data

  • trace_id, trace_name, project_id, environment
  • user_id, session_id
  • start_time, end_time, duration_ms
  • trace_input, trace_output
  • metadata, tags
  • total_spans, total_generations
  • sdk_name, sdk_version

Span-Level Data (Auto-captured for every span)

  • Core: span_id, span_name, span_type, parent_span_id
  • Timing: start_time, end_time, duration_ms
  • I/O: input_data, output_data
  • Status: status, status_message, error
  • LLM: model_id, prompt, response
  • Usage: usage.input_tokens, usage.output_tokens, usage.total_tokens
  • Cost: Calculated from model pricing
  • Metadata: environment, project_id (auto-injected), custom metadata
  • Context: tags, level

🔧 Configuration Reference

Size Limits (in config.py)

MAX_OUTPUT_SIZE = 200 * 1024      # 200 KB - max individual field
MAX_SPAN_IO_SIZE = 20_000          # 20 KB - span input/output
MAX_TRACE_IO_SIZE = 50_000         # 50 KB - trace input/output
MAX_SQS_SIZE = 200_000             # 200 KB - SQS message
PROMPT_RESPONSE_MAX_SIZE = 10_000  # 10 KB - prompt/response fields

SQS Configuration

SQS_WORKER_COUNT = 4           # Background worker threads
SQS_BATCH_SIZE = 10            # Batch size before flush
SQS_BATCH_TIMEOUT = 0.2        # Timeout in seconds
SQS_FLUSH_TIME_THRESHOLD = 0.15
SQS_SHUTDOWN_TIMEOUT = 1.0
SQS_RETRY_ATTEMPTS = 3
SQS_RETRY_DELAY = 0.1
SQS_RETRY_BACKOFF_MULTIPLIER = 2.0
SQS_RETRY_MAX_DELAY = 0.5
SQS_IMMEDIATE_MAX_BLOCK_MS = 250
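
The batch-or-timeout behaviour implied by SQS_BATCH_SIZE and SQS_BATCH_TIMEOUT can be sketched as a flush decision. This is a simplification for illustration; the real sender also honours SQS_FLUSH_TIME_THRESHOLD and worker scheduling:

```python
def should_flush(pending: int, age_s: float,
                 batch_size: int = 10, timeout_s: float = 0.2) -> bool:
    """Flush when the batch is full, or when a non-empty batch has aged out."""
    return pending >= batch_size or (pending > 0 and age_s >= timeout_s)
```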

Safe export hardening knobs (Substory 5)

The SDK now applies hardening by default across exported telemetry:

  • Idempotency: trace payload + each span includes deterministic idempotency_key.
  • Redaction: export path applies centralized deep redaction before serialization.
  • Non-blocking sink behavior: immediate sends use bounded retry + backoff; failures spill to disk.
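
One way such a deterministic key could be derived (an assumption for illustration; the SDK's exact scheme lives in its export path) is a stable hash over the payload's identity fields:

```python
import hashlib
import json

def idempotency_key(payload: dict) -> str:
    # Hypothetical derivation: hash the stable identity fields so the same
    # trace/span always yields the same key across retries.
    identity = json.dumps(
        {"trace_id": payload.get("trace_id"), "span_id": payload.get("span_id")},
        sort_keys=True,
    )
    return hashlib.sha256(identity.encode("utf-8")).hexdigest()
```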

Tune via environment variables:

LLMOPS_SQS_IMMEDIATE_MAX_BLOCK_MS=250
LLMOPS_SQS_RETRY_BACKOFF_MULTIPLIER=2.0
LLMOPS_SQS_RETRY_MAX_DELAY=0.5

Operational guidance:

  • Keep LLMOPS_SQS_IMMEDIATE_MAX_BLOCK_MS low (100–300ms) for request-path safety.
  • Use spillover file volume alerts to detect prolonged sink outages.
  • Recovery path is automatic: spillover entries are re-queued at next startup.
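
With the default knobs (3 attempts, 0.1 s base delay, 2.0 multiplier, 0.5 s cap), the bounded retry schedule works out as below. This is a sketch of the documented behaviour, not the SDK's internal code:

```python
def retry_delays(attempts: int = 3, base: float = 0.1,
                 multiplier: float = 2.0, max_delay: float = 0.5) -> list:
    """Exponential backoff schedule, capped at max_delay per attempt."""
    delays, delay = [], base
    for _ in range(attempts):
        delays.append(min(delay, max_delay))
        delay *= multiplier
    return delays

print(retry_delays())  # -> [0.1, 0.2, 0.4]
```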

🏭 Production Best Practices

1. Proper Trace Lifecycle

try:
    # Start trace
    TraceManager.start_trace("operation_name")
    
    # Your application logic with decorators
    result = my_tracked_function()
    
    # Finalize with trace data
    TraceManager.finalize_and_send(
        trace_input={"request": "data"},
        trace_output={"response": result}
    )
except Exception as e:
    # Trace will still be sent with error information
    logger.error(f"Error: {e}")

2. Environment-Specific Configuration

# production.env
PROJECT_ID=my_app
ENV=production
AWS_SQS_URL=https://sqs.us-east-1.amazonaws.com/123/prod-queue

# staging.env
PROJECT_ID=my_app
ENV=staging
AWS_SQS_URL=https://sqs.us-east-1.amazonaws.com/123/staging-queue

3. Async Support

@track_function()
async def async_function():
    result = await some_async_operation()
    return result

@track_llm_call(model="...")
async def async_llm_call():
    response = await async_bedrock_call()
    return response

📝 Example: Complete RAG Pipeline

Track LLM calls

@track_llm_call()
def call_bedrock(prompt):
    # Call your LLM
    response = bedrock_client.converse(
        modelId="anthropic.claude-3-sonnet",
        messages=[{"role": "user", "content": prompt}]
    )
    return response

Use the functions

result = process_data("some data")
llm_response = call_bedrock("Hello, world!")

End the trace (flushes to Langfuse)

TraceManager.end_trace()


Method 2: Explicit Project and Environment Override

# Override PROJECT_ID and ENV from .env
TraceManager.start_trace(
    name="chat_message",  # Operation name
    project_id="custom_project",  # Override PROJECT_ID
    environment="staging",  # Override ENV
    metadata={"user_id": "123"},
)

# Your code...

TraceManager.end_trace()

Method 3: Using finalize_and_send() (llmops-observability)

# Start trace
TraceManager.start_trace(name="chat_session")

# Your code
user_input = "What is machine learning?"
response = await llm.generate(user_input)

# Finalize with input/output in one call
TraceManager.finalize_and_send(
    user_id="user_123",
    session_id="session_456",
    trace_name="chat_message",
    trace_input={"user_msg": user_input},
    trace_output={"bot_response": str(response)}
)

3. Capture Local Variables (Debugging)

@track_function(capture_locals=True)
def complex_calculation(x, y, z):
    intermediate = x + y
    result = intermediate * z
    final = result ** 2
    # All local variables are captured in Langfuse
    return final

# Capture specific variables only
@track_function(capture_locals=["important_var", "result"])
def selective_capture(data):
    important_var = process(data)
    temp_var = "not captured"
    result = transform(important_var)
    return result

4. Nested Spans (Parent-Child Tracking)

@track_function(name="parent_task")
def parent_function():
    data = fetch_data()
    # Child spans are automatically nested
    processed = child_function(data)
    return processed

@track_function(name="child_task")
def child_function(data):
    return data.upper()

# Langfuse will show: parent_task → child_task

5. ASGI Middleware (FastAPI Auto-Tracing)

from fastapi import FastAPI
from llmops_observability import LLMOpsASGIMiddleware

app = FastAPI()
app.add_middleware(LLMOpsASGIMiddleware, service_name="my_api")

@app.get("/")
async def root():
    # Request is automatically traced
    return {"message": "Hello World"}

@app.post("/generate")
async def generate(prompt: str):
    # All decorated functions within request are nested
    result = await generate_text(prompt)
    return result

6. SQS Event Streaming (Event-Driven Architecture)

For event-driven, scalable deployments, the SDK supports optional event streaming to AWS SQS. Trace events are published to SQS queues where Lambda functions (or other consumers) can process them asynchronously:

Application (sends trace events)
    ↓
SQS Queue (decoupled message broker)
    ↓
Lambda Consumers (process & forward)
    ↓ ↓ ↓
  S3  New Relic  Datadog  (etc.)

Setup:

# Enable SQS streaming by setting AWS_SQS_URL
export AWS_SQS_URL=https://sqs.us-east-1.amazonaws.com/123456789/my-queue
export AWS_PROFILE=default
export AWS_REGION=us-east-1

from llmops_observability import TraceManager, track_function

# When AWS_SQS_URL is set, events are automatically streamed to SQS
TraceManager.start_trace(
    name="chat_message",
    metadata={"channel": "web"}
)

@track_function()
def process_message(msg):
    return process(msg)

# All trace events are batched and sent to SQS (non-blocking)
TraceManager.end_trace()

Lambda Consumer Example:

import json
import boto3

s3_client = boto3.client('s3')
newrelic = boto3.client('cloudwatch')  # Or use New Relic SDK

def lambda_handler(event, context):
    """Process trace events from SQS"""
    for record in event['Records']:
        # Parse trace event from SQS message
        trace_event = json.loads(record['body'])
        
        # Store to S3
        s3_client.put_object(
            Bucket='trace-events',
            Key=f"{trace_event['trace_id']}.json",
            Body=json.dumps(trace_event)
        )
        
        # Send metrics to New Relic
        if trace_event['event_type'] == 'llm_call':
            newrelic.put_metric_data(
                Namespace='LLMOps',
                MetricData=[{
                    'MetricName': 'TokenUsage',
                    'Value': trace_event['tokens_used'],
                    'Unit': 'Count'
                }]
            )

SQS Features:

  • Automatic Batching: Groups events for efficient SQS sending (batch size 1-10)
  • Spillover Recovery: Saves messages to disk if SQS is unavailable, retries on restart
  • Daemon Workers: 4 background threads handle async SQS operations
  • Clean Shutdown: Graceful shutdown flushes pending messages
  • Resilient: Auto-restart failed workers, exponential backoff
  • No Blocking: SQS operations never block main application thread
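
The spillover behaviour can be illustrated with a minimal sketch; the directory, file naming, and format here are assumptions, not the SDK's actual implementation:

```python
import json
import pathlib
import tempfile

# Assumed spillover location for illustration only
SPILL_DIR = pathlib.Path(tempfile.gettempdir()) / "llmops-spillover"

def spill(message: dict) -> None:
    """Persist a message that could not be sent to SQS."""
    SPILL_DIR.mkdir(parents=True, exist_ok=True)
    (SPILL_DIR / f"{message['trace_id']}.json").write_text(json.dumps(message))

def recover() -> list:
    """At startup, re-queue spilled messages and remove the files."""
    recovered = []
    for path in sorted(SPILL_DIR.glob("*.json")):
        recovered.append(json.loads(path.read_text()))
        path.unlink()
    return recovered
```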

Events Streamed to SQS:

  • trace_start: Trace initialization with metadata
  • span_created: Function execution tracking
  • llm_call: LLM API calls with token usage
  • trace_end: Trace completion with duration
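
A consumer can branch on these event types. The actions in this dispatch sketch are stand-ins for real handler calls:

```python
def route_event(event: dict) -> str:
    """Map the documented event_type values to a processing action."""
    actions = {
        "trace_start": "init-trace",
        "span_created": "record-span",
        "llm_call": "record-usage-and-cost",
        "trace_end": "finalize-trace",
    }
    return actions.get(event.get("event_type"), "ignore")
```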

Configuration:

# Required: SQS queue URL
export AWS_SQS_URL=https://sqs.us-east-1.amazonaws.com/123456789/llm-traces

# Optional: AWS authentication (defaults to IAM role if in Lambda/EC2)
export AWS_PROFILE=custom-profile  # Default: "default"
export AWS_REGION=eu-west-1        # Default: "us-east-1"

Use Cases:

  • 📊 Send trace events to New Relic, Datadog, CloudWatch
  • 💾 Archive all LLM interactions to S3 for compliance/audit
  • 🔄 Post-processing: cost calculation, quality analysis, retraining data
  • 🚀 Scale: decouple application from storage/monitoring infrastructure

7. 📥 Incoming SDK Message Schema

When SQS streaming is enabled, the SDK sends trace data in a compressed SQS message format that Lambda consumers can decompress and process. This section documents the message format and decompressed payload structure.

SQS Message Wrapper Format

{
  "compressed": true,
  "data": "H4sIANPGn2YC/...",
  "trace_id": "87a41b12-cc61-4fdf-9bf2-a50a369b4d30",
  "type": "SDKTraceData"
}

Wrapper Fields:

  • compressed (boolean): Indicates Base64 + Gzip compression is applied
  • data (string): Base64-encoded, Gzip-compressed JSON payload
  • trace_id (string): Unique trace identifier for deduplication
  • type (string): Message type identifier ("SDKTraceData")
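
A wrapper in this format can be produced and consumed with nothing but the standard library. This sketch mirrors the documented format but is not the SDK's actual sender code:

```python
import base64
import gzip
import json

def wrap_trace(trace_data: dict) -> dict:
    """Gzip + Base64 a trace payload into the SQS wrapper format above."""
    raw = json.dumps(trace_data).encode("utf-8")
    return {
        "compressed": True,
        "data": base64.b64encode(gzip.compress(raw)).decode("ascii"),
        "trace_id": trace_data["trace_id"],
        "type": "SDKTraceData",
    }

def unwrap_trace(message: dict) -> dict:
    """Inverse operation, as a Lambda consumer would perform."""
    raw = gzip.decompress(base64.b64decode(message["data"]))
    return json.loads(raw)
```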

Decompressed SDK Trace Data Schema

{
  "trace_id": "87a41b12-cc61-4fdf-9bf2-a50a369b4d30",
  "trace_name": "rag_pipeline_example",
  "project_id": "new_test",
  "environment": "uat",
  "user_id": "user_123",
  "session_id": "session_456",
  
  "start_time": 1769446311.0,
  "end_time": 1769446318.021,
  "duration_ms": 7021,
  
  "trace_input": {
    "user_msg": "What is Android ????"
  },
  "trace_output": {
    "bot_response": "Android is a mobile operating system..."
  },
  
  "token_usage": {
    "total_input_tokens": 145,
    "total_output_tokens": 87,
    "total_tokens": 232
  },
  
  "cost": {
    "total_cost": 0.000456
  },
  
  "spans": [
    {
      "span_id": "87a41b12-cc61-4fdf-9bf2-a50a369b4d31",
      "span_name": "retrieve_context",
      "span_type": "span",
      "parent_span_id": null,
      
      "start_time": 1769446311.0,
      "end_time": 1769446313.0,
      "duration_ms": 2000,
      
      "input_data": {"args": [], "kwargs": {"query": "Android"}, "locals": {}},
      "output_data": {"output": ["Doc 1", "Doc 2"]},
      
      "error": null,
      "model_id": null,
      "status": "success",
      
      "metadata": {
        "environment": "uat",
        "project_id": "new_test"
      },
      "tags": []
    },
    {
      "span_id": "87a41b12-cc61-4fdf-9bf2-a50a369b4d32",
      "span_name": "call_llm",
      "span_type": "generation",
      "parent_span_id": "87a41b12-cc61-4fdf-9bf2-a50a369b4d31",
      
      "start_time": 1769446313.0,
      "end_time": 1769446318.021,
      "duration_ms": 5021,
      
      "input_data": {"args": [], "kwargs": {"prompt": "Context: Doc 1, Doc 2\n\nQuestion: What is Android ????"}, "locals": {}},
      "output_data": {"output": {"message": {"content": "Android is a mobile operating system..."}}},
      
      "error": null,
      "model_id": "anthropic.claude-3-sonnet-20240229-v1:0",
      
      "usage": {
        "input_tokens": 145,
        "output_tokens": 87,
        "total_tokens": 232
      },
      
      "prompt": "Context: Doc 1, Doc 2\n\nQuestion: What is Android ????",
      "response": "Android is a mobile operating system developed by Google...",
      
      "metadata": {
        "environment": "uat",
        "project_id": "new_test"
      },
      
      "status": "success",
      "tags": []
    }
  ],
  
  "metadata": {
    "version": "1.0.0"
  },
  "tags": ["example", "rag"],
  
  "total_spans": 2,
  "total_generations": 1,
  
  "sdk_name": "llmops-observability",
  "sdk_version": "2.0.0"
}

Key Fields Reference

Field                        | Type   | Description                             | Auto-Injected
trace_id                     | string | Unique trace identifier                 | -
trace_name                   | string | Trace/operation name                    | -
project_id                   | string | Project identifier                      | ✅ from PROJECT_ID env var
environment                  | string | Environment                             | ✅ from ENV env var
user_id                      | string | User identifier (optional)              | -
session_id                   | string | Session identifier (optional)           | -
start_time                   | float  | Unix timestamp (seconds)                | -
end_time                     | float  | Unix timestamp (seconds)                | -
duration_ms                  | int    | Trace duration in milliseconds          | -
spans[].metadata.environment | string | Environment                             | ✅ auto-injected to every span
spans[].metadata.project_id  | string | Project ID                              | ✅ auto-injected to every span
spans[].usage                | object | Input/output token counts               | -
spans[].cost                 | object | Token cost calculation (Bedrock models) | -

Lambda Decompression Example

import json
import gzip
import base64

def lambda_handler(event, context):
    """Decompress and process SDK trace messages from SQS"""
    for record in event['Records']:
        # Parse SQS message
        message = json.loads(record['body'])
        
        if message.get('compressed'):
            # Decode Base64
            compressed_data = base64.b64decode(message['data'])
            
            # Decompress Gzip
            decompressed_data = gzip.decompress(compressed_data)
            
            # Parse JSON
            trace_data = json.loads(decompressed_data)
        else:
            trace_data = message
        
        # Now trace_data contains the full SDK trace with spans
        print(f"Trace: {trace_data['trace_id']}")
        print(f"Project: {trace_data['project_id']}")
        print(f"Environment: {trace_data['environment']}")
        print(f"Spans: {len(trace_data['spans'])}")
        
        # Process further (send to Langfuse, S3, NewRelic, etc.)
        process_trace(trace_data)

Size Limits and Truncation

  • Max Message Size: 256KB (SQS hard limit)
  • Auto-Truncation: Fields > 200KB are automatically truncated
  • Fallback to Disk: If SQS is unavailable, messages spill to disk and retry on restart
  • Compression: Typical traces compress to 10-30% of original size

8. Token Pricing & Cost Calculation

The SDK includes built-in AWS Bedrock token pricing for cost analysis:

from llmops_observability.pricing import calculate_cost

# Calculate cost for a single LLM call
cost = calculate_cost(
    model_id="anthropic.claude-3-sonnet-20240229-v1:0",
    input_tokens=1500,
    output_tokens=800
)

print(f"Cost: ${cost:.4f}")  # Output: Cost: $0.0075

# Analyze costs by model
models_used = {
    "anthropic.claude-3-sonnet-20240229-v1:0": {
        "input_tokens": 10000,
        "output_tokens": 5000
    },
    "anthropic.claude-3-haiku-20240307-v1:0": {
        "input_tokens": 20000,
        "output_tokens": 10000
    }
}

total_cost = sum(
    calculate_cost(model, data["input_tokens"], data["output_tokens"])
    for model, data in models_used.items()
)
print(f"Total cost: ${total_cost:.4f}")

Supported Models:

  • Claude 3.5 Sonnet (all variants)
  • Claude 3 Sonnet/Opus/Haiku
  • Claude 2.1 & 2.0
  • Amazon Titan Text (Express, Lite)
  • Cohere Command
  • AI21 Jurassic
  • Meta Llama 2 & 3

Pricing Reference: All prices are updated as of 2024 and reflect AWS Bedrock official pricing. Update the pricing table in src/llmops_observability/pricing.py as needed.

9. Async Support

@track_function()
async def async_process(data):
    return await some_async_operation(data)

@track_llm_call(name="summarize")
async def async_llm_call(text):
    return await chain.ainvoke({"text": text})

# Both sync and async work seamlessly

Per-Application Configuration

Each Gen AI application using this SDK should have its own Langfuse project and credentials. This ensures proper isolation and organization.

Step 1: Create Langfuse Project

  1. Go to your Langfuse instance
  2. Create a new project for your application (e.g., "chatbot-api", "doc-analyzer")
  3. Copy the project's public key, secret key, and base URL

Step 2: Configure in Your Application

Method 1: Environment Variables (Recommended for production)

# .env file in your application root
LANGFUSE_PUBLIC_KEY=pk-lf-abc123...
LANGFUSE_SECRET_KEY=sk-lf-xyz789...
LANGFUSE_BASE_URL=https://langfuse.company.com
LANGFUSE_VERIFY_SSL=false

from llmops_observability import TraceManager
from dotenv import load_dotenv

load_dotenv()  # Loads .env from current directory
# SDK auto-configures from environment variables

Method 2: Explicit Configuration (Recommended for testing)

from llmops_observability import configure
import os

# At application startup (e.g., main.py)
configure(
    public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
    base_url=os.getenv("LANGFUSE_BASE_URL"),
    verify_ssl=False
)

Environment Variables Reference

Variable            | Required | Default                                   | Description
LANGFUSE_PUBLIC_KEY | Yes      | None                                      | Langfuse public key from your project
LANGFUSE_SECRET_KEY | Yes      | None                                      | Langfuse secret key from your project
LANGFUSE_BASE_URL   | Yes      | None                                      | Langfuse instance URL
LANGFUSE_VERIFY_SSL | No       | false                                     | Whether to verify SSL certificates
PROJECT_ID          | No       | unknown_project                           | Project identifier (used as trace name in Langfuse)
ENV                 | No       | development                               | Environment name (production, staging, development, etc.); automatically mapped to LANGFUSE_TRACING_ENVIRONMENT
MODEL_ID            | No       | anthropic.claude-3-5-sonnet-20241022-v2:0 | Default model ID for cost calculation when not explicitly provided
AWS_SQS_URL         | No       | None                                      | AWS SQS queue URL (when provided, enables SQS event streaming)
AWS_PROFILE         | No       | default                                   | AWS profile name for SQS authentication
AWS_REGION          | No       | us-east-1                                 | AWS region for SQS
LANGFUSE_DEBUG      | No       | false                                     | Enable debug logging for Langfuse client

Environment Tracking:

  • The ENV variable is automatically mapped to Langfuse's LANGFUSE_TRACING_ENVIRONMENT
  • This applies the environment as a top-level attribute to all traces and observations
  • Allows easy filtering by environment in Langfuse UI
  • Must follow regex: ^(?!langfuse)[a-z0-9-_]+$ with max 40 characters

Track regular function execution with optional local variable capture.
@track_function()
def my_function(x, y):
    return x + y

@track_function(name="custom_name", tags={"version": "1.0"})
def another_function():
    pass

# Capture all local variables for debugging
@track_function(capture_locals=True)
def debug_function(data):
    step1 = process(data)
    step2 = transform(step1)
    return step2  # All locals captured in Langfuse

# Capture specific variables only
@track_function(capture_locals=["result", "important_var"])
def selective_function(input):
    temp = input * 2  # Not captured
    result = temp + 10  # Captured
    important_var = compute(result)  # Captured
    return important_var

Parameters:

  • name: Custom span name (default: function name)
  • tags: Dictionary of tags/metadata
  • capture_locals: Capture local variables - True (all), False (none), or list of variable names
  • capture_self: Whether to capture self in methods (default: True)

API Reference

TraceManager

start_trace(name, project_id=None, environment=None, metadata=None, user_id=None, session_id=None, tags=None)

Start a new trace with project and environment tracking.

TraceManager.start_trace(
    name="chat_message",  # Operation name (required)
    project_id="my_project",  # Optional: defaults to PROJECT_ID env var
    environment="production",  # Optional: defaults to ENV env var
    metadata={"custom": "data"},
    user_id="user_123",
    session_id="session_456",
    tags=["experiment"]
)

Parameters:

  • name (required): Operation/trace name (e.g., "chat_message", "document_analysis")
  • project_id (optional): Project identifier. Defaults to PROJECT_ID from .env. Used as trace name in Langfuse.
  • environment (optional): Environment name (e.g., "production", "staging"). Defaults to ENV from .env. Automatically mapped to LANGFUSE_TRACING_ENVIRONMENT.
  • metadata (optional): Custom metadata dictionary
  • user_id (optional): User identifier
  • session_id (optional): Session identifier
  • tags (optional): List of tags

Returns: Trace ID (string)

Example with .env auto-loading:

# .env file
PROJECT_ID=chatbot-api
ENV=production

# Automatically uses PROJECT_ID and ENV from .env
TraceManager.start_trace(
    name="user_query",
    metadata={"version": "2.0"}
)
# Trace name in Langfuse: "chatbot-api"
# Environment in Langfuse: "production"

end_trace()

End the current trace and flush to Langfuse.

TraceManager.end_trace()

finalize_and_send(user_id, session_id, trace_name, trace_input, trace_output)

Finalize and send the trace with input/output metadata.

This is a convenience method that combines setting trace metadata and ending the trace in one call.

TraceManager.start_trace(name="chat_message")

# ... your code executes ...

# Finalize with input/output details
TraceManager.finalize_and_send(
    user_id="user_123",
    session_id="session_456",
    trace_name="bedrock_chat_message",
    trace_input={"user_msg": "What is Python?"},
    trace_output={"bot_response": "Python is a programming language..."}
)

Parameters:

  • user_id: User identifier
  • session_id: Session identifier
  • trace_name: Name for the trace (can override the initial name)
  • trace_input: Dictionary containing the input data
  • trace_output: Dictionary containing the output/response data

end_trace() vs finalize_and_send() - When to Use?

Method              | Purpose                                       | When to Use                                        | Example
end_trace()         | Simply close trace, flush to Langfuse         | Simple operations without trace-level input/output | Process data, internal workflows
finalize_and_send() | Close trace + capture end-to-end input/output | When you want full conversation/request visibility | User query → Bot response, LLM interactions

Code Comparison:

# Simple: Just close the trace
TraceManager.start_trace(name="chat_message")
result = process_data("some data")
llm_response = call_bedrock("Hello, world!")
TraceManager.end_trace()
# → Individual spans are captured, but no trace-level input/output
# Full Visibility: Capture entire flow
TraceManager.start_trace(name="chat_session")
user_input = "What is machine learning?"
response = await llm.generate(user_input)
TraceManager.finalize_and_send(
    user_id="user_123",
    session_id="session_456",
    trace_name="chat_message",
    trace_input={"user_msg": user_input},        # ← What went in
    trace_output={"bot_response": str(response)} # ← What came out
)
# → Both span-level AND trace-level input/output captured for complete visibility

In Langfuse UI:

  • end_trace(): Shows individual function spans with their inputs/outputs
  • finalize_and_send(): Shows complete conversation flow + individual spans

Decorators

Use these decorators together for agentic GenAI apps:

  • @track_function(...) for non-LLM business logic and helper spans
  • @track_llm_call(...) for model generation calls (span_type = generation)
  • @track_llm_agent(...) for agent orchestrator loops (span_type = agent)
  • @track_llm_tool(...) for tool execution and dispatch (span_type = tool)

@track_function(name=None, metadata=None, ...)

Track regular function execution.

@track_function(name="retrieve", metadata={"component": "vector_search"})
def retrieve(query: str):
    return search_index(query)

@track_llm_call(name=None, model=None, metadata=None, ...)

Track LLM generation calls with automatic model and cost tracking.

@track_llm_call(name="bedrock_converse", model="anthropic.claude-3-sonnet-20240229-v1:0")
def call_llm(messages):
    return bedrock.converse(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        messages=messages,
    )

@track_llm_agent(name=None, metadata=None, ...)

Track agent orchestrator spans (planner/loop/controller).

@track_llm_agent(name="agent_loop", metadata={"component": "agent_orchestrator"})
def agent_loop(user_query: str):
    # planning + tool routing + iteration control
    ...

@track_llm_tool(name=None, metadata=None, ...)

Track tool execution spans.

@track_llm_tool(name="handle_tool_call", metadata={"component": "tool_dispatch"})
def handle_tool_call(tool_name: str, tool_input: dict):
    return TOOL_REGISTRY[tool_name](**tool_input)

Agentic Integration Pattern (Recommended)

from llmops_observability import (
    TraceManager,
    track_function,
    track_llm_call,
    track_llm_agent,
    track_llm_tool,
)

@track_llm_tool(name="retrieve_context")
def retrieve_context_tool(query: str):
    return retrieve(query)

@track_llm_call(name="bedrock_converse")
def call_llm(messages, tools):
    return bedrock.converse(modelId="...", messages=messages, toolConfig={"tools": tools})

@track_llm_agent(name="agent_loop")
def agent_loop(user_query: str):
    # calls call_llm(...), handles tool_use, invokes retrieve_context_tool(...)
    ...

Migration Notes for Existing Apps

  1. Keep @track_llm_call on model invocation functions only.
  2. Move top-level orchestrator functions from @track_function to @track_llm_agent.
  3. Mark tool implementations and tool dispatch functions with @track_llm_tool.
  4. Keep retrieval/parsing/helper utilities under @track_function.

This gives clear span types in observability backends and improves agent graph readability for debugging and performance analysis.

Advanced Features

Nested Spans & Parent-Child Relationships

The SDK automatically handles nested function calls, creating parent-child relationships in Langfuse:

@track_function(name="orchestrator")
def main_workflow(user_query):
    # This is the parent span
    context = retrieve_documents(user_query)  # Child span 1
    answer = generate_response(user_query, context)  # Child span 2
    return answer

@track_function(name="retrieval")
def retrieve_documents(query):
    # This becomes a child of main_workflow
    return db.search(query)

@track_function(name="generation")
def generate_response(query, context):
    # This also becomes a child of main_workflow
    return llm.generate(query, context)

Data Size Management

The SDK automatically limits serialized output to 200 KB to prevent problems with large payloads:

  • Outputs larger than 200 KB are truncated, with truncation metadata recorded
  • A preview of the first ~1 KB is retained
  • This prevents memory and network issues with large responses
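The truncation behavior can be sketched in a few lines; the helper name and the metadata fields below are illustrative assumptions, not the SDK's actual internals:

```python
import json

MAX_OUTPUT_BYTES = 200 * 1024  # 200 KB cap described above
PREVIEW_BYTES = 1024           # ~1 KB preview retained

def truncate_output(output):
    """Cap serialized output size, keeping a preview plus truncation metadata."""
    encoded = json.dumps(output, default=str).encode("utf-8")
    if len(encoded) <= MAX_OUTPUT_BYTES:
        return output
    # Too large: replace the payload with a compact preview record
    return {
        "truncated": True,
        "original_size_bytes": len(encoded),
        "preview": encoded[:PREVIEW_BYTES].decode("utf-8", errors="ignore"),
    }
```

Small outputs pass through unchanged; oversized ones are replaced by a compact preview record.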

ASGI Middleware for FastAPI

Automatically trace all HTTP requests:

from fastapi import FastAPI
from llmops_observability import LLMOpsASGIMiddleware, track_function

app = FastAPI()
app.add_middleware(LLMOpsASGIMiddleware, service_name="chatbot_api")

@app.post("/chat")
async def chat_endpoint(message: str):
    # Entire request is automatically traced
    response = process_message(message)
    return {"response": response}

@track_function()
def process_message(msg):
    # This becomes a child span of the HTTP request trace
    return "Response"

The middleware captures:

  • Request method, path, headers
  • Response status code
  • Request duration
  • User agent, client IP
  • Automatic trace naming: {project}_{hostname}

Project Structure

llmops-observability_sdk/
├── src/
│   └── llmops_observability/
│       ├── __init__.py                # Public API & exports
│       ├── config.py                  # Langfuse client + SQS configuration
│       ├── trace_manager.py           # Core TraceManager class & @track_function decorator
│       ├── llm.py                     # @track_llm_call decorator with LLM response parsing
│       ├── models.py                  # SpanContext, TraceConfig data models
│       ├── asgi_middleware.py         # FastAPI/Starlette ASGI middleware
│       ├── sqs.py                     # Production SQS sender with batching & spillover
│       └── pricing.py                 # AWS Bedrock token pricing calculator
├── pyproject.toml                     # Project metadata & dependencies
└── README.md                          # This file

Module Details:

  • config.py: Manages Langfuse client initialization and SQS configuration
  • trace_manager.py: Core orchestration - handles trace lifecycle, nested spans, Langfuse API calls
  • llm.py: LLM call decorator with support for 10+ LLM provider response formats
  • sqs.py: Production-grade SQS integration with 4 daemon workers, batching, spillover recovery
  • pricing.py: Token cost calculator for 15+ AWS Bedrock model variants
  • asgi_middleware.py: Automatic HTTP request tracing for FastAPI applications

Architecture

Direct Langfuse Mode (Default)

Application
    ↓
TraceManager
    ↓
Langfuse (Real-time)

Traces are sent immediately to Langfuse with no intermediate storage or batching.

SQS Event Streaming Mode (Event-Driven)

Application
    ↓
TraceManager → SQS Events (Batched)
                    ↓
                Lambda Functions
                    ↓ ↓ ↓
                S3  NR  DW  (etc.)

When AWS_SQS_URL is set:

  • Application sends trace events to SQS asynchronously
  • Main application thread is never blocked
  • Lambda functions or other services consume events from SQS
  • Events forwarded to S3, New Relic, Datadog, or custom processors
  • Failed sends are saved to a spillover file on disk for recovery
  • 4 daemon worker threads handle all SQS operations independently
  • Automatic cleanup on application shutdown
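The spillover behavior can be illustrated with a minimal sketch. The file name, the JSON-lines format, and the function names here are assumptions for illustration, not the SDK's internals:

```python
import json
from pathlib import Path

SPILLOVER_FILE = Path("sqs_spillover.jsonl")  # assumed location and format

def send_with_spillover(send_fn, message: dict) -> bool:
    """Try to send; on failure, append the message to a spillover file for later replay."""
    try:
        send_fn(message)
        return True
    except Exception:
        with SPILLOVER_FILE.open("a", encoding="utf-8") as f:
            f.write(json.dumps(message) + "\n")
        return False

def replay_spillover(send_fn) -> int:
    """Re-send any spilled messages; return the number successfully recovered."""
    if not SPILLOVER_FILE.exists():
        return 0
    lines = SPILLOVER_FILE.read_text(encoding="utf-8").splitlines()
    SPILLOVER_FILE.unlink()  # failed re-sends are spilled again by send_with_spillover
    recovered = 0
    for line in lines:
        if send_with_spillover(send_fn, json.loads(line)):
            recovered += 1
    return recovered
```

In the SDK this work happens on the daemon worker threads, so the application thread is never blocked by a slow or failing SQS endpoint.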

Best Practices

1. Configuration Management

  • Each application gets its own .env file with unique Langfuse credentials
  • ✅ Use .gitignore to exclude .env files from version control
  • ✅ Call configure() at application startup before any tracing
  • ❌ Never hardcode credentials in the SDK or application code

2. Trace Organization

# Good: Descriptive trace names with context
TraceManager.start_trace(
    name="document_analysis_pipeline",
    user_id=user_id,
    session_id=session_id,
    metadata={"doc_type": "pdf", "version": "2.0"},
    tags=["production", "critical"]
)

# Bad: Generic names without context
TraceManager.start_trace(name="process")

3. Local Variables Capture

# Use for debugging only - has performance impact
@track_function(capture_locals=True)  # Development
def debug_complex_logic(data):
    # All locals captured
    pass

# Production: disable capture, or capture only selected variables
@track_function(capture_locals=False)
def prod_handler(data): ...

@track_function(capture_locals=["final_result"])
def selective_handler(data): ...

4. Always End Traces

TraceManager.start_trace(name="workflow")
try:
    result = process()
    return result
finally:
    TraceManager.end_trace()  # Always flush, even on error

5. Trace Naming Convention

  • Trace Name (in Langfuse): Uses PROJECT_ID for easy project identification
  • Operation Name: The name parameter describes what operation is being traced
  • Environment: Tracked automatically from ENV variable
# Example:
# .env: PROJECT_ID=payment-service, ENV=production

TraceManager.start_trace(name="process_payment")
# In Langfuse UI:
#   - Trace Name: "payment-service"
#   - Environment: "production"
#   - Operation: "process_payment" (in metadata)

📦 SQS Message Schema

Message Wrapper (What the SDK Sends to SQS)

{
  "compressed": true,
  "data": "H4sIAAAAAAAC/+1Y...",
  "trace_id": "87a41b12-cc61-4fdf-9bf2-a50a369b4d30",
  "type": "trace"
}

Decompression Steps:

  1. Base64 decode the data field → binary gzip data
  2. Gzip decompress → JSON string
  3. JSON parse → Complete trace data
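For reference, the wrapper shown above can be produced with the standard library alone. The field names follow the wrapper format documented here; `build_sqs_message` is an illustrative name, not the SDK's API:

```python
import base64
import gzip
import json

def build_sqs_message(trace_data: dict) -> str:
    """Produce an SQS message body in the wrapper format shown above (sketch)."""
    raw = json.dumps(trace_data).encode("utf-8")
    return json.dumps({
        "compressed": True,
        "data": base64.b64encode(gzip.compress(raw)).decode("ascii"),
        "trace_id": trace_data.get("trace_id"),
        "type": "trace",
    })
```

Consumers reverse the three steps listed above to recover the full trace.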

Complete Trace Data (Decompressed)

{
  "trace_id": "87a41b12-cc61-4fdf-9bf2-a50a369b4d30",
  "trace_name": "rag_pipeline_example",
  "project_id": "new_test",
  "environment": "uat",
  "user_id": "user_123",
  "session_id": "session_456",
  
  "start_time": 1769446311.0,
  "end_time": 1769446318.021,
  "duration_ms": 7021,
  
  "trace_input": {"user_msg": "What is Android ????"},
  "trace_output": {"bot_response": "Android is a mobile operating system..."},
  
  "spans": [
    {
      "span_id": "64a2a265-017e-4af1-bf49-15c3dd51e2fd",
      "span_name": "retrieve_context",
      "span_type": "span",
      "parent_span_id": null,
      
      "start_time": 1769446311.0,
      "end_time": 1769446312.0,
      "duration_ms": 1000,
      
      "input_data": {
        "args": ["What is Android ????"],
        "kwargs": {},
        "locals": {}
      },
      
      "output_data": {"output": {"documents": ["Doc 1", "Doc 2"]}},
      
      "error": null,
      "model_id": null,
      
      "metadata": {
        "environment": "uat",
        "project_id": "new_test"
      },
      
      "tags": [],
      "usage": null,
      "prompt": null,
      "response": null,
      "status": "success",
      "status_message": null,
      "level": "DEFAULT"
    },
    {
      "span_id": "6ba7b810-9dad-11d1-80b4-00c04fd430c8",
      "span_name": "call_llm",
      "span_type": "generation",
      "parent_span_id": null,
      
      "start_time": 1769446312.0,
      "end_time": 1769446316.0,
      "duration_ms": 4000,
      
      "input_data": {
        "args": [],
        "kwargs": {"prompt": "Context: Doc 1, Doc 2\n\nQuestion: What is Android ????"},
        "locals": {}
      },
      
      "output_data": {"output": {"message": {"content": "Android is a mobile operating system..."}}},
      
      "error": null,
      "model_id": "anthropic.claude-3-sonnet-20240229-v1:0",
      
      "metadata": {
        "environment": "uat",
        "project_id": "new_test"
      },
      
      "tags": [],
      
      "usage": {
        "input_tokens": 145,
        "output_tokens": 87,
        "total_tokens": 232
      },
      
      "prompt": "Context: Doc 1, Doc 2\n\nQuestion: What is Android ????",
      "response": "Android is a mobile operating system developed by Google...",
      
      "status": "success",
      "status_message": null,
      "level": "DEFAULT"
    }
  ],
  
  "metadata": {"version": "1.0.0"},
  "tags": ["example", "rag"],
  
  "total_spans": 2,
  "total_generations": 1,
  
  "sdk_name": "llmops-observability",
  "sdk_version": "2.0.0"
}
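The aggregate fields in this payload (`duration_ms`, `total_spans`, `total_generations`) are derivable from the rest of the trace; a quick consistency check:

```python
def derived_fields(trace: dict) -> dict:
    """Recompute the aggregate fields of a decompressed trace payload."""
    spans = trace.get("spans", [])
    return {
        "duration_ms": round((trace["end_time"] - trace["start_time"]) * 1000),
        "total_spans": len(spans),
        "total_generations": sum(1 for s in spans if s.get("span_type") == "generation"),
    }
```

Applied to the example above, this yields `duration_ms=7021`, `total_spans=2`, and `total_generations=1`, matching the payload.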

Field Reference

Trace Level:

| Field | Description |
| --- | --- |
| `trace_id` | UUID generated on `start_trace()` |
| `trace_name` | Operation name from `start_trace()` |
| `project_id` | Auto-injected from the `PROJECT_ID` env var |
| `environment` | Auto-injected from the `ENV` env var |
| `user_id` | From `start_trace()` or `finalize_and_send()` |
| `session_id` | From `start_trace()` or `finalize_and_send()` |
| `start_time` | Unix timestamp (seconds) |
| `end_time` | Unix timestamp (seconds) |
| `duration_ms` | Calculated: `(end_time - start_time) * 1000` |
| `trace_input` | From `finalize_and_send()` |
| `trace_output` | From `finalize_and_send()` |
| `spans` | Array of span objects |
| `total_spans` | Count of all spans |
| `total_generations` | Count of spans with `span_type == "generation"` |

Span Level:

| Field | Description |
| --- | --- |
| `span_id` | UUID for the span |
| `span_name` | Function name or custom name |
| `span_type` | `"span"` or `"generation"` |
| `parent_span_id` | Parent span ID (`null` for root spans) |
| `duration_ms` | Execution time in milliseconds |
| `input_data` | Function args, kwargs, and locals |
| `output_data` | Return value |
| `model_id` | From `@track_llm_call(model=...)` |
| `usage` | Token counts (generation spans only) |
| `prompt` | Prompt text (generation spans only) |
| `response` | Response text (generation spans only) |
| `metadata.environment` | Auto-injected from `ENV` |
| `metadata.project_id` | Auto-injected from `PROJECT_ID` |
| `status` | `"success"` or `"error"` |

Size Limits & Truncation

| Field | Limit | Behavior |
| --- | --- | --- |
| `trace_input` | 50 KB | Truncated with preview if exceeded |
| `trace_output` | 50 KB | Truncated with preview if exceeded |
| `span.input_data` | 20 KB | Truncated with preview if exceeded |
| `span.output_data` | 20 KB | Truncated with preview if exceeded |
| `span.prompt` | 10 KB | Truncated with preview if exceeded |
| `span.response` | 10 KB | Truncated with preview if exceeded |
| Total message | 200 KB | Aggressive truncation applied if exceeded |

Lambda Decompression (Reference)

import json
import base64
import gzip

def decompress_sqs_message(message_body: str) -> dict:
    """Decompress SDK trace data from SQS message."""
    sqs_message = json.loads(message_body)
    
    if not sqs_message.get("compressed"):
        return sqs_message
    
    # Decompress
    compressed_data = base64.b64decode(sqs_message['data'])
    decompressed = gzip.decompress(compressed_data)
    trace_data = json.loads(decompressed)
    
    return trace_data
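A minimal SQS-triggered Lambda handler applies the same three decompression steps to each record in the batch. This is a sketch: the event shape follows AWS's standard SQS-to-Lambda record format, and the forwarding step is left as a comment:

```python
import base64
import gzip
import json

def lambda_handler(event, context):
    """Decompress every SQS record in the batch; forwarding is omitted."""
    processed = 0
    for record in event.get("Records", []):
        wrapper = json.loads(record["body"])
        if wrapper.get("compressed"):
            # base64 decode -> gzip decompress -> JSON parse
            trace = json.loads(gzip.decompress(base64.b64decode(wrapper["data"])))
        else:
            trace = wrapper
        # ...forward `trace` to S3 / New Relic / Datadog here...
        processed += 1
    return {"processed": processed}
```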

When to Use This SDK

Use llmops-observability when:

Development & Testing:

  • Developing and testing LLM applications locally
  • Need quick debugging with local variable capture
  • Want instant trace visibility in Langfuse (no delays)
  • Simple, straightforward tracing without infrastructure setup

Production Deployments:

  • Small to medium-scale with direct Langfuse integration
  • Enterprise event-driven architectures with SQS + Lambda + S3
  • Multi-destination observability (S3, New Relic, Datadog, custom systems)
  • Centralized observability across multiple LLM applications
  • Token cost tracking and analysis
  • Compliance/audit: archive all LLM interactions with full traceability

Common Use Cases:

  • RAG (Retrieval Augmented Generation) systems
  • LLM-powered APIs and microservices
  • Chat applications and conversational AI
  • Document analysis and processing pipelines
  • Real-time LLM inference monitoring
  • Multi-step LLM workflows with nested tracking

Key Advantages:

  • ✨ No external dependencies for basic tracing (Direct Langfuse mode)
  • 🚀 Optional SQS integration for enterprise deployments
  • 🔄 Automatic nested span tracking for complex workflows
  • 💰 Built-in token cost calculation
  • 🛡️ Production-ready with daemon workers and spillover recovery

Troubleshooting

Configuration Errors

Error: "Langfuse not configured"

# Solution: Ensure env vars are set or call configure()
from dotenv import load_dotenv
load_dotenv()  # Load .env file

# Or configure explicitly
from llmops_observability import configure
configure(public_key="...", secret_key="...", base_url="...")

Trace Not Appearing in Langfuse

  1. Check that TraceManager.end_trace() is called
  2. Verify credentials are correct
  3. Check Langfuse URL is accessible
  4. Look for error messages in console output

SSL Certificate Issues

# Disable SSL verification if using self-signed certs
configure(
    public_key="...",
    secret_key="...",
    base_url="...",
    verify_ssl=False  # ← Disable SSL verification
)

Version History

v8.0.0 (Current) - Production-Ready Enterprise Release

  • Dual-Mode Tracing: Direct Langfuse integration + optional SQS event streaming
  • 🎯 SQS Event Streaming: Production-grade AWS SQS sender with:
    • Automatic batching for efficiency
    • Spillover recovery to disk
    • 4 daemon worker threads
    • Clean shutdown support
  • 💰 Token Pricing: AWS Bedrock cost calculator for 15+ model variants
  • 🪆 Nested Spans: Automatic parent-child relationship tracking
  • 🔍 Locals Capture: Function local variable capture for debugging
  • 🌐 ASGI Middleware: FastAPI/Starlette auto-tracing
  • 📊 Smart Serialization: 200KB automatic data size limits
  • 🔄 Sync & Async: Full async/await support
  • 🛡️ Resilient: Auto-restart failed workers, graceful shutdown

License

Proprietary - Verisk Analytics

Contributing

Internal SDK - For questions or contributions, contact the LLMOps team.

Example: Complete Workflow

from llmops_observability import TraceManager, track_function, track_llm_call
import boto3

# Initialize Bedrock client
bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")

@track_function()
def retrieve_context(query):
    # Simulate RAG retrieval
    return {"documents": ["Context doc 1", "Context doc 2"]}

@track_llm_call()
def generate_answer(prompt, context):
    response = bedrock.converse(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        messages=[{
            "role": "user",
            "content": f"Context: {context}\n\nQuestion: {prompt}"
        }]
    )
    return response

# Start trace
TraceManager.start_trace(
    name="rag_pipeline",
    user_id="user_123",
    metadata={"pipeline": "v1"}
)

# Execute workflow
context = retrieve_context("What is Python?")
answer = generate_answer("What is Python?", context)

# End trace
TraceManager.end_trace()

Thanks to

Verisk LLMOps Team ❤️

Project details


Download files

Download the file for your platform.

Source Distribution

llmops_observability-49.1.9.tar.gz (161.1 kB)

Uploaded Source

Built Distribution


llmops_observability-49.1.9-py3-none-any.whl (91.3 kB)

Uploaded Python 3

File details

Details for the file llmops_observability-49.1.9.tar.gz.

File metadata

  • Download URL: llmops_observability-49.1.9.tar.gz
  • Upload date:
  • Size: 161.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.9

File hashes

Hashes for llmops_observability-49.1.9.tar.gz

| Algorithm | Hash digest |
| --- | --- |
| SHA256 | `8b7527237bb25f704b636722420243a61e5554570af5b0cc5ff51f7cd588621d` |
| MD5 | `a9320745621e5b1b98bc62f4bcaf3fbd` |
| BLAKE2b-256 | `e3198521dc0e859933480f009fd28709473ccb646a80fbe1e392cd7d0f3319cb` |


File details

Details for the file llmops_observability-49.1.9-py3-none-any.whl.

File metadata

  • Download URL: llmops_observability-49.1.9-py3-none-any.whl
  • Upload date:
  • Size: 91.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.9

File hashes

Hashes for llmops_observability-49.1.9-py3-none-any.whl

| Algorithm | Hash digest |
| --- | --- |
| SHA256 | `f2ec6c31582aaa46046876f13ec1ace582554f30b181e8ab65312c58ed53ee16` |
| MD5 | `fe0f4d76e93064669b0c45ec9608b548` |
| BLAKE2b-256 | `5be1ea22970877ba6534c78c33ef3b8f4c78bf354c8f25756333f4511f9bd437` |

