Reliability primitives for agent pipelines.

Stroma

dbt didn't replace your data warehouse. Stroma doesn't replace your agent framework.

Typed node contracts, formal failure classification, and cost-aware execution — portable across whatever orchestration framework you're building on. The framework handles the graph. Stroma handles the guarantees.

import asyncio

from pydantic import BaseModel
from stroma import StromaRunner

class Input(BaseModel):
    value: int

class Output(BaseModel):
    result: int

runner = StromaRunner.quick()

@runner.node("double", input=Input, output=Output)
async def double(state: Input) -> dict:
    return {"result": state.value * 2}

async def main() -> None:
    # runner.run is a coroutine, so it must be awaited inside an event loop.
    result = await runner.run([double], Input(value=5))
    print(result.final_state)  # result=10

asyncio.run(main())

Install

Requires Python 3.12+.

uv add stroma

Optional extras:

uv add "stroma[redis]"       # Redis-backed checkpointing
uv add "stroma[langgraph]"   # LangGraph adapter
uv add "stroma[deepagents]"  # DeepAgents adapter

What You Get

  • Contracts — Pydantic-based input/output validation at every node boundary
  • Failure classification — automatic categorization of errors as recoverable, terminal, or ambiguous
  • Retry policies — configurable retries with jittered backoff, per failure class or per node
  • Checkpointing — async-first save and resume across crashes (in-memory or Redis)
  • Cost estimation — model-aware USD cost tracking via KNOWN_MODELS and token/dollar/latency budgets
  • Per-node timeouts — configurable node_timeouts with asyncio.wait_for; timeouts are classified as recoverable and retried automatically
  • Parallel execution — fan out work to concurrent nodes with parallel(), per-child contract validation, merged output, and full retry support
  • Node hooks — async on_node_start, on_node_success, and on_node_failure callbacks
  • Shared context — pass a mutable context dict through RunConfig to every node
  • Execution tracing — full record of every node attempt, with diffing and JSON export
  • Per-run logging — structured LoggerAdapter with run_id in every log line
  • Fluent builder API — configure runners with chained .with_budget(), .with_hooks(), .with_redis(), etc.
  • LangGraph adapter — apply contracts to existing LangGraph graphs
  • DeepAgents adapter — contract validation and cost tracking for deepagents graphs
  • Framework-agnostic — works with any async Python code, no framework lock-in
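
The three failure classes named in the list above (recoverable, terminal, ambiguous) can be pictured with a small standalone sketch. This is not Stroma's classifier: the `FailureKind` enum, the `classify` function, and the mapping from exception types to classes are all assumptions made for illustration.

```python
from enum import Enum


class FailureKind(Enum):
    """Hypothetical stand-in for a failure-class enum; not Stroma's own type."""

    RECOVERABLE = "recoverable"
    TERMINAL = "terminal"
    AMBIGUOUS = "ambiguous"


def classify(exc: BaseException) -> FailureKind:
    """Map an exception to a failure class (illustrative mapping only)."""
    # Transient conditions are usually worth retrying.
    if isinstance(exc, (TimeoutError, ConnectionError)):
        return FailureKind.RECOVERABLE
    # Bad input will not fix itself on a retry.
    if isinstance(exc, (ValueError, TypeError)):
        return FailureKind.TERMINAL
    # Anything else needs an explicit policy decision.
    return FailureKind.AMBIGUOUS


print(classify(TimeoutError("slow upstream")))
```

A retry policy can then be looked up by class, so that only recoverable failures are retried.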

Quick Examples

Cost estimation

A node can return a tuple of its output, the input and output token counts, and a model name. Stroma then computes the USD cost from its built-in pricing data:

@runner.node("summarize", input=DocInput, output=Summary)
async def summarize(state: DocInput) -> tuple:
    # Call your LLM here and set response, input_tokens, and output_tokens.
    return ({"text": response}, input_tokens, output_tokens, "gpt-4o")
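
For intuition, the arithmetic behind a token-based cost estimate might look like the sketch below. The `ModelPrice` class, the `PRICES` table, the `estimate_cost_usd` function, and the model name `example-model` with its rates are all hypothetical placeholders; they do not reflect the contents of `KNOWN_MODELS` or any real pricing.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ModelPrice:
    """USD per one million tokens; the numbers used below are placeholders."""

    input_per_million: float
    output_per_million: float


# Hypothetical pricing table. "example-model" and its rates are made up.
PRICES = {"example-model": ModelPrice(input_per_million=2.0, output_per_million=8.0)}


def estimate_cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    """Return the estimated USD cost of one call; raises KeyError for unknown models."""
    price = PRICES[model]
    return (
        input_tokens * price.input_per_million
        + output_tokens * price.output_per_million
    ) / 1_000_000


cost = estimate_cost_usd("example-model", input_tokens=1_000, output_tokens=500)
print(f"{cost:.4f}")
```

Input and output tokens are priced separately because providers typically charge different rates for each.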

Parallel execution

Run independent nodes concurrently and merge their outputs:

from stroma import parallel

result = await runner.run(
    [parallel(fetch_metadata, fetch_embeddings), merge_node],
    initial_state,
)
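
The fan-out and merge idea can be sketched with plain `asyncio`, independent of Stroma. The `fan_out` helper and the dict-based stand-ins for `fetch_metadata` and `fetch_embeddings` are assumptions for illustration; Stroma's `parallel()` also validates contracts and applies retries, which this sketch leaves out.

```python
import asyncio


async def fetch_metadata(state: dict) -> dict:
    # Stand-in for real I/O.
    await asyncio.sleep(0)
    return {"title": state["doc_id"].upper()}


async def fetch_embeddings(state: dict) -> dict:
    await asyncio.sleep(0)
    return {"embedding": [0.0, 1.0]}


async def fan_out(state: dict, *nodes) -> dict:
    """Run every node on the same input concurrently and merge the outputs."""
    outputs = await asyncio.gather(*(node(state) for node in nodes))
    merged = dict(state)
    for output in outputs:
        # Later nodes win on key collisions; a real merge policy may differ.
        merged.update(output)
    return merged


merged = asyncio.run(fan_out({"doc_id": "a1"}, fetch_metadata, fetch_embeddings))
print(merged)
```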

Node hooks

Attach lifecycle callbacks to observe node execution:

from stroma import NodeHooks, RunConfig

async def on_start(run_id, node_id, input_dict):
    print(f"Starting {node_id}")

config = RunConfig(hooks=NodeHooks(on_node_start=on_start))
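
To show where lifecycle callbacks fire relative to a node call, here is a standalone sketch. The `Hooks` dataclass, its field names, and `run_with_hooks` are hypothetical; only the `(run_id, node_id, input_dict)` shape of the start callback comes from the example above, and the success and failure signatures are assumed to match it.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional

Hook = Callable[[str, str, dict], Awaitable[None]]


@dataclass
class Hooks:
    """Hypothetical hook container; field names and signatures are assumptions."""

    on_start: Optional[Hook] = None
    on_success: Optional[Hook] = None
    on_failure: Optional[Hook] = None


async def run_with_hooks(run_id: str, node_id: str, node, payload: dict, hooks: Hooks) -> dict:
    """Call a node and fire the matching callback before and after it."""
    if hooks.on_start:
        await hooks.on_start(run_id, node_id, payload)
    try:
        result = await node(payload)
    except Exception as exc:
        if hooks.on_failure:
            await hooks.on_failure(run_id, node_id, {"error": repr(exc)})
        raise
    if hooks.on_success:
        await hooks.on_success(run_id, node_id, result)
    return result


events: list = []


async def record_start(run_id: str, node_id: str, payload: dict) -> None:
    events.append(f"start:{node_id}")


async def record_success(run_id: str, node_id: str, payload: dict) -> None:
    events.append(f"success:{node_id}")


async def double(payload: dict) -> dict:
    return {"result": payload["value"] * 2}


hooks = Hooks(on_start=record_start, on_success=record_success)
result = asyncio.run(run_with_hooks("run-1", "double", double, {"value": 5}, hooks))
print(result, events)
```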

Shared context

Pass runtime configuration to nodes that accept a second argument:

@runner.node("enrich", input=Input, output=Output)
async def enrich(state: Input, ctx: dict) -> dict:
    api_key = ctx["api_key"]
    # ...

config = RunConfig(context={"api_key": "sk-..."})
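
One way a runner could decide whether a node wants the context is to inspect its signature. The sketch below is an assumption about how such dispatch might work, not a description of Stroma's internals; `call_node` and the example nodes are hypothetical.

```python
import asyncio
import inspect


async def call_node(node, state: dict, context: dict) -> dict:
    """Pass `context` only to nodes that declare a second parameter."""
    params = inspect.signature(node).parameters
    if len(params) >= 2:
        return await node(state, context)
    return await node(state)


async def plain(state: dict) -> dict:
    return {"value": state["value"]}


async def enrich(state: dict, ctx: dict) -> dict:
    return {"value": state["value"], "region": ctx["region"]}


context = {"region": "eu-west-1"}
plain_out = asyncio.run(call_node(plain, {"value": 1}, context))
enrich_out = asyncio.run(call_node(enrich, {"value": 1}, context))
print(plain_out, enrich_out)
```

Because the context dict is mutable and shared, anything a node writes into it is visible to later nodes.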

Per-node retry policies

Override the global retry policy for specific nodes:

from stroma import FailureClass, FailurePolicy

config = RunConfig(
    node_policies={
        "flaky_node": {
            FailureClass.RECOVERABLE: FailurePolicy(max_retries=5, backoff_seconds=2.0),
        }
    }
)
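
As a rough picture of what `max_retries` and `backoff_seconds` control, the following standalone sketch retries a flaky coroutine with jittered backoff. The `retry_with_backoff` helper is hypothetical, and the exponential schedule and the jitter fraction are assumptions; Stroma's actual backoff formula may differ.

```python
import asyncio
import random


async def retry_with_backoff(func, *, max_retries: int, backoff_seconds: float, jitter: float = 0.1):
    """Call `func` until it succeeds or `max_retries` retries are used up."""
    attempt = 0
    while True:
        try:
            return await func()
        except ConnectionError:
            if attempt >= max_retries:
                raise
            # Exponential backoff plus random jitter, so that many clients
            # failing together do not all retry at the same instant.
            delay = backoff_seconds * (2 ** attempt)
            delay += random.uniform(0, jitter * delay)
            await asyncio.sleep(delay)
            attempt += 1


calls = {"count": 0}


async def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient")
    return "ok"


outcome = asyncio.run(retry_with_backoff(flaky, max_retries=5, backoff_seconds=0.001))
print(outcome, calls["count"])
```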

Per-node timeouts

Guard against hanging LLM calls with per-node timeouts, given in milliseconds. A timeout raises TimeoutError, which is classified as recoverable and retried automatically:

runner = StromaRunner.quick().with_node_timeouts({
    "llm_call": 30_000,   # 30 seconds
    "embedding": 10_000,  # 10 seconds
})
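
The mechanism underneath can be sketched with `asyncio.wait_for` directly. The `run_with_timeout` helper and its retry loop are illustrative assumptions rather than Stroma's implementation; the one detail worth noting is the conversion from milliseconds to the seconds that `asyncio.wait_for` expects.

```python
import asyncio


async def run_with_timeout(node, state: dict, timeout_ms: int, max_retries: int = 2) -> dict:
    """Run `node` under a deadline, retrying when the deadline is exceeded."""
    for attempt in range(max_retries + 1):
        try:
            # asyncio.wait_for takes seconds, so convert from milliseconds.
            return await asyncio.wait_for(node(state), timeout=timeout_ms / 1000)
        except asyncio.TimeoutError:
            if attempt == max_retries:
                raise
    raise AssertionError("unreachable")


attempts = {"count": 0}


async def slow_then_fast(state: dict) -> dict:
    attempts["count"] += 1
    # The first call hangs past the deadline; later calls return promptly.
    if attempts["count"] == 1:
        await asyncio.sleep(1.0)
    return {"result": state["value"] * 2}


result = asyncio.run(run_with_timeout(slow_then_fast, {"value": 5}, timeout_ms=50))
print(result, attempts["count"])
```

`asyncio.wait_for` cancels the timed-out call before raising, so the retry starts from a clean slate.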

Async checkpointing

The default store is now async. For distributed pipelines, use the async Redis store:

from stroma import RedisStore, CheckpointManager

store = RedisStore("redis://localhost:6379", ttl_seconds=7200)
manager = CheckpointManager(store)

The original synchronous Redis store is still available as SyncRedisStore.
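
To illustrate what save and resume means in practice, here is a minimal in-memory sketch. `InMemoryStore`, its `save` and `load` methods, and `run_steps` are hypothetical and do not mirror the interface of `RedisStore` or `CheckpointManager`.

```python
import asyncio
import json
from typing import Optional


class InMemoryStore:
    """Hypothetical async key-value store; not Stroma's store interface."""

    def __init__(self) -> None:
        self._data: dict = {}

    async def save(self, key: str, state: dict) -> None:
        # Serialize to JSON so the stored value is decoupled from live objects.
        self._data[key] = json.dumps(state)

    async def load(self, key: str) -> Optional[dict]:
        raw = self._data.get(key)
        return json.loads(raw) if raw is not None else None


async def run_steps(store: InMemoryStore, run_id: str, steps, state: dict) -> dict:
    """Run steps in order, skipping any whose checkpoint already exists."""
    for index, step in enumerate(steps):
        key = f"{run_id}:{index}"
        saved = await store.load(key)
        if saved is not None:
            state = saved  # resume from the checkpoint instead of re-running
            continue
        state = await step(state)
        await store.save(key, state)
    return state


executed: list = []


async def add_one(state: dict) -> dict:
    executed.append("add_one")
    return {"value": state["value"] + 1}


async def double(state: dict) -> dict:
    executed.append("double")
    return {"value": state["value"] * 2}


async def demo():
    store = InMemoryStore()
    first = await run_steps(store, "run-1", [add_one, double], {"value": 1})
    # A second run with the same run_id finds both checkpoints and re-runs nothing.
    second = await run_steps(store, "run-1", [add_one, double], {"value": 1})
    return first, second


first, second = asyncio.run(demo())
print(first, second, executed)
```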

Documentation

Full documentation including a tutorial and API reference is available at the docs site.

Development

uv sync --extra dev
uv run pytest tests/ -v --cov=stroma --cov-fail-under=85

License

MIT
