The Translation Layer: Compile Universal Agents to AWS, LangGraph, MCP, or UAA Kernel.
Universal Agent Nexus
The Translation Layer: Build Once, Run Anywhere
Compile universal agent architectures to LangGraph, AWS Step Functions, MCP, or the UAA Kernel—without rewriting code.
🎯 What is Universal Agent Nexus?
The Problem You're Solving
You built an AI agent. It works great locally in LangGraph. Now you need to:
- Deploy to production at scale → AWS Step Functions
- Expose to Claude Desktop for teammates → MCP
- Run through the UAA Kernel → Native execution
- Keep local dev fast for iteration → LangGraph
Traditional approach: Rewrite your agent three more times. Maintain four codebases. Deal with drift.
Universal Agent Nexus approach: Write once. Compile to any runtime.
# One manifest
name: my-agent
graphs: [...]
nexus compile my-agent.yaml --target langgraph # Local dev
nexus compile my-agent.yaml --target aws # Production
nexus compile my-agent.yaml --target mcp # AI clients
nexus compile my-agent.yaml --target uaa # UAA Kernel (native)
Same logic. Four runtimes. Zero rewrites.
🔗 How Nexus + Fabric + Architecture Work Together
New to Universal Agent? This section explains the ecosystem.
The Universal Agent ecosystem consists of three interconnected repositories:
| Repository | Role | Metaphor |
|---|---|---|
| Architecture | Runtime kernel, state management, durability | Linux Kernel |
| Fabric | Roles, domains, policies → compiled manifests | Userland / distro |
| Nexus (this repo) | Adapters for AWS, LangGraph, MCP, UAA Kernel | Network / drivers |
┌─────────────────────────────────────────────────────────────────────────────┐
│ FABRIC (Composition Layer) │
│ github.com/mjdevaccount/universal_agent_fabric │
│ │
│ Role + Domain + Policy YAMLs → FabricBuilder → Enriched Manifest │
└────────────────────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ NEXUS (Compiler Layer) │
│ github.com/mjdevaccount/universal_agent_nexus (this repo) │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ ManifestIR │ │
│ └────────────────────────────┬────────────────────────────────────────┘ │
│ │ Generators │
│ ┌────────────┬───────────────┼───────────────┬────────────────────────┐ │
│ │ LangGraph │ AWS │ MCP │ UAA Native │ │
│ └────────────┴───────────────┴───────────────┴────────────────────────┘ │
│ │
│ + State Normalization Bridge (LangGraph ↔ AWS ↔ UAA) │
└────────────────────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ ARCHITECTURE (Kernel Layer) │
│ github.com/mjdevaccount/universal_agent_architecture │
│ │
│ GraphEngine + Handlers + PolicyEngine + TaskStore + ContractRegistry │
└─────────────────────────────────────────────────────────────────────────────┘
The Workflow
# Step 1: Use Fabric to compose your agent
fabric build \
--role researcher.yaml \
--domain finance.yaml \
--policy safety.yaml \
--out my_agent.yaml
# Step 2: Use Nexus to compile to runtime
nexus compile my_agent.yaml --target langgraph --output agent.py
# Step 3: Run
python agent.py
Why Separate Repos?
| Repo | Owner | Purpose |
|---|---|---|
| Fabric | Security/Business teams | Roles, capabilities, governance policies |
| Nexus | Platform/DevOps teams | Runtime translation, deployment pipelines |
This separation allows security teams to own policies while platform teams own infrastructure—without stepping on each other.
Can I Use Nexus Without Fabric?
Absolutely! You can write manifest YAML files directly (see Manifest Schema Reference). Fabric just makes it easier for complex, enterprise scenarios.
🚀 Complete Beginner Tutorial
From Zero to Running Agent (5 minutes)
Goal: Build a simple chatbot that greets users by name.
Step 1: Install
pip install "universal-agent-nexus[langgraph]"
Step 2: Create manifest
Create chatbot.yaml:
name: greeter-bot
version: "1.0.0"
description: "A friendly greeting agent"
graphs:
  - name: main
    entry_node: greet
    nodes:
      - id: greet
        kind: task
        label: "Generate Greeting"
        config:
          prompt: "Generate a friendly greeting for {name}"
    edges: []
tools: []
routers: []
policies: []
Step 3: Compile to LangGraph
nexus compile chatbot.yaml --target langgraph --output bot.py
This generates bot.py with:
- ✅ LangGraph StateGraph definition
- ✅ Postgres checkpointing
- ✅ Async execution
Step 4: Run locally
# Start Postgres (Docker)
docker run -d -p 5432:5432 \
-e POSTGRES_PASSWORD=password \
-e POSTGRES_DB=nexus_dev \
postgres:16-alpine
# Set environment
export UAA_POSTGRES_URL=postgresql://postgres:password@localhost:5432/nexus_dev
# Run
python bot.py
Expected output:
✅ Connected to PostgreSQL
✅ Graph initialized: main
Input: {"name": "Alice"}
Output: "Hello Alice! Welcome to Universal Agent Nexus! 👋"
Step 5: Deploy to AWS (Optional)
# Compile to AWS
nexus compile chatbot.yaml --target aws --output state_machine.json
# Deploy (requires AWS CLI configured)
cd terraform/environments/dev
terraform init
terraform apply
✅ You just built and deployed a multi-runtime agent!
📝 Manifest Schema Reference
Complete Example
name: "my-agent"
version: "1.0.0"
description: "Agent description"
graphs:
  - name: "main"
    entry_node: "start"
    nodes:
      - id: "start"
        kind: router  # Decision point
        label: "Risk Classifier"
        config:
          llm: "gpt-4o-mini"
          system_message: "You are a risk classifier..."
      - id: "validate"
        kind: tool  # External capability
        label: "Policy Validator"
        tool_ref: "policy_check"
      - id: "escalate"
        kind: task  # LLM generation
        label: "Generate Summary"
        config:
          prompt: "Summarize incident: {input}"
    edges:
      - from_node: "start"
        to_node: "validate"
        condition:
          expression: "risk_level > 0.5"
      - from_node: "validate"
        to_node: "escalate"
        condition:
          expression: "policy_violated == true"
tools:
  - name: "policy_check"
    protocol: "mcp"
    config:
      command: "mcp-policy-server"
      args: ["--strict"]
routers:
  - name: "risk_classifier"
    llm_provider: "openai"
    model: "gpt-4o-mini"
    system_message: "Classify risk as low/medium/high"
policies:
  - name: "require_approval"
    target_pattern: "execute_*"
    action: "require_approval"
    conditions:
      risk_level: ">0.8"
Node Types
| Kind | Purpose | When to Use | Config Fields |
|---|---|---|---|
| router | LLM makes a decision | Multi-path routing, classification | llm, system_message, tools |
| tool | Call external API/MCP | Database query, API call, file I/O | tool_ref, input_mapping |
| task | LLM generates text | Summarization, translation, writing | prompt, llm, max_tokens |
Edge Conditions
Conditions use simple expression syntax:
# Simple comparison
condition:
  expression: "risk_level > 0.5"
# Multiple conditions (AND)
condition:
  expression: "risk_level > 0.5 AND confidence > 0.8"
# Object field access
condition:
  expression: "user.role == 'admin'"
# List operations
condition:
  expression: "len(violations) > 0"
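How Nexus evaluates these expressions internally is not described here. As a rough sketch only, the four forms above could be evaluated against a state dictionary with Python's `ast` module, without calling `eval`. The function name `evaluate_condition` and the handling of `AND`, `OR`, `true`, and `false` are assumptions made for this illustration, not part of the Nexus API.

```python
import ast
import operator
import re
from typing import Any, Dict

# Comparison operators supported by this sketch.
_COMPARE = {
    ast.Gt: operator.gt,
    ast.GtE: operator.ge,
    ast.Lt: operator.lt,
    ast.LtE: operator.le,
    ast.Eq: operator.eq,
    ast.NotEq: operator.ne,
}


def _eval(node: ast.AST, state: Dict[str, Any]) -> Any:
    """Evaluate a restricted subset of expression nodes against the state."""
    if isinstance(node, ast.Constant):
        return node.value
    if isinstance(node, ast.Name):
        # Assumption: lowercase true/false are boolean literals in manifests.
        if node.id in ("true", "false"):
            return node.id == "true"
        return state[node.id]
    if isinstance(node, ast.Attribute):
        base = _eval(node.value, state)
        # Accept both dict-style and attribute-style objects.
        return base[node.attr] if isinstance(base, dict) else getattr(base, node.attr)
    if isinstance(node, ast.BoolOp):
        values = [_eval(value, state) for value in node.values]
        return all(values) if isinstance(node.op, ast.And) else any(values)
    if isinstance(node, ast.Compare) and len(node.ops) == 1:
        left = _eval(node.left, state)
        right = _eval(node.comparators[0], state)
        return _COMPARE[type(node.ops[0])](left, right)
    if isinstance(node, ast.Call) and isinstance(node.func, ast.Name) and node.func.id == "len":
        return len(_eval(node.args[0], state))
    raise ValueError(f"Unsupported expression element: {ast.dump(node)}")


def evaluate_condition(expression: str, state: Dict[str, Any]) -> bool:
    """Hypothetical helper: evaluate one edge condition against a state dict."""
    # Map uppercase AND/OR onto Python keywords (naive: also touches string literals).
    normalized = re.sub(r"\bAND\b", "and", expression)
    normalized = re.sub(r"\bOR\b", "or", normalized)
    tree = ast.parse(normalized, mode="eval")
    return bool(_eval(tree.body, state))


state = {"risk_level": 0.7, "confidence": 0.5, "user": {"role": "admin"}, "violations": []}
print(evaluate_condition("risk_level > 0.5", state))
print(evaluate_condition("risk_level > 0.5 AND confidence > 0.8", state))
print(evaluate_condition("user.role == 'admin'", state))
print(evaluate_condition("len(violations) > 0", state))
```

Walking the syntax tree keeps the set of allowed operations explicit, so anything outside the documented forms raises an error instead of executing.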
Tool Protocols
| Protocol | Use Case | Example |
|---|---|---|
| mcp | Model Context Protocol servers | Claude Desktop tools, Cursor plugins |
| http | REST APIs | External services, webhooks |
| local | Python functions | In-process tools |
| subprocess | CLI commands | Shell scripts, binaries |
Required vs Optional Fields
| Field | Required | Default | Description |
|---|---|---|---|
| name | ✅ | - | Unique agent identifier |
| version | ✅ | - | Semantic version (e.g., "1.0.0") |
| description | ❌ | "" | Human-readable description |
| graphs | ✅ | - | Array of graph definitions |
| graphs[].name | ✅ | - | Graph identifier |
| graphs[].entry_node | ✅ | - | ID of starting node |
| graphs[].nodes | ✅ | - | Array of node definitions |
| graphs[].edges | ❌ | [] | Array of edge definitions |
| tools | ❌ | [] | External tool definitions |
| routers | ❌ | [] | Router configurations |
| policies | ❌ | [] | Governance policies |
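The required and optional fields in the table can be checked on a manifest that has already been loaded into a dictionary. The sketch below is illustrative and is not Nexus's own validator. The helper names `check_manifest` and `with_defaults` are hypothetical, and the check that `entry_node` matches a node `id` is an assumption inferred from the field description.

```python
from typing import Any, Dict, List

REQUIRED_TOP_LEVEL = ("name", "version", "graphs")
REQUIRED_GRAPH_FIELDS = ("name", "entry_node", "nodes")


def check_manifest(manifest: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means all required fields exist."""
    errors = [
        f"missing required field: {key}"
        for key in REQUIRED_TOP_LEVEL
        if key not in manifest
    ]
    for index, graph in enumerate(manifest.get("graphs", [])):
        for key in REQUIRED_GRAPH_FIELDS:
            if key not in graph:
                errors.append(f"missing required field: graphs[{index}].{key}")
        # Assumption: entry_node must name one of the graph's nodes.
        node_ids = {node.get("id") for node in graph.get("nodes", [])}
        if "entry_node" in graph and graph["entry_node"] not in node_ids:
            errors.append(f"graphs[{index}].entry_node does not match any node id")
    return errors


def with_defaults(manifest: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy with the documented defaults applied to optional fields."""
    filled = {"description": "", "tools": [], "routers": [], "policies": [], **manifest}
    filled["graphs"] = [{"edges": [], **graph} for graph in manifest.get("graphs", [])]
    return filled


manifest = {
    "name": "greeter-bot",
    "version": "1.0.0",
    "graphs": [
        {"name": "main", "entry_node": "greet", "nodes": [{"id": "greet", "kind": "task"}]}
    ],
}
print(check_manifest(manifest))
print(with_defaults(manifest)["graphs"][0]["edges"])
```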
📚 Core Concepts
1. Universal Agent Architecture (UAA)
The Universal Agent Architecture is a specification for defining multi-step agent workflows as portable, runtime-agnostic manifests.
Key components:
- Graph: A directed graph of nodes (tasks, routers, tools) connected by edges
- Node: A single execution unit (LLM call, tool invocation, conditional logic)
- Edge: Transition between nodes (with optional conditions)
- Tool: External capability (API, database, MCP server)
- Router: Decision point that chooses next action(s) dynamically
2. Compilation Targets (Adapters)
| Target | Best For | Execution Model | State Storage |
|---|---|---|---|
| LangGraph | Local dev, debugging | Python async | PostgreSQL |
| AWS | Production scale | Step Functions, Lambda | DynamoDB |
| MCP | AI client integration | stdio transport | In-memory |
3. The IR-Based Compiler (v1.0.0)
Why an Intermediate Representation?
Without IR, we'd need N² translators (LangGraph↔AWS, AWS↔MCP, etc.). With IR, we need only 2N (N parsers + N generators).
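The counting argument can be checked with a few lines of arithmetic. The exact number of direct translators between N formats is N(N-1), one per ordered pair of distinct formats, which grows on the order of N². The function names below are made up for this illustration.

```python
def direct_translators(n: int) -> int:
    """Translators needed when every format converts directly to every other one."""
    return n * (n - 1)


def ir_components(n: int) -> int:
    """Components needed with a shared IR: one parser and one generator per format."""
    return 2 * n


for n in (3, 4, 8):
    print(f"{n} formats: {direct_translators(n)} direct translators vs {ir_components(n)} IR components")
# 3 formats: 6 direct translators vs 6 IR components
# 4 formats: 12 direct translators vs 8 IR components
# 8 formats: 56 direct translators vs 16 IR components
```

With three formats the two approaches cost the same. The IR starts to pay off from the fourth format onward.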
Benefits:
- ✅ Bidirectional translation: LangGraph ⇄ AWS ⇄ MCP
- ✅ Optimization passes: Dead code elimination, edge deduplication
- ✅ Validation: Type checking before code generation
- ✅ Future-proof: Add new runtimes without changing existing ones
How it works:
┌────────────────────────────────────────────────────────────┐
│ FRONTENDS (Parsers) │
│ LangGraph → IR | AWS → IR | YAML → IR │
└──────────────────────────┬─────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────┐
│ INTERMEDIATE REPRESENTATION │
│ ManifestIR { GraphIR, ToolIR, RouterIR } │
└──────────────────────────┬─────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────┐
│ TRANSFORMATION PASSES │
│ Dead code elimination | Edge deduplication | Validation │
└──────────────────────────┬─────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────┐
│ BACKENDS (Generators) │
│ IR → LangGraph | IR → AWS | IR → YAML │
└────────────────────────────────────────────────────────────┘
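To make the pipeline concrete, here is a toy version of the middle and final stages. It assumes a simplified graph type, `ToyGraph`, which is a hypothetical stand-in for the real `GraphIR`. The two passes imitate edge deduplication and dead code elimination, and the generator only emits a text listing.

```python
from dataclasses import dataclass, field
from typing import List, Set, Tuple


@dataclass
class ToyGraph:
    """Hypothetical stand-in for GraphIR: node ids plus (from, to) edges."""
    entry_node: str
    nodes: List[str]
    edges: List[Tuple[str, str]] = field(default_factory=list)


def deduplicate_edges(graph: ToyGraph) -> ToyGraph:
    """Drop repeated edges while keeping first-seen order."""
    return ToyGraph(graph.entry_node, list(graph.nodes), list(dict.fromkeys(graph.edges)))


def eliminate_dead_nodes(graph: ToyGraph) -> ToyGraph:
    """Keep only the nodes reachable from the entry node."""
    reachable: Set[str] = set()
    stack = [graph.entry_node]
    while stack:
        current = stack.pop()
        if current in reachable:
            continue
        reachable.add(current)
        stack.extend(dst for src, dst in graph.edges if src == current)
    nodes = [node for node in graph.nodes if node in reachable]
    edges = [(src, dst) for src, dst in graph.edges if src in reachable and dst in reachable]
    return ToyGraph(graph.entry_node, nodes, edges)


def generate_listing(graph: ToyGraph) -> str:
    """Toy backend: emit one line per edge."""
    return "\n".join(f"{src} -> {dst}" for src, dst in graph.edges)


graph = ToyGraph(
    entry_node="start",
    nodes=["start", "validate", "escalate", "orphan"],
    edges=[
        ("start", "validate"),
        ("start", "validate"),  # duplicate edge
        ("validate", "escalate"),
        ("orphan", "escalate"),  # source node is unreachable
    ],
)
for transform in (deduplicate_edges, eliminate_dead_nodes):
    graph = transform(graph)
print(generate_listing(graph))
# start -> validate
# validate -> escalate
```

Each pass takes a graph and returns a new one, so passes can be chained in any order the dependencies allow.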
Bidirectional translation in action:
# AWS → LangGraph (reverse compile!)
nexus translate state_machine.json --to langgraph --out agent.py
# LangGraph → AWS
nexus translate agent.py --to aws --out state_machine.json
# Chain translations: AWS → LangGraph → MCP
nexus translate state_machine.json --to langgraph --out temp.py
nexus translate temp.py --to mcp --out server.py
🔧 Extending Nexus: Platform Extension API
For Platform Engineers & Framework Developers
Nexus exposes a comprehensive extension API that enables you to customize every aspect of the compilation pipeline. This section documents all extension points, APIs, and patterns for building on top of Nexus.
Extension Points Overview
Nexus provides 8 tiers of abstraction exposure, each enabling different levels of customization:
| Tier | Extension Point | Use Case | Complexity |
|---|---|---|---|
| 1 | Parser Registry | Add support for new source formats (Airflow, Prefect, Temporal) | Low |
| 2 | Generator Registry | Add support for new target runtimes (K8s, GCP, Azure) | Low |
| 3 | Pass Interface | Create custom optimization/validation passes | Medium |
| 4 | Annotations | Add typed metadata to IR nodes/edges | Low |
| 5 | Factories | Customize tool/router instantiation logic | Medium |
| 6 | Enrichment Strategy | Customize Fabric fragment composition | Medium |
| 7 | Compiler Builder | Configure entire compiler with custom components | High |
| 8 | IR Visitor | Traverse and analyze IR structure | Low |
Tier 1: Parser Registry
Purpose: Register custom parsers to support new source formats.
API Reference:
from universal_agent_nexus.parser_registry import (
register_parser,
get_parser,
detect_source_type,
list_parsers,
ParserRegistry,
)
# Register a custom parser
register_parser(
source_type: str,
parser: Parser,
*,
detection_priority: int = 100,
aliases: Optional[Set[str]] = None,
description: Optional[str] = None,
) -> None
# Get parser by source type
parser = get_parser(source_type: str) -> Parser
# Auto-detect source type
detected = detect_source_type(source: str) -> str
# List all registered parsers
parsers = list_parsers() -> Dict[str, ParserInfo]
Parser Protocol:
All parsers must implement the Parser protocol:
from universal_agent_nexus.ir.parser import Parser
from universal_agent_nexus.ir import ManifestIR
class MyFrameworkParser:
    """Custom parser for MyFramework."""

    def parse(self, source: str) -> ManifestIR:
        """
        Parse source and produce IR.
        Args:
            source: Path to source file or source string
        Returns:
            ManifestIR instance
        """
        # Parse your format → ManifestIR
        return ManifestIR(...)

    def can_parse(self, source: str) -> bool:
        """
        Check if this parser can handle the source.
        Args:
            source: Path to source file or source string
        Returns:
            True if parser can handle this source
        """
        # Detection logic (file extension, content sniffing, etc.)
        return source.endswith(".myfw")
Complete Example: Airflow Parser
from universal_agent_nexus.parser_registry import register_parser
from universal_agent_nexus.ir import ManifestIR, GraphIR, NodeIR, NodeKind
from pathlib import Path
import yaml
class AirflowParser:
    """Parse Apache Airflow DAGs to ManifestIR."""

    def parse(self, source: str) -> ManifestIR:
        path = Path(source)
        if path.exists():
            dag_def = yaml.safe_load(path.read_text())
        else:
            dag_def = yaml.safe_load(source)
        # Convert Airflow DAG → ManifestIR
        nodes = [
            NodeIR(
                id=task_id,
                kind=NodeKind.TASK,
                label=task.get("task_name", task_id),
                description=task.get("description"),
            )
            for task_id, task in dag_def.get("tasks", {}).items()
        ]
        graph = GraphIR(
            name=dag_def.get("dag_id", "main"),
            entry_node=nodes[0].id if nodes else "start",
            nodes=nodes,
            edges=[],  # Convert Airflow dependencies to edges
        )
        return ManifestIR(
            name=dag_def.get("dag_id", "airflow-dag"),
            version="1.0.0",
            description=dag_def.get("description", ""),
            graphs=[graph],
        )

    def can_parse(self, source: str) -> bool:
        path = Path(source)
        if path.exists():
            return source.endswith((".airflow", ".dag.yaml"))
        try:
            data = yaml.safe_load(source)
        except yaml.YAMLError:
            return False
        # safe_load may return a plain string or None, so require a mapping
        return isinstance(data, dict) and "dag_id" in data and "tasks" in data
# Register the parser
register_parser(
"airflow",
AirflowParser(),
detection_priority=150, # Higher = checked first
aliases={"dag", "airflow_dag"},
description="Apache Airflow DAG parser",
)
# Now compile works with Airflow!
from universal_agent_nexus.compiler import compile
result = compile("my_dag.airflow", target="aws")
Best Practices:
- Detection Priority: Set higher priority (200+) for formats with unique file extensions, lower (50-100) for ambiguous formats
- Aliases: Register common aliases (e.g., {"uaa", "yaml"} for YAML parser)
- Error Handling: Provide clear error messages with source location information
- Performance: Cache parsed results if parsing is expensive
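The effect of detection priority is easiest to see with overlapping formats. The sketch below uses a hypothetical `ToyParserRegistry`, not the library's `ParserRegistry`, and assumes that higher priorities are checked first, as the comment in the Airflow example states.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class ToyParserEntry:
    """Hypothetical registry record; not the library's ParserInfo."""
    source_type: str
    can_parse: Callable[[str], bool]
    detection_priority: int = 100


class ToyParserRegistry:
    """Illustrative registry that checks higher-priority parsers first."""

    def __init__(self) -> None:
        self._entries: List[ToyParserEntry] = []

    def register(self, entry: ToyParserEntry) -> None:
        self._entries.append(entry)

    def detect(self, source: str) -> str:
        # sorted() is stable, so equal priorities keep registration order.
        ordered = sorted(self._entries, key=lambda e: e.detection_priority, reverse=True)
        for entry in ordered:
            if entry.can_parse(source):
                return entry.source_type
        raise ValueError(f"No registered parser accepts {source!r}")


registry = ToyParserRegistry()
# Ambiguous format: any YAML file matches, so it gets a low priority.
registry.register(ToyParserEntry("yaml", lambda s: s.endswith((".yaml", ".yml")), 50))
# Unique extension: checked first so it wins over the generic YAML parser.
registry.register(ToyParserEntry("airflow", lambda s: s.endswith(".dag.yaml"), 200))

print(registry.detect("etl.dag.yaml"))  # airflow
print(registry.detect("agent.yaml"))    # yaml
```

If the priorities were swapped, `etl.dag.yaml` would be claimed by the generic YAML parser, which is why the more specific format gets the higher number.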
Available Parsers:
# List all registered parsers
parsers = list_parsers()
for source_type, info in parsers.items():
    print(f"{source_type}: {info.description}")
    print(f"  Aliases: {info.aliases}")
    print(f"  Priority: {info.detection_priority}")
Tier 2: Generator Registry
Purpose: Register custom generators to support new target runtimes.
API Reference:
from universal_agent_nexus.generator_registry import (
register_generator,
get_generator,
list_generators,
GeneratorRegistry,
)
# Register a custom generator
register_generator(
target_type: str,
generator: Generator,
*,
aliases: Optional[Set[str]] = None,
default_options: Optional[Dict] = None,
description: Optional[str] = None,
) -> None
# Get generator by target type
generator = get_generator(target_type: str) -> Generator
# List all registered generators
generators = list_generators() -> Dict[str, GeneratorInfo]
Generator Protocol:
All generators must implement the Generator protocol:
from universal_agent_nexus.ir.generator import Generator
from universal_agent_nexus.ir import ManifestIR
class MyPlatformGenerator:
    """Custom generator for MyPlatform."""

    def generate(self, ir: ManifestIR) -> str:
        """
        Generate target format from IR.
        Args:
            ir: ManifestIR instance
        Returns:
            Generated code/config as string
        """
        # Convert ManifestIR → your platform's format
        lines = []
        for graph in ir.graphs:
            lines.append(f"# Graph: {graph.name}")
            # ... generate code ...
        return "\n".join(lines)
Complete Example: Temporal Generator
from universal_agent_nexus.generator_registry import register_generator
from universal_agent_nexus.ir import ManifestIR, GraphIR, NodeIR, NodeKind
class TemporalGenerator:
    """Generate Temporal workflows from ManifestIR."""

    def generate(self, ir: ManifestIR) -> str:
        """Generate Temporal workflow code."""
        lines = [
            '"""',
            f"Auto-generated Temporal workflow: {ir.name}",
            '"""',
            "",
            "from temporalio import workflow",
            "",
        ]
        for graph in ir.graphs:
            lines.extend([
                "@workflow.defn",
                f"class {graph.name.title()}Workflow:",
                f'    """{graph.name} workflow."""',
                "",
                "    @workflow.run",
                "    async def run(self, input_data: dict) -> dict:",
            ])
            # Generate workflow steps
            for node in graph.nodes:
                if node.kind == NodeKind.TASK:
                    lines.append(f"        result = await {node.id}_activity(input_data)")
                elif node.kind == NodeKind.TOOL:
                    lines.append(f"        result = await execute_tool('{node.tool_ref}', input_data)")
            lines.append("        return result")
            lines.append("")
        return "\n".join(lines)
# Register the generator
register_generator(
"temporal",
TemporalGenerator(),
aliases={"temporalio"},
description="Temporal.io workflow generator",
)
# Now compile works with Temporal!
from universal_agent_nexus.compiler import compile
result = compile("agent.yaml", target="temporal")
Best Practices:
- Idempotency: Generated code should be deterministic (same IR → same output)
- Readability: Include comments and clear structure in generated code
- Error Handling: Validate IR before generation, provide helpful error messages
- Options: Use default_options for generator-specific configuration
Tier 3: Pass Interface
Purpose: Create custom transformation passes for optimization, validation, or instrumentation.
API Reference:
from universal_agent_nexus.ir.pass_interface import (
Transform,
ValidationPass,
OptimizationPass,
InstrumentationPass,
PassMetadata,
)
from universal_agent_nexus.ir import ManifestIR
from universal_agent_nexus.ir.pass_manager import PassManager
# Base interface
class Transform(ABC):
    @property
    @abstractmethod
    def metadata(self) -> PassMetadata: ...

    @abstractmethod
    def apply(self, ir: ManifestIR) -> ManifestIR: ...

# Specialized base classes
class ValidationPass(Transform):  # Don't modify IR
    @abstractmethod
    def validate(self, ir: ManifestIR) -> list[str]: ...

class OptimizationPass(Transform):  # Modify IR for performance
    ...

class InstrumentationPass(Transform):  # Add metadata/observability
    ...
Pass Metadata:
from dataclasses import dataclass, field
from typing import Set

@dataclass
class PassMetadata:
    name: str  # Unique pass identifier
    description: str  # Human-readable description
    # Dataclasses reject mutable defaults such as set(), so use default_factory
    requires: Set[str] = field(default_factory=set)  # Passes that must run before this
    invalidates: Set[str] = field(default_factory=set)  # Analyses invalidated by this pass
    preserves: Set[str] = field(default_factory=set)  # Analyses preserved by this pass
    category: str = "optimization"  # optimization, validation, instrumentation
Complete Example: Custom Optimization Pass
from universal_agent_nexus.ir.pass_interface import OptimizationPass, PassMetadata
from universal_agent_nexus.ir import ManifestIR, GraphIR, NodeIR, NodeKind
class MergeDuplicateTools(OptimizationPass):
    """Merge duplicate tool nodes with identical configurations."""

    @property
    def metadata(self) -> PassMetadata:
        return PassMetadata(
            name="merge-duplicate-tools",
            description="Merge duplicate tool nodes to reduce redundancy",
            requires={"dead-node-elimination"},  # Run after dead code removal
            invalidates={"edge-analysis"},  # May change edges
            preserves={"cycle-free"},  # Preserves cycle-freedom
            category="optimization",
        )

    def apply(self, ir: ManifestIR) -> ManifestIR:
        """Merge duplicate tools."""
        for graph in ir.graphs:
            # Group nodes by tool signature
            tool_signatures = {}
            nodes_to_remove = []
            for node in graph.nodes:
                if node.kind == NodeKind.TOOL and node.tool_ref:
                    sig = (node.tool_ref, str(sorted(node.config.items())))
                    if sig in tool_signatures:
                        # Found duplicate - redirect edges to original
                        original = tool_signatures[sig]
                        self._redirect_edges(graph, node.id, original.id)
                        nodes_to_remove.append(node.id)
                    else:
                        tool_signatures[sig] = node
            # Remove merged nodes
            if nodes_to_remove:
                graph.nodes = [n for n in graph.nodes if n.id not in nodes_to_remove]
                graph._build_indexes()
        return ir

    def _redirect_edges(self, graph: GraphIR, from_id: str, to_id: str) -> None:
        """Redirect all edges from one node to another."""
        for edge in graph.edges:
            if edge.from_node == from_id:
                edge.from_node = to_id
            if edge.to_node == from_id:
                edge.to_node = to_id
# Use the pass
from universal_agent_nexus.ir.pass_manager import PassManager, OptimizationLevel
manager = PassManager(opt_level=OptimizationLevel.DEFAULT)
manager.add_custom_pass(
MergeDuplicateTools(),
after="dead-node-elimination", # Explicit ordering
)
ir = manager.run(ir)
Complete Example: Validation Pass
from universal_agent_nexus.ir import ManifestIR
from universal_agent_nexus.ir.pass_interface import ValidationPass, PassMetadata
from universal_agent_nexus.ir.pass_manager import PassManager

class CostLimitValidation(ValidationPass):
    """Validate that estimated costs don't exceed limits."""

    @property
    def metadata(self) -> PassMetadata:
        return PassMetadata(
            name="cost-limit-validation",
            description="Validate estimated execution costs",
            requires={"dead-node-elimination"},
            category="validation",
        )

    def validate(self, ir: ManifestIR) -> list[str]:
        """Return list of error messages (empty if valid)."""
        errors = []
        max_cost_cents = 1000  # $10.00 limit
        from universal_agent_nexus.ir.visitor import DefaultIRVisitor, traverse

        class CostCollector(DefaultIRVisitor):
            def __init__(self):
                self.total_cost = 0.0

            def visit_node(self, node):
                from universal_agent_nexus.ir.annotations import CostAnnotation
                cost = node.metadata.get_annotation(CostAnnotation)
                if cost:
                    self.total_cost += cost.cost_cents

        collector = CostCollector()
        traverse(ir, collector)
        if collector.total_cost > max_cost_cents:
            errors.append(
                f"Estimated cost ${collector.total_cost/100:.2f} exceeds "
                f"limit ${max_cost_cents/100:.2f}"
            )
        return errors

# Use validation pass
manager = PassManager()
manager.add_custom_pass(CostLimitValidation())
try:
    ir = manager.run(ir)
except RuntimeError as e:
    print(f"Validation failed: {e}")
Pass Ordering:
Passes are automatically ordered based on requires dependencies. You can also specify explicit ordering:
manager.add_custom_pass(
MyPass(),
before="some-other-pass", # Run before this pass
after="dead-node-elimination", # Run after this pass
)
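Ordering passes by their requires sets is a topological sort. How PassManager implements this is not shown here. The sketch below only demonstrates the idea with Python's standard `graphlib` module (Python 3.9+), and the helper name `order_passes` is an assumption made for the illustration.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Set


def order_passes(requires: Dict[str, Set[str]]) -> List[str]:
    """Return pass names so that every pass comes after the passes it requires."""
    # TopologicalSorter takes a mapping of node -> predecessors,
    # which is the same shape as a pass -> requires mapping.
    return list(TopologicalSorter(requires).static_order())


requires = {
    "dead-node-elimination": set(),
    "merge-duplicate-tools": {"dead-node-elimination"},
    "cost-limit-validation": {"dead-node-elimination"},
}
order = order_passes(requires)
print(order[0])  # dead-node-elimination

# Circular requirements cannot be ordered and raise CycleError.
try:
    order_passes({"a": {"b"}, "b": {"a"}})
except CycleError:
    print("cycle detected")
```

Only the relative order implied by the dependencies is guaranteed. Passes with no dependency between them may come out in either order.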
Available Passes:
from universal_agent_nexus.ir.pass_manager import PassManager
manager = PassManager()
passes = manager.list_passes()
for name, metadata in passes.items():
    print(f"{name}: {metadata.description}")
    print(f"  Requires: {metadata.requires}")
    print(f"  Category: {metadata.category}")
Tier 4: Annotations
Purpose: Add typed metadata to IR nodes and edges for analysis, optimization, or observability.
API Reference:
from universal_agent_nexus.ir.annotations import (
Annotation,
CostAnnotation,
MonitoringAnnotation,
PerformanceAnnotation,
BatchAnnotation,
)
from universal_agent_nexus.ir import NodeIR, Metadata
# Annotate a node
node.metadata.annotate(CostAnnotation(tokens=500, latency_ms=250, cost_cents=0.05))
# Get annotation by type
cost = node.metadata.get_annotation(CostAnnotation) -> Optional[CostAnnotation]
# Check if annotation exists
has_cost = node.metadata.has_annotation(CostAnnotation) -> bool
# List all annotations
annotations = node.metadata.list_annotations() -> Dict[str, Annotation]
Built-in Annotations:
# Cost tracking
cost = CostAnnotation(
tokens=500, # Estimated token count
latency_ms=250, # Estimated latency in milliseconds
cost_cents=0.05, # Estimated cost in cents
)
node.metadata.annotate(cost)
# Monitoring configuration
monitoring = MonitoringAnnotation(
trace=True, # Enable distributed tracing
log_level="INFO", # Logging level
)
node.metadata.annotate(monitoring)
# Performance characteristics
performance = PerformanceAnnotation(
expected_duration_ms=100.0, # Expected execution time
max_concurrency=5, # Max concurrent executions
timeout_ms=5000, # Timeout in milliseconds
)
node.metadata.annotate(performance)
# Batch API eligibility (added by BatchOptimizationPass)
batch = BatchAnnotation(
eligible=True, # Whether node can be batched
batch_group="claude:abc123", # Group for batching similar requests
cache_key="sha256...", # Cache key for prompt caching
priority=100, # Higher = process sooner
max_wait_ms=5000, # Max wait for batch accumulation
)
node.metadata.annotate(batch)
Creating Custom Annotations:
from typing import List, Optional
from universal_agent_nexus.ir.annotations import Annotation

class SecurityAnnotation(Annotation):
    """Track security requirements for nodes."""

    @classmethod
    def key(cls) -> str:
        return "security"

    def __init__(
        self,
        requires_auth: bool = True,
        required_roles: Optional[List[str]] = None,
        encryption_required: bool = False,
    ):
        self.requires_auth = requires_auth
        self.required_roles = required_roles or []
        self.encryption_required = encryption_required

# Use custom annotation
security = SecurityAnnotation(
    requires_auth=True,
    required_roles=["admin", "operator"],
    encryption_required=True,
)
node.metadata.annotate(security)
# Retrieve later
security = node.metadata.get_annotation(SecurityAnnotation)
if security and "admin" in security.required_roles:
    # Apply admin-only logic
    pass
Complete Example: Cost Analysis
from universal_agent_nexus.ir.annotations import CostAnnotation
from universal_agent_nexus.ir.visitor import DefaultIRVisitor, traverse
class CostAnalyzer(DefaultIRVisitor):
    """Analyze total estimated cost of IR."""

    def __init__(self):
        self.total_cost_cents = 0.0
        self.total_tokens = 0
        self.total_latency_ms = 0.0
        self.node_costs = {}

    def visit_node(self, node):
        cost = node.metadata.get_annotation(CostAnnotation)
        if cost:
            self.total_cost_cents += cost.cost_cents
            self.total_tokens += cost.tokens
            self.total_latency_ms += cost.latency_ms
            self.node_costs[node.id] = cost

    def get_summary(self) -> dict:
        return {
            "total_cost_dollars": self.total_cost_cents / 100,
            "total_tokens": self.total_tokens,
            "total_latency_seconds": self.total_latency_ms / 1000,
            "node_count": len(self.node_costs),
        }
# Analyze IR
analyzer = CostAnalyzer()
traverse(ir, analyzer)
summary = analyzer.get_summary()
print(f"Estimated cost: ${summary['total_cost_dollars']:.2f}")
print(f"Estimated latency: {summary['total_latency_seconds']:.2f}s")
Best Practices:
- Type Safety: Always use typed accessors such as get_annotation(CostAnnotation) instead of raw dict access
- Immutability: Annotations should be immutable after creation
- Key Uniqueness: Each annotation class must have a unique key()
- Optional Access: Always check for None when retrieving annotations
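These practices can be illustrated without the library. The sketch below uses hypothetical `ToyAnnotation`, `ToyCost`, and `ToyMetadata` classes that only mimic the shape of the API described above. It shows a frozen dataclass for immutability, a class-level `key()` for uniqueness, and a typed getter that returns `None` when nothing is stored.

```python
from dataclasses import FrozenInstanceError, dataclass
from typing import Dict, Optional, Type, TypeVar


class ToyAnnotation:
    """Hypothetical base class: each subclass supplies its own unique key."""

    @classmethod
    def key(cls) -> str:
        raise NotImplementedError


A = TypeVar("A", bound=ToyAnnotation)


@dataclass(frozen=True)  # frozen=True makes instances immutable after creation
class ToyCost(ToyAnnotation):
    tokens: int
    cost_cents: float

    @classmethod
    def key(cls) -> str:
        return "cost"


class ToyMetadata:
    """Stores at most one annotation per key and returns it with its own type."""

    def __init__(self) -> None:
        self._annotations: Dict[str, ToyAnnotation] = {}

    def annotate(self, annotation: ToyAnnotation) -> None:
        self._annotations[annotation.key()] = annotation

    def get_annotation(self, annotation_type: Type[A]) -> Optional[A]:
        found = self._annotations.get(annotation_type.key())
        return found if isinstance(found, annotation_type) else None


metadata = ToyMetadata()
print(metadata.get_annotation(ToyCost))  # None: nothing stored yet

metadata.annotate(ToyCost(tokens=500, cost_cents=0.05))
cost = metadata.get_annotation(ToyCost)
if cost is not None:  # always check before use
    print(cost.tokens)  # 500

try:
    cost.tokens = 1  # rejected because the dataclass is frozen
except FrozenInstanceError:
    print("annotations are immutable")
```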
Tier 5: Factories
Purpose: Customize how tools and routers are instantiated during parsing.
API Reference:
from universal_agent_nexus.ir.factories import (
ToolFactory,
RouterFactory,
set_tool_factory,
set_router_factory,
get_tool_factory,
get_router_factory,
DefaultToolFactory,
DefaultRouterFactory,
)
# Set global factories
set_tool_factory(factory: ToolFactory) -> None
set_router_factory(factory: RouterFactory) -> None
# Get current factories
tool_factory = get_tool_factory() -> ToolFactory
router_factory = get_router_factory() -> RouterFactory
Tool Factory Interface:
from typing import Any, Dict
from universal_agent_nexus.ir.factories import ToolFactory
from universal_agent_nexus.ir import ToolIR

class CustomToolFactory(ToolFactory):
    """Custom tool factory with retry logic."""

    def create_tool(
        self,
        name: str,
        description: str,
        protocol: str,
        config: Dict[str, Any],
    ) -> ToolIR:
        """Create ToolIR with custom logic."""
        # Add retry configuration to all tools
        if "retry" not in config:
            config["retry"] = {
                "max_attempts": 3,
                "backoff_factor": 2,
                "timeout_ms": 5000,
            }
        # Add observability hooks
        config["observability"] = {
            "trace": True,
            "log_input": True,
            "log_output": False,  # Don't log sensitive outputs
        }
        return ToolIR(
            name=name,
            description=description,
            protocol=protocol,
            config=config,
        )
# Set global factory
set_tool_factory(CustomToolFactory())
# Now all parsed tools will have retry logic
from universal_agent_nexus.compiler import parse
ir = parse("agent.yaml") # Uses custom factory
Router Factory Interface:
from typing import Any, Dict, List, Optional
from universal_agent_nexus.ir.factories import RouterFactory
from universal_agent_nexus.ir import RouterIR, RouterStrategy

class SmartRouterFactory(RouterFactory):
    """Router factory with intelligent model selection."""

    def create_router(
        self,
        name: str,
        strategy: RouterStrategy,
        system_message: Optional[str],
        model_candidates: List[str],
        default_model: Optional[str] = None,
        config: Optional[Dict[str, Any]] = None,
    ) -> RouterIR:
        """Create RouterIR with smart defaults."""
        # If no default model, select based on strategy
        if not default_model and model_candidates:
            if strategy == RouterStrategy.LLM:
                # Prefer faster models for LLM routing
                default_model = next(
                    (m for m in model_candidates if "mini" in m.lower()),
                    model_candidates[0],
                )
            else:
                default_model = model_candidates[0]
        # Add fallback configuration
        config = config or {}
        if "fallback" not in config:
            config["fallback"] = {
                "enabled": True,
                "model": model_candidates[-1] if model_candidates else None,
            }
        return RouterIR(
            name=name,
            strategy=strategy,
            system_message=system_message,
            model_candidates=model_candidates,
            default_model=default_model,
            config=config,
        )
# Set global factory
set_router_factory(SmartRouterFactory())
Best Practices:
- Global State: Factories are global - set them before parsing
- Idempotency: Factory methods should be idempotent (same inputs → same outputs)
- Validation: Validate inputs and provide helpful error messages
- Defaults: Provide sensible defaults for optional parameters
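One way to keep a factory method idempotent is to apply defaults on a copy of the incoming configuration instead of mutating the caller's dictionary. The helper below is a hypothetical sketch, not part of the factories module. It reuses the retry values from the tool factory example above.

```python
import copy
from typing import Any, Dict

DEFAULT_RETRY = {"max_attempts": 3, "backoff_factor": 2, "timeout_ms": 5000}


def with_retry_defaults(config: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new config with retry defaults; the caller's dict is left untouched."""
    result = copy.deepcopy(config)
    # setdefault keeps an explicit retry block if the caller supplied one.
    result.setdefault("retry", dict(DEFAULT_RETRY))
    return result


original = {"command": "mcp-policy-server"}
first = with_retry_defaults(original)
second = with_retry_defaults(first)

print("retry" in original)  # False: the input was not modified
print(first == second)      # True: applying defaults twice changes nothing
```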
Tier 6: Enrichment Strategy
Purpose: Customize how Fabric fragments (roles, domains, policies, mixins) are merged into manifests.
API Reference:
from universal_agent_nexus.enrichment import (
EnrichmentStrategy,
DefaultEnrichmentStrategy,
ComposableEnrichmentStrategy,
EnrichmentHandler,
NexusEnricher,
RoleEnrichmentHandler,
DomainEnrichmentHandler,
PolicyEnrichmentHandler,
MixinEnrichmentHandler,
create_custom_enrichment_strategy,
create_default_enrichment_strategy,
)
from universal_agent_fabric import NexusEnricher
# Create custom strategy
strategy = create_custom_enrichment_strategy(
handlers: Optional[List[EnrichmentHandler]] = None,
) -> ComposableEnrichmentStrategy
# Use with enricher
enricher = NexusEnricher(strategy=strategy)
enriched = enricher.enrich(
baseline_path: str,
role_path: Optional[str] = None,
domain_paths: List[str] = [],
policy_paths: List[str] = [],
mixin_paths: List[str] = [],
output_path: Optional[str] = None,
) -> Dict[str, Any]
Enrichment Strategy Interface:
from universal_agent_nexus.enrichment import EnrichmentStrategy
from typing import Any, Dict, List, Optional
class CustomEnrichmentStrategy(EnrichmentStrategy):
    """Custom enrichment strategy."""

    def merge(
        self,
        baseline: Dict[str, Any],
        role: Optional[Dict],
        domains: List[Dict],
        policies: List[Dict],
        mixins: List[Dict],
    ) -> Dict[str, Any]:
        """Merge fragments with custom logic."""
        result = baseline.copy()
        # Apply role first
        if role:
            result = self._merge_role(result, role)
        # Apply domains in order
        for domain in domains:
            result = self._merge_domain(result, domain)
        # Apply policies last (highest priority)
        for policy in policies:
            result = self._merge_policy(result, policy)
        return result

    def _merge_role(self, manifest: Dict, role: Dict) -> Dict:
        # Custom role merging logic
        return manifest

    def _merge_domain(self, manifest: Dict, domain: Dict) -> Dict:
        # Custom domain merging logic
        return manifest

    def _merge_policy(self, manifest: Dict, policy: Dict) -> Dict:
        # Custom policy merging logic
        return manifest
Handler-Based Composition:
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from universal_agent_nexus.enrichment import (
    EnrichmentHandler,
    ComposableEnrichmentStrategy,
    NexusEnricher,
    RoleEnrichmentHandler,
    DomainEnrichmentHandler,
    PolicyEnrichmentHandler,
)

class CustomMetadataHandler(EnrichmentHandler):
    """Add custom metadata during enrichment."""

    def handle(
        self,
        manifest: Dict[str, Any],
        role: Optional[Dict],
        domains: List[Dict],
        policies: List[Dict],
        mixins: List[Dict],
    ) -> Dict[str, Any]:
        """Add enrichment metadata."""
        if "metadata" not in manifest:
            manifest["metadata"] = {}
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
        manifest["metadata"]["enriched_at"] = datetime.now(timezone.utc).isoformat()
        manifest["metadata"]["enrichment_version"] = "1.0.0"
        if role:
            manifest["metadata"]["role"] = role.get("name")
        if domains:
            manifest["metadata"]["domains"] = [d.get("name") for d in domains]
        return manifest
# Compose strategy with handlers
strategy = ComposableEnrichmentStrategy()
strategy.add_handler(RoleEnrichmentHandler())
strategy.add_handler(DomainEnrichmentHandler())
strategy.add_handler(CustomMetadataHandler()) # Custom handler
strategy.add_handler(PolicyEnrichmentHandler())
# Use with enricher
enricher = NexusEnricher(strategy=strategy)
enriched = enricher.enrich(
baseline_path="manifest.yaml",
role_path="roles/researcher.yaml",
domain_paths=["domains/finance.yaml"],
policy_paths=["policies/safety.yaml"],
output_path="enriched.yaml",
)
Best Practices:
- Order Matters: Handlers execute in registration order
- Idempotency: Merging should be idempotent (merge twice = merge once)
- Conflict Resolution: Define clear rules for handling conflicts
- Validation: Validate merged manifest before returning
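The idempotency and conflict-resolution guidelines can be illustrated with a small, library-independent sketch. `deep_merge` is a hypothetical helper, not part of the Nexus API. It assumes fragments are plain dicts, that the later fragment wins on scalar conflicts, and that lists are unioned without duplicates, so applying the same fragment twice changes nothing.

```python
from copy import deepcopy
from typing import Any, Dict


def deep_merge(base: Dict[str, Any], overlay: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new dict with `overlay` merged into `base` (inputs are not mutated)."""
    result = deepcopy(base)
    for key, value in overlay.items():
        current = result.get(key)
        if isinstance(current, dict) and isinstance(value, dict):
            # Nested mappings are merged recursively.
            result[key] = deep_merge(current, value)
        elif isinstance(current, list) and isinstance(value, list):
            # Union while preserving order, so re-applying adds nothing.
            result[key] = current + [v for v in value if v not in current]
        else:
            # Conflict rule (an assumption of this sketch): the overlay wins.
            result[key] = deepcopy(value)
    return result


baseline = {"name": "agent", "tools": ["search"], "limits": {"rpm": 60}}
policy = {"tools": ["search", "audit"], "limits": {"rpm": 30, "burst": 5}}

once = deep_merge(baseline, policy)
twice = deep_merge(once, policy)  # merging the same fragment again
print(once == twice)
```

A property such as `once == twice` is cheap to assert in a unit test for any custom `merge` or `handle` implementation.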
Tier 7: Compiler Builder
Purpose: Configure the entire compiler with custom components for advanced use cases.
API Reference:
from universal_agent_nexus.builder import CompilerBuilder
from universal_agent_nexus.ir.pass_manager import OptimizationLevel
# Create builder
builder = CompilerBuilder()
# Configure components
compiler = (builder
.register_parser("my_format", MyParser())
.register_generator("my_target", MyGenerator())
.with_optimization_level(OptimizationLevel.AGGRESSIVE)
.with_tool_factory(CustomToolFactory())
.with_router_factory(CustomRouterFactory())
.add_optimization_pass(CustomOptimization())
.with_validation(True)
.with_optimization(True)
.build())
# Use configured compiler
result = compiler.compile("source.py", target="my_target")
Complete Example: Enterprise Compiler Configuration
from universal_agent_nexus.builder import CompilerBuilder
from universal_agent_nexus.ir.pass_manager import OptimizationLevel
from universal_agent_nexus.ir.factories import ToolFactory, RouterFactory
# Custom components
class EnterpriseToolFactory(ToolFactory):
def create_tool(self, name, description, protocol, config):
# Add enterprise-specific configuration
config["monitoring"] = {"enabled": True, "level": "detailed"}
config["security"] = {"encryption": True, "audit": True}
return super().create_tool(name, description, protocol, config)
class EnterpriseRouterFactory(RouterFactory):
def create_router(self, name, strategy, system_message, model_candidates, ...):
# Add enterprise-specific router configuration
config = config or {}
config["rate_limiting"] = {"enabled": True, "rpm": 1000}
return super().create_router(...)
# Build enterprise compiler
enterprise_compiler = (CompilerBuilder()
.register_parser("airflow", AirflowParser())
.register_generator("temporal", TemporalGenerator())
.with_optimization_level(OptimizationLevel.AGGRESSIVE)
.with_tool_factory(EnterpriseToolFactory())
.with_router_factory(EnterpriseRouterFactory())
.add_optimization_pass(CostOptimizationPass())
.add_optimization_pass(SecurityValidationPass())
.with_validation(True)
.with_optimization(True)
.build())
# Use enterprise compiler
result = enterprise_compiler.compile(
"dag.airflow",
target="temporal",
output="workflow.py",
)
Builder Methods:
| Method | Description | Returns |
|---|---|---|
| register_parser(...) | Register custom parser | CompilerBuilder |
| register_generator(...) | Register custom generator | CompilerBuilder |
| with_optimization_level(level) | Set optimization level | CompilerBuilder |
| with_tool_factory(factory) | Set tool factory | CompilerBuilder |
| with_router_factory(factory) | Set router factory | CompilerBuilder |
| add_optimization_pass(pass) | Add custom pass | CompilerBuilder |
| with_validation(enabled) | Enable/disable validation | CompilerBuilder |
| with_optimization(enabled) | Enable/disable optimization | CompilerBuilder |
| build() | Build configured compiler | Compiler |
Best Practices:
- Reusability: Create builder configurations as reusable functions/classes
- Isolation: Use separate builder instances for different use cases
- Defaults: Builder inherits defaults from global registries
- Validation: Validate configuration in the build() method
Tier 8: IR Visitor
Purpose: Traverse and analyze IR structure using the visitor pattern.
API Reference:
from universal_agent_nexus.ir.visitor import (
IRVisitor,
DefaultIRVisitor,
traverse,
)
# Traverse IR with visitor
traverse(ir: ManifestIR, visitor: IRVisitor) -> None
# Base visitor interface
class IRVisitor(ABC):
@abstractmethod
def visit_manifest(self, ir: ManifestIR) -> None: ...
@abstractmethod
def visit_graph(self, graph: GraphIR) -> None: ...
@abstractmethod
def visit_node(self, node: NodeIR) -> None: ...
@abstractmethod
def visit_edge(self, edge: EdgeIR) -> None: ...
@abstractmethod
def visit_tool(self, tool: ToolIR) -> None: ...
@abstractmethod
def visit_router(self, router: RouterIR) -> None: ...
Complete Example: Dependency Analyzer
from typing import List
from universal_agent_nexus.ir.visitor import DefaultIRVisitor, traverse
from universal_agent_nexus.ir import ManifestIR, GraphIR, NodeIR, EdgeIR
class DependencyAnalyzer(DefaultIRVisitor):
"""Analyze dependencies between nodes."""
def __init__(self):
self.dependencies = {} # node_id -> set of dependencies
self.dependents = {} # node_id -> set of dependents
def visit_graph(self, graph: GraphIR):
"""Build dependency graph."""
# Initialize
for node in graph.nodes:
self.dependencies[node.id] = set()
self.dependents[node.id] = set()
# Process edges
for edge in graph.edges:
self.dependencies[edge.to_node].add(edge.from_node)
self.dependents[edge.from_node].add(edge.to_node)
def get_critical_path(self) -> List[str]:
"""Find longest path (critical path)."""
# Topological sort + longest path algorithm
# ... implementation ...
return []
def get_leaf_nodes(self) -> List[str]:
"""Find nodes with no outgoing edges."""
return [
node_id
for node_id, deps in self.dependents.items()
if not deps
]
# Analyze IR
analyzer = DependencyAnalyzer()
traverse(ir, analyzer)
print(f"Critical path: {analyzer.get_critical_path()}")
print(f"Leaf nodes: {analyzer.get_leaf_nodes()}")
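The `get_critical_path` stub leaves the algorithm open. One possible way to fill it in is sketched here as a standalone function. It assumes the graph is acyclic, that every node appears as a key (as `visit_graph` guarantees when edges only reference declared nodes), and that each node counts as one unit of length. The name `critical_path` and the sample graph are illustrative, not part of the Nexus API.

```python
from collections import deque
from typing import Dict, List, Set


def critical_path(dependencies: Dict[str, Set[str]]) -> List[str]:
    """Longest path (by node count) in a DAG given node -> set of predecessors."""
    dependents: Dict[str, Set[str]] = {node: set() for node in dependencies}
    for node, preds in dependencies.items():
        for pred in preds:
            dependents[pred].add(node)

    # Kahn's algorithm: a node is processed once all its predecessors are done.
    remaining = {node: len(preds) for node, preds in dependencies.items()}
    queue = deque(sorted(n for n, count in remaining.items() if count == 0))
    length = {node: 1 for node in dependencies}
    previous: Dict[str, str] = {}
    visited = 0
    while queue:
        node = queue.popleft()
        visited += 1
        for nxt in sorted(dependents[node]):
            if length[node] + 1 > length[nxt]:
                length[nxt] = length[node] + 1
                previous[nxt] = node
            remaining[nxt] -= 1
            if remaining[nxt] == 0:
                queue.append(nxt)
    if visited != len(dependencies):
        raise ValueError("graph contains a cycle; no critical path exists")
    if not dependencies:
        return []

    # Walk back from the node that ends the longest chain.
    end = max(sorted(length), key=length.get)
    path = [end]
    while path[-1] in previous:
        path.append(previous[path[-1]])
    return path[::-1]


deps = {
    "start": set(),
    "fetch": {"start"},
    "summarize": {"fetch"},
    "notify": {"start"},
    "done": {"summarize", "notify"},
}
path = critical_path(deps)
print(path)
```

Inside `DependencyAnalyzer`, the same logic could be called as `critical_path(self.dependencies)`. Graphs with cycles (for example retry loops) need a different definition of "critical path", which this sketch does not attempt.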
Complete Example: Metrics Collector
class MetricsCollector(DefaultIRVisitor):
"""Collect metrics about IR structure."""
def __init__(self):
self.node_counts = {"task": 0, "router": 0, "tool": 0}
self.edge_count = 0
self.graph_count = 0
self.tool_count = 0
self.router_count = 0
def visit_manifest(self, ir: ManifestIR):
self.graph_count = len(ir.graphs)
self.tool_count = len(ir.tools)
self.router_count = len(ir.routers)
super().visit_manifest(ir)
def visit_node(self, node: NodeIR):
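# Use .get so node kinds missing from the initial dict do not raise KeyError
self.node_counts[node.kind.value] = self.node_counts.get(node.kind.value, 0) + 1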
def visit_edge(self, edge: EdgeIR):
self.edge_count += 1
def get_summary(self) -> dict:
return {
"graphs": self.graph_count,
"nodes": sum(self.node_counts.values()),
"node_breakdown": self.node_counts,
"edges": self.edge_count,
"tools": self.tool_count,
"routers": self.router_count,
}
# Collect metrics
collector = MetricsCollector()
traverse(ir, collector)
metrics = collector.get_summary()
print(f"Total nodes: {metrics['nodes']}")
print(f"Node breakdown: {metrics['node_breakdown']}")
Best Practices:
- State Management: Use visitor instance variables to accumulate state
- Selective Override: Only override methods you need (use DefaultIRVisitor)
- Efficiency: Visitors are O(n) - efficient for large IRs
- Composability: Combine multiple visitors for complex analysis
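As a rough illustration of the composability point, the sketch below fans each `visit_*` call out to several visitors so that one traversal feeds all of them. `CompositeVisitor`, `NodeCounter`, and `NodeNames` are hypothetical stand-ins written without the library. Whether `traverse` accepts a duck-typed object like this, rather than an `IRVisitor` subclass, is an assumption to verify against the actual API.

```python
class CompositeVisitor:
    """Forward every visit_* call to each child visitor, in order."""

    def __init__(self, *visitors):
        self.visitors = visitors

    def __getattr__(self, name):
        # __getattr__ only runs for attributes that are not found normally,
        # which here means the visit_* hooks.
        if not name.startswith("visit_"):
            raise AttributeError(name)

        def fan_out(item):
            for visitor in self.visitors:
                hook = getattr(visitor, name, None)
                if hook is not None:
                    hook(item)

        return fan_out


# Stand-in visitors for the demo; real ones would subclass DefaultIRVisitor.
class NodeCounter:
    def __init__(self):
        self.count = 0

    def visit_node(self, node):
        self.count += 1


class NodeNames:
    def __init__(self):
        self.names = []

    def visit_node(self, node):
        self.names.append(node["id"])


counter, names = NodeCounter(), NodeNames()
composite = CompositeVisitor(counter, names)
for node in [{"id": "start"}, {"id": "route"}, {"id": "end"}]:
    composite.visit_node(node)  # one pass feeds both visitors
print(counter.count, names.names)
```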
Extension Patterns & Best Practices
Pattern 1: Plugin Architecture
Create a plugin system for extensions:
# plugins/my_plugin.py
from universal_agent_nexus.parser_registry import register_parser
from universal_agent_nexus.generator_registry import register_generator
def register_plugin():
"""Register all plugin components."""
register_parser("my_format", MyParser())
register_generator("my_target", MyGenerator())
# In application startup
from plugins import my_plugin
my_plugin.register_plugin()
Pattern 2: Configuration-Driven Extensions
Load extensions from configuration:
# config.yaml
extensions:
  parsers:
    - type: airflow
      class: plugins.AirflowParser
      priority: 150
  generators:
    - type: temporal
      class: plugins.TemporalGenerator
  passes:
    - class: plugins.CustomOptimization
      after: dead-node-elimination
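# Load extensions
import importlib
import yaml
from universal_agent_nexus.parser_registry import register_parser
def import_class(dotted_path):
    """Resolve a dotted path such as "plugins.AirflowParser" to a class."""
    module_name, _, class_name = dotted_path.rpartition(".")
    return getattr(importlib.import_module(module_name), class_name)
# Close the file promptly instead of leaving the handle open
with open("config.yaml") as f:
    config = yaml.safe_load(f)
for parser_config in config["extensions"]["parsers"]:
    parser_class = import_class(parser_config["class"])
    register_parser(
        parser_config["type"],
        parser_class(),
        detection_priority=parser_config.get("priority", 100),
    )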
Pattern 3: Extension Discovery
Auto-discover extensions:
import importlib
import logging
import pkgutil
logger = logging.getLogger(__name__)
def discover_extensions(package_name: str):
"""Auto-discover and register extensions."""
package = importlib.import_module(package_name)
for importer, modname, ispkg in pkgutil.iter_modules(package.__path__):
try:
module = importlib.import_module(f"{package_name}.{modname}")
if hasattr(module, "register"):
module.register()
except Exception as e:
logger.warning(f"Failed to load extension {modname}: {e}")
# Discover all extensions
discover_extensions("my_extensions")
Performance Considerations
- Parser Performance: Cache parsed results for expensive parsers
- Generator Performance: Use generators (yield) for large outputs
- Pass Performance: Profile passes - optimize hot paths
- Visitor Performance: Visitors are O(n) - avoid nested traversals
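One way to apply the parser-caching advice is a thin wrapper that re-parses only when the source file changes. This is a minimal sketch using only the standard library. `CachingParser` and `slow_parse` are hypothetical names, and the (mtime, size) fingerprint is an assumption that may be too coarse for some workflows, where a content hash would be safer.

```python
import os
import tempfile
from typing import Any, Callable, Dict, Tuple


class CachingParser:
    """Wrap a parse function and reuse results while the file is unchanged."""

    def __init__(self, parse: Callable[[str], Any]):
        self._parse = parse
        self._cache: Dict[str, Tuple[Tuple[int, int], Any]] = {}
        self.misses = 0

    def parse(self, path: str) -> Any:
        stat = os.stat(path)
        fingerprint = (stat.st_mtime_ns, stat.st_size)
        cached = self._cache.get(path)
        if cached is not None and cached[0] == fingerprint:
            return cached[1]
        self.misses += 1
        result = self._parse(path)
        self._cache[path] = (fingerprint, result)
        return result


def slow_parse(path: str) -> dict:
    # Stand-in for an expensive parser.
    with open(path, encoding="utf-8") as handle:
        return {"lines": handle.read().splitlines()}


with tempfile.TemporaryDirectory() as tmp:
    source = os.path.join(tmp, "agent.myformat")
    with open(source, "w", encoding="utf-8") as handle:
        handle.write("name: test\n")
    parser = CachingParser(slow_parse)
    first = parser.parse(source)
    second = parser.parse(source)  # served from the cache
    with open(source, "a", encoding="utf-8") as handle:
        handle.write("graphs: []\n")
    third = parser.parse(source)  # file size changed, so it is parsed again
```

Note that the cached object is returned by reference, so callers that mutate the result should copy it first.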
Testing Extensions
import pytest
from universal_agent_nexus.parser_registry import get_parser
from universal_agent_nexus.compiler import compile
def test_custom_parser():
"""Test custom parser registration."""
parser = get_parser("my_format")
assert parser is not None
ir = parser.parse("test.myformat")
assert ir.name == "test"
def test_custom_generator():
"""Test custom generator."""
result = compile("test.yaml", target="my_target")
assert "my_target_code" in result
🎯 Choosing the Right Runtime
| Scenario | Use | Why |
|---|---|---|
| Local development | LangGraph | Fast iteration, debugging, breakpoints |
| Production (< 1000 req/min) | LangGraph + Postgres | Simple deployment, lower cost |
| Production (> 1000 req/min) | AWS Step Functions | Auto-scaling, pay-per-use, 10K+ req/sec |
| Serverless/Event-driven | AWS Lambda + Step Functions | No servers to manage |
| AI client integration | MCP | Claude Desktop, Cursor, VS Code |
| Edge deployment | MCP (stdio) | Local-first, no internet required |
| Multi-cloud | LangGraph → Kubernetes | Cloud-agnostic containers |
Cost Comparison (1M executions/month)
| Runtime | Infrastructure | LLM Calls | Total |
|---|---|---|---|
| LangGraph (VPS) | $50 | $500 | $550 |
| AWS Step Functions | $25 | $500 | $525 |
| MCP (local) | $0 | $500 | $500 |
| LangGraph + Batch API | $50 | $10 | $60 ✨ |
💡 Batch API + Prompt Caching can reduce LLM costs by up to 98%. See Batch API.
📦 Examples
See complete, runnable examples in the examples/ folder:
| Example | Description | Directory |
|---|---|---|
| Hello World | Basic graph structure | examples/hello_langgraph/ |
| MCP Server | Expose agent as MCP tool | examples/mcp_server/ |
| AWS Production | Step Functions deployment | examples/aws_production/ |
| Multi-Cloud | AWS + GCP configuration | examples/multi_cloud/ |
| Hello MCP | MCP client integration | examples/hello_mcp/ |
Migration Guides
🔌 Adapters
LangGraph Adapter
Purpose: Local development with full debugging capabilities
Features:
- ✅ Async Python execution with asyncio
- ✅ Postgres checkpointing (AsyncPostgresSaver)
- ✅ MCP tool integration
- ✅ LLM router nodes (OpenAI, Anthropic)
- ✅ Batch API integration for cost optimization
Standard Runtime:
from universal_agent_nexus.adapters.langgraph import LangGraphRuntime, load_manifest
manifest = load_manifest("manifest.yaml")
runtime = LangGraphRuntime(
postgres_url="postgresql://localhost:5432/uaa_dev",
enable_checkpointing=True
)
await runtime.initialize(manifest, graph_name="main")
result = await runtime.execute(
execution_id="exec-001",
input_data={"context": {"query": "Hello!"}}
)
Batch-Aware Runtime (for cost optimization):
import os
from universal_agent_nexus.adapters.langgraph import BatchAwareLangGraphRuntime
runtime = BatchAwareLangGraphRuntime(
postgres_url="postgresql://localhost:5432/uaa_dev",
enable_batching=True,
anthropic_api_key=os.environ["ANTHROPIC_API_KEY"],
)
await runtime.initialize(manifest)
result = await runtime.execute("exec-001", {"query": "Hello!"})
# Check stats: {'requests_queued': 100, 'cache_hits': 85, ...}
print(runtime.get_batch_stats())
AWS Adapter
Purpose: Production deployment at enterprise scale
Components:
- Step Functions - State machine execution
- Lambda - Tool execution functions
- DynamoDB - State persistence (single-table design)
- CloudWatch - Logging and metrics
- X-Ray - Distributed tracing
# Compile to AWS Step Functions
nexus compile manifest.yaml --target aws --output state_machine.json
# Deploy with Terraform
cd terraform/environments/prod
terraform apply
MCP Adapter
Purpose: Expose agents as tools to AI clients (Claude, Cursor, VS Code)
# Start MCP server
nexus serve manifest.yaml --protocol mcp --transport stdio
Claude Desktop configuration:
{
"mcpServers": {
"uaa-agent": {
"command": "nexus",
"args": ["serve", "/path/to/manifest.yaml", "--protocol", "mcp"]
}
}
}
💻 CLI Reference
| Command | Description | Example |
|---|---|---|
| nexus compile | Compile manifest to target runtime | nexus compile agent.yaml --target langgraph |
| nexus translate | Convert between runtimes | nexus translate agent.py --to aws |
| nexus serve | Start MCP server | nexus serve agent.yaml --protocol mcp |
| nexus validate | Validate manifest syntax | nexus validate agent.yaml |
| nexus deploy | Deploy to cloud provider | nexus deploy agent.yaml --env prod |
Common Flags
| Flag | Description | Default |
|---|---|---|
| --target, -t | Target runtime (langgraph, aws, mcp, uaa) | Required |
| --output, -o | Output file path | ./output |
| --opt-level | Optimization level (none, basic, aggressive) | basic |
| --verbose, -v | Enable verbose logging | false |
| --dry-run | Preview without writing | false |
✨ Features
Core Features
- ✅ Write Once, Run Anywhere - Single manifest compiles to LangGraph, AWS, MCP, or the UAA Kernel
- ✅ Production-Ready - Built-in state persistence, error handling, observability
- ✅ Type-Safe - Pydantic schemas with full validation
- ✅ Async-Native - asyncio throughout, no blocking calls
Observability
- ✅ OpenTelemetry - Distributed tracing across all adapters
- ✅ Structured Logging - JSON logs with execution context
- ✅ CloudWatch Integration - Automatic log/metric export (AWS)
- ✅ X-Ray Tracing - End-to-end request tracing (AWS)
from universal_agent_nexus.observability import setup_tracing, trace_execution
setup_tracing(service_name="my-agent", environment="production")
async with trace_execution("execute_graph", execution_id="exec-001"):
result = await runtime.execute(...)
IR Validation (v1.0.0)
Comprehensive validation with error codes and source locations:
from universal_agent_nexus.ir.validation import validate_ir, validate_and_raise
errors = validate_ir(ir)
for error in errors:
print(error)
# error[E001]: Entry node 'start' not found
# = hint: Add a node with id='start'
# Or raise on any error
validate_and_raise(ir, strict=True)
Error codes:
| Code | Category | Description |
|---|---|---|
| E001-E004 | Structural | Missing nodes, bad edges |
| E101-E104 | Type | Missing refs, unknown tools |
| E201-E203 | Semantic | No outgoing edges |
| W301-W304 | Warnings | Unreachable nodes, no terminal |
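The code ranges in the table lend themselves to a small lookup, for example to decide in CI whether a validation run should fail. The sketch below works on plain code strings such as "E001". `categorize` and `has_blocking_errors` are hypothetical helpers, and how a code is extracted from the error objects returned by `validate_ir` is not shown here because that attribute is not documented in this section.

```python
import re

# (prefix, low, high, category), copied from the error-code table.
CODE_RANGES = [
    ("E", 1, 4, "Structural"),
    ("E", 101, 104, "Type"),
    ("E", 201, 203, "Semantic"),
    ("W", 301, 304, "Warnings"),
]


def categorize(code: str) -> str:
    """Map a code such as 'E102' to its category, or 'Unknown'."""
    match = re.fullmatch(r"([EW])(\d{3})", code)
    if not match:
        raise ValueError(f"not a validation code: {code!r}")
    prefix, number = match.group(1), int(match.group(2))
    for range_prefix, low, high, category in CODE_RANGES:
        if prefix == range_prefix and low <= number <= high:
            return category
    return "Unknown"


def has_blocking_errors(codes) -> bool:
    """True if any code is an error (E...) rather than a warning (W...)."""
    return any(code.startswith("E") for code in codes)


print(categorize("E001"), categorize("W303"))
print(has_blocking_errors(["W301", "W304"]))
```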
PassManager (v1.0.0)
LLVM-style optimization levels:
from universal_agent_nexus.compiler import compile
from universal_agent_nexus.ir.pass_manager import OptimizationLevel
# No optimization (fastest compile)
compile("agent.py", target="aws", opt_level=OptimizationLevel.NONE)
# Aggressive optimization (slowest compile, best output)
compile("agent.py", target="aws", opt_level=OptimizationLevel.AGGRESSIVE)
Available passes:
- Dead node elimination
- Edge deduplication
- Condition simplification
- Constant folding
- Router/Tool validation
- Cycle detection
- Batch optimization (for LLM cost reduction)
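To make the first two passes concrete, here is a simplified sketch of dead node elimination combined with edge deduplication on a plain node/edge list. It is not the library's implementation: the function name, the tuple-based edge representation, and the single entry node are assumptions made for illustration, and edges are assumed to reference only declared nodes.

```python
from collections import deque
from typing import Dict, List, Set, Tuple

Edge = Tuple[str, str]


def eliminate_dead_nodes(
    nodes: List[str], edges: List[Edge], entry: str
) -> Tuple[List[str], List[Edge]]:
    """Drop nodes unreachable from `entry`, plus duplicate and dangling edges."""
    successors: Dict[str, List[str]] = {node: [] for node in nodes}
    for src, dst in edges:
        successors[src].append(dst)

    # Breadth-first search from the entry node.
    reachable: Set[str] = {entry}
    queue = deque([entry])
    while queue:
        for nxt in successors[queue.popleft()]:
            if nxt not in reachable:
                reachable.add(nxt)
                queue.append(nxt)

    live_nodes = [n for n in nodes if n in reachable]
    # dict.fromkeys drops duplicate edges while preserving order.
    live_edges = list(dict.fromkeys(
        (s, d) for s, d in edges if s in reachable and d in reachable
    ))
    return live_nodes, live_edges


nodes = ["start", "route", "answer", "orphan"]
edges = [
    ("start", "route"),
    ("route", "answer"),
    ("route", "answer"),   # duplicate edge
    ("orphan", "answer"),  # source is never reached from "start"
]
live_nodes, live_edges = eliminate_dead_nodes(nodes, edges, entry="start")
print(live_nodes, live_edges)
```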
🌉 Cross-Runtime State Bridge
NEW in v2.0.0: Full state portability across LangGraph, AWS Step Functions, and UAA Kernel.
Nexus includes a Universal State Bridge that normalizes execution state across runtimes, enabling:
- Cross-runtime debugging and inspection
- State migration between environments
- Unified observability across runtimes
State Normalization
from universal_agent_nexus.bridges import (
normalize,
denormalize,
NormalizedGraphState,
StateFormat,
)
# Normalize LangGraph checkpoint to canonical format
normalized = normalize(langgraph_checkpoint)
print(f"Execution: {normalized.execution_id}")
print(f"Status: {normalized.status}")
print(f"History: {len(normalized.history)} steps")
# Convert to AWS Step Functions format
aws_input = denormalize(normalized, StateFormat.AWS)
# Or sync directly between runtimes
from universal_agent_nexus.bridges import sync_state
aws_state = sync_state(
source_state=langgraph_checkpoint,
source_format=StateFormat.LANGGRAPH,
target_format=StateFormat.AWS,
)
NormalizedGraphState Schema
class NormalizedGraphState(BaseModel):
execution_id: str # Unique execution identifier
graph_name: str # Name of the graph
status: str # pending, running, completed, failed, suspended
context: Dict[str, Any] # Current execution context
history: List[NormalizedHistoryEntry] # Full execution history
current_node_id: Optional[str] # Currently executing node
version: str # State schema version
created_at: Optional[str] # ISO8601 timestamp
updated_at: Optional[str] # ISO8601 timestamp
History Preservation
Unlike simple state snapshots, the bridge preserves full execution history:
class NormalizedHistoryEntry(BaseModel):
node_id: str # Node that was executed
node_kind: str # router, tool, task, human
status: str # completed, failed, running
input_data: Dict[str, Any] # Input to the node
output_data: Optional[Dict] # Output from the node
error: Optional[str] # Error message if failed
started_at: str # ISO8601 timestamp
ended_at: Optional[str] # ISO8601 timestamp
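Because history entries carry status and timestamps, simple post-mortem queries need no runtime at all. The sketch below operates on plain dicts that use the field names from `NormalizedHistoryEntry`. The sample data is invented, the helper names are hypothetical, and timestamps are assumed to use an explicit `+00:00` offset, which `datetime.fromisoformat` parses on all supported Python versions.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional


def step_duration_ms(entry: Dict[str, Any]) -> Optional[float]:
    """Wall-clock duration of one history entry, or None if it has not ended."""
    if entry.get("ended_at") is None:
        return None
    started = datetime.fromisoformat(entry["started_at"])
    ended = datetime.fromisoformat(entry["ended_at"])
    return (ended - started).total_seconds() * 1000


def failed_steps(history: List[Dict[str, Any]]) -> List[str]:
    """Node ids of every entry whose status is 'failed'."""
    return [e["node_id"] for e in history if e["status"] == "failed"]


# Invented sample data shaped like NormalizedHistoryEntry.
history = [
    {"node_id": "route", "node_kind": "router", "status": "completed",
     "input_data": {}, "output_data": {"next": "search"}, "error": None,
     "started_at": "2025-01-01T12:00:00+00:00",
     "ended_at": "2025-01-01T12:00:00.250000+00:00"},
    {"node_id": "search", "node_kind": "tool", "status": "failed",
     "input_data": {"q": "hello"}, "output_data": None, "error": "timeout",
     "started_at": "2025-01-01T12:00:00.250000+00:00",
     "ended_at": "2025-01-01T12:00:05.250000+00:00"},
]
durations = {e["node_id"]: step_duration_ms(e) for e in history}
print(failed_steps(history), durations)
```

With real pydantic models, `entry.model_dump()` (pydantic v2) or `entry.dict()` (v1) would produce dicts of this shape; which pydantic version Nexus uses is not stated here.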
🔧 UAA Native Adapter
NEW in v2.0.0: Run directly through the Universal Agent Architecture kernel.
The UAA Native adapter makes the UAA Kernel a first-class compilation target:
Compile to UAA
nexus compile agent.yaml --target uaa --output kernel_manifest.yaml
Execute Through UAA Kernel
from universal_agent_nexus.adapters.uaa import UAANativeRuntime, UAANativeRuntimeBuilder
from universal_agent_architecture.task.store import SQLTaskStore
from my_llm_client import OpenAIClient
# Build runtime with dependency injection
runtime = (UAANativeRuntimeBuilder()
.with_task_store(SQLTaskStore("postgresql://..."))
.with_llm_client(OpenAIClient(api_key="..."))
.with_tool_executor("mcp", MCPToolExecutor())
.with_tool_executor("http", HTTPToolExecutor())
.build())
# Execute
result = await runtime.execute(
manifest=my_manifest,
graph_name="main",
input_data={"query": "Hello, world!"},
)
print(f"Status: {result.status}")
print(f"Output: {result.context}")
Resume Suspended Executions
# Resume a human-in-the-loop execution
result = await runtime.resume_execution(
execution_id="exec-001",
human_input={"approved": True, "feedback": "Looks good!"},
)
ContractRegistry (Architecture)
The UAA Kernel uses a ContractRegistry for dependency injection:
from universal_agent_architecture.runtime import get_global_registry
registry = get_global_registry()
# Register implementations
registry.register_task_store(SQLTaskStore("postgresql://..."))
registry.register_llm_client(OpenAIClient())
registry.register_tool_executor("mcp", MCPToolExecutor())
# Or configure from environment variables
# UAA_TASK_STORE=mypackage.stores.DynamoTaskStore
# UAA_LLM_CLIENT=mypackage.clients.AnthropicClient
registry.configure_from_env()
💰 Batch API + Prompt Caching
Save up to 98% on LLM costs with automatic request batching and prompt caching.
Nexus includes built-in support for the Anthropic Batch API with prompt caching. It is implemented as a compile-time optimization pass plus a runtime batching layer.
How It Works
┌─────────────────────────────────────────────────────────────────┐
│ COMPILE TIME │
│ BatchOptimizationPass analyzes IR: │
│ • Identifies LLM call nodes │
│ • Computes cache keys from system messages │
│ • Annotates nodes with BatchAnnotation │
└───────────────────────────────┬─────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ RUNTIME │
│ BatchAwareLangGraphRuntime: │
│ • Accumulates LLM requests │
│ • Batches them via Anthropic Batch API │
│ • Applies prompt caching headers │
└─────────────────────────────────────────────────────────────────┘
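The "cache keys from system messages" step can be pictured as hashing the shared prefix of each request and grouping requests that share it. The following is a conceptual sketch only: `cache_key` and `group_by_cache_key` are hypothetical names, the request dict shape is invented, and the real `BatchOptimizationPass` may derive its keys differently.

```python
import hashlib
from collections import defaultdict
from typing import Dict, List

MODEL = "claude-sonnet-4-20250514"  # model id reused from the examples in this README


def cache_key(system_message: str, model: str) -> str:
    """Stable key: the same model and system prompt always give the same key."""
    digest = hashlib.sha256(f"{model}\n{system_message}".encode("utf-8"))
    return digest.hexdigest()[:16]


def group_by_cache_key(requests: List[dict]) -> Dict[str, List[dict]]:
    """Group requests whose system prompt (and model) are identical."""
    groups: Dict[str, List[dict]] = defaultdict(list)
    for request in requests:
        groups[cache_key(request["system"], request["model"])].append(request)
    return dict(groups)


requests = [
    {"system": "You are a router.", "model": MODEL, "user": "Hello"},
    {"system": "You are a router.", "model": MODEL, "user": "World"},
    {"system": "You summarize text.", "model": MODEL, "user": "Test"},
]
groups = group_by_cache_key(requests)
for key, members in groups.items():
    print(key, len(members))
```

Requests that land in the same group are the ones that can benefit from a cached system prompt when submitted together.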
Cost Impact
| Scenario | Before | After | Savings |
|---|---|---|---|
| 100 LLM calls | $0.15 | $0.003 | 98% |
| 1000 calls/hour | $1.50/hr | $0.03/hr | $35/day |
| Shared system prompts | Full price | Cached | 90% on input tokens |
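The second row of the table follows from the first by scaling the per-call price. A short worked calculation, using only the figures from the table, makes the arithmetic explicit:

```python
calls = 100
before, after = 0.15, 0.003  # dollars per 100 calls, from the first table row

savings_pct = (before - after) / before * 100

calls_per_hour = 1000
hourly_before = before / calls * calls_per_hour  # about $1.50 per hour
hourly_after = after / calls * calls_per_hour    # about $0.03 per hour
daily_savings = (hourly_before - hourly_after) * 24

print(f"Savings: {savings_pct:.0f}%")
print(f"Daily savings at 1000 calls/hour: ${daily_savings:.2f}")
```

The daily figure comes out at $35.28, which the table rounds to $35/day.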
Quick Start
Option 1: Compile-time only (annotate for batching)
from universal_agent_nexus.builder import CompilerBuilder
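builder = CompilerBuilder()
builder.with_batch_optimization(batch_size=100, max_wait_ms=5000)
# build() returns the configured compiler; compile through it rather than the builder
compiler = builder.build()
result = compiler.compile("agent.yaml", target="langgraph")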
Option 2: Full runtime batching
import os
from universal_agent_nexus.adapters.langgraph import BatchAwareLangGraphRuntime
runtime = BatchAwareLangGraphRuntime(
postgres_url="postgresql://localhost:5432/nexus",
enable_batching=True,
anthropic_api_key=os.environ["ANTHROPIC_API_KEY"],
batch_size=100, # Requests per batch
batch_wait_ms=5000, # Max wait before flush
)
await runtime.initialize(manifest)
# LLM calls are automatically batched
result = await runtime.execute("exec-001", {"query": "Hello"})
# Check batch statistics
stats = runtime.get_batch_stats()
print(f"Cache hits: {stats['cache_hits']}")
print(f"Batches submitted: {stats['batches_submitted']}")
Option 3: Explicit batch execution
# Execute multiple requests as a single batch
results = await runtime.execute_batch([
{"execution_id": "exec-001", "input_data": {"query": "Hello"}},
{"execution_id": "exec-002", "input_data": {"query": "World"}},
{"execution_id": "exec-003", "input_data": {"query": "Test"}},
])
Components
| Component | Location | Purpose |
|---|---|---|
| BatchAnnotation | ir/annotations.py | Marks nodes eligible for batching |
| BatchOptimizationPass | ir/passes/batch_optimization.py | Compile-time LLM node analysis |
| BatchAccumulator | adapters/langgraph/batch_accumulator.py | Request queue + Anthropic Batch API |
| BatchAwareLangGraphRuntime | adapters/langgraph/runtime.py | Batch-enabled runtime |
Customization
The batch system is fully pluggable. You can:
Use your own runtime:
from universal_agent_nexus.adapters.langgraph import LangGraphRuntime
from universal_agent_nexus.ir.annotations import BatchAnnotation
class MyCustomBatchRuntime(LangGraphRuntime):
"""Your own batch implementation."""
async def execute(self, execution_id, input_data):
# Read batch annotations
for node in self.graph.nodes:
batch_info = node.metadata.get_annotation(BatchAnnotation)
if batch_info and batch_info.eligible:
# Your batching strategy here
pass
return await super().execute(execution_id, input_data)
Use the accumulator standalone:
import os
from universal_agent_nexus.adapters.langgraph import BatchAccumulator
accumulator = BatchAccumulator(
api_key=os.environ["ANTHROPIC_API_KEY"],
max_batch_size=50,
)
await accumulator.start()
# Queue requests
future1 = await accumulator.queue_request(messages=[...], model="claude-sonnet-4-20250514")
future2 = await accumulator.queue_request(messages=[...], model="claude-sonnet-4-20250514")
# Flush and get results
await accumulator.flush()
result1 = await future1
result2 = await future2
Create a custom batch pass:
from universal_agent_nexus.builder import CompilerBuilder
from universal_agent_nexus.ir.pass_interface import OptimizationPass
class MyBatchPass(OptimizationPass):
"""Custom batch analysis for OpenAI or other providers."""
@property
def name(self):
return "my-batch-optimization"
def apply(self, ir):
for graph in ir.graphs:
for node in graph.nodes:
# Your analysis logic
node.metadata.annotate(MyCustomAnnotation(...))
return ir
# Register with builder
builder = CompilerBuilder()
builder.add_optimization_pass(MyBatchPass())
Configuration
| Option | Default | Description |
|---|---|---|
| batch_size | 100 | Maximum requests per batch |
| batch_wait_ms | 5000 | Max wait time before auto-flush (ms) |
| auto_flush | True | Automatically flush on timer |
| enable_cache_keys | True | Compute cache keys for prompt caching |
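How `batch_size` and `batch_wait_ms` interact is easiest to see in a toy accumulator: a batch is sent as soon as it is full, and a timer flushes whatever is left after the wait period. The sketch below is not the `BatchAccumulator` implementation. `TinyAccumulator`, `fake_submit`, and the echo results are invented for the demo, and error handling (for example failing the futures when a submit raises) is omitted.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional, Tuple

Submit = Callable[[List[Any]], Awaitable[List[Any]]]


class TinyAccumulator:
    """Flush when `batch_size` requests are queued or `batch_wait_ms` elapses."""

    def __init__(self, submit: Submit, batch_size: int = 100, batch_wait_ms: int = 5000):
        self._submit = submit
        self._batch_size = batch_size
        self._wait_s = batch_wait_ms / 1000
        self._pending: List[Tuple[Any, asyncio.Future]] = []
        self._timer: Optional[asyncio.Task] = None

    async def queue_request(self, request: Any) -> asyncio.Future:
        future = asyncio.get_running_loop().create_future()
        self._pending.append((request, future))
        if len(self._pending) >= self._batch_size:
            await self.flush()  # batch is full: send immediately
        elif self._timer is None:
            self._timer = asyncio.create_task(self._flush_after_wait())
        return future

    async def _flush_after_wait(self) -> None:
        await asyncio.sleep(self._wait_s)
        self._timer = None  # the timer has fired; nothing left to cancel
        await self.flush()

    async def flush(self) -> None:
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        batch, self._pending = self._pending, []
        if not batch:
            return
        results = await self._submit([request for request, _ in batch])
        for (_, future), result in zip(batch, results):
            future.set_result(result)


async def demo() -> List[int]:
    sizes: List[int] = []

    async def fake_submit(requests: List[Any]) -> List[Any]:
        sizes.append(len(requests))  # stand-in for a provider batch call
        return [f"echo:{request}" for request in requests]

    accumulator = TinyAccumulator(fake_submit, batch_size=2, batch_wait_ms=20)
    futures = [await accumulator.queue_request(n) for n in ("a", "b", "c")]
    answers = await asyncio.gather(*futures)  # "c" arrives via the timer flush
    assert answers == ["echo:a", "echo:b", "echo:c"]
    return sizes


batch_sizes = asyncio.run(demo())
print(batch_sizes)
```

In the demo the first two requests fill a batch and are sent at once, while the third waits for the 20 ms timer. A larger `batch_wait_ms` trades latency for fuller batches.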
Environment Variables
# Required for batch API
ANTHROPIC_API_KEY=sk-ant-...
# Optional: Override batch settings
NEXUS_BATCH_SIZE=100
NEXUS_BATCH_WAIT_MS=5000
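For code that needs the same overrides outside the runtime, reading the two optional variables with their documented defaults might look like the sketch below. `BatchSettings` and `load_batch_settings` are hypothetical helpers, and the rule that values must be positive integers is an assumption of this sketch rather than documented Nexus behavior.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class BatchSettings:
    batch_size: int = 100
    batch_wait_ms: int = 5000


def load_batch_settings(env: Mapping[str, str] = os.environ) -> BatchSettings:
    """Read NEXUS_BATCH_* overrides, falling back to the documented defaults."""

    def positive_int(name: str, default: int) -> int:
        raw = env.get(name)
        if raw is None or raw.strip() == "":
            return default
        value = int(raw)  # raises ValueError on non-numeric input
        if value <= 0:
            raise ValueError(f"{name} must be positive, got {value}")
        return value

    return BatchSettings(
        batch_size=positive_int("NEXUS_BATCH_SIZE", 100),
        batch_wait_ms=positive_int("NEXUS_BATCH_WAIT_MS", 5000),
    )


defaults = load_batch_settings({})
overridden = load_batch_settings({"NEXUS_BATCH_SIZE": "25"})
print(defaults, overridden)
```

Passing the environment as a mapping keeps the function easy to test without touching `os.environ`.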
⚡ Performance
Compiler Benchmarks (v1.0.0)
| Operation | Time | Throughput |
|---|---|---|
| IR Parsing | 0.45ms | 2,200/sec |
| Transform Passes | 0.07ms | 15,500/sec |
| Code Generation | 0.03-0.07ms | 14,000-33,000/sec |
| Full Compile Pipeline | 0.4-0.5ms | 2,000/sec |
| Validation | 0.03ms | 31,000/sec |
Runtime Benchmarks
| Metric | LangGraph (Local) | AWS (Serverless) |
|---|---|---|
| Cold Start | 50ms | 200-300ms (Lambda) |
| Warm Execution | 10-20ms/node | 15-25ms/node |
| State Persistence | 5ms (Postgres) | 3ms (DynamoDB) |
| Throughput | 1,000 req/sec | 10,000+ req/sec |
Optimizations (v1.0.0)
- ✅ boto3 + asyncio.to_thread - 30% faster than aioboto3
- ✅ DynamoDB batch operations - 25x faster bulk writes
- ✅ Postgres prepared statements - 2-3x faster queries
- ✅ Connection pooling - max_pool_connections=50
- ✅ Direct AsyncPostgresSaver - no bridge overhead
🔧 Troubleshooting
Common Issues
"Module not found: universal_agent_nexus"
# Make sure you installed with the right adapter
pip install "universal-agent-nexus[langgraph]" # For LangGraph
pip install "universal-agent-nexus[all]" # For everything
"Connection refused: PostgreSQL"
# Check Docker is running
docker ps | grep postgres
# Check environment variable
echo $UAA_POSTGRES_URL
# Should be: postgresql://postgres:password@localhost:5432/nexus_dev
# Restart Postgres container
docker run -d -p 5432:5432 \
-e POSTGRES_PASSWORD=password \
-e POSTGRES_DB=nexus_dev \
postgres:16-alpine
"Compilation failed: Unknown node kind 'foo'"
Valid node kinds: router, tool, task
Check your manifest against the Manifest Schema Reference.
"AWS deployment failed: Access Denied"
# Check AWS credentials
aws sts get-caller-identity
# Ensure IAM permissions for:
# - Step Functions (states:*)
# - Lambda (lambda:*)
# - DynamoDB (dynamodb:*)
# - IAM (iam:PassRole)
"MCP server not connecting"
# Test locally first
nexus serve manifest.yaml --protocol mcp --transport stdio
# Check Claude Desktop config path:
# macOS: ~/Library/Application Support/Claude/claude_desktop_config.json
# Windows: %APPDATA%\Claude\claude_desktop_config.json
Getting Help
📁 Project Structure
universal_agent_nexus/
├── universal_agent_nexus/ # Core package
│ ├── adapters/ # Runtime adapters
│ │ ├── langgraph/ # LangGraph adapter
│ │ │ ├── runtime.py # LangGraphRuntime + BatchAwareLangGraphRuntime
│ │ │ ├── batch_accumulator.py # Anthropic Batch API client
│ │ │ └── compiler.py # Manifest → StateGraph
│ │ ├── aws/ # AWS adapter
│ │ └── mcp/ # MCP adapter
│ ├── ir/ # Intermediate Representation
│ │ ├── annotations.py # BatchAnnotation, CostAnnotation, etc.
│ │ ├── passes/ # Optimization passes
│ │ │ └── batch_optimization.py # LLM batching analysis
│ │ ├── pass_manager.py # LLVM-style pass pipeline
│ │ └── transforms.py # Built-in transforms
│ ├── cli/ # CLI interface
│ ├── builder.py # CompilerBuilder with batch support
│ └── observability/ # Tracing & logging
├── terraform/ # Infrastructure as Code
│ ├── modules/ # Reusable modules
│ └── environments/ # Environment configs
├── lambda/ # Lambda function code
├── examples/ # Example manifests
└── tests/ # Test suite
🧪 Testing
# All tests
pytest
# Unit tests only
pytest tests/unit/
# Integration tests (requires Docker)
docker-compose up -d postgres
pytest tests/integration/
# With coverage
pytest --cov=universal_agent_nexus --cov-report=html
Current status: 55 tests + 5 benchmarks passing
🚀 Deployment
Local Development
# Start Postgres
docker run -d -p 5432:5432 \
-e POSTGRES_PASSWORD=password \
-e POSTGRES_DB=uaa_dev \
postgres:16-alpine
# Run agent locally
python examples/hello_langgraph/run.py
AWS Production
# Compile manifest
nexus compile manifest.yaml --target aws \
--output terraform/environments/prod/state_machine.json
# Deploy with Terraform
cd terraform/environments/prod
terraform init
terraform apply
# Execute
aws stepfunctions start-execution \
--state-machine-arn $(terraform output -raw state_machine_arn) \
--input '{"context": {"query": "Hello!"}}'
⚙️ Configuration
Environment Variables
# LangGraph
UAA_POSTGRES_URL=postgresql://localhost:5432/uaa_dev
UAA_LOG_LEVEL=INFO
# AWS
AWS_REGION=us-east-1
AWS_DYNAMODB_TABLE=uaa-agent-state
# Observability
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
OTEL_SERVICE_NAME=universal-agent-nexus
# LLM APIs
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-... # Required for Batch API
# Batch API (optional overrides)
NEXUS_BATCH_SIZE=100 # Requests per batch
NEXUS_BATCH_WAIT_MS=5000 # Max wait before flush
🔄 Version Compatibility
| Nexus Version | Python | LangGraph | boto3 | MCP SDK |
|---|---|---|---|---|
| 1.0.x | 3.11+ | ≥0.2.0 | ≥1.34.0 | ≥1.0.0 |
| 0.9.x | 3.10+ | ≥0.1.0 | ≥1.28.0 | ≥0.9.0 |
🤝 Contributing
We welcome contributions!
# Clone repository
git clone https://github.com/mjdevaccount/universal_agent_nexus.git
cd universal_agent_nexus
# Install dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Format code
ruff format .
ruff check . --fix
📄 License
This project is licensed under the MIT License - see the LICENSE file for details.
🙏 Acknowledgments
Built with inspiration from:
- LangChain/LangGraph - Agent orchestration patterns
- AWS Step Functions - State machine execution model
- Model Context Protocol (MCP) - AI client integration standard
- Terraform - Infrastructure as code principles
Made with ❤️ by the Universal Agent Nexus team
⭐ Star us on GitHub if this project helps you!