Reliability primitives for agent pipelines.
Project description
Stroma
dbt didn't replace your data warehouse. Stroma doesn't replace your agent framework.
Typed node contracts, formal failure classification, and cost-aware execution — portable across whatever orchestration framework you're building on. The framework handles the graph. Stroma handles the guarantees.
from pydantic import BaseModel
from stroma import StromaRunner
class Input(BaseModel):
value: int
class Output(BaseModel):
result: int
runner = StromaRunner.quick()
@runner.node("double", input=Input, output=Output)
async def double(state: Input) -> dict:
return {"result": state.value * 2}
result = await runner.run([double], Input(value=5))
print(result.final_state) # result=10
Install
Requires Python 3.12+.
uv add stroma
Optional extras:
uv add stroma[redis] # Redis-backed checkpointing
uv add stroma[langgraph] # LangGraph adapter
uv add stroma[deepagents] # DeepAgents adapter
What You Get
- Contracts — Pydantic-based input/output validation at every node boundary
- Failure classification — automatic categorization of errors as recoverable, terminal, or ambiguous
- Retry policies — configurable retries with jittered backoff, per failure class or per node
- Checkpointing — async-first save and resume across crashes (in-memory or Redis)
- Cost estimation — model-aware USD cost tracking via
KNOWN_MODELSand token/dollar/latency budgets - Per-node timeouts — configurable
node_timeoutswithasyncio.wait_for; timeouts are classified as recoverable and retried automatically - Parallel execution — fan out work to concurrent nodes with
parallel(), per-child contract validation, merged output, and full retry support - Node hooks — async
on_node_start,on_node_success, andon_node_failurecallbacks - Shared context — pass a mutable
contextdict throughRunConfigto every node - Execution tracing — full record of every node attempt, with diffing and JSON export
- Per-run logging — structured
LoggerAdapterwithrun_idin every log line - Fluent builder API — configure runners with chained
.with_budget(),.with_hooks(),.with_redis(), etc. - LangGraph adapter — apply contracts to existing LangGraph graphs
- DeepAgents adapter — contract validation and cost tracking for deepagents graphs
- Framework-agnostic — works with any async Python code, no framework lock-in
Quick Examples
Cost estimation
Nodes can return token counts and a model name. Stroma computes USD cost automatically from built-in pricing data:
@runner.node("summarize", input=DocInput, output=Summary)
async def summarize(state: DocInput) -> tuple:
# call your LLM here...
return ({"text": response}, input_tokens, output_tokens, "gpt-4o")
Parallel execution
Run independent nodes concurrently and merge their outputs:
from stroma import parallel
result = await runner.run(
[parallel(fetch_metadata, fetch_embeddings), merge_node],
initial_state,
)
Node hooks
Attach lifecycle callbacks to observe node execution:
from stroma import NodeHooks, RunConfig
async def on_start(run_id, node_id, input_dict):
print(f"Starting {node_id}")
config = RunConfig(hooks=NodeHooks(on_node_start=on_start))
Shared context
Pass runtime configuration to nodes that accept a second argument:
@runner.node("enrich", input=Input, output=Output)
async def enrich(state: Input, ctx: dict) -> dict:
api_key = ctx["api_key"]
# ...
config = RunConfig(context={"api_key": "sk-..."})
Per-node retry policies
Override the global retry policy for specific nodes:
from stroma import FailureClass, FailurePolicy
config = RunConfig(
node_policies={
"flaky_node": {
FailureClass.RECOVERABLE: FailurePolicy(max_retries=5, backoff_seconds=2.0),
}
}
)
Per-node timeouts
Guard against hanging LLM calls with per-node timeouts. Timeouts raise TimeoutError, which is classified as recoverable and retried automatically:
runner = StromaRunner.quick().with_node_timeouts({
"llm_call": 30_000, # 30 seconds
"embedding": 10_000, # 10 seconds
})
Async checkpointing
The default store is now async. For distributed pipelines, use the async Redis store:
from stroma import RedisStore, CheckpointManager
store = RedisStore("redis://localhost:6379", ttl_seconds=7200)
manager = CheckpointManager(store)
The original synchronous Redis store is still available as SyncRedisStore.
Documentation
Full documentation including a tutorial and API reference is available at the docs site.
Development
uv sync --extra dev
uv run pytest tests/ -v --cov=stroma --cov-fail-under=85
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file stroma-0.3.0.tar.gz.
File metadata
- Download URL: stroma-0.3.0.tar.gz
- Upload date:
- Size: 32.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0a4c21de6b1cb09b0185ec483189d7ebc8d9e4dc942f93e6fec28be42a02fc08
|
|
| MD5 |
cf6083405cfd30a1f2293dd0bf34100a
|
|
| BLAKE2b-256 |
640b0285ed070601d6b2f31ed3c59df67d95b55e8e2e977bf30b5f54887e026d
|
Provenance
The following attestation bundles were made for stroma-0.3.0.tar.gz:
Publisher:
workflow.yml on jengroff/stroma
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
stroma-0.3.0.tar.gz -
Subject digest:
0a4c21de6b1cb09b0185ec483189d7ebc8d9e4dc942f93e6fec28be42a02fc08 - Sigstore transparency entry: 1239365185
- Sigstore integration time:
-
Permalink:
jengroff/stroma@1d03cd8c54f5a0ea813b850781a1e34a17f97e2c -
Branch / Tag:
refs/tags/v0.3.0 - Owner: https://github.com/jengroff
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
workflow.yml@1d03cd8c54f5a0ea813b850781a1e34a17f97e2c -
Trigger Event:
release
-
Statement type:
File details
Details for the file stroma-0.3.0-py3-none-any.whl.
File metadata
- Download URL: stroma-0.3.0-py3-none-any.whl
- Upload date:
- Size: 26.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6067625ca71948d4d3ac86deacd0379bf1c7dea05d29f5f5145a5de07862ebc9
|
|
| MD5 |
2f8579d45b6754eab2278db59fd656f6
|
|
| BLAKE2b-256 |
d048c47fb50ae83609563523045246d305a0f84f0bf79bfe964190d4ec72160f
|
Provenance
The following attestation bundles were made for stroma-0.3.0-py3-none-any.whl:
Publisher:
workflow.yml on jengroff/stroma
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
stroma-0.3.0-py3-none-any.whl -
Subject digest:
6067625ca71948d4d3ac86deacd0379bf1c7dea05d29f5f5145a5de07862ebc9 - Sigstore transparency entry: 1239365187
- Sigstore integration time:
-
Permalink:
jengroff/stroma@1d03cd8c54f5a0ea813b850781a1e34a17f97e2c -
Branch / Tag:
refs/tags/v0.3.0 - Owner: https://github.com/jengroff
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
workflow.yml@1d03cd8c54f5a0ea813b850781a1e34a17f97e2c -
Trigger Event:
release
-
Statement type: