Skip to main content

Keywords AI SDK allows you to interact with the Keywords AI API smoothly

Project description

Building an LLM Workflow with KeywordsAI Tracing

This tutorial demonstrates how to build and trace complex LLM workflows using KeywordsAI Tracing. We'll create an example that generates jokes, translates them to pirate language, and simulates audience reactions - all while capturing detailed telemetry of our LLM calls.

Prerequisites

  • Python 3.7+
  • OpenAI API key
  • Anthropic API key
  • Keywords AI API key, you can get your API key from the API keys page

Installation

pip install keywordsai-tracing openai anthropic

Initialization

KeywordsAITelemetry Configuration

The KeywordsAITelemetry class is the main entry point for the SDK. Initialize it once at application startup:

from keywordsai_tracing import KeywordsAITelemetry

telemetry = KeywordsAITelemetry(
    app_name="my-app",
    api_key="kwai-xxx",  # Or set KEYWORDSAI_API_KEY env var
)

All Initialization Parameters

KeywordsAITelemetry(
    # Basic Configuration
    app_name: str = "keywordsai",              # Application name for telemetry
    api_key: Optional[str] = None,             # API key (or KEYWORDSAI_API_KEY env var)
    base_url: Optional[str] = None,            # API URL (or KEYWORDSAI_BASE_URL env var)
    
    # Logging
    log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO",
    
    # Performance
    disable_batch: Optional[bool] = None,      # Disable background batch processing
    
    # Instrumentation (see Instrumentation section below)
    instruments: Optional[Set[Instruments]] = None,        # Specific instruments to enable
    block_instruments: Optional[Set[Instruments]] = None,  # Instruments to disable
    
    # Advanced
    headers: Optional[Dict[str, str]] = None,              # Additional HTTP headers
    resource_attributes: Optional[Dict[str, str]] = None,  # Resource attributes
    span_postprocess_callback: Optional[Callable] = None,  # Span processing callback
    enabled: bool = True,                                  # Enable/disable telemetry
    custom_exporter: Optional[SpanExporter] = None,        # Custom span exporter
)

Parameter Reference

Basic Configuration

Parameter Type Default Description
app_name str "keywordsai" Name of your application for telemetry identification
api_key str | None None KeywordsAI API key. Can also be set via KEYWORDSAI_API_KEY environment variable
base_url str | None None KeywordsAI API base URL. Can also be set via KEYWORDSAI_BASE_URL environment variable. Defaults to https://api.keywordsai.co/api

Logging

Parameter Type Default Description
log_level str "INFO" Logging level for KeywordsAI tracing. Options: "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL". Can also be set via KEYWORDSAI_LOG_LEVEL environment variable

Performance

Parameter Type Default Description
disable_batch bool | None None Disable batch span processing. When True, uses synchronous export (no background threads). Useful for debugging or backends with custom exporters. Can also be set via KEYWORDSAI_DISABLE_BATCH environment variable

Instrumentation

See Instrumentation section for detailed information.

Parameter Type Default Description
instruments Set[Instruments] | None None Specific instruments to enable. If None, enables all available instruments. Use empty set set() to disable all auto-instrumentation
block_instruments Set[Instruments] | None None Instruments to explicitly disable. Use this to block specific instrumentations while enabling others

Examples:

# Enable only specific instruments
from keywordsai_tracing import Instruments

telemetry = KeywordsAITelemetry(
    instruments={Instruments.OPENAI, Instruments.ANTHROPIC}
)

# Block specific instruments
telemetry = KeywordsAITelemetry(
    block_instruments={Instruments.REQUESTS, Instruments.URLLIB3}
)

# Disable all auto-instrumentation
telemetry = KeywordsAITelemetry(
    instruments=set()  # Empty set = no auto-instrumentation
)

Advanced

Parameter Type Default Description
headers Dict[str, str] | None None Additional HTTP headers to send with telemetry data
resource_attributes Dict[str, str] | None None Additional resource attributes to attach to all spans. Useful for adding environment, version, etc.
span_postprocess_callback Callable | None None Optional callback function to process spans before export. Signature: callback(span: ReadableSpan) -> None
enabled bool True Enable or disable telemetry. When False, becomes a no-op (no spans created)
custom_exporter SpanExporter | None None Custom span exporter to use instead of the default HTTP OTLP exporter. When provided, api_key and base_url are not required. See Custom Exporters section

