Skip to main content

Lumenova Beacon SDK - A Python SDK for observability tracing with OpenTelemetry-compatible span export

Project description

Lumenova Beacon SDK

PyPI version Python Versions License

A Python observability tracing SDK that sends spans in OpenTelemetry-compatible format, designed for AI/LLM applications.

Features

  • OpenTelemetry Integration - Automatic instrumentation for Anthropic, OpenAI, FastAPI, Redis, HTTPX, and more
  • Manual & Decorator Tracing - Create spans manually or use @trace decorator
  • LangChain Integration - Automatic tracing for chains, agents, tools, and retrievers
  • Dataset Management - ActiveRecord-style API for managing test datasets
  • Prompt Management - Version-controlled prompt templates with labels (staging, production)
  • Flexible Transport - HTTP or file-based span export
  • Full Async Support - Async/await throughout

Requirements

  • Python 3.10+

Installation

# Base installation
pip install lumenova-beacon

# With OpenTelemetry support
pip install lumenova-beacon[opentelemetry]

# With LangChain support
pip install lumenova-beacon[langchain]

# With LiteLLM support
pip install lumenova-beacon[litellm]

Quick Start

from lumenova_beacon import BeaconClient, trace

# Initialize client with your Beacon credentials
client = BeaconClient(
    endpoint="https://your-beacon-endpoint.lumenova.ai",  # Your Beacon endpoint
    api_key="your-api-key",  # API key from your Beacon account
    session_id="my-session"
)

# Use decorator for automatic tracing
@trace
def my_function(x, y):
    return x + y

result = my_function(10, 20)  # Automatically traced

Configuration

Environment Variables

All environment variables work as fallback - constructor parameters override them:

# Bash/Linux/macOS
export BEACON_ENDPOINT="https://your-beacon-endpoint.lumenova.ai"
export BEACON_API_KEY="your-api-key"
export BEACON_SESSION_ID="my-session"
# PowerShell
$env:BEACON_ENDPOINT = "https://your-beacon-endpoint.lumenova.ai"
$env:BEACON_API_KEY = "your-api-key"
$env:BEACON_SESSION_ID = "my-session"

Configuration Options

from lumenova_beacon import BeaconClient, BeaconConfig

client = BeaconClient(
    # HTTP Transport
    endpoint="https://your-beacon-endpoint.lumenova.ai",
    api_key="your-api-key",
    timeout=10.0,
    verify=True,
    headers={"Custom-Header": "value"},

    # Span Configuration
    session_id="my-session",

    # General
    enabled=True,
    debug=False,
)

File Transport

For local development or testing:

from lumenova_beacon import BeaconClient
from lumenova_beacon.core.transport import FileTransport

client = BeaconClient(
    transport=FileTransport(
        directory="./traces",
        filename_pattern="{span_id}.json",
        pretty_print=True
    )
)

Core Features

1. Tracing

Decorator Tracing

The @trace decorator automatically captures function execution:

from lumenova_beacon import trace

# Simple usage
@trace
def process_data(data):
    return data.upper()

# With custom name
@trace(name="custom_operation")
def another_function():
    pass

# Capture inputs and outputs
@trace(capture_args=True, capture_result=True)
def calculate(x, y):
    return x + y

# Works with async functions
@trace
async def async_operation():
    await some_async_call()

Manual Tracing

For more control, use context managers:

from lumenova_beacon import BeaconClient
from lumenova_beacon.types import SpanKind, StatusCode

client = BeaconClient()

# Context manager
with client.trace("operation_name") as span:
    span.set_attribute("user_id", "123")
    span.set_input({"query": "search term"})

    try:
        result = do_work()
        span.set_output(result)
        span.set_status(StatusCode.OK)
    except Exception as e:
        span.record_exception(e)
        span.set_status(StatusCode.ERROR, str(e))
        raise

# Async context manager
async with client.trace("async_operation") as span:
    result = await async_work()
    span.set_output(result)

# Direct span creation
span = client.create_span(
    name="manual_span",
    kind=SpanKind.CLIENT,
)
span.start()
# ... do work ...
span.end()

