Skip to main content

Model execution as human-readable stories with lean/rich failure diagnostics and optional LLM analysis

Project description

runtime-narrative

Turn any Python execution into a traceable story composed of named stages. Get minimal logs when everything works — and surgical, LLM-powered diagnostics the moment something breaks.

▶ Story started: Import Customers
✔ Load CSV           0.012s
✔ Validate Data      0.004s

❌ Failure at: Insert Records

  ValueError: duplicate customer id
  Location:   app/db.py:47  insert_row
  Code:       raise ValueError("duplicate customer id")
  Chain:      ValueError ← sqlite3.IntegrityError

  ## Exact Why
  A record with the same customer_id already exists (UNIQUE constraint).

  ## Targeted Fix
  Use INSERT OR IGNORE, or check for an existing row before inserting.

Contents


Installation

pip install runtime-narrative

Optional extras unlock additional renderers and integrations:

Extra What it installs
console typer — colored terminal output in ConsoleRenderer
fastapi starletteRuntimeNarrativeMiddleware
otel opentelemetry-api, opentelemetry-sdkOtelRenderer, OtelLogRenderer, OtelMetricsRenderer
prometheus prometheus-clientPrometheusRenderer
anthropic anthropicAnthropicFailureAnalyzer
django djangoRuntimeNarrativeDjangoMiddleware / SyncMiddleware
celery celeryNarrativeTask, connect_narrative
grpc grpcioRuntimeNarrativeInterceptor / AsyncInterceptor
all Everything above
pip install "runtime-narrative[console,fastapi,anthropic]"
pip install "runtime-narrative[all]"

Quick start

Sync

from runtime_narrative import story, stage

with story("Import Customers"):
    with stage("Load CSV"):
        rows = load_csv("customers.csv")

    with stage("Validate Data"):
        validate(rows)

    with stage("Insert Records"):
        db.insert(rows)

ConsoleRenderer is the default. On failure it prints the exact frame, a source snippet, the exception chain, and a compressed stack summary — no configuration needed.

Async

import asyncio
from runtime_narrative import story, stage

async def run():
    async with story("Sync Pipeline"):
        async with stage("Fetch Orders"):
            orders = await fetch_orders()

        async with stage("Process Orders"):
            await process(orders)

asyncio.run(run())

story and stage are dual sync/async context managers — use with or async with interchangeably.

Track progress

Declare the total stage count upfront so progress_percent is accurate at every stage boundary:

with story("Import Customers", total_stages=3) as runtime:
    with stage("Load CSV"):    ...   # 33%
    with stage("Validate"):   ...   # 66%
    with stage("Insert"):     ...   # 100%

Or set it dynamically after the story starts:

with story("Batch Job") as runtime:
    items = fetch_batch()
    runtime.set_total_stages(len(items))
    for item in items:
        with stage(f"Process {item.id}"): ...

Decorators

Wrap functions without restructuring call sites. The library detects async def automatically:

from runtime_narrative import runtime_narrative_story, runtime_narrative_stage

@runtime_narrative_stage("Load CSV")
def load_csv() -> list[str]:
    return open("customers.csv").read().splitlines()

@runtime_narrative_stage("Insert Records")
def insert(rows: list[str]) -> None:
    db.insert_many(rows)

@runtime_narrative_story("Import Customers")
def run() -> None:
    rows = load_csv()
    insert(rows)

@runtime_narrative_story accepts the same keyword arguments as story(): renderers, failure_analyzer, background_analysis, diagnostics_config, failure_diagnostics, allow_rich_in_production, app_roots, redact_extra, total_stages, dry_run.


Auto-instrumentation

Instrument an entire class or module without wrapping every function individually.

@narrative_class

Every public instance method becomes a stage. Stage names are ClassName.method_name:

from runtime_narrative import narrative_class, no_stage, story

@narrative_class
class OrderService:
    def validate(self, order: dict) -> None: ...   # → "OrderService.validate"
    def charge(self, order: dict) -> str: ...      # → "OrderService.charge"
    def fulfill(self, order: dict) -> str: ...     # → "OrderService.fulfill"

    @no_stage
    def _log(self, msg: str) -> None: ...          # excluded

svc = OrderService()
with story("Process Order", total_stages=3):
    svc.validate(order)
    svc.charge(order)
    svc.fulfill(order)

Skipped by default: names starting with _, @no_stage-marked methods, @property, @staticmethod, @classmethod, and inherited methods. Opt in to class and static methods explicitly:

