
msgtrace SDK

OpenTelemetry-based tracing SDK for AI applications.

Installation

Using pip:

pip install msgtrace-sdk

Using uv (recommended):

uv add msgtrace-sdk

Quick Start

import os
from msgtrace.sdk import Spans, MsgTraceAttributes

# Enable tracing
os.environ["MSGTRACE_TELEMETRY_ENABLED"] = "true"
os.environ["MSGTRACE_OTLP_ENDPOINT"] = "http://localhost:8000/api/v1/traces/export"

# Mock function for demonstration
def chat_completion(prompt):
    """Simulate LLM API call."""
    return {"content": "AI is artificial intelligence", "tokens": {"input": 100, "output": 50}}

# Trace your AI operations
with Spans.span_context(name="chat_completion"):
    MsgTraceAttributes.set_model("gpt-5")
    MsgTraceAttributes.set_operation_name("chat")

    # Your AI logic here
    response = chat_completion("What is AI?")

    MsgTraceAttributes.set_usage(
        input_tokens=response["tokens"]["input"],
        output_tokens=response["tokens"]["output"]
    )
    MsgTraceAttributes.set_cost(input_cost=0.003, output_cost=0.0015)

Features

  • Zero overhead when disabled
  • Thread-safe singleton pattern
  • Async-first with sync support
  • 60+ OpenTelemetry attributes for AI/GenAI
  • Context managers and decorators

Configuration

All configuration is done via environment variables:

# Enable/disable tracing
MSGTRACE_TELEMETRY_ENABLED=true

# OTLP endpoint
MSGTRACE_OTLP_ENDPOINT=http://localhost:8000/api/v1/traces/export

# Exporter type (otlp or console)
MSGTRACE_EXPORTER=otlp

# Service name
MSGTRACE_SERVICE_NAME=my-ai-app

# Capture platform info
MSGTRACE_CAPTURE_PLATFORM=true

Core API

Creating Spans

from msgtrace.sdk import Spans

# Basic span
with Spans.span_context("operation_name"):
    # Your code here
    pass

# Flow-level span (top-level operation)
with Spans.init_flow("user_query_flow"):
    # Flow logic
    pass

# Module-level span
with Spans.init_module("vector_search"):
    # Module logic
    pass

# Async spans
async with Spans.aspan_context("async_operation"):
    await some_async_function()

# Decorators
@Spans.instrument("process_data")
def process(data: str):
    return data.upper()

@Spans.ainstrument("async_process")
async def async_process(data: str):
    return await process_async(data)

Setting Attributes

All attributes follow OpenTelemetry GenAI semantic conventions:

from msgtrace.sdk import MsgTraceAttributes

# Operation
MsgTraceAttributes.set_operation_name("chat")  # chat, tool, agent, embedding
MsgTraceAttributes.set_system("openai")  # openai, anthropic, google

# Model & Parameters
MsgTraceAttributes.set_model("gpt-5")
MsgTraceAttributes.set_temperature(0.7)
MsgTraceAttributes.set_max_tokens(1000)

# Prompt & Completion
MsgTraceAttributes.set_prompt("What is AI?")
MsgTraceAttributes.set_prompt([
    {"role": "system", "content": "You are helpful"},
    {"role": "user", "content": "What is AI?"}
])
MsgTraceAttributes.set_completion("AI is artificial intelligence...")

# Usage & Cost
MsgTraceAttributes.set_usage(input_tokens=100, output_tokens=50)
MsgTraceAttributes.set_cost(input_cost=0.003, output_cost=0.0015, currency="USD")

# Tools
MsgTraceAttributes.set_tool_name("search_web")
MsgTraceAttributes.set_tool_call_arguments({"query": "AI", "limit": 5})
MsgTraceAttributes.set_tool_response({"results": ["a", "b", "c"]})

# Agent
MsgTraceAttributes.set_agent_name("research_agent")
MsgTraceAttributes.set_agent_id("agent_001")
MsgTraceAttributes.set_agent_type("autonomous")

# Workflow
MsgTraceAttributes.set_workflow_name("user_query_flow")
MsgTraceAttributes.set_workflow_id("wf_123")
MsgTraceAttributes.set_user_id("user_456")
MsgTraceAttributes.set_session_id("session_789")

# Custom attributes
MsgTraceAttributes.set_custom("business_metric", 99.9)
MsgTraceAttributes.set_custom("metadata", {"key": "value"})

Examples

Complete Chat Completion

import os
from msgtrace.sdk import Spans, MsgTraceAttributes

os.environ["MSGTRACE_TELEMETRY_ENABLED"] = "true"

# Mock LLM API call
def call_llm(prompt):
    """Simulate OpenAI API call."""
    return {
        "id": "resp_123",
        "content": "AI is artificial intelligence...",
        "usage": {"input_tokens": 10, "output_tokens": 50}
    }

with Spans.span_context("chat_completion"):
    # Request
    MsgTraceAttributes.set_operation_name("chat")
    MsgTraceAttributes.set_system("openai")
    MsgTraceAttributes.set_model("gpt-5")
    MsgTraceAttributes.set_temperature(0.7)

    prompt = "What is AI?"
    MsgTraceAttributes.set_prompt(prompt)

    # API call
    response = call_llm(prompt)

    # Response
    MsgTraceAttributes.set_response_id(response["id"])
    MsgTraceAttributes.set_finish_reason("stop")
    MsgTraceAttributes.set_completion(response["content"])
    MsgTraceAttributes.set_usage(
        input_tokens=response["usage"]["input_tokens"],
        output_tokens=response["usage"]["output_tokens"]
    )
    MsgTraceAttributes.set_cost(input_cost=0.0015, output_cost=0.0005)

Agent Workflow

with Spans.init_flow("research_flow"):
    MsgTraceAttributes.set_workflow_name("research_agent")
    MsgTraceAttributes.set_user_id("user_123")

    # Tool execution
    with Spans.init_module("tool_search"):
        MsgTraceAttributes.set_operation_name("tool")
        MsgTraceAttributes.set_tool_name("search_web")
        MsgTraceAttributes.set_tool_call_arguments({"query": "AI"})

        # Execute tool
        # results = search_web("AI")

        MsgTraceAttributes.set_tool_response({"results": [...]})

    # LLM processing
    with Spans.init_module("llm_synthesis"):
        MsgTraceAttributes.set_operation_name("chat")
        MsgTraceAttributes.set_model("gpt-5")
        MsgTraceAttributes.set_usage(input_tokens=200, output_tokens=100)
        MsgTraceAttributes.set_cost(input_cost=0.006, output_cost=0.003)

Using Decorators

@Spans.set_tool_attributes("search_db", description="Search database")
@Spans.instrument("database_search")
def search(query: str):
    MsgTraceAttributes.set_tool_call_arguments({"query": query})

    # Database search (`db` stands in for your database client)
    results = db.search(query)

    MsgTraceAttributes.set_tool_response({"count": len(results)})
    return results

# Call it
results = search("AI research")

Async Operations

import asyncio
from msgtrace.sdk import Spans, MsgTraceAttributes

# Mock async API call
async def async_api_call(prompt):
    """Simulate async LLM API call."""
    await asyncio.sleep(0.1)
    return {"content": "AI response", "tokens": {"input": 50, "output": 30}}

@Spans.ainstrument("async_chat")
async def chat_completion(prompt: str):
    MsgTraceAttributes.set_operation_name("chat")
    MsgTraceAttributes.set_model("gpt-5")

    # Async API call
    response = await async_api_call(prompt)

    MsgTraceAttributes.set_usage(
        input_tokens=response["tokens"]["input"],
        output_tokens=response["tokens"]["output"]
    )
    return response["content"]

# Use it
async def main():
    async with Spans.ainit_flow("async_flow"):
        result = await chat_completion("What is AI?")
        print(result)

# Run
asyncio.run(main())

Custom Decorators

Create custom decorators to capture function arguments and outputs:

Basic Custom Decorator

from functools import wraps
from msgtrace.sdk import Spans, MsgTraceAttributes

def trace_function(operation_name: str = None):
    """Custom decorator that captures function arguments and output."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Use function name if operation_name not provided
            span_name = operation_name or func.__name__

            with Spans.span_context(span_name):
                # Capture function arguments
                MsgTraceAttributes.set_custom("function.args", list(args))
                MsgTraceAttributes.set_custom("function.kwargs", kwargs)

                # Execute function
                result = func(*args, **kwargs)

                # Capture output (be careful with large outputs)
                MsgTraceAttributes.set_custom("function.output", str(result)[:1000])

                return result
        return wrapper
    return decorator

# Usage
@trace_function("calculate_price")
def calculate_price(base_price: float, discount: float = 0.0):
    return base_price * (1 - discount)

result = calculate_price(100.0, discount=0.2)
# Traces: function.args=[100.0], function.kwargs={'discount': 0.2}, function.output='80.0'

Async Custom Decorator

import asyncio
from functools import wraps
from msgtrace.sdk import Spans, MsgTraceAttributes

def trace_async_function(operation_name: str = None):
    """Custom decorator for async functions."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            span_name = operation_name or func.__name__

            async with Spans.aspan_context(span_name):
                # Capture inputs
                MsgTraceAttributes.set_custom("function.args", list(args))
                MsgTraceAttributes.set_custom("function.kwargs", kwargs)

                # Execute async function
                result = await func(*args, **kwargs)

                # Capture output
                MsgTraceAttributes.set_custom("function.output", str(result)[:1000])

                return result
        return wrapper
    return decorator

# Usage
@trace_async_function("fetch_user_data")
async def fetch_user_data(user_id: str):
    # Simulate async API call
    await asyncio.sleep(0.1)
    return {"id": user_id, "name": "John Doe"}

LLM Call Decorator

def trace_llm_call(model: str, provider: str = "openai"):
    """Specialized decorator for LLM calls."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            with Spans.span_context(f"llm_{func.__name__}"):
                # Set LLM attributes
                MsgTraceAttributes.set_operation_name("chat")
                MsgTraceAttributes.set_model(model)
                MsgTraceAttributes.set_system(provider)

                # Capture prompt (first argument)
                if args:
                    MsgTraceAttributes.set_prompt(str(args[0]))

                # Execute LLM call
                result = func(*args, **kwargs)

                # Capture completion
                if isinstance(result, dict) and "content" in result:
                    MsgTraceAttributes.set_completion(result["content"])

                    # Capture usage if available
                    if "usage" in result:
                        usage = result["usage"]
                        MsgTraceAttributes.set_usage(
                            input_tokens=usage.get("input_tokens", 0),
                            output_tokens=usage.get("output_tokens", 0)
                        )

                return result
        return wrapper
    return decorator

# Usage
@trace_llm_call(model="gpt-5", provider="openai")
def ask_llm(prompt: str):
    # Your LLM API call here
    return {
        "content": "AI is artificial intelligence...",
        "usage": {"input_tokens": 10, "output_tokens": 50}
    }

Error Tracking Decorator

def trace_with_error_handling(operation_name: str = None):
    """Decorator that captures exceptions and function metadata."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            span_name = operation_name or func.__name__

            with Spans.span_context(span_name):
                # Set function metadata
                MsgTraceAttributes.set_custom("function.name", func.__name__)
                MsgTraceAttributes.set_custom("function.module", func.__module__)

                try:
                    # Capture inputs
                    MsgTraceAttributes.set_custom("function.args", list(args))
                    MsgTraceAttributes.set_custom("function.kwargs", kwargs)

                    # Execute function
                    result = func(*args, **kwargs)

                    # Mark as successful
                    MsgTraceAttributes.set_custom("function.success", True)
                    MsgTraceAttributes.set_custom("function.output_type", type(result).__name__)

                    return result

                except Exception as e:
                    # Capture error details
                    MsgTraceAttributes.set_custom("function.success", False)
                    MsgTraceAttributes.set_custom("error.type", type(e).__name__)
                    MsgTraceAttributes.set_custom("error.message", str(e))
                    raise

        return wrapper
    return decorator

# Usage
@trace_with_error_handling("risky_operation")
def divide(a: float, b: float):
    return a / b

try:
    result = divide(10, 0)  # Will trace the error
except ZeroDivisionError:
    pass

Best Practices for Custom Decorators

  1. Limit captured data size: Truncate large strings/objects
  2. Sanitize sensitive data: Don't capture passwords, API keys, etc.
  3. Use appropriate attribute names: Clear, descriptive keys
  4. Handle exceptions properly: Let exceptions propagate after capturing
  5. Combine with built-in decorators: Stack with @Spans.instrument()
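Points 1 and 2 above can be combined in a small helper before attaching inputs as span attributes. This is an illustrative sketch: the key names and the 1000-character limit are assumptions, not SDK behavior.

```python
# Redact sensitive keys and truncate oversized values before tracing them.
# The key set and max_len below are illustrative assumptions.
SENSITIVE_KEYS = {"password", "api_key", "token", "secret"}

def sanitize_attributes(kwargs: dict, max_len: int = 1000) -> dict:
    """Return a copy of kwargs safe to attach as span attributes."""
    safe = {}
    for key, value in kwargs.items():
        if key.lower() in SENSITIVE_KEYS:
            safe[key] = "[REDACTED]"  # never trace credentials
        else:
            safe[key] = str(value)[:max_len]  # cap attribute size
    return safe

# Inside a custom decorator:
# MsgTraceAttributes.set_custom("function.kwargs", sanitize_attributes(kwargs))
```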

Span Naming Conventions

For better visualization in the msgtrace frontend, follow these naming conventions:

Module Type (module.type)

Use the module.type attribute to categorize spans for specialized visualizations:

from msgtrace.sdk import Spans, MsgTraceAttributes

# Agent visualization
with Spans.init_module("research_agent"):
    MsgTraceAttributes.set_custom("module.type", "Agent")
    MsgTraceAttributes.set_agent_name("research_agent")
    # Agent logic here

# Tool visualization
with Spans.init_module("web_search"):
    MsgTraceAttributes.set_custom("module.type", "Tool")
    MsgTraceAttributes.set_tool_name("search_web")
    # Tool execution here

# Transcriber visualization
with Spans.init_module("speech_to_text"):
    MsgTraceAttributes.set_custom("module.type", "Transcriber")
    # Transcription logic here

# LLM visualization
with Spans.init_module("llm_call"):
    MsgTraceAttributes.set_custom("module.type", "LLM")
    MsgTraceAttributes.set_model("gpt-5")
    # LLM call here

Common Module Types

Type         Description         Visualization
Agent        Autonomous agents   Agent flow diagram
Tool         Tool executions     Tool analytics
LLM          LLM API calls       Token/cost analysis
Transcriber  Speech-to-text      Audio processing view
Retriever    Vector/DB search    Retrieval metrics
Embedder     Text embedding      Embedding analytics
Custom       Custom operations   Generic span view

Module Naming Best Practices

# ✅ Good: Descriptive and consistent
with Spans.init_module("data_retrieval"):
    MsgTraceAttributes.set_custom("module.type", "Retriever")
    MsgTraceAttributes.set_custom("module.name", "vector_search")

# ✅ Good: Clear hierarchy
with Spans.init_flow("user_query"):
    with Spans.init_module("intent_classifier"):
        MsgTraceAttributes.set_custom("module.type", "LLM")

    with Spans.init_module("response_generator"):
        MsgTraceAttributes.set_custom("module.type", "Agent")

# ❌ Bad: Vague names
with Spans.init_module("process"):  # What process?
    pass

# ❌ Bad: Inconsistent typing
with Spans.init_module("tool_call"):
    MsgTraceAttributes.set_custom("module.type", "tool")  # Should be "Tool"

Complete Example with Conventions

from msgtrace.sdk import Spans, MsgTraceAttributes

with Spans.init_flow("customer_support_query"):
    MsgTraceAttributes.set_workflow_name("support_agent")
    MsgTraceAttributes.set_user_id("user_123")

    # Step 1: Classify intent
    with Spans.init_module("intent_classification"):
        MsgTraceAttributes.set_custom("module.type", "LLM")
        MsgTraceAttributes.set_custom("module.name", "intent_classifier")
        MsgTraceAttributes.set_model("gpt-5")
        # Classification logic

    # Step 2: Search knowledge base
    with Spans.init_module("knowledge_retrieval"):
        MsgTraceAttributes.set_custom("module.type", "Retriever")
        MsgTraceAttributes.set_custom("module.name", "vector_db")
        # Vector search logic

    # Step 3: Execute tool if needed
    with Spans.init_module("order_lookup"):
        MsgTraceAttributes.set_custom("module.type", "Tool")
        MsgTraceAttributes.set_custom("module.name", "order_api")
        MsgTraceAttributes.set_tool_name("get_order_status")
        # Tool execution

    # Step 4: Generate response
    with Spans.init_module("response_generation"):
        MsgTraceAttributes.set_custom("module.type", "Agent")
        MsgTraceAttributes.set_custom("module.name", "response_agent")
        MsgTraceAttributes.set_agent_name("support_responder")
        # Agent response logic

These conventions enable the msgtrace frontend to:

  • Group related operations by type
  • Generate specialized visualizations (agent flows, tool analytics)
  • Calculate type-specific metrics (LLM costs, tool latencies)
  • Provide better filtering and search capabilities

Best Practices

  1. Enable conditionally: Use environment variables to control tracing
  2. Set attributes early: Set operation/model before execution
  3. Use decorators: For frequently instrumented functions
  4. Nest properly: Flow → Module → Span hierarchy
  5. Handle errors: Let context managers auto-record exceptions
  6. Shutdown gracefully: Call tracer_manager.shutdown() at exit
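Point 6 can be wired up with the standard library's atexit module so buffered spans are flushed when the process exits. The tracer_manager import path below is an assumption based on the name used above; adjust it to wherever the object actually lives.

```python
import atexit

# Hypothetical import path -- adjust for your install:
# from msgtrace.sdk import tracer_manager

def register_tracing_shutdown(shutdown_fn):
    """Register a shutdown hook that flushes pending spans at exit."""
    atexit.register(shutdown_fn)
    return shutdown_fn

# register_tracing_shutdown(tracer_manager.shutdown)
```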

Thread Safety

All operations are thread-safe:

  • TracerManager uses RLock for initialization
  • OpenTelemetry SDK is thread-safe
  • Multiple threads can create spans simultaneously
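The RLock-guarded initialization described above follows the standard double-checked locking pattern. The class below is a generic sketch of that pattern, not the SDK's actual TracerManager code:

```python
import threading

class TracerManager:
    """Sketch of RLock-guarded lazy singleton initialization."""
    _instance = None
    _lock = threading.RLock()

    @classmethod
    def get(cls):
        if cls._instance is None:      # fast path: no lock once initialized
            with cls._lock:            # slow path: exactly one thread wins
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance
```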

Zero Overhead

When MSGTRACE_TELEMETRY_ENABLED=false:

  • Tracer initialization is lazy (no cost until used)
  • No-op tracer is created (minimal overhead)
  • Attribute setters check span.is_recording() (fast path)
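The is_recording() fast path can be sketched with a no-op span: when tracing is disabled, the setter returns before doing any serialization work. The classes below are illustrative, not the SDK's implementation:

```python
class NoOpSpan:
    """Stand-in for the no-op span used when tracing is disabled."""
    def is_recording(self) -> bool:
        return False

    def set_attribute(self, key, value):
        pass  # never reached through the guarded setter below

def set_attribute_guarded(span, key, value):
    """Attribute setter with the is_recording() fast path."""
    if not span.is_recording():
        return  # disabled: skip attribute serialization entirely
    span.set_attribute(key, value)
```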

Development

Setup

# Clone repository
git clone https://github.com/msgflux/msgtrace-sdk.git
cd msgtrace-sdk

# Install dependencies
uv sync

# Install with dev dependencies
uv sync --group dev

Testing

# Run tests
uv run pytest -v

# With coverage
uv run pytest -v --cov=src/msgtrace --cov-report=html

# Run specific test
uv run pytest tests/test_attributes.py -v

Code Quality

# Format code
uv run ruff format

# Lint
uv run ruff check

# Auto-fix
uv run ruff check --fix

CI/CD

The project uses GitHub Actions for CI/CD:

  • CI (ci.yml) - Lint, format, test on Python 3.10-3.13
  • Version Validation (validate-version-bump.yml) - Validates version bumps on PRs
  • Auto-Tag (auto-tag.yml) - Creates git tags on version changes
  • Publish (publish.yml) - Publishes to TestPyPI and PyPI

To release a new version, update src/msgtrace/version.py and open a PR. On merge, the version is automatically tagged and published to PyPI.

License

MIT License - see LICENSE file for details.
