Unified LLM Observability & Audit SDK - thin wrapper around Pydantic Logfire with Keycloak and Kafka support
Autonomize Observer SDK
A lightweight, production-ready SDK for LLM observability and audit logging. Built as a thin wrapper around Pydantic Logfire for tracing and genai-prices for cost calculation, with additional support for:
- Audit Logging - Compliance-ready audit trails with Keycloak JWT integration
- Kafka Export - Stream audit events to Kafka for downstream processing
- Langflow Integration - First-class support for Langflow/Flow tracing
- FastAPI Middleware - Automatic user context extraction from JWT tokens
Why Autonomize Observer?
Instead of reinventing the wheel, we leverage best-in-class libraries:
| Feature | Powered By |
|---|---|
| OTEL Tracing & Spans | Pydantic Logfire |
| OpenAI/Anthropic Instrumentation | Logfire Integrations |
| LLM Cost Calculation | genai-prices (28+ providers) |
| Audit Logging & Keycloak | Autonomize Observer |
| Kafka Event Export | Autonomize Observer |
| Langflow Integration | Autonomize Observer |
Quick Start
Installation
# Using pip
pip install autonomize-observer
# Using uv (recommended)
uv add autonomize-observer
Basic Usage
from autonomize_observer import init, audit
from autonomize_observer import ResourceType
# Initialize once at startup
init(
    service_name="my-service",
    kafka_enabled=False,  # Enable for Kafka export
)
# Log audit events
audit.log_create(
    resource_type=ResourceType.DOCUMENT,
    resource_id="doc-123",
    resource_name="Project Proposal",
)
# Log LLM interactions for compliance
audit.log_llm_interaction(
    flow_id="flow-456",
    model="gpt-4o",
    provider="openai",
    input_tokens=150,
    output_tokens=75,
    cost=0.025,
)
LLM Tracing with Logfire
For LLM tracing, use Logfire directly - we don't duplicate its functionality:
import logfire
from openai import OpenAI
# Configure Logfire (data stays local by default)
logfire.configure(
    service_name="my-service",
    send_to_logfire=False,  # Keep data local
)
# One-line instrumentation for all OpenAI calls
logfire.instrument_openai()
# Use OpenAI normally - all calls are traced
client = OpenAI()
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
)
Cost Calculation
from autonomize_observer import calculate_cost, get_price
# Calculate cost for an LLM call
result = calculate_cost(
    provider="openai",
    model="gpt-4o",
    input_tokens=1000,
    output_tokens=500,
)
print(f"Total cost: ${result.total_cost:.4f}")
print(f"Input cost: ${result.input_cost:.4f}")
print(f"Output cost: ${result.output_cost:.4f}")
# Get price info for a model
price = get_price("anthropic", "claude-3-5-sonnet-20241022")
if price:
    print(f"Input: ${price.input_price_per_1k:.4f}/1K tokens")
    print(f"Output: ${price.output_price_per_1k:.4f}/1K tokens")
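Under the hood, per-call cost is simple per-token arithmetic over the provider's price table. A minimal sketch of the calculation, using illustrative prices (not current rates) and a hypothetical `estimate_cost` helper rather than the SDK's own function:

```python
# Illustrative per-1K-token prices; real values come from genai-prices.
PRICES = {
    ("openai", "gpt-4o"): {"input_per_1k": 0.0025, "output_per_1k": 0.0100},
}

def estimate_cost(provider: str, model: str, input_tokens: int, output_tokens: int) -> float:
    """Cost = tokens / 1000 * per-1K price, summed over input and output."""
    price = PRICES[(provider, model)]
    input_cost = input_tokens / 1000 * price["input_per_1k"]
    output_cost = output_tokens / 1000 * price["output_per_1k"]
    return input_cost + output_cost

cost = estimate_cost("openai", "gpt-4o", input_tokens=1000, output_tokens=500)
```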
Key Features
Audit Logging with Keycloak Support
from autonomize_observer import (
    init, audit,
    ActorContext, set_actor_context,
    ResourceType, AuditAction,
)
# Set user context from Keycloak JWT
set_actor_context(ActorContext(
    actor_id="user-123",
    email="user@example.com",
    roles=["admin", "analyst"],
))
# All audit events now include user context
audit.log_read(
    resource_type=ResourceType.FILE,
    resource_id="sensitive-data.csv",
)
audit.log_update(
    resource_type=ResourceType.USER,
    resource_id="user-456",
    changes=[
        {"field": "role", "old_value": "viewer", "new_value": "editor"},
    ],
)
Kafka Export for Audit Events
from autonomize_observer import init, KafkaConfig
init(
    service_name="my-service",
    kafka_config=KafkaConfig(
        bootstrap_servers="kafka:9092",
        audit_topic="audit-events",
        security_protocol="SASL_SSL",
        sasl_mechanism="PLAIN",
        sasl_username="user",
        sasl_password="secret",
    ),
    kafka_enabled=True,
)
# All audit events are now streamed to Kafka
Langflow Integration
from autonomize_observer.integrations import trace_flow, trace_component
@trace_flow(
    flow_id="customer-support-flow",
    flow_name="Customer Support Bot",
    session_id="session-123",
)
def run_customer_support(query: str) -> str:
    # Flow execution is automatically traced
    @trace_component("LLMComponent", "Query Analyzer")
    def analyze_query():
        # Component execution is traced as a child span
        return process_with_llm(query)

    @trace_component("LLMComponent", "Response Generator")
    def generate_response(analysis):
        return generate_with_llm(analysis)

    analysis = analyze_query()
    return generate_response(analysis)
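A decorator like `trace_component` can be sketched with `functools.wraps`: open a span around the wrapped call and record its duration when it exits. This is a hypothetical, stdlib-only illustration of the pattern (a list stands in for a real span exporter):

```python
import functools
import time

SPANS: list[dict] = []  # stand-in for a real span exporter

def trace_component(component_type: str, name: str):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return fn(*args, **kwargs)
            finally:
                # Record the span even if the component raised
                SPANS.append({
                    "type": component_type,
                    "name": name,
                    "duration_ms": (time.perf_counter() - start) * 1000,
                })
        return wrapper
    return decorator

@trace_component("LLMComponent", "Query Analyzer")
def analyze(query: str) -> str:
    return query.upper()

result = analyze("hello")
```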
FastAPI Integration
from fastapi import FastAPI
from autonomize_observer.integrations import setup_fastapi
app = FastAPI()
# Automatically extracts user context from JWT tokens
setup_fastapi(
    app,
    service_name="my-api",
    keycloak_enabled=True,
)

@app.get("/documents/{doc_id}")
async def get_document(doc_id: str):
    # User context is automatically available for audit logging
    from autonomize_observer import audit, ResourceType

    audit.log_read(
        resource_type=ResourceType.DOCUMENT,
        resource_id=doc_id,
    )
    return {"id": doc_id, "content": "..."}
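The middleware's core job is pulling identity claims out of the bearer token. Decoding a JWT payload is just base64 + JSON; the sketch below skips signature verification, which real middleware must perform against Keycloak's published keys, and uses illustrative claim names:

```python
import base64
import json

def decode_jwt_payload(token: str) -> dict:
    """Decode the middle (payload) segment of a JWT.
    No signature check - illustration only; production code must
    verify the token against Keycloak's JWKS before trusting claims."""
    payload_b64 = token.split(".")[1]
    payload_b64 += "=" * (-len(payload_b64) % 4)  # restore stripped base64 padding
    return json.loads(base64.urlsafe_b64decode(payload_b64))

# Build an unsigned sample token just for demonstration
claims = {"sub": "user-123", "email": "user@example.com",
          "realm_access": {"roles": ["admin"]}}
segment = base64.urlsafe_b64encode(json.dumps(claims).encode()).rstrip(b"=").decode()
token = f"header.{segment}.signature"

decoded = decode_jwt_payload(token)
```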
Workflow Tracing
For transactional workflows that need step-by-step timing (not LLM-specific):
from autonomize_observer.tracing import WorkflowTracer
with WorkflowTracer("process-order", order_id="123") as tracer:
    with tracer.step("validate") as step:
        validate_order()
        step.set("items_count", 5)
    with tracer.step("payment") as step:
        result = process_payment()
        step.set("amount", result.amount)
    with tracer.step("fulfillment"):
        send_to_warehouse()
    tracer.set("status", "completed")

# Access timing data
for step in tracer.steps:
    print(f"{step.name}: {step.duration_ms:.2f}ms")
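The step-timing pattern above is straightforward to reason about as nested context managers. A stripped-down, hypothetical equivalent (not the SDK's implementation) using `contextlib.contextmanager`:

```python
import time
from contextlib import contextmanager
from dataclasses import dataclass, field

@dataclass
class Step:
    name: str
    duration_ms: float = 0.0
    attrs: dict = field(default_factory=dict)

    def set(self, key, value):
        self.attrs[key] = value

class MiniTracer:
    """Hypothetical stand-in for WorkflowTracer's step-timing behavior."""

    def __init__(self, name: str):
        self.name = name
        self.steps: list[Step] = []

    @contextmanager
    def step(self, name: str):
        s = Step(name)
        t0 = time.perf_counter()
        try:
            yield s
        finally:
            # Record duration even if the step body raised
            s.duration_ms = (time.perf_counter() - t0) * 1000
            self.steps.append(s)

tracer = MiniTracer("process-order")
with tracer.step("validate") as step:
    step.set("items_count", 5)
```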
Agent Tracing
The SDK provides two approaches for tracing agents/LLM workflows:
Standalone Agent Tracing (Recommended for new projects)
Use Logfire directly for modern OTEL-based tracing:
import logfire
from openai import OpenAI
# Configure Logfire (one-time setup)
logfire.configure(
    service_name="my-agent",
    send_to_logfire=False,  # Keep data local or send to your OTEL collector
)
# Auto-instrument LLM clients
logfire.instrument_openai()
logfire.instrument_anthropic()
# Use normally - all LLM calls are automatically traced with token usage
client = OpenAI()
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
)
AI Studio Integration (Legacy streaming format)
For AI Studio (Langflow) compatibility, use AgentTracer:
from uuid import uuid4
from autonomize_observer.tracing import AgentTracer
# Create tracer with Kafka streaming
tracer = AgentTracer(
    trace_name="Customer Support Flow",
    trace_id=uuid4(),
    flow_id="flow-123",
    kafka_bootstrap_servers="kafka:9092",
    kafka_topic="genesis-traces-streaming",
    # Optional: Enable dual export (Kafka + OTEL)
    enable_otel=True,
)
# Trace workflow
tracer.start_trace()
tracer.add_trace("comp-1", "QueryAnalyzer", "llm", {"query": "..."})
# ... component execution ...
tracer.end_trace("comp-1", "QueryAnalyzer", {"result": "..."})
tracer.end(inputs={}, outputs={})
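Each `add_trace`/`end_trace` call maps naturally onto a small JSON event on the stream. The envelope below is hypothetical (the actual AI Studio streaming schema may differ in field names), but shows the shape of what a producer would serialize onto the topic:

```python
import json
import time
import uuid

def make_trace_event(trace_id, component_id, component_name, event_type, payload):
    """Hypothetical streaming envelope - field names are illustrative."""
    return {
        "trace_id": str(trace_id),
        "component_id": component_id,
        "component_name": component_name,
        "event": event_type,      # e.g. "start" / "end"
        "timestamp": time.time(),
        "payload": payload,
    }

event = make_trace_event(uuid.uuid4(), "comp-1", "QueryAnalyzer", "start", {"query": "..."})
wire = json.dumps(event)  # what would be produced to the Kafka topic
```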
Architecture
SDK Components
graph TB
    subgraph SDK["Autonomize Observer SDK"]
        subgraph Core["core/"]
            Config[KafkaConfig<br/>ObserverConfig]
            Imports[imports.py<br/>Availability checks]
        end
        subgraph Tracing["tracing/"]
            AT[AgentTracer]
            WT[WorkflowTracer]
            Factory[TracerFactory]
            OTEL[OTELManager]
        end
        subgraph Audit["audit/"]
            Logger[AuditLogger]
            Context[ActorContext]
        end
        subgraph Exporters["exporters/"]
            KafkaExp[KafkaExporter]
            KafkaBase[BaseKafkaProducer]
        end
        subgraph Cost["cost/"]
            Pricing[calculate_cost<br/>get_price]
        end
        subgraph Integrations["integrations/"]
            FastAPI[FastAPI Middleware]
            Langflow[Flow Tracing]
        end
    end
    Tracing --> Core
    Audit --> Core
    Exporters --> Core
    Integrations --> Tracing
    Integrations --> Audit
Data Flow
flowchart LR
    subgraph App["Your Application"]
        LLM[LLM Calls]
        WF[Workflows]
        API[API Requests]
    end
    subgraph SDK["Autonomize Observer"]
        AT[AgentTracer]
        WT[WorkflowTracer]
        AL[AuditLogger]
        LF[Logfire]
    end
    subgraph Export["Export Destinations"]
        K[Kafka]
        OT[OTEL Collector]
        LFD[Logfire Dashboard]
    end
    LLM --> LF
    LLM --> AT
    WF --> WT
    API --> AL
    AT --> K
    AT --> OT
    WT --> K
    WT --> OT
    AL --> K
    LF --> OT
    LF --> LFD
Class Hierarchy
classDiagram
    class BaseTracer {
        <<protocol>>
        +start()
        +end(outputs)
        +__enter__()
        +__exit__()
    }
    class TracerMixin {
        +set(key, value)
        +get_summary()
    }
    class AgentTracer {
        +trace_id: UUID
        +flow_id: str
        +start_trace()
        +add_trace()
        +end_trace()
    }
    class WorkflowTracer {
        +name: str
        +steps: list
        +step(name)
        +duration_ms
    }
    class TracerFactory {
        +config: ObserverConfig
        +create_agent_tracer()
        +create_workflow_tracer()
    }
    BaseTracer <|.. AgentTracer
    BaseTracer <|.. WorkflowTracer
    TracerMixin <|-- AgentTracer
    TracerMixin <|-- WorkflowTracer
    TracerFactory --> AgentTracer
    TracerFactory --> WorkflowTracer
Directory Structure
autonomize-observer/
├── audit/                 # Audit logging with Keycloak support
│   ├── context.py         # ActorContext and JWT parsing
│   └── logger.py          # AuditLogger with convenience methods
├── core/                  # Shared utilities and configuration
│   ├── config.py          # KafkaConfig, ObserverConfig
│   ├── imports.py         # Centralized dependency availability checks
│   └── kafka_utils.py     # Shared Kafka config builder
├── cost/                  # Cost calculation (wraps genai-prices)
│   └── pricing.py         # calculate_cost, get_price
├── exporters/             # Event export (Kafka)
│   ├── base.py            # BaseExporter interface
│   ├── kafka_base.py      # BaseKafkaProducer (shared Kafka logic)
│   └── kafka.py           # KafkaExporter for audit events
├── integrations/          # Framework integrations
│   ├── fastapi.py         # FastAPI middleware
│   └── langflow.py        # Langflow/Flow tracing
├── schemas/               # Pydantic models
│   ├── audit.py           # AuditEvent, ChangeRecord
│   ├── base.py            # BaseEvent
│   ├── streaming.py       # TraceEvent for streaming
│   └── enums.py           # AuditAction, ResourceType, etc.
└── tracing/               # Tracing module
    ├── base.py                  # BaseTracer protocol & TracerMixin
    ├── factory.py               # TracerFactory for creating tracers
    ├── agent_tracer.py          # AgentTracer for AI Studio
    ├── workflow_tracer.py       # WorkflowTracer for step timing
    ├── kafka_trace_producer.py  # Kafka streaming producer
    ├── otel_utils.py            # OTELManager for Logfire integration
    ├── logfire_integration.py   # Logfire configuration
    └── utils/                   # Utility modules
        ├── token_extractors.py  # Strategy pattern for token extraction
        ├── model_utils.py       # Model name normalization
        └── serialization.py     # Safe serialization utilities
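The tree notes that `tracing/utils/token_extractors.py` uses a strategy pattern for token extraction. The idea is that each provider's response shape gets its own extractor function, dispatched by provider name; a hypothetical stdlib-only sketch (the real module's names and response handling may differ):

```python
from typing import Callable

# Each strategy maps a provider-specific response dict
# to a (input_tokens, output_tokens) pair.
def _openai_usage(resp: dict) -> tuple[int, int]:
    usage = resp.get("usage", {})
    return usage.get("prompt_tokens", 0), usage.get("completion_tokens", 0)

def _anthropic_usage(resp: dict) -> tuple[int, int]:
    usage = resp.get("usage", {})
    return usage.get("input_tokens", 0), usage.get("output_tokens", 0)

EXTRACTORS: dict[str, Callable[[dict], tuple[int, int]]] = {
    "openai": _openai_usage,
    "anthropic": _anthropic_usage,
}

def extract_tokens(provider: str, resp: dict) -> tuple[int, int]:
    return EXTRACTORS[provider](resp)

tokens = extract_tokens("anthropic", {"usage": {"input_tokens": 150, "output_tokens": 75}})
```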
Configuration
Environment Variables
# Kafka Configuration
export KAFKA_BOOTSTRAP_SERVERS="kafka:9092"
export KAFKA_AUDIT_TOPIC="audit-events"
export KAFKA_SECURITY_PROTOCOL="SASL_SSL"
export KAFKA_SASL_MECHANISM="PLAIN"
export KAFKA_SASL_USERNAME="user"
export KAFKA_SASL_PASSWORD="secret"
# Service Configuration
export SERVICE_NAME="my-service"
export SERVICE_VERSION="1.0.0"
export ENVIRONMENT="production"
Programmatic Configuration
from autonomize_observer import init, configure, ObserverConfig, KafkaConfig
# Option 1: Direct initialization
init(
    service_name="my-service",
    service_version="1.0.0",
    environment="production",
    send_to_logfire=False,
    kafka_config=KafkaConfig(bootstrap_servers="kafka:9092"),
    kafka_enabled=True,
)

# Option 2: Configuration object
config = ObserverConfig(
    service_name="my-service",
    kafka=KafkaConfig(bootstrap_servers="kafka:9092"),
    kafka_enabled=True,
)
configure(config)
Testing
# Run all tests with coverage
uv run pytest tests/ -v --cov
# Run specific test modules
uv run pytest tests/test_audit_logger.py -v
uv run pytest tests/test_integrations.py -v
Test Coverage: 97%+ with 574 tests covering all modules.
Documentation
- INSTALL.md - Installation and setup guide
- GUIDE.md - Comprehensive usage guide
- INTEGRATIONS.md - Integration guides for Langflow, FastAPI, etc.
Requirements
- Python 3.10+
- Dependencies:
  - logfire>=4.0.0 - OTEL tracing
  - genai-prices>=0.0.40 - LLM cost calculation
  - pydantic>=2.10.0 - Data validation
  - pyjwt>=2.10.0 - JWT token parsing
  - confluent-kafka>=2.10.0 - Kafka export (optional)
License
Proprietary - Autonomize AI
Support
- Issues: GitHub Issues
- Email: support@autonomize.ai
Autonomize Observer SDK v2.0.0 - Lightweight LLM observability and audit logging, powered by Pydantic Logfire and genai-prices.