@narrative_class(instrument_classmethods=True, instrument_staticmethods=True)
class Factory:
    @classmethod
    def create(cls): ...       # → "Factory.create"

    @staticmethod
    def validate(x): ...       # → "Factory.validate"

    @classmethod
    @narrative_stage("Build Widget")
    def build(cls): ...        # → "Build Widget" (custom name)

@narrative_stage

Override the stage name on a single method or standalone function. Prevents double-wrapping when used inside @narrative_class:

from runtime_narrative import narrative_class, narrative_stage

@narrative_class
class ReportService:
    @narrative_stage("Generate PDF Report")
    def generate(self, data: dict) -> bytes: ...   # custom name

    def archive(self, report: bytes) -> None: ...  # "ReportService.archive" (default)

When name is omitted, the function name is title-cased: validate_order"Validate Order".

@no_stage

Exclude any method or function from all auto-instrumentation:

from runtime_narrative import no_stage

@no_stage
def _cache_lookup(key: str): ...

instrument_module

Instrument all public callables in an existing module in-place. Classes get @narrative_class; top-level functions are wrapped directly. Imported symbols are not touched:

import runtime_narrative
import myapp.services as svc

runtime_narrative.instrument_module(svc)

auto_instrument

Register a sys.meta_path import hook that instruments every app module on import. Only modules whose source path is under app_roots (default: cwd) are affected — stdlib and installed packages are unaffected:

import runtime_narrative

finder = runtime_narrative.auto_instrument()

# All subsequent imports of app modules are instrumented automatically.
from myapp.services import OrderService

# Stop at any point:
import sys
sys.meta_path.remove(finder)

Pin to specific directories:

runtime_narrative.auto_instrument(app_roots=["/app/src", "/app/workers"])

Failure diagnostics

Lean vs rich mode

Mode What is captured
lean (default) Primary frame, source snippet (±2 lines), exception chain, compressed stack summary
rich Everything in lean + local variable values for up to 2 frames, with automatic secret redaction
RUNTIME_NARRATIVE_FAILURE_DIAGNOSTICS=rich python app.py

Or per-story:

with story("Import", failure_diagnostics="rich"):
    ...

Production safeguards

When RUNTIME_NARRATIVE_ENV=production:

  • Tracebacks are capped at 8 000 characters.
  • rich mode is silently downgraded to lean to prevent local variable leakage in logs.

Override the downgrade when needed:

with story("Debug", failure_diagnostics="rich", allow_rich_in_production=True):
    ...

Secret redaction

Rich mode automatically redacts local variables whose key names contain any of: password, secret, token, api_key, apikey, authorization, cookie, session, credential.

Extend the list with project-specific names:

with story("Import", failure_diagnostics="rich", redact_extra=["internal_id", "org_key"]):
    ...

For more expressive rules, use FailureDiagnosticsConfig:

from runtime_narrative import FailureDiagnosticsConfig

config = FailureDiagnosticsConfig(
    failure_diagnostics="rich",
    redact_patterns=("^internal_.*", r"\bpii\b"),   # regex, case-insensitive re.search
    redact_callback=lambda key: key.startswith("priv_"),
)

with story("Import", diagnostics_config=config):
    ...

redact_callback exceptions are caught and treated as non-redact. Both redact_patterns and redact_callback are available on FailureDiagnosticsConfig and flow through merge().

Full FailureDiagnosticsConfig reference

from runtime_narrative import FailureDiagnosticsConfig

config = FailureDiagnosticsConfig(
    runtime_environment="production",       # "development" | "production"
    failure_diagnostics="lean",             # "lean" | "rich"
    allow_rich_in_production=False,
    app_roots=("/app/src",),                # paths used for primary frame selection
    redact_extra=("internal_id",),          # additional substring matches
    redact_patterns=("^priv_",),            # regex patterns (case-insensitive)
    redact_callback=lambda k: k.endswith("_key"),
    max_traceback_chars=12_000,             # development cap (None = unlimited)
    production_traceback_cap=8_000,
    max_locals_per_frame=12,
    max_local_value_len=200,
    max_local_depth=2,
    max_frames_with_locals=2,
    snippet_context_lines=2,
)

with story("Import", diagnostics_config=config):
    ...

FailureDiagnosticsConfig.from_env() reads the standard environment variables. FailureDiagnosticsConfig.merge(base, **overrides) layers per-story overrides on a base config.