Note: Threading instrumentation is ALWAYS enabled by default (even when specifying custom instruments) because it's critical for context propagation. To disable it explicitly, use block_instruments={Instruments.THREADING}. See Threading Instrumentation for details.

Common Configuration Patterns

Development (Full Visibility)

telemetry = KeywordsAITelemetry(
    app_name="my-app-dev",
    api_key="kwai-xxx",
    log_level="DEBUG",  # Verbose logging
    # All instruments enabled by default
)

Production (Optimized)

telemetry = KeywordsAITelemetry(
    app_name="my-app-prod",
    api_key="kwai-xxx",
    log_level="WARNING",  # Less verbose
    block_instruments={
        Instruments.REQUESTS,  # Reduce noise
        Instruments.URLLIB3,
    }
)

Backend with Custom Logging (Minimal)

from your_exporters import DirectLoggingExporter

telemetry = KeywordsAITelemetry(
    app_name="my-backend",
    custom_exporter=DirectLoggingExporter(),  # Custom exporter
    disable_batch=True,  # No background threads
    instruments=set(),  # No auto-instrumentation
    block_instruments={Instruments.THREADING},  # Disable threading if single-threaded
)

Testing/Disabled

telemetry = KeywordsAITelemetry(
    app_name="my-app-test",
    enabled=False,  # Completely disabled (no-op)
)

Environment Variables

You can configure KeywordsAI tracing using environment variables:

Environment Variable Description Default
KEYWORDSAI_API_KEY API key None
KEYWORDSAI_BASE_URL API base URL https://api.keywordsai.co/api
KEYWORDSAI_LOG_LEVEL Logging level INFO
KEYWORDSAI_DISABLE_BATCH Disable batch processing False

Example:

export KEYWORDSAI_API_KEY="kwai-xxx"
export KEYWORDSAI_LOG_LEVEL="DEBUG"
export KEYWORDSAI_DISABLE_BATCH="true"
# No need to pass parameters - read from env vars
telemetry = KeywordsAITelemetry(app_name="my-app")

Tutorial

Step 1: Initialization

import os
from keywordsai_tracing.main import KeywordsAITelemetry
from keywordsai_tracing.decorators import workflow, task
import time

# Initialize KeywordsAI Telemetry
os.environ["KEYWORDSAI_API_KEY"] = "YOUR_KEYWORDSAI_API_KEY"
k_tl = KeywordsAITelemetry()

# Initialize OpenAI client
from openai import OpenAI
client = OpenAI()

Step 2: First Draft - Basic Workflow

We'll start by creating a simple workflow that generates a joke, translates it to pirate speak, and adds a signature. This demonstrates the basic usage of tasks and workflows.

  • A task is a single unit of work, decorated with @task
  • A workflow is a collection of tasks, decorated with @workflow
  • Tasks can be used independently or as part of workflows
@task(name="joke_creation")
def create_joke():
    completion = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Tell me a joke about opentelemetry"}],
        temperature=0.5,
        max_tokens=100,
        frequency_penalty=0.5,
        presence_penalty=0.5,
        stop=["\n"],
        logprobs=True,
    )
    return completion.choices[0].message.content

@task(name="signature_generation")
def generate_signature(joke: str):
    completion = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "user", "content": "add a signature to the joke:\n\n" + joke}
        ],
    )
    return completion.choices[0].message.content

@task(name="pirate_joke_translation")
def translate_joke_to_pirate(joke: str):
    completion = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {
                "role": "user",
                "content": "translate the joke to pirate language:\n\n" + joke,
            }
        ],
    )
    return completion.choices[0].message.content

@workflow(name="pirate_joke_generator")
def joke_workflow():
    eng_joke = create_joke()
    pirate_joke = translate_joke_to_pirate(eng_joke)
    signature = generate_signature(pirate_joke)
    return pirate_joke + signature

if __name__ == "__main__":
    joke_workflow()

Run the workflow and see the trace in Keywords AI Traces tab.

Step 3: Adding Another Workflow

Let's add audience reactions to make our workflow more complex and demonstrate what multiple workflow traces look like.

@task(name="audience_laughs")
def audience_laughs(joke: str):
    completion = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {
                "role": "user",
                "content": "This joke:\n\n" + joke + " is funny, say hahahahaha",
            }
        ],
        max_tokens=10,
    )
    return completion.choices[0].message.content