2. OpenTelemetry Integration

Beacon automatically configures OpenTelemetry to export spans:

from lumenova_beacon import BeaconClient
from opentelemetry.instrumentation.anthropic import AnthropicInstrumentor
from opentelemetry.instrumentation.openai import OpenAIInstrumentor

# Initialize (auto-configures OpenTelemetry)
client = BeaconClient(
    endpoint="https://your-beacon-endpoint.lumenova.ai",
    api_key="your-api-key",
    auto_instrument_opentelemetry=True  # Default
)

# Instrument libraries
AnthropicInstrumentor().instrument()
OpenAIInstrumentor().instrument()

# Now all API calls are automatically traced!
from anthropic import Anthropic
anthropic = Anthropic()
response = anthropic.messages.create(
    model="claude-3-5-sonnet-20241022",
    messages=[{"role": "user", "content": "Hello!"}]
)  # Automatically traced with proper span hierarchy

Supported Instrumentors

Install additional instrumentors as needed:

pip install opentelemetry-instrumentation-anthropic
pip install opentelemetry-instrumentation-openai
pip install opentelemetry-instrumentation-fastapi
pip install opentelemetry-instrumentation-redis
pip install opentelemetry-instrumentation-httpx
pip install opentelemetry-instrumentation-requests

3. LangChain Integration

Automatically trace all LangChain operations:

from lumenova_beacon import BeaconClient, BeaconCallbackHandler
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

client = BeaconClient()
handler = BeaconCallbackHandler(
    session_id="session-123"
)

# Use with request-time callbacks (recommended)
llm = ChatOpenAI(model="gpt-4")
response = llm.invoke(
    "What is the capital of France?",
    config={"callbacks": [handler]}
)

# Works with chains
prompt = ChatPromptTemplate.from_template("Tell me about {topic}")
chain = prompt | llm
response = chain.invoke(
    {"topic": "AI"},
    config={"callbacks": [handler]}
)

# Traces agents, tools, retrievers, and more
from langchain.agents import create_react_agent, AgentExecutor

agent = create_react_agent(llm, tools, prompt)
executor = AgentExecutor(agent=agent, tools=tools)
result = executor.invoke(
    {"input": "What's the weather?"},
    config={"callbacks": [handler]}
)

4. Dataset Management

Manage test datasets with an ActiveRecord-style API. Both sync and async methods are available:

  • Sync methods (simple names): Dataset.method(...) or dataset.method(...)
  • Async methods ('a' prefix): await Dataset.amethod(...) or await dataset.amethod(...)
from lumenova_beacon import BeaconClient
from lumenova_beacon.datasets import Dataset, DatasetRecord

client = BeaconClient()

# Create dataset (sync)
dataset = Dataset.create(
    name="qa-evaluation",
    description="Question answering test cases"
)

# Create dataset (async)
dataset = await Dataset.acreate(
    name="qa-evaluation",
    description="Question answering test cases"
)

# Add a single record with flexible column-based data (sync)
dataset.create_record(
    data={
        "prompt": "What is AI?",
        "expected_answer": "Artificial Intelligence is...",
        "difficulty": "easy",
        "category": "definitions"
    }
)

# Add a single record (async)
await dataset.acreate_record(
    data={
        "prompt": "What is AI?",
        "expected_answer": "Artificial Intelligence is...",
        "difficulty": "easy"
    }
)

# Bulk create records (sync)
records = [
    {
        "data": {
            "question": "What is ML?",
            "expected_answer": "Machine Learning...",
            "difficulty": "medium"
        }
    },
    {
        "data": {
            "question": "What is DL?",
            "expected_answer": "Deep Learning...",
            "difficulty": "hard"
        }
    }
]
dataset.bulk_create_records(records)

# Bulk create records (async)
await dataset.abulk_create_records(records)

# List datasets (sync)
datasets, pagination = Dataset.list(page=1, page_size=20, search="qa")
for ds in datasets:
    print(f"{ds.name}: {ds.description}")