Failure analyzers

Analyzers receive structured failure data and return a diagnosis string that is attached to FailureOccurred and displayed by renderers. All analyzers are optional — if the endpoint is unreachable, your exception propagates normally.

OllamaFailureAnalyzer

Calls a local Ollama instance or any /api/generate-compatible endpoint:

from runtime_narrative import OllamaFailureAnalyzer, story

analyzer = OllamaFailureAnalyzer(
    model="llama3",
    endpoint="http://127.0.0.1:11434/api/generate",  # default
    timeout_seconds=60.0,
    max_context_chars=8000,
)

with story("Import Customers", failure_analyzer=analyzer):
    ...

LLMFailureAnalyzer

Calls any OpenAI-compatible /v1/chat/completions endpoint (OpenAI, vLLM, llama.cpp, LM Studio, Ollama OpenAI mode):

from runtime_narrative import LLMFailureAnalyzer

analyzer = LLMFailureAnalyzer(
    model="gpt-4o-mini",
    endpoint="https://api.openai.com/v1/chat/completions",
    api_key="sk-...",
    max_context_chars=8000,
)

AnthropicFailureAnalyzer ([anthropic] extra)

Uses the Anthropic Claude API. Reads ANTHROPIC_API_KEY from the environment by default:

from runtime_narrative import AnthropicFailureAnalyzer

analyzer = AnthropicFailureAnalyzer(
    model="claude-haiku-4-5-20251001",   # default
    api_key="sk-ant-...",                # or set ANTHROPIC_API_KEY
    max_tokens=1024,
    timeout_seconds=30.0,
)

Both analyze_failure() (sync) and analyze_failure_async() (async) are implemented. The async path uses anthropic.AsyncAnthropic natively — no thread offloading.

Structured output

All analyzers request a JSON response (exact_why, evidence, targeted_fix, code_changes) and format it into guaranteed ## Header\ncontent sections. Falls back to raw text when the model returns non-JSON.

Context budget

Tracebacks are trimmed from the top (keeping the most recent frames) when the prompt would exceed max_context_chars. If the budget is exhausted before any traceback fits, a <traceback omitted> marker is used:

analyzer = LLMFailureAnalyzer(model="llama3", max_context_chars=4000)

DeduplicatingAnalyzer

Wraps any analyzer with an LRU cache keyed by SHA-256(error_type, filename, lineno, exception_chain). Prevents redundant LLM calls for the same recurring error. None results (timeouts, network errors) are never cached:

from runtime_narrative import DeduplicatingAnalyzer, AnthropicFailureAnalyzer

analyzer = DeduplicatingAnalyzer(
    AnthropicFailureAnalyzer(),
    max_cache_size=256,
)

Thread-safe; async-aware (delegates to analyze_failure_async when available).

Background analysis

background_analysis=True emits FailureOccurred immediately (with llm_analysis=None), then runs the LLM as a background asyncio.Task. When the task completes, LLMAnalysisReady is emitted:

async with story(
    "Process Order",
    failure_analyzer=analyzer,
    background_analysis=True,
):
    ...
# Response is not delayed by LLM latency.
# LLMAnalysisReady fires when analysis is ready.

FailureAnalyzer protocol

All built-in analyzers satisfy the @runtime_checkable FailureAnalyzer protocol. Use it to type-check custom analyzers:

from runtime_narrative import FailureAnalyzer

assert isinstance(my_analyzer, FailureAnalyzer)

Renderers

A renderer is any object with handle(self, event: object) -> None (sync) or async def handle(self, event: object) -> None (async). Pass a list to story(), middleware, or a decorator. Async renderers are awaited inside async with story(...) including for stage events.

Renderers never crash a story — if a renderer raises, a warning is printed to stderr and the next renderer continues.

ConsoleRenderer (default)

Colored terminal output for local development. Falls back to print and ASCII glyphs (>, [ok], [FAIL]) when typer is absent or the terminal is not UTF-8 (e.g. Windows cp1252). Every line is tagged with a [short_id] (first 6 characters of story_id), color-coded per story family, and indented by nesting depth — see Sub-stories and log capture for how this looks with nested stages and sub-stories:

from runtime_narrative import ConsoleRenderer

with story("My Pipeline", renderers=[ConsoleRenderer()]):
    ...

JsonRenderer