@task(name="audience_claps")
def audience_claps():
    completion = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Clap once"}],
        max_tokens=5,
    )
    return completion.choices[0].message.content

@task(name="audience_applaud")
def audience_applaud(joke: str):
    clap = audience_claps()
    completion = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {
                "role": "user",
                "content": "Applaud to the joke, clap clap! " + clap,
            }
        ],
        max_tokens=10,
    )
    return completion.choices[0].message.content

@workflow(name="audience_reaction")
def audience_reaction(joke: str):
    laughter = audience_laughs(joke=joke)
    applauds = audience_applaud(joke=joke)
    return laughter + applauds


@workflow(name="joke_and_audience_reaction") #<--------- Create the new workflow that combines both workflows together
def joke_and_audience_reaction():
    pirate_joke = joke_workflow()
    reactions = audience_reaction(pirate_joke)

Don't forget to update the entrypoint!

if __name__ == "__main__":
    joke_and_audience_reaction() # <--------- Update the entrypoint here

Run the workflow again and see the trace in Keywords AI Traces tab, notice the new span for the audience_reaction workflow in parallel with the joke_workflow. Congratulation! You have created a trace with multiple workflows.

Step 4: Adding Vector Storage Capability

To demonstrate how to integrate with vector databases and embeddings, we'll add a store_joke task that generates embeddings for our jokes.

@task(name="store_joke")
def store_joke(joke: str):
    """Simulate storing a joke in a vector database."""
    embedding = client.embeddings.create(
        model="text-embedding-3-small",
        input=joke,
    )
    return embedding.data[0].embedding

Update create_joke to use store_joke

@task(name="joke_creation")
def create_joke():
    completion = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Tell me a joke about opentelemetry"}],
        temperature=0.5,
        max_tokens=100,
        frequency_penalty=0.5,
        presence_penalty=0.5,
        stop=["\n"],
        logprobs=True,
    )
    joke = completion.choices[0].message.content
    store_joke(joke)  # <--------- Add the task here
    return joke

Run the workflow again and see the trace in Keywords AI Traces tab, notice the new span for the store_joke task.

Expanding the store_joke task, you can see the embeddings call is recognized as openai.embeddings.

Step 5: Adding Arbitrary Function Calls

Demonstrate how to trace non-LLM functions by adding a logging task.

@task(name="logging_joke")
def logging_joke(joke: str, reactions: str):
    """Simulates logging the process into a database."""
    print(joke + "\n\n" + reactions)
    time.sleep(1)

Update joke_and_audience_reaction

@workflow(name="joke_and_audience_reaction")
def joke_and_audience_reaction():
    pirate_joke = joke_workflow()
    reactions = audience_reaction(pirate_joke)
    logging_joke(pirate_joke, reactions) # <-------- Add this workflow here

Run the workflow again and see the trace in Keywords AI Traces tab, notice the new span for the logging_joke task.

This is a simple example of how to trace arbitrary functions. You can see the all the inputs and outputs of logging_joke task.

Step 6: Adding Different LLM Provider (Anthropic)

Demonstrate compatibility with multiple LLM providers by adding Anthropic integration.

from anthropic import Anthropic
anthropic = Anthropic()

@task(name="ask_for_comments")
def ask_for_comments(joke: str):
    completion = anthropic.messages.create(
        model="claude-3-5-sonnet-20240620",
        messages=[{"role": "user", "content": f"What do you think about this joke: {joke}"}],
        max_tokens=100,
    )
    return completion.content[0].text

@task(name="read_joke_comments")
def read_joke_comments(comments: str):
    return f"Here is the comment from the audience: {comments}"

@workflow(name="audience_interaction")
def audience_interaction(joke: str):
    comments = ask_for_comments(joke=joke)
    read_joke_comments(comments=comments)

Update joke_and_audience_reaction

@workflow(name="joke_and_audience_reaction")
def joke_and_audience_reaction():
    pirate_joke = joke_workflow()
    reactions = audience_reaction(pirate_joke)
    audience_interaction(pirate_joke) # <-------- Add this workflow here
    logging_joke(pirate_joke, reactions)

Running the workflow for one last time, you can see that the new audience_interaction can recognize the anthropic.completion calls.

Instrumentation

What is Instrumentation?

