Topaz Agent Kit

A powerful, config-driven multi-agent orchestration framework for building sophisticated AI workflows across multiple frameworks and protocols.

Requires Python 3.11+ · License: MIT

🚀 What is Topaz Agent Kit?

Topaz Agent Kit is a lightweight, developer-focused toolkit that enables rapid creation, deployment, and orchestration of AI agent workflows. Instead of building from scratch, teams can assemble sophisticated multi-agent demos in hours, experiment freely across frameworks, and focus on delivering real business value.

✨ Key Features

  • 🔄 Framework Flexibility: Build with LangGraph, CrewAI, Agno, ADK, OAK, or MAF; mix and match without being tied to a single ecosystem
  • 🔗 Protocol Choice: Create agents that work with A2A or IN-PROC protocols for flexible multi-agent communication
  • 🧰 Rich Tool Ecosystem: Pre-built MCP toolkits for document processing, web search, math, email, SEC filings, and more
  • 🖼️ Multimodal Support: Native support for images, documents, and URLs across all frameworks; agents process visual and document inputs directly
  • ⚡ Rapid Development: From idea to working demo in hours, not weeks
  • 🎛️ Modern Web UI: Real-time agent interaction visualization with drag-and-drop file uploads, session management, AG-UI protocol, and script execution
  • 📱 App Mode: Config-driven UI apps with three canvas modes (Declarative, Agent, Hybrid), 47 widget types, and flexible layouts for collaborative AI-assisted editing
  • 🎛️ Operations Center: Centralized case management interface with pipeline-specific views, dashboard analytics, Operations Assistant, and comprehensive case detail modals
  • 🚪 Portal Mode: Customer-facing portal at /portal with config-based login, portal state (GET/PATCH), and an in-page assistant chat; one session per user, separate from main chat
  • 🚪 Enhanced HITL: Multi-type gates (approval, input, selection) with async HITL support for batch processing, dynamic flow control (retry, skip, stop), and context injection
  • ⚡ Async HITL: Non-blocking human-in-the-loop with checkpoint management, case queuing, and pipeline resumption for high-throughput workflows
  • 📄 Document Intelligence: Complete RAG-as-a-Service with document upload, analysis, semantic search, and citation support
  • 🔄 10 Execution Patterns: Sequential, parallel, repeat, loop, conditional, switch, handoff, group chat, nested patterns, and pipeline composition
  • 🔗 Pipeline Composition: Use entire pipelines as nodes within other pipelines for maximum reusability
  • ⚡ Event-Driven Pipelines: Automatic pipeline execution triggered by file system events, webhooks, or custom triggers
  • 🎨 Auto-Generated Diagrams: Professional workflow diagrams with Graphviz for all pipelines
  • 🧠 Prompt Intelligence: Automatic variable detection and context-aware agent generation
  • 💬 Intelligent Routing: LLM-based pipeline selection based on user intent
  • 📋 SOP-Driven Agents: Standard Operating Procedure support for structured, consistent agent workflows
  • 🧠 AgentOS Memory: Filesystem-based memory system with Unix-like commands for persistent agent memory, shared templates, and semantic search
  • 📦 Production Ready: Built-in monitoring, error handling, logging, and scalability features

๐Ÿ—๏ธ Architecture Overview

Multi-Framework Support

Topaz Agent Kit supports 6 major AI frameworks with unified orchestration:

| Framework | Description | Best For | Multimodal Support |
|-----------|-------------|----------|--------------------|
| LangGraph | State-based agent workflows | Complex stateful processes | ✅ Images, Documents, URLs |
| CrewAI | Collaborative agent teams | Multi-agent collaboration | ⚠️ URLs only (local file limitation) |
| Agno | Lightweight agent framework | Simple, fast agent tasks | ✅ Images, Documents, URLs |
| ADK | Google Agent Development Kit | Google ecosystem integration | ✅ Images, Documents, URLs |
| OAK | OpenAI Agents Kit | OpenAI model optimization | ✅ Images, Documents, URLs |
| MAF | Microsoft Agent Framework | Enterprise agent orchestration | ✅ Images, Documents, URLs |

Note: Semantic Kernel (SK) support has been removed due to dependency conflicts with A2UI protocol support. All SK agents have been migrated to Microsoft Agent Framework (MAF), which is Microsoft's recommended path forward. See Framework Limitations for details.

Multimodal Input Support: All frameworks can process images, documents, and URLs directly as agent input (CrewAI with the limitations noted below). Files are automatically preprocessed and passed to agents in framework-native formats:

  • Images: Local files and URLs supported (base64-encoded for most frameworks)
  • Documents: PDFs, text files, and other document types (extracted text included when available)
  • URLs: Automatically detected in user messages and processed as images or documents

Note: CrewAI has known limitations with local image processing when using Azure OpenAI models. Image URLs work reliably; local files may require alternative approaches.

Framework Limitations

Semantic Kernel (SK) Removal

Status: โŒ Completely Removed - Semantic Kernel framework support has been completely removed from Topaz Agent Kit as of this version.

Reason: SK had a dependency conflict with A2UI protocol support:

  • SK requires pydantic<2.12
  • A2UI requires pydantic>=2.12.5
  • These requirements are incompatible and cannot be resolved

What Was Removed:

  • SKBaseAgent class and all SK framework code
  • SK framework configuration (sk.yml)
  • SK from framework validation and factory registration
  • All SK-related imports and dependencies

Migration: All existing SK agents have been migrated to Microsoft Agent Framework (MAF), which is Microsoft's recommended path forward for agent development. MAF provides similar capabilities to SK with better future support and full A2UI compatibility.

For Existing Projects: If you have SK agents in your project:

  1. Change type: sk to type: maf in agent config files
  2. Regenerate agents: topaz-agent-kit generate <project_dir>
  3. Test your agents (MAF has similar capabilities to SK)
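
Step 1 is a one-line change in each agent config file; a minimal sketch, assuming a hypothetical agent file (all other fields stay unchanged):

```yaml
# config/agents/my_agent.yml (hypothetical file name)
# Before -- no longer valid, fails validation:
# type: sk
# After:
type: maf
name: my_agent
# ...remaining fields unchanged
```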

For New Projects: Use MAF, LangGraph, CrewAI, Agno, ADK, or OAK frameworks. SK is no longer available.

Note: The SK base class (sk_base_agent.py) and configuration file (sk.yml) have been completely removed from the codebase. Attempting to use type: sk in agent configurations will result in validation errors.

See SK and A2UI Dependency Conflict Documentation for detailed migration guide and technical details.

Execution Patterns

Pattern-based execution replaces complex graph definitions:

  • Sequential: Run agents in order with dependency management
  • Parallel: Execute multiple agents simultaneously for maximum efficiency
  • Repeat: Execute the same agent multiple times in parallel with instance-specific inputs (new)
  • Loop: Iterative execution with configurable termination conditions
  • Conditional: Execute agents or entire branch based on dynamic conditions
  • Switch: Route to different branches based on field values or expressions
  • Handoff: LLM-driven routing to specialist agents with auto-generated orchestrator
  • Group Chat: Multiple agents collaborate in a shared conversation (new)
  • Nested: Combine patterns for complex workflows
  • Pipeline Composition: Use entire pipelines as nodes within other pipelines for maximum reusability (new)
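
These patterns compose in pipeline YAML. A minimal sketch of a sequential pipeline containing a parallel step, with hypothetical node names (the type/steps/node structure follows the examples later in this document):

```yaml
pattern:
  type: sequential
  steps:
    - node: intake_agent          # runs first
    - type: parallel              # then both branches execute simultaneously
      steps:
        - node: risk_checker
        - node: compliance_checker
    - node: summarizer            # runs after both parallel branches finish
```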

Pattern Containers & Grouping (UI)

  • The UI renders pattern containers for every pattern_started/pattern_finished pair across all patterns (sequential, parallel, repeat, loop, switch, handoff, group_chat). Containers show name/description, status, timestamps, and elapsed time.
  • Child items (agent cards, protocol chips, HITL gates, nested pattern containers) attach to a parent pattern via parent_pattern_id, preserving hierarchy even for deep nesting and repeat/loop instances.
  • Concrete examples from starter pipelines:
    • Haiku Writers Room (group_chat): One group_chat container with the three participants (haiku_form_expert, haiku_imagery_specialist, haiku_editor) as child cards in round order.
    • Math Compass (sequential + switch): Top-level sequential container; inside it a switch container ("Complexity Router") with a nested sequential path for the TRUE branch containing the calculator agent card. Protocol chips sit immediately before their target card inside the parent container.
    • TCI Policy Risk Assessor (loop + nested sequential + parallel + switch): Loop container per application; inside it a nested sequential "Single Application Assessment Flow"; inside that two parallel containers ("Data Collection Phase", "Risk Factor Assessment") followed by a switch container ("Decision Routing") whose cases each render as nested sequential containers.
    • ECI Claims Vetter (loop + nested sequential + parallel): Loop over claims; each iteration shows a sequential container ("Single Claim Processing Flow") containing a parallel container ("Parallel Validation Checks") and the involved agent cards.
    • Handoff patterns (e.g., specialist routing): Handoff container with orchestrator card and the chosen specialist card inside; pattern_finished marks completion or failure.
  • Backend emits pattern_started/pattern_finished with name, description, parent_pattern_id, started_at/ended_at; the frontend groups by these IDs and sorts by timestamp/priority so workflow → patterns → cards/chips/HITL render in chronological order. Each repeat/loop instance and nested pattern gets a unique pattern_id, preventing cross-iteration mixing.
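
The event fields named above might look like this on the wire; a hypothetical sketch of one pattern_started payload (the exact schema is not shown in this document, only the field names):

```json
{
  "event": "pattern_started",
  "pattern_id": "claims_loop_iter_0",
  "parent_pattern_id": "root_sequential",
  "name": "Claims Processing Loop",
  "description": "Processes each pending claim through the vetting workflow.",
  "started_at": "2025-01-28T10:15:00Z"
}
```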

Using Variables in Pattern Descriptions

Pattern descriptions support Jinja2 templating to display dynamic information about the current execution context. This is particularly useful for showing relevant data about items being processed in loops or conditional branches.

Key Concepts
  1. Timing: Pattern descriptions are rendered BEFORE agents execute, so you can only use variables from:

    • Scanner outputs (e.g., eci_pending_claims_scanner.pending_claims_list)
    • Loop items (e.g., current_claim, current_application)
    • Database fields already stored
    • Cannot use: Extractor outputs or agent results (they haven't run yet)
  2. Variable Access: Use the same Jinja2 syntax as in agent inputs:

    • Simple variables: {{variable_name}}
    • Nested access: {{agent_id.field}} or {{current_item.field}}
    • Filters: {{amount | round(2)}}
    • Conditionals: {% if condition %}...{% endif %}
  3. Table Formatting: Use markdown tables to display structured data:

    • Multi-column tables for loop descriptions (showing all items)
    • Two-column tables (Field | Value) for single item details

Example: Loop Description (All Items)

Display a summary table of all items being processed:

pattern:
  type: loop
  name: "Claims Processing Loop"
  description: |
    ## Iterative Claim Processing
    
    Processes each pending claim through the complete vetting workflow.
    
    {% if eci_pending_claims_scanner.pending_claims_list and eci_pending_claims_scanner.pending_claims_list | length > 0 %}
    ## Pending Claims Summary
    
    | Claim ID | Seller | Claimant | Claim Amount | Claim Reason |
    |----------|--------|----------|-------------|-------------|
    {% for claim in eci_pending_claims_scanner.pending_claims_list %}
    | {{ claim.claim_id }} | {{ claim.seller_name or 'N/A' }} | {{ claim.claimant_name or 'N/A' }} | {% if claim.invoice_amount %}${{ claim.invoice_amount | round(2) }}{% else %}N/A{% endif %} | {{ claim.claim_reason_category or 'N/A' }} |
    {% endfor %}
    {% endif %}
  iterate_over: "eci_pending_claims_scanner.pending_claims_list"
  loop_item_key: "current_claim"

Result: Shows a table with all pending claims before processing begins.

Example: Single Item Description (Two-Column Format)

Display detailed information about the current item being processed:

pattern:
  type: sequential
  name: "Single Claim Processing Flow"
  description: |
    ## Individual Claim Vetting Process
    
    Complete workflow for processing a single claim.
    
    {% if current_claim %}
    ## Current Claim Details
    
    | Field | Value |
    |-------|-------|
    | Claim ID | {{ current_claim.claim_id }} |
    {% if current_claim.policy_number %} | Policy Number | {{ current_claim.policy_number }} |
    {% endif %}{% if current_claim.seller_name %} | Seller | {{ current_claim.seller_name }} |
    {% endif %}{% if current_claim.buyer_name %} | Buyer | {{ current_claim.buyer_name }} |
    {% endif %}{% if current_claim.invoice_amount %} | Claim Amount | ${{ current_claim.invoice_amount | round(2) }} |
    {% endif %}{% if current_claim.claim_reason_category %} | Claim Reason | {{ current_claim.claim_reason_category }} |
    {% endif %}
    {% endif %}

Result: Shows detailed information about the current claim in a two-column format.

Example: Application Loop

pattern:
  type: loop
  name: "Applications Processing Loop"
  description: |
    ## Iterative Application Processing
    
    Processes each pending application through the complete risk assessment workflow.
    
    {% if tci_pending_applications_scanner.pending_applications_list and tci_pending_applications_scanner.pending_applications_list | length > 0 %}
    ## Pending Applications Summary
    
    | Application ID | Applicant (Seller) | Buyer | Requested Credit Limit | Requested Term |
    |----------------|-------------------|-------|----------------------|----------------|
    {% for app in tci_pending_applications_scanner.pending_applications_list %}
    | {{ app.application_id }} | {{ app.seller_name or 'N/A' }} | {{ app.buyer_name or 'N/A' }} | {% if app.requested_amount %}{{ app.currency_symbol or '$' }}{{ app.requested_amount | format_currency }}{% else %}N/A{% endif %} | {% if app.requested_term_days %}{{ app.requested_term_days }} days{% else %}N/A{% endif %} |
    {% endfor %}
    {% endif %}
  iterate_over: "tci_pending_applications_scanner.pending_applications_list"
  loop_item_key: "current_application"

Dynamic List Iteration (dynamic_iterate_over)

For recursive discovery or scenarios where new items are added during iteration, enable dynamic re-evaluation of the list:

pattern:
  type: sequential
  steps:
    - node: scanner
    - type: loop
      iterate_over: "scanner.items_list"
      loop_item_key: "current_item"
      dynamic_iterate_over: true  # Re-evaluate list before each iteration
      termination:
        max_iterations: 50  # Safety limit (required for dynamic mode)
      body:
        type: sequential
        steps:
          - node: processor
          - node: discovery_agent  # May add new items to scanner.items_list
          - node: recorder
    - node: finalizer

Key Features:

  • dynamic_iterate_over: true: Re-evaluates the iterate_over list before each iteration
  • Picks up new items: Items added during iteration (e.g., by discovery_agent) are automatically processed
  • Duplicate prevention: Tracks processed items to avoid processing the same item twice
  • Safety limit: Always use max_iterations to prevent infinite loops

How It Works:

  1. Before each iteration, the loop re-resolves the iterate_over path
  2. Filters out already processed items (using item ID or hash)
  3. Processes the first unprocessed item
  4. Continues until no new items are found or max_iterations is reached

Use Cases:

  • Recursive Discovery: Finding related items that may lead to more items (e.g., ReconVoy pipeline)
  • Database Updates: Processing items that may trigger new items to be added to the queue
  • Dynamic Workloads: When the work list grows during processing

Example: Recursive Discovery:

- type: loop
  iterate_over: "related_items_discovery.related_items"
  loop_item_key: "related_item"
  dynamic_iterate_over: true  # Pick up newly discovered items
  termination:
    max_iterations: 50  # Safety limit
  body:
    type: sequential
    steps:
      - node: item_discovery  # Finds foreign book matches
      - node: related_items_discovery  # May add more items to related_items list

Performance Note: Dynamic iteration re-evaluates the list on each iteration, which may have performance implications for expensive operations (database queries, complex context resolution). Use static iteration (dynamic_iterate_over: false or omitted) when the list doesn't change during processing.

Accessing Accumulated Loop Results

When accumulate_results is enabled (default: true), loop patterns automatically create *_instances aliases for downstream agents, similar to repeat patterns. This allows downstream agents to access all loop iteration results, not just the last one.

How it works:

  • After the loop completes, for each agent that ran inside the loop body, a dictionary is created with the pattern {agent_id}_instances
  • Each iteration's result is stored with a key like {agent_id}_0, {agent_id}_1, etc.
  • Downstream agents can access all iterations via the {agent_id}_instances alias (e.g., {{validator_instances}}) in their prompts or inputs

Example:

pattern:
  type: sequential
  steps:
    - node: question_loader
    - type: loop
      iterate_over: "question_loader.questions_list"
      loop_item_key: "current_question"
      accumulate_results: true  # Default, but explicit for clarity
      body:
        type: sequential
        steps:
          - node: validator  # Runs for each question
    - node: summary_reporter  # Can access all validator results

In the summary_reporter agent configuration, you can access all validation results:

# config/agents/summary_reporter.yml
prompt:
  inputs:
    inline: |
      - Total Questions: {{question_loader.total_count}}
      - Validation Results: {{validator_instances}}

The validator_instances dictionary will contain:

{
  "validator_0": {"validation_results": {...}, "tools_used": {...}},
  "validator_1": {"validation_results": {...}, "tools_used": {...}},
  "validator_2": {"validation_results": {...}, "tools_used": {...}}
}

Key Points:

  • Only agents that run inside the loop body get *_instances aliases
  • The alias is created automatically when accumulate_results=true (default)
  • Instance keys follow the pattern {agent_id}_{iteration_index} (0-based)
  • Prefers structured parsed output when available, otherwise uses raw result
  • This mirrors repeat pattern behavior for consistency across patterns
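
Since the alias is a plain dictionary keyed {agent_id}_{index}, a downstream prompt template can iterate over it. A minimal Jinja2 sketch for the validator example above, iterating the instances sorted by key:

```jinja
{% for instance_key, result in validator_instances | dictsort %}
- {{ instance_key }}: {{ result.validation_results }}
{% endfor %}
```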

When to use accumulate_results:

  • ✅ Enable (default): When you need downstream agents to process all loop results (e.g., summary reports, aggregations, batch processing)
  • ❌ Disable: When you only need the final iteration's result or want to reduce memory usage for very large loops

Best Practices

  1. Use Conditional Rendering: Always check if variables exist before using them:

    {% if current_claim.seller_name %} | Seller | {{ current_claim.seller_name }} |{% endif %}
    
  2. Provide Fallbacks: Use or 'N/A' for optional fields:

    {{ claim.seller_name or 'N/A' }}
    
  3. Format Numbers: Use built-in Jinja2 filters for currency and decimals:

    {{ amount | format_currency }}  # "125,000.00"
    {{ 0.85 | format_percentage }}  # "85.0%"
    {{ amount | format_number(decimals=2) }}  # "125,000.00"
    

    See Available Jinja2 Filters for all formatting options.

  4. Loop Descriptions: Show all items in a multi-column table

  5. Single Item Descriptions: Use two-column format (Field | Value) for readability

  6. Scanner Data: Ensure your scanner agents extract all needed fields from the database:

    SELECT claim_id, seller_name, claimant_name, invoice_amount, claim_reason_category
    FROM claims WHERE status = 'pending'
    
  7. Table Formatting: Use Jinja2 whitespace control to ensure proper markdown table formatting:

    • Use {%- and -%} to strip whitespace around tags
    • Separate conditional rows onto their own lines
    • Example:
    | Field | Value |
    |-------|-------|
    | Claim ID | {{ current_claim.claim_id }} |
    {%- if current_claim.seller_name %}
    | Seller | {{ current_claim.seller_name }} |
    {%- endif %}
    {%- if current_claim.buyer_name %}
    | Buyer | {{ current_claim.buyer_name }} |
    {%- endif %}
    

    For loop tables:

    | Claim ID | Seller | Claimant |
    |----------|--------|----------|
    {%- for claim in scanner.pending_claims_list %}
    | {{ claim.claim_id }} | {{ claim.seller_name }} | {{ claim.claimant_name }} |
    {%- endfor %}
    

    Why: Jinja2 whitespace control ({%- and -%}) prevents extra blank lines that break markdown table formatting.

Markdown Table Formatting Best Practices

Based on testing and validation, follow these rules to ensure tables render correctly:

Critical Rules:

  1. No Blank Lines Within Tables: Blank lines between table rows break markdown table formatting

    โŒ BAD:
    {% if condition %}
    
    | Status | {{ status }} |
    {% endif %}
    
    ✅ GOOD:
    {%- if condition %}
    | Status | {{ status }} |
    {%- endif %}
    
  2. Whitespace Control for Conditionals: Always use {%- and -%} for conditionals in table cells

    โŒ BAD:
    | Amount | {% if sym %}{{ sym }}{{ amount | format_currency }}{% else %}N/A{% endif %} |
    
    ✅ GOOD:
    | Amount | {%- if sym %}{{ sym }}{{ amount | format_currency }}{%- else %}N/A{%- endif %} |
    
  3. Nested Conditionals: Apply whitespace control to conditionals, but NOT to {% set %} statements (they need newlines for markdown tables)

    โŒ BAD (whitespace control on set strips newline):
    | Amount | {%- if has_amount %}{%- set sym = currency_symbol %}{%- if sym | length >= 3 %}{{ sym }} {{ amount }}{%- else %}{{ sym }}{{ amount }}{%- endif %}{%- else %}N/A{%- endif %} |
    
    ✅ GOOD (no whitespace control on set, preserves newlines):
    | Amount | {%- if has_amount %}{% set sym = currency_symbol %}{%- if sym | length >= 3 %}{{ sym }} {{ amount }}{%- else %}{{ sym }}{{ amount }}{%- endif %}{%- else %}N/A{%- endif %} |
    
  4. checks_table Pattern: Use {{ checks_table }} (not {{- checks_table -}}) to preserve newlines, and ensure blank line after </summary>. Use {% if %} (NOT {%- if %}) for conditionals around checks_table to preserve newlines.

    โŒ BAD (strips newlines, breaks table):
    {%- if agent.checks_table %}{{- agent.checks_table -}}{%- else %}
    **Summary:** {{ agent.details }}
    {%- endif %}
    
    โŒ BAD (no blank line after </summary>, table won't render):
    <details>
    <summary>Test Summary</summary>
    {%- if agent.checks_table %}
    {{ agent.checks_table }}
    {%- endif %}
    </details>
    
    ✅ GOOD (preserves newlines, blank line after </summary>, no whitespace control on if):
    <details>
    <summary>Test Summary</summary>
    
    {% if agent.checks_table %}
    {{ agent.checks_table }}
    
    {% endif %}
    </details>
    
  5. Set Statements Before Table Rows: Use {% set %} (NOT {%- set %}) when used before markdown tables, especially in <details> blocks. The whitespace control strips newlines that markdown tables need.

    โŒ BAD (whitespace control strips newline, breaks table rendering):
    <details>
    <summary>Test Summary</summary>
    
    {%- set fh_score = assessor.score if assessor else None %}
    {% if agent.checks_table %}
    {{ agent.checks_table }}
    {% endif %}
    </details>
    
    ✅ GOOD (no whitespace control on set, preserves newlines):
    <details>
    <summary>Test Summary</summary>
    
    {% set fh_score = assessor.score if assessor else None %}
    {% if agent.checks_table %}
    {{ agent.checks_table }}
    {% endif %}
    </details>
    

    Also: Put {% set %} statements on separate lines before table rows, not inline

    โŒ BAD (set on same line as table row, breaks formatting):
    | Risk Factor | Score | Weight | Weighted Score |
    |-------------|-------|--------|----------------|
    {% set fh_score_val = tci_financial_health_assessor.score if tci_financial_health_assessor else None %}| Financial Health | {{ fh_score_val | default_if_none }} | 0.22 | {{ fh_weighted | default_if_none }} |
    
    ✅ GOOD (set on separate line before table row):
    | Risk Factor | Score | Weight | Weighted Score |
    |-------------|-------|--------|----------------|
    {% set fh_score_val = tci_financial_health_assessor.score if tci_financial_health_assessor else None %}
    {% set fh_weighted = (fh_score_val * 0.22) | round(2) if fh_score_val is not none else None %}
    | Financial Health | {{ fh_score_val | default_if_none }} | 0.22 | {{ fh_weighted | default_if_none }} |
    
  6. Filters Don't Introduce Whitespace: Filters themselves are safe, but Jinja2 tags need control

    ✅ SAFE (filters don't add whitespace):
    | Amount | {{ amount | format_currency }} |
    | Status | {{ status | default_if_none }} |
    
    โš ๏ธ NEEDS CONTROL (tags add whitespace):
    | Amount | {%- set sym = currency_symbol %}{{ amount | format_currency(currency_symbol=sym) }} |
    
  7. Tables Inside HTML Tags: Markdown tables inside HTML tags (like <details>) require a blank line after the closing tag

    โŒ BAD (table starts on same line as </summary>):
    <details>
    <summary>Test Summary</summary>
    | Check | Status |
    |-------|--------|
    | Test | PASS |
    </details>
    
    ✅ GOOD (blank line after </summary>):
    <details>
    <summary>Test Summary</summary>
    
    | Check | Status |
    |-------|--------|
    | Test | PASS |
    </details>
    

Key Learnings from Testing:

  • Blank lines within tables: Break markdown rendering - never include blank lines between table rows
  • Blank lines before tables: Required when tables are inside HTML tags (e.g., <details>)
  • checks_table pattern: Use {{ checks_table }} (not {{- checks_table -}}) to preserve newlines that are part of the table structure
  • Whitespace control for conditionals: Use {%- and -%} for conditionals and loops within table cells
  • Set statements: Use {% set %} (NOT {%- set %}) when used before markdown tables - whitespace control strips newlines that markdown parsers need
  • Set statement placement: Place {% set %} statements on separate lines before table rows, not inline

Testing: Use the tests/test_markdown_tables.py script to validate table formatting:

python tests/test_markdown_tables.py
# or
python -m pytest tests/test_markdown_tables.py -v

The test script generates markdown files in tests/output/markdown_tables/ for visual inspection and validates:

  • No blank lines within tables
  • Tables don't start on the same line as previous content
  • Consistent column counts across all table rows
  • Proper whitespace control in conditionals


Available Jinja2 Filters

The Topaz Agent Kit provides a comprehensive set of Jinja2 filters for formatting and styling in templates. These filters are automatically available in:

  • Pattern descriptions (pipeline YAML files)
  • HITL gate descriptions
  • Agent input templates (YAML inputs.inline sections)

Number Formatting:

{{ 125000 | format_currency }}           # "125,000.00"
{{ 125000 | format_currency(decimals=0) }}  # "125,000"
{{ 1250.5 | format_number(decimals=2) }}    # "1,250.50"
{{ 0.85 | format_percentage }}             # "85.0%"
{{ 0.8523 | format_percentage(decimals=2) }} # "85.23%"

Score/Risk Color Coding:

{# For risk scores where lower is better (0-25 green, 26-50 amber, etc.) #}
<span style="color: {{ risk_score | risk_score_color }};">{{ risk_score }}</span>

{# For credit/quality scores where higher is better (85-100 green, 70-84 amber, etc.) #}
<span style="color: {{ credit_score | credit_score_color }};">{{ credit_score }}</span>

{# Generic score color with custom thresholds #}
<span style="color: {{ score | score_color(thresholds=[(80, "#green"), (60, "#yellow"), (0, "#red")]) }};">{{ score }}</span>

Text Formatting:

{{ long_text | truncate_text(50) }}              # Truncate to 50 chars with "..."
{{ 5 | pluralize("item") }}                      # "items"
{{ 1 | pluralize("child", "children") }}         # "child"
{{ text | highlight_text("search term") }}        # Highlight search terms

Date/Time Formatting:

{{ "2025-01-28" | format_date }}                    # "2025-01-28"
{{ "2025-01-28" | format_date("%B %d, %Y") }}       # "January 28, 2025"
{{ 3665 | format_duration }}                        # "1 hour 1 minute 5 seconds"
{{ 3665 | format_duration(compact=True) }}          # "1h 1m 5s"

Data Formatting:

{{ 1572864 | format_file_size }}                    # "1.5 MB"
{{ "1234567890" | mask_sensitive(4) }}              # "1234******"
{{ "1234567890" | format_phone }}                   # "(123) 456-7890"

Utility Functions:

{{ 10 | safe_divide(2) }}                          # 5.0
{{ 10 | safe_divide(0, "N/A") }}                    # "N/A"
{{ None | default_if_none("—") }}                   # "—"

Complete Filter Reference:

| Filter | Description | Example |
|--------|-------------|---------|
| format_currency(value, decimals=2) | Format as currency with commas | 125000 → "125,000.00" |
| format_number(value, decimals=0, thousands_sep=",") | Format number with optional decimals | 1250.5 → "1,250.50" |
| format_percentage(value, decimals=1, multiply=True) | Format as percentage | 0.85 → "85.0%" |
| risk_score_color(value) | Color for risk scores (lower is better) | 15 → "#22c55e" (green) |
| credit_score_color(value) | Color for credit scores (higher is better) | 90 → "#22c55e" (green) |
| score_color(value, thresholds, low_is_better) | Generic score color with custom thresholds | See examples above |
| truncate_text(value, max_length=100, suffix="...") | Truncate text with suffix | "Very long text" → "Very long..." |
| pluralize(value, singular, plural=None) | Singular/plural form | 5 → "items" |
| format_date(value, format_str="%Y-%m-%d") | Format date/datetime | "2025-01-28" → "January 28, 2025" |
| format_duration(seconds, compact=False) | Format duration | 3665 → "1 hour 1 minute 5 seconds" |
| format_file_size(value, binary=False) | Format bytes as file size | 1572864 → "1.5 MB" |
| mask_sensitive(value, visible_chars=4, mask_char="*") | Mask sensitive data | "1234567890" → "1234******" |
| format_phone(value, format_str="us") | Format phone number | "1234567890" → "(123) 456-7890" |
| safe_divide(numerator, denominator, default=0) | Safe division | 10 / 0 → 0 |
| default_if_none(value, default="N/A") | Default for None values | None → "N/A" |

Available Variables by Context

  • Loop Description: Access the full list from scanner (e.g., scanner.pending_list)
  • Loop Body (Sequential/Parallel): Access current item via loop_item_key (e.g., current_claim, current_application)
  • Conditional/Switch Branches: Access variables from upstream agents or context
  • Not Available: Extractor outputs or downstream agent results (they haven't executed yet)
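
Putting these contexts together, a minimal sketch with hypothetical agent names (scanner, processor): the loop description reads the scanner's list, while the body agent reads the current item:

```yaml
pattern:
  type: loop
  name: "Items Processing Loop"
  # Scanner output is available here because descriptions render before agents run:
  description: |
    Processing {{ scanner.pending_list | length }} pending items.
  iterate_over: "scanner.pending_list"
  loop_item_key: "current_item"
  body:
    type: sequential
    steps:
      # Inside the body, the current item is available via the loop_item_key,
      # e.g. {{ current_item.id }} in the processor agent's inputs.
      - node: processor
```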

Hierarchy snapshots (cards/chips/patterns)

  • Haiku Writers Room (group_chat)

    • group_chat (Haiku Writers Room)
      • card: haiku_form_expert
      • card: haiku_imagery_specialist
      • card: haiku_editor
  • Math Compass (sequential + switch)

    • sequential: Math Problem Solving Flow
      • chip: orchestrator โ†’ math_strategist
      • card: math_strategist
      • switch: Complexity Router
        • sequential (TRUE case)
          • chip: math_strategist โ†’ math_calculator
          • card: math_calculator
  • TCI Policy Risk Assessor (loop โ†’ sequential โ†’ parallel โ†’ switch)

    • loop: Applications Processing Loop (one per application)
      • sequential: Single Application Assessment Flow
        • parallel: Data Collection Phase (collectors)
        • parallel: Risk Factor Assessment (risk scorers)
        • switch: Decision Routing
          • sequential: case Modified Terms
          • sequential: case Rejection
          • sequential: case Escalation
          • sequential: case Information Request
  • ECI Claims Vetter (loop โ†’ sequential โ†’ parallel)

    • loop: Claims Processing Loop (one per claim)
      • sequential: Single Claim Processing Flow
        • parallel: Parallel Validation Checks (e.g., history lookup, eligibility)
  • Handoff

    • handoff:

Protocol Support

Flexible agent communication protocols:

  • A2A (Agent-to-Agent): Direct agent interaction patterns via SDK (default for remote agents)
  • IN-PROC: Local in-process execution for performance (automatic for local agents)
  • Mixed: Use different protocols per agent in the same workflow
  • Auto-detection: Local agents automatically use IN-PROC regardless of pattern protocol

Why A2A Only?

ACP (Agent Communication Protocol) support has been removed as of version 0.3.0. This change was made for the following reasons:

  1. Official Deprecation: The ACP SDK has been officially deprecated and merged with A2A under the Linux Foundation (August 2025). The ACP SDK is no longer actively maintained and new features are not being added.

  2. Simplified Configuration: Removing ACP support significantly simplifies agent configuration:

    • Single url field instead of urls.a2a and urls.acp objects
    • No protocol_support array needed
    • Cleaner, more maintainable YAML files
    • Reduced port management complexity
  3. Unified Protocol: A2A protocol now supports all ACP functionality plus additional features like path-based routing, making it the single recommended protocol for remote agent communication.

  4. Future-Proof: A2A is actively developed with community support and is the long-term standard for agent-to-agent communication.

Migration: If you have existing agents using ACP, they should be migrated to A2A. The A2A SDK provides full compatibility with ACP features, and migration is straightforward - simply update the remote.url field in your agent configurations.
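As a sketch, a pre-0.3.0 agent entry with per-protocol URL objects (field names taken from the list above; the exact legacy schema may have varied) collapses to the single url field like this:

```yaml
# Before (ACP-era configuration, illustrative)
remote:
  protocol_support: [a2a, acp]
  urls:
    a2a: "http://agent-host:9000/a2a"
    acp: "http://agent-host:9001/acp"

# After (0.3.0+): a single A2A endpoint
remote:
  url: "http://agent-host:9000/a2a"
```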

๐Ÿงฐ Built-in MCP Toolkits

Topaz Agent Kit includes 15 comprehensive MCP toolkits with 75+ tools ready to use:

๐Ÿ“„ Document Intelligence

DocExtract Toolkit (doc_extract_*)

Extract structured data, tables, and metadata from documents.

  • doc_extract_structured_data: Extract structured data from PDFs, DOCX, PPTX, HTML, Markdown using AI-powered field extraction
  • doc_extract_tables: Extract tables from documents with structure preservation
  • doc_extract_metadata: Extract document metadata (title, author, creation date, etc.)

DocRAG Toolkit (doc_rag_*)

Document retrieval and semantic search with ChromaDB.

  • doc_rag_query_document: Query documents using semantic search with citation support
  • doc_rag_list_documents: List all documents in the collection

ImageRAG Toolkit (image_rag_*)

Image analysis and OCR-based search.

  • image_rag_query_images: Query images using OCR text content with semantic search
  • image_rag_list_images: List all images in the collection

๐ŸŒ Web & Search

Browser Toolkit (browser_*)

Web scraping and automation.

  • browser_scrape_website_content: Scrape and extract content from websites

Serper API Toolkit (serper_api_*)

Web search integration via Serper API.

  • serper_api_search_internet: Search the internet with Serper API
  • serper_api_search_news: Search news articles with Serper API

SEC API Toolkit (sec_api_*)

SEC filings and financial document retrieval.

  • sec_api_search_10q: Search and retrieve SEC 10-Q quarterly reports
  • sec_api_search_10k: Search and retrieve SEC 10-K annual reports

๐Ÿ”ข Math & Analysis

Math Toolkit (math_*)

Advanced mathematical operations and problem-solving.

Basic Operations:

  • math_multiply: Multiply two numbers
  • math_evaluate_expression: Evaluate mathematical expressions safely
  • math_percentage_of: Calculate percentage of a value
  • math_percent_change: Calculate percent change between two values
  • math_fraction_to_decimal: Convert fraction to decimal
  • math_ceil_divide: Ceiling division (round up)
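The arithmetic helpers follow conventional definitions; a hedged Python sketch of the assumed semantics (not the toolkit's actual implementation):

```python
import math

def percent_change(old, new):
    """Percent change from old to new, e.g. 50 to 75 is +50%."""
    return (new - old) / old * 100

def percentage_of(percent, value):
    """percent% of value, e.g. 25% of 80 is 20."""
    return percent / 100 * value

def ceil_divide(a, b):
    """Ceiling division: how many size-b batches cover a items."""
    return math.ceil(a / b)

print(percent_change(50, 75))  # 50.0
print(percentage_of(25, 80))   # 20.0
print(ceil_divide(10, 3))      # 4
```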

Advanced Math:

  • math_solve_equations: Solve systems of equations
  • math_solve_inequality: Solve inequalities
  • math_compute_log: Compute logarithms (natural or custom base)
  • math_compute_exp: Compute exponential functions
  • math_trig_functions: Compute trigonometric functions (sin, cos, tan, etc.)
  • math_differentiate: Differentiate mathematical expressions
  • math_integrate_expr: Integrate mathematical expressions

Text Processing:

  • math_summarize: Summarize text (rule-based)
  • math_llm_summarize: Summarize text using LLM
  • math_extract_quantities: Extract numerical quantities from text
  • math_llm_parse_math_problem: Parse math problems using LLM
  • math_sanitize_expression: Sanitize mathematical expressions for safe evaluation

๐Ÿ“ง Communication

Email Toolkit (email_*)

Gmail integration for email management via SimpleGmail.

Email Operations:

  • email_send: Send emails with HTML/plain text, attachments, and custom headers
  • email_list_messages: List messages from a label/folder with pagination
  • email_read_message: Read full message content with attachments
  • email_search: Search emails by query with filters
  • email_move: Move emails between labels/folders
  • email_mark_read: Mark emails as read
  • email_mark_unread: Mark emails as unread
  • email_delete: Delete emails
  • email_download_attachment: Download email attachments to local path

Email Management:

  • email_list_labels: List all Gmail labels/folders

Helper Tools:

  • email_get_company_info: Retrieve company information
  • email_get_email_signature: Generate professional email signature
  • email_spell_check: Perform spell check on text

๐Ÿ—„๏ธ Database

SQLite Toolkit (sqlite_*)

SQLite database operations.

  • sqlite_query: Execute SELECT queries and return results as JSON
  • sqlite_execute: Execute INSERT, UPDATE, DELETE statements
  • sqlite_schema: Get table schema information
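A sqlite_query-style helper can be sketched in a few lines of stdlib Python (illustrative only; the real tool runs inside the MCP server and its exact output format may differ):

```python
import json
import sqlite3

def sqlite_query(db_file: str, sql: str) -> str:
    """Run a SELECT and return the rows as a JSON array of objects."""
    conn = sqlite3.connect(db_file)
    conn.row_factory = sqlite3.Row  # rows behave like dicts
    try:
        rows = conn.execute(sql).fetchall()
        return json.dumps([dict(r) for r in rows])
    finally:
        conn.close()

print(sqlite_query(":memory:", "SELECT 1 AS one"))  # [{"one": 1}]
```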

๐Ÿ“ Filesystem

Filesystem Toolkit (fs_*)

Basic file and directory operations.

  • fs_listdir: List directory contents
  • fs_makedirs: Create directories (with exist_ok option)
  • fs_move_file: Move files between locations (with automatic directory creation)

โœˆ๏ธ Travel

Flights Toolkit (flight_*)

Flight search and booking via Amadeus API.

  • flight_search: Search for flights with flexible date options
  • flight_price: Get detailed pricing for flight offers
  • flight_seatmap: Get seat map information
  • route_price_metrics: Get price metrics for routes
  • airline_lookup: Lookup airline information by IATA codes
  • flight_book: Book flights with traveler and payment information
  • flight_order_get: Get flight order details
  • flight_order_cancel: Cancel flight orders

Hotels Toolkit (hotel_*)

Hotel search and booking via Amadeus API.

  • hotel_search: Search for hotels by location, amenities, ratings
  • hotel_offers: Get hotel offers with pricing
  • hotel_offer_details: Get detailed offer information
  • hotel_book: Book hotels with guest and payment information

Activities Toolkit (activities_*)

Travel activities and points of interest via Amadeus API.

  • activities_search: Search for activities by location, date, keywords
  • activity_details: Get detailed activity information
  • pois_search: Search for points of interest

๐Ÿ› ๏ธ Utilities

Common Toolkit (common_*)

General utility functions.

  • common_ocr_reader: Extract text from images using OCR
  • common_form_parser: Parse form data from text
  • common_entity_normalizer: Normalize entity names and values
  • common_read_image: Read and process image files
  • common_read_document: Read and process document files

Insurance Toolkit (insurance_*)

Insurance-specific domain tools.

  • insurance_policy_lookup: Lookup insurance policy information
  • insurance_actuarial_calculator: Perform actuarial calculations
  • insurance_severity_classifier: Classify claim severity
  • insurance_fraud_scoring: Score claims for fraud risk
  • insurance_duplicate_claim_checker: Check for duplicate claims
  • insurance_anomaly_detector: Detect anomalies in claims data

๐Ÿง  AgentOS Memory Toolkit (agentos_*)

Filesystem-based memory system for agents to store, retrieve, and search information using Unix-like commands.

  • agentos_shell: Execute shell commands in a sandboxed filesystem (ls, cat, echo, grep, semgrep, mkdir, etc.)

Key Features:

  • Filesystem as Memory: Agents use familiar Unix commands to manage memory
  • Pipeline-Level Shared Memory: Share templates, company info, and reference data across agents
  • Agent-Level Individual Memory: Each agent has isolated memory for personal data
  • Auto-Indexing: Files automatically indexed for semantic search (semgrep)
  • Security: Sandboxed execution with path traversal protection
  • Configurable Structure: Define memory directories in YAML configuration

Example Usage:

# Agent configuration with AgentOS memory
mcp:
  servers:
    - url: "http://localhost:8050/mcp"
      toolkits: ["agentos_memory"]
      tools: ["agentos_shell"]

memory:
  inherit: true  # Inherit shared memory from pipeline
  directories:
    - path: "/memory/senders/"
      description: "Sender preferences and history"
      readonly: false
      auto_index: true
    - path: "/workspace/"
      description: "Working directory for drafts"
      readonly: false
      auto_index: false

Agent Prompt Example:

{{agentos_memory_section}}

## Workflow:
1. Check sender history: `agentos_shell("ls /memory/senders/")`
2. Load preferences: `agentos_shell("cat /memory/senders/{email}/preferences.md")`
3. Access shared templates: `agentos_shell("cat /shared/email_templates/greetings/formal.md")`
4. Store new data: `agentos_shell('echo "content" > /memory/senders/{email}/preferences.md')`
5. Semantic search: `agentos_shell('semgrep "similar email pattern"')`

Pipeline-Level Shared Memory:

# Pipeline configuration
memory:
  shared:
    directories:
      - path: "/shared/email_templates/"
        description: "Email template library (READ-ONLY)"
        readonly: true
        auto_index: true
        bootstrap: true
      - path: "/shared/company_info/"
        description: "Company information (READ-ONLY)"
        readonly: true
        auto_index: true
        bootstrap: true

Best Practices:

  • Use /shared/ for read-only reference data (templates, company info, policies)
  • Use /memory/ for agent-specific data that persists across sessions
  • Use /workspace/ for temporary working files
  • Enable auto_index: true for directories you want to search semantically
  • Set readonly: true for shared directories to prevent accidental modifications

For detailed documentation, see the AgentOS Memory System section below.

๐Ÿ”ง External MCP Server Integration

Connect to external MCP servers for enterprise integrations:

mcp:
  servers:
    - url: "http://enterprise-mcp-server:8080/mcp"
      toolkits: ["enterprise", "database"]
      tools: ["enterprise_*", "db_*"]

Features:

  • Connect to any MCP-compatible server
  • Custom toolkits and tools from external servers
  • Automatic tool discovery and registration
  • Secure credential management

๐Ÿš€ Quick Start

Installation

From PyPI (Recommended)

# Install Topaz Agent Kit using pip
pip install topaz-agent-kit

# Or with optional dependencies
pip install topaz-agent-kit[fastapi,mcp,ui]

# Install using uv (requires --prerelease=allow due to dependency requirements)
uv add --prerelease=allow topaz-agent-kit

# Or with optional dependencies
uv add --prerelease=allow "topaz-agent-kit[fastapi,mcp,ui]"

Note for uv users: The --prerelease=allow flag may be required for some dependencies that use pre-release versions. This is typically needed for agent-framework (MAF) beta versions.

Reproducible Installation (Exact Versions)

For reproducible installs with exact dependency versions matching the development environment:

Using pip:

# Download requirements.txt from the package source distribution
# Or extract it from the installed package:
pip show -f topaz-agent-kit | grep requirements.txt

# Install with exact versions
pip install -r requirements.txt topaz-agent-kit

Using uv (Recommended):

# Option 1: Install with exact versions using uv pip (pip-compatible)
uv pip install -r requirements.txt topaz-agent-kit

# Option 2: Sync environment to match requirements.txt exactly
uv pip sync requirements.txt
uv pip install topaz-agent-kit

# Option 3: If you have a project with pyproject.toml, you can also:
# Extract requirements.txt from source distribution, then:
uv pip install -r requirements.txt topaz-agent-kit

Why use exact versions? The requirements.txt file contains exact pinned versions of all dependencies that were tested and verified during development. This ensures:

  • โœ… Same dependency versions as the development environment
  • โœ… Reproducible builds across different machines
  • โœ… Avoids version conflicts and compatibility issues

Note: The requirements.txt file is automatically generated from uv.lock during the build process and is included in the source distribution. Both pip and uv can use this file directly.

From Local Wheel File

If you've built the package locally or have a wheel file:

# Using uv (recommended for projects using uv)
uv add --prerelease=allow /path/to/topaz_agent_kit-0.3.0-py3-none-any.whl

# Or using pip
pip install /path/to/topaz_agent_kit-0.3.0-py3-none-any.whl

# Or using uv pip
uv pip install /path/to/topaz_agent_kit-0.3.0-py3-none-any.whl

Note: Replace 0.3.0 with the actual version number and update the path to your wheel file location.

Create Your First Project

# Quick start with default basic template (recommended for beginners)
topaz-agent-kit init .

# Or specify a template explicitly
topaz-agent-kit init -f basic ./my_project        # Basic foundation (same as above)
topaz-agent-kit init -s ensemble ./my_project     # Full starter with examples

Run Your Agents

# Start web interface (recommended)
topaz-agent-kit serve fastapi --project ./my_project

# Or command-line interface
topaz-agent-kit serve cli --project ./my_project

# Or MCP server
topaz-agent-kit serve mcp --project ./my_project

Generate Workflow Diagrams

Topaz Agent Kit automatically generates professional workflow diagrams for all your pipelines using Graphviz:

# Diagrams are auto-generated during project initialization
topaz-agent-kit init .                    # Default basic template
topaz-agent-kit init -s ensemble ./my_project  # Or use starter template

# Or generate diagrams manually for existing projects
topaz-agent-kit generate --project ./my_project

Generated Files: projects/ensemble/ui/static/assets/{pipeline}_workflow.{dot,svg}

Visual Elements

  • Agent Nodes: Blue rounded rectangles with agent names
  • HITL Gates: Colored diamonds (blue=approval, purple=input, teal=selection)
  • SWITCH Pattern: Amber hexagon for conditional routing
  • Conditional Nodes/Gates: Gray diamonds with TRUE/FALSE paths
  • LOOP Pattern: Self-loop with iteration count label
  • HANDOFF Pattern: Gray circles for virtual orchestrator with dotted edges to specialists
  • GROUP CHAT: Ellipse hub with solid edges to participants, dashed returns, LOOP self-loop, END edge label (ALL CAPS)
  • Arrow Labels: ALL CAPS labels showing gate paths (APPROVE, REJECT, TRUE, FALSE)

Example Workflow

START → Agent A → HITL Gate → APPROVE → Agent B → END
                      ↓
                   REJECT → END

Group Chat Pattern

Multiple agents collaborate in a shared conversation. Supports:

  • Selection strategies: round_robin, llm (custom or virtual orchestrator)
  • Termination: max_rounds and/or condition
  • Diagramming: Central hub, dashed returns, LOOP self-loop, END edge labeled in ALL CAPS

YAML example (Haiku Writers Room):

pattern:
  type: group_chat
  participants:
    - node: haiku_form_expert
    - node: haiku_imagery_specialist
    - node: haiku_editor
  selection_strategy: round_robin
  termination:
    max_rounds: 9
    condition: "contains(last_message, 'APPROVED')"
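Illustrative only, here is roughly how round_robin selection composes with the two termination criteria from the YAML above (the real implementation is GroupChatRunner in execution_patterns.py):

```python
from itertools import cycle

def run_group_chat(participants, respond, max_rounds=9, stop_phrase="APPROVED"):
    """Rotate through participants until max_rounds is hit or the stop
    phrase appears in the last message.

    `respond(agent)` stands in for one agent turn and returns its message.
    """
    transcript = []
    turns = cycle(participants)  # round_robin selection strategy
    for _ in range(max_rounds):
        agent = next(turns)
        message = respond(agent)
        transcript.append((agent, message))
        if stop_phrase in message:  # termination condition on last_message
            break
    return transcript

# Hypothetical stub: the editor approves on its first turn
log = run_group_chat(
    ["haiku_form_expert", "haiku_imagery_specialist", "haiku_editor"],
    lambda a: "APPROVED" if a == "haiku_editor" else "draft...",
)
print(len(log))  # 3 turns: two drafts, then approval
```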

Generate workflow diagrams:

# Diagrams auto-generated during init
topaz-agent-kit init -s ensemble ./my_project

# Or generate manually
topaz-agent-kit generate --project ./my_project

Outputs:

  • projects/ensemble/ui/static/assets/haiku_writers_room_workflow.svg

Implementation references:

  • Pattern runner: src/topaz_agent_kit/core/execution_patterns.py (GroupChatRunner)
  • Graphviz generator: src/topaz_agent_kit/cli/graphviz_generator.py

Event-Driven Pipelines

Pipelines can be automatically triggered by external events, enabling reactive workflows that respond to file system changes, webhooks, database updates, or custom triggers.

Key Features

  • ๐Ÿ”„ Multiple Trigger Types: File watcher (filesystem events), webhook (HTTP), database (row changes), scheduled (cron), and extensible for custom triggers
  • ๐Ÿ“ File Watcher: Monitor directories for file creation, modification, deletion, or move events
  • ๐ŸŽฏ Pattern Matching: Use wildcards to filter files (e.g., *.txt, contract_*.pdf)
  • ๐Ÿ“ Context Extraction: Jinja2 templates extract context from trigger events (file paths, metadata, etc.)
  • ๐Ÿ”€ Session Strategies: Control how sessions are managed (per_file, per_pipeline, custom)
  • โšก Non-Blocking: Event triggers don't interfere with normal user-initiated pipeline execution

Configuration

Add event_triggers section to your pipeline configuration:

name: "Math Repeater"
description: "Solves math problems from files automatically"

# Event triggers configuration
event_triggers:
  type: "file_watcher"
  watch_directory: "data/repeat"
  file_patterns:
    - "*.txt"
  event_types:
    - "created"
  extract_context:
    user_text_template: "Solve problems in {{source}}"
  session_strategy: "per_file"

# Normal pipeline pattern (unchanged)
pattern:
  type: sequential
  steps:
    - node: math_repeater_parser
    # ... rest of pattern
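The pattern matching and context extraction this configuration implies can be sketched with the stdlib (fnmatch for the wildcards; a tiny {{var}} substitution standing in for the Jinja2 rendering the framework actually uses):

```python
import fnmatch
import os
import re

def matches(file_path, patterns=("*.txt",), event_types=("created",), event="created"):
    """Would this filesystem event trigger the pipeline?"""
    name = os.path.basename(file_path)
    return event in event_types and any(fnmatch.fnmatch(name, p) for p in patterns)

def render(template, context):
    """Minimal {{var}} substitution (the framework uses Jinja2)."""
    return re.sub(r"\{\{(\w+)\}\}", lambda m: str(context.get(m.group(1), "")), template)

event = {"source": "data/repeat/problems.txt", "event_type": "created"}
if matches(event["source"], event=event["event_type"]):
    user_text = render("Solve problems in {{source}}", event)
    print(user_text)  # Solve problems in data/repeat/problems.txt
```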

Trigger Types

File Watcher (currently supported):

  • Monitors a directory for file system events
  • Supports wildcard patterns (*.txt, contract_*.pdf)
  • Event types: created, modified, deleted, moved
  • Automatically passes file paths to pipeline execution

Future trigger types (extensible architecture):

  • webhook: HTTP POST triggers
  • database: Row insert/update triggers
  • scheduled: Cron-based scheduling
  • Custom triggers via plugin system

Context Extraction

Use Jinja2 templates to extract context from trigger events:

extract_context:
  user_text_template: "Process file: {{file_name}} ({{file_size}} bytes)"

Available variables (file_watcher):

  • {{source}} - Full file path
  • {{file_path}} - Alias for source
  • {{file_name}} - Filename only
  • {{file_size}} - File size in bytes
  • {{event_type}} - Event type (created, modified, etc.)
  • All metadata fields from the trigger event

Session Strategies

Control how chat sessions are managed for triggered executions:

  • per_file (default): New session for each file event - isolated, no shared context
  • per_pipeline: One session for all events - accumulates context across files
  • custom: Pipeline-specific logic (e.g., per contract_id, per user_id)
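The two built-in strategies reduce to how the session key is derived from the trigger event; a hedged sketch (the key format shown is hypothetical):

```python
def session_key(pipeline_id, event, strategy="per_file"):
    """Derive the chat-session key for a triggered execution."""
    if strategy == "per_file":
        # New isolated session per file event
        return f"{pipeline_id}:{event['source']}"
    if strategy == "per_pipeline":
        # One shared session accumulates context across events
        return pipeline_id
    raise ValueError(f"unknown strategy: {strategy}")

a = session_key("math_repeater", {"source": "data/repeat/a.txt"})
b = session_key("math_repeater", {"source": "data/repeat/b.txt"})
print(a != b)  # True: per_file gives each event its own session
print(session_key("math_repeater", {}, "per_pipeline"))  # math_repeater
```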

Visual Representation

Workflow diagrams show event triggers as an alternative entry point:

[FILE WATCHER] ──(dashed)──> START → Agent A → Agent B → END
     ↑
     │ (watching: data/repeat/*.txt)

  • Trigger node: Folder icon, light blue, shows trigger type
  • Dashed edge: Indicates alternative entry point
  • Normal flow: Unchanged - user-initiated execution still works

Normal Execution Unchanged

Important: Event triggers are additive - they don't change normal pipeline execution:

  • โœ… User-initiated execution: START โ†’ agents โ†’ END (unchanged)
  • โœ… Event-triggered execution: TRIGGER โ†’ START โ†’ agents โ†’ END (new)
  • โœ… Both paths converge at START, so the rest of the workflow is identical

Example Use Cases

  • Document Processing: Auto-process contracts when uploaded to data/contracts/
  • Batch Processing: Process math problems when files appear in data/problems/
  • Contract Lifecycle: Trigger sub-pipelines when contract files are created/modified
  • Data Pipeline: Process CSV files automatically when dropped in watch directory

Implementation

Event triggers are managed by the TriggerManager service:

  • Startup: Triggers are initialized when FastAPI starts
  • Monitoring: Background services watch for events
  • Execution: Events trigger pipeline execution via orchestrator
  • Shutdown: Triggers are gracefully stopped on app shutdown

Code location:

  • Trigger system: src/topaz_agent_kit/core/triggers/
  • File watcher: src/topaz_agent_kit/core/triggers/file_watcher.py
  • Manager: src/topaz_agent_kit/core/triggers/manager.py
  • Integration: src/topaz_agent_kit/services/fastapi_app.py

๐Ÿ› ๏ธ Creating New Pipelines

Pipeline Generation Workflow

Topaz Agent Kit includes a comprehensive, interactive workflow for generating complete pipeline configurations from a use case description. This workflow guides you through requirements gathering, workflow design, file generation, and validation.

When to Use the Workflow

Use the pipeline generation workflow when you need to:

โœ… Create a new pipeline from scratch
โœ… Generate all configuration files (pipeline config, agent configs, prompts, UI manifests)
โœ… Ensure production-ready output with proper validation
โœ… Follow best practices and naming conventions
โœ… Avoid common mistakes (conflicts, incorrect context variables, duplicate file extraction)

Don't use this workflow for:

  • โŒ Minor modifications to existing pipelines (edit files directly)
  • โŒ Quick experiments or prototypes (use simpler approaches)

How to Use the Workflow

For Users:

  1. Start a conversation with your AI assistant (e.g., Cursor Composer, ChatGPT, Claude)

  2. Request pipeline generation:

    I need to create a new pipeline. Please follow the pipeline generation workflow.
    

    Or be more specific:

    I want to create a pipeline for [your use case]. Follow the pipeline generation workflow.
    
  3. Work interactively through the workflow:

    • Answer questions about your requirements
    • Review proposals at each checkpoint
    • Approve or request changes
    • Confirm before file generation
  4. Review generated files and test the pipeline

Example Usage:

User: "I need to create a pipeline for analyzing customer feedback emails 
       and generating response suggestions."

AI Assistant: "I'll help you create this pipeline. Let me follow the pipeline 
               generation workflow. First, let me understand your use case better..."

The AI will guide you through:

  • Step 1: Comprehensive requirements gathering (use case, agents, patterns, HITL gates, etc.)
  • Step 2: Workflow design and proposal
  • Step 3: Interactive refinement
  • Step 4: File generation (all config files, prompts, UI manifests)
  • Step 5: Validation and summary

Workflow Features

  • 6 Review Checkpoints: The workflow includes explicit checkpoints where the AI pauses for your approval
  • Complete File Generation: Generates all necessary files (pipeline config, agent configs, prompts, UI manifests, icons)
  • Conflict Detection: Automatically checks for agent ID conflicts with existing files
  • Validation: Validates all generated files before completion
  • Best Practices: Follows naming conventions, context variable patterns, and MCP tool usage guidelines

What Gets Generated

The workflow generates all files needed for a complete pipeline:

  • Pipeline Config: config/pipelines/{pipeline_id}.yml (backend logic)
  • Agent Configs: config/agents/{agent_id}.yml (one per agent)
  • Prompt Templates: config/prompts/{agent_id}.jinja (one per agent)
  • UI Manifest: config/ui_manifests/{pipeline_id}.yml (display metadata)
  • Assets: Icons and workflow diagrams
  • Main Config Updates: Updates to config/pipelines.yml (registry) and config/ui_manifest.yml (global UI settings)
  • Assistant Classification: Updates assistant_intent_classifier.jinja for pipeline discovery

All files are placed in: src/topaz_agent_kit/templates/starters/ensemble/

Documentation

For detailed information, see the complete workflow guide.

Quick Example

# 1. Start conversation with AI assistant
# 2. Request: "I need to create a pipeline for customer support ticket analysis. 
#              Follow the pipeline generation workflow."

# 3. Work through the interactive workflow:
#    - Answer questions about requirements
#    - Review workflow proposals
#    - Approve file generation
#    - Review validation results

# 4. Test your new pipeline:
topaz-agent-kit serve fastapi --project ./my_project

The workflow ensures you get production-ready, validated pipeline configurations that follow all best practices and naming conventions.

๐Ÿ”ง Local Tools: Pipeline-Specific Python Functions

Local tools are the strongest case for pipeline-specific business logic. They allow you to implement deterministic, schema-aware operations that agents can call directly, with automatic adaptation across all supported frameworks.

When to Use Local Tools

โœ… Use local tools for:

  • Pipeline-specific database operations: Schema-aware queries, validations, aggregations
  • Deterministic business logic: Billing calculations, statistical aggregations, simulations
  • Data transformations: Complex joins, data validation, format conversions specific to your domain
  • Operations requiring correctness guarantees: Critical calculations that must be reproducible and testable

โŒ Don't use local tools for:

  • Generic SQL queries โ†’ Use sqlite_query MCP tool
  • Generic file operations โ†’ Use fs_* MCP tools
  • External API calls โ†’ Use MCP tools or external services
  • Simple text generation โ†’ Let the LLM handle it

How to Define Local Tools

1. Create a tool module in your project:

projects/your_pipeline/
  tools/
    your_pipeline/
      your_tools.py

2. Implement tools with @pipeline_tool decorator:

# tools/your_pipeline/your_tools.py

from topaz_agent_kit.local_tools.registry import pipeline_tool
from typing import Any, Dict
import sqlite3

@pipeline_tool(toolkit="your_pipeline", name="validate_and_summarize")
def validate_and_summarize(
    db_file: str,
    target_state: str
) -> Dict[str, Any]:
    """Validate database schema and summarize data.
    
    Args:
        db_file: Absolute path to SQLite database file
        target_state: Target state name
    
    Returns:
        Dictionary with validation results and summary
    """
    conn = sqlite3.connect(db_file)
    try:
        # Your schema-aware validation and aggregation logic
        # ...
        return {
            "ok": True,
            "target_state": target_state,
            "summary": {},  # populated with your aggregated results
        }
    finally:
        conn.close()

@pipeline_tool(toolkit="your_pipeline", name="calculate_bill")
def calculate_bill(
    db_file: str,
    customer_id: str,
    usage_kwh: float
) -> Dict[str, Any]:
    """Calculate bill for a customer.
    
    Args:
        db_file: Absolute path to database
        customer_id: Customer identifier
        usage_kwh: Usage in kilowatt-hours
    
    Returns:
        Dictionary with billing details
    """
    # Your billing calculation logic
    # ...
    return {"total": 123.45, "breakdown": {}}  # breakdown populated by your logic

Key Requirements:

  • Use type annotations for all parameters (required for framework adaptation)
  • Provide docstrings (used as tool descriptions)
  • Use @pipeline_tool(toolkit="...", name="...") decorator
  • Return structured data (dict, list, or JSON-serializable types)

How to Use Local Tools in Agents

Wire tools into agent configuration:

# config/agents/your_agent.yml

local_tools:
  modules:
    - tools.your_pipeline.your_tools
  toolkits: ["your_pipeline"]
  tools: ["your_pipeline.*"]  # Pattern: all tools in toolkit
  # Or specify individual tools:
  # tools: ["your_pipeline.validate_and_summarize", "your_pipeline.calculate_bill"]

Configuration Options:

  • modules: List of Python module paths (relative to project root)
  • toolkits: List of toolkit names to filter by
  • tools: List of tool name patterns (supports glob patterns like "your_pipeline.*")
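The toolkits/tools filters amount to glob matching over registered tool names; a sketch of the assumed selection logic (the registry contents here are hypothetical):

```python
import fnmatch

# Hypothetical registry: tool name -> owning toolkit
REGISTERED = {
    "your_pipeline.validate_and_summarize": "your_pipeline",
    "your_pipeline.calculate_bill": "your_pipeline",
    "other.helper": "other",
}

def select_tools(toolkits, patterns):
    """Keep tools whose toolkit is allowed and whose name matches a pattern."""
    return sorted(
        name
        for name, toolkit in REGISTERED.items()
        if toolkit in toolkits and any(fnmatch.fnmatch(name, p) for p in patterns)
    )

print(select_tools(["your_pipeline"], ["your_pipeline.*"]))
# ['your_pipeline.calculate_bill', 'your_pipeline.validate_and_summarize']
```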

Framework Compatibility

Local tools are automatically adapted for all supported frameworks via FrameworkToolAdapter:

  • ✅ CrewAI: Tools converted to CrewAI Tool objects
  • ✅ LangGraph: Tools converted to LangChain StructuredTool objects
  • ✅ Microsoft Agent Framework (MAF): Tools used directly with Pydantic model generation
  • ✅ OAK: Tools converted to FunctionTool objects
  • ✅ ADK: Tools passed as callables
  • ✅ Agno: Tools passed as callables

The same tool implementation works across all frameworks - no framework-specific code needed!

Complete Example: Rate Case Pipeline

See the ensemble starter for a complete reference:

  • Tool Implementation: projects/ensemble/tools/rate_case_filing_navigator/rate_case_tools.py
  • Agent Configuration: projects/ensemble/config/agents/rate_case_data_summarizer.yml

The rate case pipeline demonstrates:

  • Schema-aware database operations
  • Complex billing calculations (tiered rates, time-of-use)
  • Data validation and aggregation
  • Multiple tools in a single toolkit
  • Tools used across multiple agents and frameworks

Best Practices

  1. Group related tools in a single module under a toolkit
  2. Use descriptive names that indicate the toolkit (e.g., rate_case_validate_and_summarize)
  3. Provide comprehensive docstrings - they become tool descriptions
  4. Use type annotations - required for proper framework adaptation
  5. Handle errors gracefully - log errors and return structured error responses
  6. Keep tools focused - one tool, one responsibility

๐ŸŽฏ Advanced Features

๐Ÿง  Prompt Intelligence Engine

Automatically detects variables in prompts and generates agent classes with proper context handling. The Prompt Intelligence Engine supports multiple variable syntaxes for flexible input handling.

Input Variable Options

When defining agent inputs in YAML files, you can use several variable syntaxes:

1. Simple Variables

Reference variables from the main execution context:

inputs:
  inline: |
    User request: {{user_text}}
    Current date: {{current_date}}

Resolution: Variables are resolved from the main context, HITL results, or upstream agent outputs (in that order).

2. Agent-Prefixed Variables

Explicitly reference variables from specific upstream agents:

inputs:
  inline: |
    Problem: {{user_text}}
    
    Strategist Output:
      - Expression: {{math_strategist.expression}}
      - Steps: {{math_strategist.steps}}
    
    Calculator Output:
      - Result: {{math_calculator.result}}
      - Rationale: {{math_calculator.rationale}}

Syntax: {{agent_id.variable_name}}

Resolution: Directly accesses the specified field from the upstream agent's parsed output.

Benefits:

  • Explicit dependency declaration
  • Clear data flow visualization
  • Prevents ambiguity when multiple agents have similar field names

3. Expression Variables

Use complex expressions with conditional logic, operators, and functions:

inputs:
  inline: |
    Expression: {{math_strategist.expression if math_strategist.expression else 'No expression provided'}}
    Step count: {{len(math_strategist.steps) if math_strategist.steps else 0}}
    Status: {{'Ready' if math_calculator.result else 'Pending'}}

Supported Operators:

  • Comparison: ==, !=, >, <, >=, <=
  • Boolean: AND, OR, NOT
  • String: contains, starts_with, ends_with, in, not in
  • Null checks: is null, is not null
  • Functions: len(array) for array length
  • Ternary: A if B else C (Python-style conditional expressions)

Examples:

# Ternary expression with default value
{{upstream_agent.field if upstream_agent.field else 'default_value'}}

# Conditional based on array length
{{'Complex' if len(agent.steps) > 5 else 'Simple'}}

# Nested field access with fallback
{{agent.nested.field if agent.nested else 'N/A'}}

# Boolean logic
{{'High' if agent.score > 0.8 AND agent.verified else 'Low'}}

Resolution: Expressions are evaluated at runtime using the ExpressionEvaluator, which has access to all upstream agent outputs and context variables.
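
The ternary and operator semantics described above can be sketched in a few lines of Python. This is an illustrative stand-in, not the kit's actual ExpressionEvaluator: the `evaluate` function and its normalization of the uppercase AND/OR/NOT keywords are assumptions made for the example.

```python
# Illustrative sketch only -- the kit's ExpressionEvaluator is internal and
# richer than this. Shows how a ternary over upstream agent outputs could
# be resolved against a context dict.
from types import SimpleNamespace


def evaluate(expression: str, context: dict):
    """Evaluate a Python-style ternary against upstream agent outputs.

    The documented uppercase AND/OR/NOT operators are normalized to
    Python's lowercase keywords before evaluation. `eval` is used here
    purely for brevity; a real evaluator should parse expressions safely.
    """
    normalized = (
        expression.replace(" AND ", " and ")
        .replace(" OR ", " or ")
        .replace(" NOT ", " not ")
    )
    # Expose each agent's parsed output with attribute-style access
    # (agent.field), matching the {{agent_id.field}} syntax.
    namespace = {
        name: SimpleNamespace(**value) if isinstance(value, dict) else value
        for name, value in context.items()
    }
    namespace["len"] = len  # the documented len(array) helper
    return eval(normalized, {"__builtins__": {}}, namespace)


context = {"math_strategist": {"expression": "2+2", "steps": ["add"]}}
evaluate(
    "math_strategist.expression if math_strategist.expression"
    " else 'No expression provided'",
    context,
)  # -> '2+2'
evaluate("'Complex' if len(math_strategist.steps) > 5 else 'Simple'", context)
# -> 'Simple'
```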

4. Variables with Default Values

Provide default values for variables that might not exist:

inputs:
  inline: |
    Topic: {{topic:General Discussion}}
    Tone: {{tone:Professional}}
    Max length: {{max_length:1000}}

Syntax: {{variable_name:default_value}}

Resolution: If the variable is not found in context, the default value is used.

Note: Default values work with simple variables. For prefixed variables or expressions, use ternary expressions instead.
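
The fallback behavior can be modeled with a small regex-based renderer. This is a minimal sketch of the documented `{{variable_name:default_value}}` semantics, not the kit's actual template engine; `render_defaults` is a name invented for the example.

```python
# Minimal sketch (not the kit's renderer) of the {{name:default}} syntax:
# use the context value when present, otherwise the inline default.
import re

PATTERN = re.compile(r"\{\{(\w+):([^}]*)\}\}")


def render_defaults(template: str, context: dict) -> str:
    def substitute(match: re.Match) -> str:
        name, default = match.group(1), match.group(2)
        return str(context.get(name, default))

    return PATTERN.sub(substitute, template)


render_defaults("Topic: {{topic:General Discussion}}", {})
# -> 'Topic: General Discussion'
render_defaults("Topic: {{topic:General Discussion}}", {"topic": "Math"})
# -> 'Topic: Math'
```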

5. Mixed Usage

Combine all variable types in a single prompt:

inputs:
  task:
    description:
      inline: |
        User Request: {{user_text}}
        
        Previous Agent Analysis:
          - Expression: {{math_strategist.expression if math_strategist.expression else 'Not provided'}}
          - Steps: {{math_strategist.steps}}
          - Complexity: {{'High' if len(math_strategist.steps) > 5 else 'Low'}}
        
        Current Calculation:
          - Result: {{math_calculator.result}}
          - Rationale: {{math_calculator.rationale if math_calculator.rationale else 'No rationale available'}}
        
        Settings:
          - Max iterations: {{max_iterations:10}}
          - Debug mode: {{debug_mode:false}}

Variable Resolution Order

Variables are resolved in the following order:

  1. Main Context: Root-level context variables (e.g., user_text, current_date)
  2. HITL Results: Data from human-in-the-loop gates (e.g., user_preferences, approval_feedback)
  3. Upstream Agents: Outputs from agents that executed earlier in the pipeline
    • For simple variables: Searches all upstream agents automatically
    • For prefixed variables: Directly accesses the specified agent's output
    • For expressions: Evaluates using all available context
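
The three-tier lookup above can be sketched as a single function. This is an assumed simplification of the resolution logic, only meant to make the precedence concrete; the real implementation handles parsing, scoping, and prefixed access as well.

```python
# Illustrative resolution order (not the kit's internal code):
# main context first, then HITL results, then upstream agent outputs.
def resolve_variable(name, main_context, hitl_results, upstream):
    if name in main_context:                    # 1. main context
        return main_context[name]
    if name in hitl_results:                    # 2. HITL gate results
        return hitl_results[name]
    for agent_output in upstream.values():      # 3. upstream agents, in order
        if isinstance(agent_output, dict) and name in agent_output:
            return agent_output[name]
    raise KeyError(name)


main = {"user_text": "Solve 2+2"}
hitl = {"approval_feedback": "approved"}
upstream = {"math_strategist": {"expression": "2+2"}}

resolve_variable("user_text", main, hitl, upstream)    # -> 'Solve 2+2'
resolve_variable("expression", main, hitl, upstream)   # -> '2+2'
```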

INPUTS Tab Display

The INPUTS tab in the UI shows only variables that were explicitly defined in your YAML configuration:

  • ✅ Shown: Variables explicitly used in inputs, task.description, instruction, etc.
  • ❌ Hidden: System variables (context, pipeline_data), upstream agent dicts (added for Jinja2 expressions), and internal variables

This ensures the INPUTS tab displays only user-facing variables that are relevant to understanding the agent's input.

Backward Compatibility

All existing variable syntaxes continue to work:

  • Simple variables: {{variable_name}} ✅
  • Default values: {{variable_name:default}} ✅
  • Jinja2 filters: {{variable_name | filter}} ✅

New syntaxes are additive and don't break existing configurations.

Best Practices

  1. Use Prefixed Variables for Clarity: When referencing upstream agents, use {{agent_id.field}} for explicit dependencies
  2. Use Expressions for Logic: Complex conditional logic should use expressions rather than multiple simple variables
  3. Provide Defaults: Use default values or ternary expressions to handle missing data gracefully
  4. Document Dependencies: Prefixed variables make data flow explicit and easier to understand
  5. Test Expressions: Verify complex expressions work correctly with your data structures

Example: Math Compass Pipeline

# math_calculator.yml
inputs:
  inline: |
    Expression: {{math_strategist.expression if math_strategist.expression else 'No expression provided'}}
    Steps: {{math_strategist.steps}}

# math_auditor.yml
inputs:
  task:
    description:
      inline: |
        Problem: {{user_text}}
        
        Strategist Output:
          - Expression: {{math_strategist.expression}}
          - Steps: {{math_strategist.steps}}
        
        Calculator Output:
          - Result: {{math_calculator.result}}
          - Rationale: {{math_calculator.rationale}}

Features:

  • Automatic variable extraction from all syntaxes
  • Context-aware generation with proper resolution logic
  • Template rendering with Jinja2
  • Type-safe variable injection
  • Expression evaluation with full operator support
  • Backward compatible with existing configurations

๐Ÿ” Repeat Pattern

The Repeat Pattern lets you execute the same agent multiple times in parallel, with each instance receiving unique input data. This is ideal for processing arrays of items, running batch operations, or parallelizing independent tasks.

Basic Configuration

pattern:
  type: sequential
  steps:
    - node: data_parser
    - type: parallel
      repeat:
        node: processor_agent
        instances: "data_parser.item_count"
        input_mapping:
          item: "data_parser.items[index]"
          item_index: "index"
        instance_id_template: "{{node_id}}_{{index}}"
    - node: aggregator_agent

Key Parameters

  • node (required): The agent ID to repeat
  • instances (required): Number of instances (integer) or expression string (e.g., "parser.count")
  • input_mapping (optional): Maps input variable names to templates with {{index}} substitution
  • instance_id_template (optional): Template for generating unique instance IDs (default: "{{node_id}}_instance_{{index}}")
  • instance_context_key (optional): Context key for instance metadata (default: "repeat_instance")

Instance-Specific Inputs

Use input_mapping to provide unique data to each instance:

repeat:
  node: math_solver
  instances: "problem_parser.problem_count"
  input_mapping:
    problem_text: "problem_parser.problems[index]"
    problem_index: "index"

How it works:

  • {{index}} is automatically replaced with the instance number (0, 1, 2, ...)
  • Array indexing like problems[index] extracts the specific item for each instance
  • Each instance receives its own copy of the mapped variables
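
The mapping expansion can be sketched as follows. This is an assumed simplification covering only the two template shapes shown above (`agent.field[index]` and the bare `index`); `build_instance_inputs` is a name invented for the example.

```python
# Sketch of {{index}}-based input_mapping expansion into per-instance
# inputs (assumed behavior; names are illustrative, not the kit's API).
import re


def build_instance_inputs(mapping: dict, upstream: dict, index: int) -> dict:
    inputs = {}
    for var_name, template in mapping.items():
        if template == "index":
            inputs[var_name] = index          # bare index -> instance number
            continue
        # "agent.field[index]" -> upstream["agent"]["field"][index]
        match = re.fullmatch(r"(\w+)\.(\w+)(?:\[index\])?", template)
        agent, field = match.group(1), match.group(2)
        value = upstream[agent][field]
        inputs[var_name] = value[index] if template.endswith("[index]") else value
    return inputs


upstream = {"problem_parser": {"problems": ["2+2", "3*4", "10/2"]}}
mapping = {"problem_text": "problem_parser.problems[index]",
           "problem_index": "index"}

build_instance_inputs(mapping, upstream, 1)
# -> {'problem_text': '3*4', 'problem_index': 1}
```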

Accessing Instance Results in Downstream Agents

After all instances complete, their results are aggregated into a dictionary accessible by downstream agents. Use the pattern {base_agent_id}_instances to access all instance results:

# In downstream agent's inputs section
inputs:
  inline: |
    {% if math_solver_instances %}
    Solver Results:
    {% for instance_id, solver_data in math_solver_instances.items() %}
    - {{instance_id}}:
      Problem: {{solver_data.problem_text}}
      Answer: {{solver_data.answer}}
      Explanation: {{solver_data.explanation}}
    {% endfor %}
    {% endif %}

Key Points:

  • The aggregated dictionary uses the key {base_agent_id}_instances (e.g., math_solver_instances)
  • Each entry is keyed by the instance ID (e.g., math_solver_0, math_solver_1)
  • The value is the parsed output from that instance
  • Use Jinja2 {% for %} loops to iterate over all instances
  • Loop variables (instance_id, solver_data) are automatically filtered out from INPUTS tab
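
The shape of the aggregated dictionary can be illustrated with a short sketch. This assumes the key layout documented above (`{base_agent_id}_instances` keyed by instance ID); `aggregate_instances` is an invented helper, not the kit's API.

```python
# Sketch of the assumed aggregation step: per-instance parsed outputs are
# collected under the {base_agent_id}_instances key for downstream agents.
def aggregate_instances(base_agent_id: str, instance_results: list) -> dict:
    aggregated = {
        f"{base_agent_id}_{i}": parsed
        for i, parsed in enumerate(instance_results)
    }
    return {f"{base_agent_id}_instances": aggregated}


results = [
    {"problem_text": "2+2", "answer": "4"},
    {"problem_text": "3*4", "answer": "12"},
]
upstream = aggregate_instances("math_solver", results)
upstream["math_solver_instances"]["math_solver_1"]["answer"]  # -> '12'
```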

Important: Using Loop Variables in Templates

The prompt intelligence engine only detects and expands loop variables that are used directly in {{ }} expressions. Variables used only in {% set %} statements are not detected for expansion in the INPUTS tab.

✅ Correct - Use variables directly in {{ }} expressions:

inputs:
  inline: |
    {% for instance_id, file_data in file_report_generator_instances.items() %}
    - {{instance_id}}:
      File Name: {{file_data.file_report_generator.file_name}}
      Report: {{file_data.file_report_generator.report_md}}
    {% endfor %}

โŒ Incorrect - Using {% set %} prevents variable detection:

inputs:
  inline: |
    {% for instance_id, file_data in file_report_generator_instances.items() %}
      {% set file_report = file_data.file_report_generator %}
      - {{instance_id}}:
        File Name: {{file_report.file_name}}  # file_data won't be expanded in INPUTS tab
    {% endfor %}

Why? The prompt intelligence engine scans {{ }} expressions to identify variables for the INPUTS tab. Variables used only in {% set %} statements are not detected, so they won't appear in the INPUTS tab with expanded values (e.g., file_data[0], file_data[1]).

Best Practice: Use loop variables directly in {{ }} expressions instead of {% set %} for better visibility in the INPUTS tab.

Complete Example: Math Problem Solver

# Pipeline configuration
pattern:
  type: sequential
  steps:
    - node: math_repeater_parser
    - type: parallel
      repeat:
        node: math_repeater_solver
        instances: "math_repeater_parser.problem_count"
        instance_id_template: "{{node_id}}_{{index}}"
        input_mapping:
          problem_text: "math_repeater_parser.problems[index]"
          problem_index: "index"
    - node: math_repeater_report_generator

# Parser agent outputs: {problem_count: 3, problems: ["2+2", "3*4", "10/2"]}

# Each solver instance receives:
# Instance 0: problem_text="2+2", problem_index=0
# Instance 1: problem_text="3*4", problem_index=1
# Instance 2: problem_text="10/2", problem_index=2

# Report generator accesses all results:
inputs:
  inline: |
    Problem Count: {{math_repeater_parser.problem_count}}
    
    {% if math_repeater_solver_instances %}
    Solutions:
    {% for instance_id, solver_data in math_repeater_solver_instances.items() %}
    - Problem {{solver_data.problem_index}}: {{solver_data.problem_text}}
      Answer: {{solver_data.answer}}
    {% endfor %}
    {% endif %}

Instance Context Metadata

Each instance has access to metadata via the instance_context_key:

# In agent's prompt template (assuming instance_context_key: problem_instance)
inputs:
  inline: |
    Processing item {{problem_instance.index}} (instance ID: {{problem_instance.instance_id}})
    Item: {{problem_text}}

Available metadata:

  • {{instance_context_key}}.index: The instance index (0, 1, 2, ...)
  • {{instance_context_key}}.instance_id: The unique instance ID (e.g., math_solver_0)

Best Practices

  1. Use Dynamic Instance Counts: Use expressions like "parser.count" instead of hardcoded numbers
  2. Clear Variable Names: Use descriptive names in input_mapping (e.g., problem_text not text)
  3. Access Results via _instances Dictionary: Always use the {base_agent_id}_instances pattern in downstream agents
  4. Handle Empty Results: Use {% if agent_instances %} checks before iterating
  5. Unique Instance IDs: Customize instance_id_template if you need specific ID formats

Use Cases

  • Batch Processing: Process multiple files, documents, or data items in parallel
  • Parallel Problem Solving: Solve multiple independent problems simultaneously
  • Data Validation: Validate multiple records concurrently
  • Content Generation: Generate multiple variations or responses in parallel
  • API Calls: Make multiple independent API calls simultaneously

๐Ÿ” Enhanced Repeat Pattern (Sequential Repeat)

The Enhanced Repeat Pattern extends the basic repeat pattern to allow repeating an entire sequence of agents in parallel. This enables complex workflows where each instance runs through a multi-step process independently.

Configuration

pattern:
  type: sequential
  steps:
    - node: folder_scanner
    - type: parallel
      repeat:
        type: sequential  # Enhanced repeat: nested sequential pattern
        instances: "folder_scanner.file_count"
        instance_id_template: "file_{{index}}"
        instance_context_key: "file_instance"
        steps:
          - node: file_reader
            input_mapping:
              file_path: "folder_scanner.file_paths[index]"
              file_index: "index"
          - node: processor
          - node: report_generator
    - node: final_aggregator

Key Differences from Basic Repeat

  • type: sequential (required): Indicates this is an enhanced repeat with nested sequential pattern
  • steps (required): Array of agents/steps to run in sequence for each instance
  • input_mapping: Can be defined globally (applies to all agents) or per-step (overrides global)
  • Each instance runs the entire sequence independently and in parallel with other instances

How It Works

  1. Instance Creation: Creates N instances based on instances expression
  2. Parallel Execution: All instances run their sequences in parallel
  3. Sequence Execution: Within each instance, agents run sequentially
  4. Result Aggregation: Results from the last agent in each sequence are aggregated
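
The four steps above can be sketched with asyncio. Agent calls are stubbed and all names are illustrative; this only demonstrates the concurrency shape (parallel across instances, sequential within each) and the aggregation of each sequence's last result.

```python
# Rough asyncio sketch of the execution model: N instances in parallel,
# each running its step sequence in order; the last step's result per
# instance is aggregated. Agent calls are stand-ins.
import asyncio


async def run_agent(agent_id: str, inputs: dict) -> dict:
    return {**inputs, "agent": agent_id}  # stand-in for a real agent call


async def run_sequence(steps, index):
    result = {"index": index}
    for agent_id in steps:                # sequential within one instance
        result = await run_agent(agent_id, result)
    return result


async def run_enhanced_repeat(steps, instances):
    # all instances in parallel; each yields its last agent's result
    results = await asyncio.gather(
        *(run_sequence(steps, i) for i in range(instances))
    )
    return {f"{steps[-1]}_{i}": r for i, r in enumerate(results)}


aggregated = asyncio.run(
    run_enhanced_repeat(["file_reader", "processor", "file_reporter"], 3)
)
sorted(aggregated)
# -> ['file_reporter_0', 'file_reporter_1', 'file_reporter_2']
```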

Example: Multi-File Processing

pattern:
  type: sequential
  steps:
    - node: folder_scanner  # Scans folder, outputs: {file_count: 3, file_paths: [...]}
    - type: parallel
      repeat:
        type: sequential
        instances: "folder_scanner.file_count"  # 3 instances
        instance_id_template: "file_{{index}}"
        instance_context_key: "file_instance"
        steps:
          - node: file_reader  # Reads file for this instance
            input_mapping:
              file_path: "folder_scanner.file_paths[index]"
          - node: processor    # Processes the file
          - node: file_reporter # Generates report for this file
    - node: final_aggregator  # Aggregates all file reports

Execution Flow:

  • 3 parallel sequences run simultaneously:
    • Instance 0: file_reader_0 → processor_0 → file_reporter_0
    • Instance 1: file_reader_1 → processor_1 → file_reporter_1
    • Instance 2: file_reader_2 → processor_2 → file_reporter_2
  • All sequences complete before final_aggregator runs

Accessing Results from Enhanced Repeat

Results from the last agent in each sequence are aggregated into {last_agent_id}_instances:

# In final_aggregator's inputs
inputs:
  inline: |
    {% if file_reporter_instances %}
    File Reports:
    {% for instance_id, file_data in file_reporter_instances.items() %}
    - {{instance_id}}: {{file_data.report_md}}
    {% endfor %}
    {% endif %}

๐Ÿ” Nested Repeat Patterns

You can nest repeat patterns within enhanced repeat patterns to create complex hierarchical workflows. This is useful when each instance needs to process multiple sub-items.

Configuration

pattern:
  type: sequential
  steps:
    - node: folder_scanner
    - type: parallel
      repeat:
        type: sequential  # Enhanced repeat: process each file
        instances: "folder_scanner.file_count"
        instance_id_template: "file_{{index}}"
        instance_context_key: "file_instance"
        steps:
          - node: file_reader
            input_mapping:
              file_path: "folder_scanner.file_paths[index]"
          - type: parallel
            repeat:  # Nested repeat: process each problem in the file
              node: problem_solver
              instances: "file_reader.problem_count"
              input_mapping:
                problem_text: "file_reader.problems[index]"
                problem_index: "index"
              instance_id_template: "{{node_id}}_{{index}}"
          - node: file_report_generator
    - node: final_report_generator

Instance ID Structure

Nested repeat patterns create hierarchical instance IDs:

  • Format: {base_agent_id}_{parent_index}_{nested_index}
  • Example: problem_solver_0_0 = File 0, Problem 0
  • Display: "Problem Solver (Instance 1.1)" = File 1, Problem 1 (1-based)

ID Naming Convention:

  • Parent index (first number): The outer repeat pattern instance (e.g., file index)
  • Nested index (second number): The inner repeat pattern instance (e.g., problem index)
  • Both indices are 0-based in IDs, but displayed as 1-based in UI
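
The naming convention maps directly to two small formatting rules. The helpers below are invented for illustration; they only encode the documented 0-based ID / 1-based display scheme.

```python
# Sketch of the hierarchical ID scheme: 0-based in IDs, 1-based in the UI.
def nested_instance_id(base_agent_id: str,
                       parent_index: int,
                       nested_index: int) -> str:
    # {base_agent_id}_{parent_index}_{nested_index}
    return f"{base_agent_id}_{parent_index}_{nested_index}"


def display_label(agent_name: str,
                  parent_index: int,
                  nested_index: int) -> str:
    # UI shows 1-based "Instance Parent.Nested"
    return f"{agent_name} (Instance {parent_index + 1}.{nested_index + 1})"


nested_instance_id("problem_solver", 0, 2)  # -> 'problem_solver_0_2'
display_label("Problem Solver", 0, 0)       # -> 'Problem Solver (Instance 1.1)'
```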

Example: Multi-File Problem Solver

# Pipeline processes multiple files, each containing multiple problems
pattern:
  type: sequential
  steps:
    - node: folder_scanner  # Finds 2 files
    - type: parallel
      repeat:
        type: sequential
        instances: "folder_scanner.file_count"  # 2 instances (file_0, file_1)
        instance_id_template: "file_{{index}}"
        steps:
          - node: file_reader  # Reads file_0.txt (3 problems) or file_1.txt (2 problems)
          - type: parallel
            repeat:  # Nested: solve problems within each file
              node: problem_solver
              instances: "file_reader.problem_count"
              input_mapping:
                problem_text: "file_reader.problems[index]"
              instance_id_template: "{{node_id}}_{{index}}"
          - node: file_report_generator  # Report for this file's problems
    - node: final_report_generator  # Aggregates all file reports

Instance IDs Created:

  • file_reader_0 (file 0)
  • problem_solver_0_0 (file 0, problem 0)
  • problem_solver_0_1 (file 0, problem 1)
  • problem_solver_0_2 (file 0, problem 2)
  • file_report_generator_0 (file 0)
  • file_reader_1 (file 1)
  • problem_solver_1_0 (file 1, problem 0)
  • problem_solver_1_1 (file 1, problem 1)
  • file_report_generator_1 (file 1)

Accessing Nested Instance Results

Nested repeat patterns create scoped instance dictionaries to avoid conflicts:

# In file_report_generator (within file_0 instance)
inputs:
  inline: |
    # Access problem solvers for THIS file only
    {% if problem_solver_instances %}
    Solutions for this file:
    {% for instance_id, solver_data in problem_solver_instances.items() %}
    - {{solver_data.problem_text}}: {{solver_data.answer}}
    {% endfor %}
    {% endif %}

Scoping:

  • Within each file instance, problem_solver_instances contains only that file's problem solvers
  • The system automatically scopes nested instance dictionaries to prevent cross-contamination
  • Both scoped (problem_solver_instances_file_0) and unscoped (problem_solver_instances) keys are available

Note on Loop Variables: When accessing nested structures (e.g., file_data.file_report_generator.file_name), use variables directly in {{ }} expressions rather than {% set %} statements. This ensures the prompt intelligence engine can detect and expand loop variables for the INPUTS tab. See the "Important: Using Loop Variables in Templates" section above for details.

Frontend Visibility

Agent Cards:

  • Each instance appears as a separate card in the UI
  • Instance numbers are displayed in the format: "Agent Name (Instance N)" or "Agent Name (Instance Parent.Nested)"
  • Example: "Problem Solver (Instance 1.1)" for file 1, problem 1

INPUTS Tab:

  • Shows instance-specific variables from input_mapping
  • Displays loop variables (e.g., instance_id, solver_data) when iterating over instances
  • Filters out internal context variables automatically

OUTPUTS Tab:

  • Shows the parsed output for each instance
  • Instance ID is displayed in the output JSON (agent_id field)
  • Each instance's output is stored separately in upstream context

Protocol Chips:

  • Each instance shows its own protocol (local/remote)
  • Instances are visually grouped by their parent pattern

Important Nuances

  1. Instance ID Uniqueness: Nested patterns ensure unique IDs by including parent index

    • problem_solver_0_0 (file 0, problem 0) vs problem_solver_1_0 (file 1, problem 0)
    • This prevents conflicts when multiple parent instances run in parallel
  2. Context Scoping: Nested patterns automatically scope instance dictionaries

    • problem_solver_instances_file_0 (scoped to file 0)
    • problem_solver_instances (unscoped, accessible within file context)
  3. Input Mapping Inheritance:

    • Global input_mapping applies to all agents in the sequence
    • Step-level input_mapping overrides global mapping for that step
    • Nested patterns inherit parent context automatically
  4. Result Aggregation:

    • Enhanced repeat: Aggregates results from the last agent in each sequence
    • Nested repeat: Aggregates results from the nested agent, scoped to parent instance
  5. Instance Context Key:

    • Default: repeat_instance (basic repeat)
    • Customizable: file_instance, problem_instance, etc.
    • Access via: {{instance_context_key.index}} and {{instance_context_key.instance_id}}

Complete Example: Enhanced Math Repeater

# Pipeline: Process multiple math problem files, solve problems in each file, generate reports
pattern:
  type: sequential
  steps:
    - node: folder_scanner  # Scans folder, finds files
    - type: parallel
      repeat:
        type: sequential  # Enhanced repeat: process each file
        instances: "folder_scanner.file_count"
        instance_id_template: "file_{{index}}"
        instance_context_key: "file_instance"
        steps:
          - node: file_reader  # Read file
            input_mapping:
              file_path: "folder_scanner.file_paths[index]"
              file_index: "index"
          - type: parallel
            repeat:  # Nested repeat: solve each problem
              node: problem_solver
              instances: "file_reader.problem_count"
              input_mapping:
                problem_text: "file_reader.problems[index]"
                problem_index: "index"
              instance_id_template: "{{node_id}}_{{index}}"
          - node: file_report_generator  # Report for this file
    - node: final_report_generator  # Aggregate all file reports

# file_report_generator accesses problem_solver_instances (scoped to this file)
# final_report_generator accesses file_report_generator_instances (all files)

Best Practices for Nested Patterns

  1. Use Descriptive Instance Context Keys: file_instance, problem_instance instead of generic repeat_instance
  2. Customize Instance ID Templates: Use meaningful templates like "file_{{index}}" for clarity
  3. Scope Awareness: Understand that nested instance dictionaries are automatically scoped
  4. Access Patterns: Use unscoped keys within the same parent instance context
  5. Error Handling: Each instance fails independently; failed instances are reported separately
  6. Performance: Nested patterns can create many parallel instances; monitor resource usage

🔗 Pipeline Composition (Pipeline as Node)

Pipeline Composition lets you use an entire pipeline as a node within another pipeline, enabling code reuse, modular workflow design, and the composition of complex workflows from simpler, tested pipelines.

Overview

Instead of copying agents or recreating functionality, you can reference an existing pipeline and use it as a single step in a new pipeline. The sub-pipeline executes with its own isolated context, and its results are made available to the parent pipeline in a structured format.

Basic Configuration

# Parent pipeline configuration
pipelines:
  - id: math_repeater
    pipeline_file: "pipelines/math_repeater.yml"

nodes:
  - id: folder_scanner
    # config_file is optional - defaults to "agents/folder_scanner.yml" if not specified
  - id: final_report_generator
    # config_file is optional - defaults to "agents/final_report_generator.yml" if not specified

pattern:
  type: sequential
  steps:
    - node: folder_scanner
    - pipeline: math_repeater  # Use pipeline as a node
      input_mapping:
        user_text: "{{folder_scanner.file_paths[0]}}"
    - node: final_report_generator

Pipeline Registry

Similar to the nodes registry, you define a pipelines registry in your pipeline configuration:

pipelines:
  - id: math_repeater
    pipeline_file: "pipelines/math_repeater.yml"  # Relative to config/pipelines/
  - id: data_processor
    pipeline_file: "pipelines/data_processor.yml"

Important:

  • pipeline_file is relative to the config/pipelines/ directory of the parent pipeline
  • Pipeline IDs must be unique within a pipeline configuration
  • Circular dependencies are detected and prevented during validation
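
Circular-reference detection over the registry can be sketched as a depth-first walk. This is an assumed illustration of the validation step, not the kit's actual check; `find_cycle` and the reference-map shape are invented for the example.

```python
# Sketch of circular-dependency detection over pipeline references.
def find_cycle(pipeline_id, references, path=()):
    """references maps a pipeline id -> list of sub-pipeline ids it uses."""
    if pipeline_id in path:
        return path + (pipeline_id,)      # cycle found: return the chain
    for sub_id in references.get(pipeline_id, []):
        cycle = find_cycle(sub_id, references, path + (pipeline_id,))
        if cycle:
            return cycle
    return None


refs = {"parent": ["math_repeater"], "math_repeater": ["parent"]}
find_cycle("parent", refs)
# -> ('parent', 'math_repeater', 'parent')

refs_ok = {"parent": ["math_repeater"], "math_repeater": []}
find_cycle("parent", refs_ok)  # -> None
```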

Using Pipelines in Patterns

You can use pipeline: instead of node: in any pattern step:

pattern:
  type: sequential
  steps:
    - node: folder_scanner
    - pipeline: math_repeater  # Pipeline as a step
      input_mapping:
        user_text: "{{folder_scanner.file_path}}"
    - node: aggregator

Input Mapping

Sub-pipelines can receive inputs from the parent pipeline through input_mapping:

pattern:
  type: sequential
  steps:
    - node: folder_scanner
    - pipeline: math_repeater
      input_mapping:
        user_text: "{{folder_scanner.file_paths[index]}}"  # Dynamic input per instance
        config: "{{folder_scanner.config}}"

How Input Mapping Works:

  1. The input_mapping is evaluated in the parent pipeline's context
  2. Values are set in the sub-pipeline's context before execution
  3. The sub-pipeline's first node can access these values via {{variable_name}}
  4. If the sub-pipeline has an inputs section defined, it takes precedence and can transform the mapped values
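
Steps 1 and 2 can be sketched as follows, for the simple `{{agent.field[N]}}` template shape used above. This is an assumed simplification; `seed_sub_context` and the regex are invented for the example.

```python
# Sketch: mapping templates are evaluated against the parent's upstream
# data, and resolved values seed the sub-pipeline's context before it runs.
import re

TEMPLATE = re.compile(r"\{\{(\w+)\.(\w+)(?:\[(\d+)\])?\}\}")


def seed_sub_context(input_mapping: dict, parent_upstream: dict) -> dict:
    sub_context = {}
    for name, template in input_mapping.items():
        match = TEMPLATE.fullmatch(template)
        agent, field, idx = match.group(1), match.group(2), match.group(3)
        value = parent_upstream[agent][field]   # evaluated in parent context
        sub_context[name] = value[int(idx)] if idx is not None else value
    return sub_context


parent_upstream = {"folder_scanner": {"file_paths": ["a.txt", "b.txt"]}}
seed_sub_context({"user_text": "{{folder_scanner.file_paths[0]}}"},
                 parent_upstream)
# -> {'user_text': 'a.txt'}
```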

Accessing Sub-Pipeline Outputs

Sub-pipeline results are stored in a structured format in the parent's context:

# Access individual node outputs
{{math_repeater.nodes.math_repeater_parser.problem_count}}
{{math_repeater.nodes.math_repeater_solver.answer}}
{{math_repeater.nodes.math_repeater_report_generator.report_md}}

# Access final output (if outputs.final is configured)
{{math_repeater.report_md}}
{{math_repeater.summary}}

Output Structure:

context.upstream[pipeline_id] = {
  "result": final_output,           # Final pipeline output
  "parsed": final_parsed,           # Parsed final output
  "nodes": {                        # Individual node outputs
    "node_id": {
      "result": node_result,
      "parsed": node_parsed
    }
  },
  "intermediate": {                 # Intermediate outputs (if configured)
    "output_id": {
      "value": intermediate_value
    }
  }
}

Pipeline in Repeat Patterns

You can use pipelines in repeat patterns to process multiple items:

pattern:
  type: sequential
  steps:
    - node: folder_scanner
    - type: parallel
      repeat:
        pipeline: math_repeater  # Repeat pipeline for each file
        instances: "folder_scanner.file_count"
        instance_id_template: "{{pipeline_id}}_instance_{{index}}"
        instance_context_key: "file_instance"
        input_mapping:
          user_text: "{{folder_scanner.file_paths[index]}}"
    - node: final_report_generator

Instance Results Access: When a pipeline is repeated, all instances are collected in {pipeline_id}_instances:

# In final_report_generator inputs
inputs:
  inline: |
    {% if math_repeater_instances %}
    Pipeline Instance Results:
    {% for instance_id, instance_data in math_repeater_instances.items() %}
    - {{instance_id}}:
      Report: {{instance_data.nodes.math_repeater_report_generator.report_md}}
      Total Problems: {{instance_data.nodes.math_repeater_report_generator.total_problems}}
      Solved: {{instance_data.nodes.math_repeater_report_generator.solved_count}}
    {% endfor %}
    {% endif %}

Context Isolation and Sharing

Isolated Context:

  • Sub-pipelines execute with an isolated upstream context (deep copy)
  • Sub-pipeline agents write to their own upstream copy
  • Parent pipeline's upstream is not affected by sub-pipeline execution

Shared Context:

  • Read-only data is shared (shallow copy): project_dir, emitter, user_profiles, etc.
  • Sub-pipelines can read parent's upstream data for input mapping expressions
  • Sub-pipelines cannot write to parent's upstream (isolation)

Context Flow:

Parent Context
├── upstream (deep copied for sub-pipeline)
│   ├── parent_node_1 (readable by sub-pipeline)
│   └── parent_node_2 (readable by sub-pipeline)
├── project_dir (shared)
├── emitter (shared)
└── user_profiles (shared)

Sub-Pipeline Context (isolated)
├── upstream (isolated copy)
│   ├── parent_node_1 (read-only, from parent)
│   ├── parent_node_2 (read-only, from parent)
│   ├── sub_node_1 (writable, isolated)
│   └── sub_node_2 (writable, isolated)
└── [input_mapping variables] (set before execution)

Technical Details

Agent Factory Isolation:

  • Sub-pipelines create their own AgentFactory with merged configuration
  • Parent's agent_factory is removed from context before sub-pipeline execution
  • Sub-pipeline's AgentFactory can find agents from both parent and sub-pipeline configs

Agent Bus Isolation:

  • Sub-pipelines create their own AgentBus with merged configuration
  • Each sub-pipeline has its own agent registry and routing logic
  • Remote agents in sub-pipelines are properly routed

Configuration Merging:

  • Parent pipeline config is merged with sub-pipeline config
  • Sub-pipeline's nodes take precedence (for sub-pipeline execution)
  • Parent's nodes remain available (for parent pipeline execution)

Result Storage:

  • Sub-pipeline results are stored using instance_pipeline_id (for repeat patterns) or pipeline_id (for single execution)
  • Results are filtered to include only sub-pipeline nodes (not parent nodes)
  • Results are stored in nested structure: context.upstream[pipeline_id]

Pattern Type Support

Pipeline composition is supported across all pattern types:

Pattern Type   Support      Notes
Sequential     ✅ Full      Pipelines execute in order
Parallel       ✅ Full      Multiple pipelines execute simultaneously
Repeat         ✅ Full      Pipeline instances run in parallel with unique inputs
Loop           ✅ Full      Pipelines can be looped with termination conditions
Conditional    ✅ Full      Pipelines execute based on conditions
Switch         ✅ Full      Different pipelines in different branches
Handoff        ⚠️ Limited   Orchestrator must reference pipeline's final output node
Group Chat     ✅ Full      Pipelines can participate as group chat members

Complete Example: Multi-File Processing with Pipeline Reuse

# Parent pipeline: pipeline_math_repeater.yml
pipelines:
  - id: math_repeater
    pipeline_file: "pipelines/math_repeater.yml"

nodes:
  - id: enhanced_math_repeater_folder_scanner
    config_file: "agents/enhanced_math_repeater_folder_scanner.yml"
  - id: enhanced_math_repeater_final_report_generator
    config_file: "agents/enhanced_math_repeater_final_report_generator.yml"

pattern:
  type: sequential
  steps:
    - node: enhanced_math_repeater_folder_scanner
    - type: parallel
      repeat:
        pipeline: math_repeater  # Reuse math_repeater pipeline
        instances: "enhanced_math_repeater_folder_scanner.file_count"
        instance_id_template: "{{pipeline_id}}_instance_{{index}}"
        instance_context_key: "file_instance"
        input_mapping:
          user_text: "{{enhanced_math_repeater_folder_scanner.file_paths[index]}}"
    - node: enhanced_math_repeater_final_report_generator

# Sub-pipeline: math_repeater.yml
nodes:
  - id: math_repeater_parser
    config_file: "agents/math_repeater_parser.yml"
  - id: math_repeater_solver
    config_file: "agents/math_repeater_solver.yml"
  - id: math_repeater_report_generator
    config_file: "agents/math_repeater_report_generator.yml"

pattern:
  type: sequential
  steps:
    - node: math_repeater_parser
    - type: parallel
      repeat:
        node: math_repeater_solver
        instances: "math_repeater_parser.problem_count"
        input_mapping:
          problem_text: "math_repeater_parser.problems[index]"
    - node: math_repeater_report_generator

outputs:
  final:
    node: math_repeater_report_generator
    selectors: ["report_md", "total_problems", "solved_count"]

Execution Flow:

  1. enhanced_math_repeater_folder_scanner scans folder, finds 2 files
  2. math_repeater pipeline runs 2 instances in parallel:
    • Instance 0: Processes file_0.txt → parser → solver(s) → report
    • Instance 1: Processes file_1.txt → parser → solver(s) → report
  3. enhanced_math_repeater_final_report_generator aggregates all reports

Final Report Generator Access:

# In enhanced_math_repeater_final_report_generator inputs
inputs:
  inline: |
    Folder Scanner Results:
      File Count: {{enhanced_math_repeater_folder_scanner.file_count}}
    
    {% if math_repeater_instances %}
    Math Repeater Pipeline Instance Results:
    {% for instance_id, instance_data in math_repeater_instances.items() %}
    - {{instance_id}}:
      Report: {{instance_data.nodes.math_repeater_report_generator.report_md}}
      Total Problems: {{instance_data.nodes.math_repeater_report_generator.total_problems}}
      Solved: {{instance_data.nodes.math_repeater_report_generator.solved_count}}
    {% endfor %}
    {% endif %}

Visual Representation

Pipeline nodes are visually distinct in generated workflow diagrams:

  • Color: Light orange background (#FFF3E0) with dark orange border (#FF6F00)
  • Label: Shows pipeline name (e.g., "Math Repeater Pipeline")
  • Instance Info: For repeat patterns, shows "Pipeline Instance N"

Best Practices

  1. Reuse Over Recreate: Use pipeline composition instead of copying agents
  2. Clear Input Mapping: Explicitly map inputs from parent to sub-pipeline
  3. Output Configuration: Define outputs.final in sub-pipelines for clean final output access
  4. Instance ID Templates: Use descriptive templates like "{{pipeline_id}}_instance_{{index}}"
  5. Context Awareness: Understand that sub-pipelines have isolated upstream contexts
  6. Error Handling: Sub-pipeline failures are isolated and reported separately
  7. Agent Reuse: Reuse agents across pipelines to avoid duplication (service generation handles this automatically)
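Several of these practices can be combined in a single stanza. The sketch below is illustrative only; in particular, the instance_id field name is an assumption rather than confirmed framework syntax.

```yaml
# Illustrative only: outputs.final defined in the sub-pipeline (practice 3)
# plus a descriptive instance ID template (practice 4).
pattern:
  type: parallel
  repeat:
    pipeline: file_processor
    instances: "folder_scanner.file_count"
    instance_id: "{{pipeline_id}}_instance_{{index}}"  # assumed field name

# In file_processor.yml (sub-pipeline)
outputs:
  final:
    node: report_generator
    selectors: ["report_md"]
```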

Common Patterns

Pattern 1: Sequential Pipeline Composition

steps:
  - node: preprocessor
  - pipeline: processor  # Process with reusable pipeline
  - node: postprocessor

Pattern 2: Parallel Pipeline Execution

steps:
  - type: parallel
    steps:
      - pipeline: data_processor
      - pipeline: image_processor
      - pipeline: text_processor

Pattern 3: Pipeline in Repeat Pattern

steps:
  - node: folder_scanner
  - type: parallel
    repeat:
      pipeline: file_processor
      instances: "folder_scanner.file_count"
      input_mapping:
        file_path: "{{folder_scanner.file_paths[index]}}"

Pattern 4: Nested Pipeline Composition

# Parent pipeline uses sub-pipeline
# Sub-pipeline can also use other pipelines
# Creates hierarchical workflow composition
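A hedged sketch of what such nesting might look like (all pipeline and node names here are hypothetical):

```yaml
# parent.yml - composes mid_processor
pattern:
  type: sequential
  steps:
    - node: intake
    - pipeline: mid_processor
    - node: summary

# mid_processor.yml - itself composes another pipeline
pattern:
  type: sequential
  steps:
    - pipeline: leaf_cleaner
    - node: mid_reporter
```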

Limitations and Considerations

  1. Circular Dependencies: Detected during validation, prevented at runtime
  2. Context Isolation: Sub-pipelines cannot directly modify parent's upstream (by design)
  3. Agent Factory: Each sub-pipeline creates its own factory (necessary for agent discovery)
  4. Service Generation: Agents reused across pipelines share service files (no duplication)
  5. Edge Protocol: Correctly detected for local/remote agents in sub-pipelines
  6. Expression Evaluation: Supports hierarchical paths like {pipeline_id}.{node_id}.{field}
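These hierarchical paths resolve against nested result dictionaries. A minimal stand-in resolver (not the framework's actual implementation) might look like:

```python
import re


def resolve_path(context: dict, path: str):
    """Resolve a dotted path like 'math_repeater.nodes.report_generator.report_md',
    including list indices such as 'problems[1]'."""
    current = context
    for part in path.split("."):
        match = re.fullmatch(r"(\w+)\[(\d+)\]", part)
        if match:
            # Indexed segment: descend into the list under the given key
            current = current[match.group(1)][int(match.group(2))]
        else:
            current = current[part]
    return current


context = {
    "math_repeater": {"nodes": {"report_generator": {"report_md": "# Report"}}},
    "parser": {"problems": ["1+1", "2*3"]},
}
resolve_path(context, "math_repeater.nodes.report_generator.report_md")  # "# Report"
resolve_path(context, "parser.problems[1]")  # "2*3"
```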

Troubleshooting

Issue: Sub-pipeline agents not found

  • Cause: Agent factory not properly isolated
  • Solution: Ensure sub-pipeline's AgentFactory is created with merged config

Issue: Input mapping not working

  • Cause: Expression evaluation failing
  • Solution: Check that parent's upstream data is accessible (use {{parent_node.field}})

Issue: Cannot access sub-pipeline outputs

  • Cause: Wrong path structure
  • Solution: Use the path {pipeline_id}.nodes.{node_id}.{field}, or {pipeline_id}.{field} for the final output

Issue: Edge protocol showing A2A for local agents

  • Cause: Agent factory not in context for protocol detection
  • Solution: Fixed in the framework, which now falls back to agent_runner.agent_bus for protocol detection

๐Ÿ’ฌ Intelligent Pipeline Selection

An LLM-based assistant automatically routes user requests to the correct pipeline:

# User: "Translate this to Spanish"
# Assistant routes to: translator pipeline

# User: "Solve this math problem"
# Assistant routes to: math_compass pipeline

# User: "Write a haiku about autumn"
# Assistant routes to: haiku_writers_room pipeline
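The routing contract is simply user text in, pipeline id out. The sketch below illustrates that contract only; a trivial keyword matcher stands in for the real LLM-based classifier.

```python
# Illustrative routing contract: not the actual assistant implementation.
def route_request(user_text: str) -> str:
    text = user_text.lower()
    if "translate" in text:
        return "translator"
    if "math" in text or "solve" in text:
        return "math_compass"
    if "haiku" in text:
        return "haiku_writers_room"
    return "default"  # fall back to a default pipeline


route_request("Translate this to Spanish")  # -> "translator"
```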

๐Ÿ“Š Multi-Pipeline Architecture

Run multiple specialized pipelines in a single project:

# config/pipeline.yml
pipelines:
  - id: math_compass
    # config_file is optional - defaults to "pipelines/math_compass.yml" if not specified
  - id: stock_analysis
    # config_file is optional - defaults to "pipelines/stock_analysis.yml" if not specified
  - id: translator
    # config_file is optional - defaults to "pipelines/translator.yml" if not specified

๐Ÿ“ Configuration File Paths (Optional)

The config_file field is optional for nodes, pipelines, and independent agents. Topaz Agent Kit automatically resolves configuration file paths using sensible defaults:

  • For nodes: Defaults to agents/{id}.yml (e.g., agents/my_agent.yml for id: my_agent)
  • For pipelines: Defaults to pipelines/{id}.yml (e.g., pipelines/my_pipeline.yml for id: my_pipeline)
  • For independent_agents: Defaults to agents/{id}.yml

When to specify config_file explicitly:

  • If your configuration file is in a different location or has a different name
  • If you want to use a non-standard directory structure
  • If you need to override the default path for clarity

Examples:

# Using default path (recommended)
nodes:
  - id: my_agent  # Automatically resolves to "agents/my_agent.yml"

# Explicit override (when needed)
nodes:
  - id: my_agent
    config_file: "agents/custom/my_agent.yml"  # Custom path

# Pipeline registry with defaults
pipelines:
  - id: my_pipeline  # Automatically resolves to "pipelines/my_pipeline.yml"
  - id: legacy_pipeline
    config_file: "pipelines/legacy/old_pipeline.yml"  # Custom path
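The defaulting rule described above can be expressed in a few lines. This is an illustrative sketch, not the framework's actual resolver.

```python
def default_config_file(entry: dict, kind: str) -> str:
    """Resolve config_file for a registry entry.
    kind is 'nodes', 'independent_agents', or 'pipelines'."""
    if "config_file" in entry:
        return entry["config_file"]  # explicit override wins
    folder = "pipelines" if kind == "pipelines" else "agents"
    return f"{folder}/{entry['id']}.yml"


default_config_file({"id": "my_agent"}, "nodes")         # "agents/my_agent.yml"
default_config_file({"id": "my_pipeline"}, "pipelines")  # "pipelines/my_pipeline.yml"
```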

๐Ÿ” Context-Aware Output Management

Configure intermediate and final outputs per pipeline:

outputs:
  intermediate:
    selectors: ["content", "summary"]

  final:
    selectors: ["content", "summary", "metadata"]
    transform: |
      ๐Ÿ“Š Analysis Summary
      {{ content }}
      ๐Ÿ“ˆ Metadata: {{ metadata }}

๐ŸŽจ Visual Workflow Diagrams

Automatically generate professional diagrams with:

  • Agent nodes with proper styling
  • HITL gates with flow paths
  • Pattern visualization (switch, loop, handoff)
  • Protocol indicators
  • Termination conditions

๐Ÿ”„ Session Management

  • SQLite-based session persistence
  • Cross-pipeline context sharing
  • Document availability across sessions
  • Chat history with citations

๐Ÿ“‹ Available Templates

Starter Templates

  • ensemble: Complete multi-agent system with document processing, math, stock analysis, and content generation
  • math_demo: Mathematical problem-solving with calculator, strategist, and auditor agents
  • stock_analysis: Financial analysis pipeline with research, filing, and investment advisor agents
  • article_smith: Content creation workflow with research, writing, critique, and editing agents
  • reply_wizard: Email reply generation with context analysis, drafting, and polishing

Foundation Templates

  • basic: Minimal project structure for custom agent development

๐Ÿ”ง CLI Commands

Project Management

# Create new project (defaults to basic foundation)
topaz-agent-kit init .                    # Quick start with basic template
topaz-agent-kit init -f basic ./my_project  # Explicit basic foundation
topaz-agent-kit init -s ensemble ./my_project  # Full starter template

# Scaffold project structure only
topaz-agent-kit scaffold --starter <template> <output_dir>

# Generate agent code from configuration
topaz-agent-kit generate <project_dir>

# Validate project configuration
topaz-agent-kit validate <project_dir>

Service Management

# Start web interface with hot reload
topaz-agent-kit serve fastapi --project <project_dir> --reload

# Start command-line interface
topaz-agent-kit serve cli --project <project_dir>

# Start MCP server
topaz-agent-kit serve mcp --project <project_dir>

# Start services (A2A unified service)
topaz-agent-kit serve services --project <project_dir>

Discovery

# List available templates
topaz-agent-kit list --starters
topaz-agent-kit list --foundations
topaz-agent-kit list --all

Portable Demos

Prerequisites: uv must be installed for faster exports. Install with:

curl -LsSf https://astral.sh/uv/install.sh | sh  # Mac/Linux

# Export all three (wheel + runtime + demo) - default
topaz-agent-kit export -p projects/pa --output ./exports

# Export wheel only
topaz-agent-kit export-wheel --output ./exports

# Export runtime only
topaz-agent-kit export-runtime --output ./exports

# Export demo project only
topaz-agent-kit export-demo -p projects/pa --output ./exports

# Export only wheel + demo (skip runtime)
topaz-agent-kit export -p projects/pa --output ./exports --skip-runtime

# Export only wheel (skip runtime + demo) - no --project needed
topaz-agent-kit export --output ./exports --skip-runtime --skip-demo

# Dev build with git hash
topaz-agent-kit export-runtime --dev --output ./exports

# Skip build step (use existing wheel)
topaz-agent-kit export-runtime --skip-build --output ./exports

See Portable Demos section for detailed documentation.

โš™๏ธ Configuration

Pipeline Configuration (pipeline.yml)

Define your agent workflows with YAML, including integrated HITL gates:

# Multi-pipeline configuration with HITL integration
pipelines:
  - id: "content_creator"
    name: "Content Creator"
    description: "AI-powered content creation with human oversight"
    pattern:
      type: sequential
      steps:
        - node: research_analyst

        - gate: approve_research
          on_approve: continue
          on_reject: stop

        - node: content_author

        - gate: review_draft
          on_approve: continue
          on_reject: retry_node
          retry_target: content_author
          max_retries: 3

        - node: editor

# HITL Gates configuration
gates:
  - id: approve_research
    type: approval
    title: "Approve Research"
    description: "Review research findings and approve to proceed"
    timeout_ms: 30000
    on_timeout: approve

  - id: review_draft
    type: input
    title: "Review Draft"
    description: "Provide feedback on the content draft"
    fields:
      - name: feedback
        label: "Your Feedback"
        type: textarea
        required: true
        validation:
          min_length: 10
          max_length: 1000
      - name: action
        label: "Action"
        type: select
        required: true
        options:
          - value: "approve"
            label: "Approve"
          - value: "refine"
            label: "Request Revision"
          - value: "reject"
            label: "Reject"
    target_agents: ["content_author"]
    context_key: "draft_feedback"
    timeout_ms: 120000
    on_timeout: skip

# Agent registry
nodes:
  - id: research_analyst
    # config_file is optional - defaults to "agents/research_analyst.yml" if not specified
  - id: content_author
    # config_file is optional - defaults to "agents/content_author.yml" if not specified
  - id: editor
    # config_file is optional - defaults to "agents/editor.yml" if not specified

Agent Configuration (agents/*.yml)

# Agent definition with HITL context integration
instruction: |
  You are a content author. Create high-quality content based on research findings.

  {% if draft_feedback %}
  Previous feedback: {{ draft_feedback.feedback }}
  Action requested: {{ draft_feedback.action }}
  {% endif %}

framework: "langgraph"
model: "azure_openai"

# MCP tool integration
mcp:
  servers:
    - url: "http://127.0.0.1:8050/mcp"
      toolkits: ["doc_extract", "common"]
      tools: ["doc_extract_*", "common_*"]

# Output configuration
outputs:
  final:
    selectors: ["content", "summary"]
    transform: |
      ๐Ÿ“ **Content**: {{ value.content }}
      ๐Ÿ“Š **Summary**: {{ value.summary }}

UI Configuration (ui_manifest.yml)

# Web interface customization
brand:
  logo: "assets/my-logo.png"
  name: "My Agent System"

appearance:
  default_theme: "dark"
  default_accent: "210 92% 56%"

features:
  pipeline_panel: true
  hitl: true # Enable Enhanced HITL System
  typing_indicator: true

# Pipeline cards
cards:
  - id: "content_creator"
    title: "Content Creator"
    subtitle: "AI-powered content creation with human oversight"
    icon: "assets/content-creator.svg"

๐ŸŒ Web Interface Features

Scripts Tab

The Scripts tab in the sidebar provides a convenient way to discover, configure, and execute setup scripts for your pipelines. This is especially useful for initializing databases, generating mock data, and setting up test environments.

Features:

  • Script Discovery: Automatically lists all executable scripts from project_dir/scripts/
  • Script Registry: Uses scripts.yml for human-readable names, descriptions, and parameter definitions
  • Interactive Execution: Run scripts directly from the UI with parameter configuration
  • Parameter Management:
    • View default parameters from registry
    • Modify parameter values before execution
    • Add custom parameters (string or flag types)
    • Visual indicators for parameters using default values
  • Execution Monitoring: Real-time output display with:
    • Execution status (running, success, error)
    • Standard output (stdout)
    • Error output (stderr)
    • Return code
    • Execution time

Script Types Supported:

  • Python scripts (.py)
  • PowerShell scripts (.ps1)
  • Shell scripts (.sh)
  • Batch files (.bat, .cmd)

Script Registry Format (scripts.yml):

scripts:
  - filename: "setup_eci_database.py"
    name: "Setup ECI Database"
    description: "Initializes the ECI Claims database and generates mock data"
    category: "Setup"
    parameters:
      - name: "db-path"
        description: "Path to SQLite database file"
        type: "string"
        default: "projects/ensemble/data/eci/eci_database.db"
        required: false
      - name: "reset"
        description: "Reset database (drop existing tables)"
        type: "flag"
        default: "false"
        required: false
      - name: "approve-count"
        description: "Number of 'approve' claims to generate"
        type: "integer"
        default: "1"
        required: false

Parameter Types:

  • string: Text input with optional default value
  • integer: Numeric input with optional default value
  • boolean: Checkbox input (true/false)
  • flag: Command-line flag (added to command if value is true)
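A hypothetical sketch of how registry parameters might map onto a command line (the actual Scripts tab logic may differ):

```python
# Illustrative only: build an argv list from a scripts.yml-style parameter
# registry plus user-supplied values.
def build_command(script: str, params: list[dict], values: dict) -> list[str]:
    cmd = ["python", script]
    for p in params:
        value = values.get(p["name"], p.get("default"))
        if p["type"] == "flag":
            if str(value).lower() == "true":
                cmd.append(f"--{p['name']}")       # flag: present only when true
        elif value is not None:
            cmd += [f"--{p['name']}", str(value)]  # string/integer/boolean: name + value
    return cmd


params = [
    {"name": "db-path", "type": "string", "default": "data/eci.db"},
    {"name": "reset", "type": "flag", "default": "false"},
]
build_command("setup_eci_database.py", params, {"reset": "true"})
# -> ["python", "setup_eci_database.py", "--db-path", "data/eci.db", "--reset"]
```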

Usage:

  1. Navigate to the Scripts tab in the sidebar
  2. Browse available scripts grouped by category
  3. Click Run next to any script
  4. Review and modify parameters in the modal dialog
  5. Click Run Script to execute
  6. Monitor execution output in real-time

Path Resolution: Scripts automatically handle path resolution based on execution context:

  • Scripts can use paths relative to repository root (e.g., projects/ensemble/data/...)
  • Works correctly whether run from repository root or project directory
  • Uses intelligent path resolution utilities for maximum compatibility
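One common way to implement such resolution is to walk upward from the current directory until a repository marker is found, then resolve the relative path against that root. The marker names below are assumptions for illustration, not the kit's actual utilities.

```python
from pathlib import Path


def resolve_from_repo_root(
    relative: str,
    start: Path,
    markers: tuple[str, ...] = (".git", "pyproject.toml"),  # assumed markers
) -> Path:
    # Check the start directory and each ancestor for a repository marker
    for candidate in [start, *start.parents]:
        if any((candidate / m).exists() for m in markers):
            return candidate / relative
    # Fallback: resolve against the starting directory
    return start / relative
```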

๐ŸŒ Web Interface Features

Real-time Agent Visualization

  • Live Pipeline Execution: Watch agents execute in real-time with AG-UI protocol
  • Interactive Workflow Diagrams: Visual representation of agent flows with dynamic updates
  • Step-by-step Progress: Detailed execution tracking with intermediate results
  • Protocol Indicators: Visual representation of A2A and IN-PROC protocols in UI

Document Management

  • Drag & Drop Upload: Intuitive file upload interface
  • Multi-file Support: Upload multiple documents simultaneously
  • Session Persistence: Files available across chat sessions
  • Document Preview: Built-in PDF and image preview capabilities
  • Direct Agent Processing: Files are automatically passed to agents as multimodal input (images, documents, URLs)
    • Agents receive file content directly (not just file paths)
    • Supports images (JPEG, PNG, GIF, WebP, etc.) for visual analysis
    • Supports documents (PDF, DOCX, TXT, CSV, etc.) with text extraction
    • URL detection and processing for remote resources

Enhanced HITL Interface

  • Multi-Type Gates: Dynamic UI rendering for approval, input, and selection gates
  • Form Validation: Real-time validation for input fields with custom rules
  • Flow Control Visualization: See retry, skip, and stop actions in real-time
  • Context-Aware Prompts: HITL data automatically available in agent prompts
  • Timeout Management: Configurable timeouts with fallback behaviors

Assistant Response Card

Display structured assistant decision context in the UI timeline:

# config/pipeline.yml
assistant:
  id: assistant_intent_classifier
  type: maf
  model: azure_openai
  prompt:
    instruction:
      jinja: prompts/assistant_intent_classifier.jinja

Features:

  • Full Structured Response: Shows complete assistant response data including tool planning, execution, reasoning, and metadata
  • Timeline Placement: Appears after agent pipeline cards, before the final assistant message
  • Generic Design: Automatically displays all fields from structured response (no hardcoding, future-proof)
  • Tabbed Interface:
    • INPUTS tab: Shows user input + added context (final message sent to assistant)
    • OUTPUTS tab: Shows complete structured response data
  • Metadata Chips: Displays framework, model, pipeline name, agent name, and status as header chips
  • Persistent: Saved to chat database and restored on session reload
  • User Control: Display controlled via "Assistant Card" setting in sidebar (Settings tab โ†’ Execution Settings)

Card Content:

  • Assistant response text
  • Tool planned vs tool executed
  • Tool parameters
  • Raw tool output (for debugging)
  • Reasoning (if available)
  • Success/error status
  • Framework and model information
  • Pipeline/agent metadata

User Settings

The sidebar Settings tab provides comprehensive control over UI appearance and execution visibility:

Appearance Settings

  • Theme: Choose between System (follows OS preference), Light, or Dark mode
  • Accent Color: Select from preset color schemes (Sky, Emerald, Amber, Violet, Blush)
  • Project Defaults: Theme and accent can be configured in ui_manifest.yml as project defaults
  • Restore Defaults: Reset all settings to project defaults with a single click

Execution Settings

Control which execution-related UI components are displayed in the timeline:

  • Assistant Card: Show/hide assistant response cards with decision context (tool planning, execution, reasoning)
  • Agent Cards & Protocol Chips: Show/hide agent execution cards and protocol indicators (IN-PROC, A2A)
  • Workflow Card: Show/hide pipeline workflow visualization diagram
  • Citations & Sources: Show/hide citation cards and sources indicator from RAG responses
  • Footers: Show/hide footer entries with action buttons (copy, feedback, regenerate)
  • File Upload Cards: Show/hide file upload progress/status cards in timeline

Features:

  • Real-time Updates: Changes apply immediately without page refresh
  • Persistent: Settings saved to browser localStorage and restored on page load
  • Cross-tab Sync: Settings synchronized across multiple browser tabs/windows
  • User Preferences: All settings are user-specific and override project defaults
  • HITL Gates: Always displayed regardless of settings (required for user interaction)

Note: Execution settings control UI visibility only. All events are still emitted by the backend; the frontend filters display based on user preferences.

๐Ÿ“ฑ App Mode: Config-Driven UI Apps

App Mode enables you to build config-driven UI applications where users and AI agents collaborate on shared artifacts (articles, forms, documents, etc.) through a rich widget library and flexible layouts.

Overview

App Mode provides:

  • 47 Widget Types: Input widgets, viewers, editors, cards, chips, tables, and more
  • Flexible Layouts: Grid, flex, columns, and stack layouts with responsive design
  • State Management: Persistent state stored in database with real-time sync
  • AI Integration: Agents can read and update canvas state via tools
  • Validation: Client-side and server-side validation support
  • Three Canvas Modes: Declarative (YAML-defined), Agent (AI-generated), and Hybrid (mix of both)

Canvas Modes: How App Mode Works

Topaz Agent Kit supports three modes for canvas UI generation, giving you flexibility in how you define and control your app's interface:

1. Declarative Mode (YAML-Defined UI)

Definition: UI structure is fully defined in the YAML manifest. Agents can update state (data) but cannot modify the UI structure.

Characteristics:

  • โœ… Stable & Predictable: UI structure never changes
  • โœ… Type Safety: Bindings validated against YAML definitions
  • โœ… Developer Control: Full control over UI layout
  • โœ… Performance: Predictable rendering, no structure changes
  • โš ๏ธ Less Flexible: Cannot adapt to data variations

Use Cases:

  • Stable forms with fixed structure
  • Critical UI that must remain consistent
  • Applications where layout is predetermined
  • Production apps requiring predictable behavior

Example Configuration:

# config/apps/invoice_review_app.yml
app_id: invoice_review_app
title: Invoice Review

canvas:
  source: declarative  # Explicit mode declaration (default if not specified)
  layout:
    type: grid
    columns: 12
  
  sections:
    - id: invoice_overview
      title: "Invoice Overview"
      widget: key_value_form
      binding: invoice.data
      editable: true
    
    - id: validation_status
      title: "Validation Status"
      widget: toggle
      binding: invoice.validated
      editable: false  # Agent-only output
    
    - id: review_notes
      title: "Review Notes"
      widget: markdown_editor
      binding: invoice.notes
      editable: true

default_state:
  invoice:
    data: {}
    validated: false
    notes: ""

Agent Capabilities in Declarative Mode:

  • โœ… get_canvas_state(path?) - Read canvas state
  • โœ… update_canvas(path, value) - Update state data
  • โŒ Cannot modify UI structure (sections, layout, widgets)

Example Agent Interaction:

# Agent can read and update data
current_invoice = get_canvas_state("invoice.data")
update_canvas("invoice.validated", True)
update_canvas("invoice.notes", "Invoice validated successfully")
# But cannot add new sections or change widget types

2. Agent Mode (AI-Generated UI)

Definition: UI structure is fully generated by the agent. YAML provides only app metadata and the state schema.

Characteristics:

  • โœ… Fully Adaptive: UI adapts to data and context
  • โœ… Data-Driven: Structure reflects data analysis
  • โœ… Intelligent Layout: Agent optimizes for information
  • โš ๏ธ Less Predictable: Structure can change
  • โš ๏ธ Requires State Schema: Need schema for validation

Use Cases:

  • Data-driven dashboards that adapt to content
  • Dynamic analysis tools with varying data structures
  • Exploratory interfaces where structure is unknown
  • AI-powered UI generation based on data patterns

Example Configuration:

# config/apps/dynamic_analysis_app.yml
app_id: dynamic_analysis_app
title: Dynamic Data Analysis

canvas:
  source: agent  # Fully agent-driven
  # No sections defined - agent creates them

# REQUIRED: State schema for validation
state_schema:
  type: object
  properties:
    analysis:
      type: object
      properties:
        data:
          type: array
        insights:
          type: array
        visualizations:
          type: object
  required: [analysis]

# OPTIONAL: Initial state hint
default_state:
  analysis:
    data: []
    insights: []
    visualizations: {}

Agent Capabilities in Agent Mode:

  • โœ… get_state_schema() - Understand state structure
  • โœ… initialize_state() - Create state paths
  • โœ… get_canvas_state(path?) - Read state
  • โœ… update_canvas(path, value) - Update state
  • โœ… update_canvas_section() - Create/modify sections
  • โœ… update_canvas_structure() - Modify layout

Example Agent Workflow:

# 1. Understand data structure
schema = get_state_schema()

# 2. Initialize state paths
initialize_state({
    "analysis.data": [],
    "analysis.insights": [],
    "analysis.visualizations": {}
})

# 3. Create UI sections based on data
update_canvas_section(
    section_id="data_table",
    section_spec={
        "title": "Data Overview",
        "widget": "table_view",
        "binding": "analysis.data"
    }
)

# 4. Add insights section
update_canvas_section(
    section_id="insights",
    section_spec={
        "title": "Key Insights",
        "widget": "cards_grid",
        "binding": "analysis.insights"
    }
)

# 5. Organize layout
update_canvas_structure({
    "layout": {"type": "grid", "columns": 2},
    "order": ["data_table", "insights"]
})

3. Hybrid Mode (Mix of YAML + Agent)

Definition: UI structure is a mix of YAML-defined and agent-generated sections. YAML defines the stable core; the agent fills placeholders and enhances hybrid sections.

Characteristics:

  • โœ… Best of Both Worlds: Stable core + adaptive content
  • โœ… Gradual Enhancement: Start static, add dynamic sections
  • โœ… Developer Control: Lock critical sections
  • โœ… Agent Flexibility: Fill placeholders, enhance sections
  • โš ๏ธ More Complex: Requires merge logic

Use Cases:

  • Core UI with dynamic insights
  • Stable forms with AI-generated recommendations
  • Fixed layout with adaptive content sections
  • Production apps needing both stability and flexibility

Example Configuration:

# config/apps/hybrid_invoice_app.yml
app_id: hybrid_invoice_app
title: Invoice Analysis

canvas:
  source: hybrid  # Mix of YAML + agent
  layout:
    type: grid
    columns: 12
  
  sections:
    # Declarative section (never changes)
    - id: invoice_overview
      source: declarative  # Explicit
      locked: true  # Agent cannot modify
      title: "Invoice Overview"
      widget: key_value_form
      binding: invoice.data
      editable: true
    
    # Placeholder section (agent fills)
    - id: agent_insights
      source: agent
      placeholder: true  # Mark as placeholder
      agent_hint: "Generate insights based on invoice data"
      title: "AI Insights"
      # No widget/binding - agent will create
    
    # Hybrid section (YAML base + agent enhancements)
    - id: validation_results
      source: hybrid
      title: "Validation Results"
      widget: toggle
      binding: validation.status
      # Agent can add more controls here

default_state:
  invoice:
    data: {}
  validation:
    status: false

Section Types in Hybrid Mode:

  1. Declarative Sections (source: declarative):

    • Rendered exactly as YAML defines
    • Agent cannot modify (if locked: true)
    • Bindings from YAML
  2. Placeholder Sections (source: agent, placeholder: true):

    • Agent fills with content
    • Replaces placeholder when agent generates
    • Bindings created by agent
  3. Hybrid Sections (source: hybrid):

    • YAML provides base structure
    • Agent can add controls, sections
    • Merges YAML + agent content

Agent Capabilities in Hybrid Mode:

  • โœ… get_canvas_structure() - See full structure
  • โœ… get_canvas_state() - Read state
  • โœ… update_canvas() - Update state
  • โœ… update_canvas_section() - Fill placeholders, enhance hybrid
  • โŒ Cannot modify locked: true sections
  • โŒ Cannot modify source: declarative sections (unless not locked)

Example Agent Interaction:

# 1. See current structure
structure = get_canvas_structure()

# 2. Fill placeholder section
update_canvas_section(
    section_id="agent_insights",
    section_spec={
        "widget": "cards_grid",
        "binding": "analysis.insights",
        "title": "AI-Generated Insights"
    }
)

# 3. Enhance hybrid section (add more controls)
update_canvas_section(
    section_id="validation_results",
    section_spec={
        "widget": "toggle",
        "binding": "validation.status",
        # Agent adds additional controls
        "additional_controls": [
            {
                "id": "confidence_score",
                "widget": "number",
                "binding": "validation.confidence"
            }
        ]
    }
)

# Cannot modify locked declarative sections
# update_canvas_section("invoice_overview", ...)  # โŒ Will fail

Placeholder Sections

Placeholder sections allow you to define sections in YAML that agents will fill with content. This is particularly useful in hybrid mode.

Configuration:

canvas:
  source: hybrid
  
  sections:
    - id: ai_recommendations
      source: agent
      placeholder: true  # Mark as placeholder
      agent_hint: "Generate recommendations based on current data"
      title: "AI Recommendations"
      # No widget/binding defined - agent creates these

How It Works:

  1. YAML defines placeholder section with placeholder: true
  2. Frontend shows loading/empty state until agent fills it
  3. Agent calls update_canvas_section() to fill placeholder
  4. Placeholder is replaced with agent-generated content

Example Agent Filling Placeholder:

# Agent detects placeholder section
structure = get_canvas_structure()
# Returns: {"sections": {"ai_recommendations": {"placeholder": true, ...}}}

# Agent fills placeholder
update_canvas_section(
    section_id="ai_recommendations",
    section_spec={
        "widget": "list_editor",
        "binding": "recommendations.list",
        "title": "AI Recommendations",
        "editable": true
    }
)

Mode Comparison

Aspect             | Declarative               | Agent                  | Hybrid
-------------------|---------------------------|------------------------|---------------------------
UI Definition      | YAML manifest             | Agent generates        | Mix of both
Flexibility        | Fixed structure           | Fully adaptive         | Selective adaptation
Use Cases          | Stable forms, critical UI | Data-driven dashboards | Core UI + dynamic insights
State Schema       | Optional                  | Recommended            | Recommended
Binding Management | YAML-defined              | Agent creates          | Mix of both
Performance        | Predictable               | Needs optimization     | Balanced
Developer Control  | Full                      | Minimal                | Selective

Choosing the Right Mode

Use Declarative Mode when:

  • โœ… UI structure is known and stable
  • โœ… You need predictable, consistent layouts
  • โœ… Critical UI that must remain unchanged
  • โœ… Production apps requiring type safety

Use Agent Mode when:

  • โœ… UI structure depends on data analysis
  • โœ… You need fully adaptive interfaces
  • โœ… Data-driven dashboards with varying content
  • โœ… Exploratory tools with unknown structure

Use Hybrid Mode when:

  • โœ… You have core UI that must stay stable
  • โœ… You want dynamic insights or recommendations
  • โœ… You need both developer control and agent flexibility
  • โœ… Gradual enhancement from static to dynamic

Quick Example

# config/apps/article_app.yml
app_id: article_app
title: "Article Editor"
description: "Collaborative article editing"

canvas:
  layout:
    type: grid
    columns: 2
    gap: 4
  
  sections:
    - id: title
      widget: text
      binding: article.title
      placeholder: "Enter article title..."
      validation:
        required: true
        max: 200
    
    - id: body
      widget: markdown_editor
      binding: article.body
      colSpan: 2
      placeholder: "Write your article..."
    
    - id: tags
      widget: tags_input
      binding: article.tags
      colSpan: 2
    
    - id: preview
      widget: markdown_viewer
      binding: article.body
      colSpan: 2
      editable: false

default_state:
  article:
    title: ""
    body: ""
    tags: []

Widget Library

Topaz Agent Kit provides 47 widget types organized into categories:

Input Widgets

Text Inputs:

  • text - Single-line text input
  • textarea - Multi-line text input
  • email - Email input with validation
  • url - URL input with validation
  • password - Password input with show/hide toggle
  • number - Number input with min/max/step
  • color - Color picker

Date/Time:

  • date - Date picker
  • datetime - Date and time picker
  • time - Time picker

Selection:

  • select - Dropdown select (single or multiple)
  • multiselect - Multi-select dropdown
  • checkbox - Single checkbox
  • checkbox_group - Multiple checkboxes group
  • radio - Radio button group
  • toggle - Toggle switch (on/off)

Rich Editors:

  • markdown_editor - Rich markdown editor with toolbar
  • rich_text_editor - WYSIWYG rich text editor
  • code_editor - Code editor with syntax highlighting

Other Inputs:

  • slider - Range slider input
  • rating - Star rating widget

Viewer Widgets (Read-only)

  • markdown_viewer - Markdown renderer
  • html_viewer - HTML renderer (sandboxed)
  • code_viewer - Code viewer with syntax highlighting
  • json_viewer - JSON viewer
  • pdf_viewer - PDF viewer
  • image_viewer - Image viewer (supports web URLs and local files)
  • video_viewer - Video player

Array/List Widgets

  • list_editor - Editable list/array editor
  • list_view - Read-only list view
  • key_value_form - Editable key-value pairs form
  • table_editor - Editable table/grid editor
  • table_view - Read-only table view

Cards Widgets

  • card - Single card widget
  • cards_grid - Grid of cards (supports editing)
  • cards_horizontal - Horizontal cards layout (supports editing)
  • cards_vertical - Vertical cards layout (supports editing)

Chips Widgets

  • chips_horizontal - Horizontal chips display (supports editing)
  • chips_vertical - Vertical chips display (supports editing)
  • chips_view - Read-only chips display
  • tags_input - Editable tags/chips input

File Upload Widgets

  • file_upload - File upload widget
  • image_upload - Image upload widget with preview

Map Widgets

  • map - Interactive map using OpenStreetMap/Leaflet (no API key required)
  • google_map - Interactive map using Google Maps (requires API key; see Maps & Full-Screen Widgets)

Layout Widgets

  • section - Section container widget
  • divider - Visual divider/separator
  • spacer - Spacing widget
  • tabs - Tabbed interface widget
  • accordion - Accordion/collapsible widget

Maps & Full-Screen Widgets

Map and viewer widgets (map, google_map, image_viewer, video_viewer, etc.) can fill the entire canvas when configured as full-screen widgets. This is ideal for map-centric apps, dashboards, and immersive viewers.

Full-Screen Configuration

Use fullScreenWidgets in your canvas config to make specific widgets fill the canvas (no labels, section titles hidden):

canvas:
  source: agent
  fullScreenWidgets: ["google_map"]  # or ["map"] for OSM
  title: ""
  # ...

Or set fullScreen: true on individual controls. For agent mode, fullScreenWidgets ensures the agent creates a single full-screen map control.

Map Widgets: map vs google_map

| Widget | Tiles | API Key | Use Case |
|--------|-------|---------|----------|
| map | OpenStreetMap | None | General mapping, no setup |
| google_map | Google Maps | Required | Directions, geocoding, rich features |

Both share the same state shape: { center: [lat, lng], zoom, markers: [], route: [] }. Agents can use tools such as geocode_place and get_directions to populate markers and routes.

Google Maps API Key Setup

Development (Next.js dev server):

  • Add NEXT_PUBLIC_GOOGLE_MAPS_API_KEY to apps/ui/.env.development

Production (packaged UI served by FastAPI):

  • Add GOOGLE_MAPS_API_KEY to your project .env file, or set as an environment variable when starting the server
  • The backend injects the key into the manifest at runtime; no build-time configuration needed
  • Users can provide their own key without modifying .env.production

# In project .env (e.g. projects/nexus/.env)
GOOGLE_MAPS_API_KEY=your_production_key_here

Map State Shape

default_state:
  google_map:  # or "map" for OSM
    center: [20, 0]
    zoom: 2
    markers: []
    route: []

Widget Properties

Each widget supports common properties and widget-specific properties:

Common Properties

All widgets support these at the section level:

sections:
  - id: my_field
    widget: text
    binding: data.field
    editable: true          # Enable/disable editing
    placeholder: "Enter..." # Placeholder text
    validation:             # Validation rules
      required: true
      min: 5
      max: 100
      pattern: "^[A-Z]"

Widget-Specific Properties

Configure via widgetProps:

sections:
  - id: article_title
    widget: text
    binding: article.title
    widgetProps:
      debounceMs: 300  # Debounce delay
  
  - id: category
    widget: select
    binding: article.category
    widgetProps:
      options:
        - value: "tech"
          label: "Technology"
        - value: "news"
          label: "News"
      multiple: false
  
  - id: content
    widget: markdown_editor
    binding: article.body
    widgetProps:
      debounceMs: 800
      toolbar: true
      preview: true
  
  - id: rating
    widget: rating
    binding: article.rating
    widgetProps:
      maxRating: 5
      size: lg
      allowHalf: true
  
  - id: images
    widget: image_viewer
    binding: article.images
    widgetProps:
      alt: "Article images"

Layout System

Configure layouts at the canvas level:

canvas:
  layout:
    type: grid        # grid, flex, columns, or stack
    columns: 3        # For grid/columns layout
    gap: 4            # Tailwind gap value
    direction: row    # For flex layout
    wrap: true        # For flex layout

Layout Types:

  • grid - CSS Grid layout with configurable columns
  • flex - Flexbox layout with direction and wrap options
  • columns - Multi-column layout
  • stack - Vertical stack (default)

Section-Level Layout:

sections:
  - id: field1
    widget: text
    binding: data.field1
    colSpan: 6        # Grid column span (1-12)
    width: "1/2"      # Width: "full", "1/2", "1/3", "auto", or pixel value
    order: 2          # Display order
    alignSelf: center # Self alignment
    responsive:       # Responsive overrides
      sm:
        colSpan: 12
        width: full
      md:
        colSpan: 6
        width: "1/2"
      lg:
        colSpan: 4
        width: "1/3"

Complete Widget Reference

For detailed widget properties and examples, see the full widget reference in the project documentation.

Example App Configurations

Article Editor:

app_id: article_app
title: "Article Editor"

canvas:
  layout:
    type: grid
    columns: 2
  
  sections:
    - id: title
      widget: text
      binding: article.title
      validation:
        required: true
    
    - id: body
      widget: markdown_editor
      binding: article.body
      colSpan: 2
    
    - id: tags
      widget: tags_input
      binding: article.tags
      colSpan: 2
    
    - id: preview
      widget: markdown_viewer
      binding: article.body
      colSpan: 2
      editable: false

default_state:
  article:
    title: ""
    body: ""
    tags: []

Form Builder:

app_id: form_app
title: "Dynamic Form"

canvas:
  layout:
    type: stack
    gap: 6
  
  sections:
    - id: name
      widget: text
      binding: form.name
      validation:
        required: true
    
    - id: email
      widget: email
      binding: form.email
      validation:
        required: true
    
    - id: age
      widget: number
      binding: form.age
      widgetProps:
        min: 18
        max: 100
        step: 1
    
    - id: country
      widget: select
      binding: form.country
      widgetProps:
        options:
          - value: "us"
            label: "United States"
          - value: "uk"
            label: "United Kingdom"
    
    - id: interests
      widget: checkbox_group
      binding: form.interests
      widgetProps:
        options:
          - value: "tech"
            label: "Technology"
          - value: "sports"
            label: "Sports"
        direction: horizontal
    
    - id: rating
      widget: rating
      binding: form.rating
      widgetProps:
        maxRating: 5
    
    - id: notes
      widget: textarea
      binding: form.notes
      widgetProps:
        rows: 5

Data Dashboard:

app_id: dashboard_app
title: "Data Dashboard"

canvas:
  layout:
    type: grid
    columns: 3
  
  sections:
    - id: summary
      widget: cards_grid
      binding: dashboard.summary
      colSpan: 3
      widgetProps:
        columns: 3
    
    - id: data_table
      widget: table_view
      binding: dashboard.data
      colSpan: 3
      widgetProps:
        headers:
          - "Name"
          - "Value"
          - "Status"
    
    - id: chart
      widget: image_viewer
      binding: dashboard.chart_url
      colSpan: 2
    
    - id: stats
      widget: list_view
      binding: dashboard.stats
      colSpan: 1

Advanced Features

State Validation:

app_id: validated_app
state_schema:
  type: object
  properties:
    article:
      type: object
      required: ["title", "body"]
      properties:
        title:
          type: string
          minLength: 5
          maxLength: 200
        body:
          type: string
          minLength: 100

Array Binding:

sections:
  - id: item_name
    widget: text
    binding: items[0].name  # Access array elements
  
  - id: item_tags
    widget: tags_input
    binding: items[0].tags  # Nested array access
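Binding paths combine dot access with array indices. The following Python sketch shows how such a path could be resolved against the canvas state (illustrative only; the kit's internal resolver may differ):

```python
import re

def resolve_binding(state, path):
    """Resolve a binding path like 'items[0].name' against the canvas state.
    Illustrative only -- the kit's internal resolver may differ."""
    value = state
    for part in path.split("."):
        m = re.fullmatch(r"(\w+)(?:\[(\d+)\])?", part)
        value = value[m.group(1)]           # dict key access
        if m.group(2) is not None:
            value = value[int(m.group(2))]  # array index access
    return value

state = {"items": [{"name": "Widget A", "tags": ["new", "sale"]}]}
print(resolve_binding(state, "items[0].name"))  # Widget A
print(resolve_binding(state, "items[0].tags"))  # ['new', 'sale']
```

Writable bindings work the same way in reverse: the resolved parent container is mutated at the final key or index.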

Conditional Sections: Sections can be conditionally rendered based on state (future feature).

AI Agent Integration

Agents can interact with canvas state using tools:

# Agent tool: update_canvas
update_canvas(
    path="article.title",
    value="New Title"
)

# Agent tool: read_canvas
title = read_canvas(path="article.title")

Best Practices

  1. Use Appropriate Widgets: Choose widgets that match your data type
  2. Set Validation: Add validation rules for user inputs
  3. Responsive Design: Use responsive properties for mobile support
  4. Read-only Views: Use editable: false for agent-generated content
  5. State Schema: Define state_schema for complex data structures
  6. Default State: Provide sensible defaults in default_state

Advanced Features

  • Citation Support: Source attribution for AI responses with clickable links
  • Multi-session Management: Multiple concurrent chat sessions with persistence
  • Custom Branding: Logo, themes, and color customization
  • AG-UI Protocol: Complete AG-UI support for standardized agent interaction events

📡 AG-UI Events Reference


Topaz Agent Kit implements the complete AG-UI protocol with 16 standard events plus custom convenience methods. All events are emitted through the AGUIEventEmitter class.

Standard AG-UI Events

Text Message Events

text_message_start(role: str = "assistant") -> str

  • Starts a new text message stream
  • Returns: message_id for tracking the message
  • Use: Beginning of agent responses or assistant messages

text_message_content(message_id: str, delta: str)

  • Streams incremental text content
  • Use: Streaming partial text updates during message generation

text_message_end(message_id: str)

  • Completes a text message
  • Use: Mark the end of a message stream

Tool Call Events

tool_call_start(tool_name: str, agent_name: Optional[str] = None) -> str

  • Starts a tool call execution
  • Returns: tool_call_id for tracking the call
  • Use: When an agent begins executing a tool

tool_call_args(tool_call_id: str, args: Dict[str, Any])

  • Sends tool call arguments
  • Use: Provide parameters for the tool call

tool_call_end(tool_call_id: str)

  • Completes tool call execution
  • Use: Mark the end of tool call execution

tool_call_result(tool_call_id: str, result: Any, error: Optional[str] = None)

  • Sends tool call result or error
  • Use: Provide the tool's output or error information

State Events

state_delta(delta: List[Dict[str, Any]])

  • Sends incremental state changes
  • Use: Update UI with partial state changes

messages_snapshot(messages: List[Dict[str, Any]])

  • Sends complete message history
  • Use: Initialize or update full conversation context

step_output(node_id: str, result: Optional[Any] = None, status: str = "completed", error_message: Optional[str] = None, ended_at: Optional[str] = None, elapsed_ms: Optional[int] = None)

  • Emits step output/result data (uses STATE_SNAPSHOT internally)
  • Use: Update agent node status and results in the UI
  • Note: Header comes from step_started, inputs from step_input
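To illustrate how a UI might consume state_delta, the sketch below applies a list of JSON-Patch-style operations ({op, path, value}) to a state dict. The exact delta format used by the kit is an assumption here:

```python
def apply_state_delta(state, delta):
    """Apply JSON-Patch-style operations to a state dict.
    Sketch only: the kit's real delta format may differ."""
    for op in delta:
        # Split "/article/title" into parent keys and the leaf key
        *parents, leaf = [p for p in op["path"].split("/") if p]
        target = state
        for key in parents:
            target = target[key]
        if op["op"] in ("add", "replace"):
            target[leaf] = op["value"]
        elif op["op"] == "remove":
            del target[leaf]
    return state

state = {"article": {"title": "", "body": ""}}
apply_state_delta(state, [
    {"op": "replace", "path": "/article/title", "value": "Draft 1"},
    {"op": "add", "path": "/article/status", "value": "in_review"},
])
print(state["article"]["title"])  # Draft 1
```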

Run Lifecycle Events

run_started(run_id: Optional[str] = None, thread_id: Optional[str] = None, session_id: Optional[str] = None, pipeline_name: Optional[str] = None) -> str

  • Starts a new agent run or pipeline execution
  • Returns: run_id for tracking the run
  • Use: Beginning of pipeline or agent execution
  • Supports: Session and pipeline metadata via rawEvent field

run_finished(run_id: str, result: Optional[Any] = None, thread_id: Optional[str] = None)

  • Completes an agent run or pipeline
  • Use: Mark successful completion of execution

run_error(run_id: str, error: str, details: Optional[Dict[str, Any]] = None, thread_id: Optional[str] = None)

  • Reports an error during run execution
  • Use: Handle and report execution errors

run_metadata(run_id: str, pipeline_name: Optional[str] = None, agent_id: Optional[str] = None, framework: Optional[str] = None, model: Optional[str] = None, run_mode: Optional[str] = None, extra: Optional[Dict[str, Any]] = None)

  • Emits run metadata with pipeline/agent information (uses CUSTOM event)
  • Use: Provide additional context about the run (framework, model, etc.)

Step Lifecycle Events

step_started(step_id: Optional[str] = None, agent_name: Optional[str] = None, framework: Optional[str] = None, model: Optional[str] = None, run_mode: Optional[str] = None, protocol_support: Optional[List[str]] = None, started_at: Optional[str] = None) -> str

  • Starts an agent step with optional header metadata
  • Returns: step_id for tracking the step
  • Use: Beginning of individual agent execution
  • Supports: Framework, model, protocol metadata via rawEvent field
  • Auto-generates step names with counts for multiple executions

step_input(step_name: str, node_id: str, inputs: Optional[Dict[str, Any]] = None)

  • Emits step input data (uses CUSTOM event)
  • Use: Provide input parameters for a step
  • Note: Header comes from step_started

step_finished(step_id: str, result: Optional[Any] = None, status: Optional[str] = None, error: Optional[str] = None)

  • Completes an agent step with optional status and error
  • Use: Mark completion or failure of a step
  • Supports: Status and error via rawEvent field

Special Events

custom_event(name: str, value: Dict[str, Any])

  • Sends custom application-specific events
  • Use: Extend AG-UI protocol with domain-specific events

raw_event(data: Dict[str, Any])

  • Sends raw event data
  • Use: Low-level event emission when needed

Convenience Methods

Topaz Agent Kit provides convenience methods for common patterns:

hitl_request(gate_id: str, gate_type: str, title: str = "", description: str = "", fields: List[Dict[str, Any]] = None, options: List[Dict[str, Any]] = None, buttons: Dict[str, Any] = None, timeout_ms: int = 300000, on_timeout: str = "reject", context_key: str = None, retry_target: str = None, max_retries: int = None)

  • Emits HITL request event (uses CUSTOM event)
  • Use: Request human approval, input, or selection
  • Types: "approval", "input", "selection"

hitl_result(gate_id: str, decision: str, actor: str = "user", data: Any = None)

  • Emits HITL result event (uses CUSTOM event)
  • Use: Report human decision from HITL gate

edge_protocol(from_agent: str, to_agent: str, protocol: str, label: Optional[str] = None)

  • Emits edge protocol event (uses CUSTOM event)
  • Use: Document agent connections and communication protocols
  • Protocols: "A2A", "IN-PROC"

session_title_updated(session_id: str, title: str)

  • Emits session title update event (uses CUSTOM event)
  • Use: Update session title dynamically

assistant_response(data: Dict[str, Any])

  • Emits assistant response card event (uses CUSTOM event)
  • Use: Display structured assistant decision context in UI timeline
  • Data includes: assistant_response, tool_planned, tool_executed, tool_params, raw_tool_output, reasoning, success, error, framework, model, pipeline_name, agent_name, user_input
  • Timeline placement: After agent pipeline cards, before assistant message
  • Display controlled by frontend "Assistant Card" setting (always emitted by backend)

get_step_name(step_id: str) -> Optional[str]

  • Helper method to get step name for a given step_id
  • Use: Retrieve human-readable step name for display

Event Usage Patterns

Small Talk Flow (5-6 events)

RUN_STARTED → [CUSTOM(assistant_response)] → TEXT_MESSAGE_START → TEXT_MESSAGE_CONTENT →
TEXT_MESSAGE_END → RUN_FINISHED

Note: CUSTOM(assistant_response) is always emitted by backend. Display is controlled by frontend "Assistant Card" setting.

Pipeline Flow (19+ events)

RUN_STARTED → STEP_STARTED → STEP_INPUT → STATE_DELTA → STEP_OUTPUT → STEP_FINISHED →
STEP_STARTED → ... → CUSTOM(assistant_response) → TEXT_MESSAGE_START → TEXT_MESSAGE_CONTENT →
TEXT_MESSAGE_END → RUN_FINISHED

Note: CUSTOM(assistant_response) is always emitted by backend. Display is controlled by frontend "Assistant Card" setting.

HITL Flow (6+ events)

RUN_STARTED → STEP_STARTED → CUSTOM(hitl_request) → ... → CUSTOM(hitl_result) →
STEP_FINISHED → RUN_FINISHED

Event Implementation

All events are implemented in AGUIEventEmitter class (src/topaz_agent_kit/core/ag_ui_event_emitter.py):

from topaz_agent_kit.core.ag_ui_event_emitter import AGUIEventEmitter

# Initialize emitter with emit function
emitter = AGUIEventEmitter(emit_fn=your_emit_function)

# Use standard events
message_id = emitter.text_message_start(role="assistant")
emitter.text_message_content(message_id, "Hello")
emitter.text_message_end(message_id)

# Use convenience methods
emitter.hitl_request(
    gate_id="approve_content",
    gate_type="approval",
    title="Approve Content"
)

Event Structure

All events follow AG-UI protocol structure:

  • Type: Standard AG-UI event type (e.g., TEXT_MESSAGE_START)
  • Timestamp: Milliseconds since epoch
  • Data: Event-specific fields
  • rawEvent: Optional metadata for run/step events
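As an illustration, a serialized event might look like the following. Field names such as messageId are assumptions here; consult the AG-UI protocol types for the authoritative schema:

```python
import json
import time

# Illustrative shape of a serialized AG-UI event. Field names like
# "messageId" are assumptions; the AG-UI protocol types define the
# authoritative schema.
event = {
    "type": "TEXT_MESSAGE_CONTENT",        # standard AG-UI event type
    "timestamp": int(time.time() * 1000),  # milliseconds since epoch
    "messageId": "msg_001",                # event-specific data field
    "delta": "Hello",
}
print(json.dumps(event))
```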

🚪 Enhanced Human-in-the-Loop (HITL) System

Multi-Type Gates

Topaz Agent Kit features a sophisticated HITL system with three gate types for different interaction patterns:

Approval Gates

Simple approve/reject decisions with optional form fields:

gates:
  - id: review_content
    type: approval
    title: "Review Content"
    description: "Approve or reject the generated content"
    fields:
      - name: feedback
        label: "Feedback (optional)"
        type: textarea
    timeout_ms: 30000
    on_timeout: reject

Input Gates

Collect structured data from users via customizable forms:

gates:
  - id: collect_requirements
    type: input
    title: "Project Requirements"
    description: "Please provide the project requirements"
    fields:
      - name: project_name
        label: "Project Name"
        type: text
        required: true
      - name: budget
        label: "Budget"
        type: number
        required: true
        validation:
          min: 1000
          max: 100000
      - name: priority
        label: "Priority Level"
        type: select
        required: true
        options:
          - value: "low"
            label: "Low"
          - value: "high"
            label: "High"
    target_agents: ["project_manager"]
    context_key: "project_requirements"

Selection Gates

Present multiple options for user choice:

gates:
  - id: choose_approach
    type: selection
    title: "Choose Implementation Approach"
    description: "Select the best approach for this feature"
    options:
      - value: "microservices"
        label: "Microservices Architecture"
        description: "Break down into small, independent services"
      - value: "monolith"
        label: "Monolithic Architecture"
        description: "Single, unified application"
    target_agents: ["architect"]
    context_key: "implementation_approach"

Dynamic Flow Control

Gates are integrated directly into pipeline patterns with powerful flow control:

pattern:
  type: sequential
  steps:
    - node: content_author

    - gate: review_draft
      on_approve: continue
      on_reject: retry_node
      retry_target: content_author
      max_retries: 3

    - gate: choose_publication
      on_selection:
        blog: skip_to_node
        whitepaper: continue
        draft: stop
      skip_to:
        blog: simple_publisher

    - node: chief_editor
    - node: simple_publisher

Flow Control Actions

  • continue: Proceed to next step
  • stop: Terminate pipeline execution
  • retry_node: Re-run a specific agent with HITL feedback
  • skip_to_node: Jump to a different agent based on conditions
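The retry_node action can be pictured as a loop around the gated agent: re-run it with the human's feedback until approval, or until max_retries is exhausted. A minimal sketch (run_node, ask_human, and the toy data below are hypothetical stand-ins, not kit APIs):

```python
def run_with_review_gate(run_node, ask_human, retry_target, max_retries=3):
    """Sketch of an approval gate's retry loop (hypothetical helper).
    run_node(feedback) -> result; ask_human(result) -> (decision, feedback)."""
    feedback = None
    for attempt in range(max_retries + 1):
        result = run_node(feedback)
        decision, feedback = ask_human(result)
        if decision == "approve":
            return result  # "continue": proceed to the next step
        if attempt == max_retries:
            raise RuntimeError(f"{retry_target}: rejected after {max_retries} retries")
    # a "stop" decision would terminate the pipeline instead of retrying

# Toy usage: the reviewer approves once the draft is revised
drafts = iter(["v1", "v1+shorter"])
result = run_with_review_gate(
    run_node=lambda fb: next(drafts),
    ask_human=lambda r: ("approve", None) if r.endswith("shorter") else ("retry", "shorter"),
    retry_target="content_author",
)
print(result)  # v1+shorter
```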

Context Injection

HITL data is automatically injected into agent contexts:

# Gate configuration
gates:
  - id: user_feedback
    type: input
    target_agents: ["content_writer", "editor"]
    context_key: "user_preferences"
# Agent prompt can now use:
# {{user_preferences.project_name}}
# {{user_preferences.budget}}
# {{user_preferences.deadline}}
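Conceptually, context injection makes the gate's submitted data available under context_key, and the agent prompt substitutes dotted placeholders from it. A minimal stand-in for that substitution (the kit most likely uses a full Jinja2 renderer; render_prompt below is illustrative):

```python
import re

def render_prompt(template, context):
    """Substitute {{ dotted.path }} placeholders from injected HITL context.
    Minimal stand-in for the kit's real template rendering."""
    def lookup(match):
        val = context
        for key in match.group(1).split("."):
            val = val[key]
        return str(val)
    return re.sub(r"\{\{\s*([\w.]+)\s*\}\}", lookup, template)

context = {"user_preferences": {"project_name": "Apollo", "budget": 50000}}
prompt = render_prompt(
    "Plan {{user_preferences.project_name}} within ${{user_preferences.budget}}.",
    context,
)
print(prompt)  # Plan Apollo within $50000.
```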

Advanced HITL Features

Pre-populated Fields

Set default values for input gate fields to pre-fill forms with context data:

gates:
  - id: collect_clarification
    type: input
    title: "One more detail to plan your trip"
    description: "{{ trip_requester.clarification_prompt | default('Please provide missing details to continue.') }}"
    fields:
      - name: clarification_response
        label: "Your answer"
        type: textarea
        required: false
        default: "{{ trip_requester.suggested_response | default('') }}" # Pre-populate from context
        validation:
          min_length: 1
          max_length: 2000
    context_key: "trip_clarifications"
    context_strategy: append

Example with Dynamic Defaults:

gates:
  - id: review_booking
    type: input
    title: "Review Booking Details"
    fields:
      - name: traveler_name
        label: "Traveler Name"
        type: text
        default: "{{ user_profile.name }}" # Pre-fill from user profile
      - name: email
        label: "Email"
        type: email
        default: "{{ user_profile.email }}"
      - name: phone
        label: "Phone"
        type: tel
        default: "{{ user_profile.phone | default('') }}"
      - name: special_requests
        label: "Special Requests (optional)"
        type: textarea
        default: "{{ previous_booking.special_requests | default('') }}"

Options Source (Dynamic Selection Gates)

Pull selection options dynamically from upstream agent outputs:

gates:
  - id: select_flights
    type: selection
    title: "Select Flight"
    description: "Choose your preferred flight option"
    options_source: "trip_flights_expert.flights_options" # Pull from expert output
    buttons:
      submit:
        label: "CONFIRM"
        description: "Confirm your flight selection"
      cancel:
        label: "CANCEL"
        description: "Cancel and stop pipeline"
    context_key: "flight_selection"
    timeout_ms: 300000

How it Works:

  1. Expert Agent Output (e.g., trip_flights_expert):
{
  "flights_options": [
    {
      "value": "offer_123",
      "id": "offer_123",
      "label": "1 stop: $720.00",
      "description": "PHX โ†’ JFK โ†’ CDG; Depart: 2025-05-10 10:00, Arrive: 2025-05-11 08:30; Duration: 12h 15m; Carriers: AA, AF"
    },
    {
      "value": "offer_456",
      "id": "offer_456",
      "label": "Non-stop: $850.50",
      "description": "PHX โ†’ CDG; Depart: 2025-05-10 08:00, Arrive: 2025-05-10 16:30; Duration: 11h 30m; Carrier: AA"
    }
  ],
  "has_flights": true
}
  2. Gate Configuration:
gates:
  - id: select_flights
    type: selection
    options_source: "trip_flights_expert.flights_options" # Automatically pulls from expert
    # Each option object is used as-is: {value, id, label, description}
  3. User Selection: User selects an option, and the entire option object is stored in flight_selection.

Real-World Example (Trip Planner):

pattern:
  type: sequential
  steps:
    - node: trip_requester

    - type: parallel
      steps:
        # Flights domain: Expert → Gate (appears immediately when expert finishes)
        - type: sequential
          condition: "trip_requester.flights_ready == true"
          steps:
            - node: trip_flights_expert
            - gate: select_flights
              condition: "trip_flights_expert.has_flights == true"
              on_submit: continue
              on_cancel: stop

gates:
  - id: select_flights
    type: selection
    title: "Select Flight"
    description: "Choose your preferred flight option"
    options_source: "trip_flights_expert.flights_options" # Dynamic options from expert
    context_key: "flight_selection" # Stores selected option object

Multi-Domain Selection:

gates:
  # Flights selection (from flights expert)
  - id: select_flights
    type: selection
    options_source: "trip_flights_expert.flights_options"

  # Hotels selection (from hotels expert)
  - id: select_hotels
    type: selection
    options_source: "trip_hotels_expert.hotels_options"

  # Activities selection (from activities expert)
  - id: select_activities
    type: input
    fields:
      - name: selected_activities
        label: "Select Activities"
        type: checkbox
        options_source: "trip_activities_expert.activities_options" # Works with checkbox too

Conditional Fields

Show/hide fields dynamically based on expressions evaluated at runtime:

gates:
  - id: collect_booking_details
    type: input
    title: "Booking Information"
    fields:
      - name: booking_type
        label: "Booking Type"
        type: select
        required: true
        options:
          - value: "flight"
            label: "Flight"
          - value: "hotel"
            label: "Hotel"
          - value: "package"
            label: "Flight + Hotel Package"

      # Show only if booking_type is "flight" or "package"
      - name: origin
        label: "Origin Airport"
        type: text
        required: true
        condition: "booking_type == 'flight' OR booking_type == 'package'"

      - name: destination
        label: "Destination Airport"
        type: text
        required: true
        condition: "booking_type == 'flight' OR booking_type == 'package'"

      # Show only if booking_type is "hotel" or "package"
      - name: check_in
        label: "Check-in Date"
        type: date
        required: true
        condition: "booking_type == 'hotel' OR booking_type == 'package'"

      - name: check_out
        label: "Check-out Date"
        type: date
        required: true
        condition: "booking_type == 'hotel' OR booking_type == 'package'"

      # Show only if previous booking exists
      - name: use_loyalty_points
        label: "Use Loyalty Points"
        type: checkbox
        condition: "user_profile.loyalty_points > 0"

      # Show based on nested field access
      - name: special_requests
        label: "Special Requests"
        type: textarea
        condition: "trip_requester.require_special_services == true"

Advanced Conditional Examples:

gates:
  - id: travel_details
    type: input
    fields:
      # Show based on array length
      - name: traveler_count
        label: "Number of Travelers"
        type: number
        default: 1

      # Show fields for each traveler (conditional repetition)
      - name: traveler_1_name
        label: "Traveler 1 Name"
        type: text
        condition: "traveler_count >= 1"

      - name: traveler_1_age
        label: "Traveler 1 Age"
        type: number
        condition: "traveler_count >= 1"

      - name: traveler_2_name
        label: "Traveler 2 Name"
        type: text
        condition: "traveler_count >= 2"

      - name: traveler_2_age
        label: "Traveler 2 Age"
        type: number
        condition: "traveler_count >= 2"

      # Show based on upstream agent output
      - name: preferred_class
        label: "Preferred Class"
        type: select
        condition: "trip_requester.flights_ready == true"
        options:
          - value: "economy"
            label: "Economy"
          - value: "business"
            label: "Business"
          - value: "first"
            label: "First Class"

      # Show based on complex expression
      - name: insurance_required
        label: "Travel Insurance Required"
        type: checkbox
        condition: "trip_requester.total_cost > 1000 AND trip_requester.international == true"

Conditional Field Operators:

  • Comparison: ==, !=, >, <, >=, <=
  • Boolean: AND, OR, NOT
  • String: contains, starts_with, ends_with, in, not in
  • Null checks: is null, is not null
  • Functions: len(array) for array length
  • Nested access: agent_id.field, agent_id.nested.field
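A tiny evaluator for a subset of this grammar (==, !=, >, <, AND, OR, dotted access) shows how conditions are checked against the pipeline context. This is a sketch, not the kit's actual parser:

```python
def resolve(ctx, path):
    """Look up an 'agent_id.field' style dotted path in the context dict."""
    val = ctx
    for key in path.split("."):
        val = val[key]
    return val

def _eval_cmp(cmp_expr, ctx):
    """Evaluate a single comparison like "claim_amount > 5000"."""
    for op in ("==", ">=", "<=", "!=", ">", "<"):
        if op in cmp_expr:
            left, right = (s.strip() for s in cmp_expr.split(op, 1))
            lval = resolve(ctx, left)
            if right in ("true", "false"):
                rval = right == "true"
            elif right.startswith("'"):
                rval = right.strip("'")
            else:
                rval = type(lval)(right)  # coerce literal to the field's type
            return {"==": lval == rval, "!=": lval != rval, ">": lval > rval,
                    "<": lval < rval, ">=": lval >= rval, "<=": lval <= rval}[op]
    raise ValueError(f"unsupported expression: {cmp_expr!r}")

def eval_condition(expr, ctx):
    """OR of ANDs over comparisons -- a sketch of the condition grammar."""
    return any(all(_eval_cmp(c, ctx) for c in part.split(" AND "))
               for part in expr.split(" OR "))

ctx = {"booking_type": "package", "claim_amount": 12000,
       "claim_analyzer": {"fraud_detected": True}}
print(eval_condition("booking_type == 'flight' OR booking_type == 'package'", ctx))  # True
print(eval_condition("claim_analyzer.fraud_detected == true AND claim_amount > 5000", ctx))  # True
```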

Example with Nested Conditions:

gates:
  - id: review_claim
    type: input
    fields:
      - name: claim_amount
        label: "Claim Amount"
        type: number

      # Show if amount exceeds threshold
      - name: requires_manual_review
        label: "Requires Manual Review"
        type: checkbox
        condition: "claim_amount > 10000"

      # Show if fraud detected AND amount is high
      - name: fraud_investigation
        label: "Fraud Investigation Required"
        type: checkbox
        condition: "claim_analyzer.fraud_detected == true AND claim_amount > 5000"

      # Show based on upstream agent flag
      - name: priority_notes
        label: "Priority Notes"
        type: textarea
        condition: "claim_analyzer.priority == 'high' OR claim_analyzer.risk_score > 0.8"

External HITL Description Templates

For complex HITL gate descriptions with extensive formatting, dynamic content, or reusable templates, you can store them in separate Jinja2 files within the config/hitl/ folder. This keeps your pipeline YAML files clean and makes gate descriptions easier to maintain.

When to Use Separate Jinja Files:

  • ✅ Use separate files for:

    • Complex descriptions with multiple sections (e.g., claim details, validation summary, recommendations)
    • Extensive markdown tables and formatting
    • Long descriptions (50+ lines)
    • Descriptions with complex nested conditionals and loops
    • Templates that benefit from better syntax highlighting and editing
  • ✅ Use inline descriptions for:

    • Simple, short descriptions (1-10 lines)
    • Single-line dynamic content
    • Quick prototypes or simple approval gates

Examples:

  • eci_decision_gate.jinja - Complex gate with multiple sections, tables, and validation summaries (moved from inline)
  • tci_recommendation_review_gate.jinja - Extensive risk assessment display with multiple tables
  • Simple inline: description: "{{ agent_id.summary | default('Review the results.') }}"

Folder Structure:

project/
├── config/
│   ├── hitl/                    # HITL gate description templates
│   │   ├── review_gate.jinja
│   │   ├── approval_gate.jinja
│   │   └── input_gate.jinja
│   ├── pipelines/
│   │   └── my_pipeline.yml
│   └── ...

Usage in Pipeline YAML:

Instead of inline descriptions, reference external Jinja templates:

gates:
  - id: review_content
    type: approval
    title: "Review Content"
    description:
      jinja: "hitl/review_gate.jinja"  # Path relative to config/ directory
    timeout_ms: 300000
    on_timeout: reject

Template Features:

HITL description templates support full Jinja2 syntax and have access to the entire pipeline context:

  • Upstream Agent Outputs: Access data from any agent that executed before the gate
  • Context Variables: Use any variables stored in the pipeline context
  • Conditional Logic: Show/hide content based on conditions
  • Formatting: Use Markdown, HTML, and Jinja2 filters for rich formatting

Example Template (config/hitl/review_gate.jinja):

## Application Review

**Application ID:** {{ current_application.application_id }}

### Application Details

| Field | Value |
|-------|-------|
| Applicant Name | {{ current_application.applicant_name }} |
| Requested Amount | {% if current_application.requested_amount %}{{ currency_symbol }}{{ "{:,.0f}".format(current_application.requested_amount) }}{% else %}N/A{% endif %} |
| Status | {{ current_application.status }} |

### Risk Assessment

{% if risk_analyzer %}
**Risk Score:** <span style="color: {{ 'green' if risk_analyzer.score < 50 else 'orange' if risk_analyzer.score < 75 else 'red' }}; font-weight: bold;">{{ risk_analyzer.score }}</span> / 100

**Risk Level:** {{ risk_analyzer.risk_level }}

{% if risk_analyzer.risk_factors %}
**Risk Factors:**
{% for factor in risk_analyzer.risk_factors %}
- {{ factor.name }}: {{ factor.score }} ({{ factor.severity }})
{% endfor %}
{% endif %}
{% else %}
Risk assessment is being processed...
{% endif %}

### Recommendation

{% if recommendation_generator %}
**Recommended Action:** {{ recommendation_generator.action }}
**Confidence:** {{ recommendation_generator.confidence }}%
**Rationale:** {{ recommendation_generator.rationale }}
{% endif %}

Benefits:

  • ✅ Separation of Concerns: Keep pipeline logic separate from UI descriptions
  • ✅ Reusability: Share templates across multiple gates or pipelines
  • ✅ Maintainability: Update descriptions without touching pipeline YAML
  • ✅ Version Control: Track description changes independently
  • ✅ Complex Formatting: Support for rich Markdown/HTML without cluttering YAML

Path Resolution:

  • Templates are resolved relative to the config/ directory
  • Use forward slashes (/) as path separators
  • Example: hitl/review_gate.jinja resolves to config/hitl/review_gate.jinja

Fallback Behavior:

If a template file cannot be loaded:

  • The system logs a warning
  • Falls back to an empty string (the gate still functions, just without a description)
  • Pipeline execution continues normally
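The path resolution and fallback behavior described above can be sketched in a few lines. This is an illustrative sketch, not the kit's actual loader; `load_gate_description` and its logging are assumptions:

```python
import logging
from pathlib import Path

logger = logging.getLogger(__name__)

def load_gate_description(config_dir: str, template_path: str) -> str:
    """Resolve a HITL description template relative to config/ and
    fall back to an empty string if it cannot be loaded."""
    # Forward slashes in the YAML map onto OS-specific separators here.
    full_path = Path(config_dir) / Path(template_path)
    try:
        return full_path.read_text(encoding="utf-8")
    except OSError:
        # Log a warning and keep the pipeline running; the gate simply
        # renders without a description.
        logger.warning("HITL template not found: %s", full_path)
        return ""
```

For example, `load_gate_description("config", "hitl/review_gate.jinja")` would read `config/hitl/review_gate.jinja` if it exists, and return `""` otherwise.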

๐ŸŽ›๏ธ Operations Center

The Operations Center provides a centralized interface for managing async HITL cases, monitoring pipeline health, and reviewing cases across multiple pipelines. It's designed for operations teams who need to efficiently review, approve, and manage large volumes of cases.

Overview

The Operations Center provides:

  • Centralized Case Management: View and manage cases from all pipelines in one place
  • Pipeline-Specific Views: Custom columns, fields, and analytics per pipeline
  • Operations Assistant: Natural language interface for managing cases
  • Dashboard Analytics: Pipeline-specific metrics, charts, and timelines
  • Case Detail Views: Comprehensive case information with tabs for review, data, timeline, and chat
  • Bulk Operations: Approve/reject multiple cases efficiently

Accessing Operations Center

Navigate to /operations in the web UI to access the Operations Center. The interface provides:

Main Components:

  1. Case List Panel: Filterable table of all cases
  2. Dashboard: Pipeline-specific analytics and metrics
  3. Case Detail Modal: Comprehensive case review interface
  4. Operations Assistant: Chat-based case management

Case List Panel

The case list provides a tabbed view for managing cases:

"All" Tab:

  • Shows all cases across all pipelines
  • Common fields: Case ID, Pipeline, Status, HITL Status, Created At
  • Filter by pipeline, status, time range, and search

Pipeline-Specific Tabs:

  • Custom columns defined in list_view.column_order
  • Pipeline-specific fields from list_view.pipeline_fields
  • Filtered by same criteria as "All" tab

Features:

  • Status Badges: Visual indicators for Pending, In-Progress, Completed, Failed
  • Sorting: Click column headers to sort
  • Filtering: Filter by pipeline, status, date range, and search
  • Bulk Actions: Select multiple cases for bulk operations

Dashboard

Pipeline-specific analytics cards provide insights into case processing:

Card Types:

  • Metric Cards: Count, sum, average, min, max of fields
  • Percentage Cards: Ratio calculations (e.g., anomaly detection rate)
  • Distribution Charts: Donut/bar charts showing value distributions
  • Timeline Cards: Case creation and completion timelines

Configuration Example:

# config/operations/my_pipeline.yml
dashboard:
  cards:
    # Percentage metric
    - type: "percentage"
      title: "Anomaly Detection Rate"
      icon: "AlertTriangle"
      numerator:
        field: "analyzer.anomaly_detected"
        filter: true
      denominator:
        field: "total"
      color: "amber"
    
    # Numeric metric
    - type: "metric"
      title: "Average Confidence Score"
      icon: "TrendingUp"
      field: "analyzer.confidence_score"
      aggregation: "avg"
      format: "number"
      decimals: 2
      color: "blue"
    
    # Distribution chart
    - type: "donut"
      title: "Anomaly Types"
      icon: "PieChart"
      field: "analyzer.anomaly_type"
      show_legend: true
      show_percentages: true
      value_mapping:
        "capital_revenue_misclassification": "Capital/Revenue"
        "lease_rou_misclassification": "Lease/ROU"

Case Detail Modal

The case detail modal provides comprehensive case information across multiple tabs:

Review Tab:

  • HITL gate information and description
  • Approve/Reject buttons
  • Optional form fields for additional input
  • Direct response to HITL requests

Data Tab:

  • Structured case data organized by sections
  • All agent outputs displayed
  • Field types: text, multiline, number, boolean, list, object
  • Conditional sections based on data availability

Timeline Tab:

  • System events: case created, completed, failed, HITL queued/responded
  • Custom events from agent outputs
  • Chronological view of case lifecycle
  • Event details with timestamps

Documents Tab (if configured):

  • Uploaded documents
  • Generated documents
  • Document preview and download

Chat Tab:

  • Operations Assistant for natural language interactions
  • Ask questions about cases
  • Approve/reject via chat commands
  • Get case summaries and insights

Operations Assistant

The Operations Assistant is a specialized AI assistant for managing cases through natural language:

Capabilities:

  • ✅ Approve/reject HITL requests
  • ✅ Get case details and summaries
  • ✅ List cases with filters
  • ✅ Answer questions about cases
  • ✅ Bulk operations via chat commands

Example Interactions:

User: "Show me all pending cases for invoice pipeline"
Assistant: Lists all pending cases with details

User: "Approve case CASE-12345"
Assistant: Approves the case and resumes pipeline

User: "Reject case CASE-12345 with notes: Invalid data format"
Assistant: Rejects with notes and resumes pipeline

User: "What's the status of case CASE-12345?"
Assistant: Provides detailed case status and information

Configuration:

# config/prompts/operations_assistant.jinja
You are the Operations Assistant for managing HITL cases.

You have access to these tools:
- approve_hitl_request(queue_item_id, notes?)
- reject_hitl_request(queue_item_id, notes?)
- get_case_details(case_id)
- get_queue_items(filters?)
- get_case_list(filters?)

Help users manage cases efficiently through natural language.

Base Templates for Main Chat, App Assistant, and Operations Assistant

The kit provides base Jinja2 templates for the main chat (intent classifier), the App Assistant (per-app chat in App Mode), and the Operations Assistant so projects only override what's project-specific. Shared rules, output format, and reasoning stay in the base; projects (or per-app prompts) supply context and examples.

Why use base templates:

  • Single source of truth: Critical rules, JSON format, tool-selection logic, and "Thought" (reasoning) instructions live in one place.
  • Less duplication: Projects don't copy long prompts; they extend the base and fill in pipelines (or app-only tools) and examples.
  • Easier updates: Kit improvements to the base (e.g. new rules or reasoning wording) apply to all projects that extend it.

Main chat (intent classifier):

  • Base: src/topaz_agent_kit/prompts/base/assistant_intent_classifier_base.jinja — name, critical rules, triggered pipelines (optional), assistant_response guidelines, Reasoning (Thought Process), session title, suggested questions, tool selection logic, output format, and a default Conversational example.
  • Project file: config/prompts/assistant_intent_classifier.jinja in your project (or in the starter template). Use {% extends "base/assistant_intent_classifier_base.jinja" %} and override only:
    • {% block pipelines_list %} — list of pipelines (and "USE THIS when…") or, for app-only projects (e.g. Nexus), override {% block available_tools %} and {% block tool_planned_options %} / {% block tool_executed_options %} with app tools (e.g. insert_widget, get_current_datetime, run_process_file_turn).
    • {% block examples %} — start with {{ super() }} to keep the base Conversational example, then add your pipeline/tool examples. Write an overall thought process in each example's reasoning (what the user wanted, how you interpreted it, and how you chose the response/tool).
  • Pipeline projects (PA, basic, author, icp, ecgc, ensemble): override pipelines_list and examples only.
  • App-only projects (e.g. Nexus): override triggered_pipelines (empty), available_tools (app-only tools, no execute_pipeline/execute_agent), tool_planned_options / tool_executed_options, and examples.
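For example, a pipeline project's classifier override might look like this (the pipeline name and example text are placeholders):

```jinja
{# config/prompts/assistant_intent_classifier.jinja #}
{% extends "base/assistant_intent_classifier_base.jinja" %}

{% block pipelines_list %}
- invoice_review: USE THIS when the user asks to review or process invoices.
{% endblock %}

{% block examples %}
{{ super() }}  {# keep the base Conversational example #}
User: "Please review invoice INV-001"
Reasoning: The user wants an invoice processed, so I choose the invoice_review pipeline.
{% endblock %}
```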

App Assistant (App Mode):

  • Base: src/topaz_agent_kit/prompts/base/app_assistant_base.jinja — generic app-assistant instructions (canvas tools, agent/hybrid/declarative mode, widget usage, output format). Contains overridable blocks such as assistant_intro, canvas_mode_instructions, current_state, and app_guidelines.
  • Per-app file: Each app can have its own prompt, e.g. config/prompts/article_app_assistant.jinja, config/prompts/recipe_app_assistant.jinja. Use {% extends "base/app_assistant_base.jinja" %} and override the blocks you need (e.g. assistant_intro for name and role, current_state for canvas state summary, app_guidelines for app-specific behavior). The base supplies canvas tools, structure rules, and reasoning; app prompts supply app context and guidelines. The app's agent config references its prompt via prompt.jinja: prompts/<app_id>_app_assistant.jinja (or a custom path).

Operations Assistant:

  • Base: src/topaz_agent_kit/prompts/base/operations_assistant_base.jinja — full Operations Assistant prompt with a single overridable block.
  • Project file: config/prompts/operations_assistant.jinja. Use {% extends "base/operations_assistant_base.jinja" %} and override only {% block project_and_pipeline_context %} with project- and pipeline-specific context (e.g. which pipelines exist, how to refer to cases, or a short workflow list). All other rules and tool usage come from the base.

Loader behavior: The orchestration layer loads the project's prompt file (e.g. from config/prompts/) and passes template_path so that {% extends "base/..." %} resolves correctly. Projects that don't provide a file fall back to the loader's default (often the base rendered as-is, which for the classifier leaves the pipeline list empty).

Assistant Thought Process (Reasoning)

All three assistant experiences expose a Thought (reasoning) section so users can see why the assistant chose a given response or tool. The backend sends a reasoning (or thought_process) field; the UI shows it in a collapsible "Thought" block when the execution setting is enabled.

Where Thought appears:

  1. Main chat — The intent classifier returns a reasoning field (overall thought process: understanding + decision, including which pipeline/agent/tool). The backend adds it to the assistant_response event as thought_process; the main chat UI shows a "Thought" section above the assistant message when Thought Process is on in execution settings.
  2. App Assistant — App assistant turns include reasoning and tool steps. The app chat UI shows "Thought" above the assistant reply when the setting is enabled.
  3. Operations Assistant — Operations Assistant responses include reasoning and tool usage. The Case Detail Chat (and Operations Chat Panel) shows "Thought" above the assistant message when the setting is enabled.

Enabling or hiding Thought: In the UI, use Execution settings (e.g. in the sidebar): turn Thought Process on or off. When on, all three assistants show the collapsible Thought section when the backend provides reasoning/thought_process. No extra database storage is required; the assistant response payload (including thought_process) is already persisted with the session.

Operations Configuration

Configure how cases are displayed and managed:

Identity Configuration:

# config/operations/my_pipeline.yml
identity:
  prefix: "CASE"                    # Case ID prefix (e.g., "CASE-ABC12345")
  uniqueness: "uuid_suffix"         # "uuid_suffix" (default), "timestamp", or "none"

Detail View Configuration:

detail_view:
  # Modal header
  modal:
    title: "Case Details for {{ current_item.id }}"
    subtitle: "Transaction: {{ current_item.transaction_id }}"
  
  # Tab configuration
  tabs:
    order:
      - overview
      - data
      - timeline
      - review
      - review_response_outcome
  
  # Data sections
  sections:
    - name: "Item Details"
      fields:
        - field: "current_item.id"
          label: "Item ID"
          type: text
        - field: "analyzer.result"
          label: "Result"
          type: text

List View Configuration:

list_view:
  # Pipeline-specific fields
  pipeline_fields:
    - key: "transaction_id"
      field: "current_item.transaction_id"
      label: "Transaction ID"
      type: text
    - key: "amount"
      field: "current_item.amount"
      label: "Amount"
      type: number
      value_mapping:
        "capital_revenue_misclassification": "CAPITAL/REVENUE"
      color_mapping:
        "high": "red"
        "medium": "amber"
        "low": "green"
  
  # Column order
  column_order:
    - "case_id"
    - "transaction_id"
    - "amount"
    - "status"
    - "hitl_status"
    - "created_at"

Post-run hooks (ops pipelines)

When an ops-scoped pipeline (run for an existing case from the Operations Center) completes successfully, you can run post-run actions defined in that pipeline's YAML. No pipeline-specific code lives in the orchestrator; all behavior is driven by the pipeline config under post_run.

Where to configure: In the pipeline file that is run for a case (e.g. config/pipelines/move_eligibility_checks.yml), add a top-level post_run key.

Available hooks:

| Key | Purpose |
|-----|---------|
| update_portal_state | Update the case's linked portal session state (e.g. move status, scheduled date). |
| update_case_data | Patch case_data at dotted paths (store pipeline results on the case). |
| add_case_note | Add a timeline note (e.g. "Eligibility check completed. Result: Eligible."). |
| update_case_status | Set case status (e.g. completed, open, processing). |
| webhook | Send an HTTP request (POST/PUT) to an external URL with optional Jinja-built payload. |
| trigger_pipeline | Optionally run another ops pipeline for the same case (e.g. chain eligibility → scheduling). |

1. update_portal_state

Requires the case to have a session_id (portal-linked case). Updates the portal state for that session at the given paths.

# In config/pipelines/my_ops_pipeline.yml
post_run:
  update_portal_state:
    session_from: case
    updates:
      - path: move.status
        value_expression: |
          {% if agent_id.parsed.address_ok and agent_id.parsed.documents_ok %}Eligible{% else %}Not Eligible{% endif %}
      - path: some.other.path
        value: "literal"
  • path: Dotted path into portal state.
  • value: Literal value, or
  • value_expression: Jinja2 string; context is the pipeline's final upstream (agent outputs).

2. update_case_data

Patches the case's case_data at dotted paths. Use to store pipeline results for display or downstream use.

post_run:
  update_case_data:
    path_prefix: "result."   # optional: all paths prefixed (e.g. result.summary)
    updates:
      - path: eligibility_summary
        value_expression: "{{ my_agent.parsed.summary | default('Done') }}"

3. add_case_note

Adds a note to the case (Notes tab and Timeline). Rendered with Jinja; context includes final_upstream, case, and case_id.

post_run:
  add_case_note:
    note_expression: |
      Eligibility check completed. Result: {{ my_agent.parsed.summary | default('N/A') }}.
    internal: true          # true = internal only; false = visible to customer
    added_by: "post_run"    # optional

4. update_case_status

Sets the case status (e.g. after a "close case" pipeline).

post_run:
  update_case_status:
    status: "completed"
    # Or use expression:
    # status_expression: "{% if my_agent.close %}completed{% else %}open{% endif %}"

Context for status_expression: final_upstream, case, case_id.

5. webhook

Sends an HTTP request after the pipeline completes. Requires httpx to be installed.

post_run:
  webhook:
    url: "https://api.example.com/case-updated"
    # Or dynamic:
    # url_expression: "https://api.example.com/cases/{{ case_id }}/notify"
    method: POST
    payload_expression: |
      {"case_id": "{{ case_id }}", "summary": "{{ my_agent.parsed.summary | default('') }}"}
    headers:          # optional
      X-Custom: "value"
    timeout: 10       # optional, seconds

Context for url_expression and payload_expression: final_upstream, case, case_id. payload_expression must evaluate to valid JSON.

6. trigger_pipeline

Runs another ops pipeline for the same case (e.g. after eligibility, auto-run scheduling). The target pipeline must be in ops_pipelines in pipelines.yml.

post_run:
  trigger_pipeline:
    pipeline_id: move_scheduling   # or use pipeline_id_expression for Jinja
    condition_expression: |        # optional; if false, skip trigger
      {{ my_agent.parsed.eligible | default(false) }}

Use with care to avoid unbounded chains; consider a single level of chaining.

Best Practices

  1. Case Configuration:

    • Include all relevant agent outputs in detail_view
    • Use conditional sections for optional data
    • Keep field paths simple and clear
    • Organize sections logically for easy review
  2. Dashboard Design:

    • Focus on key metrics for quick insights
    • Use color coding for status indicators
    • Include distribution charts for pattern analysis
    • Show timelines for case lifecycle tracking
  3. Operations Assistant:

    • Use for bulk operations and complex queries
    • Leverage natural language for efficiency
    • Review cases in priority order
    • Monitor case status and pipeline health
  4. Field Display:

    • Use value mappings for user-friendly labels
    • Apply color coding for visual status indicators
    • Include icons for quick recognition
    • Format numbers appropriately (decimals, units)

Example: Complete Operations Setup

# config/operations/invoice_review.yml
identity:
  prefix: "INV"
  uniqueness: "uuid_suffix"

detail_view:
  modal:
    title: "Invoice Review: {{ current_item.invoice_number }}"
    subtitle: "Amount: ${{ current_item.amount | round(2) }}"
  
  tabs:
    order:
      - overview
      - data
      - timeline
      - review
  
  sections:
    - name: "Invoice Details"
      fields:
        - field: "current_item.invoice_number"
          label: "Invoice Number"
          type: text
        - field: "current_item.amount"
          label: "Amount"
          type: number
          decimals: 2
    
    - name: "Review Results"
      fields:
        - field: "reviewer.approved"
          label: "Approved"
          type: boolean
        - field: "reviewer.notes"
          label: "Review Notes"
          type: multiline

list_view:
  pipeline_fields:
    - key: "invoice_number"
      field: "current_item.invoice_number"
      label: "Invoice #"
      type: text
    - key: "amount"
      field: "current_item.amount"
      label: "Amount"
      type: number
      decimals: 2
  
  column_order:
    - "case_id"
    - "invoice_number"
    - "amount"
    - "status"
    - "hitl_status"

dashboard:
  cards:
    - type: "metric"
      title: "Total Invoices"
      field: "total"
      aggregation: "count"
      color: "blue"
    
    - type: "percentage"
      title: "Approval Rate"
      numerator:
        field: "reviewer.approved"
        filter: true
      denominator:
        field: "total"
      color: "green"

⚡ Async HITL (Human-in-the-Loop)

Async HITL enables pipelines to continue processing while waiting for human review, making it ideal for batch processing, loop patterns, and high-throughput workflows where blocking on each review would be inefficient.

Why Async HITL?

Traditional Sync HITL blocks pipeline execution until a human responds:

  • ✅ Simple and straightforward
  • ❌ Pipeline stops completely, waiting for response
  • ❌ Inefficient for batch processing (e.g., processing 100 items where only 10 need review)
  • ❌ Can't process other items while waiting

Async HITL queues review requests and continues processing:

  • ✅ Pipeline continues processing other items
  • ✅ Perfect for batch/loop workflows
  • ✅ Review requests queued for later processing
  • ✅ Cases tracked independently
  • ✅ Resume from checkpoints when ready

Use Cases:

  • Batch Processing: Process 1000 claims, queue complex ones for review, continue with simple ones
  • Loop Patterns: Process items in a loop, queue problematic items, continue with remaining items
  • High-Throughput: Don't block on individual reviews when processing many items
  • Operations Centers: Centralized review queue for multiple pipelines

Enabling Async HITL

Enable async HITL in your pipeline configuration:

# config/pipelines/my_pipeline.yml
name: "My Pipeline"
description: "Batch processing with async HITL"

# Enable async HITL mode
execution_settings:
  hitl_mode: "async"              # "sync" (default) or "async"
  checkpoint_expiry_days: 7       # How long checkpoints remain valid

# Configure case management (required for async HITL)
operations:
  config_file: "operations/my_pipeline.yml"  # Operations configuration file
  tracking_variables:                    # Optional: customize variable names
    hitl_queued: "hitl_queued_cases"    # Default: "hitl_queued_cases"
    completed: "completed_cases"         # Default: "completed_cases"

Key Settings:

  • hitl_mode: "async": Enables async HITL mode
  • checkpoint_expiry_days: How long checkpoints remain valid (default: 7 days)
  • operations.config_file: Path to operations YAML configuration (required)
  • tracking_variables: Optional customization of context variable names

How Async HITL Works

  1. Pipeline Execution:

    • Pipeline runs normally until it hits a HITL gate
    • Instead of blocking, creates a checkpoint and queues the request
    • Pipeline continues processing (especially useful in loops)
  2. Checkpoint Creation:

    • Full execution context saved (all agent outputs up to the gate)
    • Case created with extracted data for display
    • HITL request queued for review
  3. Operations UI:

    • Review requests appear in Operations Center
    • Users can review cases, see data, and respond
    • Responses trigger pipeline resumption
  4. Resume Execution:

    • When user responds, pipeline resumes from checkpoint
    • Pre-gate agents are skipped (outputs already in checkpoint)
    • Post-gate agents execute with HITL response
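The queue-and-resume idea in the steps above can be sketched very roughly as follows. This is a heavily simplified illustration of the concept, not the kit's checkpoint implementation; all names (`hit_gate`, `respond`, the in-memory stores) are assumptions:

```python
import uuid

checkpoints: dict[str, dict] = {}   # checkpoint_id -> saved execution context
review_queue: list[dict] = []       # queued HITL review requests

def hit_gate(gate_id: str, context: dict) -> None:
    """On an async HITL gate: save a checkpoint, queue the request, keep going."""
    checkpoint_id = uuid.uuid4().hex
    checkpoints[checkpoint_id] = dict(context)  # all agent outputs up to the gate
    review_queue.append({"checkpoint_id": checkpoint_id, "gate_id": gate_id})

def respond(checkpoint_id: str, decision: str) -> dict:
    """On a human response: restore the context and hand it to post-gate agents."""
    context = checkpoints.pop(checkpoint_id)
    context["hitl_response"] = decision  # pre-gate agents are not re-run
    return context  # post-gate agents would execute with this context

hit_gate("review_content", {"analyzer": {"score": 82}})
item = review_queue.pop(0)
resumed = respond(item["checkpoint_id"], "approve")
print(resumed)  # {'analyzer': {'score': 82}, 'hitl_response': 'approve'}
```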

Case YAML Configuration

Case YAML files define how cases are displayed and identified in the Operations UI. Create a case configuration file:

# config/operations/my_pipeline.yml

# Case type for categorization
case_type: "my_case_type"

# Identity configuration - how to identify cases
identity:
  prefix: "CASE"                    # Case ID prefix (e.g., "CASE-ABC12345")
  uniqueness: "uuid_suffix"         # "uuid_suffix" (default), "timestamp", or "none"

# Detail view - what to show in case detail modal
detail_view:
  # Optional: control Case Detail modal header and tabs
  modal:
    # Simple template form (uses Jinja-style {{ }})
    title: "Case Details for {{ current_item.id }}"
    subtitle: "Expression: {{ current_item.expression }}"

    # Optional: expression form (uses expression evaluator)
    # title_expression takes precedence over title when provided
    # title_expression: "'Case#: ' + current_item.id if current_item.id else 'Case (ID missing)'"

  tabs:
    # Optional: base tab order. Tabs are filtered by availability at runtime.
    order:
      - overview
      - data
      - timeline
      - documents
      - review
      - review_response_outcome

    # Optional: human-friendly labels for tabs
    labels:
      overview: "Overview"
      data: "Data"
      timeline: "Timeline"
      documents: "Documents"
      review: "Review"
      review_response_outcome: "Outcome"

    # Optional: expression-based labels (advanced)
    # labels_expression:
    #   overview: "'Overview for ' + current_item.id if current_item.id else 'Overview'"

  # Sections power the "Data" tab content
  sections:
    - name: "Problem Details"
      fields:
        - field: "current_item.id"
          label: "Item ID"
          type: text
        - field: "current_item.expression"
          label: "Expression"
          type: text
    
    - name: "Analysis"
      fields:
        - field: "analyzer.result"
          label: "Result"
          type: text
        - field: "analyzer.confidence"
          label: "Confidence"
          type: number
    
    # Conditional sections - only show if condition is true
    - name: "Review"
      condition: "reviewer IS NOT NULL"  # Only show if reviewer ran
      fields:
        - field: "reviewer.approved"
          label: "Approved"
          type: boolean

Field Types:

  • text: Single-line text
  • multiline: Multi-line text
  • number: Numeric value
  • boolean: True/false
  • list: Array of items
  • object: Nested object

Field Paths:

  • Use dot notation: agent_id.field or agent_id.nested.field
  • Access loop items: current_item.field or loop_item.field
  • Conditional sections: condition: "agent_id IS NOT NULL"

Modal Header & Tabs:

  • Use modal.title / modal.subtitle when you want simple templates with {{variable}} substitution.
  • Use modal.title_expression / modal.subtitle_expression when you need logic (e.g., concatenation and if/else) using the expression evaluator.
  • Use tabs.order to control the base ordering of tabs; unavailable tabs (e.g., documents when no documents are configured) are automatically ignored.
  • Use tabs.labels for static tab labels, or tabs.labels_expression for expression-based labels.

List View Configuration

Configure how cases are displayed in the Operations UI case list table. The list view supports both common fields (always available) and pipeline-specific fields:

# config/operations/my_pipeline.yml

list_view:
  # Define pipeline-specific fields (extracted from upstream context)
  pipeline_fields:
    - key: "transaction_id"
      field: "current_item.transaction_id"
      label: "Transaction ID"
      type: text
    - key: "amount"
      field: "current_item.amount"
      label: "Amount"
      type: number
    - key: "anomaly_type"
      field: "analyzer.anomaly_type"
      label: "Anomaly Type"
      type: text
      value_mapping:  # Optional: map raw values to display labels
        "capital_revenue_misclassification": "CAPITAL/REVENUE MISCLASSIFICATION"
        "lease_rou_misclassification": "LEASE/ROU MISCLASSIFICATION"
  
  # Column order for pipeline-specific tab
  # Mix common field keys and pipeline field keys
  # Only columns listed here will be shown on the pipeline tab
  column_order:
    - "case_id"           # Common field
    - "transaction_id"    # Pipeline field
    - "amount"            # Pipeline field
    - "anomaly_type"      # Pipeline field
    - "status"            # Common field
    - "hitl_status"       # Common field
    - "created_at"        # Common field
    - "actions"           # Common field (action buttons)

Common Fields (always available, don't need to define):

  • case_id, pipeline_id, status, hitl_gate_title, hitl_description
  • hitl_status, hitl_decision, responded_by, created_at, updated_at, actions

Pipeline Fields:

  • key: Unique identifier for the field (used in column_order)
  • field: Dot-notation path to extract from upstream context (e.g., "agent_id.field")
  • label: Display label for the column header
  • type: Field type (text, number, boolean, list, object)
  • value_mapping: Optional mapping of raw values to display labels
  • color_mapping: Optional color coding for field values (see Field Display Properties)
  • icon_mapping: Optional icons for field values (see Field Display Properties)
  • text_transform: Optional text transformation (see Field Display Properties)
  • decimals: Optional decimal places for number fields (default: 0 for integers, 2 for percentages)

Column Order:

  • When "ALL" tab is selected: Only common fields are shown (default order)
  • When pipeline-specific tab is selected: Columns are shown in the order specified in column_order
  • Mix common field keys and pipeline field keys in any order

Centralized Field Definitions

Define fields once and reuse them across list_view, detail_view, and dashboard sections. This ensures consistent display properties and eliminates duplication:

# config/operations/my_pipeline.yml

# =============================================================================
# CENTRALIZED FIELD DEFINITIONS
# =============================================================================
# Define fields once and reuse across list_view, detail_view, and dashboard
# This ensures consistent display properties (value_mapping, color_mapping, etc.)

fields:
  # Status field with mappings
  - key: "posting_status"
    field: "poster.posting_result.status"
    label: "Posting Status"
    type: text
    value_mapping:
      "success": "Posted"
      "error": "Failed"
    color_mapping:
      "success": "green"
      "error": "red"
    icon_mapping:
      "success": "CheckCircle"
      "error": "XCircle"
    text_transform: "uppercase"
  
  # Amount field with formatting
  - key: "amount"
    field: "current_item.amount"
    label: "Amount"
    type: number
    decimals: 2
  
  # Decision field with mappings
  - key: "decision"
    field: "assessor.decision"
    label: "Decision"
    type: text
    value_mapping:
      "allowed": "Allowed"
      "not_allowed": "Not Allowed"
      "uncertain": "Uncertain"
    color_mapping:
      "allowed": "green"
      "not_allowed": "red"
      "uncertain": "amber"
    icon_mapping:
      "allowed": "CheckCircle"
      "not_allowed": "XCircle"
      "uncertain": "AlertCircle"
    text_transform: "uppercase"

# Now reference these fields in list_view and detail_view
list_view:
  pipeline_fields:
    - key: "posting_status"  # Reference centralized field
    - key: "amount"          # Reference centralized field

detail_view:
  sections:
    - name: "Results"
      fields:
        - key: "posting_status"  # Reference centralized field
        - key: "decision"         # Reference centralized field

Benefits:

  • DRY Principle: Define once, use everywhere
  • Consistency: Same display properties across all views
  • Maintainability: Update in one place, applies everywhere
  • Override Support: Can override specific properties inline if needed

Referencing Fields:

  • Use key: "field_key" to reference a centralized field definition
  • Can override specific properties inline (e.g., key: "amount", decimals: 0 to override decimals)
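Resolving a key reference with inline overrides can be understood as a dictionary merge where inline properties win. A minimal sketch (`resolve_field` is an illustrative name, not a kit API):

```python
def resolve_field(centralized: list[dict], ref: dict) -> dict:
    """Look up a centralized field by key and apply inline overrides."""
    base = next((f for f in centralized if f["key"] == ref["key"]), {})
    return {**base, **ref}  # inline properties override centralized ones

fields = [
    {"key": "amount", "field": "current_item.amount",
     "label": "Amount", "type": "number", "decimals": 2},
]
# Reference with an inline override: decimals 2 -> 0
resolved = resolve_field(fields, {"key": "amount", "decimals": 0})
print(resolved["decimals"], resolved["label"])  # 0 Amount
```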

Field Display Properties

Enhance field display with color coding, icons, and text transformations:

# config/operations/my_pipeline.yml

list_view:
  pipeline_fields:
    - key: "status"
      field: "processor.status"
      label: "Status"
      type: text
      
      # Value mapping: Transform raw values to display labels
      value_mapping:
        "success": "Success"
        "error": "Error"
        "pending": "Pending"
      
      # Color mapping: Apply colors to field values
      # Simple format: color name
      color_mapping:
        "success": "green"
        "error": "red"
        "pending": "amber"
      
      # Advanced format: Detailed color configuration
      # color_mapping:
      #   "success":
      #     background: "green"
      #     text: "white"
      #   "error":
      #     background: "red"
      #     text: "white"
      
      # Icon mapping: Display icons before text (lucide-react icons)
      icon_mapping:
        "success": "CheckCircle"
        "error": "XCircle"
        "pending": "Clock"
      
      # Text transformation: Transform text display
      text_transform: "uppercase"  # Options: uppercase, lowercase, capitalize, title-case, sentence-case, kebab-case, snake_case, camelCase, PascalCase, truncate, abbreviate, remove-whitespace, normalize-whitespace, none
      
      # Text transform options (for truncate/abbreviate)
      # text_transform_options:
      #   max_length: 20  # For truncate

Color Mapping:

  • Simple format: "value": "color_name" (e.g., "success": "green")
  • Advanced format: "value": {background: "green", text: "white"} for custom styling
  • Available colors: red, amber, yellow, green, blue, indigo, purple, pink, gray
  • Works with: value_mapping - colors are applied to the mapped display value

Icon Mapping:

  • Maps field values to lucide-react icon names
  • Icons appear before the text
  • Available icons: CheckCircle, XCircle, AlertCircle, Clock, FileCheck, AlertTriangle, etc.
  • See Lucide Icons for full list

Text Transform Options:

  • uppercase: "hello" → "HELLO"
  • lowercase: "HELLO" → "hello"
  • capitalize: "hello world" → "Hello World"
  • title-case: "hello world" → "Hello World" (same as capitalize)
  • sentence-case: "HELLO WORLD" → "Hello world"
  • kebab-case: "Hello World" → "hello-world"
  • snake_case: "Hello World" → "hello_world"
  • camelCase: "Hello World" → "helloWorld"
  • PascalCase: "hello world" → "HelloWorld"
  • truncate: Truncate text with ellipsis (requires text_transform_options.max_length)
  • abbreviate: Abbreviate long text (requires text_transform_options.max_length)
  • remove-whitespace: Remove all whitespace
  • normalize-whitespace: Normalize multiple spaces to single space
  • none: No transformation (default)
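
A few of the transforms above can be sketched in Python to make the expected output concrete (this is an illustration of the documented behavior, not the UI's actual implementation, which runs in the frontend):

```python
# Illustrative sketch of selected text_transform options.
def apply_transform(value, transform, max_length=None):
    if transform == "uppercase":
        return value.upper()
    if transform == "kebab-case":
        return "-".join(value.lower().split())
    if transform == "snake_case":
        return "_".join(value.lower().split())
    if transform == "truncate" and max_length is not None:
        # Truncate with an ellipsis when over max_length
        return value if len(value) <= max_length else value[: max_length - 1] + "…"
    return value  # "none" and unhandled transforms pass through

apply_transform("Hello World", "kebab-case")   # -> "hello-world"
apply_transform("Hello World", "snake_case")   # -> "hello_world"
```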

Best Practices:

  1. Use centralized fields for fields used in multiple places
  2. Combine mappings: Use value_mapping + color_mapping + icon_mapping for rich displays
  3. Consistent colors: Use the same color scheme across related fields (e.g., success=green, error=red)
  4. Meaningful icons: Choose icons that clearly represent the value (e.g., CheckCircle for success, XCircle for error)
  5. Text transforms: Use uppercase for status fields, capitalize for names, truncate for long text

Dashboard Configuration

Configure pipeline-specific analytics cards for the Operations dashboard. Cards are filtered by the same criteria as the list view (pipeline, time range, status, search):

# config/operations/my_pipeline.yml

dashboard:
  # Enable dashboard for this pipeline
  enabled: true

  # Cards to display (order matters)
  cards:
    # Card 1: Percentage metric
    - type: "percentage"
      title: "Anomaly Detection Rate"
      icon: "AlertTriangle"
      numerator:
        field: "analyzer.anomaly_detected"
        filter: true  # Count only where field is truthy
      denominator:
        field: "total"  # Special field: total cases
      color: "amber"
      show_breakdown: true
      footer:
        type: "text"
        content: "Based on {{totalCases}} processed cases"
    
    # Card 2: Numeric metric
    - type: "metric"
      title: "Average Confidence Score"
      icon: "TrendingUp"
      field: "analyzer.confidence_score"
      aggregation: "avg"  # count, sum, avg, min, max
      format: "number"     # number, currency, percentage
      decimals: 2
      color: "blue"
      suffix: "/ 1.0"
      visualization: "gauge"  # none, gauge, trend
      footer:
        type: "legend"
        items:
          - label: "Score Range"
            value: "0.0 - 1.0"
          - label: "Cases"
            value: "{{totalCases}}"
    
    # Card 3: Distribution chart (donut)
    - type: "donut"
      title: "Anomaly Types"
      icon: "PieChart"
      field: "analyzer.anomaly_type"
      show_legend: true
      show_percentages: true
      colors:  # Optional: custom colors per value
        "capital_revenue_misclassification": "red"
        "lease_rou_misclassification": "amber"
    
    # Card 4: Bar chart (same as donut, different visualization)
    - type: "bar"
      title: "Status Distribution"
      icon: "BarChart3"
      field: "status"
      show_legend: true
    
    # Card 5: Timeline distribution
    - type: "timeline"
      title: "Cases Timeline"
      icon: "Calendar"
      # Uses created_at field automatically
    
    # Card 6: Reuse default card from "All" tab
    - type: "default"
      card_id: "response_method"
      title: "Response Method"

Card Types:

  1. metric: Numeric metric with aggregation

    • aggregation: count, sum, avg, min, max
    • format: number, currency, percentage
    • visualization: none, gauge, trend
  2. percentage: Percentage with numerator/denominator

    • numerator.field: Field to count (with optional filter: true)
    • denominator.field: Total count (use "total" for all cases)
  3. donut: Distribution chart (pie chart)

    • Groups cases by field value
    • Shows percentages and legend
  4. bar: Bar chart

    • Same as donut, different visualization
  5. timeline: Time-based distribution

    • Groups cases by time period (morning/afternoon/evening/night)
    • Uses created_at field automatically
  6. default: Reuse built-in card from "All" tab

    • card_id: ID of default card to reuse
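
As a rough mental model, a metric card walks the configured field path over the filtered cases and applies the chosen aggregation. The sketch below assumes agent outputs are nested dicts keyed by agent_id and skips missing values; it is illustrative, not the kit's actual code:

```python
# Illustrative sketch of a "metric" card aggregation over a numeric field.
from statistics import mean

def metric_card(cases, path, aggregation="avg", decimals=2):
    values = []
    for case in cases:
        value = case
        for part in path.split("."):  # dot-notation field path
            value = value.get(part) if isinstance(value, dict) else None
        if isinstance(value, (int, float)):
            values.append(value)
    if not values:
        return None
    agg = {"count": len, "sum": sum, "avg": mean, "min": min, "max": max}[aggregation]
    return round(agg(values), decimals)

cases = [{"analyzer": {"confidence_score": 0.8}},
         {"analyzer": {"confidence_score": 0.9}},
         {"analyzer": {}}]  # missing score is skipped
metric_card(cases, "analyzer.confidence_score")  # -> 0.85
```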

Field Paths:

  • Use dot notation: agent_id.field or agent_id.nested.field
  • Special fields: "status", "created_at", "total"

Icons: Use Lucide icon names (e.g., AlertTriangle, TrendingUp, PieChart, BarChart3, Calendar)

Colors: red, amber, yellow, green, blue, indigo, purple, pink, gray

Value Mapping in Cards: Dashboard cards also support value_mapping for donut/bar charts:

dashboard:
  cards:
    - type: "donut"
      title: "Status Distribution"
      field: "processor.status"
      value_mapping:  # Map raw values to display labels in legend
        "success": "Success"
        "error": "Error"
      colors:
        "success": "green"
        "error": "red"

Timeline Configuration

Configure the Timeline tab to display system events and custom events from agent outputs:

# config/operations/my_pipeline.yml

timeline:
  sort_order: "desc"  # "asc" or "desc" - most recent first or oldest first
  
  # System-generated events (automatically created)
  system_events:
    case_created:
      label: "Case Created"
      icon: "FileText"
      color: "blue"
      show_details: true  # Show additional details below event
      details:
        - field: "current_item.doc_no"
          label: "Document No"
        - field: "current_item.amount"
          label: "Amount"
        - field: "current_item.currency"
          label: "Currency"
    
    case_completed:
      label: "Case Completed"
      icon: "CheckCircle"
      color: "green"
      timestamp_source: "completed_at"  # Use completed_at timestamp
      show_details: true
    
    hitl_queued:
      label: "Awaiting Human Review"
      icon: "Clock"
      color: "amber"
      show_details: true
    
    hitl_response:
      label: "Human Review {{ decision }}"
      icon: "UserCheck"
      color: "conditional"  # green for approve, red for reject
      show_details: true
      details:
        - field: "responded_by"
          label: "Responded By"
        - field: "selection"
          label: "Selection"
  
  # Custom events extracted from agent outputs
  custom_events:
    # Assessment completed event
    - source_field: "assessor"  # Agent output object
      label: "Assessment Completed"
      icon: "CheckCircle"
      color: "green"
      # Use agent's execution timestamp for accurate ordering
      timestamp_source: "assessor._executed_at"  # Agent's execution timestamp
      condition: "assessor IS NOT NULL"  # Only show if assessor ran
      show_details: true
      details:
        - key: "decision"  # Reference centralized field definition
        - key: "confidence"  # Reference centralized field definition
        - field: "assessor.expense_type"
          label: "Expense Type"

System Events:

  • case_created: When case is created (uses created_at timestamp)
  • case_completed: When case is completed (uses completed_at timestamp)
  • case_failed: When case fails (uses failed_at timestamp)
  • hitl_queued: When HITL request is queued
  • hitl_response: When human responds to HITL request

Custom Events:

  • source_field: Agent ID or field path to check for existence
  • timestamp_source: Where to get timestamp from:
    • "case_created": Use case.created_at
    • "case_updated": Use case.updated_at
    • "agent_id._executed_at": Use agent's execution timestamp (recommended for accurate ordering)
    • Field path: "agent_id.field" to extract from agent output
  • condition: Expression to evaluate (e.g., "assessor IS NOT NULL")
  • details: List of fields to display below the event
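
The timestamp_source options above reduce to "use a case-level timestamp or walk a field path into the agent outputs". A hedged sketch (the case layout with an "outputs" key is an assumption for illustration):

```python
# Illustrative sketch of timestamp_source resolution for a custom event.
def resolve_timestamp(case, source):
    if source == "case_created":
        return case["created_at"]
    if source == "case_updated":
        return case["updated_at"]
    # "agent_id._executed_at" or any "agent_id.field" path
    value = case.get("outputs", {})
    for part in source.split("."):
        value = value.get(part) if isinstance(value, dict) else None
    return value

case = {"created_at": "2026-01-01T09:00:00Z",
        "updated_at": "2026-01-01T10:00:00Z",
        "outputs": {"assessor": {"_executed_at": "2026-01-01T09:30:00Z"}}}
resolve_timestamp(case, "assessor._executed_at")  # -> "2026-01-01T09:30:00Z"
```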

Timeline Details:

  • Details can reference centralized field definitions using key: "field_key"
  • Details support all field display properties (value_mapping, color_mapping, icon_mapping, text_transform)
  • Details are displayed in a collapsible section below the event

Best Practices:

  1. Use agent timestamps: Set timestamp_source: "agent_id._executed_at" for custom events to get accurate ordering
  2. Show relevant details: Include key information in details (e.g., decision, amount, status)
  3. Consistent icons/colors: Use the same icon/color scheme as list view for consistency
  4. Reference centralized fields: Use key: "field_key" in details to reuse field definitions
  5. Conditional events: Use condition to only show events when relevant (e.g., only show HITL events for HITL cases)

Example: Complete Timeline Configuration:

timeline:
  sort_order: "desc"
  
  system_events:
    case_created:
      label: "Case Created"
      icon: "FileText"
      color: "blue"
      show_details: true
      details:
        - field: "current_item.doc_no"
          label: "Document No"
        - field: "current_item.amount"
          label: "Amount"
          type: number
          decimals: 2
    
    case_completed:
      label: "Case Completed"
      icon: "CheckCircle"
      color: "green"
      timestamp_source: "completed_at"
      show_details: true
  
  custom_events:
    # Assessment event
    - source_field: "midas_assessor"
      label: "Assessment Completed"
      icon: "CheckCircle"
      color: "green"
      timestamp_source: "midas_assessor._executed_at"
      condition: "midas_assessor IS NOT NULL"
      show_details: true
      details:
        - key: "decision"  # Uses centralized field definition
        - key: "confidence"  # Uses centralized field definition
    
    # Posting event
    - source_field: "acta_poster"
      label: "Journal Posted"
      icon: "FileCheck"
      color: "green"
      timestamp_source: "acta_poster._executed_at"
      condition: "acta_poster IS NOT NULL"
      show_details: true
      details:
        - field: "acta_poster.posting_result.status"
          label: "Status"
        - field: "acta_poster.posting_result.batch_id"
          label: "Batch ID"

Resume Behavior Configuration

Control how agents behave when resuming from checkpoints using resume_behavior in agent configuration:

# config/agents/my_agent.yml
id: my_agent
type: agno
model: "azure_openai"

# Resume behavior options:
# - "always" (default): Always run, unless output already in upstream
# - "skip_on_resume": Never run when resuming (skip completely)
# - "run_only_when_complete": Only run when all loop iterations complete
resume_behavior: "always"

Resume Behavior Options:

  1. always (default):

    • Agent runs normally during resume
    • Skips only if output already exists in checkpoint's upstream
    • Use for: Most agents that should run after HITL
  2. skip_on_resume:

    • Agent never runs when resuming from checkpoint
    • Always skipped during resume
    • Use for: Agents that should only run once (e.g., initializers, scanners)
  3. run_only_when_complete:

    • Only runs when all loop iterations complete (not during resume)
    • Skips during resume if in a loop
    • Use for: Summary/aggregation agents that need all loop results
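
The three options can be summarized as a small decision function. This is a sketch of the documented semantics only (the orchestrator's actual resume logic may consider more state):

```python
# Illustrative sketch of the resume decision per resume_behavior.
def should_run(resume_behavior, resuming, output_in_checkpoint,
               in_loop, loop_complete):
    if resume_behavior == "skip_on_resume":
        return not resuming  # never runs during a resume
    if resume_behavior == "run_only_when_complete":
        # In a loop: only once all iterations are done; otherwise as normal
        return loop_complete if in_loop else not resuming
    # "always": run unless output is already in the checkpoint's upstream
    return not (resuming and output_in_checkpoint)

should_run("skip_on_resume", resuming=True,
           output_in_checkpoint=False, in_loop=False, loop_complete=False)  # -> False
```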

Example:

# Scanner - should only run once, not on resume
agents:
  - id: batch_scanner
    resume_behavior: "skip_on_resume"  # Never run on resume

# Processor - should run after HITL
agents:
  - id: item_processor
    resume_behavior: "always"  # Run after HITL (default)

# Summary reporter - needs all loop iterations
agents:
  - id: batch_summary
    resume_behavior: "run_only_when_complete"  # Only when loop finishes

Loop Patterns with Async HITL

Async HITL is particularly powerful in loop patterns:

pattern:
  type: sequential
  steps:
    - node: batch_scanner
    
    - type: loop
      iterate_over: "batch_scanner.items_list"
      loop_item_key: "current_item"
      body:
        type: sequential
        steps:
          - node: item_processor
          - gate: review_item  # Async HITL - queues and continues
          - node: item_finalizer
    
    - node: batch_summary  # Runs after all items processed

How It Works:

  1. Loop processes each item
  2. When HITL gate is hit:
    • Checkpoint created for that iteration
    • Case queued for review
    • Loop continues with next item (doesn't wait)
  3. Queued cases tracked in hitl_queued_cases context variable
  4. Summary agent can report on queued vs completed cases

Tracking Variables:

  • hitl_queued_cases: List of cases queued for HITL (automatically populated)
  • completed_cases: List of cases that completed without HITL (automatically populated)

Access in downstream agents:

# Summary agent can report on queued cases
agents:
  - id: batch_summary
    prompt:
      instruction:
        inline: |
          Report on batch processing:
          - Total items: {{batch_scanner.total_count}}
          - Queued for review: {{hitl_queued_cases | length}}
          - Completed: {{completed_cases | length}}

Operations UI Usage

The Operations UI provides a centralized interface for managing async HITL cases. See the Operations Center section above for comprehensive details.

Quick Reference:

Accessing Operations UI:

  • Navigate to /operations in the web UI
  • View all cases across pipelines or filter by pipeline
  • See case summary (total, pending, completed, in-progress)

Case List Panel:

  • Filter by Pipeline: Select specific pipeline or view all
  • Tabbed View:
    • "All" tab: Shows all cases with common fields
    • Pipeline-specific tabs: Shows cases for that pipeline with custom columns (from list_view.column_order)
  • Custom Columns: Pipeline-specific fields displayed based on list_view configuration
  • Case Cards/Table: Each case shows:
    • Case ID and status
    • Pipeline name
    • Created timestamp
    • Case type
    • Custom pipeline fields (if configured)
  • Status Badges: Pending, In-Progress, Completed, Failed
  • Dashboard: Pipeline-specific analytics cards (if dashboard.enabled is true)
    • Metrics, percentages, distribution charts, timelines
    • Filtered by same criteria as list view

Case Detail Modal:

  • Review Tab:
    • HITL gate information
    • Gate description (from HITL template)
    • Approve/Reject buttons
    • Optional form fields
  • Data Tab:
    • Structured case data (from detail_view in operations YAML)
    • Organized by sections
    • All agent outputs displayed
  • Timeline Tab:
    • System events: case created, completed, failed, HITL queued/responded
    • Custom events from agent outputs
    • Chronological view of case lifecycle
  • Chat Tab:
    • Operations Assistant for natural language interactions
    • Ask questions about cases
    • Approve/reject via chat commands

Responding to HITL Requests:

  1. Direct Response (Review Tab):

    • Click "Approve" or "Reject" button
    • Fill optional form fields if present
    • Pipeline resumes immediately
  2. Via Operations Assistant (Chat Tab):

    • Ask: "Approve case CASE-12345"
    • Ask: "Reject case CASE-12345 with notes: Invalid data"
    • Assistant handles approval/rejection on your behalf

Case Status Flow:

  • Pending: Queued, waiting for review
  • In-Progress: User opened case, reviewing
  • Completed: HITL responded, pipeline resumed successfully
  • Failed: Pipeline resume failed

Advanced Async HITL Patterns

Pattern 1: Batch Processing with Priority Queue

Process items in batches, queue complex ones for review, continue with simple ones:

pattern:
  type: sequential
  steps:
    - node: batch_scanner
    
    - type: loop
      iterate_over: "batch_scanner.items_list"
      loop_item_key: "current_item"
      body:
        type: sequential
        steps:
          - node: complexity_analyzer
          - type: gate
            gate_id: review_complex
            condition: "complexity_analyzer.complexity_score > 0.7"
            # Only queues complex items
          - node: item_processor
    
    - node: batch_summary

Pattern 2: Parallel Processing with Async HITL

Process multiple items in parallel, queue problematic ones:

pattern:
  type: sequential
  steps:
    - node: item_scanner
    
    - type: parallel
      steps:
        - type: loop
          iterate_over: "item_scanner.items_list"
          loop_item_key: "current_item"
          body:
            type: sequential
            steps:
              - node: item_validator
              - gate: review_failed
                condition: "item_validator.valid == false"
              - node: item_processor
    
    - node: aggregation

Pattern 3: Conditional Async HITL

Only queue items that meet certain criteria:

pattern:
  type: sequential
  steps:
    - node: data_extractor
    
    - type: gate
      gate_id: review_anomaly
      condition: "data_extractor.anomaly_detected == true"
      # Only queues items with anomalies
      # Items without anomalies skip gate and continue
    
    - node: finalizer

Checkpoint Management

Checkpoint Expiry:

execution_settings:
  hitl_mode: "async"
  checkpoint_expiry_days: 7  # Checkpoints expire after 7 days

Checkpoint Contents:

  • Full execution context (all agent outputs up to gate)
  • Pipeline state and variables
  • Loop iteration state (if in loop)
  • Case data for display

Resume Behavior:

  • Pre-gate agents are skipped (outputs already in checkpoint)
  • Post-gate agents execute with HITL response
  • Loop continues from next iteration
  • Summary agents run only when loop completes

Error Handling

Failed Resumes:

  • Case status set to "Failed"
  • Error message stored in case
  • Checkpoint remains valid for retry
  • User can retry resume from Operations UI

Expired Checkpoints:

  • Cases with expired checkpoints cannot be resumed
  • User notified in Operations UI
  • Option to create new case or skip

Partial Failures:

  • If some items in batch fail, others continue
  • Failed cases tracked separately
  • Summary reports include failure counts

Performance Considerations

Batch Size:

  • Process items in reasonable batches (100-1000 items)
  • Monitor checkpoint storage size
  • Consider checkpoint expiry for long-running batches

Queue Management:

  • Review cases in priority order
  • Use Operations Assistant for bulk operations
  • Monitor queue depth and processing rate

Database Optimization:

  • Index case tables by pipeline_id, status, created_at
  • Archive completed cases periodically
  • Monitor checkpoint table size

Best Practices

  1. Case YAML Design:

    • Include all relevant agent outputs in detail_view
    • Use conditional sections for optional data
    • Keep field paths simple and clear
    • Organize sections logically for easy review
  2. Resume Behavior:

    • Use skip_on_resume for one-time agents (scanners, initializers)
    • Use run_only_when_complete for summary agents
    • Default always for most processing agents
  3. Loop Patterns:

    • Track queued vs completed cases for reporting
    • Use summary agents to aggregate results
    • Consider checkpoint expiry for long-running batches
  4. Operations UI:

    • Use Operations Assistant for bulk operations
    • Review cases in priority order
    • Monitor case status and pipeline health

Example: Batch Math Solver

Complete example of async HITL in a batch processing pipeline:

# config/pipelines/math_batch_solver.yml
name: "Math Batch Solver"
description: "Batch processing with async HITL"

execution_settings:
  hitl_mode: "async"
  checkpoint_expiry_days: 7

operations:
  config_file: "operations/math_batch_solver.yml"

pattern:
  type: sequential
  steps:
    - node: batch_problem_parser
    
    - type: loop
      iterate_over: "batch_problem_parser.problems_list"
      loop_item_key: "current_problem"
      body:
        type: sequential
        steps:
          - node: math_strategist
          - node: math_calculator
          - gate: approve_auditor  # Async HITL - queues complex problems
          - node: math_auditor
    
    - node: batch_summary_reporter  # Reports on all problems

Operations Configuration (config/operations/math_batch_solver.yml):

identity:
  prefix: "BATCH"
  uniqueness: "uuid_suffix"

detail_view:
  sections:
    - name: "Problem Details"
      fields:
        - field: "current_problem.expression"
          label: "Expression"
          type: text
    
    - name: "Calculation"
      fields:
        - field: "math_calculator.result"
          label: "Result"
          type: text
    
    - name: "Audit"
      condition: "math_auditor IS NOT NULL"
      fields:
        - field: "math_auditor.is_valid"
          label: "Valid"
          type: boolean

Summary Agent (config/agents/batch_summary_reporter.yml):

id: batch_summary_reporter
type: agno
model: "azure_openai"
resume_behavior: "run_only_when_complete"  # Only when loop finishes

prompt:
  instruction:
    inline: |
      Generate batch processing summary:
      - Total problems: {{batch_problem_parser.total_count}}
      - Queued for review: {{hitl_queued_cases | length}}
      - Completed: {{completed_cases | length}}

๐Ÿšช Portal Mode

Portal Mode provides a customer-facing experience at /portal: login, portal content (e.g. billing, usage, account), and a collapsible assistant chat. The main page (/) is unchanged. Portal uses the same backend orchestrator and pipelines as the main chat, with one session per portal user and separate state managed via config/portals/.

Overview

  • Routes: /portal (single-portal: redirects to first portal) or /portal/[portalId] (e.g. /portal/customer, /portal/hawkeye).
  • Login: Config-based; users defined per portal in config/portals/{portal_id}/users.yml. No conversations list on portal; one session per user.
  • Portal content: React component (from starter or vibe-coded) with portalState, onUpdate, and currentUser. State is stored per user and exposed as GET /api/portal/state and PATCH /api/portal/state.
  • Assistant chat: Same primary assistant as main page; can run pipelines and use portal tools (e.g. get_portal_state, update_portal_state). Chat history can be cleared per session.

Enabling Portal

Portal is enabled when config/portals/ exists with portals.yml and at least one portal that has users.yml:

  • Required: config/portals/portals.yml (master config listing portals).
  • Required per portal: config/portals/{portal_id}/users.yml (users for login).
  • Optional per portal: state_schema.yml, initial_state.yml, chat_actions.yml.

If portal is not enabled, /portal may return 404 or redirect to /.
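
The enablement rule above can be sketched as a filesystem check (a hypothetical helper mirroring the description, not the kit's actual code):

```python
# Illustrative sketch: portal mode is on when config/portals/portals.yml
# exists and at least one portal folder contains users.yml.
from pathlib import Path

def portal_enabled(config_dir):
    portals_dir = Path(config_dir) / "portals"
    if not (portals_dir / "portals.yml").is_file():
        return False
    return any((child / "users.yml").is_file()
               for child in portals_dir.iterdir() if child.is_dir())
```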

Config Layout

config/portals/
├── portals.yml                   # Master: lists portals (title, component, login_component, operations, etc.)
├── customer/                     # Per-portal folder (portal_id = customer)
│   ├── users.yml                 # Required: users for login
│   ├── state_schema.yml          # Optional: schema for portal state
│   ├── initial_state.yml         # Optional: default state for new sessions
│   └── chat_actions.yml          # Optional: button-to-chat actions
└── hawkeye/                      # Another portal (portal_id = hawkeye)
    ├── users.yml
    ├── state_schema.yml
    ├── initial_state.yml
    └── chat_actions.yml

portals.yml (master config):

# Master portal config. Keys = portal_id (used in URLs, session, case_data).
portals:
  customer:
    title: "Customer Portal"
    subtitle: "Manage account and services."
    component: "portals/ampere"
    login_component: "portals/ampere/LoginPage"
    # Optional: operations for case types (used by create_case)
    operations:
      - id: general
        description: "General assistance"
      - id: billing
        description: "Billing questions"
    default_operation: general

  hawkeye:
    title: "Hawkeye"
    subtitle: "Decodes intent. Detects deviation."
    component: "portals/hawkeye"
    login_component: "portals/hawkeye/LoginPage"

component must match a component registered in the app (e.g. portals/ampere after init copies the template to apps/ui/src/portals/ampere).

Configuration Examples

config/portals/{portal_id}/users.yml (required per portal)

Lists users who can log in for that portal. Use password for development only; prefer password_hash (e.g. bcrypt) in production.

users:
  - username: demo1
    password: demo123
    display_name: "Demo User 1"
  - username: demo2
    password: demo123
    display_name: "Demo User 2"
# Production: use password_hash instead of password
#   - username: alice
#     password_hash: "<bcrypt-hash>"
#     display_name: "Alice Smith"

config/portals/{portal_id}/state_schema.yml (optional)

Defines the shape of portal state. The UI and assistant tools use dotted paths (e.g. billing.current_balance, ui.current_page). Keep types and descriptions so the assistant and UI know what each field is for.

description: "Portal state"
state:
  account:
    account_id: { type: string, description: "Account identifier" }
    status: { type: string, description: "Account status" }
  billing:
    current_balance: { type: number, description: "Amount due" }
    due_date: { type: string, description: "Bill due date (ISO date)" }
  ui:
    current_page: { type: string, description: "Active page id (e.g. account|billing|usage)" }

You can add nested objects and arrays (with type: array, optional items). The Ampere starter includes a full utilities-style schema (services, outages, payments, alerts, documents, etc.).

config/portals/{portal_id}/initial_state.yml (optional)

Seeds portal state when a user has no state yet. Use a default key for all users; optionally add a key per username (from users.yml) for user-specific seed data. The backend returns only the inner state object to the API/UI.

default:
  account:
    account_id: "ACC-001"
    status: "active"
  billing:
    current_balance: 0
    due_date: ""
  ui:
    current_page: "account"

# Optional: per-user overrides (key = username from users.yml)
demo1:
  account:
    account_id: "ACC-100"
    status: "active"
  billing:
    current_balance: 82.45
    due_date: "2026-03-15"
  ui:
    current_page: "billing"

chat_actions.yml (optional)

Maps action keys to user messages and assistant instructions. The UI can call "send to chat" with an action key (e.g. from a button); the backend sends the corresponding user_message and injects assistant_instruction for that turn. Optional phrases list: when the user types free-form text, the longest matching phrase selects the action.

ask_about_bill:
  user_message: "I have a question about my bill."
  phrases: ["question about my bill", "about my bill", "explain my bill"]
  assistant_instruction: |
    The user is asking about their bill. Use get_portal_state to read billing.current_balance,
    billing.due_date, billing_history. Answer using the numbers and dates from state.

report_outage:
  user_message: "I want to report an outage."
  phrases: ["report an outage", "report outage"]
  assistant_instruction: |
    The user has clicked "Report outage". Use get_portal_state to check state. Use
    update_portal_state to set outages.active to true and outages.status_text to a short
    message. If they need human follow-up, use create_case.

For a full set of actions (payments, plan changes, start/stop service, etc.), see the Ampere starter's config/portals/customer/chat_actions.yml.
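
The longest-matching-phrase rule can be sketched as follows (illustrative only; the actual matcher may normalize text differently):

```python
# Illustrative sketch: pick the action whose longest phrase appears in the
# user's free-form text; return None when nothing matches.
def match_action(actions, text):
    text_lower = text.lower()
    best, best_len = None, 0
    for action_key, cfg in actions.items():
        for phrase in cfg.get("phrases", []):
            if phrase.lower() in text_lower and len(phrase) > best_len:
                best, best_len = action_key, len(phrase)
    return best

actions = {
    "ask_about_bill": {"phrases": ["question about my bill", "about my bill"]},
    "report_outage": {"phrases": ["report an outage", "report outage"]},
}
match_action(actions, "I have a question about my bill please")  # -> "ask_about_bill"
```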

Portal Content Contract

Portal content components receive:

  • portalState: Current state (from GET /api/portal/state). Read-only; shape should match config/portals/{portal_id}/state_schema.yml.
  • onUpdate(path, value): Updates state via PATCH /api/portal/state; app refetches and re-renders. Use dotted paths (e.g. ui.current_page, bill.current_balance).
  • currentUser: { id, display_name } for the logged-in user.

The assistant can read and update the same state via tools (get_portal_state, update_portal_state), so the UI and chat stay in sync after refetch.
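
Both onUpdate and the assistant's update_portal_state tool address state by dotted path. A minimal sketch of what a dotted-path write like onUpdate("ui.current_page", "billing") does to the state object (illustrative, not the actual PATCH handler):

```python
# Illustrative sketch of a dotted-path state update.
def set_by_path(state, path, value):
    parts = path.split(".")
    node = state
    for part in parts[:-1]:
        node = node.setdefault(part, {})  # create intermediate objects as needed
    node[parts[-1]] = value
    return state

state = {"ui": {"current_page": "account"},
         "billing": {"current_balance": 82.45}}
set_by_path(state, "ui.current_page", "billing")
state["ui"]["current_page"]  # -> "billing"
```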

APIs

Method Endpoint Description
POST /api/portal/login Login; body { username, password }; returns user + session_id
POST /api/portal/logout Logout; clear auth session
GET /api/portal/state Get portal state for current user's session (auth required)
PATCH /api/portal/state Update portal state (auth required)
GET /api/portal/config Portal config (component, branding) for frontend; supports ?portal_id= for multi-portal
POST /api/portal/clear-history Clear chat turns for current user's portal session (auth required)

Starter Templates and Adding Portal to a Project

Starters that include portal (e.g. Ampere, Hawkeye) ship with config/portals/ and portal React components in portals/{portal_id}/; topaz-agent-kit init --starter <name> projects/<target> copies the portal slice into the project and registers the component. To add portal to an existing project:

  1. Create config/portals/portals.yml (master config) and config/portals/{portal_id}/users.yml (required).
  2. Add config/portals/{portal_id}/state_schema.yml and optionally initial_state.yml, chat_actions.yml.
  3. Add a portal content component (e.g. under apps/ui/src/portals/<project_name>) that accepts PortalContentProps, and register it so the component value in portals.yml matches the registry key.

Example: Single-portal project (Ampere)
Portal ID customer, content at portals/customer/. URL: /portal redirects to /portal/customer.

Example: Single-portal project (Hawkeye)
Portal ID hawkeye, content at portals/hawkeye/. URL: /portal redirects to /portal/hawkeye.

For full design and implementation details, see docs/design/multi_portal_design.md and docs/design/portal_design_and_implementation.md.

๐ŸŽฏ Conditional Node Execution & Branch-Level Conditions

Node-Level Conditions

Execute agents conditionally based on runtime evaluation of context variables:

pattern:
  type: sequential
  steps:
    - node: claim_analyzer

    # Only run fraud check for high-risk claims
    - node: fraud_checker
      condition: "claim_analyzer.risk_score > 0.8"

    # Stop pipeline if state is not supported
    - node: rate_case_customer_segmentor
      condition: "rate_case_data_summarizer.normalized_data_ready == true"
      on_false: stop

    # If-else pattern: Execute alternative handler when condition is false
    - node: main_processor
      condition: "data.ready == true"
      on_false:
        - node: error_handler
        - node: cleanup
        - node: notify_user

    # Manual review if amount exceeds threshold or fraud detected
    - node: manual_reviewer
      condition: >
        (claim_analyzer.amount > 10000 AND claim_analyzer.tier != "platinum") OR
        fraud_checker.detected == true
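
One way to picture condition evaluation: normalize the SQL-ish keywords (AND/OR, IS NOT NULL, true/false) to Python and resolve dotted names against the agent outputs. This is an illustrative sketch only; the kit's real evaluator may differ:

```python
# Illustrative sketch of evaluating a condition string against agent outputs.
import re
from types import SimpleNamespace

def evaluate(condition, context):
    expr = condition
    expr = re.sub(r"\bAND\b", "and", expr)
    expr = re.sub(r"\bOR\b", "or", expr)
    expr = re.sub(r"\bIS NOT NULL\b", "is not None", expr)
    expr = re.sub(r"\btrue\b", "True", expr)
    expr = re.sub(r"\bfalse\b", "False", expr)
    # Expose agent outputs as attribute-accessible objects for dotted names
    scope = {k: SimpleNamespace(**v) if isinstance(v, dict) else v
             for k, v in context.items()}
    return bool(eval(expr, {"__builtins__": {}}, scope))

evaluate("claim_analyzer.risk_score > 0.8 OR fraud_checker.detected == true",
         {"claim_analyzer": {"risk_score": 0.9},
          "fraud_checker": {"detected": False}})  # -> True
```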

Branch-Level Conditions

Apply conditions to entire branches (sequential, parallel, loop, switch, handoff, group_chat) to skip whole execution paths:

pattern:
  type: sequential
  steps:
    - node: trip_requester

    # EAGER: Parallel execution where each domain runs independently
    # Entire branches are skipped if condition is false
    - type: parallel
      steps:
        # Flights domain: Only run if flights are needed
        - type: sequential
          condition: "trip_requester.flights_ready == true" # Entire branch skipped if false
          steps:
            - node: trip_flights_expert
            - gate: select_flights
              condition: "trip_flights_expert.has_flights == true"
              on_submit: continue

        # Hotels domain: Only run if hotels are needed
        - type: sequential
          condition: "trip_requester.hotels_ready == true" # Entire branch skipped if false
          steps:
            - node: trip_hotels_expert
            - gate: select_hotels
              condition: "trip_hotels_expert.has_hotels == true"
              on_submit: continue

        # Activities domain: Only run if activities are needed
        - type: sequential
          condition: "trip_requester.activities_ready == true" # Entire branch skipped if false
          steps:
            - node: trip_activities_expert
            - gate: select_activities
              condition: "trip_activities_expert.has_activities == true"
              on_submit: continue

Benefits of Branch-Level Conditions:

  • Efficiency: Skip entire execution paths, not just individual nodes
  • Cleaner Logic: One condition controls a whole workflow branch
  • Resource Savings: No partial execution if the branch condition fails
  • Eager Execution: Parallel branches start and surface results as soon as they are ready

Supported Pattern Types

Branch-level conditions work with all pattern types:

pattern:
  type: sequential
  steps:
    # Conditional sequential branch
    - type: sequential
      condition: "coordinator.need_flights == true"
      steps:
        - node: flights_expert
        - gate: select_flights

    # Conditional parallel branch
    - type: parallel
      condition: "coordinator.multi_domain == true"
      steps:
        - type: sequential
          condition: "coordinator.need_hotels == true"
          steps:
            - node: hotels_expert
        - type: sequential
          condition: "coordinator.need_activities == true"
          steps:
            - node: activities_expert

    # Conditional loop branch
    - type: loop
      condition: "requires_iteration == true"
      max_iterations: 5
      body:
        - node: refine_output
        - gate: approve_refinement

    # Conditional switch branch
    - type: switch(category)
      condition: "classification_complete == true"
      cases:
        high_priority:
          - node: premium_handler
        low_priority:
          - node: standard_handler

    # Conditional handoff branch
    - type: handoff
      condition: "requires_specialist == true"
      handoffs:
        - node: specialist_a
        - node: specialist_b

    # Conditional group chat branch
    - type: group_chat
      condition: "collaboration_needed == true"
      participants:
        - node: agent_a
        - node: agent_b
      selection_strategy: round_robin

on_false Actions

When a condition evaluates to false, you can control what happens:

1. Default Behavior (Skip and Continue):

- node: optional_processor
  condition: "data.available == true"
  # If false: Skip node, continue to next step

2. Stop Pipeline:

- node: critical_validator
  condition: "data.valid == true"
  on_false: stop  # Stop pipeline execution if condition is false

3. If-Else Pattern (Execute Alternative Steps):

# Single alternative node
- node: main_processor
  condition: "data.ready == true"
  on_false:
    - node: error_handler

# Multiple alternative steps
- node: main_processor
  condition: "data.ready == true"
  on_false:
    - node: error_handler
    - node: cleanup
    - node: notify_user

# Full pattern in else branch
- node: main_processor
  condition: "data.ready == true"
  on_false:
    - type: sequential
      steps:
        - node: error_handler
        - gate: user_approval
        - node: cleanup

Benefits:

  • Stop Action: Gracefully end pipeline when prerequisites aren't met
  • If-Else Pattern: Execute alternative workflows without complex switch patterns
  • Flexible: Supports single nodes, multiple steps, or full patterns in on_false

Supported Conditions

Operators:

  • Comparison: ==, !=, >, <, >=, <=
  • Boolean: AND, OR, NOT
  • String: contains, starts_with, ends_with, in, not in
  • Null checks: is null, is not null
  • Functions: len(array) for array length

Variable Resolution:

  • agent_id.field โ†’ Automatically resolves from agent output
  • agent_id.nested.field โ†’ Supports nested field access
  • context_variable โ†’ Falls back to root context
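For intuition, the resolution and comparison rules above can be sketched in a few lines of Python. This is an illustrative toy only, not the framework's actual condition engine (which additionally supports AND/OR/NOT, string operators, null checks, and len()):

```python
import json
import operator

# Minimal illustrative evaluator for a single "<var> <op> <literal>"
# comparison; NOT the framework's condition engine.
OPS = {"==": operator.eq, "!=": operator.ne, ">": operator.gt,
       "<": operator.lt, ">=": operator.ge, "<=": operator.le}

def resolve(path, context):
    # "agent_id.nested.field" walks agent outputs; a bare name
    # falls back to the root context.
    value = context
    for part in path.split("."):
        value = value[part]
    return value

def evaluate(condition, context):
    var, op, literal = condition.split(None, 2)
    # json.loads parses YAML-style literals: true, 0.8, "medical"
    return OPS[op](resolve(var, context), json.loads(literal))

context = {"claim_analyzer": {"risk_score": 0.92}, "data": {"ready": True}}
evaluate("claim_analyzer.risk_score > 0.8", context)  # True
evaluate("data.ready == true", context)               # True
```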

Real-World Example (Trip Planner with Eager Gates):

pattern:
  type: sequential
  steps:
    - node: trip_requester

    # Parallel execution with branch-level conditions
    # Each branch appears immediately when its expert finishes (eager gates)
    - type: parallel
      steps:
        # Flights branch: Runs only if flights_ready is true
        - type: sequential
          condition: "trip_requester.flights_ready == true"
          steps:
            - node: trip_flights_expert
            - gate: select_flights
              condition: "trip_flights_expert.has_flights == true"
              on_submit: continue
              on_cancel: stop

        # Hotels branch: Runs only if hotels_ready is true
        - type: sequential
          condition: "trip_requester.hotels_ready == true"
          steps:
            - node: trip_hotels_expert
            - gate: select_hotels
              condition: "trip_hotels_expert.has_hotels == true"
              on_submit: continue
              on_cancel: stop

        # Activities branch: Runs only if activities_ready is true
        - type: sequential
          condition: "trip_requester.activities_ready == true"
          steps:
            - node: trip_activities_expert
            - gate: select_activities
              condition: "trip_activities_expert.has_activities == true"
              on_submit: continue
              on_cancel: stop

    # Aggregator runs after all selections are made
    - node: trip_aggregator

Benefits:

  • Resource Efficiency: Skip unnecessary processing branches entirely
  • Smart Routing: Adapt workflow based on runtime conditions
  • Cost Optimization: Only run branches when conditions are met
  • Eager Execution: Gates appear immediately when experts finish (no waiting)
  • Cleaner Logic: Single condition controls entire execution path

๐ŸŽฏ Switch Pattern (MVP-8.0)

Dynamic Branching with Expressions

Route execution to different branches based on field values or evaluated expressions:

pattern:
  type: sequential
  steps:
    - node: claim_classifier

    # Switch on field value
    - type: switch(claim_classifier.category)
      cases:
        medical:
          - node: medical_specialist
          - node: medical_reviewer
        property:
          - node: property_assessor
        auto:
          - node: auto_adjuster
      default:
        - node: general_processor

    # Switch on expression result (boolean)
    - type: switch(claim_analyzer.amount > 10000)
      cases:
        true:
          - gate: premium_approval
            on_approve: continue
          - node: premium_handler
        false:
          - node: standard_handler

    # Switch on complex expression
    - type: switch(len(math_strategist.steps) > 4)
      cases:
        true: # Complex problem
          - node: calculator
          - gate: approve_auditor
            condition: "len(math_strategist.steps) > 6"
            on_approve: continue
          - node: auditor
        false: # Simple problem
          - node: calculator

Switch Features

Expression Support:

  • Evaluates any expression: boolean, numeric, string, function calls
  • Supports len(array) function
  • Variable resolution same as conditional nodes

Syntax Options:

# Shorthand (recommended)
- type: switch(field_name)

# With expression
- type: switch(len(agent.steps) > 5)

# Verbose syntax
- type: switch
  field: field_name

Case Matching:

  • Literal values: strings, numbers, booleans (e.g., false, true, 5, "medical")
  • Expression results: evaluated and matched to cases
  • Type-aware matching: strict type checking between expression result and case keys
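Strict type-aware matching can be pictured with the following hypothetical helper (illustrative only; case keys are assumed already parsed to their native types):

```python
def match_case(result, cases, default=None):
    # Strict type checking: the boolean True matches the case key True
    # but not the integer 1 or the string "true" (and vice versa).
    for key, branch in cases.items():
        if type(key) is type(result) and key == result:
            return branch
    return default

cases = {"medical": ["medical_specialist", "medical_reviewer"],
         "property": ["property_assessor"]}
match_case("medical", cases, default=["general_processor"])
match_case(True, {True: ["premium_handler"], False: ["standard_handler"]})
```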

Nested Patterns:

  • Cases can contain: nodes, sequential, parallel, loop, or nested switch
  • Supports conditional gates within cases

Real-World Example (Math Compass with Three Complexity Tiers):

pattern:
  type: sequential
  steps:
    - node: math_strategist

    # SWITCH: Route based on problem complexity
    - type: switch(len(math_strategist.steps) > 2)
      cases:
        false: # Simple problem (โ‰ค2 steps)
          - node: math_calculator
        true: # Complex problem (>2 steps)
          - node: math_calculator
          - gate: approve_auditor
            condition: "len(math_strategist.steps) > 4"
            on_approve: continue
            on_reject: stop
          - node: math_auditor

Output Template (Three Complexity Tiers):

outputs:
  final:
    transform: |
      {% set step_count = results.math_strategist.steps|length %}

      {% if 'math_auditor' in results %}
        # Complex Problem ({{ step_count }} steps)
        โœ… Final Answer (Audited): {{ results.math_auditor.final_answer }}
        
        Complexity: {{ 'Medium' if step_count <= 4 else 'High (Manual Review)' }}
      {% else %}
        # Simple Problem ({{ step_count }} steps)
        ๐Ÿ“Š Calculated Result: {{ results.math_calculator.result }}
        
        Complexity: Low (No audit needed)
      {% endif %}

Benefits:

  • Multi-Path Routing: Different workflows for different scenarios
  • Expression-Based: Evaluate complex conditions with functions
  • Conditional Gates: Skip gates automatically based on logic
  • Cost Efficiency: Route to appropriate handling tier
  • Human-in-the-Loop: Conditional gates for manual review when needed

๐ŸŽฏ Handoff Pattern (MVP-9.0)

LLM-Driven Agent Routing

Intelligent handoff pattern where a virtual orchestrator automatically routes requests to specialist agents based on user intent. The orchestrator is auto-generated from agent descriptionsโ€”no orchestration code required!

Automatic Orchestration

The handoff pattern creates a virtual orchestrator at runtime from your agent descriptions:

  1. Orchestrator analyzes request: Evaluates user intent
  2. Routes to specialist: Automatically selects the best specialist
  3. Specialist handles request: Processes with full context
  4. Returns to orchestrator: Provides final closing message

Configuration

Global Orchestrator Setting (config/pipeline.yml):

# Global configuration for all handoff patterns
orchestrator:
  model: azure_openai # Model for virtual orchestrator

Pipeline Configuration (config/pipelines/translator.yml):

name: "Translator"
description: "Multi-language translation with intelligent routing"

nodes:
  - id: spanish_translator
    config_file: agents/spanish_translator.yml
  - id: english_translator
    config_file: agents/english_translator.yml
  - id: hindi_translator
    config_file: agents/hindi_translator.yml

# Handoff pattern - orchestrator is auto-generated!
pattern:
  type: handoff
  handoffs:
    - node: spanish_translator
    - node: english_translator
    - node: hindi_translator

outputs:
  final:
    selectors: ["content"]

Agent Configuration (config/agents/spanish_translator.yml):

# Description is used by orchestrator to route requests
description: "Translates text to and from Spanish with natural, fluent translations"

instruction: |
  You are a professional Spanish translation specialist.

  Your expertise:
  - Translate to Spanish (Espaรฑol)
  - Translate from Spanish
  - Maintain natural, fluent Spanish
  - Preserve meaning and tone

  When translating:
  - Use natural Spanish expressions
  - Maintain the original tone (formal/casual)
  - Preserve technical terminology appropriately

framework: "langgraph"
model: "azure_openai"

How It Works

Example Flow:

User: "Translate 'Hello, how are you?' to Spanish"

1. Virtual Orchestrator:
   - Analyzes request
   - Identifies Spanish translation need
   - Routes: "HANDOFF: spanish_translator"

2. Spanish Translator:
   - Receives full context
   - Translates to Spanish
   - Returns: "Hola, ยฟcรณmo estรกs?"

3. Virtual Orchestrator (returns):
   - Receives translator response
   - Provides final closing message
   - Returns to user

Result: "Hola, ยฟcรณmo estรกs?" with friendly closing

Orchestrator Prompt Generation

The orchestrator prompt is automatically generated from agent descriptions:

Available Specialists:
1. spanish_translator: Translates text to and from Spanish with natural, fluent translations
2. english_translator: Translates text to and from English with clarity and precision
3. hindi_translator: Translates text to and from Hindi with cultural sensitivity

When a user needs a specialist, respond EXACTLY with:
HANDOFF: <specialist_id>
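A sketch of how such a prompt could be assembled from each node's description field (hypothetical helper; the actual generated prompt may differ in wording):

```python
def build_orchestrator_prompt(specialists):
    # specialists: list of (agent_id, description) pairs taken from
    # each agent YAML's description field.
    lines = ["Available Specialists:"]
    for i, (agent_id, description) in enumerate(specialists, start=1):
        lines.append(f"{i}. {agent_id}: {description}")
    lines += ["", "When a user needs a specialist, respond EXACTLY with:",
              "HANDOFF: <specialist_id>"]
    return "\n".join(lines)

prompt = build_orchestrator_prompt([
    ("spanish_translator", "Translates text to and from Spanish"),
    ("english_translator", "Translates text to and from English"),
])
```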

Features

Zero Configuration:

  • No custom orchestrator code needed
  • Auto-generated from agent descriptions
  • Works with existing agents

Intelligent Routing:

  • LLM analyzes user intent
  • Routes to appropriate specialist
  • Handles multi-step requests

Context Preservation:

  • Full user context passed to specialist
  • Orchestrator receives specialist response
  • Automatic return flow

Transparent UI:

  • Virtual orchestrator steps are hidden
  • Only specialist agents visible to users
  • Clean, focused UI experience

Optional Custom Orchestrator

You can provide a custom orchestrator instead of the virtual one:

pattern:
  type: handoff
  orchestrator: "custom_orchestrator" # Custom agent
  handoffs:
    - node: specialist_a
    - node: specialist_b

Real-World Example (Translator Pipeline)

# Try the translator pipeline
topaz-agent-kit init --starter ensemble ./my_project
topaz-agent-kit serve fastapi --project ./my_project

# Test scenarios:
"Translate 'Hello' to Spanish"
"Translate 'เคจเคฎเคธเฅเคคเฅ‡' to English"
"What languages do you support?"
"Translate 'Good morning' to Spanish and then translate that to Hindi"

What Happens:

  • First scenario: Routes to Spanish translator โ†’ returns to orchestrator
  • Second scenario: Routes to English translator โ†’ returns to orchestrator
  • Third scenario: Orchestrator responds directly (no handoff needed)
  • Fourth scenario: Routes to Spanish โ†’ then Hindi โ†’ final message

Best Practices

  1. Clear Agent Descriptions: Use descriptive description fields in agent YAML
  2. Specific Capabilities: Describe exactly what each specialist does
  3. Context Preservation: Ensure agents handle upstream context
  4. Orchestrator Model: Configure globally for consistency
  5. Handoff Format: Orchestrator uses "HANDOFF: <agent_id>" pattern
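The "HANDOFF: <agent_id>" contract implies the orchestrator's reply can be classified with a check along these lines (hypothetical sketch, not the framework's parser):

```python
def parse_handoff(reply):
    # Returns the target specialist id if the orchestrator requested a
    # handoff, or None if it answered the user directly.
    text = reply.strip()
    prefix = "HANDOFF:"
    if text.startswith(prefix):
        return text[len(prefix):].strip()
    return None

parse_handoff("HANDOFF: spanish_translator")             # "spanish_translator"
parse_handoff("I support Spanish, English, and Hindi.")  # None
```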

Benefits

โœ… Intelligent Routing: LLM decides which specialist to use
โœ… Automatic Generation: No orchestrator code required
โœ… Universal Compatibility: Works with all frameworks
โœ… Zero Configuration: Just list specialists
โœ… Cost Efficient: Only runs specialist when needed

๐Ÿ”Œ MCP Integration

Built-in MCP Toolkits

| Toolkit | Tools | Count | Description |
| --- | --- | --- | --- |
| DocExtract | doc_extract_* | 3 | Document content extraction and structured data |
| DocRAG | doc_rag_* | 2 | Document retrieval and semantic search |
| ImageRAG | image_rag_* | 2 | Image processing and OCR-based search |
| Browser | browser_* | 1 | Web scraping and automation |
| SerperAPI | serper_api_* | 2 | Web search integration |
| SEC API | sec_api_* | 2 | Financial document retrieval |
| Math | math_* | 20 | Mathematical operations and problem solving |
| Email | email_* | 13 | Gmail integration and email management |
| SQLite | sqlite_* | 3 | SQLite database operations |
| Filesystem | fs_* | 3 | File and directory operations |
| Flights | flight_* | 9 | Flight search and booking |
| Hotels | hotel_* | 4 | Hotel search and booking |
| Activities | activities_* | 3 | Travel activities and POI search |
| Common | common_* | 5 | General utility functions |
| Insurance | insurance_* | 6 | Insurance-specific domain tools |
| SOP | sop_* | 8 | Standard Operating Procedure reader and navigator |
| Total | - | 83+ | Comprehensive toolkit ecosystem |

Standard Operating Procedures (SOP) Support

Topaz Agent Kit includes built-in SOP-driven agent capabilities that enable agents to follow structured Standard Operating Procedures stored as markdown files. This is particularly useful for complex, multi-step workflows that require consistent execution.

SOP Toolkit Features

The SOP MCP Toolkit provides 8 tools for agents to read and navigate SOPs:

| Tool | Purpose |
| --- | --- |
| sop_initialize | Load SOP manifest, return overview + available sections |
| sop_get_section | Read specific section content (procedures, references) |
| sop_get_example | Get scenario examples for specific use cases |
| sop_get_troubleshooting | Get troubleshooting guidance for errors |
| sop_list_sections | List available sections by type |
| sop_get_glossary_term | Look up domain-specific terms from pipeline glossary |
| sop_list_glossary_terms | List all available glossary terms |
| sop_invalidate_cache | Clear cached SOP data after updates |

SOP Structure

SOPs are organized in your project's config/sop/ directory:

config/sop/
โ””โ”€โ”€ <pipeline>/
    โ”œโ”€โ”€ glossary.md                    # Pipeline-specific terminology
    โ””โ”€โ”€ <agent>/
        โ”œโ”€โ”€ manifest.yml              # SOP structure and metadata (REQUIRED)
        โ”œโ”€โ”€ overview.md               # High-level workflow guidance
        โ”œโ”€โ”€ steps/
        โ”‚   โ”œโ”€โ”€ step_01_*.md          # Procedural steps
        โ”‚   โ””โ”€โ”€ step_02_*.md
        โ”œโ”€โ”€ scenarios/
        โ”‚   โ”œโ”€โ”€ two_way_match.md      # Example scenarios
        โ”‚   โ””โ”€โ”€ three_way_match.md
        โ””โ”€โ”€ troubleshooting.md        # Error resolution guide

Using SOPs in Agents

  1. Enable SOP Toolkit: Add sop to your agent's MCP toolkits:
agents:
  - id: my_sop_agent
    mcp:
      toolkits: ["sop"]
    # ... other config
  2. Initialize SOP: Agent calls sop_initialize at start:
# Agent automatically calls:
sop_initialize(
    project_dir="/path/to/project",
    sop_path="config/sop/my_pipeline/my_agent/manifest.yml"
)
  3. Follow Procedures: Agent reads steps as needed:
# Before each step:
sop_get_section(
    sop_path="config/sop/my_pipeline/my_agent/manifest.yml",
    section_id="step_02_find_match"
)
  4. Handle Scenarios: Agent can reference examples:
# When encountering specific scenario:
sop_get_example(
    sop_path="config/sop/my_pipeline/my_agent/manifest.yml",
    scenario_name="two_way_match"
)

SOP Section Types

  • procedure: Step-by-step instructions (executed in order)
  • reference: Contextual information (read on-demand)
  • example: Scenario examples (for pattern matching)
  • troubleshooting: Error resolution guides
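The required manifest.yml wires these section types to their files. As a hypothetical sketch (field names are illustrative only; see the ICP starter's ReconVoy SOP for the actual schema):

```yaml
# Hypothetical manifest sketch; field names are illustrative only.
name: my_agent_sop
description: "SOP for the matching workflow"
sections:
  - id: overview
    type: reference
    file: overview.md
  - id: step_01_identify
    type: procedure
    file: steps/step_01_identify.md
  - id: two_way_match
    type: example
    file: scenarios/two_way_match.md
  - id: troubleshooting
    type: troubleshooting
    file: troubleshooting.md
```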

Benefits

โœ… Consistent Execution: Agents follow documented procedures
โœ… Easy Updates: Modify SOPs without changing code
โœ… Domain Knowledge: Pipeline-specific glossaries and terminology
โœ… Error Handling: Built-in troubleshooting guidance
โœ… Scenario Support: Example-based learning for agents
โœ… Caching: Efficient section loading with LRU cache

Example: ReconVoy SOP

The ICP starter template includes a complete SOP example for the ReconVoy matcher agent:

  • 6 procedural steps: From foreign book identification to journal proposal
  • 2 scenario examples: Two-way and three-way matching patterns
  • Pipeline glossary: Domain-specific terms (GBP items, processing_status, etc.)
  • Troubleshooting guide: Common issues and resolutions

See src/topaz_agent_kit/templates/starters/icp/config/sop/reconvoy/ for a complete reference implementation.

Custom MCP Servers

Connect to external MCP servers for enterprise integrations:

mcp:
  servers:
    - url: "http://enterprise-mcp-server:8080/mcp"
      toolkits: ["enterprise", "database"]
      tools: ["enterprise_*", "db_*"]

MCP Toolkit Development Guidelines

When creating MCP toolkits, follow these best practices to ensure compatibility across all frameworks:

โŒ Never use Any type for parameters

The Any type causes JSON schema generation issues with mcpadapt (used by CrewAI's MCPServerAdapter), resulting in "Cannot take a Union of no types" errors.

# โŒ BAD - Don't use Any for parameters
def book_flight(travelers: Any, payments: Any | None = None) -> Dict[str, Any]:
    ...

# โœ… GOOD - Use specific types
def book_flight(travelers: list[dict], payments: Optional[list[dict]] = None) -> Dict[str, Any]:
    ...

# โœ… GOOD - Use Optional for nullable parameters
def search_flights(origin: str, destination: str, returnDate: Optional[str] = None) -> Dict[str, Any]:
    ...

Type Guidelines:

  • Use specific types: str, int, float, bool, list[dict], dict, Dict[str, Any]
  • Use Optional[...] for nullable parameters: Optional[str] = None, Optional[int] = None
  • Return types can use Dict[str, Any] or dict (both work fine)
  • Never use Any as a parameter type - always specify the concrete type

Why this matters:

  • CrewAI's MCPServerAdapter uses mcpadapt to convert MCP tool schemas
  • mcpadapt cannot generate valid JSON schemas for Any-typed parameters
  • This causes schema parsing errors that prevent tool initialization
  • Other frameworks (LangGraph, Agno, etc.) work fine, but CrewAI requires strict typing

๐Ÿง  AgentOS Memory System

AgentOS is a filesystem-based memory system that enables agents to store, retrieve, and search information using familiar Unix-like commands. Instead of complex APIs, agents interact with memory through a single agentos_shell tool that supports commands like ls, cat, echo, grep, semgrep, and mkdir.

Overview

AgentOS provides a 3-level memory hierarchy (plus a per-agent workspace) with declarative schema definitions:

  • /global/ - Project-wide shared memory (system docs, cross-pipeline data)
  • /shared/ - Pipeline-wide shared memory (templates, shared runtime data)
  • /memory/ - Agent-specific individual memory (isolated, not shared)
  • /workspace/ - Agent workspace (temporary files)

Key Features:

  • โœ… Declarative schemas: Define file structures in YAML, auto-generate instructions
  • โœ… Template-based initialization: Copy templates from config/memory/shared/ to runtime
  • โœ… Two types of shared memory: System files (read-only templates) and runtime data (write-once, read-many)
  • โœ… Auto-indexing: Semantic search across indexed files
  • โœ… Isolation: Agent memory is isolated; use /shared/ or /global/ for sharing

Key Concepts

1. Filesystem as Memory

Agents manage memory using standard Unix commands:

# List directories
agentos_shell("ls /")

# Read a file
agentos_shell("cat /memory/senders/john@example.com/preferences.md")

# Write a file
agentos_shell('echo "preference data" > /memory/senders/john@example.com/preferences.md')

# Semantic search across indexed files
agentos_shell('semgrep "similar email pattern"')

# Create directories
agentos_shell("mkdir -p /memory/senders/john@example.com")

2. Memory Hierarchy

AgentOS provides a 3-level memory hierarchy, plus a per-agent workspace:

  • /global/: Global shared memory (project-wide, read-only for agents)

    • Shared across all pipelines in the project
    • Typically contains system-wide reference data, compliance rules
    • Initialized from config/memory/shared/global/ template files
    • Can also contain runtime data (write-once, read-many) for cross-pipeline sharing
  • /shared/: Pipeline-level shared memory (pipeline-wide, configurable readonly)

    • Shared across all agents in a pipeline
    • Typically contains templates, company info, policies
    • Initialized from config/memory/shared/pipeline/{pipeline_id}/ template files
    • Can also contain runtime data (shared between agents in the same pipeline)
  • /memory/: Agent-level individual memory (agent-specific, read-write)

    • Isolated per agent instance (not shared with other agents)
    • Persists across sessions
    • Used for agent-specific data (preferences, patterns, history)
    • If agents need to share data, use /shared/ instead
  • /workspace/: Agent workspace (agent-specific, temporary)

    • Temporary working directory
    • Used for drafts, intermediate files
    • Can be cleared between sessions

3. Auto-Indexing

Files in directories with auto_index: true are automatically indexed for semantic search using semgrep:

directories:
  - path: "/memory/senders/"
    auto_index: true  # Enable semantic search

Configuration

Memory Configuration Structure

All memory-related configuration is organized under config/memory/:

config/memory/
โ”œโ”€โ”€ memory.yml                    # Global memory configuration (future)
โ”œโ”€โ”€ prompts/                      # Custom memory prompt templates
โ”‚   โ””โ”€โ”€ {agent_id}.jinja
โ””โ”€โ”€ shared/
    โ”œโ”€โ”€ global/                   # Global memory templates
    โ”‚   โ”œโ”€โ”€ reference/
    โ”‚   โ””โ”€โ”€ compliance/
    โ””โ”€โ”€ pipeline/                 # Pipeline memory templates
        โ””โ”€โ”€ {pipeline_id}/
            โ”œโ”€โ”€ email_templates/
            โ””โ”€โ”€ company_info/

Global Memory (Project-Wide)

Global memory is configured in config/memory/memory.yml (future feature). Template files are stored in config/memory/shared/global/:

# config/memory/memory.yml (future)
memory:
  global:
    directories:
      - path: "/global/reference/"
        description: "System-wide reference data (READ-ONLY)"
        readonly: true
        auto_index: true
        bootstrap: true
        template_source: "config/memory/shared/global/reference/"

Template Files: Create template files in config/memory/shared/global/:

config/memory/shared/global/
โ”œโ”€โ”€ reference/
โ”‚   โ””โ”€โ”€ system_docs.json
โ””โ”€โ”€ compliance/
    โ””โ”€โ”€ rules.json

These files are automatically copied to data/agentos/global_shared/ on first run.

Pipeline-Level Shared Memory

Define shared memory directories in your pipeline configuration. All agents in the pipeline with memory.inherit: true (default) automatically have access to these shared directories:

# config/pipelines/reply_wizard.yml
name: "Reply Wizard"
description: "Email reply generation pipeline"

memory:
  shared:
    directories:
      # Type 1: System files (read-only templates)
      - path: "/shared/email_templates/"
        description: "Email template library (READ-ONLY)"
        readonly: true
        auto_index: true
        bootstrap: true
        template_source: "config/memory/shared/pipeline/reply_wizard/email_templates/"
      
      # Type 2: Runtime data (write-once, read-many)
      - path: "/shared/senders/"
        description: "Sender interaction history (shared across agents)"
        readonly: false
        auto_index: true
        bootstrap: true
        schemas:
          interactions:
            file: "interactions.jsonl"
            format: "jsonl"
            write_mode: "append"
            structure:
              timestamp: "ISO 8601 timestamp"
              original_email: {...}
              response: {...}
          preferences:
            file: "preferences.json"
            format: "json"
            write_mode: "overwrite"
            structure:
              preferred_tone: "string"
              communication_style: "string"
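The write_mode semantics above can be illustrated with plain file operations (hypothetical helper; agents actually write through agentos_shell):

```python
import json
import pathlib

def write_record(path, record, fmt, write_mode):
    # jsonl + append: each record becomes one line (growing history).
    # json + overwrite: the file always holds the latest snapshot.
    p = pathlib.Path(path)
    if fmt == "jsonl" and write_mode == "append":
        with p.open("a") as f:
            f.write(json.dumps(record) + "\n")
    elif fmt == "json" and write_mode == "overwrite":
        p.write_text(json.dumps(record, indent=2))
    else:
        raise ValueError(f"unsupported combination: {fmt}/{write_mode}")
```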

Inheritance: Agents with memory.inherit: true (the default) automatically inherit access to all pipeline-level shared directories. They can access these via /shared/ paths. Set inherit: false in an agent's configuration to disable inheritance for that specific agent.

Template Files: Create template files in config/memory/shared/pipeline/{pipeline_id}/:

config/memory/shared/pipeline/reply_wizard/
โ”œโ”€โ”€ email_templates/
โ”‚   โ”œโ”€โ”€ greetings/
โ”‚   โ”‚   โ”œโ”€โ”€ formal.md
โ”‚   โ”‚   โ”œโ”€โ”€ casual.md
โ”‚   โ”‚   โ””โ”€โ”€ professional.md
โ”‚   โ”œโ”€โ”€ closings/
โ”‚   โ”‚   โ””โ”€โ”€ ...
โ”‚   โ””โ”€โ”€ structures/
โ”‚       โ””โ”€โ”€ ...
โ””โ”€โ”€ company_info/
    โ”œโ”€โ”€ standard_responses.md
    โ”œโ”€โ”€ policies.md
    โ””โ”€โ”€ tone_guidelines.md

These files are automatically copied to data/agentos/{pipeline_id}/shared/ on first run.

Two Types of Shared Memory:

  1. System Files (Read-Only Templates):

    • Source: Template files in config/memory/shared/pipeline/{pipeline_id}/
    • Behavior: Copied once, read-only for agents
    • Use case: Reference docs, templates, policies
    • Update: Edit template files, re-copied on next run
  2. Runtime Data (Write-Once, Read-Many):

    • Source: Created at runtime by agents/pipelines
    • Behavior: One pipeline/agent writes, others read
    • Use case: Cross-pipeline data, shared state, interaction history
    • Update: Agents write via agentos_shell, other pipelines/agents read

Agent-Level Memory Configuration

Configure agent-specific memory in agent YAML files. Agent memory is isolated - not shared with other agents. If agents need to share data, use /shared/ (pipeline-level) or /global/ (project-level):

# config/agents/reply_context_wizard.yml
id: reply_context_wizard
type: agno
model: "azure_openai"

# Enable MCP and AgentOS
mcp:
  servers:
    - url: "http://localhost:8050/mcp"
      toolkits: ["agentos_memory"]
      tools: ["agentos_shell"]

# Memory configuration
memory:
  inherit: true  # Inherit shared memory from pipeline
  directories:
    - path: "/memory/patterns/"
      description: "Agent-specific learned patterns (not shared)"
      readonly: false
      auto_index: true
      bootstrap: true
      schemas:
        learned_patterns:
          file: "patterns.jsonl"
          format: "jsonl"
          write_mode: "append"
          structure:
            pattern: "string"
            context: "string"
            confidence: "float"
    - path: "/workspace/"
      description: "Working directory"
      readonly: false
      auto_index: false
      bootstrap: true
  prompt_section:
    jinja: memory/prompts/reply_context_wizard.jinja  # Custom memory prompt template

Memory Configuration Options:

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| inherit | bool | true | Inherit shared memory from pipeline. When true, the agent automatically has access to all pipeline-level shared directories (e.g., /shared/email_templates/, /shared/company_info/). Set to false to disable inheritance. |
| directories | list[object] | [] | Agent-specific directories (agent-level individual memory, isolated from other agents) |
| directories[].path | str | required | Virtual path (e.g., /memory/senders/) |
| directories[].description | str | required | Human-readable description |
| directories[].readonly | bool | false | Make directory read-only |
| directories[].auto_index | bool | true | Enable semantic search indexing |
| directories[].bootstrap | bool | true | Create directory on initialization |
| directories[].template_source | str | null | Optional: template source path relative to project root (e.g., config/memory/shared/pipeline/reply_wizard/email_templates/) |
| directories[].schemas | object | {} | Optional: file schemas for this directory (see Schema Definitions below) |
| prompt_section | object | null | Optional: custom memory prompt template (inline/file/jinja). Paths are relative to config/memory/prompts/ (e.g., memory/prompts/reply_context_wizard.jinja). If not provided, the system uses a default template that lists available directories and commands. |

Inheritance Behavior:

  • inherit: true (default): Agent automatically inherits all pipeline-level shared memory directories defined in config/pipelines/{pipeline_id}.yml under memory.shared.directories. These are accessible via /shared/ paths and are read-only by default.
  • inherit: false: Agent does not inherit pipeline shared memory. Only agent-specific directories are available.
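The effective set of directories an agent sees can be pictured with this illustrative function (not the actual resolver):

```python
def effective_directories(agent_memory_cfg, pipeline_shared_dirs):
    # inherit defaults to true: pipeline shared directories are merged
    # into the agent's view alongside its own private directories.
    dirs = list(agent_memory_cfg.get("directories", []))
    if agent_memory_cfg.get("inherit", True):
        dirs += pipeline_shared_dirs
    return [d["path"] for d in dirs]

shared = [{"path": "/shared/email_templates/"}, {"path": "/shared/senders/"}]
agent = {"inherit": True, "directories": [{"path": "/memory/patterns/"}]}
effective_directories(agent, shared)
# -> ["/memory/patterns/", "/shared/email_templates/", "/shared/senders/"]
```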

Prompt Integration

AgentOS memory sections are automatically injected into agent prompts. Use the {{agentos_memory_section}} marker in your prompt templates:

Default Behavior: If prompt_section is not specified in agent configuration, the system automatically provides a default memory prompt template that:

  • Lists all available memory directories (agent-specific and inherited shared)
  • Shows directory descriptions and read-only status
  • Provides examples of available commands (ls, cat, echo, semgrep, etc.)

Custom Templates: You can override the default by providing a prompt_section configuration:

# config/prompts/reply_context_wizard.jinja
You are an email context extraction agent.

{{agentos_memory_section}}

## Workflow:
1. **Check sender history**: `agentos_shell("ls /memory/senders/")` to see if you know this sender
2. **Load sender preferences**: If sender exists, `agentos_shell("cat /memory/senders/{sender_email}/preferences.md")`
3. **Check email templates**: `agentos_shell("ls /shared/email_templates/")` for relevant templates
4. **Load company info**: `agentos_shell("cat /shared/company_info/standard_responses.md")` if needed
5. **Store new sender info**: After analysis, `agentos_shell('echo "preferences" > /memory/senders/{sender_email}/preferences.md')`
6. **Search patterns**: `agentos_shell('semgrep "similar email intent"')` to find similar past emails

Custom Memory Prompt Templates (Optional):

If you want to customize the memory prompt section, create a custom template in config/memory/prompts/{agent_id}.jinja and reference it in the agent configuration:

memory:
  prompt_section:
    jinja: memory/prompts/reply_context_wizard.jinja

If prompt_section is not provided, the system automatically uses a default template that lists available directories, commands, and schema documentation.

When to Use Default Template (Recommended for most cases):

✅ Use the default template when:

  • You have simple memory needs (just listing directories and basic commands)
  • You want to get started quickly without customizing prompts
  • Your agents have standard memory usage patterns
  • You prefer consistency across agents
  • You want to reduce maintenance overhead

The default template automatically:

  • Lists all available memory directories (agent-specific and inherited shared)
  • Shows directory descriptions and read-only status
  • Provides examples of all available commands
  • Adapts to your configuration (only shows directories you've defined)

When to Use Custom Template:

✅ Create a custom template when:

  • You need workflow-specific guidance for how agents should use memory
  • You want to provide step-by-step instructions tailored to your agent's task
  • You need to emphasize specific commands or usage patterns
  • You want to include domain-specific examples or use cases
  • Your agent has complex memory workflows that need detailed explanation
  • You want to guide agents through specific memory access patterns

Example Custom Template:

# config/memory/prompts/reply_context_wizard.jinja
## Memory System

You have access to a filesystem-based memory via the `agentos_shell` tool.

### Your Memory Structure:
{% for dir in memory.directories %}
- {{ dir.path }} - {{ dir.description }}{% if dir.readonly %} (READ-ONLY){% endif %}
{% endfor %}

{% if memory.shared_directories %}
### Shared Memory (from pipeline):
{% for dir in memory.shared_directories %}
- {{ dir.path }} - {{ dir.description }} (READ-ONLY)
{% endfor %}
{% endif %}

### Workflow for Email Context Extraction:
1. **Check sender history**: `agentos_shell("ls /memory/senders/")` to see if you know this sender
2. **Load sender preferences**: If sender exists, `agentos_shell("cat /memory/senders/{sender_email}/preferences.md")`
3. **Check email templates**: `agentos_shell("ls /shared/email_templates/")` for relevant templates
4. **Load company info**: `agentos_shell("cat /shared/company_info/standard_responses.md")` if needed
5. **Store new sender info**: After analysis, `agentos_shell('echo "preferences" > /memory/senders/{sender_email}/preferences.md')`
6. **Search patterns**: `agentos_shell('semgrep "similar email intent"')` to find similar past emails

### Available Commands:
- `agentos_shell("ls /")` - List root directories
- `agentos_shell("cat /memory/file.md")` - Read file
- `agentos_shell('echo "content" > /memory/file.md')` - Write file
- `agentos_shell('semgrep "query"')` - Semantic search across indexed memory
- `agentos_shell("mkdir -p /memory/subdir")` - Create directory

Best Practice: Start with the default template. Only create a custom template if you find that agents need more specific guidance or workflow instructions for your use case.

Available Commands

The agentos_shell tool supports a subset of Unix commands:

| Command | Description | Example |
| --- | --- | --- |
| ls [path] | List directory contents | agentos_shell("ls /memory/senders/") |
| cat [file] | Read file contents | agentos_shell("cat /memory/file.md") |
| echo "text" > [file] | Write to file | agentos_shell('echo "data" > /memory/file.md') |
| echo "text" >> [file] | Append to file | agentos_shell('echo "more" >> /memory/file.md') |
| grep "pattern" [file] | Search text in file | agentos_shell('grep "keyword" /memory/file.md') |
| semgrep "query" | Semantic search across indexed files | agentos_shell('semgrep "similar pattern"') |
| mkdir -p [path] | Create directory | agentos_shell("mkdir -p /memory/subdir") |

Security Features:

  • ✅ Sandboxed Execution: Commands run in an isolated filesystem
  • ✅ Path Traversal Protection: Prevents access outside allowed directories
  • ✅ Command Injection Prevention: Validates and sanitizes commands
  • ✅ Read-Only Enforcement: Respects readonly: true configuration
  • ✅ Rate Limiting: Prevents command flooding
  • ✅ Audit Logging: All commands are logged for security
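The path-traversal protection can be illustrated with a minimal sketch (not the toolkit's actual implementation): map each virtual path onto the sandbox root and reject anything that resolves outside it.

```python
from pathlib import Path

def resolve_in_sandbox(root: str, requested: str) -> Path:
    """Map a virtual path like /memory/file.md into the sandbox root,
    rejecting any path that escapes it (e.g. via ../ segments)."""
    base = Path(root).resolve()
    target = (base / requested.lstrip("/")).resolve()
    # After resolution, the target must be the root itself or live under it
    if base != target and base not in target.parents:
        raise PermissionError(f"path escapes sandbox: {requested}")
    return target

safe = resolve_in_sandbox("/tmp/agentos", "/memory/senders/alice/prefs.md")
```

The key point is that the check runs on the resolved path, so `..` tricks are neutralized before any filesystem access happens.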

Schema Definitions

Define file structures declaratively in YAML configuration. Schemas enable:

  • Auto-generated instructions: System generates read/write commands from schema
  • Type safety: Clear field definitions for structured data
  • Scalability: Proper formats (JSONL for append-only, JSON for overwrite)
  • Maintainability: Change structure in one place

Schema Configuration:

memory:
  shared:
    directories:
      - path: "/shared/senders/"
        schemas:
          interactions:
            file: "interactions.jsonl"
            format: "jsonl"           # jsonl, json, markdown
            write_mode: "append"      # append, overwrite
            structure:
              timestamp: "ISO 8601 timestamp"
              original_email:
                subject: "string"
                content: "string"
                sender:
                  name: "string"
                  email: "string"
              response:
                subject: "string"
                content: "string"
            # Optional: Custom instructions (overrides auto-generated)
            instructions:
              read: "Read all interactions: `agentos_shell(command='cat /shared/senders/<sender_email>/interactions.jsonl')`"
              write: "Append new interaction: `agentos_shell(command='echo \"<single_line_json>\" >> /shared/senders/<sender_email>/interactions.jsonl')`"

Schema Fields:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| file | str | Yes | Filename for this schema |
| format | enum | No | File format: jsonl, json, markdown (default: json) |
| write_mode | enum | No | Write behavior: append, overwrite (default: overwrite) |
| readonly | bool | No | Whether this file is read-only (default: false) |
| structure | object | No | Simple structure definition (key-value mapping) |
| instructions | object | No | Custom read/write instructions (overrides auto-generated) |

Auto-Generated Instructions:

The system automatically generates read/write instructions from schemas:

  • JSONL format: Uses >> (append) or > (overwrite) with single-line JSON
  • JSON format: Uses > (overwrite) or requires parsing for append
  • Markdown format: Uses > (overwrite) or >> (append)

You can override auto-generated instructions with custom instructions.read and instructions.write fields.
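The JSONL/append pairing can be sketched in a few lines (the helper names here are illustrative, not part of the toolkit): each record serializes to a single line, so an append never requires re-reading or re-parsing the file, while a single JSON document would need a full read-modify-write cycle.

```python
import json

def append_interaction(path: str, record: dict) -> None:
    # json.dumps emits a single line, so appends keep the file valid JSONL
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")

def read_interactions(path: str) -> list[dict]:
    # Each non-empty line is an independent JSON document
    with open(path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]
```

This is exactly why the generated instructions use `>>` for JSONL files and `>` for JSON files.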

Runtime Structure

AgentOS creates the following directory structure at runtime:

data/agentos/
├── global_shared/                 # Global shared memory
│   ├── reference/                  # From config/memory/shared/global/reference/
│   └── compliance/                 # From config/memory/shared/global/compliance/
├── {pipeline_id}/
│   ├── shared/                     # Pipeline shared memory
│   │   ├── email_templates/        # From config/memory/shared/pipeline/{pipeline_id}/email_templates/
│   │   ├── company_info/           # From config/memory/shared/pipeline/{pipeline_id}/company_info/
│   │   └── senders/                # Runtime data (created by agents)
│   │       └── {sender_email}/
│   │           ├── interactions.jsonl
│   │           └── preferences.json
│   └── agents/
│       └── {agent_id}/
│           ├── memory/             # Agent individual memory (isolated)
│           │   └── patterns/
│           │       └── patterns.jsonl
│           └── workspace/          # Agent workspace (temporary)

Template Initialization:

  • Template files from config/memory/shared/pipeline/{pipeline_id}/ are copied to data/agentos/{pipeline_id}/shared/ on first run
  • Template files from config/memory/shared/global/ are copied to data/agentos/global_shared/ on first run
  • If template files are updated, they are re-copied (checks modification time)
  • Agent-specific directories are created on first agent execution
  • Runtime data directories are created when agents write to them
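The "re-copied if updated" behavior amounts to an mtime comparison. Here is an illustrative sketch of that logic (the real initializer may differ in details):

```python
import shutil
from pathlib import Path

def sync_template(src: Path, dst: Path) -> bool:
    """Copy src over dst when dst is missing or older than src.

    Returns True when a copy happened. shutil.copy2 preserves src's
    mtime, so an unchanged template is skipped on subsequent runs.
    """
    if not dst.exists() or src.stat().st_mtime > dst.stat().st_mtime:
        dst.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(src, dst)
        return True
    return False
```

Because `copy2` carries the modification time across, editing a template in `config/memory/shared/` is enough to trigger a fresh copy on the next run.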

Best Practices

1. Use Appropriate Memory Types

  • /global/: For project-wide reference data (system docs, compliance rules) or cross-pipeline runtime data
  • /shared/: For pipeline-wide reference data (templates, company info) or shared runtime data between agents
  • /memory/: For agent-specific persistent data (preferences, history, patterns) - isolated, not shared
  • /workspace/: For temporary working files (drafts, intermediate results)

Memory Isolation Rules:

  • /memory/ is agent-specific and isolated - agents cannot access each other's /memory/ directories
  • If agents need to share data, use /shared/ (pipeline-level) or /global/ (project-level)
  • /workspace/ is also agent-specific and temporary

2. Enable Auto-Indexing Strategically

Enable auto_index: true for directories you want to search semantically:

directories:
  - path: "/memory/senders/"
    auto_index: true  # Good for searching past interactions
  - path: "/workspace/"
    auto_index: false  # Skip indexing temporary files

3. Use Read-Only for Shared Data

Protect shared templates and company info from accidental modifications:

memory:
  shared:
    directories:
      - path: "/shared/email_templates/"
        readonly: true  # Prevent agents from modifying templates

4. Organize Memory Configuration

All memory-related configuration is organized under config/memory/:

config/memory/
├── memory.yml                    # Global memory configuration (future)
├── prompts/                      # Custom memory prompt templates
│   ├── reply_context_wizard.jinja
│   └── reply_polish_wizard.jinja
└── shared/
    ├── global/                   # Global memory templates
    │   ├── reference/
    │   └── compliance/
    └── pipeline/                 # Pipeline memory templates
        ├── reply_wizard/          # Pipeline-specific templates
        │   ├── email_templates/
        │   └── company_info/
        └── invoice_processor/     # Different pipeline, different templates
            └── email_templates/

Template Paths:

  • Global templates: config/memory/shared/global/
  • Pipeline templates: config/memory/shared/pipeline/{pipeline_id}/
  • Custom memory prompts: config/memory/prompts/{agent_id}.jinja

5. Use Schema Definitions for Structured Data

Define schemas for directories that store structured data:

memory:
  shared:
    directories:
      - path: "/shared/senders/"
        schemas:
          interactions:
            file: "interactions.jsonl"
            format: "jsonl"
            write_mode: "append"  # Preserves history
            structure:
              timestamp: "ISO 8601 timestamp"
              original_email: {...}
              response: {...}

Benefits:

  • Auto-generated instructions in prompts
  • Clear structure documentation
  • Proper file formats (JSONL for append, JSON for overwrite)
  • Scalable and maintainable

6. Custom Memory Prompt Sections

Create custom memory prompt templates for better agent guidance:

# config/memory/prompts/my_agent.jinja
## Memory System

### Workflow for Your Task:
1. Check existing data: `agentos_shell("ls /memory/your_data/")`
2. Load reference: `agentos_shell("cat /shared/reference.md")`
3. Store results: `agentos_shell('echo "result" > /memory/your_data/result.md')`
4. Search similar: `agentos_shell('semgrep "similar pattern"')`

Reference in agent config:

memory:
  prompt_section:
    jinja: memory/prompts/my_agent.jinja

7. Template File Management

  • Keep template files organized:
    • Global: config/memory/shared/global/
    • Pipeline: config/memory/shared/pipeline/{pipeline_id}/
  • Use descriptive filenames and directory structures
  • Template files are automatically initialized on first run
  • Updates to templates are detected and re-copied
  • Use template_source in config to specify custom template locations

Example: Email Reply Wizard

The ensemble starter template includes a complete AgentOS implementation for the Reply Wizard pipeline:

Pipeline Configuration (config/pipelines/reply_wizard.yml):

memory:
  shared:
    directories:
      - path: "/shared/email_templates/"
        description: "Email template library (READ-ONLY)"
        readonly: true
        auto_index: true
        bootstrap: true
        template_source: "config/memory/shared/pipeline/reply_wizard/email_templates/"
      - path: "/shared/company_info/"
        description: "Company information (READ-ONLY)"
        readonly: true
        auto_index: true
        bootstrap: true
        template_source: "config/memory/shared/pipeline/reply_wizard/company_info/"
      - path: "/shared/senders/"
        description: "Sender interaction history (shared across agents)"
        readonly: false
        auto_index: true
        bootstrap: true
        schemas:
          interactions:
            file: "interactions.jsonl"
            format: "jsonl"
            write_mode: "append"
            structure:
              timestamp: "ISO 8601 timestamp"
              original_email: {...}
              response: {...}

Agent Configuration (config/agents/reply_context_wizard.yml):

mcp:
  servers:
    - url: "http://localhost:8050/mcp"
      toolkits: ["agentos_memory"]
      tools: ["agentos_shell"]

memory:
  inherit: true
  directories:
    - path: "/memory/senders/"
      description: "Sender preferences and history"
      auto_index: true
    - path: "/memory/patterns/"
      description: "Email pattern library"
      auto_index: true
    - path: "/workspace/"
      description: "Working directory"
      readonly: false
      auto_index: false
      bootstrap: true
  prompt_section:
    jinja: memory/prompts/reply_context_wizard.jinja

Template Files (config/memory/shared/pipeline/reply_wizard/):

email_templates/
├── greetings/
│   ├── formal.md
│   ├── casual.md
│   └── professional.md
├── closings/
│   └── ...
└── structures/
    └── ...
company_info/
├── standard_responses.md
├── policies.md
└── tone_guidelines.md

See src/topaz_agent_kit/templates/starters/ensemble/ for the complete implementation.

Troubleshooting

Memory directories not created:

  • Ensure bootstrap: true is set for directories that should be created automatically
  • Check that the agent has memory configuration in its YAML file
  • Verify MCP server is running and agentos_memory toolkit is enabled

Template files not initialized:

  • Ensure template files exist in config/memory/shared/pipeline/{pipeline_id}/
  • Check that pipeline has memory.shared.directories configuration
  • Verify bootstrap: true is set for shared directories
  • Check logs for initialization errors

Semantic search not working:

  • Ensure auto_index: true is set for directories you want to search
  • Files must be written to indexed directories (not just read)
  • Use the semgrep command, not grep, for semantic search

Permission denied errors:

  • Check readonly: true settings - agents cannot write to read-only directories
  • Verify path mappings are correct in memory configuration
  • Check sandbox security logs for blocked operations

๐Ÿข Enterprise Features

Production Readiness

  • Health Monitoring: Built-in health checks and status endpoints
  • Error Handling: Comprehensive error recovery and logging
  • Scalability: Horizontal scaling support for high-volume deployments
  • Security: Environment-based configuration and secure API key management

Development Tools

  • Hot Reload: Development mode with automatic code reloading
  • Validation: Comprehensive configuration validation and error reporting
  • Testing: Built-in test framework for agent workflows
  • Documentation: Auto-generated API documentation and workflow diagrams

📦 Portable Demos

Create portable, zero-setup demo packages that can run on any machine without requiring Python, Node.js, or package installation.

Overview

The portable demo system uses a shared runtime approach:

  • Runtime (~200-300MB): Python environment with Topaz Agent Kit pre-installed (shared across projects)
  • Demo Project (~20-30MB): Project-specific files, data, and launcher scripts

This allows you to:

  • Share one runtime with multiple demo projects
  • Keep demo packages small and easy to distribute
  • Ensure consistent execution environment across machines
  • Provide zero-setup demos for customers and stakeholders

Quick Start

Prerequisites: uv must be installed for faster exports.

curl -LsSf https://astral.sh/uv/install.sh | sh  # Mac/Linux

1. Build Package

python build.py

Note: You can skip this step if you already have a wheel file; use the --skip-build flag instead.

2. Export Wheel, Runtime, and Demo (Combined)

# Export all three (wheel + runtime + demo) - default
topaz-agent-kit export -p projects/pa --output ./exports

# Skip build step (use existing wheel)
topaz-agent-kit export -p projects/pa --output ./exports --skip-build

# Export only wheel + demo (skip runtime)
topaz-agent-kit export -p projects/pa --output ./exports --skip-runtime

# Export only wheel (skip runtime + demo) - no --project needed
topaz-agent-kit export --output ./exports --skip-runtime --skip-demo

# Export only runtime + demo (skip wheel)
topaz-agent-kit export -p projects/pa --output ./exports --skip-wheel

# Export without zip files (keep directories)
topaz-agent-kit export -p projects/pa --output ./exports --no-zip

# Creates (default mode):
# - ./exports/topaz_agent_kit-0.11.1-py3-none-any.whl
# - ./exports/INSTALL.md (installation instructions)
# - ./exports/tak-runtime-v0.11.1-20260209.zip
# - ./exports/pa-20260209.zip

# Creates (with --no-zip):
# - ./exports/topaz_agent_kit-0.11.1-py3-none-any.whl
# - ./exports/INSTALL.md (installation instructions)
# - ./exports/tak-runtime-v0.11.1-20260209/ (directory)
# - ./exports/pa-20260209/ (directory)

Note: --project is only required when exporting demo. Use --skip-demo to skip demo export.

3. Or Export Separately

# Export runtime (one-time)
topaz-agent-kit export-runtime --output ./exports

# Skip build step (use existing wheel)
topaz-agent-kit export-runtime --skip-build --output ./exports

# Export demo project
topaz-agent-kit export-demo -p projects/pa --output ./exports

User Workflow

Option 1: Full Runtime (Easiest)

# 1. Extract runtime (one-time)
unzip tak-runtime-v0.11.1-20260209.zip

# 2. Extract demo project
unzip pa-20260209.zip

# Folder structure (both in same directory):
# your-folder/
# ├── tak-runtime-v0.11.1-20260209/    # Runtime
# └── pa-20260209/                      # Demo project

# 3. Run the demo (from inside demo folder)
cd pa-20260209
./run-demo.sh fastapi    # Mac/Linux
# OR
run-demo.bat fastapi     # Windows

# Note: Script finds runtime in same directory or parent directory

# 4. Open browser: http://localhost:8090

Option 2: Wheel File Only (For Developers)

If you have Python 3.11+ and prefer managing your own environment:

# 1. Extract demo project
unzip pa-20260209.zip
cd pa-20260209

# 2. Create virtual environment
uv venv  # OR: python -m venv venv
source venv/bin/activate  # Mac/Linux
# OR: venv\Scripts\activate  # Windows

# 3. Install wheel file (get from creator)
uv pip install --prerelease allow topaz_agent_kit-0.11.1-py3-none-any.whl
# OR: pip install topaz_agent_kit-0.11.1-py3-none-any.whl

# 4. Set up environment
cp .env.example .env  # Edit if needed

# 5. Run demo
python -m topaz_agent_kit.cli.main serve fastapi --project .

# 6. Open browser: http://localhost:8090

Benefits of wheel-only: Smaller download (~25-40MB vs ~220-330MB), use your own Python environment, more control.

Adding More Projects

# Just extract new project (runtime already exists)
unzip nexus-20260209.zip
cd nexus-20260209
./run-demo.sh fastapi    # Uses same runtime!

Service Modes

The launcher script supports multiple service modes:

  • fastapi - Web interface with UI (default)
  • mcp - Model Context Protocol server
  • services - Unified agent services (A2A)
  • cli - Command-line interface
  • all - Start all services simultaneously

Examples:

./run-demo.sh fastapi    # Start web interface
./run-demo.sh mcp        # Start MCP server
./run-demo.sh services   # Start unified services
./run-demo.sh cli        # Start CLI interface
./run-demo.sh all        # Start all services

Command Reference

Export Runtime

topaz-agent-kit export-runtime [OPTIONS]

Options:
  --output PATH    Output directory or zip file path (default: current directory)
  --dev           Create dev build with timestamp and git hash
  --version, -v   Custom version string (overrides auto-detection)
  --skip-build    Skip building wheel (use existing wheel from dist/)
  --no-zip        Skip creating zip file (keep directory instead)

Examples:

# Auto-generate name in current directory
topaz-agent-kit export-runtime

# Specify output directory (auto-generates name)
topaz-agent-kit export-runtime --output ./exports

# Dev version
topaz-agent-kit export-runtime --dev --output ./exports

# Skip build step (use existing wheel)
topaz-agent-kit export-runtime --skip-build --output ./exports

Note: Requires uv to be installed. The export process uses uv for faster and more reliable package installation.

Export Demo

topaz-agent-kit export-demo [OPTIONS]

Options:
  --project, -p PATH  Path to project directory (required)
  --output PATH       Output directory or zip file path (default: current directory)
  --version, -v       Required runtime version (auto-detected if not provided)
  --no-zip            Skip creating zip file (keep directory instead)

Examples:

# Auto-generate name in current directory
topaz-agent-kit export-demo -p projects/pa

# Specify output directory (auto-generates name)
topaz-agent-kit export-demo -p projects/pa --output ./exports

# Export without zip file (keep directory)
topaz-agent-kit export-demo -p projects/pa --output ./exports --no-zip

Export Wheel

topaz-agent-kit export-wheel [OPTIONS]

Options:
  --output PATH       Output directory (default: current directory)
  --skip-build        Skip building wheel (use existing wheel from dist/)
  --version, -v       Custom version string (overrides auto-detection)
  --dev               Create dev build (currently unused, kept for API consistency)

Examples:

# Export wheel file to current directory
topaz-agent-kit export-wheel

# Export wheel to specific directory
topaz-agent-kit export-wheel --output ./exports

# Skip build step (use existing wheel)
topaz-agent-kit export-wheel --output ./exports --skip-build

Creates:

  • topaz_agent_kit-{version}-py3-none-any.whl
  • INSTALL.md (installation instructions)

Export (Combined)

topaz-agent-kit export [OPTIONS]

Options:
  --project, -p PATH  Path to project directory (required only if demo is exported)
  --output PATH       Output directory (default: current directory)
  --dev              Create dev build with timestamp and git hash
  --version, -v       Custom version string (overrides auto-detection)
  --skip-build       Skip building wheel (use existing wheel from dist/)
  --skip-wheel       Skip wheel file export
  --skip-runtime, --skip-rt  Skip runtime export
  --skip-demo        Skip demo project export
  --no-zip           Skip creating zip files (keep directories instead)

Examples:

# Export all three (wheel + runtime + demo) - default
topaz-agent-kit export -p projects/pa --output ./exports

# Export only wheel + demo (skip runtime)
topaz-agent-kit export -p projects/pa --output ./exports --skip-runtime

# Export only wheel (skip runtime + demo) - no --project needed
topaz-agent-kit export --output ./exports --skip-runtime --skip-demo

# Export only runtime + demo (skip wheel)
topaz-agent-kit export -p projects/pa --output ./exports --skip-wheel

# Export only wheel + runtime (skip demo) - no --project needed
topaz-agent-kit export --output ./exports --skip-demo

# Skip build step (use existing wheel)
topaz-agent-kit export -p projects/pa --output ./exports --skip-build

# Export without zip files (keep directories)
topaz-agent-kit export -p projects/pa --output ./exports --no-zip

Creates (default mode):

  • topaz_agent_kit-{version}-py3-none-any.whl
  • INSTALL.md (installation instructions)
  • tak-runtime-v{version}-{YYYYMMDD}.zip
  • {project-name}-{YYYYMMDD}.zip

Creates (with --no-zip):

  • topaz_agent_kit-{version}-py3-none-any.whl
  • INSTALL.md (installation instructions)
  • tak-runtime-v{version}-{YYYYMMDD}/ (directory)
  • {project-name}-{YYYYMMDD}/ (directory)

Note: --project is only required when exporting demo. Use --skip-demo to skip demo export.

Features

✅ Zero setup - No Python/Node installation needed
✅ Small packages - Projects ~30MB each
✅ Shared runtime - One runtime for multiple projects
✅ Version safety - Strict version matching prevents issues
✅ Easy distribution - Just zip files
✅ Cross-platform - Works on Mac, Windows, Linux
✅ Auto-naming - Zip files automatically named with version and date
✅ Auto-create directories - Output directories created automatically

File Structure

user-machine/
├── tak-runtime-v0.11.1-20260209/    # Shared runtime
│   ├── venv/                          # Python environment
│   ├── VERSION                        # Runtime version
│   └── README.md
│
├── pa-20260209/                       # Project 1
│   ├── config/
│   ├── agents/
│   ├── services/
│   ├── tools/
│   ├── data/                          # Pre-populated data
│   ├── run-demo.sh                    # Launcher (Mac/Linux)
│   ├── run-demo.bat                   # Launcher (Windows)
│   └── README.md
│
└── nexus-20260209/                    # Project 2 (shares runtime)
    ├── config/
    ├── data/
    └── run-demo.sh

Version Management

  • Strict Version Matching: Runtime version must exactly match demo requirement
  • Version Formats:
    • Release: 0.11.1 (from pyproject.toml)
    • Dev: 0.11.1-dev-20260209-a1b2c3d (timestamp + git hash)
    • Custom: Any string you specify
  • Date Suffix: All zip files include date (YYYYMMDD) for easy tracking
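Strict matching is a plain string comparison against the runtime's VERSION file, which can be sketched as follows (illustrative only; the launcher scripts implement the equivalent check):

```python
from pathlib import Path

def check_runtime_version(runtime_dir: Path, required: str) -> None:
    # Strict equality: "0.11.1" does not satisfy "0.11.2" - no ranges, no minimums
    found = (runtime_dir / "VERSION").read_text().strip()
    if found != required:
        raise RuntimeError(f"runtime is {found}, but the demo requires {required}")
```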

Runtime Discovery

The launcher script automatically finds the runtime in:

  1. Same directory as the project
  2. Parent directory

Make sure the runtime is extracted in one of these locations.

Troubleshooting

Runtime not found:

  • Ensure runtime zip is extracted
  • Check it's in the same directory or parent directory
  • Verify tak-runtime-v*/VERSION file exists

Version mismatch:

  • Download the correct runtime version
  • Check the date matches (if using dated versions)

Port already in use:

  • Check config/pipelines.yml for port configuration
  • Stop other services using the same port

For detailed documentation, see Portable Demos Guide and Portable Demos Workflow.

📚 Examples

Mathematical Problem Solving

# Create math-focused project
topaz-agent-kit init --starter math_demo ./math_project

# Run the math pipeline
topaz-agent-kit serve fastapi --project ./math_project

Financial Analysis

# Create stock analysis project
topaz-agent-kit init --starter stock_analysis ./finance_project

# Upload financial documents and analyze
topaz-agent-kit serve fastapi --project ./finance_project

Content Generation with HITL

# Create content creation project with HITL
topaz-agent-kit init --starter ensemble ./content_project

# Run with enhanced HITL system
topaz-agent-kit serve fastapi --project ./content_project

The Article Smith pipeline demonstrates advanced HITL integration:

  • Research Approval: Review and approve research findings
  • Draft Review: Provide detailed feedback with retry capability
  • Publication Choice: Select publication approach with conditional routing

🤝 Contributing

We welcome contributions! Please see our Contributing Guide for details.

Development Setup

# Clone the repository
git clone https://github.com/topaz-agent-kit/topaz-agent-kit.git
cd topaz-agent-kit

# Install in development mode
pip install -e ".[dev,fastapi,mcp,ui]"

# Run tests
pytest tests/

# Run the development server
topaz-agent-kit serve fastapi --project projects/ensemble --reload

Building the Package

To build the package from source (includes UI build):

# Build the package (builds UI and creates wheel)
python build.py

# The build script will output installation instructions with the correct wheel file path
# Example output:
#   uv add --prerelease=allow /path/to/dist/topaz_agent_kit-0.3.0-py3-none-any.whl

Build Process:

  1. UI Build: Compiles the Next.js UI and copies it to the package
  2. Package Build: Creates the wheel file using uv build

Built Artifacts:

  • Wheel file: dist/topaz_agent_kit-{version}-py3-none-any.whl

After building, you can install the package in a new project using the wheel file path shown in the build output.

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🆘 Support


Topaz Agent Kit - From idea to demo, from demo to dialogue, from dialogue to impact. ✨
