Skip to main content

LLM & Agent Observability — structured tracing, Prometheus metrics, and OpenTelemetry export via Python decorators

Project description

Rastir

Rastir

LLM & Agent Observability for Python
Structured tracing and Prometheus metrics via decorators — no monkey-patching, no vendor lock-in.

PyPI Python Docs License GitHub


Why Rastir?

Most LLM observability tools require SDK wrappers, monkey-patching, or vendor-specific clients. Rastir takes a different approach:

  • Decorators, not wrappers — add @llm, @agent, @tool to your existing functions. No code rewrites.
  • Adapters, not monkey-patches — Rastir inspects return values to extract model, tokens, and provider metadata. Works with any SDK version.
  • Two-phase enrichment — model/provider metadata is captured from function arguments before the call and refined from the response after. If the API call fails, metadata still survives.
  • Self-hosted collector — a lightweight FastAPI server you own. Prometheus metrics out of the box, OTLP export to Tempo/Jaeger if you want it.
  • Zero external infrastructure — no database, no Redis, no Kafka. The collector is stateless and runs in a single container.
Your Python App                          Rastir Collector
┌──────────────────────────────┐         ┌──────────────────────────────┐
│  @agent                      │  HTTP   │  FastAPI ingestion            │
│    @llm (OpenAI)             │ ──────▸ │  ├─ Prometheus /metrics       │
│    @tool (search)            │  spans  │  ├─ Trace store /v1/traces    │
│    @retrieval (RAG)          │         │  ├─ Sampling & backpressure   │
│                              │         │  └─ OTLP → Tempo/Jaeger      │
│  Two-phase enrichment:       │         │                                │
│    request args → response   │         │  Defence-in-depth:             │
│                              │         │    cardinality guards          │
│  wrap(obj, name="cache")     │         │    error normalisation         │
└──────────────────────────────┘         │    bounded enum validation     │
        decorators + wrap()              └──────────────────────────────┘

Supported Providers

Provider Auto-detection Tokens Model Streaming Request-phase
OpenAI
Azure OpenAI
Anthropic
AWS Bedrock
Google Gemini
Cohere
Mistral
Groq
LangChain
LangGraph
LlamaIndex
CrewAI

15 adapters are priority-ordered and composable: LangGraph → LangChain → OpenAI resolution happens automatically.

Request-phase enrichment: For provider adapters, model/provider metadata is extracted from function kwargs (e.g., model="gpt-4o") before the API call. If the call fails, the span still contains the model and provider.

MCP Distributed Tracing

Rastir supports distributed tracing across MCP (Model Context Protocol) tool boundaries. Trace context flows automatically from client to server via tool arguments — no _meta, no HTTP headers.

from rastir import configure, agent_span, trace_remote_tools, mcp_endpoint, mcp_to_langchain_tools

configure(service="my-app", push_url="http://localhost:8080")

# Server side: add @mcp_endpoint under @mcp.tool()
@mcp.tool()
@mcp_endpoint
async def search(query: str) -> str:
    return db.search(query)       # server span created with remote="false"

# Client side — Option 1: Direct MCP session
@agent_span(agent_name="my_agent")
async def run():
    async with streamable_http_client(url) as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()

            @trace_remote_tools
            def wrap():
                return session
            
            wrapped = wrap()
            result = await wrapped.call_tool("search", {"query": "hello"})
            # client span created with remote="true", trace context injected

# Client side — Option 2: LangGraph agent (one-line bridge)
async with ClientSession(read, write) as session:
    await session.initialize()
    tools = await mcp_to_langchain_tools(session)   # automatic trace injection
    agent = create_react_agent(llm, tools)           # ready to use

Trace topology:

Agent Span
└── Tool Client Span  (remote="true",  model/provider inherited)
      └── Tool Server Span (remote="false", same trace_id)

Full MCP documentation → MCP Distributed Tracing

Installation

pip install rastir              # Client library (decorators + HTTP push)
pip install rastir[server]      # + Collector server (FastAPI, Prometheus, OTLP)
pip install rastir[all]         # Everything including dev tools