One structured JSON object per lifecycle event. Suitable for log aggregators (Datadog, CloudWatch, Loki):

from runtime_narrative import JsonRenderer

with story("My Pipeline", renderers=[JsonRenderer()]):
    ...

# Write to a file
with story("My Pipeline", renderers=[JsonRenderer(output=open("stories.jsonl", "a"))]):
    ...

On failure, FailureOccurred carries the full diagnostics payload: exact location, stack frame classifications, source snippet, local variables (rich mode), compressed stack summary, and traceback text.

RotatingJsonRenderer

Same as JsonRenderer but writes to a rotating log file using path.1 / path.2 semantics. No external dependencies:

from runtime_narrative import RotatingJsonRenderer

renderer = RotatingJsonRenderer(
    "stories.jsonl",
    max_bytes=10_485_760,  # rotate at 10 MB (default)
    backup_count=5,        # keep .1 through .5 (default)
)

HtmlReportRenderer

Writes a self-contained HTML report on StoryCompleted. Includes a story header, per-stage duration bar chart, and a failure detail section with traceback and optional LLM analysis:

from runtime_narrative import HtmlReportRenderer

with story("ETL Run", renderers=[HtmlReportRenderer("report.html", open_browser=True)]):
    ...

open_browser=True calls webbrowser.open() on the generated file after writing.

SqliteStoryRenderer

Persists all six lifecycle events to a SQLite database. No extra dependencies. See Persistence and CLI:

from runtime_narrative import SqliteStoryRenderer

with story("Nightly ETL", renderers=[SqliteStoryRenderer("stories.db")]):
    ...

OtelRenderer ([otel] extra)

Maps narrative events to OpenTelemetry spans. Each story is a root span; each stage is a child span:

from runtime_narrative import OtelRenderer

renderer = OtelRenderer(
    exclude_stages={"Health Check"},  # never create child spans for these stages
    min_duration_ms=5.0,              # suppress spans shorter than 5 ms
    max_attribute_length=8192,        # truncate long string attributes (default)
)

Attributes on failure spans: error.type, error.message, code.filepath, code.lineno, code.function, error.stack_trace, narrative.exception_chain. LLMAnalysisReady adds a llm_analysis_ready span event with narrative.llm_analysis.

exclude_stages stages that fail still mark the root span ERROR — only the child span is suppressed.

OtelLogRenderer ([otel] extra)

Emits all six lifecycle events as OpenTelemetry log records via opentelemetry._logs:

Event Severity
StoryStarted, StoryCompleted, LLMAnalysisReady INFO
StageStarted, StageCompleted DEBUG
FailureOccurred ERROR with error.type, code.filepath, error.stack_trace, etc.

Automatically correlates trace_id/span_id from the ambient OTel context so logs link to their enclosing spans:

from runtime_narrative import OtelLogRenderer

renderer = OtelLogRenderer(logger_name="my_app")

OtelMetricsRenderer ([otel] extra)

Emits four OTel instruments via the OpenTelemetry Metrics API:

Instrument Type Labels
narrative.stage.duration Histogram (s) story_name, stage_name
narrative.story.duration Histogram (s) story_name, success
narrative.story.failures Counter story_name, error_type
narrative.llm.analysis_latency Histogram (s) story_name

narrative.llm.analysis_latency measures the time between FailureOccurred and LLMAnalysisReady (only recorded when background_analysis=True):

from runtime_narrative import OtelMetricsRenderer

renderer = OtelMetricsRenderer(meter_name="my_app")

PrometheusRenderer ([prometheus] extra)

Exposes four Prometheus metrics:

Metric Type Labels
narrative_story_duration_seconds Histogram story_name, success
narrative_stage_duration_seconds Histogram story_name, stage_name
narrative_story_failures_total Counter story_name, error_type
narrative_story_total Counter story_name, success
from runtime_narrative import PrometheusRenderer
from prometheus_client import CollectorRegistry, start_http_server

registry = CollectorRegistry()
renderer = PrometheusRenderer(registry=registry)
start_http_server(8000, registry=registry)

Combining renderers

from runtime_narrative import story, JsonRenderer, SqliteStoryRenderer, OtelRenderer

with story("Nightly ETL", renderers=[
    JsonRenderer(),
    SqliteStoryRenderer("stories.db"),
    OtelRenderer(),
]):
    ...

Framework integrations

FastAPI / Starlette

RuntimeNarrativeMiddleware wraps every HTTP request in async with story(...). Route handlers only need to declare stages — no story() context required in handlers:

