Config-driven multi-agent orchestration kit with optional API/UI/MCP extras
Topaz Agent Kit
A powerful, config-driven multi-agent orchestration framework for building sophisticated AI workflows across multiple frameworks and protocols.
What is Topaz Agent Kit?
Topaz Agent Kit is a lightweight, developer-focused toolkit that enables rapid creation, deployment, and orchestration of AI agent workflows. Instead of building from scratch, teams can assemble sophisticated multi-agent demos in hours, experiment freely across frameworks, and focus on delivering real business value.
Key Features
- Framework Flexibility: Build with LangGraph, CrewAI, Agno, ADK, OAK, or MAF - mix and match without being tied to a single ecosystem
- Protocol Choice: Create agents that work with A2A or IN-PROC protocols for flexible multi-agent communication
- Rich Tool Ecosystem: Pre-built MCP toolkits for document processing, web search, math, email, SEC filings, and more
- Multimodal Support: Native support for images, documents, and URLs across all frameworks - agents process visual and document inputs directly
- Rapid Development: From idea to working demo in hours, not weeks
- Modern Web UI: Real-time agent interaction visualization with drag-and-drop file uploads, session management, AG-UI protocol, and script execution
- App Mode: Config-driven UI apps with three canvas modes (Declarative, Agent, Hybrid), 47 widget types, and flexible layouts for collaborative AI-assisted editing
- Operations Center: Centralized case management interface with pipeline-specific views, dashboard analytics, Operations Assistant, and comprehensive case detail modals
- Portal Mode: Customer-facing portal at /portal with config-based login, portal state (GET/PATCH), and an in-page assistant chat; one session per user, separate from main chat
- Enhanced HITL: Multi-type gates (approval, input, selection) with async HITL support for batch processing, dynamic flow control (retry, skip, stop), and context injection
- Async HITL: Non-blocking human-in-the-loop with checkpoint management, case queuing, and pipeline resumption for high-throughput workflows
- Document Intelligence: Complete RAG-as-a-Service with document upload, analysis, semantic search, and citation support
- 10 Execution Patterns: Sequential, parallel, repeat, loop, conditional, switch, handoff, group chat, nested patterns, and pipeline composition
- Pipeline Composition: Use entire pipelines as nodes within other pipelines for maximum reusability
- Event-Driven Pipelines: Automatic pipeline execution triggered by file system events, webhooks, or custom triggers
- Auto-Generated Diagrams: Professional workflow diagrams with Graphviz for all pipelines
- Prompt Intelligence: Automatic variable detection and context-aware agent generation
- Intelligent Routing: LLM-based pipeline selection based on user intent
- SOP-Driven Agents: Standard Operating Procedure support for structured, consistent agent workflows
- AgentOS Memory: Filesystem-based memory system with Unix-like commands for persistent agent memory, shared templates, and semantic search
- Production Ready: Built-in monitoring, error handling, logging, and scalability features
Architecture Overview
Multi-Framework Support
Topaz Agent Kit supports 6 major AI frameworks with unified orchestration:
| Framework | Description | Best For | Multimodal Support |
|---|---|---|---|
| LangGraph | State-based agent workflows | Complex stateful processes | ✅ Images, Documents, URLs |
| CrewAI | Collaborative agent teams | Multi-agent collaboration | ⚠️ URLs only (local file limitation) |
| Agno | Lightweight agent framework | Simple, fast agent tasks | ✅ Images, Documents, URLs |
| ADK | Google Agent Development Kit | Google ecosystem integration | ✅ Images, Documents, URLs |
| OAK | OpenAI Agents Kit | OpenAI model optimization | ✅ Images, Documents, URLs |
| MAF | Microsoft Agent Framework | Enterprise agent orchestration | ✅ Images, Documents, URLs |
Note: Semantic Kernel (SK) support has been removed due to dependency conflicts with A2UI protocol support. All SK agents have been migrated to Microsoft Agent Framework (MAF), which is Microsoft's recommended path forward. See Framework Limitations for details.
Multimodal Input Support: All frameworks can process images, documents, and URLs directly as agent input (CrewAI with the limitations noted below). Files are automatically preprocessed and passed to agents in framework-native formats:
- Images: Local files and URLs supported (base64-encoded for most frameworks)
- Documents: PDFs, text files, and other document types (extracted text included when available)
- URLs: Automatically detected in user messages and processed as images or documents
Note: CrewAI has known limitations with local image processing when using Azure OpenAI models. Image URLs work reliably; local files may require alternative approaches.
Framework Limitations
Semantic Kernel (SK) Removal
Status: ❌ Completely Removed - Semantic Kernel framework support has been removed from Topaz Agent Kit as of this version.
Reason: SK had a dependency conflict with A2UI protocol support:
- SK requires pydantic<2.12
- A2UI requires pydantic>=2.12.5
- These requirements are incompatible and cannot be resolved
What Was Removed:
- SKBaseAgent class and all SK framework code
- SK framework configuration (sk.yml)
- SK from framework validation and factory registration
- All SK-related imports and dependencies
Migration: All existing SK agents have been migrated to Microsoft Agent Framework (MAF), which is Microsoft's recommended path forward for agent development. MAF provides similar capabilities to SK with better future support and full A2UI compatibility.
For Existing Projects: If you have SK agents in your project:
- Change type: sk to type: maf in agent config files
- Regenerate agents: topaz-agent-kit generate <project_dir>
- Test your agents (MAF has similar capabilities to SK)
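For example, the per-agent change is a one-line edit (the file name below is hypothetical; only the type field is prescribed by the migration):

```yaml
# config/agents/research_agent.yml (hypothetical agent config)
# Before (now fails validation):
# type: sk

# After (Microsoft Agent Framework):
type: maf
```

After editing, regenerate with topaz-agent-kit generate <project_dir> and re-test the agent.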
For New Projects: Use MAF, LangGraph, CrewAI, Agno, ADK, or OAK frameworks. SK is no longer available.
Note: The SK base class (sk_base_agent.py) and configuration file (sk.yml) have been completely removed from the codebase. Attempting to use type: sk in agent configurations will result in validation errors.
See SK and A2UI Dependency Conflict Documentation for detailed migration guide and technical details.
Execution Patterns
Pattern-based execution replaces complex graph definitions:
- Sequential: Run agents in order with dependency management
- Parallel: Execute multiple agents simultaneously for maximum efficiency
- Repeat: Execute the same agent multiple times in parallel with instance-specific inputs (new)
- Loop: Iterative execution with configurable termination conditions
- Conditional: Execute agents or entire branch based on dynamic conditions
- Switch: Route to different branches based on field values or expressions
- Handoff: LLM-driven routing to specialist agents with auto-generated orchestrator
- Group Chat: Multiple agents collaborate in a shared conversation (new)
- Nested: Combine patterns for complex workflows
- Pipeline Composition: Use entire pipelines as nodes within other pipelines for maximum reusability (new)
Pattern Containers & Grouping (UI)
- The UI renders pattern containers for every pattern_started/pattern_finished pair across all patterns (sequential, parallel, repeat, loop, switch, handoff, group_chat). Containers show name/description, status, timestamps, and elapsed time.
- Child items (agent cards, protocol chips, HITL gates, nested pattern containers) attach to a parent pattern via parent_pattern_id, preserving hierarchy even for deep nesting and repeat/loop instances.
- Concrete examples from starter pipelines:
  - Haiku Writers Room (group_chat): One group_chat container with the three participants (haiku_form_expert, haiku_imagery_specialist, haiku_editor) as child cards in round order.
  - Math Compass (sequential + switch): Top-level sequential container; inside it a switch container ("Complexity Router") with a nested sequential path for the TRUE branch containing the calculator agent card. Protocol chips sit immediately before their target card inside the parent container.
  - TCI Policy Risk Assessor (loop + nested sequential + parallel + switch): Loop container per application; inside it a nested sequential "Single Application Assessment Flow"; inside that two parallel containers ("Data Collection Phase", "Risk Factor Assessment") followed by a switch container ("Decision Routing") whose cases each render as nested sequential containers.
  - ECI Claims Vetter (loop + nested sequential + parallel): Loop over claims; each iteration shows a sequential container ("Single Claim Processing Flow") containing a parallel container ("Parallel Validation Checks") and the involved agent cards.
  - Handoff patterns (e.g., specialist routing): Handoff container with orchestrator card and the chosen specialist card inside; pattern_finished marks completion or failure.
- The backend emits pattern_started/pattern_finished with name, description, parent_pattern_id, and started_at/ended_at; the frontend groups by these IDs and sorts by timestamp/priority so workflow → patterns → cards/chips/HITL render in chronological order. Each repeat/loop instance and nested pattern gets a unique pattern_id, preventing cross-iteration mixing.
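A single pattern_started event can be pictured as carrying roughly the following fields (the field names come from the description above; the payload shape and values shown are illustrative assumptions, not the exact wire format):

```yaml
# Illustrative pattern_started payload (shape and values are assumptions)
event: pattern_started
pattern_id: "loop_claims_0"          # unique per repeat/loop instance and nested pattern
parent_pattern_id: "sequential_root" # lets the UI attach this container to its parent
name: "Claims Processing Loop"
description: "Processes each pending claim through the complete vetting workflow."
started_at: "2025-01-28T12:00:00Z"
```

The matching pattern_finished event adds ended_at, which the UI uses to show elapsed time and completion status.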
Using Variables in Pattern Descriptions
Pattern descriptions support Jinja2 templating to display dynamic information about the current execution context. This is particularly useful for showing relevant data about items being processed in loops or conditional branches.
Key Concepts
- Timing: Pattern descriptions are rendered BEFORE agents execute, so you can only use variables from:
  - Scanner outputs (e.g., eci_pending_claims_scanner.pending_claims_list)
  - Loop items (e.g., current_claim, current_application)
  - Database fields already stored
  - Cannot use: Extractor outputs or agent results (they haven't run yet)
- Variable Access: Use the same Jinja2 syntax as in agent inputs:
  - Simple variables: {{variable_name}}
  - Nested access: {{agent_id.field}} or {{current_item.field}}
  - Filters: {{amount | round(2)}}
  - Conditionals: {% if condition %}...{% endif %}
- Table Formatting: Use markdown tables to display structured data:
  - Multi-column tables for loop descriptions (showing all items)
  - Two-column tables (Field | Value) for single item details
Example: Loop Description (All Items)
Display a summary table of all items being processed:
pattern:
  type: loop
  name: "Claims Processing Loop"
  description: |
    ## Iterative Claim Processing
    Processes each pending claim through the complete vetting workflow.
    {% if eci_pending_claims_scanner.pending_claims_list and eci_pending_claims_scanner.pending_claims_list | length > 0 %}
    ## Pending Claims Summary
    | Claim ID | Seller | Claimant | Claim Amount | Claim Reason |
    |----------|--------|----------|--------------|--------------|
    {% for claim in eci_pending_claims_scanner.pending_claims_list %}
    | {{ claim.claim_id }} | {{ claim.seller_name or 'N/A' }} | {{ claim.claimant_name or 'N/A' }} | {% if claim.invoice_amount %}${{ claim.invoice_amount | round(2) }}{% else %}N/A{% endif %} | {{ claim.claim_reason_category or 'N/A' }} |
    {% endfor %}
    {% endif %}
  iterate_over: "eci_pending_claims_scanner.pending_claims_list"
  loop_item_key: "current_claim"
Result: Shows a table with all pending claims before processing begins.
Example: Single Item Description (Two-Column Format)
Display detailed information about the current item being processed:
pattern:
  type: sequential
  name: "Single Claim Processing Flow"
  description: |
    ## Individual Claim Vetting Process
    Complete workflow for processing a single claim.
    {% if current_claim %}
    ## Current Claim Details
    | Field | Value |
    |-------|-------|
    | Claim ID | {{ current_claim.claim_id }} |
    {% if current_claim.policy_number %} | Policy Number | {{ current_claim.policy_number }} |
    {% endif %}{% if current_claim.seller_name %} | Seller | {{ current_claim.seller_name }} |
    {% endif %}{% if current_claim.buyer_name %} | Buyer | {{ current_claim.buyer_name }} |
    {% endif %}{% if current_claim.invoice_amount %} | Claim Amount | ${{ current_claim.invoice_amount | round(2) }} |
    {% endif %}{% if current_claim.claim_reason_category %} | Claim Reason | {{ current_claim.claim_reason_category }} |
    {% endif %}
    {% endif %}
Result: Shows detailed information about the current claim in a two-column format.
Example: Application Loop
pattern:
  type: loop
  name: "Applications Processing Loop"
  description: |
    ## Iterative Application Processing
    Processes each pending application through the complete risk assessment workflow.
    {% if tci_pending_applications_scanner.pending_applications_list and tci_pending_applications_scanner.pending_applications_list | length > 0 %}
    ## Pending Applications Summary
    | Application ID | Applicant (Seller) | Buyer | Requested Credit Limit | Requested Term |
    |----------------|--------------------|-------|------------------------|----------------|
    {% for app in tci_pending_applications_scanner.pending_applications_list %}
    | {{ app.application_id }} | {{ app.seller_name or 'N/A' }} | {{ app.buyer_name or 'N/A' }} | {% if app.requested_amount %}{{ app.currency_symbol or '$' }}{{ app.requested_amount | format_currency }}{% else %}N/A{% endif %} | {% if app.requested_term_days %}{{ app.requested_term_days }} days{% else %}N/A{% endif %} |
    {% endfor %}
    {% endif %}
  iterate_over: "tci_pending_applications_scanner.pending_applications_list"
  loop_item_key: "current_application"
Dynamic List Iteration (dynamic_iterate_over)
For recursive discovery or scenarios where new items are added during iteration, enable dynamic re-evaluation of the list:
pattern:
  type: sequential
  steps:
    - node: scanner
    - type: loop
      iterate_over: "scanner.items_list"
      loop_item_key: "current_item"
      dynamic_iterate_over: true  # Re-evaluate list before each iteration
      termination:
        max_iterations: 50  # Safety limit (required for dynamic mode)
      body:
        type: sequential
        steps:
          - node: processor
          - node: discovery_agent  # May add new items to scanner.items_list
          - node: recorder
    - node: finalizer
Key Features:
- dynamic_iterate_over: true: Re-evaluates the iterate_over list before each iteration
- Picks up new items: Items added during iteration (e.g., by discovery_agent) are automatically processed
- Duplicate prevention: Tracks processed items to avoid processing the same item twice
- Safety limit: Always use max_iterations to prevent infinite loops
How It Works:
- Before each iteration, the loop re-resolves the iterate_over path
- Filters out already processed items (using item ID or hash)
- Processes the first unprocessed item
- Continues until no new items are found or max_iterations is reached
Use Cases:
- Recursive Discovery: Finding related items that may lead to more items (e.g., ReconVoy pipeline)
- Database Updates: Processing items that may trigger new items to be added to the queue
- Dynamic Workloads: When the work list grows during processing
Example: Recursive Discovery:
- type: loop
  iterate_over: "related_items_discovery.related_items"
  loop_item_key: "related_item"
  dynamic_iterate_over: true  # Pick up newly discovered items
  termination:
    max_iterations: 50  # Safety limit
  body:
    type: sequential
    steps:
      - node: item_discovery           # Finds foreign book matches
      - node: related_items_discovery  # May add more items to related_items list
Performance Note: Dynamic iteration re-evaluates the list on each iteration, which may have performance implications for expensive operations (database queries, complex context resolution). Use static iteration (dynamic_iterate_over: false or omitted) when the list doesn't change during processing.
Accessing Accumulated Loop Results
When accumulate_results is enabled (default: true), loop patterns automatically create *_instances aliases for downstream agents, similar to repeat patterns. This allows downstream agents to access all loop iteration results, not just the last one.
How it works:
- After the loop completes, for each agent that ran inside the loop body, a dictionary is created with the name {agent_id}_instances
- Each iteration's result is stored under a key like {agent_id}_0, {agent_id}_1, etc.
- Downstream agents can access all iterations by referencing the {agent_id}_instances variable in their prompts or inputs
Example:
pattern:
  type: sequential
  steps:
    - node: question_loader
    - type: loop
      iterate_over: "question_loader.questions_list"
      loop_item_key: "current_question"
      accumulate_results: true  # Default, but explicit for clarity
      body:
        type: sequential
        steps:
          - node: validator       # Runs for each question
    - node: summary_reporter      # Can access all validator results
In the summary_reporter agent configuration, you can access all validation results:
# config/agents/summary_reporter.yml
prompt:
  inputs:
    inline: |
      - Total Questions: {{question_loader.total_count}}
      - Validation Results: {{validator_instances}}
The validator_instances dictionary will contain:
{
"validator_0": {"validation_results": {...}, "tools_used": {...}},
"validator_1": {"validation_results": {...}, "tools_used": {...}},
"validator_2": {"validation_results": {...}, "tools_used": {...}}
}
Key Points:
- Only agents that run inside the loop body get *_instances aliases
- The alias is created automatically when accumulate_results=true (default)
- Instance keys follow the pattern {agent_id}_{iteration_index} (0-based)
- Prefers structured parsed output when available, otherwise uses raw result
- This mirrors repeat pattern behavior for consistency across patterns
When to use accumulate_results:
- ✅ Enable (default): When you need downstream agents to process all loop results (e.g., summary reports, aggregations, batch processing)
- ❌ Disable: When you only need the final iteration's result or want to reduce memory usage for very large loops
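As a minimal sketch, disabling accumulation is a single flag on the loop (the other keys are as in the example above; node names are illustrative):

```yaml
- type: loop
  iterate_over: "scanner.items_list"
  loop_item_key: "current_item"
  accumulate_results: false  # keep memory low; no *_instances aliases are created
```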
Best Practices
- Use Conditional Rendering: Always check if variables exist before using them:
  {% if current_claim.seller_name %} | Seller | {{ current_claim.seller_name }} |{% endif %}
- Provide Fallbacks: Use or 'N/A' for optional fields: {{ claim.seller_name or 'N/A' }}
- Format Numbers: Use built-in Jinja2 filters for currency and decimals:
  {{ amount | format_currency }}           # "125,000.00"
  {{ 0.85 | format_percentage }}           # "85.0%"
  {{ amount | format_number(decimals=2) }} # "125,000.00"
  See Available Jinja2 Filters for all formatting options.
- Loop Descriptions: Show all items in a multi-column table
- Single Item Descriptions: Use two-column format (Field | Value) for readability
- Scanner Data: Ensure your scanner agents extract all needed fields from the database:
  SELECT claim_id, seller_name, claimant_name, invoice_amount, claim_reason_category FROM claims WHERE status = 'pending'
- Table Formatting: Use Jinja2 whitespace control to ensure proper markdown table formatting:
  - Use {%- and -%} to strip whitespace around tags
  - Separate conditional rows onto their own lines
  - Example:
    | Field | Value |
    |-------|-------|
    | Claim ID | {{ current_claim.claim_id }} |
    {%- if current_claim.seller_name %}
    | Seller | {{ current_claim.seller_name }} |
    {%- endif %}
    {%- if current_claim.buyer_name %}
    | Buyer | {{ current_claim.buyer_name }} |
    {%- endif %}
  For loop tables:
    | Claim ID | Seller | Claimant |
    |----------|--------|----------|
    {%- for claim in scanner.pending_claims_list %}
    | {{ claim.claim_id }} | {{ claim.seller_name }} | {{ claim.claimant_name }} |
    {%- endfor %}
  Why: Jinja2 whitespace control ({%- and -%}) prevents extra blank lines that break markdown table formatting.
Markdown Table Formatting Best Practices
Based on testing and validation, follow these rules to ensure tables render correctly:
Critical Rules:
- No Blank Lines Within Tables: Blank lines between table rows break markdown table formatting
  ❌ BAD:
  {% if condition %}
  | Status | {{ status }} |
  {% endif %}
  ✅ GOOD:
  {%- if condition %}
  | Status | {{ status }} |
  {%- endif %}
- Whitespace Control for Conditionals: Always use {%- and -%} for conditionals in table cells
  ❌ BAD:  | Amount | {% if sym %}{{ sym }}{{ amount | format_currency }}{% else %}N/A{% endif %} |
  ✅ GOOD: | Amount | {%- if sym %}{{ sym }}{{ amount | format_currency }}{%- else %}N/A{%- endif %} |
- Nested Conditionals: Apply whitespace control to conditionals, but NOT to {% set %} statements (they need newlines for markdown tables)
  ❌ BAD (whitespace control on set strips newline):
  | Amount | {%- if has_amount %}{%- set sym = currency_symbol %}{%- if sym | length >= 3 %}{{ sym }} {{ amount }}{%- else %}{{ sym }}{{ amount }}{%- endif %}{%- else %}N/A{%- endif %} |
  ✅ GOOD (no whitespace control on set, preserves newlines):
  | Amount | {%- if has_amount %}{% set sym = currency_symbol %}{%- if sym | length >= 3 %}{{ sym }} {{ amount }}{%- else %}{{ sym }}{{ amount }}{%- endif %}{%- else %}N/A{%- endif %} |
- checks_table Pattern: Use {{ checks_table }} (not {{- checks_table -}}) to preserve newlines, and ensure a blank line after </summary>. Use {% if %} (NOT {%- if %}) for conditionals around checks_table to preserve newlines.
  ❌ BAD (strips newlines, breaks table):
  {%- if agent.checks_table %}{{- agent.checks_table -}}{%- else %}
  **Summary:** {{ agent.details }}
  {%- endif %}
  ❌ BAD (no blank line after </summary>, table won't render):
  <details>
  <summary>Test Summary</summary>
  {%- if agent.checks_table %}
  {{ agent.checks_table }}
  {%- endif %}
  </details>
  ✅ GOOD (preserves newlines, blank line after </summary>, no whitespace control on if):
  <details>
  <summary>Test Summary</summary>

  {% if agent.checks_table %}
  {{ agent.checks_table }}
  {% endif %}
  </details>
- Set Statements Before Table Rows: Use {% set %} (NOT {%- set %}) when used before markdown tables, especially in <details> blocks. The whitespace control strips newlines that markdown tables need.
  ❌ BAD (whitespace control strips newline, breaks table rendering):
  <details>
  <summary>Test Summary</summary>

  {%- set fh_score = assessor.score if assessor else None %}
  {% if agent.checks_table %}
  {{ agent.checks_table }}
  {% endif %}
  </details>
  ✅ GOOD (no whitespace control on set, preserves newlines):
  <details>
  <summary>Test Summary</summary>

  {% set fh_score = assessor.score if assessor else None %}
  {% if agent.checks_table %}
  {{ agent.checks_table }}
  {% endif %}
  </details>
  Also: Put {% set %} statements on separate lines before table rows, not inline
  ❌ BAD (set on same line as table row, breaks formatting):
  | Risk Factor | Score | Weight | Weighted Score |
  |-------------|-------|--------|----------------|
  {% set fh_score_val = tci_financial_health_assessor.score if tci_financial_health_assessor else None %}| Financial Health | {{ fh_score_val | default_if_none }} | 0.22 | {{ fh_weighted | default_if_none }} |
  ✅ GOOD (set on separate line before table row):
  | Risk Factor | Score | Weight | Weighted Score |
  |-------------|-------|--------|----------------|
  {% set fh_score_val = tci_financial_health_assessor.score if tci_financial_health_assessor else None %}
  {% set fh_weighted = (fh_score_val * 0.22) | round(2) if fh_score_val is not none else None %}
  | Financial Health | {{ fh_score_val | default_if_none }} | 0.22 | {{ fh_weighted | default_if_none }} |
- Filters Don't Introduce Whitespace: Filters themselves are safe, but Jinja2 tags need control
  ✅ SAFE (filters don't add whitespace):
  | Amount | {{ amount | format_currency }} |
  | Status | {{ status | default_if_none }} |
  ⚠️ NEEDS CONTROL (tags add whitespace):
  | Amount | {%- set sym = currency_symbol %}{{ amount | format_currency(currency_symbol=sym) }} |
- Tables Inside HTML Tags: Markdown tables inside HTML tags (like <details>) require a blank line after the closing tag
  ❌ BAD (table starts on same line as </summary>):
  <details>
  <summary>Test Summary</summary>
  | Check | Status |
  |-------|--------|
  | Test | PASS |
  </details>
  ✅ GOOD (blank line after </summary>):
  <details>
  <summary>Test Summary</summary>

  | Check | Status |
  |-------|--------|
  | Test | PASS |
  </details>
Key Learnings from Testing:
- Blank lines within tables: Break markdown rendering - never include blank lines between table rows
- Blank lines before tables: Required when tables are inside HTML tags (e.g., <details>)
- checks_table pattern: Use {{ checks_table }} (not {{- checks_table -}}) to preserve newlines that are part of the table structure
- Whitespace control for conditionals: Use {%- and -%} for conditionals and loops within table cells
- Set statements: Use {% set %} (NOT {%- set %}) before markdown tables - whitespace control strips newlines that markdown parsers need
- Set statement placement: Place {% set %} statements on separate lines before table rows, not inline
Testing: Use the tests/test_markdown_tables.py script to validate table formatting:
python tests/test_markdown_tables.py
# or
python -m pytest tests/test_markdown_tables.py -v
The test script generates markdown files in tests/output/markdown_tables/ for visual inspection and validates:
- No blank lines within tables
- Tables don't start on the same line as previous content
- Consistent column counts across all table rows
- Proper whitespace control in conditionals
Available Jinja2 Filters
The Topaz Agent Kit provides a comprehensive set of Jinja2 filters for formatting and styling in templates. These filters are automatically available in:
- Pattern descriptions (pipeline YAML files)
- HITL gate descriptions
- Agent input templates (YAML inputs.inline sections)
Number Formatting:
{{ 125000 | format_currency }} # "125,000.00"
{{ 125000 | format_currency(decimals=0) }} # "125,000"
{{ 1250.5 | format_number(decimals=2) }} # "1,250.50"
{{ 0.85 | format_percentage }} # "85.0%"
{{ 0.8523 | format_percentage(decimals=2) }} # "85.23%"
Score/Risk Color Coding:
{# For risk scores where lower is better (0-25 green, 26-50 amber, etc.) #}
<span style="color: {{ risk_score | risk_score_color }};">{{ risk_score }}</span>
{# For credit/quality scores where higher is better (85-100 green, 70-84 amber, etc.) #}
<span style="color: {{ credit_score | credit_score_color }};">{{ credit_score }}</span>
{# Generic score color with custom thresholds #}
<span style="color: {{ score | score_color(thresholds=[(80, "#green"), (60, "#yellow"), (0, "#red")]) }};">{{ score }}</span>
Text Formatting:
{{ long_text | truncate_text(50) }} # Truncate to 50 chars with "..."
{{ 5 | pluralize("item") }} # "items"
{{ 1 | pluralize("child", "children") }} # "child"
{{ text | highlight_text("search term") }} # Highlight search terms
Date/Time Formatting:
{{ "2025-01-28" | format_date }} # "2025-01-28"
{{ "2025-01-28" | format_date("%B %d, %Y") }} # "January 28, 2025"
{{ 3665 | format_duration }} # "1 hour 1 minute 5 seconds"
{{ 3665 | format_duration(compact=True) }} # "1h 1m 5s"
Data Formatting:
{{ 1572864 | format_file_size }} # "1.5 MB"
{{ "1234567890" | mask_sensitive(4) }} # "1234******"
{{ "1234567890" | format_phone }} # "(123) 456-7890"
Utility Functions:
{{ 10 | safe_divide(2) }} # 5.0
{{ 10 | safe_divide(0, "N/A") }} # "N/A"
{{ None | default_if_none("—") }} # "—"
Complete Filter Reference:
| Filter | Description | Example |
|---|---|---|
| format_currency(value, decimals=2) | Format as currency with commas | 125000 → "125,000.00" |
| format_number(value, decimals=0, thousands_sep=",") | Format number with optional decimals | 1250.5 → "1,250.50" |
| format_percentage(value, decimals=1, multiply=True) | Format as percentage | 0.85 → "85.0%" |
| risk_score_color(value) | Color for risk scores (lower is better) | 15 → "#22c55e" (green) |
| credit_score_color(value) | Color for credit scores (higher is better) | 90 → "#22c55e" (green) |
| score_color(value, thresholds, low_is_better) | Generic score color with custom thresholds | See examples above |
| truncate_text(value, max_length=100, suffix="...") | Truncate text with suffix | "Very long text" → "Very long..." |
| pluralize(value, singular, plural=None) | Singular/plural form | 5 → "items" |
| format_date(value, format_str="%Y-%m-%d") | Format date/datetime | "2025-01-28" → "January 28, 2025" |
| format_duration(seconds, compact=False) | Format duration | 3665 → "1 hour 1 minute 5 seconds" |
| format_file_size(value, binary=False) | Format bytes as file size | 1572864 → "1.5 MB" |
| mask_sensitive(value, visible_chars=4, mask_char="*") | Mask sensitive data | "1234567890" → "1234******" |
| format_phone(value, format_str="us") | Format phone number | "1234567890" → "(123) 456-7890" |
| safe_divide(numerator, denominator, default=0) | Safe division | 10 / 0 → 0 |
| default_if_none(value, default="N/A") | Default for None values | None → "N/A" |
Available Variables by Context
- Loop Description: Access the full list from scanner (e.g., scanner.pending_list)
- Loop Body (Sequential/Parallel): Access current item via loop_item_key (e.g., current_claim, current_application)
- Conditional/Switch Branches: Access variables from upstream agents or context
- Not Available: Extractor outputs or downstream agent results (they haven't executed yet)
Hierarchy snapshots (cards/chips/patterns)
- Haiku Writers Room (group_chat)
  - group_chat (Haiku Writers Room)
    - card: haiku_form_expert
    - card: haiku_imagery_specialist
    - card: haiku_editor
- Math Compass (sequential + switch)
  - sequential: Math Problem Solving Flow
    - chip: orchestrator → math_strategist
    - card: math_strategist
    - switch: Complexity Router
      - sequential (TRUE case)
        - chip: math_strategist → math_calculator
        - card: math_calculator
- TCI Policy Risk Assessor (loop → sequential → parallel → switch)
  - loop: Applications Processing Loop (one per application)
    - sequential: Single Application Assessment Flow
      - parallel: Data Collection Phase (collectors)
      - parallel: Risk Factor Assessment (risk scorers)
      - switch: Decision Routing
        - sequential: case Modified Terms
        - sequential: case Rejection
        - sequential: case Escalation
        - sequential: case Information Request
- ECI Claims Vetter (loop → sequential → parallel)
  - loop: Claims Processing Loop (one per claim)
    - sequential: Single Claim Processing Flow
      - parallel: Parallel Validation Checks (e.g., history lookup, eligibility)
- Handoff
  - handoff: orchestrator card, then the chosen specialist card
Protocol Support
Flexible agent communication protocols:
- A2A (Agent-to-Agent): Direct agent interaction patterns via SDK (default for remote agents)
- IN-PROC: Local in-process execution for performance (automatic for local agents)
- Mixed: Use different protocols per agent in the same workflow
- Auto-detection: Local agents automatically use IN-PROC regardless of pattern protocol
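A mixed workflow might be sketched like this (the placement of a protocol key on the pattern is an assumption for illustration; the node names are hypothetical, and the IN-PROC auto-detection behavior is as described above):

```yaml
# Illustrative mixed-protocol pipeline sketch
pattern:
  type: sequential
  protocol: a2a               # assumed pattern-level default for remote agents
  steps:
    - node: local_helper      # local agent: automatically runs IN-PROC
    - node: remote_specialist # remote agent: reached over A2A
```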
Why A2A Only?
ACP (Agent Communication Protocol) support has been removed as of version 0.3.0. This change was made for the following reasons:
- Official Deprecation: The ACP SDK has been officially deprecated and merged with A2A under the Linux Foundation (August 2025). The ACP SDK is no longer actively maintained and new features are not being added.
- Simplified Configuration: Removing ACP support significantly simplifies agent configuration:
  - Single url field instead of urls.a2a and urls.acp objects
  - No protocol_support array needed
  - Cleaner, more maintainable YAML files
  - Reduced port management complexity
- Unified Protocol: A2A protocol now supports all ACP functionality plus additional features like path-based routing, making it the single recommended protocol for remote agent communication.
- Future-Proof: A2A is actively developed with community support and is the long-term standard for agent-to-agent communication.
Migration: If you have existing agents using ACP, they should be migrated to A2A. The A2A SDK provides full compatibility with ACP features, and migration is straightforward - simply update the remote.url field in your agent configurations.
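The migration described above amounts to collapsing the per-protocol URL objects into the single `url` field. A before/after sketch (surrounding keys may differ in your configs):

```yaml
# Before (with ACP support):
remote:
  urls:
    a2a: "http://agent-host:9000/a2a"
    acp: "http://agent-host:9001/acp"
  protocol_support: ["a2a", "acp"]

# After (0.3.0+, A2A only):
remote:
  url: "http://agent-host:9000/a2a"
```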
🧰 Built-in MCP Toolkits
Topaz Agent Kit includes 15 comprehensive MCP toolkits with 75+ tools ready to use:
📄 Document Intelligence
DocExtract Toolkit (doc_extract_*)
Extract structured data, tables, and metadata from documents.
- `doc_extract_structured_data`: Extract structured data from PDFs, DOCX, PPTX, HTML, Markdown using AI-powered field extraction
- `doc_extract_tables`: Extract tables from documents with structure preservation
- `doc_extract_metadata`: Extract document metadata (title, author, creation date, etc.)
DocRAG Toolkit (doc_rag_*)
Document retrieval and semantic search with ChromaDB.
- `doc_rag_query_document`: Query documents using semantic search with citation support
- `doc_rag_list_documents`: List all documents in the collection
ImageRAG Toolkit (image_rag_*)
Image analysis and OCR-based search.
- `image_rag_query_images`: Query images using OCR text content with semantic search
- `image_rag_list_images`: List all images in the collection
🌐 Web & Search
Browser Toolkit (browser_*)
Web scraping and automation.
- `browser_scrape_website_content`: Scrape and extract content from websites
Serper API Toolkit (serper_api_*)
Web search integration via Serper API.
- `serper_api_search_internet`: Search the internet with Serper API
- `serper_api_search_news`: Search news articles with Serper API
SEC API Toolkit (sec_api_*)
SEC filings and financial document retrieval.
- `sec_api_search_10q`: Search and retrieve SEC 10-Q quarterly reports
- `sec_api_search_10k`: Search and retrieve SEC 10-K annual reports
🔢 Math & Analysis
Math Toolkit (math_*)
Advanced mathematical operations and problem-solving.
Basic Operations:
- `math_multiply`: Multiply two numbers
- `math_evaluate_expression`: Evaluate mathematical expressions safely
- `math_percentage_of`: Calculate percentage of a value
- `math_percent_change`: Calculate percent change between two values
- `math_fraction_to_decimal`: Convert fraction to decimal
- `math_ceil_divide`: Ceiling division (round up)
Advanced Math:
- `math_solve_equations`: Solve systems of equations
- `math_solve_inequality`: Solve inequalities
- `math_compute_log`: Compute logarithms (natural or custom base)
- `math_compute_exp`: Compute exponential functions
- `math_trig_functions`: Compute trigonometric functions (sin, cos, tan, etc.)
- `math_differentiate`: Differentiate mathematical expressions
- `math_integrate_expr`: Integrate mathematical expressions
Text Processing:
- `math_summarize`: Summarize text (rule-based)
- `math_llm_summarize`: Summarize text using LLM
- `math_extract_quantities`: Extract numerical quantities from text
- `math_llm_parse_math_problem`: Parse math problems using LLM
- `math_sanitize_expression`: Sanitize mathematical expressions for safe evaluation
📧 Communication
Email Toolkit (email_*)
Gmail integration for email management via SimpleGmail.
Email Operations:
- `email_send`: Send emails with HTML/plain text, attachments, and custom headers
- `email_list_messages`: List messages from a label/folder with pagination
- `email_read_message`: Read full message content with attachments
- `email_search`: Search emails by query with filters
- `email_move`: Move emails between labels/folders
- `email_mark_read`: Mark emails as read
- `email_mark_unread`: Mark emails as unread
- `email_delete`: Delete emails
- `email_download_attachment`: Download email attachments to local path
Email Management:
- `email_list_labels`: List all Gmail labels/folders
Helper Tools:
- `email_get_company_info`: Retrieve company information
- `email_get_email_signature`: Generate professional email signature
- `email_spell_check`: Perform spell check on text
🗄️ Database
SQLite Toolkit (sqlite_*)
SQLite database operations.
- `sqlite_query`: Execute SELECT queries and return results as JSON
- `sqlite_execute`: Execute INSERT, UPDATE, DELETE statements
- `sqlite_schema`: Get table schema information
📁 Filesystem
Filesystem Toolkit (fs_*)
Basic file and directory operations.
- `fs_listdir`: List directory contents
- `fs_makedirs`: Create directories (with `exist_ok` option)
- `fs_move_file`: Move files between locations (with automatic directory creation)
✈️ Travel
Flights Toolkit (flight_*)
Flight search and booking via Amadeus API.
- `flight_search`: Search for flights with flexible date options
- `flight_price`: Get detailed pricing for flight offers
- `flight_seatmap`: Get seat map information
- `route_price_metrics`: Get price metrics for routes
- `airline_lookup`: Lookup airline information by IATA codes
- `flight_book`: Book flights with traveler and payment information
- `flight_order_get`: Get flight order details
- `flight_order_cancel`: Cancel flight orders
Hotels Toolkit (hotel_*)
Hotel search and booking via Amadeus API.
- `hotel_search`: Search for hotels by location, amenities, ratings
- `hotel_offers`: Get hotel offers with pricing
- `hotel_offer_details`: Get detailed offer information
- `hotel_book`: Book hotels with guest and payment information
Activities Toolkit (activities_*)
Travel activities and points of interest via Amadeus API.
- `activities_search`: Search for activities by location, date, keywords
- `activity_details`: Get detailed activity information
- `pois_search`: Search for points of interest
🛠️ Utilities
Common Toolkit (common_*)
General utility functions.
- `common_ocr_reader`: Extract text from images using OCR
- `common_form_parser`: Parse form data from text
- `common_entity_normalizer`: Normalize entity names and values
- `common_read_image`: Read and process image files
- `common_read_document`: Read and process document files
Insurance Toolkit (insurance_*)
Insurance-specific domain tools.
- `insurance_policy_lookup`: Lookup insurance policy information
- `insurance_actuarial_calculator`: Perform actuarial calculations
- `insurance_severity_classifier`: Classify claim severity
- `insurance_fraud_scoring`: Score claims for fraud risk
- `insurance_duplicate_claim_checker`: Check for duplicate claims
- `insurance_anomaly_detector`: Detect anomalies in claims data
🧠 AgentOS Memory Toolkit (agentos_*)
Filesystem-based memory system for agents to store, retrieve, and search information using Unix-like commands.
- `agentos_shell`: Execute shell commands in a sandboxed filesystem (`ls`, `cat`, `echo`, `grep`, `semgrep`, `mkdir`, etc.)
Key Features:
- Filesystem as Memory: Agents use familiar Unix commands to manage memory
- Pipeline-Level Shared Memory: Share templates, company info, and reference data across agents
- Agent-Level Individual Memory: Each agent has isolated memory for personal data
- Auto-Indexing: Files automatically indexed for semantic search (`semgrep`)
- Security: Sandboxed execution with path traversal protection
- Configurable Structure: Define memory directories in YAML configuration
Example Usage:
```yaml
# Agent configuration with AgentOS memory
mcp:
  servers:
    - url: "http://localhost:8050/mcp"
      toolkits: ["agentos_memory"]
      tools: ["agentos_shell"]
memory:
  inherit: true  # Inherit shared memory from pipeline
  directories:
    - path: "/memory/senders/"
      description: "Sender preferences and history"
      readonly: false
      auto_index: true
    - path: "/workspace/"
      description: "Working directory for drafts"
      readonly: false
      auto_index: false
```
Agent Prompt Example:
```
{{agentos_memory_section}}

## Workflow:
1. Check sender history: `agentos_shell("ls /memory/senders/")`
2. Load preferences: `agentos_shell("cat /memory/senders/{email}/preferences.md")`
3. Access shared templates: `agentos_shell("cat /shared/email_templates/greetings/formal.md")`
4. Store new data: `agentos_shell('echo "content" > /memory/senders/{email}/preferences.md')`
5. Semantic search: `agentos_shell('semgrep "similar email pattern"')`
```
Pipeline-Level Shared Memory:
```yaml
# Pipeline configuration
memory:
  shared:
    directories:
      - path: "/shared/email_templates/"
        description: "Email template library (READ-ONLY)"
        readonly: true
        auto_index: true
        bootstrap: true
      - path: "/shared/company_info/"
        description: "Company information (READ-ONLY)"
        readonly: true
        auto_index: true
        bootstrap: true
```
Best Practices:
- Use `/shared/` for read-only reference data (templates, company info, policies)
- Use `/memory/` for agent-specific data that persists across sessions
- Use `/workspace/` for temporary working files
- Enable `auto_index: true` for directories you want to search semantically
- Set `readonly: true` for shared directories to prevent accidental modifications
For detailed documentation, see the AgentOS Memory System section below.
🔧 External MCP Server Integration
Connect to external MCP servers for enterprise integrations:
```yaml
mcp:
  servers:
    - url: "http://enterprise-mcp-server:8080/mcp"
      toolkits: ["enterprise", "database"]
      tools: ["enterprise_*", "db_*"]
```
Features:
- Connect to any MCP-compatible server
- Custom toolkits and tools from external servers
- Automatic tool discovery and registration
- Secure credential management
🚀 Quick Start
Installation
From PyPI (Recommended)
```shell
# Install Topaz Agent Kit using pip
pip install topaz-agent-kit

# Or with optional dependencies
pip install "topaz-agent-kit[fastapi,mcp,ui]"

# Install using uv (requires --prerelease=allow due to dependency requirements)
uv add --prerelease=allow topaz-agent-kit

# Or with optional dependencies
uv add --prerelease=allow "topaz-agent-kit[fastapi,mcp,ui]"
```
Note for uv users: The --prerelease=allow flag may be required for some dependencies that use pre-release versions. This is typically needed for agent-framework (MAF) beta versions.
Reproducible Installation (Exact Versions)
For reproducible installs with exact dependency versions matching the development environment:
Using pip:
# Download requirements.txt from the package source distribution
# Or extract it from the installed package:
pip show -f topaz-agent-kit | grep requirements.txt
# Install with exact versions
pip install -r requirements.txt topaz-agent-kit
Using uv (Recommended):
```shell
# Option 1: Install with exact versions using uv pip (pip-compatible)
uv pip install -r requirements.txt topaz-agent-kit

# Option 2: Sync environment to match requirements.txt exactly
uv pip sync requirements.txt
uv pip install topaz-agent-kit

# Option 3: If you have a project with pyproject.toml, extract
# requirements.txt from the source distribution, then:
uv pip install -r requirements.txt topaz-agent-kit
```
Why use exact versions? The requirements.txt file contains exact pinned versions of all dependencies that were tested and verified during development. This ensures:
- ✅ Same dependency versions as the development environment
- ✅ Reproducible builds across different machines
- ✅ Avoids version conflicts and compatibility issues
Note: The requirements.txt file is automatically generated from uv.lock during the build process and is included in the source distribution. Both pip and uv can use this file directly.
From Local Wheel File
If you've built the package locally or have a wheel file:
```shell
# Using uv (recommended for projects using uv)
uv add --prerelease=allow /path/to/topaz_agent_kit-0.3.0-py3-none-any.whl

# Or using pip
pip install /path/to/topaz_agent_kit-0.3.0-py3-none-any.whl

# Or using uv pip
uv pip install /path/to/topaz_agent_kit-0.3.0-py3-none-any.whl
```
Note: Replace 0.3.0 with the actual version number and update the path to your wheel file location.
Create Your First Project
```shell
# Quick start with default basic template (recommended for beginners)
topaz-agent-kit init .

# Or specify a template explicitly
topaz-agent-kit init -f basic ./my_project     # Basic foundation (same as above)
topaz-agent-kit init -s ensemble ./my_project  # Full starter with examples
```
Run Your Agents
```shell
# Start web interface (recommended)
topaz-agent-kit serve fastapi --project ./my_project

# Or command-line interface
topaz-agent-kit serve cli --project ./my_project

# Or MCP server
topaz-agent-kit serve mcp --project ./my_project
```
Generate Workflow Diagrams
Topaz Agent Kit automatically generates professional workflow diagrams for all your pipelines using Graphviz:
```shell
# Diagrams are auto-generated during project initialization
topaz-agent-kit init .                         # Default basic template
topaz-agent-kit init -s ensemble ./my_project  # Or use starter template

# Or generate diagrams manually for existing projects
topaz-agent-kit generate --project ./my_project
```
Generated Files: `projects/ensemble/ui/static/assets/{pipeline}_workflow.{dot,svg}`
Visual Elements
- Agent Nodes: Blue rounded rectangles with agent names
- HITL Gates: Colored diamonds (blue=approval, purple=input, teal=selection)
- SWITCH Pattern: Amber hexagon for conditional routing
- Conditional Nodes/Gates: Gray diamonds with TRUE/FALSE paths
- LOOP Pattern: Self-loop with iteration count label
- HANDOFF Pattern: Gray circles for virtual orchestrator with dotted edges to specialists
- GROUP CHAT: Ellipse hub with solid edges to participants, dashed returns, LOOP self-loop, END edge label (ALL CAPS)
- Arrow Labels: ALL CAPS labels showing gate paths (APPROVE, REJECT, TRUE, FALSE)
Example Workflow
```
START → Agent A → HITL Gate → True → Agent B → END
                      ↓
                  Reject → END
```
Group Chat Pattern
Multiple agents collaborate in a shared conversation. Supports:
- Selection strategies: `round_robin`, `llm` (custom or virtual orchestrator)
- Termination: `max_rounds` and/or `condition`
- Diagramming: Central hub, dashed returns, LOOP self-loop, END edge labeled in ALL CAPS
YAML example (Haiku Writers Room):
```yaml
pattern:
  type: group_chat
  participants:
    - node: haiku_form_exper
    - node: haiku_imagery_specialist
    - node: haiku_editor
  selection_strategy: round_robin
  termination:
    max_rounds: 9
    condition: "contains(last_message, 'APPROVED')"
```
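The round-robin selection and termination semantics above can be sketched as follows. This is an illustrative simplification, not the actual `GroupChatRunner`; the `respond` callback stands in for real agent invocations:

```python
# Minimal sketch of round_robin selection with max_rounds / condition
# termination, mirroring the Haiku Writers Room config above.
def run_group_chat(participants, respond, max_rounds=9,
                   condition=lambda msg: "APPROVED" in msg):
    """respond(agent, last_message) -> that agent's new message."""
    last_message = ""
    for round_no in range(max_rounds):
        agent = participants[round_no % len(participants)]  # round_robin
        last_message = respond(agent, last_message)
        if condition(last_message):  # e.g. contains(last_message, 'APPROVED')
            break
    return last_message

# Toy usage: the editor approves on its first turn (round 3 of 9)
agents = ["haiku_form_exper", "haiku_imagery_specialist", "haiku_editor"]
reply = lambda a, m: "APPROVED" if a == "haiku_editor" else f"{a}: draft"
print(run_group_chat(agents, reply))  # APPROVED
```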
Generate workflow diagrams:
```shell
# Diagrams auto-generated during init
topaz-agent-kit init -s ensemble ./my_project

# Or generate manually
topaz-agent-kit generate --project ./my_project
```
Outputs: `projects/ensemble/ui/static/assets/haiku_writers_room_workflow.svg`
Implementation references:
- Pattern runner: `src/topaz_agent_kit/core/execution_patterns.py` (GroupChatRunner)
- Graphviz generator: `src/topaz_agent_kit/cli/graphviz_generator.py`
Event-Driven Pipelines
Pipelines can be automatically triggered by external events, enabling reactive workflows that respond to file system changes, webhooks, database updates, or custom triggers.
Key Features
- 🔔 Multiple Trigger Types: File watcher (filesystem events), webhook (HTTP), database (row changes), scheduled (cron), and extensible for custom triggers
- 📁 File Watcher: Monitor directories for file creation, modification, deletion, or move events
- 🎯 Pattern Matching: Use wildcards to filter files (e.g., `*.txt`, `contract_*.pdf`)
- 📝 Context Extraction: Jinja2 templates extract context from trigger events (file paths, metadata, etc.)
- 🔄 Session Strategies: Control how sessions are managed (`per_file`, `per_pipeline`, `custom`)
- ⚡ Non-Blocking: Event triggers don't interfere with normal user-initiated pipeline execution
Configuration
Add an `event_triggers` section to your pipeline configuration:
```yaml
name: "Math Repeater"
description: "Solves math problems from files automatically"

# Event triggers configuration
event_triggers:
  type: "file_watcher"
  watch_directory: "data/repeat"
  file_patterns:
    - "*.txt"
  event_types:
    - "created"
  extract_context:
    user_text_template: "Solve problems in {{source}}"
  session_strategy: "per_file"

# Normal pipeline pattern (unchanged)
pattern:
  type: sequential
  steps:
    - node: math_repeater_parser
    # ... rest of pattern
```
Trigger Types
File Watcher (currently supported):
- Monitors a directory for file system events
- Supports wildcard patterns (`*.txt`, `contract_*.pdf`)
- Event types: `created`, `modified`, `deleted`, `moved`
- Automatically passes file paths to pipeline execution

Future trigger types (extensible architecture):
- `webhook`: HTTP POST triggers
- `database`: Row insert/update triggers
- `scheduled`: Cron-based scheduling
- Custom triggers via plugin system
Context Extraction
Use Jinja2 templates to extract context from trigger events:
```yaml
extract_context:
  user_text_template: "Process file: {{file_name}} ({{file_size}} bytes)"
```
Available variables (`file_watcher`):
- `{{source}}`: Full file path
- `{{file_path}}`: Alias for `source`
- `{{file_name}}`: Filename only
- `{{file_size}}`: File size in bytes
- `{{event_type}}`: Event type (`created`, `modified`, etc.)
- All metadata fields from the trigger event
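To make the substitution concrete, here is a dependency-free stand-in for how a `user_text_template` gets filled from a trigger event. The kit itself uses Jinja2; this sketch only shows the variable resolution, and the event values are invented for the example:

```python
import re

def render_template(template, event):
    """Substitute {{var}} placeholders from a trigger-event dict
    (a simplified stand-in for the Jinja2 rendering the kit performs)."""
    return re.sub(r"\{\{\s*(\w+)\s*\}\}",
                  lambda m: str(event.get(m.group(1), "")), template)

# Hypothetical file_watcher event
event = {
    "source": "data/repeat/problems_01.txt",
    "file_name": "problems_01.txt",
    "file_size": 742,
    "event_type": "created",
}
event["file_path"] = event["source"]  # alias, as documented

text = render_template("Process file: {{file_name}} ({{file_size}} bytes)", event)
print(text)  # Process file: problems_01.txt (742 bytes)
```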
Session Strategies
Control how chat sessions are managed for triggered executions:
- `per_file` (default): New session for each file event - isolated, no shared context
- `per_pipeline`: One session for all events - accumulates context across files
- `custom`: Pipeline-specific logic (e.g., per `contract_id`, per `user_id`)
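The three strategies differ only in how events map to session IDs. An illustrative mapping (the real implementation and ID format may differ):

```python
def session_for(strategy, pipeline_id, event, custom_key=None):
    """Illustrative mapping of session strategies to session IDs."""
    if strategy == "per_file":       # isolated session per file event
        return f"{pipeline_id}:{event['file_path']}"
    if strategy == "per_pipeline":   # one shared, accumulating session
        return pipeline_id
    if strategy == "custom":         # e.g. per contract_id or user_id
        return f"{pipeline_id}:{event[custom_key]}"
    raise ValueError(f"unknown strategy: {strategy}")

e1 = {"file_path": "data/repeat/a.txt", "contract_id": "C-42"}
e2 = {"file_path": "data/repeat/b.txt", "contract_id": "C-42"}
# per_file isolates; per_pipeline accumulates; custom groups by a chosen key
assert session_for("per_file", "math_repeater", e1) != session_for("per_file", "math_repeater", e2)
assert session_for("per_pipeline", "math_repeater", e1) == session_for("per_pipeline", "math_repeater", e2)
assert session_for("custom", "math_repeater", e1, "contract_id") == "math_repeater:C-42"
```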
Visual Representation
Workflow diagrams show event triggers as an alternative entry point:
```
[FILE WATCHER] ──(dashed)──> START → Agent A → Agent B → END
       │
       (watching: data/repeat/*.txt)
```
- Trigger node: Folder icon, light blue, shows trigger type
- Dashed edge: Indicates alternative entry point
- Normal flow: Unchanged - user-initiated execution still works
Normal Execution Unchanged
Important: Event triggers are additive - they don't change normal pipeline execution:
- ✅ User-initiated execution: `START → agents → END` (unchanged)
- ✅ Event-triggered execution: `TRIGGER → START → agents → END` (new)
- ✅ Both paths converge at `START`, so the rest of the workflow is identical
Example Use Cases
- Document Processing: Auto-process contracts when uploaded to `data/contracts/`
- Batch Processing: Process math problems when files appear in `data/problems/`
- Contract Lifecycle: Trigger sub-pipelines when contract files are created/modified
- Data Pipeline: Process CSV files automatically when dropped in watch directory
Implementation
Event triggers are managed by the TriggerManager service:
- Startup: Triggers are initialized when FastAPI starts
- Monitoring: Background services watch for events
- Execution: Events trigger pipeline execution via orchestrator
- Shutdown: Triggers are gracefully stopped on app shutdown
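The four lifecycle steps can be sketched as a toy manager. This is a hypothetical illustration of the described behavior, not the real `TriggerManager` API:

```python
# Hypothetical sketch of the lifecycle above: initialize on startup,
# fire pipeline runs on events, stop cleanly on shutdown.
class TriggerManagerSketch:
    def __init__(self, triggers):
        self.triggers, self.runs = triggers, []

    def startup(self):
        for t in self.triggers:
            t["active"] = True            # begin background monitoring

    def on_event(self, trigger, event):
        if trigger.get("active"):         # events trigger pipeline execution
            self.runs.append((trigger["pipeline"], event))

    def shutdown(self):
        for t in self.triggers:
            t["active"] = False           # graceful stop

mgr = TriggerManagerSketch([{"pipeline": "math_repeater"}])
mgr.startup()
mgr.on_event(mgr.triggers[0], {"file_path": "data/repeat/p.txt"})
mgr.shutdown()
print(mgr.runs)  # [('math_repeater', {'file_path': 'data/repeat/p.txt'})]
```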
Code location:
- Trigger system: `src/topaz_agent_kit/core/triggers/`
- File watcher: `src/topaz_agent_kit/core/triggers/file_watcher.py`
- Manager: `src/topaz_agent_kit/core/triggers/manager.py`
- Integration: `src/topaz_agent_kit/services/fastapi_app.py`
🛠️ Creating New Pipelines
Pipeline Generation Workflow
Topaz Agent Kit includes a comprehensive, interactive workflow for generating complete pipeline configurations from a use case description. This workflow guides you through requirements gathering, workflow design, file generation, and validation.
When to Use the Workflow
Use the pipeline generation workflow when you need to:
- ✅ Create a new pipeline from scratch
- ✅ Generate all configuration files (pipeline config, agent configs, prompts, UI manifests)
- ✅ Ensure production-ready output with proper validation
- ✅ Follow best practices and naming conventions
- ✅ Avoid common mistakes (conflicts, incorrect context variables, duplicate file extraction)
Don't use this workflow for:
- ❌ Minor modifications to existing pipelines (edit files directly)
- ❌ Quick experiments or prototypes (use simpler approaches)
How to Use the Workflow
For Users:
1. Start a conversation with your AI assistant (e.g., Cursor Composer, ChatGPT, Claude)
2. Request pipeline generation:
   "I need to create a new pipeline. Please follow the pipeline generation workflow."
   Or be more specific:
   "I want to create a pipeline for [your use case]. Follow the pipeline generation workflow."
3. Work interactively through the workflow:
   - Answer questions about your requirements
   - Review proposals at each checkpoint
   - Approve or request changes
   - Confirm before file generation
4. Review generated files and test the pipeline
Example Usage:
```
User: "I need to create a pipeline for analyzing customer feedback emails
and generating response suggestions."

AI Assistant: "I'll help you create this pipeline. Let me follow the pipeline
generation workflow. First, let me understand your use case better..."
```
The AI will guide you through:
- Step 1: Comprehensive requirements gathering (use case, agents, patterns, HITL gates, etc.)
- Step 2: Workflow design and proposal
- Step 3: Interactive refinement
- Step 4: File generation (all config files, prompts, UI manifests)
- Step 5: Validation and summary
Workflow Features
- 6 Review Checkpoints: The workflow includes explicit checkpoints where the AI pauses for your approval
- Complete File Generation: Generates all necessary files (pipeline config, agent configs, prompts, UI manifests, icons)
- Conflict Detection: Automatically checks for agent ID conflicts with existing files
- Validation: Validates all generated files before completion
- Best Practices: Follows naming conventions, context variable patterns, and MCP tool usage guidelines
What Gets Generated
The workflow generates all files needed for a complete pipeline:
- Pipeline Config: `config/pipelines/{pipeline_id}.yml` (backend logic)
- Agent Configs: `config/agents/{agent_id}.yml` (one per agent)
- Prompt Templates: `config/prompts/{agent_id}.jinja` (one per agent)
- UI Manifest: `config/ui_manifests/{pipeline_id}.yml` (display metadata)
- Assets: Icons and workflow diagrams
- Main Config Updates: Updates to `config/pipelines.yml` (registry) and `config/ui_manifest.yml` (global UI settings)
- Assistant Classification: Updates `assistant_intent_classifier.jinja` for pipeline discovery
All files are placed in: `src/topaz_agent_kit/templates/starters/ensemble/`
Documentation
For detailed information about the workflow, see the complete workflow guide:
- Workflow Guide: `docs/workflows/pipeline_generation_workflow.md`
Quick Example
```shell
# 1. Start conversation with AI assistant
# 2. Request: "I need to create a pipeline for customer support ticket analysis.
#    Follow the pipeline generation workflow."
# 3. Work through the interactive workflow:
#    - Answer questions about requirements
#    - Review workflow proposals
#    - Approve file generation
#    - Review validation results
# 4. Test your new pipeline:
topaz-agent-kit serve fastapi --project ./my_project
```
The workflow ensures you get production-ready, validated pipeline configurations that follow all best practices and naming conventions.
🔧 Local Tools: Pipeline-Specific Python Functions
Local tools are the strongest case for pipeline-specific business logic. They allow you to implement deterministic, schema-aware operations that agents can call directly, with automatic adaptation across all supported frameworks.
When to Use Local Tools
✅ Use local tools for:
- Pipeline-specific database operations: Schema-aware queries, validations, aggregations
- Deterministic business logic: Billing calculations, statistical aggregations, simulations
- Data transformations: Complex joins, data validation, format conversions specific to your domain
- Operations requiring correctness guarantees: Critical calculations that must be reproducible and testable
❌ Don't use local tools for:
- Generic SQL queries → Use the `sqlite_query` MCP tool
- Generic file operations → Use `fs_*` MCP tools
- External API calls → Use MCP tools or external services
- Simple text generation → Let the LLM handle it
How to Define Local Tools
1. Create a tool module in your project:
```
projects/your_pipeline/
  tools/
    your_pipeline/
      your_tools.py
```
2. Implement tools with the `@pipeline_tool` decorator:
```python
# tools/your_pipeline/your_tools.py
from topaz_agent_kit.local_tools.registry import pipeline_tool
from typing import Dict, Any
import sqlite3


@pipeline_tool(toolkit="your_pipeline", name="validate_and_summarize")
def validate_and_summarize(db_file: str, target_state: str) -> Dict[str, Any]:
    """Validate database schema and summarize data.

    Args:
        db_file: Absolute path to SQLite database file
        target_state: Target state name

    Returns:
        Dictionary with validation results and summary
    """
    conn = sqlite3.connect(db_file)
    try:
        # Your schema-aware validation and aggregation logic
        # ...
        return {"ok": True, "target_state": target_state, "summary": {...}}
    finally:
        conn.close()


@pipeline_tool(toolkit="your_pipeline", name="calculate_bill")
def calculate_bill(db_file: str, customer_id: str, usage_kwh: float) -> Dict[str, Any]:
    """Calculate bill for a customer.

    Args:
        db_file: Absolute path to database
        customer_id: Customer identifier
        usage_kwh: Usage in kilowatt-hours

    Returns:
        Dictionary with billing details
    """
    # Your billing calculation logic
    # ...
    return {"total": 123.45, "breakdown": {...}}
```
Key Requirements:
- Use type annotations for all parameters (required for framework adaptation)
- Provide docstrings (used as tool descriptions)
- Use the `@pipeline_tool(toolkit="...", name="...")` decorator
- Return structured data (dict, list, or JSON-serializable types)
How to Use Local Tools in Agents
Wire tools into the agent configuration:
```yaml
# config/agents/your_agent.yml
local_tools:
  modules:
    - tools.your_pipeline.your_tools
  toolkits: ["your_pipeline"]
  tools: ["your_pipeline.*"]  # Pattern: all tools in toolkit
  # Or specify individual tools:
  # tools: ["your_pipeline.validate_and_summarize", "your_pipeline.calculate_bill"]
```
Configuration Options:
- `modules`: List of Python module paths (relative to project root)
- `toolkits`: List of toolkit names to filter by
- `tools`: List of tool name patterns (supports glob patterns like `"your_pipeline.*"`)
Framework Compatibility
Local tools are automatically adapted for all supported frameworks via `FrameworkToolAdapter`:
- ✅ CrewAI: Tools converted to CrewAI Tool objects
- ✅ LangGraph: Tools converted to LangChain StructuredTool objects
- ✅ Microsoft Agent Framework (MAF): Tools integrated via MCPStreamableHTTPTool
- ✅ OAK: Tools converted to FunctionTool objects
- ✅ ADK: Tools passed as callables
- ✅ MAF: Tools used directly with Pydantic model generation
- ✅ Agno: Tools passed as callables
The same tool implementation works across all frameworks - no framework-specific code needed!
Complete Example: Rate Case Pipeline
See the ensemble starter for a complete reference:
- Tool Implementation: `projects/ensemble/tools/rate_case_filing_navigator/rate_case_tools.py`
- Agent Configuration: `projects/ensemble/config/agents/rate_case_data_summarizer.yml`
The rate case pipeline demonstrates:
- Schema-aware database operations
- Complex billing calculations (tiered rates, time-of-use)
- Data validation and aggregation
- Multiple tools in a single toolkit
- Tools used across multiple agents and frameworks
Best Practices
- Group related tools in a single module under a toolkit
- Use descriptive names that indicate the toolkit (e.g., `rate_case_validate_and_summarize`)
- Use type annotations - required for proper framework adaptation
- Handle errors gracefully - log errors and return structured error responses
- Keep tools focused - one tool, one responsibility
🎯 Advanced Features
🧠 Prompt Intelligence Engine
Automatically detects variables in prompts and generates agent classes with proper context handling. The Prompt Intelligence Engine supports multiple variable syntaxes for flexible input handling.
Input Variable Options
When defining agent inputs in YAML files, you can use several variable syntaxes:
1. Simple Variables
Reference variables from the main execution context:
```yaml
inputs:
  inline: |
    User request: {{user_text}}
    Current date: {{current_date}}
```
Resolution: Variables are resolved from the main context, HITL results, or upstream agent outputs (in that order).
2. Agent-Prefixed Variables
Explicitly reference variables from specific upstream agents:
```yaml
inputs:
  inline: |
    Problem: {{user_text}}
    Strategist Output:
    - Expression: {{math_strategist.expression}}
    - Steps: {{math_strategist.steps}}
    Calculator Output:
    - Result: {{math_calculator.result}}
    - Rationale: {{math_calculator.rationale}}
```
Syntax: {{agent_id.variable_name}}
Resolution: Directly accesses the specified field from the upstream agent's parsed output.
Benefits:
- Explicit dependency declaration
- Clear data flow visualization
- Prevents ambiguity when multiple agents have similar field names
3. Expression Variables
Use complex expressions with conditional logic, operators, and functions:
```yaml
inputs:
  inline: |
    Expression: {{math_strategist.expression if math_strategist.expression else 'No expression provided'}}
    Step count: {{len(math_strategist.steps) if math_strategist.steps else 0}}
    Status: {{'Ready' if math_calculator.result else 'Pending'}}
```
Supported Operators:
- Comparison: `==`, `!=`, `>`, `<`, `>=`, `<=`
- Boolean: `AND`, `OR`, `NOT`
- String: `contains`, `starts_with`, `ends_with`, `in`, `not in`
- Null checks: `is null`, `is not null`
- Functions: `len(array)` for array length
- Ternary: `A if B else C` (Python-style conditional expressions)
Examples:
```
# Ternary expression with default value
{{upstream_agent.field if upstream_agent.field else 'default_value'}}

# Conditional based on array length
{{'Complex' if len(agent.steps) > 5 else 'Simple'}}

# Nested field access with fallback
{{agent.nested.field if agent.nested else 'N/A'}}

# Boolean logic
{{'High' if agent.score > 0.8 AND agent.verified else 'Low'}}
```
Resolution: Expressions are evaluated at runtime using the ExpressionEvaluator, which has access to all upstream agent outputs and context variables.
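To show the semantics, here is an illustrative stand-in for the evaluator: it normalizes the documented uppercase `AND`/`OR`/`NOT` operators to Python and evaluates against upstream agent outputs. The real `ExpressionEvaluator` is safer and supports more operators (e.g., `is null`, `contains`); this sketch covers only the basics:

```python
import re
from types import SimpleNamespace

def eval_expr(expr, context):
    """Simplified expression evaluation: uppercase boolean operators
    become Python's, then the expression runs against the context."""
    py = re.sub(r"\bAND\b", "and", expr)
    py = re.sub(r"\bOR\b", "or", py)
    py = re.sub(r"\bNOT\b", "not", py)
    return eval(py, {"__builtins__": {}, "len": len}, context)

# Hypothetical upstream output for math_strategist
ctx = {"math_strategist": SimpleNamespace(expression="2+2", steps=[1, 2, 3])}
a = eval_expr("math_strategist.expression if math_strategist.expression else 'No expression provided'", ctx)
b = eval_expr("'Complex' if len(math_strategist.steps) > 5 else 'Simple'", ctx)
print(a, b)  # 2+2 Simple
```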
4. Variables with Default Values
Provide default values for variables that might not exist:
```yaml
inputs:
  inline: |
    Topic: {{topic:General Discussion}}
    Tone: {{tone:Professional}}
    Max length: {{max_length:1000}}
```
Syntax: {{variable_name:default_value}}
Resolution: If the variable is not found in context, the default value is used.
Note: Default values work with simple variables. For prefixed variables or expressions, use ternary expressions instead.
5. Mixed Usage
Combine all variable types in a single prompt:
```yaml
inputs:
  task:
    description:
      inline: |
        User Request: {{user_text}}
        Previous Agent Analysis:
        - Expression: {{math_strategist.expression if math_strategist.expression else 'Not provided'}}
        - Steps: {{math_strategist.steps}}
        - Complexity: {{'High' if len(math_strategist.steps) > 5 else 'Low'}}
        Current Calculation:
        - Result: {{math_calculator.result}}
        - Rationale: {{math_calculator.rationale if math_calculator.rationale else 'No rationale available'}}
        Settings:
        - Max iterations: {{max_iterations:10}}
        - Debug mode: {{debug_mode:false}}
```
Variable Resolution Order
Variables are resolved in the following order:
1. Main Context: Root-level context variables (e.g., `user_text`, `current_date`)
2. HITL Results: Data from human-in-the-loop gates (e.g., `user_preferences`, `approval_feedback`)
3. Upstream Agents: Outputs from agents that executed earlier in the pipeline
   - For simple variables: searches all upstream agents automatically
   - For prefixed variables: directly accesses the specified agent's output
   - For expressions: evaluates using all available context
INPUTS Tab Display
The INPUTS tab in the UI shows only variables that were explicitly defined in your YAML configuration:
- ✅ Shown: Variables explicitly used in `inputs`, `task.description`, `instruction`, etc.
- ❌ Hidden: System variables (`context`, `pipeline_data`), upstream agent dicts (added for Jinja2 expressions), and internal variables
This ensures the INPUTS tab displays only user-facing variables that are relevant to understanding the agent's input.
Backward Compatibility
All existing variable syntaxes continue to work:
- Simple variables: `{{variable_name}}` ✅
- Default values: `{{variable_name:default}}` ✅
- Jinja2 filters: `{{variable_name | filter}}` ✅
New syntaxes are additive and don't break existing configurations.
Best Practices
- Use Prefixed Variables for Clarity: When referencing upstream agents, use `{{agent_id.field}}` for explicit dependencies
- Provide Defaults: Use default values or ternary expressions to handle missing data gracefully
- Document Dependencies: Prefixed variables make data flow explicit and easier to understand
- Test Expressions: Verify complex expressions work correctly with your data structures
Example: Math Compass Pipeline
```yaml
# math_calculator.yml
inputs:
  inline: |
    Expression: {{math_strategist.expression if math_strategist.expression else 'No expression provided'}}
    Steps: {{math_strategist.steps}}
```

```yaml
# math_auditor.yml
inputs:
  task:
    description:
      inline: |
        Problem: {{user_text}}
        Strategist Output:
        - Expression: {{math_strategist.expression}}
        - Steps: {{math_strategist.steps}}
        Calculator Output:
        - Result: {{math_calculator.result}}
        - Rationale: {{math_calculator.rationale}}
```
Features:
- Automatic variable extraction from all syntaxes
- Context-aware generation with proper resolution logic
- Template rendering with Jinja2
- Type-safe variable injection
- Expression evaluation with full operator support
- Backward compatible with existing configurations
Repeat Pattern
The Repeat Pattern allows you to execute the same agent multiple times in parallel, with each instance receiving unique input data. This is perfect for processing arrays of items, batch operations, or parallelizing independent tasks.
Basic Configuration
pattern:
type: sequential
steps:
- node: data_parser
- type: parallel
repeat:
node: processor_agent
instances: "data_parser.item_count"
input_mapping:
item: "data_parser.items[index]"
item_index: "index"
instance_id_template: "{{node_id}}_{{index}}"
- node: aggregator_agent
Key Parameters
- node (required): The agent ID to repeat
- instances (required): Number of instances (integer) or expression string (e.g., "parser.count")
- input_mapping (optional): Maps input variable names to templates with {{index}} substitution
- instance_id_template (optional): Template for generating unique instance IDs (default: "{{node_id}}_instance_{{index}}")
- instance_context_key (optional): Context key for instance metadata (default: "repeat_instance")
Instance-Specific Inputs
Use input_mapping to provide unique data to each instance:
repeat:
node: math_solver
instances: "problem_parser.problem_count"
input_mapping:
problem_text: "problem_parser.problems[index]"
problem_index: "index"
How it works:
- {{index}} is automatically replaced with the instance number (0, 1, 2, ...)
- Array indexing like problems[index] extracts the specific item for each instance
- Each instance receives its own copy of the mapped variables
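As a rough sketch of this resolution step, the helper below (illustrative only, not the framework's actual API) shows how index substitution and array indexing could produce each instance's inputs:

```python
def resolve_mapping(input_mapping, upstream, index):
    """Resolve mapped templates against upstream agent outputs for one instance."""
    resolved = {}
    for var_name, template in input_mapping.items():
        if template == "index":
            resolved[var_name] = index
            continue
        # Substitute the literal [index] with the concrete instance number
        expr = template.replace("[index]", f"[{index}]")
        agent_id, _, path = expr.partition(".")
        value = upstream[agent_id]
        # Walk "problems[1]"-style paths: field access, then optional list index
        for part in path.split("."):
            if part.endswith("]"):
                name, pos = part[:-1].split("[")
                value = value[name][int(pos)]
            else:
                value = value[part]
        resolved[var_name] = value
    return resolved

upstream = {"problem_parser": {"problem_count": 3, "problems": ["2+2", "3*4", "10/2"]}}
mapping = {"problem_text": "problem_parser.problems[index]", "problem_index": "index"}

instance_1_inputs = resolve_mapping(mapping, upstream, 1)
# instance_1_inputs == {"problem_text": "3*4", "problem_index": 1}
```

Instance 0 would receive "2+2" and instance 2 would receive "10/2" by the same mechanism.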
Accessing Instance Results in Downstream Agents
After all instances complete, their results are aggregated into a dictionary accessible by downstream agents. Use the pattern {base_agent_id}_instances to access all instance results:
# In downstream agent's inputs section
inputs:
inline: |
{% if math_solver_instances %}
Solver Results:
{% for instance_id, solver_data in math_solver_instances.items() %}
- {{instance_id}}:
Problem: {{solver_data.problem_text}}
Answer: {{solver_data.answer}}
Explanation: {{solver_data.explanation}}
{% endfor %}
{% endif %}
Key Points:
- The aggregated dictionary uses the key {base_agent_id}_instances (e.g., math_solver_instances)
- Each entry is keyed by the instance ID (e.g., math_solver_0, math_solver_1)
- The value is the parsed output from that instance
- Use Jinja2 {% for %} loops to iterate over all instances
- Loop variables (instance_id, solver_data) are automatically filtered out of the INPUTS tab
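The shape of that aggregated dictionary can be sketched in a few lines of Python (aggregate_instances is a hypothetical helper; the real engine builds this structure internally):

```python
def aggregate_instances(base_agent_id, instance_outputs):
    """Collect each instance's parsed output under '{base_agent_id}_instances'."""
    return {
        f"{base_agent_id}_instances": {
            f"{base_agent_id}_{i}": output for i, output in enumerate(instance_outputs)
        }
    }

# Parsed outputs from two parallel math_solver instances (sample data)
outputs = [
    {"problem_text": "2+2", "answer": "4", "explanation": "Simple addition"},
    {"problem_text": "3*4", "answer": "12", "explanation": "Simple multiplication"},
]
context = aggregate_instances("math_solver", outputs)
# context["math_solver_instances"]["math_solver_1"]["answer"] == "12"
```

A downstream agent's Jinja2 template then iterates over context["math_solver_instances"] exactly as shown above.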
Important: Using Loop Variables in Templates
The prompt intelligence engine only detects and expands loop variables that are used directly in {{ }} expressions. Variables used only in {% set %} statements are not detected for expansion in the INPUTS tab.
✅ Correct - Use variables directly in {{ }} expressions:
inputs:
inline: |
{% for instance_id, file_data in file_report_generator_instances.items() %}
- {{instance_id}}:
File Name: {{file_data.file_report_generator.file_name}}
Report: {{file_data.file_report_generator.report_md}}
{% endfor %}
❌ Incorrect - Using {% set %} prevents variable detection:
inputs:
inline: |
{% for instance_id, file_data in file_report_generator_instances.items() %}
{% set file_report = file_data.file_report_generator %}
- {{instance_id}}:
File Name: {{file_report.file_name}} # file_data won't be expanded in INPUTS tab
{% endfor %}
Why? The prompt intelligence engine scans {{ }} expressions to identify variables for the INPUTS tab. Variables used only in {% set %} statements are not detected, so they won't appear in the INPUTS tab with expanded values (e.g., file_data[0], file_data[1]).
Best Practice: Use loop variables directly in {{ }} expressions instead of {% set %} for better visibility in the INPUTS tab.
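This behavior can be approximated with a simplified scanner (a stand-in for the prompt intelligence engine, not its actual implementation) that only looks inside {{ }} expressions:

```python
import re

def detect_template_variables(template_text):
    """Return the root variable names that appear inside {{ }} expressions."""
    names = set()
    for expr in re.findall(r"\{\{\s*([^}]+?)\s*\}\}", template_text):
        root = re.match(r"[A-Za-z_][A-Za-z0-9_]*", expr)
        if root:
            names.add(root.group(0))
    return names

direct = "{% for id, file_data in reports.items() %}{{ file_data.file_name }}{% endfor %}"
via_set = "{% for id, file_data in reports.items() %}{% set r = file_data %}{{ r.file_name }}{% endfor %}"

detect_template_variables(direct)   # {'file_data'} - detected
detect_template_variables(via_set)  # {'r'} - file_data is invisible to the scanner
```

Because the scanner never enters {% %} blocks, a variable introduced only through {% set %} is invisible to it, which is why the INPUTS tab cannot expand it.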
Complete Example: Math Problem Solver
# Pipeline configuration
pattern:
type: sequential
steps:
- node: math_repeater_parser
- type: parallel
repeat:
node: math_repeater_solver
instances: "math_repeater_parser.problem_count"
instance_id_template: "{{node_id}}_{{index}}"
input_mapping:
problem_text: "math_repeater_parser.problems[index]"
problem_index: "index"
- node: math_repeater_report_generator
# Parser agent outputs: {problem_count: 3, problems: ["2+2", "3*4", "10/2"]}
# Each solver instance receives:
# Instance 0: problem_text="2+2", problem_index=0
# Instance 1: problem_text="3*4", problem_index=1
# Instance 2: problem_text="10/2", problem_index=2
# Report generator accesses all results:
inputs:
inline: |
Problem Count: {{math_repeater_parser.problem_count}}
{% if math_repeater_solver_instances %}
Solutions:
{% for instance_id, solver_data in math_repeater_solver_instances.items() %}
- Problem {{solver_data.problem_index}}: {{solver_data.problem_text}}
Answer: {{solver_data.answer}}
{% endfor %}
{% endif %}
Instance Context Metadata
Each instance has access to metadata via the instance_context_key:
# In agent's prompt template
inputs:
inline: |
Processing item {{problem_instance.index}} of {{problem_instance.instance_id}}
Item: {{problem_text}}
Available metadata:
- {{instance_context_key}}.index: The instance index (0, 1, 2, ...)
- {{instance_context_key}}.instance_id: The unique instance ID (e.g., math_solver_0)
Best Practices
- Use Dynamic Instance Counts: Use expressions like "parser.count" instead of hardcoded numbers
- Clear Variable Names: Use descriptive names in input_mapping (e.g., problem_text, not text)
- Access Results via the _instances Dictionary: Always use the {base_agent_id}_instances pattern in downstream agents
- Handle Empty Results: Use {% if agent_instances %} checks before iterating
- Unique Instance IDs: Customize instance_id_template if you need specific ID formats
Use Cases
- Batch Processing: Process multiple files, documents, or data items in parallel
- Parallel Problem Solving: Solve multiple independent problems simultaneously
- Data Validation: Validate multiple records concurrently
- Content Generation: Generate multiple variations or responses in parallel
- API Calls: Make multiple independent API calls simultaneously
Enhanced Repeat Pattern (Sequential Repeat)
The Enhanced Repeat Pattern extends the basic repeat pattern to allow repeating an entire sequence of agents in parallel. This enables complex workflows where each instance runs through a multi-step process independently.
Configuration
pattern:
type: sequential
steps:
- node: folder_scanner
- type: parallel
repeat:
type: sequential # Enhanced repeat: nested sequential pattern
instances: "folder_scanner.file_count"
instance_id_template: "file_{{index}}"
instance_context_key: "file_instance"
steps:
- node: file_reader
input_mapping:
file_path: "folder_scanner.file_paths[index]"
file_index: "index"
- node: processor
- node: report_generator
- node: final_aggregator
Key Differences from Basic Repeat
- type: sequential (required): Indicates this is an enhanced repeat with a nested sequential pattern
- steps (required): Array of agents/steps to run in sequence for each instance
- input_mapping: Can be defined globally (applies to all agents) or per step (overrides global)
- Each instance runs the entire sequence independently and in parallel with other instances
How It Works
- Instance Creation: Creates N instances based on the instances expression
- Parallel Execution: All instances run their sequences in parallel
- Sequence Execution: Within each instance, agents run sequentially
- Result Aggregation: Results from the last agent in each sequence are aggregated
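The execution model above can be sketched with asyncio (illustrative only; the agent names and the run_step stand-in are hypothetical, not the framework's engine):

```python
import asyncio

async def run_step(agent_id, data):
    await asyncio.sleep(0)  # stand-in for real agent work
    return f"{agent_id}({data})"

async def run_sequence(index, steps, seed):
    """Run one instance's steps sequentially, threading each result forward."""
    result = seed
    for agent_id in steps:
        result = await run_step(f"{agent_id}_{index}", result)
    return result

async def run_repeat(instances, steps, seeds):
    """Run all instances in parallel; each instance is itself sequential."""
    return await asyncio.gather(
        *(run_sequence(i, steps, seeds[i]) for i in range(instances))
    )

results = asyncio.run(run_repeat(2, ["file_reader", "processor"], ["a.txt", "b.txt"]))
# results[0] == "processor_0(file_reader_0(a.txt))"
```

Only after gather() returns (i.e., every sequence has finished) would a downstream aggregator run.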
Example: Multi-File Processing
pattern:
type: sequential
steps:
- node: folder_scanner # Scans folder, outputs: {file_count: 3, file_paths: [...]}
- type: parallel
repeat:
type: sequential
instances: "folder_scanner.file_count" # 3 instances
instance_id_template: "file_{{index}}"
instance_context_key: "file_instance"
steps:
- node: file_reader # Reads file for this instance
input_mapping:
file_path: "folder_scanner.file_paths[index]"
- node: processor # Processes the file
- node: file_reporter # Generates report for this file
- node: final_aggregator # Aggregates all file reports
Execution Flow:
- 3 parallel sequences run simultaneously:
  - Instance 0: file_reader_0 → processor_0 → file_reporter_0
  - Instance 1: file_reader_1 → processor_1 → file_reporter_1
  - Instance 2: file_reader_2 → processor_2 → file_reporter_2
- All sequences complete before final_aggregator runs
Accessing Results from Enhanced Repeat
Results from the last agent in each sequence are aggregated into {last_agent_id}_instances:
# In final_aggregator's inputs
inputs:
inline: |
{% if file_reporter_instances %}
File Reports:
{% for instance_id, file_data in file_reporter_instances.items() %}
- {{instance_id}}: {{file_data.report_md}}
{% endfor %}
{% endif %}
Nested Repeat Patterns
You can nest repeat patterns within enhanced repeat patterns to create complex hierarchical workflows. This is useful when each instance needs to process multiple sub-items.
Configuration
pattern:
type: sequential
steps:
- node: folder_scanner
- type: parallel
repeat:
type: sequential # Enhanced repeat: process each file
instances: "folder_scanner.file_count"
instance_id_template: "file_{{index}}"
instance_context_key: "file_instance"
steps:
- node: file_reader
input_mapping:
file_path: "folder_scanner.file_paths[index]"
- type: parallel
repeat: # Nested repeat: process each problem in the file
node: problem_solver
instances: "file_reader.problem_count"
input_mapping:
problem_text: "file_reader.problems[index]"
problem_index: "index"
instance_id_template: "{{node_id}}_{{index}}"
- node: file_report_generator
- node: final_report_generator
Instance ID Structure
Nested repeat patterns create hierarchical instance IDs:
- Format: {base_agent_id}_{parent_index}_{nested_index}
- Example: problem_solver_0_0 = File 0, Problem 0
- Display: "Problem Solver (Instance 1.1)" = File 1, Problem 1 (1-based)
ID Naming Convention:
- Parent index (first number): The outer repeat pattern instance (e.g., file index)
- Nested index (second number): The inner repeat pattern instance (e.g., problem index)
- Both indices are 0-based in IDs, but displayed as 1-based in UI
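The ID and display conventions can be sketched as follows (make_instance_id and display_label are hypothetical helpers, not the framework's API):

```python
def make_instance_id(base_agent_id, parent_index, nested_index=None):
    """Build a 0-based instance ID; hierarchical when nested."""
    if nested_index is None:
        return f"{base_agent_id}_{parent_index}"
    return f"{base_agent_id}_{parent_index}_{nested_index}"

def display_label(agent_name, parent_index, nested_index=None):
    """UI labels are 1-based, unlike the 0-based IDs."""
    if nested_index is None:
        return f"{agent_name} (Instance {parent_index + 1})"
    return f"{agent_name} (Instance {parent_index + 1}.{nested_index + 1})"

make_instance_id("problem_solver", 0, 0)  # 'problem_solver_0_0'
display_label("Problem Solver", 0, 0)     # 'Problem Solver (Instance 1.1)'
```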
Example: Multi-File Problem Solver
# Pipeline processes multiple files, each containing multiple problems
pattern:
type: sequential
steps:
- node: folder_scanner # Finds 2 files
- type: parallel
repeat:
type: sequential
instances: "folder_scanner.file_count" # 2 instances (file_0, file_1)
instance_id_template: "file_{{index}}"
steps:
- node: file_reader # Reads file_0.txt (3 problems) or file_1.txt (2 problems)
- type: parallel
repeat: # Nested: solve problems within each file
node: problem_solver
instances: "file_reader.problem_count"
input_mapping:
problem_text: "file_reader.problems[index]"
instance_id_template: "{{node_id}}_{{index}}"
- node: file_report_generator # Report for this file's problems
- node: final_report_generator # Aggregates all file reports
Instance IDs Created:
- file_reader_0 (file 0)
- problem_solver_0_0 (file 0, problem 0)
- problem_solver_0_1 (file 0, problem 1)
- problem_solver_0_2 (file 0, problem 2)
- file_report_generator_0 (file 0)
- file_reader_1 (file 1)
- problem_solver_1_0 (file 1, problem 0)
- problem_solver_1_1 (file 1, problem 1)
- file_report_generator_1 (file 1)
Accessing Nested Instance Results
Nested repeat patterns create scoped instance dictionaries to avoid conflicts:
# In file_report_generator (within file_0 instance)
inputs:
inline: |
# Access problem solvers for THIS file only
{% if problem_solver_instances %}
Solutions for this file:
{% for instance_id, solver_data in problem_solver_instances.items() %}
- {{solver_data.problem_text}}: {{solver_data.answer}}
{% endfor %}
{% endif %}
Scoping:
- Within each file instance, problem_solver_instances contains only that file's problem solvers
- The system automatically scopes nested instance dictionaries to prevent cross-contamination
- Both scoped (problem_solver_instances_file_0) and unscoped (problem_solver_instances) keys are available
Note on Loop Variables: When accessing nested structures (e.g., file_data.file_report_generator.file_name), use variables directly in {{ }} expressions rather than {% set %} statements. This ensures the prompt intelligence engine can detect and expand loop variables for the INPUTS tab. See the "Important: Using Loop Variables in Templates" section above for details.
Frontend Visibility
Agent Cards:
- Each instance appears as a separate card in the UI
- Instance numbers are displayed in the format: "Agent Name (Instance N)" or "Agent Name (Instance Parent.Nested)"
- Example: "Problem Solver (Instance 1.1)" for file 1, problem 1
INPUTS Tab:
- Shows instance-specific variables from input_mapping
- Displays loop variables (e.g., instance_id, solver_data) when iterating over instances
- Filters out internal context variables automatically
OUTPUTS Tab:
- Shows the parsed output for each instance
- Instance ID is displayed in the output JSON (agent_id field)
- Each instance's output is stored separately in the upstream context
Protocol Chips:
- Each instance shows its own protocol (local/remote)
- Instances are visually grouped by their parent pattern
Important Nuances
- Instance ID Uniqueness: Nested patterns ensure unique IDs by including the parent index
  - problem_solver_0_0 (file 0, problem 0) vs problem_solver_1_0 (file 1, problem 0)
  - This prevents conflicts when multiple parent instances run in parallel
- Context Scoping: Nested patterns automatically scope instance dictionaries
  - problem_solver_instances_file_0 (scoped to file 0)
  - problem_solver_instances (unscoped, accessible within the file context)
- Input Mapping Inheritance:
  - Global input_mapping applies to all agents in the sequence
  - Step-level input_mapping overrides the global mapping for that step
  - Nested patterns inherit parent context automatically
- Result Aggregation:
  - Enhanced repeat: Aggregates results from the last agent in each sequence
  - Nested repeat: Aggregates results from the nested agent, scoped to the parent instance
- Instance Context Key:
  - Default: repeat_instance (basic repeat)
  - Customizable: file_instance, problem_instance, etc.
  - Access via {{instance_context_key.index}} and {{instance_context_key.instance_id}}
Complete Example: Enhanced Math Repeater
# Pipeline: Process multiple math problem files, solve problems in each file, generate reports
pattern:
type: sequential
steps:
- node: folder_scanner # Scans folder, finds files
- type: parallel
repeat:
type: sequential # Enhanced repeat: process each file
instances: "folder_scanner.file_count"
instance_id_template: "file_{{index}}"
instance_context_key: "file_instance"
steps:
- node: file_reader # Read file
input_mapping:
file_path: "folder_scanner.file_paths[index]"
file_index: "index"
- type: parallel
repeat: # Nested repeat: solve each problem
node: problem_solver
instances: "file_reader.problem_count"
input_mapping:
problem_text: "file_reader.problems[index]"
problem_index: "index"
instance_id_template: "{{node_id}}_{{index}}"
- node: file_report_generator # Report for this file
- node: final_report_generator # Aggregate all file reports
# file_report_generator accesses problem_solver_instances (scoped to this file)
# final_report_generator accesses file_report_generator_instances (all files)
Best Practices for Nested Patterns
- Use Descriptive Instance Context Keys: file_instance, problem_instance instead of the generic repeat_instance
- Customize Instance ID Templates: Use meaningful templates like "file_{{index}}" for clarity
- Scope Awareness: Understand that nested instance dictionaries are automatically scoped
- Access Patterns: Use unscoped keys within the same parent instance context
- Error Handling: Each instance fails independently; failed instances are reported separately
- Performance: Nested patterns can create many parallel instances; monitor resource usage
Pipeline Composition (Pipeline as Node)
Pipeline Composition allows you to use an entire pipeline as a node within another pipeline. This powerful feature enables maximum code reusability, modular workflow design, and the ability to compose complex workflows from simpler, tested pipelines.
Overview
Instead of copying agents or recreating functionality, you can reference an existing pipeline and use it as a single step in a new pipeline. The sub-pipeline executes with its own isolated context, and its results are made available to the parent pipeline in a structured format.
Basic Configuration
# Parent pipeline configuration
pipelines:
- id: math_repeater
pipeline_file: "pipelines/math_repeater.yml"
nodes:
- id: folder_scanner
# config_file is optional - defaults to "agents/folder_scanner.yml" if not specified
- id: final_report_generator
# config_file is optional - defaults to "agents/final_report_generator.yml" if not specified
pattern:
type: sequential
steps:
- node: folder_scanner
- pipeline: math_repeater # Use pipeline as a node
input_mapping:
user_text: "{{folder_scanner.file_paths[0]}}"
- node: final_report_generator
Pipeline Registry
Similar to the nodes registry, you define a pipelines registry in your pipeline configuration:
pipelines:
- id: math_repeater
pipeline_file: "pipelines/math_repeater.yml" # Relative to config/pipelines/
- id: data_processor
pipeline_file: "pipelines/data_processor.yml"
Important:
- pipeline_file is relative to the config/pipelines/ directory of the parent pipeline
- Pipeline IDs must be unique within a pipeline configuration
- Circular dependencies are detected and prevented during validation
Using Pipelines in Patterns
You can use pipeline: instead of node: in any pattern step:
pattern:
type: sequential
steps:
- node: folder_scanner
- pipeline: math_repeater # Pipeline as a step
input_mapping:
user_text: "{{folder_scanner.file_path}}"
- node: aggregator
Input Mapping
Sub-pipelines can receive inputs from the parent pipeline through input_mapping:
pattern:
type: sequential
steps:
- node: folder_scanner
- pipeline: math_repeater
input_mapping:
user_text: "{{folder_scanner.file_paths[index]}}" # Dynamic input per instance
config: "{{folder_scanner.config}}"
How Input Mapping Works:
- The input_mapping is evaluated in the parent pipeline's context
- Values are set in the sub-pipeline's context before execution
- The sub-pipeline's first node can access these values via {{variable_name}}
- If the sub-pipeline has an inputs section defined, it takes precedence and can transform the mapped values
Accessing Sub-Pipeline Outputs
Sub-pipeline results are stored in a structured format in the parent's context:
# Access individual node outputs
{{math_repeater.nodes.math_repeater_parser.problem_count}}
{{math_repeater.nodes.math_repeater_solver.answer}}
{{math_repeater.nodes.math_repeater_report_generator.report_md}}
# Access final output (if outputs.final is configured)
{{math_repeater.report_md}}
{{math_repeater.summary}}
Output Structure:
context.upstream[pipeline_id] = {
"result": final_output, # Final pipeline output
"parsed": final_parsed, # Parsed final output
"nodes": { # Individual node outputs
"node_id": {
"result": node_result,
"parsed": node_parsed
}
},
"intermediate": { # Intermediate outputs (if configured)
"output_id": {
"value": intermediate_value
}
}
}
Pipeline in Repeat Patterns
You can use pipelines in repeat patterns to process multiple items:
pattern:
type: sequential
steps:
- node: folder_scanner
- type: parallel
repeat:
pipeline: math_repeater # Repeat pipeline for each file
instances: "folder_scanner.file_count"
instance_id_template: "{{pipeline_id}}_instance_{{index}}"
instance_context_key: "file_instance"
input_mapping:
user_text: "{{folder_scanner.file_paths[index]}}"
- node: final_report_generator
Instance Results Access:
When a pipeline is repeated, all instances are collected in {pipeline_id}_instances:
# In final_report_generator inputs
inputs:
inline: |
{% if math_repeater_instances %}
Pipeline Instance Results:
{% for instance_id, instance_data in math_repeater_instances.items() %}
- {{instance_id}}:
Report: {{instance_data.nodes.math_repeater_report_generator.report_md}}
Total Problems: {{instance_data.nodes.math_repeater_report_generator.total_problems}}
Solved: {{instance_data.nodes.math_repeater_report_generator.solved_count}}
{% endfor %}
{% endif %}
Context Isolation and Sharing
Isolated Context:
- Sub-pipelines execute with an isolated upstream context (deep copy)
- Sub-pipeline agents write to their own upstream copy
- Parent pipeline's upstream is not affected by sub-pipeline execution
Shared Context:
- Read-only data is shared (shallow copy): project_dir, emitter, user_profiles, etc.
- Sub-pipelines can read the parent's upstream data for input mapping expressions
- Sub-pipelines cannot write to parent's upstream (isolation)
Context Flow:
Parent Context
├── upstream (deep copied for sub-pipeline)
│   ├── parent_node_1 (readable by sub-pipeline)
│   └── parent_node_2 (readable by sub-pipeline)
├── project_dir (shared)
├── emitter (shared)
└── user_profiles (shared)
Sub-Pipeline Context (isolated)
├── upstream (isolated copy)
│   ├── parent_node_1 (read-only, from parent)
│   ├── parent_node_2 (read-only, from parent)
│   ├── sub_node_1 (writable, isolated)
│   └── sub_node_2 (writable, isolated)
└── [input_mapping variables] (set before execution)
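Under these assumptions, the isolation boundary behaves like a deep copy of upstream plus shared references for read-only resources. A minimal sketch (not the engine's actual code; key names are illustrative):

```python
import copy

parent_context = {
    "upstream": {"folder_scanner": {"file_count": 2}},
    "project_dir": "/tmp/project",  # read-only resource, shared by reference
}

# Build the sub-pipeline's context: isolated upstream, shared read-only data
sub_context = {
    "upstream": copy.deepcopy(parent_context["upstream"]),
    "project_dir": parent_context["project_dir"],
}

# Sub-pipeline agents write only into their own copy
sub_context["upstream"]["sub_node_1"] = {"parsed": {"result": "ok"}}

# The parent's upstream is unaffected:
# "sub_node_1" in parent_context["upstream"] -> False
```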
Technical Details
Agent Factory Isolation:
- Sub-pipelines create their own AgentFactory with merged configuration
- The parent's agent_factory is removed from the context before sub-pipeline execution
- The sub-pipeline's AgentFactory can find agents from both parent and sub-pipeline configs
Agent Bus Isolation:
- Sub-pipelines create their own AgentBus with merged configuration
- Each sub-pipeline has its own agent registry and routing logic
- Remote agents in sub-pipelines are properly routed
Configuration Merging:
- Parent pipeline config is merged with sub-pipeline config
- The sub-pipeline's nodes take precedence (for sub-pipeline execution)
- The parent's nodes remain available (for parent pipeline execution)
Result Storage:
- Sub-pipeline results are stored under instance_pipeline_id (for repeat patterns) or pipeline_id (for single execution)
- Results are filtered to include only sub-pipeline nodes (not parent nodes)
- Results are stored in a nested structure: context.upstream[pipeline_id]
Pattern Type Support
Pipeline composition is supported across all pattern types:
| Pattern Type | Support | Notes |
|---|---|---|
| Sequential | ✅ Full | Pipelines execute in order |
| Parallel | ✅ Full | Multiple pipelines execute simultaneously |
| Repeat | ✅ Full | Pipeline instances run in parallel with unique inputs |
| Loop | ✅ Full | Pipelines can be looped with termination conditions |
| Conditional | ✅ Full | Pipelines execute based on conditions |
| Switch | ✅ Full | Different pipelines in different branches |
| Handoff | ⚠️ Limited | Orchestrator must reference pipeline's final output node |
| Group Chat | ✅ Full | Pipelines can participate as group chat members |
Complete Example: Multi-File Processing with Pipeline Reuse
# Parent pipeline: pipeline_math_repeater.yml
pipelines:
- id: math_repeater
pipeline_file: "pipelines/math_repeater.yml"
nodes:
- id: enhanced_math_repeater_folder_scanner
config_file: "agents/enhanced_math_repeater_folder_scanner.yml"
- id: enhanced_math_repeater_final_report_generator
config_file: "agents/enhanced_math_repeater_final_report_generator.yml"
pattern:
type: sequential
steps:
- node: enhanced_math_repeater_folder_scanner
- type: parallel
repeat:
pipeline: math_repeater # Reuse math_repeater pipeline
instances: "enhanced_math_repeater_folder_scanner.file_count"
instance_id_template: "{{pipeline_id}}_instance_{{index}}"
instance_context_key: "file_instance"
input_mapping:
user_text: "{{enhanced_math_repeater_folder_scanner.file_paths[index]}}"
- node: enhanced_math_repeater_final_report_generator
# Sub-pipeline: math_repeater.yml
nodes:
- id: math_repeater_parser
config_file: "agents/math_repeater_parser.yml"
- id: math_repeater_solver
config_file: "agents/math_repeater_solver.yml"
- id: math_repeater_report_generator
config_file: "agents/math_repeater_report_generator.yml"
pattern:
type: sequential
steps:
- node: math_repeater_parser
- type: parallel
repeat:
node: math_repeater_solver
instances: "math_repeater_parser.problem_count"
input_mapping:
problem_text: "math_repeater_parser.problems[index]"
- node: math_repeater_report_generator
outputs:
final:
node: math_repeater_report_generator
selectors: ["report_md", "total_problems", "solved_count"]
Execution Flow:
1. enhanced_math_repeater_folder_scanner scans the folder, finds 2 files
2. math_repeater pipeline runs 2 instances in parallel:
   - Instance 0: Processes file_0.txt → parser → solver(s) → report
   - Instance 1: Processes file_1.txt → parser → solver(s) → report
3. enhanced_math_repeater_final_report_generator aggregates all reports
Final Report Generator Access:
# In enhanced_math_repeater_final_report_generator inputs
inputs:
inline: |
Folder Scanner Results:
File Count: {{enhanced_math_repeater_folder_scanner.file_count}}
{% if math_repeater_instances %}
Math Repeater Pipeline Instance Results:
{% for instance_id, instance_data in math_repeater_instances.items() %}
- {{instance_id}}:
Report: {{instance_data.nodes.math_repeater_report_generator.report_md}}
Total Problems: {{instance_data.nodes.math_repeater_report_generator.total_problems}}
Solved: {{instance_data.nodes.math_repeater_report_generator.solved_count}}
{% endfor %}
{% endif %}
Visual Representation
Pipeline nodes are visually distinct in generated workflow diagrams:
- Color: Light orange background (#FFF3E0) with dark orange border (#FF6F00)
- Label: Shows the pipeline name (e.g., "Math Repeater Pipeline")
- Instance Info: For repeat patterns, shows "Pipeline Instance N"
Best Practices
- Reuse Over Recreate: Use pipeline composition instead of copying agents
- Clear Input Mapping: Explicitly map inputs from parent to sub-pipeline
- Output Configuration: Define outputs.final in sub-pipelines for clean final output access
- Instance ID Templates: Use descriptive templates like "{{pipeline_id}}_instance_{{index}}"
- Context Awareness: Understand that sub-pipelines have isolated upstream contexts
- Error Handling: Sub-pipeline failures are isolated and reported separately
- Agent Reuse: Reuse agents across pipelines to avoid duplication (service generation handles this automatically)
Common Patterns
Pattern 1: Sequential Pipeline Composition
steps:
- node: preprocessor
- pipeline: processor # Process with reusable pipeline
- node: postprocessor
Pattern 2: Parallel Pipeline Execution
steps:
- type: parallel
steps:
- pipeline: data_processor
- pipeline: image_processor
- pipeline: text_processor
Pattern 3: Pipeline in Repeat Pattern
steps:
- node: folder_scanner
- type: parallel
repeat:
pipeline: file_processor
instances: "folder_scanner.file_count"
input_mapping:
file_path: "{{folder_scanner.file_paths[index]}}"
Pattern 4: Nested Pipeline Composition
# Parent pipeline uses sub-pipeline
# Sub-pipeline can also use other pipelines
# Creates hierarchical workflow composition
Limitations and Considerations
- Circular Dependencies: Detected during validation, prevented at runtime
- Context Isolation: Sub-pipelines cannot directly modify parent's upstream (by design)
- Agent Factory: Each sub-pipeline creates its own factory (necessary for agent discovery)
- Service Generation: Agents reused across pipelines share service files (no duplication)
- Edge Protocol: Correctly detected for local/remote agents in sub-pipelines
- Expression Evaluation: Supports hierarchical paths like {pipeline_id}.{node_id}.{field}
Troubleshooting
Issue: Sub-pipeline agents not found
- Cause: Agent factory not properly isolated
- Solution: Ensure the sub-pipeline's AgentFactory is created with a merged config
Issue: Input mapping not working
- Cause: Expression evaluation failing
- Solution: Check that the parent's upstream data is accessible (use {{parent_node.field}})
Issue: Cannot access sub-pipeline outputs
- Cause: Wrong path structure
- Solution: Use the path {pipeline_id}.nodes.{node_id}.{field}, or {pipeline_id}.{field} for the final output
Issue: Edge protocol showing A2A for local agents
- Cause: Agent factory not in context for protocol detection
- Solution: Fixed in code: uses agent_runner.agent_bus as a fallback
Intelligent Pipeline Selection
LLM-based assistant automatically routes user requests to the correct pipeline:
# User: "Translate this to Spanish"
# Assistant routes to: translator pipeline
# User: "Solve this math problem"
# Assistant routes to: math_compass pipeline
# User: "Write a haiku about autumn"
# Assistant routes to: haiku_writers_room pipeline
Multi-Pipeline Architecture
Run multiple specialized pipelines in a single project:
# config/pipeline.yml
pipelines:
- id: math_compass
# config_file is optional - defaults to "pipelines/math_compass.yml" if not specified
- id: stock_analysis
# config_file is optional - defaults to "pipelines/stock_analysis.yml" if not specified
- id: translator
# config_file is optional - defaults to "pipelines/translator.yml" if not specified
Configuration File Paths (Optional)
The config_file field is optional for nodes, pipelines, and independent agents. Topaz Agent Kit automatically resolves configuration file paths using sensible defaults:
- For nodes: Defaults to agents/{id}.yml (e.g., agents/my_agent.yml for id: my_agent)
- For pipelines: Defaults to pipelines/{id}.yml (e.g., pipelines/my_pipeline.yml for id: my_pipeline)
- For independent_agents: Defaults to agents/{id}.yml
When to specify config_file explicitly:
- If your configuration file is in a different location or has a different name
- If you want to use a non-standard directory structure
- If you need to override the default path for clarity
Examples:
# Using default path (recommended)
nodes:
- id: my_agent # Automatically resolves to "agents/my_agent.yml"
# Explicit override (when needed)
nodes:
- id: my_agent
config_file: "agents/custom/my_agent.yml" # Custom path
# Pipeline registry with defaults
pipelines:
- id: my_pipeline # Automatically resolves to "pipelines/my_pipeline.yml"
- id: legacy_pipeline
config_file: "pipelines/legacy/old_pipeline.yml" # Custom path
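The resolution rule can be sketched in a few lines of Python (resolve_config_file is illustrative, not the CLI's actual function):

```python
def resolve_config_file(kind, entry):
    """Return the explicit config_file, or the default path derived from the id."""
    default_dir = "pipelines" if kind == "pipeline" else "agents"
    return entry.get("config_file") or f"{default_dir}/{entry['id']}.yml"

resolve_config_file("node", {"id": "my_agent"})         # 'agents/my_agent.yml'
resolve_config_file("pipeline", {"id": "my_pipeline"})  # 'pipelines/my_pipeline.yml'
resolve_config_file("node", {"id": "my_agent", "config_file": "agents/custom/my_agent.yml"})
```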
Context-Aware Output Management
Configure intermediate and final outputs per pipeline:
outputs:
intermediate:
selectors: ["content", "summary"]
final:
selectors: ["content", "summary", "metadata"]
transform: |
Analysis Summary
{{ content }}
Metadata: {{ metadata }}
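Selector-based filtering can be pictured as a simple key projection over an agent's parsed output (apply_selectors is a hypothetical helper, and the sample fields are assumptions, not the framework's internals):

```python
def apply_selectors(parsed_output, selectors):
    """Keep only the fields named in selectors, in selector order."""
    return {key: parsed_output[key] for key in selectors if key in parsed_output}

parsed = {
    "content": "Revenue grew 12% year over year.",
    "summary": "Strong quarter.",
    "metadata": {"model": "demo"},
    "scratchpad": "internal notes",
}
intermediate = apply_selectors(parsed, ["content", "summary"])
final = apply_selectors(parsed, ["content", "summary", "metadata"])
# "scratchpad" is dropped from both views
```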
Visual Workflow Diagrams
Automatically generate professional diagrams with:
- Agent nodes with proper styling
- HITL gates with flow paths
- Pattern visualization (switch, loop, handoff)
- Protocol indicators
- Termination conditions
Session Management
- SQLite-based session persistence
- Cross-pipeline context sharing
- Document availability across sessions
- Chat history with citations
Available Templates
Starter Templates
- ensemble: Complete multi-agent system with document processing, math, stock analysis, and content generation
- math_demo: Mathematical problem-solving with calculator, strategist, and auditor agents
- stock_analysis: Financial analysis pipeline with research, filing, and investment advisor agents
- article_smith: Content creation workflow with research, writing, critique, and editing agents
- reply_wizard: Email reply generation with context analysis, drafting, and polishing
Foundation Templates
- basic: Minimal project structure for custom agent development
CLI Commands
Project Management
# Create new project (defaults to basic foundation)
topaz-agent-kit init . # Quick start with basic template
topaz-agent-kit init -f basic ./my_project # Explicit basic foundation
topaz-agent-kit init -s ensemble ./my_project # Full starter template
# Scaffold project structure only
topaz-agent-kit scaffold --starter <template> <output_dir>
# Generate agent code from configuration
topaz-agent-kit generate <project_dir>
# Validate project configuration
topaz-agent-kit validate <project_dir>
Service Management
# Start web interface with hot reload
topaz-agent-kit serve fastapi --project <project_dir> --reload
# Start command-line interface
topaz-agent-kit serve cli --project <project_dir>
# Start MCP server
topaz-agent-kit serve mcp --project <project_dir>
# Start services (A2A unified service)
topaz-agent-kit serve services --project <project_dir>
Discovery
# List available templates
topaz-agent-kit list --starters
topaz-agent-kit list --foundations
topaz-agent-kit list --all
Portable Demos
Prerequisites: uv must be installed for faster exports. Install with:
curl -LsSf https://astral.sh/uv/install.sh | sh # Mac/Linux
# Export all three (wheel + runtime + demo) - default
topaz-agent-kit export -p projects/pa --output ./exports
# Export wheel only
topaz-agent-kit export-wheel --output ./exports
# Export runtime only
topaz-agent-kit export-runtime --output ./exports
# Export demo project only
topaz-agent-kit export-demo -p projects/pa --output ./exports
# Export only wheel + demo (skip runtime)
topaz-agent-kit export -p projects/pa --output ./exports --skip-runtime
# Export only wheel (skip runtime + demo) - no --project needed
topaz-agent-kit export --output ./exports --skip-runtime --skip-demo
# Dev build with git hash
topaz-agent-kit export-runtime --dev --output ./exports
# Skip build step (use existing wheel)
topaz-agent-kit export-runtime --skip-build --output ./exports
See Portable Demos section for detailed documentation.
Configuration
Pipeline Configuration (pipeline.yml)
Define your agent workflows with YAML, including integrated HITL gates:
# Multi-pipeline configuration with HITL integration
pipelines:
- id: "content_creator"
name: "Content Creator"
description: "AI-powered content creation with human oversight"
pattern:
type: sequential
steps:
- node: research_analyst
- gate: approve_research
on_approve: continue
on_reject: stop
- node: content_author
- gate: review_draft
on_approve: continue
on_reject: retry_node
retry_target: content_author
max_retries: 3
- node: editor
# HITL Gates configuration
gates:
- id: approve_research
type: approval
title: "Approve Research"
description: "Review research findings and approve to proceed"
timeout_ms: 30000
on_timeout: approve
- id: review_draft
type: input
title: "Review Draft"
description: "Provide feedback on the content draft"
fields:
- name: feedback
label: "Your Feedback"
type: textarea
required: true
validation:
min_length: 10
max_length: 1000
- name: action
label: "Action"
type: select
required: true
options:
- value: "approve"
label: "Approve"
- value: "refine"
label: "Request Revision"
- value: "reject"
label: "Reject"
target_agents: ["content_author"]
context_key: "draft_feedback"
timeout_ms: 120000
on_timeout: skip
# Agent registry
nodes:
- id: research_analyst
# config_file is optional - defaults to "agents/research_analyst.yml" if not specified
- id: content_author
# config_file is optional - defaults to "agents/content_author.yml" if not specified
- id: editor
# config_file is optional - defaults to "agents/editor.yml" if not specified
Agent Configuration (agents/*.yml)
# Agent definition with HITL context integration
instruction: |
You are a content author. Create high-quality content based on research findings.
{% if draft_feedback %}
Previous feedback: {{ draft_feedback.feedback }}
Action requested: {{ draft_feedback.action }}
{% endif %}
framework: "langgraph"
model: "azure_openai"
# MCP tool integration
mcp:
servers:
- url: "http://127.0.0.1:8050/mcp"
toolkits: ["doc_extract", "common"]
tools: ["doc_extract_*", "common_*"]
# Output configuration
outputs:
final:
selectors: ["content", "summary"]
transform: |
**Content**: {{ value.content }}
**Summary**: {{ value.summary }}
UI Configuration (ui_manifest.yml)
# Web interface customization
brand:
logo: "assets/my-logo.png"
name: "My Agent System"
appearance:
default_theme: "dark"
default_accent: "210 92% 56%"
features:
pipeline_panel: true
hitl: true # Enable Enhanced HITL System
typing_indicator: true
# Pipeline cards
cards:
- id: "content_creator"
title: "Content Creator"
subtitle: "AI-powered content creation with human oversight"
icon: "assets/content-creator.svg"
Web Interface Features
Scripts Tab
The Scripts tab in the sidebar provides a convenient way to discover, configure, and execute setup scripts for your pipelines. This is especially useful for initializing databases, generating mock data, and setting up test environments.
Features:
- Script Discovery: Automatically lists all executable scripts from `project_dir/scripts/`
- Script Registry: Uses `scripts.yml` for human-readable names, descriptions, and parameter definitions
- Interactive Execution: Run scripts directly from the UI with parameter configuration
- Parameter Management:
- View default parameters from registry
- Modify parameter values before execution
- Add custom parameters (string or flag types)
- Visual indicators for parameters using default values
- Execution Monitoring: Real-time output display with:
- Execution status (running, success, error)
- Standard output (stdout)
- Error output (stderr)
- Return code
- Execution time
Script Types Supported:
- Python scripts (`.py`)
- PowerShell scripts (`.ps1`)
- Shell scripts (`.sh`)
- Batch files (`.bat`, `.cmd`)
Script Registry Format (scripts.yml):
scripts:
- filename: "setup_eci_database.py"
name: "Setup ECI Database"
description: "Initializes the ECI Claims database and generates mock data"
category: "Setup"
parameters:
- name: "db-path"
description: "Path to SQLite database file"
type: "string"
default: "projects/ensemble/data/eci/eci_database.db"
required: false
- name: "reset"
description: "Reset database (drop existing tables)"
type: "flag"
default: "false"
required: false
- name: "approve-count"
description: "Number of 'approve' claims to generate"
type: "integer"
default: "1"
required: false
Parameter Types:
- `string`: Text input with optional default value
- `integer`: Numeric input with optional default value
- `boolean`: Checkbox input (true/false)
- `flag`: Command-line flag (added to the command if the value is true)
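To make the distinction between parameter types concrete, here is a hypothetical sketch of how registry parameters might map onto a script's command line (the `build_command` helper is illustrative only; the kit's actual runner may differ):

```python
def build_command(script, params):
    """Hypothetical sketch: turn registry parameters into CLI arguments.

    flag parameters become bare switches when true; other types
    are passed as --name value pairs.
    """
    cmd = ["python", script]
    for p in params:
        if p["type"] == "flag":
            if str(p["value"]).lower() == "true":
                cmd.append(f"--{p['name']}")
        else:  # string, integer, boolean
            cmd += [f"--{p['name']}", str(p["value"])]
    return cmd

cmd = build_command("setup_eci_database.py", [
    {"type": "string", "name": "db-path", "value": "data/eci.db"},
    {"type": "flag", "name": "reset", "value": "true"},
    {"type": "integer", "name": "approve-count", "value": 2},
])
# cmd: ["python", "setup_eci_database.py", "--db-path", "data/eci.db",
#       "--reset", "--approve-count", "2"]
```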
Usage:
- Navigate to the Scripts tab in the sidebar
- Browse available scripts grouped by category
- Click Run next to any script
- Review and modify parameters in the modal dialog
- Click Run Script to execute
- Monitor execution output in real-time
Path Resolution: Scripts automatically handle path resolution based on execution context:
- Scripts can use paths relative to the repository root (e.g., `projects/ensemble/data/...`)
- Works correctly whether run from the repository root or the project directory
- Uses intelligent path resolution utilities for maximum compatibility
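The resolution logic can be pictured as walking up from the current directory until a repository marker is found. A minimal sketch with a hypothetical `resolve_data_path` helper (the kit's actual utilities may differ):

```python
from pathlib import Path

def resolve_data_path(relative: str, markers=("pyproject.toml", ".git")) -> Path:
    """Hypothetical helper: find the repo root by walking up from the
    current directory, then resolve a repo-root-relative path against it."""
    current = Path.cwd().resolve()
    for candidate in (current, *current.parents):
        if any((candidate / m).exists() for m in markers):
            return candidate / relative
    return current / relative  # fall back to CWD-relative

db_path = resolve_data_path("projects/ensemble/data/eci/eci_database.db")
```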
Web Interface Features
Real-time Agent Visualization
- Live Pipeline Execution: Watch agents execute in real-time with AG-UI protocol
- Interactive Workflow Diagrams: Visual representation of agent flows with dynamic updates
- Step-by-step Progress: Detailed execution tracking with intermediate results
- Protocol Indicators: Visual representation of A2A and IN-PROC protocols in UI
Document Management
- Drag & Drop Upload: Intuitive file upload interface
- Multi-file Support: Upload multiple documents simultaneously
- Session Persistence: Files available across chat sessions
- Document Preview: Built-in PDF and image preview capabilities
- Direct Agent Processing: Files are automatically passed to agents as multimodal input (images, documents, URLs)
- Agents receive file content directly (not just file paths)
- Supports images (JPEG, PNG, GIF, WebP, etc.) for visual analysis
- Supports documents (PDF, DOCX, TXT, CSV, etc.) with text extraction
- URL detection and processing for remote resources
Enhanced HITL Interface
- Multi-Type Gates: Dynamic UI rendering for approval, input, and selection gates
- Form Validation: Real-time validation for input fields with custom rules
- Flow Control Visualization: See retry, skip, and stop actions in real-time
- Context-Aware Prompts: HITL data automatically available in agent prompts
- Timeout Management: Configurable timeouts with fallback behaviors
Assistant Response Card
Display structured assistant decision context in the UI timeline:
# config/pipeline.yml
assistant:
id: assistant_intent_classifier
type: maf
model: azure_openai
prompt:
instruction:
jinja: prompts/assistant_intent_classifier.jinja
Features:
- Full Structured Response: Shows complete assistant response data including tool planning, execution, reasoning, and metadata
- Timeline Placement: Appears after agent pipeline cards, before the final assistant message
- Generic Design: Automatically displays all fields from structured response (no hardcoding, future-proof)
- Tabbed Interface:
- INPUTS tab: Shows user input + added context (final message sent to assistant)
- OUTPUTS tab: Shows complete structured response data
- Metadata Chips: Displays framework, model, pipeline name, agent name, and status as header chips
- Persistent: Saved to chat database and restored on session reload
- User Control: Display controlled via "Assistant Card" setting in sidebar (Settings tab → Execution Settings)
Card Content:
- Assistant response text
- Tool planned vs tool executed
- Tool parameters
- Raw tool output (for debugging)
- Reasoning (if available)
- Success/error status
- Framework and model information
- Pipeline/agent metadata
User Settings
The sidebar Settings tab provides comprehensive control over UI appearance and execution visibility:
Appearance Settings
- Theme: Choose between System (follows OS preference), Light, or Dark mode
- Accent Color: Select from preset color schemes (Sky, Emerald, Amber, Violet, Blush)
- Project Defaults: Theme and accent can be configured in `ui_manifest.yml` as project defaults
- Restore Defaults: Reset all settings to project defaults with a single click
Execution Settings
Control which execution-related UI components are displayed in the timeline:
- Assistant Card: Show/hide assistant response cards with decision context (tool planning, execution, reasoning)
- Agent Cards & Protocol Chips: Show/hide agent execution cards and protocol indicators (IN-PROC, A2A)
- Workflow Card: Show/hide pipeline workflow visualization diagram
- Citations & Sources: Show/hide citation cards and sources indicator from RAG responses
- Footers: Show/hide footer entries with action buttons (copy, feedback, regenerate)
- File Upload Cards: Show/hide file upload progress/status cards in timeline
Features:
- Real-time Updates: Changes apply immediately without page refresh
- Persistent: Settings saved to browser localStorage and restored on page load
- Cross-tab Sync: Settings synchronized across multiple browser tabs/windows
- User Preferences: All settings are user-specific and override project defaults
- HITL Gates: Always displayed regardless of settings (required for user interaction)
Note: Execution settings control UI visibility only. All events are still emitted by the backend; the frontend filters display based on user preferences.
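Conceptually, the frontend's filtering behaves like the following (a hypothetical sketch in Python for illustration; the real filtering lives in the UI code):

```python
# Hypothetical event-visibility check: HITL gates always render,
# everything else honors the user's execution settings.
def visible(event: dict, settings: dict) -> bool:
    if event.get("kind") == "hitl_gate":
        return True  # always displayed, required for user interaction
    return settings.get(event.get("kind"), True)  # default: shown

events = [
    {"kind": "assistant_card"},
    {"kind": "hitl_gate"},
    {"kind": "workflow_card"},
]
settings = {"workflow_card": False}  # user hid the workflow diagram
shown = [e["kind"] for e in events if visible(e, settings)]
# shown: ["assistant_card", "hitl_gate"]
```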
App Mode: Config-Driven UI Apps
App Mode enables you to build config-driven UI applications where users and AI agents collaborate on shared artifacts (articles, forms, documents, etc.) through a rich widget library and flexible layouts.
Overview
App Mode provides:
- 47 Widget Types: Input widgets, viewers, editors, cards, chips, tables, and more
- Flexible Layouts: Grid, flex, columns, and stack layouts with responsive design
- State Management: Persistent state stored in database with real-time sync
- AI Integration: Agents can read and update canvas state via tools
- Validation: Client-side and server-side validation support
- Three Canvas Modes: Declarative (YAML-defined), Agent (AI-generated), and Hybrid (mix of both)
Canvas Modes: How App Mode Works
Topaz Agent Kit supports three modes for canvas UI generation, giving you flexibility in how you define and control your app's interface:
1. Declarative Mode (YAML-Defined UI)
Definition: The UI structure is fully defined in the YAML manifest. Agents can update state (data) but cannot modify the UI structure.
Characteristics:
- ✅ Stable & Predictable: UI structure never changes
- ✅ Type Safety: Bindings validated against YAML definitions
- ✅ Developer Control: Full control over UI layout
- ✅ Performance: Predictable rendering, no structure changes
- ⚠️ Less Flexible: Cannot adapt to data variations
Use Cases:
- Stable forms with fixed structure
- Critical UI that must remain consistent
- Applications where layout is predetermined
- Production apps requiring predictable behavior
Example Configuration:
# config/apps/invoice_review_app.yml
app_id: invoice_review_app
title: Invoice Review
canvas:
source: declarative # Explicit mode declaration (default if not specified)
layout:
type: grid
columns: 12
sections:
- id: invoice_overview
title: "Invoice Overview"
widget: key_value_form
binding: invoice.data
editable: true
- id: validation_status
title: "Validation Status"
widget: toggle
binding: invoice.validated
editable: false # Agent-only output
- id: review_notes
title: "Review Notes"
widget: markdown_editor
binding: invoice.notes
editable: true
default_state:
invoice:
data: {}
validated: false
notes: ""
Agent Capabilities in Declarative Mode:
- ✅ `get_canvas_state(path?)` - Read canvas state
- ✅ `update_canvas(path, value)` - Update state data
- ❌ Cannot modify UI structure (sections, layout, widgets)
Example Agent Interaction:
# Agent can read and update data
current_invoice = get_canvas_state("invoice.data")
update_canvas("invoice.validated", True)
update_canvas("invoice.notes", "Invoice validated successfully")
# But cannot add new sections or change widget types
2. Agent Mode (AI-Generated UI)
Definition: The UI structure is fully generated by the agent. YAML provides only app metadata and the state schema.
Characteristics:
- ✅ Fully Adaptive: UI adapts to data and context
- ✅ Data-Driven: Structure reflects data analysis
- ✅ Intelligent Layout: The agent optimizes the layout for the information at hand
- ⚠️ Less Predictable: Structure can change
- ⚠️ Requires State Schema: A schema is needed for validation
Use Cases:
- Data-driven dashboards that adapt to content
- Dynamic analysis tools with varying data structures
- Exploratory interfaces where structure is unknown
- AI-powered UI generation based on data patterns
Example Configuration:
# config/apps/dynamic_analysis_app.yml
app_id: dynamic_analysis_app
title: Dynamic Data Analysis
canvas:
source: agent # Fully agent-driven
# No sections defined - agent creates them
# REQUIRED: State schema for validation
state_schema:
type: object
properties:
analysis:
type: object
properties:
data:
type: array
insights:
type: array
visualizations:
type: object
required: [analysis]
# OPTIONAL: Initial state hint
default_state:
analysis:
data: []
insights: []
visualizations: {}
Agent Capabilities in Agent Mode:
- ✅ `get_state_schema()` - Understand state structure
- ✅ `initialize_state()` - Create state paths
- ✅ `get_canvas_state(path?)` - Read state
- ✅ `update_canvas(path, value)` - Update state
- ✅ `update_canvas_section()` - Create/modify sections
- ✅ `update_canvas_structure()` - Modify layout
Example Agent Workflow:
# 1. Understand data structure
schema = get_state_schema()
# 2. Initialize state paths
initialize_state({
"analysis.data": [],
"analysis.insights": [],
"analysis.visualizations": {}
})
# 3. Create UI sections based on data
update_canvas_section(
section_id="data_table",
section_spec={
"title": "Data Overview",
"widget": "table_view",
"binding": "analysis.data"
}
)
# 4. Add insights section
update_canvas_section(
section_id="insights",
section_spec={
"title": "Key Insights",
"widget": "cards_grid",
"binding": "analysis.insights"
}
)
# 5. Organize layout
update_canvas_structure({
"layout": {"type": "grid", "columns": 2},
"order": ["data_table", "insights"]
})
3. Hybrid Mode (Mix of YAML + Agent)
Definition: The UI structure is a mix of YAML-defined and agent-generated sections. YAML defines the stable core; the agent fills placeholders and enhances hybrid sections.
Characteristics:
- ✅ Best of Both Worlds: Stable core + adaptive content
- ✅ Gradual Enhancement: Start static, add dynamic sections
- ✅ Developer Control: Lock critical sections
- ✅ Agent Flexibility: Fill placeholders, enhance sections
- ⚠️ More Complex: Requires merge logic
Use Cases:
- Core UI with dynamic insights
- Stable forms with AI-generated recommendations
- Fixed layout with adaptive content sections
- Production apps needing both stability and flexibility
Example Configuration:
# config/apps/hybrid_invoice_app.yml
app_id: hybrid_invoice_app
title: Invoice Analysis
canvas:
source: hybrid # Mix of YAML + agent
layout:
type: grid
columns: 12
sections:
# Declarative section (never changes)
- id: invoice_overview
source: declarative # Explicit
locked: true # Agent cannot modify
title: "Invoice Overview"
widget: key_value_form
binding: invoice.data
editable: true
# Placeholder section (agent fills)
- id: agent_insights
source: agent
placeholder: true # Mark as placeholder
agent_hint: "Generate insights based on invoice data"
title: "AI Insights"
# No widget/binding - agent will create
# Hybrid section (YAML base + agent enhancements)
- id: validation_results
source: hybrid
title: "Validation Results"
widget: toggle
binding: validation.status
# Agent can add more controls here
default_state:
invoice:
data: {}
validation:
status: false
Section Types in Hybrid Mode:
- Declarative Sections (`source: declarative`):
  - Rendered exactly as YAML defines
  - Agent cannot modify (if `locked: true`)
  - Bindings from YAML
- Placeholder Sections (`source: agent`, `placeholder: true`):
  - Agent fills with content
  - Replaces placeholder when agent generates
  - Bindings created by agent
- Hybrid Sections (`source: hybrid`):
  - YAML provides base structure
  - Agent can add controls and sections
  - Merges YAML + agent content
Agent Capabilities in Hybrid Mode:
- ✅ `get_canvas_structure()` - See full structure
- ✅ `get_canvas_state()` - Read state
- ✅ `update_canvas()` - Update state
- ✅ `update_canvas_section()` - Fill placeholders, enhance hybrid sections
- ❌ Cannot modify `locked: true` sections
- ❌ Cannot modify `source: declarative` sections (unless they are unlocked)
Example Agent Interaction:
# 1. See current structure
structure = get_canvas_structure()
# 2. Fill placeholder section
update_canvas_section(
section_id="agent_insights",
section_spec={
"widget": "cards_grid",
"binding": "analysis.insights",
"title": "AI-Generated Insights"
}
)
# 3. Enhance hybrid section (add more controls)
update_canvas_section(
section_id="validation_results",
section_spec={
"widget": "toggle",
"binding": "validation.status",
# Agent adds additional controls
"additional_controls": [
{
"id": "confidence_score",
"widget": "number",
"binding": "validation.confidence"
}
]
}
)
# Cannot modify locked declarative sections
# update_canvas_section("invoice_overview", ...)  # ❌ Will fail
Placeholder Sections
Placeholder sections allow you to define sections in YAML that agents will fill with content. This is particularly useful in hybrid mode.
Configuration:
canvas:
source: hybrid
sections:
- id: ai_recommendations
source: agent
placeholder: true # Mark as placeholder
agent_hint: "Generate recommendations based on current data"
title: "AI Recommendations"
# No widget/binding defined - agent creates these
How It Works:
- YAML defines a placeholder section with `placeholder: true`
- Frontend shows a loading/empty state until the agent fills it
- Agent calls `update_canvas_section()` to fill the placeholder
- The placeholder is replaced with agent-generated content
Example Agent Filling Placeholder:
# Agent detects placeholder section
structure = get_canvas_structure()
# Returns: {"sections": {"ai_recommendations": {"placeholder": true, ...}}}
# Agent fills placeholder
update_canvas_section(
section_id="ai_recommendations",
section_spec={
"widget": "list_editor",
"binding": "recommendations.list",
"title": "AI Recommendations",
"editable": true
}
)
Mode Comparison
| Aspect | Declarative | Agent | Hybrid |
|---|---|---|---|
| UI Definition | YAML manifest | Agent generates | Mix of both |
| Flexibility | Fixed structure | Fully adaptive | Selective adaptation |
| Use Cases | Stable forms, critical UI | Data-driven dashboards | Core UI + dynamic insights |
| State Schema | Optional | Recommended | Recommended |
| Binding Management | YAML-defined | Agent creates | Mix of both |
| Performance | Predictable | Needs optimization | Balanced |
| Developer Control | Full | Minimal | Selective |
Choosing the Right Mode
Use Declarative Mode when:
- ✅ UI structure is known and stable
- ✅ You need predictable, consistent layouts
- ✅ Critical UI that must remain unchanged
- ✅ Production apps requiring type safety
Use Agent Mode when:
- ✅ UI structure depends on data analysis
- ✅ You need fully adaptive interfaces
- ✅ Data-driven dashboards with varying content
- ✅ Exploratory tools with unknown structure
Use Hybrid Mode when:
- ✅ You have core UI that must stay stable
- ✅ You want dynamic insights or recommendations
- ✅ You need both developer control and agent flexibility
- ✅ Gradual enhancement from static to dynamic
Quick Example
# config/apps/article_app.yml
app_id: article_app
title: "Article Editor"
description: "Collaborative article editing"
canvas:
layout:
type: grid
columns: 2
gap: 4
sections:
- id: title
widget: text
binding: article.title
placeholder: "Enter article title..."
validation:
required: true
max: 200
- id: body
widget: markdown_editor
binding: article.body
colSpan: 2
placeholder: "Write your article..."
- id: tags
widget: tags_input
binding: article.tags
colSpan: 2
- id: preview
widget: markdown_viewer
binding: article.body
colSpan: 2
editable: false
default_state:
article:
title: ""
body: ""
tags: []
Widget Library
Topaz Agent Kit provides 47 widget types organized into categories:
Input Widgets
Text Inputs:
- `text` - Single-line text input
- `textarea` - Multi-line text input
- `email` - Email input with validation
- `url` - URL input with validation
- `password` - Password input with show/hide toggle
- `number` - Number input with min/max/step
- `color` - Color picker
Date/Time:
- `date` - Date picker
- `datetime` - Date and time picker
- `time` - Time picker
Selection:
- `select` - Dropdown select (single or multiple)
- `multiselect` - Multi-select dropdown
- `checkbox` - Single checkbox
- `checkbox_group` - Group of multiple checkboxes
- `radio` - Radio button group
- `toggle` - Toggle switch (on/off)
Rich Editors:
- `markdown_editor` - Rich markdown editor with toolbar
- `rich_text_editor` - WYSIWYG rich text editor
- `code_editor` - Code editor with syntax highlighting
Other Inputs:
- `slider` - Range slider input
- `rating` - Star rating widget
Viewer Widgets (Read-only)
- `markdown_viewer` - Markdown renderer
- `html_viewer` - HTML renderer (sandboxed)
- `code_viewer` - Code viewer with syntax highlighting
- `json_viewer` - JSON viewer
- `pdf_viewer` - PDF viewer
- `image_viewer` - Image viewer (supports web URLs and local files)
- `video_viewer` - Video player
Array/List Widgets
- `list_editor` - Editable list/array editor
- `list_view` - Read-only list view
- `key_value_form` - Editable key-value pairs form
- `table_editor` - Editable table/grid editor
- `table_view` - Read-only table view
Cards Widgets
- `card` - Single card widget
- `cards_grid` - Grid of cards (supports editing)
- `cards_horizontal` - Horizontal cards layout (supports editing)
- `cards_vertical` - Vertical cards layout (supports editing)
Chips Widgets
- `chips_horizontal` - Horizontal chips display (supports editing)
- `chips_vertical` - Vertical chips display (supports editing)
- `chips_view` - Read-only chips display
- `tags_input` - Editable tags/chips input
File Upload Widgets
- `file_upload` - File upload widget
- `image_upload` - Image upload widget with preview
Map Widgets
- `map` - Interactive map using OpenStreetMap/Leaflet (no API key required)
- `google_map` - Interactive map using Google Maps (requires API key; see Maps & Full-Screen Widgets)
Layout Widgets
- `section` - Section container widget
- `divider` - Visual divider/separator
- `spacer` - Spacing widget
- `tabs` - Tabbed interface widget
- `accordion` - Accordion/collapsible widget
Maps & Full-Screen Widgets
Map and viewer widgets (`map`, `google_map`, `image_viewer`, `video_viewer`, etc.) can fill the entire canvas when configured as full-screen widgets. This is ideal for map-centric apps, dashboards, and immersive viewers.
Full-Screen Configuration
Use `fullScreenWidgets` in your canvas config to make specific widgets fill the canvas (no labels, section titles hidden):
canvas:
source: agent
fullScreenWidgets: ["google_map"] # or ["map"] for OSM
title: ""
# ...
Or set `fullScreen: true` on individual controls. For agent mode, `fullScreenWidgets` ensures the agent creates a single full-screen map control.
Map Widgets: map vs google_map
| Widget | Tiles | API Key | Use Case |
|---|---|---|---|
| `map` | OpenStreetMap | None | General mapping, no setup |
| `google_map` | Google Maps | Required | Directions, geocoding, rich features |
Both share the same state shape: `{ center: [lat, lng], zoom, markers: [], route: [] }`. Agents can use tools like `geocode_place` and `get_directions` to populate markers and routes.
Google Maps API Key Setup
Development (Next.js dev server):
- Add `NEXT_PUBLIC_GOOGLE_MAPS_API_KEY` to `apps/ui/.env.development`
Production (packaged UI served by FastAPI):
- Add `GOOGLE_MAPS_API_KEY` to your project `.env` file, or set it as an environment variable when starting the server
- The backend injects the key into the manifest at runtime; no build-time configuration needed
- Users can provide their own key without modifying `.env.production`
# In project .env (e.g. projects/nexus/.env)
GOOGLE_MAPS_API_KEY=your_production_key_here
Map State Shape
default_state:
google_map: # or "map" for OSM
center: [20, 0]
zoom: 2
markers: []
route: []
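To make the flow concrete, here is a sketch of how an agent might populate that state (stubbed `geocode_place` and `update_canvas` for illustration; the real tools are provided by the kit and may behave differently):

```python
def geocode_place(name):
    """Stub geocoder with a couple of illustrative coordinates."""
    known = {"Paris": [48.8566, 2.3522], "Lyon": [45.7640, 4.8357]}
    return known[name]

state = {"google_map": {"center": [20, 0], "zoom": 2, "markers": [], "route": []}}

def update_canvas(path, value):
    """Stub of the update_canvas tool: set a dotted path in shared state."""
    node = state
    *parents, leaf = path.split(".")
    for key in parents:
        node = node[key]
    node[leaf] = value

update_canvas("google_map.markers", [{"position": geocode_place("Paris"), "label": "Paris"}])
update_canvas("google_map.center", geocode_place("Paris"))
update_canvas("google_map.zoom", 11)
```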
Widget Properties
Each widget supports common properties and widget-specific properties:
Common Properties
All widgets support these at the section level:
sections:
- id: my_field
widget: text
binding: data.field
editable: true # Enable/disable editing
placeholder: "Enter..." # Placeholder text
validation: # Validation rules
required: true
min: 5
max: 100
pattern: "^[A-Z]"
Widget-Specific Properties
Configure via widgetProps:
sections:
- id: article_title
widget: text
binding: article.title
widgetProps:
debounceMs: 300 # Debounce delay
- id: category
widget: select
binding: article.category
widgetProps:
options:
- value: "tech"
label: "Technology"
- value: "news"
label: "News"
multiple: false
- id: content
widget: markdown_editor
binding: article.body
widgetProps:
debounceMs: 800
toolbar: true
preview: true
- id: rating
widget: rating
binding: article.rating
widgetProps:
maxRating: 5
size: lg
allowHalf: true
- id: images
widget: image_viewer
binding: article.images
widgetProps:
alt: "Article images"
Layout System
Configure layouts at the canvas level:
canvas:
layout:
type: grid # grid, flex, columns, or stack
columns: 3 # For grid/columns layout
gap: 4 # Tailwind gap value
direction: row # For flex layout
wrap: true # For flex layout
Layout Types:
- `grid` - CSS Grid layout with configurable columns
- `flex` - Flexbox layout with direction and wrap options
- `columns` - Multi-column layout
- `stack` - Vertical stack (default)
Section-Level Layout:
sections:
- id: field1
widget: text
binding: data.field1
colSpan: 6 # Grid column span (1-12)
width: "1/2" # Width: "full", "1/2", "1/3", "auto", or pixel value
order: 2 # Display order
alignSelf: center # Self alignment
responsive: # Responsive overrides
sm:
colSpan: 12
width: full
md:
colSpan: 6
width: "1/2"
lg:
colSpan: 4
width: "1/3"
Complete Widget Reference
For detailed widget properties and examples, see:
- Full Reference: `docs/widget_properties_reference.md`
- Query Script: `python scripts/list_widget_properties.py [widget_type]`
Example App Configurations
Article Editor:
app_id: article_app
title: "Article Editor"
canvas:
layout:
type: grid
columns: 2
sections:
- id: title
widget: text
binding: article.title
validation:
required: true
- id: body
widget: markdown_editor
binding: article.body
colSpan: 2
- id: tags
widget: tags_input
binding: article.tags
colSpan: 2
- id: preview
widget: markdown_viewer
binding: article.body
colSpan: 2
editable: false
default_state:
article:
title: ""
body: ""
tags: []
Form Builder:
app_id: form_app
title: "Dynamic Form"
canvas:
layout:
type: stack
gap: 6
sections:
- id: name
widget: text
binding: form.name
validation:
required: true
- id: email
widget: email
binding: form.email
validation:
required: true
- id: age
widget: number
binding: form.age
widgetProps:
min: 18
max: 100
step: 1
- id: country
widget: select
binding: form.country
widgetProps:
options:
- value: "us"
label: "United States"
- value: "uk"
label: "United Kingdom"
- id: interests
widget: checkbox_group
binding: form.interests
widgetProps:
options:
- value: "tech"
label: "Technology"
- value: "sports"
label: "Sports"
direction: horizontal
- id: rating
widget: rating
binding: form.rating
widgetProps:
maxRating: 5
- id: notes
widget: textarea
binding: form.notes
widgetProps:
rows: 5
Data Dashboard:
app_id: dashboard_app
title: "Data Dashboard"
canvas:
layout:
type: grid
columns: 3
sections:
- id: summary
widget: cards_grid
binding: dashboard.summary
colSpan: 3
widgetProps:
columns: 3
- id: data_table
widget: table_view
binding: dashboard.data
colSpan: 3
widgetProps:
headers:
- "Name"
- "Value"
- "Status"
- id: chart
widget: image_viewer
binding: dashboard.chart_url
colSpan: 2
- id: stats
widget: list_view
binding: dashboard.stats
colSpan: 1
Advanced Features
State Validation:
app_id: validated_app
state_schema:
type: object
properties:
article:
type: object
required: ["title", "body"]
properties:
title:
type: string
minLength: 5
maxLength: 200
body:
type: string
minLength: 100
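The schema above enables checks along these lines (a hand-rolled sketch for illustration; a real deployment would use a proper JSON Schema validator):

```python
# Illustrative validation matching the article schema above.
REQUIRED = ["title", "body"]
MIN_LENGTH = {"title": 5, "body": 100}

def validate_article(article: dict) -> list:
    errors = [f"missing: {k}" for k in REQUIRED if k not in article]
    for field, n in MIN_LENGTH.items():
        if field in article and len(article[field]) < n:
            errors.append(f"{field} shorter than {n} chars")
    return errors

errors = validate_article({"title": "Hi", "body": "x" * 100})
# errors: ["title shorter than 5 chars"]
```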
Array Binding:
sections:
- id: item_name
widget: text
binding: items[0].name # Access array elements
- id: item_tags
widget: tags_input
binding: items[0].tags # Nested array access
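Bindings like `items[0].name` can be understood as dotted paths with index segments. A minimal, hypothetical resolver (the kit's actual binding engine is internal and may differ):

```python
import re

def resolve_binding(state, path):
    """Walk a binding path such as "items[0].name" through nested state."""
    node = state
    for part in path.split("."):
        m = re.fullmatch(r"(\w+)\[(\d+)\]", part)
        if m:  # indexed segment, e.g. items[0]
            node = node[m.group(1)][int(m.group(2))]
        else:  # plain key segment
            node = node[part]
    return node

state = {"items": [{"name": "Invoice A", "tags": ["urgent", "paid"]}]}
name = resolve_binding(state, "items[0].name")   # "Invoice A"
tags = resolve_binding(state, "items[0].tags")   # ["urgent", "paid"]
```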
Conditional Sections: Sections can be conditionally rendered based on state (future feature).
AI Agent Integration
Agents can interact with canvas state using tools:
# Agent tool: update_canvas
update_canvas(
path="article.title",
value="New Title"
)
# Agent tool: read_canvas
title = read_canvas(path="article.title")
Best Practices
- Use Appropriate Widgets: Choose widgets that match your data type
- Set Validation: Add validation rules for user inputs
- Responsive Design: Use responsive properties for mobile support
- Read-only Views: Use `editable: false` for agent-generated content
- State Schema: Define `state_schema` for complex data structures
- Default State: Provide sensible defaults in `default_state`
Advanced Features
- Citation Support: Source attribution for AI responses with clickable links
- Multi-session Management: Multiple concurrent chat sessions with persistence
- Custom Branding: Logo, themes, and color customization
- AG-UI Protocol: Complete AG-UI support for standardized agent interaction events
AG-UI Events Reference
Topaz Agent Kit implements the complete AG-UI protocol with 16 standard events plus custom convenience methods. All events are emitted through the `AGUIEventEmitter` class.
Standard AG-UI Events
Text Message Events
`text_message_start(role: str = "assistant") -> str`
- Starts a new text message stream
- Returns: `message_id` for tracking the message
- Use: Beginning of agent responses or assistant messages
`text_message_content(message_id: str, delta: str)`
- Streams incremental text content
- Use: Streaming partial text updates during message generation
`text_message_end(message_id: str)`
- Completes a text message
- Use: Mark the end of a message stream
Tool Call Events
`tool_call_start(tool_name: str, agent_name: Optional[str] = None) -> str`
- Starts a tool call execution
- Returns: `tool_call_id` for tracking the call
- Use: When an agent begins executing a tool
`tool_call_args(tool_call_id: str, args: Dict[str, Any])`
- Sends tool call arguments
- Use: Provide parameters for the tool call
`tool_call_end(tool_call_id: str)`
- Completes tool call execution
- Use: Mark the end of tool call execution
tool_call_result(tool_call_id: str, result: Any, error: Optional[str] = None)
- Sends tool call result or error
- Use: Provide the tool's output or error information
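The four tool-call events always fire in this order: start, args, end, result. A minimal sketch with a toy collecting emit function shows the sequence; the payload field names (`toolCallId`, `toolName`) are assumptions for illustration, not the kit's actual schema:

```python
import uuid

# Toy stand-in for AGUIEventEmitter, only to illustrate event ordering.
events = []

def emit(event):
    events.append(event)

def run_tool_call():
    # Stands in for the id returned by tool_call_start()
    tool_call_id = uuid.uuid4().hex
    emit({"type": "TOOL_CALL_START", "toolCallId": tool_call_id, "toolName": "web_search"})
    emit({"type": "TOOL_CALL_ARGS", "toolCallId": tool_call_id, "args": {"query": "AG-UI"}})
    emit({"type": "TOOL_CALL_END", "toolCallId": tool_call_id})
    emit({"type": "TOOL_CALL_RESULT", "toolCallId": tool_call_id, "result": {"hits": 3}})

run_tool_call()
print([e["type"] for e in events])
# ['TOOL_CALL_START', 'TOOL_CALL_ARGS', 'TOOL_CALL_END', 'TOOL_CALL_RESULT']
```

In the real emitter the id is returned by `tool_call_start` and passed to the remaining three calls, so the UI can correlate all four events to one tool execution.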
State Events
state_delta(delta: List[Dict[str, Any]])
- Sends incremental state changes
- Use: Update UI with partial state changes
messages_snapshot(messages: List[Dict[str, Any]])
- Sends complete message history
- Use: Initialize or update full conversation context
step_output(node_id: str, result: Optional[Any] = None, status: str = "completed", error_message: Optional[str] = None, ended_at: Optional[str] = None, elapsed_ms: Optional[int] = None)
- Emits step output/result data (uses STATE_SNAPSHOT internally)
- Use: Update agent node status and results in the UI
- Note: Header comes from `step_started`, inputs from `step_input`
Run Lifecycle Events
run_started(run_id: Optional[str] = None, thread_id: Optional[str] = None, session_id: Optional[str] = None, pipeline_name: Optional[str] = None) -> str
- Starts a new agent run or pipeline execution
- Returns: `run_id` for tracking the run
- Use: Beginning of pipeline or agent execution
- Supports: Session and pipeline metadata via `rawEvent` field
run_finished(run_id: str, result: Optional[Any] = None, thread_id: Optional[str] = None)
- Completes an agent run or pipeline
- Use: Mark successful completion of execution
run_error(run_id: str, error: str, details: Optional[Dict[str, Any]] = None, thread_id: Optional[str] = None)
- Reports an error during run execution
- Use: Handle and report execution errors
run_metadata(run_id: str, pipeline_name: Optional[str] = None, agent_id: Optional[str] = None, framework: Optional[str] = None, model: Optional[str] = None, run_mode: Optional[str] = None, extra: Optional[Dict[str, Any]] = None)
- Emits run metadata with pipeline/agent information (uses CUSTOM event)
- Use: Provide additional context about the run (framework, model, etc.)
Step Lifecycle Events
step_started(step_id: Optional[str] = None, agent_name: Optional[str] = None, framework: Optional[str] = None, model: Optional[str] = None, run_mode: Optional[str] = None, protocol_support: Optional[List[str]] = None, started_at: Optional[str] = None) -> str
- Starts an agent step with optional header metadata
- Returns: `step_id` for tracking the step
- Use: Beginning of individual agent execution
- Supports: Framework, model, protocol metadata via `rawEvent` field
- Auto-generates step names with counts for multiple executions
step_input(step_name: str, node_id: str, inputs: Optional[Dict[str, Any]] = None)
- Emits step input data (uses CUSTOM event)
- Use: Provide input parameters for a step
- Note: Header comes from `step_started`
step_finished(step_id: str, result: Optional[Any] = None, status: Optional[str] = None, error: Optional[str] = None)
- Completes an agent step with optional status and error
- Use: Mark completion or failure of a step
- Supports: Status and error via `rawEvent` field
Special Events
custom_event(name: str, value: Dict[str, Any])
- Sends custom application-specific events
- Use: Extend AG-UI protocol with domain-specific events
raw_event(data: Dict[str, Any])
- Sends raw event data
- Use: Low-level event emission when needed
Convenience Methods
Topaz Agent Kit provides convenience methods for common patterns:
hitl_request(gate_id: str, gate_type: str, title: str = "", description: str = "", fields: List[Dict[str, Any]] = None, options: List[Dict[str, Any]] = None, buttons: Dict[str, Any] = None, timeout_ms: int = 300000, on_timeout: str = "reject", context_key: str = None, retry_target: str = None, max_retries: int = None)
- Emits HITL request event (uses CUSTOM event)
- Use: Request human approval, input, or selection
- Types: `"approval"`, `"input"`, `"selection"`
hitl_result(gate_id: str, decision: str, actor: str = "user", data: Any = None)
- Emits HITL result event (uses CUSTOM event)
- Use: Report human decision from HITL gate
edge_protocol(from_agent: str, to_agent: str, protocol: str, label: Optional[str] = None)
- Emits edge protocol event (uses CUSTOM event)
- Use: Document agent connections and communication protocols
- Protocols: `"A2A"`, `"IN-PROC"`
session_title_updated(session_id: str, title: str)
- Emits session title update event (uses CUSTOM event)
- Use: Update session title dynamically
assistant_response(data: Dict[str, Any])
- Emits assistant response card event (uses CUSTOM event)
- Use: Display structured assistant decision context in UI timeline
- Data includes: assistant_response, tool_planned, tool_executed, tool_params, raw_tool_output, reasoning, success, error, framework, model, pipeline_name, agent_name, user_input
- Timeline placement: After agent pipeline cards, before assistant message
- Display controlled by frontend "Assistant Card" setting (always emitted by backend)
get_step_name(step_id: str) -> Optional[str]
- Helper method to get step name for a given step_id
- Use: Retrieve human-readable step name for display
Event Usage Patterns
Small Talk Flow (5-6 events)
RUN_STARTED → [CUSTOM(assistant_response)] → TEXT_MESSAGE_START → TEXT_MESSAGE_CONTENT →
TEXT_MESSAGE_END → RUN_FINISHED
Note: CUSTOM(assistant_response) is always emitted by backend. Display is controlled by frontend "Assistant Card" setting.
Pipeline Flow (19+ events)
RUN_STARTED → STEP_STARTED → STEP_INPUT → STATE_DELTA → STEP_OUTPUT → STEP_FINISHED →
STEP_STARTED → ... → CUSTOM(assistant_response) → TEXT_MESSAGE_START → TEXT_MESSAGE_CONTENT →
TEXT_MESSAGE_END → RUN_FINISHED
Note: CUSTOM(assistant_response) is always emitted by backend. Display is controlled by frontend "Assistant Card" setting.
HITL Flow (6+ events)
RUN_STARTED → STEP_STARTED → CUSTOM(hitl_request) → ... → CUSTOM(hitl_result) →
STEP_FINISHED → RUN_FINISHED
Event Implementation
All events are implemented in AGUIEventEmitter class (src/topaz_agent_kit/core/ag_ui_event_emitter.py):
from topaz_agent_kit.core.ag_ui_event_emitter import AGUIEventEmitter
# Initialize emitter with emit function
emitter = AGUIEventEmitter(emit_fn=your_emit_function)
# Use standard events
message_id = emitter.text_message_start(role="assistant")
emitter.text_message_content(message_id, "Hello")
emitter.text_message_end(message_id)
# Use convenience methods
emitter.hitl_request(
gate_id="approve_content",
gate_type="approval",
title="Approve Content"
)
Event Structure
All events follow AG-UI protocol structure:
- Type: Standard AG-UI event type (e.g., `TEXT_MESSAGE_START`)
- Timestamp: Milliseconds since epoch
- Data: Event-specific fields
- rawEvent: Optional metadata for run/step events
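Put together, a single emitted event might look like the following sketch; the top-level keys come from the structure above, while the inner `data` field names are illustrative assumptions:

```python
import time

# One AG-UI event following the documented structure.
event = {
    "type": "TEXT_MESSAGE_START",            # standard AG-UI event type
    "timestamp": int(time.time() * 1000),    # milliseconds since epoch
    "data": {"messageId": "msg_1", "role": "assistant"},  # event-specific fields
    "rawEvent": None,                        # optional metadata for run/step events
}
```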
Enhanced Human-in-the-Loop (HITL) System
Multi-Type Gates
Topaz Agent Kit features a sophisticated HITL system with three gate types for different interaction patterns:
Approval Gates
Simple approve/reject decisions with optional form fields:
gates:
- id: review_content
type: approval
title: "Review Content"
description: "Approve or reject the generated content"
fields:
- name: feedback
label: "Feedback (optional)"
type: textarea
timeout_ms: 30000
on_timeout: reject
Input Gates
Collect structured data from users via customizable forms:
gates:
- id: collect_requirements
type: input
title: "Project Requirements"
description: "Please provide the project requirements"
fields:
- name: project_name
label: "Project Name"
type: text
required: true
- name: budget
label: "Budget"
type: number
required: true
validation:
min: 1000
max: 100000
- name: priority
label: "Priority Level"
type: select
required: true
options:
- value: "low"
label: "Low"
- value: "high"
label: "High"
target_agents: ["project_manager"]
context_key: "project_requirements"
Selection Gates
Present multiple options for user choice:
gates:
- id: choose_approach
type: selection
title: "Choose Implementation Approach"
description: "Select the best approach for this feature"
options:
- value: "microservices"
label: "Microservices Architecture"
description: "Break down into small, independent services"
- value: "monolith"
label: "Monolithic Architecture"
description: "Single, unified application"
target_agents: ["architect"]
context_key: "implementation_approach"
Dynamic Flow Control
Gates are integrated directly into pipeline patterns with powerful flow control:
pattern:
type: sequential
steps:
- node: content_author
- gate: review_draft
on_approve: continue
on_reject: retry_node
retry_target: content_author
max_retries: 3
- gate: choose_publication
on_selection:
blog: skip_to_node
whitepaper: continue
draft: stop
skip_to:
blog: simple_publisher
- node: chief_editor
- node: simple_publisher
Flow Control Actions
- `continue`: Proceed to the next step
- `stop`: Terminate pipeline execution
- `retry_node`: Re-run a specific agent with HITL feedback
- `skip_to_node`: Jump to a different agent based on conditions
Context Injection
HITL data is automatically injected into agent contexts:
# Gate configuration
gates:
- id: user_feedback
type: input
target_agents: ["content_writer", "editor"]
context_key: "user_preferences"
# Agent prompt can now use:
# {{user_preferences.project_name}}
# {{user_preferences.budget}}
# {{user_preferences.deadline}}
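Conceptually, injection means the gate's submitted data becomes available under its `context_key` when agent prompts are rendered. A toy renderer (standing in for the kit's actual Jinja pipeline) makes the mechanics concrete:

```python
import re

def render_prompt(template: str, context: dict) -> str:
    # Resolve {{ key.path }} placeholders against the injected context.
    def repl(match):
        node = context
        for key in match.group(1).split("."):
            node = node[key]
        return str(node)
    return re.sub(r"\{\{\s*([\w.]+)\s*\}\}", repl, template)

# Data submitted through the gate, stored under its context_key.
ctx = {"user_preferences": {"project_name": "Apollo", "budget": 5000}}
prompt = render_prompt(
    "Plan {{user_preferences.project_name}} within {{user_preferences.budget}}.", ctx
)
# 'Plan Apollo within 5000.'
```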
Advanced HITL Features
Pre-populated Fields
Set default values for input gate fields to pre-fill forms with context data:
gates:
- id: collect_clarification
type: input
title: "One more detail to plan your trip"
description: "{{ trip_requester.clarification_prompt | default('Please provide missing details to continue.') }}"
fields:
- name: clarification_response
label: "Your answer"
type: textarea
required: false
default: "{{ trip_requester.suggested_response | default('') }}" # Pre-populate from context
validation:
min_length: 1
max_length: 2000
context_key: "trip_clarifications"
context_strategy: append
Example with Dynamic Defaults:
gates:
- id: review_booking
type: input
title: "Review Booking Details"
fields:
- name: traveler_name
label: "Traveler Name"
type: text
default: "{{ user_profile.name }}" # Pre-fill from user profile
- name: email
label: "Email"
type: email
default: "{{ user_profile.email }}"
- name: phone
label: "Phone"
type: tel
default: "{{ user_profile.phone | default('') }}"
- name: special_requests
label: "Special Requests (optional)"
type: textarea
default: "{{ previous_booking.special_requests | default('') }}"
Options Source (Dynamic Selection Gates)
Pull selection options dynamically from upstream agent outputs:
gates:
- id: select_flights
type: selection
title: "Select Flight"
description: "Choose your preferred flight option"
options_source: "trip_flights_expert.flights_options" # Pull from expert output
buttons:
submit:
label: "CONFIRM"
description: "Confirm your flight selection"
cancel:
label: "CANCEL"
description: "Cancel and stop pipeline"
context_key: "flight_selection"
timeout_ms: 300000
How it Works:
- Expert Agent Output (e.g., `trip_flights_expert`):
{
"flights_options": [
{
"value": "offer_123",
"id": "offer_123",
"label": "1 stop: $720.00",
"description": "PHX → JFK → CDG; Depart: 2025-05-10 10:00, Arrive: 2025-05-11 08:30; Duration: 12h 15m; Carriers: AA, AF"
},
{
"value": "offer_456",
"id": "offer_456",
"label": "Non-stop: $850.50",
"description": "PHX → CDG; Depart: 2025-05-10 08:00, Arrive: 2025-05-10 16:30; Duration: 11h 30m; Carrier: AA"
}
],
"has_flights": true
}
- Gate Configuration:
gates:
- id: select_flights
type: selection
options_source: "trip_flights_expert.flights_options" # Automatically pulls from expert
# Each option object is used as-is: {value, id, label, description}
- User Selection: The user selects an option, and the entire option object is stored in `flight_selection`.
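The `options_source` lookup is a dotted-path resolution over upstream agent outputs. A small sketch (not the kit's code) shows the mechanics:

```python
def resolve_options_source(source: str, upstream: dict):
    # Walk a dotted options_source path through upstream agent outputs.
    node = upstream
    for key in source.split("."):
        node = node[key]
    return node

upstream = {
    "trip_flights_expert": {
        "has_flights": True,
        "flights_options": [
            {"value": "offer_123", "id": "offer_123",
             "label": "1 stop: $720.00",
             "description": "PHX → JFK → CDG"},
        ],
    }
}
options = resolve_options_source("trip_flights_expert.flights_options", upstream)
# Each option object is presented as-is; on selection, the chosen object is
# stored whole under the gate's context_key (here, "flight_selection").
```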
Real-World Example (Trip Planner):
pattern:
type: sequential
steps:
- node: trip_requester
- type: parallel
steps:
# Flights domain: Expert → Gate (appears immediately when expert finishes)
- type: sequential
condition: "trip_requester.flights_ready == true"
steps:
- node: trip_flights_expert
- gate: select_flights
condition: "trip_flights_expert.has_flights == true"
on_submit: continue
on_cancel: stop
gates:
- id: select_flights
type: selection
title: "Select Flight"
description: "Choose your preferred flight option"
options_source: "trip_flights_expert.flights_options" # Dynamic options from expert
context_key: "flight_selection" # Stores selected option object
Multi-Domain Selection:
gates:
# Flights selection (from flights expert)
- id: select_flights
type: selection
options_source: "trip_flights_expert.flights_options"
# Hotels selection (from hotels expert)
- id: select_hotels
type: selection
options_source: "trip_hotels_expert.hotels_options"
# Activities selection (from activities expert)
- id: select_activities
type: input
fields:
- name: selected_activities
label: "Select Activities"
type: checkbox
options_source: "trip_activities_expert.activities_options" # Works with checkbox too
Conditional Fields
Show/hide fields dynamically based on expressions evaluated at runtime:
gates:
- id: collect_booking_details
type: input
title: "Booking Information"
fields:
- name: booking_type
label: "Booking Type"
type: select
required: true
options:
- value: "flight"
label: "Flight"
- value: "hotel"
label: "Hotel"
- value: "package"
label: "Flight + Hotel Package"
# Show only if booking_type is "flight" or "package"
- name: origin
label: "Origin Airport"
type: text
required: true
condition: "booking_type == 'flight' OR booking_type == 'package'"
- name: destination
label: "Destination Airport"
type: text
required: true
condition: "booking_type == 'flight' OR booking_type == 'package'"
# Show only if booking_type is "hotel" or "package"
- name: check_in
label: "Check-in Date"
type: date
required: true
condition: "booking_type == 'hotel' OR booking_type == 'package'"
- name: check_out
label: "Check-out Date"
type: date
required: true
condition: "booking_type == 'hotel' OR booking_type == 'package'"
# Show only if previous booking exists
- name: use_loyalty_points
label: "Use Loyalty Points"
type: checkbox
condition: "user_profile.loyalty_points > 0"
# Show based on nested field access
- name: special_requests
label: "Special Requests"
type: textarea
condition: "trip_requester.require_special_services == true"
Advanced Conditional Examples:
gates:
- id: travel_details
type: input
fields:
# Show based on array length
- name: traveler_count
label: "Number of Travelers"
type: number
default: 1
# Show fields for each traveler (conditional repetition)
- name: traveler_1_name
label: "Traveler 1 Name"
type: text
condition: "traveler_count >= 1"
- name: traveler_1_age
label: "Traveler 1 Age"
type: number
condition: "traveler_count >= 1"
- name: traveler_2_name
label: "Traveler 2 Name"
type: text
condition: "traveler_count >= 2"
- name: traveler_2_age
label: "Traveler 2 Age"
type: number
condition: "traveler_count >= 2"
# Show based on upstream agent output
- name: preferred_class
label: "Preferred Class"
type: select
condition: "trip_requester.flights_ready == true"
options:
- value: "economy"
label: "Economy"
- value: "business"
label: "Business"
- value: "first"
label: "First Class"
# Show based on complex expression
- name: insurance_required
label: "Travel Insurance Required"
type: checkbox
condition: "trip_requester.total_cost > 1000 AND trip_requester.international == true"
Conditional Field Operators:
- Comparison: `==`, `!=`, `>`, `<`, `>=`, `<=`
- Boolean: `AND`, `OR`, `NOT`
- String: `contains`, `starts_with`, `ends_with`, `in`, `not in`
- Null checks: `is null`, `is not null`
- Functions: `len(array)` for array length
- Nested access: `agent_id.field`, `agent_id.nested.field`
Example with Nested Conditions:
gates:
- id: review_claim
type: input
fields:
- name: claim_amount
label: "Claim Amount"
type: number
# Show if amount exceeds threshold
- name: requires_manual_review
label: "Requires Manual Review"
type: checkbox
condition: "claim_amount > 10000"
# Show if fraud detected AND amount is high
- name: fraud_investigation
label: "Fraud Investigation Required"
type: checkbox
condition: "claim_analyzer.fraud_detected == true AND claim_amount > 5000"
# Show based on upstream agent flag
- name: priority_notes
label: "Priority Notes"
type: textarea
condition: "claim_analyzer.priority == 'high' OR claim_analyzer.risk_score > 0.8"
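For intuition, the comparison and boolean operators, dotted access, and YAML-style `true`/`false`/`null` literals can be approximated with a toy evaluator. This is illustrative only, not the kit's actual expression engine (which also supports `contains`, `starts_with`, etc.):

```python
import re

def evaluate_condition(expr: str, context: dict) -> bool:
    """Toy evaluator for gate field conditions (illustrative sketch)."""
    # Map the config grammar onto Python's operators and literals.
    py = re.sub(r"\bAND\b", "and", expr)
    py = re.sub(r"\bOR\b", "or", py)
    py = re.sub(r"\bNOT\b", "not", py)
    py = re.sub(r"\bnull\b", "None", py)  # handles "is null" / "is not null"

    class Node(dict):  # lets dotted access like claim_analyzer.priority work
        def __getattr__(self, name):
            value = self[name]
            return Node(value) if isinstance(value, dict) else value

    scope = {k: Node(v) if isinstance(v, dict) else v for k, v in context.items()}
    consts = {"__builtins__": {}, "len": len, "true": True, "false": False}
    return bool(eval(py, consts, scope))

ctx = {"claim_amount": 12000,
       "claim_analyzer": {"fraud_detected": True, "priority": "high"}}
evaluate_condition("claim_amount > 10000", ctx)                                           # True
evaluate_condition("claim_analyzer.fraud_detected == true AND claim_amount > 5000", ctx)  # True
evaluate_condition("claim_analyzer.priority == 'low'", ctx)                               # False
```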
External HITL Description Templates
For complex HITL gate descriptions with extensive formatting, dynamic content, or reusable templates, you can store them in separate Jinja2 files within the config/hitl/ folder. This keeps your pipeline YAML files clean and makes gate descriptions easier to maintain.
When to Use Separate Jinja Files:
- ✅ Use separate files for:
- Complex descriptions with multiple sections (e.g., claim details, validation summary, recommendations)
- Extensive markdown tables and formatting
- Long descriptions (50+ lines)
- Descriptions with complex nested conditionals and loops
- Templates that benefit from better syntax highlighting and editing
- ✅ Use inline descriptions for:
- Simple, short descriptions (1-10 lines)
- Single-line dynamic content
- Quick prototypes or simple approval gates
Examples:
- `eci_decision_gate.jinja` - Complex gate with multiple sections, tables, and validation summaries (moved from inline)
- `tci_recommendation_review_gate.jinja` - Extensive risk assessment display with multiple tables
- Simple inline: `description: "{{ agent_id.summary | default('Review the results.') }}"`
Folder Structure:
project/
├── config/
│   ├── hitl/                      # HITL gate description templates
│   │   ├── review_gate.jinja
│   │   ├── approval_gate.jinja
│   │   └── input_gate.jinja
│   ├── pipelines/
│   │   └── my_pipeline.yml
│   └── ...
Usage in Pipeline YAML:
Instead of inline descriptions, reference external Jinja templates:
gates:
- id: review_content
type: approval
title: "Review Content"
description:
jinja: "hitl/review_gate.jinja" # Path relative to config/ directory
timeout_ms: 300000
on_timeout: reject
Template Features:
HITL description templates support full Jinja2 syntax and have access to the entire pipeline context:
- Upstream Agent Outputs: Access data from any agent that executed before the gate
- Context Variables: Use any variables stored in the pipeline context
- Conditional Logic: Show/hide content based on conditions
- Formatting: Use Markdown, HTML, and Jinja2 filters for rich formatting
Example Template (config/hitl/review_gate.jinja):
## Application Review
**Application ID:** {{ current_application.application_id }}
### Application Details
| Field | Value |
|-------|-------|
| Applicant Name | {{ current_application.applicant_name }} |
| Requested Amount | {% if current_application.requested_amount %}{{ currency_symbol }}{{ "{:,.0f}".format(current_application.requested_amount) }}{% else %}N/A{% endif %} |
| Status | {{ current_application.status }} |
### Risk Assessment
{% if risk_analyzer %}
**Risk Score:** <span style="color: {{ 'green' if risk_analyzer.score < 50 else 'orange' if risk_analyzer.score < 75 else 'red' }}; font-weight: bold;">{{ risk_analyzer.score }}</span> / 100
**Risk Level:** {{ risk_analyzer.risk_level }}
{% if risk_analyzer.risk_factors %}
**Risk Factors:**
{% for factor in risk_analyzer.risk_factors %}
- {{ factor.name }}: {{ factor.score }} ({{ factor.severity }})
{% endfor %}
{% endif %}
{% else %}
Risk assessment is being processed...
{% endif %}
### Recommendation
{% if recommendation_generator %}
**Recommended Action:** {{ recommendation_generator.action }}
**Confidence:** {{ recommendation_generator.confidence }}%
**Rationale:** {{ recommendation_generator.rationale }}
{% endif %}
Benefits:
- ✅ Separation of Concerns: Keep pipeline logic separate from UI descriptions
- ✅ Reusability: Share templates across multiple gates or pipelines
- ✅ Maintainability: Update descriptions without touching pipeline YAML
- ✅ Version Control: Track description changes independently
- ✅ Complex Formatting: Support for rich Markdown/HTML without cluttering YAML
Path Resolution:
- Templates are resolved relative to the `config/` directory
- Use forward slashes (`/`) as path separators
- Example: `hitl/review_gate.jinja` resolves to `config/hitl/review_gate.jinja`
Fallback Behavior:
If a template file cannot be loaded:
- The system logs a warning
- Falls back to an empty string (gate will still function, just without description)
- Pipeline execution continues normally
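The documented fallback can be pictured with a small loader sketch; the function name and signature here are assumptions, not the kit's API:

```python
import logging
from pathlib import Path

logger = logging.getLogger("hitl")

def load_gate_description(config_dir, template_path):
    """Load a gate description template under config/, falling back to ''."""
    path = Path(config_dir) / template_path  # e.g. config/hitl/review_gate.jinja
    try:
        return path.read_text(encoding="utf-8")
    except OSError:
        # Log a warning and keep the gate functional without a description.
        logger.warning("HITL description template missing: %s", path)
        return ""
```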
Operations Center
The Operations Center provides a centralized interface for managing async HITL cases, monitoring pipeline health, and reviewing cases across multiple pipelines. It's designed for operations teams who need to efficiently review, approve, and manage large volumes of cases.
Overview
The Operations Center provides:
- Centralized Case Management: View and manage cases from all pipelines in one place
- Pipeline-Specific Views: Custom columns, fields, and analytics per pipeline
- Operations Assistant: Natural language interface for managing cases
- Dashboard Analytics: Pipeline-specific metrics, charts, and timelines
- Case Detail Views: Comprehensive case information with tabs for review, data, timeline, and chat
- Bulk Operations: Approve/reject multiple cases efficiently
Accessing Operations Center
Navigate to /operations in the web UI to access the Operations Center. The interface provides:
Main Components:
- Case List Panel: Filterable table of all cases
- Dashboard: Pipeline-specific analytics and metrics
- Case Detail Modal: Comprehensive case review interface
- Operations Assistant: Chat-based case management
Case List Panel
The case list provides a tabbed view for managing cases:
"All" Tab:
- Shows all cases across all pipelines
- Common fields: Case ID, Pipeline, Status, HITL Status, Created At
- Filter by pipeline, status, time range, and search
Pipeline-Specific Tabs:
- Custom columns defined in `list_view.column_order`
- Pipeline-specific fields from `list_view.pipeline_fields`
- Filtered by same criteria as "All" tab
Features:
- Status Badges: Visual indicators for Pending, In-Progress, Completed, Failed
- Sorting: Click column headers to sort
- Filtering: Filter by pipeline, status, date range, and search
- Bulk Actions: Select multiple cases for bulk operations
Dashboard
Pipeline-specific analytics cards provide insights into case processing:
Card Types:
- Metric Cards: Count, sum, average, min, max of fields
- Percentage Cards: Ratio calculations (e.g., anomaly detection rate)
- Distribution Charts: Donut/bar charts showing value distributions
- Timeline Cards: Case creation and completion timelines
Configuration Example:
# config/operations/my_pipeline.yml
dashboard:
cards:
# Percentage metric
- type: "percentage"
title: "Anomaly Detection Rate"
icon: "AlertTriangle"
numerator:
field: "analyzer.anomaly_detected"
filter: true
denominator:
field: "total"
color: "amber"
# Numeric metric
- type: "metric"
title: "Average Confidence Score"
icon: "TrendingUp"
field: "analyzer.confidence_score"
aggregation: "avg"
format: "number"
decimals: 2
color: "blue"
# Distribution chart
- type: "donut"
title: "Anomaly Types"
icon: "PieChart"
field: "analyzer.anomaly_type"
show_legend: true
show_percentages: true
value_mapping:
"capital_revenue_misclassification": "Capital/Revenue"
"lease_rou_misclassification": "Lease/ROU"
Case Detail Modal
The case detail modal provides comprehensive case information across multiple tabs:
Review Tab:
- HITL gate information and description
- Approve/Reject buttons
- Optional form fields for additional input
- Direct response to HITL requests
Data Tab:
- Structured case data organized by sections
- All agent outputs displayed
- Field types: text, multiline, number, boolean, list, object
- Conditional sections based on data availability
Timeline Tab:
- System events: case created, completed, failed, HITL queued/responded
- Custom events from agent outputs
- Chronological view of case lifecycle
- Event details with timestamps
Documents Tab (if configured):
- Uploaded documents
- Generated documents
- Document preview and download
Chat Tab:
- Operations Assistant for natural language interactions
- Ask questions about cases
- Approve/reject via chat commands
- Get case summaries and insights
Operations Assistant
The Operations Assistant is a specialized AI assistant for managing cases through natural language:
Capabilities:
- ✅ Approve/reject HITL requests
- ✅ Get case details and summaries
- ✅ List cases with filters
- ✅ Answer questions about cases
- ✅ Bulk operations via chat commands
Example Interactions:
User: "Show me all pending cases for invoice pipeline"
Assistant: Lists all pending cases with details
User: "Approve case CASE-12345"
Assistant: Approves the case and resumes pipeline
User: "Reject case CASE-12345 with notes: Invalid data format"
Assistant: Rejects with notes and resumes pipeline
User: "What's the status of case CASE-12345?"
Assistant: Provides detailed case status and information
Configuration:
# config/prompts/operations_assistant.jinja
You are the Operations Assistant for managing HITL cases.
You have access to these tools:
- approve_hitl_request(queue_item_id, notes?)
- reject_hitl_request(queue_item_id, notes?)
- get_case_details(case_id)
- get_queue_items(filters?)
- get_case_list(filters?)
Help users manage cases efficiently through natural language.
Base Templates for Main Chat, App Assistant, and Operations Assistant
The kit provides base Jinja2 templates for the main chat (intent classifier), the App Assistant (per-app chat in App Mode), and the Operations Assistant so projects only override what's project-specific. Shared rules, output format, and reasoning stay in the base; projects (or per-app prompts) supply context and examples.
Why use base templates:
- Single source of truth: Critical rules, JSON format, tool-selection logic, and "Thought" (reasoning) instructions live in one place.
- Less duplication: Projects don't copy long prompts; they extend the base and fill in pipelines (or app-only tools) and examples.
- Easier updates: Kit improvements to the base (e.g. new rules or reasoning wording) apply to all projects that extend it.
Main chat (intent classifier):
- Base: `src/topaz_agent_kit/prompts/base/assistant_intent_classifier_base.jinja` - name, critical rules, triggered pipelines (optional), assistant_response guidelines, Reasoning (Thought Process), session title, suggested questions, tool selection logic, output format, and a default Conversational example.
- Project file: `config/prompts/assistant_intent_classifier.jinja` in your project (or in the starter template). Use `{% extends "base/assistant_intent_classifier_base.jinja" %}` and override only:
  - `{% block pipelines_list %}` - list of pipelines (and "USE THIS when…") or, for app-only projects (e.g. Nexus), override `{% block available_tools %}` and `{% block tool_planned_options %}` / `{% block tool_executed_options %}` with app tools (e.g. insert_widget, get_current_datetime, run_process_file_turn).
  - `{% block examples %}` - start with `{{ super() }}` to keep the base Conversational example, then add your pipeline/tool examples. Include the overall thought process in each example's `reasoning` (what the user wanted, how you interpreted it, and how you chose the response/tool).
- Pipeline projects (PA, basic, author, icp, ecgc, ensemble): override `pipelines_list` and `examples` only.
- App-only projects (e.g. Nexus): override `triggered_pipelines` (empty), `available_tools` (app-only tools, no execute_pipeline/execute_agent), `tool_planned_options` / `tool_executed_options`, and `examples`.
App Assistant (App Mode):
- Base: `src/topaz_agent_kit/prompts/base/app_assistant_base.jinja` - generic app-assistant instructions (canvas tools, agent/hybrid/declarative mode, widget usage, output format). Contains overridable blocks such as `assistant_intro`, `canvas_mode_instructions`, `current_state`, and `app_guidelines`.
- Per-app file: Each app can have its own prompt, e.g. `config/prompts/article_app_assistant.jinja`, `config/prompts/recipe_app_assistant.jinja`. Use `{% extends "base/app_assistant_base.jinja" %}` and override the blocks you need (e.g. `assistant_intro` for name and role, `current_state` for the canvas state summary, `app_guidelines` for app-specific behavior). The base supplies canvas tools, structure rules, and reasoning; app prompts supply app context and guidelines. The app's agent config references its prompt via `prompt.jinja: prompts/<app_id>_app_assistant.jinja` (or a custom path).
Operations Assistant:
- Base: `src/topaz_agent_kit/prompts/base/operations_assistant_base.jinja` - full Operations Assistant prompt with a single overridable block.
- Project file: `config/prompts/operations_assistant.jinja`. Use `{% extends "base/operations_assistant_base.jinja" %}` and override only `{% block project_and_pipeline_context %}` with project- and pipeline-specific context (e.g. which pipelines exist, how to refer to cases, or a short workflow list). All other rules and tool usage come from the base.
Loader behavior: The orchestration layer loads the project's prompt file (e.g. from config/prompts/) and passes template_path so that {% extends "base/..." %} resolves correctly. Projects that don't provide a file fall back to whatever the loader's default is (often the base rendered as-is, which for the classifier leaves the pipeline list empty).
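Under these conventions, a minimal project override file might look like the following sketch; the block contents are placeholders, while the base template path and block names are the ones described above:

```jinja
{# config/prompts/assistant_intent_classifier.jinja (sketch) #}
{% extends "base/assistant_intent_classifier_base.jinja" %}

{% block pipelines_list %}
- research_pipeline - USE THIS when the user asks for a full research report
- summary_pipeline - USE THIS when the user asks for a short summary
{% endblock %}

{% block examples %}
{{ super() }}  {# keep the base Conversational example #}
{# ...project-specific pipeline/tool examples go here... #}
{% endblock %}
```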
Assistant Thought Process (Reasoning)
All three assistant experiences expose a Thought (reasoning) section so users can see why the assistant chose a given response or tool. The backend sends a reasoning (or thought_process) field; the UI shows it in a collapsible "Thought" block when the execution setting is enabled.
Where Thought appears:
- Main chat - The intent classifier returns a `reasoning` field (overall thought process: understanding + decision, including which pipeline/agent/tool). The backend adds it to the `assistant_response` event as `thought_process`; the main chat UI shows a "Thought" section above the assistant message when Thought Process is on in execution settings.
- App Assistant - App assistant turns include reasoning and tool steps. The app chat UI shows "Thought" above the assistant reply when the setting is enabled.
- Operations Assistant - Operations Assistant responses include reasoning and tool usage. The Case Detail Chat (and Operations Chat Panel) shows "Thought" above the assistant message when the setting is enabled.
Enabling or hiding Thought: In the UI, use Execution settings (e.g. in the sidebar): turn Thought Process on or off. When on, all three assistants show the collapsible Thought section when the backend provides reasoning/thought_process. No extra database storage is required; the assistant response payload (including thought_process) is already persisted with the session.
Operations Configuration
Configure how cases are displayed and managed:
Identity Configuration:
# config/operations/my_pipeline.yml
identity:
prefix: "CASE" # Case ID prefix (e.g., "CASE-ABC12345")
uniqueness: "uuid_suffix" # "uuid_suffix" (default), "timestamp", or "none"
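The identity settings above might map to case IDs roughly as in this sketch; the generation logic is an assumption for illustration, not the kit's actual code:

```python
import time
import uuid

def make_case_id(prefix="CASE", uniqueness="uuid_suffix"):
    """Sketch of how the identity config could produce case IDs."""
    if uniqueness == "uuid_suffix":
        # e.g. CASE-AB12CD34
        return f"{prefix}-{uuid.uuid4().hex[:8].upper()}"
    if uniqueness == "timestamp":
        return f"{prefix}-{int(time.time())}"
    return prefix  # "none": prefix only

make_case_id()  # e.g. 'CASE-1F3A9C02'
```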
Detail View Configuration:
detail_view:
# Modal header
modal:
title: "Case Details for {{ current_item.id }}"
subtitle: "Transaction: {{ current_item.transaction_id }}"
# Tab configuration
tabs:
order:
- overview
- data
- timeline
- review
- review_response_outcome
# Data sections
sections:
- name: "Item Details"
fields:
- field: "current_item.id"
label: "Item ID"
type: text
- field: "analyzer.result"
label: "Result"
type: text
List View Configuration:
list_view:
# Pipeline-specific fields
pipeline_fields:
- key: "transaction_id"
field: "current_item.transaction_id"
label: "Transaction ID"
type: text
- key: "amount"
field: "current_item.amount"
label: "Amount"
type: number
value_mapping:
"capital_revenue_misclassification": "CAPITAL/REVENUE"
color_mapping:
"high": "red"
"medium": "amber"
"low": "green"
# Column order
column_order:
- "case_id"
- "transaction_id"
- "amount"
- "status"
- "hitl_status"
- "created_at"
Post-run hooks (ops pipelines)
When an ops-scoped pipeline (run for an existing case from the Operations Center) completes successfully, you can run post-run actions defined in that pipeline's YAML. No pipeline-specific code lives in the orchestrator; all behavior is driven by the pipeline config under post_run.
Where to configure: In the pipeline file that is run for a case (e.g. config/pipelines/move_eligibility_checks.yml), add a top-level post_run key.
Available hooks:
| Key | Purpose |
|---|---|
| `update_portal_state` | Update the case's linked portal session state (e.g. move status, scheduled date). |
| `update_case_data` | Patch `case_data` at dotted paths (store pipeline results on the case). |
| `add_case_note` | Add a timeline note (e.g. "Eligibility check completed. Result: Eligible."). |
| `update_case_status` | Set case status (e.g. completed, open, processing). |
| `webhook` | Send an HTTP request (POST/PUT) to an external URL with optional Jinja-built payload. |
| `trigger_pipeline` | Optionally run another ops pipeline for the same case (e.g. chain eligibility → scheduling). |
1. update_portal_state
Requires the case to have a session_id (portal-linked case). Updates the portal state for that session at the given paths.
```yaml
# In config/pipelines/my_ops_pipeline.yml
post_run:
  update_portal_state:
    session_from: case
    updates:
      - path: move.status
        value_expression: |
          {% if agent_id.parsed.address_ok and agent_id.parsed.documents_ok %}Eligible{% else %}Not Eligible{% endif %}
      - path: some.other.path
        value: "literal"
```
- `path`: Dotted path into portal state.
- `value`: Literal value, or `value_expression`: a Jinja2 string; the context is the pipeline's final upstream (agent outputs).
2. update_case_data
Patches the case's case_data at dotted paths. Use to store pipeline results for display or downstream use.
```yaml
post_run:
  update_case_data:
    path_prefix: "result."  # optional: all paths prefixed (e.g. result.summary)
    updates:
      - path: eligibility_summary
        value_expression: "{{ my_agent.parsed.summary | default('Done') }}"
```
3. add_case_note
Adds a note to the case (Notes tab and Timeline). Rendered with Jinja; context includes final_upstream, case, and case_id.
```yaml
post_run:
  add_case_note:
    note_expression: |
      Eligibility check completed. Result: {{ my_agent.parsed.summary | default('N/A') }}.
    internal: true        # true = internal only; false = visible to customer
    added_by: "post_run"  # optional
```
4. update_case_status
Sets the case status (e.g. after a "close case" pipeline).
```yaml
post_run:
  update_case_status:
    status: "completed"
    # Or use an expression:
    # status_expression: "{% if my_agent.close %}completed{% else %}open{% endif %}"
```
Context for status_expression: final_upstream, case, case_id.
5. webhook
Sends an HTTP request after the pipeline completes. Requires httpx to be installed.
```yaml
post_run:
  webhook:
    url: "https://api.example.com/case-updated"
    # Or dynamic:
    # url_expression: "https://api.example.com/cases/{{ case_id }}/notify"
    method: POST
    payload_expression: |
      {"case_id": "{{ case_id }}", "summary": "{{ my_agent.parsed.summary | default('') }}"}
    headers:     # optional
      X-Custom: "value"
    timeout: 10  # optional, seconds
```
Context for url_expression and payload_expression: final_upstream, case, case_id. payload_expression must evaluate to valid JSON.
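As a rough sketch of what handling `payload_expression` involves, the template is rendered against the run context and the result must parse as JSON before the request is sent. The snippet below stands in a simplified `{{ var }}` substitution for full Jinja2 (which also supports filters like `| default`); it is illustrative, not the kit's actual renderer:

```python
import json
import re

def render_payload(template: str, context: dict) -> dict:
    """Substitute {{ var }} placeholders, then parse the result as JSON.

    Simplified stand-in for Jinja2: only flat variable names are handled.
    Raises json.JSONDecodeError if the rendered text is not valid JSON.
    """
    def sub(match: re.Match) -> str:
        name = match.group(1).strip()
        return str(context.get(name, ""))

    rendered = re.sub(r"\{\{\s*([^}]+?)\s*\}\}", sub, template)
    return json.loads(rendered)

payload = render_payload(
    '{"case_id": "{{ case_id }}", "summary": "{{ summary }}"}',
    {"case_id": "CASE-123", "summary": "Eligible"},
)
print(payload["case_id"])  # CASE-123
```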
6. trigger_pipeline
Runs another ops pipeline for the same case (e.g. after eligibility, auto-run scheduling). The target pipeline must be in ops_pipelines in pipelines.yml.
```yaml
post_run:
  trigger_pipeline:
    pipeline_id: move_scheduling  # or use pipeline_id_expression for Jinja
    condition_expression: |       # optional; if false, skip trigger
      {{ my_agent.parsed.eligible | default(false) }}
```
Use with care to avoid unbounded chains; consider a single level of chaining.
Best Practices
- Case Configuration:
  - Include all relevant agent outputs in `detail_view`
  - Use conditional sections for optional data
  - Keep field paths simple and clear
  - Organize sections logically for easy review
- Dashboard Design:
  - Focus on key metrics for quick insights
  - Use color coding for status indicators
  - Include distribution charts for pattern analysis
  - Show timelines for case lifecycle tracking
- Operations Assistant:
  - Use for bulk operations and complex queries
  - Leverage natural language for efficiency
  - Review cases in priority order
  - Monitor case status and pipeline health
- Field Display:
  - Use value mappings for user-friendly labels
  - Apply color coding for visual status indicators
  - Include icons for quick recognition
  - Format numbers appropriately (decimals, units)
Example: Complete Operations Setup
```yaml
# config/operations/invoice_review.yml
identity:
  prefix: "INV"
  uniqueness: "uuid_suffix"
detail_view:
  modal:
    title: "Invoice Review: {{ current_item.invoice_number }}"
    subtitle: "Amount: ${{ current_item.amount | round(2) }}"
  tabs:
    order:
      - overview
      - data
      - timeline
      - review
  sections:
    - name: "Invoice Details"
      fields:
        - field: "current_item.invoice_number"
          label: "Invoice Number"
          type: text
        - field: "current_item.amount"
          label: "Amount"
          type: number
          decimals: 2
    - name: "Review Results"
      fields:
        - field: "reviewer.approved"
          label: "Approved"
          type: boolean
        - field: "reviewer.notes"
          label: "Review Notes"
          type: multiline
list_view:
  pipeline_fields:
    - key: "invoice_number"
      field: "current_item.invoice_number"
      label: "Invoice #"
      type: text
    - key: "amount"
      field: "current_item.amount"
      label: "Amount"
      type: number
      decimals: 2
  column_order:
    - "case_id"
    - "invoice_number"
    - "amount"
    - "status"
    - "hitl_status"
dashboard:
  cards:
    - type: "metric"
      title: "Total Invoices"
      field: "total"
      aggregation: "count"
      color: "blue"
    - type: "percentage"
      title: "Approval Rate"
      numerator:
        field: "reviewer.approved"
        filter: true
      denominator:
        field: "total"
      color: "green"
```
⚡ Async HITL (Human-in-the-Loop)
Async HITL enables pipelines to continue processing while waiting for human review, making it ideal for batch processing, loop patterns, and high-throughput workflows where blocking on each review would be inefficient.
Why Async HITL?
Traditional Sync HITL blocks pipeline execution until a human responds:
- ✅ Simple and straightforward
- ❌ Pipeline stops completely, waiting for response
- ❌ Inefficient for batch processing (e.g., processing 100 items where only 10 need review)
- ❌ Can't process other items while waiting
Async HITL queues review requests and continues processing:
- ✅ Pipeline continues processing other items
- ✅ Perfect for batch/loop workflows
- ✅ Review requests queued for later processing
- ✅ Cases tracked independently
- ✅ Resume from checkpoints when ready
Use Cases:
- Batch Processing: Process 1000 claims, queue complex ones for review, continue with simple ones
- Loop Patterns: Process items in a loop, queue problematic items, continue with remaining items
- High-Throughput: Don't block on individual reviews when processing many items
- Operations Centers: Centralized review queue for multiple pipelines
Enabling Async HITL
Enable async HITL in your pipeline configuration:
```yaml
# config/pipelines/my_pipeline.yml
name: "My Pipeline"
description: "Batch processing with async HITL"

# Enable async HITL mode
execution_settings:
  hitl_mode: "async"         # "sync" (default) or "async"
  checkpoint_expiry_days: 7  # How long checkpoints remain valid

# Configure case management (required for async HITL)
operations:
  config_file: "operations/my_pipeline.yml"  # Operations configuration file
  tracking_variables:                 # Optional: customize variable names
    hitl_queued: "hitl_queued_cases"  # Default: "hitl_queued_cases"
    completed: "completed_cases"      # Default: "completed_cases"
```
Key Settings:
- `hitl_mode: "async"`: Enables async HITL mode
- `checkpoint_expiry_days`: How long checkpoints remain valid (default: 7 days)
- `operations.config_file`: Path to operations YAML configuration (required)
- `tracking_variables`: Optional customization of context variable names
How Async HITL Works
1. Pipeline Execution:
   - Pipeline runs normally until it hits a HITL gate
   - Instead of blocking, it creates a checkpoint and queues the request
   - Pipeline continues processing (especially useful in loops)
2. Checkpoint Creation:
   - Full execution context saved (all agent outputs up to the gate)
   - Case created with extracted data for display
   - HITL request queued for review
3. Operations UI:
   - Review requests appear in the Operations Center
   - Users can review cases, see data, and respond
   - Responses trigger pipeline resumption
4. Resume Execution:
   - When the user responds, the pipeline resumes from the checkpoint
   - Pre-gate agents are skipped (outputs already in checkpoint)
   - Post-gate agents execute with the HITL response
Case YAML Configuration
Case YAML files define how cases are displayed and identified in the Operations UI. Create a case configuration file:
```yaml
# config/operations/my_pipeline.yml

# Case type for categorization
case_type: "my_case_type"

# Identity configuration - how to identify cases
identity:
  prefix: "CASE"            # Case ID prefix (e.g., "CASE-ABC12345")
  uniqueness: "uuid_suffix" # "uuid_suffix" (default), "timestamp", or "none"

# Detail view - what to show in case detail modal
detail_view:
  # Optional: control Case Detail modal header and tabs
  modal:
    # Simple template form (uses Jinja-style {{ }})
    title: "Case Details for {{ current_item.id }}"
    subtitle: "Expression: {{ current_item.expression }}"
    # Optional: expression form (uses expression evaluator)
    # title_expression takes precedence over title when provided
    # title_expression: "'Case#: ' + current_item.id if current_item.id else 'Case (ID missing)'"
  tabs:
    # Optional: base tab order. Tabs are filtered by availability at runtime.
    order:
      - overview
      - data
      - timeline
      - documents
      - review
      - review_response_outcome
    # Optional: human-friendly labels for tabs
    labels:
      overview: "Overview"
      data: "Data"
      timeline: "Timeline"
      documents: "Documents"
      review: "Review"
      review_response_outcome: "Outcome"
    # Optional: expression-based labels (advanced)
    # labels_expression:
    #   overview: "'Overview for ' + current_item.id if current_item.id else 'Overview'"
  # Sections power the "Data" tab content
  sections:
    - name: "Problem Details"
      fields:
        - field: "current_item.id"
          label: "Item ID"
          type: text
        - field: "current_item.expression"
          label: "Expression"
          type: text
    - name: "Analysis"
      fields:
        - field: "analyzer.result"
          label: "Result"
          type: text
        - field: "analyzer.confidence"
          label: "Confidence"
          type: number
    # Conditional sections - only show if condition is true
    - name: "Review"
      condition: "reviewer IS NOT NULL"  # Only show if reviewer ran
      fields:
        - field: "reviewer.approved"
          label: "Approved"
          type: boolean
```
Field Types:
- `text`: Single-line text
- `multiline`: Multi-line text
- `number`: Numeric value
- `boolean`: True/false
- `list`: Array of items
- `object`: Nested object
Field Paths:
- Use dot notation: `agent_id.field` or `agent_id.nested.field`
- Access loop items: `current_item.field` or `loop_item.field`
- Conditional sections: `condition: "agent_id IS NOT NULL"`
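Dot-notation extraction and the `IS NOT NULL` check can be pictured roughly like this, assuming the upstream context is a nested dict (the real evaluator supports a richer expression syntax):

```python
from typing import Any

def get_path(context: dict, path: str, default: Any = None) -> Any:
    """Walk a dotted path like 'analyzer.confidence' through nested dicts."""
    value: Any = context
    for part in path.split("."):
        if not isinstance(value, dict) or part not in value:
            return default
        value = value[part]
    return value

upstream = {
    "current_item": {"id": "ITEM-7", "expression": "2 + 2"},
    "analyzer": {"result": "4", "confidence": 0.98},
}

print(get_path(upstream, "analyzer.confidence"))  # 0.98
print(get_path(upstream, "reviewer.approved"))    # None
# A section with condition "reviewer IS NOT NULL" would be hidden here:
print(get_path(upstream, "reviewer") is not None)  # False
```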
Modal Header & Tabs:
- Use `modal.title` / `modal.subtitle` when you want simple templates with `{{variable}}` substitution.
- Use `modal.title_expression` / `modal.subtitle_expression` when you need logic (e.g., concatenation and if/else) using the expression evaluator.
- Use `tabs.order` to control the base ordering of tabs; unavailable tabs (e.g., `documents` when no documents are configured) are automatically ignored.
- Use `tabs.labels` for static tab labels, or `tabs.labels_expression` for expression-based labels.
List View Configuration
Configure how cases are displayed in the Operations UI case list table. The list view supports both common fields (always available) and pipeline-specific fields:
```yaml
# config/operations/my_pipeline.yml
list_view:
  # Define pipeline-specific fields (extracted from upstream context)
  pipeline_fields:
    - key: "transaction_id"
      field: "current_item.transaction_id"
      label: "Transaction ID"
      type: text
    - key: "amount"
      field: "current_item.amount"
      label: "Amount"
      type: number
    - key: "anomaly_type"
      field: "analyzer.anomaly_type"
      label: "Anomaly Type"
      type: text
      value_mapping:  # Optional: map raw values to display labels
        "capital_revenue_misclassification": "CAPITAL/REVENUE MISCLASSIFICATION"
        "lease_rou_misclassification": "LEASE/ROU MISCLASSIFICATION"
  # Column order for pipeline-specific tab
  # Mix common field keys and pipeline field keys
  # Only columns listed here will be shown on the pipeline tab
  column_order:
    - "case_id"        # Common field
    - "transaction_id" # Pipeline field
    - "amount"         # Pipeline field
    - "anomaly_type"   # Pipeline field
    - "status"         # Common field
    - "hitl_status"    # Common field
    - "created_at"     # Common field
    - "actions"        # Common field (action buttons)
```
Common Fields (always available, don't need to define):
`case_id`, `pipeline_id`, `status`, `hitl_gate_title`, `hitl_description`, `hitl_status`, `hitl_decision`, `responded_by`, `created_at`, `updated_at`, `actions`
Pipeline Fields:
- `key`: Unique identifier for the field (used in `column_order`)
- `field`: Dot-notation path to extract from upstream context (e.g., `"agent_id.field"`)
- `label`: Display label for the column header
- `type`: Field type (`text`, `number`, `boolean`, `list`, `object`)
- `value_mapping`: Optional mapping of raw values to display labels
- `color_mapping`: Optional color coding for field values (see Field Display Properties)
- `icon_mapping`: Optional icons for field values (see Field Display Properties)
- `text_transform`: Optional text transformation (see Field Display Properties)
- `decimals`: Optional decimal places for number fields (default: 0 for integers, 2 for percentages)
Column Order:
- When the "ALL" tab is selected: only common fields are shown (default order)
- When a pipeline-specific tab is selected: columns are shown in the order specified in `column_order`
- Mix common field keys and pipeline field keys in any order
Centralized Field Definitions
Define fields once and reuse them across list_view, detail_view, and dashboard sections. This ensures consistent display properties and eliminates duplication:
```yaml
# config/operations/my_pipeline.yml

# =============================================================================
# CENTRALIZED FIELD DEFINITIONS
# =============================================================================
# Define fields once and reuse across list_view, detail_view, and dashboard
# This ensures consistent display properties (value_mapping, color_mapping, etc.)
fields:
  # Status field with mappings
  - key: "posting_status"
    field: "poster.posting_result.status"
    label: "Posting Status"
    type: text
    value_mapping:
      "success": "Posted"
      "error": "Failed"
    color_mapping:
      "success": "green"
      "error": "red"
    icon_mapping:
      "success": "CheckCircle"
      "error": "XCircle"
    text_transform: "uppercase"
  # Amount field with formatting
  - key: "amount"
    field: "current_item.amount"
    label: "Amount"
    type: number
    decimals: 2
  # Decision field with mappings
  - key: "decision"
    field: "assessor.decision"
    label: "Decision"
    type: text
    value_mapping:
      "allowed": "Allowed"
      "not_allowed": "Not Allowed"
      "uncertain": "Uncertain"
    color_mapping:
      "allowed": "green"
      "not_allowed": "red"
      "uncertain": "amber"
    icon_mapping:
      "allowed": "CheckCircle"
      "not_allowed": "XCircle"
      "uncertain": "AlertCircle"
    text_transform: "uppercase"

# Now reference these fields in list_view and detail_view
list_view:
  pipeline_fields:
    - key: "posting_status"  # Reference centralized field
    - key: "amount"          # Reference centralized field

detail_view:
  sections:
    - name: "Results"
      fields:
        - key: "posting_status"  # Reference centralized field
        - key: "decision"        # Reference centralized field
```
Benefits:
- DRY Principle: Define once, use everywhere
- Consistency: Same display properties across all views
- Maintainability: Update in one place, applies everywhere
- Override Support: Can override specific properties inline if needed
Referencing Fields:
- Use `key: "field_key"` to reference a centralized field definition
- Override specific properties inline if needed (e.g., `key: "amount", decimals: 0` to override decimals)
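Resolving a reference like `key: "amount", decimals: 0` amounts to a dictionary merge in which inline properties win. A hypothetical sketch of that behavior (not the kit's actual resolver):

```python
def resolve_field(ref: dict, centralized: list) -> dict:
    """Merge a field reference with its centralized definition.

    Inline properties override centralized ones, per the documented
    override support. Illustrative only.
    """
    by_key = {f["key"]: f for f in centralized}
    base = by_key.get(ref.get("key"), {})
    return {**base, **ref}  # inline keys take precedence

fields = [{"key": "amount", "field": "current_item.amount",
           "label": "Amount", "type": "number", "decimals": 2}]

resolved = resolve_field({"key": "amount", "decimals": 0}, fields)
print(resolved["decimals"])  # 0 (inline override)
print(resolved["label"])     # Amount (from the centralized definition)
```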
Field Display Properties
Enhance field display with color coding, icons, and text transformations:
```yaml
# config/operations/my_pipeline.yml
list_view:
  pipeline_fields:
    - key: "status"
      field: "processor.status"
      label: "Status"
      type: text
      # Value mapping: Transform raw values to display labels
      value_mapping:
        "success": "Success"
        "error": "Error"
        "pending": "Pending"
      # Color mapping: Apply colors to field values
      # Simple format: color name
      color_mapping:
        "success": "green"
        "error": "red"
        "pending": "amber"
      # Advanced format: detailed color configuration
      # color_mapping:
      #   "success":
      #     background: "green"
      #     text: "white"
      #   "error":
      #     background: "red"
      #     text: "white"
      # Icon mapping: Display icons before text (lucide-react icons)
      icon_mapping:
        "success": "CheckCircle"
        "error": "XCircle"
        "pending": "Clock"
      # Text transformation: Transform text display
      # Options: uppercase, lowercase, capitalize, title-case, sentence-case,
      # kebab-case, snake_case, camelCase, PascalCase, truncate, abbreviate,
      # remove-whitespace, normalize-whitespace, none
      text_transform: "uppercase"
      # Text transform options (for truncate/abbreviate)
      # text_transform_options:
      #   max_length: 20  # For truncate
```
Color Mapping:
- Simple format: `"value": "color_name"` (e.g., `"success": "green"`)
- Advanced format: `"value": {background: "green", text: "white"}` for custom styling
- Available colors: `red`, `amber`, `yellow`, `green`, `blue`, `indigo`, `purple`, `pink`, `gray`
- Works with `value_mapping`: colors are applied to the mapped display value
Icon Mapping:
- Maps field values to `lucide-react` icon names
- Icons appear before the text
- Available icons: `CheckCircle`, `XCircle`, `AlertCircle`, `Clock`, `FileCheck`, `AlertTriangle`, etc.
- See Lucide Icons for the full list
Text Transform Options:
- `uppercase`: "hello" → "HELLO"
- `lowercase`: "HELLO" → "hello"
- `capitalize`: "hello world" → "Hello World"
- `title-case`: "hello world" → "Hello World" (same as capitalize)
- `sentence-case`: "HELLO WORLD" → "Hello world"
- `kebab-case`: "Hello World" → "hello-world"
- `snake_case`: "Hello World" → "hello_world"
- `camelCase`: "Hello World" → "helloWorld"
- `PascalCase`: "hello world" → "HelloWorld"
- `truncate`: Truncate text with ellipsis (requires `text_transform_options.max_length`)
- `abbreviate`: Abbreviate long text (requires `text_transform_options.max_length`)
- `remove-whitespace`: Remove all whitespace
- `normalize-whitespace`: Normalize multiple spaces to a single space
- `none`: No transformation (default)
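A few of these transforms sketched in Python to make their semantics concrete (illustrative implementations; the UI applies the actual transforms client-side):

```python
import re

def kebab_case(s: str) -> str:
    """'Hello World' -> 'hello-world'."""
    return re.sub(r"\s+", "-", s.strip()).lower()

def snake_case(s: str) -> str:
    """'Hello World' -> 'hello_world'."""
    return re.sub(r"\s+", "_", s.strip()).lower()

def camel_case(s: str) -> str:
    """'Hello World' -> 'helloWorld'."""
    words = s.split()
    return words[0].lower() + "".join(w.capitalize() for w in words[1:])

def truncate(s: str, max_length: int) -> str:
    """Shorten text with an ellipsis when it exceeds max_length."""
    return s if len(s) <= max_length else s[: max_length - 1] + "…"

print(kebab_case("Hello World"))                # hello-world
print(snake_case("Hello World"))                # hello_world
print(camel_case("Hello World"))                # helloWorld
print(truncate("A very long description", 10))  # A very lo…
```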
Best Practices:
- Use centralized fields for fields used in multiple places
- Combine mappings: Use `value_mapping` + `color_mapping` + `icon_mapping` for rich displays
- Consistent colors: Use the same color scheme across related fields (e.g., success=green, error=red)
- Meaningful icons: Choose icons that clearly represent the value (e.g., CheckCircle for success, XCircle for error)
- Text transforms: Use `uppercase` for status fields, `capitalize` for names, `truncate` for long text
Dashboard Configuration
Configure pipeline-specific analytics cards for the Operations dashboard. Cards are filtered by the same criteria as the list view (pipeline, time range, status, search):
```yaml
# config/operations/my_pipeline.yml
dashboard:
  # Enable dashboard for this pipeline
  # Cards to display (order matters)
  cards:
    # Card 1: Percentage metric
    - type: "percentage"
      title: "Anomaly Detection Rate"
      icon: "AlertTriangle"
      numerator:
        field: "analyzer.anomaly_detected"
        filter: true    # Count only where field is truthy
      denominator:
        field: "total"  # Special field: total cases
      color: "amber"
      show_breakdown: true
      footer:
        type: "text"
        content: "Based on {{totalCases}} processed cases"
    # Card 2: Numeric metric
    - type: "metric"
      title: "Average Confidence Score"
      icon: "TrendingUp"
      field: "analyzer.confidence_score"
      aggregation: "avg"      # count, sum, avg, min, max
      format: "number"        # number, currency, percentage
      decimals: 2
      color: "blue"
      suffix: "/ 1.0"
      visualization: "gauge"  # none, gauge, trend
      footer:
        type: "legend"
        items:
          - label: "Score Range"
            value: "0.0 - 1.0"
          - label: "Cases"
            value: "{{totalCases}}"
    # Card 3: Distribution chart (donut)
    - type: "donut"
      title: "Anomaly Types"
      icon: "PieChart"
      field: "analyzer.anomaly_type"
      show_legend: true
      show_percentages: true
      colors:  # Optional: custom colors per value
        "capital_revenue_misclassification": "red"
        "lease_rou_misclassification": "amber"
    # Card 4: Bar chart (same as donut, different visualization)
    - type: "bar"
      title: "Status Distribution"
      icon: "BarChart3"
      field: "status"
      show_legend: true
    # Card 5: Timeline distribution
    - type: "timeline"
      title: "Cases Timeline"
      icon: "Calendar"
      # Uses created_at field automatically
    # Card 6: Reuse default card from "All" tab
    - type: "default"
      card_id: "response_method"
      title: "Response Method"
```
Card Types:
- `metric`: Numeric metric with aggregation
  - `aggregation`: `count`, `sum`, `avg`, `min`, `max`
  - `format`: `number`, `currency`, `percentage`
  - `visualization`: `none`, `gauge`, `trend`
- `percentage`: Percentage with numerator/denominator
  - `numerator.field`: Field to count (with optional `filter: true`)
  - `denominator.field`: Total count (use `"total"` for all cases)
- `donut`: Distribution chart (pie chart)
  - Groups cases by field value
  - Shows percentages and legend
- `bar`: Bar chart
  - Same as donut, different visualization
- `timeline`: Time-based distribution
  - Groups cases by time period (morning/afternoon/evening/night)
  - Uses `created_at` field automatically
- `default`: Reuse built-in card from "All" tab
  - `card_id`: ID of default card to reuse
Field Paths:
- Use dot notation: `agent_id.field` or `agent_id.nested.field`
- Special fields: `"status"`, `"created_at"`, `"total"`
Icons: Use Lucide icon names (e.g., AlertTriangle, TrendingUp, PieChart, BarChart3, Calendar)
Colors: red, amber, yellow, green, blue, indigo, purple, pink, gray
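How a `metric` card with `aggregation: "avg"` and a `percentage` card might compute their values over the filtered cases can be sketched as follows (the `get_path` helper and sample data are hypothetical; the real cards also honor `format`, `decimals`, and filters):

```python
def get_path(case: dict, path: str):
    """Simplified dotted-path lookup into a case's upstream data."""
    value = case
    for part in path.split("."):
        if not isinstance(value, dict):
            return None
        value = value.get(part)
    return value

cases = [
    {"analyzer": {"confidence_score": 0.9, "anomaly_detected": True}},
    {"analyzer": {"confidence_score": 0.7, "anomaly_detected": False}},
    {"analyzer": {"confidence_score": 0.8, "anomaly_detected": True}},
]

# metric card: aggregation "avg" over analyzer.confidence_score
scores = [get_path(c, "analyzer.confidence_score") for c in cases]
avg = round(sum(scores) / len(scores), 2)
print(avg)  # 0.8

# percentage card: numerator counts truthy values, denominator is "total"
detected = sum(1 for c in cases if get_path(c, "analyzer.anomaly_detected"))
print(f"{100 * detected / len(cases):.0f}%")  # 67%
```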
Value Mapping in Cards: Dashboard cards also support value_mapping for donut/bar charts:
```yaml
dashboard:
  cards:
    - type: "donut"
      title: "Status Distribution"
      field: "processor.status"
      value_mapping:  # Map raw values to display labels in legend
        "success": "Success"
        "error": "Error"
      colors:
        "success": "green"
        "error": "red"
```
Timeline Configuration
Configure the Timeline tab to display system events and custom events from agent outputs:
```yaml
# config/operations/my_pipeline.yml
timeline:
  sort_order: "desc"  # "asc" or "desc" - most recent first or oldest first
  # System-generated events (automatically created)
  system_events:
    case_created:
      label: "Case Created"
      icon: "FileText"
      color: "blue"
      show_details: true  # Show additional details below event
      details:
        - field: "current_item.doc_no"
          label: "Document No"
        - field: "current_item.amount"
          label: "Amount"
        - field: "current_item.currency"
          label: "Currency"
    case_completed:
      label: "Case Completed"
      icon: "CheckCircle"
      color: "green"
      timestamp_source: "completed_at"  # Use completed_at timestamp
      show_details: true
    hitl_queued:
      label: "Awaiting Human Review"
      icon: "Clock"
      color: "amber"
      show_details: true
    hitl_response:
      label: "Human Review {{ decision }}"
      icon: "UserCheck"
      color: "conditional"  # green for approve, red for reject
      show_details: true
      details:
        - field: "responded_by"
          label: "Responded By"
        - field: "selection"
          label: "Selection"
  # Custom events extracted from agent outputs
  custom_events:
    # Assessment completed event
    - source_field: "assessor"  # Agent output object
      label: "Assessment Completed"
      icon: "CheckCircle"
      color: "green"
      # Use agent's execution timestamp for accurate ordering
      timestamp_source: "assessor._executed_at"
      condition: "assessor IS NOT NULL"  # Only show if assessor ran
      show_details: true
      details:
        - key: "decision"    # Reference centralized field definition
        - key: "confidence"  # Reference centralized field definition
        - field: "assessor.expense_type"
          label: "Expense Type"
```
System Events:
- `case_created`: When the case is created (uses `created_at` timestamp)
- `case_completed`: When the case is completed (uses `completed_at` timestamp)
- `case_failed`: When the case fails (uses `failed_at` timestamp)
- `hitl_queued`: When a HITL request is queued
- `hitl_response`: When a human responds to a HITL request
Custom Events:
- `source_field`: Agent ID or field path to check for existence
- `timestamp_source`: Where to get the timestamp from:
  - `"case_created"`: Use `case.created_at`
  - `"case_updated"`: Use `case.updated_at`
  - `"agent_id._executed_at"`: Use the agent's execution timestamp (recommended for accurate ordering)
  - Field path: `"agent_id.field"` to extract from agent output
- `condition`: Expression to evaluate (e.g., `"assessor IS NOT NULL"`)
- `details`: List of fields to display below the event
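Resolving `timestamp_source` could look roughly like this (a hypothetical helper mirroring the options above, not the kit's code):

```python
def resolve_timestamp(source: str, case: dict, upstream: dict):
    """Pick an event timestamp per the documented timestamp_source options."""
    if source == "case_created":
        return case.get("created_at")
    if source == "case_updated":
        return case.get("updated_at")
    # Otherwise treat it as a dotted path into agent outputs,
    # e.g. "assessor._executed_at"
    value = upstream
    for part in source.split("."):
        if not isinstance(value, dict):
            return None
        value = value.get(part)
    return value

case = {"created_at": "2024-06-01T10:00:00Z"}
upstream = {"assessor": {"_executed_at": "2024-06-01T10:05:00Z"}}

print(resolve_timestamp("case_created", case, upstream))           # 2024-06-01T10:00:00Z
print(resolve_timestamp("assessor._executed_at", case, upstream))  # 2024-06-01T10:05:00Z
```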
Timeline Details:
- Details can reference centralized field definitions using `key: "field_key"`
- Details support all field display properties (`value_mapping`, `color_mapping`, `icon_mapping`, `text_transform`)
- Details are displayed in a collapsible section below the event
Best Practices:
- Use agent timestamps: Set `timestamp_source: "agent_id._executed_at"` for custom events to get accurate ordering
- Show relevant details: Include key information in `details` (e.g., decision, amount, status)
- Consistent icons/colors: Use the same icon/color scheme as the list view for consistency
- Reference centralized fields: Use `key: "field_key"` in details to reuse field definitions
- Conditional events: Use `condition` to only show events when relevant (e.g., only show HITL events for HITL cases)
Example: Complete Timeline Configuration:
```yaml
timeline:
  sort_order: "desc"
  system_events:
    case_created:
      label: "Case Created"
      icon: "FileText"
      color: "blue"
      show_details: true
      details:
        - field: "current_item.doc_no"
          label: "Document No"
        - field: "current_item.amount"
          label: "Amount"
          type: number
          decimals: 2
    case_completed:
      label: "Case Completed"
      icon: "CheckCircle"
      color: "green"
      timestamp_source: "completed_at"
      show_details: true
  custom_events:
    # Assessment event
    - source_field: "midas_assessor"
      label: "Assessment Completed"
      icon: "CheckCircle"
      color: "green"
      timestamp_source: "midas_assessor._executed_at"
      condition: "midas_assessor IS NOT NULL"
      show_details: true
      details:
        - key: "decision"    # Uses centralized field definition
        - key: "confidence"  # Uses centralized field definition
    # Posting event
    - source_field: "acta_poster"
      label: "Journal Posted"
      icon: "FileCheck"
      color: "green"
      timestamp_source: "acta_poster._executed_at"
      condition: "acta_poster IS NOT NULL"
      show_details: true
      details:
        - field: "acta_poster.posting_result.status"
          label: "Status"
        - field: "acta_poster.posting_result.batch_id"
          label: "Batch ID"
```
Resume Behavior Configuration
Control how agents behave when resuming from checkpoints using resume_behavior in agent configuration:
```yaml
# config/agents/my_agent.yml
id: my_agent
type: agno
model: "azure_openai"
# Resume behavior options:
# - "always" (default): Always run, unless output already in upstream
# - "skip_on_resume": Never run when resuming (skip completely)
# - "run_only_when_complete": Only run when all loop iterations complete
resume_behavior: "always"
```
Resume Behavior Options:
- `always` (default):
  - Agent runs normally during resume
  - Skips only if its output already exists in the checkpoint's upstream
  - Use for: most agents that should run after HITL
- `skip_on_resume`:
  - Agent never runs when resuming from a checkpoint
  - Always skipped during resume
  - Use for: agents that should only run once (e.g., initializers, scanners)
- `run_only_when_complete`:
  - Only runs when all loop iterations complete (not during resume)
  - Skips during resume if in a loop
  - Use for: summary/aggregation agents that need all loop results
Example:
```yaml
agents:
  # Scanner - should only run once, not on resume
  - id: batch_scanner
    resume_behavior: "skip_on_resume"          # Never run on resume
  # Processor - should run after HITL
  - id: item_processor
    resume_behavior: "always"                  # Run after HITL (default)
  # Summary reporter - needs all loop iterations
  - id: batch_summary
    resume_behavior: "run_only_when_complete"  # Only when loop finishes
```
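The three options reduce to a small decision function at resume time. A sketch of the documented rules (not the orchestrator's actual code; the argument names are illustrative):

```python
def should_run_on_resume(resume_behavior: str, agent_id: str,
                         checkpoint_upstream: dict, loop_complete: bool) -> bool:
    """Decide whether an agent executes when resuming from a checkpoint."""
    if resume_behavior == "skip_on_resume":
        return False          # never runs on resume
    if resume_behavior == "run_only_when_complete":
        return loop_complete  # only after all loop iterations finished
    # "always": run unless the agent's output is already in the
    # checkpoint's upstream context
    return agent_id not in checkpoint_upstream

upstream = {"batch_scanner": {"items_list": [1, 2, 3]}}
print(should_run_on_resume("skip_on_resume", "batch_scanner", upstream, False))         # False
print(should_run_on_resume("always", "item_processor", upstream, False))                # True
print(should_run_on_resume("run_only_when_complete", "batch_summary", upstream, False)) # False
```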
Loop Patterns with Async HITL
Async HITL is particularly powerful in loop patterns:
```yaml
pattern:
  type: sequential
  steps:
    - node: batch_scanner
    - type: loop
      iterate_over: "batch_scanner.items_list"
      loop_item_key: "current_item"
      body:
        type: sequential
        steps:
          - node: item_processor
          - gate: review_item  # Async HITL - queues and continues
          - node: item_finalizer
    - node: batch_summary      # Runs after all items processed
```
How It Works:
- Loop processes each item
- When a HITL gate is hit:
  - Checkpoint created for that iteration
  - Case queued for review
  - Loop continues with the next item (doesn't wait)
- Queued cases tracked in the `hitl_queued_cases` context variable
- Summary agent can report on queued vs completed cases
Tracking Variables:
- `hitl_queued_cases`: List of cases queued for HITL (automatically populated)
- `completed_cases`: List of cases that completed without HITL (automatically populated)
Access in downstream agents:
```yaml
# Summary agent can report on queued cases
agents:
  - id: batch_summary
    prompt:
      instruction:
        inline: |
          Report on batch processing:
          - Total items: {{batch_scanner.total_count}}
          - Queued for review: {{hitl_queued_cases | length}}
          - Completed: {{completed_cases | length}}
```
Operations UI Usage
The Operations UI provides a centralized interface for managing async HITL cases. See the Operations Center section above for comprehensive details.
Quick Reference:
Accessing Operations UI:
- Navigate to `/operations` in the web UI
- View all cases across pipelines or filter by pipeline
- See case summary (total, pending, completed, in-progress)
Case List Panel:
- Filter by Pipeline: Select a specific pipeline or view all
- Tabbed View:
  - "All" tab: Shows all cases with common fields
  - Pipeline-specific tabs: Shows cases for that pipeline with custom columns (from `list_view.column_order`)
- Custom Columns: Pipeline-specific fields displayed based on `list_view` configuration
- Case Cards/Table: Each case shows:
  - Case ID and status
  - Pipeline name
  - Created timestamp
  - Case type
  - Custom pipeline fields (if configured)
- Status Badges: Pending, In-Progress, Completed, Failed
- Dashboard: Pipeline-specific analytics cards (if `dashboard.` is configured)
  - Metrics, percentages, distribution charts, timelines
  - Filtered by same criteria as list view
Case Detail Modal:
- Review Tab:
  - HITL gate information
  - Gate description (from HITL template)
  - Approve/Reject buttons
  - Optional form fields
- Data Tab:
  - Structured case data (from `detail_view` in operations YAML)
  - Organized by sections
  - All agent outputs displayed
- Timeline Tab:
  - System events: case created, completed, failed, HITL queued/responded
  - Custom events from agent outputs
  - Chronological view of case lifecycle
- Chat Tab:
  - Operations Assistant for natural language interactions
  - Ask questions about cases
  - Approve/reject via chat commands
Responding to HITL Requests:
- Direct Response (Review Tab):
  - Click the "Approve" or "Reject" button
  - Fill optional form fields if present
  - Pipeline resumes immediately
- Via Operations Assistant (Chat Tab):
  - Ask: "Approve case CASE-12345"
  - Ask: "Reject case CASE-12345 with notes: Invalid data"
  - Assistant handles approval/rejection on your behalf
Case Status Flow:
- Pending: Queued, waiting for review
- In-Progress: User opened case, reviewing
- Completed: HITL responded, pipeline resumed successfully
- Failed: Pipeline resume failed
Advanced Async HITL Patterns
Pattern 1: Batch Processing with Priority Queue
Process items in batches, queue complex ones for review, continue with simple ones:
```yaml
pattern:
  type: sequential
  steps:
    - node: batch_scanner
    - type: loop
      iterate_over: "batch_scanner.items_list"
      loop_item_key: "current_item"
      body:
        type: sequential
        steps:
          - node: complexity_analyzer
          - type: gate
            gate_id: review_complex
            condition: "complexity_analyzer.complexity_score > 0.7"
            # Only queues complex items
          - node: item_processor
    - node: batch_summary
```
Pattern 2: Parallel Processing with Async HITL
Process multiple items in parallel, queue problematic ones:
```yaml
pattern:
  type: sequential
  steps:
    - node: item_scanner
    - type: parallel
      steps:
        - type: loop
          iterate_over: "item_scanner.items_list"
          loop_item_key: "current_item"
          body:
            type: sequential
            steps:
              - node: item_validator
              - gate: review_failed
                condition: "item_validator.valid == false"
              - node: item_processor
    - node: aggregation
```
Pattern 3: Conditional Async HITL
Only queue items that meet certain criteria:
```yaml
pattern:
  type: sequential
  steps:
    - node: data_extractor
    - type: gate
      gate_id: review_anomaly
      condition: "data_extractor.anomaly_detected == true"
      # Only queues items with anomalies
      # Items without anomalies skip the gate and continue
    - node: finalizer
```
Checkpoint Management
Checkpoint Expiry:
```yaml
execution_settings:
  hitl_mode: "async"
  checkpoint_expiry_days: 7  # Checkpoints expire after 7 days
```
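Expiry is, in essence, a timestamp comparison against `checkpoint_expiry_days`. Roughly (illustrative sketch, not the kit's storage layer):

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

def is_expired(created_at: datetime, expiry_days: int = 7,
               now: Optional[datetime] = None) -> bool:
    """True when a checkpoint is older than the configured expiry window."""
    now = now or datetime.now(timezone.utc)
    return now - created_at > timedelta(days=expiry_days)

created = datetime(2024, 6, 1, tzinfo=timezone.utc)
print(is_expired(created, 7, now=datetime(2024, 6, 5, tzinfo=timezone.utc)))   # False
print(is_expired(created, 7, now=datetime(2024, 6, 10, tzinfo=timezone.utc)))  # True
```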
Checkpoint Contents:
- Full execution context (all agent outputs up to gate)
- Pipeline state and variables
- Loop iteration state (if in loop)
- Case data for display
Resume Behavior:
- Pre-gate agents are skipped (outputs already in checkpoint)
- Post-gate agents execute with HITL response
- Loop continues from next iteration
- Summary agents run only when loop completes
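These behaviors are selected per agent via the `resume_behavior` field (the same field used by the batch summary agent example later in this section). A hedged sketch with illustrative agent ids:

```yaml
# Illustrative agent ids; the resume_behavior values are the ones
# described above and used elsewhere in this README.
id: batch_scanner
resume_behavior: "skip_on_resume"           # one-time agent: output restored from the checkpoint
---
id: item_processor
resume_behavior: "always"                   # default: re-executes with the HITL response
---
id: batch_summary
resume_behavior: "run_only_when_complete"   # runs only after the loop finishes
```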
Error Handling
Failed Resumes:
- Case status set to "Failed"
- Error message stored in case
- Checkpoint remains valid for retry
- User can retry resume from Operations UI
Expired Checkpoints:
- Cases with expired checkpoints cannot be resumed
- User notified in Operations UI
- Option to create new case or skip
Partial Failures:
- If some items in batch fail, others continue
- Failed cases tracked separately
- Summary reports include failure counts
Performance Considerations
Batch Size:
- Process items in reasonable batches (100-1000 items)
- Monitor checkpoint storage size
- Consider checkpoint expiry for long-running batches
Queue Management:
- Review cases in priority order
- Use Operations Assistant for bulk operations
- Monitor queue depth and processing rate
Database Optimization:
- Index case tables by pipeline_id, status, created_at
- Archive completed cases periodically
- Monitor checkpoint table size
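As a sketch of the indexing advice above, the snippet below creates a composite index over hypothetical case columns (`cases`, `pipeline_id`, `status`, `created_at` are illustrative names, not the kit's actual schema) and checks that SQLite uses it for the typical "open cases of one pipeline, newest first" query:

```python
import sqlite3

# Hypothetical case table; column names mirror the fields suggested above.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE cases (case_id TEXT PRIMARY KEY, pipeline_id TEXT, status TEXT, created_at TEXT)"
)
# Composite index matching the common Operations UI query shape
conn.execute(
    "CREATE INDEX idx_cases_pipeline_status_created ON cases (pipeline_id, status, created_at)"
)

plan = conn.execute(
    "EXPLAIN QUERY PLAN "
    "SELECT * FROM cases WHERE pipeline_id = ? AND status = ? ORDER BY created_at DESC",
    ("math_batch_solver", "Pending"),
).fetchall()
used = any("idx_cases_pipeline_status_created" in row[-1] for row in plan)
print(used)  # True
```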
Best Practices
- Case YAML Design:
  - Include all relevant agent outputs in `detail_view`
  - Use conditional sections for optional data
  - Keep field paths simple and clear
  - Organize sections logically for easy review
- Resume Behavior:
  - Use `skip_on_resume` for one-time agents (scanners, initializers)
  - Use `run_only_when_complete` for summary agents
  - Default `always` for most processing agents
- Loop Patterns:
  - Track queued vs. completed cases for reporting
  - Use summary agents to aggregate results
  - Consider checkpoint expiry for long-running batches
- Operations UI:
  - Use Operations Assistant for bulk operations
  - Review cases in priority order
  - Monitor case status and pipeline health
Example: Batch Math Solver
Complete example of async HITL in a batch processing pipeline:
# config/pipelines/math_batch_solver.yml
name: "Math Batch Solver"
description: "Batch processing with async HITL"
execution_settings:
hitl_mode: "async"
checkpoint_expiry_days: 7
operations:
config_file: "operations/math_batch_solver.yml"
pattern:
type: sequential
steps:
- node: batch_problem_parser
- type: loop
iterate_over: "batch_problem_parser.problems_list"
loop_item_key: "current_problem"
body:
type: sequential
steps:
- node: math_strategist
- node: math_calculator
- gate: approve_auditor # Async HITL - queues complex problems
- node: math_auditor
- node: batch_summary_reporter # Reports on all problems
Operations Configuration (config/operations/math_batch_solver.yml):
identity:
prefix: "BATCH"
uniqueness: "uuid_suffix"
detail_view:
sections:
- name: "Problem Details"
fields:
- field: "current_problem.expression"
label: "Expression"
type: text
- name: "Calculation"
fields:
- field: "math_calculator.result"
label: "Result"
type: text
- name: "Audit"
condition: "math_auditor IS NOT NULL"
fields:
- field: "math_auditor.is_valid"
label: "Valid"
type: boolean
Summary Agent (config/agents/batch_summary_reporter.yml):
id: batch_summary_reporter
type: agno
model: "azure_openai"
resume_behavior: "run_only_when_complete" # Only when loop finishes
prompt:
instruction:
inline: |
Generate batch processing summary:
- Total problems: {{batch_problem_parser.total_count}}
- Queued for review: {{hitl_queued_cases | length}}
- Completed: {{completed_cases | length}}
๐ช Portal Mode
Portal Mode provides a customer-facing experience at /portal: login, portal content (e.g. billing, usage, account), and a collapsible assistant chat. The main page (/) is unchanged. Portal uses the same backend orchestrator and pipelines as the main chat, with one session per portal user and separate state managed via config/portals/.
Overview
- Routes: `/portal` (single-portal: redirects to the first portal) or `/portal/[portalId]` (e.g. `/portal/customer`, `/portal/hawkeye`).
- Login: Config-based; users are defined per portal in `config/portals/{portal_id}/users.yml`. No conversations list on the portal; one session per user.
- Portal content: A React component (from a starter or vibe-coded) receiving `portalState`, `onUpdate`, and `currentUser`. State is stored per user and exposed as `GET /api/portal/state` and `PATCH /api/portal/state`.
- Assistant chat: The same primary assistant as the main page; it can run pipelines and use portal tools (e.g. `get_portal_state`, `update_portal_state`). Chat history can be cleared per session.
Enabling Portal
Portal is enabled when config/portals/ exists with portals.yml and at least one portal that has users.yml:
- Required: `config/portals/portals.yml` (master config listing portals).
- Required per portal: `config/portals/{portal_id}/users.yml` (users for login).
- Optional per portal: `state_schema.yml`, `initial_state.yml`, `chat_actions.yml`.
If portal is not enabled, /portal may return 404 or redirect to /.
Config Layout
config/portals/
├── portals.yml            # Master: lists portals (title, component, login_component, operations, etc.)
├── customer/              # Per-portal folder (portal_id = customer)
│   ├── users.yml          # Required: users for login
│   ├── state_schema.yml   # Optional: schema for portal state
│   ├── initial_state.yml  # Optional: default state for new sessions
│   └── chat_actions.yml   # Optional: button-to-chat actions
└── hawkeye/               # Another portal (portal_id = hawkeye)
    ├── users.yml
    ├── state_schema.yml
    ├── initial_state.yml
    └── chat_actions.yml
portals.yml (master config):
# Master portal config. Keys = portal_id (used in URLs, session, case_data).
portals:
customer:
title: "Customer Portal"
subtitle: "Manage account and services."
component: "portals/ampere"
login_component: "portals/ampere/LoginPage"
# Optional: operations for case types (used by create_case)
operations:
- id: general
description: "General assistance"
- id: billing
description: "Billing questions"
default_operation: general
hawkeye:
title: "Hawkeye"
subtitle: "Decodes intent. Detects deviation."
component: "portals/hawkeye"
login_component: "portals/hawkeye/LoginPage"
`component` must match a component registered in the app (e.g. `portals/ampere`; `init` copies the template to `apps/ui/src/portals/ampere`).
Configuration Examples
config/portals/{portal_id}/users.yml (required per portal)
Lists users who can log in for that portal. Use password for development only; prefer password_hash (e.g. bcrypt) in production.
users:
- username: demo1
password: demo123
display_name: "Demo User 1"
- username: demo2
password: demo123
display_name: "Demo User 2"
# Production: use password_hash instead of password
# - username: alice
# password_hash: "<bcrypt-hash>"
# display_name: "Alice Smith"
config/portals/{portal_id}/state_schema.yml (optional)
Defines the shape of portal state. The UI and assistant tools use dotted paths (e.g. billing.current_balance, ui.current_page). Keep types and descriptions so the assistant and UI know what each field is for.
description: "Portal state"
state:
account:
account_id: { type: string, description: "Account identifier" }
status: { type: string, description: "Account status" }
billing:
current_balance: { type: number, description: "Amount due" }
due_date: { type: string, description: "Bill due date (ISO date)" }
ui:
current_page: { type: string, description: "Active page id (e.g. account|billing|usage)" }
You can add nested objects and arrays (with type: array, optional items). The Ampere starter includes a full utilities-style schema (services, outages, payments, alerts, documents, etc.).
config/portals/{portal_id}/initial_state.yml (optional)
Seeds portal state when a user has no state yet. Use a default key for all users; optionally add a key per username (from users.yml) for user-specific seed data. The backend returns only the inner state object to the API/UI.
default:
account:
account_id: "ACC-001"
status: "active"
billing:
current_balance: 0
due_date: ""
ui:
current_page: "account"
# Optional: per-user overrides (key = username from users.yml)
demo1:
account:
account_id: "ACC-100"
status: "active"
billing:
current_balance: 82.45
due_date: "2026-03-15"
ui:
current_page: "billing"
chat_actions.yml (optional)
Maps action keys to user messages and assistant instructions. The UI can call โsend to chatโ with an action key (e.g. from a button); the backend sends the corresponding user_message and injects assistant_instruction for that turn. Optional phrases list: when the user types free-form text, the longest matching phrase selects the action.
ask_about_bill:
user_message: "I have a question about my bill."
phrases: ["question about my bill", "about my bill", "explain my bill"]
assistant_instruction: |
The user is asking about their bill. Use get_portal_state to read billing.current_balance,
billing.due_date, billing_history. Answer using the numbers and dates from state.
report_outage:
user_message: "I want to report an outage."
phrases: ["report an outage", "report outage"]
assistant_instruction: |
The user has clicked "Report outage". Use get_portal_state to check state. Use
update_portal_state to set outages.active to true and outages.status_text to a short
message. If they need human follow-up, use create_case.
For a full set of actions (payments, plan changes, start/stop service, etc.), see the Ampere starterโs config/portals/customer/chat_actions.yml.
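The longest-match rule can be pictured with a small sketch (illustrative code; the real backend's matching details, such as case folding or tokenization, may differ):

```python
def match_action(actions: dict, user_text: str):
    """Pick the chat action whose longest phrase appears in the user's text.

    Sketch of the selection rule described above, not the kit's actual matcher.
    """
    text = user_text.lower()
    best_action, best_len = None, 0
    for action_id, spec in actions.items():
        for phrase in spec.get("phrases", []):
            # Longer phrases win over shorter, more generic ones
            if phrase.lower() in text and len(phrase) > best_len:
                best_action, best_len = action_id, len(phrase)
    return best_action

actions = {
    "ask_about_bill": {"phrases": ["question about my bill", "about my bill"]},
    "report_outage": {"phrases": ["report an outage", "report outage"]},
}
print(match_action(actions, "Hi, I have a question about my bill"))  # ask_about_bill
```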
Portal Content Contract
Portal content components receive:
- `portalState`: Current state (from `GET /api/portal/state`). Read-only; the shape should match `config/portals/{portal_id}/state_schema.yml`.
- `onUpdate(path, value)`: Updates state via `PATCH /api/portal/state`; the app refetches and re-renders. Use dotted paths (e.g. `ui.current_page`, `billing.current_balance`).
- `currentUser`: `{ id, display_name }` for the logged-in user.
The assistant can read and update the same state via tools (get_portal_state, update_portal_state), so the UI and chat stay in sync after refetch.
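To picture the update semantics, here is a minimal sketch, assuming the backend simply sets one dotted path per call (`apply_patch` is an illustrative name; the real implementation may validate against `state_schema.yml` and differ in details):

```python
def apply_patch(state: dict, path: str, value) -> dict:
    """Set a dotted-path key (e.g. 'ui.current_page') in nested state.

    Illustrative sketch of the PATCH /api/portal/state semantics described
    above, not the kit's actual backend implementation.
    """
    node = state
    keys = path.split(".")
    for key in keys[:-1]:
        node = node.setdefault(key, {})  # create intermediate objects as needed
    node[keys[-1]] = value
    return state

state = {"billing": {"current_balance": 0}, "ui": {"current_page": "account"}}
apply_patch(state, "ui.current_page", "billing")
apply_patch(state, "billing.current_balance", 82.45)
print(state["ui"]["current_page"])  # billing
```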
APIs
| Method | Endpoint | Description |
|---|---|---|
| POST | `/api/portal/login` | Login; body `{ username, password }`; returns user + `session_id` |
| POST | `/api/portal/logout` | Logout; clears the auth session |
| GET | `/api/portal/state` | Get portal state for the current user's session (auth required) |
| PATCH | `/api/portal/state` | Update portal state (auth required) |
| GET | `/api/portal/config` | Portal config (component, branding) for the frontend; supports `?portal_id=` for multi-portal |
| POST | `/api/portal/clear-history` | Clear chat turns for the current user's portal session (auth required) |
Starter Templates and Adding Portal to a Project
Starters that include portal (e.g. Ampere, Hawkeye) ship with config/portals/ and portal React components in portals/{portal_id}/; topaz-agent-kit init --starter <name> projects/<target> copies the portal slice into the project and registers the component. To add portal to an existing project:
- Create `config/portals/portals.yml` (master config) and `config/portals/{portal_id}/users.yml` (required).
- Add `config/portals/{portal_id}/state_schema.yml` and optionally `initial_state.yml`, `chat_actions.yml`.
- Add a portal content component (e.g. under `apps/ui/src/portals/<project_name>`) that accepts `PortalContentProps`, and register it so the `portals.yml` `component` matches the registry key.
Example: Single-portal project (Ampere)
Portal ID customer, content at portals/customer/. URL: /portal redirects to /portal/customer.
Example: Single-portal project (Hawkeye)
Portal ID hawkeye, content at portals/hawkeye/. URL: /portal redirects to /portal/hawkeye.
For full design and implementation details, see docs/design/multi_portal_design.md and docs/design/portal_design_and_implementation.md.
๐ฏ Conditional Node Execution & Branch-Level Conditions
Node-Level Conditions
Execute agents conditionally based on runtime evaluation of context variables:
pattern:
type: sequential
steps:
- node: claim_analyzer
# Only run fraud check for high-risk claims
- node: fraud_checker
condition: "claim_analyzer.risk_score > 0.8"
# Stop pipeline if state is not supported
- node: rate_case_customer_segmentor
condition: "rate_case_data_summarizer.normalized_data_ready == true"
on_false: stop
# If-else pattern: Execute alternative handler when condition is false
- node: main_processor
condition: "data.ready == true"
on_false:
- node: error_handler
- node: cleanup
- node: notify_user
# Manual review if amount exceeds threshold or fraud detected
- node: manual_reviewer
condition: >
(claim_analyzer.amount > 10000 AND claim_analyzer.tier != "platinum") OR
fraud_checker.detected == true
Branch-Level Conditions
Apply conditions to entire branches (sequential, parallel, loop, switch, handoff, group_chat) to skip whole execution paths:
pattern:
type: sequential
steps:
- node: trip_requester
# EAGER: Parallel execution where each domain runs independently
# Entire branches are skipped if condition is false
- type: parallel
steps:
# Flights domain: Only run if flights are needed
- type: sequential
condition: "trip_requester.flights_ready == true" # Entire branch skipped if false
steps:
- node: trip_flights_expert
- gate: select_flights
condition: "trip_flights_expert.has_flights == true"
on_submit: continue
# Hotels domain: Only run if hotels are needed
- type: sequential
condition: "trip_requester.hotels_ready == true" # Entire branch skipped if false
steps:
- node: trip_hotels_expert
- gate: select_hotels
condition: "trip_hotels_expert.has_hotels == true"
on_submit: continue
# Activities domain: Only run if activities are needed
- type: sequential
condition: "trip_requester.activities_ready == true" # Entire branch skipped if false
steps:
- node: trip_activities_expert
- gate: select_activities
condition: "trip_activities_expert.has_activities == true"
on_submit: continue
Benefits of Branch-Level Conditions:
- Efficiency: Skip entire execution paths, not just individual nodes
- Cleaner Logic: One condition controls a whole workflow branch
- Resource Savings: No partial execution if the branch condition fails
- Eager Execution: Enable parallel branches that appear as soon as ready
Supported Pattern Types
Branch-level conditions work with all pattern types:
pattern:
type: sequential
steps:
# Conditional sequential branch
- type: sequential
condition: "coordinator.need_flights == true"
steps:
- node: flights_expert
- gate: select_flights
# Conditional parallel branch
- type: parallel
condition: "coordinator.multi_domain == true"
steps:
- type: sequential
condition: "coordinator.need_hotels == true"
steps:
- node: hotels_expert
- type: sequential
condition: "coordinator.need_activities == true"
steps:
- node: activities_expert
# Conditional loop branch
- type: loop
condition: "requires_iteration == true"
max_iterations: 5
body:
- node: refine_output
- gate: approve_refinement
# Conditional switch branch
- type: switch(category)
condition: "classification_complete == true"
cases:
high_priority:
- node: premium_handler
low_priority:
- node: standard_handler
# Conditional handoff branch
- type: handoff
condition: "requires_specialist == true"
handoffs:
- node: specialist_a
- node: specialist_b
# Conditional group chat branch
- type: group_chat
condition: "collaboration_needed == true"
participants:
- node: agent_a
- node: agent_b
selection_strategy: round_robin
on_false Actions
When a condition evaluates to false, you can control what happens:
1. Default Behavior (Skip and Continue):
- node: optional_processor
condition: "data.available == true"
# If false: Skip node, continue to next step
2. Stop Pipeline:
- node: critical_validator
condition: "data.valid == true"
on_false: stop # Stop pipeline execution if condition is false
3. If-Else Pattern (Execute Alternative Steps):
# Single alternative node
- node: main_processor
condition: "data.ready == true"
on_false:
- node: error_handler
# Multiple alternative steps
- node: main_processor
condition: "data.ready == true"
on_false:
- node: error_handler
- node: cleanup
- node: notify_user
# Full pattern in else branch
- node: main_processor
condition: "data.ready == true"
on_false:
- type: sequential
steps:
- node: error_handler
- gate: user_approval
- node: cleanup
Benefits:
- Stop Action: Gracefully end pipeline when prerequisites aren't met
- If-Else Pattern: Execute alternative workflows without complex switch patterns
- Flexible: Supports single nodes, multiple steps, or full patterns in `on_false`
Supported Conditions
Operators:
- Comparison: `==`, `!=`, `>`, `<`, `>=`, `<=`
- Boolean: `AND`, `OR`, `NOT`
- String: `contains`, `starts_with`, `ends_with`, `in`, `not in`
- Null checks: `is null`, `is not null`
- Functions: `len(array)` for array length
Variable Resolution:
- `agent_id.field` → Automatically resolves from agent output
- `agent_id.nested.field` → Supports nested field access
- `context_variable` → Falls back to the root context
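A minimal sketch of this resolution order (illustrative code, not the kit's actual condition engine, which also implements the full operator set listed above):

```python
def resolve(context: dict, expr: str):
    """Resolve 'agent_id.field' / nested paths, falling back to the root context.

    Sketch of the variable-resolution rules described above.
    """
    node = context
    for part in expr.split("."):
        if isinstance(node, dict) and part in node:
            node = node[part]
        else:
            # Not a dotted agent-output path: fall back to a root context variable
            return context.get(expr)
    return node

context = {
    "claim_analyzer": {"risk_score": 0.9, "tier": "gold"},
    "requires_iteration": True,
}
print(resolve(context, "claim_analyzer.risk_score") > 0.8)  # True
print(resolve(context, "requires_iteration"))               # True
```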
Real-World Example (Trip Planner with Eager Gates):
pattern:
type: sequential
steps:
- node: trip_requester
# Parallel execution with branch-level conditions
# Each branch appears immediately when its expert finishes (eager gates)
- type: parallel
steps:
# Flights branch: Runs only if flights_ready is true
- type: sequential
condition: "trip_requester.flights_ready == true"
steps:
- node: trip_flights_expert
- gate: select_flights
condition: "trip_flights_expert.has_flights == true"
on_submit: continue
on_cancel: stop
# Hotels branch: Runs only if hotels_ready is true
- type: sequential
condition: "trip_requester.hotels_ready == true"
steps:
- node: trip_hotels_expert
- gate: select_hotels
condition: "trip_hotels_expert.has_hotels == true"
on_submit: continue
on_cancel: stop
# Activities branch: Runs only if activities_ready is true
- type: sequential
condition: "trip_requester.activities_ready == true"
steps:
- node: trip_activities_expert
- gate: select_activities
condition: "trip_activities_expert.has_activities == true"
on_submit: continue
on_cancel: stop
# Aggregator runs after all selections are made
- node: trip_aggregator
Benefits:
- Resource Efficiency: Skip unnecessary processing branches entirely
- Smart Routing: Adapt workflow based on runtime conditions
- Cost Optimization: Only run branches when conditions are met
- Eager Execution: Gates appear immediately when experts finish (no waiting)
- Cleaner Logic: Single condition controls entire execution path
๐ฏ Switch Pattern (MVP-8.0)
Dynamic Branching with Expressions
Route execution to different branches based on field values or evaluated expressions:
pattern:
type: sequential
steps:
- node: claim_classifier
# Switch on field value
- type: switch(claim_classifier.category)
cases:
medical:
- node: medical_specialist
- node: medical_reviewer
property:
- node: property_assessor
auto:
- node: auto_adjuster
default:
- node: general_processor
# Switch on expression result (boolean)
- type: switch(claim_analyzer.amount > 10000)
cases:
true:
- gate: premium_approval
on_approve: continue
- node: premium_handler
false:
- node: standard_handler
# Switch on complex expression
- type: switch(len(math_strategist.steps) > 4)
cases:
true: # Complex problem
- node: calculator
- gate: approve_auditor
condition: "len(math_strategist.steps) > 6"
on_approve: continue
- node: auditor
false: # Simple problem
- node: calculator
Switch Features
Expression Support:
- Evaluates any expression: boolean, numeric, string, function calls
- Supports the `len(array)` function
Syntax Options:
# Shorthand (recommended)
- type: switch(field_name)
# With expression
- type: switch(len(agent.steps) > 5)
# Verbose syntax
- type: switch
field: field_name
Case Matching:
- Literal values: strings, numbers, booleans (e.g., `false`, `true`, `5`, `"medical"`)
- Expression results: evaluated and matched to cases
- Type-aware matching: strict type checking between expression result and case keys
Nested Patterns:
- Cases can contain: nodes, sequential, parallel, loop, or nested switch
- Supports conditional gates within cases
Real-World Example (Math Compass with Three Complexity Tiers):
pattern:
type: sequential
steps:
- node: math_strategist
# SWITCH: Route based on problem complexity
- type: switch(len(math_strategist.steps) > 2)
cases:
false: # Simple problem (โค2 steps)
- node: math_calculator
true: # Complex problem (>2 steps)
- node: math_calculator
- gate: approve_auditor
condition: "len(math_strategist.steps) > 4"
on_approve: continue
on_reject: stop
- node: math_auditor
Output Template (Three Complexity Tiers):
outputs:
final:
transform: |
{% set step_count = results.math_strategist.steps|length %}
{% if 'math_auditor' in results %}
# Complex Problem ({{ step_count }} steps)
✅ Final Answer (Audited): {{ results.math_auditor.final_answer }}
Complexity: {{ 'Medium' if step_count <= 4 else 'High (Manual Review)' }}
{% else %}
# Simple Problem ({{ step_count }} steps)
Calculated Result: {{ results.math_calculator.result }}
Complexity: Low (No audit needed)
{% endif %}
Benefits:
- Multi-Path Routing: Different workflows for different scenarios
- Expression-Based: Evaluate complex conditions with functions
- Conditional Gates: Skip gates automatically based on logic
- Cost Efficiency: Route to appropriate handling tier
- Human-in-the-Loop: Conditional gates for manual review when needed
๐ฏ Handoff Pattern (MVP-9.0)
LLM-Driven Agent Routing
Intelligent handoff pattern where a virtual orchestrator automatically routes requests to specialist agents based on user intent. The orchestrator is auto-generated from agent descriptionsโno orchestration code required!
Automatic Orchestration
The handoff pattern creates a virtual orchestrator at runtime from your agent descriptions:
- Orchestrator analyzes request: Evaluates user intent
- Routes to specialist: Automatically selects the best specialist
- Specialist handles request: Processes with full context
- Returns to orchestrator: Provides final closing message
Configuration
Global Orchestrator Setting (config/pipeline.yml):
# Global configuration for all handoff patterns
orchestrator:
model: azure_openai # Model for virtual orchestrator
Pipeline Configuration (config/pipelines/translator.yml):
name: "Translator"
description: "Multi-language translation with intelligent routing"
nodes:
- id: spanish_translator
config_file: agents/spanish_translator.yml
- id: english_translator
config_file: agents/english_translator.yml
- id: hindi_translator
config_file: agents/hindi_translator.yml
# Handoff pattern - orchestrator is auto-generated!
pattern:
type: handoff
handoffs:
- node: spanish_translator
- node: english_translator
- node: hindi_translator
outputs:
final:
selectors: ["content"]
Agent Configuration (config/agents/spanish_translator.yml):
# Description is used by orchestrator to route requests
description: "Translates text to and from Spanish with natural, fluent translations"
instruction: |
You are a professional Spanish translation specialist.
Your expertise:
- Translate to Spanish (Español)
- Translate from Spanish
- Maintain natural, fluent Spanish
- Preserve meaning and tone
When translating:
- Use natural Spanish expressions
- Maintain the original tone (formal/casual)
- Preserve technical terminology appropriately
framework: "langgraph"
model: "azure_openai"
How It Works
Example Flow:
User: "Translate 'Hello, how are you?' to Spanish"
1. Virtual Orchestrator:
- Analyzes request
- Identifies Spanish translation need
- Routes: "HANDOFF: spanish_translator"
2. Spanish Translator:
- Receives full context
- Translates to Spanish
- Returns: "Hola, ¿cómo estás?"
3. Virtual Orchestrator (returns):
- Receives translator response
- Provides final closing message
- Returns to user
Result: "Hola, ¿cómo estás?" with a friendly closing
Orchestrator Prompt Generation
The orchestrator prompt is automatically generated from agent descriptions:
Available Specialists:
1. spanish_translator: Translates text to and from Spanish with natural, fluent translations
2. english_translator: Translates text to and from English with clarity and precision
3. hindi_translator: Translates text to and from Hindi with cultural sensitivity
When a user needs a specialist, respond EXACTLY with:
HANDOFF: <specialist_id>
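A hedged sketch of how a reply in this format could be dispatched (illustrative code, not the kit's actual parser):

```python
import re

def parse_handoff(reply: str):
    """Return the specialist id if the reply is a handoff, else None.

    Sketch of the 'HANDOFF: <specialist_id>' contract described above;
    the kit's actual parsing may be more lenient or strict.
    """
    match = re.match(r"^\s*HANDOFF:\s*(\w+)\s*$", reply)
    return match.group(1) if match else None

print(parse_handoff("HANDOFF: spanish_translator"))  # spanish_translator
print(parse_handoff("Hola! Happy to help."))         # None
```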
Features
Zero Configuration:
- No custom orchestrator code needed
- Auto-generated from agent descriptions
- Works with existing agents
Intelligent Routing:
- LLM analyzes user intent
- Routes to appropriate specialist
- Handles multi-step requests
Context Preservation:
- Full user context passed to specialist
- Orchestrator receives specialist response
- Automatic return flow
Transparent UI:
- Virtual orchestrator steps are hidden
- Only specialist agents visible to users
- Clean, focused UI experience
Optional Custom Orchestrator
You can provide a custom orchestrator instead of the virtual one:
pattern:
type: handoff
orchestrator: "custom_orchestrator" # Custom agent
handoffs:
- node: specialist_a
- node: specialist_b
Real-World Example (Translator Pipeline)
# Try the translator pipeline
topaz-agent-kit init --starter ensemble ./my_project
topaz-agent-kit serve fastapi --project ./my_project
# Test scenarios:
"Translate 'Hello' to Spanish"
"Translate 'नमस्ते' to English"
"What languages do you support?"
"Translate 'Good morning' to Spanish and then translate that to Hindi"
What Happens:
- First scenario: Routes to the Spanish translator → returns to the orchestrator
- Second scenario: Routes to the English translator → returns to the orchestrator
- Third scenario: Orchestrator responds directly (no handoff needed)
- Fourth scenario: Routes to Spanish → then Hindi → final message
Best Practices
- Clear Agent Descriptions: Use descriptive `description` fields in agent YAML
- Specific Capabilities: Describe exactly what each specialist does
- Context Preservation: Ensure agents handle upstream context
- Orchestrator Model: Configure globally for consistency
- Handoff Format: The orchestrator uses the `HANDOFF: <agent_id>` pattern
Benefits
✅ Intelligent Routing: LLM decides which specialist to use
✅ Automatic Generation: No orchestrator code required
✅ Universal Compatibility: Works with all frameworks
✅ Zero Configuration: Just list specialists
✅ Cost Efficient: Only runs a specialist when needed
๐ MCP Integration
Built-in MCP Toolkits
| Toolkit | Tools | Count | Description |
|---|---|---|---|
| DocExtract | `doc_extract_*` | 3 | Document content extraction and structured data |
| DocRAG | `doc_rag_*` | 2 | Document retrieval and semantic search |
| ImageRAG | `image_rag_*` | 2 | Image processing and OCR-based search |
| Browser | `browser_*` | 1 | Web scraping and automation |
| SerperAPI | `serper_api_*` | 2 | Web search integration |
| SEC API | `sec_api_*` | 2 | Financial document retrieval |
| Math | `math_*` | 20 | Mathematical operations and problem solving |
| Email | `email_*` | 13 | Gmail integration and email management |
| SQLite | `sqlite_*` | 3 | SQLite database operations |
| Filesystem | `fs_*` | 3 | File and directory operations |
| Flights | `flight_*` | 9 | Flight search and booking |
| Hotels | `hotel_*` | 4 | Hotel search and booking |
| Activities | `activities_*` | 3 | Travel activities and POI search |
| Common | `common_*` | 5 | General utility functions |
| Insurance | `insurance_*` | 6 | Insurance-specific domain tools |
| SOP | `sop_*` | 8 | Standard Operating Procedure reader and navigator |
| Total | - | 83+ | Comprehensive toolkit ecosystem |
Standard Operating Procedures (SOP) Support
Topaz Agent Kit includes built-in SOP-driven agent capabilities that enable agents to follow structured Standard Operating Procedures stored as markdown files. This is particularly useful for complex, multi-step workflows that require consistent execution.
SOP Toolkit Features
The SOP MCP Toolkit provides 8 tools for agents to read and navigate SOPs:
| Tool | Purpose |
|---|---|
| `sop_initialize` | Load SOP manifest, return overview + available sections |
| `sop_get_section` | Read specific section content (procedures, references) |
| `sop_get_example` | Get scenario examples for specific use cases |
| `sop_get_troubleshooting` | Get troubleshooting guidance for errors |
| `sop_list_sections` | List available sections by type |
| `sop_get_glossary_term` | Look up domain-specific terms from the pipeline glossary |
| `sop_list_glossary_terms` | List all available glossary terms |
| `sop_invalidate_cache` | Clear cached SOP data after updates |
SOP Structure
SOPs are organized in your project's config/sop/ directory:
config/sop/
└── <pipeline>/
    ├── glossary.md               # Pipeline-specific terminology
    └── <agent>/
        ├── manifest.yml          # SOP structure and metadata (REQUIRED)
        ├── overview.md           # High-level workflow guidance
        ├── steps/
        │   ├── step_01_*.md      # Procedural steps
        │   └── step_02_*.md
        ├── scenarios/
        │   ├── two_way_match.md  # Example scenarios
        │   └── three_way_match.md
        └── troubleshooting.md    # Error resolution guide
Using SOPs in Agents
- Enable SOP Toolkit: Add `sop` to your agent's MCP toolkits:
agents:
- id: my_sop_agent
mcp:
toolkits: ["sop"]
# ... other config
- Initialize SOP: The agent calls `sop_initialize` at start:
# Agent automatically calls:
sop_initialize(
project_dir="/path/to/project",
sop_path="config/sop/my_pipeline/my_agent/manifest.yml"
)
- Follow Procedures: Agent reads steps as needed:
# Before each step:
sop_get_section(
sop_path="config/sop/my_pipeline/my_agent/manifest.yml",
section_id="step_02_find_match"
)
- Handle Scenarios: Agent can reference examples:
# When encountering specific scenario:
sop_get_example(
sop_path="config/sop/my_pipeline/my_agent/manifest.yml",
scenario_name="two_way_match"
)
SOP Section Types
- `procedure`: Step-by-step instructions (executed in order)
- `reference`: Contextual information (read on-demand)
- `example`: Scenario examples (for pattern matching)
- `troubleshooting`: Error resolution guides
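These section types are declared in the agent's `manifest.yml`. The manifest's exact schema isn't reproduced in this README, so the sketch below is hypothetical: the `title`, `sections`, `id`, `type`, and `file` keys are illustrative assumptions — see the ICP starter's `config/sop/reconvoy/` for the real format.

```yaml
# Hypothetical manifest.yml sketch -- key names are illustrative only.
title: "My Agent SOP"
sections:
  - id: overview
    type: reference
    file: overview.md
  - id: step_01_identify
    type: procedure
    file: steps/step_01_identify.md
  - id: step_02_find_match
    type: procedure
    file: steps/step_02_find_match.md
  - id: two_way_match
    type: example
    file: scenarios/two_way_match.md
  - id: troubleshooting
    type: troubleshooting
    file: troubleshooting.md
```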
Benefits
✅ Consistent Execution: Agents follow documented procedures
✅ Easy Updates: Modify SOPs without changing code
✅ Domain Knowledge: Pipeline-specific glossaries and terminology
✅ Error Handling: Built-in troubleshooting guidance
✅ Scenario Support: Example-based learning for agents
✅ Caching: Efficient section loading with LRU cache
Example: ReconVoy SOP
The ICP starter template includes a complete SOP example for the ReconVoy matcher agent:
- 6 procedural steps: From foreign book identification to journal proposal
- 2 scenario examples: Two-way and three-way matching patterns
- Pipeline glossary: Domain-specific terms (GBP items, processing_status, etc.)
- Troubleshooting guide: Common issues and resolutions
See src/topaz_agent_kit/templates/starters/icp/config/sop/reconvoy/ for a complete reference implementation.
Custom MCP Servers
Connect to external MCP servers for enterprise integrations:
mcp:
servers:
- url: "http://enterprise-mcp-server:8080/mcp"
toolkits: ["enterprise", "database"]
tools: ["enterprise_*", "db_*"]
MCP Toolkit Development Guidelines
When creating MCP toolkits, follow these best practices to ensure compatibility across all frameworks:
❌ Never use the `Any` type for parameters
The `Any` type causes JSON schema generation issues with mcpadapt (used by CrewAI's MCPServerAdapter), resulting in "Cannot take a Union of no types" errors.
# ❌ BAD - Don't use Any for parameters
def book_flight(travelers: Any, payments: Any | None = None) -> Dict[str, Any]:
    ...
# ✅ GOOD - Use specific types
def book_flight(travelers: list[dict], payments: Optional[list[dict]] = None) -> Dict[str, Any]:
    ...
# ✅ GOOD - Use Optional for nullable parameters
def search_flights(origin: str, destination: str, returnDate: Optional[str] = None) -> Dict[str, Any]:
    ...
Type Guidelines:
- Use specific types: `str`, `int`, `float`, `bool`, `list[dict]`, `dict`, `Dict[str, Any]`
- Use `Optional[...]` for nullable parameters: `Optional[str] = None`, `Optional[int] = None`
- Return types can use `Dict[str, Any]` or `dict` (both work fine)
- Never use `Any` as a parameter type - always specify the concrete type
Why this matters:
- CrewAI's `MCPServerAdapter` uses `mcpadapt` to convert MCP tool schemas
- `mcpadapt` cannot generate valid JSON schemas for `Any`-typed parameters
- This causes schema parsing errors that prevent tool initialization
- Other frameworks (LangGraph, Agno, etc.) work fine, but CrewAI requires strict typing
๐ง AgentOS Memory System
AgentOS is a filesystem-based memory system that enables agents to store, retrieve, and search information using familiar Unix-like commands. Instead of complex APIs, agents interact with memory through a single agentos_shell tool that supports commands like ls, cat, echo, grep, semgrep, and mkdir.
Overview
AgentOS provides a 3-level memory hierarchy with declarative schema definitions:
- `/global/` - Project-wide shared memory (system docs, cross-pipeline data)
- `/shared/` - Pipeline-wide shared memory (templates, shared runtime data)
- `/memory/` - Agent-specific individual memory (isolated, not shared)
- `/workspace/` - Agent workspace (temporary files)
Key Features:
- ✅ Declarative schemas: Define file structures in YAML, auto-generate instructions
- ✅ Template-based initialization: Copy templates from `config/memory/shared/` to runtime
- ✅ Two types of shared memory: System files (read-only templates) and runtime data (write-once, read-many)
- ✅ Auto-indexing: Semantic search across indexed files
- ✅ Isolation: Agent memory is isolated; use `/shared/` or `/global/` for sharing
Key Concepts
1. Filesystem as Memory
Agents manage memory using standard Unix commands:
# List directories
agentos_shell("ls /")
# Read a file
agentos_shell("cat /memory/senders/john@example.com/preferences.md")
# Write a file
agentos_shell('echo "preference data" > /memory/senders/john@example.com/preferences.md')
# Semantic search across indexed files
agentos_shell('semgrep "similar email pattern"')
# Create directories
agentos_shell("mkdir -p /memory/senders/john@example.com")
2. Memory Hierarchy
AgentOS provides a 3-level memory hierarchy:
- `/global/`: Global shared memory (project-wide, read-only for agents)
  - Shared across all pipelines in the project
  - Typically contains system-wide reference data, compliance rules
  - Initialized from `config/memory/shared/global/` template files
  - Can also contain runtime data (write-once, read-many) for cross-pipeline sharing
- `/shared/`: Pipeline-level shared memory (pipeline-wide, configurable readonly)
  - Shared across all agents in a pipeline
  - Typically contains templates, company info, policies
  - Initialized from `config/memory/shared/pipeline/{pipeline_id}/` template files
  - Can also contain runtime data (shared between agents in the same pipeline)
- `/memory/`: Agent-level individual memory (agent-specific, read-write)
  - Isolated per agent instance (not shared with other agents)
  - Persists across sessions
  - Used for agent-specific data (preferences, patterns, history)
  - If agents need to share data, use `/shared/` instead
- `/workspace/`: Agent workspace (agent-specific, temporary)
  - Temporary working directory
  - Used for drafts, intermediate files
  - Can be cleared between sessions
3. Auto-Indexing
Files in directories with `auto_index: true` are automatically indexed for semantic search via the `semgrep` command:
directories:
  - path: "/memory/senders/"
    auto_index: true  # Enable semantic search
Configuration
Memory Configuration Structure
All memory-related configuration is organized under config/memory/:
config/memory/
├── memory.yml          # Global memory configuration (future)
├── prompts/            # Custom memory prompt templates
│   └── {agent_id}.jinja
└── shared/
    ├── global/         # Global memory templates
    │   ├── reference/
    │   └── compliance/
    └── pipeline/       # Pipeline memory templates
        └── {pipeline_id}/
            ├── email_templates/
            └── company_info/
Global Memory (Project-Wide)
Global memory is configured in config/memory/memory.yml (future feature). Template files are stored in config/memory/shared/global/:
# config/memory/memory.yml (future)
memory:
  global:
    directories:
      - path: "/global/reference/"
        description: "System-wide reference data (READ-ONLY)"
        readonly: true
        auto_index: true
        bootstrap: true
        template_source: "config/memory/shared/global/reference/"
Template Files: Create template files in config/memory/shared/global/:
config/memory/shared/global/
├── reference/
│   └── system_docs.json
└── compliance/
    └── rules.json
These files are automatically copied to data/agentos/global_shared/ on first run.
Pipeline-Level Shared Memory
Define shared memory directories in your pipeline configuration. All agents in the pipeline with memory.inherit: true (default) automatically have access to these shared directories:
# config/pipelines/reply_wizard.yml
name: "Reply Wizard"
description: "Email reply generation pipeline"
memory:
  shared:
    directories:
      # Type 1: System files (read-only templates)
      - path: "/shared/email_templates/"
        description: "Email template library (READ-ONLY)"
        readonly: true
        auto_index: true
        bootstrap: true
        template_source: "config/memory/shared/pipeline/reply_wizard/email_templates/"
      # Type 2: Runtime data (write-once, read-many)
      - path: "/shared/senders/"
        description: "Sender interaction history (shared across agents)"
        readonly: false
        auto_index: true
        bootstrap: true
        schemas:
          interactions:
            file: "interactions.jsonl"
            format: "jsonl"
            write_mode: "append"
            structure:
              timestamp: "ISO 8601 timestamp"
              original_email: {...}
              response: {...}
          preferences:
            file: "preferences.json"
            format: "json"
            write_mode: "overwrite"
            structure:
              preferred_tone: "string"
              communication_style: "string"
Inheritance: Agents with memory.inherit: true (the default) automatically inherit access to all pipeline-level shared directories. They can access these via /shared/ paths. Set inherit: false in an agent's configuration to disable inheritance for that specific agent.
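The inheritance rule above can be sketched in a few lines. This is an illustration of the described behavior, not the kit's internals; the function name `resolve_directories` and the plain-dict representation are assumptions for the example.

```python
# An agent's visible directories are its own, plus the pipeline's shared
# directories when `inherit` is true (the default).
def resolve_directories(agent_memory: dict, pipeline_shared: list[dict]) -> list[dict]:
    visible = list(agent_memory.get("directories", []))
    if agent_memory.get("inherit", True):  # inherit defaults to true
        visible.extend(pipeline_shared)
    return visible

shared = [{"path": "/shared/email_templates/", "readonly": True}]
agent = {"inherit": False, "directories": [{"path": "/memory/patterns/"}]}
```

With `inherit: False`, `resolve_directories(agent, shared)` returns only the agent's own `/memory/patterns/` entry; omitting `inherit` (or setting it true) adds the shared entries.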
Template Files: Create template files in config/memory/shared/pipeline/{pipeline_id}/:
config/memory/shared/pipeline/reply_wizard/
├── email_templates/
│   ├── greetings/
│   │   ├── formal.md
│   │   ├── casual.md
│   │   └── professional.md
│   ├── closings/
│   │   └── ...
│   └── structures/
│       └── ...
└── company_info/
    ├── standard_responses.md
    ├── policies.md
    └── tone_guidelines.md
These files are automatically copied to data/agentos/{pipeline_id}/shared/ on first run.
Two Types of Shared Memory:
1. System Files (Read-Only Templates):
   - Source: Template files in `config/memory/shared/pipeline/{pipeline_id}/`
   - Behavior: Copied once, read-only for agents
   - Use case: Reference docs, templates, policies
   - Update: Edit template files, re-copied on next run
2. Runtime Data (Write-Once, Read-Many):
   - Source: Created at runtime by agents/pipelines
   - Behavior: One pipeline/agent writes, others read
   - Use case: Cross-pipeline data, shared state, interaction history
   - Update: Agents write via `agentos_shell`, other pipelines/agents read
Agent-Level Memory Configuration
Configure agent-specific memory in agent YAML files. Agent memory is isolated - not shared with other agents. If agents need to share data, use /shared/ (pipeline-level) or /global/ (project-level):
# config/agents/reply_context_wizard.yml
id: reply_context_wizard
type: agno
model: "azure_openai"

# Enable MCP and AgentOS
mcp:
  servers:
    - url: "http://localhost:8050/mcp"
      toolkits: ["agentos_memory"]
      tools: ["agentos_shell"]

# Memory configuration
memory:
  inherit: true  # Inherit shared memory from pipeline
  directories:
    - path: "/memory/patterns/"
      description: "Agent-specific learned patterns (not shared)"
      readonly: false
      auto_index: true
      bootstrap: true
      schemas:
        learned_patterns:
          file: "patterns.jsonl"
          format: "jsonl"
          write_mode: "append"
          structure:
            pattern: "string"
            context: "string"
            confidence: "float"
    - path: "/workspace/"
      description: "Working directory"
      readonly: false
      auto_index: false
      bootstrap: true
  prompt_section:
    jinja: memory/prompts/reply_context_wizard.jinja  # Custom memory prompt template
Memory Configuration Options:
| Option | Type | Default | Description |
|---|---|---|---|
| `inherit` | `bool` | `true` | Inherit shared memory from pipeline. When `true`, the agent automatically has access to all pipeline-level shared directories (e.g., `/shared/email_templates/`, `/shared/company_info/`). Set to `false` to disable inheritance. |
| `directories` | `list[object]` | `[]` | Agent-specific directories (agent-level individual memory, isolated from other agents) |
| `directories[].path` | `str` | required | Virtual path (e.g., `/memory/senders/`) |
| `directories[].description` | `str` | required | Human-readable description |
| `directories[].readonly` | `bool` | `false` | Make directory read-only |
| `directories[].auto_index` | `bool` | `true` | Enable semantic search indexing |
| `directories[].bootstrap` | `bool` | `true` | Create directory on initialization |
| `directories[].template_source` | `str` | `null` | Optional: template source path relative to project root (e.g., `config/memory/shared/pipeline/reply_wizard/email_templates/`) |
| `directories[].schemas` | `object` | `{}` | Optional: file schemas for this directory (see Schema Definitions below) |
| `prompt_section` | `object` | `null` | Optional: custom memory prompt template (inline/file/jinja). Paths are resolved relative to `config/` (e.g., `memory/prompts/reply_context_wizard.jinja` points to `config/memory/prompts/reply_context_wizard.jinja`). If not provided, the system uses a default template that lists available directories and commands. |
Inheritance Behavior:
- `inherit: true` (default): The agent automatically inherits all pipeline-level shared memory directories defined in `config/pipelines/{pipeline_id}.yml` under `memory.shared.directories`. These are accessible via `/shared/` paths and are read-only by default.
- `inherit: false`: The agent does not inherit pipeline shared memory; only agent-specific directories are available.
Prompt Integration
AgentOS memory sections are automatically injected into agent prompts. Use the {{agentos_memory_section}} marker in your prompt templates:
Default Behavior: If prompt_section is not specified in agent configuration, the system automatically provides a default memory prompt template that:
- Lists all available memory directories (agent-specific and inherited shared)
- Shows directory descriptions and read-only status
- Provides examples of available commands (`ls`, `cat`, `echo`, `semgrep`, etc.)
Custom Templates: You can override the default by providing a prompt_section configuration:
# config/prompts/reply_context_wizard.jinja
You are an email context extraction agent.
{{agentos_memory_section}}
## Workflow:
1. **Check sender history**: `agentos_shell("ls /memory/senders/")` to see if you know this sender
2. **Load sender preferences**: If sender exists, `agentos_shell("cat /memory/senders/{sender_email}/preferences.md")`
3. **Check email templates**: `agentos_shell("ls /shared/email_templates/")` for relevant templates
4. **Load company info**: `agentos_shell("cat /shared/company_info/standard_responses.md")` if needed
5. **Store new sender info**: After analysis, `agentos_shell('echo "preferences" > /memory/senders/{sender_email}/preferences.md')`
6. **Search patterns**: `agentos_shell('semgrep "similar email intent"')` to find similar past emails
Custom Memory Prompt Templates (Optional):
If you want to customize the memory prompt section, create a custom template in config/memory/prompts/{agent_id}.jinja and reference it in the agent configuration:
memory:
  prompt_section:
    jinja: memory/prompts/reply_context_wizard.jinja
If prompt_section is not provided, the system automatically uses a default template that lists available directories, commands, and schema documentation.
When to Use Default Template (Recommended for most cases):
✅ Use the default template when:
- You have simple memory needs (just listing directories and basic commands)
- You want to get started quickly without customizing prompts
- Your agents have standard memory usage patterns
- You prefer consistency across agents
- You want to reduce maintenance overhead
The default template automatically:
- Lists all available memory directories (agent-specific and inherited shared)
- Shows directory descriptions and read-only status
- Provides examples of all available commands
- Adapts to your configuration (only shows directories you've defined)
When to Use Custom Template:
✅ Create a custom template when:
- You need workflow-specific guidance for how agents should use memory
- You want to provide step-by-step instructions tailored to your agent's task
- You need to emphasize specific commands or usage patterns
- You want to include domain-specific examples or use cases
- Your agent has complex memory workflows that need detailed explanation
- You want to guide agents through specific memory access patterns
Example Custom Template:
# config/memory/prompts/reply_context_wizard.jinja
## Memory System
You have access to a filesystem-based memory via the `agentos_shell` tool.
### Your Memory Structure:
{% for dir in memory.directories %}
- {{ dir.path }} - {{ dir.description }}{% if dir.readonly %} (READ-ONLY){% endif %}
{% endfor %}
{% if memory.shared_directories %}
### Shared Memory (from pipeline):
{% for dir in memory.shared_directories %}
- {{ dir.path }} - {{ dir.description }} (READ-ONLY)
{% endfor %}
{% endif %}
### Workflow for Email Context Extraction:
1. **Check sender history**: `agentos_shell("ls /memory/senders/")` to see if you know this sender
2. **Load sender preferences**: If sender exists, `agentos_shell("cat /memory/senders/{sender_email}/preferences.md")`
3. **Check email templates**: `agentos_shell("ls /shared/email_templates/")` for relevant templates
4. **Load company info**: `agentos_shell("cat /shared/company_info/standard_responses.md")` if needed
5. **Store new sender info**: After analysis, `agentos_shell('echo "preferences" > /memory/senders/{sender_email}/preferences.md')`
6. **Search patterns**: `agentos_shell('semgrep "similar email intent"')` to find similar past emails
### Available Commands:
- `agentos_shell("ls /")` - List root directories
- `agentos_shell("cat /memory/file.md")` - Read file
- `agentos_shell('echo "content" > /memory/file.md')` - Write file
- `agentos_shell('semgrep "query"')` - Semantic search across indexed memory
- `agentos_shell("mkdir -p /memory/subdir")` - Create directory
Best Practice: Start with the default template. Only create a custom template if you find that agents need more specific guidance or workflow instructions for your use case.
Available Commands
The agentos_shell tool supports a subset of Unix commands:
| Command | Description | Example |
|---|---|---|
| `ls [path]` | List directory contents | `agentos_shell("ls /memory/senders/")` |
| `cat [file]` | Read file contents | `agentos_shell("cat /memory/file.md")` |
| `echo "text" > [file]` | Write to file | `agentos_shell('echo "data" > /memory/file.md')` |
| `echo "text" >> [file]` | Append to file | `agentos_shell('echo "more" >> /memory/file.md')` |
| `grep "pattern" [file]` | Search text in file | `agentos_shell('grep "keyword" /memory/file.md')` |
| `semgrep "query"` | Semantic search across indexed files | `agentos_shell('semgrep "similar pattern"')` |
| `mkdir -p [path]` | Create directory | `agentos_shell("mkdir -p /memory/subdir")` |
Security Features:
- ✅ Sandboxed Execution: Commands run in an isolated filesystem
- ✅ Path Traversal Protection: Prevents access outside allowed directories
- ✅ Command Injection Prevention: Validates and sanitizes commands
- ✅ Read-Only Enforcement: Respects `readonly: true` configuration
- ✅ Rate Limiting: Prevents command flooding
- ✅ Audit Logging: All commands are logged for security
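To make the path-traversal protection concrete, here is a minimal sketch of what such a check might look like: normalize `.` and `..` segments, then require the result to stay under an allowed virtual root. This is a guess at the idea, not the kit's actual sandbox code; the root list and function name are assumptions.

```python
from pathlib import PurePosixPath

# Virtual roots from the AgentOS hierarchy described above (assumed).
ALLOWED_ROOTS = ("/global", "/shared", "/memory", "/workspace")

def is_path_allowed(path: str) -> bool:
    """Resolve '.' and '..' segments, then require the result
    to remain under one of the allowed virtual roots."""
    parts: list[str] = []
    for seg in PurePosixPath(path).parts:
        if seg == "..":
            if not parts:
                return False  # attempted to climb above the virtual root
            parts.pop()
        elif seg not in (".", "/"):
            parts.append(seg)
    resolved = "/" + "/".join(parts)
    return any(resolved == r or resolved.startswith(r + "/") for r in ALLOWED_ROOTS)
```

Resolving before checking is what defeats `..`-based escapes: `/memory/../etc/passwd` normalizes to `/etc/passwd`, which no allowed root matches.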
Schema Definitions
Define file structures declaratively in YAML configuration. Schemas enable:
- Auto-generated instructions: System generates read/write commands from schema
- Type safety: Clear field definitions for structured data
- Scalability: Proper formats (JSONL for append-only, JSON for overwrite)
- Maintainability: Change structure in one place
Schema Configuration:
memory:
  shared:
    directories:
      - path: "/shared/senders/"
        schemas:
          interactions:
            file: "interactions.jsonl"
            format: "jsonl"       # jsonl, json, markdown
            write_mode: "append"  # append, overwrite
            structure:
              timestamp: "ISO 8601 timestamp"
              original_email:
                subject: "string"
                content: "string"
              sender:
                name: "string"
                email: "string"
              response:
                subject: "string"
                content: "string"
            # Optional: Custom instructions (overrides auto-generated)
            instructions:
              read: "Read all interactions: `agentos_shell(command='cat /shared/senders/<sender_email>/interactions.jsonl')`"
              write: "Append new interaction: `agentos_shell(command='echo \"<single_line_json>\" >> /shared/senders/<sender_email>/interactions.jsonl')`"
Schema Fields:
| Field | Type | Required | Description |
|---|---|---|---|
| `file` | `str` | Yes | Filename for this schema |
| `format` | `enum` | No | File format: `jsonl`, `json`, `markdown` (default: `json`) |
| `write_mode` | `enum` | No | Write behavior: `append`, `overwrite` (default: `overwrite`) |
| `readonly` | `bool` | No | Whether this file is read-only (default: `false`) |
| `structure` | `object` | No | Simple structure definition (key-value mapping) |
| `instructions` | `object` | No | Custom read/write instructions (overrides auto-generated) |
Auto-Generated Instructions:
The system automatically generates read/write instructions from schemas:
- JSONL format: uses `>>` (append) or `>` (overwrite) with single-line JSON
- JSON format: uses `>` (overwrite); appending requires reading and re-writing the document
- Markdown format: uses `>` (overwrite) or `>>` (append)
You can override auto-generated instructions with custom instructions.read and instructions.write fields.
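A hypothetical sketch of how read/write instructions could be derived from a schema entry, following the field names in the YAML above (the kit's actual generator may differ; `generate_instructions` and the `<content>` placeholder are assumptions for this example):

```python
# Derive read/write command strings from a schema entry: `write_mode: append`
# selects `>>`, anything else selects `>` (overwrite).
def generate_instructions(schema: dict, dir_path: str) -> dict:
    path = dir_path + schema["file"]
    redirect = ">>" if schema.get("write_mode", "overwrite") == "append" else ">"
    return {
        "read": f"agentos_shell(command='cat {path}')",
        "write": f"agentos_shell(command='echo \"<content>\" {redirect} {path}')",
    }

instr = generate_instructions(
    {"file": "interactions.jsonl", "format": "jsonl", "write_mode": "append"},
    "/shared/senders/",
)
```

The point of generating instructions rather than hand-writing them is that a change to `file` or `write_mode` in the schema automatically updates the commands agents see in their prompts.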
Runtime Structure
AgentOS creates the following directory structure at runtime:
data/agentos/
├── global_shared/                 # Global shared memory
│   ├── reference/                 # From config/memory/shared/global/reference/
│   └── compliance/                # From config/memory/shared/global/compliance/
└── {pipeline_id}/
    ├── shared/                    # Pipeline shared memory
    │   ├── email_templates/       # From config/memory/shared/pipeline/{pipeline_id}/email_templates/
    │   ├── company_info/          # From config/memory/shared/pipeline/{pipeline_id}/company_info/
    │   └── senders/               # Runtime data (created by agents)
    │       └── {sender_email}/
    │           ├── interactions.jsonl
    │           └── preferences.json
    └── agents/
        └── {agent_id}/
            ├── memory/            # Agent individual memory (isolated)
            │   └── patterns/
            │       └── patterns.jsonl
            └── workspace/         # Agent workspace (temporary)
Template Initialization:
- Template files from `config/memory/shared/pipeline/{pipeline_id}/` are copied to `data/agentos/{pipeline_id}/shared/` on first run
- Template files from `config/memory/shared/global/` are copied to `data/agentos/global_shared/` on first run
- If template files are updated, they are re-copied (checks modification time)
- Agent-specific directories are created on first agent execution
- Runtime data directories are created when agents write to them
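The "copy on first run, re-copy when the template is newer" rule can be sketched as a modification-time comparison. This is an illustration of the policy described above, not the kit's implementation; `sync_template` is an assumed name.

```python
import os
import shutil

def sync_template(src: str, dst: str) -> bool:
    """Copy src to dst if dst is missing or older than src.
    Returns True if a copy happened."""
    if os.path.exists(dst) and os.path.getmtime(dst) >= os.path.getmtime(src):
        return False  # runtime copy is up to date
    os.makedirs(os.path.dirname(dst), exist_ok=True)
    shutil.copy2(src, dst)  # copy2 carries the source mtime to dst
    return True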
Best Practices
1. Use Appropriate Memory Types
- `/global/`: For project-wide reference data (system docs, compliance rules) or cross-pipeline runtime data
- `/shared/`: For pipeline-wide reference data (templates, company info) or shared runtime data between agents
- `/memory/`: For agent-specific persistent data (preferences, history, patterns) - isolated, not shared
- `/workspace/`: For temporary working files (drafts, intermediate results)
Memory Isolation Rules:
- `/memory/` is agent-specific and isolated - agents cannot access each other's `/memory/` directories
- If agents need to share data, use `/shared/` (pipeline-level) or `/global/` (project-level)
- `/workspace/` is also agent-specific and temporary
2. Enable Auto-Indexing Strategically
Enable `auto_index: true` for directories you want to search semantically:
directories:
  - path: "/memory/senders/"
    auto_index: true   # Good for searching past interactions
  - path: "/workspace/"
    auto_index: false  # Skip indexing temporary files
3. Use Read-Only for Shared Data
Protect shared templates and company info from accidental modifications:
memory:
  shared:
    directories:
      - path: "/shared/email_templates/"
        readonly: true  # Prevent agents from modifying templates
4. Organize Memory Configuration
All memory-related configuration is organized under config/memory/:
config/memory/
├── memory.yml                  # Global memory configuration (future)
├── prompts/                    # Custom memory prompt templates
│   ├── reply_context_wizard.jinja
│   └── reply_polish_wizard.jinja
└── shared/
    ├── global/                 # Global memory templates
    │   ├── reference/
    │   └── compliance/
    └── pipeline/               # Pipeline memory templates
        ├── reply_wizard/       # Pipeline-specific templates
        │   ├── email_templates/
        │   └── company_info/
        └── invoice_processor/  # Different pipeline, different templates
            └── email_templates/
Template Paths:
- Global templates: `config/memory/shared/global/`
- Pipeline templates: `config/memory/shared/pipeline/{pipeline_id}/`
- Custom memory prompts: `config/memory/prompts/{agent_id}.jinja`
5. Use Schema Definitions for Structured Data
Define schemas for directories that store structured data:
memory:
  shared:
    directories:
      - path: "/shared/senders/"
        schemas:
          interactions:
            file: "interactions.jsonl"
            format: "jsonl"
            write_mode: "append"  # Preserves history
            structure:
              timestamp: "ISO 8601 timestamp"
              original_email: {...}
              response: {...}
Benefits:
- Auto-generated instructions in prompts
- Clear structure documentation
- Proper file formats (JSONL for append, JSON for overwrite)
- Scalable and maintainable
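The `format: jsonl` plus `write_mode: append` pairing amounts to the following pattern: each record is one self-contained JSON line, so appends never rewrite existing data. A plain-Python illustration of the format choice (not the kit's storage code):

```python
import json

def append_record(path: str, record: dict) -> None:
    """Append one record as a single JSON line."""
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")

def read_records(path: str) -> list[dict]:
    """Read back every record, one per non-empty line."""
    with open(path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]
```

This is why the schema docs steer append-mode data toward JSONL: with a single JSON document you would have to parse, mutate, and rewrite the whole file on every update.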
6. Custom Memory Prompt Sections
Create custom memory prompt templates for better agent guidance:
# config/memory/prompts/my_agent.jinja
## Memory System
### Workflow for Your Task:
1. Check existing data: `agentos_shell("ls /memory/your_data/")`
2. Load reference: `agentos_shell("cat /shared/reference.md")`
3. Store results: `agentos_shell('echo "result" > /memory/your_data/result.md')`
4. Search similar: `agentos_shell('semgrep "similar pattern"')`
Reference in agent config:
memory:
  prompt_section:
    jinja: memory/prompts/my_agent.jinja
7. Template File Management
- Keep template files organized:
  - Global: `config/memory/shared/global/`
  - Pipeline: `config/memory/shared/pipeline/{pipeline_id}/`
- Use descriptive filenames and directory structures
- Template files are automatically initialized on first run
- Updates to templates are detected and re-copied
- Use `template_source` in config to specify custom template locations
Example: Email Reply Wizard
The ensemble starter template includes a complete AgentOS implementation for the Reply Wizard pipeline:
Pipeline Configuration (config/pipelines/reply_wizard.yml):
memory:
  shared:
    directories:
      - path: "/shared/email_templates/"
        description: "Email template library (READ-ONLY)"
        readonly: true
        auto_index: true
        bootstrap: true
        template_source: "config/memory/shared/pipeline/reply_wizard/email_templates/"
      - path: "/shared/company_info/"
        description: "Company information (READ-ONLY)"
        readonly: true
        auto_index: true
        bootstrap: true
        template_source: "config/memory/shared/pipeline/reply_wizard/company_info/"
      - path: "/shared/senders/"
        description: "Sender interaction history (shared across agents)"
        readonly: false
        auto_index: true
        bootstrap: true
        schemas:
          interactions:
            file: "interactions.jsonl"
            format: "jsonl"
            write_mode: "append"
            structure:
              timestamp: "ISO 8601 timestamp"
              original_email: {...}
              response: {...}
Agent Configuration (config/agents/reply_context_wizard.yml):
mcp:
  servers:
    - url: "http://localhost:8050/mcp"
      toolkits: ["agentos_memory"]
      tools: ["agentos_shell"]
memory:
  inherit: true
  directories:
    - path: "/memory/senders/"
      description: "Sender preferences and history"
      auto_index: true
    - path: "/memory/patterns/"
      description: "Email pattern library"
      auto_index: true
    - path: "/workspace/"
      description: "Working directory"
      readonly: false
      auto_index: false
      bootstrap: true
  prompt_section:
    jinja: memory/prompts/reply_context_wizard.jinja
Template Files (config/memory/shared/pipeline/reply_wizard/):
email_templates/
├── greetings/
│   ├── formal.md
│   ├── casual.md
│   └── professional.md
├── closings/
│   └── ...
└── structures/
    └── ...
company_info/
├── standard_responses.md
├── policies.md
└── tone_guidelines.md
See src/topaz_agent_kit/templates/starters/ensemble/ for the complete implementation.
Troubleshooting
Memory directories not created:
- Ensure `bootstrap: true` is set for directories that should be created automatically
- Check that the agent has a `memory` configuration in its YAML file
- Verify the MCP server is running and the `agentos_memory` toolkit is enabled
Template files not initialized:
- Ensure template files exist in `config/memory/shared/pipeline/{pipeline_id}/`
- Check that the pipeline has a `memory.shared.directories` configuration
- Verify `bootstrap: true` is set for shared directories
- Check logs for initialization errors
Semantic search not working:
- Ensure `auto_index: true` is set for directories you want to search
- Files must be written to indexed directories (not just read)
- Use the `semgrep` command, not `grep`, for semantic search
Permission denied errors:
- Check `readonly: true` settings - agents cannot write to read-only directories
- Verify path mappings are correct in the memory configuration
- Check sandbox security logs for blocked operations
Enterprise Features
Production Readiness
- Health Monitoring: Built-in health checks and status endpoints
- Error Handling: Comprehensive error recovery and logging
- Scalability: Horizontal scaling support for high-volume deployments
- Security: Environment-based configuration and secure API key management
Development Tools
- Hot Reload: Development mode with automatic code reloading
- Validation: Comprehensive configuration validation and error reporting
- Testing: Built-in test framework for agent workflows
- Documentation: Auto-generated API documentation and workflow diagrams
Portable Demos
Create portable, zero-setup demo packages that can run on any machine without requiring Python, Node.js, or package installation.
Overview
The portable demo system uses a shared runtime approach:
- Runtime (~200-300MB): Python environment with Topaz Agent Kit pre-installed (shared across projects)
- Demo Project (~20-30MB): Project-specific files, data, and launcher scripts
This allows you to:
- Share one runtime with multiple demo projects
- Keep demo packages small and easy to distribute
- Ensure consistent execution environment across machines
- Provide zero-setup demos for customers and stakeholders
Quick Start
Prerequisites: uv must be installed for faster exports.
curl -LsSf https://astral.sh/uv/install.sh | sh # Mac/Linux
1. Build Package
python build.py
Note: You can skip this step if you already have a wheel file and use --skip-build flag.
2. Export Wheel, Runtime, and Demo (Combined)
# Export all three (wheel + runtime + demo) - default
topaz-agent-kit export -p projects/pa --output ./exports
# Skip build step (use existing wheel)
topaz-agent-kit export -p projects/pa --output ./exports --skip-build
# Export only wheel + demo (skip runtime)
topaz-agent-kit export -p projects/pa --output ./exports --skip-runtime
# Export only wheel (skip runtime + demo) - no --project needed
topaz-agent-kit export --output ./exports --skip-runtime --skip-demo
# Export only runtime + demo (skip wheel)
topaz-agent-kit export -p projects/pa --output ./exports --skip-wheel
# Export without zip files (keep directories)
topaz-agent-kit export -p projects/pa --output ./exports --no-zip
# Creates (default mode):
# - ./exports/topaz_agent_kit-0.11.1-py3-none-any.whl
# - ./exports/INSTALL.md (installation instructions)
# - ./exports/tak-runtime-v0.11.1-20260209.zip
# - ./exports/pa-20260209.zip
# Creates (with --no-zip):
# - ./exports/topaz_agent_kit-0.11.1-py3-none-any.whl
# - ./exports/INSTALL.md (installation instructions)
# - ./exports/tak-runtime-v0.11.1-20260209/ (directory)
# - ./exports/pa-20260209/ (directory)
Note: --project is only required when exporting demo. Use --skip-demo to skip demo export.
3. Or Export Separately
# Export runtime (one-time)
topaz-agent-kit export-runtime --output ./exports
# Skip build step (use existing wheel)
topaz-agent-kit export-runtime --skip-build --output ./exports
# Export demo project
topaz-agent-kit export-demo -p projects/pa --output ./exports
User Workflow
Option 1: Full Runtime (Easiest)
# 1. Extract runtime (one-time)
unzip tak-runtime-v0.11.1-20260209.zip
# 2. Extract demo project
unzip pa-20260209.zip
# Folder structure (both in same directory):
# your-folder/
# ├── tak-runtime-v0.11.1-20260209/   # Runtime
# └── pa-20260209/                    # Demo project
# 3. Run the demo (from inside demo folder)
cd pa-20260209
./run-demo.sh fastapi # Mac/Linux
# OR
run-demo.bat fastapi # Windows
# Note: Script finds runtime in same directory or parent directory
# 4. Open browser: http://localhost:8090
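The note above says the launcher finds the runtime in the same directory or the parent. A guess at that lookup, expressed in Python (the real `run-demo.sh` may implement it differently; `find_runtime` is an assumed name):

```python
from pathlib import Path
from typing import Optional

def find_runtime(demo_dir: str) -> Optional[Path]:
    """Look for a tak-runtime-v* folder next to the demo folder,
    then in its parent directory."""
    demo = Path(demo_dir)
    for base in (demo, demo.parent):
        matches = sorted(base.glob("tak-runtime-v*"))
        if matches:
            return matches[0]
    return None
```

This is why the Option 1 instructions ask you to extract the runtime and the demo into the same parent folder: from inside `pa-20260209/`, the runtime is found one level up.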
Option 2: Wheel File Only (For Developers)
If you have Python 3.11+ and prefer managing your own environment:
# 1. Extract demo project
unzip pa-20260209.zip
cd pa-20260209
# 2. Create virtual environment
uv venv # OR: python -m venv venv
source venv/bin/activate # Mac/Linux
# OR: venv\Scripts\activate # Windows
# 3. Install wheel file (get from creator)
uv pip install --prerelease allow topaz_agent_kit-0.11.1-py3-none-any.whl
# OR: pip install topaz_agent_kit-0.11.1-py3-none-any.whl
# 4. Set up environment
cp .env.example .env # Edit if needed
# 5. Run demo
python -m topaz_agent_kit.cli.main serve fastapi --project .
# 6. Open browser: http://localhost:8090
Benefits of wheel-only: Smaller download (~25-40MB vs ~220-330MB), use your own Python environment, more control.
Adding More Projects
# Just extract new project (runtime already exists)
unzip nexus-20260209.zip
cd nexus-20260209
./run-demo.sh fastapi # Uses same runtime!
Service Modes
The launcher script supports multiple service modes:
- fastapi - Web interface with UI (default)
- mcp - Model Context Protocol server
- services - Unified agent services (A2A)
- cli - Command-line interface
- all - Start all services simultaneously
Examples:
./run-demo.sh fastapi # Start web interface
./run-demo.sh mcp # Start MCP server
./run-demo.sh services # Start unified services
./run-demo.sh cli # Start CLI interface
./run-demo.sh all # Start all services
Command Reference
Export Runtime
topaz-agent-kit export-runtime [OPTIONS]
Options:
--output PATH Output directory or zip file path (default: current directory)
--dev Create dev build with timestamp and git hash
--version, -v Custom version string (overrides auto-detection)
--skip-build Skip building wheel (use existing wheel from dist/)
--no-zip Skip creating zip file (keep directory instead)
Examples:
# Auto-generate name in current directory
topaz-agent-kit export-runtime
# Specify output directory (auto-generates name)
topaz-agent-kit export-runtime --output ./exports
# Dev version
topaz-agent-kit export-runtime --dev --output ./exports
# Skip build step (use existing wheel)
topaz-agent-kit export-runtime --skip-build --output ./exports
Note: Requires uv to be installed. The export process uses uv for faster and more reliable package installation.
Export Demo
topaz-agent-kit export-demo [OPTIONS]
Options:
--project, -p PATH Path to project directory (required)
--output PATH Output directory or zip file path (default: current directory)
--version, -v Required runtime version (auto-detected if not provided)
--no-zip Skip creating zip file (keep directory instead)
Examples:
# Auto-generate name in current directory
topaz-agent-kit export-demo -p projects/pa
# Specify output directory (auto-generates name)
topaz-agent-kit export-demo -p projects/pa --output ./exports
# Export without zip file (keep directory)
topaz-agent-kit export-demo -p projects/pa --output ./exports --no-zip
Export Wheel
topaz-agent-kit export-wheel [OPTIONS]
Options:
--output PATH Output directory (default: current directory)
--skip-build Skip building wheel (use existing wheel from dist/)
--version, -v Custom version string (overrides auto-detection)
--dev Create dev build (currently unused, kept for API consistency)
Examples:
# Export wheel file to current directory
topaz-agent-kit export-wheel
# Export wheel to specific directory
topaz-agent-kit export-wheel --output ./exports
# Skip build step (use existing wheel)
topaz-agent-kit export-wheel --output ./exports --skip-build
Creates:
- `topaz_agent_kit-{version}-py3-none-any.whl`
- `INSTALL.md` (installation instructions)
Export (Combined)
topaz-agent-kit export [OPTIONS]
Options:
--project, -p PATH Path to project directory (required only if demo is exported)
--output PATH Output directory (default: current directory)
--dev Create dev build with timestamp and git hash
--version, -v Custom version string (overrides auto-detection)
--skip-build Skip building wheel (use existing wheel from dist/)
--skip-wheel Skip wheel file export
--skip-runtime, --skip-rt Skip runtime export
--skip-demo Skip demo project export
--no-zip Skip creating zip files (keep directories instead)
Examples:
# Export all three (wheel + runtime + demo) - default
topaz-agent-kit export -p projects/pa --output ./exports
# Export only wheel + demo (skip runtime)
topaz-agent-kit export -p projects/pa --output ./exports --skip-runtime
# Export only wheel (skip runtime + demo) - no --project needed
topaz-agent-kit export --output ./exports --skip-runtime --skip-demo
# Export only runtime + demo (skip wheel)
topaz-agent-kit export -p projects/pa --output ./exports --skip-wheel
# Export only wheel + runtime (skip demo) - no --project needed
topaz-agent-kit export --output ./exports --skip-demo
# Skip build step (use existing wheel)
topaz-agent-kit export -p projects/pa --output ./exports --skip-build
# Export without zip files (keep directories)
topaz-agent-kit export -p projects/pa --output ./exports --no-zip
Creates (default mode):
- topaz_agent_kit-{version}-py3-none-any.whl (wheel file)
- INSTALL.md (installation instructions)
- tak-runtime-v{version}-{YYYYMMDD}.zip (runtime)
- {project-name}-{YYYYMMDD}.zip (demo project)
Creates (with --no-zip):
- topaz_agent_kit-{version}-py3-none-any.whl (wheel file)
- INSTALL.md (installation instructions)
- tak-runtime-v{version}-{YYYYMMDD}/ (directory)
- {project-name}-{YYYYMMDD}/ (directory)
Note: --project is only required when exporting a demo. Use --skip-demo to skip the demo export.
Features
- ✅ Zero setup - No Python/Node installation needed
- ✅ Small packages - Projects ~30MB each
- ✅ Shared runtime - One runtime serves multiple projects
- ✅ Version safety - Strict version matching prevents runtime/demo mismatches
- ✅ Easy distribution - Just zip files
- ✅ Cross-platform - Works on Mac, Windows, and Linux
- ✅ Auto-naming - Zip files automatically named with version and date
- ✅ Auto-created directories - Output directories are created automatically
File Structure
user-machine/
├── tak-runtime-v0.11.1-20260209/   # Shared runtime
│   ├── venv/                       # Python environment
│   ├── VERSION                     # Runtime version
│   └── README.md
│
├── pa-20260209/                    # Project 1
│   ├── config/
│   ├── agents/
│   ├── services/
│   ├── tools/
│   ├── data/                       # Pre-populated data
│   ├── run-demo.sh                 # Launcher (Mac/Linux)
│   ├── run-demo.bat                # Launcher (Windows)
│   └── README.md
│
└── nexus-20260209/                 # Project 2 (shares runtime)
    ├── config/
    ├── data/
    └── run-demo.sh
Version Management
- Strict Version Matching: The runtime version must exactly match the demo's requirement
- Version Formats:
  - Release: 0.11.1 (from pyproject.toml)
  - Dev: 0.11.1-dev-20260209-a1b2c3d (timestamp + git hash)
  - Custom: Any string you specify
- Date Suffix: All zip files include the date (YYYYMMDD) for easy tracking
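The strict-matching rule above can be sketched as a small shell check. This is illustrative only; the function name and error message are assumptions, and the real launcher's check may be implemented differently.

```shell
# Illustrative strict version match: the demo's required version must
# equal the contents of the runtime's VERSION file, character for character.
check_version() {
  required="$1"      # version the demo was exported against
  runtime_dir="$2"   # extracted tak-runtime-v* directory
  runtime_version=$(cat "$runtime_dir/VERSION")
  if [ "$runtime_version" != "$required" ]; then
    echo "Version mismatch: demo needs $required, runtime is $runtime_version" >&2
    return 1
  fi
}
```

Note that even a dated dev version like 0.11.1-dev-20260209-a1b2c3d must match exactly; there is no "close enough" fallback.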
Runtime Discovery
The launcher script automatically finds the runtime in:
- Same directory as the project
- Parent directory
Make sure the runtime is extracted in one of these locations.
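A minimal sketch of this search order, assuming the runtime folder matches tak-runtime-v*/ and contains a VERSION file (the actual run-demo.sh logic may differ; the function name is hypothetical):

```shell
# Look for a runtime folder first next to the project, then in its parent.
# Prints the first matching directory and returns 0, or returns 1 if none found.
find_runtime() {
  for dir in . ..; do
    for candidate in "$dir"/tak-runtime-v*/; do
      if [ -f "$candidate/VERSION" ]; then
        echo "$candidate"
        return 0
      fi
    done
  done
  return 1
}
```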
Troubleshooting
Runtime not found:
- Ensure runtime zip is extracted
- Check it's in the same directory or parent directory
- Verify that the tak-runtime-v*/VERSION file exists
Version mismatch:
- Download the correct runtime version
- Check the date matches (if using dated versions)
Port already in use:
- Check config/pipelines.yml for the port configuration
- Stop other services using the same port
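The "runtime not found" checks can be folded into a small diagnostic helper. This is a hypothetical sketch for illustrating the order of checks; the real launcher reports these conditions in its own format.

```shell
# Report why a runtime might not be usable under a given base directory:
# not extracted at all, or extracted but missing its VERSION file.
diagnose_runtime() {
  base="${1:-.}"
  runtime_dir=$(ls -d "$base"/tak-runtime-v*/ 2>/dev/null | head -n 1)
  if [ -z "$runtime_dir" ]; then
    echo "runtime-not-found"       # zip not extracted here (also try the parent dir)
  elif [ ! -f "$runtime_dir/VERSION" ]; then
    echo "version-file-missing"    # extraction incomplete or wrong folder
  else
    echo "ok $(cat "$runtime_dir/VERSION")"
  fi
}
```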
For detailed documentation, see Portable Demos Guide and Portable Demos Workflow.
๐ Examples
Mathematical Problem Solving
# Create math-focused project
topaz-agent-kit init --starter math_demo ./math_project
# Run the math pipeline
topaz-agent-kit serve fastapi --project ./math_project
Financial Analysis
# Create stock analysis project
topaz-agent-kit init --starter stock_analysis ./finance_project
# Upload financial documents and analyze
topaz-agent-kit serve fastapi --project ./finance_project
Content Generation with HITL
# Create content creation project with HITL
topaz-agent-kit init --starter ensemble ./content_project
# Run with enhanced HITL system
topaz-agent-kit serve fastapi --project ./content_project
The Article Smith pipeline demonstrates advanced HITL integration:
- Research Approval: Review and approve research findings
- Draft Review: Provide detailed feedback with retry capability
- Publication Choice: Select publication approach with conditional routing
๐ค Contributing
We welcome contributions! Please see our Contributing Guide for details.
Development Setup
# Clone the repository
git clone https://github.com/topaz-agent-kit/topaz-agent-kit.git
cd topaz-agent-kit
# Install in development mode
pip install -e ".[dev,fastapi,mcp,ui]"
# Run tests
pytest tests/
# Run the development server
topaz-agent-kit serve fastapi --project projects/ensemble --reload
Building the Package
To build the package from source (includes UI build):
# Build the package (builds UI and creates wheel)
python build.py
# The build script will output installation instructions with the correct wheel file path
# Example output:
# uv add --prerelease=allow /path/to/dist/topaz_agent_kit-0.3.0-py3-none-any.whl
Build Process:
- UI Build: Compiles the Next.js UI and copies it to the package
- Package Build: Creates the wheel file using uv build
Built Artifacts:
- Wheel file: dist/topaz_agent_kit-{version}-py3-none-any.whl
After building, you can install the package in a new project using the wheel file path shown in the build output.
๐ License
This project is licensed under the MIT License - see the LICENSE file for details.
๐ Support
- Documentation: Full Documentation
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Community: Discord Server
Topaz Agent Kit - From idea to demo, from demo to dialogue, from dialogue to impact. โจ