Quick Start

1. Instrument your code (3 lines to add)

from rastir import configure, agent, llm, tool, retrieval

configure(
    service="my-app",
    push_url="http://localhost:8080/v1/telemetry",
)

@agent(agent_name="research_agent")
def run_research(query: str) -> str:
    context = fetch_docs(query)
    return ask_llm(query, context)

@retrieval
def fetch_docs(query: str) -> list[str]:
    return vector_db.search(query)           # auto-tracked

@llm(model="gpt-4o", provider="openai")
def ask_llm(query: str, context: list[str]) -> str:
    return openai.chat(messages=[...])        # tokens & model extracted automatically

2. Start the collector

rastir-server                              # default: 0.0.0.0:8080
# or
docker run -p 8080:8080 rastir-server

3. Query metrics

curl http://localhost:8080/metrics          # Prometheus format
curl http://localhost:8080/v1/traces        # JSON trace store

That's it. Prometheus scrapes /metrics, you build Grafana dashboards, and optionally forward spans to Tempo or Jaeger via OTLP.

What you get in Prometheus

# Token usage by model
rastir_tokens_input_total{model="gpt-4o",provider="openai",agent="research_agent"} 1250
rastir_tokens_output_total{model="gpt-4o",provider="openai",agent="research_agent"} 380

# Latency percentiles
rastir_duration_seconds_bucket{span_type="llm",le="0.5"} 12
rastir_duration_seconds_bucket{span_type="llm",le="1.0"} 45

# Tool & retrieval call rates
rastir_tool_calls_total{tool_name="web_search",agent="research_agent"} 89
rastir_retrieval_calls_total{agent="research_agent"} 156

# Error tracking with normalised categories
rastir_errors_total{span_type="llm",error_type="rate_limit"} 7
rastir_errors_total{span_type="llm",error_type="timeout"} 3

Two-Phase Enrichment

Rastir captures metadata in two phases to ensure observability even when API calls fail:

Phase 1 (request): Scan function kwargs for model/provider
  └─ e.g., model="gpt-4o" extracted before the call

Phase 2 (response): Adapter pipeline extracts from return value
  └─ Concrete response values override request-phase guesses
  └─ If call raises, request-phase metadata survives

Example — failed API call still produces useful metrics:

@llm
def ask_model(query: str):
    return openai.chat.completions.create(
        model="gpt-4o",          # ← captured in Phase 1
        messages=[...],
    )
    # If this raises RateLimitError, the span still records:
    #   model="gpt-4o", provider="openai", status="ERROR"
    #   error_type="rate_limit"

Nested Spans

Rastir automatically links parent–child relationships for agent call trees:

@agent(agent_name="supervisor")
def supervisor(task):
    plan = planner(task)            # nested agent
    return executor(plan)

@agent(agent_name="planner")
def planner(task):
    return ask_llm(task)            # nested LLM call

@llm(model="gpt-4o")
def ask_llm(prompt):
    return openai.chat(messages=[...])
supervisor (agent, 3200ms)
├── planner (agent, 1100ms)
│   └── ask_llm (llm, 980ms) → model=gpt-4o, tokens_in=150, tokens_out=85
└── executor (agent, 2000ms)
    ├── web_search (tool, 450ms)
    └── ask_llm (llm, 1200ms) → model=gpt-4o, tokens_in=320, tokens_out=200

Works with LangGraph

from langgraph.prebuilt import create_react_agent

app = create_react_agent(ChatOpenAI(model="gpt-4o-mini"), tools=[search, calc])

@agent(agent_name="react_agent")
def run(query: str):
    return app.invoke({"messages": [HumanMessage(query)]})
    # Rastir auto-detects LangGraph state → LangChain messages → OpenAI response
    # Extracts: model, tokens, tool calls, message counts — zero config

Generic Object Wrapper

Instrument any object without decorator access using rastir.wrap():

import rastir