from fastapi import FastAPI
from runtime_narrative import RuntimeNarrativeMiddleware, JsonRenderer, AnthropicFailureAnalyzer

app = FastAPI()
app.add_middleware(
    RuntimeNarrativeMiddleware,
    renderers=[JsonRenderer()],
    failure_analyzer=AnthropicFailureAnalyzer(),
    runtime_environment="production",
)

@app.post("/orders")
async def create_order(payload: OrderIn):
    with stage("Validate Input"):
        validate(payload)
    with stage("Persist Order"):
        order = await db.insert(payload)
    return {"id": order.id}

Each request becomes a story named "METHOD /path" (e.g. "POST /orders"). When renderers is not passed, the middleware auto-selects ConsoleRenderer on a TTY and JsonRenderer otherwise.

When opentelemetry-api is installed, the middleware automatically extracts incoming W3C traceparent/tracestate headers so story spans become children of the upstream trace:

app.add_middleware(
    RuntimeNarrativeMiddleware,
    propagate_trace_context=True,   # default; set False to disable
)

Use skip_if to bypass story wrapping for specific routes (health checks, readiness probes, etc.):

app.add_middleware(
    RuntimeNarrativeMiddleware,
    renderers=[JsonRenderer()],
    skip_if=lambda req: req.url.path in {"/health", "/ready"},
)

Run: uv run python examples/middleware_skip_if.py

Django

ASGI (async):

# settings.py
MIDDLEWARE = [
    "runtime_narrative.middleware_django.RuntimeNarrativeDjangoMiddleware",
    ...
]

WSGI (sync):

# settings.py
MIDDLEWARE = [
    "runtime_narrative.middleware_django.RuntimeNarrativeDjangoSyncMiddleware",
    ...
]

Story name is "METHOD /path". Requires pip install "runtime-narrative[django]".

Celery

from celery import Celery
from runtime_narrative import NarrativeTask, connect_narrative, JsonRenderer

app = Celery("myapp")

# Option A — per task
@app.task(base=NarrativeTask)
def process_order(order_id: str) -> None:
    with stage("Validate"): validate(order_id)
    with stage("Charge"):   charge(order_id)

# Option B — set defaults for all tasks globally
connect_narrative(
    app,
    renderers=[JsonRenderer()],
    failure_analyzer=AnthropicFailureAnalyzer(),
)

Story name is "<task.name> [task_id=<id>]". Override options per task by setting narrative_* class attributes directly. Requires pip install "runtime-narrative[celery]".

gRPC

import grpc
from runtime_narrative import RuntimeNarrativeAsyncInterceptor, JsonRenderer

# Async server
server = grpc.aio.server(
    interceptors=[RuntimeNarrativeAsyncInterceptor(renderers=[JsonRenderer()])],
)

# Sync server
from runtime_narrative import RuntimeNarrativeInterceptor
server = grpc.server(
    thread_pool,
    interceptors=[RuntimeNarrativeInterceptor(renderers=[JsonRenderer()])],
)

Story name is the full gRPC method path, e.g. "/mypackage.MyService/MyMethod". Requires pip install "runtime-narrative[grpc]".


Async task groups

NarrativeTaskGroup runs concurrent asyncio tasks under a shared story. Tasks inherit the parent story context automatically via ContextVar copy, so stage() calls inside tasks are tracked normally:

import asyncio
from runtime_narrative import story, NarrativeTaskGroup, NarrativeTaskGroupError

async def main():
    async with story("Parallel Pipeline"):
        async with NarrativeTaskGroup() as tg:
            tg.create_task(fetch_orders(),    name="Fetch Orders")
            tg.create_task(fetch_inventory(), name="Fetch Inventory")
        # waits for both; stages from both appear in the timeline

asyncio.run(main())

If tasks fail, NarrativeTaskGroupError is raised with failed_tasks: dict[str, BaseException]:

try:
    async with NarrativeTaskGroup() as tg:
        tg.create_task(risky_job(), name="Risky Job")
except NarrativeTaskGroupError as e:
    for task_name, exc in e.failed_tasks.items():
        print(f"{task_name} failed: {exc}")

No extra dependencies. Python 3.9+.


Persistence and CLI

SqliteStoryRenderer records all six lifecycle events to a local SQLite database. No extra dependencies:

from runtime_narrative import story, SqliteStoryRenderer

