Stroma

dbt didn't replace your data warehouse. Stroma doesn't replace your agent framework.

dbt gave you typed models, tested transformations, and documented lineage — a software engineering layer that worked regardless of which warehouse you were running. Stroma does the same thing for agent execution graphs: typed node contracts, formal failure classification, and cost-aware execution — portable across whatever orchestration framework you're building on.

The framework handles the graph. Stroma handles the guarantees.

The problem it solves

LLM pipelines fail in ways that traditional software doesn't. A node returns malformed data and the error surfaces three steps later. A transient timeout kills a 20-minute run and you start over from scratch. Costs spiral past budget with no enforcement mechanism. Failures are silent until they're catastrophic.

Stroma gives you the building blocks to handle this — without locking you into a framework.

See it in action

The scenario below is the kind that breaks raw LangGraph pipelines: a multi-step run that crashes midway, resumes from checkpoint, and gives you a diff of what changed between the failed and successful run.

import asyncio
from pydantic import BaseModel
from stroma import (
    AsyncInMemoryStore,
    CheckpointManager,
    ContractRegistry,
    NodeContract,
    RunConfig,
    StromaRunner,
    stroma_node,
)


class Document(BaseModel):
    text: str


class Extracted(BaseModel):
    entities: list[str]


class Summary(BaseModel):
    entities: list[str]
    count: int


registry = ContractRegistry()
store = AsyncInMemoryStore()
manager = CheckpointManager(store)

c1 = NodeContract(node_id="extract", input_schema=Document, output_schema=Extracted)
c2 = NodeContract(node_id="summarize", input_schema=Extracted, output_schema=Summary)
registry.register(c1)
registry.register(c2)


@stroma_node("extract", c1)
async def extract(state: Document) -> dict:
    return {"entities": state.text.split()}


@stroma_node("summarize", c2)
async def summarize_failing(state: Extracted) -> dict:
    raise TimeoutError("downstream API unavailable")  # (1)!


@stroma_node("summarize", c2)
async def summarize_fixed(state: Extracted) -> dict:
    return {"entities": state.entities, "count": len(state.entities)}


async def main():
    config1 = RunConfig(run_id="doc-run-1")
    runner1 = StromaRunner(registry, manager, config1)
    result1 = await runner1.run(
        [extract, summarize_failing],
        Document(text="Stroma adds reliability to agent pipelines"),
    )
    print(result1.status)  # PARTIAL — extract checkpointed, summarize exhausted retries

    config2 = RunConfig(run_id="doc-run-1", resume_from="summarize")  # (2)!
    runner2 = StromaRunner(registry, manager, config2)
    result2 = await runner2.run(
        [extract, summarize_fixed],
        Document(text="Stroma adds reliability to agent pipelines"),
    )
    print(result2.status)       # RESUMED — extract skipped, loaded from checkpoint
    print(result2.final_state)  # entities=[...] count=6

    diffs = result1.trace.diff(result2.trace)  # (3)!
    for d in diffs:
        print(d)


asyncio.run(main())
# In a Jupyter notebook, replace the line above with: await main()
  1. TimeoutError is classified as RECOVERABLE. Stroma retries with jittered backoff. After exhausting retries, the run fails — but extract's output is already checkpointed.
  2. Same run_id, resume_from="summarize". The runner loads extract's checkpoint and skips re-running it entirely.
  3. diff() compares both traces — node IDs, attempts, inputs, outputs, failure states — so you can see exactly what changed between the failed run and the successful one.
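The diffing idea is easy to picture with plain dictionaries. The sketch below compares two per-node attempt records; the record shape (node_id mapped to attempts and status) is illustrative, not Stroma's actual trace schema:

```python
# Illustrative sketch: diff two runs' per-node attempt records.
# The record shape here is hypothetical, not Stroma's real trace format.

def diff_traces(first: dict, second: dict) -> list[str]:
    """Return human-readable differences between two run traces."""
    changes = []
    for node_id in sorted(first.keys() | second.keys()):
        a, b = first.get(node_id), second.get(node_id)
        if a != b:
            changes.append(f"{node_id}: {a} -> {b}")
    return changes

failed_run = {
    "extract": {"attempts": 1, "status": "ok"},
    "summarize": {"attempts": 3, "status": "failed"},
}
fixed_run = {
    "extract": {"attempts": 0, "status": "skipped (checkpoint)"},
    "summarize": {"attempts": 1, "status": "ok"},
}

for line in diff_traces(failed_run, fixed_run):
    print(line)
```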

What You Get

  • Contracts — Pydantic-based input/output validation at every node boundary
  • Failure classification — automatic categorization of errors as recoverable, terminal, or ambiguous
  • Retry policies — configurable retries with jittered backoff, per failure class or per node
  • Checkpointing — async-first save and resume across crashes (in-memory or Redis)
  • Cost estimation — model-aware USD cost tracking via KNOWN_MODELS and token/dollar/latency budgets
  • Per-node timeouts — configurable node_timeouts with asyncio.wait_for; timeouts are classified as recoverable and retried automatically
  • Parallel execution — fan out work to concurrent nodes with parallel(), per-child contract validation, merged output, and full retry support
  • Node hooks — async on_node_start, on_node_success, and on_node_failure callbacks
  • Shared context — pass a mutable context dict through RunConfig to every node
  • Execution tracing — full record of every node attempt, with diffing and JSON export
  • Per-run logging — structured LoggerAdapter with run_id in every log line
  • Fluent builder API — configure runners with chained .with_budget(), .with_hooks(), .with_redis(), etc.
  • LangGraph adapter — apply contracts to existing LangGraph graphs
  • CrewAI adapter — contract validation for CrewAI Flow methods
  • DeepAgents adapter — contract validation and cost tracking for deepagents graphs
  • Universal reliability middleware — execute_step() and StromaStep apply contracts, retries, cost tracking, and checkpointing to any async callable, independent of any framework
  • Framework-agnostic — works with any async Python code, no framework lock-in
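Conceptually, failure classification plus a retry policy boils down to mapping exception types to a class and retrying only the recoverable ones with jittered backoff. A minimal asyncio sketch of that idea (not Stroma's internals; the exception groupings here are examples):

```python
import asyncio
import random

RECOVERABLE = (TimeoutError, ConnectionError)  # transient: worth retrying
TERMINAL = (ValueError, TypeError)             # bad data: retrying won't help

async def run_with_retries(fn, *, max_retries=3, backoff_seconds=0.01):
    """Retry recoverable failures with jittered exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return await fn()
        except TERMINAL:
            raise  # terminal failures surface immediately
        except RECOVERABLE:
            if attempt == max_retries:
                raise  # retries exhausted
            delay = backoff_seconds * (2 ** attempt)
            await asyncio.sleep(delay * random.uniform(0.5, 1.5))  # jitter

calls = 0

async def flaky():
    global calls
    calls += 1
    if calls < 3:
        raise TimeoutError("downstream API unavailable")
    return "ok"

result = asyncio.run(run_with_retries(flaky))
print(result, calls)  # succeeds on the third attempt
```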

Install

Requires Python 3.12+.

uv add stroma

Optional extras:

uv add stroma[redis]       # Redis-backed checkpointing
uv add stroma[langgraph]   # LangGraph adapter
uv add stroma[crewai]      # CrewAI adapter
uv add stroma[deepagents]  # DeepAgents adapter

Quick Examples

Cost estimation

Nodes can return token counts and a model name. Stroma computes USD cost automatically from built-in pricing data:

@runner.node("summarize", input=DocInput, output=Summary)
async def summarize(state: DocInput) -> tuple:
    # call your LLM here...
    return ({"text": response}, input_tokens, output_tokens, "gpt-4o")
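The underlying arithmetic is simple: a per-model price table multiplied by token counts. A sketch with made-up prices (the real KNOWN_MODELS pricing data will differ):

```python
# Hypothetical per-million-token prices in USD; not Stroma's KNOWN_MODELS values.
PRICES = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
}

def estimate_cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    """USD cost = tokens / 1e6 * price-per-million, summed over input and output."""
    p = PRICES[model]
    return input_tokens / 1e6 * p["input"] + output_tokens / 1e6 * p["output"]

cost = estimate_cost_usd("gpt-4o", 1200, 300)
print(f"${cost:.6f}")
```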

Parallel execution

Run independent nodes concurrently and merge their outputs:

from stroma import parallel

result = await runner.run(
    [parallel(fetch_metadata, fetch_embeddings), merge_node],
    initial_state,
)

Node hooks

Attach lifecycle callbacks to observe node execution:

from stroma import NodeHooks, RunConfig

async def on_start(run_id, node_id, input_dict):
    print(f"Starting {node_id}")

config = RunConfig(hooks=NodeHooks(on_node_start=on_start))

Shared context

Pass runtime configuration to nodes that accept a second argument:

@runner.node("enrich", input=Input, output=Output)
async def enrich(state: Input, ctx: dict) -> dict:
    api_key = ctx["api_key"]
    # ...

config = RunConfig(context={"api_key": "sk-..."})

Per-node retry policies

Override the global retry policy for specific nodes:

from stroma import FailureClass, FailurePolicy

config = RunConfig(
    node_policies={
        "flaky_node": {
            FailureClass.RECOVERABLE: FailurePolicy(max_retries=5, backoff_seconds=2.0),
        }
    }
)

Per-node timeouts

Guard against hanging LLM calls with per-node timeouts. Timeouts raise TimeoutError, which is classified as recoverable and retried automatically:

runner = StromaRunner.quick().with_node_timeouts({
    "llm_call": 30_000,   # 30 seconds
    "embedding": 10_000,  # 10 seconds
})

Async checkpointing

The default store is now async. For distributed pipelines, use the async Redis store:

from stroma import RedisStore, CheckpointManager

store = RedisStore("redis://localhost:6379", ttl_seconds=7200)
manager = CheckpointManager(store)

The original synchronous Redis store is still available as SyncRedisStore.

Documentation

Full documentation including a tutorial and API reference is available at the docs site.

Development

uv sync --extra dev
uv run pytest tests/ -v --cov=stroma --cov-fail-under=85

License

MIT