# Wrap a Redis client, vector store, or any infrastructure component
wrapped_cache = rastir.wrap(redis_client, name="redis")
wrapped_cache.get("key")       # creates INFRA span: "redis.get"
wrapped_cache.set("key", val)  # creates INFRA span: "redis.set"

# Wrap with filtering
wrapped_db = rastir.wrap(db_client, name="postgres",
                         include=["query", "execute"],
                         span_type="tool")
  • Supports sync + async methods
  • Preserves isinstance() behaviour
  • Prevents double-wrapping
  • Configurable span_type: infra, tool, llm, trace, agent, retrieval

Bedrock Guardrail Observability

Rastir automatically detects and tracks AWS Bedrock guardrails:

@llm
def call_bedrock(prompt: str):
    return bedrock.converse(
        modelId="anthropic.claude-3-sonnet",
        messages=[...],
        guardrailIdentifier="my-guardrail",  # auto-detected
        guardrailVersion="1",
    )

Produces metrics:

rastir_guardrail_requests_total{guardrail_id="my-guardrail",provider="bedrock"} 42
rastir_guardrail_violations_total{guardrail_action="GUARDRAIL_INTERVENED",model="claude-3"} 3

Guardrail labels are cardinality-guarded on both client and server side:

  • guardrail_category is validated against a bounded enum (CONTENT_POLICY, TOPIC_POLICY, etc.)
  • guardrail_action is validated against a bounded enum (GUARDRAIL_INTERVENED, NONE)
  • Unknown values are replaced with __cardinality_overflow__

Error Normalisation

Raw exception types are normalised into six fixed categories to prevent label explosion:

Category Example exceptions
timeout TimeoutError, httpx.ReadTimeout, openai.APITimeoutError
rate_limit RateLimitError, openai.RateLimitError, anthropic.RateLimitError
validation_error ValueError, TypeError, pydantic.ValidationError
provider_error openai.APIError, anthropic.APIStatusError, botocore.ClientError
internal_error RuntimeError, Exception
unknown Anything else

Key Metrics at a Glance

Metric Type What it tracks
rastir_llm_calls_total Counter LLM invocations by model, provider, agent
rastir_tokens_input_total Counter Input token consumption
rastir_tokens_output_total Counter Output token consumption
rastir_duration_seconds Histogram Latency with P50/P95/P99 + exemplars
rastir_tokens_per_call Histogram Token distribution per LLM call
rastir_tool_calls_total Counter Tool invocations by name, agent, model, provider
rastir_retrieval_calls_total Counter Retrieval operations by agent
rastir_errors_total Counter Failures by span type and normalised error type
rastir_guardrail_requests_total Counter LLM calls with guardrail config
rastir_guardrail_violations_total Counter Guardrail interventions by action/category
rastir_spans_sampled_total Counter Spans retained after sampling
rastir_spans_dropped_by_sampling_total Counter Spans dropped by sampling
rastir_backpressure_warnings_total Counter Queue soft-limit warnings
rastir_ingestion_rate Gauge Spans per second throughput
rastir_queue_utilization_percent Gauge Collector backpressure indicator

Full metrics reference → Server Documentation

Server Endpoints

Method Path Description
POST /v1/telemetry Ingest span batches
GET /metrics Prometheus exposition
GET /v1/traces Query trace store
GET /v1/traces/{trace_id} Get spans for a specific trace
GET /health Liveness probe
GET /ready Readiness probe (queue pressure)

Server Features

  • Sampling — probabilistic + error-always-retain + latency threshold (metrics always recorded regardless)
  • Backpressure — soft/hard queue limits with reject or drop-oldest mode
  • Rate limiting — per-IP and per-service RPM limits
  • Multi-tenant — inject tenant label from HTTP header
  • Exemplars — trace_id linked to histogram observations for Grafana → Jaeger drill-down
  • OTLP export — forward spans to Tempo, Jaeger, or any OTLP backend
  • Cardinality guards — per-dimension caps (model: 50, provider: 10, tool: 200, agent: 200, etc.)
  • Graceful shutdown — drains queue and flushes exporter before exit

Configuration

Configure via configure() call or environment variables:

