
Universal Agent Nexus

The Translation Layer: Build Once, Run Anywhere

Compile universal agent architectures to LangGraph, AWS Step Functions, MCP, or the UAA Kernel—without rewriting code.

PyPI version Python 3.11+ License: MIT Tests Code style: ruff

Quick Start · Schema Reference · State Bridge · Batch API · Extending Nexus · Examples · Troubleshooting · Report Bug


🎯 What is Universal Agent Nexus?

The Problem You're Solving

You built an AI agent. It works great locally in LangGraph. Now you need to:

  • Deploy to production at scale → AWS Step Functions
  • Expose to Claude Desktop for teammates → MCP
  • Run through the UAA Kernel → Native execution
  • Keep local dev fast for iteration → LangGraph

Traditional approach: Rewrite your agent for each runtime. Maintain four codebases. Deal with drift.

Universal Agent Nexus approach: Write once. Compile to any runtime.

# One manifest
name: my-agent
graphs: [...]
nexus compile my-agent.yaml --target langgraph  # Local dev
nexus compile my-agent.yaml --target aws        # Production  
nexus compile my-agent.yaml --target mcp        # AI clients
nexus compile my-agent.yaml --target uaa        # UAA Kernel (native)

Same logic. Four runtimes. Zero rewrites.


🔗 How Nexus + Fabric + Architecture Work Together

New to Universal Agent? This section explains the ecosystem.

The Universal Agent ecosystem consists of three interconnected repositories:

Repository          Role                                            Metaphor
Architecture        Runtime kernel, state management, durability    Linux Kernel
Fabric              Roles, domains, policies → compiled manifests   Userland / distro
Nexus (this repo)   Adapters for AWS, LangGraph, MCP, UAA Kernel    Network / drivers
┌─────────────────────────────────────────────────────────────────────────────┐
│                         FABRIC (Composition Layer)                           │
│   github.com/mjdevaccount/universal_agent_fabric                             │
│                                                                              │
│   Role + Domain + Policy YAMLs → FabricBuilder → Enriched Manifest          │
└────────────────────────────────────┬────────────────────────────────────────┘
                                     │
                                     ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                         NEXUS (Compiler Layer)                               │
│   github.com/mjdevaccount/universal_agent_nexus (this repo)                  │
│                                                                              │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                        ManifestIR                                    │   │
│   └────────────────────────────┬────────────────────────────────────────┘   │
│                                │ Generators                                  │
│   ┌────────────┬───────────────┼───────────────┬────────────────────────┐   │
│   │ LangGraph  │      AWS      │      MCP      │     UAA Native         │   │
│   └────────────┴───────────────┴───────────────┴────────────────────────┘   │
│                                                                              │
│   + State Normalization Bridge (LangGraph ↔ AWS ↔ UAA)                       │
└────────────────────────────────────┬────────────────────────────────────────┘
                                     │
                                     ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                       ARCHITECTURE (Kernel Layer)                            │
│   github.com/mjdevaccount/universal_agent_architecture                       │
│                                                                              │
│   GraphEngine + Handlers + PolicyEngine + TaskStore + ContractRegistry      │
└─────────────────────────────────────────────────────────────────────────────┘

The Workflow

# Step 1: Use Fabric to compose your agent
fabric build \
  --role researcher.yaml \
  --domain finance.yaml \
  --policy safety.yaml \
  --out my_agent.yaml

# Step 2: Use Nexus to compile to runtime
nexus compile my_agent.yaml --target langgraph --output agent.py

# Step 3: Run
python agent.py

Why Separate Repos?

Repo     Owner                     Purpose
Fabric   Security/Business teams   Roles, capabilities, governance policies
Nexus    Platform/DevOps teams     Runtime translation, deployment pipelines

This separation allows security teams to own policies while platform teams own infrastructure—without stepping on each other.

Can I Use Nexus Without Fabric?

Absolutely! You can write manifest YAML files directly (see Manifest Schema Reference). Fabric just makes it easier for complex, enterprise scenarios.


🚀 Complete Beginner Tutorial

From Zero to Running Agent (5 minutes)

Goal: Build a simple chatbot that greets users by name.

Step 1: Install

pip install "universal-agent-nexus[langgraph]"

Step 2: Create manifest

Create chatbot.yaml:

name: greeter-bot
version: "1.0.0"
description: "A friendly greeting agent"

graphs:
  - name: main
    entry_node: greet
    nodes:
      - id: greet
        kind: task
        label: "Generate Greeting"
        config:
          prompt: "Generate a friendly greeting for {name}"
    edges: []

tools: []
routers: []
policies: []

Step 3: Compile to LangGraph

nexus compile chatbot.yaml --target langgraph --output bot.py

This generates bot.py with:

  • ✅ LangGraph StateGraph definition
  • ✅ Postgres checkpointing
  • ✅ Async execution

Step 4: Run locally

# Start Postgres (Docker)
docker run -d -p 5432:5432 \
  -e POSTGRES_PASSWORD=password \
  -e POSTGRES_DB=nexus_dev \
  postgres:16-alpine

# Set environment
export UAA_POSTGRES_URL=postgresql://postgres:password@localhost:5432/nexus_dev

# Run
python bot.py

Expected output:

✅ Connected to PostgreSQL
✅ Graph initialized: main
Input: {"name": "Alice"}
Output: "Hello Alice! Welcome to Universal Agent Nexus! 👋"

Step 5: Deploy to AWS (Optional)

# Compile to AWS
nexus compile chatbot.yaml --target aws --output state_machine.json

# Deploy (requires AWS CLI configured)
cd terraform/environments/dev
terraform init
terraform apply

✅ You just built and deployed a multi-runtime agent!


📝 Manifest Schema Reference

Complete Example

name: "my-agent"
version: "1.0.0"
description: "Agent description"

graphs:
  - name: "main"
    entry_node: "start"
    nodes:
      - id: "start"
        kind: router              # Decision point
        label: "Risk Classifier"
        config:
          llm: "gpt-4o-mini"
          system_message: "You are a risk classifier..."
          
      - id: "validate"
        kind: tool                # External capability
        label: "Policy Validator"
        tool_ref: "policy_check"
        
      - id: "escalate"
        kind: task                # LLM generation
        label: "Generate Summary"
        config:
          prompt: "Summarize incident: {input}"
    
    edges:
      - from_node: "start"
        to_node: "validate"
        condition:
          expression: "risk_level > 0.5"
          
      - from_node: "validate"
        to_node: "escalate"
        condition:
          expression: "policy_violated == true"

tools:
  - name: "policy_check"
    protocol: "mcp"
    config:
      command: "mcp-policy-server"
      args: ["--strict"]

routers:
  - name: "risk_classifier"
    llm_provider: "openai"
    model: "gpt-4o-mini"
    system_message: "Classify risk as low/medium/high"

policies:
  - name: "require_approval"
    target_pattern: "execute_*"
    action: "require_approval"
    conditions:
      risk_level: ">0.8"

Node Types

Kind     Purpose                 When to Use                           Config Fields
router   LLM makes a decision    Multi-path routing, classification    llm, system_message, tools
tool     Call external API/MCP   Database query, API call, file I/O    tool_ref, input_mapping
task     LLM generates text      Summarization, translation, writing   prompt, llm, max_tokens

Edge Conditions

Conditions use simple expression syntax:

# Simple comparison
condition:
  expression: "risk_level > 0.5"

# Multiple conditions (AND)
condition:
  expression: "risk_level > 0.5 AND confidence > 0.8"

# Object field access
condition:
  expression: "user.role == 'admin'"

# List operations
condition:
  expression: "len(violations) > 0"
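Under the hood, these expressions read values out of the run state. Nexus's actual evaluator is internal; as a minimal sketch of the idea (the `evaluate_condition` helper and `_Ctx` wrapper below are hypothetical illustrations, not Nexus APIs), a restricted evaluator for this syntax could look like:

```python
class _Ctx(dict):
    """Dict that also exposes keys as attributes, so 'user.role' resolves."""
    def __getattr__(self, name):
        value = self[name]
        return _Ctx(value) if isinstance(value, dict) else value

def evaluate_condition(expression: str, state: dict) -> bool:
    # Map the manifest's AND/OR keywords onto Python's boolean operators.
    py_expr = expression.replace(" AND ", " and ").replace(" OR ", " or ")
    # Expose state fields (wrapping nested dicts for dotted access) and len() only.
    namespace = {k: _Ctx(v) if isinstance(v, dict) else v for k, v in state.items()}
    return bool(eval(py_expr, {"__builtins__": {}, "len": len}, namespace))

state = {"risk_level": 0.7, "confidence": 0.9, "user": {"role": "admin"}, "violations": []}
print(evaluate_condition("risk_level > 0.5 AND confidence > 0.8", state))  # True
print(evaluate_condition("user.role == 'admin'", state))                   # True
print(evaluate_condition("len(violations) > 0", state))                    # False
```

A production evaluator would parse the expression rather than use `eval`; this sketch only shows the semantics of the four patterns above.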

Tool Protocols

Protocol     Use Case                         Example
mcp          Model Context Protocol servers   Claude Desktop tools, Cursor plugins
http         REST APIs                        External services, webhooks
local        Python functions                 In-process tools
subprocess   CLI commands                     Shell scripts, binaries
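The non-MCP protocols follow the same shape as the policy_check example above. A hypothetical http tool definition might look like the following (the url and method config keys are illustrative assumptions, not confirmed schema fields):

```yaml
tools:
  - name: "weather_lookup"
    protocol: "http"
    config:
      url: "https://api.example.com/v1/weather"   # illustrative endpoint
      method: "POST"                              # assumed config key
```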

Required vs Optional Fields

Field                 Required   Default   Description
name                  yes        -         Unique agent identifier
version               yes        -         Semantic version (e.g., "1.0.0")
description           no         ""        Human-readable description
graphs                yes        -         Array of graph definitions
graphs[].name         yes        -         Graph identifier
graphs[].entry_node   yes        -         ID of starting node
graphs[].nodes        yes        -         Array of node definitions
graphs[].edges        no         []        Array of edge definitions
tools                 no         []        External tool definitions
routers               no         []        Router configurations
policies              no         []        Governance policies

📚 Core Concepts

1. Universal Agent Architecture (UAA)

The Universal Agent Architecture is a specification for defining multi-step agent workflows as portable, runtime-agnostic manifests.

Key components:

  • Graph: A directed graph of nodes (tasks, routers, tools) connected by edges
  • Node: A single execution unit (LLM call, tool invocation, conditional logic)
  • Edge: Transition between nodes (with optional conditions)
  • Tool: External capability (API, database, MCP server)
  • Router: Decision point that chooses next action(s) dynamically

2. Compilation Targets (Adapters)

Target      Best For                Execution Model          State Storage
LangGraph   Local dev, debugging    Python async             PostgreSQL
AWS         Production scale        Step Functions, Lambda   DynamoDB
MCP         AI client integration   stdio transport          In-memory

3. The IR-Based Compiler (v1.0.0)

Why an Intermediate Representation?

Without IR, we'd need N² translators (LangGraph↔AWS, AWS↔MCP, etc.). With IR, we need only 2N (N parsers + N generators).
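To make the arithmetic concrete (counting one translator per ordered pair of runtimes, i.e. N(N−1), which is the quadratic growth the "N²" above refers to):

```python
def pairwise_translators(n: int) -> int:
    # Direct translation: one translator per ordered pair of runtimes.
    return n * (n - 1)

def ir_translators(n: int) -> int:
    # IR-based: N parsers into the IR plus N generators out of it.
    return 2 * n

for n in (3, 4, 8):
    print(n, pairwise_translators(n), ir_translators(n))
# 3 6 6
# 4 12 8
# 8 56 16
```

At four runtimes the IR already saves a third of the translators, and the gap widens with every runtime added.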

Benefits:

  • Bidirectional translation: LangGraph ⇄ AWS ⇄ MCP
  • Optimization passes: Dead code elimination, edge deduplication
  • Validation: Type checking before code generation
  • Future-proof: Add new runtimes without changing existing ones

How it works:

┌────────────────────────────────────────────────────────────┐
│                     FRONTENDS (Parsers)                    │
│  LangGraph → IR  |  AWS → IR  |  YAML → IR                │
└──────────────────────────┬─────────────────────────────────┘
                           │
                           ▼
┌────────────────────────────────────────────────────────────┐
│                  INTERMEDIATE REPRESENTATION               │
│            ManifestIR { GraphIR, ToolIR, RouterIR }        │
└──────────────────────────┬─────────────────────────────────┘
                           │
                           ▼
┌────────────────────────────────────────────────────────────┐
│                   TRANSFORMATION PASSES                    │
│  Dead code elimination | Edge deduplication | Validation   │
└──────────────────────────┬─────────────────────────────────┘
                           │
                           ▼
┌────────────────────────────────────────────────────────────┐
│                    BACKENDS (Generators)                   │
│  IR → LangGraph  |  IR → AWS  |  IR → YAML                │
└────────────────────────────────────────────────────────────┘

Bidirectional translation in action:

# AWS → LangGraph (reverse compile!)
nexus translate state_machine.json --to langgraph --out agent.py

# LangGraph → AWS
nexus translate agent.py --to aws --out state_machine.json

# Chain translations: AWS → LangGraph → MCP
nexus translate state_machine.json --to langgraph --out temp.py
nexus translate temp.py --to mcp --out server.py

🔧 Extending Nexus: Platform Extension API

For Platform Engineers & Framework Developers

Nexus exposes a comprehensive extension API that enables you to customize every aspect of the compilation pipeline. This section documents all extension points, APIs, and patterns for building on top of Nexus.

Extension Points Overview

Nexus provides 8 tiers of abstraction exposure, each enabling different levels of customization:

Tier   Extension Point       Use Case                                                           Complexity
1      Parser Registry       Add support for new source formats (Airflow, Prefect, Temporal)   Low
2      Generator Registry    Add support for new target runtimes (K8s, GCP, Azure)              Low
3      Pass Interface        Create custom optimization/validation passes                       Medium
4      Annotations           Add typed metadata to IR nodes/edges                               Low
5      Factories             Customize tool/router instantiation logic                          Medium
6      Enrichment Strategy   Customize Fabric fragment composition                              Medium
7      Compiler Builder      Configure entire compiler with custom components                   High
8      IR Visitor            Traverse and analyze IR structure                                  Low

Tier 1: Parser Registry

Purpose: Register custom parsers to support new source formats.

API Reference:

from universal_agent_nexus.parser_registry import (
    register_parser,
    get_parser,
    detect_source_type,
    list_parsers,
    ParserRegistry,
)

# Register a custom parser
register_parser(
    source_type: str,
    parser: Parser,
    *,
    detection_priority: int = 100,
    aliases: Optional[Set[str]] = None,
    description: Optional[str] = None,
) -> None

# Get parser by source type
parser = get_parser(source_type: str) -> Parser

# Auto-detect source type
detected = detect_source_type(source: str) -> str

# List all registered parsers
parsers = list_parsers() -> Dict[str, ParserInfo]

Parser Protocol:

All parsers must implement the Parser protocol:

from universal_agent_nexus.ir.parser import Parser
from universal_agent_nexus.ir import ManifestIR

class MyFrameworkParser:
    """Custom parser for MyFramework."""
    
    def parse(self, source: str) -> ManifestIR:
        """
        Parse source and produce IR.
        
        Args:
            source: Path to source file or source string
            
        Returns:
            ManifestIR instance
        """
        # Parse your format → ManifestIR
        return ManifestIR(...)
    
    def can_parse(self, source: str) -> bool:
        """
        Check if this parser can handle the source.
        
        Args:
            source: Path to source file or source string
            
        Returns:
            True if parser can handle this source
        """
        # Detection logic (file extension, content sniffing, etc.)
        return source.endswith(".myfw")

Complete Example: Airflow Parser

from universal_agent_nexus.parser_registry import register_parser
from universal_agent_nexus.ir import ManifestIR, GraphIR, NodeIR, NodeKind
from pathlib import Path
import yaml

class AirflowParser:
    """Parse Apache Airflow DAGs to ManifestIR."""
    
    def parse(self, source: str) -> ManifestIR:
        path = Path(source)
        if path.exists():
            dag_def = yaml.safe_load(path.read_text())
        else:
            dag_def = yaml.safe_load(source)
        
        # Convert Airflow DAG → ManifestIR
        nodes = [
            NodeIR(
                id=task_id,
                kind=NodeKind.TASK,
                label=task.get("task_name", task_id),
                description=task.get("description"),
            )
            for task_id, task in dag_def.get("tasks", {}).items()
        ]
        
        graph = GraphIR(
            name=dag_def.get("dag_id", "main"),
            entry_node=nodes[0].id if nodes else "start",
            nodes=nodes,
            edges=[],  # Convert Airflow dependencies to edges
        )
        
        return ManifestIR(
            name=dag_def.get("dag_id", "airflow-dag"),
            version="1.0.0",
            description=dag_def.get("description", ""),
            graphs=[graph],
        )
    
    def can_parse(self, source: str) -> bool:
        path = Path(source)
        if path.exists():
            return source.endswith((".airflow", ".dag.yaml"))
        try:
            data = yaml.safe_load(source)
            return "dag_id" in data and "tasks" in data
        except Exception:
            return False

# Register the parser
register_parser(
    "airflow",
    AirflowParser(),
    detection_priority=150,  # Higher = checked first
    aliases={"dag", "airflow_dag"},
    description="Apache Airflow DAG parser",
)

# Now compile works with Airflow!
from universal_agent_nexus.compiler import compile
result = compile("my_dag.airflow", target="aws")

Best Practices:

  • Detection Priority: Set higher priority (200+) for formats with unique file extensions, lower (50-100) for ambiguous formats
  • Aliases: Register common aliases (e.g., {"uaa", "yaml"} for YAML parser)
  • Error Handling: Provide clear error messages with source location information
  • Performance: Cache parsed results if parsing is expensive

Available Parsers:

# List all registered parsers
parsers = list_parsers()
for source_type, info in parsers.items():
    print(f"{source_type}: {info.description}")
    print(f"  Aliases: {info.aliases}")
    print(f"  Priority: {info.detection_priority}")

Tier 2: Generator Registry

Purpose: Register custom generators to support new target runtimes.

API Reference:

from universal_agent_nexus.generator_registry import (
    register_generator,
    get_generator,
    list_generators,
    GeneratorRegistry,
)

# Register a custom generator
register_generator(
    target_type: str,
    generator: Generator,
    *,
    aliases: Optional[Set[str]] = None,
    default_options: Optional[Dict] = None,
    description: Optional[str] = None,
) -> None

# Get generator by target type
generator = get_generator(target_type: str) -> Generator

# List all registered generators
generators = list_generators() -> Dict[str, GeneratorInfo]

Generator Protocol:

All generators must implement the Generator protocol:

from universal_agent_nexus.ir.generator import Generator
from universal_agent_nexus.ir import ManifestIR

class MyPlatformGenerator:
    """Custom generator for MyPlatform."""
    
    def generate(self, ir: ManifestIR) -> str:
        """
        Generate target format from IR.
        
        Args:
            ir: ManifestIR instance
            
        Returns:
            Generated code/config as string
        """
        # Convert ManifestIR → your platform's format
        lines = []
        for graph in ir.graphs:
            lines.append(f"# Graph: {graph.name}")
            # ... generate code ...
        return "\n".join(lines)

Complete Example: Temporal Generator

from universal_agent_nexus.generator_registry import register_generator
from universal_agent_nexus.ir import ManifestIR, GraphIR, NodeIR, NodeKind

class TemporalGenerator:
    """Generate Temporal workflows from ManifestIR."""
    
    def generate(self, ir: ManifestIR) -> str:
        """Generate Temporal workflow code."""
        lines = [
            '"""',
            f"Auto-generated Temporal workflow: {ir.name}",
            '"""',
            "",
            "from temporalio import workflow",
            "",
        ]
        
        for graph in ir.graphs:
            lines.extend([
                f"@workflow.defn",
                f"class {graph.name.title()}Workflow:",
                f'    """{graph.name} workflow."""',
                "",
                "    @workflow.run",
                "    async def run(self, input_data: dict) -> dict:",
            ])
            
            # Generate workflow steps
            for node in graph.nodes:
                if node.kind == NodeKind.TASK:
                    lines.append(f"        result = await {node.id}_activity(input_data)")
                elif node.kind == NodeKind.TOOL:
                    lines.append(f"        result = await execute_tool('{node.tool_ref}', input_data)")
            
            lines.append("        return result")
            lines.append("")
        
        return "\n".join(lines)

# Register the generator
register_generator(
    "temporal",
    TemporalGenerator(),
    aliases={"temporalio"},
    description="Temporal.io workflow generator",
)

# Now compile works with Temporal!
from universal_agent_nexus.compiler import compile
result = compile("agent.yaml", target="temporal")

Best Practices:

  • Idempotency: Generated code should be deterministic (same IR → same output)
  • Readability: Include comments and clear structure in generated code
  • Error Handling: Validate IR before generation, provide helpful error messages
  • Options: Use default_options for generator-specific configuration

Tier 3: Pass Interface

Purpose: Create custom transformation passes for optimization, validation, or instrumentation.

API Reference:

from universal_agent_nexus.ir.pass_interface import (
    Transform,
    ValidationPass,
    OptimizationPass,
    InstrumentationPass,
    PassMetadata,
)
from universal_agent_nexus.ir import ManifestIR
from universal_agent_nexus.ir.pass_manager import PassManager

from abc import ABC, abstractmethod

# Base interface
class Transform(ABC):
    @property
    @abstractmethod
    def metadata(self) -> PassMetadata: ...
    
    @abstractmethod
    def apply(self, ir: ManifestIR) -> ManifestIR: ...

# Specialized base classes
class ValidationPass(Transform):  # Don't modify IR
    @abstractmethod
    def validate(self, ir: ManifestIR) -> list[str]: ...

class OptimizationPass(Transform):  # Modify IR for performance
    ...

class InstrumentationPass(Transform):  # Add metadata/observability
    ...

Pass Metadata:

from dataclasses import dataclass, field
from typing import Set

@dataclass
class PassMetadata:
    name: str                                           # Unique pass identifier
    description: str                                    # Human-readable description
    requires: Set[str] = field(default_factory=set)     # Passes that must run before this
    invalidates: Set[str] = field(default_factory=set)  # Analyses invalidated by this pass
    preserves: Set[str] = field(default_factory=set)    # Analyses preserved by this pass
    category: str = "optimization"                      # optimization, validation, instrumentation

Complete Example: Custom Optimization Pass

from universal_agent_nexus.ir.pass_interface import OptimizationPass, PassMetadata
from universal_agent_nexus.ir import ManifestIR, GraphIR, NodeIR, NodeKind

class MergeDuplicateTools(OptimizationPass):
    """Merge duplicate tool nodes with identical configurations."""
    
    @property
    def metadata(self) -> PassMetadata:
        return PassMetadata(
            name="merge-duplicate-tools",
            description="Merge duplicate tool nodes to reduce redundancy",
            requires={"dead-node-elimination"},  # Run after dead code removal
            invalidates={"edge-analysis"},       # May change edges
            preserves={"cycle-free"},            # Preserves cycle-freedom
            category="optimization",
        )
    
    def apply(self, ir: ManifestIR) -> ManifestIR:
        """Merge duplicate tools."""
        for graph in ir.graphs:
            # Group nodes by tool signature
            tool_signatures = {}
            nodes_to_remove = []
            
            for node in graph.nodes:
                if node.kind == NodeKind.TOOL and node.tool_ref:
                    sig = (node.tool_ref, str(sorted(node.config.items())))
                    
                    if sig in tool_signatures:
                        # Found duplicate - redirect edges to original
                        original = tool_signatures[sig]
                        self._redirect_edges(graph, node.id, original.id)
                        nodes_to_remove.append(node.id)
                    else:
                        tool_signatures[sig] = node
            
            # Remove merged nodes
            if nodes_to_remove:
                graph.nodes = [n for n in graph.nodes if n.id not in nodes_to_remove]
                graph._build_indexes()
        
        return ir
    
    def _redirect_edges(self, graph: GraphIR, from_id: str, to_id: str) -> None:
        """Redirect all edges from one node to another."""
        for edge in graph.edges:
            if edge.from_node == from_id:
                edge.from_node = to_id
            if edge.to_node == from_id:
                edge.to_node = to_id

# Use the pass
from universal_agent_nexus.ir.pass_manager import PassManager, OptimizationLevel

manager = PassManager(opt_level=OptimizationLevel.DEFAULT)
manager.add_custom_pass(
    MergeDuplicateTools(),
    after="dead-node-elimination",  # Explicit ordering
)
ir = manager.run(ir)

Complete Example: Validation Pass

from universal_agent_nexus.ir.pass_interface import ValidationPass, PassMetadata

class CostLimitValidation(ValidationPass):
    """Validate that estimated costs don't exceed limits."""
    
    @property
    def metadata(self) -> PassMetadata:
        return PassMetadata(
            name="cost-limit-validation",
            description="Validate estimated execution costs",
            requires={"dead-node-elimination"},
            category="validation",
        )
    
    def validate(self, ir: ManifestIR) -> list[str]:
        """Return list of error messages (empty if valid)."""
        errors = []
        max_cost_cents = 1000  # $10.00 limit
        
        from universal_agent_nexus.ir.visitor import DefaultIRVisitor, traverse
        
        class CostCollector(DefaultIRVisitor):
            def __init__(self):
                self.total_cost = 0.0
            
            def visit_node(self, node):
                from universal_agent_nexus.ir.annotations import CostAnnotation
                cost = node.metadata.get_annotation(CostAnnotation)
                if cost:
                    self.total_cost += cost.cost_cents
        
        collector = CostCollector()
        traverse(ir, collector)
        
        if collector.total_cost > max_cost_cents:
            errors.append(
                f"Estimated cost ${collector.total_cost/100:.2f} exceeds "
                f"limit ${max_cost_cents/100:.2f}"
            )
        
        return errors

# Use validation pass
manager = PassManager()
manager.add_custom_pass(CostLimitValidation())
try:
    ir = manager.run(ir)
except RuntimeError as e:
    print(f"Validation failed: {e}")

Pass Ordering:

Passes are automatically ordered based on requires dependencies. You can also specify explicit ordering:

manager.add_custom_pass(
    MyPass(),
    before="some-other-pass",  # Run before this pass
    after="dead-node-elimination",  # Run after this pass
)

Available Passes:

from universal_agent_nexus.ir.pass_manager import PassManager

manager = PassManager()
passes = manager.list_passes()
for name, metadata in passes.items():
    print(f"{name}: {metadata.description}")
    print(f"  Requires: {metadata.requires}")
    print(f"  Category: {metadata.category}")

Tier 4: Annotations

Purpose: Add typed metadata to IR nodes and edges for analysis, optimization, or observability.

API Reference:

from universal_agent_nexus.ir.annotations import (
    Annotation,
    CostAnnotation,
    MonitoringAnnotation,
    PerformanceAnnotation,
    BatchAnnotation,
)
from universal_agent_nexus.ir import NodeIR, Metadata

# Annotate a node
node.metadata.annotate(CostAnnotation(tokens=500, latency_ms=250, cost_cents=0.05))

# Get annotation by type
cost = node.metadata.get_annotation(CostAnnotation) -> Optional[CostAnnotation]

# Check if annotation exists
has_cost = node.metadata.has_annotation(CostAnnotation) -> bool

# List all annotations
annotations = node.metadata.list_annotations() -> Dict[str, Annotation]

Built-in Annotations:

# Cost tracking
cost = CostAnnotation(
    tokens=500,           # Estimated token count
    latency_ms=250,       # Estimated latency in milliseconds
    cost_cents=0.05,      # Estimated cost in cents
)
node.metadata.annotate(cost)

# Monitoring configuration
monitoring = MonitoringAnnotation(
    trace=True,           # Enable distributed tracing
    log_level="INFO",     # Logging level
)
node.metadata.annotate(monitoring)

# Performance characteristics
performance = PerformanceAnnotation(
    expected_duration_ms=100.0,  # Expected execution time
    max_concurrency=5,           # Max concurrent executions
    timeout_ms=5000,              # Timeout in milliseconds
)
node.metadata.annotate(performance)

# Batch API eligibility (added by BatchOptimizationPass)
batch = BatchAnnotation(
    eligible=True,              # Whether node can be batched
    batch_group="claude:abc123", # Group for batching similar requests
    cache_key="sha256...",      # Cache key for prompt caching
    priority=100,               # Higher = process sooner
    max_wait_ms=5000,           # Max wait for batch accumulation
)
node.metadata.annotate(batch)

Creating Custom Annotations:

from typing import List, Optional

from universal_agent_nexus.ir.annotations import Annotation

class SecurityAnnotation(Annotation):
    """Track security requirements for nodes."""
    
    @classmethod
    def key(cls) -> str:
        return "security"
    
    def __init__(
        self,
        requires_auth: bool = True,
        required_roles: Optional[List[str]] = None,
        encryption_required: bool = False,
    ):
        self.requires_auth = requires_auth
        self.required_roles = required_roles or []
        self.encryption_required = encryption_required

# Use custom annotation
security = SecurityAnnotation(
    requires_auth=True,
    required_roles=["admin", "operator"],
    encryption_required=True,
)
node.metadata.annotate(security)

# Retrieve later
security = node.metadata.get_annotation(SecurityAnnotation)
if security and "admin" in security.required_roles:
    # Apply admin-only logic
    pass

Complete Example: Cost Analysis

from universal_agent_nexus.ir.annotations import CostAnnotation
from universal_agent_nexus.ir.visitor import DefaultIRVisitor, traverse

class CostAnalyzer(DefaultIRVisitor):
    """Analyze total estimated cost of IR."""
    
    def __init__(self):
        self.total_cost_cents = 0.0
        self.total_tokens = 0
        self.total_latency_ms = 0.0
        self.node_costs = {}
    
    def visit_node(self, node):
        cost = node.metadata.get_annotation(CostAnnotation)
        if cost:
            self.total_cost_cents += cost.cost_cents
            self.total_tokens += cost.tokens
            self.total_latency_ms += cost.latency_ms
            self.node_costs[node.id] = cost
    
    def get_summary(self) -> dict:
        return {
            "total_cost_dollars": self.total_cost_cents / 100,
            "total_tokens": self.total_tokens,
            "total_latency_seconds": self.total_latency_ms / 1000,
            "node_count": len(self.node_costs),
        }

# Analyze IR
analyzer = CostAnalyzer()
traverse(ir, analyzer)
summary = analyzer.get_summary()
print(f"Estimated cost: ${summary['total_cost_dollars']:.2f}")
print(f"Estimated latency: {summary['total_latency_seconds']:.2f}s")

Best Practices:

  • Type Safety: Always retrieve annotations through typed access (node.metadata.get_annotation(SomeAnnotation)) instead of raw dict access
  • Immutability: Annotations should be immutable after creation
  • Key Uniqueness: Each annotation class must have a unique key()
  • Optional Access: Always check for None when retrieving annotations

Tier 5: Factories

Purpose: Customize how tools and routers are instantiated during parsing.

API Reference:

from universal_agent_nexus.ir.factories import (
    ToolFactory,
    RouterFactory,
    set_tool_factory,
    set_router_factory,
    get_tool_factory,
    get_router_factory,
    DefaultToolFactory,
    DefaultRouterFactory,
)

# Set global factories
set_tool_factory(factory: ToolFactory) -> None
set_router_factory(factory: RouterFactory) -> None

# Get current factories
tool_factory = get_tool_factory() -> ToolFactory
router_factory = get_router_factory() -> RouterFactory

Tool Factory Interface:

from universal_agent_nexus.ir.factories import ToolFactory
from universal_agent_nexus.ir import ToolIR

class CustomToolFactory(ToolFactory):
    """Custom tool factory with retry logic."""
    
    def create_tool(
        self,
        name: str,
        description: str,
        protocol: str,
        config: Dict[str, Any],
    ) -> ToolIR:
        """Create ToolIR with custom logic."""
        # Add retry configuration to all tools
        if "retry" not in config:
            config["retry"] = {
                "max_attempts": 3,
                "backoff_factor": 2,
                "timeout_ms": 5000,
            }
        
        # Add observability hooks
        config["observability"] = {
            "trace": True,
            "log_input": True,
            "log_output": False,  # Don't log sensitive outputs
        }
        
        return ToolIR(
            name=name,
            description=description,
            protocol=protocol,
            config=config,
        )

# Set global factory
set_tool_factory(CustomToolFactory())

# Now all parsed tools will have retry logic
from universal_agent_nexus.compiler import parse
ir = parse("agent.yaml")  # Uses custom factory

Router Factory Interface:

from universal_agent_nexus.ir.factories import RouterFactory
from universal_agent_nexus.ir import RouterIR, RouterStrategy

class SmartRouterFactory(RouterFactory):
    """Router factory with intelligent model selection."""
    
    def create_router(
        self,
        name: str,
        strategy: RouterStrategy,
        system_message: Optional[str],
        model_candidates: List[str],
        default_model: Optional[str] = None,
        config: Optional[Dict[str, Any]] = None,
    ) -> RouterIR:
        """Create RouterIR with smart defaults."""
        # If no default model, select based on strategy
        if not default_model and model_candidates:
            if strategy == RouterStrategy.LLM:
                # Prefer faster models for LLM routing
                default_model = next(
                    (m for m in model_candidates if "mini" in m.lower()),
                    model_candidates[0]
                )
            else:
                default_model = model_candidates[0]
        
        # Add fallback configuration
        config = config or {}
        if "fallback" not in config:
            config["fallback"] = {
                "enabled": True,
                "model": model_candidates[-1] if model_candidates else None,
            }
        
        return RouterIR(
            name=name,
            strategy=strategy,
            system_message=system_message,
            model_candidates=model_candidates,
            default_model=default_model,
            config=config,
        )

# Set global factory
set_router_factory(SmartRouterFactory())

Best Practices:

  • Global State: Factories are global - set them before parsing
  • Determinism: Factory methods should be deterministic (same inputs → same outputs)
  • Validation: Validate inputs and provide helpful error messages
  • Defaults: Provide sensible defaults for optional parameters

Tier 6: Enrichment Strategy

Purpose: Customize how Fabric fragments (roles, domains, policies, mixins) are merged into manifests.

API Reference:

from universal_agent_nexus.enrichment import (
    EnrichmentStrategy,
    DefaultEnrichmentStrategy,
    ComposableEnrichmentStrategy,
    EnrichmentHandler,
    NexusEnricher,
    RoleEnrichmentHandler,
    DomainEnrichmentHandler,
    PolicyEnrichmentHandler,
    MixinEnrichmentHandler,
    create_custom_enrichment_strategy,
    create_default_enrichment_strategy,
)

# Create custom strategy
strategy = create_custom_enrichment_strategy(
    handlers: Optional[List[EnrichmentHandler]] = None,
) -> ComposableEnrichmentStrategy

# Use with enricher
enricher = NexusEnricher(strategy=strategy)
enriched = enricher.enrich(
    baseline_path: str,
    role_path: Optional[str] = None,
    domain_paths: List[str] = [],
    policy_paths: List[str] = [],
    mixin_paths: List[str] = [],
    output_path: Optional[str] = None,
) -> Dict[str, Any]

Enrichment Strategy Interface:

from universal_agent_nexus.enrichment import EnrichmentStrategy
from typing import Any, Dict, List, Optional

class CustomEnrichmentStrategy(EnrichmentStrategy):
    """Custom enrichment strategy."""
    
    def merge(
        self,
        baseline: Dict[str, Any],
        role: Optional[Dict],
        domains: List[Dict],
        policies: List[Dict],
        mixins: List[Dict],
    ) -> Dict[str, Any]:
        """Merge fragments with custom logic."""
        result = baseline.copy()
        
        # Apply role first
        if role:
            result = self._merge_role(result, role)
        
        # Apply domains in order
        for domain in domains:
            result = self._merge_domain(result, domain)
        
        # Apply policies last (highest priority)
        for policy in policies:
            result = self._merge_policy(result, policy)
        
        return result
    
    def _merge_role(self, manifest: Dict, role: Dict) -> Dict:
        # Custom role merging logic
        return manifest
    
    def _merge_domain(self, manifest: Dict, domain: Dict) -> Dict:
        # Custom domain merging logic
        return manifest
    
    def _merge_policy(self, manifest: Dict, policy: Dict) -> Dict:
        # Custom policy merging logic
        return manifest

Handler-Based Composition:

from datetime import datetime
from typing import Any, Dict, List, Optional

from universal_agent_nexus.enrichment import (
    EnrichmentHandler,
    ComposableEnrichmentStrategy,
    NexusEnricher,
)

class CustomMetadataHandler(EnrichmentHandler):
    """Add custom metadata during enrichment."""
    
    def handle(
        self,
        manifest: Dict[str, Any],
        role: Optional[Dict],
        domains: List[Dict],
        policies: List[Dict],
        mixins: List[Dict],
    ) -> Dict[str, Any]:
        """Add enrichment metadata."""
        if "metadata" not in manifest:
            manifest["metadata"] = {}
        
        manifest["metadata"]["enriched_at"] = datetime.utcnow().isoformat()
        manifest["metadata"]["enrichment_version"] = "1.0.0"
        
        if role:
            manifest["metadata"]["role"] = role.get("name")
        if domains:
            manifest["metadata"]["domains"] = [d.get("name") for d in domains]
        
        return manifest

# Compose strategy with handlers
strategy = ComposableEnrichmentStrategy()
strategy.add_handler(RoleEnrichmentHandler())
strategy.add_handler(DomainEnrichmentHandler())
strategy.add_handler(CustomMetadataHandler())  # Custom handler
strategy.add_handler(PolicyEnrichmentHandler())

# Use with enricher
enricher = NexusEnricher(strategy=strategy)
enriched = enricher.enrich(
    baseline_path="manifest.yaml",
    role_path="roles/researcher.yaml",
    domain_paths=["domains/finance.yaml"],
    policy_paths=["policies/safety.yaml"],
    output_path="enriched.yaml",
)

Best Practices:

  • Order Matters: Handlers execute in registration order
  • Idempotency: Merging should be idempotent (merge twice = merge once)
  • Conflict Resolution: Define clear rules for handling conflicts
  • Validation: Validate merged manifest before returning
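
The idempotency rule above can be checked mechanically. A minimal sketch, using a hypothetical dict-based merge rather than the real EnrichmentStrategy API:

```python
from typing import Any, Dict

def merge_policy(manifest: Dict[str, Any], policy: Dict[str, Any]) -> Dict[str, Any]:
    """Toy merge: policy keys overwrite manifest keys (last-writer-wins)."""
    result = dict(manifest)
    result.update(policy)
    return result

baseline = {"name": "agent", "max_tokens": 1000}
policy = {"max_tokens": 500, "audit": True}

once = merge_policy(baseline, policy)
twice = merge_policy(once, policy)

# Idempotency: applying the same fragment twice changes nothing
assert once == twice
```

A last-writer-wins merge is idempotent by construction; strategies that append to lists need explicit de-duplication to keep this property.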

Tier 7: Compiler Builder

Purpose: Configure the entire compiler with custom components for advanced use cases.

API Reference:

from universal_agent_nexus.builder import CompilerBuilder
from universal_agent_nexus.ir.pass_manager import OptimizationLevel

# Create builder
builder = CompilerBuilder()

# Configure components
compiler = (builder
    .register_parser("my_format", MyParser())
    .register_generator("my_target", MyGenerator())
    .with_optimization_level(OptimizationLevel.AGGRESSIVE)
    .with_tool_factory(CustomToolFactory())
    .with_router_factory(CustomRouterFactory())
    .add_optimization_pass(CustomOptimization())
    .with_validation(True)
    .with_optimization(True)
    .build())

# Use configured compiler
result = compiler.compile("source.py", target="my_target")

Complete Example: Enterprise Compiler Configuration

from universal_agent_nexus.builder import CompilerBuilder
from universal_agent_nexus.ir.pass_manager import OptimizationLevel
from universal_agent_nexus.ir.factories import DefaultToolFactory, DefaultRouterFactory

# Custom components
class EnterpriseToolFactory(DefaultToolFactory):
    def create_tool(self, name, description, protocol, config):
        # Add enterprise-specific configuration
        config["monitoring"] = {"enabled": True, "level": "detailed"}
        config["security"] = {"encryption": True, "audit": True}
        return super().create_tool(name, description, protocol, config)

class EnterpriseRouterFactory(DefaultRouterFactory):
    def create_router(self, name, strategy, system_message, model_candidates,
                      default_model=None, config=None):
        # Add enterprise-specific router configuration
        config = config or {}
        config["rate_limiting"] = {"enabled": True, "rpm": 1000}
        return super().create_router(
            name, strategy, system_message, model_candidates,
            default_model=default_model, config=config,
        )

# Build enterprise compiler
enterprise_compiler = (CompilerBuilder()
    .register_parser("airflow", AirflowParser())
    .register_generator("temporal", TemporalGenerator())
    .with_optimization_level(OptimizationLevel.AGGRESSIVE)
    .with_tool_factory(EnterpriseToolFactory())
    .with_router_factory(EnterpriseRouterFactory())
    .add_optimization_pass(CostOptimizationPass())
    .add_optimization_pass(SecurityValidationPass())
    .with_validation(True)
    .with_optimization(True)
    .build())

# Use enterprise compiler
result = enterprise_compiler.compile(
    "dag.airflow",
    target="temporal",
    output="workflow.py",
)

Builder Methods:

| Method | Description | Returns |
|---|---|---|
| register_parser(...) | Register custom parser | CompilerBuilder |
| register_generator(...) | Register custom generator | CompilerBuilder |
| with_optimization_level(level) | Set optimization level | CompilerBuilder |
| with_tool_factory(factory) | Set tool factory | CompilerBuilder |
| with_router_factory(factory) | Set router factory | CompilerBuilder |
| add_optimization_pass(pass) | Add custom pass | CompilerBuilder |
| with_validation(enabled) | Enable/disable validation | CompilerBuilder |
| with_optimization(enabled) | Enable/disable optimization | CompilerBuilder |
| build() | Build configured compiler | Compiler |

Best Practices:

  • Reusability: Create builder configurations as reusable functions/classes
  • Isolation: Use separate builder instances for different use cases
  • Defaults: Builder inherits defaults from global registries
  • Validation: Validate configuration in build() method

Tier 8: IR Visitor

Purpose: Traverse and analyze IR structure using the visitor pattern.

API Reference:

from universal_agent_nexus.ir.visitor import (
    IRVisitor,
    DefaultIRVisitor,
    traverse,
)

# Traverse IR with visitor
traverse(ir: ManifestIR, visitor: IRVisitor) -> None

# Base visitor interface
class IRVisitor(ABC):
    @abstractmethod
    def visit_manifest(self, ir: ManifestIR) -> None: ...
    @abstractmethod
    def visit_graph(self, graph: GraphIR) -> None: ...
    @abstractmethod
    def visit_node(self, node: NodeIR) -> None: ...
    @abstractmethod
    def visit_edge(self, edge: EdgeIR) -> None: ...
    @abstractmethod
    def visit_tool(self, tool: ToolIR) -> None: ...
    @abstractmethod
    def visit_router(self, router: RouterIR) -> None: ...

Complete Example: Dependency Analyzer

from universal_agent_nexus.ir.visitor import DefaultIRVisitor, traverse
from universal_agent_nexus.ir import ManifestIR, GraphIR, NodeIR, EdgeIR

class DependencyAnalyzer(DefaultIRVisitor):
    """Analyze dependencies between nodes."""
    
    def __init__(self):
        self.dependencies = {}  # node_id -> set of dependencies
        self.dependents = {}    # node_id -> set of dependents
    
    def visit_graph(self, graph: GraphIR):
        """Build dependency graph."""
        # Initialize
        for node in graph.nodes:
            self.dependencies[node.id] = set()
            self.dependents[node.id] = set()
        
        # Process edges
        for edge in graph.edges:
            self.dependencies[edge.to_node].add(edge.from_node)
            self.dependents[edge.from_node].add(edge.to_node)
    
    def get_critical_path(self) -> List[str]:
        """Find longest path (critical path)."""
        # Topological sort + longest path algorithm
        # ... implementation ...
        return []
    
    def get_leaf_nodes(self) -> List[str]:
        """Find nodes with no outgoing edges."""
        return [
            node_id
            for node_id, deps in self.dependents.items()
            if not deps
        ]

# Analyze IR
analyzer = DependencyAnalyzer()
traverse(ir, analyzer)

print(f"Critical path: {analyzer.get_critical_path()}")
print(f"Leaf nodes: {analyzer.get_leaf_nodes()}")
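
The get_critical_path body is left elided in the analyzer above; one way to fill it in (a standalone sketch over plain node/edge lists, not the IR types) is a topological sort followed by longest-path relaxation:

```python
from collections import deque
from typing import Dict, List, Tuple

def critical_path(nodes: List[str], edges: List[Tuple[str, str]]) -> List[str]:
    """Longest path through a DAG, measured in node count (Kahn's algorithm)."""
    succ: Dict[str, List[str]] = {n: [] for n in nodes}
    indegree: Dict[str, int] = {n: 0 for n in nodes}
    for src, dst in edges:
        succ[src].append(dst)
        indegree[dst] += 1

    # dist[n] = length of the longest path ending at n; prev reconstructs it
    dist = {n: 0 for n in nodes}
    prev: Dict[str, str] = {}
    queue = deque(n for n in nodes if indegree[n] == 0)
    while queue:
        node = queue.popleft()
        for nxt in succ[node]:
            if dist[node] + 1 > dist[nxt]:
                dist[nxt] = dist[node] + 1
                prev[nxt] = node
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)

    # Walk back from the node farthest from any entry point
    end = max(nodes, key=lambda n: dist[n])
    path = [end]
    while path[-1] in prev:
        path.append(prev[path[-1]])
    return list(reversed(path))

print(critical_path(["a", "b", "c", "d"], [("a", "b"), ("b", "c"), ("a", "d")]))
# → ['a', 'b', 'c']
```

Inside DependencyAnalyzer, nodes and edges would come from the self.dependencies / self.dependents maps built in visit_graph.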

Complete Example: Metrics Collector

class MetricsCollector(DefaultIRVisitor):
    """Collect metrics about IR structure."""
    
    def __init__(self):
        self.node_counts = {"task": 0, "router": 0, "tool": 0}
        self.edge_count = 0
        self.graph_count = 0
        self.tool_count = 0
        self.router_count = 0
    
    def visit_manifest(self, ir: ManifestIR):
        self.graph_count = len(ir.graphs)
        self.tool_count = len(ir.tools)
        self.router_count = len(ir.routers)
        super().visit_manifest(ir)
    
    def visit_node(self, node: NodeIR):
        # .get() tolerates kinds beyond task/router/tool (e.g. "human")
        kind = node.kind.value
        self.node_counts[kind] = self.node_counts.get(kind, 0) + 1
    
    def visit_edge(self, edge: EdgeIR):
        self.edge_count += 1
    
    def get_summary(self) -> dict:
        return {
            "graphs": self.graph_count,
            "nodes": sum(self.node_counts.values()),
            "node_breakdown": self.node_counts,
            "edges": self.edge_count,
            "tools": self.tool_count,
            "routers": self.router_count,
        }

# Collect metrics
collector = MetricsCollector()
traverse(ir, collector)
metrics = collector.get_summary()
print(f"Total nodes: {metrics['nodes']}")
print(f"Node breakdown: {metrics['node_breakdown']}")

Best Practices:

  • State Management: Use visitor instance variables to accumulate state
  • Selective Override: Only override methods you need (use DefaultIRVisitor)
  • Efficiency: Visitors are O(n) - efficient for large IRs
  • Composability: Combine multiple visitors for complex analysis
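
The composability tip above can be realized with a simple fan-out visitor, so several analyzers share a single O(n) traversal. This sketch uses duck typing rather than the real IRVisitor base class, so treat it as an illustration of the pattern only:

```python
class CompositeVisitor:
    """Fan a single traversal out to several visitors (one pass total)."""

    def __init__(self, *visitors):
        self.visitors = visitors

    def __getattr__(self, name):
        # Forward any visit_* call to every child visitor that defines it
        if not name.startswith("visit_"):
            raise AttributeError(name)

        def dispatch(item):
            for v in self.visitors:
                method = getattr(v, name, None)
                if method is not None:
                    method(item)

        return dispatch

class NodeCounter:
    def __init__(self):
        self.count = 0

    def visit_node(self, node):
        self.count += 1

class EdgeCounter:
    def __init__(self):
        self.count = 0

    def visit_edge(self, edge):
        self.count += 1

composite = CompositeVisitor(NodeCounter(), EdgeCounter())
composite.visit_node("n1")  # reaches NodeCounter only
composite.visit_edge("e1")  # reaches EdgeCounter only
```

With the real API you would call traverse(ir, composite) once instead of running traverse once per analyzer.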

Extension Patterns & Best Practices

Pattern 1: Plugin Architecture

Create a plugin system for extensions:

# plugins/my_plugin.py
from universal_agent_nexus.parser_registry import register_parser
from universal_agent_nexus.generator_registry import register_generator

def register_plugin():
    """Register all plugin components."""
    register_parser("my_format", MyParser())
    register_generator("my_target", MyGenerator())

# In application startup
from plugins import my_plugin
my_plugin.register_plugin()

Pattern 2: Configuration-Driven Extensions

Load extensions from configuration:

# config.yaml
extensions:
  parsers:
    - type: airflow
      class: plugins.AirflowParser
      priority: 150
  generators:
    - type: temporal
      class: plugins.TemporalGenerator
  passes:
    - class: plugins.CustomOptimization
      after: dead-node-elimination

# Load extensions (import_class is a small helper you supply, e.g. importlib-based)
import yaml

with open("config.yaml") as f:
    config = yaml.safe_load(f)

for parser_config in config["extensions"]["parsers"]:
    parser_class = import_class(parser_config["class"])
    register_parser(
        parser_config["type"],
        parser_class(),
        detection_priority=parser_config.get("priority", 100),
    )

Pattern 3: Extension Discovery

Auto-discover extensions:

import importlib
import logging
import pkgutil

logger = logging.getLogger(__name__)

def discover_extensions(package_name: str):
    """Auto-discover and register extensions."""
    package = importlib.import_module(package_name)
    for importer, modname, ispkg in pkgutil.iter_modules(package.__path__):
        try:
            module = importlib.import_module(f"{package_name}.{modname}")
            if hasattr(module, "register"):
                module.register()
        except Exception as e:
            logger.warning(f"Failed to load extension {modname}: {e}")

# Discover all extensions
discover_extensions("my_extensions")

Performance Considerations

  • Parser Performance: Cache parsed results for expensive parsers
  • Generator Performance: Use generators (yield) for large outputs
  • Pass Performance: Profile passes - optimize hot paths
  • Visitor Performance: Visitors are O(n) - avoid nested traversals
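
The parser-caching tip can be implemented as a thin wrapper. A sketch assuming a parser exposes parse(path) as in the examples above, keyed on the file's mtime so edits invalidate the cache:

```python
import os

class CachedParser:
    """Memoize parse results per (path, mtime) so repeated compiles are cheap."""

    def __init__(self, inner):
        self.inner = inner
        self._cache = {}  # path -> (mtime, parsed result)

    def parse(self, path):
        mtime = os.path.getmtime(path)
        cached = self._cache.get(path)
        if cached and cached[0] == mtime:
            return cached[1]  # file unchanged: reuse previous result
        result = self.inner.parse(path)
        self._cache[path] = (mtime, result)
        return result
```

Wrap before registering, e.g. register_parser("my_format", CachedParser(MyParser())).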

Testing Extensions

import pytest
from universal_agent_nexus.parser_registry import get_parser
from universal_agent_nexus.compiler import compile

def test_custom_parser():
    """Test custom parser registration."""
    parser = get_parser("my_format")
    assert parser is not None
    
    ir = parser.parse("test.myformat")
    assert ir.name == "test"

def test_custom_generator():
    """Test custom generator."""
    result = compile("test.yaml", target="my_target")
    assert "my_target_code" in result

🎯 Choosing the Right Runtime

| Scenario | Use | Why |
|---|---|---|
| Local development | LangGraph | Fast iteration, debugging, breakpoints |
| Production (< 1000 req/min) | LangGraph + Postgres | Simple deployment, lower cost |
| Production (> 1000 req/min) | AWS Step Functions | Auto-scaling, pay-per-use, 10K+ req/sec |
| Serverless/Event-driven | AWS Lambda + Step Functions | No servers to manage |
| AI client integration | MCP | Claude Desktop, Cursor, VS Code |
| Edge deployment | MCP (stdio) | Local-first, no internet required |
| Multi-cloud | LangGraph → Kubernetes | Cloud-agnostic containers |

Cost Comparison (1M executions/month)

| Runtime | Infrastructure | LLM Calls | Total |
|---|---|---|---|
| LangGraph (VPS) | $50 | $500 | $550 |
| AWS Step Functions | $25 | $500 | $525 |
| MCP (local) | $0 | $500 | $500 |
| LangGraph + Batch API | $50 | $10 | $60 |

💡 Batch API + Prompt Caching can reduce LLM costs by up to 98%. See Batch API.


📦 Examples

See complete, runnable examples in the examples/ folder:

| Example | Description | Directory |
|---|---|---|
| Hello World | Basic graph structure | examples/hello_langgraph/ |
| MCP Server | Expose agent as MCP tool | examples/mcp_server/ |
| AWS Production | Step Functions deployment | examples/aws_production/ |
| Multi-Cloud | AWS + GCP configuration | examples/multi_cloud/ |
| Hello MCP | MCP client integration | examples/hello_mcp/ |

Migration Guides


🔌 Adapters

LangGraph Adapter

Purpose: Local development with full debugging capabilities

Features:

  • ✅ Async Python execution with asyncio
  • ✅ Postgres checkpointing (AsyncPostgresSaver)
  • ✅ MCP tool integration
  • ✅ LLM router nodes (OpenAI, Anthropic)
  • ✅ Batch API integration for cost optimization

Standard Runtime:

from universal_agent_nexus.adapters.langgraph import LangGraphRuntime, load_manifest

manifest = load_manifest("manifest.yaml")
runtime = LangGraphRuntime(
    postgres_url="postgresql://localhost:5432/uaa_dev",
    enable_checkpointing=True
)

await runtime.initialize(manifest, graph_name="main")
result = await runtime.execute(
    execution_id="exec-001",
    input_data={"context": {"query": "Hello!"}}
)

Batch-Aware Runtime (for cost optimization):

from universal_agent_nexus.adapters.langgraph import BatchAwareLangGraphRuntime

runtime = BatchAwareLangGraphRuntime(
    postgres_url="postgresql://localhost:5432/uaa_dev",
    enable_batching=True,
    anthropic_api_key=os.environ["ANTHROPIC_API_KEY"],
)

await runtime.initialize(manifest)
result = await runtime.execute("exec-001", {"query": "Hello!"})

# Check stats: {'requests_queued': 100, 'cache_hits': 85, ...}
print(runtime.get_batch_stats())

AWS Adapter

Purpose: Production deployment at enterprise scale

Components:

  • Step Functions - State machine execution
  • Lambda - Tool execution functions
  • DynamoDB - State persistence (single-table design)
  • CloudWatch - Logging and metrics
  • X-Ray - Distributed tracing

# Compile to AWS Step Functions
nexus compile manifest.yaml --target aws --output state_machine.json

# Deploy with Terraform
cd terraform/environments/prod
terraform apply

MCP Adapter

Purpose: Expose agents as tools to AI clients (Claude, Cursor, VS Code)

# Start MCP server
nexus serve manifest.yaml --protocol mcp --transport stdio

Claude Desktop configuration:

{
  "mcpServers": {
    "uaa-agent": {
      "command": "nexus",
      "args": ["serve", "/path/to/manifest.yaml", "--protocol", "mcp"]
    }
  }
}

💻 CLI Reference

| Command | Description | Example |
|---|---|---|
| nexus compile | Compile manifest to target runtime | nexus compile agent.yaml --target langgraph |
| nexus translate | Convert between runtimes | nexus translate agent.py --to aws |
| nexus serve | Start MCP server | nexus serve agent.yaml --protocol mcp |
| nexus validate | Validate manifest syntax | nexus validate agent.yaml |
| nexus deploy | Deploy to cloud provider | nexus deploy agent.yaml --env prod |

Common Flags

| Flag | Description | Default |
|---|---|---|
| --target, -t | Target runtime (langgraph, aws, mcp, uaa) | Required |
| --output, -o | Output file path | ./output |
| --opt-level | Optimization level (none, basic, aggressive) | basic |
| --verbose, -v | Enable verbose logging | false |
| --dry-run | Preview without writing | false |

Features

Core Features

  • Write Once, Run Anywhere - Single manifest compiles to LangGraph, AWS, MCP
  • Production-Ready - Built-in state persistence, error handling, observability
  • Type-Safe - Pydantic schemas with full validation
  • Async-Native - asyncio throughout, no blocking calls

Observability

  • OpenTelemetry - Distributed tracing across all adapters
  • Structured Logging - JSON logs with execution context
  • CloudWatch Integration - Automatic log/metric export (AWS)
  • X-Ray Tracing - End-to-end request tracing (AWS)

from universal_agent_nexus.observability import setup_tracing, trace_execution

setup_tracing(service_name="my-agent", environment="production")

async with trace_execution("execute_graph", execution_id="exec-001"):
    result = await runtime.execute(...)

IR Validation (v1.0.0)

Comprehensive validation with error codes and source locations:

from universal_agent_nexus.ir.validation import validate_ir, validate_and_raise

errors = validate_ir(ir)
for error in errors:
    print(error)
# error[E001]: Entry node 'start' not found
#   = hint: Add a node with id='start'

# Or raise on any error
validate_and_raise(ir, strict=True)

Error codes:

| Code | Category | Description |
|---|---|---|
| E001-E004 | Structural | Missing nodes, bad edges |
| E101-E104 | Type | Missing refs, unknown tools |
| E201-E203 | Semantic | No outgoing edges |
| W301-W304 | Warnings | Unreachable nodes, no terminal |

PassManager (v1.0.0)

LLVM-style optimization levels:

from universal_agent_nexus.compiler import compile
from universal_agent_nexus.ir.pass_manager import OptimizationLevel

# No optimization (fastest compile)
compile("agent.py", target="aws", opt_level=OptimizationLevel.NONE)

# Aggressive optimization (slowest compile, best output)
compile("agent.py", target="aws", opt_level=OptimizationLevel.AGGRESSIVE)

Available passes:

  • Dead node elimination
  • Edge deduplication
  • Condition simplification
  • Constant folding
  • Router/Tool validation
  • Cycle detection
  • Batch optimization (for LLM cost reduction)

🌉 Cross-Runtime State Bridge

NEW in v2.0.0: Full state portability across LangGraph, AWS Step Functions, and UAA Kernel.

Nexus includes a Universal State Bridge that normalizes execution state across runtimes, enabling:

  • Cross-runtime debugging and inspection
  • State migration between environments
  • Unified observability across runtimes

State Normalization

from universal_agent_nexus.bridges import (
    normalize,
    denormalize,
    NormalizedGraphState,
    StateFormat,
)

# Normalize LangGraph checkpoint to canonical format
normalized = normalize(langgraph_checkpoint)
print(f"Execution: {normalized.execution_id}")
print(f"Status: {normalized.status}")
print(f"History: {len(normalized.history)} steps")

# Convert to AWS Step Functions format
aws_input = denormalize(normalized, StateFormat.AWS)

# Or sync directly between runtimes
from universal_agent_nexus.bridges import sync_state
aws_state = sync_state(
    source_state=langgraph_checkpoint,
    source_format=StateFormat.LANGGRAPH,
    target_format=StateFormat.AWS,
)

NormalizedGraphState Schema

class NormalizedGraphState(BaseModel):
    execution_id: str           # Unique execution identifier
    graph_name: str             # Name of the graph
    status: str                 # pending, running, completed, failed, suspended
    context: Dict[str, Any]     # Current execution context
    history: List[NormalizedHistoryEntry]  # Full execution history
    current_node_id: Optional[str]         # Currently executing node
    version: str                # State schema version
    created_at: Optional[str]   # ISO8601 timestamp
    updated_at: Optional[str]   # ISO8601 timestamp

History Preservation

Unlike simple state snapshots, the bridge preserves full execution history:

class NormalizedHistoryEntry(BaseModel):
    node_id: str                # Node that was executed
    node_kind: str              # router, tool, task, human
    status: str                 # completed, failed, running
    input_data: Dict[str, Any]  # Input to the node
    output_data: Optional[Dict] # Output from the node
    error: Optional[str]        # Error message if failed
    started_at: str             # ISO8601 timestamp
    ended_at: Optional[str]     # ISO8601 timestamp

🔧 UAA Native Adapter

NEW in v2.0.0: Run directly through the Universal Agent Architecture kernel.

The UAA Native adapter makes the UAA Kernel a first-class compilation target:

Compile to UAA

nexus compile agent.yaml --target uaa --output kernel_manifest.yaml

Execute Through UAA Kernel

from universal_agent_nexus.adapters.uaa import UAANativeRuntime, UAANativeRuntimeBuilder
from universal_agent_architecture.task.store import SQLTaskStore
from my_llm_client import OpenAIClient

# Build runtime with dependency injection
runtime = (UAANativeRuntimeBuilder()
    .with_task_store(SQLTaskStore("postgresql://..."))
    .with_llm_client(OpenAIClient(api_key="..."))
    .with_tool_executor("mcp", MCPToolExecutor())
    .with_tool_executor("http", HTTPToolExecutor())
    .build())

# Execute
result = await runtime.execute(
    manifest=my_manifest,
    graph_name="main",
    input_data={"query": "Hello, world!"},
)

print(f"Status: {result.status}")
print(f"Output: {result.context}")

Resume Suspended Executions

# Resume a human-in-the-loop execution
result = await runtime.resume_execution(
    execution_id="exec-001",
    human_input={"approved": True, "feedback": "Looks good!"},
)

ContractRegistry (Architecture)

The UAA Kernel uses a ContractRegistry for dependency injection:

from universal_agent_architecture.runtime import get_global_registry

registry = get_global_registry()

# Register implementations
registry.register_task_store(SQLTaskStore("postgresql://..."))
registry.register_llm_client(OpenAIClient())
registry.register_tool_executor("mcp", MCPToolExecutor())

# Or configure from environment variables
# UAA_TASK_STORE=mypackage.stores.DynamoTaskStore
# UAA_LLM_CLIENT=mypackage.clients.AnthropicClient
registry.configure_from_env()

💰 Batch API + Prompt Caching

Save up to 98% on LLM costs with automatic request batching and prompt caching.

Nexus includes built-in support for the Anthropic Batch API with prompt caching. This is implemented as a compile-time optimization pass + runtime batching layer.

How It Works

┌─────────────────────────────────────────────────────────────────┐
│                    COMPILE TIME                                  │
│  BatchOptimizationPass analyzes IR:                             │
│  • Identifies LLM call nodes                                    │
│  • Computes cache keys from system messages                     │
│  • Annotates nodes with BatchAnnotation                         │
└───────────────────────────────┬─────────────────────────────────┘
                                │
                                ▼
┌─────────────────────────────────────────────────────────────────┐
│                     RUNTIME                                      │
│  BatchAwareLangGraphRuntime:                                    │
│  • Accumulates LLM requests                                     │
│  • Batches them via Anthropic Batch API                         │
│  • Applies prompt caching headers                               │
└─────────────────────────────────────────────────────────────────┘
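
The "computes cache keys from system messages" step in the diagram can be illustrated with a hash of the normalized system prompt. This is a sketch of the idea, not the pass's actual key format:

```python
import hashlib

def cache_key(system_message: str, model: str) -> str:
    """Stable key: identical (model, system prompt) pairs share a cache entry."""
    normalized = " ".join(system_message.split())  # collapse whitespace
    digest = hashlib.sha256(f"{model}\n{normalized}".encode()).hexdigest()
    return digest[:16]

k1 = cache_key("You are a helpful agent.", "claude-sonnet-4-20250514")
k2 = cache_key("You are a   helpful agent.", "claude-sonnet-4-20250514")
assert k1 == k2  # whitespace differences don't break cache hits
```

Nodes whose keys collide share a cached prompt prefix at runtime, which is where the input-token savings come from.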

Cost Impact

| Scenario | Before | After | Savings |
|---|---|---|---|
| 100 LLM calls | $0.15 | $0.003 | 98% |
| 1000 calls/hour | $1.50/hr | $0.03/hr | $35/day |
| Shared system prompts | Full price | Cached | 90% on input tokens |

Quick Start

Option 1: Compile-time only (annotate for batching)

from universal_agent_nexus.builder import CompilerBuilder

builder = CompilerBuilder()
builder.with_batch_optimization(batch_size=100, max_wait_ms=5000)
result = builder.compile("agent.yaml", target="langgraph")

Option 2: Full runtime batching

from universal_agent_nexus.adapters.langgraph import BatchAwareLangGraphRuntime

runtime = BatchAwareLangGraphRuntime(
    postgres_url="postgresql://localhost:5432/nexus",
    enable_batching=True,
    anthropic_api_key=os.environ["ANTHROPIC_API_KEY"],
    batch_size=100,        # Requests per batch
    batch_wait_ms=5000,    # Max wait before flush
)

await runtime.initialize(manifest)

# LLM calls are automatically batched
result = await runtime.execute("exec-001", {"query": "Hello"})

# Check batch statistics
stats = runtime.get_batch_stats()
print(f"Cache hits: {stats['cache_hits']}")
print(f"Batches submitted: {stats['batches_submitted']}")

Option 3: Explicit batch execution

# Execute multiple requests as a single batch
results = await runtime.execute_batch([
    {"execution_id": "exec-001", "input_data": {"query": "Hello"}},
    {"execution_id": "exec-002", "input_data": {"query": "World"}},
    {"execution_id": "exec-003", "input_data": {"query": "Test"}},
])

Components

| Component | Location | Purpose |
|---|---|---|
| BatchAnnotation | ir/annotations.py | Marks nodes eligible for batching |
| BatchOptimizationPass | ir/passes/batch_optimization.py | Compile-time LLM node analysis |
| BatchAccumulator | adapters/langgraph/batch_accumulator.py | Request queue + Anthropic Batch API |
| BatchAwareLangGraphRuntime | adapters/langgraph/runtime.py | Batch-enabled runtime |

Customization

The batch system is fully pluggable. You can:

Use your own runtime:

from universal_agent_nexus.adapters.langgraph import LangGraphRuntime
from universal_agent_nexus.ir.annotations import BatchAnnotation

class MyCustomBatchRuntime(LangGraphRuntime):
    """Your own batch implementation."""
    
    async def execute(self, execution_id, input_data):
        # Read batch annotations
        for node in self.graph.nodes:
            batch_info = node.metadata.get_annotation(BatchAnnotation)
            if batch_info and batch_info.eligible:
                # Your batching strategy here
                pass
        return await super().execute(execution_id, input_data)

Use the accumulator standalone:

from universal_agent_nexus.adapters.langgraph import BatchAccumulator

accumulator = BatchAccumulator(
    api_key=os.environ["ANTHROPIC_API_KEY"],
    max_batch_size=50,
)
await accumulator.start()

# Queue requests
future1 = await accumulator.queue_request(messages=[...], model="claude-sonnet-4-20250514")
future2 = await accumulator.queue_request(messages=[...], model="claude-sonnet-4-20250514")

# Flush and get results
await accumulator.flush()
result1 = await future1
result2 = await future2

Create a custom batch pass:

from universal_agent_nexus.ir.pass_interface import OptimizationPass

class MyBatchPass(OptimizationPass):
    """Custom batch analysis for OpenAI or other providers."""
    
    @property
    def name(self):
        return "my-batch-optimization"
    
    def apply(self, ir):
        for graph in ir.graphs:
            for node in graph.nodes:
                # Your analysis logic
                node.metadata.annotate(MyCustomAnnotation(...))
        return ir

# Register with builder
builder = CompilerBuilder()
builder.add_optimization_pass(MyBatchPass())

Configuration

| Option | Default | Description |
|--------|---------|-------------|
| `batch_size` | `100` | Maximum requests per batch |
| `batch_wait_ms` | `5000` | Max wait time before auto-flush (ms) |
| `auto_flush` | `True` | Automatically flush on timer |
| `enable_cache_keys` | `True` | Compute cache keys for prompt caching |
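To make these options concrete, here is a minimal, self-contained sketch of the accumulate-then-flush pattern they control. `TinyAccumulator` is illustrative only, not the library's implementation; the real `BatchAccumulator` submits the flushed batch to the Anthropic Batch API instead of echoing it.

```python
import asyncio

class TinyAccumulator:
    """Illustrative accumulate-then-flush pattern (not the library's code)."""

    def __init__(self, batch_size=100, auto_flush=True):
        self.batch_size = batch_size
        self.auto_flush = auto_flush
        self.pending = []  # (request, future) pairs awaiting a flush

    async def queue_request(self, request):
        fut = asyncio.get_running_loop().create_future()
        self.pending.append((request, fut))
        # The real accumulator also flushes on a batch_wait_ms timer.
        if self.auto_flush and len(self.pending) >= self.batch_size:
            await self.flush()
        return fut

    async def flush(self):
        batch, self.pending = self.pending, []
        # A single batch API call would go here; we just echo each request.
        for request, fut in batch:
            fut.set_result({"echo": request})

async def main():
    acc = TinyAccumulator(batch_size=2)
    f1 = await acc.queue_request("a")
    f2 = await acc.queue_request("b")  # reaching batch_size triggers auto-flush
    return await f1, await f2

results = asyncio.run(main())
print(results)  # ({'echo': 'a'}, {'echo': 'b'})
```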

Environment Variables

# Required for batch API
ANTHROPIC_API_KEY=sk-ant-...

# Optional: Override batch settings
NEXUS_BATCH_SIZE=100
NEXUS_BATCH_WAIT_MS=5000
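As a sketch of how such overrides can be consumed (the resolution logic below is an assumption for illustration, not taken from the package), environment values take precedence over the documented defaults:

```python
import os

def batch_settings(env=None):
    """Resolve batch settings: env var override if set, else documented default."""
    env = os.environ if env is None else env
    return {
        "batch_size": int(env.get("NEXUS_BATCH_SIZE", 100)),
        "batch_wait_ms": int(env.get("NEXUS_BATCH_WAIT_MS", 5000)),
    }

print(batch_settings({"NEXUS_BATCH_SIZE": "25"}))  # {'batch_size': 25, 'batch_wait_ms': 5000}
```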

Performance

Compiler Benchmarks (v1.0.0)

| Operation | Time | Throughput |
|-----------|------|------------|
| IR Parsing | 0.45ms | 2,200/sec |
| Transform Passes | 0.07ms | 15,500/sec |
| Code Generation | 0.03-0.07ms | 14,000-33,000/sec |
| Full Compile Pipeline | 0.4-0.5ms | 2,000/sec |
| Validation | 0.03ms | 31,000/sec |

Runtime Benchmarks

| Metric | LangGraph (Local) | AWS (Serverless) |
|--------|-------------------|------------------|
| Cold Start | 50ms | 200-300ms (Lambda) |
| Warm Execution | 10-20ms/node | 15-25ms/node |
| State Persistence | 5ms (Postgres) | 3ms (DynamoDB) |
| Throughput | 1,000 req/sec | 10,000+ req/sec |

Optimizations (v1.0.0)

  • boto3 + asyncio.to_thread - 30% faster than aioboto3
  • DynamoDB batch operations - 25x faster bulk writes
  • Postgres prepared statements - 2-3x faster queries
  • Connection pooling - max_pool_connections=50
  • Direct AsyncPostgresSaver - no bridge overhead
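The first bullet refers to the standard pattern of running the synchronous boto3 SDK off the event loop with `asyncio.to_thread`, so concurrent calls overlap instead of blocking. A self-contained sketch with a stand-in blocking function (no boto3 required):

```python
import asyncio
import time

def blocking_call(x):
    # Stand-in for a synchronous boto3 client call (e.g. dynamodb.put_item).
    time.sleep(0.01)
    return {"ok": x}

async def main():
    # Dispatch each sync SDK call to a worker thread; gather preserves order.
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_call, i) for i in range(3))
    )

results = asyncio.run(main())
print(results)  # [{'ok': 0}, {'ok': 1}, {'ok': 2}]
```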

🔧 Troubleshooting

Common Issues

"Module not found: universal_agent_nexus"

# Make sure you installed with the right adapter
pip install "universal-agent-nexus[langgraph]"  # For LangGraph
pip install "universal-agent-nexus[all]"        # For everything

"Connection refused: PostgreSQL"

# Check Docker is running
docker ps | grep postgres

# Check environment variable
echo $UAA_POSTGRES_URL
# Should be: postgresql://postgres:password@localhost:5432/nexus_dev

# Restart Postgres container
docker run -d -p 5432:5432 \
  -e POSTGRES_PASSWORD=password \
  -e POSTGRES_DB=nexus_dev \
  postgres:16-alpine

"Compilation failed: Unknown node kind 'foo'"

Valid node kinds: router, tool, task

Check your manifest against the Manifest Schema Reference.
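For a quick pre-flight check before compiling, you can scan a loaded manifest yourself. This is a sketch only: `check_node_kinds` is not part of the package, and the dict shape is simplified from the schema.

```python
VALID_NODE_KINDS = {"router", "tool", "task"}

def check_node_kinds(manifest):
    """Hypothetical pre-flight check mirroring the compiler's node-kind rule."""
    errors = []
    for graph in manifest.get("graphs", []):
        for node in graph.get("nodes", []):
            if node.get("kind") not in VALID_NODE_KINDS:
                errors.append(
                    f"Unknown node kind {node.get('kind')!r} in node {node.get('name')!r}"
                )
    return errors

manifest = {
    "graphs": [
        {"nodes": [{"name": "classify", "kind": "router"},
                   {"name": "oops", "kind": "foo"}]}
    ]
}
errors = check_node_kinds(manifest)
print(errors)  # ["Unknown node kind 'foo' in node 'oops'"]
```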

"AWS deployment failed: Access Denied"

# Check AWS credentials
aws sts get-caller-identity

# Ensure IAM permissions for:
# - Step Functions (states:*)
# - Lambda (lambda:*)
# - DynamoDB (dynamodb:*)
# - IAM (iam:PassRole)
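If you prefer an explicit policy over broad wildcards, the permissions above take the usual IAM JSON shape. This is an illustrative starting point, not a vetted least-privilege policy; scope `Action` lists and `Resource` ARNs to your deployment.

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["states:*", "lambda:*", "dynamodb:*"],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "*"
    }
  ]
}
```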

"MCP server not connecting"

# Test locally first
nexus serve manifest.yaml --protocol mcp --transport stdio

# Check Claude Desktop config path:
# macOS: ~/Library/Application Support/Claude/claude_desktop_config.json
# Windows: %APPDATA%\Claude\claude_desktop_config.json
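Once the stdio test works, register the server in that config file. An example entry (the server name `nexus-agent` and the manifest path are placeholders; `mcpServers` / `command` / `args` is the standard Claude Desktop config shape):

```json
{
  "mcpServers": {
    "nexus-agent": {
      "command": "nexus",
      "args": ["serve", "manifest.yaml", "--protocol", "mcp", "--transport", "stdio"]
    }
  }
}
```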

Getting Help


📁 Project Structure

universal_agent_nexus/
├── universal_agent_nexus/      # Core package
│   ├── adapters/               # Runtime adapters
│   │   ├── langgraph/          # LangGraph adapter
│   │   │   ├── runtime.py      # LangGraphRuntime + BatchAwareLangGraphRuntime
│   │   │   ├── batch_accumulator.py  # Anthropic Batch API client
│   │   │   └── compiler.py     # Manifest → StateGraph
│   │   ├── aws/                # AWS adapter
│   │   └── mcp/                # MCP adapter
│   ├── ir/                     # Intermediate Representation
│   │   ├── annotations.py      # BatchAnnotation, CostAnnotation, etc.
│   │   ├── passes/             # Optimization passes
│   │   │   └── batch_optimization.py  # LLM batching analysis
│   │   ├── pass_manager.py     # LLVM-style pass pipeline
│   │   └── transforms.py       # Built-in transforms
│   ├── cli/                    # CLI interface
│   ├── builder.py              # CompilerBuilder with batch support
│   └── observability/          # Tracing & logging
├── terraform/                  # Infrastructure as Code
│   ├── modules/                # Reusable modules
│   └── environments/           # Environment configs
├── lambda/                     # Lambda function code
├── examples/                   # Example manifests
└── tests/                      # Test suite

🧪 Testing

# All tests
pytest

# Unit tests only
pytest tests/unit/

# Integration tests (requires Docker)
docker-compose up -d postgres
pytest tests/integration/

# With coverage
pytest --cov=universal_agent_nexus --cov-report=html

Current status: 55 tests + 5 benchmarks passing


🚀 Deployment

Local Development

# Start Postgres
docker run -d -p 5432:5432 \
  -e POSTGRES_PASSWORD=password \
  -e POSTGRES_DB=uaa_dev \
  postgres:16-alpine

# Run agent locally
python examples/hello_langgraph/run.py

AWS Production

# Compile manifest
nexus compile manifest.yaml --target aws \
  --output terraform/environments/prod/state_machine.json

# Deploy with Terraform
cd terraform/environments/prod
terraform init
terraform apply

# Execute
aws stepfunctions start-execution \
  --state-machine-arn $(terraform output -raw state_machine_arn) \
  --input '{"context": {"query": "Hello!"}}'

⚙️ Configuration

Environment Variables

# LangGraph
UAA_POSTGRES_URL=postgresql://localhost:5432/uaa_dev
UAA_LOG_LEVEL=INFO

# AWS
AWS_REGION=us-east-1
AWS_DYNAMODB_TABLE=uaa-agent-state

# Observability
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
OTEL_SERVICE_NAME=universal-agent-nexus

# LLM APIs
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...    # Required for Batch API

# Batch API (optional overrides)
NEXUS_BATCH_SIZE=100            # Requests per batch
NEXUS_BATCH_WAIT_MS=5000        # Max wait before flush

🔄 Version Compatibility

| Nexus Version | Python | LangGraph | boto3 | MCP SDK |
|---------------|--------|-----------|-------|---------|
| 1.0.x | 3.11+ | ≥0.2.0 | ≥1.34.0 | ≥1.0.0 |
| 0.9.x | 3.10+ | ≥0.1.0 | ≥1.28.0 | ≥0.9.0 |

🤝 Contributing

We welcome contributions!

# Clone repository
git clone https://github.com/mjdevaccount/universal_agent_nexus.git
cd universal_agent_nexus

# Install dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Format code
ruff format .
ruff check . --fix

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.


🙏 Acknowledgments

Built with inspiration from:

  • LangChain/LangGraph - Agent orchestration patterns
  • AWS Step Functions - State machine execution model
  • Model Context Protocol (MCP) - AI client integration standard
  • Terraform - Infrastructure as code principles

Made with ❤️ by the Universal Agent Nexus team

Star us on GitHub if this project helps you!