Instrumentation is the process of automatically adding telemetry (traces/spans) to library calls without modifying your code. When you enable instrumentation for a library (like OpenAI, Anthropic, LangChain), the SDK automatically captures:

  • LLM requests and responses
  • Model parameters (temperature, max_tokens, etc.)
  • Token usage and costs
  • Latency and timing
  • Errors and exceptions

Default Behavior: All Instrumentations Enabled

By default, KeywordsAI tracing attempts to enable ALL available instrumentations.

from keywordsai_tracing import KeywordsAITelemetry

# This enables ALL available instrumentations (if packages are installed)
telemetry = KeywordsAITelemetry(
    app_name="my-app",
    api_key="kwai-xxx"
)

If a library is installed in your environment, its instrumentation will be automatically enabled. If not installed, it's silently skipped (no errors).

Available Instrumentations

The SDK supports instrumentation for:

AI/ML Libraries:

  • openai - OpenAI API
  • anthropic - Anthropic (Claude) API
  • cohere - Cohere API
  • mistral - Mistral AI
  • ollama - Ollama (local models)
  • groq - Groq API
  • together - Together AI
  • replicate - Replicate
  • transformers - Hugging Face Transformers

Cloud AI Services:

  • bedrock - AWS Bedrock
  • sagemaker - AWS SageMaker
  • vertexai - Google Vertex AI
  • google_generativeai - Google AI (Gemini)
  • watsonx - IBM WatsonX
  • alephalpha - Aleph Alpha

Vector Databases:

  • pinecone - Pinecone
  • qdrant - Qdrant
  • chroma - Chroma
  • milvus - Milvus
  • weaviate - Weaviate
  • lancedb - LanceDB
  • marqo - Marqo

Frameworks:

  • langchain - LangChain
  • llama_index - LlamaIndex
  • haystack - Haystack
  • crew - CrewAI
  • mcp - Model Context Protocol

Infrastructure:

  • redis - Redis
  • requests - HTTP requests library
  • urllib3 - urllib3 HTTP client
  • pymysql - PyMySQL database client
  • threading - Context propagation across threads (⚠️ Always enabled by default)

Installing Instrumentation Packages

Important: To trace a specific library, you need to install the corresponding OpenTelemetry instrumentation package.

The SDK uses the OpenTelemetry standard instrumentation packages:

# Install instrumentation for the libraries you use
pip install opentelemetry-instrumentation-openai
pip install opentelemetry-instrumentation-anthropic
pip install opentelemetry-instrumentation-langchain
pip install opentelemetry-instrumentation-requests

Naming convention: opentelemetry-instrumentation-<library-name>

Example: Tracing OpenAI Calls

# 1. Install OpenAI client
pip install openai

# 2. Install OpenTelemetry instrumentation for OpenAI
pip install opentelemetry-instrumentation-openai

# 3. Install KeywordsAI tracing
pip install keywordsai-tracing
from keywordsai_tracing import KeywordsAITelemetry
from openai import OpenAI

# Initialize telemetry (OpenAI instrumentation auto-enabled)
telemetry = KeywordsAITelemetry(
    app_name="my-app",
    api_key="kwai-xxx"
)

client = OpenAI()

# This call is automatically traced!
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}]
)

The OpenAI call will automatically create a span with:

  • Model name
  • Prompt and completion
  • Token usage
  • Latency
  • Cost (if available)

Important: Threading Instrumentation

Threading instrumentation is automatically enabled (even when you specify custom instruments) because it's critical for context propagation across threads.

# These all include threading by default:
telemetry = KeywordsAITelemetry()  # All instruments including threading

telemetry = KeywordsAITelemetry(
    instruments={Instruments.OPENAI}  # OpenAI + Threading (auto-added!)
)

telemetry = KeywordsAITelemetry(
    instruments={Instruments.OPENAI, Instruments.ANTHROPIC}  # + Threading!
)