with story("Nightly ETL", renderers=[SqliteStoryRenderer("stories.db")]):
    ...

Schema:

Table Key columns
stories story_id, name, success, duration_seconds, started_at, completed_at
stages story_id, stage_name, stage_index, parent_stage_name, duration_seconds, completed, failed
failures story_id, stage_name, error_type, error_message, filename, lineno, traceback_text, llm_analysis

LLMAnalysisReady back-fills the llm_analysis column in failures so background analysis results are persisted even when they arrive after StoryCompleted.

CLI (registered as runtime-narrative):

# List the 10 most recent failures
runtime-narrative failures --db stories.db

# Filter by stage name or story name (LIKE pattern)
runtime-narrative failures --last 25 --stage "Insert Records"
runtime-narrative failures --story "Nightly ETL"

# Show the full detail for one story
runtime-narrative story <story_id> --db stories.db

The --db flag defaults to runtime_narrative.db in the current directory.


Alert routing

AlertRoutingRenderer fans out FailureOccurred events to webhook destinations concurrently. Destination failures are logged to stderr and swallowed — they never crash the story:

from runtime_narrative import (
    story,
    AlertRoutingRenderer,
    SlackWebhookDestination,
    HttpWebhookDestination,
)

renderer = AlertRoutingRenderer(
    destinations=[
        SlackWebhookDestination("https://hooks.slack.com/services/..."),
        HttpWebhookDestination(
            "https://alerts.example.com/webhook",
            headers={"Authorization": "Bearer ..."},
            timeout=5.0,
        ),
    ],
    only_stories={"Nightly ETL", "Payment Processor"},  # None = all stories
    only_error_types={"ValueError", "TimeoutError"},    # None = all error types
)

async with story("Nightly ETL", renderers=[renderer]):
    ...

SlackWebhookDestination sends a Block Kit message with a header, error detail section, and an optional analysis section when llm_analysis is present.

HttpWebhookDestination POSTs a JSON payload containing: story_id, story_name, stage_name, error_type, error_message, filename, lineno, function, llm_analysis, timestamp.


dry_run mode

dry_run=True suppresses exceptions raised inside stage bodies, marks all stages completed, and emits StoryCompleted(success=True). Useful for smoke-testing instrumentation wiring without triggering real side effects:

with story("Nightly ETL", dry_run=True):
    with stage("Load Warehouse"):
        raise IOError("would connect to DB in prod")   # suppressed
    with stage("Transform"):
        raise RuntimeError("would run transforms")     # suppressed
# StoryCompleted(success=True) emitted for all stages

Testing utilities

StoryRecorder is a dual sync/async context manager that captures story events for assertions. No output is produced:

from runtime_narrative import stage
from runtime_narrative.testing import StoryRecorder

def test_pipeline_success():
    with StoryRecorder("ETL") as recorder:
        with stage("Load"):   rows = [1, 2, 3]
        with stage("Insert"): db.insert(rows)

    recorder.assert_stages_completed(["Load", "Insert"])
    recorder.assert_no_failure()
    recorder.assert_story_completed(success=True)

def test_invalid_input_fails_at_validate():
    import pytest

    with pytest.raises(ValueError):
        with StoryRecorder("ETL") as recorder:
            with stage("Load"):     pass
            with stage("Validate"): raise ValueError("bad schema")

    recorder.assert_stage_failed("Validate", error_type="ValueError")
    recorder.assert_story_completed(success=False)

Works as async with StoryRecorder(...) too. Pass any story() kwargs including dry_run=True:

with StoryRecorder("ETL", dry_run=True) as recorder:
    run_pipeline()   # side effects suppressed

recorder.assert_stages_completed(["Load", "Validate", "Insert"])
recorder.assert_no_failure()

Assertion methods:

Method What it checks
assert_stages_completed(names) All named stages appear in StageCompleted events
assert_no_failure() No FailureOccurred event was emitted
assert_stage_failed(name, error_type=None) A FailureOccurred event at that stage name; optionally checks error_type
assert_story_completed(success=None) A StoryCompleted event was emitted; optionally checks the success flag

Custom renderers and analyzers

Custom renderer

Implement handle(self, event: object). Async renderers (async def handle) are awaited inside async with story(...):

class PagerDutyRenderer:
    async def handle(self, event: object) -> None:
        if type(event).__name__ == "FailureOccurred":
            await pagerduty.trigger(
                summary=f"{event.story_name} failed at {event.stage_name}",
                details={"error": event.error_type, "analysis": event.llm_analysis},
            )