# List datasets (async)
datasets, pagination = await Dataset.alist(page=1, page_size=20, search="qa")
for ds in datasets:
    print(f"{ds.name}: {ds.description}")

# Get dataset (sync)
dataset = Dataset.get(dataset_id="dataset-uuid", include_records=True)

# Get dataset (async)
dataset = await Dataset.aget(dataset_id="dataset-uuid", include_records=True)

# List records with pagination (sync)
records, pagination = dataset.list_records(page=1, page_size=50)

# List records with pagination (async)
records, pagination = await dataset.alist_records(page=1, page_size=50)

# Update dataset (sync)
dataset.update(name="updated-name", description="New description")

# Update dataset (async)
await dataset.aupdate(name="updated-name", description="New description")

# Delete dataset (cascade deletes records) (sync)
dataset.delete()

# Delete dataset (async)
await dataset.adelete()

5. Prompt Management

Version-controlled prompt templates with labels:

Creating Prompts

from lumenova_beacon import BeaconClient
from lumenova_beacon.prompts import Prompt

client = BeaconClient()

# Create text prompt (sync)
prompt = Prompt.create(
    name="greeting",
    template="Hello {{name}}! Welcome to {{company}}.",
    description="Customer greeting template",
    tags=["customer-support", "greeting"]
)

# Create chat prompt (async)
prompt = await Prompt.acreate(
    name="support-bot",
    messages=[
        {"role": "system", "content": "You are a helpful assistant for {{product}}."},
        {"role": "user", "content": "{{question}}"}
    ],
    tags=["support"]
)

# Quick sync example
prompt = Prompt.create(
    name="quick-prompt",
    template="Hi {{name}}!"
)

Fetching and Using Prompts

# Get latest version (sync)
prompt = Prompt.get("greeting")

# Get specific version (async)
prompt = await Prompt.aget("greeting", version=2)

# Get labeled version (sync)
prompt = Prompt.get("greeting", label="production")

# Get by ID (async)
prompt = await Prompt.aget(prompt_id="prompt-uuid")

# Format prompt with variables
message = prompt.format(name="Alice", company="Acme Corp")
# Result: "Hello Alice! Welcome to Acme Corp."

# Chat prompt formatting (async)
prompt = await Prompt.aget("support-bot")
messages = prompt.format(product="CloudSync", question="How do I sync?")
# Result: [{"role": "system", "content": "..."}, {"role": "user", "content": "..."}]

Versioning and Labels

# Publish new version (async)
new_version = await prompt.apublish(
    template="Hi {{name}}! Welcome to {{company}}. We're excited to have you!",
    message="Added enthusiastic tone"
)
print(f"Published version {new_version.version}")

# Set labels (sync)
prompt.set_label("staging", version=2)
prompt.set_label("production", version=2)

# Promote staging to production after testing (async)
staging_prompt = await Prompt.aget("greeting", label="staging")
# ... test the prompt ...
await staging_prompt.aset_label("production")

LangChain Conversion

from langchain_core.prompts import PromptTemplate, ChatPromptTemplate

# Convert text prompt to LangChain (sync)
prompt = Prompt.get("greeting", label="production")
lc_prompt = prompt.to_langchain()  # Returns PromptTemplate
result = lc_prompt.format(name="Bob", company="TechCorp")

# Convert chat prompt to LangChain (async)
chat_prompt = await Prompt.aget("support-bot", label="production")
lc_chat = chat_prompt.to_langchain()  # Returns ChatPromptTemplate
messages = lc_chat.format_messages(product="DataHub", question="Reset password?")

# Use in chain
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4")
chain = lc_chat | llm
response = await chain.ainvoke({"product": "CloudSync", "question": "Why sync failing?"})

List and Search

# List all prompts (sync)
prompts = Prompt.list(page=1, page_size=20)

# Filter by tags (async)
support_prompts = await Prompt.alist(tags=["customer-support"])

# Search by text (sync)
results = Prompt.list(search="greeting")

# Async version
prompts = await Prompt.alist(page=1, page_size=10)