To disable threading (if you're certain your app is single-threaded):

telemetry = KeywordsAITelemetry(
    block_instruments={Instruments.THREADING}  # Explicitly disabled
)

Controlling Instrumentation

Option 1: Disable Specific Instruments

Use block_instruments to disable specific instrumentations you don't want:

from keywordsai_tracing import KeywordsAITelemetry, Instruments

telemetry = KeywordsAITelemetry(
    app_name="my-app",
    api_key="kwai-xxx",
    block_instruments={
        Instruments.REQUESTS,  # Don't trace HTTP requests
        Instruments.URLLIB3,   # Don't trace urllib3
        Instruments.REDIS,     # Don't trace Redis calls
    }
)

Use case: Reduce noise by blocking low-level HTTP instrumentations when you only care about high-level LLM calls.

Option 2: Enable Only Specific Instruments

Use instruments to enable only the instrumentations you want:

from keywordsai_tracing import KeywordsAITelemetry, Instruments

telemetry = KeywordsAITelemetry(
    app_name="my-app",
    api_key="kwai-xxx",
    instruments={
        Instruments.OPENAI,      # Only trace OpenAI
        Instruments.ANTHROPIC,   # Only trace Anthropic
    }
)

Use case: Maximum performance and minimal noise - only instrument what you need.

Option 3: Disable All Instrumentation

Pass an empty set to disable all automatic instrumentation:

from keywordsai_tracing import KeywordsAITelemetry

telemetry = KeywordsAITelemetry(
    app_name="my-app",
    api_key="kwai-xxx",
    instruments=set(),  # No auto-instrumentation
)

Use case:

  • Backend systems that only need @workflow/@task decorators
  • Custom manual instrumentation only
  • Minimal overhead

Even with instruments=set(), you can still:

  • ✅ Use @workflow and @task decorators
  • ✅ Create manual spans with tracer.start_as_current_span()
  • ✅ Use get_client() to update spans
  • ❌ Won't automatically trace OpenAI/Anthropic/etc calls

Instrumentation Best Practices

1. Install Only What You Need

# Bad: Installing all instrumentation packages (bloats dependencies)
pip install opentelemetry-instrumentation-openai \
            opentelemetry-instrumentation-anthropic \
            opentelemetry-instrumentation-langchain \
            # ... 30+ packages

# Good: Install only what you use
pip install opentelemetry-instrumentation-openai  # You use OpenAI
pip install opentelemetry-instrumentation-langchain  # You use LangChain

2. Reduce Noise in Production

# Development: Enable everything for visibility
dev_telemetry = KeywordsAITelemetry(
    app_name="my-app-dev"
)

# Production: Block low-level instrumentations to reduce noise
prod_telemetry = KeywordsAITelemetry(
    app_name="my-app-prod",
    block_instruments={
        Instruments.REQUESTS,
        Instruments.URLLIB3,
        Instruments.REDIS,
    }
)

3. Backend Services: Minimal Instrumentation

# Backend API: Disable auto-instrumentation, use decorators only
backend_telemetry = KeywordsAITelemetry(
    app_name="backend-api",
    instruments=set(),  # No auto-instrumentation
    disable_batch=True,  # No background threads
)

# Manual instrumentation only
@workflow(name="api_endpoint")
def api_endpoint(data):
    # Your logic here
    pass

Troubleshooting Instrumentation

Issue: "My OpenAI calls aren't being traced"

Solution: Install the instrumentation package:

pip install opentelemetry-instrumentation-openai

Verify it's working:

from keywordsai_tracing import KeywordsAITelemetry

telemetry = KeywordsAITelemetry(log_level="DEBUG")
# Check logs for: "Initialized OpenAI instrumentation"

Issue: "Too many spans are being created (noise)"

Solution: Block unnecessary instrumentations:

from keywordsai_tracing import Instruments

telemetry = KeywordsAITelemetry(
    block_instruments={
        Instruments.REQUESTS,  # Block HTTP client spans
        Instruments.URLLIB3,   # Block urllib3 spans
    }
)

Issue: "Performance overhead in my backend"

Solution: Disable auto-instrumentation entirely:

telemetry = KeywordsAITelemetry(
    instruments=set(),  # No auto-instrumentation
    disable_batch=True,  # No background threads
)

Summary

Configuration Auto-Instrumentation Use Case
Default All available Development, full visibility
block_instruments={...} All except blocked Production, reduce noise
instruments={...} Only specified Targeted tracing
instruments=set() None Backend, minimal overhead

Key Points:

  • ✅ Default: All instrumentations enabled (if packages installed)
  • ✅ Install: opentelemetry-instrumentation-<library> for each library
  • ✅ Control: Use instruments or block_instruments parameters
  • ✅ Performance: Disable unused instrumentations to reduce overhead
  • ✅ Flexibility: Can disable all auto-instrumentation and use decorators only

Advanced Features

Custom Exporters

If you need custom export logic (e.g., sending to an internal system instead of KeywordsAI API), you can provide your own SpanExporter:

from keywordsai_tracing import KeywordsAITelemetry
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult

class CustomExporter(SpanExporter):
    """Custom exporter that sends spans to your internal system"""
    
    def export(self, spans):
        # Your custom export logic here
        for span in spans:
            # Send to your internal system
            my_internal_system.log(span)
        return SpanExportResult.SUCCESS
    
    def shutdown(self):
        pass
    
    def force_flush(self, timeout_millis=30000):
        return True

# Initialize with custom exporter
telemetry = KeywordsAITelemetry(
    app_name="my-app",
    custom_exporter=CustomExporter(),
)

When using a custom exporter, api_key and base_url parameters are not required since spans won't be sent to the KeywordsAI API.

Use cases for custom exporters:

  • Internal integrations that need custom export logic
  • Sending spans to custom backends or databases
  • Writing spans to files for debugging
  • Testing and development environments

See examples/custom_exporter_example.py for complete working examples including:

  • File exporter (writing spans to JSON lines)
  • Console exporter (printing spans to terminal)
  • Custom backend exporter (sending to your own API)

Update Span Functionality

You can dynamically update spans while they're running using the get_client() API. This is useful for:

  • Adding KeywordsAI-specific parameters (like customer_identifier, trace_group_identifier)
  • Setting custom attributes during execution
  • Adding events to track progress
  • Recording exceptions and errors
  • Changing span names based on runtime conditions
from keywordsai_tracing import KeywordsAITelemetry, get_client, workflow
from openai import OpenAI

telemetry = KeywordsAITelemetry(
    app_name="my-app",
    api_key="kwai-xxx"
)

client = OpenAI()

@workflow(name="data_processing")
def process_data(user_id: str, data: dict):
    # Get the client to interact with the current span
    kwai_client = get_client()
    
    # Get current trace information
    trace_id = kwai_client.get_current_trace_id()
    span_id = kwai_client.get_current_span_id()
    print(f"Processing in trace: {trace_id}")
    
    # Update span with KeywordsAI-specific parameters
    kwai_client.update_current_span(
        keywordsai_params={
            "customer_identifier": user_id,
            "trace_group_identifier": "data-processing-pipeline",
            "metadata": {
                "data_size": len(str(data)),
                "processing_type": "batch"
            }
        }
    )
    
    # Add an event to track progress
    kwai_client.add_event("validation_started", {
        "record_count": len(data)
    })
    
    # Add custom attributes
    kwai_client.update_current_span(
        attributes={
            "custom.user_id": user_id,
            "custom.data_type": type(data).__name__
        }
    )
    
    try:
        # Call LLM
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": f"Process: {data}"}]
        )
        
        # Update span name based on result
        kwai_client.update_current_span(
            name="data_processing.success",
            attributes={"result_length": len(response.choices[0].message.content)}
        )
        
        return response.choices[0].message.content
        
    except Exception as e:
        # Record exception in the span
        kwai_client.record_exception(e)
        raise