async with story("Nightly ETL", renderers=[PagerDutyRenderer()]):
    ...

Six event types are emitted. Key fields on each:

Event Key fields
StoryStarted story_id, story_name, timestamp
StageStarted story_id, story_name, stage_name, timestamp, stage_index (0-based), parent_stage_name
StageCompleted story_id, story_name, stage_name, timestamp, duration_seconds, stage_index, parent_stage_name
FailureOccurred story_id, story_name, stage_name, error_type, error_message, filename, lineno, function, traceback_text, exception_chain, exact_cause, stage_timeline, progress_percent, llm_analysis, diagnostics_mode, stack_frames, source_snippet, compressed_stack_summary, locals_by_frame
StoryCompleted story_id, story_name, success, progress_percent, completed_stages, total_stages, timestamp
LLMAnalysisReady story_id, story_name, stage_name, llm_analysis, timestamp

parent_stage_name is None for top-level stages and set to the enclosing stage name for nested stages. story_name on stage events lets a renderer filter by story without a story_id → story_name side table (run: uv run python examples/stage_story_name.py).

Custom failure analyzer

Implement analyze_failure(...). Add analyze_failure_async(...) for native async — otherwise the sync method is called via asyncio.to_thread:

class MyAnalyzer:
    async def analyze_failure_async(
        self,
        *,
        story_name: str,
        stage_name: str,
        failure,           # FailureSummary: .error_type, .error_message, .filename,
                           #                 .lineno, .function, .source_line,
                           #                 .traceback_text, .exception_chain
        stage_timeline: str,
        progress_percent: int,
    ) -> str | None:
        result = await my_llm_client.complete(build_prompt(failure))
        return result.text

    def analyze_failure(self, *, story_name, stage_name, failure, stage_timeline, progress_percent):
        # sync fallback used when called from sync story()
        return requests.post(...).json()["text"]

with story("Import", failure_analyzer=MyAnalyzer()):
    ...

Utilities

has_active_story()

Returns True when a story() context is active in the current sync or async context. Useful for library code that should behave differently when called under instrumentation:

from runtime_narrative import has_active_story

def send_email(to: str, body: str) -> None:
    if has_active_story():
        # stage() is safe here
        with stage("Send Email", optional=True):
            _send(to, body)
    else:
        _send(to, body)

stage(optional=True)

When optional=True, a stage() outside an active story is a no-op — no exception, no events, no tracking. When inside a story it behaves normally. Ideal for shared utilities:

from runtime_narrative import stage

def enrich_record(record: dict) -> dict:
    with stage("Enrich Record", optional=True):
        return _lookup(record)
    return record   # reached only when no story active

Run: uv run python examples/optional_stage.py

StoryRuntime.record_failure()

Emits a FailureOccurred event (with full diagnostics) without owning exception propagation. Use this in saga/rollback flows where a compensating action fails but you want the story to complete successfully:

async with story("Payment Saga", renderers=[JsonRenderer()]) as runtime:
    async with stage("Charge Card"):
        charge_id = await charge(order)

    try:
        async with stage("Reserve Inventory"):
            await reserve(order)
    except InventoryError as exc:
        async with stage("Refund Charge"):
            await refund(charge_id)
        await runtime.record_failure(exc, stage_name="Reserve Inventory")
        # FailureOccurred emitted; story still completes success=True

Run: uv run python examples/saga_record_failure.py


Sub-stories and log capture

Sub-stories

Opening a story() while another is already active (in the same sync/async context) makes it a sub-story: it inherits the parent's renderers, diagnostics_config, and failure_analyzer unless you pass your own, and its StoryStarted/StoryCompleted/FailureOccurred events carry parent_story_id and root_story_id so the whole call tree can be reconstructed from events alone — no new API, no tree data structure to maintain yourself:

async def create_order(payload):
    async with story(f"POST /orders") as api_story:
        async with stage("Persist Order"):
            # Same story() primitive. Because api_story is already active,
            # this becomes a sub-story: parent_story_id == api_story.story_id,
            # root_story_id == api_story.story_id (or further up if api_story
            # is itself nested), and renderers/diagnostics are inherited.
            async with story("DB: INSERT orders") as db_story:
                async with stage("Execute Query"):
                    await conn.execute("INSERT INTO orders ...")

