Workflow framework for LLM pipelines and tool-calling agents.
OpenArmature
Documentation: openarmature.ai
OpenArmature is a workflow framework for LLM pipelines and tool-calling agents.
Typed state, compile-time topology checks, observability, and crash-safe checkpoints are baked into the engine. The graph layer itself has no concept of LLMs or tools, so the same primitives drive deterministic ETL pipelines and tool-calling agents alike.
This Python package is the reference implementation. The behavioral contract is specified in openarmature-spec and verified by conformance fixtures.
Install
uv add openarmature # core
uv add 'openarmature[otel]' # with OpenTelemetry observability
# or, with pip:
pip install openarmature
pip install 'openarmature[otel]'
Why OpenArmature
State you can't accidentally mutate.
State schemas are frozen Pydantic models. Nodes return partial updates; the engine merges. The snapshot a node holds can't change mid-execution, and assignment into state raises rather than silently writing.
Schema validation at every merge.
Fields outside the declared schema fail at the merge boundary instead of silently dropping. A node returning {"plann": "..."} (typo) raises StateValidationError immediately, not three nodes downstream when the field is read and doesn't exist.
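The behavior can be approximated in a few lines of standard-library Python. This is only a sketch of the concept: `UnknownFieldError`, `PlanState`, and `checked_merge` are hypothetical, and the real engine raises its own StateValidationError.

```python
from dataclasses import dataclass, fields, replace
from typing import Any


class UnknownFieldError(ValueError):
    """Hypothetical error type used only in this sketch."""


@dataclass(frozen=True)
class PlanState:
    plan: str = ""


def checked_merge(state: PlanState, update: dict[str, Any]) -> PlanState:
    declared = {f.name for f in fields(state)}
    unknown = sorted(set(update) - declared)
    if unknown:
        # Fail at the merge boundary instead of dropping the key.
        raise UnknownFieldError(f"undeclared field(s): {unknown}")
    return replace(state, **update)


ok = checked_merge(PlanState(), {"plan": "step 1"})

try:
    checked_merge(PlanState(), {"plann": "step 1"})  # typo in the key
    typo_error = None
except UnknownFieldError as exc:
    typo_error = str(exc)
```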
Merge policy on the schema, not the call site.
Each state field declares its reducer (last_write_wins, append, merge, or a user-defined callable) as part of the schema. Two nodes writing the same field compose via the field's policy, which is stated once, declaratively, instead of being duplicated across call sites.
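To make the mechanism concrete, here is a minimal sketch of schema-declared reducers built only on `typing.Annotated`. It is an assumption about how such a lookup could work, not the library's code; `append_lists`, `merge_dicts`, `DemoSchema`, `reducer_for`, and `apply_update` are hypothetical names.

```python
from typing import Annotated, Any, get_args, get_origin, get_type_hints


def last_write_wins(old: Any, new: Any) -> Any:
    return new


def append_lists(old: list, new: list) -> list:
    return old + new


def merge_dicts(old: dict, new: dict) -> dict:
    return {**old, **new}  # shallow merge, later keys win


class DemoSchema:
    query: str  # no annotation metadata, so last_write_wins applies
    sources: Annotated[list[str], append_lists]
    metadata: Annotated[dict[str, str], merge_dicts]


def reducer_for(schema: type, field: str):
    # Look up the reducer attached to the field's type annotation.
    hint = get_type_hints(schema, include_extras=True)[field]
    if get_origin(hint) is Annotated:
        for extra in get_args(hint)[1:]:
            if callable(extra):
                return extra
    return last_write_wins


def apply_update(schema: type, state: dict, update: dict) -> dict:
    merged = dict(state)
    for name, value in update.items():
        merged[name] = reducer_for(schema, name)(merged[name], value)
    return merged


state = {"query": "q", "sources": [], "metadata": {}}
state = apply_update(
    DemoSchema, state, {"sources": ["wikipedia"], "metadata": {"tool": "research"}}
)
state = apply_update(
    DemoSchema,
    state,
    {"query": "q2", "sources": ["cache"], "metadata": {"classified_by": "llm"}},
)
```

Neither update names a merge strategy; both writes to `sources` and `metadata` compose through the policy declared on the schema.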
Subgraphs compose with explicit data seams.
Subgraphs run against their own state schema with inputs (additive, opt in to share parent fields) and outputs (replacement, name exactly what comes back) mappings. Parent fields don't leak in by accident; subgraph fields don't slip out unless declared.
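The data seam can be pictured with plain dictionaries. The sketch below is an illustration only: `run_subgraph`, the direction of the two mappings (subgraph field to parent field for inputs, parent field to subgraph field for outputs), and all field names are assumptions made for this example rather than the library's signature.

```python
from typing import Any, Callable

seen_by_subgraph: list[str] = []


def run_subgraph(
    parent: dict[str, Any],
    sub_defaults: dict[str, Any],
    inputs: dict[str, str],
    outputs: dict[str, str],
    body: Callable[[dict[str, Any]], dict[str, Any]],
) -> dict[str, Any]:
    # Inputs: only the listed parent fields are copied into the subgraph state.
    sub_state = dict(sub_defaults)
    for sub_field, parent_field in inputs.items():
        sub_state[sub_field] = parent[parent_field]
    sub_final = body(sub_state)
    # Outputs: only the listed subgraph fields come back as a parent update.
    return {
        parent_field: sub_final[sub_field]
        for parent_field, sub_field in outputs.items()
    }


def summarize_body(sub: dict[str, Any]) -> dict[str, Any]:
    seen_by_subgraph.extend(sorted(sub))  # record what the subgraph can see
    return {**sub, "draft": sub["text"][:5], "scratch": "internal notes"}


parent = {"query": "hello world", "secret": "token", "summary": None}
update = run_subgraph(
    parent,
    sub_defaults={"text": "", "draft": "", "scratch": ""},
    inputs={"text": "query"},
    outputs={"summary": "draft"},
    body=summarize_body,
)
```

The parent's `secret` field never reaches the subgraph, and the subgraph's `scratch` field never reaches the parent.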
Bad graphs don't compile.
Dangling edges, unreachable nodes, conflicting reducers, a missing entry declaration, mappings to undeclared fields, and multiple outgoing edges from one node: all six categories of structural error fail at .compile(), not mid-execution at runtime. The graph either constructs cleanly or it doesn't reach invoke().
The graph engine has no concept of LLMs or tools.
Validation, retry, recovery, structured output: those are node-internal or middleware concerns. The same engine runs deterministic ETL pipelines and tool-calling agents; the topology layer doesn't pick a side.
Determinism is a contract.
Given the same input, the same node implementations, and the same edge functions, a run produces the same final state and the same observed node-execution order. The spec mandates it; conformance fixtures verify it across every implementation. Replay an audit run and get byte-identical state.
Checkpoint saves are synchronous-by-contract.
The engine awaits each save before advancing. A crash immediately after a completed event cannot have lost the corresponding write. Resume mints a fresh invocation_id (audit trail) while preserving correlation_id (cross-system join key), so a recovered run is traceable as a new attempt without losing the thread to the original request.
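The ordering guarantee and the two identifiers can be sketched with asyncio alone. This is a simplified model under stated assumptions, not the engine's code: `InMemoryCheckpointer`, `run`, the record layout, and the `crash_after` switch are all hypothetical.

```python
import asyncio
import uuid


class InMemoryCheckpointer:
    """Hypothetical checkpoint store used only in this sketch."""

    def __init__(self) -> None:
        self.saved: list[dict] = []

    async def save(self, record: dict) -> None:
        await asyncio.sleep(0)  # stand-in for real I/O
        self.saved.append(record)


async def run(node_names, checkpointer, correlation_id, start_at=0, crash_after=None):
    invocation_id = uuid.uuid4().hex  # fresh for every attempt, including resumes
    for index in range(start_at, len(node_names)):
        name = node_names[index]
        # ... the node body would execute here ...
        await checkpointer.save(
            {
                "node": name,
                "next_index": index + 1,
                "invocation_id": invocation_id,
                "correlation_id": correlation_id,
            }
        )
        # The loop advances only after the save has returned.
        if name == crash_after:
            raise RuntimeError(f"simulated crash after {name}")
    return invocation_id


async def demo() -> list[dict]:
    checkpointer = InMemoryCheckpointer()
    try:
        await run(["a", "b", "c"], checkpointer, "req-42", crash_after="b")
    except RuntimeError:
        pass
    resume_from = checkpointer.saved[-1]["next_index"]
    await run(["a", "b", "c"], checkpointer, "req-42", start_at=resume_from)
    return checkpointer.saved


records = asyncio.run(demo())
```

The crash after node `b` loses nothing because its record was written before the loop moved on; the resumed attempt carries a new `invocation_id` and the original `correlation_id`.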
Observability that doesn't double-export.
The OpenTelemetry mapping mandates a private TracerProvider. That prevents the trap where global-provider auto-instrumentation libraries (OpenInference, Langfuse v3, etc.) emit duplicate spans alongside the framework's. Your spans flow exactly where you point them; no surprise fan-out to vendor backends you didn't configure.
LLM spans LLM-aware backends can actually read.
Each provider.complete() call emits a dedicated openarmature.llm.complete span carrying both the framework's openarmature.llm.* attributes and the cross-vendor OpenTelemetry GenAI semantic conventions (gen_ai.system, gen_ai.request.*, gen_ai.response.*, gen_ai.usage.*). Langfuse, Phoenix, and Honeycomb's LLM lens render generations correctly out of the box, with no per-service attribute-mapping shim required. Input/output payload emission is opt-in (set disable_llm_payload=False) and off by default because the payload may contain PII; image bytes are unconditionally redacted at the provider so they never enter the observability stream.
Hello World
About a hundred lines that show the engine in action. Three reducer policies declared on one state class. Three LLM calls each returning typed structured output (Pydantic class on two, raw JSON Schema dict on the third). Conditional routing as a pure function of state, not a hidden state machine. An observer attached at compile time that sees every node boundary the engine emits. Requires Python 3.12 or later and an OpenAI-compatible endpoint (defaults to OpenAI public API; works against any local server too).
import asyncio
import os
from collections.abc import Mapping
from typing import Annotated, Any, Literal

from openarmature.graph import END, GraphBuilder, NodeEvent, State, append, merge
from openarmature.llm import OpenAIProvider, UserMessage
from pydantic import BaseModel, Field


class Classification(BaseModel):
    intent: Literal["research", "summarize"]
    rationale: str


class Summary(BaseModel):
    one_liner: str
    confidence: float


class PipelineState(State):
    query: str  # last_write_wins (default)
    classification: Classification | None = None  # set by classify
    research_plan: dict[str, Any] | None = None  # set by research (dict-schema form)
    summary: Summary | None = None  # set by summarize
    sources: Annotated[list[str], append] = Field(  # appends across writes
        default_factory=list
    )
    metadata: Annotated[dict[str, str], merge] = Field(  # merges across writes
        default_factory=dict
    )


provider = OpenAIProvider(
    base_url=os.environ.get("LLM_BASE_URL", "https://api.openai.com"),  # host root; impl adds /v1
    model=os.environ.get("LLM_MODEL", "gpt-4o-mini"),
    api_key=os.environ.get("LLM_API_KEY") or None,  # empty → no-auth
)


async def classify(state: PipelineState) -> Mapping[str, Any]:
    response = await provider.complete(
        [UserMessage(content=f"Route to 'research' or 'summarize': {state.query!r}")],
        response_schema=Classification,  # class → instance
    )
    return {"classification": response.parsed, "metadata": {"classified_by": "llm"}}


async def research(state: PipelineState) -> Mapping[str, Any]:
    response = await provider.complete(
        [UserMessage(content=f"Plan research for {state.query!r}: list topics + follow-ups.")],
        response_schema={  # dict → dict
            "type": "object",
            "properties": {
                "topics": {"type": "array", "items": {"type": "string"}},
                "follow_up_questions": {"type": "array", "items": {"type": "string"}},
            },
            "required": ["topics", "follow_up_questions"],
            "additionalProperties": False,
        },
    )
    return {
        "research_plan": response.parsed,
        "sources": ["wikipedia", "arxiv"],
        "metadata": {"tool": "research"},
    }


async def summarize(state: PipelineState) -> Mapping[str, Any]:
    response = await provider.complete(
        [UserMessage(content=f"Summarize {state.query!r} in one sentence with confidence 0-1.")],
        response_schema=Summary,  # class → instance
    )
    return {"summary": response.parsed, "sources": ["cache"], "metadata": {"tool": "summarize"}}


def route(state: PipelineState) -> str:
    assert state.classification is not None
    return state.classification.intent


async def trace(event: NodeEvent) -> None:
    if event.phase == "completed" and event.error is None and event.post_state is not None:
        print(f"{event.node_name}: sources={event.post_state.sources}")


graph = (
    GraphBuilder(PipelineState)
    .add_node("classify", classify)
    .add_node("research", research)
    .add_node("summarize", summarize)
    .add_conditional_edge("classify", route)
    .add_edge("research", END)
    .add_edge("summarize", END)
    .set_entry("classify")
    .compile()
)
graph.attach_observer(trace)


async def main() -> None:
    try:
        final = await graph.invoke(PipelineState(query="what is RAG?"))
        print(f"\nclassification: {final.classification}")
        if final.research_plan is not None:
            print(f"research_plan: {final.research_plan}")
        if final.summary is not None:
            print(f"summary: {final.summary}")
    finally:
        await graph.drain()
        await provider.aclose()


asyncio.run(main())
Set LLM_API_KEY=sk-... and run. To swap providers, point LLM_BASE_URL and LLM_MODEL at OpenRouter, vLLM, LM Studio, llama.cpp, or anything else that speaks the OpenAI Chat Completions wire format. The example also lives at examples/00-hello-world/main.py; see examples/ for more runnable demos.
A few things to notice:
- Three reducer policies on one state schema. query/classification/research_plan/summary get the default last_write_wins. sources is Annotated[list[str], append], so successive writes concatenate. metadata is Annotated[dict[str, str], merge], so successive writes shallow-merge. The merge policy lives on the schema, once.
- Structured output, two forms. response_schema=Classification (a Pydantic class) returns Response.parsed as a validated Classification instance, typed end-to-end. response_schema={...} (a raw JSON Schema dict) returns Response.parsed as a plain dict. Same wire shape underneath; pick the form that fits.
- Conditional routing on a parsed field. route reads state.classification.intent and returns the next node's name. The graph engine doesn't care the discriminator came from an LLM; it would accept a deterministic rule with the same shape.
- Observer sees both phases. trace filters to completed events for brevity; the engine also delivers started events.
- The graph either compiles or it doesn't. Remove .set_entry() and .compile() raises NoDeclaredEntry before invoke() runs.
Next steps
- Quickstart: build your first graph end-to-end. openarmature.ai/getting-started
- Concepts: typed state, reducers, graphs, composition, fan-out, parallel branches, LLMs, prompts, observability, checkpointing. openarmature.ai/concepts
- Model Providers: implement the Provider Protocol for a custom LLM backend. openarmature.ai/model-providers/authoring
- API reference: auto-generated from docstrings. openarmature.ai/reference
- Examples: ten runnable demos with walk-throughs. openarmature.ai/examples (source at ./examples/)
- Spec: behavioral contract this implementation conforms to. LunarCommand/openarmature-spec
File details
Details for the file openarmature-0.8.0.tar.gz.
File metadata
- Download URL: openarmature-0.8.0.tar.gz
- Upload date:
- Size: 987.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | c4fdd884e04a0e31e8c3d028f863fac20d003eefa260bd73e2320d806e597c64 |
| MD5 | 6055504f3c7cf2066f5f3020bed07c6e |
| BLAKE2b-256 | 1ab07493a8d6952e79c6df39916aabd3fc55040e7fbca5c1ad5856390df79153 |
Provenance
The following attestation bundles were made for openarmature-0.8.0.tar.gz:
Publisher: release.yml on LunarCommand/openarmature-python
- Statement:
  - Statement type: https://in-toto.io/Statement/v1
  - Predicate type: https://docs.pypi.org/attestations/publish/v1
  - Subject name: openarmature-0.8.0.tar.gz
  - Subject digest: c4fdd884e04a0e31e8c3d028f863fac20d003eefa260bd73e2320d806e597c64
- Sigstore transparency entry: 1615562317
- Sigstore integration time:
- Permalink: LunarCommand/openarmature-python@f1776cee803939cca3b8a35ad709aae4d6bc29b3
- Branch / Tag: refs/tags/v0.8.0
- Owner: https://github.com/LunarCommand
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: release.yml@f1776cee803939cca3b8a35ad709aae4d6bc29b3
- Trigger Event: push
File details
Details for the file openarmature-0.8.0-py3-none-any.whl.
File metadata
- Download URL: openarmature-0.8.0-py3-none-any.whl
- Upload date:
- Size: 172.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 2a1cb00667b42eae1190b2987f806c9c1d68ef578c9f5effa6400ddad7a1ec66 |
| MD5 | 1911c5657ba6830413c9d53e21347f42 |
| BLAKE2b-256 | 3a3912923afe76daa11f3ecdca89c5414c09a1d15948bb2179123ae162bfbf42 |
Provenance
The following attestation bundles were made for openarmature-0.8.0-py3-none-any.whl:
Publisher: release.yml on LunarCommand/openarmature-python
- Statement:
  - Statement type: https://in-toto.io/Statement/v1
  - Predicate type: https://docs.pypi.org/attestations/publish/v1
  - Subject name: openarmature-0.8.0-py3-none-any.whl
  - Subject digest: 2a1cb00667b42eae1190b2987f806c9c1d68ef578c9f5effa6400ddad7a1ec66
- Sigstore transparency entry: 1615562322
- Sigstore integration time:
- Permalink: LunarCommand/openarmature-python@f1776cee803939cca3b8a35ad709aae4d6bc29b3
- Branch / Tag: refs/tags/v0.8.0
- Owner: https://github.com/LunarCommand
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: release.yml@f1776cee803939cca3b8a35ad709aae4d6bc29b3
- Trigger Event: push