# Use the workflow
result = process_data("user-123", {"key": "value"})

Available Client Methods:

  • get_current_trace_id() - Get the current trace ID
  • get_current_span_id() - Get the current span ID
  • update_current_span() - Update span with params, attributes, name, or status
  • add_event() - Add an event to the current span
  • record_exception() - Record an exception on the current span
  • is_recording() - Check if the current span is recording

See examples/simple_span_updating_example.py for a complete example.

Manual Span Creation

For fine-grained control, you can manually create custom spans using the tracer directly. This is useful when:

  • You need spans that don't fit the @workflow/@task pattern
  • You want to instrument specific code blocks
  • You're integrating with existing tracing code
  • You need to create spans conditionally
from keywordsai_tracing import KeywordsAITelemetry, get_client
from opentelemetry import trace

telemetry = KeywordsAITelemetry(
    app_name="my-app",
    api_key="kwai-xxx"
)

# Get the tracer instance
tracer = telemetry.tracer.get_tracer()

# Create a parent span manually
with tracer.start_as_current_span("database_operation") as parent_span:
    parent_span.set_attribute("db.system", "postgresql")
    parent_span.set_attribute("db.operation", "query")
    parent_span.add_event("Connection established")
    
    # Create nested child spans
    with tracer.start_as_current_span("execute_query") as query_span:
        query_span.set_attribute("db.statement", "SELECT * FROM users")
        query_span.set_attribute("db.rows_affected", 42)
        
        # You can still use get_client() within manual spans
        client = get_client()
        client.update_current_span(
            keywordsai_params={
                "customer_identifier": "admin-user"
            }
        )
        
        # Simulate query execution
        result = execute_database_query()
    
    with tracer.start_as_current_span("process_results") as process_span:
        process_span.set_attribute("result.count", len(result))
        processed = process_results(result)
    
    parent_span.add_event("Operation completed", {
        "total_time_ms": 150
    })