Each sub-story succeeds or fails independently (a failed DB call doesn't automatically fail the API story unless the exception propagates or you call record_failure), and gets its own duration_seconds on StoryCompleted. OtelRenderer maps this to proper parent/child spans automatically.

Because linkage is derived from ContextVar state at the moment story() is entered — not from a shared registry — the same reusable function (e.g. a execute_query() helper) can be called from many different parent stories, including concurrently: asyncio.Task copies context at creation and each OS thread starts with a fresh top-level context, so concurrent API calls sharing one DB helper never cross-contaminate each other's story tree.

Run: uv run python examples/substory_db_call.py

NarrativeLogHandler — capture existing logging calls into a story

If your application already uses logging.warning()/.error(), NarrativeLogHandler routes those records into the same event pipeline as story()/stage() — one stream instead of two:

import logging
from runtime_narrative import NarrativeLogHandler

logging.getLogger().addHandler(NarrativeLogHandler(level=logging.WARNING))

Each captured record becomes a LogRecorded event (story_id, root_story_id, stage_name, level, logger_name, message, optional exc_text) emitted through the active story's renderers. Outside an active story, records fall through to an optional fallback handler so nothing is silently dropped:

NarrativeLogHandler(level=logging.WARNING, fallback=logging.StreamHandler())

ConsoleRenderer prints every event (including LogRecorded) with a [short_id] tag — the first 6 characters of that event's story_id — so a specific call is identifiable when scanning or searching output. All events belonging to one story family (a root story and any sub-stories) additionally render in the same deterministic color, and lines are indented one level per stage/sub-story nesting depth, so the call tree (API call → DB sub-story → its own stages) is visible directly in the log output:

[ad8cc2] ▶ Story started: POST /orders
  [ad8cc2] ▶ Stage started: Persist Order
    [d17c63] ▶ Story started: DB: INSERT orders
      [d17c63] ▶ Stage started: Execute Query
      [d17c63] ✔ Stage completed: Execute Query (0.021s)
    [d17c63] ▶ Story ended: SUCCESS (0.034s)
  [ad8cc2] ✔ Stage completed: Persist Order (0.034s)
[ad8cc2] ▶ Story ended: SUCCESS (0.052s)

Run: uv run python examples/logging_bridge.py


Environment variables

Variable Values Default Effect
RUNTIME_NARRATIVE_ENV development, production development Production caps tracebacks to 8 000 chars and forces lean mode
RUNTIME_NARRATIVE_FAILURE_DIAGNOSTICS lean, rich lean rich captures local variable values at the failing frames. Invalid values raise ValueError at story construction.
RUNTIME_NARRATIVE_ALLOW_RICH_IN_PRODUCTION 1, true off Bypass production safeguard; allow rich diagnostics in production
RUNTIME_NARRATIVE_MODEL model name string Default model for AnthropicFailureAnalyzer; also used by example scripts for OllamaFailureAnalyzer / LLMFailureAnalyzer
ANTHROPIC_API_KEY API key Required by AnthropicFailureAnalyzer; read automatically if api_key= is not passed

Python compatibility

Python 3.9 – 3.13. Zero required dependencies beyond python-dotenv.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

runtime_narrative-1.1.0.tar.gz (110.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

runtime_narrative-1.1.0-py3-none-any.whl (66.4 kB view details)

Uploaded Python 3

File details

Details for the file runtime_narrative-1.1.0.tar.gz.

File metadata

  • Download URL: runtime_narrative-1.1.0.tar.gz
  • Upload date:
  • Size: 110.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.0

File hashes

Hashes for runtime_narrative-1.1.0.tar.gz
Algorithm Hash digest
SHA256 717d486ecf4936968267f0c7bdf88b50e33eb1fa70025ff149f1ed6a02199ce9
MD5 eafc0666cfa542d6d9f2766d696e9999
BLAKE2b-256 e27baa8f86a2d44217455d65d93630d89bc80e63a0cb1756d14e52c94dcfa5ec

See more details on using hashes here.

File details

Details for the file runtime_narrative-1.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for runtime_narrative-1.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 14bc33ae8ca8449520b8a62540ea9989af56676e4bdc53c6b5d35d09d59e41b3
MD5 22e95ad509e3ebd81da8eaeb8d7626c0
BLAKE2b-256 a8019f80153fd33d34fd6fed4e1c9d37e6ad979104e4983deab5249bfcab3be8

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page