Model execution as human-readable stories with lean/rich failure diagnostics and optional LLM analysis
runtime-narrative
Turn any Python execution into a traceable story composed of named stages. Get minimal logs when everything works — and surgical, LLM-powered diagnostics the moment something breaks.
▶ Story started: Import Customers
✔ Load CSV 0.012s
✔ Validate Data 0.004s
❌ Failure at: Insert Records
ValueError: duplicate customer id
Location: app/db.py:47 insert_row
Code: raise ValueError("duplicate customer id")
Chain: ValueError ← sqlite3.IntegrityError
## Exact Why
A record with the same customer_id already exists (UNIQUE constraint).
## Targeted Fix
Use INSERT OR IGNORE, or check for an existing row before inserting.
Contents
- Installation
- Quick start
- Decorators
- Auto-instrumentation
- Failure diagnostics
- Failure analyzers
- Renderers
- Framework integrations
- Async task groups
- Persistence and CLI
- Alert routing
- Testing utilities
- Custom renderers and analyzers
- Environment variables
Installation
pip install runtime-narrative
Optional extras unlock additional renderers and integrations:
| Extra | What it installs |
|---|---|
| console | typer: colored terminal output in ConsoleRenderer |
| fastapi | starlette: RuntimeNarrativeMiddleware |
| otel | opentelemetry-api, opentelemetry-sdk: OtelRenderer, OtelLogRenderer, OtelMetricsRenderer |
| prometheus | prometheus-client: PrometheusRenderer |
| anthropic | anthropic: AnthropicFailureAnalyzer |
| django | django: RuntimeNarrativeDjangoMiddleware / RuntimeNarrativeDjangoSyncMiddleware |
| celery | celery: NarrativeTask, connect_narrative |
| grpc | grpcio: RuntimeNarrativeInterceptor / RuntimeNarrativeAsyncInterceptor |
| all | Everything above |
pip install "runtime-narrative[console,fastapi,anthropic]"
pip install "runtime-narrative[all]"
Quick start
Sync
from runtime_narrative import story, stage

with story("Import Customers"):
    with stage("Load CSV"):
        rows = load_csv("customers.csv")
    with stage("Validate Data"):
        validate(rows)
    with stage("Insert Records"):
        db.insert(rows)
ConsoleRenderer is the default. On failure it prints the exact frame, a source snippet, the exception chain, and a compressed stack summary — no configuration needed.
Async
import asyncio
from runtime_narrative import story, stage

async def run():
    async with story("Sync Pipeline"):
        async with stage("Fetch Orders"):
            orders = await fetch_orders()
        async with stage("Process Orders"):
            await process(orders)

asyncio.run(run())
story and stage are dual sync/async context managers — use with or async with interchangeably.
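To make "dual sync/async context manager" concrete, here is a minimal sketch of the general pattern: one object implementing both the `with` and `async with` protocols. `DualContext` is a hypothetical stand-in written for this illustration; it is not the library's implementation of story or stage.

```python
import asyncio


class DualContext:
    """Hypothetical stand-in: usable with both `with` and `async with`."""

    def __init__(self, name: str, log: list) -> None:
        self.name = name
        self.log = log

    # Synchronous protocol
    def __enter__(self) -> "DualContext":
        self.log.append(f"start {self.name}")
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.log.append(f"end {self.name}")
        return False  # never swallow exceptions

    # Asynchronous protocol reuses the same bookkeeping
    async def __aenter__(self) -> "DualContext":
        return self.__enter__()

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        return self.__exit__(exc_type, exc, tb)


events: list = []

with DualContext("sync stage", events):
    pass


async def main() -> None:
    async with DualContext("async stage", events):
        await asyncio.sleep(0)


asyncio.run(main())
print(events)
```

Because both protocols share the same enter/exit logic, the recorded events look identical regardless of which form the caller uses.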
Track progress
Declare the total stage count upfront so progress_percent is accurate at every stage boundary:
with story("Import Customers", total_stages=3) as runtime:
    with stage("Load CSV"): ...   # 33%
    with stage("Validate"): ...   # 66%
    with stage("Insert"): ...     # 100%
Or set it dynamically after the story starts:
with story("Batch Job") as runtime:
    items = fetch_batch()
    runtime.set_total_stages(len(items))
    for item in items:
        with stage(f"Process {item.id}"): ...
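The 33% / 66% / 100% figures in the comments above are consistent with an integer percentage that is floored at each stage boundary. The sketch below shows that arithmetic; the helper name and the handling of an unknown total are assumptions for illustration, not the library's code.

```python
from typing import Optional


def progress_percent(completed_stages: int, total_stages: Optional[int]) -> int:
    """Integer percentage of completed stages, floored and capped at 100."""
    if not total_stages or total_stages <= 0:
        return 0  # assumption: report 0 while the total is unknown
    return min(100, (100 * completed_stages) // total_stages)


print([progress_percent(n, 3) for n in (1, 2, 3)])  # [33, 66, 100]
```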
Decorators
Wrap functions without restructuring call sites. The library detects async def automatically:
from runtime_narrative import runtime_narrative_story, runtime_narrative_stage

@runtime_narrative_stage("Load CSV")
def load_csv() -> list[str]:
    # Close the file deterministically instead of relying on garbage collection
    with open("customers.csv") as f:
        return f.read().splitlines()

@runtime_narrative_stage("Insert Records")
def insert(rows: list[str]) -> None:
    db.insert_many(rows)

@runtime_narrative_story("Import Customers")
def run() -> None:
    rows = load_csv()
    insert(rows)
@runtime_narrative_story accepts the same keyword arguments as story(): renderers, failure_analyzer, background_analysis, diagnostics_config, failure_diagnostics, allow_rich_in_production, app_roots, redact_extra, total_stages, dry_run.
Auto-instrumentation
Instrument an entire class or module without wrapping every function individually.
@narrative_class
Every public instance method becomes a stage. Stage names are ClassName.method_name:
from runtime_narrative import narrative_class, no_stage, story

@narrative_class
class OrderService:
    def validate(self, order: dict) -> None: ...  # → "OrderService.validate"
    def charge(self, order: dict) -> str: ...     # → "OrderService.charge"
    def fulfill(self, order: dict) -> str: ...    # → "OrderService.fulfill"

    @no_stage
    def _log(self, msg: str) -> None: ...         # excluded

svc = OrderService()
with story("Process Order", total_stages=3):
    svc.validate(order)
    svc.charge(order)
    svc.fulfill(order)
Skipped by default: names starting with _, @no_stage-marked methods, @property, @staticmethod, @classmethod, and inherited methods. Opt in to class and static methods explicitly:
@narrative_class(instrument_classmethods=True, instrument_staticmethods=True)
class Factory:
    @classmethod
    def create(cls): ...    # → "Factory.create"

    @staticmethod
    def validate(x): ...    # → "Factory.validate"

    @classmethod
    @narrative_stage("Build Widget")
    def build(cls): ...     # → "Build Widget" (custom name)
@narrative_stage
Override the stage name on a single method or standalone function. Prevents double-wrapping when used inside @narrative_class:
from runtime_narrative import narrative_class, narrative_stage

@narrative_class
class ReportService:
    @narrative_stage("Generate PDF Report")
    def generate(self, data: dict) -> bytes: ...       # custom name

    def archive(self, report: bytes) -> None: ...      # "ReportService.archive" (default)
When name is omitted, the function name is title-cased: validate_order → "Validate Order".
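The default naming rule can be approximated in one line. This is a sketch of the described behavior using a hypothetical helper name; the library's handling of acronyms or leading underscores may differ.

```python
def default_stage_name(func_name: str) -> str:
    """Turn a snake_case function name into a title-cased stage name."""
    return func_name.replace("_", " ").strip().title()


print(default_stage_name("validate_order"))  # Validate Order
```

Note that plain title-casing turns `load_csv` into "Load Csv", which is one reason to pass an explicit name for stages that contain acronyms.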
@no_stage
Exclude any method or function from all auto-instrumentation:
from runtime_narrative import no_stage
@no_stage
def _cache_lookup(key: str): ...
instrument_module
Instrument all public callables in an existing module in-place. Classes get @narrative_class; top-level functions are wrapped directly. Imported symbols are not touched:
import runtime_narrative
import myapp.services as svc
runtime_narrative.instrument_module(svc)
auto_instrument
Register a sys.meta_path import hook that instruments every app module on import. Only modules whose source path is under app_roots (default: cwd) are affected — stdlib and installed packages are unaffected:
import runtime_narrative
finder = runtime_narrative.auto_instrument()
# All subsequent imports of app modules are instrumented automatically.
from myapp.services import OrderService
# Stop at any point:
import sys
sys.meta_path.remove(finder)
Pin to specific directories:
runtime_narrative.auto_instrument(app_roots=["/app/src", "/app/workers"])
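The app_roots filter amounts to a path-containment check on each module's source file. The sketch below shows one way to express it with the standard library; `is_under_app_roots` is a hypothetical helper, and the real import hook may normalize paths differently (for example by resolving symlinks).

```python
import os
from pathlib import Path
from typing import Iterable


def is_under_app_roots(source_path: str, app_roots: Iterable[str]) -> bool:
    """Return True when source_path lies inside one of the app root directories."""
    path = Path(os.path.abspath(source_path))
    for root in app_roots:
        try:
            # relative_to compares whole path components, so "/app/src-old"
            # is not mistaken for a child of "/app/src".
            path.relative_to(os.path.abspath(root))
        except ValueError:
            continue
        return True
    return False


roots = ["/app/src", "/app/workers"]
print(is_under_app_roots("/app/src/myapp/services.py", roots))
print(is_under_app_roots("/usr/lib/python3/json/__init__.py", roots))
```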
Failure diagnostics
Lean vs rich mode
| Mode | What is captured |
|---|---|
| lean (default) | Primary frame, source snippet (±2 lines), exception chain, compressed stack summary |
| rich | Everything in lean + local variable values for up to 2 frames, with automatic secret redaction |
RUNTIME_NARRATIVE_FAILURE_DIAGNOSTICS=rich python app.py
Or per-story:
with story("Import", failure_diagnostics="rich"):
    ...
Production safeguards
When RUNTIME_NARRATIVE_ENV=production:
- Tracebacks are capped at 8 000 characters.
- rich mode is silently downgraded to lean to prevent local variable leakage in logs.
Override the downgrade when needed:
with story("Debug", failure_diagnostics="rich", allow_rich_in_production=True):
    ...
Secret redaction
Rich mode automatically redacts local variables whose key names contain any of: password, secret, token, api_key, apikey, authorization, cookie, session, credential.
Extend the list with project-specific names:
with story("Import", failure_diagnostics="rich", redact_extra=["internal_id", "org_key"]):
    ...
For more expressive rules, use FailureDiagnosticsConfig:
from runtime_narrative import FailureDiagnosticsConfig

config = FailureDiagnosticsConfig(
    failure_diagnostics="rich",
    redact_patterns=("^internal_.*", r"\bpii\b"),  # regex, case-insensitive re.search
    redact_callback=lambda key: key.startswith("priv_"),
)
with story("Import", diagnostics_config=config):
    ...
redact_callback exceptions are caught and treated as non-redact. Both redact_patterns and redact_callback are available on FailureDiagnosticsConfig and flow through merge().
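The three redaction layers described above (substring names, regex patterns, callback) can be pictured as a single decision function. The sketch below is an illustration only: the function names, the `<redacted>` placeholder, and the case-insensitive substring matching are assumptions, not the library's actual code.

```python
import re
from typing import Any, Callable, Dict, Optional, Sequence

DEFAULT_SUBSTRINGS = (
    "password", "secret", "token", "api_key", "apikey",
    "authorization", "cookie", "session", "credential",
)


def should_redact(
    key: str,
    extra: Sequence[str] = (),
    patterns: Sequence[str] = (),
    callback: Optional[Callable[[str], bool]] = None,
) -> bool:
    lowered = key.lower()
    # 1. Substring match against built-in and project-specific names.
    if any(s.lower() in lowered for s in (*DEFAULT_SUBSTRINGS, *extra)):
        return True
    # 2. Regex rules, applied with case-insensitive re.search.
    if any(re.search(p, key, re.IGNORECASE) for p in patterns):
        return True
    # 3. Callback; an exception is treated as "do not redact".
    if callback is not None:
        try:
            return bool(callback(key))
        except Exception:
            return False
    return False


def redact_locals(local_vars: Dict[str, Any], **rules: Any) -> Dict[str, Any]:
    """Replace the values of sensitive keys with a placeholder."""
    return {
        k: ("<redacted>" if should_redact(k, **rules) else v)
        for k, v in local_vars.items()
    }


def broken_callback(key: str) -> bool:
    raise RuntimeError("callback bug")


redacted = redact_locals(
    {"db_password": "hunter2", "internal_id": 7, "rows": 3, "priv_note": "n"},
    patterns=("^internal_.*",),
    callback=lambda key: key.startswith("priv_"),
)
print(redacted)
```

A failing callback falls through to "not redacted", so a bug in project-specific redaction code cannot itself break failure reporting; the trade-off is that the value is then shown.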
Full FailureDiagnosticsConfig reference
from runtime_narrative import FailureDiagnosticsConfig

config = FailureDiagnosticsConfig(
    runtime_environment="production",   # "development" | "production"
    failure_diagnostics="lean",         # "lean" | "rich"
    allow_rich_in_production=False,
    app_roots=("/app/src",),            # paths used for primary frame selection
    redact_extra=("internal_id",),      # additional substring matches
    redact_patterns=("^priv_",),        # regex patterns (case-insensitive)
    redact_callback=lambda k: k.endswith("_key"),
    max_traceback_chars=12_000,         # development cap (None = unlimited)
    production_traceback_cap=8_000,
    max_locals_per_frame=12,
    max_local_value_len=200,
    max_local_depth=2,
    max_frames_with_locals=2,
    snippet_context_lines=2,
)
with story("Import", diagnostics_config=config):
    ...
FailureDiagnosticsConfig.from_env() reads the standard environment variables. FailureDiagnosticsConfig.merge(base, **overrides) layers per-story overrides on a base config.
Failure analyzers
Analyzers receive structured failure data and return a diagnosis string that is attached to FailureOccurred and displayed by renderers. All analyzers are optional — if the endpoint is unreachable, your exception propagates normally.
OllamaFailureAnalyzer
Calls a local Ollama instance or any /api/generate-compatible endpoint:
from runtime_narrative import OllamaFailureAnalyzer, story

analyzer = OllamaFailureAnalyzer(
    model="llama3",
    endpoint="http://127.0.0.1:11434/api/generate",  # default
    timeout_seconds=60.0,
    max_context_chars=8000,
)
with story("Import Customers", failure_analyzer=analyzer):
    ...
LLMFailureAnalyzer
Calls any OpenAI-compatible /v1/chat/completions endpoint (OpenAI, vLLM, llama.cpp, LM Studio, Ollama OpenAI mode):
from runtime_narrative import LLMFailureAnalyzer

analyzer = LLMFailureAnalyzer(
    model="gpt-4o-mini",
    endpoint="https://api.openai.com/v1/chat/completions",
    api_key="sk-...",
    max_context_chars=8000,
)
AnthropicFailureAnalyzer ([anthropic] extra)
Uses the Anthropic Claude API. Reads ANTHROPIC_API_KEY from the environment by default:
from runtime_narrative import AnthropicFailureAnalyzer

analyzer = AnthropicFailureAnalyzer(
    model="claude-haiku-4-5-20251001",  # default
    api_key="sk-ant-...",               # or set ANTHROPIC_API_KEY
    max_tokens=1024,
    timeout_seconds=30.0,
)
Both analyze_failure() (sync) and analyze_failure_async() (async) are implemented. The async path uses anthropic.AsyncAnthropic natively — no thread offloading.
Structured output
All analyzers request a JSON response (exact_why, evidence, targeted_fix, code_changes) and format it into guaranteed ## Header\ncontent sections. Falls back to raw text when the model returns non-JSON.
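The JSON-to-sections step might look roughly like the sketch below. The "Exact Why" and "Targeted Fix" headers match the sample output at the top of this README; the "Evidence" and "Code Changes" titles and the function name are assumptions for illustration.

```python
import json

# JSON key -> section header (the last two titles are assumed)
SECTION_TITLES = {
    "exact_why": "Exact Why",
    "evidence": "Evidence",
    "targeted_fix": "Targeted Fix",
    "code_changes": "Code Changes",
}


def format_analysis(raw: str) -> str:
    """Render a JSON diagnosis as '## Header' sections; fall back to raw text."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return raw  # the model did not return JSON
    if not isinstance(data, dict):
        return raw
    sections = [
        f"## {title}\n{data[key]}"
        for key, title in SECTION_TITLES.items()
        if data.get(key)
    ]
    return "\n".join(sections) if sections else raw


reply = json.dumps({"exact_why": "duplicate id", "targeted_fix": "use INSERT OR IGNORE"})
print(format_analysis(reply))
print(format_analysis("plain text answer"))
```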
Context budget
Tracebacks are trimmed from the top (keeping the most recent frames) when the prompt would exceed max_context_chars. If the budget is exhausted before any traceback fits, a <traceback omitted> marker is used:
analyzer = LLMFailureAnalyzer(model="llama3", max_context_chars=4000)
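Trimming from the top can be sketched as walking the traceback backwards and keeping lines until the budget runs out. The helper below is hypothetical: it trims at line granularity and takes the non-traceback prompt size as a `prompt_overhead` argument, both of which are assumptions about details the README does not specify.

```python
OMITTED = "<traceback omitted>"


def fit_traceback(traceback_text: str, prompt_overhead: int, max_context_chars: int) -> str:
    """Keep as many trailing (most recent) traceback lines as fit in the budget."""
    budget = max_context_chars - prompt_overhead
    kept = []
    used = 0
    for line in reversed(traceback_text.splitlines()):
        cost = len(line) + 1  # +1 for the newline
        if used + cost > budget:
            break
        kept.append(line)
        used += cost
    if not kept:
        return OMITTED  # nothing fits: emit the marker instead
    return "\n".join(reversed(kept))


tb = "frame 1\nframe 2\nframe 3"
print(fit_traceback(tb, prompt_overhead=0, max_context_chars=16))
print(fit_traceback(tb, prompt_overhead=100, max_context_chars=50))
```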
DeduplicatingAnalyzer
Wraps any analyzer with an LRU cache keyed by SHA-256(error_type, filename, lineno, exception_chain). Prevents redundant LLM calls for the same recurring error. None results (timeouts, network errors) are never cached:
from runtime_narrative import DeduplicatingAnalyzer, AnthropicFailureAnalyzer

analyzer = DeduplicatingAnalyzer(
    AnthropicFailureAnalyzer(),
    max_cache_size=256,
)
Thread-safe; async-aware (delegates to analyze_failure_async when available).
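The caching behavior can be illustrated with an `OrderedDict`-based LRU. This is a simplified sketch with hypothetical names (`DedupCache`, `failure_key`); it omits the locking and async delegation that the real wrapper is described as providing, and the exact key serialization is an assumption.

```python
import hashlib
from collections import OrderedDict
from typing import Callable, Optional


def failure_key(error_type: str, filename: str, lineno: int, exception_chain: str) -> str:
    """SHA-256 over the fields that identify a recurring failure."""
    raw = "\x1f".join([error_type, filename, str(lineno), exception_chain])
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


class DedupCache:
    def __init__(self, analyze: Callable[..., Optional[str]], max_cache_size: int = 256) -> None:
        self._analyze = analyze
        self._max = max_cache_size
        self._cache: "OrderedDict[str, str]" = OrderedDict()

    def get(self, error_type: str, filename: str, lineno: int, exception_chain: str) -> Optional[str]:
        key = failure_key(error_type, filename, lineno, exception_chain)
        if key in self._cache:
            self._cache.move_to_end(key)  # mark as most recently used
            return self._cache[key]
        result = self._analyze(error_type, filename, lineno, exception_chain)
        if result is not None:  # None (timeout, network error) is never cached
            self._cache[key] = result
            if len(self._cache) > self._max:
                self._cache.popitem(last=False)  # evict least recently used
        return result


calls = []


def fake_analyzer(error_type, filename, lineno, exception_chain):
    calls.append(error_type)
    return None if error_type == "TimeoutError" else f"diagnosis for {error_type}"


cache = DedupCache(fake_analyzer, max_cache_size=2)
first = cache.get("ValueError", "app/db.py", 47, "ValueError")
second = cache.get("ValueError", "app/db.py", 47, "ValueError")  # served from cache
cache.get("TimeoutError", "a.py", 1, "")
cache.get("TimeoutError", "a.py", 1, "")  # retried, because None was not cached
print(calls)
```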
Background analysis
background_analysis=True emits FailureOccurred immediately (with llm_analysis=None), then runs the LLM as a background asyncio.Task. When the task completes, LLMAnalysisReady is emitted:
async with story(
    "Process Order",
    failure_analyzer=analyzer,
    background_analysis=True,
):
    ...
# Response is not delayed by LLM latency.
# LLMAnalysisReady fires when analysis is ready.
FailureAnalyzer protocol
All built-in analyzers satisfy the @runtime_checkable FailureAnalyzer protocol. Use it to type-check custom analyzers:
from runtime_narrative import FailureAnalyzer
assert isinstance(my_analyzer, FailureAnalyzer)
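For readers unfamiliar with runtime-checkable protocols, the sketch below shows how such an isinstance check behaves in general. `AnalyzerLike` is a hypothetical protocol defined here for illustration; it is not the library's FailureAnalyzer definition.

```python
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable
class AnalyzerLike(Protocol):
    """Hypothetical protocol mirroring the analyze_failure keyword arguments."""

    def analyze_failure(
        self, *, story_name: str, stage_name: str, failure: object,
        stage_timeline: str, progress_percent: int,
    ) -> Optional[str]: ...


class EchoAnalyzer:
    def analyze_failure(self, *, story_name, stage_name, failure, stage_timeline, progress_percent):
        return f"{story_name} failed at {stage_name}"


class NotAnAnalyzer:
    pass


print(isinstance(EchoAnalyzer(), AnalyzerLike))   # True
print(isinstance(NotAnAnalyzer(), AnalyzerLike))  # False
```

An isinstance check against a runtime-checkable protocol only verifies that the method exists, not that its signature matches, so a static type checker remains the stronger guarantee.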
Renderers
A renderer is any object with handle(self, event: object) -> None (sync) or async def handle(self, event: object) -> None (async). Pass a list to story(), middleware, or a decorator. Async renderers are awaited inside async with story(...) including for stage events.
Renderers never crash a story — if a renderer raises, a warning is printed to stderr and the next renderer continues.
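The isolation guarantee boils down to wrapping each renderer call in its own try/except. The sketch below shows the idea for sync renderers only; the `dispatch` function, the renderer classes, and the warning format are hypothetical, not the library's internals.

```python
import sys


class CollectingRenderer:
    def __init__(self) -> None:
        self.events = []

    def handle(self, event: object) -> None:
        self.events.append(event)


class BrokenRenderer:
    def handle(self, event: object) -> None:
        raise RuntimeError("renderer bug")


def dispatch(event: object, renderers: list) -> int:
    """Deliver event to every renderer; return how many handled it successfully."""
    delivered = 0
    for renderer in renderers:
        try:
            renderer.handle(event)
            delivered += 1
        except Exception as exc:
            # Warn and move on so one faulty renderer cannot break the story.
            print(f"warning: {type(renderer).__name__} failed: {exc!r}", file=sys.stderr)
    return delivered


collector = CollectingRenderer()
delivered = dispatch("StageStarted", [BrokenRenderer(), collector])
print(delivered, collector.events)
```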
ConsoleRenderer (default)
Colored terminal output for local development. Falls back to print and ASCII glyphs (>, [ok], [FAIL]) when typer is absent or the terminal is not UTF-8 (e.g. Windows cp1252):
from runtime_narrative.renderer.console import ConsoleRenderer

with story("My Pipeline", renderers=[ConsoleRenderer()]):
    ...
JsonRenderer
One structured JSON object per lifecycle event. Suitable for log aggregators (Datadog, CloudWatch, Loki):
from runtime_narrative import JsonRenderer

with story("My Pipeline", renderers=[JsonRenderer()]):
    ...

# Write to a file
with story("My Pipeline", renderers=[JsonRenderer(output=open("stories.jsonl", "a"))]):
    ...
On failure, FailureOccurred carries the full diagnostics payload: exact location, stack frame classifications, source snippet, local variables (rich mode), compressed stack summary, and traceback text.
RotatingJsonRenderer
Same as JsonRenderer but writes to a rotating log file using path.1 / path.2 semantics. No external dependencies:
from runtime_narrative import RotatingJsonRenderer

renderer = RotatingJsonRenderer(
    "stories.jsonl",
    max_bytes=10_485_760,  # rotate at 10 MB (default)
    backup_count=5,        # keep .1 through .5 (default)
)
HtmlReportRenderer
Writes a self-contained HTML report on StoryCompleted. Includes a story header, per-stage duration bar chart, and a failure detail section with traceback and optional LLM analysis:
from runtime_narrative import HtmlReportRenderer

with story("ETL Run", renderers=[HtmlReportRenderer("report.html", open_browser=True)]):
    ...
open_browser=True calls webbrowser.open() on the generated file after writing.
SqliteStoryRenderer
Persists all six lifecycle events to a SQLite database. No extra dependencies. See Persistence and CLI:
from runtime_narrative import SqliteStoryRenderer

with story("Nightly ETL", renderers=[SqliteStoryRenderer("stories.db")]):
    ...
OtelRenderer ([otel] extra)
Maps narrative events to OpenTelemetry spans. Each story is a root span; each stage is a child span:
from runtime_narrative import OtelRenderer

renderer = OtelRenderer(
    exclude_stages={"Health Check"},  # never create child spans for these stages
    min_duration_ms=5.0,              # suppress spans shorter than 5 ms
    max_attribute_length=8192,        # truncate long string attributes (default)
)
Attributes on failure spans: error.type, error.message, code.filepath, code.lineno, code.function, error.stack_trace, narrative.exception_chain. LLMAnalysisReady adds a llm_analysis_ready span event with narrative.llm_analysis.
Stages listed in exclude_stages that fail still mark the root span as ERROR; only the child span is suppressed.
OtelLogRenderer ([otel] extra)
Emits all six lifecycle events as OpenTelemetry log records via opentelemetry._logs:
| Event | Severity |
|---|---|
| StoryStarted, StoryCompleted, LLMAnalysisReady | INFO |
| StageStarted, StageCompleted | DEBUG |
| FailureOccurred | ERROR with error.type, code.filepath, error.stack_trace, etc. |
Automatically correlates trace_id/span_id from the ambient OTel context so logs link to their enclosing spans:
from runtime_narrative import OtelLogRenderer
renderer = OtelLogRenderer(logger_name="my_app")
OtelMetricsRenderer ([otel] extra)
Emits four OTel instruments via the OpenTelemetry Metrics API:
| Instrument | Type | Labels |
|---|---|---|
| narrative.stage.duration | Histogram (s) | story_name, stage_name |
| narrative.story.duration | Histogram (s) | story_name, success |
| narrative.story.failures | Counter | story_name, error_type |
| narrative.llm.analysis_latency | Histogram (s) | story_name |
narrative.llm.analysis_latency measures the time between FailureOccurred and LLMAnalysisReady (only recorded when background_analysis=True):
from runtime_narrative import OtelMetricsRenderer
renderer = OtelMetricsRenderer(meter_name="my_app")
PrometheusRenderer ([prometheus] extra)
Exposes four Prometheus metrics:
| Metric | Type | Labels |
|---|---|---|
| narrative_story_duration_seconds | Histogram | story_name, success |
| narrative_stage_duration_seconds | Histogram | story_name, stage_name |
| narrative_story_failures_total | Counter | story_name, error_type |
| narrative_story_total | Counter | story_name, success |
from runtime_narrative import PrometheusRenderer
from prometheus_client import CollectorRegistry, start_http_server
registry = CollectorRegistry()
renderer = PrometheusRenderer(registry=registry)
start_http_server(8000, registry=registry)
Combining renderers
from runtime_narrative import story, JsonRenderer, SqliteStoryRenderer, OtelRenderer

with story("Nightly ETL", renderers=[
    JsonRenderer(),
    SqliteStoryRenderer("stories.db"),
    OtelRenderer(),
]):
    ...
Framework integrations
FastAPI / Starlette
RuntimeNarrativeMiddleware wraps every HTTP request in async with story(...). Route handlers only need to declare stages — no story() context required in handlers:
from fastapi import FastAPI
from runtime_narrative import (
    RuntimeNarrativeMiddleware,
    JsonRenderer,
    AnthropicFailureAnalyzer,
    stage,  # needed for the stage(...) blocks in the handler
)

app = FastAPI()
app.add_middleware(
    RuntimeNarrativeMiddleware,
    renderers=[JsonRenderer()],
    failure_analyzer=AnthropicFailureAnalyzer(),
    runtime_environment="production",
)

@app.post("/orders")
async def create_order(payload: OrderIn):
    with stage("Validate Input"):
        validate(payload)
    with stage("Persist Order"):
        order = await db.insert(payload)
    return {"id": order.id}
Each request becomes a story named "METHOD /path" (e.g. "POST /orders"). When renderers is not passed, the middleware auto-selects ConsoleRenderer on a TTY and JsonRenderer otherwise.
When opentelemetry-api is installed, the middleware automatically extracts incoming W3C traceparent/tracestate headers so story spans become children of the upstream trace:
app.add_middleware(
    RuntimeNarrativeMiddleware,
    propagate_trace_context=True,  # default; set False to disable
)
Django
ASGI (async):
# settings.py
MIDDLEWARE = [
    "runtime_narrative.middleware_django.RuntimeNarrativeDjangoMiddleware",
    ...
]
WSGI (sync):
# settings.py
MIDDLEWARE = [
    "runtime_narrative.middleware_django.RuntimeNarrativeDjangoSyncMiddleware",
    ...
]
Story name is "METHOD /path". Requires pip install "runtime-narrative[django]".
Celery
from celery import Celery
from runtime_narrative import (
    NarrativeTask,
    connect_narrative,
    JsonRenderer,
    AnthropicFailureAnalyzer,  # used in Option B
    stage,                     # used inside the task body
)

app = Celery("myapp")

# Option A: per task
@app.task(base=NarrativeTask)
def process_order(order_id: str) -> None:
    with stage("Validate"):
        validate(order_id)
    with stage("Charge"):
        charge(order_id)

# Option B: set defaults for all tasks globally
connect_narrative(
    app,
    renderers=[JsonRenderer()],
    failure_analyzer=AnthropicFailureAnalyzer(),
)
Story name is "<task.name> [task_id=<id>]". Override options per task by setting narrative_* class attributes directly. Requires pip install "runtime-narrative[celery]".
gRPC
import grpc
from runtime_narrative import RuntimeNarrativeAsyncInterceptor, JsonRenderer

# Async server
server = grpc.aio.server(
    interceptors=[RuntimeNarrativeAsyncInterceptor(renderers=[JsonRenderer()])],
)

# Sync server (thread_pool is a concurrent.futures.ThreadPoolExecutor you create)
from runtime_narrative import RuntimeNarrativeInterceptor

server = grpc.server(
    thread_pool,
    interceptors=[RuntimeNarrativeInterceptor(renderers=[JsonRenderer()])],
)
Story name is the full gRPC method path, e.g. "/mypackage.MyService/MyMethod". Requires pip install "runtime-narrative[grpc]".
Async task groups
NarrativeTaskGroup runs concurrent asyncio tasks under a shared story. Tasks inherit the parent story context automatically via ContextVar copy, so stage() calls inside tasks are tracked normally:
import asyncio
from runtime_narrative import story, NarrativeTaskGroup, NarrativeTaskGroupError

async def main():
    async with story("Parallel Pipeline"):
        async with NarrativeTaskGroup() as tg:
            tg.create_task(fetch_orders(), name="Fetch Orders")
            tg.create_task(fetch_inventory(), name="Fetch Inventory")
        # waits for both; stages from both appear in the timeline

asyncio.run(main())
If tasks fail, NarrativeTaskGroupError is raised with failed_tasks: dict[str, BaseException]:
try:
    async with NarrativeTaskGroup() as tg:
        tg.create_task(risky_job(), name="Risky Job")
except NarrativeTaskGroupError as e:
    for task_name, exc in e.failed_tasks.items():
        print(f"{task_name} failed: {exc}")
No extra dependencies. Python 3.9+.
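The "ContextVar copy" mentioned at the start of this section is standard asyncio behavior: each task runs in a copy of the context that was current when the task was created. The sketch below demonstrates that with the standard library only; `current_story` is a hypothetical variable for illustration, not the library's internal one.

```python
import asyncio
import contextvars

# Hypothetical context variable standing in for "the active story"
current_story = contextvars.ContextVar("current_story", default=None)


async def worker(name: str, seen: dict) -> None:
    seen[name] = current_story.get()         # inherited from the parent
    current_story.set(f"{name} (private)")   # changes stay inside this task


async def main():
    seen = {}
    current_story.set("Parallel Pipeline")
    await asyncio.gather(
        asyncio.create_task(worker("Fetch Orders", seen)),
        asyncio.create_task(worker("Fetch Inventory", seen)),
    )
    return seen, current_story.get()


seen, parent_value = asyncio.run(main())
print(seen)
print(parent_value)
```

Both workers see the parent's value, while their own `set()` calls do not leak back to the parent or to each other.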
Persistence and CLI
SqliteStoryRenderer records all six lifecycle events to a local SQLite database. No extra dependencies:
from runtime_narrative import story, SqliteStoryRenderer

with story("Nightly ETL", renderers=[SqliteStoryRenderer("stories.db")]):
    ...
Schema:
| Table | Key columns |
|---|---|
| stories | story_id, name, success, duration_seconds, started_at, completed_at |
| stages | story_id, stage_name, stage_index, parent_stage_name, duration_seconds, completed, failed |
| failures | story_id, stage_name, error_type, error_message, filename, lineno, traceback_text, llm_analysis |
LLMAnalysisReady back-fills the llm_analysis column in failures so background analysis results are persisted even when they arrive after StoryCompleted.
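Because the store is plain SQLite, it can also be queried directly with the standard library. The sketch below recreates a failures table in memory using only the key columns listed above, with assumed column types and made-up sample rows, and then counts failures per stage and error type. The real schema may contain more columns or different types.

```python
import sqlite3

# In-memory stand-in; a real database would be opened with e.g. "stories.db"
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE failures (
        story_id TEXT, stage_name TEXT, error_type TEXT, error_message TEXT,
        filename TEXT, lineno INTEGER, traceback_text TEXT, llm_analysis TEXT
    )
    """
)
# Illustrative sample rows, not real output
sample_rows = [
    ("s1", "Insert Records", "ValueError", "duplicate customer id", "app/db.py", 47, "", None),
    ("s2", "Insert Records", "ValueError", "duplicate customer id", "app/db.py", 47, "", None),
    ("s3", "Load CSV", "FileNotFoundError", "customers.csv", "app/io.py", 12, "", None),
]
conn.executemany("INSERT INTO failures VALUES (?, ?, ?, ?, ?, ?, ?, ?)", sample_rows)

# Which stage/error combinations fail most often?
hot_spots = conn.execute(
    """
    SELECT stage_name, error_type, COUNT(*) AS occurrences
    FROM failures
    GROUP BY stage_name, error_type
    ORDER BY occurrences DESC
    """
).fetchall()
print(hot_spots)
conn.close()
```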
CLI (registered as runtime-narrative):
# List the 10 most recent failures
runtime-narrative failures --db stories.db
# Filter by stage name or story name (LIKE pattern)
runtime-narrative failures --last 25 --stage "Insert Records"
runtime-narrative failures --story "Nightly ETL"
# Show the full detail for one story
runtime-narrative story <story_id> --db stories.db
The --db flag defaults to runtime_narrative.db in the current directory.
Alert routing
AlertRoutingRenderer fans out FailureOccurred events to webhook destinations concurrently. Destination failures are logged to stderr and swallowed — they never crash the story:
from runtime_narrative import (
    story,
    AlertRoutingRenderer,
    SlackWebhookDestination,
    HttpWebhookDestination,
)

renderer = AlertRoutingRenderer(
    destinations=[
        SlackWebhookDestination("https://hooks.slack.com/services/..."),
        HttpWebhookDestination(
            "https://alerts.example.com/webhook",
            headers={"Authorization": "Bearer ..."},
            timeout=5.0,
        ),
    ],
    only_stories={"Nightly ETL", "Payment Processor"},  # None = all stories
    only_error_types={"ValueError", "TimeoutError"},    # None = all error types
)
async with story("Nightly ETL", renderers=[renderer]):
    ...
SlackWebhookDestination sends a Block Kit message with a header, error detail section, and an optional analysis section when llm_analysis is present.
HttpWebhookDestination POSTs a JSON payload containing: story_id, story_name, stage_name, error_type, error_message, filename, lineno, function, llm_analysis, timestamp.
dry_run mode
dry_run=True suppresses exceptions raised inside stage bodies, marks all stages completed, and emits StoryCompleted(success=True). Useful for smoke-testing instrumentation wiring without triggering real side effects:
with story("Nightly ETL", dry_run=True):
    with stage("Load Warehouse"):
        raise IOError("would connect to DB in prod")  # suppressed
    with stage("Transform"):
        raise RuntimeError("would run transforms")    # suppressed
# Both stages are marked completed and StoryCompleted(success=True) is emitted
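Exception suppression of this kind is typically implemented by returning True from `__exit__`. The sketch below shows that mechanism with a hypothetical `SketchStage` class; it is not the library's stage implementation, and restricting suppression to `Exception` subclasses is an assumption.

```python
class SketchStage:
    """Hypothetical stage that records completion and can suppress errors."""

    def __init__(self, name: str, completed: list, dry_run: bool = False) -> None:
        self.name = name
        self.completed = completed
        self.dry_run = dry_run

    def __enter__(self) -> "SketchStage":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        failed = exc_type is not None
        # Only ordinary exceptions are suppressed; KeyboardInterrupt still propagates.
        suppress = failed and self.dry_run and issubclass(exc_type, Exception)
        if not failed or suppress:
            self.completed.append(self.name)
        return suppress  # True tells Python to swallow the exception


completed = []
with SketchStage("Load Warehouse", completed, dry_run=True):
    raise IOError("would connect to DB in prod")
with SketchStage("Transform", completed, dry_run=True):
    raise RuntimeError("would run transforms")
print(completed)
```

In this sketch, statements that run before the raising line still execute normally, and the rest of the stage body is skipped once the exception is raised.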
Testing utilities
StoryRecorder is a dual sync/async context manager that captures story events for assertions. No output is produced:
import pytest
from runtime_narrative import stage
from runtime_narrative.testing import StoryRecorder

def test_pipeline_success():
    with StoryRecorder("ETL") as recorder:
        with stage("Load"):
            rows = [1, 2, 3]
        with stage("Insert"):
            db.insert(rows)
    recorder.assert_stages_completed(["Load", "Insert"])
    recorder.assert_no_failure()
    recorder.assert_story_completed(success=True)

def test_invalid_input_fails_at_validate():
    with pytest.raises(ValueError):
        with StoryRecorder("ETL") as recorder:
            with stage("Load"):
                pass
            with stage("Validate"):
                raise ValueError("bad schema")
    recorder.assert_stage_failed("Validate", error_type="ValueError")
    recorder.assert_story_completed(success=False)
Works as async with StoryRecorder(...) too. Pass any story() kwargs including dry_run=True:
with StoryRecorder("ETL", dry_run=True) as recorder:
    run_pipeline()  # exceptions raised inside stages are suppressed
recorder.assert_stages_completed(["Load", "Validate", "Insert"])
recorder.assert_no_failure()
Assertion methods:
| Method | What it checks |
|---|---|
| assert_stages_completed(names) | All named stages appear in StageCompleted events |
| assert_no_failure() | No FailureOccurred event was emitted |
| assert_stage_failed(name, error_type=None) | A FailureOccurred event at that stage name; optionally checks error_type |
| assert_story_completed(success=None) | A StoryCompleted event was emitted; optionally checks the success flag |
Custom renderers and analyzers
Custom renderer
Implement handle(self, event: object). Async renderers (async def handle) are awaited inside async with story(...):
class PagerDutyRenderer:
    async def handle(self, event: object) -> None:
        if type(event).__name__ == "FailureOccurred":
            await pagerduty.trigger(
                summary=f"{event.story_name} failed at {event.stage_name}",
                details={"error": event.error_type, "analysis": event.llm_analysis},
            )

async with story("Nightly ETL", renderers=[PagerDutyRenderer()]):
    ...
Six event types are emitted. Key fields on each:
| Event | Key fields |
|---|---|
| StoryStarted | story_id, story_name, timestamp |
| StageStarted | story_id, stage_name, timestamp, stage_index (0-based), parent_stage_name |
| StageCompleted | story_id, stage_name, timestamp, duration_seconds, stage_index, parent_stage_name |
| FailureOccurred | story_id, story_name, stage_name, error_type, error_message, filename, lineno, function, traceback_text, exception_chain, exact_cause, stage_timeline, progress_percent, llm_analysis, diagnostics_mode, stack_frames, source_snippet, compressed_stack_summary, locals_by_frame |
| StoryCompleted | story_id, story_name, success, progress_percent, completed_stages, total_stages, timestamp |
| LLMAnalysisReady | story_id, story_name, stage_name, llm_analysis, timestamp |
parent_stage_name is None for top-level stages and set to the enclosing stage name for nested stages.
Custom failure analyzer
Implement analyze_failure(...). Add analyze_failure_async(...) for native async — otherwise the sync method is called via asyncio.to_thread:
from typing import Optional

class MyAnalyzer:
    async def analyze_failure_async(
        self,
        *,
        story_name: str,
        stage_name: str,
        failure,  # FailureSummary: .error_type, .error_message, .filename,
                  # .lineno, .function, .source_line,
                  # .traceback_text, .exception_chain
        stage_timeline: str,
        progress_percent: int,
    ) -> Optional[str]:  # Optional[str] keeps the annotation valid on Python 3.9
        result = await my_llm_client.complete(build_prompt(failure))
        return result.text

    def analyze_failure(self, *, story_name, stage_name, failure, stage_timeline, progress_percent):
        # sync fallback used when called from sync story()
        return requests.post(...).json()["text"]

with story("Import", failure_analyzer=MyAnalyzer()):
    ...
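The sync/async dispatch rule described above (prefer analyze_failure_async, otherwise run the sync method via asyncio.to_thread) can be sketched as follows. `run_analyzer` and both analyzer classes are hypothetical, and the keyword arguments are reduced to `stage_name` to keep the example short.

```python
import asyncio
import threading


async def run_analyzer(analyzer, **kwargs):
    """Prefer a native async method; otherwise offload the sync one to a thread."""
    native = getattr(analyzer, "analyze_failure_async", None)
    if native is not None:
        return await native(**kwargs)
    return await asyncio.to_thread(analyzer.analyze_failure, **kwargs)


class SyncOnlyAnalyzer:
    def analyze_failure(self, *, stage_name):
        off_main = threading.current_thread() is not threading.main_thread()
        return f"sync diagnosis for {stage_name} (worker thread: {off_main})"


class NativeAsyncAnalyzer:
    async def analyze_failure_async(self, *, stage_name):
        return f"async diagnosis for {stage_name}"


async def main():
    return [
        await run_analyzer(SyncOnlyAnalyzer(), stage_name="Insert Records"),
        await run_analyzer(NativeAsyncAnalyzer(), stage_name="Insert Records"),
    ]


results = asyncio.run(main())
print(results)
```

Offloading the sync method to a thread keeps a slow HTTP call from blocking the event loop, which is why a native async method is only an optimization rather than a requirement.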
Environment variables
| Variable | Values | Default | Effect |
|---|---|---|---|
| RUNTIME_NARRATIVE_ENV | development, production | development | Production caps tracebacks to 8 000 chars and forces lean mode |
| RUNTIME_NARRATIVE_FAILURE_DIAGNOSTICS | lean, rich | lean | rich captures local variable values at the failing frames. Invalid values raise ValueError at story construction. |
| RUNTIME_NARRATIVE_ALLOW_RICH_IN_PRODUCTION | 1, true | off | Bypass production safeguard; allow rich diagnostics in production |
| RUNTIME_NARRATIVE_MODEL | model name string | (none) | Default model for AnthropicFailureAnalyzer; also used by example scripts for OllamaFailureAnalyzer / LLMFailureAnalyzer |
| ANTHROPIC_API_KEY | API key | (none) | Required by AnthropicFailureAnalyzer; read automatically if api_key= is not passed |
Python compatibility
Python 3.9 – 3.13. Zero required dependencies beyond python-dotenv.