Python SDK for MV37 Rollout

mv37-rollout

Python SDK for Rollout

Install

pip install mv37-rollout
# or
uv add mv37-rollout

Requires Python 3.12+. The base install stays lightweight (only httpx and pydantic) and pulls in no provider SDKs. Provider integrations are optional extras:

uv add "mv37-rollout[openai]"
uv add "mv37-rollout[anthropic]"

Quickstart

from mv37.rollout import Rollout, usage_from_openai

client = Rollout(api_key="...", agent_name="support_agent", environment="production")

Or set ROLLOUT_API_KEY and call Rollout() with no arguments.

Trace and span

with client.trace("support_agent", conversation_id="thread_123", user_id="cus_123") as trace:
    trace.message(role="user", content="Where is my order?")

    with trace.llm("openai.responses", model="gpt-4.1-mini") as span:
        span.record_input({"messages": [{"role": "user", "content": "Where is my order?"}]})
        response = openai_client.chat.completions.create(...)
        span.record_output(response)  # pydantic responses are serialized automatically
        span.set_usage(**usage_from_openai(response))

    trace.message(role="assistant", content="Your order has shipped.")
    trace.feedback("thumbs_up", True)

usage_from_openai and usage_from_anthropic pull token counts off the provider response. record_input / record_output accept pydantic models directly, so you don't need to serialize responses yourself.
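
When a response arrives as raw JSON (for example from a gateway called over plain HTTP), the bundled helpers may not apply and the counts can be mapped by hand. The sketch below is not part of the SDK: usage_from_chat_completion_dict is a hypothetical helper, and the input_tokens / output_tokens key names are assumed from the streaming example in this README rather than from a documented set_usage signature. It reads the usage object that OpenAI-style chat completion payloads carry (prompt_tokens, completion_tokens).

```python
from typing import Any, Mapping


def usage_from_chat_completion_dict(payload: Mapping[str, Any]) -> dict[str, int]:
    """Map an OpenAI-style chat completion JSON payload to usage keyword arguments.

    Hypothetical helper, not part of mv37-rollout. The output key names are an
    assumption taken from the streaming example. Missing counts default to 0.
    """
    usage = payload.get("usage") or {}
    return {
        "input_tokens": int(usage.get("prompt_tokens") or 0),
        "output_tokens": int(usage.get("completion_tokens") or 0),
    }
```

If set_usage accepts those keyword names, the result could be passed as span.set_usage(**usage_from_chat_completion_dict(raw_json)).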

Tools

Record a tool call as a paired tool.call / tool.result. Pass the LLM's tool-call id so the two events link to the assistant message:

import json

# tool_calls is None when the model requested no tools, so fall back to an empty list
for tool_call in response.choices[0].message.tool_calls or []:
    args = json.loads(tool_call.function.arguments)
    with trace.tool(tool_call.function.name, tool_call_id=tool_call.id, arguments=args) as call:
        result = run_tool(tool_call.function.name, args)
        call.record_output(result)

Streaming

By default, per-chunk events are not persisted. A rolling preview and periodic updates are sent instead. Call span.end() after the stream to record the final output and usage.

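with trace.llm("openai.responses", model="gpt-4.1-mini", stream=True) as span:
    span.record_input({"messages": messages})
    full_text = ""
    for chunk in stream:  # stream: the iterator from chat.completions.create(..., stream=True)
        if not chunk.choices:  # some chunks (e.g. a trailing usage chunk) carry no choices
            continue
        delta = chunk.choices[0].delta.content or ""
        full_text += delta
        span.record_chunk(delta)
    # Token counts here are placeholders; pass the real numbers reported by the provider.
    span.end(output=full_text, usage={"input_tokens": 512, "output_tokens": 128})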

User identity

# One-off identity event
client.identify_user("cus_123", traits={"plan": "pro", "company": "Acme"})

# Scoped context — all traces in this block inherit the user
with client.user(user_id="cus_123", traits={"plan": "pro"}):
    with client.trace("agent_run") as trace:
        ...

Check connectivity

check() sends a diagnostic trace through the real ingest path and prints a one-line summary:

client.check()  # ✓ Rollout connected — 3 event(s) accepted

It also returns a CheckResult (ok, accepted, rejected, errors, message); pass verbose=False to silence the print.
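
The returned CheckResult can gate application startup or a deploy step. The following is a minimal sketch that uses only check(verbose=False) and the fields listed above; require_rollout_connection is a hypothetical helper name, and raising RuntimeError is one possible policy, not something the SDK does itself.

```python
from typing import Any


def require_rollout_connection(client: Any) -> Any:
    """Run the connectivity check quietly and raise if it did not succeed.

    `client` is expected to be a Rollout client. Only check(verbose=False) and
    the CheckResult fields named in this README are used. Hypothetical helper.
    Returns the `accepted` value reported by the check.
    """
    result = client.check(verbose=False)
    if not result.ok:
        raise RuntimeError(
            f"Rollout check failed: {result.message} "
            f"(rejected={result.rejected}, errors={result.errors})"
        )
    return result.accepted
```

Calling it once at startup turns a misconfigured API key or endpoint into an immediate failure instead of silently missing traces.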

Async apps

async with client.trace("support_agent") as trace:
    async with trace.llm("openai.responses", model="gpt-4.1-mini") as span:
        ...

# In your ASGI shutdown handler:
await client.ashutdown()

Provider integrations

The fastest way to instrument LLM calls is to wrap the provider client. This is explicit, object-level instrumentation — only the object you pass in is touched, there is no global monkeypatching, and wrapping is idempotent.

import mv37.rollout as rollout
from openai import OpenAI

rollout.init(api_key="...", agent_name="support_agent")

openai_client = rollout.wrap(OpenAI())  # or client.wrap(OpenAI()) with an explicit client
response = openai_client.responses.create(model="gpt-4.1-mini", input="Hello")

wrap instruments both responses.create and chat.completions.create, sync and async, streaming and non-streaming. Each call records an llm span with the model, provider, options, input/output, token usage, latency, and errors. If there is an active trace the span attaches to it; otherwise an implicit single-call trace is created. The original provider response (or stream) is returned unchanged, and provider exceptions propagate untouched. Streaming calls yield the original chunks while a rolling preview and final output/usage are recorded.

wrap auto-detects supported clients and raises UnsupportedIntegrationError for anything it does not recognize — it never silently no-ops. AsyncOpenAI is supported the same way.

Anthropic

Anthropic clients work identically — wrap instruments messages.create on Anthropic / AsyncAnthropic:

from anthropic import Anthropic

claude = rollout.wrap(Anthropic())
message = claude.messages.create(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}],
)

The same span semantics apply: model/provider/options, input (messages, system, tools), output, token usage (including cache tokens), latency, errors, streaming preview, and attach-vs-implicit trace. The explicit entry point is mv37.rollout.integrations.anthropic.wrap_anthropic.

OpenAI-compatible gateways

Gateways such as OpenRouter use the OpenAI SDK, so the same wrapper applies. Force the integration with provider="openai" and tag the upstream provider via attributes:

openrouter = rollout.wrap(
    OpenAI(base_url="https://openrouter.ai/api/v1", api_key="..."),
    provider="openai",
    attributes={"provider": "openrouter"},
)

Agent boundary

@rollout.agent marks a custom agent entry point. It opens a fresh trace per call (named after the function), captures the arguments and return value, flushes on completion, and records and re-raises errors. It works on sync and async functions. Combined with rollout.wrap, this is the whole setup for a hand-written agent loop:

openai_client = rollout.wrap(OpenAI())

@rollout.agent
def run_agent(user_message: str) -> str:
    response = openai_client.responses.create(model="gpt-4.1-mini", input=user_message)
    return response.output_text

Pass capture_input=False / capture_output=False to skip recording arguments or the return value.

Tool registries

wrap_tools instruments a tool registry so each call records a tool.call / tool.result pair without a manual with trace.tool(...) block. It accepts a {name: callable} mapping or an iterable of callables, and supports sync and async functions:

tools = rollout.wrap_tools({
    "search": search,
    "refund_order": refund_order,
})

result = tools["search"]("latest invoice")  # recorded automatically

Wrapped tools require an active trace (e.g. inside a @rollout.agent function); pass implicit_trace=True to create a one-off trace when called outside one. Return values and exceptions are preserved.

Module-level API and decorators

If you prefer not to pass a client around, initialize one global client and use the module-level functions and decorators. They route through the client created by rollout.init().

import mv37.rollout as rollout

rollout.init(api_key="...", agent_name="support_agent", environment="production")

Decorate an entry point to open a trace per call, and decorate tools to capture their inputs and outputs. Both work on sync and async functions.

@rollout.tool("get_weather")
def get_weather(city: str) -> dict:
    return {"city": city, "temp": 21}

@rollout.trace("support_agent")
def handle_request(text: str) -> str:
    rollout.current_trace().message(role="user", content=text)
    weather = get_weather("Paris")  # recorded as a tool span
    return f"It's {weather['temp']}°C."

rollout.trace(...) and rollout.span(...) also work as plain context managers, and rollout.current_trace() / rollout.current_span() return the active objects so you can attach messages, feedback, or usage from anywhere inside the call.

with rollout.trace("support_agent") as trace:
    with rollout.span("retrieval") as span:
        span.record_input({"query": "orders"})

The full decorator set: @rollout.agent (agent entry point), @rollout.trace (a trace per call), @rollout.span (a typed span), @rollout.tool (a tool.call / tool.result pair), and @rollout.task (a multi-step unit of work). All accept an optional name, default to the function name, and work on sync and async functions.

@rollout.task("retrieval_pipeline")
def run_retrieval(query: str) -> list[str]:
    ...

Feedback and signals

feedback records explicit user feedback on a trace; signal records implicit behavioral or business outcomes. Both can be called on the trace, the client, or the module:

trace.feedback("thumbs_up", True)
trace.signal("order_placed", {"order_id": "ord_123"})

# or, against a trace you only have the id for:
client.feedback("csat", 5, trace_id="trace_abc")

Environment variables

Variable               Description
ROLLOUT_API_KEY        API key
ROLLOUT_BASE_URL       Override ingest endpoint
ROLLOUT_ENVIRONMENT    Deployment environment, e.g. production
ROLLOUT_RELEASE        App version or git SHA
ROLLOUT_SERVICE_NAME   Service identifier
ROLLOUT_AGENT_NAME     Agent name
ROLLOUT_AGENT_ID       Agent ID
ROLLOUT_AGENT_VERSION  Agent version
ROLLOUT_DEPLOYMENT     Deployment slot or region
ROLLOUT_SAMPLE_RATE    Trace sample rate (0.0 to 1.0)
ROLLOUT_DEBUG          Enable verbose logging (true/false)
ROLLOUT_DISABLED       Disable the SDK entirely (true/false)
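
A deploy script can sanity-check these values before the application starts. The sketch below is a hypothetical preflight helper, not part of the SDK: the accepted formats are inferred from the table above (true/false flags, a sample rate between 0.0 and 1.0), and the SDK itself may accept other spellings.

```python
import os
from typing import Mapping

BOOLEAN_VARS = ("ROLLOUT_DEBUG", "ROLLOUT_DISABLED")


def find_rollout_env_problems(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return human-readable problems with ROLLOUT_* values; empty if none.

    Hypothetical helper. The rules are assumptions based on the table above
    and may be stricter than what the SDK actually accepts.
    """
    problems = []
    for name in BOOLEAN_VARS:
        value = env.get(name)
        if value is not None and value.strip().lower() not in ("true", "false"):
            problems.append(f"{name} should be true or false, got {value!r}")
    raw_rate = env.get("ROLLOUT_SAMPLE_RATE")
    if raw_rate is not None:
        try:
            rate = float(raw_rate)
        except ValueError:
            problems.append(f"ROLLOUT_SAMPLE_RATE is not a number: {raw_rate!r}")
        else:
            if not 0.0 <= rate <= 1.0:  # also rejects NaN
                problems.append(f"ROLLOUT_SAMPLE_RATE must be within 0.0-1.0, got {rate}")
    return problems
```

Unset variables are treated as valid, since every entry in the table is optional from this helper's point of view.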

Examples

Scripts in examples/ use uv inline dependencies and resolve the SDK from the local source, so no extra install step is needed.

  • basic_openrouter_agent.py — explicit client and with blocks.
  • decorated_openrouter_agent.py — the same agent with rollout.init() and the @rollout.trace / @rollout.tool decorators.

cd packages/sdk-python
ROLLOUT_API_KEY=... OPENROUTER_API_KEY=... uv run examples/decorated_openrouter_agent.py

Configuration

The constructor accepts a parameter for each of the env vars above, plus a few advanced options:

client = Rollout(
    api_key="...",
    agent_name="support_agent",
    environment="production",
    sample_rate=0.5,           # keep 50% of traces
    sync_mode=True,            # send immediately (tests/debugging only)
    debug=True,                # verbose logging
    disabled=False,
    before_send=lambda e: e,   # mutate or drop events before enqueue
    capture_stream_chunks=False,  # persist every token (off by default)
)

Privacy and redaction

Each event passes through a fixed pipeline before it is queued:

event construction → scrubber → before_send → sampling → enqueue

  • scrubber(event_dict) -> event_dict — redact or strip fields. Runs first, on every event.
  • before_send(event_dict) -> event_dict | None — mutate an event, or return None to drop it.
  • send_default_pii — defaults to False. The SDK never requires email or name. If you explicitly attach PII (e.g. via user traits), it still passes through the scrubber before enqueue.

def scrub(event: dict) -> dict:
    event.get("context", {}).pop("user_traits", None)
    return event

client = Rollout(api_key="...", scrubber=scrub, send_default_pii=False)
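
A scrubber does not need to know the event layout in advance. The sketch below walks whatever nested dicts and lists the event contains and masks email-like strings. It is illustrative only: scrub_emails and the regular expression are hypothetical, the pattern is deliberately simple rather than a complete email grammar, and it assumes the SDK uses the dict the scrubber returns (the function builds a copy instead of mutating its argument).

```python
import re
from typing import Any

# Deliberately simple pattern; an assumption for illustration, not an RFC-complete matcher.
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")


def _redact(value: Any) -> Any:
    """Return a copy of `value` with email-like substrings replaced."""
    if isinstance(value, str):
        return EMAIL_RE.sub("[redacted-email]", value)
    if isinstance(value, dict):
        return {key: _redact(item) for key, item in value.items()}
    if isinstance(value, list):
        return [_redact(item) for item in value]
    return value  # numbers, booleans, None, and other types pass through unchanged


def scrub_emails(event: dict) -> dict:
    """Scrubber with the scrubber(event_dict) -> event_dict shape described above."""
    return _redact(event)
```

It would be passed the same way as scrub above, as the scrubber argument of the client.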

Sampling is per trace: once a trace is sampled in, all of its spans and events are kept, so you never get partially-sampled traces.
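
The pipeline order and the per-trace rule can be sketched in a few lines. This is not the SDK's implementation: keep_trace and process_event are hypothetical names, hashing the trace id is just one common way to make every event of a trace reach the same keep-or-drop decision, and the "trace_id" key is an assumed event field.

```python
import hashlib
from typing import Callable, Optional

Event = dict


def keep_trace(trace_id: str, sample_rate: float) -> bool:
    """Deterministic per-trace decision: the same trace_id always gets the same answer."""
    digest = hashlib.sha256(trace_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:6], "big") / 2**48  # uniform in [0, 1)
    return bucket < sample_rate


def process_event(
    event: Event,
    queue: list,
    scrubber: Optional[Callable[[Event], Event]] = None,
    before_send: Optional[Callable[[Event], Optional[Event]]] = None,
    sample_rate: float = 1.0,
) -> bool:
    """Run one event through scrubber -> before_send -> sampling -> enqueue.

    Returns True if the event was enqueued. Assumes events carry a "trace_id" key.
    """
    if scrubber is not None:
        event = scrubber(event)  # always first, on every event
    if before_send is not None:
        event = before_send(event)
        if event is None:  # before_send asked to drop the event
            return False
    if not keep_trace(event["trace_id"], sample_rate):
        return False
    queue.append(event)
    return True
```

Because the decision depends only on the trace id and the rate, two spans from the same trace can never disagree about whether the trace is kept.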

Disabling the SDK

Set disabled=True or ROLLOUT_DISABLED=true to turn the SDK into a no-op without changing application code — useful for tests, CI, local scripts, and as an emergency kill-switch. Traces, spans, and decorators still run your code; they just don't emit events.

Lifecycle and shutdown

Events are batched and sent by a background worker. Flush or shut down to make sure nothing is left in the queue when your process or request ends.

client.flush()          # block until the queue is drained
client.shutdown()       # flush and stop the worker (sync apps)

await client.aflush()   # async equivalents
await client.ashutdown()

A trace flushes on exit, so short scripts using with client.trace(...) usually need nothing more. For long-running or async apps, wire ashutdown() into your framework's shutdown hook (e.g. an ASGI lifespan handler). The SDK also registers a best-effort atexit flush, but don't rely on it as the primary mechanism in production. sync_mode=True sends events inline in the calling thread (tests and local debugging only) and makes flush / shutdown no-ops.
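
Wiring ashutdown() into an ASGI lifespan handler might look like the sketch below. rollout_lifespan is a hypothetical helper name; the only SDK call it relies on is the ashutdown() coroutine shown above, and the returned function follows the lifespan(app) async-context-manager convention that Starlette and FastAPI accept through their lifespan argument.

```python
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Callable


def rollout_lifespan(client: Any) -> Callable[[Any], Any]:
    """Build a lifespan context manager that drains Rollout on shutdown.

    Hypothetical helper. `client` is a Rollout client; only ashutdown() is used.
    """

    @asynccontextmanager
    async def lifespan(app: Any) -> AsyncIterator[None]:
        try:
            yield  # the application serves requests here
        finally:
            await client.ashutdown()  # flush and stop the worker

    return lifespan
```

With FastAPI, for instance, this would be passed as FastAPI(lifespan=rollout_lifespan(client)). The try / finally ensures the flush also runs when the server stops because of an error.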

Development & testing

All commands run from packages/sdk-python/ with uv. The dev tools (pytest, ruff, ty) are declared in the dev dependency group and installed by uv sync.

uv run pytest          # test suite
uv run ruff check      # lint
uv run ty check        # type check

These three commands are the standard gate, both locally and in CI. ruff check and ty check default to the current directory, so no path argument is needed.

Provider integration tests

The base package has no provider SDKs as dependencies, so the test suite is split in two:

  • Default run (uv run pytest) — exercises everything against in-process fakes that mirror the provider SDK shapes. No provider SDK install required.
  • Real-library tests — guarded with pytest.importorskip("openai") / pytest.importorskip("anthropic"). With the package absent they report as skipped (not silently passed). They assert the wrapper attaches to the real client objects and that chunk/usage/sentinel parsing matches the real SDK types.

To actually run the real-library tests, install the declared optional extras:

uv run --extra openai --extra anthropic pytest

In CI this is a second job: uv run pytest proves the lightweight base install works, and uv run --extra openai --extra anthropic pytest validates the provider integrations.

Note: uv run --extra ... installs the provider SDKs into .venv and they persist. Afterward, uv run ty check will emit harmless unused-ignore-comment warnings (the # ty: ignore directives on the optional provider imports are only needed when the package is absent). Run uv sync to return to the clean base environment.

Download files

Source Distribution

mv37_rollout-0.1.0.tar.gz (44.4 kB view details)

Uploaded Source

Built Distribution

mv37_rollout-0.1.0-py3-none-any.whl (56.7 kB view details)

Uploaded Python 3

File details

Details for the file mv37_rollout-0.1.0.tar.gz.

File metadata

  • Download URL: mv37_rollout-0.1.0.tar.gz
  • Upload date:
  • Size: 44.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.19

File hashes

Hashes for mv37_rollout-0.1.0.tar.gz
Algorithm Hash digest
SHA256 f7af3934859ee02d910ee842b61e84df792831320e12d14f5c9bed5609610ec2
MD5 edc99d35375cff6ac8a8ff032fe61855
BLAKE2b-256 b4f0ee86594fcbf1d89c991b05f01cdc3bbec4c510662e73e611695af53bda5a

File details

Details for the file mv37_rollout-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: mv37_rollout-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 56.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.19

File hashes

Hashes for mv37_rollout-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 4b0d17fe61b74c4bdc6e0323dc9f84466e04e0a54a67f5ba7bcb4b0ce9e2dcaa
MD5 23901e1adbcd02d9e5f7966c90e25761
BLAKE2b-256 d5ffc353f8e1785837aa4e2c0f7cb27daf89c54132a043979e9ed4a1affc7e9c