API Reference

Main Exports

from lumenova_beacon import (
    BeaconClient,           # Main client
    BeaconConfig,           # Configuration class
    get_client,             # Get current client singleton
    trace,                  # Tracing decorator
)

from lumenova_beacon.integrations import BeaconCallbackHandler  # LangChain integration
from lumenova_beacon.datasets import Dataset, DatasetRecord
from lumenova_beacon.prompts import Prompt
from lumenova_beacon.types import SpanKind, StatusCode, SpanType

BeaconClient

client = BeaconClient(
    endpoint: str = "",
    api_key: str | None = None,
    session_id: str | None = None,
    timeout: float = 10.0,
    verify: bool = True,
    enabled: bool = True,
    debug: bool = False,
    auto_instrument_opentelemetry: bool = True,
)

# Methods
span = client.create_span(name, kind, span_type, session_id)
ctx = client.trace(name, kind, span_type)  # Context manager
client.send_span(span)  # Sync
await client.send_span_async(span)  # Async
client.flush()
base_url = client.get_base_url()

Dataset

# Class methods (sync - simple names)
dataset = Dataset.create(name: str, description: str | None = None, column_schema: list[dict[str, Any]] | None = None)
dataset = Dataset.get(dataset_id: str, include_records: bool = False)
datasets, pagination = Dataset.list(page=1, page_size=20, search=None)

# Class methods (async - 'a' prefix)
dataset = await Dataset.acreate(...)
dataset = await Dataset.aget(...)
datasets, pagination = await Dataset.alist(...)

# Instance methods (sync - simple names)
dataset.save()
dataset.update(name=None, description=None)
dataset.delete()
record = dataset.create_record(data: dict[str, Any])
dataset.bulk_create_records(records: list[dict])
records, pagination = dataset.list_records(page=1, page_size=50)

# Instance methods (async - 'a' prefix)
await dataset.asave()
await dataset.aupdate(...)
await dataset.adelete()
record = await dataset.acreate_record(...)
await dataset.abulk_create_records(...)
records, pagination = await dataset.alist_records(...)

# Properties
dataset.id
dataset.name
dataset.description
dataset.record_count
dataset.created_at
dataset.updated_at
dataset.column_schema

DatasetRecord

# Class methods (sync - simple names)
record = DatasetRecord.get(dataset_id: str, record_id: str)
records, pagination = DatasetRecord.list(dataset_id: str, page=1, page_size=50)

# Class methods (async - 'a' prefix)
record = await DatasetRecord.aget(...)
records, pagination = await DatasetRecord.alist(...)

# Instance methods (sync - simple names)
record.save()
record.update(data: dict[str, Any] | None = None)
record.delete()

# Instance methods (async - 'a' prefix)
await record.asave()
await record.aupdate(...)
await record.adelete()

# Properties
record.id
record.dataset_id
record.data  # dict[str, Any] - flexible column data
record.created_at
record.updated_at

Prompt

# Class methods (sync - simple names)
prompt = Prompt.create(name, template=None, messages=None, description=None, tags=None)
prompt = Prompt.get(name=None, prompt_id=None, label="latest", version=None)
prompts = Prompt.list(page=1, page_size=10, tags=None, search=None)

# Class methods (async - 'a' prefix)
prompt = await Prompt.acreate(...)
prompt = await Prompt.aget(...)
prompts = await Prompt.alist(...)

# Instance methods (sync - simple names)
prompt.update(name=None, description=None, tags=None)
prompt.delete()
new_version = prompt.publish(template=None, messages=None, message="")
prompt.set_label(label: str, version: int | None = None)

# Instance methods (async - 'a' prefix)
await prompt.aupdate(...)
await prompt.adelete()
new_version = await prompt.apublish(...)
await prompt.aset_label(...)

# Rendering (always sync)
result = prompt.format(**kwargs)
result = prompt.compile(variables: dict)
template = prompt.to_template()  # Convert to Python f-string format
lc_prompt = prompt.to_langchain()  # Convert to LangChain template

