LLM & Agent Observability — structured tracing, Prometheus metrics, and OpenTelemetry export via Python decorators

Rastir

LLM & Agent Observability for Python
Structured tracing and Prometheus metrics via decorators — no monkey-patching, no vendor lock-in.


Why Rastir?

Most LLM observability tools require SDK wrappers, monkey-patching, or vendor-specific clients. Rastir takes a different approach:

  • Decorators, not wrappers — add @llm, @agent, @tool to your existing functions. No code rewrites.
  • Adapters, not monkey-patches — Rastir inspects return values to extract model, tokens, and provider metadata. Works with any SDK version.
  • Two-phase enrichment — model/provider metadata is captured from function arguments before the call and refined from the response after. If the API call fails, metadata still survives.
  • Self-hosted collector — a lightweight FastAPI server you own. Prometheus metrics out of the box, OTLP export to Tempo/Jaeger if you want it.
  • Zero external infrastructure — no database, no Redis, no Kafka. The collector is stateless and runs in a single container.

Your Python App                          Rastir Collector
┌──────────────────────────────┐         ┌──────────────────────────────┐
│  @agent                      │  HTTP   │  FastAPI ingestion            │
│    @llm (OpenAI)             │ ──────▸ │  ├─ Prometheus /metrics       │
│    @tool (search)            │  spans  │  ├─ Trace store /v1/traces    │
│    @retrieval (RAG)          │         │  ├─ Sampling & backpressure   │
│                              │         │  └─ OTLP → Tempo/Jaeger      │
│  Two-phase enrichment:       │         │                                │
│    request args → response   │         │  Defence-in-depth:             │
│                              │         │    cardinality guards          │
│  wrap(obj, name="cache")     │         │    error normalisation         │
└──────────────────────────────┘         │    bounded enum validation     │
        decorators + wrap()              └──────────────────────────────┘

Supported Providers

Provider Auto-detection Tokens Model Streaming Request-phase
OpenAI
Azure OpenAI
Anthropic
AWS Bedrock
Google Gemini
Cohere
Mistral
Groq
LangChain
LangGraph
LlamaIndex
CrewAI

15 adapters are priority-ordered and composable: LangGraph → LangChain → OpenAI resolution happens automatically.

Request-phase enrichment: For provider adapters, model/provider metadata is extracted from function kwargs (e.g., model="gpt-4o") before the API call. If the call fails, the span still contains the model and provider.
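The priority-ordered, composable resolution described above can be pictured roughly as follows. This is a minimal sketch and not Rastir's actual registry: the `Adapter` class, the `resolve` function, the priorities, and the dict-shaped responses are all hypothetical stand-ins chosen so the example runs without any SDK installed.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Adapter:
    """Hypothetical adapter: recognises a value, extracts metadata, unwraps it."""
    name: str
    priority: int                                    # higher priority runs first
    can_handle: Callable[[Any], bool]
    transform: Callable[[Any], tuple[dict, Any]]     # returns (metadata, inner value)


def resolve(result: Any, adapters: list[Adapter]) -> dict:
    """Run adapters in priority order, letting each one unwrap the value for the next."""
    metadata: dict = {}
    current = result
    for adapter in sorted(adapters, key=lambda a: a.priority, reverse=True):
        if current is None:
            break
        if adapter.can_handle(current):
            extracted, current = adapter.transform(current)
            metadata.update(extracted)
    return metadata


# Three toy adapters standing in for framework -> message -> provider layers.
ADAPTERS = [
    Adapter("provider", 10,
            lambda r: isinstance(r, dict) and "model" in r,
            lambda r: ({"model": r["model"], "provider": "openai",
                        "tokens_input": r["usage"]["prompt_tokens"],
                        "tokens_output": r["usage"]["completion_tokens"]}, None)),
    Adapter("graph_state", 30,
            lambda r: isinstance(r, dict) and "messages" in r,
            lambda r: ({"message_count": len(r["messages"])}, r["messages"][-1])),
    Adapter("message", 20,
            lambda r: isinstance(r, dict) and "response_metadata" in r,
            lambda r: ({}, r["response_metadata"])),
]

state = {"messages": [
    {"content": "hi"},
    {"content": "hello", "response_metadata": {
        "model": "gpt-4o",
        "usage": {"prompt_tokens": 12, "completion_tokens": 5}}},
]}
resolved = resolve(state, ADAPTERS)
```

Each layer only needs to know how to peel off its own wrapper, which is why a framework result can fall through to a provider adapter without extra configuration.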

MCP Distributed Tracing

Rastir supports distributed tracing across MCP (Model Context Protocol) tool boundaries. Trace context flows automatically from client to server via tool arguments — no _meta, no HTTP headers.

Server side — the MCP server must call configure() independently to push its server-side spans to the collector:

# ── MCP Server (separate process) ─────────────────
from rastir import configure, mcp_endpoint

configure(service="tool-server", push_url="http://localhost:8080/v1/telemetry")

@mcp.tool()
@mcp_endpoint
async def search(query: str) -> str:
    return db.search(query)       # server span created with remote="false"

Client side — wrap the MCP session or use the LangChain bridge:

# ── Client (your agent process) ───────────────────
from rastir import configure, agent_span, trace_remote_tools, mcp_to_langchain_tools

configure(service="my-agent", push_url="http://localhost:8080/v1/telemetry")

# Option 1: Direct MCP session
@agent_span(agent_name="my_agent")
async def run():
    async with streamable_http_client(url) as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()

            @trace_remote_tools
            def wrap():
                return session

            wrapped = wrap()
            result = await wrapped.call_tool("search", {"query": "hello"})
            # client span created with remote="true", trace context injected

# Option 2: LangGraph agent (one-line bridge)
async with ClientSession(read, write) as session:
    await session.initialize()
    tools = await mcp_to_langchain_tools(session)   # automatic trace injection
    agent = create_react_agent(llm, tools)           # ready to use

Both processes must call configure(push_url=...) — the client pushes client spans, the server pushes server spans. Both arrive at the same collector and are linked by trace_id.

Trace topology:

Agent Span
└── Tool Client Span  (remote="true",  model/provider inherited)
      └── Tool Server Span (remote="false", same trace_id)
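To make the argument-based propagation concrete, here is a minimal sketch of the idea. It is not Rastir's wire format: the reserved key `TRACE_KEY`, the helper names, and the span dictionaries are hypothetical. The client adds trace context to the tool arguments, and the server removes it before calling the real tool.

```python
import uuid

TRACE_KEY = "trace_context"   # hypothetical reserved argument name


def inject_trace_context(arguments: dict, trace_id: str, parent_span_id: str) -> dict:
    """Client side: copy the arguments and attach the current trace context."""
    enriched = dict(arguments)
    enriched[TRACE_KEY] = {"trace_id": trace_id, "parent_span_id": parent_span_id}
    return enriched


def extract_trace_context(arguments: dict):
    """Server side: split the trace context from the arguments the tool expects."""
    remaining = dict(arguments)
    context = remaining.pop(TRACE_KEY, None)
    return context, remaining


def handle_tool_call(tool, arguments: dict, recorded_spans: list):
    """Server side: record a span linked to the caller, then run the tool."""
    context, clean_args = extract_trace_context(arguments)
    recorded_spans.append({
        "span_id": uuid.uuid4().hex[:16],
        "trace_id": context["trace_id"] if context else uuid.uuid4().hex,
        "parent_id": context["parent_span_id"] if context else None,
        "remote": "false",
    })
    return tool(**clean_args)


# Simulated round trip within one process.
server_spans: list = []
client_span = {"trace_id": uuid.uuid4().hex, "span_id": uuid.uuid4().hex[:16], "remote": "true"}
wire_args = inject_trace_context({"query": "hello"},
                                 client_span["trace_id"], client_span["span_id"])
result = handle_tool_call(lambda query: f"results for {query}", wire_args, server_spans)
```

Because the context travels inside the arguments, the tool function itself never sees it, and both spans end up sharing one trace_id.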

Full MCP documentation → MCP Distributed Tracing

Installation

pip install rastir              # Client library (decorators + HTTP push)
pip install rastir[server]      # + Collector server (FastAPI, Prometheus, OTLP)
pip install rastir[all]         # Everything including dev tools

Quick Start

1. Instrument your code (3 lines to add)

from rastir import configure, agent, llm, tool, retrieval

configure(
    service="my-app",
    push_url="http://localhost:8080/v1/telemetry",
)

@agent(agent_name="research_agent")
def run_research(query: str) -> str:
    context = fetch_docs(query)
    return ask_llm(query, context)

@retrieval
def fetch_docs(query: str) -> list[str]:
    return vector_db.search(query)           # auto-tracked

@llm(model="gpt-4o", provider="openai")
def ask_llm(query: str, context: list[str]) -> str:
    return openai.chat(messages=[...])        # tokens & model extracted automatically

2. Start the collector

rastir-server                              # default: 0.0.0.0:8080
# or
docker run -p 8080:8080 rastir-server

3. Query metrics

curl http://localhost:8080/metrics          # Prometheus format
curl http://localhost:8080/v1/traces        # JSON trace store

That's it. Prometheus scrapes /metrics, you build Grafana dashboards on top, and you can optionally forward spans to Tempo or Jaeger via OTLP.

What you get in Prometheus

# Token usage by model
rastir_tokens_input_total{model="gpt-4o",provider="openai",agent="research_agent"} 1250
rastir_tokens_output_total{model="gpt-4o",provider="openai",agent="research_agent"} 380

# Latency percentiles
rastir_duration_seconds_bucket{span_type="llm",le="0.5"} 12
rastir_duration_seconds_bucket{span_type="llm",le="1.0"} 45

# Tool & retrieval call rates
rastir_tool_calls_total{tool_name="web_search",agent="research_agent"} 89
rastir_retrieval_calls_total{agent="research_agent"} 156

# Error tracking with normalised categories
rastir_errors_total{span_type="llm",error_type="rate_limit"} 7
rastir_errors_total{span_type="llm",error_type="timeout"} 3
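If you want to check these numbers from a script rather than a dashboard, the exposition text is easy to read. The sketch below is a deliberately simplified parser (no escaped quotes, no timestamps); the helper names are hypothetical, and `SAMPLE` just repeats a few lines from above. For anything beyond a quick check, `prometheus_client.parser` is the more robust choice.

```python
import re
from collections import defaultdict

SAMPLE = """\
# Token usage by model
rastir_tokens_input_total{model="gpt-4o",provider="openai",agent="research_agent"} 1250
rastir_tokens_output_total{model="gpt-4o",provider="openai",agent="research_agent"} 380
# Error tracking with normalised categories
rastir_errors_total{span_type="llm",error_type="rate_limit"} 7
rastir_errors_total{span_type="llm",error_type="timeout"} 3
"""

LINE_RE = re.compile(
    r'^(?P<name>[a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(?P<labels>.*)\})?\s+(?P<value>\S+)$'
)
LABEL_RE = re.compile(r'(\w+)="([^"]*)"')


def parse_samples(text: str) -> list:
    """Return (metric name, labels dict, value) for each sample line."""
    samples = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue                      # skip blanks and comment/HELP/TYPE lines
        match = LINE_RE.match(line)
        if match is None:
            continue
        labels = dict(LABEL_RE.findall(match.group("labels") or ""))
        samples.append((match.group("name"), labels, float(match.group("value"))))
    return samples


def total_by_label(samples: list, metric: str, label: str) -> dict:
    """Sum one metric's values, grouped by a single label."""
    totals = defaultdict(float)
    for name, labels, value in samples:
        if name == metric:
            totals[labels.get(label, "")] += value
    return dict(totals)


errors = total_by_label(parse_samples(SAMPLE), "rastir_errors_total", "error_type")
```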

Two-Phase Enrichment

Rastir captures metadata in two phases to ensure observability even when API calls fail:

Phase 1 (request): Scan function kwargs for model/provider
  └─ e.g., model="gpt-4o" extracted before the call

Phase 2 (response): Adapter pipeline extracts from return value
  └─ Concrete response values override request-phase guesses
  └─ If call raises, request-phase metadata survives

Example — failed API call still produces useful metrics:

@llm
def ask_model(query: str):
    return openai.chat.completions.create(
        model="gpt-4o",          # ← captured in Phase 1
        messages=[...],
    )
    # If this raises RateLimitError, the span still records:
    #   model="gpt-4o", provider="openai", status="ERROR"
    #   error_type="rate_limit"
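The two phases can be sketched with a plain decorator. This is an illustration of the idea only, not Rastir's implementation: `llm_sketch`, the span dictionary, and the dict-shaped response are hypothetical, and the sketch assumes the metadata arrives as keyword arguments of the decorated function.

```python
import functools


def llm_sketch(func):
    """Hypothetical stand-in for an @llm-style decorator with two-phase enrichment."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        span = {"name": func.__name__, "status": "OK"}

        # Phase 1 (request): record what the caller asked for, before the call.
        for key in ("model", "provider"):
            if key in kwargs:
                span[key] = kwargs[key]

        try:
            result = func(*args, **kwargs)
        except Exception as exc:
            # The call failed, but the request-phase metadata is already on the span.
            span["status"] = "ERROR"
            span["error"] = type(exc).__name__
            wrapper.spans.append(span)
            raise

        # Phase 2 (response): concrete values from the response take precedence.
        if isinstance(result, dict):
            for key in ("model", "provider"):
                if key in result:
                    span[key] = result[key]
        wrapper.spans.append(span)
        return result

    wrapper.spans = []      # collected in memory for the sake of the example
    return wrapper


@llm_sketch
def flaky_call(prompt, *, model):
    raise TimeoutError("upstream timed out")


@llm_sketch
def good_call(prompt, *, model):
    return {"model": "gpt-4o-2024-08-06", "text": "hi"}   # illustrative response


try:
    flaky_call("hi", model="gpt-4o")
except TimeoutError:
    pass
good_call("hi", model="gpt-4o")
```

The failed call still yields a span labelled with the requested model, while the successful call ends up with the more specific model name reported by the response.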

Nested Spans

Rastir automatically links parent–child relationships for agent call trees:

@agent(agent_name="supervisor")
def supervisor(task):
    plan = planner(task)            # nested agent
    return executor(plan)

@agent(agent_name="planner")
def planner(task):
    return ask_llm(task)            # nested LLM call

@llm(model="gpt-4o")
def ask_llm(prompt):
    return openai.chat(messages=[...])

The resulting span tree (executor and web_search are not defined in the snippet above):

supervisor (agent, 3200ms)
├── planner (agent, 1100ms)
│   └── ask_llm (llm, 980ms) → model=gpt-4o, tokens_in=150, tokens_out=85
└── executor (agent, 2000ms)
    ├── web_search (tool, 450ms)
    └── ask_llm (llm, 1200ms) → model=gpt-4o, tokens_in=320, tokens_out=200
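Parent–child linking of this kind is commonly built on `contextvars`, which the project structure below also points to for `context.py`. The following is a minimal sketch of the technique under that assumption; the `traced` decorator, the span dictionaries, and the `FINISHED` list are hypothetical and not Rastir's API.

```python
import contextvars
import functools
import itertools

_current_span = contextvars.ContextVar("current_span", default=None)
_ids = itertools.count(1)
FINISHED = []            # spans are appended here when they end


def traced(span_type):
    """Hypothetical decorator that links each span to whichever span is active."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            parent = _current_span.get()
            span = {
                "id": next(_ids),
                "name": func.__name__,
                "type": span_type,
                "parent_id": parent["id"] if parent else None,
            }
            token = _current_span.set(span)      # this span is now the active parent
            try:
                return func(*args, **kwargs)
            finally:
                _current_span.reset(token)       # restore the previous parent
                FINISHED.append(span)
        return wrapper
    return decorator


@traced("agent")
def supervisor(task):
    return planner(task)


@traced("agent")
def planner(task):
    return ask_llm(task)


@traced("llm")
def ask_llm(prompt):
    return f"answer to {prompt}"


supervisor("plan a trip")
parents = {span["name"]: span["parent_id"] for span in FINISHED}
ids = {span["name"]: span["id"] for span in FINISHED}
```

A `ContextVar` keeps the active span separate per thread and per asyncio task, so concurrent requests do not overwrite each other's parent.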

Works with LangGraph

from langgraph.prebuilt import create_react_agent

app = create_react_agent(ChatOpenAI(model="gpt-4o-mini"), tools=[search, calc])

@agent(agent_name="react_agent")
def run(query: str):
    return app.invoke({"messages": [HumanMessage(query)]})
    # Rastir auto-detects LangGraph state → LangChain messages → OpenAI response
    # Extracts: model, tokens, tool calls, message counts — zero config

Works with CrewAI

from crewai import Agent, Task, Crew, LLM

crewai_llm = LLM(model="gemini/gemini-2.5-flash", api_key="...")
researcher = Agent(role="Researcher", goal="Research topics", llm=crewai_llm, tools=[...])
task = Task(description="Research AI trends", expected_output="Summary", agent=researcher)
crew = Crew(agents=[researcher], tasks=[task])

@agent(agent_name="crewai_agent")
def run():
    @llm(model="gemini-2.5-flash", provider="gemini")
    def invoke():
        return crew.kickoff()
        # Rastir detects CrewOutput → extracts crewai_task_count,
        # crewai_total_tokens, crewai_successful_requests, tokens_input/output
    return invoke()

Generic Object Wrapper

Instrument any object without decorator access using rastir.wrap():

import rastir

# Wrap a Redis client, vector store, or any infrastructure component
wrapped_cache = rastir.wrap(redis_client, name="redis")
wrapped_cache.get("key")       # creates INFRA span: "redis.get"
wrapped_cache.set("key", val)  # creates INFRA span: "redis.set"

# Wrap with filtering
wrapped_db = rastir.wrap(db_client, name="postgres",
                         include=["query", "execute"],
                         span_type="tool")

  • Supports sync + async methods
  • Preserves isinstance() behaviour
  • Prevents double-wrapping
  • Configurable span_type: infra, tool, llm, trace, agent, retrieval
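For readers curious how a wrapper can keep `isinstance()` working and avoid double-wrapping, here is one possible approach as a minimal sketch. It is not the code behind `rastir.wrap()`: `SpanProxy`, this local `wrap` function, and the `calls` list are hypothetical, and only sync methods are handled.

```python
import functools


class SpanProxy:
    """Hypothetical proxy that records '<name>.<method>' for each traced call."""

    def __init__(self, target, name, include=None):
        self._target = target
        self._name = name
        self._include = set(include) if include is not None else None
        self.calls = []

    def _target_class(self):
        return type(self._target)

    # isinstance() falls back to __class__, so the proxy passes as the wrapped type.
    __class__ = property(_target_class)

    def __getattr__(self, attr):
        # Only reached for attributes the proxy itself does not define.
        value = getattr(self._target, attr)
        skip = (
            not callable(value)
            or attr.startswith("_")
            or (self._include is not None and attr not in self._include)
        )
        if skip:
            return value

        @functools.wraps(value)
        def traced(*args, **kwargs):
            self.calls.append(f"{self._name}.{attr}")
            return value(*args, **kwargs)

        return traced


def wrap(obj, name, include=None):
    """Return a proxy, or the object itself if it is already a proxy."""
    if type(obj) is SpanProxy:       # type() ignores the __class__ override
        return obj
    return SpanProxy(obj, name, include)


class FakeCache:
    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        self._data[key] = value


cache = wrap(FakeCache(), name="redis", include=["get"])
cache.set("key", 1)      # not in include, so passed through untraced
cache.get("key")         # recorded as "redis.get"
```

Supporting async methods would additionally require detecting coroutine functions and awaiting them inside the traced wrapper.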

Bedrock Guardrail Observability

Rastir automatically detects and tracks AWS Bedrock guardrails:

@llm
def call_bedrock(prompt: str):
    return bedrock.converse(
        modelId="anthropic.claude-3-sonnet",
        messages=[...],
        guardrailIdentifier="my-guardrail",  # auto-detected
        guardrailVersion="1",
    )

Produces metrics:

rastir_guardrail_requests_total{guardrail_id="my-guardrail",provider="bedrock"} 42
rastir_guardrail_violations_total{guardrail_action="GUARDRAIL_INTERVENED",model="claude-3"} 3

Guardrail labels are cardinality-guarded on both the client and the server:

  • guardrail_category is validated against a bounded enum (CONTENT_POLICY, TOPIC_POLICY, etc.)
  • guardrail_action is validated against a bounded enum (GUARDRAIL_INTERVENED, NONE)
  • Unknown values are replaced with __cardinality_overflow__
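A bounded-enum guard of this kind is only a few lines. The sketch below uses the two guardrail_action values listed above; the function names and the `enums` mapping are hypothetical, and labels without a registered enum are passed through unchanged as an assumption of this example.

```python
OVERFLOW = "__cardinality_overflow__"

# The only guardrail_action values named in this README.
ALLOWED_ACTIONS = frozenset({"GUARDRAIL_INTERVENED", "NONE"})


def guard_enum(value, allowed):
    """Return the value if it belongs to the bounded set, else the overflow marker."""
    return value if value in allowed else OVERFLOW


def sanitize_labels(labels: dict, enums: dict) -> dict:
    """Apply enum guards to the labels that have one; leave the others untouched."""
    return {
        key: guard_enum(value, enums[key]) if key in enums else value
        for key, value in labels.items()
    }


clean = sanitize_labels(
    {"guardrail_action": "SOMETHING_NEW", "provider": "bedrock"},
    {"guardrail_action": ALLOWED_ACTIONS},
)
```

Collapsing unexpected values into one marker keeps the number of Prometheus time series bounded even if an upstream API starts returning new strings.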

Error Normalisation

Raw exception types are normalised into six fixed categories to prevent label explosion:

Category          Example exceptions
timeout           TimeoutError, httpx.ReadTimeout, openai.APITimeoutError
rate_limit        RateLimitError, openai.RateLimitError, anthropic.RateLimitError
validation_error  ValueError, TypeError, pydantic.ValidationError
provider_error    openai.APIError, anthropic.APIStatusError, botocore.ClientError
internal_error    RuntimeError, Exception
unknown           Anything else
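One way to implement such a mapping without importing every provider SDK is to match on exception class names along the inheritance chain. The sketch below does that; it is an assumption-laden illustration, not Rastir's code. In particular, matching by name and treating only a bare `Exception` as internal_error are choices made for this example.

```python
# Category -> class names, taken from the table above (module prefixes dropped).
_RULES = [
    ("timeout", {"TimeoutError", "ReadTimeout", "APITimeoutError"}),
    ("rate_limit", {"RateLimitError"}),
    ("validation_error", {"ValueError", "TypeError", "ValidationError"}),
    ("provider_error", {"APIError", "APIStatusError", "ClientError"}),
    ("internal_error", {"RuntimeError"}),
]


def normalise_error(exc: BaseException) -> str:
    """Map an exception to one of six fixed categories."""
    exc_type = type(exc)
    if exc_type is Exception:
        return "internal_error"            # a bare Exception, per the table
    # Walk from the most specific class towards the generic bases.
    for cls in exc_type.__mro__:
        if cls in (Exception, BaseException, object):
            break
        for category, names in _RULES:
            if cls.__name__ in names:
                return category
    return "unknown"


# Stand-ins for SDK exception classes, so the example runs without the SDKs.
class APIError(Exception):
    pass


class RateLimitError(APIError):
    pass
```

Walking the most specific class first means a `RateLimitError` that inherits from `APIError` is reported as rate_limit rather than provider_error.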

Key Metrics at a Glance

Metric                                  Type       What it tracks
rastir_llm_calls_total                  Counter    LLM invocations by model, provider, agent
rastir_tokens_input_total               Counter    Input token consumption
rastir_tokens_output_total              Counter    Output token consumption
rastir_duration_seconds                 Histogram  Latency with P50/P95/P99 + exemplars
rastir_tokens_per_call                  Histogram  Token distribution per LLM call
rastir_tool_calls_total                 Counter    Tool invocations by name, agent, model, provider
rastir_retrieval_calls_total            Counter    Retrieval operations by agent
rastir_errors_total                     Counter    Failures by span type and normalised error type
rastir_guardrail_requests_total         Counter    LLM calls with guardrail config
rastir_guardrail_violations_total       Counter    Guardrail interventions by action/category
rastir_spans_sampled_total              Counter    Spans retained after sampling
rastir_spans_dropped_by_sampling_total  Counter    Spans dropped by sampling
rastir_backpressure_warnings_total      Counter    Queue soft-limit warnings
rastir_ingestion_rate                   Gauge      Spans per second throughput
rastir_queue_utilization_percent        Gauge      Collector backpressure indicator

Full metrics reference → Server Documentation

Server Endpoints

Method  Path                   Description
POST    /v1/telemetry          Ingest span batches
GET     /metrics               Prometheus exposition
GET     /v1/traces             Query trace store
GET     /v1/traces/{trace_id}  Get spans for a specific trace
GET     /health                Liveness probe
GET     /ready                 Readiness probe (queue pressure)

Server Features

  • Sampling — probabilistic + error-always-retain + latency threshold (metrics always recorded regardless)
  • Backpressure — soft/hard queue limits with reject or drop-oldest mode
  • Rate limiting — per-IP and per-service RPM limits
  • Multi-tenant — inject tenant label from HTTP header
  • Exemplars — trace_id linked to histogram observations for Grafana → Jaeger drill-down
  • OTLP export — forward spans to Tempo, Jaeger, or any OTLP backend
  • Cardinality guards — per-dimension caps (model: 50, provider: 10, tool: 200, agent: 200, etc.)
  • Graceful shutdown — drains queue and flushes exporter before exit
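The sampling rule in the first bullet (probabilistic, always keep errors, keep slow spans, metrics recorded regardless) can be sketched as below. The function names, span fields, and parameters such as `sample_rate` and `latency_threshold_s` are hypothetical and may not match the server's real configuration keys.

```python
import random


def should_retain(span: dict, sample_rate: float, latency_threshold_s: float,
                  rng=random.random) -> bool:
    """Decide whether a span is kept in the trace store or exported."""
    if span.get("status") == "ERROR":
        return True                                  # errors are always retained
    if span.get("duration_s", 0.0) >= latency_threshold_s:
        return True                                  # slow spans are always retained
    return rng() < sample_rate                       # everything else is sampled


def ingest(spans: list, sample_rate: float, latency_threshold_s: float,
           rng=random.random):
    """Count every span for metrics, but only retain the sampled ones."""
    counted = 0
    retained = []
    for span in spans:
        counted += 1                                 # metrics see every span
        if should_retain(span, sample_rate, latency_threshold_s, rng):
            retained.append(span)
    return counted, retained


batch = [
    {"name": "fast_ok", "status": "OK", "duration_s": 0.2},
    {"name": "fast_error", "status": "ERROR", "duration_s": 0.1},
    {"name": "slow_ok", "status": "OK", "duration_s": 4.0},
]
# A fixed rng value makes the probabilistic branch deterministic for the example.
counted, retained = ingest(batch, sample_rate=0.1, latency_threshold_s=2.0,
                           rng=lambda: 0.99)
```

Keeping the metric count separate from the retention decision is what lets dashboards stay accurate while trace storage stays small.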

Configuration

Configure via a configure() call or environment variables:

configure(
    service="my-app",
    env="production",
    push_url="http://collector:8080/v1/telemetry",
    api_key="secret",
    batch_size=100,
    flush_interval=5,
)

Or equivalently:

export RASTIR_SERVICE=my-app
export RASTIR_ENV=production
export RASTIR_PUSH_URL=http://collector:8080/v1/telemetry
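How explicit arguments and environment variables might combine can be sketched as follows. This is not Rastir's `configure()`: the `Settings` class, `resolve_settings`, the fallback defaults, and the precedence (explicit arguments over environment variables) are assumptions made for illustration. Only the three variable names come from the example above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    service: str
    env: str
    push_url: Optional[str]


def resolve_settings(service: Optional[str] = None,
                     env: Optional[str] = None,
                     push_url: Optional[str] = None,
                     environ: Mapping[str, str] = os.environ) -> Settings:
    """Explicit arguments win; otherwise fall back to RASTIR_* variables."""
    return Settings(
        service=service or environ.get("RASTIR_SERVICE", "unknown"),      # default is hypothetical
        env=env or environ.get("RASTIR_ENV", "development"),              # default is hypothetical
        push_url=push_url or environ.get("RASTIR_PUSH_URL"),
    )


fake_environ = {
    "RASTIR_SERVICE": "my-app",
    "RASTIR_ENV": "production",
    "RASTIR_PUSH_URL": "http://collector:8080/v1/telemetry",
}
from_env = resolve_settings(environ=fake_environ)
overridden = resolve_settings(env="staging", environ=fake_environ)
```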

Full configuration reference → Configuration Documentation

Project Structure

src/rastir/
├── __init__.py          # Public API: configure, trace, agent, llm, tool, retrieval, wrap
├── config.py            # GlobalConfig, configure()
├── context.py           # Span & agent context (ContextVar-based)
├── decorators.py        # All decorator implementations + two-phase enrichment
├── remote.py            # MCP distributed tracing: trace_remote_tools, mcp_endpoint,
│                        #   mcp_to_langchain_tools — argument-based trace propagation
├── wrapper.py           # rastir.wrap() generic object wrapper
├── spans.py             # SpanRecord data model
├── queue.py             # Bounded in-memory span queue
├── transport.py         # TelemetryClient + BackgroundExporter
├── adapters/            # 15 adapters: OpenAI, Azure, Anthropic, Bedrock, Gemini,
│                        #   Cohere, Mistral, Groq, LangChain, LangGraph, LlamaIndex, CrewAI
│   └── registry.py      # Adapter resolution pipeline + request-phase scanning
└── server/              # FastAPI collector
    ├── app.py           # Server factory, routes, lifespan
    ├── config.py        # Server configuration (YAML + env vars)
    ├── metrics.py       # MetricsRegistry — Prometheus counters/histograms/gauges
    ├── ingestion.py     # IngestionWorker — queue → record_span() → store/export
    └── trace_store.py   # In-memory trace store with LRU eviction

Development

pip install -e ".[all]"           # editable install with all extras
pytest                            # 232+ unit/mock tests, 36+ integration tests
ruff check src/ tests/            # linting

Grafana Dashboards

Rastir ships five pre-built Grafana dashboards in grafana/dashboards/:

Dashboard        Description
LLM Performance  Token usage, latency percentiles, throughput by model, error tracking
Agent & Tool     Agent execution patterns, tool calls with model/provider context
Evaluation       Eval runs/success/failures, scores by type and model, queue health
Guardrail        Guardrail violations by category and model, request volumes
System Health    Ingestion rate, queue pressure, memory, backpressure, OTLP export health

All dashboards include template variables for filtering by service, environment, model, provider, and agent. Import via Grafana UI or API.

Full dashboard documentation → Dashboards

Documentation

Full documentation at skamalj.github.io/rastir.

License

MIT — see LICENSE for details.