configure(
    service="my-app",
    env="production",
    push_url="http://collector:8080/v1/telemetry",
    api_key="secret",
    batch_size=100,
    flush_interval=5,
)

Or equivalently:

export RASTIR_SERVICE=my-app
export RASTIR_ENV=production
export RASTIR_PUSH_URL=http://collector:8080/v1/telemetry

Full configuration reference → Configuration Documentation

Project Structure

src/rastir/
├── __init__.py          # Public API: configure, trace, agent, llm, tool, retrieval, wrap
├── config.py            # GlobalConfig, configure()
├── context.py           # Span & agent context (ContextVar-based)
├── decorators.py        # All decorator implementations + two-phase enrichment
├── remote.py            # MCP distributed tracing: trace_remote_tools, mcp_endpoint,
│                        #   mcp_to_langchain_tools — argument-based trace propagation
├── wrapper.py           # rastir.wrap() generic object wrapper
├── spans.py             # SpanRecord data model
├── queue.py             # Bounded in-memory span queue
├── transport.py         # TelemetryClient + BackgroundExporter
├── adapters/            # 15 adapters: OpenAI, Azure, Anthropic, Bedrock, Gemini,
│                        #   Cohere, Mistral, Groq, LangChain, LangGraph, LlamaIndex, CrewAI
│   └── registry.py      # Adapter resolution pipeline + request-phase scanning
└── server/              # FastAPI collector
    ├── app.py           # Server factory, routes, lifespan
    ├── config.py        # Server configuration (YAML + env vars)
    ├── metrics.py       # MetricsRegistry — Prometheus counters/histograms/gauges
    ├── ingestion.py     # IngestionWorker — queue → record_span() → store/export
    └── trace_store.py   # In-memory trace store with LRU eviction

Development

pip install -e ".[all]"           # editable install with all extras
pytest                            # 232+ unit/mock tests, 36+ integration tests
ruff check src/ tests/            # linting

Grafana Dashboards

Rastir ships five pre-built Grafana dashboards in grafana/dashboards/:

Dashboard Description
LLM Performance Token usage, latency percentiles, throughput by model, error tracking
Agent & Tool Agent execution patterns, tool calls with model/provider context
Evaluation Eval runs/success/failures, scores by type and model, queue health
Guardrail Guardrail violations by category and model, request volumes
System Health Ingestion rate, queue pressure, memory, backpressure, OTLP export health

All dashboards include template variables for filtering by service, environment, model, provider, and agent. Import via Grafana UI or API.

Full dashboard documentation → Dashboards

Documentation

Full documentation at skamalj.github.io/rastir:

License

MIT — see LICENSE for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rastir-0.1.0rc3.tar.gz (597.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

rastir-0.1.0rc3-py3-none-any.whl (104.5 kB view details)

Uploaded Python 3

File details

Details for the file rastir-0.1.0rc3.tar.gz.

File metadata

  • Download URL: rastir-0.1.0rc3.tar.gz
  • Upload date:
  • Size: 597.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for rastir-0.1.0rc3.tar.gz
Algorithm Hash digest
SHA256 306d8cc7f4d2fea5214d3fdd5bda8e9d09f9c55c50dda2c7f817e6799f3383c9
MD5 f0ace7ec8f75ea37f5f0ae6fae338f12
BLAKE2b-256 6f0c0f6ea8620c8717faef4be51ededaa13b09a2049dae89f99120e8df135d6a

See more details on using hashes here.

File details

Details for the file rastir-0.1.0rc3-py3-none-any.whl.

File metadata

  • Download URL: rastir-0.1.0rc3-py3-none-any.whl
  • Upload date:
  • Size: 104.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for rastir-0.1.0rc3-py3-none-any.whl
Algorithm Hash digest
SHA256 0c36e01fee515cab984f987eb088d9d83eab4fb16bd24765d12184820ee7e107
MD5 6eef604303b91362b39221cc3ce5c848
BLAKE2b-256 4ca67c2713210eb4ebaaa5e3a610e1513acb91934ba2f90ddd7e825bf6ec548e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page