# Properties
prompt.id
prompt.name
prompt.type  # "text" or "chat"
prompt.version
prompt.template  # For TEXT prompts
prompt.messages  # For CHAT prompts
prompt.labels  # list[str]
prompt.tags  # list[str]

Span

span = Span(name, kind, span_type)

# Lifecycle
span.start()
span.end(status_code=StatusCode.OK)

# Status
span.set_status(StatusCode.ERROR, "description")
span.record_exception(exc: Exception)

# Attributes
span.set_attribute("key", value)
span.set_attributes({"k1": "v1", "k2": "v2"})
span.set_input(data: dict)
span.set_output(data: dict)
span.set_metadata("key", value)

# Properties
span.trace_id
span.span_id
span.parent_id
span.name
span.kind
span.span_type

Type Enums

from lumenova_beacon.types import SpanKind, StatusCode, SpanType

# SpanKind
SpanKind.INTERNAL
SpanKind.SERVER
SpanKind.CLIENT
SpanKind.PRODUCER
SpanKind.CONSUMER

# StatusCode
StatusCode.UNSET
StatusCode.OK
StatusCode.ERROR

# SpanType
SpanType.SPAN
SpanType.GENERATION
SpanType.CHAIN
SpanType.TOOL
SpanType.RETRIEVAL
SpanType.AGENT
SpanType.FUNCTION

Error Handling

Exception Hierarchy

from lumenova_beacon.exceptions import (
    BeaconError,              # Base exception
    ConfigurationError,       # Configuration issues
    TransportError,           # Transport errors
    HTTPTransportError,       # HTTP transport errors
    FileTransportError,       # File transport errors
    SpanError,                # Span-related errors
    DatasetError,             # Dataset errors
    DatasetNotFoundError,     # Dataset not found
    DatasetValidationError,   # Dataset validation
    PromptError,              # Prompt errors
    PromptNotFoundError,      # Prompt not found
    PromptValidationError,    # Prompt validation
    PromptCompilationError,   # Template compilation
    PromptNetworkError,       # Network errors
)

Retry Logic

All HTTP operations automatically retry up to 3 times with exponential backoff:

from lumenova_beacon.exceptions import PromptNetworkError

try:
    prompt = await Prompt.get("my-prompt")
except PromptNetworkError as e:
    # Failed after 3 automatic retries
    print(f"Network error: {e}")
except PromptNotFoundError as e:
    # Prompt doesn't exist
    print(f"Not found: {e}")

Graceful Degradation

from lumenova_beacon import BeaconClient

# Disable tracing in development
client = BeaconClient(enabled=False)

# Tracing becomes no-op when disabled
@trace
def my_function():
    return "result"  # No tracing overhead

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

lumenova_beacon-2.1.9.tar.gz (504.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

lumenova_beacon-2.1.9-py3-none-any.whl (99.1 kB view details)

Uploaded Python 3

File details

Details for the file lumenova_beacon-2.1.9.tar.gz.

File metadata

  • Download URL: lumenova_beacon-2.1.9.tar.gz
  • Upload date:
  • Size: 504.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for lumenova_beacon-2.1.9.tar.gz
Algorithm Hash digest
SHA256 6685d28c86aba9f55205bf3ffe282fedf7ac23c97cc991faac5c374b7379b1c1
MD5 babd4791835e68ec7e19a03b45656923
BLAKE2b-256 00a16782364049e7461a4e3e8fa3efa7b707d007206b054d7e518dabcff9af92

See more details on using hashes here.

File details

Details for the file lumenova_beacon-2.1.9-py3-none-any.whl.

File metadata

File hashes

Hashes for lumenova_beacon-2.1.9-py3-none-any.whl
Algorithm Hash digest
SHA256 07a89c0d9c7336c04e48295465d52a344cbeb42ac6aa90af017eb13dda4d3c5e
MD5 8fd559b482bdb6f9168dfb510dd67464
BLAKE2b-256 46897d6c52408cd06282de153aaff68159a3c395353659dc160a3ab3413d3fb8

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page