Stroma
dbt didn't replace your data warehouse. Stroma doesn't replace your agent framework.
dbt gave you typed models, tested transformations, and documented lineage — a software engineering layer that worked regardless of which warehouse you were running. Stroma does the same thing for agent execution graphs: typed node contracts, formal failure classification, and cost-aware execution — portable across whatever orchestration framework you're building on.
The framework handles the graph. Stroma handles the guarantees.
The problem it solves
LLM pipelines fail in ways that traditional software doesn't. A node returns malformed data and the error surfaces three steps later. A transient timeout kills a 20-minute run and you start over from scratch. Costs spiral past budget with no enforcement mechanism. Failures are silent until they're catastrophic.
Stroma gives you the building blocks to handle this — without locking you into a framework.
See it in action
The scenario below is the kind that breaks raw LangGraph pipelines: a multi-step run that crashes midway, resumes from checkpoint, and gives you a diff of what changed between the failed and successful run.
```python
import asyncio

from pydantic import BaseModel

from stroma import (
    AsyncInMemoryStore,
    CheckpointManager,
    ContractRegistry,
    NodeContract,
    RunConfig,
    StromaRunner,
    stroma_node,
)


class Document(BaseModel):
    text: str


class Extracted(BaseModel):
    entities: list[str]


class Summary(BaseModel):
    entities: list[str]
    count: int


registry = ContractRegistry()
store = AsyncInMemoryStore()
manager = CheckpointManager(store)

c1 = NodeContract(node_id="extract", input_schema=Document, output_schema=Extracted)
c2 = NodeContract(node_id="summarize", input_schema=Extracted, output_schema=Summary)
registry.register(c1)
registry.register(c2)


@stroma_node("extract", c1)
async def extract(state: Document) -> dict:
    return {"entities": state.text.split()}


@stroma_node("summarize", c2)
async def summarize_failing(state: Extracted) -> dict:
    raise TimeoutError("downstream API unavailable")  # (1)!


@stroma_node("summarize", c2)
async def summarize_fixed(state: Extracted) -> dict:
    return {"entities": state.entities, "count": len(state.entities)}


async def main():
    config1 = RunConfig(run_id="doc-run-1")
    runner1 = StromaRunner(registry, manager, config1)
    result1 = await runner1.run(
        [extract, summarize_failing],
        Document(text="Stroma adds reliability to agent pipelines"),
    )
    print(result1.status)  # PARTIAL — extract checkpointed, summarize exhausted retries

    config2 = RunConfig(run_id="doc-run-1", resume_from="summarize")  # (2)!
    runner2 = StromaRunner(registry, manager, config2)
    result2 = await runner2.run(
        [extract, summarize_fixed],
        Document(text="Stroma adds reliability to agent pipelines"),
    )
    print(result2.status)       # RESUMED — extract skipped, loaded from checkpoint
    print(result2.final_state)  # entities=[...] count=6

    diffs = result1.trace.diff(result2.trace)  # (3)!
    for d in diffs:
        print(d)


asyncio.run(main())  # in a Jupyter notebook, use `await main()` instead
```
1. `TimeoutError` is classified as `RECOVERABLE`. Stroma retries with jittered backoff. After exhausting retries, the run fails — but `extract`'s output is already checkpointed.
2. Same `run_id`, with `resume_from="summarize"`. The runner loads `extract`'s checkpoint and skips re-running it entirely.
3. `diff()` compares both traces — node IDs, attempts, inputs, outputs, failure states — so you can see exactly what changed between the failed run and the successful one.
What You Get
- Contracts — Pydantic-based input/output validation at every node boundary
- Failure classification — automatic categorization of errors as recoverable, terminal, or ambiguous
- Retry policies — configurable retries with jittered backoff, per failure class or per node
- Checkpointing — async-first save and resume across crashes (in-memory or Redis)
- Cost estimation — model-aware USD cost tracking via `KNOWN_MODELS` and token/dollar/latency budgets
- Per-node timeouts — configurable `node_timeouts` with `asyncio.wait_for`; timeouts are classified as recoverable and retried automatically
- Parallel execution — fan out work to concurrent nodes with `parallel()`, per-child contract validation, merged output, and full retry support
- Node hooks — async `on_node_start`, `on_node_success`, and `on_node_failure` callbacks
- Shared context — pass a mutable `context` dict through `RunConfig` to every node
- Execution tracing — full record of every node attempt, with diffing and JSON export
- Per-run logging — structured `LoggerAdapter` with `run_id` in every log line
- Fluent builder API — configure runners with chained `.with_budget()`, `.with_hooks()`, `.with_redis()`, etc.
- LangGraph adapter — apply contracts to existing LangGraph graphs
- CrewAI adapter — contract validation for CrewAI Flow methods
- DeepAgents adapter — contract validation and cost tracking for deepagents graphs
- Universal reliability middleware — `execute_step()` and `StromaStep` apply contracts, retries, cost tracking, and checkpointing to any async callable, independent of any framework
- Framework-agnostic — works with any async Python code, no framework lock-in
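The classification and backoff mechanics can be sketched in plain Python. The class names below mirror Stroma's, but the exception-to-class mapping and the jitter formula are illustrative assumptions, not the library's actual rules:

```python
import random
from enum import Enum


class FailureClass(Enum):
    RECOVERABLE = "recoverable"
    TERMINAL = "terminal"
    AMBIGUOUS = "ambiguous"


def classify(exc: Exception) -> FailureClass:
    # Hypothetical mapping for illustration only.
    if isinstance(exc, (TimeoutError, ConnectionError)):
        return FailureClass.RECOVERABLE  # transient: worth retrying
    if isinstance(exc, (ValueError, TypeError)):
        return FailureClass.TERMINAL     # a retry would fail identically
    return FailureClass.AMBIGUOUS        # unknown: the configured policy decides


def jittered_backoff(attempt: int, base: float = 2.0) -> float:
    """Exponential backoff with full jitter: uniform in [0, base * 2**attempt)."""
    return random.uniform(0, base * (2 ** attempt))


print(classify(TimeoutError()))  # FailureClass.RECOVERABLE
```

Jitter matters in practice: without it, many failed nodes retrying on the same schedule hit the downstream service in synchronized waves.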
Install
Requires Python 3.12+.
```bash
uv add stroma
```
Optional extras:
```bash
uv add "stroma[redis]"      # Redis-backed checkpointing
uv add "stroma[langgraph]"  # LangGraph adapter
uv add "stroma[crewai]"     # CrewAI adapter
uv add "stroma[deepagents]" # DeepAgents adapter
```

(The quotes keep shells like zsh from treating the brackets as a glob pattern.)
Quick Examples
Cost estimation
Nodes can return token counts and a model name. Stroma computes USD cost automatically from built-in pricing data:
```python
@runner.node("summarize", input=DocInput, output=Summary)
async def summarize(state: DocInput) -> tuple:
    # call your LLM here...
    return ({"text": response}, input_tokens, output_tokens, "gpt-4o")
```
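The underlying arithmetic is straightforward. A minimal sketch, assuming hypothetical per-million-token prices (not the actual `KNOWN_MODELS` values, which track real and changing provider pricing):

```python
# Hypothetical USD prices per 1M tokens -- illustrative only.
PRICING = {"gpt-4o": {"input": 2.50, "output": 10.00}}


def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Token counts times per-token price, summed across input and output."""
    p = PRICING[model]
    return (input_tokens * p["input"] + output_tokens * p["output"]) / 1_000_000


print(round(estimate_cost("gpt-4o", 1_200, 300), 6))  # 0.006
```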
Parallel execution
Run independent nodes concurrently and merge their outputs:
```python
from stroma import parallel

result = await runner.run(
    [parallel(fetch_metadata, fetch_embeddings), merge_node],
    initial_state,
)
```
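Stripped of contracts and retries, the fan-out/merge pattern itself is ordinary asyncio. A sketch with hypothetical stand-in nodes (dict state instead of Pydantic models, for brevity):

```python
import asyncio


async def fetch_metadata(state: dict) -> dict:
    return {"metadata": {"source": state["url"]}}


async def fetch_embeddings(state: dict) -> dict:
    return {"embeddings": [0.1, 0.2]}


async def fan_out(state: dict) -> dict:
    # Run both branches concurrently, then merge their outputs into one dict.
    results = await asyncio.gather(fetch_metadata(state), fetch_embeddings(state))
    merged = dict(state)
    for r in results:
        merged.update(r)
    return merged


state = asyncio.run(fan_out({"url": "https://example.com"}))
print(sorted(state))  # ['embeddings', 'metadata', 'url']
```

What `parallel()` adds on top of this is per-child contract validation and retry handling before the merge.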
Node hooks
Attach lifecycle callbacks to observe node execution:
```python
from stroma import NodeHooks, RunConfig

async def on_start(run_id, node_id, input_dict):
    print(f"Starting {node_id}")

config = RunConfig(hooks=NodeHooks(on_node_start=on_start))
```
Shared context
Pass runtime configuration to nodes that accept a second argument:
```python
@runner.node("enrich", input=Input, output=Output)
async def enrich(state: Input, ctx: dict) -> dict:
    api_key = ctx["api_key"]
    # ...

config = RunConfig(context={"api_key": "sk-..."})
```
Per-node retry policies
Override the global retry policy for specific nodes:
```python
from stroma import FailureClass, FailurePolicy

config = RunConfig(
    node_policies={
        "flaky_node": {
            FailureClass.RECOVERABLE: FailurePolicy(max_retries=5, backoff_seconds=2.0),
        }
    }
)
```
Per-node timeouts
Guard against hanging LLM calls with per-node timeouts. Timeouts raise TimeoutError, which is classified as recoverable and retried automatically:
```python
runner = StromaRunner.quick().with_node_timeouts({
    "llm_call": 30_000,   # 30 seconds
    "embedding": 10_000,  # 10 seconds
})
```
Async checkpointing
The default store is now async. For distributed pipelines, use the async Redis store:
```python
from stroma import CheckpointManager, RedisStore

store = RedisStore("redis://localhost:6379", ttl_seconds=7200)
manager = CheckpointManager(store)
```
The original synchronous Redis store is still available as SyncRedisStore.
Documentation
Full documentation including a tutorial and API reference is available at the docs site.
Development
```bash
uv sync --extra dev
uv run pytest tests/ -v --cov=stroma --cov-fail-under=85
```
License
MIT