The Keywords AI SDK lets you interact with the Keywords AI API smoothly.
Building an LLM Workflow with KeywordsAI Tracing
This tutorial demonstrates how to build and trace complex LLM workflows using KeywordsAI Tracing. We'll create an example that generates jokes, translates them to pirate language, and simulates audience reactions - all while capturing detailed telemetry of our LLM calls.
Prerequisites
- Python 3.7+
- OpenAI API key
- Anthropic API key
- Keywords AI API key (you can get one from the API keys page)
Installation
pip install keywordsai-tracing openai anthropic
Initialization
KeywordsAITelemetry Configuration
The KeywordsAITelemetry class is the main entry point for the SDK. Initialize it once at application startup:
from keywordsai_tracing import KeywordsAITelemetry
telemetry = KeywordsAITelemetry(
    app_name="my-app",
    api_key="kwai-xxx",  # Or set KEYWORDSAI_API_KEY env var
)
All Initialization Parameters
KeywordsAITelemetry(
    # Basic Configuration
    app_name: str = "keywordsai",          # Application name for telemetry
    api_key: Optional[str] = None,         # API key (or KEYWORDSAI_API_KEY env var)
    base_url: Optional[str] = None,        # API URL (or KEYWORDSAI_BASE_URL env var)

    # Logging
    log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO",

    # Performance
    disable_batch: Optional[bool] = None,  # Disable background batch processing

    # Instrumentation (see Instrumentation section below)
    instruments: Optional[Set[Instruments]] = None,        # Specific instruments to enable
    block_instruments: Optional[Set[Instruments]] = None,  # Instruments to disable

    # Advanced
    headers: Optional[Dict[str, str]] = None,              # Additional HTTP headers
    resource_attributes: Optional[Dict[str, str]] = None,  # Resource attributes
    span_postprocess_callback: Optional[Callable] = None,  # Span processing callback
    enabled: bool = True,                                  # Enable/disable telemetry
    custom_exporter: Optional[SpanExporter] = None,        # Custom span exporter
)
Parameter Reference
Basic Configuration
| Parameter | Type | Default | Description |
|---|---|---|---|
| `app_name` | `str` | `"keywordsai"` | Name of your application for telemetry identification |
| `api_key` | `str \| None` | `None` | KeywordsAI API key. Can also be set via the `KEYWORDSAI_API_KEY` environment variable |
| `base_url` | `str \| None` | `None` | KeywordsAI API base URL. Can also be set via the `KEYWORDSAI_BASE_URL` environment variable. Defaults to `https://api.keywordsai.co/api` |
Logging
| Parameter | Type | Default | Description |
|---|---|---|---|
| `log_level` | `str` | `"INFO"` | Logging level for KeywordsAI tracing. Options: `"DEBUG"`, `"INFO"`, `"WARNING"`, `"ERROR"`, `"CRITICAL"`. Can also be set via the `KEYWORDSAI_LOG_LEVEL` environment variable |
Performance
| Parameter | Type | Default | Description |
|---|---|---|---|
| `disable_batch` | `bool \| None` | `None` | Disable batch span processing. When `True`, uses synchronous export (no background threads). Useful for debugging or backends with custom exporters. Can also be set via the `KEYWORDSAI_DISABLE_BATCH` environment variable |
Instrumentation
See Instrumentation section for detailed information.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `instruments` | `Set[Instruments] \| None` | `None` | Specific instruments to enable. If `None`, enables all available instruments. Use an empty set (`set()`) to disable all auto-instrumentation |
| `block_instruments` | `Set[Instruments] \| None` | `None` | Instruments to explicitly disable. Use this to block specific instrumentations while enabling others |
Examples:
# Enable only specific instruments
from keywordsai_tracing import Instruments

telemetry = KeywordsAITelemetry(
    instruments={Instruments.OPENAI, Instruments.ANTHROPIC}
)

# Block specific instruments
telemetry = KeywordsAITelemetry(
    block_instruments={Instruments.REQUESTS, Instruments.URLLIB3}
)

# Disable all auto-instrumentation
telemetry = KeywordsAITelemetry(
    instruments=set()  # Empty set = no auto-instrumentation
)
Advanced
| Parameter | Type | Default | Description |
|---|---|---|---|
| `headers` | `Dict[str, str] \| None` | `None` | Additional HTTP headers to send with telemetry data |
| `resource_attributes` | `Dict[str, str] \| None` | `None` | Additional resource attributes to attach to all spans. Useful for adding environment, version, etc. |
| `span_postprocess_callback` | `Callable \| None` | `None` | Optional callback function to process spans before export. Signature: `callback(span: ReadableSpan) -> None` |
| `enabled` | `bool` | `True` | Enable or disable telemetry. When `False`, the SDK becomes a no-op (no spans created) |
| `custom_exporter` | `SpanExporter \| None` | `None` | Custom span exporter to use instead of the default HTTP OTLP exporter. When provided, `api_key` and `base_url` are not required. See the Custom Exporters section |
Note: Threading instrumentation is ALWAYS enabled by default (even when specifying custom instruments) because it's critical for context propagation. To disable it explicitly, use block_instruments={Instruments.THREADING}. See Threading Instrumentation for details.
Common Configuration Patterns
Development (Full Visibility)
telemetry = KeywordsAITelemetry(
    app_name="my-app-dev",
    api_key="kwai-xxx",
    log_level="DEBUG",  # Verbose logging
    # All instruments enabled by default
)
Production (Optimized)
telemetry = KeywordsAITelemetry(
    app_name="my-app-prod",
    api_key="kwai-xxx",
    log_level="WARNING",  # Less verbose
    block_instruments={
        Instruments.REQUESTS,  # Reduce noise
        Instruments.URLLIB3,
    }
)
Backend with Custom Logging (Minimal)
from your_exporters import DirectLoggingExporter
telemetry = KeywordsAITelemetry(
    app_name="my-backend",
    custom_exporter=DirectLoggingExporter(),     # Custom exporter
    disable_batch=True,                          # No background threads
    instruments=set(),                           # No auto-instrumentation
    block_instruments={Instruments.THREADING},   # Disable threading if single-threaded
)
Testing/Disabled
telemetry = KeywordsAITelemetry(
    app_name="my-app-test",
    enabled=False,  # Completely disabled (no-op)
)
Environment Variables
You can configure KeywordsAI tracing using environment variables:
| Environment Variable | Description | Default |
|---|---|---|
| `KEYWORDSAI_API_KEY` | API key | `None` |
| `KEYWORDSAI_BASE_URL` | API base URL | `https://api.keywordsai.co/api` |
| `KEYWORDSAI_LOG_LEVEL` | Logging level | `INFO` |
| `KEYWORDSAI_DISABLE_BATCH` | Disable batch processing | `False` |
Example:
export KEYWORDSAI_API_KEY="kwai-xxx"
export KEYWORDSAI_LOG_LEVEL="DEBUG"
export KEYWORDSAI_DISABLE_BATCH="true"
# No need to pass parameters - read from env vars
telemetry = KeywordsAITelemetry(app_name="my-app")
Tutorial
Step 1: Initialization
import os
from keywordsai_tracing.main import KeywordsAITelemetry
from keywordsai_tracing.decorators import workflow, task
import time
# Initialize KeywordsAI Telemetry
os.environ["KEYWORDSAI_API_KEY"] = "YOUR_KEYWORDSAI_API_KEY"
k_tl = KeywordsAITelemetry()
# Initialize OpenAI client
from openai import OpenAI
client = OpenAI()
Step 2: First Draft - Basic Workflow
We'll start by creating a simple workflow that generates a joke, translates it to pirate speak, and adds a signature. This demonstrates the basic usage of tasks and workflows.
- A task is a single unit of work, decorated with `@task`
- A workflow is a collection of tasks, decorated with `@workflow`
- Tasks can be used independently or as part of workflows
@task(name="joke_creation")
def create_joke():
completion = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Tell me a joke about opentelemetry"}],
temperature=0.5,
max_tokens=100,
frequency_penalty=0.5,
presence_penalty=0.5,
stop=["\n"],
logprobs=True,
)
return completion.choices[0].message.content
@task(name="signature_generation")
def generate_signature(joke: str):
completion = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "user", "content": "add a signature to the joke:\n\n" + joke}
],
)
return completion.choices[0].message.content
@task(name="pirate_joke_translation")
def translate_joke_to_pirate(joke: str):
completion = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": "translate the joke to pirate language:\n\n" + joke,
}
],
)
return completion.choices[0].message.content
@workflow(name="pirate_joke_generator")
def joke_workflow():
eng_joke = create_joke()
pirate_joke = translate_joke_to_pirate(eng_joke)
signature = generate_signature(pirate_joke)
return pirate_joke + signature
if __name__ == "__main__":
joke_workflow()
Run the workflow and see the trace in the Keywords AI Traces tab.
Step 3: Adding Another Workflow
Let's add audience reactions to make our workflow more complex and demonstrate what multiple workflow traces look like.
@task(name="audience_laughs")
def audience_laughs(joke: str):
completion = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": "This joke:\n\n" + joke + " is funny, say hahahahaha",
}
],
max_tokens=10,
)
return completion.choices[0].message.content
@task(name="audience_claps")
def audience_claps():
completion = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Clap once"}],
max_tokens=5,
)
return completion.choices[0].message.content
@task(name="audience_applaud")
def audience_applaud(joke: str):
clap = audience_claps()
completion = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{
"role": "user",
"content": "Applaud to the joke, clap clap! " + clap,
}
],
max_tokens=10,
)
return completion.choices[0].message.content
@workflow(name="audience_reaction")
def audience_reaction(joke: str):
laughter = audience_laughs(joke=joke)
applauds = audience_applaud(joke=joke)
return laughter + applauds
@workflow(name="joke_and_audience_reaction") #<--------- Create the new workflow that combines both workflows together
def joke_and_audience_reaction():
pirate_joke = joke_workflow()
reactions = audience_reaction(pirate_joke)
Don't forget to update the entrypoint!
if __name__ == "__main__":
    joke_and_audience_reaction()  # <--------- Update the entrypoint here
Run the workflow again and see the trace in the Keywords AI Traces tab. Notice the new span for the audience_reaction workflow alongside the joke_workflow span. Congratulations! You have created a trace with multiple workflows.
Step 4: Adding Vector Storage Capability
To demonstrate how to integrate with vector databases and embeddings, we'll add a store_joke task that generates embeddings for our jokes.
@task(name="store_joke")
def store_joke(joke: str):
"""Simulate storing a joke in a vector database."""
embedding = client.embeddings.create(
model="text-embedding-3-small",
input=joke,
)
return embedding.data[0].embedding
Update create_joke to use store_joke
@task(name="joke_creation")
def create_joke():
completion = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Tell me a joke about opentelemetry"}],
temperature=0.5,
max_tokens=100,
frequency_penalty=0.5,
presence_penalty=0.5,
stop=["\n"],
logprobs=True,
)
joke = completion.choices[0].message.content
store_joke(joke) # <--------- Add the task here
return joke
Run the workflow again and see the trace in the Keywords AI Traces tab; notice the new span for the store_joke task.
Expanding the store_joke task, you can see that the embeddings call is recognized as openai.embeddings.
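If you want the simulation to actually persist the embedding, a vector store such as Chroma could be dropped in. This is a hedged sketch, not part of the original tutorial: the `jokes` collection name is illustrative, and it assumes the `chromadb` package (whose calls the `chroma` instrumentation would also pick up, if installed):

import chromadb

chroma_client = chromadb.Client()  # in-memory client; use a persistent client in real code
jokes_collection = chroma_client.get_or_create_collection("jokes")

@task(name="store_joke")
def store_joke(joke: str):
    """Store a joke and its embedding in a local Chroma collection."""
    embedding = client.embeddings.create(
        model="text-embedding-3-small",
        input=joke,
    )
    vector = embedding.data[0].embedding
    jokes_collection.add(ids=[str(hash(joke))], embeddings=[vector], documents=[joke])
    return vector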
Step 5: Adding Arbitrary Function Calls
Demonstrate how to trace non-LLM functions by adding a logging task.
@task(name="logging_joke")
def logging_joke(joke: str, reactions: str):
"""Simulates logging the process into a database."""
print(joke + "\n\n" + reactions)
time.sleep(1)
Update joke_and_audience_reaction
@workflow(name="joke_and_audience_reaction")
def joke_and_audience_reaction():
pirate_joke = joke_workflow()
reactions = audience_reaction(pirate_joke)
logging_joke(pirate_joke, reactions) # <-------- Add this workflow here
Run the workflow again and see the trace in the Keywords AI Traces tab; notice the new span for the logging_joke task.
This is a simple example of how to trace arbitrary functions: you can see all the inputs and outputs of the logging_joke task.
Step 6: Adding Different LLM Provider (Anthropic)
Demonstrate compatibility with multiple LLM providers by adding Anthropic integration.
from anthropic import Anthropic

anthropic = Anthropic()

@task(name="ask_for_comments")
def ask_for_comments(joke: str):
    completion = anthropic.messages.create(
        model="claude-3-5-sonnet-20240620",
        messages=[{"role": "user", "content": f"What do you think about this joke: {joke}"}],
        max_tokens=100,
    )
    return completion.content[0].text

@task(name="read_joke_comments")
def read_joke_comments(comments: str):
    return f"Here is the comment from the audience: {comments}"

@workflow(name="audience_interaction")
def audience_interaction(joke: str):
    comments = ask_for_comments(joke=joke)
    read_joke_comments(comments=comments)
Update joke_and_audience_reaction
@workflow(name="joke_and_audience_reaction")
def joke_and_audience_reaction():
pirate_joke = joke_workflow()
reactions = audience_reaction(pirate_joke)
audience_interaction(pirate_joke) # <-------- Add this workflow here
logging_joke(pirate_joke, reactions)
Run the workflow one last time; in the new audience_interaction workflow, you can see the Anthropic calls recognized as anthropic.completion.
Instrumentation
What is Instrumentation?
Instrumentation is the process of automatically adding telemetry (traces/spans) to library calls without modifying your code. When you enable instrumentation for a library (like OpenAI, Anthropic, LangChain), the SDK automatically captures:
- LLM requests and responses
- Model parameters (temperature, max_tokens, etc.)
- Token usage and costs
- Latency and timing
- Errors and exceptions
Default Behavior: All Instrumentations Enabled
By default, KeywordsAI tracing attempts to enable ALL available instrumentations.
from keywordsai_tracing import KeywordsAITelemetry
# This enables ALL available instrumentations (if packages are installed)
telemetry = KeywordsAITelemetry(
    app_name="my-app",
    api_key="kwai-xxx"
)
If a library is installed in your environment, its instrumentation will be automatically enabled. If not installed, it's silently skipped (no errors).
Available Instrumentations
The SDK supports instrumentation for:
AI/ML Libraries:
- `openai` - OpenAI API
- `anthropic` - Anthropic (Claude) API
- `cohere` - Cohere API
- `mistral` - Mistral AI
- `ollama` - Ollama (local models)
- `groq` - Groq API
- `together` - Together AI
- `replicate` - Replicate
- `transformers` - Hugging Face Transformers
Cloud AI Services:
- `bedrock` - AWS Bedrock
- `sagemaker` - AWS SageMaker
- `vertexai` - Google Vertex AI
- `google_generativeai` - Google AI (Gemini)
- `watsonx` - IBM WatsonX
- `alephalpha` - Aleph Alpha
Vector Databases:
- `pinecone` - Pinecone
- `qdrant` - Qdrant
- `chroma` - Chroma
- `milvus` - Milvus
- `weaviate` - Weaviate
- `lancedb` - LanceDB
- `marqo` - Marqo
Frameworks:
- `langchain` - LangChain
- `llama_index` - LlamaIndex
- `haystack` - Haystack
- `crew` - CrewAI
- `mcp` - Model Context Protocol
Infrastructure:
- `redis` - Redis
- `requests` - HTTP requests library
- `urllib3` - urllib3 HTTP client
- `pymysql` - PyMySQL database client
- `threading` - Context propagation across threads (⚠️ always enabled by default)
Installing Instrumentation Packages
Important: To trace a specific library, you need to install the corresponding OpenTelemetry instrumentation package.
The SDK uses the OpenTelemetry standard instrumentation packages:
# Install instrumentation for the libraries you use
pip install opentelemetry-instrumentation-openai
pip install opentelemetry-instrumentation-anthropic
pip install opentelemetry-instrumentation-langchain
pip install opentelemetry-instrumentation-requests
Naming convention: opentelemetry-instrumentation-<library-name>
Example: Tracing OpenAI Calls
# 1. Install OpenAI client
pip install openai
# 2. Install OpenTelemetry instrumentation for OpenAI
pip install opentelemetry-instrumentation-openai
# 3. Install KeywordsAI tracing
pip install keywordsai-tracing
from keywordsai_tracing import KeywordsAITelemetry
from openai import OpenAI
# Initialize telemetry (OpenAI instrumentation auto-enabled)
telemetry = KeywordsAITelemetry(
    app_name="my-app",
    api_key="kwai-xxx"
)

client = OpenAI()

# This call is automatically traced!
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}]
)
The OpenAI call will automatically create a span with:
- Model name
- Prompt and completion
- Token usage
- Latency
- Cost (if available)
Important: Threading Instrumentation
Threading instrumentation is automatically enabled (even when you specify custom instruments) because it's critical for context propagation across threads.
# These all include threading by default:
telemetry = KeywordsAITelemetry()  # All instruments, including threading

telemetry = KeywordsAITelemetry(
    instruments={Instruments.OPENAI}  # OpenAI + Threading (auto-added!)
)

telemetry = KeywordsAITelemetry(
    instruments={Instruments.OPENAI, Instruments.ANTHROPIC}  # + Threading!
)
To disable threading (if you're certain your app is single-threaded):
telemetry = KeywordsAITelemetry(
    block_instruments={Instruments.THREADING}  # Explicitly disabled
)
Controlling Instrumentation
Option 1: Disable Specific Instruments
Use block_instruments to disable specific instrumentations you don't want:
from keywordsai_tracing import KeywordsAITelemetry, Instruments
telemetry = KeywordsAITelemetry(
    app_name="my-app",
    api_key="kwai-xxx",
    block_instruments={
        Instruments.REQUESTS,  # Don't trace HTTP requests
        Instruments.URLLIB3,   # Don't trace urllib3
        Instruments.REDIS,     # Don't trace Redis calls
    }
)
Use case: Reduce noise by blocking low-level HTTP instrumentations when you only care about high-level LLM calls.
Option 2: Enable Only Specific Instruments
Use instruments to enable only the instrumentations you want:
from keywordsai_tracing import KeywordsAITelemetry, Instruments
telemetry = KeywordsAITelemetry(
    app_name="my-app",
    api_key="kwai-xxx",
    instruments={
        Instruments.OPENAI,     # Only trace OpenAI
        Instruments.ANTHROPIC,  # Only trace Anthropic
    }
)
Use case: Maximum performance and minimal noise - only instrument what you need.
Option 3: Disable All Instrumentation
Pass an empty set to disable all automatic instrumentation:
from keywordsai_tracing import KeywordsAITelemetry
telemetry = KeywordsAITelemetry(
    app_name="my-app",
    api_key="kwai-xxx",
    instruments=set(),  # No auto-instrumentation
)
Use case:
- Backend systems that only need `@workflow`/`@task` decorators
- Custom manual instrumentation only
- Minimal overhead

Even with `instruments=set()`, you can still:
- ✅ Use `@workflow` and `@task` decorators
- ✅ Create manual spans with `tracer.start_as_current_span()`
- ✅ Use `get_client()` to update spans
- ❌ Won't automatically trace OpenAI/Anthropic/etc. calls
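As a concrete illustration, a decorators-only setup might look like this (a minimal sketch; `fetch_records` and `process_batch` are hypothetical names):

from keywordsai_tracing import KeywordsAITelemetry, workflow, task

telemetry = KeywordsAITelemetry(
    app_name="my-backend",
    api_key="kwai-xxx",
    instruments=set(),  # no library auto-tracing
)

@task(name="fetch_records")
def fetch_records(batch_id: str):
    # Hypothetical data access; still traced as a task span
    return [{"id": batch_id, "value": 42}]

@workflow(name="process_batch")
def process_batch(batch_id: str):
    records = fetch_records(batch_id)
    return len(records)

process_batch("batch-001")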
Instrumentation Best Practices
1. Install Only What You Need
# Bad: Installing all instrumentation packages (bloats dependencies)
pip install opentelemetry-instrumentation-openai \
    opentelemetry-instrumentation-anthropic \
    opentelemetry-instrumentation-langchain
    # ... 30+ packages

# Good: Install only what you use
pip install opentelemetry-instrumentation-openai     # You use OpenAI
pip install opentelemetry-instrumentation-langchain  # You use LangChain
2. Reduce Noise in Production
# Development: Enable everything for visibility
dev_telemetry = KeywordsAITelemetry(
    app_name="my-app-dev"
)

# Production: Block low-level instrumentations to reduce noise
prod_telemetry = KeywordsAITelemetry(
    app_name="my-app-prod",
    block_instruments={
        Instruments.REQUESTS,
        Instruments.URLLIB3,
        Instruments.REDIS,
    }
)
3. Backend Services: Minimal Instrumentation
# Backend API: Disable auto-instrumentation, use decorators only
backend_telemetry = KeywordsAITelemetry(
    app_name="backend-api",
    instruments=set(),   # No auto-instrumentation
    disable_batch=True,  # No background threads
)

# Manual instrumentation only
@workflow(name="api_endpoint")
def api_endpoint(data):
    # Your logic here
    pass
Troubleshooting Instrumentation
Issue: "My OpenAI calls aren't being traced"
Solution: Install the instrumentation package:
pip install opentelemetry-instrumentation-openai
Verify it's working:
from keywordsai_tracing import KeywordsAITelemetry
telemetry = KeywordsAITelemetry(log_level="DEBUG")
# Check logs for: "Initialized OpenAI instrumentation"
Issue: "Too many spans are being created (noise)"
Solution: Block unnecessary instrumentations:
from keywordsai_tracing import Instruments
telemetry = KeywordsAITelemetry(
    block_instruments={
        Instruments.REQUESTS,  # Block HTTP client spans
        Instruments.URLLIB3,   # Block urllib3 spans
    }
)
Issue: "Performance overhead in my backend"
Solution: Disable auto-instrumentation entirely:
telemetry = KeywordsAITelemetry(
    instruments=set(),   # No auto-instrumentation
    disable_batch=True,  # No background threads
)
Summary
| Configuration | Auto-Instrumentation | Use Case |
|---|---|---|
| Default | All available | Development, full visibility |
| `block_instruments={...}` | All except blocked | Production, reduce noise |
| `instruments={...}` | Only specified | Targeted tracing |
| `instruments=set()` | None | Backend, minimal overhead |
Key Points:
- ✅ Default: All instrumentations enabled (if packages installed)
- ✅ Install: `opentelemetry-instrumentation-<library>` for each library you use
- ✅ Control: Use the `instruments` or `block_instruments` parameters
- ✅ Performance: Disable unused instrumentations to reduce overhead
- ✅ Flexibility: Can disable all auto-instrumentation and use decorators only
Advanced Features
Custom Exporters
If you need custom export logic (e.g., sending to an internal system instead of KeywordsAI API), you can provide your own SpanExporter:
from keywordsai_tracing import KeywordsAITelemetry
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
class CustomExporter(SpanExporter):
    """Custom exporter that sends spans to your internal system."""

    def export(self, spans):
        # Your custom export logic here
        for span in spans:
            # Send to your internal system
            my_internal_system.log(span)
        return SpanExportResult.SUCCESS

    def shutdown(self):
        pass

    def force_flush(self, timeout_millis=30000):
        return True

# Initialize with custom exporter
telemetry = KeywordsAITelemetry(
    app_name="my-app",
    custom_exporter=CustomExporter(),
)
When using a custom exporter, api_key and base_url parameters are not required since spans won't be sent to the KeywordsAI API.
Use cases for custom exporters:
- Internal integrations that need custom export logic
- Sending spans to custom backends or databases
- Writing spans to files for debugging
- Testing and development environments
See examples/custom_exporter_example.py for complete working examples including:
- File exporter (writing spans to JSON lines)
- Console exporter (printing spans to terminal)
- Custom backend exporter (sending to your own API)
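As an illustration of the first variant, a JSON-lines file exporter might look like this. This is a sketch, not the packaged example; it assumes the standard OpenTelemetry SDK's ReadableSpan.to_json() helper:

from keywordsai_tracing import KeywordsAITelemetry
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult

class JsonLinesFileExporter(SpanExporter):
    """Write each span as one JSON line to a local file."""

    def __init__(self, path="spans.jsonl"):
        self._file = open(path, "a")

    def export(self, spans):
        for span in spans:
            # ReadableSpan.to_json() serializes a span; indent=None keeps it on one line
            self._file.write(span.to_json(indent=None) + "\n")
        self._file.flush()
        return SpanExportResult.SUCCESS

    def shutdown(self):
        self._file.close()

    def force_flush(self, timeout_millis=30000):
        return True

telemetry = KeywordsAITelemetry(
    app_name="my-app",
    custom_exporter=JsonLinesFileExporter("spans.jsonl"),
    disable_batch=True,  # synchronous export is convenient when debugging locally
)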
Update Span Functionality
You can dynamically update spans while they're running using the get_client() API. This is useful for:
- Adding KeywordsAI-specific parameters (like `customer_identifier`, `trace_group_identifier`)
- Setting custom attributes during execution
- Adding events to track progress
- Recording exceptions and errors
- Changing span names based on runtime conditions
from keywordsai_tracing import KeywordsAITelemetry, get_client, workflow
from openai import OpenAI

telemetry = KeywordsAITelemetry(
    app_name="my-app",
    api_key="kwai-xxx"
)
client = OpenAI()

@workflow(name="data_processing")
def process_data(user_id: str, data: dict):
    # Get the client to interact with the current span
    kwai_client = get_client()

    # Get current trace information
    trace_id = kwai_client.get_current_trace_id()
    span_id = kwai_client.get_current_span_id()
    print(f"Processing in trace: {trace_id}")

    # Update span with KeywordsAI-specific parameters
    kwai_client.update_current_span(
        keywordsai_params={
            "customer_identifier": user_id,
            "trace_group_identifier": "data-processing-pipeline",
            "metadata": {
                "data_size": len(str(data)),
                "processing_type": "batch"
            }
        }
    )

    # Add an event to track progress
    kwai_client.add_event("validation_started", {
        "record_count": len(data)
    })

    # Add custom attributes
    kwai_client.update_current_span(
        attributes={
            "custom.user_id": user_id,
            "custom.data_type": type(data).__name__
        }
    )

    try:
        # Call LLM
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": f"Process: {data}"}]
        )

        # Update span name based on result
        kwai_client.update_current_span(
            name="data_processing.success",
            attributes={"result_length": len(response.choices[0].message.content)}
        )
        return response.choices[0].message.content
    except Exception as e:
        # Record exception in the span
        kwai_client.record_exception(e)
        raise

# Use the workflow
result = process_data("user-123", {"key": "value"})
Available Client Methods:
- `get_current_trace_id()` - Get the current trace ID
- `get_current_span_id()` - Get the current span ID
- `update_current_span()` - Update the span with params, attributes, name, or status
- `add_event()` - Add an event to the current span
- `record_exception()` - Record an exception on the current span
- `is_recording()` - Check if the current span is recording
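For example, is_recording() can guard attribute computation you would rather skip for spans that are not being recorded. A minimal sketch (the summarize task is hypothetical):

from keywordsai_tracing import get_client, task

@task(name="summarize")
def summarize(text: str):
    kwai_client = get_client()
    if kwai_client.is_recording():
        # Only compute and attach extra attributes when the span is recording
        kwai_client.update_current_span(
            attributes={"input.length": len(text)}
        )
    return text[:100]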
See examples/simple_span_updating_example.py for a complete example.
Manual Span Creation
For fine-grained control, you can manually create custom spans using the tracer directly. This is useful when:
- You need spans that don't fit the `@workflow`/`@task` pattern
- You want to instrument specific code blocks
- You're integrating with existing tracing code
- You need to create spans conditionally
from keywordsai_tracing import KeywordsAITelemetry, get_client
from opentelemetry import trace

telemetry = KeywordsAITelemetry(
    app_name="my-app",
    api_key="kwai-xxx"
)

# Get the tracer instance
tracer = telemetry.tracer.get_tracer()

# Create a parent span manually
with tracer.start_as_current_span("database_operation") as parent_span:
    parent_span.set_attribute("db.system", "postgresql")
    parent_span.set_attribute("db.operation", "query")
    parent_span.add_event("Connection established")

    # Create nested child spans
    with tracer.start_as_current_span("execute_query") as query_span:
        query_span.set_attribute("db.statement", "SELECT * FROM users")
        query_span.set_attribute("db.rows_affected", 42)

        # You can still use get_client() within manual spans
        client = get_client()
        client.update_current_span(
            keywordsai_params={
                "customer_identifier": "admin-user"
            }
        )

        # Simulate query execution
        result = execute_database_query()

    with tracer.start_as_current_span("process_results") as process_span:
        process_span.set_attribute("result.count", len(result))
        processed = process_results(result)

    parent_span.add_event("Operation completed", {
        "total_time_ms": 150
    })
Combining Manual Spans with Decorators:
You can mix manual span creation with decorator-based spans:
from keywordsai_tracing import KeywordsAITelemetry, workflow, task, get_client

@workflow(name="hybrid_workflow")
def hybrid_workflow(data):
    client = get_client()
    telemetry = KeywordsAITelemetry()
    tracer = telemetry.tracer.get_tracer()

    # Use a decorator-based task
    validated_data = validate_data(data)

    # Create a manual span for specific instrumentation
    with tracer.start_as_current_span("custom_processing") as span:
        span.set_attribute("processing.type", "custom")
        span.set_attribute("data.size", len(validated_data))

        # Custom processing logic
        for item in validated_data:
            with tracer.start_as_current_span(f"process_item_{item['id']}") as item_span:
                item_span.set_attribute("item.id", item['id'])
                process_single_item(item)

    # Use another decorator-based task
    return finalize_results(validated_data)

@task(name="validate_data")
def validate_data(data):
    # Decorator-based span
    return [item for item in data if item.get('valid')]

@task(name="finalize_results")
def finalize_results(data):
    # Decorator-based span
    return {"processed": len(data), "data": data}
Thread-Safe Context Propagation:
When working with threads, you can manually propagate context:
from keywordsai_tracing import KeywordsAITelemetry, workflow, get_client
from opentelemetry import context as otel_context
import threading

@workflow(name="threaded_workflow")
def threaded_workflow():
    client = get_client()
    telemetry = KeywordsAITelemetry()
    tracer = telemetry.tracer.get_tracer()

    # Capture the current context
    current_context = otel_context.get_current()

    def worker_function():
        # Attach the captured context in the worker thread
        token = otel_context.attach(current_context)
        try:
            # Create spans in the worker thread
            with tracer.start_as_current_span("worker_task") as span:
                span.set_attribute("thread.name", threading.current_thread().name)
                # Your work here
                pass
        finally:
            # Detach the context
            otel_context.detach(token)

    # Start the worker thread
    thread = threading.Thread(target=worker_function)
    thread.start()
    thread.join()
Key Points:
- Use `tracer.start_as_current_span(name)` to create custom spans
- Spans automatically nest based on context
- You can mix manual spans with `@workflow` and `@task` decorators
- Use `get_client()` within manual spans to access the client API
- Threading instrumentation is enabled by default for automatic context propagation across threads
See examples/custom_exporter_example.py for examples of manual span creation.