Combining Manual Spans with Decorators:

You can mix manual span creation with decorator-based spans:

from keywordsai_tracing import workflow, task, get_client

@workflow(name="hybrid_workflow")
def hybrid_workflow(data):
    client = get_client()
    telemetry = KeywordsAITelemetry()
    tracer = telemetry.tracer.get_tracer()
    
    # Use decorator-based task
    validated_data = validate_data(data)
    
    # Create manual span for specific instrumentation
    with tracer.start_as_current_span("custom_processing") as span:
        span.set_attribute("processing.type", "custom")
        span.set_attribute("data.size", len(validated_data))
        
        # Custom processing logic
        for item in validated_data:
            with tracer.start_as_current_span(f"process_item_{item['id']}") as item_span:
                item_span.set_attribute("item.id", item['id'])
                process_single_item(item)
    
    # Use another decorator-based task
    return finalize_results(validated_data)

@task(name="validate_data")
def validate_data(data):
    # Decorator-based span
    return [item for item in data if item.get('valid')]

@task(name="finalize_results")
def finalize_results(data):
    # Decorator-based span
    return {"processed": len(data), "data": data}

Thread-Safe Context Propagation:

When working with threads, you can manually propagate context:

from opentelemetry import context as otel_context
import threading

@workflow(name="threaded_workflow")
def threaded_workflow():
    client = get_client()
    telemetry = KeywordsAITelemetry()
    tracer = telemetry.tracer.get_tracer()
    
    # Capture the current context
    current_context = otel_context.get_current()
    
    def worker_function():
        # Attach the captured context in the worker thread
        token = otel_context.attach(current_context)
        
        try:
            # Create spans in the worker thread
            with tracer.start_as_current_span("worker_task") as span:
                span.set_attribute("thread.name", threading.current_thread().name)
                # Your work here
                pass
        finally:
            # Detach the context
            otel_context.detach(token)
    
    # Start worker thread
    thread = threading.Thread(target=worker_function)
    thread.start()
    thread.join()

Key Points:

  • Use tracer.start_as_current_span(name) to create custom spans
  • Spans automatically nest based on context
  • You can mix manual spans with @workflow and @task decorators
  • Use get_client() within manual spans to access client API
  • Threading instrumentation is enabled by default for automatic context propagation across threads

See examples/custom_exporter_example.py for examples of manual span creation.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

keywordsai_tracing-0.0.49.tar.gz (86.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

keywordsai_tracing-0.0.49-py3-none-any.whl (34.4 kB view details)

Uploaded Python 3

File details

Details for the file keywordsai_tracing-0.0.49.tar.gz.

File metadata

  • Download URL: keywordsai_tracing-0.0.49.tar.gz
  • Upload date:
  • Size: 86.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.7.1 CPython/3.12.8 Darwin/25.1.0

File hashes

Hashes for keywordsai_tracing-0.0.49.tar.gz
Algorithm Hash digest
SHA256 97bac2cd916b27f856bf38820835a778b6662c6929e8eb7be7694d6b280e18dd
MD5 2992bed599d45b5ea6df3f7fadcb102a
BLAKE2b-256 28d8965fed54d4f6331fc09689808ca70257d24ac9feda15836fca2d321073a5

See more details on using hashes here.

File details

Details for the file keywordsai_tracing-0.0.49-py3-none-any.whl.

File metadata

File hashes

Hashes for keywordsai_tracing-0.0.49-py3-none-any.whl
Algorithm Hash digest
SHA256 071dab85c39d944b44312dfdbd9fa2af2fb3148461d8780bf70936948b96a881
MD5 e67271e78b781aa6fffb586a5403191a
BLAKE2b-256 69a3abe082e36f7527650e317aef80f5341b5f279c57ddf77c05f40dace65541

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page