
msgtrace SDK

OpenTelemetry-based tracing SDK for AI applications.

Installation

Using pip:

pip install msgtrace-sdk

Using uv (recommended):

uv add msgtrace-sdk

Quick Start

import os
from msgtrace.sdk import Spans, MsgTraceAttributes

# Enable tracing
os.environ["MSGTRACE_TELEMETRY_ENABLED"] = "true"
os.environ["MSGTRACE_OTLP_ENDPOINT"] = "http://localhost:8000/api/v1/traces/export"

# Mock function for demonstration
def chat_completion(prompt):
    """Simulate LLM API call."""
    return {"content": "AI is artificial intelligence", "tokens": {"input": 100, "output": 50}}

# Trace your AI operations
with Spans.span_context(name="chat_completion"):
    MsgTraceAttributes.set_model("gpt-5")
    MsgTraceAttributes.set_operation_name("chat")

    # Your AI logic here
    response = chat_completion("What is AI?")

    MsgTraceAttributes.set_usage(
        input_tokens=response["tokens"]["input"],
        output_tokens=response["tokens"]["output"]
    )
    MsgTraceAttributes.set_cost(input_cost=0.003, output_cost=0.0015)

Features

  • Zero-overhead when disabled
  • Thread-safe singleton pattern
  • Async-first with sync support
  • 60+ OpenTelemetry attributes for AI/GenAI
  • Context managers and decorators

Configuration

All configuration is handled via environment variables:

# Enable/disable tracing
MSGTRACE_TELEMETRY_ENABLED=true

# OTLP endpoint
MSGTRACE_OTLP_ENDPOINT=http://localhost:8000/api/v1/traces/export

# Exporter type (otlp or console)
MSGTRACE_EXPORTER=otlp

# Service name
MSGTRACE_SERVICE_NAME=my-ai-app

# Capture platform info
MSGTRACE_CAPTURE_PLATFORM=true

Core API

Creating Spans

from msgtrace.sdk import Spans

# Basic span
with Spans.span_context("operation_name"):
    # Your code here
    pass

# Flow-level span (top-level operation)
with Spans.init_flow("user_query_flow"):
    # Flow logic
    pass

# Module-level span
with Spans.init_module("vector_search"):
    # Module logic
    pass

# Async spans
async with Spans.aspan_context("async_operation"):
    await some_async_function()

# Decorators
@Spans.instrument("process_data")
def process(data: str):
    return data.upper()

@Spans.ainstrument("async_process")
async def async_process(data: str):
    return await process_async(data)

Setting Attributes

All attributes follow OpenTelemetry GenAI semantic conventions:

from msgtrace.sdk import MsgTraceAttributes

# Operation
MsgTraceAttributes.set_operation_name("chat")  # chat, tool, agent, embedding
MsgTraceAttributes.set_system("openai")  # openai, anthropic, google

# Model & Parameters
MsgTraceAttributes.set_model("gpt-5")
MsgTraceAttributes.set_temperature(0.7)
MsgTraceAttributes.set_max_tokens(1000)

# Prompt & Completion
MsgTraceAttributes.set_prompt("What is AI?")
MsgTraceAttributes.set_prompt([
    {"role": "system", "content": "You are helpful"},
    {"role": "user", "content": "What is AI?"}
])
MsgTraceAttributes.set_completion("AI is artificial intelligence...")

# Usage & Cost
MsgTraceAttributes.set_usage(input_tokens=100, output_tokens=50)
MsgTraceAttributes.set_cost(input_cost=0.003, output_cost=0.0015, currency="USD")

# Tools
MsgTraceAttributes.set_tool_name("search_web")
MsgTraceAttributes.set_tool_call_arguments({"query": "AI", "limit": 5})
MsgTraceAttributes.set_tool_response({"results": ["a", "b", "c"]})

# Agent
MsgTraceAttributes.set_agent_name("research_agent")
MsgTraceAttributes.set_agent_id("agent_001")
MsgTraceAttributes.set_agent_type("autonomous")

# Workflow
MsgTraceAttributes.set_workflow_name("user_query_flow")
MsgTraceAttributes.set_workflow_id("wf_123")
MsgTraceAttributes.set_user_id("user_456")
MsgTraceAttributes.set_session_id("session_789")

# Custom attributes
MsgTraceAttributes.set_custom("business_metric", 99.9)
MsgTraceAttributes.set_custom("metadata", {"key": "value"})

Examples

Complete Chat Completion

import os
from msgtrace.sdk import Spans, MsgTraceAttributes

os.environ["MSGTRACE_TELEMETRY_ENABLED"] = "true"

# Mock LLM API call
def call_llm(prompt):
    """Simulate OpenAI API call."""
    return {
        "id": "resp_123",
        "content": "AI is artificial intelligence...",
        "usage": {"input_tokens": 10, "output_tokens": 50}
    }

with Spans.span_context("chat_completion"):
    # Request
    MsgTraceAttributes.set_operation_name("chat")
    MsgTraceAttributes.set_system("openai")
    MsgTraceAttributes.set_model("gpt-5")
    MsgTraceAttributes.set_temperature(0.7)

    prompt = "What is AI?"
    MsgTraceAttributes.set_prompt(prompt)

    # API call
    response = call_llm(prompt)

    # Response
    MsgTraceAttributes.set_response_id(response["id"])
    MsgTraceAttributes.set_finish_reason("stop")
    MsgTraceAttributes.set_completion(response["content"])
    MsgTraceAttributes.set_usage(
        input_tokens=response["usage"]["input_tokens"],
        output_tokens=response["usage"]["output_tokens"]
    )
    MsgTraceAttributes.set_cost(input_cost=0.0015, output_cost=0.0005)

Agent Workflow

with Spans.init_flow("research_flow"):
    MsgTraceAttributes.set_workflow_name("research_agent")
    MsgTraceAttributes.set_user_id("user_123")

    # Tool execution
    with Spans.init_module("tool_search"):
        MsgTraceAttributes.set_operation_name("tool")
        MsgTraceAttributes.set_tool_name("search_web")
        MsgTraceAttributes.set_tool_call_arguments({"query": "AI"})

        # Execute tool
        # results = search_web("AI")

        MsgTraceAttributes.set_tool_response({"results": [...]})

    # LLM processing
    with Spans.init_module("llm_synthesis"):
        MsgTraceAttributes.set_operation_name("chat")
        MsgTraceAttributes.set_model("gpt-5")
        MsgTraceAttributes.set_usage(input_tokens=200, output_tokens=100)
        MsgTraceAttributes.set_cost(input_cost=0.006, output_cost=0.003)

Using Decorators

# Mock database client for demonstration
class MockDB:
    def search(self, query):
        return [f"result for {query}"]

db = MockDB()

@Spans.set_tool_attributes("search_db", description="Search database")
@Spans.instrument("database_search")
def search(query: str):
    MsgTraceAttributes.set_tool_call_arguments({"query": query})

    # Database search
    results = db.search(query)

    MsgTraceAttributes.set_tool_response({"count": len(results)})
    return results

# Call it
results = search("AI research")

Async Operations

import asyncio
from msgtrace.sdk import Spans, MsgTraceAttributes

# Mock async API call
async def async_api_call(prompt):
    """Simulate async LLM API call."""
    await asyncio.sleep(0.1)
    return {"content": "AI response", "tokens": {"input": 50, "output": 30}}

@Spans.ainstrument("async_chat")
async def chat_completion(prompt: str):
    MsgTraceAttributes.set_operation_name("chat")
    MsgTraceAttributes.set_model("gpt-5")

    # Async API call
    response = await async_api_call(prompt)

    MsgTraceAttributes.set_usage(
        input_tokens=response["tokens"]["input"],
        output_tokens=response["tokens"]["output"]
    )
    return response["content"]

# Use it
async def main():
    async with Spans.ainit_flow("async_flow"):
        result = await chat_completion("What is AI?")
        print(result)

# Run
asyncio.run(main())

Custom Decorators

Create custom decorators to capture function arguments and outputs:

Basic Custom Decorator

from functools import wraps
from msgtrace.sdk import Spans, MsgTraceAttributes

def trace_function(operation_name: str = None):
    """Custom decorator that captures function arguments and output."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Use function name if operation_name not provided
            span_name = operation_name or func.__name__

            with Spans.span_context(span_name):
                # Capture function arguments
                MsgTraceAttributes.set_custom("function.args", list(args))
                MsgTraceAttributes.set_custom("function.kwargs", kwargs)

                # Execute function
                result = func(*args, **kwargs)

                # Capture output (be careful with large outputs)
                MsgTraceAttributes.set_custom("function.output", str(result)[:1000])

                return result
        return wrapper
    return decorator

# Usage
@trace_function("calculate_price")
def calculate_price(base_price: float, discount: float = 0.0):
    return base_price * (1 - discount)

result = calculate_price(100.0, discount=0.2)
# Traces: function.args=[100.0], function.kwargs={'discount': 0.2}, function.output='80.0'

Async Custom Decorator

import asyncio
from functools import wraps
from msgtrace.sdk import Spans, MsgTraceAttributes

def trace_async_function(operation_name: str = None):
    """Custom decorator for async functions."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            span_name = operation_name or func.__name__

            async with Spans.aspan_context(span_name):
                # Capture inputs
                MsgTraceAttributes.set_custom("function.args", list(args))
                MsgTraceAttributes.set_custom("function.kwargs", kwargs)

                # Execute async function
                result = await func(*args, **kwargs)

                # Capture output
                MsgTraceAttributes.set_custom("function.output", str(result)[:1000])

                return result
        return wrapper
    return decorator

# Usage
@trace_async_function("fetch_user_data")
async def fetch_user_data(user_id: str):
    # Simulate async API call
    await asyncio.sleep(0.1)
    return {"id": user_id, "name": "John Doe"}

# Run
user = asyncio.run(fetch_user_data("user_123"))

LLM Call Decorator

def trace_llm_call(model: str, provider: str = "openai"):
    """Specialized decorator for LLM calls."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            with Spans.span_context(f"llm_{func.__name__}"):
                # Set LLM attributes
                MsgTraceAttributes.set_operation_name("chat")
                MsgTraceAttributes.set_model(model)
                MsgTraceAttributes.set_system(provider)

                # Capture prompt (first argument)
                if args:
                    MsgTraceAttributes.set_prompt(str(args[0]))

                # Execute LLM call
                result = func(*args, **kwargs)

                # Capture completion
                if isinstance(result, dict) and "content" in result:
                    MsgTraceAttributes.set_completion(result["content"])

                    # Capture usage if available
                    if "usage" in result:
                        usage = result["usage"]
                        MsgTraceAttributes.set_usage(
                            input_tokens=usage.get("input_tokens", 0),
                            output_tokens=usage.get("output_tokens", 0)
                        )

                return result
        return wrapper
    return decorator

# Usage
@trace_llm_call(model="gpt-5", provider="openai")
def ask_llm(prompt: str):
    # Your LLM API call here
    return {
        "content": "AI is artificial intelligence...",
        "usage": {"input_tokens": 10, "output_tokens": 50}
    }

Error Tracking Decorator

def trace_with_error_handling(operation_name: str = None):
    """Decorator that captures exceptions and function metadata."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            span_name = operation_name or func.__name__

            with Spans.span_context(span_name):
                # Set function metadata
                MsgTraceAttributes.set_custom("function.name", func.__name__)
                MsgTraceAttributes.set_custom("function.module", func.__module__)

                try:
                    # Capture inputs
                    MsgTraceAttributes.set_custom("function.args", list(args))
                    MsgTraceAttributes.set_custom("function.kwargs", kwargs)

                    # Execute function
                    result = func(*args, **kwargs)

                    # Mark as successful
                    MsgTraceAttributes.set_custom("function.success", True)
                    MsgTraceAttributes.set_custom("function.output_type", type(result).__name__)

                    return result

                except Exception as e:
                    # Capture error details
                    MsgTraceAttributes.set_custom("function.success", False)
                    MsgTraceAttributes.set_custom("error.type", type(e).__name__)
                    MsgTraceAttributes.set_custom("error.message", str(e))
                    raise

        return wrapper
    return decorator

# Usage
@trace_with_error_handling("risky_operation")
def divide(a: float, b: float):
    return a / b

try:
    result = divide(10, 0)  # Will trace the error
except ZeroDivisionError:
    pass

Best Practices for Custom Decorators

  1. Limit captured data size: Truncate large strings/objects
  2. Sanitize sensitive data: Don't capture passwords, API keys, etc.
  3. Use appropriate attribute names: Clear, descriptive keys
  4. Handle exceptions properly: Let exceptions propagate after capturing
  5. Combine with built-in decorators: Stack with @Spans.instrument()
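The first two practices can be combined into a small helper that sanitizes values before they reach a span. This is an illustrative sketch, not part of the SDK: safe_attrs and REDACT_KEYS are hypothetical names, and the helper is meant to feed MsgTraceAttributes.set_custom inside the custom decorators shown above.

```python
def safe_attrs(values: dict, max_len: int = 200) -> dict:
    """Truncate long values and mask sensitive keys before tracing."""
    REDACT_KEYS = {"password", "api_key", "token", "secret"}
    out = {}
    for key, value in values.items():
        if key.lower() in REDACT_KEYS:
            out[key] = "[REDACTED]"
        else:
            out[key] = str(value)[:max_len]
    return out

# Inside a custom decorator:
# MsgTraceAttributes.set_custom("function.kwargs", safe_attrs(kwargs))
```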

Span Naming Conventions

For better visualization in the msgtrace frontend, follow these naming conventions:

Module Type (module.type)

Use the module.type attribute to categorize spans for specialized visualizations:

from msgtrace.sdk import Spans, MsgTraceAttributes

# Agent visualization
with Spans.init_module("research_agent"):
    MsgTraceAttributes.set_custom("module.type", "Agent")
    MsgTraceAttributes.set_agent_name("research_agent")
    # Agent logic here

# Tool visualization
with Spans.init_module("web_search"):
    MsgTraceAttributes.set_custom("module.type", "Tool")
    MsgTraceAttributes.set_tool_name("search_web")
    # Tool execution here

# Transcriber visualization
with Spans.init_module("speech_to_text"):
    MsgTraceAttributes.set_custom("module.type", "Transcriber")
    # Transcription logic here

# LLM visualization
with Spans.init_module("llm_call"):
    MsgTraceAttributes.set_custom("module.type", "LLM")
    MsgTraceAttributes.set_model("gpt-5")
    # LLM call here

Common Module Types

Type         Description        Visualization
Agent        Autonomous agents  Agent flow diagram
Tool         Tool executions    Tool analytics
LLM          LLM API calls      Token/cost analysis
Transcriber  Speech-to-text     Audio processing view
Retriever    Vector/DB search   Retrieval metrics
Embedder     Text embedding     Embedding analytics
Custom       Custom operations  Generic span view

Module Naming Best Practices

# ✅ Good: Descriptive and consistent
with Spans.init_module("data_retrieval"):
    MsgTraceAttributes.set_custom("module.type", "Retriever")
    MsgTraceAttributes.set_custom("module.name", "vector_search")

# ✅ Good: Clear hierarchy
with Spans.init_flow("user_query"):
    with Spans.init_module("intent_classifier"):
        MsgTraceAttributes.set_custom("module.type", "LLM")

    with Spans.init_module("response_generator"):
        MsgTraceAttributes.set_custom("module.type", "Agent")

# ❌ Bad: Vague names
with Spans.init_module("process"):  # What process?
    pass

# ❌ Bad: Inconsistent typing
with Spans.init_module("tool_call"):
    MsgTraceAttributes.set_custom("module.type", "tool")  # Should be "Tool"

Complete Example with Conventions

from msgtrace.sdk import Spans, MsgTraceAttributes

with Spans.init_flow("customer_support_query"):
    MsgTraceAttributes.set_workflow_name("support_agent")
    MsgTraceAttributes.set_user_id("user_123")

    # Step 1: Classify intent
    with Spans.init_module("intent_classification"):
        MsgTraceAttributes.set_custom("module.type", "LLM")
        MsgTraceAttributes.set_custom("module.name", "intent_classifier")
        MsgTraceAttributes.set_model("gpt-5")
        # Classification logic

    # Step 2: Search knowledge base
    with Spans.init_module("knowledge_retrieval"):
        MsgTraceAttributes.set_custom("module.type", "Retriever")
        MsgTraceAttributes.set_custom("module.name", "vector_db")
        # Vector search logic

    # Step 3: Execute tool if needed
    with Spans.init_module("order_lookup"):
        MsgTraceAttributes.set_custom("module.type", "Tool")
        MsgTraceAttributes.set_custom("module.name", "order_api")
        MsgTraceAttributes.set_tool_name("get_order_status")
        # Tool execution

    # Step 4: Generate response
    with Spans.init_module("response_generation"):
        MsgTraceAttributes.set_custom("module.type", "Agent")
        MsgTraceAttributes.set_custom("module.name", "response_agent")
        MsgTraceAttributes.set_agent_name("support_responder")
        # Agent response logic

These conventions enable the msgtrace frontend to:

  • Group related operations by type
  • Generate specialized visualizations (agent flows, tool analytics)
  • Calculate type-specific metrics (LLM costs, tool latencies)
  • Provide better filtering and search capabilities

Best Practices

  1. Enable conditionally: Use environment variables to control tracing
  2. Set attributes early: Set operation/model before execution
  3. Use decorators: For frequently instrumented functions
  4. Nest properly: Flow → Module → Span hierarchy
  5. Handle errors: Let context managers auto-record exceptions
  6. Shutdown gracefully: Call tracer_manager.shutdown() at exit
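Practices 1 and 6 can be wired together at application startup. A minimal sketch, assuming tracer_manager is importable from msgtrace.sdk (the import path may differ in your version); telemetry_enabled and install_shutdown_hook are illustrative names, not SDK API:

```python
import atexit
import os

def telemetry_enabled() -> bool:
    """Mirror the SDK's MSGTRACE_TELEMETRY_ENABLED gate."""
    return os.environ.get("MSGTRACE_TELEMETRY_ENABLED", "false").lower() == "true"

def install_shutdown_hook() -> bool:
    """Register a graceful tracer shutdown; call once at startup."""
    if not telemetry_enabled():
        return False
    from msgtrace.sdk import tracer_manager  # import path assumed
    atexit.register(tracer_manager.shutdown)  # flush pending spans at exit
    return True
```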

Thread Safety

All operations are thread-safe:

  • TracerManager uses RLock for initialization
  • OpenTelemetry SDK is thread-safe
  • Multiple threads can create spans simultaneously
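The pattern of concurrent span creation can be sketched as follows. The span helper below is a stand-in context manager so the snippet runs without an exporter configured; in real code each worker would use Spans.span_context directly:

```python
import threading
from contextlib import contextmanager

# Stand-in for Spans.span_context; swap in the real SDK call in your code.
@contextmanager
def span(name):
    yield name

results = []

def worker(i: int):
    # Each thread opens its own span; the SDK's RLock guards initialization.
    with span(f"worker_{i}"):
        results.append(i)  # list.append is thread-safe in CPython

threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```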

Zero Overhead

When MSGTRACE_TELEMETRY_ENABLED=false:

  • Tracer initialization is lazy (no cost until used)
  • No-op tracer is created (minimal overhead)
  • Attribute setters check span.is_recording() (fast path)

Contributing

We welcome contributions! Please see CONTRIBUTING.md for:

  • Development setup
  • Testing and code quality guidelines
  • Pull request process
  • Release workflow

For automation details (CI/CD, bots, etc.), see AUTOMATION.md.

License

MIT License - see LICENSE file for details.
